1. 程式人生 > >Spark Streaming介紹以及簡單使用

Spark Streaming介紹以及簡單使用

一、Spark Streaming介紹

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
Spark Streaming是Spark core的擴充套件,有以下特點:高可擴充套件、高吞吐量的、容錯的流式程式作用在資料流上
Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms
expressed with high-level functions like map, reduce, join and window.
資料能夠被採集,從Kafka, Flume, Kinesis, or TCP sockets中,並且可以被處理,使用複雜的演算法通過高級別得方法表達,比如:map, reduce, join and window
Finally, processed data can be pushed out to filesystems, databases, and live dashboards.
最終,處理過的資料能夠被推到檔案系統、資料庫以及線上的dashboards(儀表板)。
In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
事實上,你可以在實時的資料流上應用Spark的機器學習和圖形處理演算法
在這裡插入圖片描述

1、總結:

將不同資料來源的資料經過Spark Streaming處理之後將結果輸出到外部檔案系統

2、特點:

1)低延遲
2)能從錯誤高效的恢復:fault-tolerant
3)能執行在成百上千的節點上
4)能夠將批處理、機器學習、圖處理等子框架和Spark Streaming綜合起來使用

3、工作原理:
粗粒度

資料流進入,Spark Streaming接收到實時資料流,把資料按照指定的時間段切成一片一片小的資料塊,然後把小的資料塊傳給 Spark Engine處理後批次輸出
在這裡插入圖片描述

二、Spark Streaming使用

1、Github

https://github.com/apache/spark/

2、啟動nc
yum install -y nc
nc -lk 9999
3、spark-submit使用
$SPARK_HOME/bin/spark-submit --master local[2] \
--class org.apache.spark.examples.streaming.NetworkWordCount \
--name NetworkWordCount \
/app/cdh/spark-2.3.0-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.3.0.jar dev2 9999

–class 使用包名org.apache.spark.examples.streaming + 類名
在這裡插入圖片描述

4、如何使用spark-shell測試
$SPARK_HOME/bin/spark-shell --master local[2]

#由於sc在spark-shell中已經自帶,我們直接使用就可以了

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("dev2", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()

#StorageLevel.MEMORY_AND_DISK_SER這個引數是設定spark處理資料的儲存方式,如果資料量少,我們一般使用記憶體