1. 程式人生 > >Flink的sink實戰之三:cassandra3

Flink的sink實戰之三:cassandra3

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### 本篇概覽 本文是《Flink的sink實戰》系列的第三篇,主要內容是體驗Flink官方的cassandra connector,整個實戰如下圖所示,我們先從kafka獲取字串,再執行wordcount操作,然後將結果同時列印和寫入cassandra: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201110151126316-545035704.png) ### 全系列連結 1. [《Flink的sink實戰之一:初探》](https://blog.csdn.net/boling_cavalry/article/details/105597628) 2. [《Flink的sink實戰之二:kafka》](https://blog.csdn.net/boling_cavalry/article/details/105598224) 3. [《Flink的sink實戰之三:cassandra3》](https://blog.csdn.net/boling_cavalry/article/details/105598968) 4. [《Flink的sink實戰之四:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105599511) ### 軟體版本 本次實戰的軟體版本資訊如下: 1. cassandra:3.11.6 2. kafka:2.4.0(scala:2.12) 3. jdk:1.8.0_191 4. flink:1.9.2 5. maven:3.6.0 6. flink所在作業系統:CentOS Linux release 7.7.1908 7. cassandra所在作業系統:CentOS Linux release 7.7.1908 8. IDEA:2018.3.5 (Ultimate Edition) ### 關於cassandra 本次用到的cassandra是三臺叢集部署的叢集,搭建方式請參考[《ansible快速部署cassandra3叢集》](https://xinchen.blog.csdn.net/article/details/105602584) ### 準備cassandra的keyspace和表 先建立keyspace和table: 1. cqlsh登入cassandra: ```shell cqlsh 192.168.133.168 ``` 2. 建立keyspace(3副本): ```shell CREATE KEYSPACE IF NOT EXISTS example WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}; ``` 3. 建表: ```shell CREATE TABLE IF NOT EXISTS example.wordcount ( word text, count bigint, PRIMARY KEY(word) ); ``` ### 準備kafka的topic 1. 啟動kafka服務; 2. 建立名為test001的topic,參考命令如下: ```shell ./kafka-topics.sh \ --create \ --bootstrap-server 127.0.0.1:9092 \ --replication-factor 1 \ --partitions 1 \ --topic test001 ``` 3. 進入傳送訊息的會話模式,參考命令如下: ```shell ./kafka-console-producer.sh \ --broker-list kafka:9092 \ --topic test001 ``` 4. 在會話模式下,輸入任意字串然後回車,都會將字串訊息傳送到broker; ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinksinkdemo資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201110151126747-1186886631.png) ### 兩種寫入cassandra的方式 flink官方的connector支援兩種方式寫入cassandra: 1. Tuple型別寫入:將Tuple物件的欄位對齊到指定的SQL的引數中; 2. POJO型別寫入:通過DataStax,將POJO物件對應到註解配置的表和欄位中; 接下來分別使用這兩種方式; ### 開發(Tuple寫入) 1. [《Flink的sink實戰之二:kafka》](https://xinchen.blog.csdn.net/article/details/105598224)中建立了flinksinkdemo工程,在此繼續使用; 2. 在pom.xml中增加casandra的connector依賴: ```xml ``` 3. 另外還要新增flink-streaming-scala依賴,否則編譯CassandraSink.addSink這段程式碼會失敗: ```xml ``` 4. 新增CassandraTuple2Sink.java,這就是Job類,裡面從kafka獲取字串訊息,然後轉成Tuple2型別的資料集寫入cassandra,寫入的關鍵點是Tuple內容和指定SQL中的引數的匹配: ```java package com.bolingcavalry.addsink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.cassandra.CassandraSink; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import java.util.Properties; public class CassandraTuple2Sink { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設定並行度 env.setParallelism(1); //連線kafka用到的屬性物件 Properties properties = new Properties(); //broker地址 properties.setProperty("bootstrap.servers", "192.168.50.43:9092"); //zookeeper地址 properties.setProperty("zookeeper.connect", "192.168.50.43:2181"); //消費者的groupId properties.setProperty("group.id", "flink-connector"); //例項化Consumer類 FlinkKafka