《從0到1學習Flink》—— Data Sink 介紹
前言
再上一篇文章中 《從0到1學習Flink》—— Data Source 介紹 講解了 Flink Data Source ,那麼這裡就來講講 Flink Data Sink 吧。
首先 Sink 的意思是:
大概可以猜到了吧!Data sink 有點把資料儲存下來(落庫)的意思。
如上圖,Source 就是資料的來源,中間的 Compute 其實就是 Flink 乾的事情,可以做一系列的操作,操作完後就把計算後的資料結果 Sink 到某個地方。(可以是 SQL/">MySQL、ElasticSearch、Kafka、Cassandra 等)。這裡我說下自己目前做告警這塊就是把 Compute 計算後的結果 Sink 直接告警出來了(傳送告警訊息到釘釘群、郵件、簡訊等),這個 sink 的意思也不一定非得說成要把資料儲存到某個地方去。其實官網用的 Connector 來形容要去的地方更合適,這個 Connector 可以有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。
Flink Data Sink
前面文章 《從0到1學習Flink》—— Data Source 介紹 介紹了 Flink Data Source 有哪些,這裡也看看 Flink Data Sink 支援的有哪些。
看下原始碼有哪些呢?
可以看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。
SinkFunction
從上圖可以看到 SinkFunction 介面有 invoke 方法,它有一個 RichSinkFunction 抽象類。
上面的那些自帶的 Sink 可以看到都是繼承了 RichSinkFunction 抽象類,實現了其中的方法,那麼我們要是自己定義自己的 Sink 的話其實也是要按照這個套路來做的。
這裡就拿個較為簡單的 PrintSinkFunction 原始碼來講下:
1@PublicEvolving 2public class PrintSinkFunction<IN> extends RichSinkFunction<IN> { 3private static final long serialVersionUID = 1L; 4 5private static final boolean STD_OUT = false; 6private static final boolean STD_ERR = true; 7 8private boolean target; 9private transient PrintStream stream; 10private transient String prefix; 11 12/** 13* Instantiates a print sink function that prints to standard out. 14*/ 15public PrintSinkFunction() {} 16 17/** 18* Instantiates a print sink function that prints to standard out. 19* 20* @param stdErr True, if the format should print to standard error instead of standard out. 21*/ 22public PrintSinkFunction(boolean stdErr) { 23target = stdErr; 24} 25 26public void setTargetToStandardOut() { 27target = STD_OUT; 28} 29 30public void setTargetToStandardErr() { 31target = STD_ERR; 32} 33 34@Override 35public void open(Configuration parameters) throws Exception { 36super.open(parameters); 37StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); 38// get the target stream 39stream = target == STD_OUT ? System.out : System.err; 40 41// set the prefix if we have a >1 parallelism 42prefix = (context.getNumberOfParallelSubtasks() > 1) ? 43((context.getIndexOfThisSubtask() + 1) + "> ") : null; 44} 45 46@Override 47public void invoke(IN record) { 48if (prefix != null) { 49stream.println(prefix + record.toString()); 50} 51else { 52stream.println(record.toString()); 53} 54} 55 56@Override 57public void close() { 58this.stream = null; 59this.prefix = null; 60} 61 62@Override 63public String toString() { 64return "Print to " + (target == STD_OUT ? "System.out" : "System.err"); 65} 66}
可以看到它就是實現了 RichSinkFunction 抽象類,然後實現了 invoke 方法,這裡 invoke 方法就是把記錄打印出來了就是,沒做其他的額外操作。
如何使用?
1SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();
這樣就可以了,如果是其他的 Sink Function 的話需要換成對應的。
使用這個 Function 其效果就是列印從 Source 過來的資料,和直接 Source.print() 效果一樣。
下篇文章我們將講解下如何自定義自己的 Sink Function,並使用一個 demo 來教大家,讓大家知道這個套路,且能夠在自己工作中自定義自己需要的 Sink Function,來完成自己的工作需求。
最後
本文主要講了下 Flink 的 Data Sink,並介紹了常見的 Data Sink,也看了下原始碼的 SinkFunction,介紹了一個簡單的 Function 使用, 告訴了大家自定義 Sink Function 的套路,下篇文章帶大家寫個。
關注我
轉載請務必註明原創地址為:http://www.54tianzhisheng.cn/2018/10/29/flink-sink/
原文釋出時間為:2018-11-19
本文作者:zhisheng
本文來自雲棲社群合作伙伴“ ofollow,noindex">zhisheng ”,瞭解相關資訊可以關注“ zhisheng ”。