
kafka + spark streaming code example (pyspark + python)

1. System preparation

1. Start ZooKeeper: bin/zkServer.sh start

2. Start Kafka: bin/kafka-server-start.sh -daemon config/server.properties

3. Start Spark: sbin/start-all.sh

Data source: http://files.grouplens.org/datasets/movielens/ml-100k.zip
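Each line of u.user has the form user id|age|gender|occupation|zip code, e.g. 1|24|M|technician|85711. A quick sketch to preview the file (the local path is an assumption, matching the producer script below):

# Print the first few records of u.user to confirm the field layout
with open('/home/hadoop/ml-100k/u.user', 'r') as f:
    for i, line in enumerate(f):
        if i >= 3:
            break
        print line.strip().split('|')   # [user id, age, gender, occupation, zip code]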

Workflow: Kafka reads the user dataset and produces a data stream, Spark Streaming counts the number of users per occupation, and the results are written to MySQL.

2. Kafka reads the user dataset and produces the data stream, one record per second

First, create the topic:

bin/kafka-topics.sh --create --zookeeper 192.168.26.247:2181 --replication-factor 2 --partitions 1 --topic txt

Verify the topic: bin/kafka-topics.sh --list --zookeeper 192.168.26.247:2181

bin/kafka-topics.sh --describe --zookeeper 192.168.26.247:2181 --topic txt


from kafka import KafkaProducer
import time

def main():
    # Producer: send the u.user records to the 'txt' topic, one line per second
    producer = KafkaProducer(bootstrap_servers=['192.168.26.247:9092'])
    with open('/home/hadoop/ml-100k/u.user', 'r') as f:
        for line in f:
            time.sleep(1)
            producer.send("txt", line)
            print line
    producer.flush()    # make sure buffered messages are delivered before exiting

if __name__ == '__main__':
    main()
Save this as txt.py and run it; each user record is printed to the console as it is sent.
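Before wiring up Spark, it can be worth checking that records are actually arriving on the txt topic. A quick standalone check with kafka-python (not part of the original post; broker address as above):

from kafka import KafkaConsumer

# Read the 'txt' topic from the beginning and print every message value
consumer = KafkaConsumer('txt',
                         bootstrap_servers=['192.168.26.247:9092'],
                         auto_offset_reset='earliest')
for msg in consumer:
    print msg.value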

Spark Streaming consumes the data, computes the counts, and writes the results to the database.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
import MySQLdb

def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 3)
    sc = SparkContext(appName='txt', conf=sconf)
    ssc = StreamingContext(sc, 5)   # 5-second batch interval
    brokers = "192.168.26.247:9092,192.168.26.246:9092"
    topic = 'txt'
    start_offset = 70000
    partition = 0
    user_data = KafkaUtils.createDirectStream(ssc, [topic],
                                              kafkaParams={"metadata.broker.list": brokers})
    # fromOffsets: start consuming from the given offset
    #user_data = KafkaUtils.createDirectStream(ssc, [topic],
    #        kafkaParams={"metadata.broker.list": brokers},
    #        fromOffsets={TopicAndPartition(topic, partition): long(start_offset)})
    user_fields = user_data.map(lambda line: line[1].split('|'))
    # u.user fields: user id | age | gender | occupation | zip code, so fields[3] is the occupation
    occupation_counts = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda a, b: a + b)
    user_data.foreachRDD(offset)    # save the offset info of the current batch
    occupation_counts.pprint()
    occupation_counts.foreachRDD(lambda rdd: rdd.foreach(echo))  # write each (occupation, count) tuple to MySQL
    ssc.start()
    ssc.awaitTermination()

offsetRanges = []

def offset(rdd):
    # Remember the Kafka offset ranges of the current batch
    global offsetRanges
    offsetRanges = rdd.offsetRanges()

def echo(record):
    zhiye = record[0]   # occupation
    num = record[1]     # count
    for o in offsetRanges:
        topic = o.topic
        partition = o.partition
        fromoffset = o.fromOffset
        untiloffset = o.untilOffset
    # Insert the result into MySQL
    conn = MySQLdb.connect(user="root", passwd="******", host="192.168.26.245",
                           db="test", charset="utf8")
    cursor = conn.cursor()
    sql = ("insert into zhiye(id,zhiye,num,topic,partitions,fromoffset,untiloffset) "
           "values (NULL,%s,%s,%s,%s,%s,%s)")
    cursor.execute(sql, (zhiye, num, topic, partition, fromoffset, untiloffset))
    conn.commit()
    conn.close()

if __name__ == '__main__':
    start()
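The target MySQL table is not defined in the original post. A minimal setup sketch that creates a matching table (column names taken from the insert statement above; column types are assumptions):

import MySQLdb

# One-off setup script: create the 'zhiye' table used by the streaming job.
# Column types are guesses inferred from the insert statement above.
conn = MySQLdb.connect(user="root", passwd="******", host="192.168.26.245", db="test", charset="utf8")
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS zhiye (
        id          INT AUTO_INCREMENT PRIMARY KEY,
        zhiye       VARCHAR(64),   -- occupation
        num         INT,           -- count for the batch
        topic       VARCHAR(64),
        partitions  INT,
        fromoffset  BIGINT,
        untiloffset BIGINT
    )
""")
conn.commit()
conn.close()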
3. Submit to the cluster

bin/spark-submit --master spark://192.168.26.245:7077 --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar python/txt.py

Execution results:


Some of the data in the database:


Data displayed on the web:
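The web page itself is not included in the original post. One possible minimal sketch for serving the table, assuming Flask (not part of the original code) and the same MySQL table as above:

from flask import Flask
import MySQLdb

app = Flask(__name__)

@app.route('/')
def occupations():
    # Aggregate the per-batch counts stored by the streaming job and render a plain HTML list
    conn = MySQLdb.connect(user="root", passwd="******", host="192.168.26.245",
                           db="test", charset="utf8")
    cursor = conn.cursor()
    cursor.execute("SELECT zhiye, SUM(num) FROM zhiye GROUP BY zhiye")
    rows = cursor.fetchall()
    conn.close()
    return "<br>".join("%s: %s" % (zhiye, num) for zhiye, num in rows)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)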