1. 程式人生 > >精通flink讀書筆記(2)——用DataStreamAPI處理資料

精通flink讀書筆記(2)——用DataStreamAPI處理資料

涉及的部分包括: 一       Execution environment flink支援的環境建立方式有:
  • get一個已經存在的flink環境  方式為getExecutionEnvironment()
  • create一個local環境   createLocalEnvironment()
  • create一個romote環境   createRemoteEnvironment(String host,int port,String and .jar files)
二         Data Source        flink內建了兩種資料來源方式,一種是基於socket  一種是基於File 基於socket的有:
  • socketTextStream(hostName,port)
  • socketTextStream(hostName,port,delimiter)
  • socketTextStream(hostName,port,delimiter,maxRetry)
基於File的有:
  • readTextFile(String path)
  • readFile(FileInputFormat<out> inputFormat,String path)  如果資料來源不是text檔案的話,可以指定檔案的格式
flink也支援讀取流式檔案
  • readFileStream(String filePath,long intervalMills,FileMonitoringFunction.watchType  watchType)
其中interval 是輪詢的間隔,這個需要指定,而且watch type保留三種選擇
  • FileMonitoringFunction.WatchType.ONLY_NEW_FILES 只處理新檔案
  • FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED  只處理追加的內容
  • FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED  既要處理追加的內容,也會處理file中之前的內容
如果檔案不是text型別的檔案,我們還要指定輸入檔案的格式
  • readFile(fleInputFormat,path,watchType,interval,pathFilter,typeInfo)
當採用這種格式的時候,要注意這背後會有兩個子任務的產生,一個是基於watch type的子任務,它不是並行的,只是監控檔案的輪詢以及包括檔案處理的情況, 另一個是並行的任務,才是真正的處理檔案的
Transformations  轉換的操作 map inputStream.map{x => x * 5} flatMap inputStream.flatMap{ str => str.split(" ")} filter inputStream.filter{ _ != 1} KeyBy keyBy是根據key來進行邏輯分割槽,通常是根據hash函式來進行分割槽,結果是KeyedDataStream inputStream.keyBy("someKey") Reduce 是把一個keyedDataStream的資料流,按照給定的函式進行聚合操作 KeyedInputStream.reduce{_+_} Fold KeyedInputStream.fold("Start")((str,i) => {str + "=" + i}) 結果是 Start=1=2=3=4=5 Aggregations keyedInputStream.sum(0) keyedInputStream.sum("key") keyedInputStream.min(0) keyedInputStream.min("key") keyedInputStream.max(0) keyedInputStream.max("key") keyedInputStream.minBy(0) keyedInputStream.minBy("key") keyedInputStream.maxBy(0) keyedInputStream.maxBy("key") max 和maxBy的區別是,max返回的最大的值,max返回的是最大的值所對應的key window操作,window操作需要作用於keyedDataStream,而且要有window的分配器 window的分配器有四類:
  • tumbling window
  • sliding window
  • global window
  • session window
當然也可以自定義window ,利用windowAssginer 的類 inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))) Global window global window 是沒有結束點的window除非指定了一個觸發器,通常每一個元素是被分配給一個單獨的已經通過key邏輯分割槽的global window, 如果沒有指定觸發器的話,這個window是不會執行的 Tumbling window 依據一個固定的時間段來建立的window Sliding windows 滑動視窗 Session windows window視窗的邊界根據的是輸入的資料,所以該視窗的開始時間和視窗的大小是靈活不確定的,當然我們也可以配置超時時間。 windowAll windowAll不是併發的資料轉換操作 inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))) Union 把兩個或多個流合併,如果一個流和它本身合併的話,每個元素會輸出兩次 inputStream.union(inputStream1,inputStream2,...) window join 我們也可以操作join的操作,依據的某些key,例如下面的例子,就是兩個流的join操作,join的條件是第一個流的屬性和第二個流的屬性一樣 inputStream.join(inputStream1) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply( joinFunction) Split  將一個混合的流分成多個指定的流
val split = inputStream.split(     (num : Int) => 
        (num % 2) match {             case 0 => List("even")
            case 1 => List("odd")
        }
) Select  從一個SplitStream流中選擇一個指定的流 val even = split select "even" val even = split select "odd" val all = split.select("even","odd") Project  選擇指定的屬性的資料 val in : DataStream[(Int,Double,String)] = // val out = in.project(2,1) 結果是 (1,10.0,A)  =>  (A,10.0) (2,20.0,C)  =>  (C,20.0) Physical Partition 下面的操作都是物理分割槽操作,keyedBy 等操作都是邏輯分割槽操作 custom partitioning   使用者自定義分割槽操作 inputStream.partitionCustom(partitioner,"somekey") inputStream.partitionCustom(partitioner,0) partitioner要實現一個高效的hash函式 Random partition inputStream.shuffle() Rebalancing partitioning 依據的是輪詢的方式 inputStream.rebalance() Rescaling  這個不會引起網路IO,只會在單個節點上進行rebalance inputStream.rescale() Broadcasting  是廣播到每個分割槽partition    spark是廣播到每個節點 inputStream.broadcast() 三        Data sinks
  • writeAsText()  
  • writeAsCsv()
  • print()/printErr()  以標準輸出的方式輸出結果
  • writeUsingOutputFormat()  以自定義的格式資料,定義的格式要繼承 OutputFormat,它提供了序列化和反序列化
  • writeToSocket()  要求定義 serializationSchema
四          Event time and  watermarks 下面的例子展示如何設定輸入時間和處理時間,  攝入時間和處理時間的watermark是自動生成的 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //or env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) 而對於事件時間戳的和水印的分配有兩種方式 一是直接採用資料來源的時間戳 二是利用一個時間戳的生成器    不過最好是用資料來源的時間戳 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 五             Connectors kafka connector kafka的資料先通過Flink-Kafka-Connector 進入到Flink Data Streams 再進入處理 需要一個jar檔案,flink-connector-kafka-0.9_2.11
Properties properties = new Properties(); properties.setProperty("bootstrap.servers","localhost:9092"); properties.setProperty("group.id","test"); DataStream<String> input = env.addSource(new FlinkKafkaConsumer09<String>("mytopic","new SimpleStringSchema(),properties")); scala: val properties = new Properties(); properties.setProperty("bootstrap.servers","localhost:9092"); properties.setProperty("zookeeper.connect","localhost:2181"); properties.setProperty("group.id","test"); stream = env.addSource(new FlinkKafkaConsumer09[String]("mytopic",new SimpleStringSchema(),properties)).print 要注意容錯,設定checkpoint kafka producer也可以作為sink來用 stream.addSink(new FlinkKafkaProducer09<String>("localhost:9092","mytopic",new SimpleStringSchema())); ElasticSearch connector ES有另個主要的版本,Flink都是支援的 jar檔案為  flink-connector-elasticsearch_2.11 Flink的ES connector提供了兩種連線ES的方式:嵌入式節點  和  運輸客戶端
Flink的資料通過Flink-ES的connector 進入到Flink-ES的sink,就可以分兩種方式傳入ES節點了 以嵌入式節點的方式:使用BulkProcessor向ES傳送文件資料,我們可以配置在往ES傳送請求之前可以快取多少的請求 val input : DataStream[String] = ... val config [ new util.HashMap[String,String] config.put("bulk.flush.max.actions","1") config.put("cluster.name","cluster-name") text.addSink(new ElasticsearchSink(config,new IndexRequestBuilder[String] {     override def createIndexRequest(element :String ,ctx:RuntimeContext):IndexRequest = {         val json = new util.HashMap[String,AnyRef]
        json.put("data",element)
        Requests.indexRequest.index("my-index",'type'("my-type").source(json))
    }
})) 傳輸客戶端的模式 : ES也允許通過transport client 的模式在9300埠 val input : DataStream[String] = ... val config = new util.HashMap[String,String] config.put("bulk.flush.max.actions","1") config.put("cluster.name","cluster-name") val transports = new ArrayList[String] transports.add(new InetSockettransportAddress("es-node-1",9300)) transports.add(new InetSockettransportAddress("es-node-1",9300)) transports.add(new InetSockettransportAddress("es-node-1",9300)) text.addSink(new ElasticsearchSink(config,transports,new IndexRequestBuilder[String] {     override def createIndexRequest(element : String ,ctx : RuntimeContext) : IndexRequest = {         val json  = new util.HashMap[String,AnyRef]
        json.put("data",element)
        Requests.indexRequest.index("my-index"),'type'("my-type").source(json)
    }
})) 感測器資料生成進入kafka再進入flink處理的例子 //設定流資料的執行環境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //env.enableCheckpointing(5000) env.setStreamtimeCharacteristic(TimeCharacteristic.EventTime); Properties properties = new Properties(); properties.setProperty("bootstrap.servers","localhost:9092"); properties.setProperty("zookeeper,connect","localhost:2181"); properties.setProperty("group.id","test"); FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("temp",new SimpleStringSchema(),properties); myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); 訊息的格式  :  Timestamp,Temperature,Sensor-Id 從訊息中抽取時間戳 public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {     private static final long serialVersionUID = 1L;
    public long extractTimestramp(String arg0,long arg1){         if(null != arg0 && arg0.contains(",")) {             String parts[] = arg0.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0;
    }
    public Watermark checkAndGetNextWatermark(String arg0,long arg1){         if(null != arg0 && arg0.contains(",")) {             String parts[] = arg0.split(",")
            return new Watermark(Long.parseLong(parts[000]));
        }
        return null;
    }
} 對溫度計算平均值 DataStream<Tuple2<String,Double>> keyedStream = env.addSource().flatMap(new Splitter()).keyBy(0) .timeWindow(Time.second(300)) .apply(new WindowFunction<Tuple2<String,Double>,Tuple2<String,Double>,Tuple,TimeWindow>() {     public void apply(Tuple key,TimeWindow window,Iterable<Tuple2<String,Double>> input,Collector<Tuple2<String,Double>> out) throws Exception{         double sum = 0L;
        int count = 0;
        for(Tuple2<String,Double> record : input) {             sum += record.f1;   
            count ++;
        }
    Tuple2<String,Double> result = input.iterator().next();
    result.f1 = (sum/count);     out.collect(result);
    }
});