1. 程式人生 > >SODBASE CEP學習進階篇(七)續:SODBASE CEP與Spark streaming整合-低延遲規則管理 與分散式快取整合

SODBASE CEP學習進階篇(七)續:SODBASE CEP與Spark streaming整合-低延遲規則管理 與分散式快取整合

在實際大資料工作中,常常有實時監測資料庫變化或實時同步資料到大資料儲存,解決大資料實時分析的需求。同時,增量同步資料庫資料相比全量查詢也減少了網路頻寬消耗。本文以Mysql的bin-log到Kafka為例,使用Canal Server,通過SODBASE引擎不用寫程式就可以設定資料同步規則。


一、搭建Canal Server

1.1. 開啟bin-log

(1)編輯mysql配置檔案

 vi /etc/my.cnf


新增

log-bin=mysql-bin #新增這一行就ok
binlog-format=ROW #選擇row模式
server_id=1#配置mysql replaction需要定義,不能和canal的slaveId重複
執行mysql客戶端
mysql -uroot -p


mysql>show binlog events;
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
| Log_name         | Pos |Event_type  | Server_id | End_log_pos |Info                                  |
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
| mysql-bin.000001 |   4 |Format_desc |         1 |         106 | Server ver: 5.1.51-log,Binlog ver: 4 |
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
1 row in set (0.00 sec)

(2)重啟mysql

[[email protected] user]# service mysqld stop
Stoppingmysqld:                                          [  OK  ]
[[email protected] user]# service mysqld start
Startingmysqld:                                          [  OK  ]


1.2. 下載 canal.deployer-1.0.21.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz

解壓
tar zxvfcanal.deployer-$version.tar.gz

 

1.3. 配置修改

應用引數:

vi conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234
 
# position info,需要改成自己的資料庫資訊
canal.instance.master.address =127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
 
#canal.instance.standby.address =
#canal.instance.standby.journal.name=
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
 
# username/password,需要改成自己的資料庫資訊
canal.instance.dbUsername =canal 
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset =UTF-8
 
# table regex
canal.instance.filter.regex =.*\\..*
 
#################################################



說明:

 canal.instance.connectionCharset 代表資料庫的編碼方式對應到java中的編碼型別,比如UTF-8,GBK , ISO-8859-1

4. 準備啟動

 sh bin/startup.sh


5. 檢視日誌

 vi logs/canal/canal.log

2013-02-05 22:45:27.967 [main]INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canalserver.

2013-02-05 22:45:28.113 [main]INFO  com.alibaba.otter.canal.deployer.CanalController- ## start the canal server[10.1.29.120:11111]

2013-02-0522:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server isrunning now ......

具體instance的日誌:

$ vi logs/example/example.log

2013-02-05 22:50:45.636 [main]INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loadingproperties file from class path resource [canal.properties]

2013-02-05 22:50:45.641 [main]INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loadingproperties file from class path resource [example/instance.properties]

2013-02-05 22:50:45.803 [main]INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - startCannalInstance for 1-example

2013-02-0522:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring- start successful....

二、建立同步規則模型

2.1. 編輯同步規則

在SODBASE Studio中新建一個模型canaltokafka(此模型也可以下載canaltokafka.sod,在Studio中匯入)

配置Input

加一個Filter,作為示例,過濾test資料庫的t2表


配置Output輸出


T1.* 表示表字段*的新值,例如T1.a 表示表字段a的新值

T1.b_* 表示表字段*修改前的值,例如T1.b_a 表示表字段a的原值

T1.eventtype表示資料操作型別,U為更新,I為插入,D為刪除

2.2 執行模型

方法一:

在SODBASE Studio中執行此模型(本示例是在Linux XWindow中執行的)

方法二:

規則模型匯出為soddata檔案,部署到SODBASE Server

部署方法參考 SODBASE CEP學習進階篇(七)續:SODBASE CEP與Spark streaming整合-低延遲規則管理

另外,soddata檔案可以和XML檔案相互轉化,即讀者可以通過編輯XML來修改同步規則。


三、測試

3.1 安裝Kafka,建立1個topic (testbinlog)

找一臺linux機器,從官方網站下載Kafka,解壓,啟動

 bin/zookeeper-server-start.sh config/zookeeper.properties &
 bin/kafka-server-start.sh config/server.properties &
 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testbinlog &
 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testbinlog


3.2 mysql修改資料

mysql> use test

 
mysql> create table t2(a int,b int);
Query OK, 0 rows affected (0.06 sec)
 
mysql> insert into t2 values(1,2);
Query OK, 1 row affected (0.00 sec)

mysql> insert into t2 values(1,3);
Query OK, 1 row affected (0.00 sec)

mysql> update t2 set a = 0 where b = 2;
Query OK, 1 row affected (0.08 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> delete from t2 where a = 0;
Query OK, 1 row affected (0.07 sec)


3.3 Kafka輸出

使用kafka-console-consumer.sh消費

可以看到資料庫變化都傳到Kafka了。此後可以進行資料實時分析或接入大資料儲存如HBase等,實現業務資料的實時同步和使用。


SODBASE 實時大樹據軟體用於輕鬆、高效實施資料監測、監控類、實時交易類專案微笑。EPL語法見SODSQL寫法與示例。圖形化建模請使用SODBASE Studio嵌入式方式程式設計參見執行第一個EPL例子快取擴充套件參見與分散式快取整合