
Implementing deep Q learning with gym's Classic control environments

This article is reposted from: https://blog.csdn.net/winycg/article/details/79468320

https://gym.openai.com/envs/   OpenAI gym official site

https://github.com/openai/gym#installation  gym installation guide

http://blog.csdn.net/cs123951/article/details/77854453  reference on how MountainCar works

OpenAI gym provides ready-made simulation environments for reinforcement learning, so when implementing an RL algorithm we do not need to build the environment ourselves.
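Before diving into DQN, it helps to see the basic gym interaction loop on its own. The sketch below is a minimal random-action example (assuming gym is installed as described in the link above); it uses the same make/reset/step/render calls as the training code later in this post:

import gym

env = gym.make('CartPole-v0')          # create a Classic control environment
observation = env.reset()              # reset returns the initial observation
for _ in range(200):
    env.render()                       # draw the current frame
    action = env.action_space.sample() # sample a random action
    observation, reward, done, info = env.step(action)  # advance the simulation one step
    if done:                           # episode over: start a new one
        observation = env.reset()
env.close()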

CartPole example

A pole is attached to a cart that moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pole starts upright, and the goal is to keep it from falling over. A reward of +1 is given for every step the pole remains upright. The episode ends when the pole tilts more than 15° from vertical or the cart moves more than 2.4 units away from the center.
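The 2.4-unit and angle limits are stored on the unwrapped environment and can be inspected directly; this small sketch only prints attributes that the training code below also reads (x_threshold and theta_threshold_radians):

import gym

env = gym.make('CartPole-v0').unwrapped
print(env.action_space)              # Discrete(2): push the cart left or right
print(env.observation_space)         # Box(4,): x, x_dot, theta, theta_dot
print(env.x_threshold)               # cart displacement limit used for termination
print(env.theta_threshold_radians)   # pole angle limit (in radians) used for termination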


Testing CartPole with the DQN code below:


import numpy as np
import random
import tensorflow as tf
import gym
 
max_episode = 100
env = gym.make('CartPole-v0')
env = env.unwrapped


class DeepQNetwork(object):
    def __init__(self, n_actions, n_features,
                 learning_rate=0.01,
                 reward_decay=0.9,         # gamma
                 epsilon_greedy=0.9,       # epsilon
                 epsilon_increment=0.001,
                 replace_target_iter=300,  # how many learning steps between target-network updates
                 buffer_size=500,          # size of the experience buffer
                 batch_size=32,
                 ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = epsilon_greedy
        self.replace_target_iter = replace_target_iter
        self.buffer_size = buffer_size
        self.buffer_counter = 0  # counts how many transitions have been stored so far
        self.batch_size = batch_size
        self.epsilon = 0 if epsilon_increment is not None else epsilon_greedy
        self.epsilon_increment = epsilon_increment
        self.learn_step_counter = 0  # counts learning steps
        # initialize the experience buffer; each row holds [s, a, r, s_]
        self.buffer = np.zeros((self.buffer_size, n_features * 2 + 2))
        self.build_net()
        # op that copies all eval-network parameters into the target network
        target_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='target_net')
        eval_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='eval_net')
        with tf.variable_scope('soft_replacement'):
            self.target_replace_op = [tf.assign(t, e) for t, e in zip(target_params, eval_params)]
        self.sess = tf.Session()
        tf.summary.FileWriter('logs/', self.sess.graph)
        self.sess.run(tf.global_variables_initializer())

    def build_net(self):
        self.s = tf.placeholder(tf.float32, [None, self.n_features])
        self.s_ = tf.placeholder(tf.float32, [None, self.n_features])
        self.r = tf.placeholder(tf.float32, [None, ])
        self.a = tf.placeholder(tf.int32, [None, ])
        w_initializer = tf.random_normal_initializer(0., 0.3)
        b_initializer = tf.constant_initializer(0.1)
        # q_eval network: takes the state as input and outputs one Q value per action
        with tf.variable_scope('eval_net'):
            eval_layer = tf.layers.dense(self.s, 20, tf.nn.relu, kernel_initializer=w_initializer,
                                         bias_initializer=b_initializer, name='eval_layer')
            self.q_eval = tf.layers.dense(eval_layer, self.n_actions, kernel_initializer=w_initializer,
                                          bias_initializer=b_initializer, name='output_layer1')
        # target network: same architecture, fed with the next state
        with tf.variable_scope('target_net'):
            target_layer = tf.layers.dense(self.s_, 20, tf.nn.relu, kernel_initializer=w_initializer,
                                           bias_initializer=b_initializer, name='target_layer')
            self.q_next = tf.layers.dense(target_layer, self.n_actions, kernel_initializer=w_initializer,
                                          bias_initializer=b_initializer, name='output_layer2')
        with tf.variable_scope('q_target'):
            # expected value; stop_gradient excludes it from backpropagation, i.e. treats it as a constant
            self.q_target = tf.stop_gradient(self.r + self.gamma * tf.reduce_max(self.q_next, axis=1))
        with tf.variable_scope('q_eval'):
            # pick out the Q value of the action that was actually taken
            a_indices = tf.stack([tf.range(tf.shape(self.a)[0]), self.a], axis=1)
            self.q_eval_a = tf.gather_nd(params=self.q_eval, indices=a_indices)
        with tf.variable_scope('loss'):
            self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval_a))
        with tf.variable_scope('train'):
            self.train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)

    # store one transition in the experience buffer
    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s, a, r, s_))
        index = self.buffer_counter % self.buffer_size
        self.buffer[index, :] = transition
        self.buffer_counter += 1

    def choose_action_by_epsilon_greedy(self, status):
        status = status[np.newaxis, :]
        if random.random() < self.epsilon:
            actions_value = self.sess.run(self.q_eval, feed_dict={self.s: status})
            action = np.argmax(actions_value)
        else:
            action = np.random.randint(0, self.n_actions)
        return action

    def learn(self):
        # every self.replace_target_iter learning steps, refresh the target network's parameters
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.sess.run(self.target_replace_op)
        # sample a batch from the experience buffer
        sample_index = np.random.choice(min(self.buffer_counter, self.buffer_size), size=self.batch_size)
        batch_buffer = self.buffer[sample_index, :]
        _, cost = self.sess.run([self.train_op, self.loss], feed_dict={
            self.s: batch_buffer[:, :self.n_features],
            self.a: batch_buffer[:, self.n_features],
            self.r: batch_buffer[:, self.n_features + 1],
            self.s_: batch_buffer[:, -self.n_features:]
        })
        self.epsilon = min(self.epsilon_max, self.epsilon + self.epsilon_increment)
        self.learn_step_counter += 1
        return cost


RL = DeepQNetwork(n_actions=env.action_space.n,
                  n_features=env.observation_space.shape[0])
total_step = 0
for episode in range(max_episode):
    observation = env.reset()
    episode_reward = 0
    while True:
        env.render()  # render the environment
        action = RL.choose_action_by_epsilon_greedy(observation)
        observation_, reward, done, info = env.step(action)
        # x is the cart's horizontal displacement, theta is the pole's angle from vertical
        x, x_dot, theta, theta_dot = observation_
        # reward1: the further the cart drifts from the center, the smaller the reward
        reward1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        # reward2: the more upright the pole, the larger the reward
        reward2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
        reward = reward1 + reward2
        RL.store_transition(observation, action, reward, observation_)
        if total_step > 100:
            cost = RL.learn()
            print('cost: %.3f' % cost)
        episode_reward += reward
        observation = observation_
        if done:
            print('episode:', episode,
                  'episode_reward %.2f' % episode_reward,
                  'epsilon %.2f' % RL.epsilon)
            break
        total_step += 1
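The training loop above replaces the environment's constant +1 reward with a shaped reward. Plugging the extreme states into the two formulas shows the gradient the shaping creates; the helper below is purely illustrative, and the default thresholds (2.4 and roughly 0.21 rad) stand in for what CartPole-v0 exposes as env.x_threshold and env.theta_threshold_radians:

def shaped_cartpole_reward(x, theta, x_threshold=2.4, theta_threshold=0.21):
    # same expressions as reward1/reward2 in the loop above
    reward1 = (x_threshold - abs(x)) / x_threshold - 0.8
    reward2 = (theta_threshold - abs(theta)) / theta_threshold - 0.5
    return reward1 + reward2

print(shaped_cartpole_reward(0.0, 0.0))    # cart centered, pole upright:          0.2 + 0.5 =  0.7
print(shaped_cartpole_reward(2.4, 0.21))   # cart at the edge, pole at the limit: -0.8 - 0.5 = -1.3

So the shaped signal distinguishes good states from bad ones at every step, instead of the flat +1 the environment returns.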

MountainCar example

The car moves along a one-dimensional track between two hills, and the goal is to drive up the hill on the right. However, the car's engine is not strong enough to climb the hill in a single pass; the only way is to drive back and forth to build up momentum.

There are 3 actions: accelerate to the left, do nothing, and accelerate to the right. The state has 2 components: position and velocity. The position is about -0.5 at the lowest point of the valley, -1.2 at the top of the left hill, 0 at the point of the same height on the right side, and 0.5 at the yellow flag. The environment's reward is always -1 per step, so the fewer steps it takes to reach the goal, the higher the total reward.
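These action and state ranges can also be read straight from the environment; a small inspection sketch (assuming the standard 'MountainCar-v0' id):

import gym

env = gym.make('MountainCar-v0')
print(env.action_space)             # Discrete(3)
print(env.observation_space.low)    # lower bounds of [position, velocity]
print(env.observation_space.high)   # upper bounds of [position, velocity]
print(env.unwrapped.goal_position)  # position of the flag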

Custom reward: the higher the car climbs, the larger the reward, because gaining height on the left side also lets the car build up more momentum. So the reward can be set to reward = abs(position + 0.5).
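Plugging the positions listed above into this formula shows how the shaping favours height on either slope (a small illustrative helper):

def shaped_mountaincar_reward(position):
    # custom reward from the text: the higher the car, the larger the reward
    return abs(position + 0.5)

print(shaped_mountaincar_reward(-0.5))   # bottom of the valley   -> 0.0
print(shaped_mountaincar_reward(-1.2))   # top of the left hill   -> 0.7
print(shaped_mountaincar_reward(0.5))    # the flag on the right  -> 1.0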


The training loop below reuses the DeepQNetwork class defined in the CartPole example; only the environment, the reward shaping and the loss bookkeeping change:

import matplotlib.pyplot as plt

env = gym.make('MountainCar-v0')
env = env.unwrapped

RL = DeepQNetwork(n_actions=env.action_space.n,
                  n_features=env.observation_space.shape[0])
total_step = 0
cost = []  # collect the training loss so it can be plotted afterwards
for episode in range(max_episode):
    observation = env.reset()
    episode_reward = 0
    while True:
        env.render()  # render the environment
        action = RL.choose_action_by_epsilon_greedy(observation)
        observation_, reward, done, info = env.step(action)
        # the observation consists of [position, velocity]
        position, velocity = observation_
        # custom reward: the higher the car climbs, the larger the reward
        reward = abs(position + 0.5)
        RL.store_transition(observation, action, reward, observation_)
        if total_step > 100:
            cost_ = RL.learn()
            cost.append(cost_)
        episode_reward += reward
        observation = observation_
        if done:
            print('episode:', episode,
                  'episode_reward %.2f' % episode_reward,
                  'epsilon %.2f' % RL.epsilon)
            break
        total_step += 1

plt.plot(np.arange(len(cost)), cost)
plt.show()

[Figure: plot of the DQN training cost over learning steps, produced by the code above]



