1. 程式人生 > >Flink流計算程式設計--Kafka+Flink整合demo

Flink流計算程式設計--Kafka+Flink整合demo

1、簡介

1.1、Kafka Consumer提供了2種API:high level與low level(SimpleConsumer)。
(1)high level consumer的API較為簡單,不需要關心offset、partition、broker等資訊,kafka會自動讀取zookeeper中該consumer group的last offset。
(2)low level consumer也叫SimpleConsumer,這個介面非常複雜,需要自己寫程式碼去實現對offset、partition、broker以及broker的切換,能不用就不用,那何時必須用?

1、Read a message multiple times
2、Consume only a subset of the partitions in a topic in a process
3、Manage transactions to make sure a message is processed once and only once

這裡寫圖片描述

2、Flink的開發準備

Flink提供了high level的API來消費kafka的資料:flink-connector-kafka-0.8_2.10。注意,這裡的0.8代表的是kafka的版本,你可以通過maven來匯入kafka的依賴,具體如下:
這裡寫圖片描述

例如你的kafka安裝版本是“kafka_2.10-0.8.2.1”,即此版本是由scala2.10編寫,kafka的自身版本是0.8.2.1.那此時你需要新增如下的內容到maven的pom.xml檔案中:

<dependency>
        <groupId>org.apache.flink</groupId
>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId> <version>${flink.version}</version> </dependency>

注意:
$${flink.version}是個變數,自己調整下程式碼,例如可以直接寫1.0.0。我的專案裡採用的是添加了properties來控制${flink.version}:

<properties>
        <project.build.sourceEncoding
>
UTF-8</project.build.sourceEncoding> <flink.version>1.0.0</flink.version> </properties>

3、叢集環境準備

這裡主要是介紹下Flink叢集與kafka叢集的搭建。
基礎的軟體安裝包括JDK、scala、hadoop、zookeeper、kafka以及flink就不介紹了,直接看下flink的叢集配置以及kafka的叢集配置。
zookeeper–3.4.6
hadoop–2.6.0
kafka–2.10-0.8.2.1
flink–1.0.3

3.1、Flink叢集配置(standalone且沒有用zookeeper的HA)

3.1.1、環境變數
新增FLINK_HOME以及path的內容:

export FLINK_HOME=/usr/local/flink/flink-1.0.3
export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${ZOOKEEPER_HOME}/bin:${KAFKA_HOME}/bin:${FLINK_HOME}/bin:$PATH
export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

3.1.2、修改conf/flink-conf.yaml
這裡寫圖片描述

這幾乎是最簡單的配置方式了,主要注意要修改jobmanager.rpc.address為叢集中jobManager的IP或hostname。檢查點以及HA的引數都沒有配置。

3.1.3、slaves檔案
這裡寫圖片描述

這個檔案中存放的資訊是taskmanager的hostname。

3.1.4、複製flink目錄以及.bashrc檔案到叢集中其他的機器,並使bashrc生效

root@master:/usr/local/flink# scp -r flink-1.0.3/ root@worker1:/usr/local/flink/
root@master:/usr/local/flink# scp -r flink-1.0.3/ root@worker2:/usr/local/flink/

root@master:/usr/local/flink# scp ~/.bashrc root@worker1:~/.bashrc 
root@master:/usr/local/flink# scp ~/.bashrc root@worker2:~/.bashrc 

root@worker1:~# source ~/.bashrc 
root@worker2:~# source ~/.bashrc 

3.2、kafka叢集配置

3.2.1、環境變數
省略

3.2.2、配置config/zookeeper.properties
由於kafka叢集依賴於zookeeper叢集,所以kafka提供了通過kafka去啟動zookeeper叢集的功能,當然也可以手動去啟動zookeeper的叢集而不通過kafka去啟動zookeeper的叢集。
這裡寫圖片描述
注意這裡的dataDir最好不要指定/tmp目錄下,因為機器重啟會刪除此目錄下的檔案。且指定的新路徑必須存在。

3.2.3、配置config/server.properties
這個檔案是啟動kafka叢集需要指定的配置檔案,注意2點:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
#port=9092
listeners=PLAINTEXT://:9092

broker.id在kafka叢集的每臺機器上都不一樣,我這裡3臺叢集分別是0、1、2.

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=master:2181,worker1:2181,worker2:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

zookeeper.connect要配置kafka叢集所依賴的zookeeper叢集的資訊,hostname:port。

3.2.4、複製kafka路徑及環境變數到其他kafka叢集的機器,並修改server.properties中的broker_id.
複製過程省略

3.3、啟動kafka叢集+Flink叢集

3.3.1、首先啟動zookeeper叢集(3臺zookeeper機器都要啟動):

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start
root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start
root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start

驗證zookeeper叢集:
程序是否啟動;zookeeper叢集中是否可以正常顯示leader以及follower。

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# jps
3295 QuorumPeerMain

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin#

root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# 

root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: leader
root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# 

3.3.2、啟動kafka叢集(3臺都要啟動)

root@master:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties &

root@worker1:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties &

root@worker2:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties &

驗證:
程序;日誌

3512 Kafka

3.3.3、啟動hdfs(master上啟動即可)

root@master:/usr/local/hadoop/hadoop-2.6.0/sbin# start-dfs.sh 

驗證:程序及webUI

root@master:/usr/local/hadoop/hadoop-2.6.0/sbin# jps
3798 NameNode
4007 SecondaryNameNode

root@worker1:/usr/local/hadoop/hadoop-2.6.0/sbin# jps
3843 DataNode

root@worker2:/usr/local/hadoop/hadoop-2.6.0/sbin# jps
3802 DataNode

webUI:50070,預設可配置
這裡寫圖片描述

3.3.4、啟動Flink叢集(master即可)

root@master:/usr/local/flink/flink-1.0.3/bin# start-cluster.sh 

驗證:程序及WebUI

root@master:/usr/local/flink/flink-1.0.3/bin# jps
4411 JobManager

root@worker1:/usr/local/flink/flink-1.0.3/bin# jps
4151 TaskManager

root@worker2:/usr/local/flink/flink-1.0.3/bin# jps
4110 TaskManager

WebUI:8081(預設,可配置)
這裡寫圖片描述

4、編寫Flink程式,實現consume kafka的資料(demo)

4.1、程式碼
這裡就是簡單的實現接收kafka的資料,要指定zookeeper以及kafka的叢集配置,並指定topic的名字。
最後將consume的資料直接打印出來。

import java.util.Properties

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._

/**
  * 用Flink消費kafka
  */
object ReadingFromKafka {

  private val ZOOKEEPER_HOST = "master:2181,worker1:2181,worker2:2181"
  private val KAFKA_BROKER = "master:9092,worker1:9092,worker2:9092"
  private val TRANSACTION_GROUP = "transaction"

  def main(args : Array[String]){
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // configure Kafka consumer
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

    //topicd的名字是new,schema預設使用SimpleStringSchema()即可
    val transaction = env
      .addSource(
        new FlinkKafkaConsumer08[String]("new", new SimpleStringSchema(), kafkaProps)
      )

    transaction.print()

    env.execute()

  }

}

4.2、打包:

mvn clean package

這裡寫圖片描述
看到成功標誌,否則會提示error的地方。

4.3、釋出到叢集

root@master:/usr/local/flink/flink-1.0.3/bin# flink run -c wikiedits.ReadingFromKafka /root/Documents/wiki-edits-0.1.jar 

驗證:程序及WebUI

root@master:/usr/local/flink/flink-1.0.3/bin# jps
6080 CliFrontend

這裡寫圖片描述

5、kafka produce資料,驗證flink是否正常消費

5.1、通過kafka console produce資料
之前已經在kafka中建立了名字為new的topic,因此直接produce new的資料:

root@master:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic new

生產資料:
這裡寫圖片描述

5.2、檢視flink的標準輸出中,是否已經消費了這部分資料:

root@worker2:/usr/local/flink/flink-1.0.3/log# ls -l | grep out
-rw-r--r-- 1 root root   254  629 09:37 flink-root-taskmanager-0-worker2.out
root@worker2:/usr/local/flink/flink-1.0.3/log# 

我們在worker2的log中發現已經有了資料,下面看看內容:
這裡寫圖片描述

OK,沒問題,flink正常消費了資料。

6、總結

kafka作為一個訊息系統,本身具有高吞吐、低延時、持久化、分散式等特點,其topic可以指定replication以及partitions,使得可靠性和效能都可以很好的保證。
Kafka+Flink的架構,可以使flink只需關注計算本身。

相關推薦

Flink計算程式設計--Kafka+Flink整合demo

1、簡介 1.1、Kafka Consumer提供了2種API:high level與low level(SimpleConsumer)。 (1)high level consumer的API較為簡單,不需要關心offset、partition、broker

Flink計算程式設計--watermark(水位線)簡介

1、watermark的概念 watermark是一種衡量Event Time進展的機制,它是資料本身的一個隱藏屬性。通常基於Event Time的資料,自身都包含一個timestamp,例如1472693399700(2016-09-01 09:29:59.

Flink計算程式設計--看看別人怎麼用Session Window

1、簡介 流處理在實際生產中體現的價值越來越大,Apache Flink這個純流式計算框架也正在被越來越多的公司所關注並嘗試使用其流上的功能。 在2017年波蘭華沙大資料峰會上,有一家叫做GetInData的公司,分享了一個關於他們內部如何使用Flink的s

Flink計算程式設計--Flink中allowedLateness詳細介紹及思考

1、簡介 Flink中藉助watermark以及window和trigger來處理基於event time的亂序問題,那麼如何處理“late element”呢? 也許有人會問,out-of-order element與late element有什麼區別?不

Flink計算程式設計--Session Window實戰

1、session window簡介 Flink從1.1開始支援Session window,它是屬於基於時間的視窗。 這裡以EventTime為例,基於時間的視窗,可以分為3種:TumblingEventTimeWindows,SlidingEventTi

Flink計算程式設計--Flink sink to Oracle

1、Flink connectors 關於Flink connectors,Flink 1.1提供了許多內建的第三方聯結器,這些connectors包括: Apache Kafka (sink/source) Elasticsearch (sink) E

Flink計算程式設計--在雙流中體會joinedStream與coGroupedStream

一、joinedStream與coGroupedStream簡介 在實際的流計算中,我們經常會遇到多個流進行join的情況,Flink提供了2個Transformations來實現。 如下圖: 注意:Join(Cogroups) two data st

Flink計算隨筆(1)

實現思路 class 不能 inter oss 最終 sta 而是 連續存儲 相比 Spark Stream、Kafka Stream、Storm 等,為什麽阿裏會選擇 Flink 作為新一代流式計算引擎?前期經過了哪些調研和對比? 大沙:我們是 2015 年開始調研新一代

flink計算隨筆(6)

starting nts 取消 add multi nvi handle input .sh ?生成,編譯模板工程 MacBook-Air:SocketWindowWordCount myhaspl$ bash <(curl https://flink.apache.

flink計算隨筆(3)

容錯 額外 text 之間 方式 project 平臺 一對一 put Stateful Computations over Data Streams(在數據流的有狀態計算)Apache Flink是一個用於分布式流和批處理數據的開源平臺。Flink的核心是一個流數據流引擎

Flink計算隨筆(1)

Apache Flink(下簡稱Flink)專案是大資料處理領域最近冉冉升起的一顆新星,其不同於其他大資料專案的諸多特性吸引了越來越多人的關注。本文將深入分析Flink的一些關鍵技術與特性,希望能夠幫助讀者對Flink有更加深入的瞭解,對其他大資料系統開發者也能有所裨益。本文

flink計算隨筆(2)

MACOS下安裝flink: $ brew install apache-flink ... $ flink --version MACOS下啟動flink: $cd /usr/local/Cellar/apache-flink/1.6.0 $./libexec/bi

flink計算隨筆(5)

Windows 聚合事件(例如計數、和)在流上的工作方式與批處理不同。例如,不可能計算流中的所有元素,因為流通常是無限的(無界的)。相反,流上的聚合(計數、和等)是由視窗限定作用域的,例如“過去5分鐘的

flink計算隨筆(7)

Flink中的DataStream程式是在資料流上實現轉換的常規程式(例如,過濾、更新狀態、定義視窗、聚合)。資料流最初是從各種來源(例如,訊息佇列、套接字流、檔案)建立的。結果通過sink返回,它可以

flink計算隨筆(8)

SQL標準的Apache Calcite statement: setStatement | resetStatement | explain | describe | insert | update |

flink計算隨筆(9)

​生成,編譯模板工程 MacBook-Air:SocketWindowWordCount myhaspl$ bash <(curl https://flink.apache.org/q/sbt-q

Flink計算中SQL表的概念和原理

文章目錄 前言 動態表和動態查詢的概念 動態表的時間屬性 引用 前言 Fink在新發布的1.7版本中,不斷完善和加強了SQL&Table API方面的功能支援。這使得在流計算過程中,使用者同樣能夠運用熟悉的SQL

Flink計算與時序資料庫Influxdb+grafana

1、簡介 關於Influxdb和grafana,可以參考:介紹或者influxdb官方文件,grafana官方文件。這裡預設已經將influxdb和grafana安裝完成。 2、Flink sink to Influxdb influxdb不屬於Fli

7、Flink 計算處理和批處理平臺

一、Flink 基本概念 Flink 是一個批處理和流處理結合的統一計算框架,其核心是一個提供了資料分發以及並行化計算的流資料處理引擎。它的最大亮點是流處理,是業界最頂級的開源流處理引擎。Flink 與 Storm 類似,屬於事件驅動型實時流系統。 所謂說事件驅動型指

flink 式處理中如何整合mybatis框架

flink 中自身雖然實現了大量的connectors,如下圖所示,也實現了jdbc的connector,可以通過jdbc 去操作資料庫,但是flink-jdbc包中對資料庫的操作是以ROW來操作並且對資料庫事務的控制比較死板,有時候操作關係型資料庫我們會非常懷念在java web應用開發中的非常優秀的myb