
Flink Development in Practice II: Hands-on Examples

 

This article walks through hands-on development for several scenarios, all running in Flink's local mode.

Flink local execution modes

Flink supports two different local execution mechanisms:

  1.  The LocalEnvironment starts a complete Flink runtime, including a JobManager and a TaskManager. It includes memory management and all of the internal algorithms that are used when running in cluster mode.

         The LocalEnvironment can also pass a user-defined configuration to Flink:

        Configuration conf = new Configuration();
        conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

  2.  The CollectionEnvironment executes the Flink program on Java collections. This mode does not start a full Flink runtime, so execution is lightweight and has very low overhead. For example, the map transformation of a DataSet applies the map() function to every element of a Java list (see the sketch below).
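A minimal Scala sketch of this mode (the object name and sample data are mine), running a small DataSet job without starting any Flink services:

import org.apache.flink.api.scala._

object CollectionEnvDemo {
  def main(args: Array[String]): Unit = {
    // executes directly on Java collections; no JobManager or TaskManager is started
    val env = ExecutionEnvironment.createCollectionsEnvironment
    env.fromCollection(List("a", "b", "a"))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .print()
  }
}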

Obtaining an execution environment

Flink batch environment

val env = ExecutionEnvironment.getExecutionEnvironment

Flink streaming environment

val env = StreamExecutionEnvironment.getExecutionEnvironment

Hands-on examples

1. WordCount on files (local or HDFS)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FunctionTest {
    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read the lines of a text file
        DataStreamSource<String> streamSource = env.readTextFile("C:/flink_data/1.txt");
        // split into words, key by word, and keep a running count
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = streamSource
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1);
        dataStream.print();
        // trigger execution with a job name
        env.execute("Window WordCount");
    }
}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public  class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
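The heading above also mentions HDFS: readTextFile accepts any file system URI that Flink supports, so the same job can read from HDFS instead of a local path. A minimal Scala sketch, assuming a NameNode at 10.10.4.11:9000 (the address used in the Kafka example later) and a hypothetical input file:

import org.apache.flink.streaming.api.scala._

object HdfsWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // readTextFile also accepts hdfs:// URIs, provided the Hadoop dependencies are on the classpath
    env.readTextFile("hdfs://10.10.4.11:9000/flink_data/1.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    env.execute("HDFS WordCount")
  }
}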

2. WordCount over a socket (Scala version)

1. Send data

Run nc -lk on a Linux machine to send data:

    nc -lk 9999

2. Program code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object ScortWc {
  def main(args: Array[String]): Unit = {
    // 1. create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. define the data source
    val text = env.socketTextStream("127.0.0.11", 9999)
    // 3. do the word count over 5-second windows
    val counts = text.flatMap(_.toLowerCase.split(" ") filter (_.nonEmpty))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    // 4. print the result
    counts.print()
    // trigger execution
    env.execute("Window Stream WordCount")
  }
}

Execution output

3. WordCount from Kafka

Add the Maven dependency (it must match the FlinkKafkaConsumer010 used in the code below):

<!-- the 0.10 connector matches the FlinkKafkaConsumer010 used in the program below -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.2</version>
</dependency>

Program code

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object DataFkafka {
  def main(args: Array[String]): Unit = {
    // Kafka connection properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.10.4.11:9092,10.10.49.183:9092,10.10.49.207:9092")
    properties.setProperty("zookeeper.connect", "10.10.4.11:2181,10.10.49.183:2181")
    properties.setProperty("group.id", "res")
    // get the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // checkpoint every second
    env.enableCheckpointing(1000)
    // exactly-once checkpointing
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // create a Kafka consumer that reads from the "flink" topic
    val myConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String]("flink", new SimpleStringSchema(), properties)
    val kafkaData: DataStream[String] = env.addSource(myConsumer)
    kafkaData.print()
    // also persist the raw data to HDFS
    kafkaData.writeAsText("hdfs://10.10.4.11:9000/output/flink.txt")
    print("kafka")
    // trigger execution with a job name
    env.execute("data_from_kafak_wangzh")
  }
}
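Note that the program above only prints the raw Kafka records and copies them to HDFS; the counting itself is not shown. A minimal sketch of the missing word-count step, meant to be placed right after val kafkaData = env.addSource(myConsumer) (splitting on spaces is an assumption about the message format):

    // rolling (non-windowed) word count; it works even though no event-time
    // timestamps or watermarks are assigned to this stream
    kafkaData
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()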

4. Using event time

Data preparation

Prepare a set of records whose timestamps are out of order, then use nc -lk 9999 to simulate a real-time data stream:

67000,boos2,pc1,200.0
62000,boos2,pc2,500.0
78000,boos2,pc2,600.0
71010,boos2,pc2,700.0
62010,boos2,pc2,500.0

67000 and 62000 are millisecond timestamps; they differ by exactly 5 s.

Requirement


Over the real data stream, compute the total price within every five-second window.

Clearly, without event time there is no way to tell when the events actually happened, so this requirement forces us to use event time, that is, to handle an out-of-order data stream.
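To make this concrete: with 5-second tumbling event-time windows, 62000 and 62010 land in the same window [60000, 65000), while 67000 lands in [65000, 70000); processing time would give no such grouping for an out-of-order stream. A small Scala sketch (the object name is mine) that checks the assignment using the same helper that TumblingEventTimeWindows uses internally:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object WindowAssignmentCheck {
  def main(args: Array[String]): Unit = {
    val timestamps = Seq(67000L, 62000L, 78000L, 71010L, 62010L)
    for (ts <- timestamps) {
      // start of the 5000 ms tumbling window (offset 0) that contains ts
      val start = TimeWindow.getWindowStartWithOffset(ts, 0, 5000)
      println(s"$ts -> [$start, ${start + 5000})")
    }
  }
}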

Code implementation

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Created by  ${WangZhiHua} on 2018/10/31
  */
object EventTime_test {
  def main(args: Array[String]) {
    // 1. create the execution environment and switch it to event time
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 2. create the data stream and parse each line
    val source = env.socketTextStream("127.0.0.11", 9999)
    // a case class that wraps one record
    case class SalePrice(time: Long, boosName: String, productName: String, price: Double)
    val dst1: DataStream[SalePrice] = source.map(value => {
      val columns = value.split(",")
      SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
    })

    // 3. aggregate per product over event-time windows
    val dst2 = dst1
      // extract the timestamp carried in each record (assumes ascending timestamps)
      .assignAscendingTimestamps(_.time)
      .keyBy(_.productName)
      // window variant 1: .timeWindow(Time.seconds(5))
      // window variant 2:
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // sum the price field (index 3); .max("price") would track the maximum price instead
      .sum(3)
    // 4. print the result
    dst2.print()
    // 5. trigger the streaming job
    env.execute()
  }
}
Printed output

 

5. Generating and tracking watermarks

We receive data from the socket, extract the timestamp and generate the watermark immediately after the map, and then apply a window to see how the watermark and event time advance until the window is finally triggered.

package com.missfresh.flinkCore

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * Created by  ${WangZhiHua} on 2018/10/31
  */


object WaterMarks_test {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // read the real-time stream from the socket
    val input = env.socketTextStream("127.0.0.11", 9999)

    val inputMap = input.map(f => {
      val arr = f.split(",")
      val code = arr(0)
      val time = arr(1).toLong
      (code, time)
    })

    val watermark = inputMap.
      // assign timestamps and generate periodic watermarks
      assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
        var currentMaxTimestamp = 0L
        // maximum allowed out-of-orderness: 10 seconds
        val maxOutOfOrderness = 10000L
        var a: Watermark = null
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        // the watermark trails the largest timestamp seen so far by maxOutOfOrderness
        override def getCurrentWatermark: Watermark = {
          a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          a
        }

        // extract the timestamp from each record and print a trace of its effect
        override def extractTimestamp(t: (String, Long), l: Long): Long = {
          val timestamp = t._2
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          println("timestamp:" + t._1 + "," + t._2 + "|" + format.format(t._2) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a)
          timestamp
        }
      })

    val window = watermark
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply(new WindowFunctionTest)
    window.print()
    env.execute()
  }

  // emits (key, record count, earliest timestamp, latest timestamp, window start, window end) for every window
  class WindowFunctionTest extends WindowFunction[(String, Long), (String, Int, String, String, String, String), String, TimeWindow] {

    override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Int, String, String, String, String)]): Unit = {
      val list = input.toList.sortBy(_._2)
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      out.collect(key, input.size, format.format(list.head._2), format.format(list.last._2), format.format(window.getStart), format.format(window.getEnd))
    }

  }

}
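To follow the trace that extractTimestamp prints, it helps to work out when a window actually fires. With maxOutOfOrderness = 10000 ms the watermark always trails the largest timestamp seen so far by 10 s, and an event-time window [start, start + 3000) fires once the watermark reaches start + 2999. A minimal arithmetic sketch (the object name is mine):

object WatermarkFiringCheck {
  def main(args: Array[String]): Unit = {
    val maxOutOfOrderness = 10000L // same value as in the assigner above
    val windowSize = 3000L         // same size as the tumbling window above
    val windowStart = 0L
    // the window [windowStart, windowStart + windowSize) fires when the watermark
    // reaches windowStart + windowSize - 1, i.e. once a record with at least this
    // timestamp has been observed
    val firstFiringTimestamp = windowStart + windowSize - 1 + maxOutOfOrderness
    println(s"window [$windowStart, ${windowStart + windowSize}) fires after a record with timestamp >= $firstFiringTimestamp arrives")
  }
}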