1. 程式人生 > Spark Streaming 同時處理兩個不同 Kafka 叢集的資料

Spark Streaming 同時處理兩個不同 Kafka 叢集的資料

如題。現實總是不那麼完美：要處理的資料分佈在兩個不同的 Kafka 叢集裡。日子得過，問題也得解決：我們建立兩個 DStream，分別連線兩個不同 Kafka 叢集的不同 topic，然後再把這兩個 DStream union 在一起處理，程式碼如下：

  1. package com.kingnet

  2. import java.util

  3. import org.apache.spark.SparkConf

  4. import org.apache.spark.streaming.kafka.KafkaUtils

  5. import org.apache.spark.streaming.{Seconds, StreamingContext}

  6. import org.joda.time.DateTime

  7. import org.joda.time.format.DateTimeFormat

  8. import scala.collection.JavaConversions._

  /**
   * Spark Streaming job that reads from one or more Kafka sources, which may live on
   * different Kafka clusters, and processes all of them as a single unioned stream.
   */
  object IOSChannelNewActiveDids {
    // Explicit `.asScala` decorators instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    /**
     * Builds (but does not start) the StreamingContext for this job.
     *
     * One receiver-based Kafka input stream is created per entry in `params.getSources`.
     * Only the message values are kept. The streams are unioned and printed.
     *
     * Example configuration:
     * {{{
     * {"batchTime":5,"sources":[
     *   {"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test1","numThreads":"1"},
     *   {"zookeeper":"name85:2181,name86:2181,name87:2181","group":"group1","topics":"test2","numThreads":"1"}]}
     * }}}
     *
     * @param params job configuration. `batchTime` is the batch interval in seconds, and
     *               each source's `topics` is a comma-separated topic list.
     * @return the configured StreamingContext
     * @throws IllegalArgumentException if no Kafka source is configured
     */
    def createContext(params: KafkaStreamingParams): StreamingContext = {
      val sparkConf = new SparkConf().setAppName("IOSChannelNewActiveDids")
      val ssc = new StreamingContext(sparkConf, Seconds(params.getBatchTime.trim.toInt))
      // ssc.checkpoint(checkpointDirectory)

      // getSources may be null (e.g. the JSON has no "sources" field); treat it as "no sources".
      val sources = Option(params.getSources).map(_.asScala.toList).getOrElse(Nil)
      // ssc.union needs at least one stream, so fail fast here with a readable message.
      require(sources.nonEmpty, "at least one Kafka source must be configured in \"sources\"")

      val rawdata = sources.map { p =>
        val numThreads = p.getNumThreads.trim.toInt
        // Trim so that "test1, test2" does not yield a topic named " test2"; skip empty entries.
        val topicMap = p.getTopics.split(",").map(_.trim).filter(_.nonEmpty)
          .map(topic => topic -> numThreads).toMap
        // Each source connects through its own ZooKeeper quorum, which is what lets the job
        // read from different Kafka clusters. `_._2` keeps the message value and drops the key.
        KafkaUtils.createStream(ssc, p.getZookeeper, p.getGroup, topicMap).map(_._2)
      }

      // Merge the per-source DStreams so they are processed together.
      val unionRawdata = ssc.union(rawdata)
      unionRawdata.print()
      ssc
    }

    /**
     * Entry point. `args(0)` must be the job configuration as a JSON string
     * (see the usage message for an example).
     */
    def main(args: Array[String]): Unit = {
      if (args.length < 1) {
        System.err.println("Usage: com.kingnet.IOSChannelNewActiveDids {\"batchTime\":5,\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":1},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":1}]}")
        System.exit(1)
      }
      val params = GsonObject.getInstance().fromJson(args(0), classOf[KafkaStreamingParams])
      // Gson returns null for an empty JSON argument; report that instead of an NPE below.
      require(params != null, "the job configuration JSON must not be empty")
      Option(params.getSources).foreach(_.asScala.foreach(p => println(p.getTopics)))
      val ssc = createContext(params)
      ssc.start()
      ssc.awaitTermination()
    }
  }

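我們向 args 傳遞了一個 JSON 字串作為引數，其中配置了一個 sources 列表，列表裡指定了兩組連線資訊（我這裡是測試，所以兩組配置的 ZooKeeper 地址列表是相同的）。receiver 模式的 KafkaUtils.createStream 是透過 ZooKeeper 地址找到 Kafka 叢集的，所以連線不同叢集時，只要給每組配置不同的 zookeeper 地址即可。然後我把這個 JSON 解析成了一個 Java 物件：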

  1. package com.kingnet;

  2. import java.util.List;

  3. /**

  4. * Created by xiaoj on 2016/7/13.

  5. */

  6. public class KafkaStreamingParams {

  7. private String batchTime;

  8. private List<KafkaParams> sources;

  9. public String getBatchTime() {

  10. return batchTime;

  11. }

  12. public void setBatchTime(String batchTime) {

  13. this.batchTime = batchTime;

  14. }

  15. public List<KafkaParams> getSources() {

  16. return sources;

  17. }

  18. public void setSources(List<KafkaParams> sources) {

  19. this.sources = sources;

  20. }

  21. @Override

  22. public String toString() {

  23. return "KafkaStreamingParams{" +

  24. "batchTime='" + batchTime + '\'' +

  25. ", sources=" + sources +

  26. '}';

  27. }

  28. class KafkaParams{

  29. private String zookeeper;

  30. private String group;

  31. private String topics;

  32. private String numThreads;

  33. public String getZookeeper() {

  34. return zookeeper;

  35. }

  36. public void setZookeeper(String zookeeper) {

  37. this.zookeeper = zookeeper;

  38. }

  39. public String getGroup() {

  40. return group;

  41. }

  42. public void setGroup(String group) {

  43. this.group = group;

  44. }

  45. public String getTopics() {

  46. return topics;

  47. }

  48. public void setTopics(String topics) {

  49. this.topics = topics;

  50. }

  51. public String getNumThreads() {

  52. return numThreads;

  53. }

  54. public void setNumThreads(String numThreads) {

  55. this.numThreads = numThreads;

  56. }

  57. @Override

  58. public String toString() {

  59. return "KafkaParams{" +

  60. "zookeeper='" + zookeeper + '\'' +

  61. ", group='" + group + '\'' +

  62. ", topics='" + topics + '\'' +

  63. ", numThreads='" + numThreads + '\'' +

  64. '}';

  65. }

  66. }

  67. }


好吧，我經常這麼幹：在 Scala 專案中建立 Java 類，這得益於強大的 IntelliJ IDEA 開發工具。上面解析 JSON 用到的 GsonObject 工具類如下：

  1. package com.kingnet

  2. import java.util

  3. import com.google.gson.{Gson, GsonBuilder}

  /**
   * Holder for a single shared Gson instance plus small JSON helpers.
   *
   * Created by xiaoj on 2016/5/5.
   */
  object GsonObject {
    import scala.util.control.NonFatal

    // Created once on first use and shared by all callers; `lazy val` provides the
    // initialize-once guarantee that hand-written double-checked locking was emulating.
    private lazy val instance: Gson = new GsonBuilder().create()

    /** Returns the shared Gson instance, creating it on first use. */
    def getInstance(): Gson = instance

    /**
     * Parses `s` as a JSON object into a `java.util.HashMap`.
     *
     * @param s JSON text
     * @return `Some(map)` on success. `None` if parsing fails (the stack trace is printed
     *         to stderr), or if `s` is null/empty, for which Gson returns null rather than
     *         a map. Previously that case produced `Some(null)`.
     */
    def fromJson(s: String): Option[util.HashMap[String, Any]] =
      try {
        Option(getInstance().fromJson(s, classOf[util.HashMap[String, Any]]))
      } catch {
        // NonFatal lets VM errors and interruption propagate instead of being swallowed.
        case NonFatal(e) =>
          e.printStackTrace()
          None
      }

    /** Serializes `src` to a JSON string using the shared Gson instance. */
    def toJson(src: Any): String = getInstance().toJson(src)
  }


執行程式，傳遞一個 JSON 引數：{\"batchTime\":\"10\",\"sources\":[{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test1\",\"numThreads\":\"1\"},{\"zookeeper\":\"name85:2181,name86:2181,name87:2181\",\"group\":\"group1\",\"topics\":\"test2\",\"numThreads\":\"1\"}]}

開啟兩個 Kafka console producer，分別往 test1 和 test2 兩個 topic 寫資料，然後在 Streaming 程式的控制臺就會印出接收到的訊息了。