1. 程式人生 > >kafka 副本機制和容錯處理 -2

kafka 副本機制和容錯處理 -2

[文章來源於本人的印象筆記,如出現格式問題可訪問該連結檢視原文](https://app.yinxiang.com/fx/475c6308-a882-4ffb-a9c5-b2070da9af9a) [原創宣告:作者:Arnold.zhao 部落格園地址:https://www.cnblogs.com/zh94](https://www.cnblogs.com/zh94) ## 副本機制 Kafka的副本機制會在多個服務端節點上對每個主題分割槽的日誌進行復制,當叢集中的某個節點上出現故障時,訪問故障節點的請求會被轉移到其他正常節點的副本上,副本的單位是主題的分割槽; kafka每個主題的每個分割槽都會有一個主副本(Leader)以及0個或多個備份副本(Follower),主副本負責客戶端的讀和寫,備份副本則負責向主副本拉取資料,以便和主副本的資料同步,當主副本掛掉後,kafka會在備份副本中選擇一個作為主副本,繼續為客戶端提供讀寫服務; ![](https://img2020.cnblogs.com/blog/1104472/202012/1104472-20201201102552172-1090614210.png) 分割槽的主副本應該均勻的分佈在各個伺服器上,如上圖所示,通常主題分割槽的數量要比伺服器的數量多很多,所以每個伺服器都可以成為一些分割槽的主副本,也能同時成為一些分割槽的備份副本; 備份副本始終儘量保持與主副本的資料同步,備份副本的日誌檔案和主副本的日誌總是相同的,它們都有相同的偏移量和相同順序的訊息。 ## 容錯處理 分散式系統處理故障容錯時,需要明確的定義節點是否處於存活狀態,kafka對節點的存活定義有如下兩個條件 1. 節點必須和ZK保持會話 2. 如果這個節點是某個分割槽的備份副本,他必須對分割槽主副本的寫操作進行復制,並且複製的進度不能落後太多 滿足上述兩個條件,叫做(in - sync)正在同步中,每個分割槽的主副本會跟蹤正在同步中的備份副本節點(In Sync Replicas 即ISR ),如果一個備份副本掛掉,沒有響應或者同步進度落戶太多,主副本就會將其從同步副本集合中移除,反之,如果備份副本重新趕上主副本,它就會加入到主副本的同步集合中;(這個主副本的同步集合,包含主副本自己以及主副本保持一定同步的備份副本 則統稱為ISR) 在Kafka 中,一條訊息只有被ISR集合的所有副本都運用到本地的日誌檔案,才會認為訊息被成功 提交了, 任何時刻,只要ISR至少有一個副本是存活的, Kafka就可以保證“一條訊息一旦被提交,就不會丟失” 只有已經提交的訊息才能被消費者消費,因此消費者不用擔心會看到因為主副本失敗而 丟失的訊息; 對於生產者所傳送的一條訊息已經寫入到主副本中,但是此時備份副本還沒來及進行資料copy時,主副本就掛掉的情況,那麼此時訊息就不算寫入成功,生產者會重新發送該訊息,當所傳送的一條訊息成功的複製到ISR的所有副本後,它們才會被認為是提交的,此時才對消費者是可見的。 # 服務驗證 驗證kafka的服務副本機制和容錯處理。 驗證kafka的副本機制,只需部署一臺測試用kafka即可,驗證kafka的容錯處理,及服務節點down機後的處理方式,則需要最低3個服務搭建kafka叢集進行模擬; ## 單節點驗證 先官網下載一個kafka,也可以在此處網盤直接下載 ``` 連結:https://pan.baidu.com/s/1_tmQA1AqTgh4T81HuW8OlA 提取碼:zc1q ``` 修改config目錄下server.properties內容的基本資訊,此處修改內容如下 ``` broker.id=1 listeners=PLAINTEXT://10.0.3.17:9092 log.dirs=/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp zookeeper.connect=10.0.3.17:2181 ``` zookeeper.properties內容保持不變,然後啟動kafka預設自帶的zk服務,以及kafka服務即可; ``` 啟動zk ./zookeeper-server-start.sh -daemon /opt/gangtise/kafka/kafka_2.11-2.1.0/config/zookeeper.properties 啟動kafka ./kafka-server-start.sh -daemon /opt/gangtise/kafka/kafka_2.11-2.1.0/config/server.properties ``` 新建一個只有一個副本一個分割槽的topic主題 ``` [root@dev bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic arnold_test WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Created topic "arnold_test". ``` 此時在我們所配置的tmp資料夾下則對應新建了一個相同topic名稱的日誌檔案,由於我們此時新建 arnold_test topic時只指定建立了一個分割槽,所以該日誌目錄下只建立了一個分割槽資料夾;索引位為0; **除此之外,還可以看到在我們所產生的分割槽檔案同級別的目錄下,存在如下4個 checkpoint結尾的檔案,其中 replication-offset-checkpoint(複製偏移檢查點) 記錄對應的各主題分割槽的偏移量;recovery-point-offset-checkpoint(恢復點偏移檢查點) 記錄對應的各主題分割槽的恢復點的偏移量;做個標記,後續給詳細的說明** ``` [root@dev kakfa_tmp]# pwd /opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp [root@dev kakfa_tmp]# ll total 16 drwxr-xr-x. 2 root root 141 Nov 25 17:21 arnold_test-0 -rw-r--r--. 1 root root 0 Nov 25 17:12 cleaner-offset-checkpoint -rw-r--r--. 1 root root 4 Nov 25 17:24 log-start-offset-checkpoint -rw-r--r--. 1 root root 54 Nov 25 17:12 meta.properties -rw-r--r--. 1 root root 20 Nov 25 17:24 recovery-point-offset-checkpoint -rw-r--r--. 1 root root 20 Nov 25 17:24 replication-offset-checkpoint ``` **檢視arnold_test-0這個分割槽下的內容如下,其中 .log 結尾的檔案則是二進位制的日誌檔案,該日誌檔案中所儲存的內容則是我們通過kafka生產者向這個topic中所輸入的訊息內容;可以使用 string 命令檢視該二進位制檔案的內容。** ``` [root@dev arnold_test-0]# pwd /opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp/arnold_test-0 [root@dev arnold_test-0]# ll total 4 -rw-r--r--. 1 root root 10485760 Nov 25 17:21 00000000000000000000.index -rw-r--r--. 1 root root 0 Nov 25 17:21 00000000000000000000.log -rw-r--r--. 1 root root 10485756 Nov 25 17:21 00000000000000000000.timeindex -rw-r--r--. 1 root root 8 Nov 25 17:21 leader-epoch-checkpoint ``` 再新建一個只有一個副本三個分割槽的topic主題 ``` ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic arnold_test_many ``` 檢視日誌分割槽檔案如下 ``` [root@dev kakfa_tmp]# pwd /opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp [root@dev kakfa_tmp]# ll total 16 drwxr-xr-x. 2 root root 141 Nov 25 17:21 arnold_test-0 drwxr-xr-x. 2 root root 141 Nov 25 19:18 arnold_test_many-0 drwxr-xr-x. 2 root root 141 Nov 25 19:18 arnold_test_many-1 drwxr-xr-x. 2 root root 141 Nov 25 19:18 arnold_test_many-2 -rw-r--r--. 1 root root 0 Nov 25 17:12 cleaner-offset-checkpoint -rw-r--r--. 1 root root 4 Nov 25 19:19 log-start-offset-checkpoint -rw-r--r--. 1 root root 54 Nov 25 17:12 meta.properties -rw-r--r--. 1 root root 83 Nov 25 19:19 recovery-point-offset-checkpoint -rw-r--r--. 1 root root 83 Nov 25 19:20 replication-offset-checkpoint ``` **使用console 生產者,向arnold_test_many topic生產6條資料,可以看到此時我們生成6條content內容,按照我們上一章的說法,由於我們沒有使用key值的方式進行儲存,而是直接儲存的內容,那麼此時所產生的6條內容,則會按照輪訓的方式依次插入對應的分割槽中。** ``` [root@dev bin]# ./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic arnold_test_many >message1 >message2 >message3 >message4 >message5 >message6 ``` **此時我們直接檢視kafka分割槽偏移量記錄的檔案,可以看到arnold_test_many 0,arnold_test_many 1,arnold_test_many 2 所對應的分割槽 所對應的偏移量都是2,因為我們此時插入了6條資料,輪訓均衡的插入到了3個分割槽中,所以各個分割槽中的檢查點偏移量都為2。** ``` [root@dev kakfa_tmp]# pwd /opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp [root@dev kakfa_tmp]# cat replication-offset-checkpoint 0 4 arnold_test_many 2 2 arnold_test_many 1 2 arnold_test 0 0 arnold_test_many 0 2 ``` 此時我們使用strings 直接檢視各分割槽中所儲存的日誌內容 ``` [root@dev arnold_test_many-0]# pwd /opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp/arnold_test_many-0 [root@dev arnold_test_many-0]# strings 00000000000000000000.log message2 message5 [root@dev arnold_test_many-1]# pwd /opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp/arnold_test_many-1 [root@dev arnold_test_many-1]# strings 00000000000000000000.log message1 message4 [root@dev arnold_test_many-2]# pwd /opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp/arnold_test_many-2 [root@dev arnold_test_many-2]# strings 00000000000000000000.log message3 message6 ``` 如上所示,資料採用輪訓的方式均勻的插入到了各個分割槽中,整理後結果如下所示。 ``` arnold_test_many1 arnold_test_many0 arnold_test_many2 message1 message2 message3 message4 message5 message6 ``` 此時可能你會有如此疑問,為什麼資料的輪訓不是從 arnold_test_many0開始輪訓插入的,而是從arnold_test_many1開始的?這個我還沒做嚴格測試,所以不排除輪訓的起始分割槽的確是隨機的,至少並不是按照分割槽的下標來確定順序的。不過該topic的分割槽的順序只要第一次插入資料確定後,後續就都將按照這個順序執行,所以,起始分割槽的確也不是最重要的問題了。 **此時我們使用消費者來消費該topic下的所有分割槽內的資料,可以看到消費者消費kafka中的資料時,並沒有按照生產者的插入順序來讀取出來;這也驗證了我們上一章的說法,kafka並不保證當前topic的全域性訊息順序,而只是保證當前該topic下各分割槽的訊息順序,也就是我們看到的 分割槽arnold_test_many2中的message3訊息一定在message6前面,分割槽 arnold_test_many1中的 message1肯定在message4前面;** ``` [root@dev bin]# ./kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092 --from-beginning --topic arnold_test_many message3 message6 message1 message4 message2 message5 ``` [原創宣告:作者:Arnold.zhao 部落格園地址:https://www.cnblogs.com/zh94](https://www.cnblogs.com/zh94) ## 分散式模式驗證 直接將我們kafka的部署包,分別cp到不同的機器上,然後修改對應的配置,啟動即可,主要配置內容如下 10.0.3.17 ``` broker.id=1 listeners=PLAINTEXT://10.0.3.17:9092 log.dirs=/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kakfa_tmp zookeeper.connect=10.0.3.17:2181 ``` 10.0.6.39 ``` broker.id=2 listeners=PLAINTEXT://10.0.3.17:9092 log.dirs=/home/gangtise/kafka/kafka_2.11-2.1.0/logs/kafka_tmp zookeeper.connect=10.0.3.17:2181 ``` 10.0.3.18 ``` broker.id=3 listeners=PLAINTEXT://10.0.3.18:9092 log.dirs=/opt/gangtise/kafka/kafka_2.11-2.1.0/logs/kafka_tmp zookeeper.connect=10.0.3.17:2181 ``` 啟動對應的kafka節點後,通過zk的圖形化工具檢視當前的kafka註冊資訊,可以看到對應的kafka節點資訊都已經註冊到zk上,此時表明部署完成;(由於此處主要是驗證下kafka叢集的特性,所以zk的節點則只部署了一個,測試用即可) ![](https://img2020.cnblogs.com/blog/1104472/202012/1104472-20201201102622048-1960957463.png) ### 新建Topic 由於我們此時已經有了3個伺服器節點,對應的伺服器broker.id分別為:1,2,3; 此時我們建立一個新的Topic,指定該Topic為三個副本,三個分割槽,如下:--replication-factor 3 表示建立對應的副本數為3, --partitions 3 表示建立對應的分割槽數為3 ``` ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic arnold_test_many_three ``` 使用--describe命令檢視該topic的詳細資訊 ``` [root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_test_many_three Topic:arnold_test_many_three PartitionCount:3 ReplicationFactor:3 Configs: Topic: arnold_test_many_three Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: arnold_test_many_three Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: arnold_test_many_three Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 ``` 上述查詢結果詳細解釋一下,如下: 第一行含義是: Topic arnold_test_many_three 一共有三個分割槽(PartitionCount:3),三個副本(ReplicationFactor:3),對應的配置的具體資訊,則是看下方的詳情(Config) 第二行含義是: Topic arnold_test_many_three 的第0個分割槽(Partition: 0),對應的主副本Leader節點是3(Leader: 3),該0分割槽對應的副本分別是在3,1,2節點上儲存(Replicas: 3,1,2),其中當前的ISR資訊分別是3,1,2(Isr: 3,1,2); 那麼對應的第三行的含義則也是與第二行相似,表示當前的 Partition 1分割槽的三個副本,分別是在broker.id 為1,2,3的三個服務節點上儲存,其中主副本Leader副本則是broker.id為1的伺服器上的副本,則是主副本;當前該分割槽的同步資訊 isr 是1,2,3,表示所有節點均同步正常; ISR表示正在同步的節點集合,詳細解釋,可以看上述容錯處理中的內容解釋; ### 手動停止節點 此時我們將broker.id為2的節點服務停止掉,再觀察下當前的副本情況; ``` 直接使用kill -9 強制程序退出的方式; [gangtise@yinhua-ca000003 arnold_test_many_three-0]$ kill -9 94130 ``` 此時再觀察對應的topic的詳細資訊 ``` [root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_test_many_three Topic:arnold_test_many_three PartitionCount:3 ReplicationFactor:3 Configs: Topic: arnold_test_many_three Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1 Topic: arnold_test_many_three Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3 Topic: arnold_test_many_three Partition: 2 Leader: 3 Replicas: 2,3,1 Isr: 3,1 ``` 可以看到,分割槽的主副本已經進行了轉移,原先的Partition 2 分割槽的,Leader節點由原來的2,變更為了現在的3; 也就是說當前的Partition2分割槽的Leader節點主副本任務由原來的2伺服器節點,變更成了由伺服器節點3來接管Partition2分割槽的主副本任務; 另外則是,所有的Replicas 副節點的資訊,是沒有變化的,還是和以前的一摸一樣,畢竟2節點只是down機,修復好以後還是要重新承擔副本的任務的; 雖然每個分割槽的Replicas沒有變化,但是每個分割槽的ISR中不再包含 2節點;(也就是說,後續的訊息的同步集合中,只需要1,3節點同步完成,則同步完成,便會直接給生產者返回一個訊息,OK同步完成;而2節點,等到後續節點修復後,主動copy其他分割槽的副本內容到自身的節點上,等到進度完全和其它節點保持一致後,則重新加入到ISR同步集合中。) 那麼此時,我們再重新啟動下 2節點; ``` [gangtise@yinhua-ca000003 kafka_2.11-2.1.0]$ ./bin/kafka-server-start.sh -daemon /home/gangtise/kafka/kafka_2.11-2.1.0/config/server.properties ``` 然後再重新檢視當前Topic的詳細資訊,啪一下啊,很快啊,可以看到ISR中的同步集合中,2節點也已經正常回歸; ``` [root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_test_many_three Topic:arnold_test_many_three PartitionCount:3 ReplicationFactor:3 Configs: Topic: arnold_test_many_three Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: arnold_test_many_three Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2 Topic: arnold_test_many_three Partition: 2 Leader: 3 Replicas: 2,3,1 Isr: 3,1,2 ``` 如上,可以看到,此時的ISR中已經包含了2節點;但是,觀察上面結果可以看到,Partition0分割槽和Partition2分割槽的主節點都還是在 3伺服器節點上,也就是當前該兩個分割槽都過於依賴3伺服器的主副本節點,而主副本節點又要負責訊息的讀寫的任務,所以3服務節點,壓力較大,此時通過執行平衡操作,來解決分割槽主節點的問題 ### 執行自平衡 ``` [root@dev bin]# ./kafka-preferred-replica-election.sh --zookeeper 10.0.3.17:2181 [root@dev bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic arnold_test_many_three Topic:arnold_test_many_three PartitionCount:3 ReplicationFactor:3 Configs: Topic: arnold_test_many_three Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: arnold_test_many_three Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2 Topic: arnold_test_many_three Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 3,1,2 ``` Partition2的副本集[2,3,1],原有的Leader編號是3,通過執行上述平衡操作後,分割槽2的主副本節點由原有3,遷移到了節點2。 不過,如果在服務down機後,不手動執行平衡操作的情況下,那麼過一段時間,kafka會自動進行一下平衡,只要新上來的服務是穩定的情況下,kafka會自動將主副本平衡到新的節點上; # 命令彙總 上述所使用到的幾個命令彙總 ``` 建立Topic ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic arnold_test_many 生產者 ./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic arnold_test_many 消費者 ./kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092 --from-beginning --topic arnold_test_many 檢視Topic詳細資訊(包含節點的分佈情況) ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_test_many_three 分割槽自平衡 ./kafka-preferred-replica-election.sh --zookeeper 10.0.3.17:2181 ``` [原創宣告:作者:Arnold.zhao 部落格園地址:https://www.cnblogs.com/zh94](https://www.cnblogs.com/zh94)