1. 程式人生 > >0073 spark streaming從埠接受資料進行實時處理的方法

0073 spark streaming從埠接受資料進行實時處理的方法

一,環境

Windows_x64 系統 Java1.8 Scala2.10.6 spark1.6.0 hadoop2.7.5 IDEA IntelliJ 2017.2 nmap工具(用到其中的ncat命令,對應Linux中的nc命令)

二,本地應用搭建

2.1 環境變數

設定方法:系統引數--》新增變數--》形式為:XXX_HOME,然後把對應安裝包的根目錄複製作為變數值;在PATH變數中新增:  %XXX_HOME%\bin; 1,Hadoop需要設定環境變數; 2,Scala最好自己下載安裝相應版本,設定環境變數; 3,spark直接解壓即可;

2.2 搭建測試

利用SBT工具非常方便的可以完成搭建,利用sbt建立Scala專案。專案結構生成為:
其中testMain.scala:
/**
  * notes: To test scala and spark and hadoop
  * date: 2017.12.20
  * author: gendlee
  */
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.log4j.{Level,Logger}
import com.test.SparkStreaming
object test {

  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {

    SparkStreaming.printWebsites()



    //initiate spark
    
    val sc = new SparkContext(conf)

    //read file from local disc
    val rdd = sc.textFile("F:\\Code\\scala2.10.6_spark1.6_hadoop2.8\\Test.log")


  }

}
其中SparkStreaming.scala為:
/**
  *notes: To test spark streaming
  * date: 2017.12.21
  * author: gendlee
  */
package com.test
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming {
  def printWebsites(): Unit= {

    val conf = new SparkConf().setMaster("local[2]").setAppName("PrintWebsites")
    val ssc = new StreamingContext(conf, Seconds(1))

    val output = "F:\\Code\\scala2.10.6_spark1.6_hadoop2.8\\out\\gettedWebsites"

    val lines = ssc.socketTextStream("localhost", 7777)

    val websiteLines = lines.filter(_.contains("http"))
    websiteLines.print()
    //websiteLines.repartition(1).saveAsTextFiles(output)

    ssc.start()
    ssc.awaitTermination()
  }

}


我要從輸入中提取出含有網址的欄位(含有http):
踩坑:
val conf = new SparkConf().setMaster("local[2]").setAppName("PrintWebsites")
這裡setMaster引數必須為local[2],應為這裡要開啟兩個程序,一個發一個收,若用預設的local將接受不到資料。
編譯後可以執行一下,發現列印這樣的資訊:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/12/22 16:39:14 INFO Slf4jLogger: Slf4jLogger started
17/12/22 16:39:14 INFO Remoting: Starting remoting
17/12/22 16:39:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:64905]
17/12/22 16:39:15 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
-------------------------------------------
Time: 1513931956000 ms
-------------------------------------------
Time: 1513931957000 ms
-------------------------------------------


    出現錯誤。不著急,那是因為7777 埠沒有接受到資料,下面先暫停程式,我們需要往7777埠發資料。
     利用socketTextStream()函式,我們可以從指定的主機上某個特定埠接收資料。下面看一下如何在7777埠發資料。
     開啟windows的power shell或CMD,輸入:
ncat -lk -p 7777
然後再執行IDEA中的程式,這時在開啟的CMD窗空中輸入,當輸入的欄位含有http,就會在IDEA的執行展示視窗打印出來。
IDEA端過濾列印:
可見這裡有個問題,其實像https這種我是不要的,即http作為單詞的一部分這種是不要的,所以後續再想辦法看看如何過濾。 至此完成題目的要求。

三,參考:

http://blog.csdn.net/gendlee1991/article/details/78066548 https://www.cnblogs.com/FG123/p/5324743.html