Integrating Spark with Kafka for Real-Time Stream Processing: A Java Example
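The program below uses Spark Streaming's direct Kafka integration (spark-streaming-kafka-0-10) to consume messages from the "test" topic, split each message into words, and maintain a running word count with updateStateByKey, printing the cumulative totals once per 10-second batch.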

package com.test;

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public class Test5 {

	public static void main(String[] args) throws InterruptedException {

		SparkConf conf = new SparkConf().setMaster("local").setAppName(
				"streamingTest");
		JavaSparkContext sc = new JavaSparkContext(conf);
		sc.setLogLevel("ERROR");
		// updateStateByKey needs a checkpoint directory to persist state
		sc.setCheckpointDir("./checkpoint");
		// One micro-batch every 10 seconds
		JavaStreamingContext ssc = new JavaStreamingContext(sc,
				Durations.seconds(10));

		// Kafka consumer parameters; these are required and missing any of
		// them will cause an error
		Map<String, Object> kafkaParams = new HashMap<>();
		kafkaParams.put("bootstrap.servers", "192.168.174.200:9092");
		kafkaParams.put("key.deserializer", StringDeserializer.class);
		kafkaParams.put("value.deserializer", StringDeserializer.class);
		kafkaParams.put("group.id", "newgroup2");
		kafkaParams.put("auto.offset.reset", "latest");
		// Offsets are not auto-committed; see the manual commit sketch below
		kafkaParams.put("enable.auto.commit", false);

		Collection<String> topics = Arrays.asList("test");

		JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
				.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
						ConsumerStrategies.<String, String>Subscribe(topics,
								kafkaParams));
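
		// Optional sketch, not in the original post: with
		// enable.auto.commit=false the offsets are never committed to Kafka.
		// The standard pattern from the Spark integration guide commits them
		// back once each batch has been read:
		stream.foreachRDD(rdd -> {
			OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd())
					.offsetRanges();
			((CanCommitOffsets) stream.inputDStream())
					.commitAsync(offsetRanges);
		});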

		// Note: each element of the stream is a ConsumerRecord; call value()
		// to get the message text, then run a word count on it
		JavaPairDStream<String, Integer> counts = stream
				.flatMap(
						x -> Arrays.asList(x.value().split(" "))
								.iterator())
				.mapToPair(x -> new Tuple2<String, Integer>(x, 1))
				.reduceByKey((x, y) -> x + y);
		//counts.print();

		JavaPairDStream<String, Integer> result = counts
				.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Optional<Integer> call(List<Integer> values,
							Optional<Integer> state) throws Exception {
						/**
						 * values: the new values for this key in the current
						 * batch, e.g. [1,1,1,1,1]
						 * state: the accumulated state of this key from
						 * previous batches
						 */
						Integer updateValue = 0;
						if (state.isPresent()) {
							updateValue = state.get();
						}

						for (Integer value : values) {
							updateValue += value;
						}
						return Optional.of(updateValue);
					}
				});

		result.print();

		ssc.start();
		ssc.awaitTermination();
		ssc.close();
	}
}
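
To feed the "test" topic for a quick end-to-end test, any Kafka producer will do. Below is a minimal sketch, assuming the same broker address (192.168.174.200:9092) and the kafka-clients library on the classpath; the class name TestProducer is made up for illustration.

package com.test;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {

	public static void main(String[] args) {
		Properties props = new Properties();
		// Assumed broker address, matching the consumer above
		props.put("bootstrap.servers", "192.168.174.200:9092");
		props.put("key.serializer", StringSerializer.class.getName());
		props.put("value.serializer", StringSerializer.class.getName());

		// try-with-resources flushes and closes the producer on exit
		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			// Each message is one line of text; the streaming job splits it
			// on spaces and counts the words
			producer.send(new ProducerRecord<>("test", "hello spark hello kafka"));
		}
	}
}

With messages flowing, each 10-second batch prints the cumulative counts, e.g. (hello,2), (spark,1), (kafka,1).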