
After upgrading Spark to 2.0, the spark-streaming-kafka test fails with java.lang.NoClassDefFoundError: org/apache/spark/Logging

- After a recent upgrade from Spark 1.5.2 to 2.0, running the spark-streaming-kafka test code fails with the following error:

java.lang.NoClassDefFoundError: org/apache/spark/Logging
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:91)
  at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:66)
  ... 53 elided
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 67 more

- Comparing spark-core_2.11-2.0.0.jar against spark-core_2.11-1.5.2.jar shows that 2.0 really is missing org.apache.spark.Logging: the class is public in the 1.x line, but in 2.0 it was moved to org.apache.spark.internal.Logging and made private to Spark, so third-party code compiled against 1.x can no longer find it.
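You can confirm this yourself by listing each jar's entries (a quick check, assuming both jars are in the current directory; in Scala 2.11 a trait like Logging compiles to a Logging.class plus a Logging$class.class, so the first grep should print roughly those two entries and the second should print nothing):

$ jar tf spark-core_2.11-1.5.2.jar | grep 'org/apache/spark/Logging'
$ jar tf spark-core_2.11-2.0.0.jar | grep 'org/apache/spark/Logging'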

- Searching online didn't turn up a fix, so my solution was to repackage the missing classes into a jar of their own, drop it under Spark's lib directory, restart Spark, and rerun the test code; the error no longer appeared. A sketch of the repackaging follows.
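A minimal sketch of that workaround, assuming spark-core_2.11-1.5.2.jar is at hand and that your Spark 2.0 distribution keeps its classpath jars under $SPARK_HOME/jars (the 1.x layout used lib/ instead):

$ jar xf spark-core_2.11-1.5.2.jar org/apache/spark/Logging.class 'org/apache/spark/Logging$class.class'
$ jar cf spark-logging-1.5.2.jar org/
$ cp spark-logging-1.5.2.jar $SPARK_HOME/jars/

After restarting Spark, the repackaged classes are picked up from the classpath.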

- The test steps are below:

1. Start HDFS
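Assuming a standard pseudo-distributed Hadoop setup with the sbin scripts on the PATH:

$ start-dfs.sh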

2. Start ZooKeeper

$ zookeeper-server-start ./kafka_2.11-0.10.0.1/config/zookeeper.properties

3. Start Kafka

$ kafka-server-start ./kafka_2.11-0.10.0.1/config/server.properties

4. Create a topic

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
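To confirm the topic was created, list the topics registered in the same ZooKeeper; the output should include test:

$ kafka-topics --list --zookeeper localhost:2181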

5. Start Spark

$ spark-shell --driver-memory 1G
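Note that KafkaUtils lives in the separate spark-streaming-kafka artifact, which also has to be on the shell's classpath. In my setup it was already under the Spark classpath directory; if yours is not, one way (assuming the 1.6.x assembly jar, which is what still contains the 0.8-style createStream API) is:

$ spark-shell --driver-memory 1G --jars spark-streaming-kafka-assembly_2.11-1.6.2.jar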

6. Run the test code

scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
// Streaming context on top of the shell's SparkContext, 2-second batches
val ssc = new StreamingContext(sc, Seconds(2))
val zkQuorum = "localhost:2181"
val group = "test-group"
val topics = "test"
val numThreads = 1
// Map of topic name -> number of consumer threads
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// Receiver-based Kafka stream; each record is a (key, message) pair
val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x, 1))
// Windowed word count with an inverse reduce function:
// 10-minute window, sliding every 2 seconds, 2 partitions
val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print
// A checkpoint directory is required for the inverse-reduce form of reduceByKeyAndWindow
ssc.checkpoint("hdfs://localhost:9000/checkpoint")
ssc.start
ssc.awaitTermination

Press ctrl-D to execute the pasted code; you should then see output like this:

-------------------------------------------
Time: 1474428268000 ms
-------------------------------------------

7. Send test messages through Kafka

$ kafka-console-producer --broker-list localhost:9092 --topic test

hello world

8. Seeing the following in the Spark shell confirms the test succeeded:

-------------------------------------------
Time: 1474428396000 ms
-------------------------------------------
(hello,1)
(world,1)