
Spark Streaming + Kafka: Real-Time Order Statistics

The job below consumes JSON order messages from Kafka via the direct stream API and maintains a running order count and total order value across 20-second batches.

package com.lm.sparkLearning.orderexmaple;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AtomicDouble;
import com.lm.sparkLearning.utils.ConstantUtils;
import com.lm.sparkLearning.utils.SparkUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * Spark Streaming job that counts orders and accumulates their total value
 * 
 * @author liangming.deng
 *
 */
public class OrderSparkStreaming {
	private static Logger logger = LoggerFactory.getLogger(OrderSparkStreaming.class);
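	// Running totals kept on the driver: the foreachRDD body below executes on the
	// driver (count() and reduce() are actions), so plain atomic fields suffice here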
	private static AtomicLong orderCount = new AtomicLong(0);
	private static AtomicDouble totalPrice = new AtomicDouble(0);

	public static void main(String[] args) {

		// Create context with a 20-second batch interval
		JavaStreamingContext jssc = SparkUtils.getJavaStreamingContext("JavaDirectKafkaWordCount",
				"local[2]", null, Durations.seconds(20));

		Set<String> topicsSet = new HashSet<>(Arrays.asList(ConstantUtils.ORDER_TOPIC.split(",")));
		Map<String, String> kafkaParams = new HashMap<>();
		kafkaParams.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);
		kafkaParams.put("auto.offset.reset", ConstantUtils.AUTO_OFFSET_RESET_VALUE);

		// Create direct kafka stream with brokers and topics
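		// The direct (receiver-less) approach tracks offsets itself and assigns each
		// Kafka message to exactly one batch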
		JavaPairInputDStream<String, String> orderMsgStream = KafkaUtils.createDirectStream(jssc,
				String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
				topicsSet);

		// Jackson mapper for converting the JSON message payload into an Order object
		final ObjectMapper mapper = new ObjectMapper();
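		// Parse each Kafka (key, value) pair into an Order; cache() avoids re-reading
		// from Kafka and re-parsing the JSON when the DStream is consumed twice
		// (by foreachRDD and by print() below)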
		JavaDStream<Order> orderDStream = orderMsgStream
				.map(new Function<Tuple2<String, String>, Order>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Order call(Tuple2<String, String> t2) throws Exception {
						// t2._2 is the message payload (JSON); deserialize it into an Order
						return mapper.readValue(t2._2, Order.class);
					}
				}).cache();

		// Process each RDD (one per batch) in the DStream
		orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
				long count = orderJavaRDD.count();
				if (count > 0) {
					// Accumulate the running order count
					orderCount.addAndGet(count);
					// Map each order in the RDD to its price, producing a new RDD of prices,
					// then reduce that RDD to compute the price sum for this batch
					Float sumPrice = orderJavaRDD.map(new Function<Order, Float>() {
						private static final long serialVersionUID = 1L;

						@Override
						public Float call(Order order) throws Exception {
							return order.getPrice();
						}
					}).reduce(new Function2<Float, Float, Float>() {
						private static final long serialVersionUID = 1L;

						@Override
						public Float call(Float a, Float b) throws Exception {
							return a + b;
						}
					});
					// Add this batch's price sum to the running total across all batches
					totalPrice.getAndAdd(sumPrice);

					// Log the running order count and total price; in production these
					// could be written to a database instead
					logger.warn("-------Total order count : " + orderCount.get()
							+ " with total price : " + totalPrice.get());
				}
			}
		});
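		// print() shows the first 10 elements (orders) of each batch on stdout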
		orderDStream.print();

		jssc.start(); // Start the computation
		jssc.awaitTermination(); // Wait for the computation to terminate
	}
}
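
The listing references three supporting classes that the post does not include: Order, SparkUtils, and ConstantUtils. The sketches below are reconstructions inferred from how those classes are used above, not the author's originals; everything except getPrice() and the constant names (the id and name fields, the broker address, the topic name) is an assumption.

package com.lm.sparkLearning.orderexmaple;

import java.io.Serializable;

// Minimal Order bean. Only getPrice() is required by the streaming job;
// the id and name fields are illustrative placeholders.
public class Order implements Serializable {
	private static final long serialVersionUID = 1L;

	private String id;
	private String name;
	private Float price;

	public String getId() { return id; }
	public void setId(String id) { this.id = id; }
	public String getName() { return name; }
	public void setName(String name) { this.name = name; }
	public Float getPrice() { return price; }
	public void setPrice(Float price) { this.price = price; }
}

SparkUtils only needs to build a streaming context, and ConstantUtils only needs to hold the Kafka settings; one plausible version of each (the values shown are placeholders for a local setup):

package com.lm.sparkLearning.utils;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkUtils {
	// Builds a JavaStreamingContext; pass null as checkpointDir to skip checkpointing
	public static JavaStreamingContext getJavaStreamingContext(String appName, String master,
			String checkpointDir, Duration batchDuration) {
		SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
		JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);
		if (checkpointDir != null) {
			jssc.checkpoint(checkpointDir);
		}
		return jssc;
	}
}

package com.lm.sparkLearning.utils;

public class ConstantUtils {
	// Placeholder values; point these at your own Kafka cluster
	public static final String ORDER_TOPIC = "order";
	public static final String METADATA_BROKER_LIST_VALUE = "localhost:9092";
	// "smallest" replays the topic from the beginning when no offset is stored
	public static final String AUTO_OFFSET_RESET_VALUE = "smallest";
}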