1. 程式人生 > >「從零單排canal 04」 啟動模組deployer原始碼解析

「從零單排canal 04」 啟動模組deployer原始碼解析

基於1.1.5-alpha版本,具體原始碼筆記可以參考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal

本文將對canal的啟動模組deployer進行分析。

Deployer模組(綠色部分)在整個系統中的角色如下圖所示,用來啟動canal-server.

 

模組內的類如下:

 

為了能帶著目的看原始碼,以幾個問題開頭,帶著問題來一起探索deployer模組的原始碼。

  • CanalServer啟動過程中配置如何載入?
  • CanalServer啟動過程中涉及哪些元件?
  • 叢集模式的canalServer,是如何實現instance的HA呢?
  • 每個canalServer又是怎麼獲取admin上的配置變更呢?

1.入口類CanalLauncher


這個類是整個canal-server的入口類。負責配置載入和啟動canal-server。

主流程如下:

  • 載入canal.properties的配置內容
  • 根據canal.admin.manager是否為空判斷是否是admin控制,如果不是admin控制,就直接根據canal.properties的配置來了
  • 如果是admin控制,使用PlainCanalConfigClient獲取遠端配置 新開一個執行緒池每隔五秒用http請求去admin上拉配置進行merge(這裡依賴了instance模組的相關配置拉取的工具方法) 用md5進行校驗,如果canal-server配置有更新,那麼就重啟canal-server
  • 核心是用canalStarter.start()啟動
  • 使用CountDownLatch保持主執行緒存活
  • 收到關閉訊號,CDL-1,然後關閉配置更新執行緒池,優雅退出
  1 public static void main(String[] args) {
  2 
  3     try {
  4 
  5         //note:設定全域性未捕獲異常的處理
  6 
  7         setGlobalUncaughtExceptionHandler();
  8 
  9         /**
 10 
 11          * note:
 12 
 13          * 1.讀取canal.properties的配置
 14 
 15          * 可以手動指定配置路徑名稱
 16 
 17          */
 18 
 19         String conf = System.getProperty("canal.conf", "classpath:canal.properties");
 20 
 21         Properties properties = new Properties();
 22 
 23         if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
 24 
 25             conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
 26 
 27             properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
 28 
 29         } else {
 30 
 31             properties.load(new FileInputStream(conf));
 32 
 33         }
 34 
 35         final CanalStarter canalStater = new CanalStarter(properties);
 36 
 37         String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
 38 
 39         /**
 40 
 41          * note:
 42 
 43          * 2.根據canal.admin.manager是否為空判斷是否是admin控制,如果不是admin控制,就直接根據canal.properties的配置來了
 44 
 45          */
 46 
 47         if (StringUtils.isNotEmpty(managerAddress)) {
 48 
 49             String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
 50 
 51             //省略一部分。。。。。。
 52           
 53 
 54             /**
 55 
 56              * note:
 57 
 58              * 2.1使用PlainCanalConfigClient獲取遠端配置
 59 
 60              */
 61 
 62             final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
 63 
 64                     user,
 65 
 66                     passwd,
 67 
 68                     registerIp,
 69 
 70                     Integer.parseInt(adminPort),
 71 
 72                     autoRegister,
 73 
 74                     autoCluster);
 75 
 76             PlainCanal canalConfig = configClient.findServer(null);
 77 
 78             if (canalConfig == null) {
 79 
 80                 throw new IllegalArgumentException("managerAddress:" + managerAddress
 81 
 82                         + " can't not found config for [" + registerIp + ":" + adminPort
 83 
 84                         + "]");
 85 
 86             }
 87 
 88             Properties managerProperties = canalConfig.getProperties();
 89 
 90             // merge local
 91 
 92             managerProperties.putAll(properties);
 93 
 94             int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
 95 
 96                     CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
 97 
 98                     "5"));
 99 
100             /**
101 
102              * note:
103 
104              * 2.2 新開一個執行緒池每隔五秒用http請求去admin上拉配置進行merge(這裡依賴了instance模組的相關配置拉取的工具方法)
105 
106              */
107 
108             executor.scheduleWithFixedDelay(new Runnable() {
109 
110                 private PlainCanal lastCanalConfig;
111 
112                 public void run() {
113 
114                     try {
115 
116                         if (lastCanalConfig == null) {
117 
118                             lastCanalConfig = configClient.findServer(null);
119 
120                         } else {
121 
122                             PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
123 
124                             /**
125 
126                              * note:
127 
128                              * 2.3 用md5進行校驗,如果canal-server配置有更新,那麼就重啟canal-server
129 
130                              */
131 
132                             if (newCanalConfig != null) {
133 
134                                 // 遠端配置canal.properties修改重新載入整個應用
135 
136                                 canalStater.stop();
137 
138                                 Properties managerProperties = newCanalConfig.getProperties();
139 
140                                 // merge local
141 
142                                 managerProperties.putAll(properties);
143 
144                                 canalStater.setProperties(managerProperties);
145 
146                                 canalStater.start();
147 
148                                 lastCanalConfig = newCanalConfig;
149 
150                             }
151 
152                         }
153 
154                     } catch (Throwable e) {
155 
156                         logger.error("scan failed", e);
157 
158                     }
159 
160                 }
161 
162             }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
163 
164             canalStater.setProperties(managerProperties);
165 
166         } else {
167 
168             canalStater.setProperties(properties);
169 
170         }
171 
172         canalStater.start();
173 
174         //note: 這樣用CDL處理和while(true)有點類似
175 
176         runningLatch.await();
177 
178         executor.shutdownNow();
179 
180     } catch (Throwable e) {
181 
182         logger.error("## Something goes wrong when starting up the canal Server:", e);
183 
184     }
185 
186 }

 

2.啟動類CanalStarter


從上面的入口類,我們可以看到canal-server真正的啟動邏輯在CanalStarter類的start方法。

這裡先對三個物件進行辨析:

  • CanalController:是canalServer真正的啟動控制器
  • canalMQStarter:用來啟動mqProducer。如果serverMode選擇了mq,那麼會用canalMQStarter來管理mqProducer,將canalServer抓取到的實時變更用mqProducer直接投遞到mq
  • CanalAdminWithNetty:這個不是admin控制檯,而是對本server啟動一個netty服務,讓admin控制檯通過請求獲取當前server的資訊,比如執行狀態、正在本server上執行的instance資訊等

start方法主要邏輯如下:

  • 根據配置的serverMode,決定使用CanalMQProducer或者canalServerWithNetty
  • 啟動CanalController
  • 註冊shutdownHook
  • 如果CanalMQProducer不為空,啟動canalMQStarter(內部使用CanalMQProducer將訊息投遞給mq)
  • 啟動CanalAdminWithNetty做伺服器
  1 public synchronized void start() throws Throwable {
  2 
  3     String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
  4 
  5     /**
  6 
  7      * note
  8 
  9      * 1.如果canal.serverMode不是tcp,載入CanalMQProducer,並且啟動CanalMQProducer
 10 
 11      * 回頭可以深入研究下ExtensionLoader類的相關實現
 12 
 13      */
 14 
 15     if (!"tcp".equalsIgnoreCase(serverMode)) {
 16 
 17         ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
 18 
 19         canalMQProducer = loader
 20 
 21                 .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
 22 
 23         if (canalMQProducer != null) {
 24 
 25             ClassLoader cl = Thread.currentThread().getContextClassLoader();
 26 
 27             Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
 28 
 29             canalMQProducer.init(properties);
 30 
 31             Thread.currentThread().setContextClassLoader(cl);
 32 
 33         }
 34 
 35     }
 36 
 37     //note 如果啟動了canalMQProducer,就不使用canalWithNetty(這裡的netty是用在哪裡的?)
 38 
 39     if (canalMQProducer != null) {
 40 
 41         MQProperties mqProperties = canalMQProducer.getMqProperties();
 42 
 43         // disable netty
 44 
 45         System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
 46 
 47         if (mqProperties.isFlatMessage()) {
 48 
 49             // 設定為raw避免ByteString->Entry的二次解析
 50 
 51             System.setProperty("canal.instance.memory.rawEntry", "false");
 52 
 53         }
 54 
 55     }
 56 
 57     controller = new CanalController(properties);
 58 
 59     //note 2.啟動canalController
 60 
 61     controller.start();
 62 
 63     //note 3.註冊了一個shutdownHook,系統退出時執行相關邏輯
 64 
 65     shutdownThread = new Thread() {
 66 
 67         public void run() {
 68 
 69             try {
 70 
 71                 controller.stop();
 72 
 73                 //note 主執行緒退出
 74 
 75                 CanalLauncher.runningLatch.countDown();
 76 
 77             } catch (Throwable e) {
 78 
 79 
 80             } finally {
 81 
 82             }
 83 
 84         }
 85 
 86     };
 87 
 88     Runtime.getRuntime().addShutdownHook(shutdownThread);
 89 
 90     //note 4.啟動canalMQStarter,叢集版的話,沒有預先配置destinations。
 91 
 92     if (canalMQProducer != null) {
 93 
 94         canalMQStarter = new CanalMQStarter(canalMQProducer);
 95 
 96         String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
 97 
 98         canalMQStarter.start(destinations);
 99 
100         controller.setCanalMQStarter(canalMQStarter);
101 
102     }
103 
104     // start canalAdmin
105 
106     String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
107 
108     //note 5.根據填寫的canalAdmin的ip和port,啟動canalAdmin,用netty做伺服器
109 
110     if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
111 
112         String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
113 
114         String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
115 
116         CanalAdminController canalAdmin = new CanalAdminController(this);
117 
118         canalAdmin.setUser(user);
119 
120         canalAdmin.setPasswd(passwd);
121 
122         String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
123 
124         CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
125 
126         canalAdminWithNetty.setCanalAdmin(canalAdmin);
127 
128         canalAdminWithNetty.setPort(Integer.parseInt(port));
129 
130         canalAdminWithNetty.setIp(ip);
131 
132         canalAdminWithNetty.start();
133 
134         this.canalAdmin = canalAdminWithNetty;
135 
136     }
137 
138     running = true;
139 
140 }

 

3.CanalController


前面兩個類都是比較清晰的,一個是入口類,一個是啟動類,下面來看看核心邏輯所在的CanalController。

這裡用了大量的匿名內部類實現介面,看起來有點頭大,耐心慢慢剖析一下。

3.1 從構造器開始瞭解

整體初始化的順序如下:

  • 構建PlainCanalConfigClient,用於使用者遠端配置的獲取
  • 初始化全域性配置,順便把instance相關的全域性配置初始化一下
  • 準備一下canal-server,核心在於embededCanalServer,如果有需要canalServerWithNetty,那就多包裝一個(我們serverMode=mq是不需要這個netty的)
  • 初始化zkClient
  • 初始化ServerRunningMonitors,作為instance 執行節點控制
  • 初始化InstanceAction,完成monitor機制。(監控instance配置變化然後呼叫ServerRunningMonitor進行處理)

這裡有幾個機制要詳細介紹一下。

3.1.1 CanalServer兩種模式

canalServer支援兩種模式,CanalServerWithEmbedded和CanalServerWithNetty。

在構造器中初始化程式碼部分如下:

 1 // 3.準備canal server
 2 
 3 //note: 核心在於embededCanalServer,如果有需要canalServerWithNetty,那就多包裝一個(我們serverMode=mq
 4 
 5 // 是不需要這個netty的)
 6 
 7 ip = getProperty(properties, CanalConstants.CANAL_IP);
 8 
 9 //省略一部分。。。
10 
11 embededCanalServer = CanalServerWithEmbedded.instance();
12 
13 embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設定自定義的instanceGenerator
14 
15 int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
16 
17 //省略一部分。。。
18 
19 String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
20 
21 if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
22 
23     canalServer = CanalServerWithNetty.instance();
24 
25     canalServer.setIp(ip);
26 
27     canalServer.setPort(port);
28 
29 }

 

embededCanalServer:型別為CanalServerWithEmbedded

canalServer:型別為CanalServerWithNetty

二者有什麼區別呢?

都實現了CanalServer介面,且都實現了單例模式,通過靜態方法instance獲取例項。

關於這兩種型別的實現,canal官方文件有以下描述:

 

說白了,就是我們可以不必獨立部署canal server。在應用直接使用CanalServerWithEmbedded直連mysql資料庫進行訂閱。

如果覺得自己的技術hold不住相關程式碼,就獨立部署一個canal server,使用canal提供的客戶端,連線canal server獲取binlog解析後資料。而CanalServerWithNetty是在CanalServerWithEmbedded的基礎上做的一層封裝,用於與客戶端通訊。

在獨立部署canal server時,Canal客戶端傳送的所有請求都交給CanalServerWithNetty處理解析,解析完成之後委派給了交給CanalServerWithEmbedded進行處理。因此CanalServerWithNetty就是一個馬甲而已。CanalServerWithEmbedded才是核心。

因此,在構造器中,我們看到,

用於生成CanalInstance例項的instanceGenerator被設定到了CanalServerWithEmbedded中,

而ip和port被設定到CanalServerWithNetty中。

關於CanalServerWithNetty如何將客戶端的請求委派給CanalServerWithEmbedded進行處理,我們將在server模組原始碼分析中進行講解。

 

3.1.2 ServerRunningMonitor

在CanalController的構造器中,canal會為每一個destination建立一個Instance,每個Instance都會由一個ServerRunningMonitor來進行控制。而ServerRunningMonitor統一由ServerRunningMonitors進行管理。

ServerRunningMonitor是做什麼的呢?

我們看下它的屬性就瞭解了。它主要用來記錄每個instance的執行狀態資料的。

 1 /**
 2 
 3 * 針對server的running節點控制
 4 
 5 */
 6 
 7 public class ServerRunningMonitor extends AbstractCanalLifeCycle {
 8 
 9     private static final Logger        logger       = LoggerFactory.getLogger(ServerRunningMonitor.class);
10 
11     private ZkClientx                  zkClient;
12 
13     private String                     destination;
14 
15     private IZkDataListener            dataListener;
16 
17     private BooleanMutex               mutex        = new BooleanMutex(false);
18 
19     private volatile boolean           release      = false;
20 
21     // 當前服務節點狀態資訊
22 
23     private ServerRunningData          serverData;
24 
25     // 當前實際執行的節點狀態資訊
26 
27     private volatile ServerRunningData activeData;
28 
29     private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
30 
31     private int                        delayTime    = 5;
32 
33     private ServerRunningListener      listener;
34 
35     public ServerRunningMonitor(ServerRunningData serverData){
36 
37         this();
38 
39         this.serverData = serverData;
40 
41     }
42         //。。。。。
43 
44 }

 

在建立ServerRunningMonitor物件時,首先根據ServerRunningData建立ServerRunningMonitor例項,之後設定了destination和ServerRunningListener。

ServerRunningListener是個介面,這裡採用了匿名內部類的形式構建,實現了各個介面的方法。

主要為instance在當前server上的狀態發生變化時呼叫。比如要在當前server上啟動這個instance了,就呼叫相關啟動方法,如果在這個server上關閉instance,就呼叫相關關閉方法。

具體的呼叫邏輯我們後面在啟動過程中分析,這裡大概知道下構造器中做了些什麼就行了,主要就是一些啟動、關閉的邏輯。

  1 new Function<String, ServerRunningMonitor>() {
  2 
  3     public ServerRunningMonitor apply(final String destination) {
  4 
  5         ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
  6 
  7         runningMonitor.setDestination(destination);
  8 
  9         runningMonitor.setListener(new ServerRunningListener() {
 10 
 11             /**
 12 
 13              * note
 14 
 15              * 1.內部呼叫了embededCanalServer的start(destination)方法。
 16 
 17              * 這裡很關鍵,說明每個destination對應的CanalInstance是通過embededCanalServer的start方法啟動的,
 18 
 19              * 這樣我們就能理解,為什麼之前構造器中會把instanceGenerator設定到embededCanalServer中了。
 20 
 21              * embededCanalServer負責呼叫instanceGenerator生成CanalInstance例項,並負責其啟動。
 22 
 23              *
 24 
 25              * 2.如果投遞mq,還會直接呼叫canalMQStarter來啟動一個destination
 26 
 27              */
 28 
 29             public void processActiveEnter() {
 30 
 31                //省略具體內容。。。
 32             }
 33 
 34             /**
 35 
 36              * note
 37 
 38              * 1.與開始順序相反,如果有mqStarter,先停止mqStarter的destination
 39 
 40              * 2.停止embedeCanalServer的destination
 41 
 42              */
 43 
 44             public void processActiveExit() {
 45 
 46                 //省略具體內容。。。
 47 
 48             }
 49 
 50             /**
 51 
 52              * note
 53 
 54              * 在Canalinstance啟動之前,destination註冊到ZK上,建立節點
 55 
 56              * 路徑為:/otter/canal/destinations/{0}/cluster/{1},其0會被destination替換,1會被ip:port替換。
 57 
 58              * 此方法會在processActiveEnter()之前被呼叫
 59 
 60              */
 61 
 62             public void processStart() {
 63 
 64                 //省略具體內容。。。
 65 
 66             }
 67 
 68             /**
 69 
 70              * note
 71 
 72              * 在Canalinstance停止前,把ZK上節點刪除掉
 73 
 74              * 路徑為:/otter/canal/destinations/{0}/cluster/{1},其0會被destination替換,1會被ip:port替換。
 75 
 76              * 此方法會在processActiveExit()之前被呼叫
 77 
 78              */
 79 
 80             public void processStop() {
 81 
 82                 //省略具體內容。。。
 83             }
 84 
 85         });
 86 
 87         if (zkclientx != null) {
 88 
 89             runningMonitor.setZkClient(zkclientx);
 90 
 91         }
 92 
 93         // 觸發建立一下cid節點
 94 
 95         runningMonitor.init();
 96 
 97         return runningMonitor;
 98 
 99     }
100 
101 }

 

3.2 canalController的start方法

具體執行邏輯如下:

  • 在zk的/otter/canal/cluster目錄下根據ip:port建立server的臨時節點,註冊zk監聽器
  • 先啟動embededCanalServer(會啟動對應的監控)
  • 根據配置的instance的destination,呼叫runningMonitor.start() 逐個啟動instance
  • 如果cannalServer不為空,啟動canServer (canalServerWithNetty)

這裡需要注意,canalServer什麼時候為空?

如果使用者選擇了serverMode為mq,那麼就不會啟動canalServerWithNetty,採用mqStarter來作為server,直接跟mq叢集互動。canalServerWithNetty只有在serverMode為tcp時才啟動,用來跟canal-client做互動。

所以如果以後想把embeddedCanal嵌入自己的應用,可以考慮參考mqStarter的寫法。後面我們在server模組中會做詳細解析。

  1 public void start() throws Throwable {
  2 
  3     // 建立整個canal的工作節點
  4 
  5     final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
  6 
  7     initCid(path);
  8 
  9     if (zkclientx != null) {
 10 
 11         this.zkclientx.subscribeStateChanges(new IZkStateListener() {
 12 
 13             public void handleStateChanged(KeeperState state) throws Exception {
 14 
 15             }
 16 
 17             public void handleNewSession() throws Exception {
 18 
 19                 initCid(path);
 20 
 21             }
 22 
 23             @Override
 24 
 25             public void handleSessionEstablishmentError(Throwable error) throws Exception{
 26 
 27                 logger.error("failed to connect to zookeeper", error);
 28 
 29             }
 30 
 31         });
 32 
 33     }
 34 
 35     // 先啟動embeded服務
 36 
 37     embededCanalServer.start();
 38 
 39     // 嘗試啟動一下非lazy狀態的通道
 40 
 41     for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
 42 
 43         final String destination = entry.getKey();
 44 
 45         InstanceConfig config = entry.getValue();
 46 
 47         // 建立destination的工作節點
 48 
 49         if (!embededCanalServer.isStart(destination)) {
 50 
 51             // HA機制啟動
 52 
 53             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
 54 
 55             if (!config.getLazy() && !runningMonitor.isStart()) {
 56 
 57                 runningMonitor.start();
 58 
 59             }
 60 
 61         }
 62 
 63         //note:為每個instance註冊一個配置監視器
 64 
 65         if (autoScan) {
 66 
 67             instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
 68 
 69         }
 70 
 71     }
 72 
 73     if (autoScan) {
 74 
 75         //note:啟動執行緒定時去掃描配置
 76 
 77         instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
 78 
 79         //note:這部分程式碼似乎沒有用,目前只能是manager或者spring兩種方式二選一
 80 
 81         for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
 82 
 83             if (!monitor.isStart()) {
 84 
 85                 monitor.start();
 86 
 87             }
 88 
 89         }
 90 
 91     }
 92 
 93     // 啟動網路介面
 94 
 95     if (canalServer != null) {
 96 
 97         canalServer.start();
 98 
 99     }
100 
101 }

 

我們重點關注啟動instance的過程,也就是ServerRunningMonitor的執行機制,也就是HA啟動的關鍵。

入口在runningMonitor.start()。

  • 如果zkClient != null,就用zk進行HA啟動
  • 否則,就直接processActiveEnter啟動,這個我們前面已經分析過了
 1 public synchronized void start() {
 2 
 3     super.start();
 4 
 5     try {
 6 
 7         /**
 8 
 9          * note
10 
11          * 內部會呼叫ServerRunningListener的processStart()方法
12 
13          */
14 
15         processStart();
16 
17         if (zkClient != null) {
18 
19             // 如果需要儘可能釋放instance資源,不需要監聽running節點,不然即使stop了這臺機器,另一臺機器立馬會start
20 
21             String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
22 
23             zkClient.subscribeDataChanges(path, dataListener);
24 
25             initRunning();
26 
27         } else {
28 
29             /**
30 
31              * note
32 
33              * 內部直接呼叫ServerRunningListener的processActiveEnter()方法
34 
35              */
36 
37             processActiveEnter();// 沒有zk,直接啟動
38 
39         }
40 
41     } catch (Exception e) {
42 
43         logger.error("start failed", e);
44 
45         // 沒有正常啟動,重置一下狀態,避免干擾下一次start
46 
47         stop();
48 
49     }
50 
51 }

 

重點關注下HA啟動方式,一般 我們都採用這種模式進行。

在叢集模式下,可能會有多個canal server共同處理同一個destination,

在某一時刻,只能由一個canal server進行處理,處理這個destination的canal server進入running狀態,其他canal server進入standby狀態。

同時,通過監聽對應的path節點,一旦發生變化,出現異常,可以立刻嘗試自己進入running,保證了instace的 高可用!!

啟動的重點還是在initRuning()。

利用zk來保證叢集中有且只有 一個instance任務在執行。

  • 還構建一個臨時節點的路徑:/otter/canal/destinations/{0}/running
  • 嘗試建立臨時節點。
  • 如果節點已經存在,說明是其他的canal server已經啟動了這個canal instance。此時會丟擲ZkNodeExistsException,進入catch程式碼塊。
  • 如果建立成功,就說明沒有其他server啟動這個instance,可以建立
 1 private void initRunning() {
 2     if (!isStart()) {
 3         return;
 4     }
 5 
 6 
 7     //note: 還是一樣構建一個臨時節點的路徑:/otter/canal/destinations/{0}/running
 8     String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
 9     // 序列化
10     byte[] bytes = JsonUtils.marshalToByte(serverData);
11     try {
12         mutex.set(false);
13         /**
14          * note:
15          * 嘗試建立臨時節點。如果節點已經存在,說明是其他的canal server已經啟動了這個canal instance。
16          * 此時會丟擲ZkNodeExistsException,進入catch程式碼塊。
17          */
18         zkClient.create(path, bytes, CreateMode.EPHEMERAL);
19         /**
20          * note:
21          * 如果建立成功,就開始觸發啟動事件
22          */
23         activeData = serverData;
24         processActiveEnter();// 觸發一下事件
25         mutex.set(true);
26         release = false;
27     } catch (ZkNodeExistsException e) {
28         /**
29          * note:
30          * 如果捕獲異常,表示建立失敗。
31          * 就根據臨時節點路徑查一下是哪個canal-sever建立了。
32          * 如果沒有相關資訊,馬上重新嘗試一下。
33          * 如果確實存在,就把相關資訊儲存下來
34          */
35         bytes = zkClient.readData(path, true);
36         if (bytes == null) {// 如果不存在節點,立即嘗試一次
37             initRunning();
38         } else {
39             activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
40         }
41     } catch (ZkNoNodeException e) {
42         /**
43          * note:
44          * 如果是父節點不存在,那麼就嘗試建立一下父節點,然後再初始化。
45          */
46         zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 嘗試建立父節點
47         initRunning();
48     }
49 }

 

那執行中的HA是如何實現的呢,我們回頭看一下

zkClient.subscribeDataChanges(path, dataListener);
對destination對應的running節點進行監聽,一旦發生了變化,則說明可能其他處理相同destination的canal server可能出現了異常,此時需要嘗試自己進入running狀態。

dataListener是在ServerRunningMonitor的構造方法中初始化的,

包括節點發生變化、節點被刪兩種變化情況以及相對應的處理邏輯,如下 :

 
 1 public ServerRunningMonitor(){
 2     // 建立父節點
 3     dataListener = new IZkDataListener() {
 4         /**
 5          * note:
 6          * 當註冊節點發生變化時,會自動回撥這個方法。
 7          * 我們回想一下使用過程中,什麼時候可能 改變節點當狀態呢?
 8          * 大概是在控制檯中,對canal-server中正在執行的 instance做"停止"操作時,改變了isActive。
 9          * 可以 觸發 HA。
10          */
11         public void handleDataChange(String dataPath, Object data) throws Exception {
12             MDC.put("destination", destination);
13             ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
14             if (!isMine(runningData.getAddress())) {
15                 mutex.set(false);
16             }
17 
18             if (!runningData.isActive() && isMine(runningData.getAddress())) { // 說明出現了主動釋放的操作,並且本機之前是active
19                 releaseRunning();// 徹底釋放mainstem
20             }
21 
22             activeData = (ServerRunningData) runningData;
23         }
24 
25 
26         /**
27          * note:
28          * 如果其他canal instance出現異常,臨時節點資料被刪除時,會自動回撥這個方法,此時當前canal instance要頂上去
29          */
30         public void handleDataDeleted(String dataPath) throws Exception {
31             MDC.put("destination", destination);
32             mutex.set(false);
33             if (!release && activeData != null && isMine(activeData.getAddress())) {
34                 // 如果上一次active的狀態就是本機,則即時觸發一下active搶佔
35                 initRunning();
36             } else {
37                 // 否則就是等待delayTime,避免因網路異常或者zk異常,導致出現頻繁的切換操作
38                 delayExector.schedule(new Runnable() {
39                     public void run() {
40                         initRunning();
41                     }
42                 }, delayTime, TimeUnit.SECONDS);
43             }
44         }
45     };
46 }
 

當註冊節點發生變化時,會自動回撥zkListener的handleDataChange方法。

我們回想一下使用過程中,什麼時候可能 改變節點當狀態呢?

就是在控制檯中,對canal-server中正在執行的 instance做"停止"操作時,改變了isActive,可以 觸發 HA。

如下圖所示

 

4.admin的配置監控原理

我們現在採用admin做全域性的配置控制。

那麼每個canalServer是怎麼監控配置的變化呢?

還記得上嗎cananlController的start方法中對配置監視器的啟動嗎?

 1 if (autoScan) {
 2         //note:啟動執行緒定時去掃描配置
 3         instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
 4         //note:這部分程式碼似乎沒有用,目前只能是manager或者spring兩種方式二選一
 5         for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
 6             if (!monitor.isStart()) {
 7                 monitor.start();
 8             }
 9         }
10     }

 

這個就是關鍵的配置監控。

我們來看deployer模組中的monitor包了。

 

4.1 InstanceAction

是一個介面,有四個方法,用來獲取配置後,對具體instance採取動作。

 1 /**
 2 * config配置變化後的動作
 3 *
 4 */
 5 public interface InstanceAction {
 6 
 7 
 8     /**
 9      * 啟動destination
10      */
11     void start(String destination);
12 
13 
14     /**
15      * 主動釋放destination執行
16      */
17     void release(String destination);
18 
19 
20     /**
21      * 停止destination
22      */
23     void stop(String destination);
24 
25 
26     /**
27      * 過載destination,可能需要stop,start操作,或者只是更新下記憶體配置
28      */
29     void reload(String destination);
30 }

 

具體實現在canalController的構造器中實現了匿名類。

4.2 InstanceConfigMonitor

這個介面有兩個實現,一個是基於spring的,一個基於manager(就是admin)。

我們看下基於manager配置的實現的ManagerInstanceConfigMonitor即可。

原理很簡單。

  • 採用一個固定大小執行緒池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
  • 然後通過defaultAction去start
  • 這個start在canalController的構造器的匿名類中實現,會使用instance對應的runningMonitor做HA啟動。具體邏輯上一小節已經詳細介紹過了。
 1 /**
 2 * 基於manager配置的實現
 3 *
 4 */
 5 public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {
 6 
 7 
 8     private static final Logger         logger               = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class);
 9     private long                        scanIntervalInSecond = 5;
10     private InstanceAction              defaultAction        = null;
11     /**
12      * note:
13      * 每個instance對應的instanceAction,實際上我們看程式碼發現都是用的同一個defaultAction
14      */
15     private Map<String, InstanceAction> actions              = new MapMaker().makeMap();
16     /**
17      * note:
18      * 每個instance對應的遠端配置
19      */
20     private Map<String, PlainCanal>     configs              = MigrateMap.makeComputingMap(new Function<String, PlainCanal>() {
21                                                                  public PlainCanal apply(String destination) {
22                                                                      return new PlainCanal();
23                                                                  }
24                                                              });
25     /**
26      * note:
27      * 一個固定大小執行緒池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
28      */
29     private ScheduledExecutorService    executor             = Executors.newScheduledThreadPool(1,
30                                                                  new NamedThreadFactory("canal-instance-scan"));
31 
32     private volatile boolean            isFirst              = true;
33     /**
34      * note:
35      * 拉取admin配置的client
36      */
37     private PlainCanalConfigClient      configClient;
38 //…
39 }

 

5.總結

deployer模組的主要作用:

1)讀取canal.properties,確定canal instance的配置載入方式。如果使用了admin,那麼還會定時拉取admin上的配置更新。

2)確定canal-server的啟動方式:獨立啟動或者叢集方式啟動

3)利用zkClient監聽canal instance在zookeeper上的狀態變化,動態停止、啟動或新增,實現了instance的HA

4)利用InstanceConfigMonitor,採用固定執行緒定時輪訓admin,獲取instance的最新配置

5)啟動canal server,監聽客戶端請求

這裡還有個非常有意思的問題沒有展開說明,那就是CanalStarter裡面的配置載入,通過ExtensionLoader類的相關實現,如何通過不同的類載入器,實現SPI,後面再分析吧。

都看到最後了,原創不易,點個關注,點個贊吧~
文章持續更新,可以微信搜尋「阿丸筆記 」第一時間閱讀,回覆關鍵字【學習】有我準備的一線大廠面試資料。
知識碎片重新梳理,構建Java知識圖譜:github.com/saigu/JavaK…(歷史文章查閱非常方便)

&n