對Flink流處理模型的抽象

我們開發的一個基於大資料平臺的資料倉庫,選擇了Flink作為資料處理的底層框架。我們主要看重於它在流處理的低延遲性,訊息傳遞保證的extractly once特性;它為流處理和批處理提供了相對統一的API,支援Java、Scala和Python等主流開發語言,同時還較好地支援了SQL。Flink搭建了非常棒的基礎設施,例如它可以和ZooKeeper、YARN整合起來,保證處理功能的高可用性與水平擴充套件的叢集能力,同時還提供了相對開放的擴充套件能力,使得我們可以較容易地在已有功能基礎之上實現定製開發。
我們基於Flink開發了自己的底層框架“海納(haina)”,這是取“海納百川有容乃大”之意。haina以庫的形式為我們的產品提供了資料採集、治理和共享等功能,是整個平臺最核心的資料處理基礎設施,邏輯架構如下圖所示:

抽象的流處理模型
由於我們的產品對資料的處理主要包括三個方面:採集、治理與共享,這之間流轉的皆為採集器從上游系統採集獲得的資料。我們結合Flink的架構,並參考了Apex、Storm、Flume等其他流處理框架,抽象出自己的流處理模型。這個模型中各個概念之間的關係與層次如下圖所示:

在這個流處理模型中,一個Job對應一個實際的物理環境(Environment)。多數情況下,為了保證Job執行的獨立性,可以為每個Job分配一個單獨的執行節點,提供專有的執行資源。每個Job核心的邏輯概念是Flow,它由Source、Processor和Sink組成,它們都是Flink的Operator,其中Processor對應於Flink的Transformation Operator。在實時流處理中,一個典型的Processor其實就是我們常用的map、filter或flatMap函式。例如:
public class ArchiveJsonMetaProcessor implements MapFunction<String, String> { private String target; public ArchiveJsonMetaProcessor(String target) { this.target = target; } @Override public String map(String msg) { JSONObject message = JSONObject.parseObject(msg); String messageId = message.getString("messageId"); String originTimestamp = message.getString("originalTimestamp"); String archivedTimestamp = DateUtil.transformTime(new Date(), DateUtil.YYYYMMDDHHMMSSS); ArchivedMetaData archivedMetaData = ArchivedMetaData.fillArchivedMetaData(messageId, target, originTimestamp, archivedTimestamp); return JSON.toJSONString(archivedMetaData); } }
Processor的設計原則
我們之所以要抽象出Processor概念,是因為我們遵循了管道-過濾器模式,希望每個operator都是一個最小的可以重用的邏輯單元。管道就是我們定義的Flow,Source是管道的上游入口,Sink是管道的下游出口,每個細粒度的Processor就是每個負責處理資料流的過濾器。我們的底層框架haina實現了這些邏輯單元,至於它們該如何組裝,則交由框架的使用者。正因為此,我們制定了Processor的設計原則,其根本思想就是保持Processor的細粒度,嚴格分離與業務無關和有關的Processor,保證Processor在組成Flow時儘可能被重用。
如下為設計Processor應該遵循的原則:
- 業務上對資料流的處理可以拆分為多個階段,每個Processor對應一個階段。
- 儘可能把有副作用的和無副作用的職責分離到不同的Processor。
- 把需要訪問外部資源的職責儘可能分離到不同的Processor。
- 儘可能確保Processor的程式碼短小,這樣可以保證將Processor真正的職責轉移到別的類,例如物件的轉換邏輯。轉移出去的類與Flink平臺無關,有利於編寫和執行單元測試。
- 每個Processor的上游與下游,即
MapFunction
或其他介面對應的型別引數T
與O
,應儘量採用平臺定義的模型物件,而非如String之類的基礎型別。這樣就能保證呼叫者對Processor進行組裝時,通過編譯就能檢查到不必要的組裝錯誤。 - 每個Processor的命名採用動賓短語,並以Processor作為類的字尾。例如將一條航班資訊拆分成多個機位資訊,命名為
SplitFlightToStandsProcessor
。好的命名可以幫助我們更容易發現它,進而促進呼叫者對它的重用。例如在IntelliJ中,就可以直接以*Processor
來搜尋所有的Processor,然後根據它的命名就能推測出這個Processor到底是做什麼的。 - 每個Processor需要的外部資料,都通過Processor的建構函式來傳遞。
- 每個Processor都應該實現Flink提供的transfomation介面。
- 第一個Processor接收的是
String
型別的訊息,則要求必須對傳入的訊息進行驗證。用於驗證的Processor應該實現FilterFunction<String>
。 - 應保證每個Processor都不要丟擲出人意料不可控制的異常,否則可能導致執行Job時出現錯誤從而導致整個Application停止或者重啟。
自定義Source與Sink
針對Source與Sink,除了重用Flink本身提供的source與sink之外,我們還開發了大量的滿足自己需求的自定義Source與Sink。例如,我們為獨立開發的ESB系統提供了Source,為關係型資料庫和WebService提供了具有輪詢能力的Source,為ElasticSearch開發了滿足批量新增資料的Sink,同時還實現了具有回撥能力的自定義Sink。如下就是針對Oracle編寫的具有輪詢能力的自定義Source:
public class OracleClobSource extends RichSourceFunction<String> { private static final Logger log = LoggerFactory.getLogger(OracleClobSource.class); private JdbcGateway jdbcGateway; private Long period; private DSLContext executor; public OracleClobSource(JdbcGateway jdbcGateway, Long period) { this.jdbcGateway = jdbcGateway; this.period = period; } @Override public void open(Configuration parameters) throws Exception { executor = jdbcGateway.executor(); } @Override public void run(SourceContext<String> ctx) throws Exception { while (true) { Result<Record> results = executor.select().from(table(tableName)).where(rownum().le(10)).fetch(); for (Record record : results) { String msgcontext = record.get(field(filedName), String.class); ctx.collect(msgcontext); Long id = record.get(field("ID"), Long.class); executor.delete(table(tableName)).where(field("ID").equal(id)).execute(); } Thread.sleep(period); } } @Override public void cancel() { executor.close(); } }
為了便於使用,我們還為這些內建與定製source和sink分別定義了靜態工廠。
Flow與Job
Flow相當於是傳遞DataStream的拓撲圖,由Source、Processor和Sink組成。我們之所以引入這個概念,一方面是為Job提供更粗粒度的重用單元,另一方面也承擔了封裝業務流程的主要職責。例如,一個航班資料從上游系統進入我們的大資料平臺,需要進行多次資料格式的轉換、驗證與治理,我們就可以定義一個FlightFlow來完成這些細小職責的組裝:
public class FlightFlow extends AbstractFlow { public FlightFlow(Environment env, Config config) { super(env, config); } private static final String FLIGHT = "flight"; @Override public void run() { DataStream<String> source = env.addSource(sourcesFactory.createSslKafkaSource("INBOUND")); SingleOutputStreamOperator flightStream = source .filter(new FilterFlightProcessor()) .map(new TransformXmlToJsonProcessor()) .map(new TransformJsonStringToJsonObjectProcessor()) .map(new TransformJsonMessageToFlightsProcessor()) .map(new FlattenDomainModelProcessor()); flightStream .map(new DeletedDiscusedRecordsProcessor(DETAIL, FLIGHT)) .map(new DeleteFullModelProcessor(DETAIL, FLIGHT)) .map(new InsertFullModelProcessor(DETAIL, FLIGHT)) .map(new UpdateChangedRecordsProcessor(DETAIL, FLIGHT)) .map(new TransformModelToEventProcessor()) .map(new TransformObjectToJsonProcessor()) .addSink(sinksFactory.createSslKafkaSink("trigger")); } }
Job與Flow之間的關係是一對多關係。這種關係可以根據資源情況與業務需求的不同隨時調整。因而我們引入配置方式來保證這種靈活性。Job是一個容器,通過它可以傳入Flink Job的執行環境,然後在配置檔案中配置Job與Flow之間的關係。如下配置檔案就是為資料探針任務配置的Job,它包含了三個完全不同的Flow,運行同一個Flink叢集上:
<job name="AirportToKafkaJob"> <flow name="FlightToKafkaFlow" flowClassName="haina.airprobe.flow.FlightToKafkaFlow"/> <flow name="PassengerToKafkaFlow" flowClassName="haina.airprobe.flow.PassengerToKafkaFlow"/> <flow name="AcdmToKafkaFlow" flowClassName="haina.airprobe.flow.AcdmToKafkaFlow"/> </job>
核心與外部應用
haina在針對flink的流處理模型進行了抽象和擴充套件開發後,就形成了圍繞flink為核心的邏輯架構。如下圖所示:

flink是haina的核心,提供了基本的運算、執行和部署的能力,而haina則根據我們產品的需求對flink進行擴充套件,並遵循前面提及的抽象流處理模型提供各個可以被重用的細粒度組成單元,並實現了通用的組成邏輯,簡化了開發工作量。圖中所示的air-probe、air-jobs等模組滿足不同的資料處理功能。這些模組都是建立在haina之上薄薄的一層應用,只需要建立滿足業務流程處理的Flow,並配置Flow與Job的關係即可。
同時,我們還自行開發了一套配置框架,可以簡化整個大資料平臺要使用到的外部資源,包括YARN、Flink、Kafka、ElasticSearch、RabbitMQ、ZooKeeper等,並在AbstractJob中完成了Flink執行環境與具體Job之間的繫結以及對外部環境的使用。