1. 程式人生 > >【Scala-ML】使用Scala構建機器學習工作流

【Scala-ML】使用Scala構建機器學習工作流

引言

在這一小節中,我將介紹基於資料(函式式)的方法來構建資料應用。這裡會介紹monadic設計來建立動態工作流,利用依賴注入這樣的高階函式式特性來構建輕便的計算工作流。

建模過程

在統計學和概率論中,一個模型通過描述從一個系統中觀察到的資料來表達任何形式的不確定性,模型使得我們可以用來推斷規則,進行預測,從資料中學習有用的東西。
對於有經驗的Scala程式設計師而言,模型常常和monoid聯絡起來。monoid是一些觀測的集合,其中的操作是實現模型所需的函式。

關於模型的特徵
模型特徵的選擇是從可用變數中發現最小集合來構建模型的過程。資料中常常包含多餘和不相干的特徵,這些多餘特徵並不能提供任何有用資訊,所以需要通過特徵選擇將有用的特徵挑選出來。
特徵選擇包含兩個具體步驟

  • 搜尋新的特徵子集
  • 通過某種評分機制來評估特徵子集

觀測資料是一組隱含特徵(也稱為隱含變數,latent variables)的間接測量,他們可能是噪聲,也可能包含高度的相關性和冗餘。直接使用原始觀測進行預測任務常常得到不準確的結果,使用從觀測資料提取的所有特徵又帶來了計算代價。特徵抽取可以通過去除冗餘或不相關的特徵來減少特徵數量或維度。

設計工作流

首先,所選的數學模型是從原始輸入資料中抽取知識的,那麼模型的選擇中需要考慮以下幾個方面:

  • 業務需求,比如預測結果的準確度
  • 訓練資料和演算法的可用性
  • 專業領域的相關知識

然後,從工程角度出發,需要選擇一種計算排程框架來處理資料,這需要考慮以下幾個方面:

  • 可用資源,如CPU、記憶體、IO頻寬
  • 實現策略,如迭代和遞迴計算
  • 響應整個過程的需求,如計算時間、中間結果的顯示

下面的圖示給出了計算模型的工作流程:

在這個流程圖中,下游的資料轉換(data transformation)的引數需要根據上游資料轉換的輸出進行配置,Scala的高階函式非常適合實現可配置的資料轉換。

計算框架

建立足夠靈活和可重用的框架的目的是為了更好地適應不同工作流程,支援各種型別的機器學習演算法。
Scala通過特質(traits)語法實現了豐富的語言特性,可以通過下面的設計層級來構建複雜的程式框架:

管道操作符(The pipe operator)

資料轉換是對資料進行分類、訓練驗證模型、結果視覺化等每個步驟環節的基礎。定義一個符號,表示不同型別的資料轉換,而不暴露演算法實現的內部狀態。而管道操作符就是用來表示資料轉換的。

trait PipeOperator[-T, +U] {
  def |>(data: T): Option[U]
}

|>操作符將型別為T的資料轉換成型別為U的資料,返回一個Option來處理中間的錯誤和異常。

單子化資料轉換(Monadic data transformation)

接下來需要建立單子化的設計(monadic design)來實現管道操作(pipe operator)。通過單子化設計來包裝類_FCT_FCT類的方法代表了傳統Scala針對集合的高階函式子集。

class _FCT[+T](val _fct: T) {
  def map[U](c: T => U): _FCT[U] = new _FCT[U]( c(_fct))

  def flatMap[U](f: T =>_FCT[U]): _FCT[U] = f(_fct)

  def filter(p: T =>Boolean): _FCT[T] =
  if( p(_fct) ) new _FCT[T](_fct) else zeroFCT(_fct)

  def reduceLeft[U](f: (U,T) => U)(implicit c: T=> U): U =
  f(c(_fct),_fct)

  def foldLeft[U](zero: U)(f: (U, T) => U)(implicit c: T=> U): U =
  f(c(_fct), _fct)

  def foreach(p: T => Unit): Unit = p(_fct)
}

最後,Transform類將PipeOperator例項作為引數輸入,自動呼叫其操作符,像這樣:

class Transform[-T, +U](val op: PipeOperator[T, U]) extends _FCT[Function[T, Option[U]]](op.|>) {
  def |>(data: T): Option[U] = _fct(data)
}

也許你會對資料轉換Transform的單子化表示背後的原因表示懷疑,畢竟本來可以通過PipeOperator的實現來建立任何演算法。
原因是Transform含有豐富的方法,使得開發者可以建立豐富的工作流。
下面的程式碼片段描述的是使用單子化方法來進行資料轉換組合:

val op = new PipeOperator[Int, Double] {
  def |> (n: Int):Option[Double] =Some(Math.sin(n.toDouble))
}
def g(f: Int =>Option[Double]): (Int=> Long) = {
  (n: Int) => {
    f(n) match {
      case Some(x) => x.toLong
      case None => -1L
    }   
  }
}
val gof = new Transform[Int,Double](op).map(g(_))

這裡使用函式g作為現有的資料轉換來擴充套件op。

依賴注入(Dependency injection)

一個由可配置的資料轉換構成的工作流在其不同的流程階段都需要動態的模組化。蛋糕模式(Cake Pattern)是使用混入特質(mix-in traits)來滿足可配置計算工作流的一種高階類組合模式。
Scala通過特質這一語法特性使得開發者能夠使用一種靈活的、可重用的方法來建立和管理模組,特質是可巢狀的、可混入類中的、可堆疊的、可繼承的。

val myApp = new Classification with Validation with PreProcessing {
  val filter = ..
}
val myApp = new Clustering with Validation with PreProcessing {
  val filter = ..
}

對於上面兩個應用來說,都需要資料的預處理和驗證模組,在程式碼中都重複定義了filter方法,使得程式碼重複、缺乏靈活性。當特質在組合中存在依賴性時,這個問題凸現出來。

混入的線性化
在混入的特質中,方法呼叫遵循從右到左的順序:
- trait B extends A
- trait C extends A
- class M extends N with C with B
Scala編譯器按照M => B => C => A => N的線性順序來實現

trait PreProcessingWithValidation extends PreProcessing {
  self: Validation =>
  val filter = ..
}

val myApp = new Classification with PreProcessingWithValidation {
  val validation: Validation
}

在PreProcessingWithValidation中使用self型別來解決上述問題。
(tips:原書的內容在這裡我沒怎麼搞清楚,不知道是通過自身型別混入了Validation後filter方法具體是怎麼實現的,以及例項化Classification時混入PreProcessingWithValidation難道不需要混入Validation嗎?我表示疑問)

工作流模組

由PipeOperator定義的資料轉換動態地嵌入了通過抽象val定義的模組中,下面我們定義工作流的三個階段:

trait PreprocModule[-T, +U] { val preProc: PipeOperator[T, U] }
trait ProcModule[-T, +U] { val proc: PipeOperator[T, U] }
trait PostprocModule[-T, +U] { val postProc: PipeOperator[T, U] }

上面的特質(模組)僅包含一個抽象值,蛋糕模式的一個特點是用模組內部封裝的型別初始化抽象值來執行嚴格的模組化:

trait ProcModule[-T, +U] {
  val proc: PipeOperator [T, U]
  class Classification[-T, +U] extends PipeOperator [T,U] { }
}

構建框架的一個目的是允許開發者可以從任何工作流中獨立建立資料轉換(繼承自PipeOperator)。

工作流工廠

接下來就是將不同的模組寫入一個工作流中,通過上一小節中的三個特質的堆疊作為自身引用來實現:

class WorkFlow[T, U, V, W] {
  self: PreprocModule[T,U] with ProcModule[U,V] with PostprocModule[V,W] =>

  def |> (data: T): Option[W] = {
    preProc |> data match {
      case Some(input) => {
        proc |> input match {
          case Some(output) => postProc |> output
          case None => { … }
        }
      }
      case None => { … }
    }
  }
}

下面介紹如何具體地實現一個工作流。
首先通過繼承PipeOperator來定義集中資料轉換:

class Sampler(val samples: Int) extends PipeOperator[Double => Double, DblVector] {
  override def |> (f: Double => Double): Option[DblVector] =
  Some(Array.tabulate(samples)(n => f(n.toDouble/samples)) )
}

class Normalizer extends PipeOperator[DblVector, DblVector] {
  override def |> (data: DblVector): Option[DblVector] =
  Some(Stats[Double](data).normalize)
}

class Reducer extends PipeOperator[DblVector, Int] {
  override def |> (data: DblVector): Option[Int] =
  Range(0, data.size) find(data(_) == 1.0)
}


工作流工廠由這個UML類圖描述。
最終通過動態地初始化抽象值preProc、proc和postProc來例項化工作流。

val dataflow = new Workflow[Double => Double, DblVector, DblVector, Int]
  with PreprocModule[Double => Double, DblVector]
  with ProcModule[DblVector, DblVector]
  with PostprocModule[DblVector, Int] {
    val preProc: PipeOperator[Double => Double,DblVector] = new Sampler(100) //1
    val proc: PipeOperator[DblVector,DblVector]= new Normalizer //1
    val postProc: PipeOperator[DblVector,Int] = new Reducer//1
}

dataflow |> ((x: Double) => Math.log(x+1.0)+Random.nextDouble) match {
  case Some(index) => …

參考資料

《Scala for Machine Learning》Chapter 2