基於 Algebird 談一談代數資料型別在資料聚合中的應用
此文已由作者肖乃同授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
代數資料型別是指滿足一定數學特性的資料型別, 這些特性使得計算能夠很方便的並行化,在Scalding和Spark等資料計算框架中有著廣泛的應用。代數資料型別是一個通用的概念, 其實現不限於Algebird,本文主要結合近期處理的一個數據任務, 介紹一下這一技術及Algebird這個函式庫。文中程式碼示例都是基於Scala, 如有紕漏歡迎指正。
應用場景:雲閱讀使用者流失模型特徵體取
近期接到這樣一個任務, 提取一個特定時間視窗登陸使用者的95個特徵,用於訓練預測使用者流失的模型。抽取任何一個單獨特徵並不複雜, 不過特徵眾多,資料分佈在多個數據源。我計劃延續前期的程式碼, 使用scalding處理這一任務。 這些特徵大致分為四類:
-
功能使用的次數
-
功能使用的天數
-
使用者某一屬性最新的非空值
-
使用者某一屬性的集合數量
使用次數就是常規的數值累加,2和4都要考慮集合的去重,3是時間上的maxBy同時要考慮空值處理問題。 為了方便統一處理,我考慮將這些資料都轉化成可加的代數資料型別, 然後基於這些型別做聚合。
Scalding 及 Spark 型別安全聚合介面介紹
先來看一下Scalding提供的聚合介面,直接使用Algebird提供的聚合器:
import com.twitter.algebird.Aggregator.count val users: TypedPipe[User] = ??? users.groupBy(_.userId).aggregate(count)複製程式碼
Spark在2.0後增加了DataSet這一新的API, 簡單講就是型別安全的DataFrame (基本等同於與scalding type safe api 之於 field based api)。
import org.apache.spark.sql.expressions.scalalang.count spark.createDataset(Seq(1,2,3)) .groupByKey(_) .agg(count)複製程式碼
Spark 沒有直接使用 algebird, 其參考algebird程式碼(看了一下spark的程式碼註釋),寫了一套類似介面。 後面會講到如何在Spark使用Algebird。
Aggregator提供了可複用的聚合元件,不再限於特定的欄位。Algebird的Aggregator是基於半群和么半群的代數資料型別聚合。
代數資料型別理論
先來補習一下數學知識,群是一個二元操作下滿足一定特徵的集合:
-
閉包性: 集合中的任意兩個元素A和B, A op B 結果依然是集合的元素
-
結合律: 集合中的任意兩個元素A、B和C, (A op B) op C 等價於 A op (B op C)
-
么元(也可以叫零元 Unit):集合存在元素e, 使得任意的元素A有 e op A 等價於 A op e 等價於 A
-
逆元: 任意元素A, 存在集合元素B, 使得 A op B = e(e 為么元)
滿足全部條件的是群, 滿足1和2的是半群, 滿足1、2和3的是么半群(有么元存在)
舉例幾個具體的例子說明一下:
-
自然數在加法下是一個群,滿足閉包和結合律,0是么元,負數是逆元
-
偶數在加法下是一個群, 奇數不是, 不滿足閉包性,奇數相加為偶數
-
奇數在乘法下是一個么半群, 不存在逆元
-
正整數在加法下是一個半群, 不存在么元,0不屬於正整數
Algebird 實現介紹
Algebird 是twitter開源的scala的抽象代數庫,實現了常見資料型別的半群、么半群等支援,是從scalding分離出來的通用庫。
通過例子比較好理解:
import com.twitter.algebird._ import com.twitter.algebird.Operators._ Max(3) + Max(5) + Max(10)// result: Max(10) Map(1 -> 2) + Map(1 -> 3) // result: Map(1 -> 5) Map(1 -> Max(3), 2 -> Max(7)) + Map(1 -> Max(-10), 2 -> Max(20)) // result: Map(1 -> Max(3), 2 -> Max(20))複製程式碼
Max是個Semigroup(半群), Map是個Monoid(么半群),Algebird有很大的靈活性,從上面示例可以看到Map的值是半群,可以實現相同key的值的聚合。
Algebird除了基本Semigroup和Monoid, Map、IndexedSeq、Tuple等高階的群 (引數型別是群的群,我這樣稱謂),可以組合出非常靈活的使用。
使用者流失模型中的應用
回到我要處理的問題上來, 需要按照使用者去計算4類不同的特徵值, 這些值很稀疏, 可以把上述問題轉化成聚合問題。
以搜尋這個事件來說明, 假設要統計使用者搜尋的次數、天數、關鍵詞數量,那麼
Map("搜尋次數" -> 1) Map("搜尋天數" -> date) Map("搜尋關鍵詞數量" -> keyword)複製程式碼
Map是么半群,需要值型別是半群, date和keyword需要轉換成半群的資料結構。 關鍵詞數量需要去重, 可以使用Set來做, 使用Set求集合, 最後取集合數量, 聚合器如下:
import com.twitter.algebird.Aggregator.{const, toSet, prepareMonoid => sumAfter} val searchCountAgg = sumAfter[MdaEvent, Map[String, Int]](_.searchCount) val keywordCountAgg = toSet[String] .composePrepare[ClientEvent](_.keyword) .andThenPresent(_.size)複製程式碼
搜尋天數統計,日期也可以使用上述集合,不過天數的統計非常多, 集合開銷比較大, 我把它轉成一個bitset, , 我統計的視窗只有1個月, 所以用Long型記下相對於開始日期, 這一天是不是有使用:
import com.twitter.algebird.Monoid import com.github.nscala_time.time.Imports._ import org.joda.time.Days class Bits(val value: Long) extends AnyVal { def count: Int = java.lang.Long.bitCount(value) def get(b: Int): Int = if((value & (1 << b)) > 0) 1 else 0 override def toString: String = value.toBinaryString } object BitsMonoid extends Monoid[Bits] { override def zero = new Bits(0L) override def plus(left: Bits, right: Bits) = new Bits(left.value | right.value) override def sumOption(iter: TraversableOnce[Bits]): Option[Bits] = { if(iter.isEmpty) None Some(iter.reduce((a, b) => new Bits(a.value | b.value))) } } def dateDiffToBits(fromDate: DateTime): Long => Bits = { val base = fromDate.withTimeAtStartOfDay() (timestamp: Long) => { val theDay = new DateTime(timestamp).withTimeAtStartOfDay() val days = Days.daysBetween(base, theDay).getDays require(days < 64, s"only 64 bits long is supported, got day diff: $days") new Bits(1 << days) } } val toBitsFun = dateDiffToBits(sampleStartDate) val searchDaysAgg = { implicit val m = BitsMonoid sumAfter[MdaEvent, Map[String, Bits]] { event => searchTime(event).mapValues(toBitsFun) }.andThenPresent(_.mapValues(b => b.count)) }複製程式碼
最後來處理第三類特徵非空最新屬性,這個屬性是取按時間的最大值,空值需要特別處理一下, 使用Max, 把排序函式修改一下:
import com.twitter.algebird.Aggregator.max def latestStringProperty[U <: ClientEvent](fn: U => String): Aggregator[U, U, String] = { import com.twitter.algebird.Aggregator.max implicit val ordU = Ordering.by { u: U => val p = fn(u) val isEmpty = if (p.isEmpty) 0 else 1 (isEmpty, u.opTime) // empty property always be covered by value property } max[U].andThenPresent(e => fn(e)) }複製程式碼
最後就能夠使用這些聚合器, 提取所需的特徵值了
val multiOps = MultiAggregator( searchCountAgg, keywordCountAgg, searchDaysAgg, latestStringProperty(_.productVersion) ) val daReport = daEvents.groupBy(_.userId).aggregate(multiOps)複製程式碼
如何在Spark中的使用
最後來講講如果在Spark中使用Algebird聚合器, 這個特徵提取本來應該在Spark處理更為方便, 來研究了一下Spark的聚合器。
Spark沒有直接使用Algebird, 但其聚合器基本參照Algebird的, 我寫了一個適配的類來方便直接在 Spark中使用上述的聚合器:
import com.twitter.algebird.MonoidAggregator import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Encoder, SparkSession, TypedColumn} implicit class MonoidToTypedColumn[-A,B: Encoder,C: Encoder](val m: MonoidAggregator[A,B,C]) { def toColumn: TypedColumn[A,C] = new MonoidAggregatorAdaptor(m).toColumn } class MonoidAggregatorAdaptor[-A,B: Encoder,C: Encoder](val m: MonoidAggregator[A,B,C]) extendsAggregator[A,B,C] { override def zero = m.monoid.zero override def reduce(b: B, a: A) = m.reduce(b, m.prepare(a)) override def finish(reduction: B) =m.present(reduction) override def merge(b1: B, b2: B) = m.reduce(b1, b2) override def bufferEncoder = implicitly[Encoder[B]] override def outputEncoder = implicitly[Encoder[C]] }複製程式碼
這裡只貼了適配Monoid的聚合器, Semigroup的會稍麻煩,程式碼比較多, 基本參考org.apache.spark.sql.expressions.ReduceAggregator。
最後我們就可以直接再Spark使用Algebird:
val latest = maxBy[DeviceEvent, Long](_.timestamp).toColumn.name("latest") val count = size.toColumn.name("count") spark.createDataset(Seq(DeviceEvent("a", "iphone", 10L), DeviceEvent("a", "android", 100L), DeviceEvent("a", "iphone", 123L))) .groupByKey(_.id) .agg(count, latest) .collect複製程式碼
總結
使用代數資料型別, 我們資料計算的程式碼更接近於問題描述語言, 表達力更強,避免了命令式的操作,bug更少。
網易雲免費體驗館,0成本體驗20+款雲產品!
更多網易技術、產品、運營經驗分享請點選。
相關文章: