1. 程式人生 > >hbase客戶端原始碼分析--put流程

hbase客戶端原始碼分析--put流程

—client 的呼叫流程

table.put(put); 操作

HTable table = new HTable(conf, Bytes.toBytes(tableName));

呼叫流程如上面的delete流程一樣
首先建立一個muti的操作物件

new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);

然後呼叫

BufferedMutatorImpl.mutate(Mutation m)

在建立 BufferedMutatorImpl 物件的時候,在低層有非同步建立

ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);

實時非同步批量的操作提交

AsyncProcess.submit

判斷提交的那一個row物件是在那個region當中
RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);

該過程和之前的delete的過程中查詢row的過程一樣,先到zk中拿到meta然後在meta 的regionserver中掃描對應的行在那個regionserver當中

    submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
  locationErrors, locationErrorRows, actionsByServer, pool)

如下程式碼又建立一個非同步的提交物件

        <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
  List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
  Object[] results, boolean needResults, List<Exception> locationErrors,
  List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
  ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
  tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
// Add location errors if any
if (locationErrors != null) {
  for (int i = 0; i < locationErrors.size(); ++i) {
int originalIndex = locationErrorRows.get(i);
Row row = retainedActions.get(originalIndex).getAction();
ars.manageError(originalIndex, row,
  Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
  }
}
ars.sendMultiAction(actionsByServer, 1, null, false);
return ars;
  }

然後根據傳送到不同的regionser進行起動多個執行緒進行傳送,

      for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
    ServerName server = e.getKey();
    MultiAction<Row> multiAction = e.getValue();
    incTaskCounters(multiAction.getRegions(), server);
    Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
        numAttempt);

對每個region建立對應的執行緒

 Runnable runnable =
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
callsInProgress);

進行非同步傳送過去。線上程中建立

new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi)

然後在callable物件中建立proto物件,組裝資料,傳送過去

         for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
  final byte [] regionName = e.getKey();
  final List<Action<R>> actions = e.getValue();
  regionActionBuilder.clear();
  regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );


  if (this.cellBlock) {
// Presize.  Presume at least a KV per Action.  There are likely more.
if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
// Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
// They have already been handled above. Guess at count of cells
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
  regionActionBuilder, actionBuilder, mutationBuilder);
  } else {
regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
  regionActionBuilder, actionBuilder, mutationBuilder);
  }
  multiRequestBuilder.addRegionAction(regionActionBuilder.build());
}

// Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again.
if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto;
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
try {
  responseProto = getStub().multi(controller, requestProto);
} catch (ServiceException e) {
  throw ProtobufUtil.getRemoteException(e);
}
if (responseProto == null) return null; // Occurs on cancel
return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
  }