這一章我們藉著之前的NER的模型聊聊tensorflow serving,以及gRPC呼叫要注意的點。以下程式碼為了方便理解做了簡化,完整程式碼詳見Github-ChineseNER ,裡面提供了訓練好的包括bert_bilstm_crf, bilstm_crf_softlexcion,和CWS+NER多工在內的4個模型,可以開箱即用。這裡tensorflow模型用的是estimator框架,整個推理環節主要分成:模型export,warmup,serving, client request四步
Model Export
要把estimator儲存成線上推理的格式,需要額外定義兩個欄位,serving的輸出和輸入格式。
輸出定義
serving的輸出在tf.estimator.EstimatorSpec中定義,比較容易混淆的是EstimatorSpec中有兩個和推理相關的欄位predictions和export_outputs,預設predictions是必須傳入,export_outputs是可選傳入。
差異在於predictions是estimator.predict的返回,並且允許predictions中的欄位和features&labels的欄位存在重合,例如我經常會把一些用於debug的欄位像中文的tokens放在predictions,這些欄位既是模型輸入也是predict輸出。
如果export_outputs=None,estimator會預設用如下方式生成export_output,signature_name='serving_default',欄位和predictions完全相同。
export_output = {
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
tf.estimator.export.PredictOutput(predictions)
}
但是對後面raw tensor輸入的serving input,是不允許export_output和input顯式出現相同欄位。所以我習慣單獨定義export_output,只保留線上serving需要返回的預測欄位
def model_fn(features, labels, mode, params):
... build tf graph
if mode == tf.estimator.ModeKeys.PREDICT:
output = {'serving_default':
tf.estimator.export.PredictOutput({'pred_ids': pred_ids})
}
spec = tf.estimator.EstimatorSpec(mode,
predictions= {'pred_ids': pred_ids,
'label_ids': features['label_ids'],
'tokens': features['tokens']
},
export_outputs=output)
return spec
輸入定義
serving的輸入在tf.estimator.export.ServingInputReceiver中定義,其中features是傳入模型的特徵格式,receiver_tensors是推理服務的請求格式,這倆啥差別呢?這個要說到serving input有兩種常見的定義方式,一種是傳入序列化後的tf.Example(receiver_tensor),然後按照tf_proto的特徵定義對example進行解析(feature)再輸入模型。這種方式的好處是請求介面一致,不管模型和特徵咋變服務請求欄位永遠是example。哈哈還有一個好處就是tf_proto的定義可以複用dataset裡面的定義好的
def serving_input_receiver_fn():
tf_proto = {
'token_ids': tf.io.FixedLenFeature([150], dtype=tf.int64),
'segment_ids': tf.io.FixedLenFeature([150], dtype=tf.int64)
}
serialized_tf_example = tf.placeholder(
dtype=tf.dtypes.string,
shape=[None],
name='input_tensor')
receiver_tensors = {'example': serialized_tf_example}
features = tf.parse_example(serialized_tf_example, tf_proto)
## 可能還會有feature preprocess邏輯在這裡
return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)
另一種就是直接用原始特徵請求,這時features和receiver_tensors是一樣滴。這種方式的好處是用saved_model_cli可以直接檢查serving的input格式,以及在請求特徵size非常大的時候,這種請求能多少節省一點以上序列化所需的時間。
def serving_input_receiver_fn():
token_ids = tf.placeholder(dtype=tf.int64, shape=[None, 150], name='token_ids')
segment_ids = tf.placeholder(dtype=tf.int64, shape=[None,150], name='segment_ids')
receiver_tensors = {'token_ids': token_ids,
'segment_ids': segment_ids}
return tf.estimator.export.ServingInputReceiver(receiver_tensors, receiver_tensors)
Export
定義好serving的輸入輸出後,直接export model即可,這裡可以是訓練完後export。也可以用已經訓練好的checkpoint來build estimator然後直接export,這裡會預設使用model_dir裡面latest ckpt來export。
estimator._export_to_tpu = False
estimator.export_saved_model('serving_model/bilstm_crf', serving_input_receiver_fn)
輸出的模型預設用當前timestamp作為folder_name, 按需要rename成version=1/2即可
然後我們可以通過saved_model_cli來檢查模型輸入輸出。圖一是tf.Example型別的輸入,圖二是raw tensor輸入,raw tensor型別的輸入debug更方便一點。
saved_model_cli show --all --dir ./serving_model/bilstm_crf/1
Warm up
在得到上面的servable model後,在serving前還有一步可選操作,就是加入warm up檔案。這主要是因為tensorflow模型啟動存在懶載入的邏輯,部分元件只在請求後才被觸發執行,所以我們會觀察到第一次(前幾次)請求的latency會顯著的高。warm up簡單說就是在模型檔案裡帶上幾條請求的測試資料,在模型啟動後用測試資料先去trigger懶載入的邏輯。具體操作就是在serving model的assets.extra目錄裡寫入請求資料
NUM_RECORDS=5
with tf.io.TFRecordWriter("./serving_model/{}/{}/assets.extra/tf_serving_warmup_requests".format(MODEL, VERSION)) as writer:
# 生成request的邏輯
log = prediction_log_pb2.PredictionLog(
predict_log=prediction_log_pb2.PredictLog(request=req))
for r in range(NUM_RECORDS):
writer.write(log.SerializeToString())
Server
server部分比較簡單,比較推薦Docker部署,方便快捷。只需要三步
- 下載Docker https://docs.docker.com/get-docker/
- 下載和環境適配的Image,不指定版本預設是latest
docker pull tensorflow/serving:1.14.0
- 在本地執行執行服務,注意port 8500是給gRPC的,8501是給REST API的不要寫錯
docker run -t --rm -p 8500:8500 \
-v "$(pwd)/serving_model/${MODEL_NAME}:/models/${MODEL_NAME}" \
-e MODEL_NAME=${MODEL_NAME} tensorflow/serving:1.14.0
gRPC client
Demo
這裡我們以上面tf.Example的serving請求格式,看下如何用gRPC請求服務。請求主要分成3步:建立通訊,生成request, 請求並解析response
第一步建立通訊
channel = grpc.insecure_channel(‘localhost:8500’)
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
第二步生成請求
# 得到tf_feature dict
example = tf.train.Example(
features=tf.train.Features(feature=tf_feature)
).SerializeToString() # 得到example並序列化成string
example = [example] # add batch_size dimension
# 生成request
request = predict_pb2.PredictRequest()
request.model_spec.signature_name = 'serving_default' # set in estimator output
request.model_spec.name = 'bilstm_crf'
request.model_spec.version.value = 1
request.inputs['example'].CopyFrom(
tensor_util.make_tensor_proto(example, dtype=tf.string)
)
第三步請求服務,解析response
resp = stub.Predict.future(request, timeout=5)
res = resp.result().outputs
pred_ids = np.squeeze(tf.make_ndarray(res['pred_ids']))
gRPC踩坑
在使用gPRC client的過程中有幾個可能會踩坑的點,哈哈但不排除出坑的姿勢不完全正確,如果是的話求指正~
Not fork safe,使用多程序要注意!
官方文件:grpc/fork_support
gRPC並不是fork safe的,如果在fork之前建立channel,可能會碰到deadlock或者報錯,取決於你用的gRPC版本。。。我使用的1.36版本會檢查fork,如果channel在fork之前建立且未close,會raise‘ValueError: Cannot invoke RPC: Channel closed due to fork’,之前用的忘記是啥版本的會deadlock。想要在client側使用多程序,合理的方案是在fork之後,在每個子程序中建立channel,如果主程序有channel需要先close掉。multiprocessing/client 給了一個多程序client的demo
channel重用大法好
官方文件:Performance Guide
最開始用gRPC我習慣性的在單條請求以後會channel.close,或者用with管理,後來發現channel建立銷燬本身是比較耗時的。看了官方文件才發現正確使用方式是在整個client生命週期裡複用同一個channel。至於stub,個人感覺建立成本很低,複用和每次從channel重新建立差別不大。
channel保活
官方文件:Keepalive User Guide
上面的channel複用會延伸到channel保活的問題。grpc客戶端預設是長連結,避免了連結建立和銷燬的開銷,但需要keep-alive機制來保證客戶端到服務端的連結持續有效。如果客戶端傳送請求的間隔較長,在一段時間沒有請求後,需要知道到底是server掉線了,還是真的沒有資料傳輸,這個連結還需不需要保持。grpc通過傳送keep-alive ping來保活。
在連結建立後,keep-alive計時器開始,通過以下引數控制是否傳送ping,傳送的時間,次數,間隔。
- grpc.keepalive_permit_without_calls,set=1則無請求進行,也可以傳送keepalive ping
- grpc.http2.max_pings_without_data,沒有資料傳輸的情況下,最多允許send多少ping,set=0是無限傳送
- grpc.keepalive_time_ms,client傳送ping的時間間隔
- grpc.keepalive_timeout_ms,確認ping應答的超時時間
- grpc.http2.min_ping_interval_without_data_ms,沒有資料傳輸的情況下,server允許收到ping的最小時間間隔,小於這個間隔的ping會被認為是ping strike。這個數值設定要>=以上keepalive_time_ms
- grpc.http2.max_pring_strikes, server最多允許ping strike的次數,超出會發送GOAWAY自動斷開連結,set=0允許無限次
以下是引數的預設取值
statusCode.UNAVAILABLE,‘connection reset by peer’
針對偶發UNAVAILABLE的報錯,部分情況可能是server部署環境和保活引數的設定有一些衝突,詳見Docker Swarm 部署 gRPC 服務的坑,不過多數情況下都能被retry解決。grpc issue裡提到一個interceptor 外掛現在是experimental API。簡單拆出來就是下面exponential backoff的retry邏輯。果然解決bug兩大法器restart+retry。。。
RETRY_TIEMS = {
StatusCode.INTERNAL: 1,
StatusCode.ABORTED: 3,
StatusCode.UNAVAILABLE: 3,
StatusCode.DEADLINE_EXCEEDED: 5 # most-likely grpc channel close, need time to reopen
}
def grpc_retry(default_max_retry=3, sleep=0.01):
def helper(func):
@wraps(func)
def handle_args(*args, **kwargs):
counter = 0
while True:
try:
return func(*args, **kwargs)
except RpcError as e:
max_retry = RETRY_TIEMS.get(e.code(), default_max_retry)
if counter >= max_retry:
raise e
counter += 1
backoff = min(sleep * 2 ** counter, 1) # exponential backoff
time.sleep(backoff) # wait for grpc to reopen channel
return handle_args
return helper
Reference
- https://www.cnblogs.com/junjiang3/p/9164513.html
- http://d0evi1.com/tensorflow/serving/estimator_saved_model/
- https://zhuanlan.zhihu.com/p/136619485