1. 程式人生 > >SparkStreaming 的編程模型

SparkStreaming 的編程模型

could not 輸出 receive clas tar there int des .org

技術分享

依賴管理

技術分享

基本套路

技術分享

DStream 輸入源 --- Input DStream

技術分享

DStream 輸入源 --- Receiver

技術分享

內置的 Input DStream：Basic Sources

技術分享

內置的 Input DStream：Advanced Sources

技術分享

DStream 輸入源：Multiple Input DStreams

技術分享

DStream 輸入源：Custom Receiver

技術分享

官方參考網站 http://spark.apache.org/docs/1.6.2/streaming-custom-receivers.html

scala 參考模版

/**
 * Receiver that reads newline-delimited UTF-8 text from a TCP socket and
 * hands every line to Spark via `store`.
 *
 * @param host host name of the server to connect to
 * @param port TCP port of the server to connect to
 */
class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  /** Starts the background thread that receives data over a connection. */
  def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Nothing to do here: the thread running `receive()` leaves its read loop
   * by itself once `isStopped()` returns true.
   */
  def onStop(): Unit = {
  }

  /**
   * Connects to `host:port`, forwards lines until the receiver is stopped or
   * the connection ends, then asks for a restart so a new connection attempt
   * is made.
   */
  private def receive(): Unit = {
    try {
      forwardLines()
      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case scala.util.control.NonFatal(e) =>
        // restart on any other recoverable error; fatal JVM errors propagate
        restart("Error receiving data", e)
    }
  }

  /**
   * Opens the socket and stores each line read from it until the receiver is
   * stopped or `readLine()` returns null. The socket is closed on every exit
   * path, including when reading or storing throws.
   */
  private def forwardLines(): Unit = {
    val socket = new Socket(host, port)
    try {
      val reader =
        new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      // Same order as a read-then-check loop: read a line, test the stop
      // flag and end-of-stream, store it, then read the next line.
      Iterator
        .continually(reader.readLine())
        .takeWhile(line => !isStopped() && line != null)
        .foreach(line => store(line))
    } finally {
      // Closing the socket also closes the input stream the reader wraps.
      socket.close()
    }
  }
}

java 參考模版

public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread()  {
      @Override public void run() {
        receive();
      }
    }.start();
  }

  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data ‘" + userInput + "");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

無狀態的轉換操作

技術分享

有狀態的轉換操作1-updateStateByKey

技術分享

有狀態的轉換操作2-window

技術分享

技術分享

有狀態的轉換操作2-window：普通歸約與增量歸約

技術分享

理解增量歸約

技術分享

輸出操作

DStream 輸出

技術分享

持久化操作

技術分享

SparkStreaming 的編程模型