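Reading Notes on "From Paxos to ZooKeeper: Principles and Practice of Distributed Consistency" (從Paxos到ZooKeeper分散式一致性原理與實踐), Chapter 7: ZooKeeper Internals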
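System Model
Data Model
The ZNode is the smallest unit of data in ZK. Each ZNode can store data and can also have child nodes attached to it, forming a hierarchical namespace: a tree.
Tree
In ZK every data node is called a ZNode, and all ZNodes together form a tree structure.
Transaction ID
A transaction is any ZK operation that changes server state, typically creating or deleting a node, updating a data node's content, and creating or expiring a client session. For every transaction request, ZK assigns a globally unique transaction ID, called the ZXID.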
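A ZXID is a 64-bit number. As far as I know from the ZooKeeper source (these notes do not say so), its high 32 bits hold the Leader epoch and its low 32 bits a counter that increases with every transaction in that epoch. The sketch below shows that packing with hypothetical helper names (`ZxidSketch`, `makeZxid`, `epochOf`, `counterOf`); ZooKeeper's own utility methods have their own names.

```java
// Sketch: how a 64-bit ZXID could pack an epoch and a counter.
// Assumption: high 32 bits = Leader epoch, low 32 bits = per-epoch counter.
final class ZxidSketch {
    private ZxidSketch() {}

    // Combine an epoch and a counter into one ZXID.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    // Extract the epoch from the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    // Extract the counter from the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 7);
        // Prints "0x300000007 epoch=3 counter=7"
        System.out.println("0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
        // Any ZXID of a newer epoch compares greater than every ZXID of an older one: true
        System.out.println(makeZxid(4, 0) > makeZxid(3, 0xFFFFFFFFL));
    }
}
```

Because the epoch sits in the high bits, comparing two ZXIDs as plain numbers orders them first by epoch and then by counter.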
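Node Characteristics
Node Types
In ZK every data node has a lifecycle. Nodes fall into three basic categories: persistent, ephemeral, and sequential. These combine into four node types: persistent, persistent sequential, ephemeral, and ephemeral sequential.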
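The four node types can be read as two independent flags, ephemeral and sequential. The toy model below uses hypothetical names (`NodeType`, `SequentialNamer`); the real client API expresses the same four combinations through its `CreateMode` enum. It assumes the naming convention I understand ZooKeeper to use for sequential nodes: a 10-digit zero-padded counter appended to the requested name, with one counter per parent node. In this model only sequential creations advance the counter; in ZooKeeper the suffix is, as I understand it, derived from the parent's child version, so other child changes advance it too.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Toy model: the four classic node types as two independent flags.
enum NodeType {
    PERSISTENT(false, false),
    PERSISTENT_SEQUENTIAL(false, true),
    EPHEMERAL(true, false),
    EPHEMERAL_SEQUENTIAL(true, true);

    final boolean ephemeral;  // removed automatically when the creating session ends
    final boolean sequential; // the server appends a monotonically increasing suffix

    NodeType(boolean ephemeral, boolean sequential) {
        this.ephemeral = ephemeral;
        this.sequential = sequential;
    }
}

// Hypothetical helper that names new nodes, keeping one counter per parent path.
final class SequentialNamer {
    private final Map<String, Integer> counters = new HashMap<>();

    String name(String parent, String prefix, NodeType type) {
        String base = parent + "/" + prefix;
        if (!type.sequential) {
            return base;
        }
        // merge returns the updated count, so the first sequential child gets 0
        int next = counters.merge(parent, 1, Integer::sum) - 1;
        return base + String.format(Locale.ROOT, "%010d", next);
    }

    public static void main(String[] args) {
        SequentialNamer namer = new SequentialNamer();
        System.out.println(namer.name("/locks", "lock-", NodeType.EPHEMERAL_SEQUENTIAL)); // /locks/lock-0000000000
        System.out.println(namer.name("/locks", "lock-", NodeType.EPHEMERAL_SEQUENTIAL)); // /locks/lock-0000000001
        System.out.println(namer.name("/config", "app", NodeType.PERSISTENT));            // /config/app
    }
}
```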
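Stat Information
The stat object records all the state information of a data node:
* czxid: the transaction ID of the transaction that created the node
* mzxid: the transaction ID of the node's last update
* ctime: the time the node was created
* mtime: the time the node was last updated
* version: the version of the node's data
* cversion: the version of the node's child list
* aversion: the version of the node's ACL
* ephemeralOwner: for an ephemeral node, the sessionID of the session that created it (0 for persistent nodes)
* dataLength: the length of the node's data
* numChildren: the number of children the node currently has
* pzxid: the transaction ID of the last change to the node's child list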
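Versions: Guaranteeing Atomic Operations on Distributed Data
ZK introduces the concept of versions for data nodes. Each data node carries three types of version information, and every update to a data node changes the corresponding version number.
* version: the version of the node's data content
* cversion: the version of the node's child list
* aversion: the version of the node's ACL
A ZK version number counts the modifications made to a node: even if an update leaves the data exactly as it was, the version number still increases.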
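Versions are what make optimistic, compare-and-set updates possible: a client reads a node, remembers its `version`, and passes that value back with its update, which succeeds only if nobody modified the node in between. The in-memory sketch below uses hypothetical names (`VersionedNode`, `BadVersionException`) and assumes the ZooKeeper convention that an expected version of `-1` skips the check.

```java
import java.nio.charset.StandardCharsets;

// Thrown when the caller's expected version is stale (hypothetical name).
class BadVersionException extends Exception {
    BadVersionException(int expected, int actual) {
        super("expected version " + expected + " but node is at " + actual);
    }
}

// In-memory model of one data node with a data version counter.
final class VersionedNode {
    private byte[] data;
    private int version = 0; // number of times the data has been set

    VersionedNode(byte[] data) {
        this.data = data.clone();
    }

    synchronized int version() {
        return version;
    }

    synchronized byte[] data() {
        return data.clone();
    }

    // Compare-and-set: -1 means "any version", otherwise it must match exactly.
    synchronized int setData(byte[] newData, int expectedVersion) throws BadVersionException {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new BadVersionException(expectedVersion, version);
        }
        data = newData.clone();
        version++; // incremented even if newData equals the old data
        return version;
    }

    public static void main(String[] args) throws BadVersionException {
        byte[] v = "a".getBytes(StandardCharsets.UTF_8);
        VersionedNode node = new VersionedNode(v);
        int seen = node.version();      // client A reads version 0
        node.setData(v, seen);          // same bytes, but the version becomes 1
        try {
            node.setData(v, seen);      // client A retries with the stale version 0
        } catch (BadVersionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(node.setData(v, -1)); // unconditional write: 2
    }
}
```

On a rejection, the usual pattern is to re-read the node and retry with the fresh version.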
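Watcher: Data Change Notifications
ZK provides publish/subscribe functionality for data, implemented through the Watcher mechanism. A client registers a Watcher with the server; when a specified event on the server triggers that Watcher, the server sends an event notification to the client, which is how distributed notification is achieved.
![IMAGE](quiver-image-url/00BAF71A323A184EC09F78D695B6DE90.jpg =684x315)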
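The register/trigger/notify cycle can be illustrated with a small in-process model. This is a sketch under assumptions: the names `WatchRegistry`, `register`, and `trigger` are hypothetical, and it models the classic one-shot semantics of ZooKeeper watches, where a watch is removed once it fires and must be registered again to hear about later changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy model of watch bookkeeping for one-shot data watches.
final class WatchRegistry {
    // path -> callbacks waiting for the next change on that path
    private final Map<String, Set<Consumer<String>>> watches = new HashMap<>();

    synchronized void register(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
    }

    // Fire every watch on the path once, then forget them (one-shot semantics).
    void trigger(String path, String eventType) {
        Set<Consumer<String>> fired;
        synchronized (this) {
            fired = watches.remove(path);
        }
        if (fired == null) {
            return; // nobody is watching this path
        }
        for (Consumer<String> w : fired) {
            w.accept(eventType + " " + path);
        }
    }

    public static void main(String[] args) {
        WatchRegistry registry = new WatchRegistry();
        List<String> received = new ArrayList<>();
        registry.register("/app/config", received::add);
        registry.trigger("/app/config", "NodeDataChanged"); // delivered
        registry.trigger("/app/config", "NodeDataChanged"); // no watch left, dropped
        System.out.println(received); // [NodeDataChanged /app/config]
    }
}
```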
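The Watcher Interface
In ZK, the Watcher interface represents a standard event handler. It defines the logic related to event notification and contains two enums, KeeperState and EventType, which represent the notification state and the event type respectively.
Watcher Events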
@InterfaceAudience.Public
public enum KeeperState {
/** Unused, this state is never generated by the server */
@Deprecated
Unknown (-1),
/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
Disconnected (0),
/** Unused, this state is never generated by the server */
@Deprecated
NoSyncConnected (1),
/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation). */
SyncConnected (3),
/**
* Auth failed state
*/
AuthFailed (4),
/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
ConnectedReadOnly (5),
/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
SaslAuthenticated(6),
/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
Expired (-112);
private final int intValue; // Integer representation of value
// for sending over wire
KeeperState(int intValue) {
this.intValue = intValue;
}
public int getIntValue() {
return intValue;
}
public static KeeperState fromInt(int intValue) {
switch(intValue) {
case -1: return KeeperState.Unknown;
case 0: return KeeperState.Disconnected;
case 1: return KeeperState.NoSyncConnected;
case 3: return KeeperState.SyncConnected;
case 4: return KeeperState.AuthFailed;
case 5: return KeeperState.ConnectedReadOnly;
case 6: return KeeperState.SaslAuthenticated;
case -112: return KeeperState.Expired;
default:
throw new RuntimeException("Invalid integer value for conversion to KeeperState");
}
}
}
#Serialization and Protocol
##Serialization with Jute
package com.paxos.zk.chp07;
import org.apache.jute.*;
import org.apache.zookeeper.server.ByteBufferInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
public class MockRequestHeader implements Record {
private long sessionId;
private String type;
public long getSessionId() {
return sessionId;
}
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "MockRequestHeader{" +
"sessionId=" + sessionId +
", type='" + type + '\'' +
'}';
}
@Override
public void serialize(OutputArchive archive, String tag) throws IOException {
archive.startRecord(this, tag);
archive.writeLong(sessionId, "sessionId");
archive.writeString(type, "type");
archive.endRecord(this, tag);
}
@Override
public void deserialize(InputArchive archive, String tag) throws IOException {
archive.startRecord(tag);
sessionId = archive.readLong("sessionId");
type = archive.readString("type");
archive.endRecord(tag);
}
public static void main(String[] args) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
MockRequestHeader requestHeader = new MockRequestHeader();
requestHeader.setSessionId(-1L);
requestHeader.setType("test");
requestHeader.serialize(boa, "header");
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
MockRequestHeader header2 = new MockRequestHeader();
header2.deserialize(bbia, "header");
System.out.println(header2);
bbis.close();
baos.close();
}
}
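To see what `MockRequestHeader.serialize` puts on the wire, the sketch below reproduces the bytes with plain `java.io.DataOutputStream`. It rests on my understanding of `BinaryOutputArchive`, not on these notes: `startRecord`/`endRecord` write nothing, a `long` is written as 8 big-endian bytes, and a `String` as a 4-byte length followed by its UTF-8 bytes (length -1 for null). The class name `JuteLayoutSketch` is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Reproduces the assumed binary layout of MockRequestHeader(sessionId, type).
final class JuteLayoutSketch {
    static byte[] encode(long sessionId, String type) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeLong(sessionId);                 // "sessionId": 8 bytes, big-endian
        if (type == null) {
            out.writeInt(-1);                     // null string marker
        } else {
            byte[] utf8 = type.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length);            // "type": 4-byte length prefix
            out.write(utf8);                      // followed by the UTF-8 bytes
        }
        out.flush();
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = encode(-1L, "test");
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x", b));
        }
        // 8 bytes of 0xff, then the length 00000004, then "test"
        System.out.println(bytes.length + " " + hex); // 16 ffffffffffffffff0000000474657374
    }
}
```

Note that the tags ("header", "sessionId", "type") never reach the binary stream; they only matter for text-based archives.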
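##Inside Jute
###The Record Interface
/**
 * Interface that is implemented by generated classes.
 */
@InterfaceAudience.Public
public interface Record {
    public void serialize(OutputArchive archive, String tag)
        throws IOException;
    public void deserialize(InputArchive archive, String tag)
        throws IOException;
}
###OutputArchive and InputArchive
These two interfaces provide the basic write and read methods; BinaryOutputArchive and BinaryInputArchive, used in the example above, are their binary implementations.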
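###Communication Protocol
Each request and its corresponding Response in the client/server protocol is implemented as a Jute record.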
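#Client
The client's core components are:
* ZooKeeper instance: the client's entry point
* ClientWatchManager: the client's Watcher manager
* HostProvider: the client's server address list manager
* ClientCnxn: the client's core networking class, which contains two threads, SendThread and EventThread. SendThread handles network I/O between the ZK client and the server; EventThread processes events received from the server.
The client is initialized and started in three steps:
1. Set the default Watcher
2. Set the ZK server address list
3. Create the ClientCnxn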
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
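##The Session Creation Process
###Initialization Phase
1. Initialize the ZooKeeper object
2. Set the session's default Watcher
3. Construct the ZK server address list manager: HostProvider
4. Create and initialize the client network connector: ClientCnxn
5. Initialize SendThread and EventThread
6. Start SendThread and EventThread
7. Obtain a server address
8. Establish the TCP connection
9. Construct the ConnectRequest
10. Send the request
11. Receive the server's response
12. Process the Response
13. Connection established
14. Generate the event: SyncConnected-None
15. Look up the Watcher
16. Process the event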
###Server Address List
/**
 * A parser for ZooKeeper Client connect strings.
 *
 * This class is not meant to be seen or used outside of ZooKeeper itself.
 *
 * The chrootPath member should be replaced by a Path object in issue
 * ZOOKEEPER-849.
 *
 * @see org.apache.zookeeper.ZooKeeper
 */
public final class ConnectStringParser {
private static final int DEFAULT_PORT = 2181;
private final String chrootPath;
private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
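###Chroot: Client Namespace Isolation
Chroot lets each client set its own namespace. If a ZK client sets a Chroot, every operation it performs on the server is confined to that namespace.
A client sets its Chroot by appending a path suffix to the connectString, for example `192.168.0.1:2181,192.168.0.2:2181/apps/X`; every path the client uses is then resolved relative to `/apps/X`.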
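A minimal sketch of how a connect string with a Chroot suffix might be split, and how client paths are then mapped into the Chroot. `ChrootSketch` and its methods are hypothetical; only the default port 2181 comes from `ConnectStringParser` above, and treating a bare `/` suffix as "no Chroot" mirrors my understanding of ZooKeeper's behavior. IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser: "host1:2181,host2/apps/x" -> hosts + chroot "/apps/x".
final class ChrootSketch {
    static final int DEFAULT_PORT = 2181;

    final String chroot;                           // null when no chroot is configured
    final List<String> servers = new ArrayList<>();

    ChrootSketch(String connectString) {
        int slash = connectString.indexOf('/');
        String hosts = slash >= 0 ? connectString.substring(0, slash) : connectString;
        String suffix = slash >= 0 ? connectString.substring(slash) : null;
        // A bare "/" (or no suffix at all) means "no chroot".
        chroot = (suffix == null || suffix.length() == 1) ? null : suffix;
        for (String host : hosts.split(",")) {
            host = host.trim();
            if (host.isEmpty()) {
                continue;
            }
            servers.add(host.contains(":") ? host : host + ":" + DEFAULT_PORT);
        }
    }

    // Map a client-side path to the server-side path inside the chroot.
    String toServerPath(String clientPath) {
        if (chroot == null) {
            return clientPath;
        }
        return clientPath.equals("/") ? chroot : chroot + clientPath;
    }

    public static void main(String[] args) {
        ChrootSketch p = new ChrootSketch("zk1:2181,zk2/apps/app1");
        System.out.println(p.servers);                 // [zk1:2181, zk2:2181]
        System.out.println(p.toServerPath("/config")); // /apps/app1/config
        System.out.println(p.toServerPath("/"));       // /apps/app1
    }
}
```

This client-path versus server-path split is the same distinction that `Packet` records in its `clientPath` and `serverPath` fields.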
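### HostProvider: Address List Manager
ConnectStringParser performs some simple processing of the server addresses: it wraps each address and its port into an InetSocketAddress object and stores them in a List. The processed address list is then wrapped further in the StaticHostProvider class.
HostProvider is the interface that StaticHostProvider implements.
@InterfaceAudience.Public
public interface HostProvider {
    public int size();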
/**
* The next host to try to connect to.
*
* For a spinDelay of 0 there should be no wait.
*
* @param spinDelay Milliseconds to wait if all hosts have been tried once.
* @return The next host to try to connect to with resolved address. If the host is not resolvable, the unresolved
* address will be returned.
*/
public InetSocketAddress next(long spinDelay);
/**
* Notify the HostProvider of a successful connection.
*
* The HostProvider may use this notification to reset it's inner state.
*/
public void onConnected();
}
###StaticHostProvider
public final class StaticHostProvider implements HostProvider {
    public interface Resolver {
        InetAddress[] getAllByName(String name) throws UnknownHostException;
    }
private static final Logger LOG = LoggerFactory
.getLogger(StaticHostProvider.class);
private final List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(5);
private int lastIndex = -1;
private int currentIndex = -1;
private Resolver resolver;
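The `lastIndex`/`currentIndex` fields suggest a round-robin over the address list. The sketch below is my reading of that design, not ZooKeeper's code: it shuffles the list once, hands out addresses in a cycle, and sleeps `spinDelay` milliseconds only when it has wrapped all the way back to where the current pass started (initially index 0, later the last host that `onConnected` reported). `RoundRobinHosts` is a hypothetical name, and the addresses are left unresolved to avoid DNS lookups.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Round-robin host selection in the spirit of StaticHostProvider (sketch only).
final class RoundRobinHosts {
    private final List<InetSocketAddress> addresses;
    private int lastIndex = -1;    // start of the current pass (last successful host)
    private int currentIndex = -1; // index of the address handed out most recently

    RoundRobinHosts(List<InetSocketAddress> input, long seed) {
        addresses = new ArrayList<>(input);
        // Shuffle once so that different clients start on different servers.
        Collections.shuffle(addresses, new Random(seed));
    }

    int size() {
        return addresses.size();
    }

    InetSocketAddress next(long spinDelay) throws InterruptedException {
        currentIndex = (currentIndex + 1) % addresses.size();
        // Back where the pass started: every host was tried once, so pause first.
        boolean needToSleep = currentIndex == lastIndex && spinDelay > 0;
        if (lastIndex == -1) {
            lastIndex = 0; // the first pass starts at index 0
        }
        if (needToSleep) {
            Thread.sleep(spinDelay);
        }
        return addresses.get(currentIndex);
    }

    void onConnected() {
        lastIndex = currentIndex; // the next pass is measured from the working host
    }

    public static void main(String[] args) throws InterruptedException {
        RoundRobinHosts provider = new RoundRobinHosts(Arrays.asList(
                InetSocketAddress.createUnresolved("zk1", 2181),
                InetSocketAddress.createUnresolved("zk2", 2181),
                InetSocketAddress.createUnresolved("zk3", 2181)), 42L);
        for (int i = 0; i < 4; i++) {
            // The fourth call wraps around to the first address again.
            System.out.println(provider.next(0).getHostString());
        }
    }
}
```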
##ClientCnxn
ClientCnxn is the client's core worker class: it maintains the network connection between the client and the server and carries out all of the network communication.
/**
 * This class manages the socket i/o for the client. ClientCnxn maintains a list
 * of available servers to connect to and "transparently" switches servers it is
 * connected to as needed.
 *
 */
public class ClientCnxn
###Packet
Packet is a protocol-layer wrapper defined inside ClientCnxn; it serves as the carrier for ZK requests and responses. Its structure is:
/**
 * This class allows us to pass the headers and the relevant records around.
 */
static class Packet {
    RequestHeader requestHeader;
ReplyHeader replyHeader;
Record request;
Record response;
ByteBuffer bb;
/** Client's view of the path (may differ due to chroot) **/
String clientPath;
/** Servers's view of the path (may differ due to chroot) **/
String serverPath;
boolean finished;
AsyncCallback cb;
Object ctx;
WatchRegistration watchRegistration;
public boolean readOnly;
It contains the request header, the response header, the request body, the response body, the node paths (client-side and server-side views), and the registered Watcher.
The createBB method serializes the Packet's fields and produces the final ByteBuffer to be sent.
public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len"); // We'll fill this in later
        if (requestHeader != null) {
            requestHeader.serialize(boa, "header");
        }
        if (request instanceof ConnectRequest) {
            request.serialize(boa, "connect");
            // append "am-I-allowed-to-be-readonly" flag
            boa.writeBool(readOnly, "readOnly");
        } else if (request != null) {
            request.serialize(boa, "request");
        }
        baos.close();
        this.bb = ByteBuffer.wrap(baos.toByteArray());
        this.bb.putInt(this.bb.capacity() - 4);
        this.bb.rewind();
    } catch (IOException e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}
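`createBB` frames every packet with a 4-byte length prefix: it writes a placeholder `-1`, serializes the header and body, then overwrites the first four bytes with `capacity - 4`, the number of bytes that follow. The sketch below reproduces that trick, plus a matching read side, with plain JDK classes; `FramingSketch`, `frame`, and `readFrame` are hypothetical names, and the payload bytes stand in for a serialized header and request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

final class FramingSketch {
    // Sender side: placeholder length, payload, then patch the length in place.
    static ByteBuffer frame(byte[] payload) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(-1);              // "len" placeholder, filled in below
        out.write(payload);            // header + request would be serialized here
        out.flush();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4);  // relative put at position 0: overwrite bytes 0..3
        bb.rewind();                   // position back to 0, ready to write to a socket
        return bb;
    }

    // Receiver side: read the 4-byte length, then exactly that many bytes.
    static byte[] readFrame(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        int len = din.readInt();
        byte[] body = new byte[len];
        din.readFully(body);
        return body;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer bb = frame(new byte[] {1, 2, 3});
        System.out.println(bb.getInt(0)); // 3
        byte[] body = readFrame(new ByteArrayInputStream(bb.array()));
        System.out.println(body.length);  // 3
    }
}
```

Writing a placeholder and patching it afterwards avoids serializing the record twice just to learn its size.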
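outgoingQueue and pendingQueue
ClientCnxn has two core queues, outgoingQueue and pendingQueue, which act as the client's request send queue and its response wait queue. The outgoing queue stores the Packets that need to be sent to the server. The pending queue stores the Packets that have already been sent from the client to the server and are waiting for the server's response.
/**
 * These are the packets that have been sent and are waiting for a response.
 */
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();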
/**
* These are the packets that need to be sent.
*/
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
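##ClientCnxnSocket: Low-Level Socket Communication
ClientCnxnSocketNIO is the default implementation of ZK's low-level socket communication; its core is the doIO logic.
###Sending Requests
Normally, the client takes a sendable Packet from the outgoing queue, generates a client request sequence number (XID), sets it in the Packet's request header, then serializes and sends the Packet. As soon as the request has been sent, the Packet is moved to the pending queue, where it waits until the server's response arrives and can be processed.
###Receiving Responses
When the client receives response data from the server, it handles it differently depending on the situation:
* If the client has not been initialized yet, a session with the server is still being established, so the received ByteBuffer is deserialized directly into a ConnectResponse object.
* If the client already has a normal session with the server and the received response is an event, the ZK client deserializes the ByteBuffer into a WatcherEvent object and places the event in the queue of events waiting to be processed.
* If it is the response to a regular request, the client takes a Packet from the pending queue and processes it. ZK checks the XID contained in the server's response to ensure requests are processed in order, then deserializes the ByteBuffer into the corresponding Response object.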
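The interplay between the two queues can be modeled in a few lines. This is a single-threaded sketch with hypothetical names (`QueueSketch`, `Pkt`, `send`, `onReply`): sending stamps the next XID and moves the packet from the outgoing queue to the pending queue, and a reply must carry the XID of the oldest pending packet, otherwise the stream is out of order. Using XID `-1` to mark a watcher notification follows my understanding of ZooKeeper's client code and is not stated in these notes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class QueueSketch {
    static final class Pkt {
        final String op;
        int xid;
        Pkt(String op) { this.op = op; }
    }

    private final Deque<Pkt> outgoing = new ArrayDeque<>();
    private final Deque<Pkt> pending = new ArrayDeque<>();
    final List<String> events = new ArrayList<>(); // stands in for EventThread's queue
    private int nextXid = 1;

    void submit(String op) {
        outgoing.addLast(new Pkt(op));
    }

    // Take one packet from the outgoing queue, stamp an XID, park it as pending.
    Pkt send() {
        Pkt p = outgoing.pollFirst();
        if (p == null) {
            return null;
        }
        p.xid = nextXid++;
        pending.addLast(p); // kept until the matching reply arrives
        return p;
    }

    // Handle one reply from the server.
    Pkt onReply(int xid, String payload) {
        if (xid == -1) {   // assumed marker for a watcher notification
            events.add(payload);
            return null;
        }
        Pkt p = pending.pollFirst();
        if (p == null || p.xid != xid) {
            throw new IllegalStateException("Xid out of order: got " + xid
                    + ", expected " + (p == null ? "none" : String.valueOf(p.xid)));
        }
        return p;
    }

    public static void main(String[] args) {
        QueueSketch c = new QueueSketch();
        c.submit("create /a");
        c.submit("getData /a");
        c.send();
        c.send();
        c.onReply(-1, "NodeDataChanged /a");       // an event; pending is untouched
        System.out.println(c.onReply(1, "ok").op); // create /a
        System.out.println(c.onReply(2, "ok").op); // getData /a
    }
}
```

Because the server answers requests on one connection in order, matching against only the head of the pending queue is enough.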
###SendThread
/**
 * This class services the outgoing request queue and generates the heart
 * beats. It also spawns the ReadThread.
 */
class SendThread extends ZooKeeperThread {
    private long lastPingSentNs;
    private final ClientCnxnSocket clientCnxnSocket;
    private Random r = new Random(System.nanoTime());
    private boolean isFirstConnect = true;
###EventThread
class EventThread extends ZooKeeperThread {
    private final LinkedBlockingQueue<Object> waitingEvents =
        new LinkedBlockingQueue<Object>();
/** This is really the queued session state until the event
* thread actually processes the event and hands it to the watcher.
* But for all intents and purposes this is the state.
*/
private volatile KeeperState sessionState = KeeperState.Disconnected;
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true);
}
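#Session
##Session Creation
###Session
Session is ZK's session entity and represents one client session. It has four basic attributes:
* sessionID: the session ID, which uniquely identifies a session
* TimeOut: the session timeout
* TickTime: the next point in time at which the session will time out
* isClosing: marks whether the session has been closed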
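How can a server hand out session IDs that are unique across the cluster? These notes do not say, but my understanding of `SessionTrackerImpl` is that the initial ID places the server's SID (myid) in the top 8 bits and a millisecond timestamp in the bits below, and that later IDs on the same server simply increment from there. The sketch below reproduces that idea; `SessionIdSketch` is a hypothetical name, and the exact shifts are an assumption to verify against the source of your ZooKeeper version.

```java
final class SessionIdSketch {
    // Initial session ID: SID in the top 8 bits, timestamp bits below, low 16 bits zero.
    static long initialSessionId(long sid, long timeMillis) {
        long id = (timeMillis << 24) >>> 8; // low 40 bits of the time land in bits 16..55
        return id | (sid << 56);            // server ID occupies bits 56..63
    }

    public static void main(String[] args) {
        long base = initialSessionId(1, System.currentTimeMillis());
        System.out.println(Long.toHexString(base));
        System.out.println(base >>> 56); // 1: the SID is recoverable from the top byte
        // Later sessions on this server would be base + 1, base + 2, ...
    }
}
```

Putting the SID in the top byte keeps IDs from different servers disjoint, and the timestamp keeps a restarted server from reusing IDs it issued before.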
###SessionTracker
SessionTracker is the ZK server's session manager, responsible for creating, managing, and cleaning up sessions.
/**
 * This is the basic interface that ZooKeeperServer uses to track sessions. The
 * standalone and leader ZooKeeperServer use the same SessionTracker. The
 * FollowerZooKeeperServer uses a SessionTracker which is basically a simple
 * shell to track information to be forwarded to the leader.
 */
public interface SessionTracker {
    public static interface Session {
        long getSessionId();
        int getTimeout();
        boolean isClosing();
    }
    public static interface SessionExpirer {
        void expire(Session session);

        long getServerId();
    }
/**
* This is a full featured SessionTracker. It tracks session in grouped by tick
* interval. It always rounds up the tick interval to provide a sort of grace
* period. Sessions are thus expired in batches made up of sessions that expire
* in a given interval.
*/
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class);
HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
long nextSessionId = 0;
long nextExpirationTime;
int expirationInterval;
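- sessionsById: manages Session entities by sessionID
- sessionsWithTimeout: manages each session's timeout by sessionID
- sessionSets: groups sessions by their next timeout point, which simplifies session management and timeout checks
Creating the Connection
The server handles a client's session-creation request in four steps: processing the ConnectRequest, creating the session, passing the request through the request processor chain, and sending the session response.
Session Management
Bucketing Strategy
ZK's session management is handled mainly by SessionTracker, which uses a special approach called the bucketing strategy: sessions with similar expiration times are grouped into the same bucket, so that ZK can handle different buckets in isolation and process all sessions within one bucket together.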
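The bucketing idea follows from the rounding rule described in the `SessionTrackerImpl` comment above: a session's expiration time is rounded up to the next multiple of the check interval, so all sessions that expire within the same interval share one bucket and can be expired in one batch. `BucketSketch` and its methods are hypothetical, and `expirationInterval` is assumed to be the server's tickTime.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Sketch of bucketed session expiry (hypothetical names).
final class BucketSketch {
    private final long expirationInterval;                           // e.g. tickTime in ms
    private final TreeMap<Long, Set<Long>> buckets = new TreeMap<>(); // expiry time -> session IDs
    private final Map<Long, Long> expiryOf = new HashMap<>();         // session ID -> its bucket

    BucketSketch(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    // Round up to the next interval boundary, giving a small grace period.
    long roundToInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // Called when a session is created and whenever the client shows activity.
    void touch(long sessionId, long now, long timeout) {
        long expireAt = roundToInterval(now + timeout);
        Long old = expiryOf.put(sessionId, expireAt);
        if (old != null) {                        // move the session out of its old bucket
            Set<Long> oldBucket = buckets.get(old);
            oldBucket.remove(sessionId);
            if (oldBucket.isEmpty()) {
                buckets.remove(old);
            }
        }
        buckets.computeIfAbsent(expireAt, t -> new HashSet<>()).add(sessionId);
    }

    // Expire every bucket whose time has come; returns the expired session IDs.
    Set<Long> expire(long now) {
        Set<Long> expired = new HashSet<>();
        while (!buckets.isEmpty() && buckets.firstKey() <= now) {
            for (long id : buckets.pollFirstEntry().getValue()) {
                expired.add(id);
                expiryOf.remove(id);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        BucketSketch tracker = new BucketSketch(2000);
        tracker.touch(1L, 1_000, 10_000);  // 11000 rounds up to bucket 12000
        tracker.touch(2L, 1_500, 10_000);  // 11500 also lands in bucket 12000
        tracker.touch(3L, 3_000, 10_000);  // 13000 rounds up to bucket 14000
        System.out.println(tracker.expire(12_000)); // sessions 1 and 2 expire together
        tracker.touch(3L, 12_500, 10_000); // activity moves session 3 to bucket 24000
        System.out.println(tracker.expire(14_000)); // []
    }
}
```

The expiry thread therefore only needs to wake once per interval and drop one whole bucket, instead of checking every session individually.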
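Server Startup
Standalone Server Startup
![IMAGE](quiver-image-url/24008D21E77CC4BD8444FE8F089EED34.jpg =967x473)
ZK server startup consists of five main steps: parsing the configuration file, initializing the data manager, initializing network I/O, recovering data, and opening the server to external access.
Pre-startup
The pre-startup steps are:
1. QuorumPeerMain is the single startup class. Whether in standalone or cluster mode, ZK is started with QuorumPeerMain as the entry class.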
/**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
LOG.info(USAGE);
System.err.println(USAGE);
System.exit(2);
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.err.println("Invalid config, exiting abnormally");
System.exit(2);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}
- Parse the configuration file
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
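- Create and start the historical file cleaner DatadirCleanupManager
- Determine whether this startup is in cluster mode or standalone mode
- Parse the configuration file again
- Create the ZK instance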
public void runFromConfig(QuorumPeerConfig config) throws IOException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
quorumPeer = getQuorumPeer();
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
Initialization
The initialization steps are:
1. Create the server statistics collector ServerStats
/**
* Basic Server Statistics
*/
public class ServerStats {
private long packetsSent;
private long packetsReceived;
private long maxLatency;
private long minLatency = Long.MAX_VALUE;
private long totalLatency = 0;
private long count = 0;
private AtomicLong fsyncThresholdExceedCount = new AtomicLong(0);
- Create the ZK data manager FileTxnSnapLog
/**
* This is a helper class
* above the implementations
* of txnlog and snapshot
* classes
*/
public class FileTxnSnapLog {
//the direcotry containing the
//the transaction logs
private final File dataDir;
//the directory containing the
//the snapshot directory
private final File snapDir;
private TxnLog txnLog;
private SnapShot snapLog;
public final static int VERSION = 2;
public final static String version = "version-";
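- Set the server's tickTime and session timeouts
- Create the ServerCnxnFactory
- Initialize the ServerCnxnFactory
- Start the ServerCnxnFactory main thread
- Restore local data
- Create and start the session manager SessionTracker
- Initialize ZK's request processor chain
- Register JMX
- Register the ZK server instance
Cluster Mode Initialization
![IMAGE](quiver-image-url/D6AC260F715831A9CF7ACDC0E2F5CE03.jpg =964x540)
Pre-startup
- QuorumPeerMain is the single startup class
- Parse the configuration file zoo.cfg
- Create and start the historical file cleaner DatadirCleanupManager
- Determine whether to run in cluster mode or standalone mode
Initialization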
public static void main(String[] args) {
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
LOG.info(USAGE);
System.err.println(USAGE);
System.exit(2);
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.err.println("Invalid config, exiting abnormally");
System.exit(2);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}
/**
* Run from a ServerConfig.
* @param config ServerConfig to use.
* @throws IOException
*/
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
// Note that this thread isn't going to be doing anything else,
// so rather than spawning another thread, we will just call
// run() in this thread.
// create a file logger url from the command line args
final ZooKeeperServer zkServer = new ZooKeeperServer();
// Registers shutdown handler which will be used to know the
// server error or shutdown state changes.
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
config.dataDir));
txnLog.setServerStats(zkServer.serverStats());
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zkServer);
// Watch status of ZooKeeper server. It will do a graceful shutdown
// if the server is not running or hits an internal error.
shutdownLatch.await();
shutdown();
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
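- Create the ServerCnxnFactory
- Initialize the ServerCnxnFactory
- Create ZK's data manager FileTxnSnapLog
- Create the QuorumPeer instance
- Create the in-memory database ZKDatabase
- Initialize QuorumPeer
- Restore local data
- Start the ServerCnxnFactory
Leader Election
- Initialize leader election. ZK first generates an initial vote from its own SID, its lastLoggedZxid, and its current epoch; put simply, every server votes for itself. The election algorithm is selected by the electionAlg property in zoo.cfg.
- Register the JMX service
- Check the current server state
- Run leader election
Leader/Follower Interaction During Startup
![IMAGE](quiver-image-url/6182D5E7FBA76FFEE149FD97CB64AD3F.jpg =970x514)
1. Create the Leader server and the Follower servers
2. The Leader starts the Learner connection acceptor LearnerCnxAcceptor
3. The Learner starts connecting to the Leader
4. The Leader creates a LearnerHandler
5. The Learner registers with the Leader
6. The Leader parses the Learner's information and computes the new epoch
7. The Leader sends its state to the Learner
8. The Learner sends an ACK message
9. Data synchronization
10. Start the Leader and Learner servers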
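Leader Election
Leader Election During Server Startup
1. Each server casts a vote
Each vote contains at least the myid and ZXID of the proposed server. In the initial phase every server votes for itself, as (myid, ZXID), and sends this vote to the other machines in the cluster.
2. Receive votes from the other servers
Each server receives the votes sent by the other servers.
3. Process the votes
After receiving a vote from another server, each server compares it with its own vote using these rules:
* Check the ZXID first: the server with the larger ZXID is preferred as Leader.
* If the ZXIDs are equal, compare myid: the server with the larger myid is preferred as Leader.
4. Tally the votes
Each server tallies all the votes to determine whether more than half of the machines (at least n/2+1) have the same vote.
5. Change the server state
Once a Leader is determined, each server updates its state: FOLLOWING if it is a Follower, LEADING if it is the Leader.
Leader Election While the Cluster Is Running
- Change state. When the Leader goes down, every non-Observer server changes its state to LOOKING and enters the leader election process.
- Each server casts a vote, essentially the same as during startup.
- Receive the votes from each server
- Process the votes
- Tally the votes
- Change the server state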
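Leader Election Algorithms
ZK provides three leader election algorithms: LeaderElection, a UDP-based FastLeaderElection, and a TCP-based FastLeaderElection. The algorithm is selected with the electionAlg property in zoo.cfg. Newer versions have deprecated the other algorithms and keep only the TCP-based FastLeaderElection.
SID: Server ID
SID is a number that uniquely identifies a machine in a ZK cluster. No two machines may share a SID, and it equals the machine's myid value.
ZXID: Transaction ID
ZXID is a transaction ID that uniquely identifies one change to the server state. At any given moment the machines in a cluster do not necessarily all have the same ZXID; this follows from how ZK processes client update requests.
Vote
During leader election, when a machine in the cluster finds that it can no longer detect a Leader, it starts trying to vote.
Quorum: Majority
The majority size is quorum = n/2 + 1 (integer division), where n is the number of servers in the cluster; for example, 3 out of 5 servers or 3 out of 4.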
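Algorithm Analysis
Entering Leader Election
A server in a ZK cluster enters leader election in either of two situations:
* the server is starting up
* the server loses its connection to the Leader while running
When a machine enters leader election, the cluster may be in one of two states:
* a Leader already exists in the cluster
* there really is no Leader in the cluster
The following describes how an election proceeds when no Leader exists.
##### Starting the First Vote
Two situations typically leave a cluster without a Leader: the whole cluster has just been started, or the current Leader server has crashed. In either case all machines in the cluster are trying to elect a Leader. This state is called LOOKING, meaning the server is looking for a Leader. A server in the LOOKING state sends a message, called a vote, to every other machine in the cluster.
A vote carries two basic pieces of information: the SID and the ZXID of the proposed server, i.e. its unique identifier and its transaction ID. A vote is written as (SID, ZXID).
In the first vote, a machine cannot yet see the state of the other machines, so each machine proposes itself.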
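Changing Votes
After sending out its own vote, each machine in the cluster also receives votes from the other machines. Each machine processes these votes according to a set of rules and decides whether to change its own vote. The terms used are:
* vote_sid: the SID of the server proposed as Leader in the received vote
* vote_zxid: the ZXID of the server proposed as Leader in the received vote
* self_sid: the SID of the current server
* self_zxid: the ZXID of the current server
Processing each received vote is a comparison of (vote_sid, vote_zxid) against (self_sid, self_zxid):
* Rule 1: if vote_zxid is greater than self_zxid, accept the received vote and send it out again as its own vote.
* Rule 2: if vote_zxid is less than self_zxid, keep its own vote unchanged.
* Rule 3: if vote_zxid equals self_zxid, compare the SIDs and choose the larger one: if vote_sid is greater than self_sid, accept the received vote and send it out again.
* Rule 4: if vote_zxid equals self_zxid and vote_sid is less than self_sid, keep its own vote unchanged.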
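Rules 1 to 4 reduce to a single ordering on (ZXID, SID). The sketch below encodes them directly; `VoteRules` and `shouldSwitch` are hypothetical names, and "self" means the vote the server currently holds (initially itself). As far as I know, FastLeaderElection's own comparison also compares the proposed leader's epoch before the ZXID; these notes leave that out, so the sketch does too.

```java
final class VoteRules {
    // Returns true if the received vote beats the vote we currently hold (Rules 1 and 3).
    static boolean shouldSwitch(long voteSid, long voteZxid, long selfSid, long selfZxid) {
        if (voteZxid != selfZxid) {
            return voteZxid > selfZxid; // Rule 1 / Rule 2: the larger ZXID wins
        }
        return voteSid > selfSid;       // Rule 3 / Rule 4: tie on ZXID, the larger SID wins
    }

    public static void main(String[] args) {
        // Server 3 currently holds the vote (3, 8) and receives three other votes.
        System.out.println(shouldSwitch(5, 9, 3, 8)); // true: newer data
        System.out.println(shouldSwitch(4, 8, 3, 8)); // true: same ZXID, larger SID
        System.out.println(shouldSwitch(1, 8, 3, 8)); // false: keep its own vote
    }
}
```

Preferring the larger ZXID first means the elected Leader is one of the servers holding the most recent committed data.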
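Determining the Leader
After the second round of voting, each machine again receives the other machines' votes and tallies them. If a machine finds that more than half of the machines in the cluster have cast the same vote, the server with the SID in that vote becomes the Leader.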
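Counting votes is a majority check using the quorum defined earlier, n/2 + 1 with integer division. A minimal sketch, assuming each participant's latest vote is stored by voter SID; the names `QuorumTally`, `quorum`, and `leaderIfElected` are hypothetical, and real votes are compared as whole tuples (SID, ZXID, epochs) rather than by proposed SID alone as here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

final class QuorumTally {
    // quorum = n/2 + 1 with integer division, e.g. 3 of 5 or 3 of 4
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // votesBySid: voter SID -> SID of the server it proposes as Leader
    static OptionalLong leaderIfElected(Map<Long, Long> votesBySid, int clusterSize) {
        Map<Long, Integer> counts = new HashMap<>();
        for (long proposed : votesBySid.values()) {
            int c = counts.merge(proposed, 1, Integer::sum);
            if (c >= quorum(clusterSize)) {
                return OptionalLong.of(proposed);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 5L);
        votes.put(2L, 5L);
        System.out.println(leaderIfElected(votes, 5)); // OptionalLong.empty
        votes.put(3L, 5L);
        System.out.println(leaderIfElected(votes, 5)); // OptionalLong[5]
    }
}
```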
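Leader Election Implementation Details
Server States
QuorumPeer.ServerState enumerates four server states: LOOKING, FOLLOWING, LEADING, and OBSERVING.
* LOOKING: looking for a Leader. A server in this state believes the cluster currently has no Leader, so it enters leader election.
* FOLLOWING: follower state; the server's role is Follower.
* LEADING: leader state; the server's role is Leader.
* OBSERVING: observer state; the server's role is Observer.
Vote Data Structure
Leader election is carried out through votes. Each vote contains two basic pieces of information: the SID and the ZXID of the proposed server.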
public class Vote {
private static final Logger LOG = LoggerFactory.getLogger(Vote.class);
public Vote(long id,
long zxid) {
this.version = 0x0;
this.id = id;
this.zxid = zxid;
this.electionEpoch = -1;
this.peerEpoch = -1;
this.state = ServerState.LOOKING;
}
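QuorumCnxManager: Network I/O
On the client side, ClientCnxn is the manager that handles network I/O. On the server side, each server starts a QuorumCnxManager at startup, which is responsible for the low-level network communication between servers during leader election.
Message Queues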
/*
* Counter to count connection processing threads.
*/
private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
/*
* Mapping from Peer to Thread number
*/
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
/*
* Reception queue
*/
public final ArrayBlockingQueue<Message> recvQueue;
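QuorumCnxManager maintains a set of queues internally, holding received messages, messages waiting to be sent, and the message senders. Except for the receive queue, all of these are collections grouped by SID.
* recvQueue: the message receive queue, which stores messages received from other servers.
* queueSendMap: the message send queues, which store messages waiting to be sent. queueSendMap is a map grouped by SID that gives each machine in the cluster its own queue, so that sending to one machine does not interfere with sending to another.
* senderWorkerMap: the set of message senders. Each SendWorker corresponds to one remote ZK server and is responsible for sending messages to it; senderWorkerMap is also grouped by SID.
* lastMessageSent: the most recently sent messages; for each SID it keeps the last message sent to that server.
Establishing Connections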
// visible for testing
public QuorumCnxManager(final long mySid,
Map<Long,QuorumPeer.QuorumServer> view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
boolean quorumSaslAuthEnabled,
ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
this.senderWorkerMap = senderWorkerMap;
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);
// Starts listener thread that waits for connection requests
listener = new Listener();
}
/**
* Thread to listen on some port
*/
public class Listener extends ZooKeeperThread {
volatile ServerSocket ss = null;
public Listener() {
// During startup of thread, thread name will be overridden to
// specific election address
super("ListenerThread");
}
/**
* Sleeps on accept().
*/
@Override
public void run() {
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
if (listenOnAllIPs) {
int port = view.get(QuorumCnxManager.this.mySid)
.electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid)
.electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid)
.electionAddr.toString());
ss.bind(addr);
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {
LOG.error("Exception while listening", e);
numRetries++;
try {
ss.close();
Thread.sleep(1000);
} catch (IOException ie) {
LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. " +
"Ignoring exception", ie);
}
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
+ view.get(QuorumCnxManager.this.mySid).electionAddr);
}
}
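When QuorumCnxManager starts, it creates a ServerSocket that listens on the leader election port. Once the port is open, ZK continuously accepts connection requests from other servers, and each request is handled by the receiveConnection function. To avoid creating two TCP connections between the same pair of machines, ZK applies a rule: only the server with the larger SID may actively establish a connection to another server; otherwise the connection is closed.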
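The duplicate-connection rule reduces to a comparison of the SIDs at the two ends of a new socket. The sketch below is a hypothetical decision helper (`ConnectionRule`, `Action`, `onIncoming`, `keepOutgoing`), not QuorumCnxManager's code. It assumes, as I understand `receiveConnection` and the connecting side, that a server receiving a connection from a peer with a smaller SID closes it and dials that peer itself, and that an initiator with the smaller SID drops its own outgoing socket, so every surviving connection was opened by the larger SID.

```java
final class ConnectionRule {
    enum Action { KEEP, CLOSE_AND_RECONNECT }

    // Receiving side: what server mySid does with an incoming connection from peerSid.
    static Action onIncoming(long mySid, long peerSid) {
        // Only the larger SID may initiate; a smaller initiator is dropped and called back.
        return peerSid < mySid ? Action.CLOSE_AND_RECONNECT : Action.KEEP;
    }

    // Initiating side: after dialing, keep the socket only if we hold the larger SID.
    static boolean keepOutgoing(long mySid, long peerSid) {
        return mySid > peerSid;
    }

    public static void main(String[] args) {
        // Servers 1 and 3 both try to connect to each other at startup.
        System.out.println(onIncoming(3, 1)); // CLOSE_AND_RECONNECT on server 3
        System.out.println(onIncoming(1, 3)); // KEEP on server 1
        System.out.println(keepOutgoing(3, 1) && !keepOutgoing(1, 3)); // true
    }
}
```

Whichever side dials first, both ends converge on the single connection initiated by server 3.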
/**
* If this server receives a connection request, then it gives up on the new
* connection if it wins. Notice that it checks whether it has a connection
* to this server already or not. If it does, then it sends the smallest
* possible long value to lose the challenge.
*
*/
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
handleConnection(sock, din);
} catch (IOException e) {
LOG.error("Exception handling connection, addr: {}, closing server connec