Flink SQL Table 我們一起去看2018中超聯賽-Flink牛刀小試
版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。
寫在前面的話
Flink是一個新型的流式處理引擎,作者自身只是對Spark底層較為熟悉,有興趣可以查閱我的Spark core ,Spark String 以及 SQL/">Spark SQL 原始碼解讀系列。在這裡我們只是品味一下號稱第四代大資料處理引擎的Flink,作者也並沒有深入到Flink底層原始碼級別。請見諒如果您已經是FLink大牛了!看一下2018中超聯賽積分榜:

1 SQL Table(牛刀小試)
The Table API is a declarative DSL centered around tables, which may be dynamically changing tables (when representing streams). The Table API follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases) and the API offers comparable operations, such as select, project, join, group-by, aggregate, etc. Table API programs declaratively define what logical operation should be done rather than specifying exactly how the code for the operation looks. Though the Table API is extensible by various types of user-defined functions, it is less expressive than the Core APIs, but more concise to use (less code to write). In addition, Table API programs also go through an optimizer that applies optimization rules before execution.
One can seamlessly convert between tables and DataStream/DataSet, allowing programs to mix Table API and with the DataStream and DataSet APIs.

The highest level abstraction offered by Flink is SQL. This abstraction is similar to the Table API both in semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the Table API.
2 上程式碼分析(球隊粒度進行進球聚合排序)
- 1 進行pojo物件的資料封裝。
- 2 BatchTableEnvironment tableEnv環境生成: BatchTableEnvironment.getTableEnvironment(env);
- 3 Table表生成:Table topScore = tableEnv.fromDataSet(topInput)
- 4 Table表註冊:tableEnv.registerTable("topScore",topScore);
- 5 Table表查詢:tableEnv.sqlQuery
- 6 Table錶轉換回DataSet: tableEnv.toDataSet
2.1 詳情請參考程式碼
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; public class TableSQL { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env); DataSet<String> input = env.readTextFile("C:\\CoreForBigData\\FLINK\\TopCore.csv"); input.print(); DataSet<TopScorers> topInput = input.map(new MapFunction<String, TopScorers>() { @Override public TopScorers map(String s) throws Exception { String[] splits = s.split("\t"); return new TopScorers(Integer.valueOf(splits[0]),splits[1],splits[2],Integer.valueOf(splits[3]),Integer.valueOf(splits[4]),Integer.valueOf(splits[5]),Integer.valueOf(splits[6]),Integer.valueOf(splits[7]),Integer.valueOf(splits[8]),Integer.valueOf(splits[9]),Integer.valueOf(splits[10])); } }); //將DataSet轉換為Table Table topScore = tableEnv.fromDataSet(topInput); //將topScore註冊為一個表 tableEnv.registerTable("topScore",topScore); Table tapiResult = tableEnv.scan("topScore").select("club"); tapiResult.printSchema(); Table groupedByCountry = tableEnv.sqlQuery("select club, sum(jinqiu) as sum_score from topScore group by club order by sum_score desc"); //轉換回dataset DataSet<Result> result = tableEnv.toDataSet(groupedByCountry, Result.class); //將dataset map成tuple輸出 result.map(new MapFunction<Result, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> map(Result result) throws Exception { String country = result.club; int sum_total_score = result.sum_score; return Tuple2.of(country,sum_total_score); } }).print(); } /** * 源資料的對映類 */ public static class TopScorers { /** * 排名,球員,球隊,出場,進球,射正,任意球,犯規,黃牌,紅牌 */ public Integer rank; public String player; public String club; public Integer chuchang; public Integer jinqiu; public Integer zhugong; public Integer shezheng; public Integer renyiqiu; public Integer fangui; public Integer huangpai; public Integer hongpai; public TopScorers() { super(); } public TopScorers(Integer rank, String player, String club, Integer chuchang, Integer jinqiu, Integer zhugong, Integer shezheng, Integer renyiqiu, Integer fangui, Integer huangpai, Integer hongpai) { this.rank = rank; this.player = player; this.club = club; this.chuchang = chuchang; this.jinqiu = jinqiu; this.zhugong = zhugong; this.shezheng = shezheng; this.renyiqiu = renyiqiu; this.fangui = fangui; this.huangpai = huangpai; this.hongpai = hongpai; } } /** * 統計結果對應的類 */ public static class Result { public String club; public Integer sum_score; public Result() {} } } 複製程式碼
2.2 結果展示(2018恆大隊很厲害,進球55個)

3 理論昇華一下
3.1 Create a TableEnvironment
// *************** // STREAMING QUERY // *************** StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); // create a TableEnvironment for streaming queries StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv); // *********** // BATCH QUERY // *********** ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); // create a TableEnvironment for batch queries BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv); 複製程式碼
3.2 TTL用法
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register Orders table // scan registered Orders table Table orders = tableEnv.scan("Orders"); // compute revenue for all customers from France Table revenue = orders .filter("cCountry === 'FRANCE'") .groupBy("cID, cName") .select("cID, cName, revenue.sum AS revSum"); // emit or convert Table // execute query 複製程式碼
3.3 Register a DataStream or DataSet as Table
// get StreamTableEnvironment // registration of a DataSet in a BatchTableEnvironment is equivalent StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream); // register the DataStream as table "myTable2" with fields "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, "myLong, myString"); 複製程式碼
3.4 Convert a DataStream or DataSet into a Table
// get StreamTableEnvironment // registration of a DataSet in a BatchTableEnvironment is equivalent StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // Convert the DataStream into a Table with default fields "f0", "f1" Table table1 = tableEnv.fromDataStream(stream); // Convert the DataStream into a Table with fields "myLong", "myString" Table table2 = tableEnv.fromDataStream(stream, "myLong, myString"); 複製程式碼
4 收工
通過2018中超聯賽,我們管中窺豹,學會了Flink SQL Table 的核心思想,當然本文並不完善,希望本文能夠給大家帶來一些收穫。辛苦成文,彼此珍惜,謝謝!
版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。
秦凱新 於深圳 201811262252