
[HBase] Server-side RPC Mechanism and Code Walkthrough


Based on version: CDH5.4.2

This version is fairly old, but it is what we currently run in production, so it is used as the example here.

1. Overview


(Figures in the original post: overall server-side RPC processing flow across the Listener, Readers, RpcScheduler/Executor queues, Handlers and Responder)

Explanation:

  1. Requests sent through the client API are picked up by the RpcServer's Listener thread.

  2. The Listener thread assigns a Reader to the channel to handle subsequent requests arriving on it.

  3. The Reader thread wraps each request into a CallRunner instance and, via the RpcScheduler, dispatches it to a different Executor depending on the request's attributes.

  4. The Executor stores the CallRunner instance in a queue.

  5. Each Executor queue is bound to a fixed number of Handler threads that consume it. Consumption is simple: take a CallRunner instance off the queue and invoke its run() method.

  6. The run() method assembles the response and hands it to the Responder thread.

  7. The Responder thread continuously returns the results for the various channels to the clients.

2. Code Walkthrough


Overall, the server-side RPC processing mechanism is a producer-consumer model.
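Before diving into the HBase classes, here is a minimal, self-contained Java sketch of that producer-consumer shape (illustrative only, not HBase code): readers produce tasks into a bounded queue, and daemon handler threads take them off and run them.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerSketch {
  // A bounded queue plays the role of an executor's call queue.
  static final BlockingQueue<Runnable> callQueue = new ArrayBlockingQueue<>(100);

  public static void main(String[] args) throws InterruptedException {
    // "Handler" threads: take tasks from the queue and run them.
    for (int i = 0; i < 3; i++) {
      Thread handler = new Thread(() -> {
        try {
          while (true) {
            callQueue.take().run(); // blocks until a task is available
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, "handler-" + i);
      handler.setDaemon(true);
      handler.start();
    }

    // "Reader" side: produce tasks (stand-ins for CallRunner instances).
    for (int i = 0; i < 10; i++) {
      final int id = i;
      callQueue.put(() -> System.out.println("processed call " + id));
    }
    Thread.sleep(500); // give the daemon handlers time to drain the queue
  }
}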

2.1 Component Initialization

  • RpcServer is initialized when the master or the regionserver starts. The key code is as follows:

public HRegionServer(Configuration conf, CoordinatedStateManager csm)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  checkCodecs(this.conf);
  .....
  rpcServices.start();
  .....
}
  • rpcServices is declared as type RSRpcServices, the implementation of the RPC service interfaces backed by RpcServer. Its start() method starts the three main producer and consumer threads:

    /** Starts the service.  Must be called before any calls will be handled. */
    @Override
    public synchronized void start() {
      if (started) return;
      ......
      responder.start();
      listener.start();
      scheduler.start();
      started = true;
    }

2.2 Receiving and Wrapping Client API Requests

The Listener uses the NIO mechanism to listen on the server port; client API connections to that designated port are picked up by it.

  • How the Listener accepts API requests:

    void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
      Connection c;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();

      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        try {
          ......
          // When an API request arrives, a channel is opened and the Listener assigns a Reader to it.
          // The number of Reader instances is limited; they are handed out in order and reused,
          // so a single Reader may serve multiple channels.
          Reader reader = getReader();
          try {
            reader.startAdd();
            SelectionKey readKey = reader.registerChannel(channel);
            // The Connection is also recorded here, so that results can be returned on it later.
            c = getConnection(channel, System.currentTimeMillis());
            readKey.attach(c);
            synchronized (connectionList) {
              connectionList.add(numConnections, c);
              numConnections++;
              ......
            }
          }
    }

As noted above, the number of Readers is fixed and they are reused in order. The count is controlled by the following setting (default 10):

this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);

If the request-reading side becomes a bottleneck, consider increasing this value.
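For intuition, the "handed out in order and reused" behaviour is essentially a round-robin over a fixed-size pool. A minimal sketch of that assignment logic (illustrative only, not the exact HBase implementation):

// Illustrative sketch: round-robin assignment of channels to a fixed pool of readers.
// The pool size corresponds to hbase.ipc.server.read.threadpool.size.
class ReaderPool {
  private final Object[] readers;   // stand-ins for the Reader threads
  private int currentReader = 0;

  ReaderPool(int readThreads) {
    readers = new Object[readThreads];
    for (int i = 0; i < readThreads; i++) {
      readers[i] = new Object();    // in HBase each slot is a running Reader thread
    }
  }

  // Each newly accepted channel gets the next reader in order; because readers are
  // reused, one reader may end up serving many channels.
  synchronized Object getReader() {
    Object r = readers[currentReader];
    currentReader = (currentReader + 1) % readers.length;
    return r;
  }
}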

  • The Reader reads the request and wraps it

    Once a Reader instance has been assigned to a channel, it reads the requests arriving on that channel and wraps each one into a CallRunner for scheduling.

    void doRead(SelectionKey key) throws InterruptedException {
      ......
      try {
        // At this point the connection's read-and-process method is invoked.
        count = c.readAndProcess();
        ......
      }
    }
    public int readAndProcess() throws IOException, InterruptedException {
      ......
      // The connectionPreambleRead flag tells whether this is a brand-new connection.
      // For a new connection the preamble must be read first to determine the connection's
      // properties, e.g. which authentication mode is in use.
      if (!connectionPreambleRead) {
        count = readPreamble();
        if (!connectionPreambleRead) {
          return count;
        }
      ......

      count = channelRead(channel, data);
      if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
        // Actually process the request; inside, handling differs depending on the
        // authentication mode determined from the connection preamble.
        process();
      }

      return count;
    }
    private void process() throws IOException, InterruptedException {
      ......
      if (useSasl) {
        // Kerberos (SASL) secure mode
        saslReadAndProcess(data.array());
      } else {
        // AuthMethod.SIMPLE mode
        processOneRpc(data.array());
      }
      .......
    }

    The following analysis uses AuthMethod.SIMPLE mode as the example:

    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
      if (connectionHeaderRead) {
        // Process the actual request.
        processRequest(buf);
      } else {
        // The connection header has not been read yet; read it here to determine the
        // requested service, method, etc.
        processConnectionHeader(buf);
        this.connectionHeaderRead = true;
        if (!authorizeConnection()) {
          throw new AccessDeniedException("Connection from " + this + " for service " +
              connectionHeader.getServiceName() + " is unauthorized for user: " + user);
        }
      }
    }
    protected void processRequest(byte[] buf) throws IOException, InterruptedException {
      long totalRequestSize = buf.length;
      ......
      // Check whether the requests accepted by this RpcServer would exceed maxQueueSize.
      // Note that callQueueSize is an RpcServer-level variable.
      if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
        final Call callTooBig =
            new Call(id, this.service, null, null, null, null, this,
                responder, totalRequestSize, null);
        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
        setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
            "Call queue is full on " + getListenerAddress() +
            ", is hbase.ipc.server.max.callqueue.size too small?");
        responder.doRespond(callTooBig);
        return;
      }
      ......
      Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
          totalRequestSize, traceInfo);
      // Request-side handling ends here: wrap the request into a CallRunner and dispatch it
      // to one of the Executors' queues.
      scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
    }

    Note that maxQueueSize is an RpcServer-level variable with a default of 1 GB; once the threshold is exceeded, clients see the "Call queue is full" error.

    callQueueSize is increased when a request is received and decreased by the same amount when processing finishes (after the CallRunner's run() method has completed).

    this.maxQueueSize = this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
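    A simplified sketch of this bookkeeping (a made-up helper, not the actual RpcServer code) makes the add-on-receive / subtract-after-run cycle explicit:

    import java.util.concurrent.atomic.AtomicLong;

    // Simplified illustration of the callQueueSize bookkeeping against maxQueueSize.
    class CallQueueAccounting {
      static final long MAX_QUEUE_SIZE = 1024L * 1024L * 1024L;   // 1 GB default
      static final AtomicLong callQueueSize = new AtomicLong(0);

      // On receive: reject if accepting this request would push the total over the limit.
      static boolean tryAccept(long totalRequestSize) {
        if (totalRequestSize + callQueueSize.get() > MAX_QUEUE_SIZE) {
          return false;   // the client sees the "Call queue is full" response
        }
        callQueueSize.addAndGet(totalRequestSize);
        return true;
      }

      // After CallRunner.run() has finished: release the accounted bytes.
      static void release(long totalRequestSize) {
        callQueueSize.addAndGet(-totalRequestSize);
      }
    }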

2.3 Request Dispatch and Scheduling

After a client request has been received and wrapped into a CallRunner, it is dispatched by the concrete scheduler. The master and the regionserver use different schedulers; the discussion here uses the regionserver's scheduler, SimpleRpcScheduler.

  public RSRpcServices(HRegionServer rs) throws IOException {
    ......
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
          SimpleRpcSchedulerFactory.class);
      rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());

  • Request dispatch

    As mentioned earlier, once a request has been wrapped into a CallRunner, it is forwarded by the dispatch() method of the concrete RpcScheduler implementation.

    The code is:

    @Override
    public void dispatch(CallRunner callTask) throws InterruptedException {
      RpcServer.Call call = callTask.getCall();
      // Obtain the priority; generally a set of predefined operations is treated as
      // high priority based on the request contents.
      int level = priority.getPriority(call.getHeader(), call.param);
      if (priorityExecutor != null && level > highPriorityLevel) {
        // High-priority requests go to the high-priority executor.
        priorityExecutor.dispatch(callTask);
      } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
        // Replication-level requests go to the replication executor.
        replicationExecutor.dispatch(callTask);
      } else {
        // Everything else goes to the general executor, which receives the vast majority of requests.
        callExecutor.dispatch(callTask);
      }
    }
  • Executors: queue initialization

    Within this scheduler there are three levels of executors:

    1. High-priority request executor

    2. General request executor

    3. Replication request executor

      private final RpcExecutor callExecutor;
      private final RpcExecutor priorityExecutor;
      private final RpcExecutor replicationExecutor;

    Of these, callExecutor is the main executor, handling general requests. In this version it can split reads and writes into separate queues in a configurable ratio, and the handlers are split in matching proportions and bound to those queues, i.e. only the handlers bound to a queue are allowed to process it. By default, with no read/write separation configured, there is only one queue: all requests go into it and all handlers consume it.

    Let's walk through the code using the read/write-separated queues as the example:

    float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
    int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));

    LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);

    if (numCallQueues > 1 && callqReadShare > 0) {
      // multiple read/write queues
      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
        CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
        // Instantiate the read/write executor. The constructor arguments carry the read/write
        // ratio, and reads are further split between general reads and scans.
        // Overloaded constructors are then invoked, which ultimately compute the number of
        // queues and the handler counts for each category.
        callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
            callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
            BoundedPriorityBlockingQueue.class, callPriority);
      } else {

The overloaded constructor that is ultimately invoked is shown below:

public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
    int numWriteQueues, int numReadQueues, float scanShare,
    final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
    final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
  super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));

  int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
  int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
  if ((numReadQueues - numScanQueues) > 0) {
    numReadQueues -= numScanQueues;
    readHandlers -= scanHandlers;
  } else {
    numScanQueues = 0;
    scanHandlers = 0;
  }
  // Fix the main queue parameters.
  this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
  this.readHandlersCount = Math.max(readHandlers, numReadQueues);
  this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
  this.numWriteQueues = numWriteQueues;
  this.numReadQueues = numReadQueues;
  this.numScanQueues = numScanQueues;
  this.writeBalancer = getBalancer(numWriteQueues);
  this.readBalancer = getBalancer(numReadQueues);
  this.scanBalancer = getBalancer(numScanQueues);

  queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
  LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
      " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
      ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
      " scanHandlers=" + scanHandlersCount));
  // Initialize the list of queues. Note that 'queues' is an ordered list; the queue positions
  // fixed here do not change afterwards, and later requests are routed to a queue through the
  // balancer returned by getBalancer().
  for (int i = 0; i < numWriteQueues; ++i) {
    queues.add((BlockingQueue<CallRunner>)
        ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
  }

  for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
    queues.add((BlockingQueue<CallRunner>)
        ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
  }
}
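To make the arithmetic concrete, assume (hypothetical numbers) the constructor above is called with writeHandlers = 15, readHandlers = 15, numWriteQueues = 2, numReadQueues = 3 and scanShare = 0.34. Then one of the three read queues becomes a scan queue with 5 handlers, leaving 2 read queues with 10 handlers and 2 write queues with 15 handlers. The snippet below simply replays that computation:

// Replays the queue/handler split from the RWQueueRpcExecutor constructor above,
// using hypothetical inputs.
public class RwSplitExample {
  public static void main(String[] args) {
    int writeHandlers = 15, readHandlers = 15;
    int numWriteQueues = 2, numReadQueues = 3;
    float scanShare = 0.34f;

    int numScanQueues = Math.max(0, (int) Math.floor(numReadQueues * scanShare));   // 1
    int scanHandlers  = Math.max(0, (int) Math.floor(readHandlers * scanShare));    // 5
    if ((numReadQueues - numScanQueues) > 0) {
      numReadQueues -= numScanQueues;   // 2
      readHandlers  -= scanHandlers;    // 10
    } else {
      numScanQueues = 0;
      scanHandlers = 0;
    }

    System.out.println("writeQueues=" + numWriteQueues
        + " writeHandlers=" + Math.max(writeHandlers, numWriteQueues));   // 2 queues, 15 handlers
    System.out.println("readQueues=" + numReadQueues
        + " readHandlers=" + Math.max(readHandlers, numReadQueues));      // 2 queues, 10 handlers
    System.out.println("scanQueues=" + numScanQueues
        + " scanHandlers=" + Math.max(scanHandlers, numScanQueues));      // 1 queue, 5 handlers
  }
}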
  • Executors: handler binding

    After requests are classified and placed into the different executor queues, they are processed by the handlers bound to those queues; the handlers are the consumers of the requests.

    The handler-binding logic in RWQueueRpcExecutor looks like this:

    @Override
    protected void startHandlers(final int port) {
      startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
      startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
      startHandlers(".scan", scanHandlersCount, queues,
          numWriteQueues + numReadQueues, numScanQueues, port);
    }

    The startHandlers() method then binds handlers according to the queue index and count passed in:

    protected void startHandlers(final String nameSuffix, final int numHandlers,
        final List<BlockingQueue<CallRunner>> callQueues,
        final int qindex, final int qsize, final int port) {
      final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
      for (int i = 0; i < numHandlers; i++) {
        final int index = qindex + (i % qsize);
        Thread t = new Thread(new Runnable() {
          @Override
          public void run() {
            // Only process requests from the designated queue.
            consumerLoop(callQueues.get(index));
          }
        });
        t.setDaemon(true);
        t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
            ",queue=" + index + ",port=" + port);
        t.start();
        LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
        handlers.add(t);
      }
    }
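    Continuing the hypothetical split from the earlier example (2 write queues, 2 read queues and 1 scan queue, with 15/10/5 handlers), the index = qindex + (i % qsize) formula binds the write handlers round-robin to queues 0 and 1, the read handlers to queues 2 and 3, and all scan handlers to queue 4. A tiny sketch of that mapping (illustrative helper, not HBase code):

    // Illustrates how startHandlers() maps handlers onto queue indexes via qindex + (i % qsize),
    // using the hypothetical counts from the previous example.
    public class HandlerBindingExample {
      static void bind(String nameSuffix, int numHandlers, int qindex, int qsize) {
        for (int i = 0; i < numHandlers; i++) {
          int index = qindex + (i % qsize);
          System.out.println("handler" + nameSuffix + "-" + i + " -> queue " + index);
        }
      }

      public static void main(String[] args) {
        int numWriteQueues = 2, numReadQueues = 2, numScanQueues = 1;
        bind(".write", 15, 0, numWriteQueues);                            // queues 0..1
        bind(".read", 10, numWriteQueues, numReadQueues);                 // queues 2..3
        bind(".scan", 5, numWriteQueues + numReadQueues, numScanQueues);  // queue 4
      }
    }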
  • Executors: handler consumption

    Consumption by a handler is simple: it keeps taking CallRunner instances from its designated queue and invokes each instance's run() method.

    protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
      .......
      while (running) {
        try {
          // Take a request from the queue.
          CallRunner task = myQueue.take();
          try {
            activeHandlerCount.incrementAndGet();
            // Invoke the CallRunner's run() method.
            task.run();
            .......
    }

    Next, a look at the CallRunner's run() method:

    public void run() {
      .......
      // Perform the actual operation.
      // make the call
      resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
      .......
      // Set the response for undelayed calls and delayed calls with
      // undelayed responses.
      // Attach the response to the call instance.
      if (!call.isDelayed() || !call.isReturnValueDelayed()) {
        Message param = resultPair != null ? resultPair.getFirst() : null;
        CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
        call.setResponse(param, cells, errorThrowable, error);
      }
      ........
      // The call holds a handle to its connection; the response is put into that
      // connection's response queue.
      call.sendResponseIfReady();
      .....

The call holds a handle to its connection, and the response is placed into that connection's response queue:

// If there is already a write in progress, we don't wait. This allows to free the handlers
// immediately for other tasks.
if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
  try {
    if (call.connection.responseQueue.isEmpty()) {
      // If we're alone, we can try to do a direct call to the socket. It's
      // an optimisation to save on context switches and data transfer between cores..
      if (processResponse(call)) {
        return; // we're done.
      }
      // Too big to fit, putting ahead.
      call.connection.responseQueue.addFirst(call);
      added = true; // We will register to the selector later, outside of the lock.
    }
  } finally {
    call.connection.responseWriteLock.unlock();
  }
}

if (!added) {
  call.connection.responseQueue.addLast(call);
}
call.responder.registerForWrite(call.connection);

// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();

2.4 Returning the Response

The CallRunner's run() method carries out the actual request and puts the response into the corresponding connection's response queue on the Responder instance, to be returned later.

The Responder instance is itself a thread; its run() method eventually executes the following code:

private void doAsyncWrite(SelectionKey key) throws IOException {
  Connection connection = (Connection) key.attachment();
  if (connection == null) {
    throw new IOException("doAsyncWrite: no connection");
  }
  if (key.channel() != connection.channel) {
    throw new IOException("doAsyncWrite: bad channel");
  }

  if (processAllResponses(connection)) {
    try {
      // We wrote everything, so we don't need to be told when the socket is ready for
      // write anymore.
      key.interestOps(0);
    } catch (CancelledKeyException e) {
      /* The Listener/reader might have closed the socket.
       * We don't explicitly cancel the key, so not sure if this will
       * ever fire.
       * This warning could be removed.
       */
      LOG.warn("Exception while changing ops : " + e);
    }
  }
}

3. Closing Remarks


The above describes how the HRegionServer side receives and processes RPC requests, giving a coarse-grained tour of the code structure. Hopefully it will help with code-level troubleshooting when problems in this area come up later.
