Mastering Flink Reading Notes (2): Processing Data with the DataStream API
The topics covered include the execution environment, data sources, transformations, data sinks, and the Kafka and Elasticsearch connectors.
1. Execution environment
Flink supports the following ways to create an execution environment (a minimal sketch follows the list):
- getExecutionEnvironment(): gets an existing Flink environment, or creates one appropriate for the execution context
- createLocalEnvironment(): creates a local environment
- createRemoteEnvironment(String host, int port, String... jarFiles): creates a remote environment, given the host, the port, and the JAR files to ship
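A minimal Java sketch of the three creation methods; the remote host, port, and JAR path are placeholder values, not part of the original notes:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvExamples {
    public static void main(String[] args) {
        // Picks a local or remote environment depending on how the program is launched
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Forces a local environment, e.g. for debugging inside an IDE
        StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

        // Connects to a remote JobManager; host, port and JAR path below are placeholders
        StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123, "/path/to/job.jar");
    }
}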
2. Data sources

Once the environment is created, a data source can be attached. The socket- and file-based sources are (a minimal sketch follows the list):

- socketTextStream(hostName, port)
- socketTextStream(hostName, port, delimiter)
- socketTextStream(hostName, port, delimiter, maxRetry)
- readTextFile(String path)
- readFile(FileInputFormat<OUT> inputFormat, String path): if the source is not a plain text file, the file format can be specified here
- readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
  - FileMonitoringFunction.WatchType.ONLY_NEW_FILES: process new files only
  - FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED: process appended content only
  - FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED: process both the appended content and the existing content of the file
- readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
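A minimal Java sketch of the socket- and file-based sources; the host, port, and file path are placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceExamples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Socket-based source; hostname and port are placeholders
        DataStream<String> socketLines = env.socketTextStream("localhost", 9999);

        // File-based source; the path is a placeholder
        DataStream<String> fileLines = env.readTextFile("/tmp/input.txt");

        socketLines.print();
        fileLines.print();
        env.execute("source sketch");
    }
}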
3. Transformations

- map: inputStream.map { x => x * 5 }
- flatMap: inputStream.flatMap { str => str.split(" ") }
- filter: inputStream.filter { _ != 1 }
- keyBy: partitions the stream logically by key, typically via a hash function; the result is a KeyedStream. inputStream.keyBy("someKey")
- reduce: aggregates a keyed stream with the given function. keyedInputStream.reduce { _ + _ }
- fold: keyedInputStream.fold("Start")((str, i) => { str + "=" + i }); applied to the elements 1, 2, 3, 4, 5 the result is Start=1=2=3=4=5
- Aggregations: keyedInputStream.sum(0), keyedInputStream.sum("key"), keyedInputStream.min(0), keyedInputStream.min("key"), keyedInputStream.max(0), keyedInputStream.max("key"), keyedInputStream.minBy(0), keyedInputStream.minBy("key"), keyedInputStream.maxBy(0), keyedInputStream.maxBy("key"). The difference between max and maxBy is that max returns only the maximum value of the field, while maxBy returns the whole element that holds the maximum value.
- Window: window operations apply to a KeyedStream and require a window assigner (a small tumbling-window sketch follows this list). There are four kinds of window assigners:
- tumbling window
- sliding window
- global window
- session window
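A minimal Java sketch, assuming a word-per-line socket source on a placeholder port, that keys the stream and applies a 10-second tumbling time window:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> counts = env
                .socketTextStream("localhost", 9999)          // placeholder source
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) {
                        return new Tuple2<>(word, 1);
                    }
                })
                .keyBy(0)                                     // logical partitioning by the word
                .timeWindow(Time.seconds(10))                 // tumbling window of 10 seconds
                .sum(1);                                      // per-window count

        counts.print();
        env.execute("tumbling window sketch");
    }
}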
- Split: splits one stream into several tagged streams:

val split = inputStream.split((num: Int) =>
  (num % 2) match {
    case 0 => List("even")
    case 1 => List("odd")
  }
)

- Select: picks a named stream out of a SplitStream. val even = split select "even"; val odd = split select "odd"; val all = split.select("even", "odd")
- Project: selects the given fields. val in: DataStream[(Int, Double, String)] = ...; val out = in.project(2, 1); the result maps (1, 10.0, A) => (A, 10.0) and (2, 20.0, C) => (C, 20.0)
- Physical partitioning: the operations below repartition the data physically, whereas keyBy and similar operations only partition logically.
  - Custom partitioning: inputStream.partitionCustom(partitioner, "someKey") or inputStream.partitionCustom(partitioner, 0); the partitioner should implement an efficient hash function (a minimal sketch follows this list).
  - Random partitioning: inputStream.shuffle()
  - Rebalancing: distributes records in a round-robin fashion. inputStream.rebalance()
  - Rescaling: inputStream.rescale(); this avoids a full network shuffle and only rebalances among local subtasks.
  - Broadcasting: inputStream.broadcast() sends the data to every partition (in Spark, broadcast sends to every node).
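A minimal Java sketch of partitionCustom, assuming a simple hash-based Partitioner on the first tuple field; the element values are placeholders:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> input =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("c", 3));

        // Route each record by the hash of its first field (field index 0)
        DataStream<Tuple2<String, Integer>> partitioned = input.partitionCustom(
                new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return Math.abs(key.hashCode()) % numPartitions;
                    }
                }, 0);

        partitioned.print();
        env.execute("custom partition sketch");
    }
}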
4. Data sinks

- writeAsText()
- writeAsCsv()
- print() / printToErr(): writes the results to standard output / standard error
- writeUsingOutputFormat(): writes data in a custom format; the format class must implement OutputFormat, which handles serialization and deserialization
- writeToSocket(): requires a SerializationSchema

A minimal sketch of the file and standard-output sinks follows.
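The sketch below is in Java; the output paths are placeholders:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkExamples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> results =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

        results.writeAsText("/tmp/out-text");   // placeholder path
        results.writeAsCsv("/tmp/out-csv");     // placeholder path; works on tuple streams
        results.print();                        // standard output
        env.execute("sink sketch");
    }
}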
Kafka connector

Java:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> input = env.addSource(
    new FlinkKafkaConsumer09<String>("mytopic", new SimpleStringSchema(), properties));

Scala:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("mytopic", new SimpleStringSchema(), properties))
  .print()

The Kafka producer can also be used as a sink:

stream.addSink(new FlinkKafkaProducer09<String>("localhost:9092", "mytopic", new SimpleStringSchema()));

For fault tolerance, remember to enable checkpointing; a minimal configuration sketch follows.
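This fragment is meant to sit right after the environment is created in the snippets above; the 5000 ms interval is only an illustrative value:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 5 seconds (illustrative interval)
env.enableCheckpointing(5000);
// Request exactly-once checkpoint semantics
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);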
ElasticSearch connector

ES has two major versions and Flink supports both; the jar is flink-connector-elasticsearch_2.11. Flink's ES connector offers two ways to connect to ES: an embedded node and the transport client. Data flows through the connector into the ES sink and then reaches the ES nodes in one of those two ways.

Embedded node mode: a BulkProcessor sends the documents to ES, and we can configure how many requests to buffer before they are flushed:

val input: DataStream[String] = ...
val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "cluster-name")

input.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))
Transport client mode: ES also accepts connections through the transport client on port 9300:

val input: DataStream[String] = ...
val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "cluster-name")

val transports = new ArrayList[InetSocketTransportAddress]
transports.add(new InetSocketTransportAddress("es-node-1", 9300))
transports.add(new InetSocketTransportAddress("es-node-2", 9300))
transports.add(new InetSocketTransportAddress("es-node-3", 9300))

input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))
Example: sensor data is written to Kafka and then processed by Flink.

// Set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer09<String> myConsumer =
    new FlinkKafkaConsumer09<>("temp", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

The message format is: Timestamp,Temperature,Sensor-Id. The timestamp is extracted from the message:

public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

    private static final long serialVersionUID = 1L;
    @Override
    public long extractTimestamp(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String parts[] = arg0.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0;
    }

    @Override
    public Watermark checkAndGetNextWatermark(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String parts[] = arg0.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }
}

Computing the average temperature per window (the Splitter used here is sketched after the code):

DataStream<Tuple2<String, Double>> keyedStream = env
    .addSource(myConsumer)
    .flatMap(new Splitter())
    .keyBy(0)
    .timeWindow(Time.seconds(300))
    .apply(new WindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple, TimeWindow>() {
        @Override
        public void apply(Tuple key, TimeWindow window,
                          Iterable<Tuple2<String, Double>> input,
                          Collector<Tuple2<String, Double>> out) throws Exception {
            double sum = 0.0;
            int count = 0;
            for (Tuple2<String, Double> record : input) {
                sum += record.f1;
                count++;
            }
            Tuple2<String, Double> result = input.iterator().next();
            result.f1 = (sum / count);
            out.collect(result);
        }
    });
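The example above calls new Splitter(), but the notes never show that class. A hypothetical sketch, assuming the Timestamp,Temperature,Sensor-Id message format described earlier and emitting (sensorId, temperature) pairs:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical Splitter: parses "Timestamp,Temperature,Sensor-Id" into (sensorId, temperature)
public class Splitter implements FlatMapFunction<String, Tuple2<String, Double>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Double>> out) {
        String[] parts = value.split(",");
        if (parts.length == 3) {
            out.collect(new Tuple2<>(parts[2], Double.parseDouble(parts[1])));
        }
    }
}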