Spark 2.3.0 Structured Streaming: an operable operator pipeline for streaming + Kafka
阿新 · Published: 2018-10-31
At work I have been studying streaming operators: a pipeline that receives data from Kafka at the head of a stream, passes it through a number of intermediate compute operators, and finally writes the result out. I started with the traditional Spark Streaming + Kafka approach, but had to drop it because it cannot return a Dataset for the downstream operators to consume. A senior colleague then suggested Spark Structured Streaming, and after working through the reference documentation I put together a streaming pipeline that supports in-flight processing. (Only the code relevant to the topic is posted; the surrounding design work is omitted. The point is the implementation idea. Hope it helps. ^.^) The step contracts the driver relies on are sketched right below.
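To make the driver easier to follow, here is a minimal sketch of the step contracts it appears to rely on. The GICS interfaces themselves are not shown in the post, so every name and signature below is an assumption inferred from how the driver calls them:

// Assumed step contracts (hypothetical; inferred from the driver code below)
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

interface InputStep {
    // Builds the streaming source Dataset (the Kafka step below implements this)
    Dataset<Row> compute(SparkSession sparkSession) throws Exception;
}

interface ComputeStep {
    // Transforms the Dataset(s) produced by ancestor steps
    Dataset<Row> compute(Dataset<Row> input) throws Exception;
}

// An output step is a ForeachWriter<Row>, which is why the driver can cast it
// and pass it straight to writeStream().foreach(...)
abstract class OutputStep extends ForeachWriter<Row> { }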
kafkaInputStep
rawDataSet = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load()
        .withWatermark("timestamp", "1 minutes"); // cut a watermark on the Dataset once per minute, then merge and emit
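For reference, every row produced by the Kafka source has a fixed schema, and the payload arrives as raw bytes; that is why the JDBC writer further down reads it with row.getAs("value"). A downstream step can also cast the payload up front. The line below is illustrative only, assuming the rawDataSet defined above:

// Kafka source columns: key (binary), value (binary), topic (string),
// partition (int), offset (long), timestamp (timestamp), timestampType (int)
Dataset<Row> decoded = rawDataSet.selectExpr("CAST(value AS STRING) AS json", "timestamp");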
Driver
foreach((ForeachWriter<Row>) processSteps[processIndex]).outputMode("append")

This call chain is all it takes to give the Dataset a directed output; the cast succeeds because the output step extends ForeachWriter<Row> (see below).
public void run() {
    if (stepList.size() == 0) return;
    int processIndex = 0;
    dataSets = new Dataset[stepList.size()];
    try {
        // Handle the first Input operator and build the streaming Dataset
        ModelStep firstModelStep = stepList.get(processIndex);
        if (firstModelStep.getAncestors().size() != 0) {
            // throw exception: the first step must not have ancestors
        }
        Dataset<Row> streamSource = ((InputStep) processSteps[processIndex]).compute(sparkSession);
        dataSets[0] = streamSource;
        // Leave only the last step for output
        for (processIndex = 1; processIndex < stepList.size() - 1; processIndex++) {
            runComputeStep(processIndex); // intermediate compute operators
        } // end of step processing
        // Last step: merge the ancestor Datasets
        Dataset<Row> outputDataSet = unionAncesstorDataSet(dataSets, processIndex);
        // Use the default trigger;
        // write the result to its destination through the output operator
        StreamingQuery handle = outputDataSet.writeStream()
                .foreach((ForeachWriter<Row>) processSteps[processIndex])
                .outputMode("append")
                .start();
        handle.awaitTermination();
    } catch (Exception e) {
        LogUtil.error(GICS_STEP_PROCESS_ERROR + ":" + e.getMessage(), e);
        return;
    } finally {
        sparkSession.stop();
    }
}
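runComputeStep and unionAncesstorDataSet are not shown in the post. Below is a plausible sketch of the union helper only, assuming each step stores its result in dataSets[] under its own index, that the ancestors' schemas match, and that indexOf() is a hypothetical lookup of a step's position:

// Hypothetical sketch of the omitted helper: union all ancestor results
private Dataset<Row> unionAncesstorDataSet(Dataset<Row>[] dataSets, int lastIndex) {
    Dataset<Row> merged = null;
    for (ModelStep ancestor : stepList.get(lastIndex).getAncestors()) {
        Dataset<Row> ds = dataSets[indexOf(ancestor)]; // indexOf(): assumed step-position lookup
        merged = (merged == null) ? ds : merged.union(ds);
    }
    return merged;
}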
The jdbcoutputStep class must extend ForeachWriter<Row>
This is the targeted sink for Structured Streaming's writeStream(): override the corresponding methods in the receiver class and you can process the data however you need. Because these are generic operators, the payload is transmitted in OGG's (Oracle GoldenGate's) specific format.
private String url;
private String username;
private String password;
private String driver;
transient private Gson gson;
transient private Connection connection;
transient private Statement stmt_batch;

@Override
public void compute(final Dataset<Row> dataset, SparkSession sparkSession) throws Exception {
    throw new GICSException(GICS_METHOD_NOT_SUPPORT);
}

@Override
public void initialize(StepInfo stepInfo, List<String> resourcePaths) throws GICSException {
    Map<String, String> params = stepInfo.getStepParams();
    driver = GICSUtils.getStepParams(params, "driver", true);
    url = GICSUtils.getStepParams(params, "jdbcurl", true);
    username = GICSUtils.getStepParams(params, "username", true);
    password = GICSUtils.getStepParams(params, "password", true);
}

private String constructInsertSQL(StructMessage row) {
    return null;
}

private String constructDeleteSQL(StructMessage row) {
    return null;
}

private String constructUpdateSQL(StructMessage row) {
    return null;
}

// Methods used for stream writing
@Override
public boolean open(long partitionId, long version) {
    try {
        connection = DBUtils.getConnection(url, username, password, driver);
        connection.setAutoCommit(false);
        stmt_batch = connection.createStatement();
        gson = new GsonBuilder()
                //.setLenient()                      // lenient JSON parsing
                .enableComplexMapKeySerialization()  // support complex objects as Map keys
                .serializeNulls()                    // serialize null fields
                .setPrettyPrinting()                 // pretty-print output
                .disableHtmlEscaping()               // Gson HTML-escapes by default
                .create();
        return true;
    } catch (Exception ex) {
        LogUtil.error(ex.getMessage(), ex);
        return false;
    }
}

@Override
public void process(Row row) {
    String sql = null;
    String jsonVal = null;
    try {
        // Check the op_type field
        byte[] jsonBytes = row.getAs("value");
        StructMessage result = null;
        try {
            jsonVal = new String(jsonBytes, UTF8);
            Type type = new TypeToken<StructMessage>() {}.getType();
            result = gson.fromJson(jsonVal, type);
        } catch (Exception e) {
            LogUtil.error("Malformed message", jsonVal, e);
            return; // skip records that cannot be parsed
        }
        String op_type = result.getOp_type();
        if (CommonUtil.testStringEmpty(op_type))
            throw new GICSException("No op_type field for record");
        if ("I".equals(op_type)) {
            sql = constructInsertSQL(result);
        } else if ("D".equals(op_type)) {
            sql = constructDeleteSQL(result);
        } else if ("U".equals(op_type)) {
            sql = constructUpdateSQL(result);
        }
        if (sql != null) {
            stmt_batch.addBatch(sql);
        }
    } catch (Exception e) {
        LogUtil.error("Executing JDBC update error", sql, e);
    }
}

@Override
public void close(Throwable errorOrNull) {
    try {
        stmt_batch.executeBatch();
        connection.commit();
    } catch (Exception e) {
        LogUtil.error("Executing JDBC commit error", e);
    } finally {
        DBUtils.close(connection);
    }
}
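For context, this is roughly what the OGG-style change record behind StructMessage looks like. The post only guarantees the op_type field ("I" insert, "D" delete, "U" update); every other field below is an assumption based on Oracle GoldenGate's usual JSON change-record layout:

// Hypothetical StructMessage sketch; only getOp_type() is confirmed by the post
import java.util.Map;

public class StructMessage {
    private String table;               // e.g. "SCHEMA.ORDERS" (assumed field)
    private String op_type;             // "I", "U" or "D" (used by process() above)
    private String op_ts;               // operation timestamp (assumed field)
    private Map<String, Object> before; // column values before the change (assumed, U/D)
    private Map<String, Object> after;  // column values after the change (assumed, I/U)

    public String getOp_type() { return op_type; }
}

With a record shape like this, constructInsertSQL would build the statement from after, constructDeleteSQL from before, and constructUpdateSQL from both.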