Lesson 91: Spark Streaming on Kafka Direct — hands-on case study and source-code internals; a detailed record of java.lang.ClassNotFoundException pitfalls and fixes
/* Lectures by Wang Jialin, http://weibo.com/ilovepains — live every evening at 20:00 on YY channel 68917580 */
1. Task: implement Spark Streaming on Kafka using the Direct approach — treat Kafka Direct as a data source like HDFS, with Spark Streaming reading the data directly for stream processing.
2. Previous Spark cluster environment:
spark 1.6.0
kafka_2.10-0.9.0.1
3. Developed SparkStreamingDirected in Java, reading data from topic SparkStreamingDirected.
4. Created topic SparkStreamingDirected161 in Kafka; the producer feeds in data.
5. Exported SparkStreamingDirected from Eclipse as a jar and submitted it to Spark, ready to read data from Kafka.
6. spark-submit then failed with java.lang.ClassNotFoundException; the pitfalls, in order:
- com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected: the class name SparkStreamingOnKafkaDirected was missing from the --class argument and had to be added
- kafka/serializer/StringDecoder: specify --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar at submit time
- org.apache.spark.streaming.kafka.KafkaUtils: specify --jars spark-streaming-kafka_2.10-1.6.0.jar (together with spark-streaming_2.10-1.6.0.jar) at submit time
- com/yammer/metrics/Metrics: specify --jars metrics-core-2.2.0.jar at submit time
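A recurring detail in these fixes: the --jars flag takes a single comma-separated list of paths, not space-separated arguments. As a minimal sketch (the join_jars helper name is my own, not part of the course material):

```shell
# --jars expects ONE comma-separated argument; space-separated paths
# would be parsed by spark-submit as extra positional arguments.
join_jars() { local IFS=,; echo "$*"; }

JARS=$(join_jars \
  /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar \
  /usr/local/kafka_2.10-0.9.0.1/libs/metrics-core-2.2.0.jar)
echo "$JARS"   # the two paths joined by a single comma
```

The helper can then be used as `--jars "$JARS"` in the submit script, which keeps the long one-line jar lists seen below readable.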
7. Even after specifying the Spark 1.6.0 and kafka_2.10-0.9.0.1 jars, spark-submit still failed, now with: Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker. Searching stackoverflow.com and the Spark website showed this is caused by version incompatibility; the documentation states: Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1.
8. Therefore, upgrade the Spark cluster:
Spark 1.6.0 upgraded to Spark 1.6.1
kafka_2.10-0.9.0.1 replaced with kafka_2.10-0.8.2.1
Update the Eclipse pom file so the source dependencies use Spark 1.6.1.
9. After the upgrade to Spark 1.6.1, delete the old topic SparkStreamingDirected from Kafka — some of its data had not been fully cleaned up. For a clean environment, after restarting, create a new topic SparkStreamingDirected161 for the experiment.
10. Create topic SparkStreamingDirected161 in Kafka; the producer feeds in data.
11. Run the spark-submit script to stream-process the producer's input; with Spark 1.6.1 + kafka_2.10-0.8.2.1 the job finally ran successfully.
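Each ClassNotFoundException above can also be diagnosed before submitting, by checking whether the missing class is actually packaged in one of the jars passed to --jars. A minimal sketch, assuming the JDK `jar` tool is on the PATH (the helper names here are hypothetical):

```shell
# Convert a fully-qualified class name into the entry path stored in a jar:
# dots become slashes, plus a .class suffix.
class_to_path() { echo "${1//.//}.class"; }

# List the jar's entries and grep for the class; exit status 0 means found.
# Example usage:
#   check_class_in_jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar \
#     com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected
check_class_in_jar() { jar tf "$1" | grep -qF "$(class_to_path "$2")"; }

class_to_path kafka.serializer.StringDecoder   # prints kafka/serializer/StringDecoder.class
```

Running this check against each candidate jar would have located kafka/serializer/StringDecoder, KafkaUtils, and com/yammer/metrics/Metrics without a trial-and-error submit cycle.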
The detailed process:
1. Start HDFS
2. Start Spark
3. Start ZooKeeper
[email protected]:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[email protected]:~# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
[email protected]:~#
[email protected]:~# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[email protected]:~# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: leader
[email protected]:~#
[email protected]:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[email protected]:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# zkSever.sh status
zkSever.sh: command not found
[email protected]:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
[email protected]:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin#
4. Start Kafka
nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
[email protected]:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
[1] 3736
[email protected]:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# nohup: ignoring input and appending output to 'nohup.out'
[email protected]:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin# jps
3792 Jps
3073 Master
2691 NameNode
3736 Kafka
2906 SecondaryNameNode
3180 HistoryServer
3439 QuorumPeerMain
[email protected]:/usr/local/spark-1.6.0-bin-hadoop2.6/sbin#
[email protected]:~# nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
[1] 2828
[email protected]:~# nohup: ignoring input and appending output to 'nohup.out'
[email protected]:~# jps
2884 Jps
2324 DataNode
2763 QuorumPeerMain
2508 Worker
2828 Kafka
[email protected]:~#
[email protected]:~# nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
[1] 2795
[email protected]:~# nohup: ignoring input and appending output to 'nohup.out'
[email protected]:~# jps
2535 QuorumPeerMain
2394 Worker
2795 Kafka
2847 Jps
2255 DataNode
[email protected]:~#
5. Upload the built jar
[email protected]:/usr/local/setup_tools# ls
apache-hive-1.2.1-bin.tar.gz mysql-connector-java-5.1.13-bin.jar spark-1.6.0-bin-hadoop2.6.tgz
apache-hive-1.2.1-src.tar.gz mysql-connector-java-5.1.36.zip spark-streaming-flume-sink_2.10-1.6.1.jar
commons-lang3-3.3.2.jar scala-2.10.4.tgz SparkStreamingOnKafkaDirected.jar
hadoop-2.6.0.tar.gz scala-library-2.10.4.jar zookeeper-3.4.6.tar.gz
jdk-8u60-linux-x64.tar.gz slf4j-1.7.21
kafka_2.10-0.9.0.1.tgz slf4j-1.7.21.zip
[email protected]:/usr/local/setup_tools# mv SparkStreamingOnKafkaDirected.jar /usr/local/IMF_testdata/
[email protected]:/usr/local/setup_tools# cd /usr/local/IMF_testdata/
[email protected]:/usr/local/IMF_testdata# ls
6. Edit the spark-submit script
IMFSparkStreamingOnKafkaDirectedSubmit.sh
[email protected]:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming --master spark://192.168.189.1:7077 /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
[email protected]:/usr/local/setup_scripts#
7. Create the Kafka topic
kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected
[email protected]:/usr/local/setup_scripts# kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected
Created topic "SparkStreamingDirected".
[email protected]:/usr/local/setup_scripts#
8. Describe the created topic SparkStreamingDirected
kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
[email protected]:/usr/local/setup_scripts# kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
Topic:SparkStreamingDirected PartitionCount:1 ReplicationFactor:1 Configs:
Topic: SparkStreamingDirected Partition: 0 Leader: 1 Replicas: 1 Isr: 1
[email protected]:/usr/local/setup_scripts#
9. Run spark-submit
[email protected]:~# cd /usr/local/setup_scripts
[email protected]:/usr/local/setup_scripts# ls
addpartitions.sh IMFkafka.sh partitions10w sparkhistory_scp.sh yarn_scp.sh
hadoop_scp.sh IMFSparkStreamingOnKafkaDirectedSubmit.sh partitions3w spark_scp.sh zookeeper.out
host_scp.sh IMFsparksubmit.sh partitions3w-7w-10w ssh_config.sh
IMFFlume.sh IMFzookeeper.sh partitions5w-5w-10w ssh_scp.sh
[email protected]:/usr/local/setup_scripts# IMFSparkStreamingOnKafkaDirectedSubmit.sh
java.lang.ClassNotFoundException: com.dt.spark.SparkApps.SparkStreaming
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[email protected]:/usr/local/setup_scripts#
Fix: the class name com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected was missing from --class; adding it resolves this error.
[email protected]:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
[email protected]:/usr/local/setup_scripts#
A new error:
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main
(SparkStreamingOnKafkaDirected.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
Fix: manually specify the Kafka jar with --jars:
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
A new error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main
(SparkStreamingOnKafkaDirected.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
Adding the Spark assembly jar still produced the error:
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main
(SparkStreamingOnKafkaDirected.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[email protected]:/usr/local/setup_tools# cp spark-streaming_2.10-1.6.0.jar /usr/local/spark-1.6.0-bin-hadoop2.6/lib/
[email protected]:/usr/local/setup_tools# cp spark-streaming-kafka_2.10-1.6.0.jar /usr/local/spark-1.6.0-bin-hadoop2.6/lib/
[email protected]:/usr/local/setup_tools#
[email protected]:/usr/local/setup_scripts# chmod u+x IMFSparkStreamingOnKafkaDirectedSubmit.sh
[email protected]:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.0.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming-kafka_2.10-1.6.0.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
[email protected]:/usr/local/setup_scripts#
A new error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Send
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main
(SparkStreamingOnKafkaDirected.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Send
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[email protected]:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/kafka_2.10-0.9.0.1/libs/kafka-clients-0.9.0.1.jar,/usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.0.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming-kafka_2.10-1.6.0.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
[email protected]:/usr/local/setup_scripts#
A new error:
Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85)
at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35)
at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:47)
at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply
(FetchRequestAndResponseStats.scala:60)
at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply
(FetchRequestAndResponseStats.scala:60)
at kafka.utils.Pool.getAndMaybePut(Pool.scala:59)
at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats
(FetchRequestAndResponseStats.scala:64)
at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:44)
at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$
$withBrokers$1.apply(KafkaCluster.scala:345)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$
$withBrokers$1.apply(KafkaCluster.scala:342)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers
(KafkaCluster.scala:342)
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main
(SparkStreamingOnKafkaDirected.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
10. Add zkclient-0.7.jar and metrics-core-2.2.0.jar to the --jars list:
[email protected]:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected --master spark://192.168.189.1:7077 --jars /usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming-kafka_2.10-1.6.0.jar,/usr/local/kafka_2.10-0.9.0.1/libs/kafka-clients-0.9.0.1.jar,/usr/local/kafka_2.10-0.9.0.1/libs/kafka_2.10-0.9.0.1.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.0.jar,/usr/local/kafka_2.10-0.9.0.1/libs/metrics-core-2.2.0.jar,/usr/local/kafka_2.10-0.9.0.1/libs/zkclient-0.7.jar,/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected.jar
[email protected]:/usr/local/setup_scripts#
A new error:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply
$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply
(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply
(KafkaCluster.scala:87)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main
(SparkStreamingOnKafkaDirected.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
Cause: Kafka version incompatibility.
http://stackoverflow.com/questions/34145483/spark-streaming-kafka-stream
"The problem was related to the wrong spark-streaming-kafka version. As described in the documentation:
Kafka: Spark Streaming 1.5.2 is compatible with Kafka 0.8.2.1"
Re-downloaded Kafka as kafka_2.10-0.8.2.1:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
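A side note on the artifact names: the Kafka tarball name follows the pattern kafka_&lt;scala-version&gt;-&lt;kafka-version&gt;, so kafka_2.10-0.8.2.1 is Kafka 0.8.2.1 built for Scala 2.10, and that _2.10 must also match the Scala suffix of the Spark artifacts (spark-streaming-kafka_2.10). A small sketch splitting the two versions (the helper names are mine):

```shell
# Split "kafka_<scala>-<kafka>" into its two version components using
# plain shell parameter expansion.
scala_version() { local v=${1#kafka_}; echo "${v%%-*}"; }
kafka_version() { local v=${1#kafka_}; echo "${v#*-}"; }

scala_version kafka_2.10-0.8.2.1   # prints 2.10
kafka_version kafka_2.10-0.8.2.1   # prints 0.8.2.1
```

Checking both components against the pom's artifact suffixes and versions up front would have caught the BrokerEndPoint cast failure before any submit.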
11. Start the new Kafka version
nohup kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[email protected]:/usr/local# nohup kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 3175
[email protected]:/usr/local# nohup: ignoring input and appending output to 'nohup.out'
[email protected]:/usr/local# jps
3175 Kafka
2410 Worker
2474 QuorumPeerMain
3227 Jps
2283 DataNode
[email protected]:/usr/local#
Create the Kafka topic
[email protected]:/usr/local# kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected
Error while executing topic command : Topic "SparkStreamingDirected" already exists.
[2016-04-30 13:23:42,688] ERROR kafka.common.TopicExistsException: Topic "SparkStreamingDirected" already exists.
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:105)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
[email protected]:/usr/local#
[email protected]:/usr/local# kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
Topic:SparkStreamingDirected PartitionCount:1 ReplicationFactor:1 Configs:
Topic: SparkStreamingDirected Partition: 0 Leader: 1 Replicas: 1 Isr: 1
[email protected]:/usr/local#
[email protected]:/usr/local/spark-1.6.1-bin-hadoop2.6/sbin# kafka-topics.sh --delete --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
Topic SparkStreamingDirected is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[email protected]:/usr/local/spark-1.6.1-bin-hadoop2.6/sbin#
[email protected]:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# ls
cleaner-offset-checkpoint recovery-point-offset-checkpoint replication-offset-checkpoint
[email protected]:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# rm cleaner-offset-checkpoint
[email protected]:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# ls
recovery-point-offset-checkpoint replication-offset-checkpoint
[email protected]:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# rm recovery-point-offset-checkpoint
[email protected]:/usr/local/kafka_2.10-0.8.2.1/tmp/kafka-logs# rm replication-offset-checkpoint
12. Start Kafka
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# nohup ./kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 3929
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# nohup: ignoring input and appending output to 'nohup.out'
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# jps
3568 QuorumPeerMain
2932 NameNode
3929 Kafka
3306 Master
3147 SecondaryNameNode
3403 HistoryServer
3997 Jps
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin#
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# nohup ./kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2847
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# nohup: ignoring input and appending output to 'nohup.out'
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# jps
2771 QuorumPeerMain
2894 Jps
2494 DataNode
2847 Kafka
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin#
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# nohup ./kafka-server-start.sh /usr/local/kafka_2.10-0.8.2.1/config/server.properties &
[1] 2744
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# nohup: ignoring input and appending output to 'nohup.out'
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# jps
2786 Jps
2564 Worker
2744 Kafka
2633 QuorumPeerMain
2447 DataNode
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin#
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# ./kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected
Error while executing topic command Topic "SparkStreamingDirected" already exists.
kafka.common.TopicExistsException: Topic "SparkStreamingDirected" already exists.
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:187)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin# ./kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected
Topic:SparkStreamingDirected PartitionCount:1 ReplicationFactor:1 Configs:
Topic: SparkStreamingDirected Partition: 0 Leader: 1 Replicas: 1 Isr: 1
[email protected]:/usr/local/kafka_2.10-0.8.2.1/bin#
Producer input:
kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic SparkStreamingDirected
13. After upgrading Spark 1.6.0 to Spark 1.6.1 and Kafka to kafka_2.10-0.8.2.1, the error finally changed:
[email protected]:/usr/local/setup_scripts# IMFSparkStreamingOnKafkaDirectedSubmit.sh
16/04/30 19:27:59 INFO spark.SparkContext: Running Spark version 1.6.1
16/04/30 19:28:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/30 19:28:00 INFO spark.SecurityManager: Changing view acls to: root
16/04/30 19:28:00 INFO spark.SecurityManager: Changing modify acls to: root
16/04/30 19:28:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/04/30 19:28:01 INFO util.Utils: Successfully started service 'sparkDriver' on port 37293.
16/04/30 19:28:02 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/04/30 19:31:52 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Error getting partition metadata for 'SparkStreamingDirected'. Does the topic exist?
org.apache.spark.SparkException: Error getting partition metadata for 'SparkStreamingDirected'. Does the topic exist?
org.apache.spark.SparkException: Error getting partition metadata for 'SparkStreamingDirected'. Does the topic exist?
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main
(SparkStreamingOnKafkaDirected.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/04/30 19:31:52 INFO spark.SparkContext: Invoking stop() from shutdown hook
14. Update the pom file again (dependencies moved to Spark 1.6.1):
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dt.spark</groupId>
  <artifactId>IMFSparkAppsSQL</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>IMFSparkAppsSQL</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.35</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-graphx_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.1</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/main/test</testSourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>com.dt.spark.App</mainClass>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
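One way to keep the Spark artifacts from drifting out of sync again (a sketch, not the pom the course actually uses) is to factor the version into a single Maven property and reference it from every Spark dependency, so a future upgrade is a one-line change:

```xml
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <spark.version>1.6.1</spark.version>
</properties>
<!-- each Spark dependency then references the shared property -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>
```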
17. Switch to a new topic name: SparkStreamingDirected161
kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected161
[email protected]:/usr/local/setup_scripts# kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic SparkStreamingDirected161
Created topic "SparkStreamingDirected161".
[email protected]:/usr/local/setup_scripts# kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic SparkStreamingDirected161
Topic:SparkStreamingDirected161 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: SparkStreamingDirected161 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
[email protected]:/usr/local/setup_scripts#
kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic SparkStreamingDirected161
[email protected]:/usr/local/setup_scripts# kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic SparkStreamingDirected161
[2016-04-30 20:43:11,417] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hadoop
spark
hadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkspark
hadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkhadoop
sparkspark
hadoop
sparkhadoop
sparkhadoop
sparkhadoop
[email protected]:/usr/local/setup_scripts# cat IMFSparkStreamingOnKafkaDirectedSubmit161.sh
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
  --class com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected \
  --master spark://192.168.189.1:7077 \
  --jars /usr/local/spark-1.6.1-bin-hadoop2.6/lib/spark-streaming-kafka_2.10-1.6.1.jar,/usr/local/kafka_2.10-0.8.2.1/libs/kafka-clients-0.8.2.1.jar,/usr/local/kafka_2.10-0.8.2.1/libs/kafka_2.10-0.8.2.1.jar,/usr/local/spark-1.6.1-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.1.jar,/usr/local/kafka_2.10-0.8.2.1/libs/metrics-core-2.2.0.jar,/usr/local/kafka_2.10-0.8.2.1/libs/zkclient-0.3.jar,/usr/local/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar \
  /usr/local/IMF_testdata/SparkStreamingOnKafkaDirected161.jar
[email protected]:/usr/local/setup_scripts#
[email protected]:/usr/local/setup_scripts# IMFSparkStreamingOnKafkaDirectedSubmit161.sh
Source code (core of SparkStreamingOnKafkaDirected.java):
SparkConf conf = new SparkConf().setAppName("SparkStreamingOnKafkaDirected");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(15));

Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list", "master:9092,worker1:9092,worker2:9092");

Set<String> topics = new HashSet<String>();
topics.add("SparkStreamingDirected161");

// Direct approach: Spark reads broker metadata itself, no receiver involved
JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jsc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParameters,
        topics);

// In Scala, thanks to SAM conversion, this can be written as:
// val words = lines.flatMap { line => line.split(" ") }
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
    public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
        return Arrays.asList(tuple._2.split(" "));
    }
});

JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});

JavaPairDStream<String, Integer> wordcount = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

wordcount.print();
// wordcount.foreachRDD(foreachFunc);
jsc.start();
jsc.awaitTermination();
jsc.close();
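The flatMap, mapToPair, and reduceByKey steps above are a per-batch word count. Stripped of the Spark API, the same aggregation can be sketched in plain Java (a standalone illustration of the logic, not the Spark code path; the class and method names here are made up for the example):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the flatMap -> mapToPair -> reduceByKey word count
// that the streaming job applies to each 15-second batch.
public class WordCountSketch {
    public static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {                 // flatMap: one line -> many words
            for (String word : line.split(" ")) {   // mapToPair + reduceByKey: (word, 1), summed per key
                Integer old = counts.get(word);
                counts.put(word, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same kind of input the console producer fed into the topic
        System.out.println(wordCount(Arrays.asList("hadoop spark", "spark hadoop")));
    }
}
```

In the real job the reduction runs per key across the partitions of each micro-batch; this sketch only shows what one batch's output should look like.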
The final fix:
Upgraded Spark 1.6.0 to Spark 1.6.1.
Switched Kafka from kafka_2.10-0.9.0.1 to kafka_2.10-0.8.2.1.
kafka_2.10-0.8.2.1 + spark-1.6.1: completely solved!
Teacher Wang Jialin: founder and chief expert of DT Big Data Dream Factory.
Contact email: [email protected] Phone: 18610086859 QQ: 1740415547
WeChat: 18610086859 Weibo: http://weibo.com/ilovepains/
Live lectures every evening at 20:00 on YY channel 68917580
IMF Spark source-code customization class student:
Shanghai - Duan Zhihua QQ: 1036179833 mail: [email protected] WeChat: 18918561505