1. 程式人生 > >mac下單機版 kafka + spark + python搭建與例項

mac下單機版 kafka + spark + python搭建與例項

kafka+zookeeper

不提供spark安裝,這裡從kafka安裝開始
首先下載kafka和zookeeper

brew install zookeeper

等它安裝完畢,先進入zookeeper資料夾,往往在/usr/local/Cellar下,啟動zookeeper:

cd /usr/local/Cellar/zookeeper/3.4.6_1/bin 
zkServer start

如果啟動kafka下的zookeeper我這裡會報錯,然後進入kafka,啟動kafka:

bin/kafka-server-start.sh config/kafka.properties

啟動後會一直在最後一個INFO 不會動了,其實這已經表明啟動成功了,開啟另一個終端,進入同樣的kafka路徑,新建一個topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

檢視新建的topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

可以看到test,測試下kafka:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

直接輸入幾個字元如

hello world!

再新建一個終端,注意上面已經有兩個終端了,這裡開啟第三個,進入同樣的路徑下,:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

就可以看到剛才輸入的hello world了。

接下來介紹python的kafka+spark的使用。

python+kafka+spark

編寫python例項程式碼:

# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from
pyspark.streaming.kafka import KafkaUtils sconf=SparkConf() sconf.set('spark.cores.max' , 2) sc=SparkContext(appName="KafkaWordCount",conf=sconf) ssc=StreamingContext(sc,3) zookeeper="localhost:2181" topic={"test":1} groupid="test-consumer-group" lines = KafkaUtils.createStream(ssc, zookeeper,groupid,topic) lines1=lines.map(lambda x:x[1]) words=lines1.flatMap(lambda line:line.split(" ")) pairs=words.map(lambda word:(word,1)) wordcounts=pairs.reduceByKey(lambda x,y:x+y) #wordcounts.saveAsTextFiles("/kafka") wordcounts.pprint() ssc.start() ssc.awaitTermination()

提交python程式碼

spark-submit --jars /usr/local/Cellar/kafka/0.8.2.1/libexec/spark-streaming-kafka-assembly_2.11-2.2.0.jar new.py 2>

如果想在自己的編譯器裡編寫python並選擇python3,則在spark預設的python源需要改為python3,這樣在spark-submit時就會選擇python3,而不是預設的python2.
在環境變數裡增加下列程式碼即可:

export PYSPARK_PYTHON=python3

且還需要pip一些包,pyspark、kafka等。
下面是以搜狗資料為實驗的生產者程式碼
注意生產者程式碼可以直接在自己的編譯器執行即可,不必提交

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from kafka.producer import SimpleProducer
from kafka.client import KafkaClient
import json
import time
import sys

def main():
    ##測試生產模組
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
    fileName1 = '/Volumes/DATA/BigData/bigdata_homework/sogou.500w.utf8'
    fileName2 = '/Users/tcd-pc/Desktop/sogou.10k.utf8'

    with open(fileName1,encoding='utf-8') as file:
        a = 20111230000000
        for fileline in file:
            #print(int(fileline.split('\t')[0])//10000-a//10000)
            if int(fileline.split('\t')[0])//10000%2==0:
                a = int(fileline.split('\t')[0])
                time.sleep(10)
            try:
                producer.send('test1',fileline.encode('utf-8'))
                producer.flush()
            except kafkaError as e:
                print(e)
if __name__ == '__main__':
    main()

下面是消費者的程式碼框架,具體分析的程式碼就不放了:

# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pyspark
import jieba
from jieba import analyse
import sys
import os

os.system('hdfs dfs -rm -r /tmp1/checkpoint/')
zookeeper="localhost:2181"
kafkatopic="test1"
groupid='test-consumer-group'

#建立上下文
def createContext():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 8)
    sc = SparkContext(appName="KafkaWordCount", conf=sconf)
    ssc2 = StreamingContext(sc, 60)
    ssc2.checkpoint("/tmp1/checkpoint/")
    return ssc2

def main_main(ssc):
    consumer = KafkaUtils.createStream(ssc, zookeeper, groupid, {kafkatopic: 1})
    lines1 = consumer.map(lambda x: x[1])
    words = lines1.map(lambda line: line.split("\t"))
    #分割為 時間  使用者ID  搜素內容  rank1  rank2  點選的網址
    seqs = words.map(lambda word: [word[0],  word[1],word[2],word[3], word[4],
                                   word[5].split("//")[1].split("/")[0] if len(word[5].split("//"))>1 else 'nokey',
                                   analyse.extract_tags(word[2] if len(word[2])>0 else 'nokey', topK=1, withWeight=False)
                                    ])

def main():
    ##測試消費模組
    ssc = StreamingContext.getOrCreate("/tmp1/checkpoint/",createContext)
    #呼叫測試
    main_main(ssc)
    ssc.start()
    ssc.awaitTermination()


if __name__ == '__main__':
    main()

用spark streaming流資料分析時,會遇到需要排序的情況,而spark並沒有這樣的方法直接可以用,如果需要可以參考本渣寫的:

webs_time = seqs.map(lambda item: (item[5], 1)) \
                .reduceByKey(lambda x, y: x + y) \
                .map(lambda x: (1, [x[0], x[1]])).groupByKey() \
                .mapValues(lambda x: sorted(x, reverse=True, key=lambda y: y[1])[:10])

用groupbykey讓一個key對應所有這個key的資料,然後用mapValues對key下的所有資料進行排序。其中groupbykey之前用了map方法把需要的key提出來,其他的資料用list表示。
實驗表明:這樣的操作實在時非常慢,但還是可以排序的。。。。。。。