本文首發於微信公眾號「對白的演算法屋」
大家好,我是對白。
目前,越來越多的網際網路公司內部都有自己的一套框架去訓練模型,而模型訓練時需要的資料則都儲存在分散式檔案系統(HDFS)上。Hive作為構建在HDFS上的一個數據倉庫,它本質上可以看作是一個翻譯器,可以將HiveSQL語句翻譯成MapReduce程式或Spark程式,因此模型需要的資料例如csv/libsvm檔案都會儲存成Hive表並存放在HDFS上,那麼問題就來了,如何大規模地把HDFS中的資料直接喂到Tensorflow中呢?Tensorflow提供了一種解決方法:spark-tensorflow-connector,支援將spark DataFrame格式資料直接儲存為TFRecords格式資料,接下來就帶大家瞭解一下TFRecord的原理、構成和如何生成TFRecords檔案。
TFRecord介紹
TFRecord是Tensorflow訓練和推斷標準的資料儲存格式之一,將資料儲存為二進位制檔案(二進位制儲存具有佔用空間少,拷貝和讀取(from disk)更加高效的特點),而且不需要單獨的標籤檔案了,其本質是一行行位元組字串構成的樣本資料。
一條TFRecord資料代表一個Example,一個Example就是一個樣本資料,每個Example內部由一個字典構成,每個key對應一個Feature,key為欄位名,Feature為欄位名所對應的資料,Feature有三種資料型別:ByteList、FloatList,Int64List。
TFRecord構成
它實質上是由protobuf定義的一種資料協議,其中tensorflow提供了兩種Example表示形式 Example和SequenceExample。它的定義程式碼位於[tensroflow/core/example/example.proto & feature.proto]。
Example和SequenceExample的定義:
message Example {
Features features = 1;
};
message SequenceExample {
Features context = 1;
FeatureLists feature_lists = 2;
};
message Features {
// Map from feature name to feature.
map<string, Feature> feature = 1;
};
// Containers for non-sequential data.
message Feature {
// Each feature can be exactly one kind.
oneof kind {
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
};
// Containers for sequential data.
//
// A FeatureList contains lists of Features. These may hold zero or more
// Feature values.
//
// FeatureLists are organized into categories by name. The FeatureLists message
// contains the mapping from name to FeatureList.
//
message FeatureList {
repeated Feature feature = 1;
};
message FeatureLists {
// Map from feature name to feature list.
map<string, FeatureList> feature_list = 1;
};
我們這裡以最常用的Example來進行解釋。從圖中可以看到,在樣本生產環節,每個Example內部由一個dict構成,每個key(string)對應著一個Feature結構,這個Feature結構有三種具體形式,分別是ByteList,FloatList,Int64List三種。這三種形式便可以承載string,bytes,float,double,int,long等多種樣本結構,並且基於list的表示,使得我們既可以表達scalar,也可以表達vector型別的資料(注意如果想要將一個matrix儲存到到一個Feature內,其值需要時按照Row-Major拍平的1-D array, 行列資料需使用額外欄位儲存,方便反序列化)。這裡需要注意的是,我們在序列化的時候,並未將格式資訊序列化進去,實質上,序列化後的,每條tfrecord中的資料,只具有以下資料:
TFRecord中每條資料的格式:
uint64 length
uint32 masked_crc32_of_length
byte data[length]
uint32 masked_crc32_of_data
因此我們可以看出來,TFRecord並不是一個self-describing的格式,也就是說,tfrecord的write和read都需要額外指明schema。從上圖我們也能看出來,在實際訓練的時候,樣本都需要經過一個知曉了Schema的Parser來進行解析,然後才能傳遞給Tensorflow進行實際的訓練。
注:這裡只展示了CTR場景常使用的Example,當然也有影象等場景需要使用SequenceExample進行一些樣本的結構化表達,這裡不做展開。根據官方文件來看,SequenceExample主要是使用在時序特徵和視訊特徵。其中context欄位描述的是和當期時間和特徵不相關的共性資料,而feature_list則持有和時間或者視訊幀相關的資料。感興趣可以參考youtube-8M這個資料集中關於樣本資料的表示。
TFRecord的生成(小規模)
TFRecord的生成=Example序列化+寫入TFRecord檔案
構建Example時需要指定格式資訊(字典)key是特徵,value是BytesList/FloatList/Int64List值,但Example序列化時並未將格式資訊序列化進去,因此讀取TFRecord檔案需要額外指明schema。
每個Example會序列化成位元組字串並寫入TFRecord檔案中,程式碼如下:
import tensorflow as tf
# 回憶上一小節介紹的,每個Example內部實際有若干種Feature表達,下面
# 的四個工具方法方便我們進行Feature的構造
def _bytes_feature(value):
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _float_feature(value):
return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def _int64list_feature(value_list):
return tf.train.Feature(int64_list=tf.train.Int64List(value=value_list))
# Example序列化成位元組字串
def serialize_example(user_id, city_id, app_type, viewd_pois, avg_paid, comment):
# 注意我們需要按照格式來進行資料的組裝,這裡的dict便按照指定Schema構造了一條Example
feature = {
'user_id': _int64_feature(user_id),
'city_id': _int64_feature(city_id),
'app_type': _int64_feature(app_type),
'viewd_pois': _int64list_feature(viewd_pois),
'avg_paid': _float_feature(avg_paid),
'comment': _bytes_feature(comment),
}
# 呼叫相關api將Example序列化為位元組字串
example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
return example_proto.SerializeToString()
# 樣本的生產,這裡展示了將2條樣本資料寫入到了TFRecord檔案中
def write_demo(filepath):
with tf.python_io.TFRecordWriter(filepath) as writer:
writer.write(serialize_example(1, 10, 1, [658, 325], 36.3, "yummy food."))
writer.write(serialize_example(2, 20, 2, [897, 568, 126], 89.6, "nice place to have dinner."))
print "write demo data done."
filepath = "testdata.tfrecord"
write_demo(filepath)
由以上程式碼可知,TFRecord的原理是:將每個樣本傳給serialize_example函式並輸出位元組字串,再通過TFRecordWriter類寫入TFRecord檔案中,有多少個樣本就會生成多少個位元組字串。
TFRecord的生成(大規模)
TFRecord的生成=spark DataFrame格式資料儲存為tfrecords格式資料
from pyspark.sql.types import *
def main():
#從hive表中讀取資料
df=spark.sql("""
select * from experiment.table""")
#tfrecords儲存路徑
path = "viewfs:///user/hadoop-hdp/ml/demo/tensorflow/data/tfrecord"
#將spark DataFrame格式資料轉換為tfrecords格式資料
df.repartition(file_num).write \
.mode("overwrite") \
.format("tfrecords") \
.option("recordType", "Example")\
.save(path)
if __name__ == "__main__":
main()
TFRecord的讀取
在模型訓練的時候需要讀取TFRecord檔案,有三個步驟:
1、首先通過tf.data.TFRecordDataset() API讀取TFRecord檔案並建立dataset;
2、定義schema;
3、使用tf.parse_single_example() 按照schema解析dataset中每個樣本;
schema的意義在於指定每個樣本的每一列資料應該用哪一種特徵解析函式去解析。
Tensorflow提供了三種解析函式:
1、tf.FixedLenFeature(shape,dtype,default_value):解析定長特徵,shape:輸入資料形狀、dtype:輸入資料型別、default_value:預設值;
2、tf.VarLenFeature(dtype):解析變長特徵,dtype:輸入資料型別;
3、tf.FixedSequenceFeature(shape,dtype,default_value):解析定長序列特徵,shape:輸入資料形狀、dtype:輸入資料型別、default_value:預設值;
程式碼如下:
def read_demo(filepath):
# 定義schema
schema = {
'user_id': tf.FixedLenFeature([], tf.int64),
'city_id': tf.FixedLenFeature([], tf.int64),
'app_type': tf.FixedLenFeature([], tf.int64),
'viewed_pois': tf.VarLenFeature(tf.int64),
'avg_paid': tf.FixedLenFeature([], tf.float32, default_value=0.0),
'comment': tf.FixedLenFeature([], tf.string, default_value=''),
}
# 使用相關api,按照schema解析dataset中的樣本
def _parse_function(example_proto):
return tf.parse_single_example(example_proto, schema)
# 讀取TFRecord檔案來建立dataset
dataset = tf.data.TFRecordDataset(filepath)
#按照schema解析dataset中的每個樣本
parsed_dataset = dataset.map(_parse_function)
#建立Iterator並迭代Iterator即可訪問dataset中的樣本
next = parsed_dataset.make_one_shot_iterator().get_next()
# 這裡直接利用session,列印dataset中的樣本
with tf.Session() as sess:
while True:
try:
print sess.run(next)
except:
print "out of data"
break
其中,
tf.parse_single_example(
serialized,
features,
name=None,
example_names=None
)
引數:
- serialized:序列化的Example。
- features:一個字典,key是特徵,value是FixedLenFeature/VarLenFeature/FixedSequenceFeature值。
- name:此操作的名稱(可選)。
- example_names:(可選)標量字串張量,關聯的名稱。
返回:
一個字典,key是特徵,value是Tensor或Sparse Tensor值。
最後歡迎大家關注我的微信公眾號:對白的演算法屋(duibainotes),跟蹤NLP、推薦系統和對比學習等機器學習領域前沿。
想進一步交流的同學也可以通過公眾號加我的微信一同探討技術問題,謝謝。