
Spark Streaming-Kafka Example (Python and Java Versions)

This article implements communication between Kafka and Spark Streaming: the Kafka producer side is written in Java, and the Spark Streaming consumer side is written in Python.

First, set up the Kafka and Spark Streaming environments; for a basic Kafka connectivity test, refer to the previous article. All experiments in this article run on a local single-node setup.
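Since the Java producer below writes to partitions 0 and 1, the topic needs at least two partitions. As a minimal sketch, the topic can be created from Python with the third-party kafka-python package (an assumption; the original setup does not use it):

# Hypothetical setup helper, assuming kafka-python (pip install kafka-python).
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
# two partitions to match the producer's even/odd partitioning scheme
admin.create_topics(new_topics=[NewTopic(name="data-message",
                                         num_partitions=2,
                                         replication_factor=1)])
admin.close()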

Kafka Producer (Java)

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class producer {
    private final static String TOPIC = "data-message";
    private final static String BOOTSTRAP_SERVER = "127.0.0.1:9092";

    public static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    // Custom partitioning: even timestamps go to partition 0, odd ones to partition 1.
    public static int partition(long time) {
        return (time % 2 == 0) ? 0 : 1;
    }

    public static void runProducer() throws Exception {
        final Producer<String, String> producer = createProducer();
        long time = System.currentTimeMillis();
        long curTime;
        try {
            while (true) {
                curTime = System.currentTimeMillis();
                // >= rather than ==, so a GC pause or the blocking send cannot
                // skip past the exact millisecond; emits one record every 10 seconds
                if (curTime - time >= 10000) {
                    final ProducerRecord<String, String> record = new ProducerRecord<>(
                            TOPIC, partition(curTime), "JP_" + curTime, "AUX|989|bid|276|" + curTime);
                    RecordMetadata metadata = producer.send(record).get();
                    long elapsedTime = System.currentTimeMillis() - time;
                    System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d) time=%d%n",
                            record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
                    time = System.currentTimeMillis();
                }
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void main(String[] args) throws Exception {
        runProducer();
    }
}
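If you would rather keep both sides in Python, the same producer logic can be sketched with the third-party kafka-python package (an assumption; the article's producer uses the Java client). The key/value format and the even/odd partition rule mirror the Java code above:

# A rough Python equivalent of the Java producer, assuming kafka-python.
import time
from kafka import KafkaProducer

TOPIC = "data-message"
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092",
                         key_serializer=str.encode,
                         value_serializer=str.encode)
try:
    while True:
        time.sleep(10)                        # one record every 10 seconds
        now = int(time.time() * 1000)
        part = 0 if now % 2 == 0 else 1       # same even/odd rule as partition()
        meta = producer.send(TOPIC,
                             key="JP_%d" % now,
                             value="AUX|989|bid|276|%d" % now,
                             partition=part).get(timeout=30)
        print("sent to partition=%d offset=%d" % (meta.partition, meta.offset))
finally:
    producer.flush()
    producer.close()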

Spark Streaming offers two ways to consume from Kafka: the receiver-based createStream, which tracks consumer offsets through ZooKeeper, and the direct createDirectStream, which reads offsets straight from the Kafka brokers. Both are implemented in the consumer below.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import configparser

def startReceiver(config, topics, ssc):
    # receiver-based connector: offsets are tracked through ZooKeeper
    numStreams = int(config.get('oppo', 'numStreams'))
    kafkaStreams = [KafkaUtils.createStream(ssc, config.get('oppo', 'zookeeper'),
                                            config.get('oppo', 'consumer'), topics)
                    for _ in range(numStreams)]
    uniStream = ssc.union(*kafkaStreams)

    # each record arrives as a (key, value) pair; keep the value,
    # matching the direct-stream path below
    stream = uniStream.map(lambda x: x[1])

    stream.pprint()

    ssc.start()
    ssc.awaitTermination()

def startDirect(config, topic, ssc):
    brokerList = config.get('oppo', 'brokerList')
    # direct connector: reads offsets straight from the Kafka brokers
    kafkaStreams = KafkaUtils.createDirectStream(ssc, [topic],
                                                 {"metadata.broker.list": brokerList})
    # each record arrives as a (key, value) pair; keep the value
    stream = kafkaStreams.map(lambda x: x[1])
    stream.pprint()

    ssc.start()
    ssc.awaitTermination()

if __name__ == '__main__':
    config = configparser.ConfigParser()  # SafeConfigParser is deprecated
    config.read("properties.conf")

    sc = SparkContext(appName=config.get('oppo', 'appName'))
    sc.setLogLevel(config.get('oppo', 'logLevel'))

    # create the StreamingContext with a 10-second batch interval
    ssc = StreamingContext(sc, 10)

    topic = config.get('oppo', 'topic')
    # map the topic name to its number of consumer threads (one per partition);
    # the original {topic: 0, topic: 1} collapsed to a single dict key
    topics = {topic: int(config.get('oppo', 'partitions'))}

    # startReceiver(config, topics, ssc)
    startDirect(config, topic, ssc)
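The producer's values are pipe-delimited ("AUX|989|bid|276|<timestamp>"). As a hedged sketch of further processing, stream.pprint() inside startDirect could be replaced with a split-and-count over each 10-second batch; the field positions below are assumptions based on the producer's format:

# Hypothetical extension inside startDirect, after stream is defined:
fields = stream.map(lambda v: v.split('|'))        # ["AUX", "989", "bid", "276", ts]
counts = fields.map(lambda f: (f[2], 1)) \
               .reduceByKey(lambda a, b: a + b)    # per-batch count of each third field
counts.pprint()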

The properties.conf configuration file:

[oppo]
appName = SparkStreamingKafka
logLevel = WARN
topic = data-message
partitions = 2
zookeeper = 127.0.0.1:2181
numStreams = 2
consumer = spark-streaming
brokerList = 127.0.0.1:9092
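Two notes on running the consumer: the partitions value should match the number of partitions the topic was created with (the producer's even/odd scheme assumes two), and the Spark Kafka connector jar must be on the classpath when submitting, e.g. via spark-submit's --packages option with the spark-streaming-kafka artifact matching your Spark and Scala versions.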