
Deep Learning Distributed Training in Practice (Part 2): TensorFlow

This post covers distributed training in TF, focusing on the code. For the theory, see Deep Learning Distributed Training in Practice (Part 1).

How TF implements distributed training

TF supports two ways of implementing distributed training: in-graph replication and between-graph replication. These map onto the data-parallel and model-parallel approaches discussed in Deep Learning Distributed Training in Practice (Part 1): between-graph replication is the data-parallel style we use most of the time, while in-graph replication is generally only used when the model itself is too large. Between-graph replication is built on the gRPC communication framework: there is a single copy of the model parameters but multiple copies of the compute graph; one master creates the main session, and multiple workers run the graph computations. During training, each graph computes its own gradients, which are then used to update the parameters. There are two update modes: synchronous and asynchronous.
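This post only uses between-graph replication, but for contrast here is a minimal sketch of what in-graph replication looks like (my own illustration, reusing the toy linear model from the scripts below and assuming a cluster with one ps task and two worker tasks is already running): a single client builds one graph, keeps the parameters on the ps device, and explicitly pins one model replica to each worker device.

import tensorflow as tf

# The single copy of the parameters lives on the parameter server.
with tf.device("/job:ps/task:0"):
	w = tf.Variable(0.0, name="weight")
	b = tf.Variable(0.0, name="bias")

# One replica of the model is pinned to each worker task, all in the same graph.
losses = []
for i in range(2):
	with tf.device("/job:worker/task:%d" % i):
		x = tf.placeholder("float", name="x_%d" % i)
		y = tf.placeholder("float", name="y_%d" % i)
		losses.append(tf.square(y - tf.multiply(x, w) - b))

# The single client averages the replica losses and drives training itself,
# e.g. via tf.Session("grpc://localhost:2224").
total_loss = tf.add_n(losses) / 2.0
train_op = tf.train.AdagradOptimizer(0.01).minimize(total_loss)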

In distributed TF you first define a cluster and then create two jobs inside it. One is the ps job, responsible for initializing and updating the parameters; a job can contain multiple tasks (multiple tasks means multiple machines or GPUs handle parameter initialization and updates). The other is the worker job, responsible for running the compute graph and calculating gradients; a worker job can likewise contain many tasks (again, multiple tasks means multiple machines or GPUs execute the compute graph). A sketch of this cluster/job/task structure follows below.
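To make the cluster / job / task hierarchy concrete, here is a minimal sketch that defines one ps job with a single task and one worker job with two tasks. The host:port addresses are made-up examples; in the real scripts below they come from command-line flags.

import tensorflow as tf

# One ps job and one worker job; every entry in a list is one task,
# i.e. one process running on some machine/GPU.
cluster = tf.train.ClusterSpec({
	"ps":     ["192.168.1.10:2222"],
	"worker": ["192.168.1.11:2222", "192.168.1.12:2222"],
})

# Each process in the cluster starts a server and identifies itself by
# (job_name, task_index); this one would be worker task 1.
server = tf.train.Server(cluster, job_name="worker", task_index=1)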

Distributed training with asynchronous parameter updates

The asynchronous-update code is essentially the same as the one in TensorFlow分散式部署; I only changed a few small things to make it easier to debug on a single machine (my laptop has no GPU). The key calls are:

tf.train.ClusterSpec(): creates a cluster object.
tf.train.Server(): starts a server for this process inside the cluster; depending on its role it acts as a parameter server or as a compute (worker) server.
tf.train.Supervisor(): creates a supervisor that watches over the training process; in my experience its main benefit is making it easy to resume training. Its logdir argument is the training-log directory: if a checkpoint already exists there, training resumes from it, so delete the directory if you want to train from scratch.
sv.managed_session(): starts the session, with a few extra features compared with other ways of starting a session; see TensorFlow 中三種啟動圖用法.

The full code is as follows:

# tensorflow distributed training with asynchronous updates
import tensorflow as tf
import numpy as np

tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "one of ps or worker")
tf.app.flags.DEFINE_integer("task_index", 0, "0, 1, 2...")

FLAGS = tf.app.flags.FLAGS

def main(_):
	ps_hosts = FLAGS.ps_hosts.split(",")
	worker_hosts = FLAGS.worker_hosts.split(",")
	# Create a cluster from the parameter server and worker server
	cluster = tf.train.ClusterSpec({"ps":ps_hosts, "worker":worker_hosts})

	# Create and start a server for the local task
	server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)

	# If this process is the parameter server, simply block and wait for the workers
	# to trigger parameter initialization and updates.
	# (That "triggering" is handled inside TF; nothing explicit is written here.)
	if FLAGS.job_name == "ps":
		server.join()
	elif FLAGS.job_name == "worker":
		# Assigns ops to the local worker by default
		with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
			train_X = np.linspace(-1.0, 1.0, 100)
			train_Y = 2.0 * train_X + np.random.randn(*train_X.shape) * 0.33 + 10.0
			X = tf.placeholder("float")
			Y = tf.placeholder("float")

			w = tf.Variable(0.0, name="weight")
			b = tf.Variable(0.0, name="bias")
			loss = tf.square(Y - tf.multiply(X, w) - b)

			global_step = tf.Variable(0)

			train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
			saver = tf.train.Saver()
			summary_op = tf.summary.merge_all()
			init_op = tf.global_variables_initializer()

			# Create a "supervisor", which oversees the training process.
			sv = tf.train.Supervisor(is_chief=(FLAGS.task_index==0),
				logdir="~/Downloads/log/",
				init_op=init_op,
				summary_op=summary_op,
				saver=saver,
				global_step=global_step,
				save_model_secs=600)

			# The supervisor takes care of session initialization, restoring from a
			# checkpoint, and closing when done or an error occurs.
			with sv.managed_session(server.target) as sess:
				step = 0
				while step < 1000000:
					# Run a training step asynchronously
					for (x, y) in zip(train_X, train_Y):
						_, step = sess.run([train_op, global_step], feed_dict={X:x, Y:y})
					loss_value = sess.run(loss, feed_dict={X:x, Y:y})
					print("Step: {}, loss: {}".format(step, loss_value))

			# Ask for all the services to stop
			sv.stop()

if __name__=="__main__":
	tf.app.run()

Open three terminals and run the following three commands; you should then see the training output:

CUDA_VISIBLE_DEVICES='' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=ps --task_index=0
CUDA_VISIBLE_DEVICES='' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=0
CUDA_VISIBLE_DEVICES='' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=1

One thing to note: which GPU a compute graph uses is controlled by setting CUDA_VISIBLE_DEVICES on the command line, not by hard-coding it in the script. Another inconvenience: if there are many machines, you have to copy this script to every machine and run the corresponding command on each of them, which is still somewhat tedious.
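For example, on a hypothetical machine with two GPUs (not my setup, since my laptop has none), the two workers could each be pinned to a different GPU while the ps stays off the GPU:

CUDA_VISIBLE_DEVICES='' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=ps --task_index=0
CUDA_VISIBLE_DEVICES='0' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=0
CUDA_VISIBLE_DEVICES='1' python AsynDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=1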

Distributed training with synchronous parameter updates

Synchronous updates are slightly more involved and require a few extra lines of code (this part draws heavily on the book 《TensorFlow實戰》). The changed parts are marked in the code below:

# tensorflow distributed training with synchronous updates

import tensorflow as tf
import numpy as np

tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "one of ps or worker")
tf.app.flags.DEFINE_integer("task_index", 0, "0, 1, 2...")

FLAGS = tf.app.flags.FLAGS
def main(_):
	ps_hosts = FLAGS.ps_hosts.split(",")
	worker_hosts = FLAGS.worker_hosts.split(",")
	n_works = len(worker_hosts)
	# Create a cluster from the parameter server and worker server
	cluster = tf.train.ClusterSpec({"ps":ps_hosts, "worker":worker_hosts})

	# Create and start a server for the local task
	server = tf.train.Server(cluster, job_name = FLAGS.job_name, task_index=FLAGS.task_index)

	if FLAGS.job_name == "ps":
		server.join()
	elif FLAGS.job_name == "worker":
		# Assigns ops to the local worker by default
		with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
			train_X = np.linspace(-1.0, 1.0, 100)
			train_Y = 2.0 * train_X + np.random.randn(*train_X.shape) * 0.33 + 10.0
			X = tf.placeholder("float")
			Y = tf.placeholder("float")

			w = tf.Variable(0.0, name="weight")
			b = tf.Variable(0.0, name="bias")
			loss = tf.square(Y - tf.multiply(X, w) - b)

			global_step = tf.Variable(0)

			# For synchronous updates:
			# in synchronous mode we wait until every replica has computed its gradients
			# and then average them; tf.train.SyncReplicasOptimizer wraps this behaviour.
			opt = tf.train.SyncReplicasOptimizer(
				tf.train.AdagradOptimizer(0.01),
				replicas_to_aggregate=n_works,
				total_num_replicas=n_works,
				)
			train_op = opt.minimize(loss, global_step=global_step)
			saver = tf.train.Saver()
			summary_op = tf.summary.merge_all()

			init_op = tf.global_variables_initializer()

			# For synchronous updates:
			# in synchronous mode the chief worker coordinates the gradients computed
			# by the different workers and applies the parameter updates.
			if FLAGS.task_index==0:
				# Define the queue that coordinates the workers and its initialization op.
				chief_queue_runner = opt.get_chief_queue_runner()
				init_tokens_op = opt.get_init_tokens_op(0)

			# Create a "supervisor", which oversees the training process.
			sv = tf.train.Supervisor(is_chief=(FLAGS.task_index==0),
				logdir="~/Downloads/log/",
				init_op=init_op,
				summary_op = summary_op,
				saver=saver,
				global_step=global_step,
				save_model_secs=600)



			# The supervisor takes care of session initialization, restoring from a
			# checkpoint, and closing when done or an error occurs.

			# For synchronous updates:
			# prepare_or_wait_for_session is used here instead of managed_session.
			# managed_session (asynchronous mode) lets a worker start as soon as its own
			# initialization is done, whereas prepare_or_wait_for_session waits until the
			# parameters are initialized and the chief has finished its coordination work
			# before any worker starts. (There is only one copy of the parameters, so the
			# non-chief workers presumably do not need to initialize anything themselves;
			# they just need to connect to the parameter server?)
			with sv.prepare_or_wait_for_session(server.target) as sess:
				# For synchronous updates:
				if FLAGS.task_index==0:
					# Before training starts, the chief worker must start the queue that
					# coordinates the synchronous updates and run the token initialization op.
					sv.start_queue_runners(sess, [chief_queue_runner])
					sess.run(init_tokens_op)

				step = 0
				while step < 100000:
					# Run a training step synchronously
					for (x, y) in zip(train_X, train_Y):
						_, step =sess.run([train_op, global_step], feed_dict={X:x, Y:y})
					loss_value = sess.run(loss, feed_dict={X:x, Y:y})
					print("Step: {}, loss: {}".format(step, loss_value))

			# Ask for all the services to stop
			sv.stop()

if __name__=="__main__":
	tf.app.run()
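Launching the synchronous version works just like the asynchronous one. Assuming the script above is saved as SyncDis.py (the filename is my own choice), open three terminals and run:

CUDA_VISIBLE_DEVICES='' python SyncDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=ps --task_index=0
CUDA_VISIBLE_DEVICES='' python SyncDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=0
CUDA_VISIBLE_DEVICES='' python SyncDis.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=1

Note that because replicas_to_aggregate equals the number of workers, training only starts making progress once both workers are up.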