1. 程式人生 > >Flink與Spark Streaming在與kafka結合的區別!

Flink與Spark Streaming在與kafka結合的區別!

本文主要是想聊聊flink與kafka結合。當然,單純的介紹flink與kafka的結合呢,比較單調,也沒有可對比性,所以的準備順便幫大家簡單回顧一下Spark Streaming與kafka的結合。

看懂本文的前提是首先要熟悉kafka,然後瞭解spark Streaming的執行原理及與kafka結合的兩種形式,然後瞭解flink實時流的原理及與kafka結合的方式。

kafka

kafka作為一個訊息佇列,在企業中主要用於快取資料,當然,也有人用kafka做儲存系統,比如存最近七天的資料。kafka的基本概念請參考:kafka入門介紹

更多kafka的文章請關注浪尖公眾號,閱讀。

首先,我們先看下圖,這是一張生產訊息到kafka,從kafka消費訊息的結構圖。

640?wx_fmt=png

當然, 這張圖很簡單,拿這張圖的目的是從中可以得到的跟本節文章有關的訊息,有以下兩個:

1,kafka中的訊息不是kafka主動去拉去的,而必須有生產者往kafka寫訊息。

2,kafka是不會主動往消費者釋出訊息的,而必須有消費者主動從kafka拉取訊息。

spark Streaming結合kafka

Spark Streaming現在在企業中流處理也是用的比較廣泛,但是大家都知道其不是真正的實時處理,而是微批處理。

在spark 1.3以前,SPark Streaming與kafka的結合是基於Receiver方式,顧名思義,我們要啟動1+個Receiver去從kafka裡面拉去資料,拉去的資料會每隔200ms生成一個block,然後在job生成的時候,取出該job處理時間範圍內所有的block,生成blockrdd,然後進入Spark core處理。

自Spark1.3以後,增加了direct Stream API,這種呢,主要特點是去掉了Receiver,在生成job,去取rdd的時候,計算每個partition要取資料的offset範圍,然後生成一個kafkardd,該rdd特點是與kafka的分割槽是一一對應的。

有上面的特點可以看出,Spark Streaming是要生成rdd,然後進行處理的,rdd資料集我們可以理解為靜態的,然每個批次,都會生成一個rdd,該過程就體現了批處理的特性,由於資料集時間段小,資料小,所以又稱微批處理,那麼就說明不是真正的實時處理。

還有一點,spark Streaming與kafka的結合是不會發現kafka動態增加的topic或者partition。

Spark的詳細教程,請關注浪尖公眾號,檢視歷史推文。

Spark Streaming與kafka結合原始碼講解,請加入知識星球,獲取。

flink結合kafka

大家都知道flink是真正的實時處理,他是基於事件觸發的機制進行處理,而不是像spark Streaming每隔若干時間段,生成微批資料,然後進行處理。那麼這個時候就有了個疑問,在前面kafka小節中,我們說到了kafka是不會主動往消費者裡面吐資料的,需要消費者主動去拉去資料來處理。那麼flink是如何做到基於事件實時處理kafka的資料呢?在這裡浪尖帶著大家看一下原始碼,flink1.5.0為例。

1,flink與kafka結合的demo。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)

// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
 params.getRequired("input-topic"),
new SimpleStringSchema,
params.getProperties)

val messageStream = env
 .addSource(kafkaConsumer)
 .map(in => prefix + in)

// create a Kafka producer for Kafka 0.10.x
val kafkaProducer = new FlinkKafkaProducer010(
 params.getRequired("output-topic"),
new SimpleStringSchema,
params.getProperties)

// write data into Kafka
messageStream.addSink(kafkaProducer)

env.execute("Kafka 0.10 Example")

從上面的demo可以看出,資料來源的入口就是FlinkKafkaConsumer010,當然這裡面只是簡單的構建了一個物件,並進行了一些配置的初始化,真正source的啟動是在其run方法中run方法的呼叫過程在這裡不講解,後面會出教程講解。

首先看一下類的繼承關係

public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T>
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>

其中,run方法就在FlinkKafkaConsumerBase裡,當然其中open方法裡面對kafka相關內容進行裡初始化。

從輸入到計算到輸出完整的計算鏈條的呼叫過程,後面浪尖會出文章介紹。在這裡只關心flink如何從主動消費資料,然後變成事件處理機制的過程。

由於其FlinkKafkaConsumerBase的run比較長,我這裡只看重要的部分,首先是會建立Kafka09Fetcher

this.kafkaFetcher = createFetcher(
     sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);

接著下面有段神器,flink嚴重優越於Spark Streaming的,程式碼如下:

final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
this.discoveryLoopThread = new Thread(new Runnable() {
  @Override
public void run() {
     try {
        // --------------------- partition discovery loop ---------------------
List<KafkaTopicPartition> discoveredPartitions;
// throughout the loop, we always eagerly check if we are still running before
        // performing the next operation, so that we can escape the loop as soon as possible
while (running) {
           if (LOG.isDebugEnabled()) {
              LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
}

           try {
              discoveredPartitions = partitionDiscoverer.discoverPartitions();
} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
              // the partition discoverer may have been closed or woken up before or during the discovery;
              // this would only happen if the consumer was canceled; simply escape the loop
break;
}

           // no need to add the discovered partitions if we were closed during the meantime
if (running && !discoveredPartitions.isEmpty()) {
              kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
}

           // do not waste any time sleeping if we're not running anymore
if (running && discoveryIntervalMillis != 0) {
              try {
                 Thread.sleep(discoveryIntervalMillis);
} catch (InterruptedException iex) {
                 // may be interrupted if the consumer was canceled midway; simply escape the loop
break;
}
           }
        }
     } catch (Exception e) {
        discoveryLoopErrorRef.set(e);
} finally {
        // calling cancel will also let the fetcher loop escape
        // (if not running, cancel() was already called)
if (running) {
           cancel();
}
     }
  }
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

它定義了一個執行緒池物件,去動態發現kafka新增的topic(支援正則形式指定消費的topic),或者動態發現kafka新增的分割槽。

接著肯定是啟動動態發現分割槽或者topic執行緒,並且啟動kafkaFetcher

discoveryLoopThread.start();
kafkaFetcher.runFetchLoop();
// --------------------------------------------------------------------
// make sure that the partition discoverer is properly closed
partitionDiscoverer.close();
discoveryLoopThread.join();

接著,我們進入kafkaFetcher的runFetchLoop方法,映入眼簾的是

// kick off the actual Kafka consumer
consumerThread.start();

這個執行緒是在構建kafka09Fetcher的時候建立的

this.consumerThread = new KafkaConsumerThread(
     LOG,
handover,
kafkaProperties,
unassignedPartitionsQueue,
createCallBridge(),
getFetcherName() + " for " + taskNameWithSubtasks,
pollTimeout,
useMetrics,
consumerMetricGroup,
subtaskMetricGroup);

KafkaConsumerThread 繼承自Thread,然後在其run方法裡,首先看到的是

// this is the means to talk to FlinkKafkaConsumer's main thread
final Handover handover = this.handover;

這個handover的作用呢暫且不提,接著分析run方法裡面內容

1,獲取消費者

try {
  this.consumer = getConsumer(kafkaProperties);
}

2,檢測分割槽並且會重分配新增的分割槽

try {
  if (hasAssignedPartitions) {
     newPartitions = unassignedPartitionsQueue.pollBatch();
}
  else {
     // if no assigned partitions block until we get at least one
     // instead of hot spinning this loop. We rely on a fact that
     // unassignedPartitionsQueue will be closed on a shutdown, so
     // we don't block indefinitely
newPartitions = unassignedPartitionsQueue.getBatchBlocking();
}
  if (newPartitions != null) {
     reassignPartitions(newPartitions);
}

3,消費資料

// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
  try {
     records = consumer.poll(pollTimeout);
}
  catch (WakeupException we) {
     continue;
}
}

4,通過handover將資料發出去

try {
  handover.produce(records);
records = null;
}

由於被kafkaConsumerThread打斷了kafkaFetcher的runFetchLoop方法的分析,我們在這裡繼續

1,拉取handover.producer生產的資料

while (running) {
  // this blocks until we get the next records
  // it automatically re-throws exceptions encountered in the consumer thread
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

2,資料格式整理,並將資料整理好後,逐個Record傳送,將迴圈主動批量拉取kafka資料,轉化為事件觸發。

// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

  List<ConsumerRecord<byte[], byte[]>> partitionRecords =
        records.records(partition.getKafkaPartitionHandle());
  for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
     final T value = deserializer.deserialize(
           record.key(), record.value(),
record.topic(), record.partition(), record.offset());
     if (deserializer.isEndOfStream(value)) {
        // end of stream signaled
running = false;
        break;
}

     // emit the actual record. this also updates offset state atomically
     // and deals with timestamps and watermark generation
emitRecord(value, partition, record.offset(), record);
}
}

肯定會注意到這行程式碼emitRecord(value, partition, record.offset(), record);,從這裡開始flink變成事件觸發的流引擎。

handover-樞紐

handover是在構建kafkaFetcher的時候構建的

this.handover = new Handover();

handover是一個工具,將一組資料或者異常從生產者執行緒傳輸到消費者執行緒。它高效的扮演了一個阻塞佇列的特性。該類運行於flink kafka consumer,用來在kafkaConsumer 類和主執行緒之間轉移資料和異常。

handover有兩個重要方法,分別是:

1,producer

producer是將kafkaConusmer獲取的資料傳送出去,在KafkaConsumerThread中呼叫。程式碼如上

2,pollnext

從handover裡面拉去下一條資料,會阻塞的,行為很像是從一個阻塞佇列裡面拉去資料。

綜述

kafkaConsumer批量拉去資料,flink將其經過整理之後變成,逐個Record傳送的事件觸發式的流處理。這就是flink與kafka結合事件觸發時流處理的基本思路。

重要的事情再說一遍:Flink支援動態發現新增topic或者新增partition哦。具體實現思路,前面有程式碼為證,後面會對比spark Streaming的這塊(不支援動態發現新增kafka topic或者partition),來詳細講解。

推薦閱讀: