1. 程式人生 > >懶松鼠Flink-Boot(Flink+Spring):一款將Flink與Spring生態完美融合的腳手架工程

懶松鼠Flink-Boot(Flink+Spring):一款將Flink與Spring生態完美融合的腳手架工程

[TOC] 還在為開發Flink流處理應用程式時無法像開發Spring Boot程式那麼優雅的分層以及裝配Bean而煩惱嗎? ## 你可能面臨如下苦惱: 1. 開發的Flink流處理應用程式,業務邏輯全部寫在Flink的操作符中,程式碼無法服用,無法分層 2. 要是有一天它可以像開發Spring Boot程式那樣可以優雅的分層,優雅的裝配Bean,不需要自己new物件好了 3. 可以使用各種Spring生態的框架,一些瑣碎的邏輯不再硬編碼到程式碼中。 GitHub最近超火的一款開源框架,懶松鼠Flink-Boot腳手架,該腳手架簡直是Spring開發工程師的福音,完美融合Spring生態體系,再也不需要手動在Java類中建立臃腫的Java物件,簡直是開發大型流處理應用程式的必不可少的工具。地址:[懶松鼠Flink-Boot 腳手架由《深入理解Flink核心設計與實踐原理》作者開發。](https://github.com/intsmaze/flink-boot) ![](https://img2020.cnblogs.com/blog/758427/202012/758427-20201203213155435-1655368684.png) ### 介面快取 **你的現狀** ``` static Map cache=new HashMap(); public String findUUID(FlowData flowData) { String value=cache.get(flowData.getSubTestItem()); if(value==null) { String uuid=userMapper.findUUID(flowData); cache.put(uuid,value); return uuid; } return value; } ``` **你想要的是這樣** ``` @Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); } ``` ### 重試機制 **你的現狀** ```java public void insertFlow(FlowData flowData) { try{ userMapper.insertFlow(flowData); }Cache(Exception e) { Thread.sleep(10000); userMapper.insertFlow(flowData); } } ``` **你想要的是這樣** ```java @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); } ``` ### Bean校驗 **你的現狀** ``` if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7) { return null; } if(flowData.getBillNumber()==null) { return null; } ``` **你想要的是這樣** ``` Map validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; } public class FlowData { private String uuid; //宣告該引數的校驗規則字串長度必須在7到20之間 @Size(min = 7, max = 20, message = "長度必須在{min}-{max}之間") private String subTestItem; //宣告該引數的校驗規則字串不能為空 @NotBlank(message = "billNumber不能為空") private String billNumber; } ``` ### 等等...... GitHub最近超火的一款開源框架,懶松鼠Flink-Boot腳手架,該腳手架簡直是Spring開發工程師的福音,完美融合Spring生態體系,再也不需要手動在Java類中建立臃腫的Java物件,簡直是開發大型流處理應用程式的必不可少的工具。[懶松鼠Flink-Boot 腳手架由《深入理解Flink核心設計與實踐原理》作者開發。](https://github.com/intsmaze/flink-boot) ## 它為流計算開發工程師解決了 1. 將所有物件的建立和依賴關係的維護工作都交給Spring容器的管理,降低了物件之間的耦合性,使程式碼變得更簡潔,拒絕臃腫。 2. 消除在工程中對單例的過多使用。 3. 宣告式事務處理,通過配置就可以完成對事物的管理,而無須手動程式設計。 4. 宣告式註解,可以通過註解定義方法的緩衝功能,無序手動程式設計。 5. 註解式定義Bean物件的校驗規則,通過註解即可完成對物件的引數校驗,無序手動程式設計。 6. 整合MyBatis ORM框架,註解式維護例項物件的依賴關係。 7. 解耦Flink SQL,SQL語句剝離出JAVA檔案,以簡潔的模式表現在XML檔案中。 8. 封裝Flink API,僅提供業務方法去編寫,Spring生態融合全部搞定,無需操心。 ### 有了它你的程式碼就像這樣子: ``` /** * github地址: https://github.com/intsmaze * 部落格地址:https://www.cnblogs.com/intsmaze/ * 出版書籍《深入理解Flink核心設計與實踐原理》 隨書程式碼 * RichFlatMapFunction為Flink框架的一個通用型操作符(運算元),開發者一般在該運算元的flatMap方法中編寫業務邏輯 * @auther: intsmaze(劉洋) * @date: 2020/10/15 18:33 */ public class MybatisFlatMap extends RichFlatMapFunction { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); protected ApplicationContext beanFactory; //mybatis的Service物件,操作資料庫的user表 private UserService userService; @Override public void open(Configuration parameters) { ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters); userService = beanFactory.getBean(UserServiceImpl.class); } @Override public void flatMap(String value, Collector out){ FlowData flowData = gson.fromJson(message, new TypeToken() { }.getType()); Map validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; } //資料庫查詢,遮蔽掉獲取資料庫連線,是否資料庫連線,事務的宣告等 String flowUUID = userService.findUUID(flowData); if (StringUtils.isBlank(flowUUID)) { flowUUID = UUID.randomUUID().toString(); flowData.setUuid(flowUUID); //資料庫插入,遮蔽掉獲取資料庫連線,是否資料庫連線,事務的宣告等 userService.insertFlow(flowData); } out.collect(gson.toJson(flowData)); } } public interface UserService { String findUUID(FlowData flowData); void insertFlow(FlowData flowData); } //通過註解例項化Bean物件。 @Service //通過註解宣告進行事務管理 @Transactional //通過註解宣告方法具有異常重試機制 @EnableRetry public class UserServiceImpl implements UserService { //通過註解進行依賴注入 @Resource private UserMapper userMapper; @Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") @Override public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); } //通過註解宣告該方法異常後的重試機制,無需手動程式設計 @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); } } public interface UserMapper { String findUUID(FlowData flowData); void insertFlow(FlowData flowData); } //註解式宣告引數校驗規則 public class FlowData { private String uuid; //宣告該引數的校驗規則字串長度必須在7到20之間 @Size(min = 7, max = 20, message = "長度必須在{min}-{max}之間") private String subTestItem; //宣告該引數的校驗規則字串不能為空 @NotBlank(message = "billNumber不能為空") private String billNumber; @NotBlank(message = "barcode不能為空") private String barcode; private String flowName; private String flowStatus; ...... } ``` ## 倉庫地址:[懶松鼠Flink-Boot](https://github.com/intsmaze/flink-boot) **倉庫地址:[懶松鼠Flink-Boot](https://github.com/intsmaze/flink-boot)腳手架由《深入理解Flink核心設計與實踐原理》作者開發。** ![](https://img2020.cnblogs.com/blog/758427/202012/758427-20201203213448222-135360969.png) 1. 該腳手架遮蔽掉組裝Flink API細節,讓跨界變得簡單,使得開發者能以傳統Java WEB模式的開發方式開發出具備分散式計算能力的流處理程式。 2. 開發者完全不需要理解分散式計算的理論知識和Flink框架的細節,便可以快速編寫業務程式碼實現。 3. 為了進一步提升開發者使用該腳手架開發大型專案的敏捷的程度,該腳手架工程預設整合Spring框架進行Bean管理,同時將微服務以及WEB開發領域中經常用到的框架整合進來,進一步提升開發速度。 4. 除此之外針對目前流行的各大Java框架,該Flink腳手架工程也進行了整合,加快開發人員的編碼速度,比如: * 整合Jbcp-template對Mysql,Oracle,SQLServer等關係型資料庫的快速訪問。 * 整合Hibernate Validator框架進行引數校驗。 * 整合Spring Retry框架進行重試標誌。 * 整合Mybatis框架,提高對關係型資料庫增,刪,改,查的開發速度。 * 整合Spring Cache框架,實現註解式定義方法快取。 * ...... ### 1. 組織結構 ``` lua Flink-Boot ├── Flink-Base -- Flink-Boot工程基礎模組 ├── Flink-Client -- Flink-Boot 客戶端模組 ├── flink-annotation -- 註解生效模組 ├── flink-mybatis -- mybatis orm模組 ├── flink-retry -- 註解重試機制模式 ├── flink-validate -- 校驗模組 ├── flink-sql -- Flink SQL解耦至XML配置模組 ├── flink-cache-annotation -- 介面緩衝模組 ├── flink-junit -- 單元測試模組 ├── flink-apollo -- 阿波羅配置客戶端模組 ``` ### 2. 技術選項和整合情況 技術 | 名稱 | 狀態 | ----|------|---- Spring Framework | 容器 | 已整合 Spring 基於XML方式配置Bean | 裝配Bean | 已整合 Spring 基於註解方式配置Bean | 裝配Bean | 已整合 Spring 基於註解宣告方法重試機制 | Retry註解 | 已整合 Spring 基於註解宣告方法快取 | Cache註解 | 已整合 Hibernate Validator | 校驗框架 | 已整合 Druid | 資料庫連線池 | 已整合 MyBatis | ORM框架 | 已整合 Kafka | 訊息佇列 | 已整合 HDFS | 分散式檔案系統 | 已整合 Log4J | 日誌元件 | 已整合 Junit | 單元測試 | 已整合 Mybatis-Plus | MyBatis擴充套件包 | 進行中 PageHelper | MyBatis物理分頁外掛 | 進行中 ZooKeeper | 分散式協調服務 | 進行中 Dubbo | 分散式服務框架 | 進行中 Redis | 分散式快取資料庫 | 進行中 Solr & Elasticsearch | 分散式全文搜尋引擎 | 進行中 Ehcache | 程序內快取框架 | 進行中 sequence | 分散式高效ID生產 | 進行中 Dubbole消費者 | 服務消費者 | 進行中 Spring eurake消費者 | 服務消費者 | 進行中 Apollo配置中心 | 攜程阿波羅配置中心 | 進行中 Spring Config配置中心 | Spring Cloud Config配置中心 | 進行中 ### 3. 快速開始 下面是整合Spring生態的基礎手冊. #### 3.1 核心基礎工程 * flink-base :基礎工程,封裝了開發Flink工程的必須引數,同時整合Spring容器,為後續整合Spring各類框架提供了支撐。 1. 可以在本地開發環境和Flink叢集執行環境中隨意切換。 2. 可以在增量檢查點和全量檢查點之間隨意切換。 3. 內建使用HDFS作為檢查點的持久儲存介質。 4. 預設使用Kafka作為資料來源 5. 內建實現了任務的暫停機制-達到任務仍在執行但不再接收Kafka資料來源中的資料,代替了停止任務後再重新部署任務這一繁瑣流程。 * flink-client:業務工程,該工程依賴flink-base工程,開發任務在該工程中進行業務邏輯的開發。 #### 3.2 Spring容器 該容器模式配置了JdbcTemplate例項,資料庫連線池採用Druid,在業務方法中只需要獲取容器中的JdbcTemplate例項便可以快速與關係型資料庫進行互動,dataService例項封裝了一些訪問資料庫表的方法。 ##### topology-base.xml ```
``` ##### config.properties ``` jdbc.user = intsmaze jdbc.password = intsmaze jdbc.url = jdbc:mysql://127.0.0.1:3306/flink-boot?useUnicode=true&characterEncoding=UTF-8 ``` #### 3.3 啟動類示例 如下是SimpleClient(com.intsmaze.flink.client.SimpleClient)類的示例程式碼,該類繼承了BaseFlink,可以看到對應實現的方法中分別設定如下: * public String getTopoName():定義本作業的名稱。 * public String getConfigName():定義本作業需要讀取的spring配置檔案的名稱 * public String getPropertiesName():定義本作業需要讀取的properties配置檔案的名稱。 * public void createTopology(StreamExecutionEnvironment builder):構造本作業的拓撲結構。 ``` /** * github地址: https://github.com/intsmaze * 部落格地址:https://www.cnblogs.com/intsmaze/ * 出版書籍《深入理解Flink核心設計與實踐原理》 隨書程式碼 * * @auther: intsmaze(劉洋) * @date: 2020/10/15 18:33 */ public class SimpleClient extends BaseFlink { public static void main(String[] args) throws Exception { SimpleClient topo = new SimpleClient(); topo.run(ParameterTool.fromArgs(args)); } @Override public String getTopoName() { return "SimpleClient"; } @Override public String getConfigName() { return "topology-base.xml"; } @Override public String getPropertiesName() { return "config.properties"; } @Override public void createTopology(StreamExecutionEnvironment builder) { DataStream inputDataStrem = env.addSource(new SimpleDataSource()); DataStream processDataStream = inputDataStrem.flatMap(new SimpleFunction()); processDataStream.print("輸出結果"); } } ``` #### 3.4 資料來源 採用自定義資料來源,使用者需要編寫自定義DataSource類,該類需要繼承XXX抽象類,實現如下方法。 * public abstract void open(StormBeanFactory beanFactory):獲取本作業在Spring配置檔案中配置的bean物件。 * public abstract String sendMessage():本作業spout生成資料的方法,在該方法內編寫業務邏輯產生源資料,產生的資料以String型別進行返回。 ``` public class SimpleDataSource extends CommonDataSource { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); ...... @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ...//構造讀取各類外部系統資料的連線例項 } @Override public String sendMess() throws InterruptedException { Thread.sleep(1000); ...... MainData mainData = new MainData(); ......//通過外部系統資料的連線例項讀取外部系統資料,封裝進MainData物件中,然後返回即可。 return gson.toJson(mainData); } } ``` #### 3.5 業務邏輯實現 本作業計算的業務邏輯在Flink轉換操作符中進行實現,一般來說開發者只需要實現flatMap運算元即可以滿足大部分運算元的使用。 使用者編寫的自定義類需要繼承com.intsmaze.flink.base.transform.CommonFunction抽象類,均需實現如下方法。 * public abstract String execute(String message):本作業業務邏輯計算的方法,引數message為Kafka主題中讀取過來的引數,預設引數為String型別,如果需要將處理的資料傳送給Kakfa主題中,則要通過return將處理的資料返回即可。 ``` public class SimpleFunction extends CommonFunction { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); @Override public String execute(String message) throws Exception { FlowData flowData = gson.fromJson(message, new TypeToken() { }.getType()); String flowUUID = dataService.findUUID(flowData); if (StringUtils.isBlank(flowUUID)) { flowUUID = UUID.randomUUID().toString(); flowData.setUuid(flowUUID); dataService.insertFlow(flowData); } return gson.toJson(flowData); } } ``` ##### CommonFunction CommonFunction抽象類中預設在open方法中通過BeanFactory物件獲取到了Spring容器中對於的dataService例項,對於Spring中的其他例項同理在SimpleFunction類中的open方法中獲取即可。 ``` public abstract class CommonFunction extends RichFlatMapFunction { private IntCounter numLines = new IntCounter(); protected DataService dataService; protected ApplicationContext beanFactory; @Override public void open(Configuration parameters) { getRuntimeContext().addAccumulator("num-FlatMap", this.numLines); ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters); dataService = beanFactory.getBean(DataService.class); } @Override public void flatMap(String value, Collector out) throws Exception { this.numLines.add(1); String execute = execute(value); if (StringUtils.isNotBlank(execute)) { out.collect(execute); } } public abstract String execute(String message) throws Exception; } ``` 可以根據情況選擇重寫open(Configuration parameters)方法,同時重寫的open(Configuration parameters)方法的第一行要呼叫父類的open(Configuration parameters)方法。 ``` public void open(Configuration parameters){ super.open(parameters); ...... //獲取在Spring配置檔案中配置的例項 XXX xxx=beanFactory.getBean(XXX.class); } ``` #### 3.6 叢集/本地執行 在自定義的Topology類編寫Main方法,建立自定義的Topology物件後,呼叫物件的run(...)方法。 public class SimpleClient extends BaseFlink { /** * 本地啟動引數 -isLocal local * 叢集啟動引數 -isIncremental isIncremental */ public static void main(String[] args) throws Exception { SimpleClient topo = new SimpleClient(); topo.run(ParameterTool.fromArgs(args)); } ....