1. 程式人生 > >spark與flume整合

spark與flume整合

kcon text org http clas appname spl storage ket

spark-streaming與flume整合 push

package cn.my.sparkStream

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._


/**
 
  */
object SparkFlumePush {
  def main(args: Array[String]) {
    if (args.length < 2
) { System.err.println( "Usage: FlumeEventCount <host> <port>") System.exit(1) } LogLevel.setStreamingLogLevels() val Array(host, port) = args val batchInterval = Milliseconds(2000) // Create the context and set the batch size val sparkConf = new SparkConf().setAppName("
FlumeEventCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "
Received " + cnt + " flume events.").print() //拿到消息中的event,從event中拿出body,body是真正的消息體 stream.flatMap(t=>{new String(t.event.getBody.array()).split(" ")}).map((_,1)).reduceByKey(_+_).print ssc.start() ssc.awaitTermination() } }


package cn.my.sparkStream

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._


/**
*
*/
object SparkFlumePull {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(
"Usage: FlumeEventCount <host> <port>")
System.exit(1)
}
LogLevel.setStreamingLogLevels()
val Array(host, port) = args
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, batchInterval)
// Create a flume stream

//val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)
// val flumeStream = FlumeUtils.createPollingStream(ssc, host, port.toInt)
/*
def createPollingStream(
jssc: JavaStreamingContext,
addresses: Array[InetSocketAddress],
storageLevel: StorageLevel
):
*/
//當sink有多個的時候
val flumesinklist = Array[InetSocketAddress](new InetSocketAddress("mini1", 8888))
val flumeStream = FlumeUtils.createPollingStream(ssc, flumesinklist, StorageLevel.MEMORY_ONLY_2)

flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()
flumeStream.flatMap(t => {
new String(t.event.getBody.array()).split(" ")
}).map((_, 1)).reduceByKey(_ + _).print()


// Print out the count of events received from this server in each batch
//stream.count().map(cnt => "Received " + cnt + " flume events.").print()
//拿到消息中的event,從event中拿出body,body是真正的消息體
//stream.flatMap(t=>{new String(t.event.getBody.array()).split(" ")}).map((_,1)).reduceByKey(_+_).print

ssc.start()
ssc.awaitTermination()
}
}
 

http://spark.apache.org/docs/1.6.3/streaming-flume-integration.html

spark與flume整合