1. 程式人生 > >Kafka原始碼深度解析-序列4 -Producer -network層核心原理

Kafka原始碼深度解析-序列4 -Producer -network層核心原理

在上一篇我們分析了Java NIO的原理和使用方式,本篇將進一步分析Kafka client是如何基於NIO構建自己的network層。

network層的分層架構

下圖展示了從最上層的KafkaProducer到最底層的Java NIO的構建層次關係:
圖中淡紫色的方框表示介面或者抽象類,白色方框是具體實現。

整個架構圖也體現了“面向介面程式設計”的思想:最底層Java NIO往上層全部以介面形式暴露,上面的3層,也都定義了相應的介面,逐層往上暴露。

介面的例項化(包括KafkaClient, Selectable, ChannelBuilder),也都在最外層的容器類KafkaProducer的建構函式中完成,KafkaProducer也就充當了一個“工廠”的角色,裝配所有這些底層元件。
這裡寫圖片描述

network層元件與NIO元件的對映關係

從上圖也可以看出:
KakfaChannel基本是對SocketChannel的封裝,只是這個中間多個一個間接層:TransportLayer,為了封裝普通和加密的Channel;

Send/NetworkReceive是對ByteBuffer的封裝,表示一次請求的資料包;

Kafka的Selector封裝了NIO的Selector,內含一個NIO Selector物件。

Kafka Selector實現思路

1.從上圖可以看出, Selector內部包含一個Map, 也就是它維護了所有連線的連線池。這些KafkaChannel都由ChannelBuilder介面建立。

    private final Map<String, KafkaChannel> channels;

2.所有的io操作:connect, read, write其實都是在poll這1個函式裡面完成的。具體什麼意思呢?

NetworkClient的send()函式,呼叫了selector.send(Send send), 但這個時候資料並沒有真的傳送出去,只是暫存在了selector內部相對應的channel裡面。下面看程式碼:

//Selector
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());  //找到資料包相對應的connection
try { channel.setSend(send); //暫存在這個connection(channel)裡面 } catch (CancelledKeyException e) { this.failedSends.add(send.destination()); close(channel); } } //KafkaChannel public void setSend(Send send) { if (this.send != null) //關鍵點:當前的沒有發出去之前,不能暫存下1個!!!關於這個,後面還要詳細分析 throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); this.send = send; //暫存這個資料包 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); } public class KafkaChannel { private final String id; private final TransportLayer transportLayer; private final Authenticator authenticator; private final int maxReceiveSize; private NetworkReceive receive; private Send send; //關鍵點:1個channel一次只能存放1個數據包,在當前的send資料包沒有完整發出去之前,不能存放下一個 ... }

暫存在channel中之後,poll函式進行處理,我們抽象出一個輸入-輸出模型如下:
輸入:暫存的send資料包
輸出:完成的sends, 完成的receive(針對上1次的send), 建立的連線, 斷掉的連線。

這裡寫圖片描述

    @Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        clear();  //關鍵點:每次poll之前,會清空“輸出”
        if (hasStagedReceives())
            timeout = 0;
        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0) {
            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                KafkaChannel channel = channel(key);

                // register all per-connection metrics at once
                sensors.maybeRegisterConnectionMetrics(channel.id());
                lruConnections.put(channel.id(), currentTimeNanos);

                try {
                    /* complete any connections that have finished their handshake */
                    if (key.isConnectable()) {
                        channel.finishConnect();    //把建立的連線,加入輸出結果集合
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    }

                    ...

                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                        NetworkReceive networkReceive;
                        while ((networkReceive = channel.read()) != null)
                            addToStagedReceives(channel, networkReceive);
                    }

                    if (channel.ready() && key.isWritable()) {
                        Send send = channel.write();
                        if (send != null) {
                            this.completedSends.add(send);  //把完成的傳送,加入輸出結果集合
                            this.sensors.recordBytesSent(channel.id(), send.size());
                        }
                    }

                    if (!key.isValid()) {
                        close(channel);
                        this.disconnected.add(channel.id());  //把斷掉的連線,加入輸出結果集合
                    }
                } catch (Exception e) {
                    String desc = channel.socketDescription();
                    if (e instanceof IOException)
                        log.debug("Connection with {} disconnected", desc, e);
                    else
                        log.warn("Unexpected error from {}; closing connection", desc, e);
                    close(channel);
                    this.disconnected.add(channel.id()); //把斷掉的連線,加入輸出結果集合
                }
            }
        }

        addToCompletedReceives(); //把完成的接收,加入輸出結果集合

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();
    }

核心原理之1 – 訊息的分包

在上面的程式碼中,為什麼會有addToStagedReceives? 什麼叫做staged receives呢? 這叫要從資料的分包說起:

在NetworkClient中,往下傳的是一個完整的ClientRequest,進到Selector,暫存到channel中的,也是一個完整的Send物件(1個數據包)。但這個Send物件,交由底層的channel.write(Bytebuffer b)的時候,並不一定一次可以完全傳送,可能要呼叫多次write,才能把一個Send物件完全發出去。這是因為write是非阻塞的,不是等到完全發出去,才會返回。所以才有上面的程式碼:

                    if (channel.ready() && key.isWritable()) {
                        Send send = channel.write(); //send不為空,表示完全傳送出去,返回發出去的這個Send物件。如果沒完全發出去,返回null
                        if (send != null) {  
                            this.completedSends.add(send);
                            this.sensors.recordBytesSent(channel.id(), send.size());
                        }
                    }

同樣,在接收的時候,channel.read(Bytebuffer b),一個response也可能要read多次,才能完全接收。所以就有了上面的while迴圈程式碼:

                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                        NetworkReceive networkReceive;
                        while ((networkReceive = channel.read()) != null)  //迴圈接收,直到1個response完全接收到,才會從while迴圈退出
                            addToStagedReceives(channel, networkReceive);
                    }

核心原理之2 – 訊息的分界

從上面知道,底層資料的通訊,是在每一個channel上面,2個源源不斷的byte流,一個send流,一個receive流。
send的時候,還好說,傳送之前知道一個完整的訊息的大小;
那接收的時候,我怎麼知道一個msg response什麼時候結束,然後開始接收下一個response呢?

這就需要一個小技巧:在所有request,response頭部,首先是一個定長的,4位元組的頭,receive的時候,至少呼叫2次read,先讀取這4個位元組,獲取整個response的長度,接下來再讀取訊息體。

public class NetworkReceive implements Receive {
    private final String source;
    private final ByteBuffer size;  //頭部4位元組的buffer
    private final int maxSize;
    private ByteBuffer buffer;  //後面整個訊息response的buffer

    public NetworkReceive(String source) {
        this.source = source;
        this.size = ByteBuffer.allocate(4);   //先分配4位元組的頭部
        this.buffer = null;
        this.maxSize = UNLIMITED;
   }
}

核心原理之3 - 訊息時序保證

在InFlightRequests中,存放了所有發出去,但是response還沒有回來的request。request發出去的時候,入對;response回來,就把相對應的request出對。

final class InFlightRequests {

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
}

這個有個關鍵點:我們注意到request與response的配對,在這裡是用隊列表達的,而不是Map。用佇列的入隊,出隊,完成2者的匹配。要實現這個,伺服器就必須要保證訊息的時序:即在一個socket上面,假如發出去的reqeust是0, 1, 2,那返回的response的順序也必須是0, 1, 2。

但是伺服器是1 + N + M模型,所有的請求進入一個requestQueue,然後是多執行緒並行處理的。那它如何保證訊息的時序呢?

答案是mute/unmute機制:每當一個channel上面接收到一個request,這個channel就會被mute,然後等response返回之後,才會再unmute。這樣就保證了同1個連線上面,同時只會有1個請求被處理。

下面是服務端的程式碼:

    selector.completedReceives.asScala.foreach { receive =>
          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req)
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              close(selector, receive.source)
          }
          selector.mute(receive.source)    //收到請求,把這個請求對應的channel, mute
        }

        selector.completedSends.asScala.foreach { send =>
          val resp = inflightResponses.remove(send.destination).getOrElse {
            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
          }
          resp.request.updateRequestMetrics()
          selector.unmute(send.destination)  //傳送response之後,把這個responese對應的channel, unmute
        }

NetworkClient實現思路

上面已經講到:
(1)Selector維護了所有連線的連線池,所有連線上,訊息的傳送、接收都是通過poll函式進行的
(2)一個channel一次只能暫存1個Send物件。

但如果這個Send物件,一次poll之後,沒有完全傳送出去怎麼辦呢?看上層NetworkClient怎麼處理的:

關鍵的client.ready函式

先從Sender的run()函式看起:

    public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {   //關鍵函式!!!
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);

        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }

        for (ClientRequest request : requests)  //每個request分屬於不同的Node
            client.send(request, now);   //client的send就是直接呼叫了selector.send,訊息暫存在channel裡面,沒有傳送

        this.client.poll(pollTimeout, now); //呼叫selector.poll,處理連線、傳送、接收
    }

在上面的程式碼中,有一個關鍵函式:client.ready(Node n, ..), 這個函式內部會判斷這個node有沒有ready,如果沒有ready,就會從readNodes裡面移除,接下來就不會往這個Node傳送訊息。

那什麼叫ready呢? 我們看一下程式碼:

    public boolean ready(Node node, long now) {
        if (isReady(node, now))
            return true;

        if (connectionStates.canConnect(node.idString(), now))
            initiateConnect(node, now);
        return false;
    }

    public boolean isReady(Node node, long now) {
        return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
    }

    private boolean canSendRequest(String node) {
        return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
    }

    public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }

    public boolean completed() {
        return remaining <= 0 && !pending;
    }

上面的程式碼封了好幾層,但總結下來,一個Node ready,可以向其傳送請求,需要符合以下幾個條件:
1. metadata正常,不需要update: !metadataUpdater.isUpdateDue(now)
2. 連線正常 connectionStates.isConnected(node)
3. channel是ready狀態:這個對於PlaintextChannel, 一直返回true
4. 當前該channel中,沒有in flight request,所有請求都處理完了
5. 當前該channel中,佇列尾部的request已經完全傳送出去, request.completed(),並且inflight request數目,沒有超過設定的最大值(預設為5,即允許在“天上飛”的request最多有5個,所謂在“天上飛”,就是發出去了,response還沒有回來)

而上面的第5個條件,正是解決了上面的問題:一個channel裡面的Send物件要是隻傳送了部分,下1次就不會處於ready狀態了。

client.poll函式

下面看一下client.poll,是如何封裝selector.poll的:

    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        //上面說到,selector.poll函式,會把處理結果,放到一堆的狀態變數裡面(輸出結果集),現在就是處理這堆輸出結果的時候了。

        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
   }

//Selector中的那堆狀態變數,在每次poll之前,被clear情況掉,每次poll之後,填充。
//然後在client.poll裡面,這堆輸出結果被處理
public class Selector implements Selectable {
    。。。
    private final List<Send> completedSends;
    private final List<NetworkReceive> completedReceives;
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    private final List<String> disconnected;
    private final List<String> connected;
。。。
}

連線檢測 & 自動重連機制

在所有tcp長連結的程式設計中,都有一個基本問題要解決:如何判斷1個連線是否斷開?客戶端需要維護所有連線的狀態(connecting, connected, disconnected),然後根據連線狀態做不同邏輯。

但在NIO中,並沒有一個函式,可以直接告訴你一個連線是否斷開了;在NetworkClient裡面,也並沒有開一個執行緒,不斷髮送心跳訊息,來檢測連線。那它是如何處理的呢?

檢測連線斷開的手段

在networkClient的實現中,用了3種手段,來判斷一個連線是否斷開:
手段1:所有的IO函式,connect, finishConnect, read, write都會拋IOException,因此任何時候,呼叫這些函式的時候,只要拋異常,就認為連線已經斷開。

手段2:selectionKey.isValid()

手段3:inflightRequests,所有發出去的request,都設定有一個response返回的時間。在這個時間內,response沒有回來,就認為連線斷了。

前2種手段,都集中在Select.poll函式裡面:

    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        clear();
        if (hasStagedReceives())
            timeout = 0;
        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0) {
            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                KafkaChannel channel = channel(key);

                // register all per-connection metrics at once
                sensors.maybeRegisterConnectionMetrics(channel.id());
                lruConnections.put(channel.id(), currentTimeNanos);

                try {
                    /* complete any connections that have finished their handshake */
                    if (key.isConnectable()) {
                        channel.finishConnect();
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    }

                    /* if channel is not ready finish prepare */
                    if (channel.isConnected() && !channel.ready())
                        channel.prepare();

                    /* if channel is ready read from any connections that have readable data */
                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                        NetworkReceive networkReceive;
                        while ((networkReceive = channel.read()) != null)
                            addToStagedReceives(channel, networkReceive);
                    }

                    /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                    if (channel.ready() && key.isWritable()) {
                        Send send = channel.write();
                        if (send != null) {
                            this.completedSends.add(send);
                            this.sensors.recordBytesSent(channel.id(), send.size());
                        }
                    }

                    if (!key.isValid()) {   //手段2
                        close(channel);
                        this.disconnected.add(channel.id());
                    }
                } catch (Exception e) {  //手段1:任何一個io函式,只要拋錯,就認為連線斷了
                    String desc = channel.socketDescription();
                    if (e instanceof IOException)
                        log.debug("Connection with {} disconnected", desc, e);
                    else
                        log.warn("Unexpected error from {}; closing connection", desc, e);
                    close(channel);
                    this.disconnected.add(channel.id());
                }
            }
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();
    }

第3種手段,在NetworkClient裡面:

    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow); //手段3:處理所有TimeOutRequests

        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
        connectionStates.disconnected(nodeId, now);
        for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
            log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
            if (!metadataUpdater.maybeHandleDisconnection(request)) //把MetaDataRequest排除在外,其它所有請求,只要超時,就認為連線斷開
                responses.add(new ClientResponse(request, now, true, null));
        }
    }

除了上述的2個地方,還要一個地方,就是初始化的時候

    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(nodeConnectionId, now);
            selector.connect(nodeConnectionId,
                             new InetSocketAddress(node.host(), node.port()),
                             this.socketSendBuffer,
                             this.socketReceiveBuffer);
        } catch (IOException e) { //檢測到連線斷開
            connectionStates.disconnected(nodeConnectionId, now);
            metadataUpdater.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }

檢測時機

從上面程式碼我們可以看出,連線的檢測時機,有2個:
一個是初始建立連線的時候,一個就是每次poll迴圈,每poll一次,就收集到一個斷開的連線集合。

下面分別是Selector和NetworkClient中,關於連線狀態的資料結構:

//Selector中的連線狀態
public class Selector implements Selectable {
    private final List<String> disconnected;
    private final List<String> connected;
    ..
}
//NetworkClient中的連線狀態維護
public class NetworkClient implements KafkaClient {
    private final ClusterConnectionStates connectionStates;
    ...
}

final class ClusterConnectionStates {
    private final long reconnectBackoffMs; //重連的時間間隔
    private final Map<String, NodeConnectionState> nodeState;
}

    private static class NodeConnectionState {
        ConnectionState state;
        long lastConnectAttemptMs;  //上1次發起重連的時間
        ...
    }

public enum ConnectionState {
    DISCONNECTED, CONNECTING, CONNECTED
}

總結:
1. Selector中的連線狀態,在每次poll之前,會呼叫clear清空;在poll之後,收集。
2. Selector中的連線狀態,會傳給上層NetworkClient,用於它更新自己的連線狀態
3. 出了來自Selctor,NetworkClient自己內部的inflightRequests,也就是上面的手段3, 也用於檢測連線狀態。

通過上面的機制,就保證了NetworkClient可以實時準確維護所有connection的狀態。

自動重連 - ready函式

狀態知道了,那剩下的就是自動重連了。這個發生在更上層的Send的run函式裡面:

//Sender
    public void run(long now) {
        Cluster cluster = metadata.fetch();
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {  //關鍵的ready函式
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

    public boolean ready(Node node, long now) {
        if (isReady(node, now))
            return true;

        if (connectionStates.canConnect(node.idString(), now))
            initiateConnect(node, now);   //發起重連

        return false;
    }

    public boolean canConnect(String id, long now) {
        NodeConnectionState state = nodeState.get(id);
        if (state == null)
            return true;
        else
            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
    }

從上面函式可以看出,每次Send發資料之前,會先呼叫client.ready(node)判斷該node的連線是否可用。

在ready內部,如果連線不是connected狀態,會再判斷是否可以發起自動重連,檢測條件有2個:

條件1: 它不能是connecting狀態,必須是disconnected

條件2: 重連不能太頻繁。當前時間距離上1次重連時間,要有一定的間隔。如果broker掛了,你太頻繁的重連也不起作用。

這裡有個關鍵點:因為都是非阻塞呼叫,本次雖然檢測到連線斷了,但只是發起連線,不會等到連線建立好了,再執行下面的程式碼。
會在poll之後,判斷連線是否建立;在下1次或者下幾次poll之前,可能連線才會建立好,ready才會返回true.

相關推薦

Kafka原始碼深度解析序列4 Producer network核心原理

在上一篇我們分析了Java NIO的原理和使用方式,本篇將進一步分析Kafka client是如何基於NIO構建自己的network層。 network層的分層架構 下圖展示了從最上層的KafkaProducer到最底層的Java NIO的構建層次關係:

Kafka原始碼深度解析序列3 Producer Java NIO

在上一篇我們分析了Metadata的更新機制,其中涉及到一個問題,就是Sender如何跟伺服器通訊,也就是網路層。同很多Java專案一樣,Kafka client的網路層也是用的Java NIO,然後在上面做了一層封裝。 下面首先看一下,在Sender和伺服器

Kafka原始碼深度解析序列5 Producer RecordAccumulator佇列分析

在Kafka原始碼分析-序列2中,我們提到了整個Producer client的架構圖,如下所示: 其它幾個元件我們在前面都講過了,今天講述最後一個元件RecordAccumulator. Batch傳送 在以前的kafka client中,每條訊

Kafka原始碼深度解析序列9 Consumer SubscriptionState內部結構分析

在前面講了,KafkaConsumer的一個重要部件就是SubscriptionState,這個部件維護了Consumer的消費狀態,本篇對其內部結構進行分析。 2種訂閱策略 在第1篇講過,consumer可以自己指定要消費哪個partition,而不是

Kafka原始碼深度解析系列1 訊息佇列的策略與語義

-Kafka關鍵概念介紹 -訊息佇列的各種策略與語義 作為一個訊息佇列,Kafka在業界已經相當有名。相對傳統的RabbitMq/ActiveMq,Kafka天生就是分散式的,支援資料的分片、複製以及叢集的方便擴充套件。 與此同時,Kafka是高可靠的、持

SnapHelper原始碼深度解析

目錄介紹 01.SnapHelper簡單介紹 1.1 SnapHelper作用 1.2 SnapHelper類分析 1.3 LinearSnapHelper類分析 1.4 PagerSnapHelper類分析 02.SnapHelper原始碼分析

FeignClient原始碼深度解析

微信公眾號:吉姆餐廳ak 學習更多原始碼知識,歡迎關注。 全文共16984字左右。 概述 springCloud feign主要對netflix feign進行了增強和包裝,本篇從原始碼角度帶你過一遍裝配流程,揭開feign底層的神祕面紗。 主要包括feign整合r

《Spring原始碼深度解析》讀後感

大概三週看完《Spring原始碼深度解析》寫下一篇讀後感玩 首先高度概括:內容過於豐富 重點不突出 本書共分8個模組 1、XML解析部分非常全面, 各種配置方法, 解析步驟都有介紹,這裡其實就是些巢狀的呼叫,Spring原始碼肯定比自己寫的優美。  

原始碼系列Spring,Mybatis,Springboot,Netty原始碼深度解析-Spring的整體架構與容器的基本實現-mybatis原始碼深度解析與最佳實踐

6套原始碼系列Spring,Mybatis,Springboot,Netty原始碼深度解析視訊課程   6套原始碼套餐課程介紹: 1、6套精品是掌櫃最近整理出的最新課程,都是當下最火的技術,最火的課程,也是全網課程的精品;   2、6套資源包含:全套完整

《Spring原始碼深度解析》學習筆記

《Spring原始碼深度解析》學習筆記——Spring的整體架構與容器的基本實現 spring框架是一個分層架構,它包含一系列的功能要素,並被分為大約20個模組,如下圖所示 這些模組被總結為以下幾個部分: Core Container Core Container

Mybatis攔截器原始碼深度解析

目錄: 一. 建立攔截器鏈 1. 建立物件 2. 建立配置檔案 3. 載入攔截器鏈 二. 方法呼叫解析 1. 對請求物件進行攔截器包裝 2. 執行呼叫 三. 小結 Mybatis攔截器 可以幫助我們在執行sql語句過程中增加外掛以實現一些通用的邏輯,比

Spring原始碼深度解析-1、Spring核心類簡單介紹

在更新JAVA基礎原始碼學習的同時,也有必要把Spring抓一抓,以前對於spring的程度僅在於使用,以及一點IOC/AOP的概念,具體深層的瞭解不是很深入,每次看了一點原始碼就看不下去,然後一轉眼都忘記看了啥。 所以這次專門買了書,來細細品味下Spring。 希望能從這一波學習中加強自己

Mybatis原始碼深度解析

前言:     mybatis是我們常用的一種操作資料庫的框架。     我們在使用的mybatis有多種方式:原生mybatis、與Spring結合使用的mybatis、與SprinBoot結合使用的mybatis。     使

Spring原始碼深度解析,事務案例講解高階

Spring的整體架構Spring框架是一個分層架構,它包含一系列的功能要素,並被分為大約20個模組,如下圖所示   這些模組被總結為以下幾個部分: Core Container Core Container(核心容器)包含有Core、Beans、Context和Expression Lan

Springboot原始碼深度解析,方法解析,類載入解析,容器建立

springboot的啟動都是從main方法開始的,如下:@SpringBootApplicationpublic class Application { public static void main(String[] args) { SpringApplication.run(Application.cl

spring原始碼深度解析筆記(三)

之前提到在xmlBeanFactory建構函式中呼叫了XmlBeanDefinitionReader型別的reader屬性提供的方法this.reader.loadBeanDefinitions(resource),這就是載入整個資源載入的切入點。 當進入XmlBeanDe

spring原始碼深度解析筆記(四)

DTD與XSD的區別 DTD(Document Type Definition)即文件型別定義,是一種XML約束模式語言,是XML檔案的驗證機制,是屬於XML檔案組成的一部分。DTD是一種保證XML文件格式正確的有效方法,可以通過比較XML文件和DTD檔案來看

python3網路爬蟲-破解天眼查+企業工商資料-分散式爬蟲系統-原始碼深度解析

Python爬蟲-2018年-我破解天眼查和啟信寶企業資料爬蟲--破解反爬技術那些事情 最近在自己用python3+mongdb寫了一套分散式多執行緒的天眼查爬蟲系統,實現了對天眼查整個網站的全部資料各種維度的採集和儲存,主要是為了深入學習爬蟲技術使用,並且根據天眼查網頁的

RecyclerView用法和原始碼深度解析

目錄介紹 1.RecycleView的結構 2.Adapter 2.1 RecyclerView.Adapter扮演的角色 2.2 重寫的方法 2.3 notifyDataSetChanged()重新整理資料 2.4 資料變更通知之觀察者模式 a.首先看.

《Spring原始碼深度解析》pdf附網盤下載連結送給還在迷茫的你

技術書閱讀方法論 一.速讀一遍(最好在1~2天內完成) 人的大腦記憶力有限,在一天內快速看完一本書會在大腦裡留下深刻印象,對於之後複習以及總結都會有特別好的作用。 對於每一章的知識,先閱讀標題,弄懂大概講的是什麼主題,再去快速看一遍,不懂也沒有關係,但是一定要在不懂的