
Integrating Kafka with Spark Streaming in Java: word count, plus real-time state updates with updateStateByKey

Add the dependencies

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
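
The `${spark.version}` property must be defined in the POM. A minimal sketch, assuming Spark 2.2.0 (the same version as the Kafka integration artifact above and the `--packages` coordinate used in the test step):

    <properties>
        <spark.version>2.2.0</spark.version>
    </properties>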

1. KafkaReceiverSpark: receiver-based approach, with offsets and other metadata stored in ZooKeeper

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Spark Streaming + Kafka integration in Java (receiver-based): count the words in each batch.
 */
public class KafkaReceiverSpark {
    public static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // topic and the number of partitions to consume
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("kafka_streaming_topic", 1);
        // ZooKeeper host and port
        String zk = "hadoop000:2181";
        // consumer group id
        String groupId = "test";
        // receiver-based input DStream of (key, message) pairs
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(ssc, zk, groupId, map);

        // split each line on spaces
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return Lists.newArrayList(SPACE.split(stringStringTuple2._2)).iterator();
            }
        });
        // word => (word, 1)
        JavaPairDStream<String, Integer> mapToPair = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // sum the counts for each word
        JavaPairDStream<String, Integer> wordCounts = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        wordCounts.print();

        ssc.start();
        ssc.awaitTermination();
    }
}
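
To feed the job some test input, the topic can be created and written to with the standard Kafka command-line tools. A minimal sketch, assuming a Kafka 0.8.x installation under $KAFKA_HOME and the same hosts as above:

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list hadoop000:9092 --topic kafka_streaming_topic

Every line typed into the console producer then appears as a message in the 5-second batches of the streaming job.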

2. KafkaDirectSpark: direct approach, reading from the Kafka brokers directly instead of going through ZooKeeper

import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;
import java.util.regex.Pattern;

public class KafkaDirectSpark {
    public static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws InterruptedException {

        SparkConf sparkConf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

        // set of topics to subscribe to
        Set<String> topicSet = new HashSet<String>();
        topicSet.add("kafka_streaming_topic");
        // Kafka parameters (no ZooKeeper): just the broker list
        HashMap<String, String> kafkaParam = new HashMap<String, String>();
        kafkaParam.put("metadata.broker.list", "hadoop000:9092");

        JavaPairInputDStream<String, String> lines =
                KafkaUtils.createDirectStream(ssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParam,
                        topicSet);

        // split each line on spaces
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return Lists.newArrayList(SPACE.split(stringStringTuple2._2)).iterator();
            }
        });
        // word => (word, 1)
        JavaPairDStream<String, Integer> mapToPair = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // sum the counts for each word
        JavaPairDStream<String, Integer> wordCounts = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        wordCounts.print();

        ssc.start();
        ssc.awaitTermination();
    }
}
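
With the direct approach the consumed offsets are no longer kept in ZooKeeper; if you want to track or persist them yourself, they are exposed on the RDDs produced by the stream created with createDirectStream. A minimal sketch of reading them (the foreachRDD body is illustrative, not part of the original example; it additionally needs imports of org.apache.spark.streaming.kafka.HasOffsetRanges, org.apache.spark.streaming.kafka.OffsetRange, org.apache.spark.api.java.JavaPairRDD and org.apache.spark.api.java.function.VoidFunction, and must be applied to the stream returned by createDirectStream before any other transformation):

        lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
            @Override
            public void call(JavaPairRDD<String, String> rdd) throws Exception {
                // the underlying KafkaRDD carries the offset ranges consumed in this batch
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                for (OffsetRange o : offsetRanges) {
                    System.out.println(o.topic() + " partition " + o.partition()
                            + " offsets [" + o.fromOffset() + ", " + o.untilOffset() + ")");
                }
            }
        });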

3. updateStateByKey for real-time state updates: a running summary from the start of the job until now

import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;
import java.util.regex.Pattern;

/**
 * Spark Streaming + Kafka (direct approach) word count in Java, with running totals maintained via updateStateByKey.
 */
public class KafkaUpdateByKey {
    public static final Pattern SPACE = Pattern.compile(" ");


    public static void main(String[] args) throws InterruptedException {

        SparkConf sparkConf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        ssc.checkpoint("/home/hadoop/data/checkpoint");

        // set of topics to subscribe to
        Set<String> topicSet = new HashSet<String>();
        topicSet.add("kafka_streaming_topic");
        // Kafka parameters (no ZooKeeper): just the broker list
        HashMap<String, String> kafkaParam = new HashMap<String, String>();
        kafkaParam.put("metadata.broker.list", "hadoop000:9092");

        JavaPairInputDStream<String, String> lines =
                KafkaUtils.createDirectStream(ssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParam,
                        topicSet);

        // split each line on spaces
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return Lists.newArrayList(SPACE.split(stringStringTuple2._2)).iterator();
            }
        });
        // word => (word, 1)
        JavaPairDStream<String, Integer> mapToPair = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // count the words in this batch, then fold them into the running totals
        JavaPairDStream<String, Integer> wordCounts = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> integers, Optional<Integer> integerOptional) throws Exception {
                // start from the previously accumulated count for this key, if any
                Integer updateValue = 0;
                if (integerOptional.isPresent()) {
                    updateValue = integerOptional.get();
                }
                // add this batch's counts
                for (Integer integer : integers) {
                    updateValue += integer;
                }
                return Optional.of(updateValue);
            }
        });

        wordCounts.print();

        ssc.start();
        ssc.awaitTermination();
    }
}
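
Because updateStateByKey depends on checkpointing, the driver can also be written to recover the accumulated state from the checkpoint directory after a restart. A minimal sketch of how main() could start, assuming the same checkpoint path as above and an extra import of org.apache.spark.api.java.function.Function0; getOrCreate rebuilds the context from the checkpoint if one exists, otherwise it calls the supplied factory:

        final String checkpointDir = "/home/hadoop/data/checkpoint";
        JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDir,
                new Function0<JavaStreamingContext>() {
                    @Override
                    public JavaStreamingContext call() throws Exception {
                        JavaStreamingContext newSsc = new JavaStreamingContext(new SparkConf(), Durations.seconds(5));
                        newSsc.checkpoint(checkpointDir);
                        // ... set up the Kafka DStream and the transformations shown above ...
                        return newSsc;
                    }
                });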

4. Testing

Package the application into a jar and run it on Spark. The --packages option downloads the specified dependency (and its transitive dependencies) from a repository; if the jars have already been downloaded, you can point to them with --jars instead, as sketched after the command below.

bin/spark-submit --name KafkaUpdateByKey --master local[2] --class KafkaUpdateByKey --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /home/hadoop/app/myjars/kafka-spark-updateState1.0-SNAPSHOT.jar
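
For reference, a sketch of the same submission using locally available jars instead of --packages; the path to spark-streaming-kafka-0-8_2.11-2.2.0.jar is an assumption, and any transitive Kafka dependencies would also need to be supplied (for example by building a fat jar):

bin/spark-submit --name KafkaUpdateByKey --master local[2] --class KafkaUpdateByKey --jars /home/hadoop/app/myjars/spark-streaming-kafka-0-8_2.11-2.2.0.jar /home/hadoop/app/myjars/kafka-spark-updateState1.0-SNAPSHOT.jar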