spark streaming 同時處理兩個不同kafka叢集的資料
如題,總是不那麼完美,要處理的資料在兩個不同的kafka叢集裡面,日子得過,問題也得解決,我們建立兩個DStream,連線兩個不同的kafka叢集的不同topic,然後再把這兩個DStream union在一起處理,程式碼如下:
-
package com.kingnet
-
import java.util
-
import org.apache.spark.SparkConf
-
import org.apache.spark.streaming.kafka.KafkaUtils
-
import org.apache.spark.streaming.{Seconds, StreamingContext}
-
import org.joda.time.DateTime
-
import org.joda.time.format.DateTimeFormat
-
import scala.collection.JavaConversions._
-
/** *
-
*
-
*/
-
object IOSChannelNewActiveDids {

  /** Builds a StreamingContext that consumes from every configured Kafka
    * source and unions the resulting streams into one DStream.
    *
    * Expected config shape (JSON):
    * {"batchTime":5,"sources":[{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test1","numThreads":"1"},{"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test2","numThreads":"1"}]}
    *
    * @param params parsed configuration: batch interval plus one entry per Kafka cluster
    * @return the configured (not yet started) StreamingContext
    */
  def createContext(params: KafkaStreamingParams) = {
    val sparkConf = new SparkConf().setAppName("IOSChannelNewActiveDids")
    val ssc = new StreamingContext(sparkConf, Seconds(params.getBatchTime.toInt))
    // ssc.checkpoint(checkpointDirectory)

    // One receiver-based DStream per configured Kafka cluster; keep only the
    // message payload (second element of the (key, message) tuple).
    val streams = for (src <- params.getSources) yield {
      val topicThreads = src.getTopics.split(",").map(topic => (topic, src.getNumThreads.toInt)).toMap
      KafkaUtils.createStream(ssc, src.getZookeeper, src.getGroup, topicThreads).map(_._2)
    }

    // Union the per-cluster streams so downstream logic sees a single DStream.
    val merged = ssc.union(streams.toSeq)
    merged.print()
    ssc
  }

  def main(args: Array[String]) {
    // A single JSON argument carries the whole configuration.
    if (args.length < 1) {
      System.err.println("Usage: com.kingnet.IOSChannelNewActiveDids {\"batchTime\":5,\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":1},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":1}]}")
      System.exit(1)
    }

    val params = GsonObject.getInstance().fromJson(args(0), classOf[KafkaStreamingParams])

    // Echo the configured topics for a quick sanity check before starting.
    params.getSources.foreach(src => println(src.getTopics))

    val ssc = createContext(params)
    ssc.start()
    ssc.awaitTermination()
  }
}
我們向args裡面傳遞了一個json字串作為引數,json字串中配置了一個sources列表,裡面指定了兩個連線資訊(我這裡是測試,所以兩個配置的zookeeper list是相同的),然後我把這個json解析成了一個java物件:
-
package com.kingnet;
-
import java.util.List;
-
/**
-
* Created by xiaoj on 2016/7/13.
-
*/
-
/**
 * Configuration bean deserialized by Gson from the JSON string passed as the
 * program argument: a streaming batch interval plus a list of Kafka sources.
 */
public class KafkaStreamingParams {

    // Batch interval in seconds. Kept as String (caller parses with toInt)
    // so the JSON may carry it either quoted or unquoted.
    private String batchTime;

    // One entry per Kafka cluster / topic set to consume from.
    private List<KafkaParams> sources;

    public String getBatchTime() {
        return batchTime;
    }

    public void setBatchTime(String batchTime) {
        this.batchTime = batchTime;
    }

    public List<KafkaParams> getSources() {
        return sources;
    }

    public void setSources(List<KafkaParams> sources) {
        this.sources = sources;
    }

    @Override
    public String toString() {
        return "KafkaStreamingParams{" +
                "batchTime='" + batchTime + '\'' +
                ", sources=" + sources +
                '}';
    }

    /**
     * Connection settings for a single Kafka source.
     *
     * Declared {@code static}: Gson cannot reliably deserialize a non-static
     * inner class, because its implicit constructor requires an enclosing
     * instance that Gson has no way to supply.
     */
    static class KafkaParams {

        // Comma-separated zookeeper host:port list for the cluster.
        private String zookeeper;

        // Kafka consumer group id.
        private String group;

        // Comma-separated list of topics to subscribe to.
        private String topics;

        // Receiver threads per topic; parsed with toInt by the caller.
        private String numThreads;

        public String getZookeeper() {
            return zookeeper;
        }

        public void setZookeeper(String zookeeper) {
            this.zookeeper = zookeeper;
        }

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public String getTopics() {
            return topics;
        }

        public void setTopics(String topics) {
            this.topics = topics;
        }

        public String getNumThreads() {
            return numThreads;
        }

        public void setNumThreads(String numThreads) {
            this.numThreads = numThreads;
        }

        @Override
        public String toString() {
            return "KafkaParams{" +
                    "zookeeper='" + zookeeper + '\'' +
                    ", group='" + group + '\'' +
                    ", topics='" + topics + '\'' +
                    ", numThreads='" + numThreads + '\'' +
                    '}';
        }
    }
}
好吧,我經常這麼幹,在scala專案中建立java類,得益於強大的IDEA開發工具。
-
package com.kingnet
-
import java.util
-
import com.google.gson.{Gson, GsonBuilder}
-
/**
-
* Created by xiaoj on 2016/5/5.
-
*/
-
object GsonObject {

  // Single shared Gson instance. Gson is thread-safe, and a lazy val gives
  // JVM-guaranteed once-only initialization, replacing the previous
  // hand-rolled double-checked locking on a @volatile var.
  private lazy val gson: Gson = new GsonBuilder().create()

  /** Returns the shared Gson instance. */
  def getInstance(): Gson = gson

  /** Parses a JSON object string into a java.util.HashMap.
    *
    * @param s the JSON text to parse
    * @return Some(map) on success, None on any parse failure (the error is
    *         printed to stderr, preserving the previous best-effort behavior)
    */
  def fromJson(s: String): Option[util.HashMap[String, Any]] = {
    import scala.util.control.NonFatal
    try {
      Some(gson.fromJson(s, classOf[util.HashMap[String, Any]]))
    } catch {
      // NonFatal lets truly fatal errors (OOM, etc.) propagate instead of
      // being swallowed here.
      case NonFatal(e) =>
        e.printStackTrace()
        None
    }
  }

  /** Serializes any value to its JSON representation. */
  def toJson(src: Any) = gson.toJson(src)
}
執行程式,傳遞一個json引數:{\"batchTime\":\"10\",\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":\"1\"},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":\"1\"}]}
開啟兩個kafka 的console producer分別往test1和test2兩個topic裡面寫資料,然後在streaming程式控制臺就會打印出接收到的訊息了。