1. 程式人生 > >Tensorflow實現策略網路(深度強化學習)之cartPole

Tensorflow實現策略網路(深度強化學習)之cartPole

所謂策略網路

  即建立一個神經網路模型,它可以通過觀察環境狀態,直接預測出目前最應該執行的策略(Policy),執行這個策略可以獲得最大的期望收益(包括現在和未來的Reward)。

  到這裡了, 相信你也瞭解什麼是cartPloe,也瞭解他的原理是什麼, 我這裡就不再細說了。

實現cartPole需要使用的模組-gym

在這裡插入圖片描述
 gym現在只能在ubuntu上使用,安裝如下:

sudo pip install gym

費話不多說,直接上程式碼:

import numpy as np
import tensorflow as tf
import gym

# 建立環境
env = gym.make('CartPole-v0')

# 初始化環境
env.reset()

# 隱藏層節點數
H = 50
# 批次數量
batch_size = 25
learning_rate = 0.1
D = 4
gamma = 0.99

# 建立卷積層並輸出
#這裡輸入為observation ,最後輸出為action向左或向右的概率
observations = tf.placeholder(tf.float32, [None, D], name='input_x')
w1 = tf.get_variable('w1', shape=[D, H],
                     initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observations, w1))
w2 = tf.get_variable('w2', shape=[H, 1],
                     initializer=tf.contrib.layers.xavier_initializer())
score = tf.matmul(layer1, w2)
probability = tf.nn.sigmoid(score)


# 計算潛在分數(這裡包含的不只是一步action的得分,而是現在及以後所有
# 步驟的action的得分,每次預測會乘以gamma係數(0.99)
def discount_rewards(r):
    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(range(r.size)):
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    # 返回一個矩陣,每一行是每個回合的得分資料,
    #每行形如[s1, s2+s1*gamma, s3+s2*gamma+s1*gamma*gamma]    
    return discounted_r

# 虛擬的label值,用以對已完成的action的糾正
input_y = tf.placeholder(tf.float32, [None, 1], name='input_y')

# 每個action的潛在分數
advantages = tf.placeholder(tf.float32, name='reward_signal')

#定義損失函式
	# loglik當前 action對應的概率的對數
loglik = tf.log(input_y * (input_y-probability) + \
                (1-input_y)*(input_y + probability))
# 損失函式= 潛在分數 × 概率對數
loss = -tf.reduce_mean(loglik * advantages)
# 返回需要訓練的變數
tvars = tf.trainable_variables()
# 按tvars中的每個變數對loss求導,and
# return A list of sum(dy/dx) for each x in xs.
newGrads = tf.gradients(loss, tvars)

# 使用adam優化器
adam = tf.train.AdamOptimizer(learning_rate=learning_rate)
w1Grad = tf.placeholder(tf.float32, name='batch_grad1')
w2Grad = tf.placeholder(tf.float32, name='batch_grad2')
batchGrad = [w1Grad, w2Grad]
# 使用tvars中的引數計算梯度,並將計算結果更新至tvars引數中
# [apply_gradents具體用法見](https://www.cnblogs.com/marsggbo/p/10056057.html)
updateGrads = adam.apply_gradients(zip(batchGrad, tvars))

# xs observation環境例項列表
# ys label列表
# drs 每一個action的reward
xs, ys, drs = [], [], []
reward_sum = 0
episode_number = 1
total_episodes = 10000

with tf.Session() as sess:
    rendering = False
    init = tf.global_variables_initializer()
    sess.run(init)
    observation = env.reset()

    # 收集訓練需要的引數,值全部置0,裝在buffer中
    gradBuffer = sess.run(tvars)
    for ix, grad in enumerate(gradBuffer):
        gradBuffer[ix] = grad * 0

    # 杆倒一次,episode加1,共完成10000次
    while episode_number <= total_episodes:
    	# 當得分大於100,說明訓練有一定的成就,
    	# 渲染出影象,render()
        if reward_sum > 100 or rendering ==True:
            env.render()
            rendering = True

		# observation的實質是一個一行四列的陣列
        x = np.reshape(observation, [1, D])
		
		# 生成環境後(observat),將環境裝入神經網路輸入端,執行得到action取值為1概率
        tfprob = sess.run(probability, feed_dict={observations:x})
        
        # 此處需要注意,tfprob是取值為1的概率,不能因為是大於0.5,就取值1,小於0.5就取值0
        # 例tfprob=0.8,說明他還有0.2的概率是取值為0的,只有如下方式可以完美的表達這個問題        
        action = 1 if np.random.uniform() < tfprob else 0

		# 將如下資訊壓入列表
        xs.append(x)
        y = 1-action
        ys.append(y)

        observation, reward, done, info = env.step(action)
        reward_sum += reward
        drs.append(reward)
	
		# 如下杆倒下或超出2.4單位的距離
        if done:
            episode_number += 1
            # 把這回合的環境等資料壓入更大的矩陣列表            
            epx = np.vstack(xs)
            epy = np.vstack(ys)
            epr = np.vstack(drs)
            xs, ys, drs = [], [], []

			# 每個回合的潛在分數(已進行歸一化,即均值為0, 方差為1)
            discounted_epr = discount_rewards(epr)
            discounted_epr -= np.mean(discounted_epr)
            discounted_epr /= np.std(discounted_epr)
            
            # 新的引數,每回合更新一次
            tGrad = sess.run(newGrads, feed_dict={observations:epx,
                                                  input_y:epy,
                                                  advantages:discounted_epr})
            # 將每回合的每個函式的梯度新增到gradBuffer
            for ix,grad in enumerate(tGrad):
                gradBuffer[ix] += grad
                
                # batch_size的整數倍時
                if episode_number % batch_size == 0:
                	# 升級引數,引數緩衝器置零
                    sess.run(updateGrads, feed_dict={w1Grad:gradBuffer[0],
                                                     w2Grad:gradBuffer[1]})
                    for ix,grad in enumerate(gradBuffer):
                        gradBuffer[ix] = grad * 0

                    print('average reward for episode %d: %f'%\
                          (episode_number, reward_sum/batch_size))
					
					# 當平均得分大於200時,結束程式。
                    if reward_sum/batch_size > 200:
                        print('Task solved in', episode_number, 'episodes')
                        break

                    reward_sum = 0
				# 環境重置
                observation = env.reset()