
Spark 2.3.0 Structured Streaming: an operator pipeline on top of streaming + Kafka

At work I have been researching streaming operators. The goal is a stream-based pipeline: data arrives from Kafka, passes through a number of intermediate compute operators, and finally reaches an output operator. I started with the classic Spark Streaming + Kafka (DStream) approach, but had to abandon it because it cannot hand back a Dataset for the downstream operators to use. A senior colleague then suggested Spark Structured Streaming, and after working through the documentation I put together a streaming pipeline that supports this kind of staged processing. (Only the code relevant to the topic is shown; the surrounding design work is omitted. The point is the implementation idea. ^.^)

Spark 2.3.0 official Structured Streaming programming guide

kafkaInputStep

 rawDataSet = sparkSession
                    .readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", brokers)
                    .option("subscribe", topics)
                    .load()
                    .withWatermark("timestamp", "1 minutes"); // track a 1-minute watermark on the Kafka timestamp column so late data is bounded before the datasets are merged downstream
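
The Kafka source exposes key and value as binary columns. Below is a minimal sketch, reusing the sparkSession, brokers and topics variables from above, of how the value column could be cast to a string right at the source; this post's jdbcoutputStep decodes the raw bytes itself in process(), so the cast is optional and shown only for illustration:

    // Optional: decode the binary Kafka payload into a string column for downstream steps
    Dataset<Row> decoded = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics)
            .load()
            .selectExpr("CAST(value AS STRING) AS json", "timestamp")
            .withWatermark("timestamp", "1 minutes");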

driver

Hooking the last step into the query with
foreach((ForeachWriter<Row>) processSteps[processIndex]).outputMode("append")
is all that is needed to route the Dataset to that output step.
  public void run() {
        if (stepList.size() == 0)
            return;

        int processIndex = 0;
        dataSets = new Dataset[stepList.size()];

        try {
            // Handle the first (input) step and build the streaming Dataset
            ModelStep firstModelStep = stepList.get(processIndex);
            if (firstModelStep.getAncestors().size() != 0) {
                //throw exception
            }

            Dataset<Row> streamSource = ((InputStep) processSteps[processIndex]).compute(sparkSession);
            dataSets[0] = streamSource;

            // Run every intermediate compute step, leaving only the last (output) step
            for (processIndex = 1; processIndex < stepList.size() - 1; processIndex++) {
                runComputeStep(processIndex); // intermediate compute operator
            } // end of step processing

            // Last step: merge the ancestor datasets
            Dataset<Row> outputDataSet = unionAncesstorDataSet(dataSets, processIndex);

            // Use the default trigger; push the result to its destination through the output step
            StreamingQuery handle = outputDataSet.writeStream().foreach((ForeachWriter<Row>) processSteps[processIndex]).outputMode("append").start();
            handle.awaitTermination();
        } catch (Exception e) {
            LogUtil.error(GICS_STEP_PROCESS_ERROR + ":" + e.getMessage(), e);
            return;
        } finally {
            sparkSession.stop();
        }
    }
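
unionAncesstorDataSet is a project helper that is not shown in this post. Assuming it simply unions the datasets produced by the earlier steps into one Dataset for the output step, a hypothetical sketch could look like the following (the real implementation may instead union only the output step's direct ancestors):

    // Hypothetical sketch; the real unionAncesstorDataSet is not shown in the post.
    // Assumes all produced datasets share the same schema, which union() requires.
    private Dataset<Row> unionAncesstorDataSet(Dataset<Row>[] dataSets, int outputIndex) {
        Dataset<Row> merged = dataSets[0];
        for (int i = 1; i < outputIndex; i++) {
            if (dataSets[i] != null) {
                merged = merged.union(dataSets[i]);
            }
        }
        return merged;
    }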

jdbcoutputStep — this class must extend ForeachWriter<Row>

This is the sink side of Structured Streaming's writeStream(): override the corresponding ForeachWriter methods in the receiving class and the data can be handled however you need. Because these are generic operators, the payload is transmitted in OGG's (Oracle GoldenGate) JSON format.
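
StructMessage is the project's POJO for those OGG-style JSON records and is not shown in the post. Here is a hypothetical sketch, assuming the usual Oracle GoldenGate JSON fields (table name, op_type, and before/after row images); the real class may differ:

    import java.util.Map;

    // Hypothetical sketch of the StructMessage POJO the writer deserializes into
    public class StructMessage {
        private String table;                 // fully qualified table name
        private String op_type;               // "I" = insert, "U" = update, "D" = delete
        private String op_ts;                 // operation timestamp
        private Map<String, Object> before;   // row image before the change (U/D)
        private Map<String, Object> after;    // row image after the change (I/U)

        public String getTable() { return table; }
        public String getOp_type() { return op_type; }
        public Map<String, Object> getBefore() { return before; }
        public Map<String, Object> getAfter() { return after; }
    }

The relevant parts of the writer class itself follow.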

 private String url;
    private String username;
    private String password;
    private String driver;
    transient private Gson gson;
    transient private Connection connection;
    transient private Statement stmt_batch;

    @Override
    public void compute(final Dataset<Row> dataset, SparkSession sparkSession) throws Exception {
        throw  new GICSException(GICS_METHOD_NOT_SUPPORT);
    }

    @Override
    public void initialize(StepInfo stepInfo, List<String> resourcePaths) throws GICSException {
        Map<String, String> params = stepInfo.getStepParams();

        driver= GICSUtils.getStepParams(params,"driver",true);
        url=GICSUtils.getStepParams(params,"jdbcurl",true);
        username=GICSUtils.getStepParams(params,"username",true);
        password=GICSUtils.getStepParams(params,"password",true);

    }

    private String constructInsertSQL(StructMessage row) {
        return null;
    }

    private String constructDeleteSQL(StructMessage row){
        return null;
    }
    private String constructUpdateSQL(StructMessage row){
        return null;
    }

    //Method used for stream writing
    @Override
    public boolean open(long partitionId, long version) {
        try {
            connection = DBUtils.getConnection(url, username, password, driver);
            connection.setAutoCommit(false);
            stmt_batch = connection.createStatement();
            gson = new GsonBuilder()                //.setLenient() // lenient JSON parsing (optional)
                    .enableComplexMapKeySerialization() // allow complex objects as Map keys
                    .serializeNulls()                   // serialize null fields instead of dropping them
                    .setPrettyPrinting()                // pretty-print the JSON
                    .disableHtmlEscaping()              // Gson escapes HTML characters by default; disable that
                    .create();

            return true;
        }catch (Exception ex){
            LogUtil.error(ex.getMessage(),ex);
            return false;
        }
    }

    @Override
    public void process(Row row) {

        String sql = null;
        try {
            // The Kafka value column arrives as raw bytes; decode and parse the OGG JSON record
            byte[] jsonBytes = row.getAs("value");
            String jsonVal = null;
            StructMessage result = null;
            try {
                jsonVal = new String(jsonBytes, UTF8);
                Type type = new TypeToken<StructMessage>() {}.getType();
                result = gson.fromJson(jsonVal, type);
            } catch (Exception e) {
                LogUtil.error("Malformed message", jsonVal, e);
            }

            if (result == null)
                return; // parsing failed; the record was already logged above

            // Check field op_type
            String op_type = result.getOp_type();
            if (CommonUtil.testStringEmpty(op_type))
                throw new GICSException("No op_type field for record");

            if ("I".equals(op_type)) {
                sql = constructInsertSQL(result);
            }

            if ("D".equals(op_type)) {
                sql = constructDeleteSQL(result);
            }

            if ("U".equals(op_type)) {
                sql = constructUpdateSQL(result);
            }

            if (sql != null)
                stmt_batch.addBatch(sql); // queue the statement; the batch is flushed in close()

        } catch (Exception e) {
            LogUtil.error("Executing JDBC update error", sql, e);
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        try {
            // Flush everything queued in process() for this partition, then commit
            stmt_batch.executeBatch();
            connection.commit();
        } catch (Exception e) {
            LogUtil.error("Executing JDBC commit error", e);
        } finally {
            DBUtils.close(connection);
        }
    }
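
The three construct*SQL helpers are left as stubs above. As an illustration only, here is a hypothetical constructInsertSQL assuming the StructMessage fields sketched earlier (getTable() for the target table, getAfter() for the column/value map); the delete and update variants would follow the same pattern using the before image and a key column, and a real implementation should prefer PreparedStatement parameters over string concatenation:

    // Hypothetical sketch; the post leaves constructInsertSQL as a stub.
    private String constructInsertSQL(StructMessage row) {
        StringBuilder cols = new StringBuilder();
        StringBuilder vals = new StringBuilder();
        for (Map.Entry<String, Object> e : row.getAfter().entrySet()) {
            if (cols.length() > 0) {
                cols.append(", ");
                vals.append(", ");
            }
            cols.append(e.getKey());
            Object v = e.getValue();
            vals.append(v == null ? "NULL" : "'" + v.toString().replace("'", "''") + "'");
        }
        return "INSERT INTO " + row.getTable() + " (" + cols + ") VALUES (" + vals + ")";
    }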