1. 程式人生 > >mongodb原始碼分析(五)查詢2之mongod的資料庫載入

mongodb原始碼分析(五)查詢2之mongod的資料庫載入

        上一篇文章分析到了客戶端查詢請求的傳送,接著分析服務端的處理動作,分析從服務端響應開始到資料庫

正確載入為止,主要流程為資料庫的讀入過程與使用者的認證.

        mongod服務對於客戶端請求的處理在mongo/db/db.cpp MyMessageHandler::process中,其中呼叫了

函式assembleResponse完成請求響應,我們就從這個函式開始入手分析,程式碼很長,刪除一些支流或者不相關的程式碼.

    // Handles one client message and dispatches it by operation type.
    // NOTE(review): this is an abridged excerpt -- the article removed side branches, so locals
    // such as op, ns, c, currentOp, debug and isCommand are declared in code not shown here.
    void assembleResponse( Message &m, DbResponse &dbresponse, const HostAndPort& remote ) {
        if ( op == dbQuery ) {
            if( strstr(ns, ".$cmd") ) {
                isCommand = true;
                opwrite(m);// write to the diagnostic log; per the article it is off by default (level 0) and is enabled at startup with --diaglog x: 0 = off; 1 = writes, 2 = reads, 3 = both
                if( strstr(ns, ".$cmd.sys.") ) {// (--diaglog levels, continued) 7 = log a few reads, and all writes.
                    if( strstr(ns, "$cmd.sys.inprog") ) {
                        inProgCmd(m, dbresponse);// command that reports the operations currently in progress
                        return;
                    }
                    if( strstr(ns, "$cmd.sys.killop") ) {
                        killOp(m, dbresponse);// terminates the current operation
                        return;
                    }
                    if( strstr(ns, "$cmd.sys.unlock") ) {
                        unlockFsync(ns, m, dbresponse);
                        return;
                    }
                }
            }
            else {
                opread(m);
            }
        }
        else if( op == dbGetMore ) {
            opread(m);
        }
        else {
            opwrite(m);
        }
        long long logThreshold = cmdLine.slowMS;// startup setting, 100ms by default per the article; when an operation takes longer than this and --profile was set to 1 or 2 at startup,
        bool shouldLog = logLevel >= 1;// mongod records the slow operation: 1 = record only operations slower than slowMS, 2 = record every operation
        if ( op == dbQuery ) {         // slowMS can be set with --slowms
            if ( handlePossibleShardedMessage( m , &dbresponse ) )// sharding-related; the article defers this to a later post
                return;
            receivedQuery(c , dbresponse, m );// the real entry point for queries
        }
        else if ( op == dbGetMore ) {// a query has already run; this is the entry point for fetching more of its results
            if ( ! receivedGetMore(dbresponse, m, currentOp) )
                shouldLog = true;
        }
                if ( op == dbKillCursors ) {
                    currentOp.ensureStarted();
                    logThreshold = 10;
                    receivedKillCursors(m);
                }
                else if ( op == dbInsert ) {// entry point for inserts
                    receivedInsert(m, currentOp);
                }
                else if ( op == dbUpdate ) {// entry point for updates
                    receivedUpdate(m, currentOp);
                }
                else if ( op == dbDelete ) {// entry point for deletes
                    receivedDelete(m, currentOp);
                }
        if ( currentOp.shouldDBProfile( debug.executionTime ) ) {// the operation will be profiled; per the article there are two possible reasons: (1) --profile 2 was set at startup, so every operation is
            // performance profiling is on                    // recorded; (2) --profile 1 was set and the operation ran longer than slowMs. NOTE(review): in the excerpt an "else {" opener is fused into this comment line; the article says the preceding if-branch (skip profiling when the lock cannot be acquired) was deleted, so the braces below look unbalanced
                Lock::DBWrite lk( currentOp.getNS() );// records the operation by inserting its details into the xxx.system.profile collection
                if ( dbHolder()._isLoaded( nsToDatabase( currentOp.getNS() ) , dbpath ) ) {
                    Client::Context cx( currentOp.getNS(), dbpath, false );
                    profile(c , currentOp );
                }
            }
        }

前進到receivedQuery,其解析了接收到的資料,然後呼叫runQuery負責處理查詢,並處理runQuery丟擲的異常,這裡直接進入runQuery.

    /**
     * Runs one client query: handles the command case, tries the simple _id shortcut, then
     * takes a read context on the namespace and hands over to the query optimizer.
     *
     * NOTE(review): abridged excerpt. Names such as pq, ns, jsobj, bb, queryOptions, dbpath,
     * shardingVersionAtStart, pgfs and npfe are declared in code the article left out, and the
     * command-response handling and the retry/exception handling around the loop were removed.
     * The original listing was collapsed onto one line; the line structure below is restored.
     *
     * @param m       incoming message
     * @param q       parsed query message
     * @param curop   current-operation bookkeeping
     * @param result  message that receives the reply
     * @return "" on the simple _id path, otherwise whatever queryWithQueryOptimizer returns
     */
    string runQuery(Message& m, QueryMessage& q, CurOp& curop, Message &result) {
        shared_ptr<ParsedQuery> pq_shared( new ParsedQuery(q) );
        if ( pq.couldBeCommand() ) {
            // The request is a command. Command handling is not analysed in this article; see
            // http://www.cnblogs.com/daizhj/archive/2011/04/29/mongos_command_source_code.html
            BSONObjBuilder cmdResBuf;
            if ( runCommands(ns, jsobj, curop, bb, cmdResBuf, false, queryOptions) ){}
        }

        // true only when the client asked for db.coll.find().explain()
        bool explain = pq.isExplain();
        BSONObj order = pq.getOrder();
        BSONObj query = pq.getFilter();

        // Run a simple id query.
        if ( ! (explain || pq.showDiskLoc()) && isSimpleIdQuery( query ) && !pq.hasOption( QueryOption_CursorTailable ) ) {
            if ( queryIdHack( ns, query, pq, curop, result ) ) { // optimised path for _id lookups
                return "";
            }
        }

        bool hasRetried = false;
        while ( 1 ) {
            // ReadContext is the focus of this article: the first time it locks the database
            // it also performs the database load.
            Client::ReadContext ctx( ns , dbpath ); // read locks

            // Per the article, this is where reads against a replica-set secondary are rejected.
            replVerifyReadsOk(&pq);

            BSONObj oldPlan;
            if ( ! hasRetried && explain && ! pq.hasIndexSpecifier() ) {
                scoped_ptr<MultiPlanScanner> mps( MultiPlanScanner::make( ns, query, order ) );
                oldPlan = mps->cachedPlanExplainSummary();
            }

            // The real query happens here; the article covers it in the next post.
            return queryWithQueryOptimizer( queryOptions, ns, jsobj, curop, query, order, pq_shared, oldPlan, shardingVersionAtStart, pgfs, npfe, result );
        }
    }
// Builds a read context for ns: takes a DBRead lock and, if the database is not yet held by
// dbHolder(), constructs a Context under a write lock first (which is what loads the database).
Client::ReadContext::ReadContext(const string& ns, string path, bool doauth ) {
        {
            lk.reset( new Lock::DBRead(ns) );// database read lock; MongoDB's locking scheme is outside the scope of this article
            Database *db = dbHolder().get(ns, path);
            if( db ) {// db is null the first time, before the database has been loaded
                c.reset( new Context(path, ns, db, doauth) );
                return;
            }
        }
        if( Lock::isW() ) { // the global write lock is already held
			// write locked already
                DEV RARELY log() << "write locked on ReadContext construction " << ns << endl;
                c.reset( new Context(ns, path, doauth) );
            }
        else if( !Lock::nested() ) { 
            // Drop the read lock, load under the global write lock, then re-take the read lock.
            lk.reset(0);
            {
                Lock::GlobalWrite w;// take the global write lock; per the article this temporary Context is where the database is actually loaded
                Context c(ns, path, doauth);
            }
            // db could be closed at this interim point -- that is ok, we will throw, and don't mind throwing.
            lk.reset( new Lock::DBRead(ns) );
            c.reset( new Context(ns, path, doauth) );
        }
    }
    // Constructs a Context for ns under path: captures the current client and its previous
    // context, then delegates the database lookup/creation and access check to _finishInit.
    Client::Context::Context(const string& ns, string path , bool doauth, bool doVersion ) :
        _client( currentClient.get() ), 
        _oldContext( _client->_context ), // previous context of this client, saved at construction
        _path( path ), 
        _justCreated(false), // set for real in finishInit
        _doVersion(doVersion),
        _ns( ns ), 
        _db(0) // filled in by _finishInit
    {
        _finishInit( doauth );
    }
繼續看_finishInit函式:
    // Resolves _db for this context and then runs the namespace access check.
    // NOTE(review): abridged excerpt -- writeLocked is declared in code the article omitted.
    void Client::Context::_finishInit( bool doauth ) {
        _db = dbHolderUnchecked().getOrCreate( _ns , _path , _justCreated );// fetch the database, or create/load it if it is not held yet
        checkNsAccess( doauth, writeLocked ? 1 : 0 );// authentication check
    }
    /**
     * Returns the Database for the db part of ns under path, constructing it on first use.
     *
     * The excerpt in the article had lost the brace that closes the first locked scope, which
     * left the function unbalanced and the first scoped_lock still in scope when _m is locked
     * again further down. The closing brace is restored here.
     * NOTE(review): abridged excerpt; logging and sanity checks from the original are omitted.
     *
     * @param ns           namespace such as "test.coll"
     * @param path         data directory the database lives under
     * @param justCreated  set to false when an already-held database is returned; otherwise
     *                     passed through to the Database constructor
     * @return the held or newly constructed Database
     */
    Database* DatabaseHolder::getOrCreate( const string& ns , const string& path , bool& justCreated ) {
        string dbname = _todb( ns ); // e.g. turns "test.coll" into "test"
        {
            SimpleMutex::scoped_lock lk(_m);
            Lock::assertAtLeastReadLocked(ns);
            DBs& m = _paths[path]; // databases already held for this path
            {
                DBs::iterator i = m.find(dbname); 
                if( i != m.end() ) {
                    // already loaded: hand back the existing instance
                    justCreated = false;
                    return i->second;
                }
            }
        } // first locked scope ends here, before the Database is constructed

        Database *db = new Database( dbname.c_str() , justCreated , path ); // the actual data load
        {
            SimpleMutex::scoped_lock lk(_m); // once loaded, record the database under its path
            DBs& m = _paths[path];
            verify( m[dbname] == 0 );
            m[dbname] = db;
            _size++;
        }
        return db;
    }
    // Opens a database: checks whether its .ns file exists and, depending on that, initialises
    // the namespace index and optionally opens all data files.
    // NOTE(review): abridged excerpt -- the catch clause and the end of the body are omitted.
    Database::Database(const char *nm, bool& newDb, const string& _path )
        : name(nm), path(_path), namespaceIndex( path, name ),
          profileName(name + ".system.profile")
    {
        try {
            newDb = namespaceIndex.exists();// checks whether the xxx.ns file exists; per the article, existence means the database has already been created
            // NOTE(review): as excerpted, newDb receives the result of exists() and init() runs
            // when it is false, which reads inverted against the comment below -- confirm upstream.
            // If already exists, open.  Otherwise behave as if empty until
            // there's a write, then open.
            if (!newDb) {
                namespaceIndex.init();// loads the xxx.ns file
                if( _openAllFiles )
                    openAllFiles();// loads all data files of the form xxx.0, xxx.1, xxx.2
            }
            magic = 781231;
    }
繼續看NamespaceIndex::init函式,若其未初始化則呼叫_init初始化,已初始化則什麼也不做,這裡直接進入NamespaceIndex::_init
    // Opens the .ns file if it exists, or creates it otherwise, and builds the namespace
    // hash table (ht) over the resulting view.
    NOINLINE_DECL void NamespaceIndex::_init() {
        unsigned long long len = 0;
        boost::filesystem::path nsPath = path();// path of the xxx.ns file
        string pathString = nsPath.string();
        void *p = 0;
        if( boost::filesystem::exists(nsPath) ) {// the file exists: map it as a memory-mapped file
            if( f.open(pathString, true) ) {// per the article, f is a MongoMMF object
                len = f.length();
                // the file length must be a whole multiple of 1MB
                if ( len % (1024*1024) != 0 ) {
                    log() << "bad .ns file: " << pathString << endl;
                    uassert( 10079 ,  "bad .ns file length, cannot open database", len % (1024*1024) == 0 );
                }
                p = f.getView();// pointer to the mapped file
            }
        }
        else {
            // use lenForNewNsFiles, we are making a new database
            massert( 10343, "bad lenForNewNsFiles", lenForNewNsFiles >= 1024*1024 );
            maybeMkdir();
            unsigned long long l = lenForNewNsFiles;// create the ns file; per the article the default size is 16M and --nssize (in MB) changes it, affecting only newly created
            if( f.create(pathString, l, true) ) {   // databases
                getDur().createdFile(pathString, l); // always a new file
                len = l;
                verify( len == lenForNewNsFiles );
                p = f.getView();
            }
        }
        // len is narrowed to int below, so it has to fit in 31 bits
        verify( len <= 0x7fffffff );
        ht = new HashTable<Namespace,NamespaceDetails>(p, (int) len, "namespace index");
        if( checkNsFilesOnLoad )
            ht->iterAll(namespaceOnLoadCallback);
    }
繼續看MongoMMF::open流程:
    // Maps the file fname and finishes opening it.
    // sequentialHint selects the SEQUENTIAL mapping option; otherwise no option is passed.
    // Returns the result of finishOpening().
    bool MongoMMF::open(string fname, bool sequentialHint) {
        LOG(3) << "mmf open " << fname << endl;
        setPath(fname);

        int mapOptions = 0;
        if( sequentialHint )
            mapOptions = SEQUENTIAL;

        // this is the actual mapping step
        _view_write = mapWithOptions(fname.c_str(), mapOptions);
        return finishOpening();
    }
    // Completes an open/create once _view_write has been set.
    // Returns false when there is no write view, true otherwise.
    bool MongoMMF::finishOpening() {
        if( !_view_write )
            return false;

        if( !cmdLine.dur ) {
            // without cmdLine.dur the private view is simply the write view
            _view_private = _view_write;
            return true;
        }

        // With cmdLine.dur set (journaling, per the article) a separate private map is created;
        // the article defers the journal itself to a later post.
        _view_private = createPrivateMap();
        if( _view_private == 0 ) {
            msgasserted(13636, str::stream() << "file " << filename() << " open/create failed in createPrivateMap (look in log for more information)");
        }
        privateViews.add(_view_private, this); // note that testIntent builds use this, even though it points to view_write then...
        return true;
    }
回到namespaceIndex::_init函式:
        // Builds the namespace hash table over the .ns view p, whose length is len bytes.
        ht = new HashTable<Namespace,NamespaceDetails>(p, (int) len, "namespace index");
這裡有必要關注下NamespaceDetails結構,每一個集合對應於一個NamespaceDetails結構,該結構作用如下(來自NamespaceDetails結構上方的註解描述)

NamespaceDetails : this is the "header" for a collection that has all its details.
       It's in the .ns file and this is a memory mapped region (thus the pack pragma above).

    // Per-collection header record stored in the .ns file; the field order below is the
    // on-disk layout, so fields must not be reordered.
    // NOTE(review): abridged excerpt -- Buckets and the member functions are declared in code
    // the article omitted.
    class NamespaceDetails {
    public:
        enum { NIndexesMax = 64, NIndexesExtra = 30, NIndexesBase  = 10 };
        /*-------- data fields, as present on disk : */
        DiskLoc firstExtent;// the first extent; the article discusses MongoDB's storage layout when it analyses inserts
        DiskLoc lastExtent;// the last extent
        /* NOTE: capped collections v1 override the meaning of deletedList.
                 deletedList[0] points to a list of free records (DeletedRecord's) for all extents in
                 the capped namespace.
                 deletedList[1] points to the last record in the prev extent.  When the "current extent"
                 changes, this value is updated.  !deletedList[1].isValid() when this value is not
                 yet computed.
        */
        DiskLoc deletedList[Buckets];
        // ofs 168 (8 byte aligned)
        struct Stats {
            // datasize and nrecords MUST Be adjacent code assumes!
            long long datasize; // this includes padding, but not record headers
            long long nrecords;
        } stats;
        int lastExtentSize;
        int nIndexes;
    private:
        // ofs 192
        IndexDetails _indexes[NIndexesBase];// the first 10 indexes are stored here; if a collection has more than 10, the rest live in an "extra" area whose location is stored in
        // ofs 352 (16 byte aligned)        // extraOffset below
        int _isCapped;                         // there is wasted space here if I'm right (ERH)
        int _maxDocsInCapped;                  // max # of objects for a capped table.  TODO: should this be 64 bit?
        double _paddingFactor;                 // 1.0 = no padding.
        // ofs 386 (16)
        int _systemFlags; // things that the system sets/cares about
    public:
        DiskLoc capExtent;
        DiskLoc capFirstNewRecord;
        unsigned short dataFileVersion;       // NamespaceDetails version.  So we can do backward compatibility in the future. See filever.h
        unsigned short indexFileVersion;
        unsigned long long multiKeyIndexBits;
    private:
        // ofs 400 (16)
        unsigned long long reservedA;
        long long extraOffset;                // where the $extra info is located (bytes relative to this)
    public:
        int indexBuildInProgress;             // 1 if in prog
    private:
        int _userFlags;
        char reserved[72];
        /*-------- end data 496 bytes */
}
從這裡可以明白ns儲存了所有集合的頭資訊,其中包括了該集合的起始位置,結束位置以及索引所在.

_init函式執行完畢,往上回到Database::Database()函式:

                if( _openAllFiles )
                    openAllFiles();// per the article: maps every data file of the form xx.0, xx.1 and records the mapped files; they are mapped the same way as xx.ns, and with journaling enabled two addresses are kept for each. Not analysed further here.
至此資料庫的對映工作完成.往上回到Client::Context::_finishInit函式,下面來看看許可權的檢查函式checkNsAccess,其最終呼叫了下面的函式,通過認證返回true,

未通過則返回false;返回false將導致mongod向客戶端傳送未認證資訊,客戶端的操作請求失敗.

    // Decides whether this connection may access dbname at the requested level.
    // Returns true when noauth is set, when dbname, "admin" or "local" is authorized at that
    // level, or when the special checks pass; false otherwise.
    bool AuthenticationInfo::_isAuthorized(const string& dbname, Auth::Level level) const {
        // Per the article, noauth is set at startup (--noauth makes it true, --auth false).
        // NOTE(review): the article states the default is false -- confirm against the declaration.
        if ( noauth )
            return true;

        bool granted = false;
        {
            scoped_spinlock lk(_lock);
            // Is dbname already authenticated? Per the article, this data is stored when the
            // mongo client authenticates on connect. "admin" and "local" are tried next, in order.
            granted = _isAuthorizedSingle_inlock( dbname , level )
                   || _isAuthorizedSingle_inlock( "admin" , level )
                   || _isAuthorizedSingle_inlock( "local" , level );
        }
        if ( granted )
            return true;

        // Not authorized above: per the article, this falls back to checking
        // _isLocalHostAndLocalHostIsAuthorizedForAll, i.e. whether the connection is local.
        return _isAuthorizedSpecialChecks( dbname );
    }

       本文到這裡結束,主要是搞清楚了mongod接收到來自客戶端請求後的執行流程到資料庫的載入,重要的

是明白ns檔案的作用,普通資料檔案xx.0,xx.1的對映,下一篇文章我們將繼續分析查詢請求的處理.

作者: yhjj0108,楊浩