玩轉KafkaIO與Flink
編輯推薦: |
來源infoq,將重點介紹 Apache Beam與Flink的關係,對Beam框架中的KafkaIO和Flink原始碼進行剖析,並結合應用示例和程式碼解讀帶你進一步瞭解如何結合Beam玩轉Kafka和Flink。 |
關於Apache Beam實戰指南系列文章
隨著大資料 2.0 時代悄然到來,大資料從簡單的批處理擴充套件到了實時處理、流處理、互動式查詢和機器學習應用。近年來湧現出諸多大資料應用元件,如 HBase、Hive、Kafka、Spark、Flink 等。開發者經常要用到不同的技術、框架、API、開發語言和 SDK 來應對複雜應用的開發,這大大增加了選擇合適工具和框架的難度,開發者想要將所有的大資料元件熟練運用幾乎是一項不可能完成的任務。
面對這種情況,Google 在 2016 年 2 月宣佈將大資料流水線產品(Google DataFlow)貢獻給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣佈開源 Apache Beam,2017 年 5 月迎來了它的第一個穩定版本 2.0.0。在國內,大部分開發者對於 Beam 還缺乏瞭解,社群中文資料也比較少。InfoQ 期望通過 **Apache Beam 實戰指南系列文章** 推動 Apache Beam 在國內的普及。
一.概述
大資料發展趨勢從普通的大資料,發展成AI大資料,再到下一代號稱萬億市場的lOT大資料。技術也隨著時代的變化而變化,從Hadoop的批處理,到Spark Streaming,以及流批處理的Flink的出現,整個大資料架構也在逐漸演化。
Apache Beam作為新生技術,在這個時代會扮演什麼樣的角色,跟Flink之間的關係是怎樣的?Apache Beam和Flink的結合會給大資料開發者或架構師們帶來哪些意想不到的驚喜呢?
二.大資料架構發展演進歷程
2.1 大資料架構Hadoop
圖2-1 MapReduce 流程圖
最初做大資料是把一些日誌或者其他資訊收集後寫入Hadoop 的HDFS系統中,如果運營人員需要報表,則利用Hadoop的MapReduce進行計算並輸出,對於一些非計算機專業的統計人員,後期可以用Hive進行統計輸出。
2.2 流式處理Storm
圖2-2Storm流程圖
業務進一步發展,運營人員需要看到實時資料的展示或統計。例如電商網站促銷的時候,用於統計使用者實時交易資料。資料收集也使用MQ,用流式Storm解決這一業務需求問題。
2.3 Spark批處理和微批處理
圖2-3 Spark流程圖
業務進一步發展,服務前端加上了閘道器進行負載均衡,訊息中心也換成了高吞吐量的輕量級MQ Kafka,資料處理漸漸從批處理髮展到微批處理。
2.4 Flink:真正的流批處理統一
圖2-4 Flink 流程圖
隨著AI和loT的發展,對於感測裝置的資訊、報警器的警情以及視訊流的資料量微批計算引擎已經滿足不了業務的需求,Flink實現真正的流處理讓警情更實時。
2.5 下一代大資料處理統一標準Apache Beam
圖2-5 Apache Beam 流程圖
BeamSDKs封裝了很多的元件IO,也就是圖左邊這些重寫的高階API,使不同的資料來源的資料流向後面的計算平臺。通過將近一年的發展,Apache Beam 不光元件IO更加豐富了,並且計算平臺在當初最基本的 Apache Apex、Direct Runner、Apache Flink、Apache Spark、Google Cloud Dataflow之上,又增加了Gearpump、Samza 以及第三方的JStorm等計算平臺。
為什麼說Apache Beam 會是大資料處理統一標準呢?
因為很多現在大型公司都在建立自己的“大中臺”,建立統一的資料資源池,打通各個部門以及子公司的資料,以解決資訊孤島問題,把這些資料進行集中式管理並且進行後期的資料分析、BI、AI以及機器學習等工作。這種情況下會出現很多資料來源,例如之前用的SQL/">MySQL、MongodDB、HDFS、HBase、Solr 等,如果想建立中臺就會是一件令人非常苦惱的事情,並且多計算環境更是讓技術領導頭疼。Apache Beam的出現正好迎合了這個時代的新需求,它集成了很多資料庫常用的資料來源並把它們封裝成SDK的IO,開發人員沒必要深入學習很多技術,只要會寫Beam 程式就可以了,大大節省了人力、時間以及成本。
三.Apache Beam和Flink的關係
隨著阿里巴巴Blink的開源,Flink中國社群開始活躍起來。很多人會開始對各種計算平臺進行對比,比如Storm、Spark、JStorm、Flink等,並且有人提到之前阿里巴巴開源的JStorm比Flink效能高出10-15倍,為什麼阿里巴巴卻轉戰基於Flink的Blink呢? 在最近Flink的線下技術會議上,阿里巴巴的人已經回答了這一問題。其實很多技術都是從業務實戰出來的,隨著業務的發展可能還會有更多的計算平臺出現,沒有必要對此過多糾結。
不過,既然大家最近討論得這麼火熱,這裡也列出一些最近問的比較多的、有代表性的關於Beam的問題,逐一進行回答。
1. Flink支援SQL,請問Beam支援嗎?
現在Beam是支援SQL處理的,底層技術跟Flink底層處理是一樣的。
Beam SQL現在只支援Java,底層是Apache Calcite 的一個動態資料管理框架,用於大資料處理和一些流增強功能,它允許你自定義資料庫功能。例如Hive 使用了Calcite的查詢優化,當然還有Flink解析和流SQL處理。Beam在這之上添加了額外的擴充套件,以便輕鬆利用Beam的統一批處理/流模型以及對複雜資料型別的支援。 以下是Beam SQL具體處理流程圖:
Beam SQL一共有兩個比較重要的概念:
SqlTransform:用於PTransforms從SQL查詢建立的介面。
Row:Beam SQL操作的元素型別。例如:PCollection<Row>。
在將SQL查詢應用於PCollection 之前,集合中Row的資料格式必須要提前指定。 一旦Beam SQL 指定了 管道中的型別是不能再改變的。PCollection行中欄位/列的名稱和型別由Schema進行關聯定義。您可以使用Schema.builder()來建立 Schemas。
示例:
// Define the schema for the records.
Schema appSchema =
Schema
.builder()
.addInt32Field("appId")
.addStringField("description")
.addDateTimeField("rowtime")
.build();
// Create a concrete row with that type.
Row row =
Row
.withSchema(appSchema)
.addValues(1, "Some cool app", new Date())
.build();
// Create a source PCollection containing only that row
PCollection<Row> testApps =
PBegin
.in(p)
.apply(Create
.of(row)
.withCoder(appSchema.getRowCoder()));
也可以是其他型別,不是直接是Row,利用PCollection<T>通過應用ParDo可以將輸入記錄轉換為Row格式。如:
// An example POJO class.
class AppPojo {
Integer appId;
String description;
Date timestamp;
}
// Acquire a collection of POJOs somehow.
PCollection<AppPojo> pojos = ...
// Convert them to Rows with the same schema as defined above via a DoFn.
PCollection<Row> apps = pojos
.apply(
ParDo.of(new DoFn<AppPojo, Row>() {
@ProcessElement
public void processElement(ProcessContext c) {
// Get the current POJO instance
AppPojo pojo = c.element();
// Create a Row with the appSchema schema
// and values from the current POJO
Row appRow =
Row
.withSchema(appSchema)
.addValues(
pojo.appId,
pojo.description,
pojo.timestamp)
.build();
// Output the Row representing the current POJO
c.output(appRow);
}
}));
2. Flink 有並行處理,Beam 有嗎?
Beam 在抽象Flink的時候已經把這個引數抽象出來了,在Beam Flink 原始碼解析中會提到。
3. 我這裡有個流批混合的場景,請問Beam是不是支援?
這個是支援的,因為批也是一種流,是一種有界的流。Beam 結合了Flink,Flink dataset 底層也是轉換成流進行處理的。
4. Flink流批寫程式的時候和Beam有什麼不同?底層是Flink還是Beam?
打個比喻,如果Flink是Lucene,那麼Beam 就是Solr,把Flink 的API進行二次重寫,簡化了API,讓大家使用更簡單、更方便。此外,Beam提供了更多的資料來源,這是Flink不能比的。當然,Flink 後期可能也會往這方面發展。
四.Apache Beam KafkaIO原始碼剖析
Apache Beam KafkaIO 對kafka-clients支援依賴情況
KafkaIO是Kafka的API封裝,主要負責Apache Kafka讀取和寫入訊息。如果想使用KafkaIO,必須依賴beam-sdks-java-io-kafka ,KafkaIO 同時支援多個版本的Kafka客戶端,使用時建議用高版本的或最新的Kafka 版本,因為使用KafkaIO的時候需要包含kafka-clients 的依賴版本。
Apache Beam KafkaIO 對各個kafka-clients 版本的支援情況如下表:
表4-1 KafkaIO 與kafka-clients 依賴關係表
Apache Beam V2.1.0版本之前原始碼中的pom檔案都顯式指定了特定的0.9.0.1版本支援,但是從V2.1.0版本和V2.1.1兩個版本開始已經替換成了kafka-clients 的0.10.1.0 版本,並且原始碼中提示0.10.1.0 版本更安全。這是因為去年Kafka 0.10.1.0 之前的版本曝出了安全漏洞。在V2.2.0 以後的版本中,Beam對API做了調整和更新,對之前的兩種版本都支援,不過需要在pom中引用的時候自己指定Kafka的版本。但是在Beam V2.5.0 和V2.6.0 版本,原始碼中添加了以下提示:
/* <h3>Supported Kafka Client Versions</h3>
* KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.
* <i>kafka-clients</i> versions 0.10.1 and newer are supported at runtime. The older versions
* 0.9.x - 0.10.0.0 are also supported, but are deprecated and likely be removed in near future.
* Please ensure that the version included with the application is compatible with the version of
* your Kafka cluster. Kafka client usually fails to initialize with a clear error message in
* case of incompatibility.
*/
也就說在這兩個版本已經移除了對Kafka 客戶端 0.10.1.0 以前版本的支援,舊版本還會支援,但是在以後不久就會刪除。所以大家在使用的時候要注意版本的依賴關係和客戶端的版本支援度。
如果想使用KafkaIO,pom 必須要引用,版本跟4-1表中的對應起來就可以了。
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>...</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>a_recent_version</version>
<scope>runtime</scope>
</dependency>
KafkaIO讀寫原始碼解析
KafkaIO原始碼連結如下:
ofollow,noindex" target="_blank">連結
在KafkaIO裡面最主要的兩個方法是Kafka的讀寫方法。
KafkaIO讀操作
pipeline.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")//
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer (LongDeserializer.class)
.withValueDeserializer (StringDeserializer.class)
// Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
// Rest of the settings are optional :
// you can further customize KafkaConsumer used to read the records by adding more
// settings for ConsumerConfig. e.g :
.updateConsumerProperties (ImmutableMap.of("group.id", "my_beam_app_1"))
// set event times and watermark based on 'LogAppendTime'. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
// Use withCreateTime() with topics that have 'CreateTime' timestamps.
.withLogAppendTime()
// restrict reader to committed messages on Kafka (see method documentation).
.withReadCommitted()
// offset consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
// finally, if you don't need Kafka metadata, you can drop it.g
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create()) // PCollection<String>
1) 指定KafkaIO的模型,從原始碼中不難看出這個地方的KafkaIO<K,V>型別是Long和String 型別,也可以換成其他型別。
2) 設定Kafka叢集的叢集地址。
3) 設定Kafka的主題型別,原始碼中使用了單個主題型別,如果是多個主題型別則用withTopics(List<String>)方法進行設定。設定情況基本跟Kafka原生是一樣的。
4) 設定序列化型別。Apache Beam KafkaIO 在序列化的時候做了很大的簡化,例如原生Kafka可能要通過Properties 類去設定 ,還要加上很長一段jar包的名字。
Beam KafkaIO的寫法:
.withKeyDeserializer (LongDeserializer.class)
.withValueDeserializer (StringDeserializer.class)
原生Kafka的設定:
Properties props = new Properties();
props.put("key.deserializer", " org.apache.kafka.common.serialization .ByteArrayDeserializer");
props.put("value.deserializer" ,"org.apache.kafka.common. serialization.ByteArray Deserializer");
5) 設定Kafka的消費者屬性,這個地方還可以設定其他的屬性。原始碼中是針對消費分組進行設定。
6) 設定Kafka吞吐量的時間戳,可以是預設的,也可以自定義。
7) 相當於Kafka 中"isolation.level", "read_committed" ,指定KafkaConsumer只應讀取非事務性訊息,或從其輸入主題中提交事務性訊息。流處理應用程式通常在多個讀取處理寫入階段處理其資料,每個階段使用前一階段的輸出作為其輸入。通過指定read_committed模式,我們可以在所有階段完成一次處理。針對"Exactly-once" 語義,支援Kafka 0.11版本。
8) 設定Kafka是否自動提交屬性"AUTO_COMMIT",預設為自動提交,使用Beam 的方法來設定。
set CommitOffsetsInFinalizeEnabled (boolean commitOffsetInFinalize)
.commitOffsetsInFinalize()
9) 設定是否返回Kafka的其他資料,例如offset 資訊和分割槽資訊,不用可以去掉。
10) 設定只返回values值,不用返回key。例如 PCollection<String>,而不是PCollection<Long,String>。
KafkaIO寫操作
寫操作跟讀操作配置基本相似,我們看一下具體程式碼。
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
.withBootstrapServers ("broker_1:9092, broker_2:9092")
.withTopic ("results")
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class)
// You can further customize KafkaProducer used to write the records by adding more
// settings for ProducerConfig. e.g, to enable compression :
.updateProducerProperties (ImmutableMap.of ("compression.type", "gzip"))
// You set publish timestamp for the Kafka records.
.withInputTimestamp() // element timestamp is used while publishing to Kafka
// or you can also set a custom timestamp with a function.
.withPublishTimestampFunction ((elem, elemTs) -> ...)
// Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
.withEOS (20, "eos-sink-group-id");
);
下面這個是Kafka裡面比較重要的一個屬性設定,在Beam中是這樣使用的,非常簡單,但是要注意這個屬性.withEOS 其實就是Kafka中"Exactly-once"。
在寫入Kafka時完全一次性地提供語義,這使得應用程式能夠在Beam管道中的一次性語義之上提供端到端的一次性保證。它確保寫入接收器的記錄僅在Kafka上提交一次,即使在管道執行期間重試某些處理也是如此。重試通常在應用程式重新啟動時發生(如在故障恢復中)或者在重新分配任務時(如在自動縮放事件中)。Flink runner通常為流水線的結果提供精確一次的語義,但不提供變換中使用者程式碼的副作用。如果諸如Kafka接收器之類的轉換寫入外部系統,則這些寫入可能會多次發生。
在此處啟用EOS時,接收器轉換將相容的Beam Runners中的檢查點語義與Kafka中的事務聯絡起來,以確保只寫入一次記錄。由於實現依賴於runners checkpoint語義,因此並非所有runners都相容。Beam中FlinkRunner針對Kafka 0.11+版本才支援,然而Dataflow runner和Spark runner如果操作kafkaIO是完全支援的。
關於效能的注意事項
"Exactly-once" 在接收初始訊息的時候,除了將原來的資料進行格式化轉換外,還經歷了2個序列化 - 反序列化迴圈。根據序列化的數量和成本,CPU可能會漲的很明顯。通過寫入二進位制格式資料(即在寫入Kafka接收器之前將資料序列化為二進位制資料)可以降低CPU成本。
關於引數
numShards——設定接收器並行度。儲存在Kafka上的狀態元資料,使用sinkGroupId儲存在許多虛擬分割槽中。一個好的經驗法則是將其設定為Kafka主題中的分割槽數。
sinkGroupId——用於在Kafka上將少量狀態儲存為元資料的組ID。它類似於與KafkaConsumer一起使用的使用groupID。每個作業都應使用唯一的groupID,以便重新啟動/更新作業保留狀態以確保一次性語義。狀態是通過Kafka上的接收器事務原子提交的。有關更多資訊,請參閱KafkaProducer.sendOffsetsToTransaction(Map,String)。接收器在初始化期間執行多個健全性檢查以捕獲常見錯誤,以便它不會最終使用似乎不是由同一作業寫入的狀態。
五.Apache Beam Flink原始碼剖析
Apache Beam FlinkRunner對 Flink支援依賴情況
Flink 是一個流和批處理的統一的計算框架,Apache Beam 跟Flink API做了無縫整合。在Apache Beam中對Flink 的操作主要是 FlinkRunner.java,Apache Beam支援不同版本的flink 客戶端。我根據不同版本列了一個Flink 對應客戶端支援表如下:
圖5-1 FlinkRunner與Flink依賴關係表
從圖5-1中可以看出,Apache Beam 對Flink 的API支援的更新速度非常快,從原始碼可以看到2.0.0版本之前的FlinkRunner是非常low的,並且直接拿Flink的例項做為Beam的例項,封裝的效果也比較差。但是從2.0.0 版本之後 ,Beam就像打了雞血一樣API更新速度特別快,拋棄了以前的冗餘,更好地跟Flink整合,讓人眼前一亮。
Apache Beam Flink 原始碼解析
因為Beam在執行的時候都是顯式指定Runner,在FlinkRunner原始碼中只是成了簡單的統一入口,程式碼非常簡單,但是這個入口中有一個比較關鍵的介面類FlinkPipelineOptions。
請看程式碼:
/** Provided options. */
private final FlinkPipelineOptions options;
通過這個類我們看一下Apache Beam到底封裝了哪些Flink方法。
首先FlinkPipelineOptions是一個介面類,但是它繼承了 PipelineOptions、ApplicationNameOptions、StreamingOptions 三個介面類,第一個PipelineOptions大家應該很熟悉了,用於基本管道建立;第二個ApplicationNameOptions 用於設定應用程式名字;第三個用於判斷是流式資料還是批資料。原始碼如下:
public interface FlinkPipelineOptions extends
PipelineOptions, ApplicationNameOptions, StreamingOptions {
//....
}
1) 設定 Flink Master 方法 ,這個方法用於設定Flink 叢集地址的Master地址。可以填寫IP和埠,或者是hostname 和埠,預設local 。當然測試也可以是單機的,在Flink 1.4 利用 start-local.sh 啟動,而到了1.5以上就去掉了這個指令碼,本地直接換成了 start-cluster.sh。大家測試的時候需要注意一下。
/**
* The url of the Flink JobManager on which to execute pipelines. This can either be the the * address of a cluster JobManager, in the form "host:port" or one of the special Strings * "[collection]" will execute the pipeline on Java Collections while " [auto]" will let the system
*/
@Description ( "Address of the Flink Master where the Pipeline should be executed. Can"+ "[collection] or [auto].")
void setFlinkMaster (String value);
2) 設定 Flink 的並行數,屬於Flink 高階API裡面的屬性。設定合適的parallelism能提高運算效率,太多了和太少了都不行。設定parallelism有多種方式,優先順序為api>env>p>file。
@Description("The degree of parallelism to be used when distributing operations onto workers.")
@Default.InstanceFactory (DefaultParallelismFactory.class)
Integer getParallelism();
void setParallelism (Integer value);
3) 設定連續檢查點之間的間隔時間(即當前的快照)用於容錯的管道狀態。
@Description("The interval between consecutive checkpoints (i.e. snapshots of the current"
@Default.Long(-1L)
Long getCheckpointingInterval();
void setCheckpointingInterval (Long interval)
4) 定義一致性保證的檢查點模式,預設為"AT_LEAST_ONCE",在Beam的原始碼中定義了一個列舉類CheckpointingMode,除了預設的"AT_LEAST_ONCE",還有"EXACTLY_ONCE"。
"AT_LEAST_ONCE":這個模式意思是系統將以一種更簡單地方式來對operator和udf的狀態進行快照:在失敗後進行恢復時,在operator的狀態中,一些記錄可能會被重放多次。
"EXACTLY_ONCE":這種模式意思是系統將以如下語義對operator和udf(user defined function)進行快照:在恢復時,每條記錄將在operator狀態中只被重現/重放一次。
@Description ("The checkpointing mode that defines consistency guarantee.")
@Default.Enum ("AT_LEAST_ONCE")
CheckpointingMode getCheckpointingMode();
void setCheckpointingMode (CheckpointingMode mode);
5) 設定檢查點的最大超時時間,預設為20*60*1000(毫秒)=20(分鐘)。
@Description("The maximum time that a checkpoint may take before being discarded.")
@Default.Long (20 * 60 * 1000)
Long getCheckpointTimeoutMillis();
void setCheckpointTimeoutMillis (Long checkpointTimeoutMillis);
6) 設定重新執行失敗任務的次數,值為0有效地禁用容錯,值為-1表示使用系統預設值(在配置中定義)。
@Description(
"Sets the number of times that failed tasks are re-executed. "
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates "+ "that the system default value (as defined in the configuration) should be used.")
@Default.Integer(-1)
Integer getNumberOfExecutionRetries();
void setNumberOfExecutionRetries (Integer retries);
7) 設定執行之間的延遲,預設值為-1L。
@Description(
"Sets the delay between executions. A value of {@code -1} "
+ "indicates that the default value should be used.")
@Default.Long(-1L)
Long getExecutionRetryDelay();
void setExecutionRetryDelay (Long delay);
8) 設定重用物件的行為。
@Description ("Sets the behavior of reusing objects.")
@Default.Boolean(false)
Boolean getObjectReuse();
void setObjectReuse(Boolean reuse);
9) 設定狀態後端在計算期間儲存Beam的狀態,不設定從配置檔案中讀取預設值。注意:僅在執行時適用流媒體模式。
@Description ("Sets the state backend to use in streaming mode. "
@JsonIgnore
AbstractStateBackend getStateBackend();
void setStateBackend (AbstractStateBackend stateBackend);
10) 在Flink Runner中啟用/禁用Beam指標。
@Description ("Enable/disable Beam metrics in Flink Runner")
@Default.Boolean(true)
BooleangetEnableMetrics();
voidsetEnableMetrics(BooleanenableMetrics);
11) 啟用或禁用外部檢查點,與CheckpointingInterval一起使用。
@Description(
"Enables or disables externalized checkpoints."
+"Works in conjunction with CheckpointingInterval")
@Default.Boolean(false)
BooleanisExternalized CheckpointsEnabled();
voidsetExternalizedCheckpointsEnabled (BooleanexternalCheckpoints);
12) 設定當他們的Wartermark達到+ Inf時關閉源,Watermark在Flink 中其中一個作用是根據時間戳做單節點排序,Beam也是支援的。
@Description ("If set, s hutdown sources when their watermark reaches +Inf.")
@Default.Boolean (false)
BooleanisShutdownSources OnFinalWatermark();
voidsetShutdown SourcesOnFinalWatermark (BooleanshutdownOnFinalWatermark);
剩餘兩個部分這裡不再進行翻譯,留給大家去看原始碼。
六. KafkaIO和Flink實戰
本節通過解讀一個真正的KafkaIO和Flink實戰案例,幫助大家更深入地瞭解Apache Beam KafkaIO和Flink的運用。
設計架構圖和設計思路解讀
Apache Beam 外部資料流程圖
設計思路:Kafka訊息生產程式傳送testmsg到Kafka叢集,Apache Beam 程式讀取Kafka的訊息,經過簡單的業務邏輯,最後傳送到Kafka叢集,然後Kafka消費端消費訊息。
Apache Beam 內部資料處理流程圖
Apache Beam 程式通過kafkaIO讀取Kafka叢集的資料,進行資料格式轉換。資料統計後,通過KafkaIO寫操作把訊息寫入Kafka叢集。最後把程式執行在Flink的計算平臺上。
軟體環境和版本說明
系統版本 centos 7
Kafka叢集版本: kafka_2.10-0.10.1.1.tgz
Flink 版本:flink-1.5.2-bin -hadoop27-scala_2.11.tgz
Kafka叢集和Flink單機或叢集配置,大家可以去網上搜一下配置文章,操作比較簡單,這裡就不贅述了。
實踐步驟
1)新建一個Maven專案
2)在pom檔案中新增jar引用
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.5.2</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.5.2</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>1.5.2</version>
<!--<scope>provided</scope>-->
</dependency>
3)新建BeamFlinkKafka.java類
4)編寫以下程式碼:
public static void main(String[] args) {
//建立管道工廠
PipelineOptions options = PipelineOptionsFactory.create();
// 顯式指定PipelineRunner: FlinkRunner必須指定如果不制定則為本地
options.setRunner (FlinkRunner.class);
//設定相關管道
Pipeline pipeline = Pipeline.create(options);
//這裡kV後說明kafka中的key和value均為String型別
PCollection<KafkaRecord<String, String>> lines =
pipeline.apply(KafkaIO.<String,
// 必需設定kafka的伺服器地址和埠
String>read().withBootstrapServers ("192.168.1.110:11092, 192.168.1.119:11092, 192.168.1.120:11092")
.withTopic ("testmsg")// 必需設定要讀取的kafka的topic名稱
.withKeyDeserializer (StringDeserializer.class)// 必需序列化key
.withValueDeserializer (StringDeserializer.class)// 必需序列化value
.updateConsumerProperties (ImmutableMap.<String, Object>of ("auto.offset.reset", "earliest")));//這個屬性kafka最常見的.
// 為輸出的訊息型別。或者進行處理後返回的訊息型別
PCollection<String> kafkadata = lines.apply("Remove Kafka Metadata", ParDo.of (new DoFn<KafkaRecord< String, String>, String>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement (ProcessContext ctx) {
System.out.print ("輸出的分割槽為----:" + ctx.element().getKV());
ctx.output (ctx.element().getKV().getValue());// 其實我們這裡是把"張海 濤在傳送訊息***"進行返回操作
}
}));
PCollection<String> windowedEvents = kafkadata.apply (Window.<String>into (FixedWindows.of (Duration.standardSeconds(5))));
PCollection<KV<String, Long>> wordcount = windowedEvents.apply (Count.<String> perElement()); // 統計每一個kafka訊息的Count
PCollection<String> wordtj = wordcount.apply ("ConcatResultKVs", MapElements.via( // 拼接最後的格式化輸出(Key為Word,Value為Count)
new SimpleFunction<KV< String, Long>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String apply (KV<String, Long> input) {
System.out.print ("進行統計:" + input.getKey() + ": " + input.getValue());
return input.getKey() + ": " + input.getValue();
}
}));
wordtj.apply (KafkaIO.<Void, String>write() .withBootstrapServers ("192.168.1.110:11092, 192.168.1.119:11092, 192.168.1.120:11092")//設定寫會kafka的叢集配置地址
.withTopic ("senkafkamsg")//設定返回kafka的訊息主題
// .withKeySerializer (StringSerializer.class)//這裡不用設定了,因為上面 Void
.withValueSerializer (StringSerializer.class)
// Dataflow runner and Spark 相容, Flink 對kafka0.11才支援。我的版本是0.10不相容
//.withEOS (20, "eos-sink-group-id")
.values() // 只需要在此寫入預設的key就行了,預設為null值
); // 輸出結果
pipeline.run().waitUntilFinish();
}
5)打包jar,本示例是簡單的實戰,並沒有用Docker,Apache Beam新版本是支援Docker的。
6)通過Apache Flink Dashboard 提交job
7)檢視結果
程式接收的日誌如下:
七.實戰解析
本次實戰在原始碼分析中已經做過詳細解析,在這裡不做過多的描述,只選擇部分問題再重點解釋一下。此外,如果還沒有入門,甚至連管道和Runner等概念都還不清楚,建議先閱讀本系列的第一篇文章《Apache Beam實戰指南之基礎入門》。
1.FlinkRunner在實戰中是顯式指定的,如果想設定引數怎麼使用呢?其實還有另外一種寫法,例如以下程式碼:
//FlinkPipelineOptions options = PipelineOptionsFactory.as (FlinkPipelineOptions.class);
//options.setStreaming (true);
//options.setAppName ("app_test");
//options.setJobName ("flinkjob");
//options.setFlinkMaster ("localhost:6123");
//options.setParallelism (10);//設定flink 的並行度
//顯式指定PipelineRunner :FlinkRunner,必須指定,如果不指定則為本地
options.setRunner (FlinkRunner.class);
2.Kafka 有三種資料讀取型別,分別是 “earliest ”,“latest ”,“none ”,分別的意思代表是:
earliest
當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 。
latest
當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料 。
none
topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有一個分割槽不存在已提交的offset,則丟擲異常。
3.實戰中我自己想把Kafka的資料寫入,key不想寫入,所以出現了Kafka的key項為空,而values才是真正傳送的資料。所以開始和結尾要設定個.values(),如果不加上就會報錯。
KafkaIO.<Void, String>write()
.values() // 只需要在此寫入預設的key就行了,預設為null值
八.小結
隨著AI和loT的時代的到來,各個公司不同結構、不同型別、不同來源的資料進行整合的成本越來越高。Apache Beam 技術的統一模型和大資料計算平臺特性優雅地解決了這一問題,相信在loT萬億市場中,Apache Beam將會發揮越來越重要的角色。