【python】spark+kafka使用
網上用python寫spark+kafka的資料好少啊 自己記錄一點踩到的坑~
spark+kafka介紹的官方網址:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
python的pyspark庫函數文檔:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html?highlight=kafkautils.createdirectstream#pyspark.streaming.kafka.KafkaUtils.createDirectStream
上面兩個是最重要的資料,大多數問題可以通過仔細研讀上面兩個文檔得到答案
官網上說了,spark和kafka連用有兩種方式:接收器形式 以及 直連形式
一、 接收器形式
優點:支持kafka的group.id設置,支持用kafka api查詢offset,如果數據斷掉後,可以通過group.id輕松找到上一次失敗的位置
缺點:
1.失敗處理復雜。由於kafka隊列信息由kafka自己記錄,當spark消費了數據但是處理中出錯時會導致數據丟失。為了避免數據丟失就必須開啟Write Ahead Logs,把spark接收到的數據都存儲到分布式文件系統中,比如HDFS,然後失敗時從存儲的記錄中找到失敗的消息。這導致同一批數據被kafka和spark存儲了2次。造成數據冗余。
2.如果有多個地方都想獲取同一個kafka隊列的數據,必須建立多個流,無法用一個流並行處理。
該方法是比較老的一種方式,並不太被推薦。
二、直連形式
優點:
1. 不需兩次存儲數據,直連形式時,spark自己管理偏移信息,不再使用kafka的offset信息。所以spark可以自行處理失敗情況,不要再次存儲數據。spark保證數據傳輸時Exactly-once。
2.只需建立一個流就可以並行的在多個地方使用流中的數據
缺點:
不支持kafka的group,不支持通過kafka api查詢offset信息!!!!
在連接後spark會根據fromOffsets參數設置起始offset,默認是從最新的數據開始的。也就是說,必須自己記錄spark消耗的offset位置
我選用的是直連形式,我處理offset的方法是將spark消費的offset信息實時記錄到文件中。在啟動腳本時通過記錄的文件來找到起始位置。
#!/usr/bin/python # coding=utf-8 from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition import time import os import json broker_list = "xxxx" topic_name = "xxxx" timer = 5 offsetRanges = [] def store_offset_ranges(rdd): global offsetRanges offsetRanges = rdd.offsetRanges() return rdd def save_offset_ranges(rdd): root_path = os.path.dirname(os.path.realpath(__file__)) record_path = os.path.join(root_path, "offset.txt") data = dict() f = open(record_path, "w") for o in offsetRanges: data = {"topic": o.topic, "partition": o.partition, "fromOffset": o.fromOffset, "untilOffset": o.untilOffset} f.write(json.dumps(data)) f.close() def deal_data(rdd): data = rdd.collect() for d in data: # do something pass def save_by_spark_streaming(): root_path = os.path.dirname(os.path.realpath(__file__)) record_path = os.path.join(root_path, "offset.txt") from_offsets = {} # 獲取已有的offset,沒有記錄文件時則用默認值即最大值 if os.path.exists(record_path): f = open(record_path, "r") offset_data = json.loads(f.read()) f.close() if offset_data["topic"] != topic_name: raise Exception("the topic name in offset.txt is incorrect") topic_partion = TopicAndPartition(offset_data["topic"], offset_data["partition"]) from_offsets = {topic_partion: long(offset_data["untilOffset"])} # 註意設置起始offset時的方法 print "start from offsets: %s" % from_offsets sc = SparkContext(appName="Realtime-Analytics-Engine") ssc = StreamingContext(sc, int(timer)) kvs = KafkaUtils.createDirectStream(ssc=ssc, topics=[topic_name], fromOffsets=from_offsets, kafkaParams={"metadata.broker.list": broker_list}) kvs.foreachRDD(lambda rec: deal_data(rec)) kvs.transform(store_offset_ranges).foreachRDD(save_offset_ranges) ssc.start() ssc.awaitTermination() ssc.stop() if __name__ == ‘__main__‘: save_by_spark_streaming()
運行:
正常情況下,只要輸入下面的語句就可以運行了
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 spark_kafka.py
然而,我的總是報錯,找不到依賴包,說各種庫不認識。所以我只好用--jars來手動指定包的位置了..................
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 --jars /root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar,/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar spark_kafka.py
吐槽:
我就踩在直連形式不支持offset的坑上了..... 開始官方文檔沒仔細看,就瞄了一眼說是直連形式好,就豪不猶豫的用了。結果我的腳本不穩定,各種斷,然後中間數據就各種丟啊.......
還有官網上居然完全沒有對fromOffsets這個參數的說明,我找了好久好久才弄清楚這個參數怎麽拼出來啊.................
【python】spark+kafka使用