Spark in Practice (2): Kafka → Spark Streaming → Elasticsearch
Published: 2019-01-25
This post covers Spark's real-time component: Spark Streaming. Spark Streaming processes data in near real time by running a continuous series of small batches; the job runs 7x24, so you get low latency at the cost of ongoing maintenance. The demo below is written in Java and builds a real-time processing pipeline with Kafka as Spark Streaming's input source and Elasticsearch as the output sink.
As usual, the code comes first.
maven
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.3.0</version>
    </dependency>
</dependencies>
Code
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

public class StreamingTest {

    public static void main(String[] args) throws Exception {
        // Create a StreamingContext from a SparkConf with a 1-second batch interval
        String master = "spark://192.168.7.51:7077";
        // String master = "local[2]";
        SparkConf conf = new SparkConf().setMaster(master).setAppName("StreamingTest")
                // .set("spark.executor.memory", "1g")
                // .set("spark.dynamicAllocation.enabled", "false")
                .set("spark.es.nodes", "192.168.7.51") // Elasticsearch host
                .set("spark.es.port", "9200")          // Elasticsearch port
                .setJars(new String[] { // in cluster mode, ship the jar of the packaged project
                        "/Users/username/eclipse-workspace/spark-streaming/target/spark-streaming-0.0.1-SNAPSHOT.jar"
                });
        // Fetch Kafka data once per second
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        String brokers = "kafka1:9092"; // Kafka broker address
        String groupId = "kafka";       // consumer group id
        String topics = "test1";        // topic(s) to subscribe to
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Turn each 1-second batch of Kafka data into a DStream
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        // Extract the value of each message
        JavaDStream<String> lines = messages.map(record -> record.value());
        // Format each value as a JSON document that Elasticsearch can index
        JavaDStream<String> out = lines.map(str -> "{\"test\":\"" + str + "\"}");
        // Print to the console
        out.print();
        // Write to Elasticsearch
        JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc");

        // Start streaming
        jssc.start();
        // Wait for the job to finish
        jssc.awaitTermination();
        jssc.stop();
    }
}
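To test the pipeline end to end, something has to produce into the test1 topic. Here is a minimal sketch of a Java producer for that purpose; it is not part of the original demo, and it simply reuses the kafka1:9092 broker address from above.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092"); // same broker as the streaming demo
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // each value ends up in Elasticsearch as {"test":"message-<i>"}
                producer.send(new ProducerRecord<>("test1", "message-" + i));
            }
        }
    }
}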
Receiving Kafka data in Spark Streaming
Spark Streaming consumes Kafka via the spark-streaming-kafka-0-10_2.11 package.
maven
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
scala
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))
java
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
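Note that enable.auto.commit is set to false above, so offsets are never committed unless you commit them yourself. Below is a minimal sketch using the commitAsync API that spark-streaming-kafka-0-10 exposes, applied to the stream created above; where exactly you commit relative to your own output logic is up to you.

import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

stream.foreachRDD(rdd -> {
    // the Kafka offset ranges covered by this batch
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ... process the batch ...

    // commit the offsets back to Kafka once the batch's outputs have completed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});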
Writing Spark Streaming output to Elasticsearch
maven
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>6.3.0</version>
</dependency>
scala
import org.elasticsearch.spark.streaming.EsSparkStreaming
java
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
You need to tell Spark where Elasticsearch lives; this can go in the configuration file or be set in code.
- Configuration file: spark-defaults.conf
spark.es.nodes eshosts
spark.es.port 9200
- In code:
SparkConf conf = new SparkConf().setMaster(master).setAppName("StreamingTest")
// .set("spark.executor.memory", "1g")
// .set("spark.dynamicAllocation.enabled", "false")
.set("spark.es.nodes", "192.168.7.51")
.set("spark.es.port", "9200");
Write to Elasticsearch:
JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc");
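If you would rather not assemble JSON strings by hand, elasticsearch-hadoop can also serialize documents for you. Here is a sketch using saveToEs with one Map per document, assuming the lines DStream from the demo above (the "test" field name just mirrors that demo):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaDStream;
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

// build one document per incoming line
JavaDStream<Map<String, String>> docs = lines.map(str -> {
    Map<String, String> doc = new HashMap<>();
    doc.put("test", str);
    return doc;
});

// elasticsearch-hadoop serializes each Map to JSON and indexes it into spark/doc
JavaEsSparkStreaming.saveToEs(docs, "spark/doc");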