TensorFlow做Sparse Machine Learning
TensorFlow Sparse現狀及背景
在機器學習這塊,Estimator本身的封裝能夠適應比較多的Dense的場景,而對於Sparse的場景無論是官方demo還是一些業界的大牛都分享的比較少,在很多場景,比如libfm、libffm、Xgboost都支援直接libsvm, field-libsvm的格式中讀入資料,訓練模型沒有原始的實現,沒法直接調包使用,得自己在TensorFlow的框架上構造,所幸Estimator本身的框架支援自定義的input_fn,和自定義的model_fn,筆者過去一段時間工作之餘研究了下,並實現了基於libsvm的Sparse Logistic Regression和Sparse Factorization Machine的一套比較高效的流程, 打通了從資料讀取、模型訓練、到TensorFlow Serving的部署。
TensorFlow中的sparse_tensor實現
我們讀下sparse_tensor的原始碼, ofollow,noindex">sparse_tensor.py , 很容易看出來sparse_tensor在TensorFlow中是一個高層的封裝,主要包括indices, values, shape三個部分,這裡很有意思,後面我實踐中遇到一個大坑,可以通過這裡解決,這裡我先賣個關子;
sparse representation的好處
常見的稀疏矩陣的表示有csc,csr,在很多矩陣計算的庫當中有使用,比如python中大家使用比較多的scipy,TensorFlow底層計算模組eigen,都是用類似的方式來表示稀疏矩陣,舉個例子比如某個商戶有500萬個商品,而使用者產生行為的商品必定遠遠小於500萬,如果都是用dense表示,那麼儲存單個使用者行為的商品資料需要500萬個指,而採用稀疏資料表示則儲存所需要的空間只需要和你才產生行為的商品數量有關,如下圖100個使用者的在500w上的行為資料如果用dense表示需要大概3G的空間;


需要儲存100*5000000個int,而使用csc_matrix,
row = np.array(range(100)) col = np.zeros(100) data = np.ones(100) csc_matrix((data, (row, col)), shape=(100, 5000000))
我們只需要儲存3*NNZ(這裡就是100)個int,然後加上一個shape資訊,空間佔用大大減少;
在記憶體中,我們通常使用csc來表示Sparse Matrix,而在樣本儲存中,通常使用libsvm格式來儲存
以空格為sep,label為1, 後續為feature的表示,格式為feature_id: feature_val, 在TensorFlow中我們可以使用TextlineDataset自定義input_fn來解析文字,其他很多相關的技術文章都有提及,但是作為一個程式員總感覺不想走已經走過的路,而且TF官宣tfrecord的讀寫效率高, 考慮到效率問題,我這裡使用TFRecordDataset來做資料的讀取;
LibSVM To TFRecord
解析LibSVM feature_ids, 和feature_vals, 很簡單沒有啥好說的, 直接貼程式碼,想要深入瞭解的,可以去看看TF的 example.proto , feature.proto , 就大概能瞭解Example和Feature的邏輯了,不用悶悶地只知道別人是這樣寫的。
import codecs import tensorflow as tf import logging logger = logging.getLogger("TFRecSYS") sh = logging.StreamHandler(stream=None) logger.setLevel(logging.DEBUG) fmt = "%(asctime)-15s %(levelname)s %(filename)s %(lineno)d %(process)d %(message)s" datefmt = "%a %d %b %Y %H:%M:%S" formatter = logging.Formatter(fmt, datefmt) sh.setFormatter(formatter) logger.addHandler(sh) class LibSVM2TFRecord(object): def __init__(self, libsvm_filenames, tfrecord_filename, info_interval=10000, tfrecord_large_line_num = 10000000): self.libsvm_filenames = libsvm_filenames self.tfrecord_filename = tfrecord_filename self.info_interval = info_interval self.tfrecord_large_line_num = tfrecord_large_line_num def set_transform_files(self, libsvm_filenames, tfrecord_filename): self.libsvm_filenames = libsvm_filenames self.tfrecord_filename = tfrecord_filename def fit(self): logger.info(self.libsvm_filenames) writer = tf.python_io.TFRecordWriter(self.tfrecord_filename+".tfrecord") tfrecord_num = 1 for libsvm_filename in self.libsvm_filenames: logger.info("Begin to process {0}".format(libsvm_filename)) with codecs.open(libsvm_filename, mode='r', encoding='utf-8') as fread: line = fread.readline() line_num = 0 while line: line = fread.readline() line_num += 1 if line_num % self.info_interval == 0: logger.info("Processing the {0} line sample".format(line_num)) if line_num % self.tfrecord_large_line_num == 0: writer.close() tfrecord_file_component = self.tfrecord_filename.split(".") self.tfrecord_filename = self.tfrecord_filename.split("_")[0]+"_%05d.tfrecord"%tfrecord_num writer = tf.python_io.TFRecordWriter(self.tfrecord_filename) tfrecord_num += 1 logger.info("Change the tfrecord file to {0}".format(self.tfrecord_filename)) feature_ids = [] vals = [] line_components = line.strip().split(" ") try: label = float(line_components[0]) features = line_components[1:] except IndexError: logger.info("Index Error, line: {0}".format(line)) continue for feature in features: feature_components = feature.split(":") try: feature_id = int(feature_components[0]) val = float(feature_components[1]) except IndexError: logger.info("Index Error: , feature_components: {0}",format(feature)) continue except ValueError: logger.info("Value Error: feature_components[0]: {0}".format(feature_components[0]) ) feature_ids.append(feature_id) vals.append(val) tfrecord_feature = { "label" : tf.train.Feature(float_list=tf.train.FloatList(value=[label])), "feature_ids": tf.train.Feature(int64_list=tf.train.Int64List(value=feature_ids)), "feature_vals": tf.train.Feature(float_list=tf.train.FloatList(value=vals)) } example = tf.train.Example(features=tf.train.Features(feature=tfrecord_feature)) writer.write(example.SerializeToString()) writer.close() logger.info("libsvm: {0} transform to tfrecord: {1} successfully".format(libsvm_filename, self.tfrecord_filename)) if __name__ == "__main__": libsvm_to_tfrecord = LibSVM2TFRecord(["../../data/kdd2010/kdda.libsvm"], "../../data/kdd2010/kdda") libsvm_to_tfrecord.fit()

轉成tfrecord檔案之後,通常比原始的檔案要大一些,具體的格式的說明參考下 https:// cloud.tencent.com/devel oper/article/1088751 這篇文章比較詳細地介紹了轉tfrecord和解析tfrecord的用法,另外關於shuffle的buff size的問題,個人感覺問題並不大,在推薦場景下,資料條數多,其實記憶體消耗也不大,只是在執行前會有比較長載入解析的時間,另外一個問題是,大家應該都會提問的,為啥tfrecord會比自己寫input_fn去接下文字檔案最後來的快呢?
這裡我只能淺層意義上去猜測,這部分程式碼沒有拎出來讀過,所以不做回覆哈,有讀過原始碼,瞭解比較深的同學可以解釋下
TFRecord的解析

個人讀了一些解析tfrecord的幾個格式的原始碼,現在還有點亂,大概現在貌似程式碼中有支援VarLenFeature, SparseFeature, FixedLenFeature, FixedLenSequenceFeature這幾種,但是幾個api的說明裡面貌似對sparsefeature的支援有點磨礪兩可,所以選擇使用VarLenFeature上面的方式, 不知道這裡SparseFeature是怎麼玩的,有時間還得仔細看看。
然後,簡單寫個讀取的demo:

大家可以動手跑跑看,仔細研究的話會發現一些比較有意思的東西,比如VarLenFeature出來的是一個SparseTensor:

這裡我最開始是打算每次sess.run,然後轉換為numpy.array, 然後再喂feed_dict到模型,但是覺得這樣會很麻煩,速度會是瓶頸,如果能過直接使用這裡的SparseTensor去做模型的計算,直接從tfrecord解析,應該會比較好,但是又會遇到另一個問題,後面再詳細說明;這裡簡單提下,我這邊就是直接拿到兩個SparseTensor,直接去到模型,所以模型的設計會和常規的演算法會有不同;
Sparse Model的高效實現
import tensorflow as tf class SparseFactorizationMachine(object): def __init__(self, model_name="sparse_fm"): self.model_name = model_name def build(self, features, labels, mode, params): print("export features {0}".format(features)) print(mode) if mode == tf.estimator.ModeKeys.PREDICT: sp_indexes = tf.SparseTensor(indices=features['DeserializeSparse:0'], values=features['DeserializeSparse:1'], dense_shape=features['DeserializeSparse:2']) sp_vals = tf.SparseTensor(indices=features['DeserializeSparse_1:0'], values=features['DeserializeSparse_1:1'], dense_shape=features['DeserializeSparse_1:2']) if mode == tf.estimator.ModeKeys.TRAIN or mode == tf.estimator.ModeKeys.EVAL: sp_indexes = features['feature_ids'] sp_vals = features['feature_vals'] print("sp: {0}, {1}".format(sp_indexes, sp_vals)) batch_size = params["batch_size"] feature_max_num = params["feature_max_num"] optimizer_type = params["optimizer_type"] factor_vec_size = params["factor_size"] bias = tf.get_variable(name="b", shape=[1], initializer=tf.glorot_normal_initializer()) w_first_order = tf.get_variable(name='w_first_order', shape=[feature_max_num, 1], initializer=tf.glorot_normal_initializer()) linear_part = tf.nn.embedding_lookup_sparse(w_first_order, sp_indexes, sp_vals, combiner="sum") + bias w_second_order = tf.get_variable(name='w_second_order', shape=[feature_max_num, factor_vec_size], initializer=tf.glorot_normal_initializer()) embedding = tf.nn.embedding_lookup_sparse(w_second_order, sp_indexes, sp_vals, combiner="sum") embedding_square = tf.nn.embedding_lookup_sparse(tf.square(w_second_order), sp_indexes, tf.square(sp_vals), combiner="sum") sum_square = tf.square(embedding) second_part = 0.5*tf.reduce_sum(tf.subtract(sum_square, embedding_square), 1) y_hat = linear_part + tf.expand_dims(second_part, -1) predictions = tf.sigmoid(y_hat) print "y_hat: {0}, second_part: {1}, linear_part: {2}".format(y_hat, second_part, linear_part) pred = {"prob": predictions} export_outputs = { tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: tf.estimator.export.PredictOutput(predictions) } if mode == tf.estimator.ModeKeys.PREDICT: return tf.estimator.EstimatorSpec( mode=mode, predictions=predictions, export_outputs=export_outputs) loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=tf.squeeze(y_hat))) if optimizer_type == "sgd": opt = tf.train.GradientDescentOptimizer(learning_rate=params['learning_rate']) elif optimizer_type == "ftrl": opt = tf.train.FtrlOptimizer(learning_rate=params['learning_rate'],) elif optimizer_type == "adam": opt = tf.train.AdamOptimizer(learning_rate=params['learning_rate']) elif optimizer_type == "momentum": opt = tf.train.MomentumOptimizer(learning_rate=params['learning_rate'], momentum=params['momentum']) train_step = opt.minimize(loss,global_step=tf.train.get_global_step()) eval_metric_ops = { "auc" : tf.metrics.auc(labels, predictions) } if mode == tf.estimator.ModeKeys.TRAIN: return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss, train_op=train_step) if mode == tf.estimator.ModeKeys.EVAL: return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss, eval_metric_ops=eval_metric_ops)
這裡講個Factorization Machine的實現,會比Sparse Logistic Regression的實現要稍微複雜一點,首先,模型的演算法實現,比較簡單,隨便搜下應該大概都知道Factorization Machine的演算法原理,fm主要包括兩個部分,一個是LogisticRegression的部分,包括bias和一階特徵,另外一部分是把每一維特徵表示為一個指定大小的vector,去從樣本中去學習對訓練有效的交叉資訊:
bias = tf.get_variable(name="b", shape=[1], initializer=tf.glorot_normal_initializer()) w_first_order = tf.get_variable(name='w_first_order', shape=[feature_max_num, 1], initializer=tf.glorot_normal_initializer()) linear_part = tf.nn.embedding_lookup_sparse(w_first_order, sp_indexes, sp_vals, combiner="sum") + bias w_second_order = tf.get_variable(name='w_second_order', shape=[feature_max_num, factor_vec_size], initializer=tf.glorot_normal_initializer()) embedding = tf.nn.embedding_lookup_sparse(w_second_order, sp_indexes, sp_vals, combiner="sum") embedding_square = tf.nn.embedding_lookup_sparse(tf.square(w_second_order), sp_indexes, tf.square(sp_vals), combiner="sum") sum_square = tf.square(embedding) second_part = 0.5*tf.reduce_sum(tf.subtract(sum_square, embedding_square), 1) y_hat = linear_part + tf.expand_dims(second_part, -1) predictions = tf.sigmoid(y_hat)
這裡和普通的fm唯一不同的是,我使用tf.nn.embedding_lookup_sparse 來計算WX,在海量特徵維度的前提下,做全部的WX相乘是耗時,且沒有必要的,我們只需要取出其中有值的部分來計算即可,比如kdd2010,20216831的特徵,但是計算WX其實就會考驗系統的瓶頸,但是如果經過一個簡單的tf.nn.embedding_lookup_sparse來替代WX,就會先lookup feature_id,對應的embedding的表示,然後乘以相應的weight,最後在每一個樣本上進行一個combiner(sum)的操作,其實就是等同於WX, tf.nn.embedding_lookup_sparse(w_first_order, sp_indexes, sp_vals, combiner="sum")
, 而在系統方面,由於計算只與NNZ(非零數)有關, 效能則完全沒有任何壓力。二階的部分可以降低時間複雜度,相信應該瞭解FM的都知道,和的平方減去平方的和:
embedding_square = tf.nn.embedding_lookup_sparse(tf.square(w_second_order), sp_indexes, tf.square(sp_vals), combiner="sum") sum_square = tf.square(embedding) second_part = 0.5*tf.reduce_sum(tf.subtract(sum_square, embedding_square), 1)
由上面的實現,我們只需要把特徵的sp_indexes, sp_val傳出來就可以了, 但是因為這兩者都是SparseTensor,筆者開始想到的不是上述的實現,而是使用 tf.sparse.placeholder
, 然後喂一個feed_dict,對應SparseTensorValue就可以了,確實是可以的,模型訓練沒有問題,模型export出來也沒有問題(其實是有問題的, 我這裡重寫了Estimator的 build_raw_serving_input_receiver_fn
使其支援SparseTensor),但是在部署好TensorFlow Serving之後,我發現在客戶端SparseTensorValue貌似不能組成一個TensorProto, tf.make_tensor_proto
主要是把請求的值放進一個TensorProto,而TensorProto, https:// github.com/tensorflow/t ensorflow/blob/master/tensorflow/core/framework/tensor.proto ,貌似不能直接支援SparseTensorValue去放進TensorProto,所以就無法在部署好TensorFlow Serving後去請求(部署會在後文詳細描述,這裡我也想過能不能改他們的程式碼,但是貌似涉及太底層的東西,有點hold不住),但是也是有辦法的,前面文章提到SparseTensor,在TensorFlow中是高階的api,他其實就是由3個Tensor組成,是否可以把SparseTensor本身的3個Tensor暴露出來,然後請求的時候去組這三個Tensor就可以啦,所以只需要找到TFRecord接下出來的sp_indexes, sp_vals就可以了

從這裡很容易看到sp_indexes, sp_vals的TensorName,然後用佔位符替代,然後用這些去組成sp_indexes,sp_vals


說明下,這裡我使用的kdd2010的資料,特徵維度是20216831,樣本數量8407752,我是用我15年的macbook pro跑的, 使用的sgd, 收斂還是比較明顯的, 大家有興趣可以試試,按以往經驗使用其他優化器如adam,ftrl會在這種特徵規模比較大的條件下有比較好的提升,我這裡就走通整個流程,另外機器也不忍心折騰;
到了這裡,就訓練出來了一個可用的Sparse FM的模型,接下來要匯出模型,這裡的匯出模型是匯出一個暴露了placeholder的模型,可以在TensorFlow Serving被載入,被請求,不是單純的ckpt;
模型部署
feature_spec = { 'DeserializeSparse:0': tf.placeholder(dtype=tf.int64, name='feature_ids/indices'), 'DeserializeSparse:1': tf.placeholder(dtype=tf.int64, name='feature_ids/values'), 'DeserializeSparse:2': tf.placeholder(dtype=tf.int64, name='feaurte_ids/shape'), 'DeserializeSparse_1:0': tf.placeholder(dtype=tf.int64, name='feature_vals/indices'), 'DeserializeSparse_1:1': tf.placeholder(dtype=tf.float32, name='feature_vals/values'), 'DeserializeSparse_1:2': tf.placeholder(dtype=tf.int64, name='feature_vals/shape') } serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feature_spec, is_sparse=False) sparse_fm_model.export_savedmodel(servable_model_dir, serving_input_receiver_fn, as_text=True)
和前面構造模型的時候對應,只需要把DeserializeSparse的部分暴露出來即可

這裡會以時間戳建立模型,儲存成功後temp-1543117151會變為1543117151,接下來,就是要啟動TensorFlow Serving載入模型: docker run -p 8500:8500 --mount type=bind,source=/Users/burness/work/tencent/TFRecSYS/TFRecSYS/runner/save_model,target=/models/ -e MODEL_NAME=sparse_fm -t tensorflow/serving
,使用官方提供的docker映象來部署環境很方便。

會先載入新的模型,然後unload舊模型,從命令列log資訊可以看出RPC/">gRPC介面為8500
剩下的,就下一個client,去請求
import grpc import sys sys.path.insert(0, "./") from tensorflow_serving.apis import predict_pb2 from tensorflow_serving.apis import prediction_service_pb2_grpc import tensorflow as tf from tensorflow.python.framework import dtypes import time import numpy as np from sklearn import metrics def get_sp_component(file_name): with open(file_name, "r") as fread: for line in fread.readlines(): fea_ids = [] fea_vals = [] line_components = line.strip().split(" ") label = float(line_components[0]) for part in line_components[1:]: part_components = part.split(":") fea_ids.append(int(part_components[0])) fea_vals.append(float(part_components[1])) yield (label, fea_ids, fea_vals) def batch2sparse_component(fea_ids, fea_vals): feature_id_indices = [] feature_id_values = [] feature_vals_indices = [] feature_vals_values = [] for index, id in enumerate(fea_ids): feature_id_values += id for i in range(len(id)): feature_id_indices.append([index, i]) for index, val in enumerate(fea_vals): feature_vals_values +=val for i in range(len(val)): feature_vals_indices.append([index, i]) return np.array(feature_id_indices, dtype=np.int64), np.array(feature_id_values, dtype=np.int64), np.array(feature_vals_indices, dtype=np.int64), np.array(feature_vals_values, dtype=np.float32) if __name__ == '__main__': start_time = time.time() channel = grpc.insecure_channel("127.0.0.1:8500") stub = prediction_service_pb2_grpc.PredictionServiceStub(channel) request = predict_pb2.PredictRequest() request.model_spec.name = "sparse_fm" record_genertor = get_sp_component("../../data/kdd2010/kdda_t.libsvm") batch_size = 1000 predictions = np.array([]) labels = [] while True: try: batch_label = [] batch_fea_ids = [] batch_fea_vals = [] max_fea_size = 0 for i in range(batch_size): label, fea_ids, fea_vals = next(record_genertor) batch_label.append(label) batch_fea_ids.append(fea_ids) batch_fea_vals.append(fea_vals) if len(batch_fea_ids) > max_fea_size: max_fea_size = len(batch_fea_ids) shape = np.array([batch_size, max_fea_size],dtype=np.int64 ) batch_feature_id_indices, batch_feature_id_values,batch_feature_val_indices, batch_feature_val_values= batch2sparse_component(batch_fea_ids, batch_fea_vals) request.inputs["DeserializeSparse:0"].CopyFrom(tf.contrib.util.make_tensor_proto(batch_feature_id_indices)) request.inputs["DeserializeSparse:1"].CopyFrom(tf.contrib.util.make_tensor_proto(batch_feature_id_values)) request.inputs["DeserializeSparse:2"].CopyFrom(tf.contrib.util.make_tensor_proto(shape)) request.inputs["DeserializeSparse_1:0"].CopyFrom(tf.contrib.util.make_tensor_proto(batch_feature_val_indices)) request.inputs["DeserializeSparse_1:1"].CopyFrom(tf.contrib.util.make_tensor_proto(batch_feature_val_values)) request.inputs["DeserializeSparse_1:2"].CopyFrom(tf.contrib.util.make_tensor_proto(shape)) response = stub.Predict(request, 10.0) results = {} for key in response.outputs: tensor_proto = response.outputs[key] nd_array = tf.contrib.util.make_ndarray(tensor_proto) results[key] = nd_array print("cost %ss to predict: " % (time.time() - start_time)) predictions = np.append(predictions, results['output']) labels += batch_label print(len(labels), len(predictions)) except StopIteration: break fpr, tpr, thresholds = metrics.roc_curve(labels, predictions) print("auc: {0}",format(metrics.auc(fpr, tpr)))
開始用一個樣本做測試打出pred的值,成功後,我將所有的測試樣本去組batch去請求,然後計算下auc,對比下eval的時候的auc,差不多,那說明整體流程沒啥問題,另外每1000個樣本耗時大概270多ms,整體感覺還可以。

後續
基本到這裡就差不多了,現在已經支援單個field的Logistic Regression和Factorization Machine,擴充套件性比較強,只需要重寫演算法的類,剩餘的大部分都可以複用,接下來計劃是支援multi-field的資料接入,會實現更高效的Sparse DeepFM, FNN, DIN, DIEN, 其實已經差不多了,現在正在弄可用性,希望能夠通過配置檔案直接串起整個流程;另外分散式的也會支援,這個比較簡單,Estimator本身就ok,只是資源比較少,有機器的時候我試試,雖然我覺得TensorFlow本身的分散式做的不太讓人理解,但是能夠簡單複用還是比較厲害的;更高維度的支援之後也可以嘗試