原始碼分析之:WebSocket 是如何將收發到的訊息投遞給 cocos 主執行緒的
阿新 • • 發佈:2019-01-02
-->websocket的3種使用場景:
1)h5瀏覽器中websocket由瀏覽器提供
2)node.js中,可以使用ws模組寫伺服器
3)native app中,可以使用c++版本的websocket匯出c++介面給cocos creator客戶端使用
-->WebSocket的執行緒將訊息投遞給cocos主執行緒的過程:
websocket執行緒向cocos主執行緒投遞訊息時所使用的執行緒安全函式:
/**
 * Forwards a callback to the Director's Scheduler.
 *
 * The callback is passed unchanged to Scheduler::performFunctionInCocosThread().
 *
 * @param cb Callable to hand over to the scheduler.
 */
void WsThreadHelper::sendMessageToCocosThread(const std::function<void()>& cb)
{
    auto* scheduler = Director::getInstance()->getScheduler();
    scheduler->performFunctionInCocosThread(cb);
}
OPEN 連線狀態:
// Excerpt of WebSocket::onConnectionOpened ("..." marks code elided by the article).
// Sets _readyState to State::OPEN, then posts a callback through
// _wsHelper->sendMessageToCocosThread(). The callback logs and does nothing else when the
// captured isDestroyed flag is set; otherwise it calls _delegate->onOpen(this).
void WebSocket::onConnectionOpened() { ... _readyState = State::OPEN; ... _wsHelper->sendMessageToCocosThread([this, isDestroyed](){ if (*isDestroyed) { LOGD("WebSocket instance was destroyed!\n"); } else { _delegate->onOpen(this); } }); }
收到訊息onClientReceivedData
// Excerpt of WebSocket::onClientReceivedData ("..." marks code elided by the article).
// Posts a callback through _wsHelper->sendMessageToCocosThread() that captures frameData,
// frameSize, isBinary and isDestroyed. The callback fills a Data object from them
// (isBinary, bytes = frameData->data(), len = frameSize), calls _delegate->onMessage(this, data)
// unless *isDestroyed is set, and finally deletes frameData.
// NOTE(review): the excerpt was collapsed onto a single line, so as written the inline
// "// In UI thread" comment swallows the remainder of the line; the original source
// presumably had line breaks there. The trailing extra '}' presumably closes a block
// opened inside the elided part — confirm against the real file.
void WebSocket::onClientReceivedData(void* in, ssize_t len) { ... _wsHelper->sendMessageToCocosThread([this, frameData, frameSize, isBinary, isDestroyed](){ // In UI thread LOGD("Notify data len %d to Cocos thread.\n", (int)frameSize); Data data; data.isBinary = isBinary; data.bytes = (char*)frameData->data(); data.len = frameSize; if (*isDestroyed) { LOGD("WebSocket instance was destroyed!\n"); } else { _delegate->onMessage(this, data); } delete frameData; }); } }
Error狀態
void WebSocket::onConnectionError()
{
LOGD("WebSocket (%p) onConnectionError ...\n", this);
_readStateMutex.lock();
_readyState = State::CLOSING;
_readStateMutex.unlock();
std::shared_ptr<bool> isDestroyed = _isDestroyed;
_wsHelper->sendMessageToCocosThread([this, isDestroyed](){
if (*isDestroyed)
{
LOGD("WebSocket instance was destroyed!\n");
}
else
{
_delegate->onError(this, ErrorCode::CONNECTION_FAILURE);
}
});
}
Closed狀態
void WebSocket::onConnectionClosed()
{
_readStateMutex.lock();
if (_readyState == State::CLOSED)
{
LOGD("onConnectionClosed: WebSocket (%p) was closed, no need to close it again!\n", this);
_readStateMutex.unlock();
return;
}
LOGD("WebSocket (%p) onConnectionClosed ...\n", this);
_readyState = State::CLOSED;
_readStateMutex.unlock();
_wsHelper->quitWebSocketThread();
std::shared_ptr<bool> isDestroyed = _isDestroyed;
_wsHelper->sendMessageToCocosThread([this, isDestroyed](){
if (*isDestroyed)
{
LOGD("WebSocket instance was destroyed!\n");
}
else
{
// Waiting for the subThread safety exit
_wsHelper->joinWebSocketThread();
_delegate->onClose(this);
}
});
}
Send資料(傳送字串)
void WebSocket::send(const std::string& message)
{
if (_readyState == State::OPEN)
{
// In main thread
Data* data = new (std::nothrow) Data();
data->bytes = (char*)malloc(message.length() + 1);
// Make sure the last byte is '\0'
data->bytes[message.length()] = '\0';
strcpy(data->bytes, message.c_str());
data->len = static_cast<ssize_t>(message.length());
WsMessage* msg = new (std::nothrow) WsMessage();
msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_STRING;
msg->obj = data;
_wsHelper->sendMessageToWebSocketThread(msg);
}
else
{
LOGD("Couldn't send message since websocket wasn't opened!\n");
}
}
最後看:Scheduler::update 中對其他執行緒投遞過來的函式的處理(_functionsToPerform 變數)
// main loop
// Runs one scheduler pass, in this order:
//   1. update callbacks from the negative-, zero- and positive-priority lists,
//   2. every custom timer registered in _hashForTimers,
//   3. removal of update entries that were marked for deletion,
//   4. script handler entries (only when CC_ENABLE_SCRIPT_BINDING is set),
//   5. functions queued in _functionsToPerform.
// dt is forwarded unchanged to every callback and timer (presumably the elapsed
// time since the previous pass — confirm against the caller).
// NOTE(review): `entry` and `tmp` used by DL_FOREACH_SAFE are not declared in this
// excerpt, and _updateHashLocked is only ever reset to false here; the declarations
// and the matching "= true" were presumably elided from the article.
void Scheduler::update(float dt)
{
// updates with priority < 0
DL_FOREACH_SAFE(_updatesNegList, entry, tmp)
{
// Skip entries that are paused or already flagged for removal.
if ((! entry->paused) && (! entry->markedForDeletion))
{
entry->callback(dt);
}
}
// updates with priority == 0
DL_FOREACH_SAFE(_updates0List, entry, tmp)
{
if ((! entry->paused) && (! entry->markedForDeletion))
{
entry->callback(dt);
}
}
// updates with priority > 0
DL_FOREACH_SAFE(_updatesPosList, entry, tmp)
{
if ((! entry->paused) && (! entry->markedForDeletion))
{
entry->callback(dt);
}
}
// Iterate over all the custom selectors
for (tHashTimerEntry *elt = _hashForTimers; elt != nullptr; )
{
// Publish the element being processed through _currentTarget and reset its
// "salvaged" flag, which is checked again after the timers have run.
_currentTarget = elt;
_currentTargetSalvaged = false;
if (! _currentTarget->paused)
{
// The 'timers' array may change while inside this loop
// (so the index lives in the element and the bound is re-read on every iteration).
for (elt->timerIndex = 0; elt->timerIndex < elt->timers->num; ++(elt->timerIndex))
{
elt->currentTimer = (Timer*)(elt->timers->arr[elt->timerIndex]);
elt->currentTimerSalvaged = false;
elt->currentTimer->update(dt);
if (elt->currentTimerSalvaged)
{
// The current timer asked to be removed during its own update. To prevent the
// timer from accidentally deallocating itself before finishing its step, it was
// retained. Now that the step is done, it's safe to release it.
elt->currentTimer->release();
}
elt->currentTimer = nullptr;
}
}
// elt, at this moment, is still valid
// so it is safe to ask this here (issue #490)
elt = (tHashTimerEntry *)elt->hh.next;
// only delete currentTarget if no actions were scheduled during the cycle (issue #481)
if (_currentTargetSalvaged && _currentTarget->timers->num == 0)
{
removeHashElement(_currentTarget);
}
}
// delete all updates that are marked for deletion
// updates with priority < 0
DL_FOREACH_SAFE(_updatesNegList, entry, tmp)
{
if (entry->markedForDeletion)
{
this->removeUpdateFromHash(entry);
}
}
// updates with priority == 0
DL_FOREACH_SAFE(_updates0List, entry, tmp)
{
if (entry->markedForDeletion)
{
this->removeUpdateFromHash(entry);
}
}
// updates with priority > 0
DL_FOREACH_SAFE(_updatesPosList, entry, tmp)
{
if (entry->markedForDeletion)
{
this->removeUpdateFromHash(entry);
}
}
_updateHashLocked = false;
_currentTarget = nullptr;
#if CC_ENABLE_SCRIPT_BINDING
//
// Script callbacks
//
// Iterate over all the script callbacks
if (!_scriptHandlerEntries.empty())
{
// Walk from the last index down to 0, erasing at the current index as we go.
// NOTE(review): the `i >= 0` condition only terminates if size() returns a signed
// type; with an unsigned size type `i` would wrap around instead — confirm the
// container's size() return type.
for (auto i = _scriptHandlerEntries.size() - 1; i >= 0; i--)
{
SchedulerScriptHandlerEntry* eachEntry = _scriptHandlerEntries.at(i);
if (eachEntry->isMarkedForDeletion())
{
_scriptHandlerEntries.erase(i);
}
else if (!eachEntry->isPaused())
{
eachEntry->getTimer()->update(dt);
}
}
}
#endif
//
// Functions allocated from another thread
//
// Testing size is faster than locking / unlocking.
// And almost never there will be functions scheduled to be called.
// This is the spot: functions posted from other threads are processed here,
// at the very end of update().
// NOTE(review): empty() is read here without holding _performMutex.
if( !_functionsToPerform.empty() ) {
_performMutex.lock();
// fixed #4123: Save the callback functions, they must be invoked after '_performMutex.unlock()', otherwise if new functions are added in callback, it will cause thread deadlock.
// The queue is copied into a local and cleared while the mutex is held.
auto temp = _functionsToPerform;
_functionsToPerform.clear();
_performMutex.unlock();
// Run the saved functions with the mutex released.
for( const auto &function : temp ) {
function();
}
}
}