A Simple Integration of Flume, Kafka, and Spark Streaming
By 阿新 · Published 2018-12-10
Flume forwards data to Kafka, and Spark Streaming then consumes the data from Kafka for processing.
This article uses the netcat tool as Flume's input source. Without further ado, here is the code.
1. Flume
The Flume agent configuration file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# netcat source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test3
a1.sinks.k1.kafka.bootstrap.servers = big-data-2:6667
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

# memory channel
a1.channels.c1.type = memory

# bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
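With the configuration in place, the agent can be launched with flume-ng and fed test input through netcat. A minimal sketch, assuming the configuration above is saved as conf/netcat-kafka.conf (the file name and path are assumptions):

# Start the agent named a1 with the configuration above
bin/flume-ng agent --conf conf --conf-file conf/netcat-kafka.conf --name a1 -Dflume.root.logger=INFO,console

Then, in a second terminal, connect to the netcat source and type a few test lines:

nc localhost 8888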
2. Kafka
Kafka here listens on port 6667 (this is the default port when the cluster is deployed with Ambari).
Create a topic:
./kafka-topics.sh --create --zookeeper big-data-2:2181 --replication-factor 3 --partitions 3 --topic test3
The created topics can then be listed with the following command:
./kafka-topics.sh --list --zookeeper big-data-2:2181
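Before wiring up Spark, it can be useful to confirm that lines typed into netcat actually reach the topic. A standard console consumer works for this; the broker address matches the Flume sink configuration above:

./kafka-console-consumer.sh --bootstrap-server big-data-2:6667 --topic test3 --from-beginning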
3. Spark Streaming
package com.it18zhang.spark.java;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

/**
 * Word count over messages consumed from the Kafka topic "test3".
 */
public class KafkaSparkStreamingDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.setAppName("kafkaSpark");
        conf.setMaster("local[4]");

        // Create the Spark Streaming context with a 5-second batch interval
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Seconds.apply(5));

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "big-data-2:6667");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "g6");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("test3");

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // Flatten: split each record's value into individual words
        JavaDStream<String> wordsDS = stream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
            public Iterator<String> call(ConsumerRecord<String, String> r) throws Exception {
                String value = r.value();
                List<String> list = new ArrayList<String>();
                String[] arr = value.split(" ");
                for (String s : arr) {
                    list.add(s);
                }
                return list.iterator();
            }
        });

        // Map each word to a (word, 1) tuple
        JavaPairDStream<String, Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Aggregate: sum the counts per word
        JavaPairDStream<String, Integer> countDS = pairDS.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print the counts for each batch
        countDS.print();

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
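To test end to end: start the Flume agent, run this class, and type a few space-separated words into the nc session; each 5-second batch prints the updated word counts.

One detail worth noting: the job sets enable.auto.commit to false but never commits offsets itself, so on restart the consumer position falls back to auto.offset.reset. A minimal sketch of committing each batch's offsets back to Kafka with the kafka-0-10 integration's CanCommitOffsets API, an addition for illustration rather than part of the original post; it would go inside main after the stream is created, and the wildcard imports in the listing above already cover the classes used:

// Commit this batch's offsets back to Kafka once the batch has been processed.
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    }
});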