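Kafka: Setting up a ZK + Kafka + Spark Streaming cluster environment (25) Structured Streaming: one topic contains several parts of a group of data, and they are joined into a single record by key (plus the problems encountered).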


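Requirement:

The Kafka topic currently holds a batch of data spread across 9 different partitions (that is, it was published with key: {m1, m2, m3, m4 ... m9} and value: {records items}). Each mx record (m1, m2 ... m9) is uniquely identified by int_id + start_time, where int_id and start_time are fields of the topic record. The nine parts can be joined on this unique key (m1.primarykey1, m2.primarykey1, m3.primarykey1 ... m9.primarykey1).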
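To make the requirement concrete, here is a minimal plain-Java sketch (no Spark involved) of grouping parts by the primary key int_id + "_" + start_time. The Part class, the method names, and the sample values are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrimaryKeyGrouping {
    /** Hypothetical stand-in for one topic record: the Kafka key plus the two key fields. */
    static final class Part {
        final String key;
        final String intId;
        final String startTime;

        Part(String key, String intId, String startTime) {
            this.key = key;
            this.intId = intId;
            this.startTime = startTime;
        }

        /** Same rule as the requirement: int_id + "_" + start_time. */
        String primaryKey() {
            return intId + "_" + startTime;
        }
    }

    /** Groups parts by primary key, keeping the order in which keys were first seen. */
    static Map<String, List<Part>> groupByPrimaryKey(List<Part> parts) {
        Map<String, List<Part>> groups = new LinkedHashMap<>();
        for (Part part : parts) {
            groups.computeIfAbsent(part.primaryKey(), k -> new ArrayList<>()).add(part);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Made-up sample records: two parts share one primary key, the third has another.
        List<Part> parts = Arrays.asList(
                new Part("m1", "1001", "2018-08-01 10:00:00"),
                new Part("m2", "1001", "2018-08-01 10:00:00"),
                new Part("m1", "1002", "2018-08-01 10:00:00"));
        groupByPrimaryKey(parts).forEach(
                (primaryKey, group) -> System.out.println(primaryKey + " -> " + group.size() + " part(s)"));
    }
}
```

Only parts that land in the same group are candidates for being joined into one record.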

Pseudocode:

The fields of m are:

public class MS_PLRULQX {
    private String key;
    private String int_id;
    private String start_time;
    private long MS_PLRULQX_00;
    private long MS_PLRULQX_01;

    public String getPrimaryKey() {
        return this.int_id + "_" + this.start_time;
    }
}

Full definition of the MS_PLRULQX class:

import java.io.Serializable;

import org.apache.spark.sql.Row;

public class MS_PLRULQX implements Serializable, Comparable<MS_PLRULQX> {
    private static final long serialVersionUID = -2873721171908282946L;

    // Key format: MS_PLRULQX1, MS_PLRULQX2, ... MS_PLRULQX9 (matched case-insensitively).
    private static final String KEY_PREFIX = "ms_plrulqx";

    private String key;
    private String int_id;
    private String start_time;
    private long MS_PLRULQX_00;
    private long MS_PLRULQX_01;

    public MS_PLRULQX() {
    }

    public MS_PLRULQX(Row row) {
        this.key = row.getAs("key");
        this.int_id = row.getAs("int_id");
        this.start_time = row.getAs("start_time");
        this.MS_PLRULQX_00 = row.getAs("MS_PLRULQX_00");
        this.MS_PLRULQX_01 = row.getAs("MS_PLRULQX_01");
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getInt_id() {
        return int_id;
    }

    public void setInt_id(String int_id) {
        this.int_id = int_id;
    }

    public String getStart_time() {
        return start_time;
    }

    public void setStart_time(String start_time) {
        this.start_time = start_time;
    }

    public long getMS_PLRULQX_00() {
        return MS_PLRULQX_00;
    }

    public void setMS_PLRULQX_00(long MS_PLRULQX_00) {
        this.MS_PLRULQX_00 = MS_PLRULQX_00;
    }

    public long getMS_PLRULQX_01() {
        return MS_PLRULQX_01;
    }

    public void setMS_PLRULQX_01(long MS_PLRULQX_01) {
        this.MS_PLRULQX_01 = MS_PLRULQX_01;
    }

    public String getPrimaryKey() {
        return this.int_id + "_" + this.start_time;
    }

    @Override
    public int compareTo(MS_PLRULQX other) {
        // Compare by the numeric suffix when both keys have one;
        // otherwise fall back to plain string comparison of the keys.
        Integer thisKeyValue = parseKeyIndex(this.key);
        Integer otherKeyValue = parseKeyIndex(other.key);
        if (thisKeyValue != null && otherKeyValue != null) {
            return Integer.compare(thisKeyValue, otherKeyValue);
        }
        return this.key.compareTo(other.key);
    }

    // Returns the numeric suffix of a key such as "MS_PLRULQX3", or null if there is none.
    // This replaces the original NumberUtils check so that no extra library is needed.
    private static Integer parseKeyIndex(String key) {
        if (key == null || !key.toLowerCase().contains(KEY_PREFIX)) {
            return null;
        }
        try {
            return Integer.valueOf(key.toLowerCase().replace(KEY_PREFIX, ""));
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
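The reason compareTo parses the numeric suffix rather than comparing key strings can be seen in a small standalone sketch. The class and method names are hypothetical, and the key MS_PLRULQX10 does not occur in this article's data (keys run from 1 to 9). It is included only to show where string order and numeric order diverge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeySuffixOrdering {
    static final String PREFIX = "ms_plrulqx";

    /** Returns the numeric suffix of a key such as "MS_PLRULQX3", or -1 if there is none. */
    static int suffixOf(String key) {
        String lower = key.toLowerCase();
        if (!lower.startsWith(PREFIX)) {
            return -1;
        }
        String digits = lower.substring(PREFIX.length());
        if (digits.isEmpty() || digits.length() > 9) {
            return -1;
        }
        for (int i = 0; i < digits.length(); i++) {
            if (!Character.isDigit(digits.charAt(i))) {
                return -1;
            }
        }
        return Integer.parseInt(digits);
    }

    /** Orders keys by numeric suffix, falling back to string order on ties. */
    static int compareKeys(String a, String b) {
        int bySuffix = Integer.compare(suffixOf(a), suffixOf(b));
        return bySuffix != 0 ? bySuffix : a.compareTo(b);
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>(Arrays.asList("MS_PLRULQX10", "MS_PLRULQX9", "MS_PLRULQX2"));

        List<String> byString = new ArrayList<>(keys);
        byString.sort(String::compareTo);              // lexicographic: "10" sorts before "2"

        List<String> bySuffix = new ArrayList<>(keys);
        bySuffix.sort(KeySuffixOrdering::compareKeys); // numeric: 2, 9, 10

        System.out.println("string order:  " + byString);
        System.out.println("numeric order: " + bySuffix);
    }
}
```

Note also that in Java the == operator on two String objects compares references, not contents, so an equality check on the extracted suffixes should go through the parsed integers (or equals).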

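Each of the nine keys in the topic carries its own MS_PLRULQX record. These records are joined together, and only records with the same primary key may be joined. The combined entity keeps the following fields: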

public class MS_PLRULQX_Combine implements Serializable {
    private String key;
    private String int_id;
    private String start_time;

    private long mr_packetlossrateulqci_1_00;
    private long mr_packetlossrateulqci_1_01;

    private long mr_packetlossrateulqci_2_00;
    private long mr_packetlossrateulqci_2_01;

    private long mr_packetlossrateulqci_3_00;
    private long mr_packetlossrateulqci_3_01;

    private long mr_packetlossrateulqci_4_00;
    private long mr_packetlossrateulqci_4_01;

    private long mr_packetlossrateulqci_5_00;
    private long mr_packetlossrateulqci_5_01;

    private long mr_packetlossrateulqci_6_00;
    private long mr_packetlossrateulqci_6_01;

    private long mr_packetlossrateulqci_7_00;
    private long mr_packetlossrateulqci_7_01;

    private long mr_packetlossrateulqci_8_00;
    private long mr_packetlossrateulqci_8_01;
    
    private long mr_packetlossrateulqci_9_00;
    private long mr_packetlossrateulqci_9_01;
}

Full definition of the MS_PLRULQX_Combine class:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class MS_PLRULQX_Combine implements Serializable {
    private static final long serialVersionUID = -944128402186054489L;

    public MS_PLRULQX_Combine() {
    }

    public MS_PLRULQX_Combine(List<MS_PLRULQX> list) {
        int sizeOfList = list.size();
        if (sizeOfList > 9) {
            throw new RuntimeException("the measurement group has " + sizeOfList + " items, more than the maximum of 9");
        }

        if (sizeOfList >= 1) {
            setItem1(list.get(0));
        }
        if (sizeOfList >= 2) {
            setItem2(list.get(1));
        }
        if (sizeOfList >= 3) {
            setItem3(list.get(2));
        }
        if (sizeOfList >= 4) {
            setItem4(list.get(3));
        }
        if (sizeOfList >= 5) {
            setItem5(list.get(4));
        }
        if (sizeOfList >= 6) {
            setItem6(list.get(5));
        }
        if (sizeOfList >= 7) {
            setItem7(list.get(6));
        }
        if (sizeOfList >= 8) {
            setItem8(list.get(7));
        }
        if (sizeOfList >= 9) {
            setItem9(list.get(8));
        }
    }

    private void setItem9(MS_PLRULQX item9) {
        if (item9 != null) {
            this.mr_packetlossrateulqci_9_00 = item9.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_9_01 = item9.getMS_PLRULQX_01();
        }
    }

    private void setItem8(MS_PLRULQX item8) {
        if (item8 != null) {
            this.mr_packetlossrateulqci_8_00 = item8.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_8_01 = item8.getMS_PLRULQX_01();
        }
    }

    private void setItem7(MS_PLRULQX item7) {
        if (item7 != null) {
            this.mr_packetlossrateulqci_7_00 = item7.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_7_01 = item7.getMS_PLRULQX_01();
        }
    }

    private void setItem6(MS_PLRULQX item6) {
        if (item6 != null) {
            this.mr_packetlossrateulqci_6_00 = item6.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_6_01 = item6.getMS_PLRULQX_01();
        }
    }

    private void setItem5(MS_PLRULQX item5) {
        if (item5 != null) {
            this.mr_packetlossrateulqci_5_00 = item5.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_5_01 = item5.getMS_PLRULQX_01();
        }
    }

    private void setItem4(MS_PLRULQX item4) {
        if (item4 != null) {
            this.mr_packetlossrateulqci_4_00 = item4.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_4_01 = item4.getMS_PLRULQX_01();
        }
    }

    private void setItem3(MS_PLRULQX item3) {
        if (item3 != null) {
            this.mr_packetlossrateulqci_3_00 = item3.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_3_01 = item3.getMS_PLRULQX_01();
        }
    }

    private void setItem2(MS_PLRULQX item2) {
        if (item2 != null) {
            this.mr_packetlossrateulqci_2_00 = item2.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_2_01 = item2.getMS_PLRULQX_01();
        }
    }

    private void setItem1(MS_PLRULQX item1) {
        if (item1 != null) {
            this.key = item1.getKey();
            this.int_id = item1.getInt_id();
            this.start_time = item1.getStart_time();

            this.mr_packetlossrateulqci_1_00 = item1.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_1_01 = item1.getMS_PLRULQX_01();
        }
    }

    private String key;
    private String int_id;
    private String start_time;

    private long mr_packetlossrateulqci_1_00;
    private long mr_packetlossrateulqci_1_01;

    private long mr_packetlossrateulqci_2_00;
    private long mr_packetlossrateulqci_2_01;

    private long mr_packetlossrateulqci_3_00;
    private long mr_packetlossrateulqci_3_01;

    private long mr_packetlossrateulqci_4_00;
    private long mr_packetlossrateulqci_4_01;

    private long mr_packetlossrateulqci_5_00;
    private long mr_packetlossrateulqci_5_01;

    private long mr_packetlossrateulqci_6_00;
    private long mr_packetlossrateulqci_6_01;

    private long mr_packetlossrateulqci_7_00;
    private long mr_packetlossrateulqci_7_01;

    private long mr_packetlossrateulqci_8_00;
    private long mr_packetlossrateulqci_8_01;
    
    private long mr_packetlossrateulqci_9_00;
    private long mr_packetlossrateulqci_9_01;

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getInt_id() {
        return int_id;
    }

    public void setInt_id(String int_id) {
        this.int_id = int_id;
    }

    public String getStart_time() {
        return start_time;
    }

    public void setStart_time(String start_time) {
        this.start_time = start_time;
    }

    public long getMr_packetlossrateulqci_1_00() {
        return mr_packetlossrateulqci_1_00;
    }

    public void setMr_packetlossrateulqci_1_00(long mr_packetlossrateulqci_1_00) {
        this.mr_packetlossrateulqci_1_00 = mr_packetlossrateulqci_1_00;
    }

    public long getMr_packetlossrateulqci_1_01() {
        return mr_packetlossrateulqci_1_01;
    }

    public void setMr_packetlossrateulqci_1_01(long mr_packetlossrateulqci_1_01) {
        this.mr_packetlossrateulqci_1_01 = mr_packetlossrateulqci_1_01;
    }

    public long getMr_packetlossrateulqci_2_00() {
        return mr_packetlossrateulqci_2_00;
    }

    public void setMr_packetlossrateulqci_2_00(long mr_packetlossrateulqci_2_00) {
        this.mr_packetlossrateulqci_2_00 = mr_packetlossrateulqci_2_00;
    }

    public long getMr_packetlossrateulqci_2_01() {
        return mr_packetlossrateulqci_2_01;
    }

    public void setMr_packetlossrateulqci_2_01(long mr_packetlossrateulqci_2_01) {
        this.mr_packetlossrateulqci_2_01 = mr_packetlossrateulqci_2_01;
    }

    public long getMr_packetlossrateulqci_3_00() {
        return mr_packetlossrateulqci_3_00;
    }

    public void setMr_packetlossrateulqci_3_00(long mr_packetlossrateulqci_3_00) {
        this.mr_packetlossrateulqci_3_00 = mr_packetlossrateulqci_3_00;
    }

    public long getMr_packetlossrateulqci_3_01() {
        return mr_packetlossrateulqci_3_01;
    }

    public void setMr_packetlossrateulqci_3_01(long mr_packetlossrateulqci_3_01) {
        this.mr_packetlossrateulqci_3_01 = mr_packetlossrateulqci_3_01;
    }

    public long getMr_packetlossrateulqci_4_00() {
        return mr_packetlossrateulqci_4_00;
    }

    public void setMr_packetlossrateulqci_4_00(long mr_packetlossrateulqci_4_00) {
        this.mr_packetlossrateulqci_4_00 = mr_packetlossrateulqci_4_00;
    }

    public long getMr_packetlossrateulqci_4_01() {
        return mr_packetlossrateulqci_4_01;
    }

    public void setMr_packetlossrateulqci_4_01(long mr_packetlossrateulqci_4_01) {
        this.mr_packetlossrateulqci_4_01 = mr_packetlossrateulqci_4_01;
    }

    public long getMr_packetlossrateulqci_5_00() {
        return mr_packetlossrateulqci_5_00;
    }

    public void setMr_packetlossrateulqci_5_00(long mr_packetlossrateulqci_5_00) {
        this.mr_packetlossrateulqci_5_00 = mr_packetlossrateulqci_5_00;
    }

    public long getMr_packetlossrateulqci_5_01() {
        return mr_packetlossrateulqci_5_01;
    }

    public void setMr_packetlossrateulqci_5_01(long mr_packetlossrateulqci_5_01) {
        this.mr_packetlossrateulqci_5_01 = mr_packetlossrateulqci_5_01;
    }

    public long getMr_packetlossrateulqci_6_00() {
        return mr_packetlossrateulqci_6_00;
    }

    public void setMr_packetlossrateulqci_6_00(long mr_packetlossrateulqci_6_00) {
        this.mr_packetlossrateulqci_6_00 = mr_packetlossrateulqci_6_00;
    }

    public long getMr_packetlossrateulqci_6_01() {
        return mr_packetlossrateulqci_6_01;
    }

    public void setMr_packetlossrateulqci_6_01(long mr_packetlossrateulqci_6_01) {
        this.mr_packetlossrateulqci_6_01 = mr_packetlossrateulqci_6_01;
    }

    public long getMr_packetlossrateulqci_7_00() {
        return mr_packetlossrateulqci_7_00;
    }

    public void setMr_packetlossrateulqci_7_00(long mr_packetlossrateulqci_7_00) {
        this.mr_packetlossrateulqci_7_00 = mr_packetlossrateulqci_7_00;
    }

    public long getMr_packetlossrateulqci_7_01() {
        return mr_packetlossrateulqci_7_01;
    }

    public void setMr_packetlossrateulqci_7_01(long mr_packetlossrateulqci_7_01) {
        this.mr_packetlossrateulqci_7_01 = mr_packetlossrateulqci_7_01;
    }

    public long getMr_packetlossrateulqci_8_00() {
        return mr_packetlossrateulqci_8_00;
    }

    public void setMr_packetlossrateulqci_8_00(long mr_packetlossrateulqci_8_00) {
        this.mr_packetlossrateulqci_8_00 = mr_packetlossrateulqci_8_00;
    }

    public long getMr_packetlossrateulqci_8_01() {
        return mr_packetlossrateulqci_8_01;
    }

    public void setMr_packetlossrateulqci_8_01(long mr_packetlossrateulqci_8_01) {
        this.mr_packetlossrateulqci_8_01 = mr_packetlossrateulqci_8_01;
    }

    public long getMr_packetlossrateulqci_9_00() {
        return mr_packetlossrateulqci_9_00;
    }

    public void setMr_packetlossrateulqci_9_00(long mr_packetlossrateulqci_9_00) {
        this.mr_packetlossrateulqci_9_00 = mr_packetlossrateulqci_9_00;
    }

    public long getMr_packetlossrateulqci_9_01() {
        return mr_packetlossrateulqci_9_01;
    }

    public void setMr_packetlossrateulqci_9_01(long mr_packetlossrateulqci_9_01) {
        this.mr_packetlossrateulqci_9_01 = mr_packetlossrateulqci_9_01;
    }
}
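The constructor above fills slots 1 to 9 by list position, so the result depends on the sort order of the list and on all nine parts being present. As a hedged alternative, the sketch below derives the slot from the numeric suffix of the key, so a missing part leaves its own slot at zero without shifting the others. The Part class, slotOf, and combine are hypothetical names, and the 9x2 array merely stands in for the 18 long fields of MS_PLRULQX_Combine.

```java
import java.util.Arrays;
import java.util.List;

public class SlotAssignment {
    /** Hypothetical stand-in for MS_PLRULQX: the key plus its two counters. */
    static final class Part {
        final String key;
        final long v00;
        final long v01;

        Part(String key, long v00, long v01) {
            this.key = key;
            this.v00 = v00;
            this.v01 = v01;
        }
    }

    /** Returns the slot (1..9) encoded in a key such as "MS_PLRULQX3", or 0 if the key does not match. */
    static int slotOf(String key) {
        if (key == null || !key.matches("(?i)ms_plrulqx[1-9]")) {
            return 0;
        }
        return key.charAt(key.length() - 1) - '0';
    }

    /** Returns a 9x2 array; row i holds the counters of part i + 1, or zeros if that part is missing. */
    static long[][] combine(List<Part> parts) {
        long[][] slots = new long[9][2];
        for (Part part : parts) {
            int slot = slotOf(part.key);
            if (slot == 0) {
                throw new IllegalArgumentException("unexpected key: " + part.key);
            }
            slots[slot - 1][0] = part.v00;
            slots[slot - 1][1] = part.v01;
        }
        return slots;
    }

    public static void main(String[] args) {
        // Only parts 3 and 1 arrived, in arbitrary order; part 2 is missing.
        long[][] slots = combine(Arrays.asList(
                new Part("MS_PLRULQX3", 30L, 31L),
                new Part("MS_PLRULQX1", 10L, 11L)));
        System.out.println(Arrays.deepToString(slots));
    }
}
```

With this layout no sorting is needed before merging, at the cost of requiring every key to follow the MS_PLRULQX1 to MS_PLRULQX9 pattern.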

Read the data stream from the topic:

Dataset<Row> dsParsed = this.sparkSession.readStream().format("kafka").options(this.kafkaOptions).option("subscribe", topicName)
                .option("startingOffsets", "earliest").load();

String waterMarkName = "query" + this.getTopicEncodeName(topicName) + "Agg";
int windowDuration = 2 * 60;
int slideDuration = 60;

try {
    dsParsed.withWatermark("timestamp", "2 hour").createTempView(waterMarkName);
} catch (AnalysisException e1) {
    e1.printStackTrace();
    throw new RuntimeException(e1);
}

String aggSQL = "xxx";
Dataset<Row> dsSQL1 = sparkSession.sql(aggSQL);
dsSQL1.printSchema();

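Join the stream by key:

The approach that works: group the data by key, sort the records within each group by key, merge each group into one record, and print the merged result to the console.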

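import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

// Group rows by the primary key int_id + "_" + start_time.
KeyValueGroupedDataset<String, Row> tuple2Dataset = dsSQL1.groupByKey((MapFunction<Row, String>) row -> {
    String int_id = row.getAs("int_id");
    String start_time = row.getAs("start_time");
    return int_id + "_" + start_time;
}, Encoders.STRING());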

Dataset<MS_PLRULQX_Combine> tuple2FlatMapDataset = tuple2Dataset.flatMapGroups(
        new FlatMapGroupsFunction<String, Row, MS_PLRULQX_Combine>() {
            private static final long serialVersionUID = 1400167811199763836L;

            @Override
            public Iterator<MS_PLRULQX_Combine> call(String key, Iterator<Row> values) throws Exception {
                List<MS_PLRULQX> list = new ArrayList<MS_PLRULQX>();

                while (values.hasNext()) {
                    Row value = values.next();
                    MS_PLRULQX item = new MS_PLRULQX(value);
                    list.add(item);
                }

                Collections.sort(list, (v1, v2) -> -(v1.compareTo(v2)));

                return Arrays.asList(new MS_PLRULQX_Combine(list)).iterator();
            }
        }, Encoders.bean(MS_PLRULQX_Combine.class));

Dataset<Row> rows = tuple2FlatMapDataset.toDF();
rows.writeStream().format("console").outputMode("complete").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).start();

Joining the stream by key: the problem encountered with an alternative approach:

This approach uses JavaRDD to group, sort, and merge.

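import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;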

JavaPairRDD<String, MS_PLRULQX> pairs = dsSQL1.toJavaRDD().mapToPair(new PairFunction<Row, String, MS_PLRULQX>() {
    private static final long serialVersionUID = -5203498264050492910L;

    @Override
    public Tuple2<String, MS_PLRULQX> call(Row row) throws Exception {
        MS_PLRULQX value = new MS_PLRULQX(row);

        return new Tuple2<String, MS_PLRULQX>(value.getPrimaryKey(), value);
    }
});

JavaPairRDD<String, Iterable<MS_PLRULQX>> group = pairs.groupByKey();

JavaPairRDD<String, MS_PLRULQX_Combine> keyVsValuePairRDD = group.mapToPair(tuple -> {
    List<MS_PLRULQX> list = new ArrayList<MS_PLRULQX>();
    Iterator<MS_PLRULQX> it = tuple._2.iterator();
    while (it.hasNext()) {
        MS_PLRULQX score = it.next();
        list.add(score);
    }

    Collections.sort(list, (v1, v2) -> -(v1.compareTo(v2)));

    return new Tuple2<String, MS_PLRULQX_Combine>(tuple._1, new MS_PLRULQX_Combine(list));
});

JavaRDD<MS_PLRULQX_Combine> javaRDD = keyVsValuePairRDD
        .map(new Function<Tuple2<String, MS_PLRULQX_Combine>, MS_PLRULQX_Combine>() {
            private static final long serialVersionUID = -3031600976005716506L;

            @Override
            public MS_PLRULQX_Combine call(Tuple2<String, MS_PLRULQX_Combine> v1) throws Exception {
                return v1._2;
            }
        });

Dataset<Row> rows = this.sparkSession.createDataFrame(javaRDD, MS_PLRULQX_Combine.class);
rows.writeStream().format("console").outputMode("complete").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).start();
sparkSession.streams().awaitAnyTermination();

The error is thrown at this point:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
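at com.xx.xx.streaming.drivers.XXXDriver.run(xxxxDriver.java:85)

The failing line is "JavaPairRDD<String, MS_PLRULQX> pairs = dsSQL1.toJavaRDD().mapToPair(new PairFunction<Row, String, MS_PLRULQX>() {".
Judging by the message, calling .toJavaRDD() on the streaming Dataset appears to execute the query directly, in the same way that dsSQL1.show() or dsSQL1.collect().foreach(println(_)) would, and that is not allowed for a query with a streaming source.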
