
[Spark][Streaming] An example of Spark reading network input


An example of Spark reading network input:

I ran the experiment with reference to the following URLs:

https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
http://www.cnblogs.com/FG123/p/5324743.html

I found that if I start nc -lk 9999 first, then start the Spark program, and afterwards stop nc, the Spark program reports an error similar to this:

-------------------------------------------
Time: 2017-10-28 19:32:02
-------------------------------------------

17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

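This shows that the two had established communication. However, I did not see the expected word count output. My guess was that there were not enough threads available for the computation, so I made the following change: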

sc = SparkContext("local[2]", "streamwordcount")

was changed to:

sc = SparkContext("local[3]", "streamwordcount")

The complete program is as follows:

[training@localhost ab]$ cat test.py
# showing remote messages

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    sc = SparkContext("local[3]", "streamwordcount")
    # Create a local SparkContext with 3 worker threads

    ssc = StreamingContext(sc, 2)
    # Create a local StreamingContext with a batch interval of 2 s

    lines = ssc.socketTextStream("localhost", 9999)

    words = lines.flatMap(lambda line: line.split(" "))
    # Use flatMap and split to break up the strings received within each 2 s batch

    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    wordCounts.pprint()

    ssc.start()
    # Start the Spark Streaming application

    ssc.awaitTermination()
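
To make the three transformations concrete, here is a minimal pure-Python sketch of what flatMap, map and reduceByKey compute for a single 2-second batch. It does not use Spark; `count_words` and the sample `batch` are hypothetical names introduced only for illustration.

```python
from collections import Counter


def count_words(lines):
    """Mimic flatMap(split) -> map((word, 1)) -> reduceByKey(add) for one batch."""
    counts = Counter()
    for line in lines:
        # Same split as the Spark program: a single space is the delimiter.
        for word in line.split(" "):
            counts[word] += 1
    return dict(counts)


# Hypothetical batch: three input lines followed by one empty line.
batch = ["aaa bbb ccc", "ddd aaa sss", "sss bbb bbb", ""]
result = count_words(batch)
print(result)
```

Because the split uses a single space, an empty input line produces one empty-string word, which is how an entry with an empty key can end up in the counts.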

Run nc again:

[training@localhost ~]$ nc -lk 9999
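
The Connection refused error shown earlier appears when nothing is listening on localhost:9999. As an optional check before submitting the job, a small helper along these lines can confirm that nc is accepting connections. This is only a sketch: `wait_for_listener` and its `timeout` and `interval` parameters are hypothetical, not part of Spark or nc.

```python
import socket
import time


def wait_for_listener(host, port, timeout=10.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.time() + timeout
    while True:
        try:
            # Raises socket.error (e.g. connection refused) if nothing is listening.
            conn = socket.create_connection((host, port), timeout=interval)
            conn.close()
            return True
        except socket.error:
            if time.time() >= deadline:
                return False
            # Wait a little before probing again.
            time.sleep(interval)
```

For example, wait_for_listener("localhost", 9999) returns True once nc is up. The probe opens and immediately closes one connection, which is harmless here because the -k option keeps nc listening after a client disconnects.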

Run the Spark program:

[training@localhost ~]$ spark-submit /home/training/ab/test.py

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Type some data into the nc window:

aaa bbb ccc
ddd aaa sss
sss bbb bbb

kkk jjj mmm
ooo kkk jjj
mmm ccc ddd
eee fff sss
rrr nnn ooo
ppp sss zzz
mmm sss ttt
kkk sss ttt
rrr ooo ppp
kkk qqq kkk
lll nnn jjj
rrr ooo sss
kkk aaa ddd
aaa aaa fff
eee sss nnn
ooo ppp qqq
qqq sss eee
sss mmm nnn

After a short while, the following output appears in the Spark program's window:

-------------------------------------------
Time: 2017-10-28 19:33:50
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:33:52
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:33:54
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:33:56
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:33:58
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:34:00
-------------------------------------------
(u'', 1)
(u'mmm', 2)
(u'bbb', 3)
(u'nnn', 1)
(u'ccc', 2)
(u'rrr', 1)
(u'sss', 3)
(u'fff', 1)
(u'aaa', 2)
(u'ooo', 2)
...

-------------------------------------------
Time: 2017-10-28 19:34:02
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:34:04
-------------------------------------------
(u'ppp', 1)
(u'sss', 1)
(u'zzz', 1)

-------------------------------------------
Time: 2017-10-28 19:34:06
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:34:08
-------------------------------------------
(u'mmm', 1)
(u'sss', 1)
(u'ttt', 1)

-------------------------------------------
Time: 2017-10-28 19:34:10
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:34:12
-------------------------------------------
(u'sss', 1)
(u'ttt', 1)
(u'kkk', 1)

-------------------------------------------
Time: 2017-10-28 19:34:14
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:34:16
-------------------------------------------
(u'ppp', 1)
(u'rrr', 1)
(u'ooo', 1)

-------------------------------------------
Time: 2017-10-28 19:34:18
-------------------------------------------
(u'qqq', 1)
(u'kkk', 2)

-------------------------------------------
Time: 2017-10-28 19:34:20
-------------------------------------------

-------------------------------------------
Time: 2017-10-28 19:34:22
-------------------------------------------
