
Spark Learning (6)

Spark Streaming

  • Logic

    • Once the ssc starts, the driver runs a single long-lived task.
    • Executors acting as Receivers accept the incoming data and split it into blocks held in memory.
    • These blocks are also replicated to another executor so that data is not lost.
    • At each batch interval (typically every 1 second), the driver launches Spark tasks to process the blocks; the results are then persisted to any number of targets.
    • When Spark Streaming talks to a TCP source here, it requests the connection and pulls data once per second rather than keeping one long-lived connection.
    • Targets include cloud storage (e.g. S3, WASB), relational stores (e.g. MySQL, PostgreSQL), and NoSQL stores.
  • DStreams

    • A sequence of data arriving over time, represented as a series of RDDs, one per time interval; a minimal sketch follows.
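
      A minimal illustrative sketch, assuming an existing SparkContext sc and StreamingContext ssc (as created in the Load section below); the queued RDDs stand in for data arriving over time:

          # Each RDD pushed into the queue is served as one batch of the DStream
          rdd_queue = [sc.parallelize(range(i, i + 5)) for i in range(3)]
          batches = ssc.queueStream(rdd_queue)
          batches.pprint()
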
  • Windowed transformations

    • Transformations applied over a sliding window of data rather than a single batch; the window length and slide interval must be multiples of the batch interval. See the sketch below.
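
      A minimal sketch, assuming a DStream of (word, 1) pairs named pairs as in the word-count script later in this post; the 30-second window and 10-second slide are illustrative values:

          # Count words over the last 30 seconds of data, recomputed every 10 seconds.
          # Supplying the inverse function (the subtraction) requires checkpointing
          # to be enabled (see the next bullet).
          windowedWordCounts = pairs.reduceByKeyAndWindow(
              lambda x, y: x + y,   # fold in values entering the window
              lambda x, y: x - y,   # remove values leaving the window
              30,                   # window length in seconds
              10)                   # slide interval in seconds
          windowedWordCounts.pprint()
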
  • Checkpointing

    • The goal is to save data to a reliable file system such as HDFS so the job can recover after a failure; a recovery sketch follows the snippet below.
          ssc.checkpoint("hdfs://...")
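
      A minimal recovery sketch, assuming a hypothetical create_context() function that builds and returns a fully configured StreamingContext (including the ssc.checkpoint(...) call above):

          # Rebuild the context from the checkpoint directory if one exists,
          # otherwise create it fresh via create_context()
          ssc = StreamingContext.getOrCreate("hdfs://...", create_context)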
      
  • Load

    • SparkContext

          # Create a local SparkContext and Streaming Contexts
          from pyspark import SparkContext
          from pyspark.streaming import StreamingContext
      
          # Create sc with two working threads
          # Must be local[2], not local: one thread receives, the other processes
          sc = SparkContext("local[2]", "NetworkWordCount")

          # Create local StreamingContext with batch interval of 1 second
          ssc = StreamingContext(sc, 1)

          # Create checkpoint for local StreamingContext
          # (stores the data directly under a local "checkpoint" folder)
          # ssc.checkpoint("checkpoint")

          # Create DStream that connects to localhost:9999
          # (connect to the server over TCP and receive its data)
          lines = ssc.socketTextStream("localhost", 9999)

          # Split lines into words
          words = lines.flatMap(lambda line: line.split(" "))

          # Count each word in each batch
          pairs = words.map(lambda word: (word, 1))
          wordCounts = pairs.reduceByKey(lambda x, y: x + y)

          # Print the first ten elements of each RDD in this DStream
          wordCounts.pprint()

          # Populate `meetup_stream` table
          # sqlContext.sql("insert into meetup_stream select * from meetup_stream_json")

          # Start the computation
          ssc.start()

          # Wait for the computation to terminate
          ssc.awaitTermination()

          # On the console, use `nc -l -p 9999` when `nc -lk 9999` is not available
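
      To try this locally, start the listener before the job (nc -lk 9999, or nc -l -p 9999 where the former is unavailable, as noted above), then run the script and type words into the nc terminal; each 1-second batch prints its word counts to the console.
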
    • SparkSession

          """
              與之前的指令碼不同,現在使用的是更熟悉的指令碼
              先建立一個session 這裡不需要再去建立一個SparkStreaming
          """
          # Import the necessary classes and create a local SparkSession
          from pyspark.sql import SparkSession
          from pyspark.sql.functions import explode
          from pyspark.sql.functions import split
          spark = SparkSession \
              .builder \
              .appName("StructuredNetworkWordCount") \
              .getOrCreate()
      
          """
              SparkStream 通過在第4行呼叫readStream來發起的
          """
          # Create DataFrame representing the stream of input lines
          # from connection to localhost:9999
          lines = spark \
              .readStream \
              .format('socket') \
              .option('host', 'localhost') \
              .option('port', 9999) \
              .load()
      
          """
              在這裡就不需要使用RDD的複雜操作,
              直接使用SQL便可以
          """
          # Split the lines into words
          words = lines.select(
              explode(
                  split(lines.value, ' ')
              ).alias('word')
          )
      
          # Generate running word count
          wordCounts = words.groupBy('word').count()
      
          """
              沒有使用pprint(),而是顯式地呼叫writeStream來編寫
              流,並定義格式和輸出模式。雖然寫的時間要長一些,
              這些方法和屬性在語法上與其他DataFrame呼叫相似
              你只需要改變outputMode和格式屬性來儲存它
              對於資料庫、檔案系統、控制檯等等。
          """
          # Start running the query that prints the
          # running counts to the console
          query = wordCounts \
              .writeStream \
              .outputMode('complete') \
              .format('console') \
              .start()
      
          """
              最後,執行等待取消這個流媒體工作。
          """
          # Await Spark Streaming termination
          query.awaitTermination()
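
      As the comment above says, only the outputMode and format need to change to target a different sink. A minimal illustrative variation, keeping the same running counts in an in-memory table (the query name word_counts is an assumption made for this sketch):

          # Hypothetical variation: publish the complete running counts to an
          # in-memory table instead of the console
          memory_query = wordCounts \
              .writeStream \
              .outputMode('complete') \
              .format('memory') \
              .queryName('word_counts') \
              .start()

          # Once at least one batch has been processed, the table can be queried
          # like any other registered table
          spark.sql("select * from word_counts order by count desc").show()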
      
      
  • Save to text

        ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
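
    ipAddressRequestCount is assumed to be an existing DStream of counts; saveAsTextFiles(prefix, suffix) writes each batch to its own output directory named from the prefix, the batch time in milliseconds, and the suffix (prefix-<time in ms>.suffix).
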
  • Example

      # Create a local SparkContext and Streaming Contexts
      from pyspark import SparkContext
      from pyspark import SparkConf
    
    
      from pyspark.streaming import StreamingContext
      import datetime
      # from pyspark.sql import SQLContext
      # from pyspark.sql.types import *
      import numpy as np
    
      # Create sc with four working threads
    
      sconf = SparkConf()
      sconf.setMaster("local[4]")
    
      sc = SparkContext(appName="act_analysis", conf=sconf)

      # Create local StreamingContext with batch interval of 3 seconds
      ssc = StreamingContext(sc, 3)
    
    
      # Create DStream that connects to 192.168.14.2:1234
      lines = ssc.socketTextStream("192.168.14.2", 1234)
    
      def get_rows(x):
          # Each input line is expected to be "<role_id> <timestamp>"
          res = x.split(" ")
          return (res[0], res[1])
    
      rows = lines.map(get_rows)
    
      roles = rows.groupByKey()
    
      def get_speed_std(action):
          temp_list = []
          temp = 0
          for i in action:
              if temp == 0:
                  temp_list.append(0)
                  temp = i
                  continue
              temp_list.append((i - temp))
              temp = i
          res = np.std(np.array(temp_list), ddof=1)
          return res
    
      def get_role_action_feature(action):
          try:
              action = sorted(action)
          except Exception as e:
              print(e)
              return None
          count = len(action)
          stay_time = int(action[-1] - action[0])
          try:
              ave_speed = count / stay_time
              std_speed = get_speed_std(action)
          except:
              ave_speed = 0
              std_speed = 0
          return ave_speed, std_speed
    
      def feature(x):
          role = x[0]
          action = [float(i) for i in list(x[1])]
          res = get_role_action_feature(action)
          # get_role_action_feature returns None when the actions cannot be sorted
          if res is None:
              res = (0, 0)
          return (role, res[0], res[1], datetime.datetime.now())
    
      res = roles.map(feature)
    
      # res.pprint()
    
      from pymongo import MongoClient
    
      MONGODB = {
          'MONGO_HOST': '',
          'MONGO_PORT': '27017',
          'MONGO_USERNAME': '',
          'MONGO_PASSWORD': ''
      }
    
    
      mongo_uri = 'mongodb://{account}{host}:{port}/'.format(
                  account='{username}:{password}@'.format(
                      username=MONGODB['MONGO_USERNAME'],
                      password=MONGODB['MONGO_PASSWORD']) if MONGODB['MONGO_USERNAME'] else '',
                  host=MONGODB['MONGO_HOST'],
                  port=MONGODB['MONGO_PORT'])
      conn = MongoClient(mongo_uri)
      db = conn['104']
    
      def save(x):
    
          col = db['act_live_res']
          res = x.collect()
          for i in res:
              item = {
                  'role_id' : i[0],
                  'ave':i[1],
                  'std':i[2],
                  'date':i[3]
              }
    
              col.insert_one(item)
    
      #     conn.close()    
    
      res.foreachRDD(save)
    
      ssc.start()
      # Wait for the computation to terminate
      ssc.awaitTermination()
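
      The job above reads "<role_id> <timestamp>" lines from a raw TCP socket, so any process listening on that host and port can feed it. A minimal, purely illustrative feeder sketch for local testing (the host, port, and generated data are assumptions):

          # Hypothetical test feeder: accept one connection and stream fake events
          import random
          import socket
          import time

          server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          server.bind(("0.0.0.0", 1234))
          server.listen(1)
          client, _ = server.accept()   # Spark's socket receiver connects here

          while True:
              role_id = "role_{}".format(random.randint(1, 5))
              client.sendall("{} {}\n".format(role_id, time.time()).encode("utf-8"))
              time.sleep(0.1)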