使用Apache Kafka和KSQL實現流處理普及化——第二部分
本文要點
- 針對客戶操作、操作儀表板、線上分析等應用場景,使用Apache Kafka和KSQL構建資料整合和處理應用程式。
- 流處理的主要好處包括:資料增強一次性完成、低延遲處理、向客戶Ops團隊實時傳送通知。
- 你可以使用變化資料捕獲(CDC)工具把資料庫的資料以及任何後續的變化映象到Kafka主題。
- 使用KSQL,很容易把業務資料流和源自資料庫、在Kafka主題中維護的相關資訊合流。
- 擴充套件應用程式使其可以處理更多的通知,而不必修改過濾邏輯。
這是文章“使用Apache Kafka和KSQL實現流處理普及化”的第二部分。第一部分在這裡。
在本文中,我們將介紹如何使用Apache Kafka®和KSQL構建資料整合和處理應用程式。這是一個來自電商領域的簡單示例:在一個網站上,通過一系列事件跟蹤使用者評論。關於這些使用者的資訊,如姓名、聯絡方式、尊貴客戶俱樂部資深會員,儲存在資料庫的某個地方。對於這類評論資料,至少有三種用途:
- 客戶操作 ——如果一個尊貴客戶俱樂部資深會員留下了差評,我們希望可以馬上做些事情,降低流失這類客戶的風險。我們希望應用程式在出現滿足此條件的評論時立即通知我們。這樣,我們就可以馬上為客戶提供服務,這遠遠好於我們等待一段時間後才執行的批處理為我們標記出需要聯絡的使用者。
- 操作儀表板 實時展示評論輸入,滾動聚合,計算數量和平均分數,等等,並按使用者區域劃分。
- 結合其他資料(不論在資料湖中,還是在資料倉庫中)線上分析 分析評論資料。這可以擴充套件到更廣泛的資料科學實踐和機器學習應用。所有這些都需要訪問評論資訊及使用者的詳細資訊。
我們將介紹下如何使用一種更為現代化的模式基於流平臺實現上述功能。我們將使用開源專案Apache Kafka和KSQL來實現。KSQL是一個面向Apache Kafka的流SQL引擎,基於Kafka Streams API實現,後者是Apache Kafka的一個組成部分。
下圖展示了流應用程式示例的工作原理。
圖1.流資料應用程式
事件是使用者提交到網站的評論,它們被以流的方式直接傳遞給Kafka。從這裡,它們可以實時和使用者資訊聯絡起來,經過充實的結果資料會寫回到Kafka。轉換完成後,這些資料就可以用於驅動上述應用和目標了。轉換邏輯只需要執行一次。資料一次性從源系統提取。轉換後的資料可以供不相關的應用程式多次使用。不用對現有元件做任何修改,就可以新增新的源和目標。所有這些操作的延遲都非常低。
因此,高層設計是這樣的:
- Web應用直接向Kafka傳送評論;
- Kafka Connect把資料庫使用者資料快照以流的方式傳送給Kafka,並且直接與CDC保持同步;
- 流處理把使用者資料新增到評論事件,並寫回到一個新的Kafka主題;
- 流處理會針對VIP使用者的差評篩選出充實後的Kafka主題,並寫入一個新的Kafka主題;
- 事件驅動應用會監聽Kafka主題,在VIP使用者留下差評後立即推送通知;
- Kafka Connect把資料以流的方式傳入Elasticsearch,供操作儀表板使用;
- Kafka Connect把資料以流的方式傳入S3,供長期線上分析使用以及和其他資料集一起使用。
其中,主要好處包括:
- 資料增強一次性完成,可供任何應用程式消費;
- 資料處理延遲低;
- 可以在VIP客戶留下差評後立即通知客戶Ops團隊——提供更好的客戶體驗,增加業務保留機會;
- 容易擴充套件,可以按需增加新節點,實現更大的吞吐量。
實現
讓我們看一下構建這個應用程式的詳細過程。ofollow,noindex" target="_blank">GitHub 上提供了所有示例的程式碼以及docker-compose檔案。
把資料寫入Kafka
Web應用程式有多種方式可以使事件流入Kafka。
在我們的例子中,應用程式使用了Producer API。
Web應用程式傳送給Kafka主題“評級(ratings)”的訊息格式如下:
{ "rating_id": 604087, "user_id": 7, "stars": 1, "route_id": 2777, "rating_time": 1528800546808, "channel": "android", "message": "thank you for the most friendly, helpful experience today at your new lounge" }
使Kafka可以訪問資料庫中的資料
在構建應用程式的時候,經常需要使用儲存在資料庫中的資料。在我們的例子中,使用者資料儲存在MySQL中,不過,設計模式都是一樣的,與採用哪種具體的RDBMS技術無關。
在使用Kafka編寫流處理應用程式時,整合儲存在資料庫中的資料的標準方法是,確保資料本身在Kafka中儲存和維護。這比聽上去簡單——我們只需要使用資料變化捕獲(CDC)工具把資料庫中的資料和任何後續的變化映象到一個Kafka主題。
這樣做的好處是隔離了資料庫和流處理。這主要有兩個好處:資料庫不會因為我們的請求增加開銷,我們可以自由使用我們選取的資料,而又不會使我們的開發和部署流程和資料庫所有者的相耦合。
CDC技術和工具不止一種 ,我們這裡就不介紹了。由於資料在MySQL中,我們使用Debezium 專案作為我們的CDC工具。它會把使用者表的內容快照到Kafka,並使用MySQL的binlog即時檢測後續MySQL中資料的變化並複製到Kafka。
圖2詳細展示了資料變化捕獲過程的資料流動。
圖2.流應用程式變化資料捕獲
從資料庫流出、流入Kafka主題asgard.demo.CUSTOMERS的訊息格式如下:
{ "id": 1, "first_name": "Rica", "last_name": "Blaisdell", "email": "[email protected]", "gender": "Female", "club_status": "bronze", "comments": "Universal optimal hierarchy", "create_ts": "2018-06-12T11:47:30Z", "update_ts": "2018-06-12T11:47:30Z", "messagetopic": "asgard.demo.CUSTOMERS", "messagesource": "Debezium CDC from MySQL on asgard" }
使用資料庫資訊充實事件流
使用KSQL,很容易就可以把源於資料庫、在Kafka主題中維護的相關資訊合併到評級中。
合併細節如圖3所示:

第一步是確保客戶主題中的訊息以關聯列為鍵,在這個例子中是客戶ID。我們實際上可以使用KSQL進行重新分割槽。KSQLCREATE STREAM
的輸出被寫入一個Kafka主題,在預設情況下,會以流本身的名稱命名:
-- 處理流中所有現有的資料以及將來的資料 SET 'auto.offset.reset' = 'earliest'; -- 宣告源流 CREATE STREAM CUSTOMERS_SRC WITH \ (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO'); -- 在ID列上重新分割槽,設定目標主題,使其分割槽數量與作為源的評級主題一致: CREATE STREAM CUSTOMERS_SRC_REKEY WITH (PARTITIONS=1) AS \ SELECT * FROM CUSTOMERS_SRC PARTITION BY ID;
現在,到達asgard.demo.CUSTOMERS
主題的每條資訊都將寫入正確設定了訊息鍵的Kafka主題CUSTOMERS_SRC_REKEY
。注意,我們不一定要宣告任何模式,因為我們在使用Avro。KSQL和Kafka Connect都無縫集成了開源的Confluent Schema Registry,序列化/反序列化Avro資料,並在Schema Registry中儲存/檢索模式。
為了進行合併,我們使用標準的SQL聯合查詢語法:
-- 把CUSTOMER註冊為一張KSQL表, -- 源自重新分割槽後的主題 CREATE TABLE CUSTOMERS WITH \ (KAFKA_TOPIC='CUSTOMERS_SRC_REKEY', VALUE_FORMAT ='AVRO', KEY='ID'); -- 把RATINGS資料註冊到一個KSQL流,源自ratings主題 CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO'); -- 執行聯合查詢,寫入新主題——注意,主題名稱是顯式設定的。 -- 如果移除KAFKA_TOPIC引數,那麼目標主題將使用所建立的流或者表的名稱 CREATE STREAM RATINGS_ENRICHED WITH \ (KAFKA_TOPIC='ratings-with-customer-data', PARTITIONS=1) AS \ SELECT R.RATING_ID, R.CHANNEL, R.STARS, R.MESSAGE, \ C.ID, C.CLUB_STATUS, C.EMAIL, \ C.FIRST_NAME, C.LAST_NAME \ FROM RATINGS R \ LEFT JOIN CUSTOMERS C \ ON R.USER_ID = C.ID \ WHERE C.FIRST_NAME IS NOT NULL ;
我們可以檢視這條查詢處理的訊息數量:
ksql> DESCRIBE EXTENDED RATINGS_ENRICHED; Name: RATINGS_ENRICHED Type: STREAM Key field: R.USER_ID Key format: STRING Timestamp field: Not set - using <ROWTIME> Value format: AVRO Kafka topic: ratings-with-customer-data (partitions: 4, replication: 1) [...] Local runtime statistics ------------------------ messages-per-sec:3.61total-messages:2824last-message: 6/12/18 11:58:27 AM UTC failed-messages:0 failed-messages-per-sec:0last-failed:n/a (本地KSQL伺服器與Kafka主題ratings-with-customer-data的互動統計)
實際上,這條SQL語句本身就是一個應用程式,就像我們在Java、Python、C……中編寫的程式碼一樣。它不斷地執行,接收輸入資料、處理資料、輸出資料。我們在上面看到的輸出是該應用程式的執行時指標。
使用KSQL過濾資料流
我們前面建立的JOIN查詢其輸出是一個Kafka主題,在源自源主題ratings的事件的驅動下實時填充,如下圖4所示:
我們可以構建第二個KSQL應用程式,由這個派生主題所驅動,並對資料做進一步地處理。這裡,我們將簡單地過濾所有評級流,識別那些同時滿足如下兩個條件的評級:
- 差評(評級範圍1到5,小於3即為差評)
- “鉑金”客戶留下的評級
SQL給出的語義幾乎可以從字面上表達上述需求。我們可以首先使用KSQL CLI驗證該查詢:
SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE \ FROMRATINGS_ENRICHED \ WHERESTARS < 3 \ ANDCLUB_STATUS = 'platinum'; platinum | [email protected] | 1 | worst. flight. ever. #neveragain platinum | [email protected] | 2 | (expletive deleted)
然後,和以前一樣,這個持續查詢的結果可以持久化到一個Kafka主題,只需為語句加上CREATE STREAM ... AS
(通常使用縮寫CSAS)字首。注意,我們可以選擇所有的源列(SELECT *
),或者建立一個可用欄位的子集(SELECT COL1, COL2
),使用哪一個取決於建立流的目的。此外,我們將把目標訊息寫成JSON格式:
CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS \ WITH (VALUE_FORMAT='JSON', PARTITIONS=1) AS \ SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE \ FROMRATINGS_ENRICHED \ WHERESTARS < 3 \ ANDCLUB_STATUS = 'platinum';
檢視生成的Kafka主題,我們可以看到,它只包含我們感興趣的事件。再次強調一下,這是一個Kafka主題——我們可以使用KSQL查詢它——這裡,我將跳過KSQL,使用流行的kafkacat 工具檢視它:
kafka-console-consumer \ --bootstrap-server kafka:9092 \ --topic UNHAPPY_PLATINUM_CUSTOMERS | jq '.' { "CLUB_STATUS": { "string": "platinum" }, "EMAIL": { "string": "[email protected]" }, "STARS": { "int": 1 }, "MESSAGE": { "string": "Surprisingly good, maybe you are getting your mojo back at long last!" } }
在離開KSQL之前,我們給自己提個醒,我們實際上僅寫了三個流應用程式:
ksql> SHOW QUERIES; Query ID| Kafka Topic| Query String ------------------------------------------------------------------------------------------------------------ CSAS_CUSTOMERS_SRC_REKEY_0| CUSTOMERS_SRC_REKEY| CREATE STREAM CUSTOMERS_SRC_REKEY[…] CSAS_RATINGS_ENRICHED_1| RATINGS_ENRICHED| CREATE STREAM RATINGS_ENRICHED[…] CSAS_UNHAPPY_PLATINUM_CUSTOMERS_2 | UNHAPPY_PLATINUM_CUSTOMERS | CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS[…]
由Kafka主題驅動的推送通知
我們在上面建立的主題UNHAPPY_PLATINUM_CUSTOMERS
可以用於驅動一個應用程式,如果有重要客戶留下了差評,它就會給客戶運營團隊傳送警報。這裡的關鍵是,我們基於一個剛剛發生的事件驅動了一個實時的動作。基於批處理的分析下週才告訴我們,上週我們讓一位客戶失望了,這就沒用了。我們希望現在就知道,以便我們現在就可以採取行動,向那位客戶提供更好的體驗。
Kafka客戶端庫有面向各種語言的——你幾乎可以選擇任何語言。這裡,我們使用面向Python的開源Confluent Kafka庫 。這是一個構建事件驅動應用程式的簡單例子。它在一個Kafka主題上監聽事件,然後生成一個推送通知。我們將使用Slack作為我們的通知傳送平臺。為了簡化說明,下面的程式碼片段刪除了所有的錯誤處理程式碼。我們可以把一個API(如Slack的API) 和一個Kafka主題整合,在這個主題上監聽事件,從而觸發一個動作。
from slackclient import SlackClient from confluent_kafka import Consumer, KafkaError sc = SlackClient('api-token-xxxxxxx') settings = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'python_kafka_notify.py', 'default.topic.config': {'auto.offset.reset': 'largest'} } c = Consumer(settings) c.subscribe(['UNHAPPY_PLATINUM_CUSTOMERS']) while True: msg = c.poll(0.1) if msg is None: continue else: email=app_msg['EMAIL'] message=app_msg['MESSAGE'] channel='unhappy-customers' text=('`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_' % (email, message)) sc.api_call('chat.postMessage', channel=channel, text=text, username='KSQL Notifications', icon_emoji=':rocket:') finally: c.close()
下圖5展示了使用Slack API傳送使用者通知。
[點選檢視大圖]

這裡有必要重申一下,我們正在構建的應用程式(如果你願意,可以把它稱為微服務)是事件驅動的。就是說,該應用程式會等待一個事件,然後執行動作。它不是嘗試處理所有資料並查詢特定的條件,也不是一個響應某個命令的同步請求-響應服務。我們已經分離出了這些職責:
-
根據確定的條件過濾實時事件流
是由KSQL完成的(使用我們前面介紹的
CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS
語句),匹配的事件被寫入一個Kafka主題; - 通知服務 有一個唯一的職責,它負責從Kafka主題獲得事件,並基於它生成一個推送通知。這是非同步完成的。
這樣做的好處很明顯:
- 我們可以橫向擴充套件應用程式,使其處理更多通知,而不必修改過濾邏輯;
- 我們可以使用可選的其他應用程式替換這個應用程式,而不必修改過濾邏輯;
- 我們可以替換或修改過濾邏輯,而不必觸及通知應用程式。
Kafka和請求/響應模式
對於基於Kafka平臺編寫應用程式,有一種常見的質疑,就是事件驅動模式不適用於應用程式的流程,並由此推論,Kafka也不適合。這種觀點是錯誤的,有兩個關鍵點需要記住:
- 事件驅動模式和請求/響應模式都完全可以使用——它們不是互斥的,有些需求需要使用請求/響應模式;
- 決定因素應該是需求;應該挑戰現有方法的慣性。在部分或全部應用程式的訊息傳遞中使用事件驅動架構,你可以從它帶來的非同步性、可擴充套件性以及與Kafka的整合中受益,其他所有使用Kafka的系統和應用程式也是如此。
要了解有關這個問題的進一步討論,可以查閱Ben Stopford的系列文章 及其最新著作《事件驅動系統設計 》。
使資料從Kafka流入Elasticsearch,用於操作分析
使用Kafka Connect很容易就可以使資料從Kafka流入Elasticsearch。它提供了一個由配置檔案控制的可擴充套件的流整合。有一個開源的Elasticsearch聯結器,既可以單獨存在 ,也可以作為Confluent平臺 的一部分。這裡,我們將使原始評級及警告資訊流入Elasticsearch:
"name": "es_sink", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "ratings-with-customer-data,UNHAPPY_PLATINUM_CUSTOMERS", "connection.url": "http://elasticsearch:9200" [...] } }
在從Kafka Connect到Elasticsearch的資料流上,使用Kibana很容易在經過充實、過濾的資料上構建一個實時儀表板,如圖6所示。
[點選檢視大圖]

使資料從Kafka流入資料湖
最後,我們將使充實後的評級流入資料湖。在這裡,它可以用於線上分析、訓練機器學習模型和資料科學專案,等等。
Kafka中的資料可以流入使用Kafka Connect的各種型別的目標 。這裡,我們將看下S3和BigQuery,但是,使用HDFS、GCS、Redshift、Snowflake DB等也同樣簡單。
就像前面介紹的使資料從Kafka流入Elasticsearch一樣,針對每專案標技術的設定只是一個簡單的配置檔案設定:
"name": "s3-sink-ratings", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "topics": "ratings-with-customer-data", "s3.region": "us-west-2", "s3.bucket.name": "rmoff-demo-ratings",
資料流入S3後,我們可以在桶裡檢視,如圖7所示。
[點選檢視大圖]

我們還可以使同樣的資料流入谷歌的BigQuery:
"name": "gbq-sink-ratings", "config": { "connector.class":"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", "topics": "ratings-with-customer-data", "project":"rmoff", "datasets":".*=ratings",
[點選檢視大圖]

谷歌的Data Studio是眾多可以用於分析這些來自雲物件儲存的資料的應用程式之一:
[點選檢視大圖]
這裡的重點不是上面介紹的具體技術,不管你選擇使用什麼樣的資料湖技術,使用Kafka Connect都很容易使資料流入它。
和KSQL及流平臺一起走向未來
在這篇文章中,我們已經看了把流平臺作為資料架構核心組成部分的其中多個有力的論據。它提供了一個可擴充套件的基礎,由於其解耦特性,使系統可以靈活地整合和演進。分析工作可以從流平臺的強大整合能力中獲益。它是流平臺,因此,實時不是其主要動因。應用程式可以從流平臺獲益,因為它是實時的,而且也因為它的整合能力。
藉助KSQL,可以使用許多開發人員都熟悉的語言編寫流處理應用程式。這些應用程式可以是簡單的Kafka事件流過濾器,也可以是複雜的充真實模式,從包括資料庫在內的其他系統獲取資料。
要了解更多有關KSQL的資訊,你可以觀看教程 並自己試一下 。文件中介紹了調整和部署實踐。在Confluent Community Slack群組 中,有一個與此相關的活躍社群。GitHub 上提供了本文的示例。
關於作者
Robin Moffatt是Confluent的一名開發大使,該公司由Apache Kafka的建立者發起成立。他還是Oracle ACE總監和開發冠軍。職業生涯至今,他一直在跟資料打交道,從以前的COBOL和DB2到Oracle和Hadoop,再到如今的Kafka。他的主要研究領域是分析、系統架構、效能測試和優化。你可以在這裡
和這裡
閱讀他的博文(之前在這裡
)。他的Twitter賬號是@rmoff
。在業餘時間裡,他喜歡喝啤酒,吃煎炸早餐,不過一般不是同時。
這是文章“使用Apache Kafka和KSQL實現流處理普及化”的第二部分。第一部分在這裡。
檢視英文原文:Democratizing Stream Processing with Apache Kafka® and KSQL - Part 2