After upgrading to Spark 2.0, the spark-streaming-kafka test fails with java.lang.NoClassDefFoundError: org/apache/spark/Logging
- After a recent upgrade from Spark 1.5.2 to 2.0, running the spark-streaming-kafka test code fails with the following error:
java.lang.NoClassDefFoundError: org/apache/spark/Logging
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:91)
  at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:66)
  ... 53 elided
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 67 more
- Comparing spark-core_2.11-2.0.0.jar with spark-core_2.11-1.5.2.jar confirms that the 2.0 jar no longer contains org.apache.spark.Logging; reports online say the class only exists in 1.5.2 and earlier and is gone from newer releases.
- An online search turned up no solution, so my workaround was to package the missing classes into a jar, drop it into Spark's lib directory, restart Spark, and rerun the test code. The error was gone.
- The test procedure is as follows:
1. Start HDFS
2. Start Zookeeper
$ zookeeper-server-start ./kafka_2.11-0.10.0.1/config/zookeeper.properties
3. Start Kafka
$ kafka-server-start ./kafka_2.11-0.10.0.1/config/server.properties
4. Create a topic
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
5. Start Spark
$ spark-shell --driver-memory 1G
6. Enter the test code
scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(2))
val zkQuorum = "localhost:2181"
val group = "test-group"
val topics = "test"
val numThreads = 1
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x, 1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print
ssc.checkpoint("hdfs://localhost:9000/checkpoint")
ssc.start
ssc.awaitTermination
Press ctrl-D to execute the code above; you will see output like the following:
-------------------------------------------
Time: 1474428268000 ms
-------------------------------------------
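The transformation chain in step 6 is a standard word count. As a reference, here is a minimal pure-Scala sketch of what a single batch computes (the sample lines are made up; no Spark or Kafka is involved):

```scala
// Pure-Scala sketch of the per-batch word count from step 6.
// The input lines are hypothetical stand-ins for Kafka messages.
val lines  = Seq("hello world", "hello spark")
val words  = lines.flatMap(_.split(" "))
val pairs  = words.map(w => (w, 1))
val counts = pairs.groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2).sum) }

counts.foreach(println)   // e.g. (hello,2), (world,1), (spark,1) -- order may vary
```

Here `groupBy` plus a sum stands in for `reduceByKey`; on a DStream the same pair-by-pair reduction is applied per batch and per window.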
7. Send a test message through Kafka
$ kafka-console-producer --broker-list localhost:9092 --topic test
hello world
8. Spark shows the following, which confirms the test succeeded:
-------------------------------------------
Time: 1474428396000 ms
-------------------------------------------
(hello,1)
(world,1)
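The `reduceByKeyAndWindow(_ + _, _ - _, ...)` call in step 6 maintains the 10-minute window incrementally rather than recounting it each slide: counts from the batch entering the window are added with `_ + _`, and counts from the batch leaving are subtracted with `_ - _` (this inverse-function form is also why checkpointing must be enabled). A tiny pure-Scala sketch of that update for a single key, with made-up numbers:

```scala
// Incremental window update for one key, mimicking
// reduceByKeyAndWindow(_ + _, _ - _, ...). All values are hypothetical.
val add = (a: Int, b: Int) => a + b   // applied to the batch sliding in
val sub = (a: Int, b: Int) => a - b   // applied to the batch sliding out

var windowCount = 5                   // count currently inside the window
val entering    = 3                   // count from the new batch
val leaving     = 2                   // count from the expired batch

windowCount = sub(add(windowCount, entering), leaving)
println(windowCount)                  // 6
```

With long windows and short slide intervals, this add/subtract form touches only two batches per slide instead of every batch in the window.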