Spark Streaming + Kafka: Real-Time Order Statistics
Published: 2019-01-10
package com.lm.sparkLearning.orderexmaple;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AtomicDouble;
import com.lm.sparkLearning.utils.ConstantUtils;
import com.lm.sparkLearning.utils.SparkUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * Spark Streaming: real-time order count and total order value.
 *
 * @author liangming.deng
 */
public class OrderSparkStreaming {
    private static Logger logger = LoggerFactory.getLogger(OrderSparkStreaming.class);
    private static AtomicLong orderCount = new AtomicLong(0);
    private static AtomicDouble totalPrice = new AtomicDouble(0);

    public static void main(String[] args) {
        // Create context with a 20-second batch interval
        JavaStreamingContext jssc = SparkUtils.getJavaStreamingContext("JavaDirectKafkaWordCount",
                "local[2]", null, Durations.seconds(20));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(ConstantUtils.ORDER_TOPIC.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);
        kafkaParams.put("auto.offset.reset", ConstantUtils.AUTO_OFFSET_RESET_VALUE);

        // Create a direct Kafka stream from the given brokers and topics
        JavaPairInputDStream<String, String> orderMsgStream = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
                topicsSet);

        // JSON-to-object mapper
        final ObjectMapper mapper = new ObjectMapper();

        // Deserialize each Kafka message value into an Order
        JavaDStream<Order> orderDStream = orderMsgStream
                .map(new Function<Tuple2<String, String>, Order>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Order call(Tuple2<String, String> t2) throws Exception {
                        return mapper.readValue(t2._2, Order.class);
                    }
                }).cache();

        // Process each RDD in the DStream
        orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
                long count = orderJavaRDD.count();
                if (count > 0) {
                    // Accumulate the running order count
                    orderCount.addAndGet(count);

                    // Map each order in the RDD to its price, producing a new RDD of
                    // prices, then reduce that RDD to the price sum of this batch
                    Float sumPrice = orderJavaRDD.map(new Function<Order, Float>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Float call(Order order) throws Exception {
                            return order.getPrice();
                        }
                    }).reduce(new Function2<Float, Float, Float>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Float call(Float a, Float b) throws Exception {
                            return a + b;
                        }
                    });

                    // Add this batch's price sum to the running total across all batches
                    totalPrice.getAndAdd(sumPrice);

                    // Log the running order count and price total; in production
                    // these could be written to a database instead
                    logger.warn("-------Total order count : " + orderCount.get()
                            + " with total price : " + totalPrice.get());
                }
            }
        });

        orderDStream.print();

        jssc.start(); // Start the computation
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}
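
The listing references an Order POJO and two project helpers, ConstantUtils and SparkUtils, that the post does not include. Below is a minimal sketch of what they might look like, inferred purely from how they are used above; the Order field names beyond price, the constant values, and the meaning of SparkUtils's third parameter (treated here as an optional checkpoint directory) are all assumptions, not the author's originals.

// --- Order.java (package com.lm.sparkLearning.orderexmaple) ---
// Jackson needs a no-arg constructor and bean accessors; the streaming
// job only requires getPrice(). The id field is an assumed example.
public class Order implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    private String id;   // assumed field
    private Float price; // required: the job calls getPrice()

    public Order() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Float getPrice() {
        return price;
    }

    public void setPrice(Float price) {
        this.price = price;
    }
}

// --- ConstantUtils.java (package com.lm.sparkLearning.utils) ---
// Plausible values only; the original project's settings may differ.
public class ConstantUtils {
    public static final String ORDER_TOPIC = "order";
    public static final String METADATA_BROKER_LIST_VALUE = "localhost:9092";
    public static final String AUTO_OFFSET_RESET_VALUE = "smallest"; // 0.8-era consumer config
}

// --- SparkUtils.java (package com.lm.sparkLearning.utils) ---
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkUtils {
    public static JavaStreamingContext getJavaStreamingContext(String appName, String master,
            String checkpointDir, Duration batchDuration) {
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);
        if (checkpointDir != null) {
            jssc.checkpoint(checkpointDir);
        }
        return jssc;
    }
}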
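
To exercise the job end to end, something must publish JSON-serialized orders to ORDER_TOPIC. Here is a minimal test-producer sketch using the standard kafka-clients producer API, assuming the Order and ConstantUtils sketches above; the message count and send interval are arbitrary placeholders.

package com.lm.sparkLearning.orderexmaple;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.lm.sparkLearning.utils.ConstantUtils;

// Publishes random JSON orders to the order topic so the streaming job
// has data to aggregate.
public class OrderProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", ConstantUtils.METADATA_BROKER_LIST_VALUE);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        ObjectMapper mapper = new ObjectMapper();
        Random random = new Random();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                Order order = new Order();
                order.setId("order-" + i);
                order.setPrice(random.nextFloat() * 100);
                String json = mapper.writeValueAsString(order);
                producer.send(new ProducerRecord<>(ConstantUtils.ORDER_TOPIC, order.getId(), json));
                Thread.sleep(500); // spread sends across several batch intervals
            }
        }
    }
}

Run the producer while the streaming job is up, and the driver log should show the accumulated order count and price total growing with each 20-second batch.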