【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十之銘文升級版
銘文一級:
第八章:Spark Streaming進階與案例實戰
updateStateByKey算子
需求:統計到目前為止累積出現的單詞的個數(需要保持住以前的狀態)
java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
需求:將統計結果寫入到MySQL
create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);
通過該sql將統計結果寫入到MySQL
insert into wordcount(word, wordcount) values(‘" + record._1 + "‘," + record._2 + ")"
存在的問題:
1) 對於已有的數據做更新,而是所有的數據均為insert
改進思路:
a) 在插入數據前先判斷單詞是否存在,如果存在就update,不存在則insert
b) 工作中:HBase/Redis
2) 每個rdd的partition創建connection,建議大家改成連接池
window:定時的進行一個時間段內的數據處理
window length : 窗口的長度
sliding interval: 窗口的間隔
這2個參數和我們的batch size有關系:倍數
每隔多久計算某個範圍內的數據:每隔10秒計算前10分鐘的wc
==> 每隔sliding interval統計前window length的值
銘文二級:
第七章:Spark Streaming核心概念與編程
實戰:Spark Streaming處理文件系統數據=>
與處理socket數據類似
1.建FileWordCount類
2.建監控的路徑,本次為:/Users/rocky/data/imooc/ss
3.只需修改SocketTextStream成textFileStream
參數設置為file:///Users/rocky/data/imooc/ss/ /* 前面的“///”、最後的“/” */
4.vi test.log //裏面有內容,然後cp到監控的路徑
nc監控6789端口即可
註意事項:
官網Basic Sources
1、必須每次相同的文件格式
2、必須使用移動的方式將內容move到路徑
3、一旦移動,無法再修改裏面的內容
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十之銘文升級版