無鏡--kafka之生產者(三)
話說上回中,KafkaProducer已經將生產的記錄追加到了RecordAccumulator中。那麼接下來的事情,就是怎麼樣把這些記錄提交到服務端了。
是否還記得在KafkaProducer.doSend方法一下程式碼段:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); // 5.如果 batch已經滿了或者是新建立的Batch,喚醒 sender執行緒傳送資料 if (result.batchIsFull || result.newBatchCreated) { this.sender.wakeup(); }
在一個RecordBatch已經滿了或是新建立了一個RecordBatch(之所以新建是因為舊的放不下訊息了。因此意味著舊的就可以傳送了)。就喚醒傳送執行緒,準備提交記錄到服務端。
this.sender.wakeup(); // 將Sender執行緒從阻塞中喚醒
Sender
實現Runnable介面的物件。一個KafkaProducer持有一個Sender例項。Sender執行緒迭代RecordAccumulator中batches變數的每個分割槽(tp),獲取分割槽對應的主副本節點,然後取出分割槽對應佇列中的RecordBatch,提交到服務端。(追加訊息到記錄收集器中(RecordAccumulator)都是按照分割槽分好組了,所以每個分割槽佇列都是儲存的即將傳送到這個分割槽主副本對應的節點上的記錄)。
Sender.run
void run(long now) { // 從元資料物件中獲取叢集資訊 Cluster cluster = metadata.fetch(); // 遍歷所有的topic-partition,如果其對應的RecordBatch可以傳送(大小達到 batch.size或時間達到 linger.ms) 就取出其對應的leader。 // 返回ReadyCheckResult例項,包含:可以傳送的RecordBatch對應的節點(leader)等資訊 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); // 如果有topic-partition的leader是未知的,就強制metadata更新 if (!result.unknownLeaderTopics.isEmpty()) { // ready()方法中遇到沒有leader的tp就將其加入ReadyCheckResult.unknownLeaderTopics的set集合中 // 然後會去請求這些tp的的meta for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); this.metadata.requestUpdate(); } // 如果與node沒有連線(如果允許連線但還沒連線,就初始化連線),就證明該node暫時不能接收資料,暫時移除該 node // 建立到主節點的網路連線,移除還沒有準備好的節點(leader還沒有選擇出來的節點) Iterator iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } // 返回該 node 對應的所有可以傳送的 RecordBatch 組成的 batches(key 是 node.id),並將 RecordBatch 從對應的 queue 中移除 // 讀取記錄收集器,返回組合好的在同一個節點上的所有主副本對應的分割槽的RecordBatch // Map<nodeID,要準備傳送到該節點的所有RecordBatch(包括不同的分割槽)> Map<Integer,List<RecordBatch>> batches = this.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize,now); if (guaranteeMessageOrder) { // 記錄將要傳送的 topicPartition for (List batchList : batches.values()) { for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } // 將由於元資料不可用等情況而導致不能傳送的 RecordBatch移除 List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); // 構建以節點為級別的生產請求列表,既每個節點只有一個客戶端請求 // 減少客戶端到服務端的請求次數 List requests = createProduceRequests(batches, now); long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { pollTimeout = 0; } for (ClientRequest request : requests) client.send(request, now); // 儲存要傳送的客戶端請求,這裡沒有真正的傳送 // 執行真正的網路讀寫請求,將上面的客戶端請求真正傳送出去 this.client.poll(pollTimeout, now); }
在傳送執行緒傳送訊息時,記錄收集器會按照節點維度將RecordBatch重新組裝(Map<nodeID,要準備傳送到該節點的所有RecordBatch>),返回給傳送執行緒,再由傳送執行緒為每一個節點建立一個客戶端請求。
細看一下run中的方法:
RecordAccumulator.ready
public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set unknownLeaderTopics = new HashSet<>(); boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { TopicPartition part = entry.getKey(); Deque deque = entry.getValue(); Node leader = cluster.leaderFor(part); // 查詢tp的leader對應的節點資訊 synchronized (deque) { if (leader == null && !deque.isEmpty()) { unknownLeaderTopics.add(part.topic()); } else if (!readyNodes.contains(leader) && !muted.contains(part)) { // 如果 muted 集合包含這個 tp,那麼在遍歷時將不會處理它對應的 deque, // 也就是說,如果一個 tp在muted集合中,說明它還有RecordBatch正在處理中(沒有收到響應) // 那麼即使它對應的RecordBatch可以傳送了,也不會處理 RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; long waitedTimeMs = nowMs - batch.lastAttemptMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); // batch滿了 boolean expired = waitedTimeMs >= timeToWaitMs; // batch超時 boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { readyNodes.add(leader); // 將可以傳送的leader新增到集合中 } else { nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); }
ready方法返回的ReadyCheckResult物件包括:可以傳送的RecordBatch對應的節點(leader)資訊,下一次就緒檢查點的時間,分割槽的leader未知的topic資訊。發現有分割槽的leader未知的topic資訊那麼就會去強制更新元資料裡面的叢集資訊。
RecordAccumulator.drain
public Map<Integer,List<RecordBatch>> drain(Cluster cluster, Set nodes, int maxSize, long now) { if (nodes.isEmpty()) return Collections.emptyMap(); Map<Integer,List<RecordBatch>> batches = new HashMap<>(); for (Node node : nodes) { int size = 0; List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); List<RecordBatch> ready = new ArrayList<>(); int start = drainIndex = drainIndex % parts.size(); do { PartitionInfo part = parts.get(drainIndex); TopicPartition tp = new TopicPartition(part.topic(), part.partition()); if (!muted.contains(tp)) { // 被 mute 的 tp 依然不會被遍歷 Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition())); if (deque != null) { // tp有對應的佇列有資料,會選擇出來,加上已經被選擇出來的RecordBatch,直到達到最大的請求長度,才停止 // 這樣一個RecordBatch及時沒有達到傳送條件(沒有裝滿),為了保證每個請求儘可能多的傳送資料,也會被髮送出去。 synchronized (deque) { RecordBatch first = deque.peekFirst(); if (first != null) { boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now; if (!backoff) { if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { break; } else { RecordBatch batch = deque.pollFirst(); batch.records.close(); size += batch.records.sizeInBytes(); ready.add(batch); batch.drainedMs = now; } } } } } } this.drainIndex = (this.drainIndex + 1) % parts.size(); } while (start != drainIndex); batches.put(node.id(), ready); } return batches; }
drain方法,在max.request.size的範圍內傳送儘可能多的RecordBatch。並且重新按照節點維度重新整合記錄。
在記錄收集器中的儲存資料格式為:batches-->Map<TopicPartition,Deque<RecordBatch>>。傳送執行緒獲取資料時記錄收集器返回的資料格式為:batches-->Map<nodeId,List<RecordBatch>>
記錄收集器(RecordAccumulator),傳送執行緒(Sender),服務端(Broker)
參考了《Kafka技術內幕:圖文詳解Kafka原始碼設計與實現》中的圖

wakeup方法把傳送執行緒喚醒,但是Sender並不負責真正傳送客戶端請求到服務端,它做的事情只是從記錄收集器(RecordAccumulator)中,取出可以傳送的記錄,封裝成Map<nodeId,List<RecordBatch>>結構,建立好客戶端請求,然後把請求交給NetworkClient(客戶端網路物件)去傳送。
NetworkClient
Kafka客戶端傳送是基於IO/">NIO構建自己的通訊層NetworkClient。它管理了客戶端和服務端的網路通訊。

以上是NetworkClient關於通訊層方面的生態類。
NetworkClient重要的幾個方法:
ready(): 連線所有可以連線的節點。如果伺服器不能連線,就把節點移除掉。
send():在當前節點可以傳送新的請求的情況下(這裡的可以發是在能正常連線的情況下,同一個節點,一個客戶端請求還沒有完成時,就不能傳送新的客戶端請求), 把Sender傳送執行緒建立的客戶端請求,存到節點對應的通道中(KafkaChannel),並快取到“沒有收到響應的佇列”中(InFlightRequests)。
poll(): 輪詢,真正的執行網路請求,傳送請求到節點,讀取響應。此方法中要呼叫org.apache.kafka.common.network.Selector.poll()方法。在一次poll之後會對這次poll資料進行相關的處理:
1,處理已經完成的Send,包括那些傳送完成後不需要響應的Send-->handleCompletedSends。
2,處理從服務端接收到響應-->handleCompletedReceives。
3,處理連線失敗那些連線-->handleDisconnections。
4,處理新建立的那些連線-->handleConnections。
5,超時的請求-->handleTimedOutRequests。
6,呼叫請求的回撥函式。
Selector(org.apache.kafka.common.network)
來回顧一下java NIO中的一些概念:以下描述 參考:《Kafka技術內幕:圖文詳解Kafka原始碼設計與實現》
SocketChannel:客戶端網路連線通道,底層的位元組資料讀寫都發生在通道上(從通道中讀取資料,將資料寫入通道中),通道會和位元組緩衝區一起使用,從通道中讀取資料時需要構造一個緩衝區,呼叫channel.read(buffer)就會將通道中的資料新增到緩衝區中。將資料寫入通道時。要先將資料寫到快取區中,呼叫channel.write(buffer)將緩衝區中的每個位元組寫入到通道中。
Selector:發生在通道上的事件有讀和寫,選擇器會通過選擇鍵的方式監聽讀寫事件的發生。
SelectionKey:將通道註冊到選擇器上:channel.register(selector)會返回選擇鍵。選擇鍵將通道和選擇器關聯起來。讀和寫事件發生時,通過選擇鍵可以得到對應的通道,從而在通道上進行讀寫操作。
Sender,NetworkClient,Selector

KafkaChannel
id:NodeId
TransportLayer:負責位元組操作的傳輸層,KafkaChannel要操作SockerChannel時,都交給TransportLayer傳輸層去做。
NetworkReceive:接收的資料。
Send:傳送的請求資料,一個KafkaChannel一次只存放一個請求資料。等著資料傳送完成後,才能傳送下一個請求資料。
TransportLayer
傳輸層對SockerChannel做了簡單的封裝(都實現了ScatteringByteChannel和GatheringByteChannel介面),選擇器Selector在呼叫KafkaChannel.write和read方法時,實際是呼叫Send.writeTo和NetworkReceive.readFrom,再呼叫底層SockerChannel.write和read方法

Selector輪詢
選擇器監聽到了客戶端的讀寫事件,會獲取繫結到選擇鍵(SelectionKey)上的KafkaChannel,KafkaChannel會將讀寫操作交給傳輸層(TransportLayer),TransportLayer再使用底層的SocketChannel完成資料的操作。
NetworkClient.ready
在確認節點是否可以傳送的時,允許連線但是沒有連線的情況下會初始化連線,呼叫org.apache.kafka.common.network.Selector.connect,連線動作使用java原生的SocketChannel完成。在此方法中會 構建KafkaChannel,讓KafkaChannel和SelectionKey關聯起來 。還維護了節點和KafkaChannel的對映關係(<nodeId,KafkaChannel>)。
SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true); boolean connected = socketChannel.connect(address); // 發起連線請求 SelectionKey key = socketChannel.register(java.nio.channels.Selector, SelectionKey.OP_CONNECT); KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); // 構建KafkaChannel物件 key.attach(channel); // 將KafkaChannel註冊到選擇鍵上 this.channels.put(id, channel); // 節點和KafkaChannel的對映關係
NetworkClient.send
客戶端傳送的ClientRequest請求,經過NetworkClient.send()--->org.apache.kafka.common.network.Selector.send()--->KafkaChannel.setSend()。儲存到對應的KafkaChannel中,但在KafkaChannel還有未傳送成功的Send請求,則後面的請求則不能傳送(在一個KafkaChannel中,一次只能傳送一個客戶端請求)。

KafkaChannel一次只處理一個Send,每次都會註冊寫事件,當Send傳送成功後,就登出寫事件。這裡的傳送完成是整個Send請求傳送完成,如果呼叫一次底層的write方法沒有完成寫完,那麼寫事件不會被登出,會繼續監聽寫事件,直到整個Send請求傳送完成。
註冊寫事件,當Selector輪詢後,寫事件準備就緒,就會從KafkaChannel取出客戶端請求,呼叫底層的write方法進行傳送。
NetworkClient.poll
NetworkClient的輪詢會呼叫Selector.poll(),在選擇鍵上處理讀寫事件,當事件發生時,呼叫KafkaChannel上的read和write會得到返回值NetworkReceive和Send物件,加入到List<Send>:completedSends(傳送完成的客戶端請求物件集合)和Map<KafkaChannel,Deque<NetworkReceive>>:stagedReceives(完全接收完服務端響應儲存到KafkaChannel對應的佇列中)。最後這些集合中的資料服務於poll方法後續的handle開頭的方法中。
private void pollSelectionKeys(Iterable selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); KafkaChannel channel = channel(key); // 獲取繫結到選擇鍵中的KafkaChannel sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); try { // 處理一些剛建立 tcp 連線的 channel if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { // 連線已經建立 this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); } else continue; } if (channel.isConnected() && !channel.ready()) channel.prepare(); // 在讀取一個響應的時候,可能會呼叫很多次的read,所以需要迴圈讀取 if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) // 迴圈接收,直到讀取到一個完整的 Receive,才退出迴圈 addToStagedReceives(channel, networkReceive); // 讀取完成後將響應資料新增到集合中 } // 底層傳送的時候,並不定一次可以完全傳送,所以會呼叫很多次的write,才會完成一個Send的傳送 // write是非阻塞的,不會等到全部發送才返回 // 所以在沒有全部發送的時候,不會登出寫事件 //在epoll的預設模式下(LT(水平觸發)):寫緩衝區只要不滿,就一直會觸發寫事件。所以只要不登出寫事件,那麼就會觸發寫事件,直到把一個完整的Send傳送完成 // 在LT模式下,寫緩衝區為滿的概率很小,所以寫完Send後,要登出寫事件,否則會出現一直觸發寫事件 if (channel.ready() && key.isWritable()) { Send send = channel.write(); // send不為空,表示完全傳送出去了,返回此send物件,如果沒有完全傳送出去,就返回NULL if (send != null) { this.completedSends.add(send); // 將完成的 send新增到list中 this.sensors.recordBytesSent(channel.id(), send.size()); } } if (!key.isValid()) { // 關閉斷開的連線 close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) { String desc = channel.socketDescription(); close(channel); this.disconnected.add(channel.id()); } } }
以上就是生產者的產出客戶端請求通過Sender-->NetworkClient-->Selector-->KafkaChannel-->Send/NetworkReceive-->TransportLayer-->SocketChannel。這個鏈條進行傳送和訊息接收。