1. 程式人生 > >Flink系列之1.10版流式SQL應用

Flink系列之1.10版流式SQL應用

  隨著Flink 1.10的釋出,對SQL的支援也非常強大。Flink 還提供了 MySql, Hive,ES, Kafka等聯結器Connector,所以使用起來非常方便。

  接下來咱們針對構建流式SQL應用文章的梗概如下:

  1. 搭建流式SQL應用所需要的環境準備。

  2. 構建一個按每小時進行統計購買量的應用。

  3. 構建每天以10分鐘的粒度進行統計應用。

  4. 構建按分類進行排行,取出想要的結果應用。

 

1. 搭建流式應用所需要的環境準備

   Kafka,用於將資料寫入到Kafka中,然後Flink通過讀取Kafka的資料然後再進行處理。版本號:2.11。

     MySQL, 用於儲存資料的分類。Flink從中讀取分類進行處理和計算 。版本號:8.0.15。

   ElasticSearch, 用於儲存結果資料和進行索引儲存。下載的時候可以在搜尋引擎裡邊搜尋“elasticsearch 國內”,這樣就可以從國內快速下載,要不然下載的太慢了。版本號:7.6.0。

   Kibana, 用於ES的結果展示,圖形化的介面美觀。 下載的時候也需要搜尋“Kibana 國內”,比較快速。版本號:7.6.0。

     Flink, 核心的流處理程式,版本號:1.10。Flink支援國內映象下載,這個到時候可以自行找一下。

     Zookeeper,  Kafka依賴這個應用,所以也會用到的,這個什麼版本都是可以的。我的版本號:3.4.12。

   當然我的是mac電腦,如果是mac電腦的話,下載ES和Kibana的時候要下載檔案中帶“darwin”字樣的,可以在Mac中使用其他的不能執行。應該是程式裡邊的編譯不同,這個也是一個小坑。

     因為Flink需要連線Mysql, Elasticseratch , Kafka,所以也需要提前下載Flink所需要的Connector jar包到Flink的lib裡邊。

wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

  

   環境都準備好了,那麼需要把環境都啟動起來,進行檢查。

   Elasticsearch啟動好了之後需要訪問這個網址沒有問題,說明成功了:http://localhost:9200/_cluster/health?pretty。

   Flink啟動好之後需要訪問 http://localhost:8081 會有介面展示。

   Kibana 啟動好了之後訪問:http://127.0.0.1:5601/ 會有介面展示。當然Kibana在目錄conf/kibana.yml裡邊需要把ES的地址給開啟。

   Zookeeper 這個相信很多同學都會配置了,如果有不會配置的,可以自己搜尋一下。

   我們先看一下最後的效果圖,可能不是特別好,是這麼個意思。

 

 

 

 

 

 

 

2. 構建一個按每個小時統計購買量應用。 

 

  我們寫一個程式,往Kafka裡邊寫資料,模擬一些連續的資料來源頭。

  首先定義一個Pojo類。

package myflink.pojo;

public class UserBehavior {
    //使用者ID
    public long userId;
    //商品ID
    public long itemId;
    //商品類目ID
    public  int categoryId;
    //使用者行為,包括{"pv","buy","cart", "fav"}
    public String behavior;
    //行為發生的時間戳,單位秒
    public String ts;

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public long getItemId() {
        return itemId;
    }

    public void setItemId(long itemId) {
        this.itemId = itemId;
    }

    public int getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(int categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public String getTimestamp() {
        return ts;
    }

    public void setTimestamp(String ts) {
        this.ts = ts;
    }
}

  接著寫一個往Kafka寫資料的類。隨機生成用於的行為,裡邊包括使用者的id,類目id等。讓程式執行起來。

package myflink.kafka;

import com.alibaba.fastjson.JSON;
import myflink.pojo.UserBehavior;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author huangqingshi
 * @Date 2020-03-15
 */
public class KafkaWriter {

    //本地的kafka機器列表
    public static final String BROKER_LIST = "localhost:9092";
    //kafka的topic
    public static final String TOPIC_USER_BEHAVIOR = "user_behaviors";
    //key序列化的方式,採用字串的形式
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    //value的序列化的方式
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    private static final String[] BEHAVIORS = {"pv","buy","cart", "fav"};

    private static KafkaProducer<String, String> producer;

    public static void writeToKafka() throws Exception{


        //構建userBehavior, 資料都是隨機產生的
        int randomInt = RandomUtils.nextInt(0, 4);
        UserBehavior userBehavior = new UserBehavior();
        userBehavior.setBehavior(BEHAVIORS[randomInt]);
        Long ranUserId = RandomUtils.nextLong(1, 10000);
        userBehavior.setUserId(ranUserId);
        int ranCate = RandomUtils.nextInt(1, 100);
        userBehavior.setCategoryId(ranCate);
        Long ranItemId = RandomUtils.nextLong(1, 100000);
        userBehavior.setItemId(ranItemId);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        userBehavior.setTimestamp(sdf.format(new Date()));

        //轉換為json
        String userBehaviorStr = JSON.toJSONString(userBehavior);

        //包裝成kafka傳送的記錄
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER_BEHAVIOR, null,
                null, userBehaviorStr);
        //傳送到快取
        producer.send(record);
        System.out.println("向kafka傳送資料:" + userBehaviorStr);
        //立即傳送
        producer.flush();

    }

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);

        producer = new KafkaProducer<>(props);

        while(true) {
            try {
                //每一秒寫一條資料
                TimeUnit.SECONDS.sleep(1);
                writeToKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

}

  本地idea Console 輸出的結果是這樣的:

 

    向kafka傳送資料:{"behavior":"buy","categoryId":7,"itemId":75902,"timestamp":"2020-03-15T11:35:11Z","ts":"2020-03-15T11:35:11Z","userId":4737}

  我們將Flink的任務數調整成10個,也就是同時執行的任務數。 位置在 conf/flink-conf.yaml,taskmanager.numberOfTaskSlots: 10,然後重啟下。我的已經啟動並且運行了3個任務,看下圖:

 

 

   我們接下來執行Flink 內建的客戶端。命令: bin/sql-client.sh embedded,這樣我們就開始了Flink SQL之旅了。我們使用Flink的DDL,從Kafka裡邊讀取資料,採用ProcessingTime的時間事件進行處理,為ts設定水位線,允許5秒延遲。更多參考 時間屬性 和 Flink DDL。裡邊的Kafka 連線以及相關的配置,相信大家都不是很陌生。

CREATE TABLE user_behavior (
    userId BIGINT,
    itemId BIGINT,
    categoryId BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- 通過計算列產生一個處理時間列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定義watermark,ts成為事件時間列
) WITH (
    'connector.type' = 'kafka',  -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支援 0.11 以上的版本
    'connector.topic' = 'user_behaviors',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- 從起始 offset 開始讀取
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
    'format.type' = 'json'  -- 資料來源格式為 json
);

  接下來我們使用select來看一下Flink的資料,執行語句:select * from user_behavior,會出現如下圖。同時SQL上面還支援 show tables、describe user_behavior 等操作。

 

 

   我們需要將結果放入Elasticsearch,這樣也比較簡單,我們還通過DDL來建立一個表。我們只需要一個語句,就可以實現連線Elasticsearch(後邊簡稱ES)並且建立相應的Type和Index了。不需要自己再去建立一次,是不是很簡單,哈。裡邊有兩個欄位,一個是每天的小時數,一個是購買的統計量。當有資料寫入這個表的時候,那麼就會將資料寫入到ES上,非常方便。

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector
    'connector.version' = '6',  -- elasticsearch 版本,6 能支援 es 6+ 以及 7+ 的版本
    'connector.hosts' = 'http://localhost:9200',  -- elasticsearch 地址
    'connector.index' = 'buy_cnt_per_hour',  -- elasticsearch 索引名,相當於資料庫的表名
    'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相當於資料庫的庫名
    'connector.bulk-flush.max-actions' = '1',  -- 每條資料都重新整理
    'format.type' = 'json',  -- 輸出資料格式 json
    'update-mode' = 'append'
);

  每個小時的購買量,那麼我們需要的是使用滾動視窗,Tumbling Window,那麼使用TUMBLE_START函式,另外我們還需要獲取ts中的小時數,那麼需要使用HOUR函式。將所有behavior為buy的寫入到這個表中。

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

  這個時候看Flink裡邊的任務中會出現這個任務,因為是持續不斷的進行處理的。執行過程中如果有資料的話,那麼會將資料寫到表 buy_cnt_per_hour,同時也會將資料寫到ES裡邊。

 

 

  下面我們來配置一下Kinbana來將結果進行展示,訪問 http://localhost:5601, 然後選擇左邊選單的“Management”,然後選擇 “Index Patterns” -> “Create Index Pattern”, 輸入我們剛才建立的Index: “buy_cnt_per_hour”。可以通過左側的“Discover”按鈕就可以看到我們的資料了。

  

 

 

   我們繼續點選左側的“Dashboard”按鈕,建立一個“使用者行為日誌分析”的Dashboard。 進入左側的 “Visualize” - “Create Visualization" 選擇“Area”圖,Bucket的按我下邊截圖左下進行配置和選擇。

 

 

  儲存後新增到Dashboard即可。這樣就從資料來源頭到資料展示就構建完成了,是不是很快~

 

3. 構建每天以10分鐘的粒度進行統計獨立使用者數應用。

  

  我們繼續使用DDL建立Flink的表以及對應的ES的Index。

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);

  建立好了需要將ts進行分解出來小時和分鐘,通過一個檢視,這個檢視和資料庫的檢視類似,不儲存資料,也不佔用Flink的執行Task。首先將ts格式化,然後轉換成時間:小時:分鐘,分鐘後邊沒有0,結尾需要補個0。然後統計不同的使用者數需要使用DISTINCT函式和COUNT函式。還有使用Over Window功能,也就是從之前的資料到現在,以處理時間升序把資料按Window的功能來進行統計。直白的將就是有一條資料的話就會將資料處理, 然後有一條資料比當前最大值大的話會保留最大值。當前視窗是以每10分鐘為一個視窗。

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
  COUNT(DISTINCT userId) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

  這個檢視主要是資料比較多,只需要每10分鐘一個點其實就滿足要求了,那麼現在我們需要做的就是再將資料處理一下即可寫入ES。

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

  這樣ES裡邊就會有新的index產生,下一步我們在kibana裡邊建立一個 index pattern, 輸入index “cumulative_uv”,接下來到 “Visualize”裡邊建立一個 Visualization ,名為“累計獨立使用者數”,表選擇“Line”型別的圖示,其他指標和我下圖配置的一樣即可。

  

 

 

 

  累計獨立使用者數也建立好了。

 

4.   構建按分類進行排行,取出想要的結果應用。

    接下來我們需要按主類目進行統計和排序。因為子類目非常多。

  首先我們需要準備一個mysql, 然後建立好表。簡單些幾條對應的類目關係,當然可以根據自己所生成的資料進行自行寫入一些對應的關係表。

create table category (
    sub_category_id bigint(20),
    parent_category_id bigint(20)
);
insert into category(sub_category_id, parent_category_id) values(1038, 1);
insert into category(sub_category_id, parent_category_id) values(91244, 1);
insert into category(sub_category_id, parent_category_id) values(44712, 1);
insert into category(sub_category_id, parent_category_id) values(2,2);
insert into category(sub_category_id, parent_category_id) values(3,3);
insert into category(sub_category_id, parent_category_id) values(4,4);
insert into category(sub_category_id, parent_category_id) values(5,5);
insert into category(sub_category_id, parent_category_id) values(6,6);
insert into category(sub_category_id, parent_category_id) values(7,7);
insert into category(sub_category_id, parent_category_id) values(8,8);
insert into category(sub_category_id, parent_category_id) values(9,9);

  定義一個Flink表,資料從Mysql獲取,用於進行類目關係關聯。

CREATE TABLE category_dim (
    sub_category_id BIGINT,  -- 子類目
    parent_category_id BIGINT -- 頂級類目
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

  建立ES的index,用於儲存統計後的結果。

CREATE TABLE top_category (
    category_name STRING,  -- 類目名稱
    buy_cnt BIGINT  -- 銷量
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);

  接下來還是建立一個檢視,將表和類目關聯起來,方便後邊的統計結果。使用的是 Temporal Join。

CREATE VIEW rich_user_behavior AS
SELECT U.userId, U.itemId, U.behavior,
  CASE C.parent_category_id
    WHEN 1 THEN '服飾鞋包'
    WHEN 2 THEN '家裝家飾'
    WHEN 3 THEN '家電'
    WHEN 4 THEN '美妝'
    WHEN 5 THEN '母嬰'
    WHEN 6 THEN '3C數碼'
    WHEN 7 THEN '運動戶外'
    WHEN 8 THEN '食品'
    ELSE '其他'
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.categoryId = C.sub_category_id;

  將型別為“buy”的寫入到表,同時也就是寫入了ES裡邊,然後ES裡邊的index-top_category 也就有了資料了。

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

  我們繼續在Kibana裡邊建立一個index pattern,輸入“top_category”,然後visualize裡邊建立一個visualization 名為類目排行榜。詳細的配置可參考如下。

 

 

   好了整個的過程計算建立完了。

   

  通過使用Flink 1.10以及對應的Connector, 實現了對Mysql, Kafka, Elasticsearch 的快速連線,更快的達到的我們想要實現的效果。

  裡邊涉及到往kafka裡邊寫資料可參考工程:https://github.com/stonehqs/flink-demo