Lesson 91: Spark Streaming on Kafka Direct, a hands-on case study with source-code internals and a detailed record of troubleshooting java.lang.ClassNotFoundException

/* Lectured by teacher Wang Jialin, http://weibo.com/ilovepains */  Live lessons every night at 20:00 on YY channel 68917580


1. Task: implement Spark Streaming on Kafka using the Direct approach. Under Direct mode, Kafka can be thought of as a data source much like HDFS: Spark Streaming reads the data directly and processes it as a stream.

2. The original Spark cluster environment:
   spark 1.6.0
   kafka_2.10-0.9.0.1
  
3. Develop the Java application SparkStreamingOnKafkaDirected, which reads data from the topic SparkStreamingDirected.
 
4. Create the topic SparkStreamingDirected in Kafka and feed in data with a console producer.

5. Export SparkStreamingOnKafkaDirected from Eclipse as a jar and submit it to Spark, ready to read data from Kafka.

6. spark-submit then failed with java.lang.ClassNotFoundException. Pitfall record (a quick jar-inspection sketch follows this list):

   - com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected: the class name SparkStreamingOnKafkaDirected was missing from --class and had to be appended.
   - kafka/serializer/StringDecoder: add --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar at submit time.
   - org.apache.spark.streaming.kafka.KafkaUtils: add --jars spark-streaming-kafka_2.10-1.6.0.jar (together with spark-streaming_2.10-1.6.0.jar) at submit time.
   - com/yammer/metrics/Metrics: add --jars metrics-core-2.2.0.jar at submit time.


7. Even with the Spark 1.6.0 and kafka_2.10-0.9.0.1 jars supplied, spark-submit still failed, now with: Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker. Searching stackoverflow.com and the Spark site showed this is caused by incompatible versions. Per the official docs, Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1.


8. So the Spark cluster was upgraded:

   spark 1.6.0 upgraded to spark 1.6.1
   kafka_2.10-0.9.0.1 replaced with kafka_2.10-0.8.2.1
   the Eclipse pom updated so the source depends on Spark 1.6.1


9. After the upgrade to Spark 1.6.1, the old topic SparkStreamingDirected was deleted from Kafka, since some of its data had not been cleaned out completely. For a clean environment, after a restart a new topic SparkStreamingDirected161 was created for the experiment.

10. Create the topic SparkStreamingDirected161 in Kafka and feed in data with the console producer.

11. Run the spark-submit script to stream-process the producer's input; with spark 1.6.1 + kafka_2.10-0.8.2.1 the job finally ran successfully.
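
As flagged in item 6, when spark-submit throws ClassNotFoundException the quickest diagnosis is to check which jar (if any) actually contains the missing class. A minimal sketch using the stock JDK jar tool, against the paths used in this post:

jar tf /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar | grep SparkStreamingOnKafkaDirected
jar tf /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar | grep -i StringDecoder

If no jar on the --class/--jars path lists the class, the submit fails exactly as recorded below.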

The detailed process follows:



1. Start HDFS
2. Start Spark
3. Start ZooKeeper


root@worker1:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@worker1:~# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
root@worker1:~# 

root@worker2:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@worker2:~# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: leader
root@worker2:~# 


root@master:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@master:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# zkSever.sh status 
zkSever.sh: command not found
root@master:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
root@master:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# 


4. Start Kafka

nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &




root@master:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
[1] 3736
root@master:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# nohup: ignoring input and appending output to 'nohup.out'

root@master:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# jps
3792 Jps
3073 Master
2691 NameNode
3736 Kafka
2906 SecondaryNameNode
3180 HistoryServer
3439 QuorumPeerMain
root@master:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# 








root@worker1:~# nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
[1] 2828
root@worker1:~# nohup: ignoring input and appending output to 'nohup.out'

root@worker1:~# jps
2884 Jps
2324 DataNode
2763 QuorumPeerMain
2508 Worker
2828 Kafka
root@worker1:~# 










root@worker2:~# nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
[1] 2795
root@worker2:~# nohup: ignoring input and appending output to 'nohup.out'

root@worker2:~# jps
2535 QuorumPeerMain
2394 Worker
2795 Kafka
2847 Jps
2255 DataNode
root@worker2:~# 








5. Upload the built jar










root@master:/usr/local/setup_tools# ls
apache-hive-1.2.1-bin.tar.gz  mysql-connector-java-5.1.13-bin.jar  spark-1.6.0-bin-hadoop2.6.tgz
apache-hive-1.2.1-src.tar.gz  mysql-connector-java-5.1.36.zip      spark-streaming-flume-sink_2.10-1.6.1.jar
commons-lang3-3.3.2.jar       scala-2.10.4.tgz                     SparkStreamingOnKafkaDirected.jar
hadoop-2.6.0.tar.gz           scala-library-2.10.4.jar             zookeeper-3.4.6.tar.gz
jdk-8u60-linux-x64.tar.gz     slf4j-1.7.21
kafka_2.10-0.9.0.1.tgz        slf4j-1.7.21.zip
root@master:/usr/local/setup_tools# mv SparkStreamingOnKafkaDirected.jar /usr/local/IMF_testdata/
root@master:/usr/local/setup_tools# cd /usr/local/IMF_testdata/
root@master:/usr/local/IMF_testdata# ls








6. Edit the spark-submit script: IMFSparkStreamingOnKafkaDirectedSubmit.sh




root@master:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming --master spark://192.168.189.1:7077  /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
root@master:/usr/local/setup_scripts# 






7. Create the Kafka topic






kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected






root@master:/usr/local/setup_scripts# kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected
Created topic "SparkStreamingDirected".
root@master:/usr/local/setup_scripts# 








8. Inspect the newly created topic SparkStreamingDirected




kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181   --topic SparkStreamingDirected




root@master:/usr/local/setup_scripts# kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
Topic:SparkStreamingDirected    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: SparkStreamingDirected   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
root@master:/usr/local/setup_scripts# 








9. Run spark-submit






root@master:~# cd /usr/local/setup_scripts
root@master:/usr/local/setup_scripts# ls
addpartitions.sh  IMFkafka.sh                                partitions10w        sparkhistory_scp.sh  yarn_scp.sh
hadoop_scp.sh     IMFSparkStreamingOnKafkaDirectedSubmit.sh  partitions3w         spark_scp.sh         zookeeper.out
host_scp.sh       IMFsparksubmit.sh                          partitions3w-7w-10w  ssh_config.sh
IMFFlume.sh       IMFzookeeper.sh                            partitions5w-5w-10w  ssh_scp.sh
root@master:/usr/local/setup_scripts# IMFSparkStreamingOnKafkaDirectedSubmit.sh
java.lang.ClassNotFoundException: com.dt.spark.SparkApps.SparkStreaming
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
root@master:/usr/local/setup_scripts# 






Fix: the class name was missing. The full name com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected must be passed to --class; adding it resolved this error.






root@master:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077  /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
root@master:/usr/local/setup_scripts# 








A new error appeared:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main(SparkStreamingOnKafkaDirected.java:70)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)






Fix: manually add the Kafka jar via --jars:

/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar








A new error appeared:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main(SparkStreamingOnKafkaDirected.java:68)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)




Adding the Spark assembly jar still produced the same error:

/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main(SparkStreamingOnKafkaDirected.java:68)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)






root@master:/usr/local/setup_tools# cp spark-streaming_2.10-1.6.0.jar /usr/local/spark-1.6.0-bin-hadoop2.6/lib/
root@master:/usr/local/setup_tools# cp spark-streaming-kafka_2.10-1.6.0.jar /usr/local/spark-1.6.0-bin-hadoop2.6/lib/
root@master:/usr/local/setup_tools# 




root@master:/usr/local/setup_scripts# chmod u+x IMFSparkStreamingOnKafkaDirectedSubmit.sh
root@master:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.0.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming-kafka_2.10-1.6.0.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
root@master:/usr/local/setup_scripts# 




A new error appeared:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Send
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main(SparkStreamingOnKafkaDirected.java:68)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Send
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)








root@master:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka-clients-0.9.0.1.jar,/usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.0.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming-kafka_2.10-1.6.0.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
root@master:/usr/local/setup_scripts#








A new error appeared:

Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
        at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85)
        at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
        at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35)
        at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:47)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:60)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:60)
        at kafka.utils.Pool.getAndMaybePut(Pool.scala:59)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:64)
        at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:44)
        at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main(SparkStreamingOnKafkaDirected.java:68)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)


10. Add the additional jars zkclient-0.7.jar and metrics-core-2.2.0.jar:


root@master:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming-kafka_2.10-1.6.0.jar,/usr/local/kafka_2.10-0.9.0.1/libs/kafka-clients-0.9.0.1.jar,/usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.0.jar,/usr/local/kafka_2.10-0.9.0.1/libs/metrics-core-2.2.0.jar,/usr/local/kafka_2.10-0.9.0.1/libs/zkclient-0.7.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
root@master:/usr/local/setup_scripts#






A new error:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main(SparkStreamingOnKafkaDirected.java:68)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)


Cause: incompatible Kafka version.
http://stackoverflow.com/questions/34145483/spark-streaming-kafka-stream

"The problem was related the wrong spark-streaming-kafka version. As described in the documentation: Kafka: Spark Streaming 1.5.2 is compatible with Kafka 0.8.2.1"
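
The same compatibility rule holds one minor release later (Spark Streaming 1.6.1 with Kafka 0.8.2.1), so the fix is to keep the broker at 0.8.2.1 and pin the connector to the cluster's Spark version in the pom. A minimal excerpt of the dependency that has to agree with the cluster (the full pom appears in step 14 below):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.6.1</version>
</dependency>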






Re-download Kafka (kafka_2.10-0.8.2.1):

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz






11. Start the new Kafka version


nohup kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &

root@worker1:/usr/local# nohup kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 3175
root@worker1:/usr/local# nohup: ignoring input and appending output to 'nohup.out'

root@worker1:/usr/local# jps
3175 Kafka
2410 Worker
2474 QuorumPeerMain
3227 Jps
2283 DataNode
root@worker1:/usr/local# 






Create the Kafka topic again:




root@worker1:/usr/local# kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected
Error while executing topic command : Topic "SparkStreamingDirected" already exists.
[2016-04-30 13:23:42,688] ERROR kafka.common.TopicExistsException: Topic "SparkStreamingDirected" already exists.
        at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237)
        at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:105)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
root@worker1:/usr/local# 










root@worker1:/usr/local# kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
Topic:SparkStreamingDirected    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: SparkStreamingDirected   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
root@worker1:/usr/local# 






root@master:/usr/local/spark-1.6.1-bin-hadoop2.6/sbin# kafka-topics.sh --delete --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
Topic SparkStreamingDirected is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
root@master:/usr/local/spark-1.6.1-bin-hadoop2.6/sbin# 
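
The note above means the deletion is only honored if the broker permits it. That is controlled by one line in each broker's config/server.properties (shown here as an assumed addition; it defaults to false in 0.8.2.1, which is why the checkpoint files are wiped by hand below):

delete.topic.enable=true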






root@master:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# ls
cleaner-offset-checkpoint  recovery-point-offset-checkpoint  replication-offset-checkpoint
root@master:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# rm cleaner-offset-checkpoint
root@master:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# ls
recovery-point-offset-checkpoint  replication-offset-checkpoint
root@master:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# rm recovery-point-offset-checkpoint
root@master:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# rm replication-offset-checkpoint




12. Start Kafka


root@master:/usr/local/kafka_2.10-0.8.2.1/bin# nohup ./kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 3929
root@master:/usr/local/kafka_2.10-0.8.2.1/bin# nohup: ignoring input and appending output to 'nohup.out'

root@master:/usr/local/kafka_2.10-0.8.2.1/bin# jps
3568 QuorumPeerMain
2932 NameNode
3929 Kafka
3306 Master
3147 SecondaryNameNode
3403 HistoryServer
3997 Jps
root@master:/usr/local/kafka_2.10-0.8.2.1/bin# 








root@worker1:/usr/local/kafka_2.10-0.8.2.1/bin# nohup ./kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2847
root@worker1:/usr/local/kafka_2.10-0.8.2.1/bin# nohup: ignoring input and appending output to 'nohup.out'

root@worker1:/usr/local/kafka_2.10-0.8.2.1/bin# jps
2771 QuorumPeerMain
2894 Jps
2494 DataNode
2847 Kafka
root@worker1:/usr/local/kafka_2.10-0.8.2.1/bin# 








root@worker2:/usr/local/kafka_2.10-0.8.2.1/bin# nohup ./kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2744
root@worker2:/usr/local/kafka_2.10-0.8.2.1/bin# nohup: ignoring input and appending output to 'nohup.out'

root@worker2:/usr/local/kafka_2.10-0.8.2.1/bin# jps
2786 Jps
2564 Worker
2744 Kafka
2633 QuorumPeerMain
2447 DataNode
root@worker2:/usr/local/kafka_2.10-0.8.2.1/bin# 






root@master:/usr/local/kafka_2.10-0.8.2.1/bin# ./kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected
Error while executing topic command Topic "SparkStreamingDirected" already exists.
kafka.common.TopicExistsException: Topic "SparkStreamingDirected" already exists.
        at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:187)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
        at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

root@master:/usr/local/kafka_2.10-0.8.2.1/bin# ./kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
Topic:SparkStreamingDirected    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: SparkStreamingDirected   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
root@master:/usr/local/kafka_2.10-0.8.2.1/bin# 




Feed data with the console producer:

kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic SparkStreamingDirected






13. After upgrading Spark 1.6.0 to 1.6.1 and moving Kafka to kafka_2.10-0.8.2.1, the error finally changed:


root@master:/usr/local/setup_scripts# IMFSparkStreamingOnKafkaDirectedSubmit.sh 
16/04/30 19:27:59 INFO spark.SparkContext: Running Spark version 1.6.1
16/04/30 19:28:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/30 19:28:00 INFO spark.SecurityManager: Changing view acls to: root
16/04/30 19:28:00 INFO spark.SecurityManager: Changing modify acls to: root
16/04/30 19:28:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/04/30 19:28:01 INFO util.Utils: Successfully started service 'sparkDriver' on port 37293.
16/04/30 19:28:02 INFO slf4j.Slf4jLogger: Slf4jLogger started

16/04/30 19:31:52 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to 
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Error getting partition metadata for 'SparkStreamingDirected'. Does the topic exist?
org.apache.spark.SparkException: Error getting partition metadata for 'SparkStreamingDirected'. Does the topic exist?
org.apache.spark.SparkException: Error getting partition metadata for 'SparkStreamingDirected'. Does the topic exist?
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main(SparkStreamingOnKafkaDirected.java:68)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/04/30 19:31:52 INFO spark.SparkContext: Invoking stop() from shutdown hook
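
Before resubmitting, it is worth confirming the topic actually exists as far as ZooKeeper is concerned. A quick check with the stock Kafka tooling, against the same quorum used throughout this post:

kafka-topics.sh --list --zookeeper master:2181,worker1:2181,worker2:2181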






14. Update the pom file:




 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>


  <groupId>com.dt.spark</groupId>
  <artifactId>IMFSparkAppsSQL</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>


  <name>IMFSparkAppsSQL</name>
  <url>http://maven.apache.org</url>


    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>


  <dependencies>
 
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    
    

    <dependency>  
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-core_2.10</artifactId>
 <version>1.6.1</version>
</dependency>

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-sql_2.10</artifactId>
 <version>1.6.1</version>
 </dependency>
<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-hive_2.10</artifactId>
 <version>1.6.1</version>
</dependency>
<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-streaming_2.10</artifactId>
 <version>1.6.1</version>
</dependency>

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-streaming-kafka_2.10</artifactId>
 <version>1.6.1</version>
</dependency>

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-streaming-flume_2.10</artifactId>
 <version>1.6.1</version>
</dependency>

<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.35</version>
</dependency>

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.10</artifactId>
    <version>1.6.1</version>
      
</dependency>
 
    
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
    
    <dependency>
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-jdbc</artifactId>
 <version>1.2.1</version>
</dependency>
<dependency>
 <groupId>org.apache.httpcomponents</groupId>
 <artifactId>httpclient</artifactId>
 <version>4.4.1</version>
</dependency>
<dependency>
 <groupId>org.apache.httpcomponents</groupId>
 <artifactId>httpcore</artifactId>
 <version>4.4.1</version>
</dependency>
  </dependencies>
  
   <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/main/test</testSourceDirectory>


    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>


      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>com.dt.spark.App</mainClass>
        </configuration>
      </plugin>


      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>


    </plugins>
  </build>
</project>








15. Switch to a new topic name, SparkStreamingDirected161:


kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected161


root@master:/usr/local/setup_scripts# kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected161
Created topic "SparkStreamingDirected161".

root@master:/usr/local/setup_scripts# kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected161
Topic:SparkStreamingDirected161 PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: SparkStreamingDirected161        Partition: 0    Leader: 2       Replicas: 2     Isr: 2

root@master:/usr/local/setup_scripts# 


kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic SparkStreamingDirected161

root@master:/usr/local/setup_scripts# kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic SparkStreamingDirected161
[2016-04-30 20:43:11,417] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hadoop
spark
hadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkspark
hadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkspark
hadoop
sparkhadoop
sparkhadoop
sparkhadoop








root@master:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit161.sh 
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/spark-1.6.1-bin-hadoop2.6/lib/spark-streaming-kafka_2.10-1.6.1.jar,/usr/local/kafka_2.10-0.8.2.1/libs/kafka-clients-0.8.2.1.jar,/usr/local/kafka_2.10-0.8.2.1/libs/kafka_2.10-0.8.2.1.jar,/usr/local/spark-1.6.1-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.1.jar,/usr/local/kafka_2.10-0.8.2.1/libs/metrics-core-2.2.0.jar,/usr/local/kafka_2.10-0.8.2.1/libs/zkclient-0.3.jar,/usr/local/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected161.jar
root@master:/usr/local/setup_scripts# 




root@master:/usr/local/setup_scripts# IMFSparkStreamingOnKafkaDirectedSubmit161.sh 


Source code:


import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

// 15-second batch interval; conf is the SparkConf built earlier in main (not shown in the original excerpt)
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(15));

// Direct mode talks to the brokers itself, so only metadata.broker.list is needed (no ZooKeeper receiver)
Map<String, String> kafaParameters = new HashMap<String, String>();
kafaParameters.put("metadata.broker.list", "master:9092,worker1:9092,worker2:9092");
Set<String> topics = new HashSet<String>();
topics.add("SparkStreamingDirected161");

JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jsc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafaParameters,
        topics);

// In Scala, SAM conversion would allow: val words = lines.flatMap { line => line.split(" ") }
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
    public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
        return Arrays.asList(tuple._2.split(" "));
    }
});

JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});

JavaPairDStream<String, Integer> wordcount = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});
wordcount.print();
// wordcount.foreachRDD(foreachFunc);

jsc.start();
jsc.awaitTermination();
jsc.close();
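
A note on Direct-mode internals, since that is the theme of this lesson: each batch RDD produced by createDirectStream carries the exact Kafka offset ranges it read, which is the basis for exactly-once bookkeeping. A minimal sketch (not part of the original program) that prints those ranges via the public HasOffsetRanges API of spark-streaming-kafka 1.6; it must be applied to `lines` itself, before any transformation, because only the direct stream's RDDs carry the ranges:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

// Print the [from, until) offsets each batch covered, per topic partition
lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    @Override
    public void call(JavaPairRDD<String, String> rdd) throws Exception {
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange range : ranges) {
            System.out.println(range.topic() + " partition " + range.partition()
                    + " offsets [" + range.fromOffset() + ", " + range.untilOffset() + ")");
        }
    }
});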


The final word:

Upgrade Spark 1.6.0 to Spark 1.6.1.

Move Kafka from kafka_2.10-0.9.0.1 to kafka_2.10-0.8.2.1.

kafka_2.10-0.8.2.1 + spark-1.6.1: completely solved!










Teacher Wang Jialin: founder and chief expert of DT Big Data Dream Factory.

Contact email: [email protected]  Phone: 18610086859  QQ: 1740415547
WeChat: 18610086859  Weibo: http://weibo.com/ilovepains/
Live lessons every night at 20:00 on YY channel 68917580

IMF Spark source-code customization class student:
Shanghai - Duan Zhihua  QQ: 1036179833  mail: [email protected]  WeChat 18918561505

