程式人生 > canal實戰(一):canal連線kafka實現實時同步mysql資料

canal實戰(一):canal連線kafka實現實時同步mysql資料

前面已經介紹過了canal-kafka的應用。canal-kafka是把kafka作為客戶端,嵌入到canal中,並且在canal基礎上對原始碼進行了修改,以達到特定的實現canal到kafka的傳送。

canal-kafka是阿里雲最近更新的一個新的安裝包。主要功能是實現canal與kafka的對接,實現海量的訊息傳輸同步。在canal-kafka中,訊息是以ByteString進行傳輸的,並且使用者只能通過配置來指定一些kafka的配置,從某種程度上有一定的侷限性,所以我們使用canal來自定義客戶端kafka,會有更好的靈活性,但維護成本會更大,所以如何選擇根據實際情況而定。

構建maven依賴

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.25</version>
</dependency> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

注意版本對應

SimpleCanalClient

package com.unigroup.client.canal;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal
.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.Message; import com.unigroup.core.canal.CanalToKG; /** * @Title: SimpleCanalClient.java * @Package com.unigroup.canal * @Description: canal單例項介面 * @author 桃花惜春風 * @date 2018年8月29日 上午11:56:09 * @version V1.0 */ public class SimpleCanalClient { private CanalConnector connector=null; public SimpleCanalClient(String ip,String port,String instance) { // 建立連結 connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, Integer.parseInt(port)),instance, "", ""); } public List<Entry> execute(int batchSize,Class<?> clazz ) throws InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException { //int batchSize = 1; int emptyCount = 0; Object obj = clazz.newInstance(); Method method = clazz.getMethod("send",Message.class); try { connector.connect(); // connector.subscribe(".*\\..*"); connector.subscribe("test.test1"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; method.invoke(obj, message); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾資料 } System.out.println("empty too many times, exit"); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { connector.disconnect(); } return null; } }

CanalKafkaProducer

package com.unigroup.kafka.producer;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.KafkaProperties.Topic;
import com.unigroup.utils.MessageSerializer;

/**   
* @Title: CanalKafkaProducer.java 
* @Package com.unigroup.kafka.producer 
* @Description: 
* @author 桃花惜春風   
* @date 2018年9月3日 上午11:53:35 
* @version V1.0   
*/
public class CanalKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);

    private Producer<String, Message> producer;

    public void init(KafkaProperties kafkaProperties) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", "all");
        properties.put("retries", kafkaProperties.getRetries());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", MessageSerializer.class.getName());
        producer = new KafkaProducer<String, Message>(properties);
    }

    public void stop() {
        try {
            logger.info("## stop the kafka producer");
            producer.close();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping kafka producer:", e);
        } finally {
            logger.info("## kafka producer is down.");
        }
    }

    public void send(Topic topic, Message message) throws IOException {

        ProducerRecord<String, Message> record;
        if (topic.getPartition() != null) {
            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
        } else {
            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
        }
        producer.send(record);
        if (logger.isDebugEnabled()) {
            logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
        }
    }
}

canalToKafkaServer

package com.unigroup.kafka.server;

import com.unigroup.client.canal.SimpleCanalClient;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.utils.GetProperties;

/**   
* @Title: canal.java 
* @Package com.unigroup.kafka.server 
* @Description: 
* @author 桃花惜春風   
* @date 2018年9月3日 上午11:23:35 
* @version V1.0   
*/
public class canalToKafkaServer {
    /**
     * Wires the canal client to the Kafka producer handler and starts polling.
     * Host, port and instance name are read from the application properties.
     */
    public static void execute() {
        // Fixed property-key typo: was "MTSQL_PORT". Confirm the properties
        // file defines MYSQL_PORT alongside MYSQL_HOST and INSTANCE.
        SimpleCanalClient simpleCanalClient = new SimpleCanalClient(GetProperties.getValue("MYSQL_HOST"),
                GetProperties.getValue("MYSQL_PORT"), GetProperties.getValue("INSTANCE"));
        try {
            // batchSize = 1: fetch one binlog batch per poll.
            simpleCanalClient.execute(1, CanalKafkaProducer.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

至此一個簡單的canal到kafka的demo已經完成。這些都只是測試程式碼,實際應用中根據不同的情況,可以自己開發更多功能。