Understanding Spark in Depth: a demo of structured stream processing (Spark Streaming + Spark SQL on structured data)

I've recently been working on a use case that combines Spark Streaming with Spark SQL to process structured data. Below is a small example; feel free to take it if you can use it!

package com.unistack.tamboo.compute.process.impl;

import com.alibaba.fastjson.JSONArray;
import com.google.common.collect.Maps;
import com.unistack.tamboo.compute.process.StreamProcess;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;

/**
 * @author  hero.li
 * Process streaming data with Spark SQL
 */
public class SqlProcess implements StreamProcess{
    private static Logger LOGGER = LoggerFactory.getLogger(SqlProcess.class);

    private Properties outputInfo;
    private String toTopic;

    /**
     * {"datasources":[{"password":"welcome1","port":"3308","ip":"192.168.1.192","dbName":"test","dbType":"MYSQL","dataSourceName":"191_test","username":"root","tableName":"t1"},
     * {"password":"welcome1","port":"3308","ip":"192.168.1.191","dbName":"test","dbType":"MYSQL","dataSourceName":"191_test","username":"root","tableName":"t1"}]
     * ,"sql":"select * from ....","windowLen":"時間範圍,2秒的倍數","windowSlide":"滾動間隔,2的倍數"}
     */

    public SqlProcess(Properties outputInfo,String toTopic){
        this.outputInfo = outputInfo;
        this.toTopic = toTopic;
    }
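
    /**
     * Illustrative sketch only: one way the config JSON documented above could be
     * parsed with fastjson into JDBC options for spark.read().format("jdbc").
     * The method name and the "use only the first datasource" choice are assumptions,
     * not part of the original demo.
     */
    private static Map<String, String> parseConfig(String configJson) {
        Map<String, String> jdbcOptions = Maps.newHashMap();
        com.alibaba.fastjson.JSONObject cfg = com.alibaba.fastjson.JSON.parseObject(configJson);
        JSONArray datasources = cfg.getJSONArray("datasources");
        if (datasources != null && !datasources.isEmpty()) {
            // only the first datasource is used in this sketch
            com.alibaba.fastjson.JSONObject ds = datasources.getJSONObject(0);
            jdbcOptions.put("url", "jdbc:mysql://" + ds.getString("ip") + ":" + ds.getString("port")
                    + "/" + ds.getString("dbName") + "?characterEncoding=UTF8");
            jdbcOptions.put("user", ds.getString("username"));
            jdbcOptions.put("password", ds.getString("password"));
            jdbcOptions.put("dbtable", ds.getString("tableName"));
        }
        return jdbcOptions;
    }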


    @Override
    public void logic(JavaRDD<ConsumerRecord<String, String>> rdd) {
        rdd.foreachPartition(itr -> {
            while (itr.hasNext()) {
                String record = itr.next().value();
                // TODO: per-record processing for the streaming job goes here
            }
        });
    }


    public static void main(String[] args) throws InterruptedException  {
        try{
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e){
            e.printStackTrace();
        }

        // Local streaming context with a 1-second batch interval, plus a SparkSession for the SQL side
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        SparkSession spark = SparkSession.builder().appName("test_kane").getOrCreate();


        // Register a MySQL table as the temp view "t2" so it can be joined with the streaming data
        Map<String,String> map = Maps.newHashMap();
//        map.put("url", "jdbc:mysql://x.x.x.x:3309/test?user=root&password=welcome1&characterEncoding=UTF8");

        map.put("url","jdbc:mysql://x.x.x.x:3309/test?characterEncoding=UTF8");
        map.put("user","root");
        map.put("password", "welcome1");
        map.put("dbtable", "t2");
        Dataset<Row> jdbcTable = spark.read().format("jdbc").options(map).load();
        jdbcTable.createOrReplaceTempView("t2");

        // Kafka consumer settings (SASL_PLAINTEXT with the PLAIN mechanism; JAAS config points to a local file)
        System.setProperty("java.security.auth.login.config","/Users/frank/Desktop/shell/lyh.conf");
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "x.x.x.x:9999");
        kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id",String.valueOf(System.currentTimeMillis()));
        kafkaParams.put("auto.offset.reset","earliest");
        kafkaParams.put("enable.auto.commit",true);
        kafkaParams.put("sasl.mechanism","PLAIN");
        kafkaParams.put("security.protocol","SASL_PLAINTEXT");


        Collection<String> topics = Arrays.asList("xxTopic");
        JavaInputDStream<ConsumerRecord<String,String>> stream = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String,String>Subscribe(topics,kafkaParams));

        // For each micro-batch, turn the JSON messages into a DataFrame and join them with the JDBC view
        stream.map(ConsumerRecord::value)
              .foreachRDD((JavaRDD<String> rdd) ->{
                    if(rdd.count() > 0){
                        Dataset<Row> df = spark.read().json(spark.createDataset(rdd.rdd(),Encoders.STRING()));
                        df.createOrReplaceTempView("streamData");
                        df.cache();

                        try{
                            Dataset<Row>  aggregators = spark.sql("select a.*,b.* from streamData a  join  t2 b on  a.id = b.id");
                            String[] colsName = aggregators.columns();
                            Iterator<Row> itr = aggregators.toLocalIterator();
                            while(itr.hasNext()){
                                Row row = itr.next();
                                for(int i=0;i<colsName.length;i++){
                                    String cn = colsName[i];
                                    Object as = row.getAs(cn);
                                    System.out.print(cn+"="+as+",   ");
                                }
                                System.out.println();
                            }
                        }catch(Exception e){
                            System.out.println("::::::::::::::::::::::::::::::::::::::::err::::::::::::::::::::::::::::::::::::::::::::");
                            e.printStackTrace();
                        }
                    }
              });

        jssc.start();
        jssc.awaitTermination();
    }
}
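
The demo only prints the joined rows to stdout. The unused outputInfo / toTopic constructor arguments suggest the results are meant to be published back to Kafka, so here is a minimal, hedged sketch of that step using a plain KafkaProducer and Dataset.toJSON(). The class name, broker address, and serializer settings are assumptions, not part of the original code.

package com.unistack.tamboo.compute.process.impl;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Iterator;
import java.util.Properties;

/** Hypothetical helper: publish each joined row as a JSON message to the output topic. */
public class KafkaSinkSketch {

    public static void writeToKafka(Dataset<Row> aggregators, String bootstrapServers, String toTopic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer once the batch has been sent
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // toJSON() turns every Row into a JSON string keyed by its column names
            Iterator<String> rows = aggregators.toJSON().toLocalIterator();
            while (rows.hasNext()) {
                producer.send(new ProducerRecord<>(toTopic, rows.next()));
            }
            producer.flush();
        }
    }
}

Inside the foreachRDD block this could replace the System.out loop, e.g. KafkaSinkSketch.writeToKafka(aggregators, "x.x.x.x:9999", toTopic).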
