1. 程式人生 > >HBase實踐案例:知乎 AI 使用者模型服務效能優化實踐

HBase實踐案例:知乎 AI 使用者模型服務效能優化實踐

使用者模型簡介

知乎 AI 使用者模型服務於知乎兩億多使用者,主要為首頁、推薦、廣告、知識服務、想法、關注頁等業務場景提供資料和服務,

例如首頁個性化 Feed 的召回和排序、相關回答等用到的使用者長期興趣特徵,問題路由、回答排序中用到的 TPR「作者創作權威度」,廣告定向投放用到的基礎屬性等。

主要功能

提供的資料和功能主要有:

  • 使用者興趣:長期興趣、實時興趣、分類興趣、話題興趣、keyword 興趣、作者創作權威度等,
  • 使用者 Embedding 表示:最近鄰使用者、人群劃分、特定使用者圈定等,
  • 使用者社交屬性:使用者親密度、二度好友、共同好友、相似優秀回答者等,
  • 使用者實時屬性: LastN 行為、LastLogin 等,
  • 使用者基礎屬性:使用者性別預測、年齡段計算、職業預估等。

服務架構

整體主要分為 Streaming / 離線計算、線上服務和 HBase 多叢集同步三部分組成,下面將依次進行介紹。

                                                             使用者模型服務架構圖

Streaming / 離線計算

Streaming 計算主要涉及功能 LastRead、LastSearch、LastDisplay,實時話題/ Keyword 興趣、最後登入時間、最後活躍的省市等。

 

                                                     使用者模型實時興趣計算邏輯圖

實時興趣的計算流程

  1. 相應日誌獲取。從 CardshowLog、PageshowLog、QueryLog 中抽取<使用者,contentToken,actionType >等內容。
  2. 對映到對應的內容維度。對於問題、回答、文章、搜尋分別獲取對應的 Topic 和 Keyword,搜尋內容對應的 Topic。
    在 Redis 中用 contentToken 置換 contentId 後,請求 ContentProfile 獲取其對應話題和關鍵詞;
    對於 Query,呼叫 TopicMatch 服務,傳遞搜尋內容給服務,服務返回其對應的 Topic;呼叫 Znlp 的 KeywordExtractorJar 包,傳遞搜尋內容並獲得其對應的 Keyword 。
  3. 使用者-內容維度彙總。根據使用者的行為,在<使用者,topic,actionType>和<使用者,keyword,actionType>層面進行 groupBy 聚合彙總後,
    並以 hashmap 的格式儲存到 Redis,作為計算使用者實時興趣的基礎資料,按時間衰減係數 timeDecay 進行新舊興趣的 merge 後儲存。
  4. 計算興趣。在使用者的歷史基礎資料上,按一定的 decay 速度進行衰減,按威爾遜置信區間計算使用者興趣 score,並以 Sortedset 的格式儲存到 Redis。

關於興趣計算,已經優化的地方主要是:如何快速的計算平滑引數 alpha 和 beta,如何 daily_update 平滑引數,以及用卡方計算置信度時,
是否加入平滑引數等都會對最終的興趣分值有很大的影響,當 display 為 1 曝光數量不足的情況下,興趣 score 和 confidence 計算出現 的 bias 問題等。


線上服務

隨之知乎日益增加的使用者量,以及不斷豐富的業務場景和與之相對應出現的呼叫量上升等,對線上服務的穩定性和請求時延要求也越來越高。

舊服務本身也存在一些問題,比如:

  1. 線上服務直連 HBase,當資料熱點的時候,造成某些 Region Server 的負載很高,P95 上升,輕者造成服務抖動,監控圖偶發有「毛刺」現象,重者造成服務幾分鐘的不可用,需要平臺技術人員將 Region 從負載較高的 RegionServer 上移走。
  2. 離線任務每次計算完成後一次大批量同時寫入離線和線上叢集,會加重 HBase 線上叢集Region Server 的負載,增大 HBase get 請求的時延,從而影響線上服務穩定性和 P95。

針對問題一,我們在原來的服務架構中增加快取機制,以此來增強服務的穩定型、減小 Region Server 的負載。

針對問題二,修改了離線計算和多叢集資料同步的方式,詳見「HBase多叢集儲存機制」部分。

Cache機制具體實現

沒有 Cache 機制時,所有的 get 和 batchGet 方法直接請求到 HBase,具體如下圖:

                                                          使用者模型服務請求序列圖

 

  1. UserProfileServiceApp 啟動服務,將收到的請求交由 UserProfileServiceImpl 具體處理
  2. UserProfileServiceImp 根據請求引數,呼叫 GetTranslator 將 UserProfileRequest.GetRequest 轉化成 HBase 中的
    Get Object(在 Map 中維護每個 requestField 對應 HBase 中的 tablename,cf,column,prefix 等資訊),以格式Map[String, util.List[(AvailField, Get)]]返回。
  3. UserProfileServiceImp 用 Future 異步向 HBase 傳送 get 請求,獲取到結果返回。

增加 Cache 機制的具體方法,在上面的第二步中,增加一個 CacheMap,用來維護 get 中 AvailField 對應 Cache 中的 key,
key 的組成格式為:「 tablename 縮寫| columnfamily 縮寫| columnname 縮寫| rowkey 全寫」。這裡使用的 Redis 資料結構主要有兩種,SortedSet 和 Key-Value對。
服務端收到請求後先去轉化 requestField 為 Cache 中的 key,從 Cache 中獲取資料。對於沒有獲取到 requestField 的轉化成 GetObject,請求 HBase 獲取,將結果儲存到 Cache 中並返回。

最終效果

使用者模型的訪問量大概為 100K QPS,每個請求轉化為多個 get 請求。 增加 Cache 前 get 請求的 P95 為30ms,增加 Cache 後降低到小於 15ms,Cache 命中率 90% 以上。


HBase 多叢集儲存機制

離線任務和 Streaming 計算主要採用 Spark 計算實現, 結果儲存到 HBase 的幾種方式:

方法一:每次一條

1. 每次寫進一條,呼叫 API 進行儲存的程式碼如下:

val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
val table = hbaseConn.getTable(TableName.valueOf("word"))
x.foreach(value => {
    var put = new Put(Bytes.toBytes(value.toString))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(value.toString))
    table.put(put)
})

方法二:批量寫入

2. 批量寫入 HBase,使用的 API:

/**
   * {@inheritDoc}
   * @throws IOException
   */
  @Override
  public void put(final List<Put> puts) throws IOException {
    getBufferedMutator().mutate(puts);
    if (autoFlush) {
      flushCommits();
    }
  }

方法三:MapReduce 的 saveAsNewAPIHadoopDataset 方式寫入

3. saveAsNewAPIHadoopDataset 是通用的儲存到 Hadoop 儲存系統的方法,呼叫 org.apache.hadoop.mapreduce.RecordWriter 實現。
org.apache.hadoop.hbase.mapreduce.TableOutputFormat.TableRecordWriter 是其在 HBase 中的實現類。底層通過呼叫 hbase.client.BufferedMutator.mutate() 方式儲存。

val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
rdd.map(x => {
  var put = new Put(Bytes.toBytes(x.toString))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x.toString))
  (new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)

/**
 * Writes a key/value pair into the table.
 * @throws IOException When writing fails.
 */
@Override
public void write(KEY key, Mutation value)
throws IOException {
  if (!(value instanceof Put) && !(value instanceof Delete)) {
    throw new IOException("Pass a Delete or a Put");
  }
  mutator.mutate(value);
}

方法四:BulkLoad 方式

4. BulkLoad 方式,建立 HFiles,呼叫 LoadIncrementalHFiles 作業將它們移到 HBase 表中。
首先需要根據表名 getRegionLocator 得到 RegionLocator,根據 RegionLocator 得到 partition,因為在 HFile 中是有序的
所以,需要呼叫 rdd.repartitionAndSortWithinPartitions(partitioner) 將 rdd 重新排序。
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator) 進行任務增量Load 到具體表的配置 實現並執行對映( 並減少) 作業,
使用 HFileOutputFormat2 輸出格式將有序的放置或者 KeyValue 物件寫入 HFile 檔案。
Reduce 階段通過呼叫 HFileOutputFormat2.configureIncrementalLoad 配置在場景後面。執行LoadIncrementalHFiles 作業將 HFile 檔案移動到系統檔案。

static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
  Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();

job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls);

// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(TextSortReducer.class);
} else {
  LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}

conf.setStrings("io.serializations", conf.get("io.serializations"),
    MutationSerialization.class.getName(), ResultSerialization.class.getName(),
    KeyValueSerialization.class.getName());

configurePartitioner(job, startKeys);
// Set compression algorithms based on column families
configureCompression(table, conf);
configureBloomType(table, conf);
configureBlockSize(table, conf);
configureDataBlockEncoding(table, conf);

TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + table.getName() + " output configured.");
}

public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
  throws IOException {
configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class);
}
val hFileLoader = new LoadIncrementalHFiles(conf)
hFileLoader.doBulkLoad(hFilePath, new HTable(conf, table.getName))

 

將 HFile 檔案 Bulk Load 到已存在的表中。 由於 HBase 的 BulkLoad 方式是繞過了 Write to WAL,Write to MemStore 及 Flush to disk 的過程,所以並不能通過 WAL 來進行一些複製資料的操作。

由於 Bulkload 方式還是對叢集 RegionServer 造成很高的負載,最終採用方案三,下面是兩個叢集進行資料同步。

儲存同步機制

技術選型 HBase 常見的 Replication 方法有 SnapShot、CopyTable/Export、BulkLoad、Replication、應用層併發讀寫等。
應用層併發讀寫 優點:應用層可以自由靈活控制對 HBase寫入速度,開啟或關閉兩個叢集間的同步,開啟或關閉兩個叢集間具體到表或者具體到列簇的同步,對 HBase 叢集效能的影響最小,缺點是增加了應用層的維護成本。
初期沒有更好的叢集資料同步方式的時候,使用者模型和內容模型自己負責兩叢集間的資料同步工作。

                                                             使用者模型儲存多機房同步架構圖

具體實現細節

第一步:定義用於在 Kafka 的 Producer 和 Consumer 中流轉的統一資料 Protobuf 格式

message ColumnValue {
required bytes qualifier = 1;
......
}
message PutMessage {
required string tablename = 1;
......
}

第二步:傳送需要同步的資料到 Kafka,(如果有必要,需要對資料做相應的格式處理),這裡對資料的處理,有兩種方式。
第一種:如果程式中有統一的儲存到 HBase 的工具(另一個專案是使用自定義的 HBaseHandler,業務層面只生成 tableName,rowKey,columnFamily,column 等值,
由 HBaseHandler 統一構建成 Put 物件,並儲存 HBase 中),這種方式在業務層面改動較小,理論上可以直接用原來的格式發給 Kafka,但是如果 HBaseHandler 處理的格式和 PutMessage 格式有不符的地方,做下適配即可。

/**
* tableName: hbase table name
* rdd: RDD[(rowkey, family, column, value)]
*/
def convert(tableName: String, rdd: RDD): RDD = {
rdd.map {
  case (rowKey: String, family: String, column: String, value: Array[Byte]) =>
    val message = KafkaMessages.newBuilder()
    val columnValue = ColumnValue.newBuilder()
    columnValue.set
     ......
    (rowKey, message.build().toByteArray)
 }
}

 

第二種:程式在 RDD 中直接構建 HBase 的 Put 物件,呼叫 PairRDD 的 saveAsNewAPIHadoopDataset 方法儲存到 HBase 中。
此種情況,為了相容已有的程式碼,做到程式碼和業務邏輯的改動最小,傳送到 Kafka 時,需要將 Put 物件轉換為上面定義的 PutMessage Protobuf 格式,然後傳送給 Kafka。

/**
* tableName: hbase table namne
* rdd: RDD[(rowKey, put)]
*/
def convert(tableName: String, familyNames: Array[String], rdd: RDD): RDD = {
rdd.map {
  case (_, put: Put) =>
    val message = PutMessage.newBuilder()
    for(familyName <- familyNames){
      if(put.getFamilyMap().get(Bytes.toBytes(familyName))!=null){
      val keyValueList = put.getFamilyMap()
        .asInstanceOf[java.util.ArrayList[KeyValue]].asScala
        for( keyvalue <- keyValueList){
          message.setRowkey(ByteString.copyFrom(keyvalue.getRow))
        ......
        }
        message.setTablename(tableName)
      }
    }
    (null, message.build().toByteArray)
 }
}

 

第三步:傳送到 Kafka,不同的表傳送到不同的 Topic,對每個 Topic 的消費做監控。

/**
* 傳送 rdd 中的內容到 brokers 的指定 topic 中
* tableName: hbase table namne
* rdd: RDD[(rowKey, put)]
*/
def send[T](brokers: String,
               rdd: RDD[(String, T)],
               topic: String)(implicit cTag: ClassTag[T]): Unit = {
  rdd.foreachPartition(partitionOfRecords => {
      val producer = getProducer[T](brokers)
      partitionOfRecords.map(r => new ProducerRecord[String, T](topic, r._1, r._2))
        .foreach(m => producer.send(m))
      producer.close()
  })
}

 

第四步:另啟動 Streaming Consumer 或者服務消費 Kafka 中內容,將 putMessage 的 Protobuf 格式轉成 HBase 的 put 物件,同時寫入到線上 HBase 叢集中。 Streaming 消費Kafka ,不同的表傳送到不同的 Topic,對每個 Topic 的消費做監控。

val toHBaseTagsTopic = validKafkaStreamTagsTopic.map {
      record =>
        val tableName_r = record.getTablename()
        val put = new Put(record.getRowkey.toByteArray)
        for (cv <- record.getColumnsList) {
          put.addColumn(record.getFamily.toByteArray)
          ......
        }
        if(put.isEmpty){
          (new ImmutableBytesWritable(), null)
        }else{
          (new ImmutableBytesWritable(), put)
        }
    }.filter(_._2!=null)
    if(!isClean) {
      toHbaseTagsTopic.foreachRDD { rdd =>
        rdd.saveAsNewAPIHadoopDataset(
          AccessUtils.createOutputTableConfiguration(
            constants.Constants.NAMESPACE + ":" + constants.Constants.TAGS_TOPIC_TABLE_NAME
          )
        )
      }
   }

 

如下為另一種啟動服務消費 Kafka 的方式。
val consumer = new KafkaConsumer[String, Array[Byte]](probs)
consumer.subscribe(topics)
val records = consumer.poll(100)
for (p <- records.partitions) {
   val recordsOfPartition = records.records(p)
   recordsOfPartition.foreach { r =>
      Try(KafkaMessages.parseFrom(r.value())) match {
         case Success(record) =>
            val tableName = record.getTableName 
            if (validateTables.contains(tableName)) {
               val messageType = record.getType
               ......
               try {
                  val columns = record.getColumnsList.map(c => (c.getColumn, c.getValue.toByteArray)).toArray
                   HBaseHandler.write(tableName)
                ......
               } catch {
                  case ex: Throwable =>
                    LOG.error("write hbase fail")
                    HaloClient.increment(s"content_write_hbase_fail")
               }
            } else {
              LOG.error(s"table $tableName is valid")
            }
         }
      }
      //update offset
      val lastOffset = recordsOfPartition.get(recordsOfPartition.size - 1).offset()
      consumer.commitSync(java.util.Collections.singletonMap(p, new OffsetAndMetadata(lastOffset + 1)))
}

 

結語

最後,目前採用的由應用控制和管理線上離線叢集的同步機制,在隨著平臺多機房專案的推動下,平臺將推出 HBase 的統一同步機制 HRP (HBase Replication Proxy),屆時業務部門可以將更多的時間和精力集中在模型優化層面。

Reference

[1] HBase Cluster Replication
[2] 通過 BulkLoad 快速將海量資料匯入到 HBase
[3] HBase Replication 原始碼分析
[4] HBase 原始碼之 TableRecordWriter
[5] HBase 原始碼之 TableOutputFormat
[6] Spark2.1.1寫入 HBase 的三種方法效能對比