1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(7)

資料庫路由中介軟體MyCat - 原始碼篇(7)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。

3. 連線模組


3.4 FrontendConnection前端連線

構造方法:

public FrontendConnection(NetworkChannel channel) throws IOException {     super(channel);
     InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress();
     InetSocketAddress remoteAddr = null;     if (channel instanceof SocketChannel) {
         remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();    

     } else if (channel instanceof AsynchronousSocketChannel) {
         remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress();
     }     this.host = remoteAddr.getHostString();     this.port = localAddr.getPort();     this.localPort = remoteAddr.getPort();     this.handler = new FrontendAuthenticator(this);
 }

FrontendConnection是對前端連線channel的封裝,接受NetworkChannel作為引數構造。前端連線建立,需要先驗證其許可權,所以,handler首先設定為FrontendAuthenticator 等到驗證成功,handler會被設定成FrontendCommandHandler。 下面來看和FrontendConnection相關的Handler:  FrontendCommandHandler會先解析請求型別,之後呼叫不同的方法處理不同型別的請求。例如,FrontendQueryHandler會解析query型別的sql請求語句:

 @Override
    public void handle(byte[] data)
    {        if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
        {
            MySQLMessage mm = new MySQLMessage(data);            int  packetLength = mm.readUB3();            if(packetLength+4==data.length)
            {
                source.loadDataInfileData(data);
            }            return;
        }        switch (data[4])
        {            case MySQLPacket.COM_INIT_DB:
                commands.doInitDB();
                source.initDB(data);                break;            case MySQLPacket.COM_QUERY:
                commands.doQuery();
                source.query(data);                break;            case MySQLPacket.COM_PING:
                commands.doPing();
                source.ping();                break;            case MySQLPacket.COM_QUIT:
                commands.doQuit();
                source.close("quit cmd");                break;            case MySQLPacket.COM_PROCESS_KILL:
                commands.doKill();
                source.kill(data);                break;            case MySQLPacket.COM_STMT_PREPARE:
                commands.doStmtPrepare();
                source.stmtPrepare(data);                break;            case MySQLPacket.COM_STMT_EXECUTE:
                commands.doStmtExecute();
                source.stmtExecute(data);                break;            case MySQLPacket.COM_STMT_CLOSE:
                commands.doStmtClose();
                source.stmtClose(data);                break;            case MySQLPacket.COM_HEARTBEAT:
                commands.doHeartbeat();
                source.heartbeat(data);                break;            default:
                     commands.doOther();
                     source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,                             "Unknown command");

        }
    }

FrontendCommandHandler會呼叫FrontendConnection合適的方法解析處理不同的請求,例如它的initDB(byte[] data)方法:

    public void initDB(byte[] data) {

        MySQLMessage mm = new MySQLMessage(data);
        mm.position(5);
        String db = mm.readString();        // 檢查schema的有效性
        if (db == null || !privileges.schemaExists(db)) {
            writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + db + "'");            return;
        }        if (!privileges.userExists(user, host)) {
            writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + user + "'");            return;
        }

        Set<String> schemas = privileges.getUserSchemas(user);        if (schemas == null || schemas.size() == 0 || schemas.contains(db)) {            this.schema = db;
            write(writeToBuffer(OkPacket.OK, allocate()));
        } else {
            String s = "Access denied for user '" + user + "' to database '" + db + "'";
            writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
        }
    }

方法呼叫: 這裡寫圖片描述 通過檢視可以發現,在command packet被解析出是initDB型別的請求時(其實就是使用者傳送的查詢語句為“use XXX”),會呼叫此方法進行處理,同時,這些方法都是被RW執行緒執行的。 此方法從FrontedPrivilege中驗證使用者是否有許可權訪問這個邏輯庫,如果有就把當前連線的邏輯庫設為使用者請求的邏輯庫。 其他方法與handler也是相似的關係,可以看出,FrontendConnection組合了多種封裝的handler來處理不同的請求的不同階段。至於各種handler,會在之後sql解析,sql路由,協議實現等模組詳細介紹。

3.4.1 ServerConnection服務端連線

前端連線包括ServerConnection(服務端連線)和ManagerConnection(管理端連線)。前端連結不會直接建立,而是通過工廠建立: 工廠方法:

@Override
    protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
        SystemConfig sys = MycatServer.getInstance().getConfig().getSystem();
        ServerConnection c = new ServerConnection(channel);
        MycatServer.getInstance().getConfig().setSocketParams(c, true);
        c.setPrivileges(MycatPrivileges.instance());
        c.setQueryHandler(new ServerQueryHandler(c));
        c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c));        // c.setPrepareHandler(new ServerPrepareHandler(c));
        c.setTxIsolation(sys.getTxIsolation());
        c.setSession2(new NonBlockingSession(c));        return c;
    }

可以看出,每個新的ServerConnection都會繫結一個新的ServerQueryHandler負責處理sql指令,一個ServerLoadDataInfileHandler負責處理檔案載入命令,一個session負責處理事務 下面是相關的類圖  這裡的所有獨立的handler裡面都是static方法,可供其他類直接呼叫。每個ServerConnection都會有一個NonBlockingSession來處理。 這裡說下連線、會話、邏輯庫、MyCat例項的關係(與MySQL裡面的連線、會話、資料庫、MySQL例項的關係不太一樣);首先每個MyCat例項都管理多個數據庫。連線是針對MyCat例項建立的,並且,MyCat的連線(AbstractConnection)是不可複用的,在close方法會關閉連線並清理使用的資源。但是快取資源(buffer)是可以複用的。比如,在一個前端連線長時間空閒時或者出現異常時,會被清理掉。每個連線會擁有一個session來處理事務,儲存會話資訊。 這裡,每個連線擁有一個會話。每個連線中的方法,被RW執行緒執行,相當於與RW執行緒繫結。RW執行緒是可以複用的,這裡相當於MySQL中的連線是可以複用的(連線池)。 Session.java:

public interface Session {    /**
     * 取得源端連線
     */
    FrontendConnection getSource();    /**
     * 取得當前目標端數量
     */
    int getTargetCount();    /**
     * 開啟一個會話執行
     */
    void execute(RouteResultset rrs, int type);    /**
     * 提交一個會話執行
     */
    void commit();    /**
     * 回滾一個會話執行
     */
    void rollback();    /**
     * 取消一個正在執行中的會話
     * 
     * @param sponsor
     *            如果發起者為null,則表示由自己發起。
     */
    void cancel(FrontendConnection sponsor);    /**
     * 終止會話,必須在關閉源端連線後執行該方法。
     */
    void terminate();

}

下面我們著重研究它的實現類NonBlockingSession: 首先,取得源端連線方法FrontendConnection getSource();,其實就是NonBlockingSession在建立時就已繫結一個連線,誰會呼叫這個方法取得源端連結呢?  可以發現,主要有各種查詢的handler還有SQLengine會去呼叫。因為處理無論返回什麼結果,都需要返回給源端。 int getTargetCount();取得當前目標端數量。根據目標端的數量不同會用不同的handler處理轉發SQL和合並結果。

@Override
    public void execute(RouteResultset rrs, int type) {        // 清理之前處理用的資源
        clearHandlesResources();        if (LOGGER.isDebugEnabled()) {
            StringBuilder s = new StringBuilder();
            LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");
        }        // 檢查路由結果是否為空
        RouteResultsetNode[] nodes = rrs.getNodes();        if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
                || nodes[0].getName().equals("")) {            //如果為空,則表名有誤,提示客戶端
            source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,                    "No dataNode found ,please check tables defined in schema:"
                            + source.getSchema());            return;
        }        //如果路由結果個數為1,則為單點查詢或事務
        if (nodes.length == 1) {            //使用SingleNodeHandler處理單點查詢或事務
            singleNodeHandler = new SingleNodeHandler(rrs, this);            try {
                singleNodeHandler.execute();
            } catch (Exception e) {
                LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
            }
        } else {            //如果路由結果>1,則為多點查詢或事務
            boolean autocommit = source.isAutocommit();
            SystemConfig sysConfig = MycatServer.getInstance().getConfig()
                    .getSystem();            //mutiNodeLimitType沒有用。。。
            int mutiNodeLimitType = sysConfig.getMutiNodeLimitType();            //使用multiNodeHandler處理多點查詢或事務
            multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,                    this);            try {
                multiNodeHandler.execute();
            } catch (Exception e) {
                LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
            }
        }
    }

每次一個Session執行SQL時,會先清理handler使用的資源。SingleNodeHandler與multiNodeHandler之後會講。這裡的handler我們之後會在每個模組去講,Session之後也還會提到,敬請期待


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 Android TV 開發(5)
【推薦】 分散式儲存系統可靠性系列三:設計模式