Flink基於Kafka-Connector 資料流容錯回放機制及程式碼案例實戰-Flink牛刀小試
版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,如有任何問題,可隨時聯絡。
Flink牛刀小試系列目錄
- ofollow,noindex">Flink牛刀小試-Flink 叢集執行原理兼部署及Yarn執行模式深入剖析
- Flink牛刀小試-Flink Window型別及使用原理案例實戰
- Flink牛刀小試-Flink Broadcast 與 Accumulators 應用案例實戰
- Flink牛刀小試-Flink與SparkStreaming之Counters& Accumulators 累加器雙向應用案例實戰
- Flink牛刀小試-Flink分散式快取Distributed Cache應用案例實戰
- Flink牛刀小試-Flink狀態管理與checkPoint資料容錯機制深入剖析
- Flink牛刀小試-Flink Window分析及Watermark解決亂序資料機制深入剖析
- Flink牛刀小試-Flink Restart Strategies 重啟策略機制深入剖析
- Flink牛刀小試-Flink CheckPoint狀態點恢復與savePoint機制對比剖析
- Flink牛刀小試-Flink SQL Table 我們一起去看2018中超聯賽
- [Flink牛刀小試-Flink基於Kafka-Connector 資料流容錯回放機制及程式碼案例實戰]
- [Flink牛刀小試-Flink DataStreamAPI與DataSetAPI應用案例實戰]
- [Flink牛刀小試-Flink並行度 Parallel及Slots關係原理深入剖析]
- [Flink牛刀小試-Flink叢集HA配置及高可用機制深入剖析]
- [Flink牛刀小試-Flink批處理與流處理案例實戰深入剖析]
- [Flink牛刀小試-Flink綜合性應用案例實踐及垂直業務深入剖析]
1 Kafka-connector 再次親密牽手Flink
- Kafka中的partition機制和Flink的並行度機制深度結合,實現資料恢復。
- Kafka可以作為Flink的source和sink,牛在這裡。
- 任務失敗,通過設定kafka的offset來恢復應用
2 回顧Spark Streaming針對kafka使用技術
// 設定檢查點目錄 ssc.checkpoint("./streaming_checkpoint") // 獲取Kafka配置(通過配置檔案讀取,ConfigurationManager自定義方法) val broker_list = ConfigurationManager.config.getString("kafka.broker.list") val topics = ConfigurationManager.config.getString("kafka.topics") // kafka消費者配置 val kafkaParam = Map( "bootstrap.servers" -> broker_list,//用於初始化連結到叢集的地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], //用於標識這個消費者屬於哪個消費團體 "group.id" -> "commerce-consumer-group", //如果沒有初始化偏移量或者當前的偏移量不存在任何伺服器上,可以使用這個配置屬性 //可以使用這個配置,latest自動重置偏移量為最新的偏移量 "auto.offset.reset" -> "latest", //如果是true,則這個消費者的偏移量會在後臺自動提交 "enable.auto.commit" -> (false: java.lang.Boolean) ) // 建立DStream,返回接收到的輸入資料 // LocationStrategies:根據給定的主題和叢集地址建立consumer // LocationStrategies.PreferConsistent:持續的在所有Executor之間分配分割槽 // ConsumerStrategies:選擇如何在Driver和Executor上建立和配置Kafka Consumer // ConsumerStrategies.Subscribe:訂閱一系列主題 val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam)) 複製程式碼
3 再論 Flink Kafka Consumer
3.1 理論時間
-
setStartFromGroupOffsets()【預設消費策略】
預設讀取上次儲存的offset資訊 如果是應用第一次啟動,讀取不到上次的offset資訊,則會根據這個引數auto.offset.reset的值來進行消費資料
-
setStartFromEarliest() 從最早的資料開始進行消費,忽略儲存的offset資訊
-
setStartFromLatest() 從最新的資料進行消費,忽略儲存的offset資訊
-
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)

-
當checkpoint機制開啟的時候,KafkaConsumer會定期把kafka的offset資訊還有其他operator的狀態資訊一塊儲存起來。當job失敗重啟的時候,Flink會從最近一次的checkpoint中進行恢復資料,重新消費kafka中的資料。
-
為了能夠使用支援容錯的kafka Consumer,需要開啟checkpoint env.enableCheckpointing(5000); // 每5s checkpoint一次
-
Kafka Consumers Offset 自動提交有以下兩種方法來設定,可以根據job是否開啟checkpoint來區分:
(1) Checkpoint關閉時: 可以通過下面兩個引數配置
enable.auto.commit
(2) Checkpoint開啟時:當執行checkpoint的時候才會儲存offset,這樣保證了kafka的offset和checkpoint的狀態偏移量保持一致。 可以通過這個引數設定
setCommitOffsetsOnCheckpoints(boolean)
這個引數預設就是true。表示在checkpoint的時候提交offset, 此時,kafka中的自動提交機制就會被忽略
3.2 案例實戰
依賴引入: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.3</version> </dependency> 案例實戰: public class StreamingKafkaSource { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //checkpoint配置 env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //設定statebackend //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true)); String topic = "kafkaConsumer"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers","SparkMaster:9092"); prop.setProperty("group.id","kafkaConsumerGroup"); FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop); myConsumer.setStartFromGroupOffsets();//預設消費策略 DataStreamSource<String> text = env.addSource(myConsumer); text.print().setParallelism(1); env.execute("StreamingFromCollection"); } } 複製程式碼
4 再論 Flink Kafka Producer
4.1 理論時間
-
Kafka Producer的容錯-Kafka 0.9 and 0.10
-
如果Flink開啟了checkpoint,針對FlinkKafkaProducer09和FlinkKafkaProducer010 可以提供 at-least-once的語義,還需要配置下面兩個引數:
setLogFailuresOnly(false)
setFlushOnCheckpoint(true)
-
注意:建議修改kafka 生產者的重試次數retries【這個引數的值預設是0】
-
Kafka Producer的容錯-Kafka 0.11,如果Flink開啟了checkpoint,針對FlinkKafkaProducer011 就可以提供 exactly-once的語義,但是需要選擇具體的語義
Semantic.NONE
Semantic.AT_LEAST_ONCE【預設】
Semantic.EXACTLY_ONCE
4.2 KafkaSink案例實戰
public class StreamingKafkaSink { public static void main(String[] args) throws Exception { //獲取Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //checkpoint配置 env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //設定statebackend //env.setStateBackend(new RocksDBStateBackend("hdfs://SparkMaster:9000/flink/checkpoints",true)); DataStreamSource<String> text = env.socketTextStream("SparkMaster", 9001, "\n"); String brokerList = "SparkMaster:9092"; String topic = "kafkaProducer"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers",brokerList); //第一種解決方案,設定FlinkKafkaProducer011裡面的事務超時時間 //設定事務超時時間 //prop.setProperty("transaction.timeout.ms",60000*15+""); //第二種解決方案,設定kafka的最大事務超時時間,主要是kafka的配置檔案設定。 //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema()); //使用僅一次語義的kafkaProducer FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); text.addSink(myProducer); env.execute("StreamingFromCollection"); } } 複製程式碼
5 結語
kafka必不可少,關於kafka還有很多要說的內容,詳情請參考我的kafka商業環境實戰系列吧。
版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,如有任何問題,可隨時聯絡。
- kafka 商業環境實戰-kafka生產環境規劃
- kafka 商業環境實戰-kafka生產者和消費者吞吐量測試
- kafka 商業環境實戰-kafka生產者Producer引數設定及引數調優建議
- kafka 商業環境實戰-kafka叢集管理重要操作指令運維兵書
- kafka 商業環境實戰-kafka叢集Broker端引數設定及調優準則建議
- kafka 商業環境實戰-kafka之Producer同步與非同步訊息傳送及事務冪等性案例應用實戰
- kafka 商業環境實戰-kafka Poll輪詢機制與消費者組的重平衡分割槽策略剖析
- kafka 商業環境實戰-kafka Rebalance 機制與Consumer多種消費模式案例應用實戰
- kafka 商業環境實戰-kafka叢集訊息格式之V1版本到V2版本的平滑過渡詳解
- kafka 商業環境實戰-kafka ISR設計及水印與leader epoch副本同步機制深入剖析
- kafka 商業環境實戰-kafka日誌索引儲存及Compact機制深入剖析
- [kafka 商業環境實戰-kafka精確一次語義EOS的原理深入剖析]
- [kafka 商業環境實戰-kafka訊息的冪等性與事務支援機制深入剖析]
- [kafka 商業環境實戰-kafka叢集Controller競選與責任設計思路架構詳解]
- [kafka 商業環境實戰-kafka叢集日誌檔案系統設計與留存機制及Compact深入研究]
- [kafka 商業環境實戰-kafka叢集Consumer group狀態機及Coordinaor管理深入剖析]
- [kafka 商業環境實戰-kafka調優過程在吞吐量,永續性,低延時,可用性等指標的折中選擇研究]
秦凱新 於深圳 20181127023