Writing Data from Flink to Elasticsearch
Maven dependency for the Elasticsearch connector:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
Note: Flink 1.3.0 and later support ES 1.X, ES 2.X, and ES 5.X; this article targets the ES 2.X connector, with the following build properties:
<properties>
    <scala.version>2.11</scala.version>
    <flink.version>1.5.5</flink.version>
</properties>
To sink a Flink stream into Elasticsearch, you generally have to supply your own Sink implementation:
// Build the application environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing to get exactly-once processing semantics
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
// Attach the Kafka data source
DataStream<DataType> result = env.addSource(new KafkaSource());
// Obtain the Elasticsearch sink and attach it to the stream
ElasticsearchSink<DataType> elasticsearchSink = WriteToES.getElasticsearchSink();
result.addSink(elasticsearchSink);
// Trigger job execution
env.execute("Source from Kafka, sink to Elasticsearch.");
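The snippets in this article all reference a DataType class that is never defined. A minimal sketch of such a POJO (the class name is from the article, but the fields and accessors are assumptions for illustration, matching the getId() call used further below):

import java.io.Serializable;

// Hypothetical event POJO used throughout this article's examples.
public class DataType implements Serializable {

    private long id;     // unique document id, also used as the ES routing key below
    private String name; // example payload field

    public DataType() {} // no-arg constructor required by Flink's POJO serializer

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}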
A custom KafkaSource:
public class KafkaSource extends RichParallelSourceFunction<DataType> {

    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // <YOUR CODE>: create the Kafka consumer and subscribe to the topic
    }

    @Override
    public void run(final SourceContext<DataType> ctx) throws Exception {
        // <YOUR CODE>: poll records and emit them while running is true
        // ctx.collect(data);
    }

    @Override
    public void cancel() {
        // cancel() is required by the SourceFunction interface; signal run() to exit
        running = false;
    }
}
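Hand-rolling a source like this is usually unnecessary: the official flink-connector-kafka modules ship ready-made, checkpoint-aware consumers. A minimal sketch, assuming Kafka 0.10, the flink-connector-kafka-0.10_${scala.version} dependency, and a hypothetical parse() helper that maps raw strings to DataType (none of which appear in the original article):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "<KAFKA_BROKERS>");
props.setProperty("group.id", "<CONSUMER_GROUP>");

// FlinkKafkaConsumer010 participates in Flink checkpoints, so Kafka offsets
// are covered by the exactly-once mode enabled on the environment above.
DataStream<String> raw = env.addSource(
        new FlinkKafkaConsumer010<>("<TOPIC>", new SimpleStringSchema(), props));

DataStream<DataType> result = raw.map(new MapFunction<String, DataType>() {
    @Override
    public DataType map(String line) {
        return parse(line); // parse() is an assumed helper, not part of the article
    }
});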
A custom ElasticsearchSink builder:
public class WriteToES {

    private static final Logger LOG = LoggerFactory.getLogger(WriteToES.class);

    private static final String INDEX = "<INDEX_NAME>";            // index name
    private static final String DOCUMENT_TYPE = "<DOCUMENT_TYPE>"; // document type

    public static ElasticsearchSink<DataType> getElasticsearchSink() throws Exception {
        // Elasticsearch connection properties
        Map<String, String> config = new HashMap<String, String>();
        config.put(ElasticsearchConstant.CLUSTER_NAME, "<CLUSTER_NAME>");
        // Number of buffered records per bulk write to Elasticsearch
        config.put(ElasticsearchConstant.BULK_FLUSH_MAX_ACTIONS, "100");

        // List of Elasticsearch cluster transport addresses
        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("<CLUSTER_IP>"), 40103));
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("<CLUSTER_IP>"), 40103));

        // Create the ElasticsearchSinkFunction; process() must be overridden
        ElasticsearchSinkFunction<DataType> elasticsearchSinkFunction = new ElasticsearchSinkFunction<DataType>() {

            public IndexRequest createIndexRequest(DataType element) {
                String id = String.valueOf(element.getId());
                return Requests.indexRequest()
                        .index(INDEX)
                        .type(DOCUMENT_TYPE)
                        .id(id)
                        .routing(id)
                        .source(transfer(element));
            }

            public void process(DataType element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        };

        // Build the ElasticsearchSink from the config, addresses, and sink function
        ElasticsearchSink<DataType> elasticsearchSink =
                new ElasticsearchSink<DataType>(config, transportAddresses, elasticsearchSinkFunction);
        return elasticsearchSink;
    }

    /**
     * Converts a business entity into the map stored in Elasticsearch.
     */
    private static Map<String, Object> transfer(DataType element) {
        Map<String, Object> esMap = new HashMap<String, Object>();
        // <YOUR CODE>
        return esMap;
    }
}
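The body of transfer() is left open above. Assuming the hypothetical DataType fields sketched earlier, a minimal implementation just flattens the entity into the document source map:

private static Map<String, Object> transfer(DataType element) {
    Map<String, Object> esMap = new HashMap<String, Object>();
    // Map keys become the Elasticsearch document fields;
    // id and name come from the assumed DataType sketch above.
    esMap.put("id", element.getId());
    esMap.put("name", element.getName());
    return esMap;
}

As a side note, starting with Flink 1.5 the Elasticsearch connectors also accept an ActionRequestFailureHandler as an additional constructor argument, which lets you retry, re-queue, or drop individual requests that fail during a bulk flush instead of failing the job.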