1. 程式人生 > >mongodb原始碼分析(二)mongod的啟動

mongodb原始碼分析(二)mongod的啟動

   mongod是mongodb的儲存伺服器,其程式碼入口在mongo/db/db.cpp中,mongod的大部分程式碼都在mongo/db這個資料夾中。
int main(int argc, char* argv[]) {  
    int exitCode = mongoDbMain(argc, argv);  
    ::_exit(exitCode);  
}  
static int mongoDbMain(int argc, char* argv[]) {
    static StaticObserver staticObserver;
    getcurns = ourgetns;

    //下面幾部分就是mongod的主要啟動選項類別,具體有哪些選項可以參考mongodb相應的文件。
    po::options_description general_options("General options");
#if defined(_WIN32)
    po::options_description windows_scm_options("Windows Service Control Manager options");
#endif
    po::options_description replication_options("Replication options");
    po::options_description ms_options("Master/slave options");
    po::options_description rs_options("Replica set options");
    po::options_description sharding_options("Sharding options");
    po::options_description visible_options("Allowed options");
    po::options_description hidden_options("Hidden options");
    po::options_description ssl_options("SSL options");

跟隨原始碼來到這裡
        Module::configAll( params );//這裡module的配置,mms是其中部分,但是除錯時發現module中沒有任何模組,沒有設定例項吧。
        dataFileSync.go();//mongodb使用記憶體檔案對映管理資料,所以過一段時間有必要把髒資料寫回磁碟,這裡開啟一個執行緒乾的就是這個事情
          //預設是60s刷一次,啟動時可以用--syncdelay加秒數來設定多少時間重新整理一次若設定為0則反而不重新整理,而只是執行5s的睡眠,然後繼續
        if (params.count("command")) {//睡眠5s,直到通過db.adminCommand({"setParameter":1,syncdelay:20})設定時間後再重新整理
            vector<string> command = params["command"].as< vector<string> >();

            if (command[0].compare("run") == 0) {
                if (command.size() > 1) {
                    cout << "Too many parameters to 'run' command" << endl;
                    cout << visible_options << endl;
                    return 0;
                }

                initAndListen(cmdLine.port);
                return 0;
            }
最後到這裡:
    StartupTest::runTests();//啟動測試,有幾十項,主要是做些sanity check,測試失敗則mongod啟動失敗,具體可自己分析原始碼
    initAndListen(cmdLine.port);//cmdLine.port可配置,若不設定預設為27017
    dbexit(EXIT_CLEAN);
    void initAndListen(int listenPort) {
        try {
            _initAndListen(listenPort);
        }
        catch ( DBException &e ) {
            log() << "exception in initAndListen: " << e.toString() << ", terminating" << endl;
            dbexit( EXIT_UNCAUGHT );
        }
        catch ( std::exception &e ) {
            log() << "exception in initAndListen std::exception: " << e.what() << ", terminating" << endl;
            dbexit( EXIT_UNCAUGHT );
        }
        catch ( int& n ) {
            log() << "exception in initAndListen int: " << n << ", terminating" << endl;
            dbexit( EXIT_UNCAUGHT );
        }
        catch(...) {
            log() << "exception in initAndListen, terminating" << endl;
            dbexit( EXIT_UNCAUGHT );
        }
    }
    void _initAndListen(int listenPort ) {//這裡刪除了些無關程式碼,貼出了主要的程式碼

        Client::initThread("initandlisten");

        Database::_openAllFiles = false;

        Logstream::get().addGlobalTee( new RamLog("global") );

        bool is32bit = sizeof(int*) == 4;

        acquirePathLock(forceRepair);//檔案鎖,保證一個目錄下啟動只能有一個mongod例項,若另一個例項啟動起來則因為前一個例項的這個鎖失敗。
        boost::filesystem::remove_all( dbpath + "/_tmp/" );

        FileAllocator::get()->start();//專門的檔案分配執行緒,mongodb分配檔案時預分配檔案時就是通過這裡啟動的執行緒預分配檔案

        MONGO_ASSERT_ON_EXCEPTION_WITH_MSG( clearTmpFiles(), "clear tmp files" );

        dur::startup();//mongodb的日誌,後面專門文章來分析mongo的journal

        if( cmdLine.durOptions & CmdLine::DurRecoverOnly )
            return;

        // comes after getDur().startup() because this reads from the database
        clearTmpCollections();
        Module::initAll();//除錯的時候發現沒有啟動的module,mms應該屬於這裡

        if ( scriptingEnabled ) {
            ScriptEngine::setup();
            globalScriptEngine->setCheckInterruptCallback( jsInterruptCallback );
            globalScriptEngine->setGetInterruptSpecCallback( jsGetInterruptSpecCallback );
        }

        repairDatabasesAndCheckVersion();

        /* we didn't want to pre-open all files for the repair check above. for regular
           operation we do for read/write lock concurrency reasons.
        */
        Database::_openAllFiles = true;

        if ( shouldRepairDatabases )
            return;

        /* this is for security on certain platforms (nonce generation) */
        srand((unsigned) (curTimeMicros() ^ startupSrandTimer.micros()));

        snapshotThread.go();//似乎是統計沒一段時間內的插入查詢修改刪除等狀況的執行緒,沒仔細看有待研究
        d.clientCursorMonitor.go();//報告記憶體使用情況和過時過時刪除客戶端遊標的執行緒,不會分析,感興趣自己研究吧
        PeriodicTask::theRunner->go();//執行任務的執行緒,其它部分向它提交任務
        if (missingRepl) {
            log() << "** warning: not starting TTL monitor" << startupWarningsLog;
            log() << "**          if this member is not part of a replica set and you want to use "
                  << startupWarningsLog;
            log() << "**          TTL collections, remove local.system.replset and restart"
                  << startupWarningsLog;
        }
        else {
            startTTLBackgroundJob();//2.2的特性,建立索引時設定一個超時時間,超過時間後自動刪除資料如:db.log.events.ensureIndex({"status:1"},{expireAfterS    //econds:3600})。這裡專門啟動一個執行緒來做這件事。

        }
        listen(listenPort);
        exitCleanly(EXIT_NET_ERROR);
}
    void listen(int port) {
        //testTheDb();
        MessageServer::Options options;
        options.port = port;
        options.ipList = cmdLine.bind_ip;

        MessageServer * server = createServer( options , new MyMessageHandler() );
        server->setAsTimeTracker();

        startReplication();//mongodb有兩種replcation模式,master/slave,和replset模式,這兩種模式都在這裡啟動後面分析時會分析到
        if ( !noHttpInterface )//啟動一個webService執行緒,使得我們可以在瀏覽器中訪問mongod的一些資訊,埠為server的埠加1000,預設server埠為27017,所以其       //預設埠為28017
            boost::thread web( boost::bind(&webServerThread, new RestAdminAccess() /* takes ownership */));

#if(TESTEXHAUST)
        boost::thread thr(testExhaust);
#endif
        server->run();//伺服器的啟動
    }
server->run這裡執行流程是這樣的:PostMessageServer::run->listener::initAndListen。

initAndListen執行流程:繫結IP(這裡IP可以是多IP地址,若是多IP地址則繫結到每一個IP地址上),通過select偵聽來自客戶端的連線請求,接收請求後來到:

    void Listener::accepted(boost::shared_ptr<Socket> psocket, long long connectionId ) {
        MessagingPort* port = new MessagingPort(psocket);
        port->setConnectionId( connectionId );
        acceptedMP( port );//這是一個虛擬函式,將執行PortMessageServer的acceptedMP
    }
        virtual void acceptedMP(MessagingPort * p) {

            if ( ! connTicketHolder.tryAcquire() ) {//這裡超過了配置的最大連線數,windows預設的連線數為20000,其它系統則檢視系統限制,最大還是20000,
//若系統限制小於20000,則連線數為系統限制的數目
                log() << "connection refused because too many open connections: " << connTicketHolder.used() << endl;

                // TODO: would be nice if we notified them...
                p->shutdown();
                delete p;
                sleepmillis(2); // otherwise we'll hard loop
                return;
            }

            try {
#ifndef __linux__  // TODO: consider making this ifdef _WIN32
                {
                    boost::thread thr( boost::bind( &pms::threadRun , p ) );
                }
#else
                pthread_attr_t attrs;
                pthread_attr_init(&attrs);
                pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);

                static const size_t STACK_SIZE = 1024*1024; // if we change this we need to update the warning

                struct rlimit limits;
                verify(getrlimit(RLIMIT_STACK, &limits) == 0);
                if (limits.rlim_cur > STACK_SIZE) {//因為連線數多了後每一個連線開一個執行緒棧空間佔用大這裡顯示的配置佔空間大小為1M
                    pthread_attr_setstacksize(&attrs, (DEBUG_BUILD
                                                        ? (STACK_SIZE / 2)
                                                        : STACK_SIZE));
                } else if (limits.rlim_cur < 1024*1024) {
                    warning() << "Stack size set to " << (limits.rlim_cur/1024) << "KB. We suggest 1MB" << endl;
                }


                pthread_t thread;
                int failed = pthread_create(&thread, &attrs, (void*(*)(void*)) &pms::threadRun, p);//建立服務執行緒

                pthread_attr_destroy(&attrs);

                if (failed) {
                    log() << "pthread_create failed: " << errnoWithDescription(failed) << endl;
                    throw boost::thread_resource_error(); // for consistency with boost::thread
                }
#endif
            }
        }
最後描述下threadRun流程,首先其接收來自客戶端的連線請求,然後接收資料,然後出來,最後斷掉連線類似程式碼如下:
threadrun(){
handler->connected()//若是來自本機的連線請求則設定_isLocalHost=true,和_isLocalHostAndLocalHostIsAuthorizedForAll=true,當然再檢查admin.system.users是否
//有資料,有則_isLocalHostAndLocalHostIsAuthorizedForAll=false,要不然本機讀取到使用者了怎麼行呢
p->recv()   //接收資料
handler->process()//處理請求
handler->disconnect()
}

好了,分析到這裡mongod的啟動部分也就完成了,下一篇文章我將分析mongo的啟動部分。