1. 程式人生 > >【大資料實踐】KSQL流處理——如何將多個STREAM輸出到一個TOPIC

【大資料實踐】KSQL流處理——如何將多個STREAM輸出到一個TOPIC

【大資料實踐】KSQL流處理——如何將資料處理結果推到指定Topic

需求場景描述

在生產環境中,各個業務服務產生的事件都會被push到Kafka訊息中介軟體中。如:充值中心的 充值事件 會被push到kafka的recharge topic中,玩家 結算事件 會被push到kafka的game_score topic中。

平臺希望通過處理,實時分析這些事件,篩選出滿足條件的一些玩家,對其獎勵相應道具。如,想做一個針對不同充值金額的玩家獎勵不同道具的活動:

  • 0 < 充值金額 < 100 時, 獎勵一個10萬金幣卡道具(道具ID:"10w")
  • 100 <= 充值金額 時,獎勵一個100萬金幣卡
    道具(道具ID:"100w")

方案設計

  • 將充值的事件(原始日誌資料,JSON格式),推送到Kafka的recharge topic中。充值事件資料格式:

    {"event_type" : "cash_order",
     "username" : "foo",
     "channel" : "wx_scan",
     "cash" : 100
     }
  • kafka中新建一個PROPREWARD的topic,專門接收道具獎勵的事件,該主題事件訊息格式為:

    {"user/name" : "foo"
     "prop/id" : "道具ID"
     "reward/reason" : "獎勵的原因"}
  • 一個道具發放服務(Kafka消費者)訂閱該主題,當道具獎勵事件到達時,獲取事件中的 使用者名稱
    道具ID 為指定玩家發放道具。
  • 使用KSQL建立兩個派生流(Stream),分別從recharge topic中過濾出0 < 充值金額 < 100100 <= 充值金額 的事件,過濾出符合條件的使用者名稱,並組裝成約定的道具獎勵事件,將其推送到Kafka的PROPREWARD topic中。

具體實現

  • 新建一個kafka topic : PROPREWARD (大寫),用於接收和儲存道具獎勵事件

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic PROPREWARD
  • 從kafka topic PROPREWARD建立一個流(PROPREWARD),用於輸出道具獎勵事件,注意:流的名字與kafka topic的名字相同。

    CREATE STREAM PROPREWARD (`user/name` varchar, `seed/id` varchar, `reward/reason` varchar) \
    WITH (kafka_topic='PROPREWARD', value_format='JSON');
  • 根據業務需要,新建一個查詢規則為0 < 充值金額 < 100 的派生流並插入到PROPREWARD流中。

    INSERT INTO PROPREWARD \
       SELECT username AS `user/name` , '10w' AS `prop/id`, '充值100元以內獎勵10萬金幣卡道具' AS `reward/reason` \
       FROM recharge \
       WHERE EVENT_TYPE = 'cash_order' 
              AND CASH > 0 
              AND CASH <= 100;
  • 根據業務需要,新建一個查詢規則為 100 <= 充值金額 的派生流並插入到PROPREWARD流中。

    INSERT INTO PROPREWARD \
       SELECT username AS `user/name` , '100w' AS `prop/id`, '充值超過100元獎勵100萬金幣卡道具' AS `reward/reason` \
       FROM recharge \
       WHERE EVENT_TYPE = 'cash_order' AND CASH >= 100;
  • 還可以根據業務需要,從其他kafka topic中派生出其他流,插入到PROPREWARD流中。

結果驗證

  • 往kafka的recharge topic中寫入資料:

    {"event_type" : "cash_order",
     "username" : "foo",
     "channel" : "wx_scan",
     "cash" : 9
     }

    可以在topic PROPREWARD中接收到事件:

    {"user/name" : "foo" ,
     "prop/id" : "10w"
     "reward/reason" : "充值100元以內獎勵10萬金幣卡道具"}
  • 往kafka的 recharge topic中寫入資料:

    {"event_type" : "cash_order",
     "username" : "foo",
     "channel" : "wx_scan",
     "cash" : 11
     }

    可以在topic PROPREWARD 中接收到事件:

    {"user/name" : "foo" ,
     "prop/id" : "100w",
     "reward/reason" : "充值100元以上獎勵100萬金幣卡道具"}

注意

  • 要想將上述兩個派生流插入(INSERT INTO)到輸出結果的PROPREWARD流中,需要確保:

    • PROPREWARD 的名字與輸出結果的Kafka topic名字相同,否則會丟擲異常。這應該是KSQL 5.0.0 的一個BUG。
    • 派生流中輸出的資料結構 (SELECT username AS user/name , '100w' AS prop/id, '充值超過100元獎勵100萬金幣卡道具' AS reward/reason 與 流 PROPREWARD 定義的結構相同。

總結

  • 通過Kafka + KSQL 流式處理,可以配置出豐富的活動——根據各種不同的事件和規則,獎勵不同的道具(或者其他型別的東西),而不需要額外的程式碼開發!!