無鏡--kafka之消費者(三)
消費者輪詢通過拉取器(Fetcher)傳送拉取請求,拉取器會呼叫消費者網路客戶端的傳送方法(send)和網路輪詢方法(poll)。在拉取器的層面拉取請求是沒有真正傳送到服務端的。傳送方法只是把請求存在到變數中,真正傳送到服務端是呼叫了消費者網路客戶端物件的網路輪詢方法(poll)。
消費者網路客戶端:ConsumerNetworkClient,對NetworkClient的一層封裝。
消費者網路客戶端傳送方法
// 傳送請求,只是把請求暫時存放在unsent變數中 private RequestFuture<ClientResponse> send(Node node, ApiKeys api, short version, AbstractRequest request) { long now = time.milliseconds(); RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); RequestHeader header = client.nextRequestHeader(api, version); RequestSend send = new RequestSend(node.idString(), header, request.toStruct()); put(node, new ClientRequest(now, true, send, completionHandler)); // 沒有真正傳送 client.wakeup(); return completionHandler.future; } private void put(Node node, ClientRequest request) { synchronized (this) { List<ClientRequest> nodeUnsent = unsent.get(node); if (nodeUnsent == null) { nodeUnsent = new ArrayList<>(); unsent.put(node, nodeUnsent); } nodeUnsent.add(request); } }
儲存到變數unsent(Map<Node,List<ClientRequest>>)中的物件是ClientRequest。
ClientRequest相關類圖:

請求物件相關的類
消費者網路客戶端非同步傳送請求涉及的相關類說明:
1,非同步請求完成處理器(RequestFutureCompletionHandler)
2,非同步請求(RequestFuture):客戶端呼叫傳送請求的返回值。當非同步請求完成時,可以獲取非同步請求的結果。
3,非同步請求監聽器(RequestFutureListener):非同步請求新增監聽器,當非同步請求有結果時,呼叫監聽器的onSuccess方法。
以拉取器傳送拉取請求為例,看看監聽器的使用方式:
1,消費者客戶端呼叫ConsumerNetworkClient傳送拉取請求返回非同步請求物件(RequestFuture)。同時也建立了非同步請求完成處理器(RequestFutureCompletionHandler),RequestFutureCompletionHandler例項持有RequestFuture例項,返回其實也是返回的RequestFutureCompletionHandler持有的RequestFuture例項。
2,在返回的非同步請求物件上新增非同步請求監聽器,監聽器會處理拉取的到結果。
3,客戶端輪詢,在收到拉取請求結果後,呼叫回撥處理器的onComplete方法(RequestFutureCompletionHandler.onComplete(ClientResponse))。
4,觸發監聽器onSuccess方法(RequestFutureListener.onSuccess(ClientResponse))
呼叫傳送方法(send)返回的是一個非同步請求物件:RequestFuture。RequestFuture封裝了請求相關的資訊,表示非同步請求的結果。傳送方法中還會建立一個非同步請求的完成處理器(RequestFutureCompletionHandler:當請求被伺服器處理完成,並返回響應結果給客戶端,客戶端會根據響應結果執行具體邏輯)
網路客戶端輪詢到拉取請求結果後的處理流程圖:

網路客戶端輪詢到拉取請求結果後的處理流程
網路客戶端輪詢到拉取請求結果後的處理時序圖:

網路客戶端輪詢到拉取請求結果後的處理時序圖
組合加介面卡模式(compose+adapter)
非同步請求(RequestFuture)為客戶端提供了呼叫自定義業務處理邏輯的的入口,除了新增監聽器的方式(類似拉取請求),還可以使用組合加介面卡模式:監聽器+介面卡(對客戶端響應結果做進行解析成客戶想要的資料格式)。所以一句話概括 組合加介面卡:監聽器+轉換響應結果成客戶想要的資料格式。組合表示組裝一個監聽器,介面卡表示對客戶端響應結果進行適配。
普通模式
虛擬碼(傳送拉取請求):為非同步請求新增監聽器,當請求完成時,會呼叫監聽器的回撥方法會對響應進行處理。
client.send(fetchTarget, ApiKeys.FETCH, request) .addListener(new RequestFutureListener() { // 監聽器 public void onSuccess(ClientResponse resp) { } }
組合加介面卡模式
虛擬碼(傳送列舉偏移量請求給分割槽的主節點):介面卡中對響應結果進行適配。在組合方法(RequestFuture.compose())中,建立一個新的非同步請求物件S,在舊的非同步請求物件T(呼叫compose方法的非同步請求物件)新增監聽器,並傳遞新的非同步請求物件S到介面卡的onSuccess方法中。在介面卡的onSuccess方法中進行訊息的轉換。
client.send(node, ApiKeys.LIST_OFFSETS, request) .compose(new RequestFutureAdapter>() { // 介面卡 public void onSuccess(ClientResponse response, RequestFuture> future) { future.complete(); // 新建的非同步請求的complete方法 觸發呼叫新建的非同步請求的監聽器的onSuccess方法 } }); public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) { // 在非同步請求T裡,新建了一個非同步請求S final RequestFuture<S> adapted = new RequestFuture<>(); addListener(new RequestFutureListener<T>() { // 為T舊的非同步請求新增監聽器 // 客戶端輪詢到結果時,會呼叫監聽器的回撥方法 public void onSuccess(T value) { adapter.onSuccess(value, adapted); // 呼叫介面卡的回撥方法 進行訊息轉換 } public void onFailure(RuntimeException e) { adapter.onFailure(e, adapted); } }); return adapted; // 返回新建的非同步請求物件S }
組合+介面卡模式的呼叫流程(列舉偏移量請求)

組合+介面卡模式的呼叫流程(列舉偏移量請求)
普通模式和組合加介面卡模式的區別:
普通模式使用監聽器,並將非同步請求的結果穿給監聽器的回撥方法。組合模式的監聽器,對非同步請求的結果在介面卡中做一次轉換。
ConsumerNetworkClient.polll(RequestFuture)和ConsumerNetworkClient.poll(timeout)
ConsumerNetworkClient.polll(RequestFuture):必須等到非同步請求完成才會結束,在輪詢結束後可以獲取非同步請求的結果。
ConsumerNetworkClient.poll(timeout):不管非同步請求有沒有完成,都會在給定的超時時間內返回。這時在輪詢完成後獲取非同步請求的結果不一定有結果。
非同步請求的鏈式模式
將另一個非同步請求連結起來。
鏈式呼叫和組合模式的區別
1,組合模式和連結模式都會為當前非同步請求新增一個監聽器。
2,組合模式會建立一個新的非同步請求,連結模式則傳入一個已有的非同步請求。
3,組合模式返回新非同步請求,連結模式返回當前非同步請求,不是返回傳入的非同步請求。
// 鏈路模式 @param future 已有的非同步請求 public void chain(final RequestFuture future) { addListener(new RequestFutureListener() { public void onSuccess(T value) { // value是當前非同步請求的結果 // 用當前非同步請求的結果作為傳入的非同步請求的的結果 future.complete(value); // 呼叫非同步請求的完成方法 } public void onFailure(RuntimeException e) { future.raise(e); } }); // 沒有返回值,所以呼叫方還是使用的當前的非同步請求物件,而不是傳入的的非同步請求 }
組合模式會呼叫新非同步請求的complete方法。鏈式模式會呼叫傳入已有非同步請求的complete方法。
組合模式和鏈式模式的非同步請求呼叫流程區別
組合模式:在當前非同步請求完成時,呼叫當前非同步請求的監聽器回撥,在監聽器回撥中將新非同步物件傳給介面卡的回撥方法。在介面卡的回撥方法中會呼叫新非同步請求的complete方法,完成新非同步請求。即組合模式返回的非同步請求(新的)
鏈式模式:在當前非同步請求完成時,呼叫當前非同步請求的監聽器回撥,在監聽器回撥中呼叫傳入的非同步請求的complete方法,完成傳入的非同步請求。沒有返回值。
鏈式模式運用
消費者加入消費組就是運用了監聽器 + 組合模式 + 鏈式模式,保證業務邏輯的準確和非同步請求的呼叫順序。
消費者加入消費組:加入消費組必須完成同步組,加入消費組請求必須比同步組請求先發送,同步組的非同步請求完成後加入組的非同步請求才能完成。
消費者加入消費組步驟:
1,客戶端傳送加入組請求JoinGroup,採用組合模式返回加入組的非同步請求。
2,在加入組的介面卡處理中,傳送同步組請求,採用組合模式返回同步組的非同步請求。
3,將同步組的非同步請求使用鏈式模式連結上加入組的非同步請求,為同步組的非同步請求新增一個監聽器。
4,當同步組請求收到客戶端響應結果,完成同步組的非同步請求。
5,呼叫同步組非同步請求的監聽器回撥方法,完成加入組的非同步請求。
6,加入組請求完成,獲取加入組非同步請求的結果。
虛擬碼:
第一段:
RequestFuture joinFuture = sendJoinGroupRequest();
client.poll(joinFuture);
ByteBuffer byteBuffer = future.value(); // 第六步:獲取JoinGroup非同步請求的結果
第二段:
// 採用組合模式傳送加入組請求
private RequestFuture sendJoinGroupRequest() {
return client.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(new JoinGroupResponseHandler()); // 第一步:傳送JoinGroup請求
}
第三段:
private class JoinGroupResponseHandler extends RequestFutureAdapter {
public void onSuccess(ClientResponse clientResponse, RequestFuture future) { // future:JoinGroup的非同步請求
SyncGroupRequest syncGroupRequest = new SyncGroupRequest();
RequestFuture syncFuture = sendSyncGroupRequest(syncGroupRequest); // 傳送同步組請求
syncFuture.chain(joinFuture); // 第三步:將JoinGroup非同步請求連結到SyncGroup非同步請求
}
}
第四段:
// 採用組合模式傳送同步組請求
private RequestFuture sendSyncGroupRequest(SyncGroupRequest request) {
return client.send(coordinator, ApiKeys.SYNC_GROUP, request) .compose(new SyncGroupResponseHandler()); // 第二步:傳送SyncGroup請求
}
第五段:
private class SyncGroupResponseHandler extends RequestFutureAdapter {
public void onSuccess(ClientResponse clientResponse, RequestFuture future) { // future:SyncGroup的非同步請求
future.complete(future) // 第四步:完成SyncGroup的非同步請求
}
}
第六段:
public void chain(final RequestFuture future) {
addListener(new RequestFutureListener() {
public void onSuccess(T value) {
future.complete(value); // 第五步:完成JoinGroup非同步請求
}
以上重點分析了非同步請求的相關呼叫流程,後面將重點分析消費者網路客戶端的輪詢。
參考資料:
Kafka技術內幕:圖文詳解Kafka原始碼設計與實現