1. 程式人生 > >深入淺出Spark2.1.0度量系統——Sink繼承體系

深入淺出Spark2.1.0度量系統——Sink繼承體系

         Source準備好度量資料後,我們就需要考慮如何輸出和使用的問題。這裡介紹一些常見的度量輸出方式:阿里資料部門採用的一種度量使用方式就是輸出到日誌;在命令列執行過Hadoop任務(例如:mapreduce)的使用者也會發現控制檯列印的內容中也包含度量資訊;使用者可能希望將有些度量資訊儲存到檔案(例如CSV),以便將來能夠檢視;如果覺得使用CSV或者控制檯等方式不夠直觀,還可以將採集到的度量資料輸出到專用的監控系統介面。這些最終對度量資料的使用,或者說是輸出方式,Spark將它們統一抽象為Sink。Sink的定義見程式碼清單1。

程式碼清單1         度量輸出的定義

private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

從程式碼清單3-53可以看到Sink是一個特質,包含三個介面方法:

  • start:啟動Sink;
  • stop:停止Sink;
  • report:輸出到目的地;

從這三個方法的解釋來看,很難讓讀者獲得更多的資訊。我們先把這些困惑放在一邊,來看看Spark中Sink的類繼承體系,如圖1所示。

Sink的類繼承體系
圖1     Sink的類繼承體系

圖1中展示了6種Sink的具體實現。

  • ConsoleSink:藉助Metrics提供的ConsoleReporter的API,將度量輸出到System.out,因此可以輸出到控制檯。
  • CsvSink:藉助Metrics提供的CsvReporter的API,將度量輸出到CSV檔案。
  • MetricsServlet:在Spark UI的jetty服務中建立ServletContextHandler,將度量資料通過Spark UI展示在瀏覽器中。
  • JmxSink:藉助Metrics提供的JmxReporter的API,將度量輸出到MBean中,這樣就可以開啟Java VisualVM,然後開啟Tomcat程序監控,給VisualVM安裝MBeans外掛後,選擇MBeans標籤頁可以對JmxSink所有註冊到JMX中的物件進行管理。
  • Slf4jSink:藉助Metrics提供的Slf4jReporter的API,將度量輸出到實現了Slf4j規範的日誌輸出。
  • GraphiteSink:藉助Metrics提供的GraphiteReporter的API,將度量輸出到Graphite(一個由Python實現的Web應用,採用django框架,用來收集伺服器狀態的監控系統)。

瞭解了Sink的類繼承體系,我們挑選Slf4jSink作為Spark中Sink實現類的例子,來了解Sink具體該如何實現。Slf4jSink的實現見程式碼清單2。

程式碼清單2         Slf4jSink的實現

private[spark] class Slf4jSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {
  val SLF4J_DEFAULT_PERIOD = 10
  val SLF4J_DEFAULT_UNIT = "SECONDS"

  val SLF4J_KEY_PERIOD = "period"
  val SLF4J_KEY_UNIT = "unit"

  val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => SLF4J_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
    case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
  }

  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop() {
    reporter.stop()
  }

  override def report() {
    reporter.report()
  }
}

從Slf4jSink的實現可以看到Slf4jSink的start、stop及report實際都是代理了Metrics庫中的Slf4jReporter的start、stop及report方法。Slf4jReporter的start方法實際是其父類ScheduledReporter的start實現。而傳遞的兩個引數pollPeriod和pollUnit,正是被ScheduledReporter使用作為定時器獲取資料的週期和時間單位。有關ScheduledReporter中start、stop及Slf4jReporter的report方法的實現可以參閱《附錄D Metrics簡介》。

關於《Spark核心設計的藝術 架構設計與實現》

經過近一年的準備,《Spark核心設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:

紙質版售賣連結如下: