1. 程式人生 > >推薦兩個不錯的flink項目

推薦兩個不錯的flink項目

art arr int 入參 reader sql解析 sql語法 reg ray

最近flink真是風生水起,但是浪院長看來這不過是阿裏錯過了創造spark影響力之後,想要在flink領域創建絕對的影響力。但是,不可否認flink在實時領域確實目前來看獨樹一幟,當然也有它不適合的地方,比如今天要推薦的第一個基於flink開發的項目,流表和維表的join,還有很多地方還是用spark streaming更合適,但是整體的流處理而言flink確實很優秀,雖然目前測出了一些bug,後面會發文說明一下flink開發時候常見的坑和已有的自身bug。接下來轉入正題。

flinkStreamSQL

熟悉flink的應該都了解,flink支持流表之間的join,但到1.6為止都不支持流表和維表的join。浪尖最近,也在開發流平臺,需要到flink流表和維表的join。那麽針對這個大家第一印象,可以寫個算子去實現,比如map等。但是浪尖這裏開發的流平臺不是說自己寫api,而是用戶通過sql去實現創建source,sink,udf,sql等,這個時候要進行維表join,大家可能是想到了udf。是的對於只有一個維表的情況下使用udf比較方便,但是多個維表,相對就麻煩很多了。

而基於flink開發的flinkStreamSQL主要是實現了flink 流表和維表的join,其主要功能如下:

自定義create table 語法(包括源表,輸出表,維表)

自定義create function 語法

實現了流與維表的join

浪尖花了個把小時看了一下源碼,源碼思路很清晰,主要是兩個步驟:

用flink api實現維表的功能: 要實現維表功能就要用到 flink Aysnc I/O 這個功能,是由阿裏巴巴貢獻給apache flink的。關於異步IO的介紹,可以參考:https://yq.aliyun.com/articles/457385

解析流與維表join的sql語法轉化成底層的flinkAPI

源碼下載地址:

https://github.com/DTStack/flinkStreamSQL

為了方便大家閱讀,這裏浪尖也把維表轉化的過程主要函數貼出來吧:

主函數
Main#main

SQL解析
SqlTree sqlTree = SqlParser.parseSql(sql)
拆讀
SqlParser#parseSql
TableInfoParserFactory#parseWithTableType


註冊表
registerTable

存在維表的話,維表轉換與邏輯sql執行
SideSqlExec#exec
也即是
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);

不存在維表的話
tableEnv.sqlUpdate(result.getExecSql());

SqlSession sqlSession=null;
List<User> userList=new ArrayList<User>(); try{
    sqlSession=MyBatisUtil.createSqlSession();
    User user=new User(www.furggw.com);
    user.setUserName("趙");
    user.setUserRole(www.mingrenf178.com);
    userList=sqlSession.getMapper(UserMapper.class).getUserListByUser(user);
}catch (Exception ex){
    ex.printStackTrace();
}finally {
    MyBatisUtil.closeSqlSession(sqlSession);
}
for (User user:
        userList) {
    System.out.println(user.getUserName()+"\t"+user.getUserRole());
}

使用Map入參編寫接口

List<User> getUserListByMap(Map<www.ysyl157.com String,String> userMap);

編寫UserMapper.xml文件

<select id="getUserListByMap" resultType="User" parameterType=www.mcyllpt.com"Map">
    SELECT * FROM USER www.meiwanyule.cn WHERE userName LIKE concat(‘%‘,#{userName},‘%‘)
    and userRole=#{userRole}

FlinkX

FlinkX主要是用來做數據同步的,實現了多種異構數據源之間高效的數據遷移。

不同的數據源頭被抽象成不同的Reader插件,不同的數據目標被抽象成不同的Writer插件。理論上,FlinkX框架可以支持任意數據源類型的數據同步工作。作為一套生態系統,每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。

在底層實現上,FlinkX依賴Flink,數據同步任務會被翻譯成StreamGraph在Flink上執行

推薦兩個不錯的flink項目