Zookeeper客戶端原始碼分析(一)建立連線
阿新 • • 發佈:2019-09-09
本文基於zookeeper-3.4.14。由於zookeeper的許多構造方法只是委派給另一個過載的構造方法,所以分析程式碼的時候直接分析最終被呼叫的那個構造方法
demo程式碼如下
/**
 * Minimal ZooKeeper client demo used as the entry point for the walkthrough:
 * opens a session, reads one znode with a watcher attached, then closes the session.
 */
public class Life {

    public static void main(String[] args) throws Exception {
        new Life().start();
    }

    /**
     * Connects to the server at 127.0.0.1:2181 with a 30 s session timeout, reads
     * {@code /data} while registering a watcher on it, and always closes the client.
     *
     * @throws Exception if the connection cannot be created, the read fails, or the
     *                   thread is interrupted while closing
     */
    private void start() throws Exception {
        // The watcher passed to the constructor becomes the client's default watcher.
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("預設watch被觸發");
            }
        });
        try {
            // The returned data is not needed for the demo; the Stat argument is null
            // because the node metadata is not needed either.
            zooKeeper.getData("/data", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("獲取資料");
                }
            }, null);
        } finally {
            // Close explicitly so the session and socket are released right away
            // instead of lingering on the server until the session times out.
            zooKeeper.close();
        }
    }
}
一、初始化流程
1.1 引數設定
//Zookeeper.java //第438行 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); //客戶端新建時的watcher作為defaultWatcher,每次請求都會被呼叫 watchManager.defaultWatcher = watcher; //將傳入的字串(127.0.0.1:2181/aaa)解析成具體的地址 //'/aaa'在zookeeper中稱作chrootPath,以後此客戶端的所有請求的路徑前邊都會加上它 //比如zooKeeper.getData("/data"),實際取的路徑是/aaa/data ConnectStringParser connectStringParser = new ConnectStringParser( connectString); //StaticHostProvider負責解析域名(其實就是呼叫Java的api),負責給客戶端提供服務端連線地址 HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); //getClientCnxnSocket()用來獲取ClientCnxnSocket的實現類,預設是ClientCnxnSocketNIO, //可以通過zookeeper.clientCnxnSocket引數配置實現類的全限定名 //但是ClientCnxnSocket是同包內可見,所以你不能自己實現,只能用zookeeper提供的;或者建一個跟它一樣的包 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); } //ClientCnxn.java //第386行 public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath; connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; //處理請求和響應的執行緒 sendThread = new SendThread(clientCnxnSocket); //處理事件的執行緒 eventThread = new EventThread(); } //ClientCnxn.java //第420行 public void start() { sendThread.start(); eventThread.start(); } //ClientCnxn.java中的兩個無界佇列 //已經發送出去等待回覆的資料包 private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); 
//要傳送出去的資料包 private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
1.2 連線zookeeper服務端
1、建立socket連線
// ClientCnxn$SendThread
// line 1032
public void run() {
    // (parts of the code omitted)
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                // Not connected, and isFirstConnect is false: primeConnection() clears
                // that flag once a socket connection has been established, so this is
                // a reconnect attempt. Wait a random time of up to one second first.
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // If the client is closing, do not reconnect: leave the loop, which
                // ends this thread.
                if (closing || !state.isAlive()) {
                    break;
                }
                // When a client that did not opt into read-only mode ends up on a
                // read-only server, later code looks for a writable server and stores
                // its address in rwServerAddress.
                // NOTE(review): the article left this as an open question; upstream,
                // finding a read-write server appears to force a reconnect that then
                // takes this branch -- confirm against pingRwServer().
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    serverAddress = hostProvider.next(1000);
                }
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }
            // (parts of the code omitted)
        }
        // (parts of the code omitted)
    }
    // (parts of the code omitted)
}

// ClientCnxn$SendThread
// line 992
private void startConnect(InetSocketAddress addr) throws IOException {
    saslLoginFailed = false;
    state = States.CONNECTING;
    // Put the target host:port into the parenthesised part of the thread name.
    setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
    // Is SASL authentication enabled? Per the article it is enabled by default.
    // (Article author's note: not analysed further here.)
    if (ZooKeeperSaslClient.isEnabled()) {
        try {
            zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr));
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it.");
            eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    // Log the connection attempt.
    logStartConnect(addr);
    clientCnxnSocket.connect(addr);
}

// ClientCnxnSocketNIO.java
// line 284
@Override
void connect(InetSocketAddress addr) throws IOException {
    // Create the SocketChannel.
    SocketChannel sock = createSock();
    try {
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    // A fresh connection has not yet received its connect response.
    initialized = false;
    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

// ClientCnxnSocketNIO.java
// line 259
SocketChannel createSock() throws IOException {
    SocketChannel sock;
    sock = SocketChannel.open();
    // Non-blocking channel with SO_LINGER off and TCP_NODELAY on.
    sock.configureBlocking(false);
    sock.socket().setSoLinger(false, -1);
    sock.socket().setTcpNoDelay(true);
    return sock;
}

// ClientCnxnSocketNIO.java
// line 274
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    // Register interest in OP_CONNECT.
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // Connect to the server. The channel is non-blocking, so connect() normally
    // returns false here and the connection is completed later from doTransport();
    // see the Javadoc of SocketChannel#connect(). If it does return true the
    // connection is already established and the session is primed right away.
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}
2、傳送連線請求
// ClientCnxn$SendThread
// line 1032
public void run() {
    // (parts of the code omitted)
    while (state.isAlive()) {
        try {
            // The socket-connection code analysed in the previous section sits here.
            // (parts of the code omitted)
            // Let the socket implementation wait for and handle ready I/O events.
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        }
        // (parts of the code omitted)
    }
    // (parts of the code omitted)
}
// ClientCnxnSocketNIO.java
// line 346
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
        ClientCnxn cnxn)
        throws IOException, InterruptedException {
    // Much of this is the standard Java NIO selector idiom; readers unfamiliar with
    // NIO should study that first.
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        // registerAndConnect() registered interest in OP_CONNECT earlier.
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                // Run the step that was skipped when connect() returned false:
                // queue the connect request for the server.
                sendThread.primeConnection();
            }
        }
        // (parts of the code omitted)
    }
    // (parts of the code omitted)
}
// ClientCnxn$SendThread
// line 878
// Queues the packets that open a session on a freshly connected socket: the
// ConnectRequest, any auth packets, and (unless disabled) SetWatches packets that
// re-register the client's existing watches. Then switches the socket to read/write.
void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
            + clientCnxnSocket.getRemoteSocketAddress()
            + ", initiating session");
    // From now on SendThread.run() treats a missing connection as a reconnect.
    isFirstConnect = false;
    // Reuse the current session id only if a read-write server has been seen before;
    // otherwise send 0.
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
            sessionTimeout, sessId, sessionPasswd);
    synchronized (outgoingQueue) {
        // We add backwards since we are pushing into the front
        // Only send if there's a pending watch
        // TODO: here we have the only remaining use of zooKeeper in
        // this class. It's to be eliminated!
        // Explanation of the upstream comment above: every packet below is inserted
        // with addFirst(), so the packet added last (the ConnectRequest) ends up at
        // the head of the queue, ahead of the auth packets, which in turn are ahead
        // of the SetWatches packets.
        // Per the article disableAutoWatchReset defaults to false. When it is false,
        // the watches currently registered on the client are re-sent to the server
        // each time a connection is primed (i.e. on every (re)connect, not on every
        // request).
        if (!disableAutoWatchReset) {
            List<String> dataWatches = zooKeeper.getDataWatches();
            List<String> existWatches = zooKeeper.getExistWatches();
            List<String> childWatches = zooKeeper.getChildWatches();
            if (!dataWatches.isEmpty()
                    || !existWatches.isEmpty() || !childWatches.isEmpty()) {
                Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                long setWatchesLastZxid = lastZxid;
                // One SetWatches packet per pass; keeps going until all three
                // iterators are exhausted.
                while (dataWatchesIter.hasNext()
                        || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                    List<String> dataWatchesBatch = new ArrayList<String>();
                    List<String> existWatchesBatch = new ArrayList<String>();
                    List<String> childWatchesBatch = new ArrayList<String>();
                    int batchLength = 0;
                    // Fill this batch until the accumulated path length reaches
                    // SET_WATCHES_MAX_LENGTH. Paths that do not fit are NOT dropped:
                    // the iterators keep their position and the enclosing while loop
                    // puts the remaining paths into further batches.
                    while (batchLength < SET_WATCHES_MAX_LENGTH) {
                        final String watch;
                        if (dataWatchesIter.hasNext()) {
                            watch = dataWatchesIter.next();
                            dataWatchesBatch.add(watch);
                        } else if (existWatchesIter.hasNext()) {
                            watch = existWatchesIter.next();
                            existWatchesBatch.add(watch);
                        } else if (childWatchesIter.hasNext()) {
                            watch = childWatchesIter.next();
                            childWatchesBatch.add(watch);
                        } else {
                            break;
                        }
                        batchLength += watch.length();
                    }
                    // Queue the watch paths of this batch for the server.
                    SetWatches sw = new SetWatches(setWatchesLastZxid,
                            dataWatchesBatch,
                            existWatchesBatch,
                            childWatchesBatch);
                    RequestHeader h = new RequestHeader();
                    h.setType(ZooDefs.OpCode.setWatches);
                    h.setXid(-8);
                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                    outgoingQueue.addFirst(packet);
                }
            }
        }
        // Queue one auth packet per stored credential.
        for (AuthData id : authInfo) {
            outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                    OpCode.auth), null, new AuthPacket(0, id.scheme,
                    id.data), null, null));
        }
        // The last constructor argument is the readOnly field, which the ClientCnxn
        // constructor copies from canBeReadOnly.
        // NOTE(review): the article asked why "read-only mode" is set here; the flag
        // presumably tells the server whether this client accepts a read-only
        // connection rather than forcing one -- confirm against Packet.createBB().
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                null, null, readOnly));
    }
    // Switch the socket's interest set to read and write (per the article; the
    // implementation of enableReadWriteOnly() is not shown here).
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                + clientCnxnSocket.getRemoteSocketAddress());
    }
}
// ClientCnxn$SendThread
// line 1032
public void run() {
    // (parts of the code omitted)
    // Second iteration of the while loop; the walkthrough above covered the first.
    while (state.isAlive()) {
        try {
            // This condition is false now: the socket is connected.
            if (!clientCnxnSocket.isConnected()) {
                // (parts of the code omitted)
            }
            // (parts of the code omitted)
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        }
        // (parts of the code omitted)
    }
    // (parts of the code omitted)
}
// ClientCnxnSocketNIO.java
// line 346
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
        ClientCnxn cnxn)
        throws IOException, InterruptedException {
    // (parts of the code omitted)
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            // (parts of the code omitted)
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // The key is ready for reading and/or writing: do the actual socket I/O.
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
        // (parts of the code omitted)
    }
    // (parts of the code omitted)
}
// ClientCnxnSocketNIO.java
// line 61
// Write path of doIO(): sends the next sendable packet from outgoingQueue and then
// decides whether write interest should stay enabled.
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
        throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    // This section follows the send path, so the read branch is omitted here.
    if (sockKey.isReadable()) {
        // (parts of the code omitted)
    }
    if (sockKey.isWritable()) {
        synchronized(outgoingQueue) {
            // Look for a packet in the outgoing queue that may be sent now.
            Packet p = findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress());
            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    // Packets that carry a request header, other than ping and auth,
                    // get a fresh xid from the connection before being serialized.
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != OpCode.ping) &&
                            (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    p.createBB();
                }
                // Send the data to the server.
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    sentCount++;
                    // Fully written: remove the packet from the outgoing queue.
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                            && p.requestHeader.getType() != OpCode.ping
                            && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            // Park it in the queue of packets awaiting a reply.
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                // Nothing left to send: stop handling write events for now.
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // On initial connection, write the complete connect request
                // packet, but then disable further writes until after
                // receiving a successful connection response. If the
                // session is expired, then the server sends the expiration
                // response and immediately closes its end of the socket. If
                // the client is simultaneously writing on its end, then the
                // TCP stack may choose to abort with RST, in which case the
                // client would never receive the session expired event. See
                // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
                // (Article author's note: not fully understood yet.) In short, as the
                // upstream comment says: the connection is not initialized and a
                // packet has just been written completely, so writing is paused
                // until the connect response has been read.
                disableWrite();
            } else {
                // Just in case
                enableWrite();
            }
        }
    }
}
3、接收服務端的連接回復
// ClientCnxn$SendThread
// line 1032
public void run() {
    // (parts of the code omitted)
    while (state.isAlive()) {
        try {
            // (parts of the code omitted)
            // A later loop iteration: this time doTransport() reads the server's reply.
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        }
        // (parts of the code omitted)
    }
    // (parts of the code omitted)
}
// ClientCnxnSocketNIO.java
// line 346
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
        ClientCnxn cnxn)
        throws IOException, InterruptedException {
    // (parts of the code omitted)
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            // (parts of the code omitted)
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // Read and/or write readiness is handled by doIO().
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
        // (parts of the code omitted)
    }
    // (parts of the code omitted)
}
// ClientCnxnSocketNIO.java
// line 61
// Read path of doIO(): reads from the socket and, while the connection is not yet
// initialized, treats the first complete payload as the connect response.
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
        throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        // A negative count means the stream has ended.
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from server sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely server has closed socket");
        }
        // Only act once the current buffer has been filled completely.
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // (parts of the code omitted)
            } else if (!initialized) {
                // Not initialized yet, so this payload is the connect response.
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                // Go back to reading into the length buffer for the next message.
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                // (parts of the code omitted)
            }
        }
    }
    if (sockKey.isWritable()) {
        // (parts of the code omitted)
    }
}
// ClientCnxnSocket.java
// line 118
/**
 * Deserializes the server's connect response from {@code incomingBuffer}, reads the
 * optional trailing read-only flag, stores the session id and notifies the send
 * thread through {@code onConnected}.
 *
 * @throws IOException if the connect response cannot be deserialized, or if
 *                     {@code onConnected} rejects the session
 */
void readConnectResult() throws IOException {
    if (LOG.isTraceEnabled()) {
        StringBuilder buf = new StringBuilder("0x[");
        for (byte b : incomingBuffer.array()) {
            // Mask to 0..255 first: Integer.toHexString() on a negative byte would
            // otherwise print the sign-extended int (e.g. "ffffff80" instead of "80").
            buf.append(Integer.toHexString(b & 0xff)).append(",");
        }
        buf.append("]");
        LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
                + buf.toString());
    }
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    conRsp.deserialize(bbia, "connect");

    // read "is read-only" flag
    boolean isRO = false;
    try {
        isRO = bbia.readBool("readOnly");
    } catch (IOException e) {
        // this is ok -- just a packet from an old server which
        // doesn't contain readOnly field
        LOG.warn("Connected to an old server; r-o mode will be unavailable");
    }

    this.sessionId = conRsp.getSessionId();
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
            conRsp.getPasswd(), isRO);
}
// ClientCnxn$SendThread
// line 1271
/**
 * Completes session establishment once the connect response has been read.
 *
 * A non-positive negotiated timeout is treated as an expired session: the client is
 * closed, an Expired event and the event-of-death are queued, and a
 * SessionExpiredException is thrown. Otherwise the timeouts, session credentials and
 * client state are updated and a connected event is published.
 *
 * @param _negotiatedSessionTimeout session timeout returned by the server; it
 *                                  replaces the value the client asked for
 * @param _sessionId                session id returned by the server
 * @param _sessionPasswd            session password returned by the server
 * @param isRO                      whether the server reported itself as read-only
 * @throws IOException a SessionExpiredException when the negotiated timeout is
 *                     not positive
 */
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
        byte[] _sessionPasswd, boolean isRO) throws IOException {
    // The session timeout is negotiated; the value returned by the server wins.
    negotiatedSessionTimeout = _negotiatedSessionTimeout;

    if (negotiatedSessionTimeout <= 0) {
        state = States.CLOSED;
        WatchedEvent expired = new WatchedEvent(
                Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.Expired, null);
        eventThread.queueEvent(expired);
        eventThread.queueEventOfDeath();

        String warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
                + Long.toHexString(sessionId) + " has expired";
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo);
    }

    if (isRO && !readOnly) {
        LOG.error("Read/write client got connected to read-only server");
    }

    // Re-derive both timeouts from the negotiated value. The host count is read
    // before the provider is told about the successful connection.
    readTimeout = negotiatedSessionTimeout * 2 / 3;
    connectTimeout = negotiatedSessionTimeout / hostProvider.size();
    hostProvider.onConnected();

    sessionId = _sessionId;
    sessionPasswd = _sessionPasswd;

    // Mark the client as connected, in the flavour the server reported.
    String modeSuffix;
    if (isRO) {
        state = States.CONNECTEDREADONLY;
        modeSuffix = " (READ-ONLY mode)";
    } else {
        state = States.CONNECTED;
        seenRwServerBefore = true;
        modeSuffix = "";
    }

    LOG.info("Session establishment complete on server "
            + clientCnxnSocket.getRemoteSocketAddress()
            + ", sessionid = 0x" + Long.toHexString(sessionId)
            + ", negotiated timeout = " + negotiatedSessionTimeout
            + modeSuffix);

    // Publish the matching connection event.
    KeeperState eventState;
    if (isRO) {
        eventState = KeeperState.ConnectedReadOnly;
    } else {
        eventState = KeeperState.SyncConnected;
    }
    eventThread.queueEvent(new WatchedEvent(
            Watcher.Event.EventType.None,
            eventState, null));
}
// ClientCnxn$EventThread
// line 466
/**
 * Queues an event for the event thread together with the watchers that should
 * receive it. An event of type None that reports the session state already recorded
 * is dropped.
 *
 * @param event the event to dispatch
 */
public void queueEvent(WatchedEvent event) {
    // Anything that is not a repeat of the current session state gets queued.
    if (event.getType() != EventType.None
            || sessionState != event.getState()) {
        sessionState = event.getState();

        // Resolve the watchers that match this event and pair them with it; the
        // pair is processed later by the event thread.
        waitingEvents.add(new WatcherSetEventPair(
                watcher.materialize(event.getState(), event.getType(),
                        event.getPath()),
                event));
    }
}
// ClientCnxn$EventThread
// line 500
/**
 * Event loop: takes queued events one at a time and processes them. After the
 * event-of-death marker has been seen, the loop keeps draining and exits as soon as
 * the queue is found empty.
 */
public void run() {
    try {
        isRunning = true;
        for (;;) {
            Object next = waitingEvents.take();
            if (next != eventOfDeath) {
                processEvent(next);
            } else {
                wasKilled = true;
            }

            if (!wasKilled) {
                continue;
            }
            // Shutting down: stop once nothing is left in the queue.
            synchronized (waitingEvents) {
                if (waitingEvents.isEmpty()) {
                    isRunning = false;
                    break;
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }

    LOG.info("EventThread shut down for session: 0x{}",
            Long.toHexString(getSessionId()));
}
// ClientCnxn$EventThread
// line 526
// Dispatches one queued item. For a WatcherSetEventPair every watcher in the set is
// called with the event; a failure in one watcher is logged and does not stop the
// others.
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        } else {
            // (parts of the code omitted)
        }
    } catch (Throwable t) {
        LOG.error("Caught unexpected throwable", t);
    }
}
至此,連線的程式碼分析完畢
但是程式碼中還有一些地方看不懂,後續隨著深入搞明白的話回