1. 程式人生 > >Flink SQL 核心概念剖析與程式設計案例實戰

Flink SQL 核心概念剖析與程式設計案例實戰

本次,我們從 0 開始逐步剖析 Flink SQL 的來龍去脈以及核心概念,並附帶完整的示例程式,希望對大家有幫助! ## 本文大綱 ![](https://imgkr2.cn-bj.ufileos.com/ee018625-f90d-4159-b77c-703451cf56d9.png?UCloudPublicKey=TOKEN_8d8b72be-579a-4e83-bfd0-5f6ce1546f13&Signature=PB1HM5TJILi%252BtQW%252BBEIBTMmtq%252BE%253D&Expires=1610117134) ## 一、快速體驗 Flink SQL 為了快速搭建環境體驗 Flink SQL,我們使用 Docker 來安裝一些基礎元件,包括 zk 和 kafka,如果你有這個環境,可以略過了。 在 Centos 7 上安裝 Docker 環境,具體見這個連結,此處就不細說了: https://blog.csdn.net/qq_24434251/article/details/105712044 ### 1、拉取安裝並執行 zookeeper 映象 ```shell docker pull debezium/zookeeper docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper ``` ### 2、拉取安裝並執行 kafka 映象 ```shell docker pull debezium/kafka docker run -d -it --rm --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.56.10:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --link zookeeper:zookeeper debezium/kafka ``` ### 3、進入 kafka 容器內的命令列 ```shell docker exec -it kafka /bin/bash ``` ### 4、建立一個 topic ```shell /kafka/bin/kafka-topics.sh --create --zookeeper 192.168.56.10:2181 --topic user_log --partitions 1 --replication-factor 1 ``` ### 5、在 IDEA 中啟動程式 這裡不貼程式碼太長了,具體程式可以參見我的 github: https://github.com/nicekk/Flink-Practice ![](https://imgkr2.cn-bj.ufileos.com/2497bdf5-baaf-414f-be73-452b2c06013b.png?UCloudPublicKey=TOKEN_8d8b72be-579a-4e83-bfd0-5f6ce1546f13&Signature=ZQjSRysNpV5QA5rklR6yzX1Rsd8%253D&Expires=1610117268) ### 6、寫入資料 ```shell /kafka/bin/kafka-console-producer.sh --broker-list 192.168.56.10:9092 --topic user_log ``` 資料樣例: ```text {"user_id":123,"item_id":345,"ts":"2021-01-05 23:04:00"} {"user_id":345,"item_id":345,"ts":"2021-01-05 23:04:00"} ``` ### 7、結果輸出: ![](https://imgkr2.cn-bj.ufileos.com/fca9d98f-9139-434d-89eb-91d6891340e3.png?UCloudPublicKey=TOKEN_8d8b72be-579a-4e83-bfd0-5f6ce1546f13&Signature=F8B7R%252B8h6kRrWbp%252Frrctkkj3f4I%253D&Expires=1610117320) ## 二、資料型別系統 繼續說明 Flink SQL 使用之前,我們還需要談一談 Flink 的資料型別系統。 Flink 作為一款高效能的計算框架,必然繞不開分散式計算、資料傳輸和持久化這些問題。 在資料傳輸過程中,要對資料進行序列化和反序列化:序列化就是將一個記憶體物件轉換成二進位制串,形成網路傳輸或者持久化的資料流;反序列化將二進位制串轉換為記憶體物件,這樣就可以直接在程式語言中讀寫這個物件。 ![](https://imgkr2.cn-bj.ufileos.com/c036b42e-a12c-4ed7-acec-da3a64c80f89.png?UCloudPublicKey=TOKEN_8d8b72be-579a-4e83-bfd0-5f6ce1546f13&Signature=B7HoNAWUFLKTBOkRkAkGjh2Y0rU%253D&Expires=1610117344) Flink 是執行在 JVM 上的,計算過程中會有大量的資料儲存在記憶體中,這就會面臨一些問題,如 Java 物件儲存密度較低等。 針對這些問題,最常用的方案就是自己實現一個顯示的記憶體管理,用自定義的記憶體池來進行記憶體的分配回收,接著將序列化後的物件儲存到記憶體塊中。 所以,Flink 對資料型別推斷越準確,越能更早的完成資料型別檢查,幫助 Flink 更好的規劃記憶體,節省儲存空間。 比如下面這個類