NATS: NATS Streaming Persistence
Preface
A recent project needed a message queue, mainly to turn some existing operations asynchronous. Based on the use case and my own familiarity, I chose NATS Streaming, for three reasons. First, when picking middleware I prefer components written in a language I know well, which makes it easier to troubleshoot problems and dig deeper when needed. Second, I work mostly with k8s and the cloud-native stack, so I lean toward projects in the CNCF ecosystem, since they consider how to run on k8s from day one. Third, I evaluated whether the component would still meet our needs as the project keeps growing.
Use Cases for Message Queues
To explain why, it helps to talk about where message queues fit. I once came across an answer on Zhihu that captured it well, so I am borrowing it here. Thanks to ScienJus for the excellent write-up.
The defining characteristic of a message queue is asynchronous processing; the main goals are to reduce request response time and to decouple components. So the primary use case is to take operations that are time consuming and do not need to return a result immediately (synchronously), and hand them off as messages on a queue. And because a queue sits in between, as long as the message format stays stable, the sender and the receiver do not need to know about or be affected by each other, which is the decoupling part.
As an example of such a use case:
Suppose a user registers in your application. After the server receives the registration request, it will:
- Validate the username and other information, and if everything checks out, insert a new user record into the database
- Send a confirmation email if the user registered with an email address, or a confirmation SMS if they registered with a phone number
- Analyze the user's profile so that like-minded people can later be recommended to them, or they can be recommended to others
- Send the user a system notification with a getting-started guide, and so on...
From the user's point of view, however, registration only really requires the first step: once the server has stored the account in the database, they can log in and do whatever they came to do. Do the other steps really have to finish inside this one request? Is it worth making the user wait while you handle things that do not matter to them? So in practice, once the first step is done, the server can push the remaining operations onto the corresponding message queues, return a result to the user immediately, and let the queues handle those operations asynchronously.
There is another scenario: a large number of users register at the same time. Under high concurrency the registration path starts to struggle; for example, the email gateway cannot keep up, or the heavy computation in the profile analysis pins the CPU. User records still land in the database quickly, but requests get stuck sending the email or analyzing the profile, response times grow sharply, and some requests even time out, which is a poor trade. The usual answer is again to push these operations onto a message queue (the producer-consumer model): the queue works through them at its own pace, the registration request itself completes quickly, and users can keep using the rest of the product.
So during normal feature development you do not need to go hunting for places to use a message queue. Instead, when a performance bottleneck shows up, look through the business logic for time-consuming operations that could be handled asynchronously; if there are any, a message queue is a good fit. Introducing one blindly just adds development and maintenance cost without a meaningful performance gain, which is not worth it.
To sum up, a message queue serves two main purposes:
- Peak shaving. Think of it as a reservoir. A good example is Kafka in an ELK log collection pipeline: during log peaks it sacrifices some real-time delivery in exchange for keeping the whole system safe.
- Turning synchronous flows asynchronous. Among the many steps inside what used to be a single synchronous operation, the ones that do not affect the main flow can be handed off to a message queue and processed asynchronously. In e-commerce, for example, once an order completes, besides returning a success response to the customer, the system also has to tell the accounting service to charge the account, update inventory, notify logistics to ship, tell the membership service to add loyalty points, and so on.
Introduction to NATS Streaming
NATS Streaming is a data streaming system powered by NATS and written in Go. The executable name of the NATS Streaming server is nats-streaming-server. NATS Streaming embeds, extends, and interoperates seamlessly with the core NATS platform. The NATS Streaming server is open source software under the Apache-2.0 license, and Synadia actively maintains and supports it.
Features
In addition to the features of the core NATS platform, NATS Streaming provides the following:
- Enhanced message protocol
NATS Streaming implements its own enhanced message format using Google Protocol Buffers. These messages travel as binary streams over the core NATS platform, so no change to the basic NATS protocol is required. A NATS Streaming message contains the following fields (a short Go sketch follows the list):
- Sequence - a globally ordered sequence number within the subject's channel
- Subject - the NATS Streaming delivery subject
- Reply - the corresponding "reply-to" subject
- Data - the actual message payload
- Timestamp - the receive timestamp, in nanoseconds
- Redelivered - a flag indicating whether the server needs to deliver this message again
- CRC32 - an optional cyclic redundancy check, an error-detection mechanism used in storage and communication to guarantee data integrity; the IEEE CRC32 algorithm is used here
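To make these fields concrete, here is a small sketch of a Go subscriber that prints them. It assumes the official Go client, stan.go (github.com/nats-io/stan.go), a server running locally, and placeholder cluster/client/channel names; it is an illustration, not part of NATS Streaming itself.

```go
package main

import (
	"log"

	stan "github.com/nats-io/stan.go"
)

func main() {
	sc, err := stan.Connect("test-cluster", "field-inspector",
		stan.NatsURL("nats://localhost:4222"))
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	// Each *stan.Msg embeds the protobuf MsgProto, so the fields described
	// above are directly accessible on the received message.
	_, err = sc.Subscribe("orders", func(m *stan.Msg) {
		log.Printf("seq=%d subject=%s ts(ns)=%d redelivered=%v crc32=%d data=%s",
			m.Sequence, m.Subject, m.Timestamp, m.Redelivered, m.CRC32, m.Data)
	})
	if err != nil {
		log.Fatal(err)
	}
	select {} // block so the handler keeps receiving
}
```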
- Message/event persistence
NATS Streaming offers configurable message persistence: the durable destination can be memory or a file. In addition, the storage subsystem is built on a common interface, which lets us write our own custom implementations to persist messages.
- At-least-once delivery
NATS Streaming provides message acknowledgements between a publisher and the server (for publish operations) and between a subscriber and the server (to confirm delivery). Messages are held on the server in memory or in secondary storage (or other external storage) so that they can be redelivered to subscribers that need to receive them again.
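Here is a hedged sketch of what the subscriber-side acknowledgement looks like with the Go client (stan.go assumed; `sc` is a `stan.Conn` obtained as in the previous sketch, and the channel name and ack wait are placeholders). With manual ack mode, a message that is not acknowledged within the AckWait window is redelivered.

```go
package example

import (
	"log"
	"time"

	stan "github.com/nats-io/stan.go"
)

// consumeAtLeastOnce demonstrates explicit acknowledgements: the server keeps
// the message and redelivers it until the subscriber calls Ack().
func consumeAtLeastOnce(sc stan.Conn) error {
	_, err := sc.Subscribe("orders", func(m *stan.Msg) {
		if err := process(m.Data); err != nil {
			// No Ack here: the server redelivers after AckWait expires.
			log.Printf("processing failed, waiting for redelivery: %v", err)
			return
		}
		m.Ack() // acknowledge only after the work succeeded
	},
		stan.SetManualAckMode(),      // turn off automatic acks
		stan.AckWait(30*time.Second), // placeholder redelivery window
	)
	return err
}

// process stands in for real business logic.
func process(data []byte) error { return nil }
```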
- Publisher rate limiting
NATS Streaming provides a connection option called MaxPubAcksInFlight, which limits the number of unacknowledged messages a publisher may have outstanding at any time. When the configured maximum is reached, the asynchronous publish call blocks until the number of unacknowledged messages falls below the limit again.
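On the publishing side this is just a connection option plus asynchronous publishes. A hedged sketch with the Go client (cluster ID, client ID, channel and limit are placeholders):

```go
package example

import (
	"log"

	stan "github.com/nats-io/stan.go"
)

// publishWithBackpressure shows MaxPubAcksInflight acting as publisher-side
// rate limiting: PublishAsync blocks once 100 messages are unacknowledged.
func publishWithBackpressure() error {
	sc, err := stan.Connect("test-cluster", "throttled-publisher",
		stan.MaxPubAcksInflight(100)) // placeholder limit
	if err != nil {
		return err
	}
	defer sc.Close()

	for i := 0; i < 10000; i++ {
		// The ack handler runs once the server has persisted the message.
		_, err := sc.PublishAsync("orders", []byte("payload"), func(guid string, err error) {
			if err != nil {
				log.Printf("publish %s failed: %v", guid, err)
			}
		})
		if err != nil {
			return err
		}
	}
	return nil
}
```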
- Per-subscriber rate matching/limiting
NATS Streaming allows each subscription to set a parameter called MaxInFlight, the maximum number of outstanding messages, that is, messages delivered but not yet acknowledged. When this limit is reached, NATS Streaming pauses delivery to that subscriber until the number of unacknowledged messages drops below the configured value again.
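The per-subscriber limit is set when subscribing. A short sketch, again assuming the Go client and a `stan.Conn` named `sc` from the earlier examples (channel and limit are placeholders):

```go
package example

import (
	stan "github.com/nats-io/stan.go"
)

// subscribeWithFlowControl caps the number of delivered-but-unacknowledged
// messages at 25; the server pauses delivery until some of them are acked.
func subscribeWithFlowControl(sc stan.Conn) (stan.Subscription, error) {
	return sc.Subscribe("orders", func(m *stan.Msg) {
		// ... do the work, then acknowledge ...
		m.Ack()
	},
		stan.SetManualAckMode(),
		stan.MaxInflight(25), // placeholder limit
	)
}
```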
- Historical message replay by subject
A new subscription on a channel that already has stored messages can specify a starting position in the message stream. With this option, delivery can begin from any of the following (see the sketch after this list):
1. The earliest message stored for the subscribed subject
2. The most recently stored message for the subject prior to the current subscription, often thought of as the "last value" or "initial value" cache
3. A specific date/time, in nanoseconds
4. A historical start position relative to the current server date/time, for example the last 30 seconds
5. A specific message sequence number
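Each of these starting positions maps onto a subscription option in the Go client. A hedged sketch (channel name, time offsets and sequence number are placeholders; in practice you pick exactly one start option per subscription):

```go
package example

import (
	"time"

	stan "github.com/nats-io/stan.go"
)

// replayExamples shows the replay options, one per subscription.
func replayExamples(sc stan.Conn, handler stan.MsgHandler) error {
	// 1. Everything that is still stored for the channel.
	if _, err := sc.Subscribe("orders", handler, stan.DeliverAllAvailable()); err != nil {
		return err
	}
	// 2. Start with the last stored message ("last value" / "initial value" cache).
	if _, err := sc.Subscribe("orders", handler, stan.StartWithLastReceived()); err != nil {
		return err
	}
	// 3. Start at an absolute point in time.
	if _, err := sc.Subscribe("orders", handler, stan.StartAtTime(time.Now().Add(-time.Hour))); err != nil {
		return err
	}
	// 4. Start relative to now, e.g. the last 30 seconds.
	if _, err := sc.Subscribe("orders", handler, stan.StartAtTimeDelta(30*time.Second)); err != nil {
		return err
	}
	// 5. Start at a specific sequence number.
	_, err := sc.Subscribe("orders", handler, stan.StartAtSequence(42))
	return err
}
```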
- Durable subscriptions
A subscription can also specify a "durable name" that survives client restarts. A durable subscription makes the server track the last acknowledged message sequence number for that client ID and durable name. When the client restarts or resubscribes using the same client ID and durable name, the server resumes delivery from the earliest unacknowledged message.
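A hedged sketch of a durable subscription with the Go client (cluster ID, client ID, channel and durable name are placeholders). Note the distinction at the end: `Close()` keeps the durable state on the server, while `Unsubscribe()` removes it.

```go
package example

import (
	"log"

	stan "github.com/nats-io/stan.go"
)

// durableSubscribe resumes where the previous run left off, as long as the
// same clientID and durable name are reused across restarts.
func durableSubscribe() error {
	sc, err := stan.Connect("test-cluster", "billing-worker") // same clientID on every restart
	if err != nil {
		return err
	}

	sub, err := sc.Subscribe("orders", func(m *stan.Msg) {
		log.Printf("processing seq=%d", m.Sequence)
		m.Ack()
	},
		stan.DurableName("billing"), // server tracks the last ack for this name
		stan.SetManualAckMode(),
	)
	if err != nil {
		return err
	}

	// sub.Close() keeps the durable state; sub.Unsubscribe() would delete it.
	_ = sub
	return nil
}
```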
Running NATS Streaming with Docker
As covered above, the feature NATS Streaming adds on top of core NATS is persistence, so the demos below focus on exactly that.
Run an example with memory-based persistence:
    docker run -ti -p 4222:4222 -p 8222:8222 nats-streaming:0.12.0
You will see output like the following:
    [1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
    [1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3
    [1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5
    [1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1
    [1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set]
    [4] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222
    [1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222
    [1] 2019/02/26 08:13:01.770581 [INF] Server is ready
    [1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state...
    [1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state
    [1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY
    [1] 2019/02/26 08:13:02.052552 [INF] STREAM: ---------- Store Limits ----------
    [1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels:              100 *
    [1] 2019/02/26 08:13:02.052586 [INF] STREAM: --------- Channels Limits --------
    [1] 2019/02/26 08:13:02.052601 [INF] STREAM:   Subscriptions:      1000 *
    [1] 2019/02/26 08:13:02.052613 [INF] STREAM:   Messages:        1000000 *
    [1] 2019/02/26 08:13:02.052624 [INF] STREAM:   Bytes:         976.56 MB *
    [1] 2019/02/26 08:13:02.052635 [INF] STREAM:   Age:           unlimited *
    [1] 2019/02/26 08:13:02.052649 [INF] STREAM:   Inactivity:    unlimited *
    [1] 2019/02/26 08:13:02.052697 [INF] STREAM: ----------------------------------
As you can see, persistence defaults to memory.
Run an example with file-based persistence:
    docker run -ti -v /Users/gao/test/mq:/datastore -p 4222:4222 -p 8222:8222 nats-streaming:0.12.0 -store file --dir /datastore -m 8222
You will see output like the following:
    [1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
    [1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM
    [1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5
    [1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1
    [1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set]
    [5] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222
    [1] 2019/02/26 08:16:07.643932 [INF] Server is ready
    [1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state...
    [1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state
    [1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE
    [1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore
    [1] 2019/02/26 08:16:07.933633 [INF] STREAM: ---------- Store Limits ----------
    [1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels:              100 *
    [1] 2019/02/26 08:16:07.933697 [INF] STREAM: --------- Channels Limits --------
    [1] 2019/02/26 08:16:07.933711 [INF] STREAM:   Subscriptions:      1000 *
    [1] 2019/02/26 08:16:07.933749 [INF] STREAM:   Messages:        1000000 *
    [1] 2019/02/26 08:16:07.933793 [INF] STREAM:   Bytes:         976.56 MB *
    [1] 2019/02/26 08:16:07.933837 [INF] STREAM:   Age:           unlimited *
    [1] 2019/02/26 08:16:07.933857 [INF] STREAM:   Inactivity:    unlimited *
    [1] 2019/02/26 08:16:07.933885 [INF] STREAM: ----------------------------------
PS
- When deploying on k8s, you can use file-based persistence and mount block storage to keep the data safe, for example AWS EBS or a Ceph RBD volume.
- Port 4222 is for client connections; port 8222 is the monitoring port.
After startup, open localhost:8222 to see the monitoring page.
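With the server up, a quick end-to-end check from Go can look like the sketch below. It assumes the stan.go client; `test-cluster` matches the default cluster ID shown in the logs above, while the client ID and channel name are arbitrary.

```go
package main

import (
	"log"

	stan "github.com/nats-io/stan.go"
)

func main() {
	// 4222 is the client port exposed by the docker run commands above.
	sc, err := stan.Connect("test-cluster", "smoke-test",
		stan.NatsURL("nats://localhost:4222"))
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	done := make(chan struct{})

	// Subscribe first, then publish one message and wait for it to come back.
	sub, err := sc.Subscribe("demo", func(m *stan.Msg) {
		log.Printf("received seq=%d data=%q", m.Sequence, m.Data)
		close(done)
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	if err := sc.Publish("demo", []byte("hello, nats streaming")); err != nil {
		log.Fatal(err)
	}
	<-done
}
```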
Startup Flags
    Streaming Server Options:
        -cid, --cluster_id <string>           Cluster ID (default: test-cluster)
        -st,  --store <string>                Store type: MEMORY|FILE|SQL (default: MEMORY)
        --dir <string>                        For FILE store type, this is the root directory
        -mc,  --max_channels <int>            Max number of channels (0 for unlimited)
        -msu, --max_subs <int>                Max number of subscriptions per channel (0 for unlimited)
        -mm,  --max_msgs <int>                Max number of messages per channel (0 for unlimited)
        -mb,  --max_bytes <size>              Max messages total size per channel (0 for unlimited)
        -ma,  --max_age <duration>            Max duration a message can be stored ("0s" for unlimited)
        -mi,  --max_inactivity <duration>     Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
        -ns,  --nats_server <string>          Connect to this external NATS Server URL (embedded otherwise)
        -sc,  --stan_config <string>          Streaming server configuration file
        -hbi, --hb_interval <duration>        Interval at which server sends heartbeat to a client
        -hbt, --hb_timeout <duration>         How long server waits for a heartbeat response
        -hbf, --hb_fail_count <int>           Number of failed heartbeats before server closes the client connection
        --ft_group <string>                   Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
        -sl,  --signal <signal>[=<pid>]       Send signal to nats-streaming-server process (stop, quit, reopen)
        --encrypt <bool>                      Specify if server should use encryption at rest
        --encryption_cipher <string>          Cipher to use for encryption. Currently support AES and CHACHA (ChaChaPoly). Defaults to AES
        --encryption_key <string>             Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

    Streaming Server Clustering Options:
        --clustered <bool>                    Run the server in a clustered configuration (default: false)
        --cluster_node_id <string>            ID of the node within the cluster if there is no stored ID (default: random UUID)
        --cluster_bootstrap <bool>            Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
        --cluster_peers <string>              List of cluster peer node IDs to bootstrap cluster state
        --cluster_log_path <string>           Directory to store log replication data
        --cluster_log_cache_size <int>        Number of log entries to cache in memory to reduce disk IO (default: 512)
        --cluster_log_snapshots <int>         Number of log snapshots to retain (default: 2)
        --cluster_trailing_logs <int>         Number of log entries to leave after a snapshot and compaction
        --cluster_sync <bool>                 Do a file sync after every write to the replication log and message store
        --cluster_raft_logging <bool>         Enable logging from the Raft library (disabled by default)

    Streaming Server File Store Options:
        --file_compact_enabled <bool>         Enable file compaction
        --file_compact_frag <int>             File fragmentation threshold for compaction
        --file_compact_interval <int>         Minimum interval (in seconds) between file compactions
        --file_compact_min_size <size>        Minimum file size for compaction
        --file_buffer_size <size>             File buffer size (in bytes)
        --file_crc <bool>                     Enable file CRC-32 checksum
        --file_crc_poly <int>                 Polynomial used to make the table used for CRC-32 checksum
        --file_sync <bool>                    Enable File.Sync on Flush
        --file_slice_max_msgs <int>           Maximum number of messages per file slice (subject to channel limits)
        --file_slice_max_bytes <size>         Maximum file slice size - including index file (subject to channel limits)
        --file_slice_max_age <duration>       Maximum file slice duration starting when the first message is stored (subject to channel limits)
        --file_slice_archive_script <string>  Path to script to use if you want to archive a file slice being removed
        --file_fds_limit <int>                Store will try to use no more file descriptors than this given limit
        --file_parallel_recovery <int>        On startup, number of channels that can be recovered in parallel
        --file_truncate_bad_eof <bool>        Truncate files for which there is an unexpected EOF on recovery, dataloss may occur

    Streaming Server SQL Store Options:
        --sql_driver <string>                 Name of the SQL Driver ("mysql" or "postgres")
        --sql_source <string>                 Datasource used when opening an SQL connection to the database
        --sql_no_caching <bool>               Enable/Disable caching for improved performance
        --sql_max_open_conns <int>            Maximum number of opened connections to the database

    Streaming Server TLS Options:
        -secure <bool>                        Use a TLS connection to the NATS server without verification; weaker than specifying certificates
        -tls_client_key <string>              Client key for the streaming server
        -tls_client_cert <string>             Client certificate for the streaming server
        -tls_client_cacert <string>           Client certificate CA for the streaming server

    Streaming Server Logging Options:
        -SD, --stan_debug=<bool>              Enable STAN debugging output
        -SV, --stan_trace=<bool>              Trace the raw STAN protocol
        -SDV                                  Debug and trace STAN
        --syslog_name                         On Windows, when running several servers as a service, use this name for the event source
        (See additional NATS logging options below)

    Embedded NATS Server Options:
        -a,  --addr <string>                  Bind to host address (default: 0.0.0.0)
        -p,  --port <int>                     Use port for clients (default: 4222)
        -P,  --pid <string>                   File to store PID
        -m,  --http_port <int>                Use port for http monitoring
        -ms, --https_port <int>               Use port for https monitoring
        -c,  --config <string>                Configuration file

    Logging Options:
        -l,  --log <string>                   File to redirect log output
        -T,  --logtime=<bool>                 Timestamp log entries (default: true)
        -s,  --syslog <string>                Enable syslog as log method
        -r,  --remote_syslog <string>         Syslog server addr (udp://localhost:514)
        -D,  --debug=<bool>                   Enable debugging output
        -V,  --trace=<bool>                   Trace the raw protocol
        -DV                                   Debug and trace

    Authorization Options:
        --user <string>                       User required for connections
        --pass <string>                       Password required for connections
        --auth <string>                       Authorization token required for connections

    TLS Options:
        --tls=<bool>                          Enable TLS, do not verify clients (default: false)
        --tlscert <string>                    Server certificate file
        --tlskey <string>                     Private key for server certificate
        --tlsverify=<bool>                    Enable TLS, verify client certificates
        --tlscacert <string>                  Client certificate CA for verification

    NATS Clustering Options:
        --routes <string, ...>                Routes to solicit and connect
        --cluster <string>                    Cluster URL for solicited routes

    Common Options:
        -h, --help                            Show this message
        -v, --version                         Show version
        --help_tls                            TLS help
A Quick Look at NATS Streaming Persistence in the Source
NATS Streaming currently supports the following four persistence modes:
- MEMORY
- FILE
- SQL
- RAFT
Reading the source shows that the NATS Streaming store is built around an interface, which makes it easy to extend to more persistence backends. The interface looks like this:
    // Store is the storage interface for NATS Streaming servers.
    //
    // If an implementation has a Store constructor with StoreLimits, it should be
    // noted that the limits don't apply to any state being recovered, for Store
    // implementations supporting recovery.
    //
    type Store interface {
        // GetExclusiveLock is an advisory lock to prevent concurrent
        // access to the store from multiple instances.
        // This is not to protect individual API calls, instead, it
        // is meant to protect the store for the entire duration the
        // store is being used. This is why there is no `Unlock` API.
        // The lock should be released when the store is closed.
        //
        // If an exclusive lock can be immediately acquired (that is,
        // it should not block waiting for the lock to be acquired),
        // this call will return `true` with no error. Once a store
        // instance has acquired an exclusive lock, calling this
        // function has no effect and `true` with no error will again
        // be returned.
        //
        // If the lock cannot be acquired, this call will return
        // `false` with no error: the caller can try again later.
        //
        // If, however, the lock cannot be acquired due to a fatal
        // error, this call should return `false` and the error.
        //
        // It is important to note that the implementation should
        // make an effort to distinguish error conditions deemed
        // fatal (and therefore trying again would invariably result
        // in the same error) and those deemed transient, in which
        // case no error should be returned to indicate that the
        // caller could try later.
        //
        // Implementations that do not support exclusive locks should
        // return `false` and `ErrNotSupported`.
        GetExclusiveLock() (bool, error)

        // Init can be used to initialize the store with server's information.
        Init(info *spb.ServerInfo) error

        // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
        Name() string

        // Recover returns the recovered state.
        // Implementations that do not persist state and therefore cannot
        // recover from a previous run MUST return nil, not an error.
        // However, an error must be returned for implementations that are
        // attempting to recover the state but fail to do so.
        Recover() (*RecoveredState, error)

        // SetLimits sets limits for this store. The action is not expected
        // to be retroactive.
        // The store implementation should make a deep copy as to not change
        // the content of the structure passed by the caller.
        // This call may return an error due to limits validation errors.
        SetLimits(limits *StoreLimits) error

        // GetChannelLimits returns the limit for this channel. If the channel
        // does not exist, returns nil.
        GetChannelLimits(name string) *ChannelLimits

        // CreateChannel creates a Channel.
        // Implementations should return ErrAlreadyExists if the channel was
        // already created.
        // Limits defined for this channel in StoreLimits.PeChannel map, if present,
        // will apply. Otherwise, the global limits in StoreLimits will apply.
        CreateChannel(channel string) (*Channel, error)

        // DeleteChannel deletes a Channel.
        // Implementations should make sure that if no error is returned, the
        // channel would not be recovered after a restart, unless CreateChannel()
        // with the same channel is invoked.
        // If processing is expecting to be time consuming, work should be done
        // in the background as long as the above condition is guaranteed.
        // It is also acceptable for an implementation to have CreateChannel()
        // return an error if background deletion is still happening for a
        // channel of the same name.
        DeleteChannel(channel string) error

        // AddClient stores information about the client identified by `clientID`.
        AddClient(info *spb.ClientInfo) (*Client, error)

        // DeleteClient removes the client identified by `clientID` from the store.
        DeleteClient(clientID string) error

        // Close closes this store (including all MsgStore and SubStore).
        // If an exclusive lock was acquired, the lock shall be released.
        Close() error
    }
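To show how another backend could be plugged in, here is a minimal, do-nothing skeleton that mirrors the interface above. It is only a sketch: the import paths and the referenced types (`stores.RecoveredState`, `stores.StoreLimits`, `stores.Channel`, and so on) are assumed to come from the nats-streaming-server source tree, and every method just returns a zero value instead of doing real work.

```go
package mystore

import (
	"github.com/nats-io/nats-streaming-server/spb"
	"github.com/nats-io/nats-streaming-server/stores"
)

// NoopStore is a do-nothing sketch of a custom Store; a real backend
// (Redis, S3, etcd, ...) would persist channels, messages and clients here.
type NoopStore struct{}

// GetExclusiveLock: this backend does not support exclusive locks.
func (s *NoopStore) GetExclusiveLock() (bool, error) { return false, stores.ErrNotSupported }

// Init receives the server information to persist.
func (s *NoopStore) Init(info *spb.ServerInfo) error { return nil }

// Name identifies the store type in the server logs.
func (s *NoopStore) Name() string { return "NOOP" }

// Recover returns nil because nothing survives a restart in this sketch.
func (s *NoopStore) Recover() (*stores.RecoveredState, error) { return nil, nil }

// SetLimits would keep a deep copy of the limits.
func (s *NoopStore) SetLimits(limits *stores.StoreLimits) error { return nil }

// GetChannelLimits reports per-channel limits (nil means channel unknown).
func (s *NoopStore) GetChannelLimits(name string) *stores.ChannelLimits { return nil }

// CreateChannel would allocate the channel's message and subscription stores.
func (s *NoopStore) CreateChannel(channel string) (*stores.Channel, error) { return nil, nil }

// DeleteChannel would remove all data for the channel.
func (s *NoopStore) DeleteChannel(channel string) error { return nil }

// AddClient and DeleteClient track connected client IDs.
func (s *NoopStore) AddClient(info *spb.ClientInfo) (*stores.Client, error) { return nil, nil }
func (s *NoopStore) DeleteClient(clientID string) error                     { return nil }

// Close releases resources (and any acquired lock).
func (s *NoopStore) Close() error { return nil }
```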
Official schemas are provided for both MySQL and PostgreSQL:
postgres.db.sql
    CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));
    CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));
    CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));
    CREATE INDEX Idx_ChannelsName ON Channels (name(256));
    CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));
    CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);
    CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
    CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));
    CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);
    CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);

    -- Updates for 0.10.0
    ALTER TABLE Clients ADD proto BYTEA;
mysql.db.sql
    CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));
    CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));
    CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));
    CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));
    CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
    CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, `row` BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, `row`), INDEX Idx_SubsPendingSeq(seq));
    CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);

    # Updates for 0.10.0
    ALTER TABLE Clients ADD proto BLOB;
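After loading one of these schemas into a database, the server is pointed at it with the SQL store flags listed earlier; for example, something like the following (the driver, credentials and database name are placeholders):

    nats-streaming-server -store sql --sql_driver mysql --sql_source "user:password@/nss_db"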
Summary
A follow-up post will walk through the implementation in more detail and cover cluster deployment, including, of course, how to run a highly available cluster on k8s.