Keras Deep Reinforcement Learning -- DPG and DQN Implementations
I have recently been looking into deep reinforcement learning (DRL); this article reproduces and explains the demos I worked through while learning it. For the underlying theory, I recommend Hung-yi Lee's lectures on Q-Learning and on deep reinforcement learning.
Reinforcement learning has two important families of methods: Policy Gradients and Q-learning. Policy Gradient methods directly predict the action to take in a given environment, whereas Q-learning methods predict the expected value (the Q value) of every action in that environment. Generally speaking, Q-learning is only suitable for environments with a small number of discrete actions, while Policy Gradient methods can handle continuous action spaces. Combined with deep learning, these two approaches become DPG (Deterministic Policy Gradient) and DQN (Deep Q-Network), both proposed by DeepMind. DDPG (Deep Deterministic Policy Gradient) applies the ideas DQN uses to extend Q-learning to the DPG method, giving an Actor-Critic (AC) style algorithm that can solve DRL problems with continuous action spaces.
Papers:
DPG: Deterministic policy gradient algorithms
DDPG: Continuous Control with Deep Reinforcement Learning
DQN: Playing Atari with Deep Reinforcement Learning
Environment
Gym
Gym is a toolkit released by OpenAI for developing and comparing reinforcement learning algorithms. With it we can make an AI agent do many things, such as walking, running, and playing a variety of games. In this demo we use the Cart-Pole game.
The rules are simple. There is a small cart with a pole standing upright on it, and the cart must move left and right to keep the pole balanced. The game ends if the pole tilts more than about 12° from vertical (the threshold used by CartPole-v0), or if the cart moves outside a fixed range (2.4 units from the center to either side).
Cart-Pole:

(figure: car.png — the Cart-Pole environment)
The Cart-Pole world consists of a cart moving along a horizontal axis and a pole fixed on the cart. At each time step you can observe the cart's position (x), velocity (x_dot), the pole's angle (theta), and angular velocity (theta_dot); these make up the observable state of the world. In any state the cart has only two possible actions: move left or move right. In other words, Cart-Pole's state space has four continuous dimensions, and its action space is one dimension with two discrete values.
First, install gym:
pip install gym
A quick try of gym:
# -*- coding: utf-8 -*-
import gym
import numpy as np


def try_gym():
    # Create a CartPole environment with gym.
    # The environment takes an action and returns the observation after executing it,
    # the reward, and whether the game is over.
    env = gym.make('CartPole-v0')
    # Reset the environment.
    env.reset()
    # Number of episodes played.
    random_episodes = 0
    # Total reward of the current episode.
    reward_sum = 0
    count = 0
    while random_episodes < 10:
        # Render the game.
        env.render()
        # Sample a random action (move left or move right),
        # then collect the feedback from executing it.
        observation, reward, done, _ = env.step(np.random.randint(0, 2))
        reward_sum += reward
        count += 1
        # When the episode ends, print the total reward and reset the game.
        if done:
            random_episodes += 1
            print("Reward for this episode was: {}, turns was: {}".format(reward_sum, count))
            reward_sum = 0
            count = 0
            env.reset()


if __name__ == '__main__':
    try_gym()
For each episode we print the total reward accumulated from start to finish and the number of steps taken. The output looks like this:
Reward for this episode was: 20.0, turns was: 20
Reward for this episode was: 26.0, turns was: 26
Reward for this episode was: 18.0, turns was: 18
Reward for this episode was: 25.0, turns was: 25
Reward for this episode was: 25.0, turns was: 25
Reward for this episode was: 23.0, turns was: 23
Reward for this episode was: 29.0, turns was: 29
Reward for this episode was: 17.0, turns was: 17
Reward for this episode was: 13.0, turns was: 13
Reward for this episode was: 27.0, turns was: 27
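As a quick sanity check on the dimensions described above, the observation and action spaces can also be inspected directly. This is a minimal sketch using only the gym API already shown; the printed shapes are what CartPole-v0 reports:
import gym

env = gym.make('CartPole-v0')
# The observable state: a Box of four continuous values (x, x_dot, theta, theta_dot).
print(env.observation_space)        # Box(4,)
print(env.observation_space.shape)  # (4,)
# The action space: two discrete actions, 0 = push left, 1 = push right.
print(env.action_space)             # Discrete(2)
print(env.action_space.n)           # 2
env.close()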
If you are running under Anaconda 3, you may see the following error:
raise NotImplementedError('abstract')
NotImplementedError: abstract
This is caused by pyglet and can be fixed by downgrading it to version 1.2.4:
pip uninstall pyglet
pip install pyglet==1.2.4
DPG
DPG is a typical Monte Carlo method: it learns from the data of a whole episode, and only once that episode has ended.
The DPG implementation proceeds as follows:
(1) First build the neural network. Its input is the observation and its output is the probability of action=1.
(2) When an episode ends (win or death), reset the env so the observation returns to its initial state. On the next loop iteration, feed in the observation and obtain a probability p0. Sample an action according to p0, feed it into the environment, and receive a new observation and reward. Record [observation, action, reward] as data for later training.
(3) The rewards are positive numbers. Collect all rewards of the episode obtained from the actions above into a sequence, then compute the discounted reward (discount_reward) from it, as shown in the sketch after this list.
(4) After accumulating a batch of episodes, perform one gradient-descent update. The loss has two parts: first compute the cross-entropy of the action with binary_crossentropy, then multiply it by discount_reward to obtain the final loss.
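The discounted-reward step in (3) can be checked in isolation. Below is a small stand-alone sketch; the function name and the example rewards are made up for illustration, and the real version is the discount_reward method in the class that follows:
import numpy as np

def discount_and_normalize(rewards, gamma=0.95):
    # Walk backwards through the episode, accumulating the discounted return.
    discounted = np.zeros_like(rewards, dtype=np.float32)
    cumulative = 0.
    for i in reversed(range(len(rewards))):
        cumulative = cumulative * gamma + rewards[i]
        discounted[i] = cumulative
    # Standardize to reduce the variance of the gradient estimate.
    discounted -= np.mean(discounted)
    discounted /= np.std(discounted)
    return discounted

# An episode of 5 steps, each returning reward 1:
print(discount_and_normalize([1., 1., 1., 1., 1.]))
# The earliest steps receive the largest positive weights, the last steps negative ones.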
A Keras implementation of DPG is shown below:
# -*- coding: utf-8 -*-
import os

import gym
import numpy as np
from keras.layers import Input, Dense
from keras.models import Model
from keras.optimizers import Adam
import keras.backend as K


class DPG:
    def __init__(self):
        self.model = self.build_model()
        if os.path.exists('dpg.h5'):
            self.model.load_weights('dpg.h5')

        self.env = gym.make('CartPole-v0')
        self.gamma = 0.95

    def build_model(self):
        """Basic network structure."""
        inputs = Input(shape=(4,), name='ob_input')
        x = Dense(16, activation='relu')(inputs)
        x = Dense(16, activation='relu')(x)
        x = Dense(1, activation='sigmoid')(x)

        model = Model(inputs=inputs, outputs=x)

        return model

    def loss(self, y_true, y_pred):
        """Loss function.

        Arguments:
            y_true: (action, discounted reward)
            y_pred: action_prob

        Returns:
            loss: reward-weighted loss
        """
        action_pred = y_pred
        action_true, discount_episode_reward = y_true[:, 0], y_true[:, 1]
        # Binary cross-entropy between the taken action and the predicted probability.
        action_true = K.reshape(action_true, (-1, 1))
        loss = K.binary_crossentropy(action_true, action_pred)
        # Weight the loss with the discounted reward.
        loss = loss * K.flatten(discount_episode_reward)

        return loss

    def discount_reward(self, rewards):
        """Discounted reward.

        Arguments:
            rewards: the rewards of one episode
        """
        # Accumulate the discounted reward of the episode, walking backwards in time.
        discount_rewards = np.zeros_like(rewards, dtype=np.float32)
        cumulative = 0.
        for i in reversed(range(len(rewards))):
            cumulative = cumulative * self.gamma + rewards[i]
            discount_rewards[i] = cumulative
        # Normalization helps control the variance of the gradient.
        discount_rewards -= np.mean(discount_rewards)
        discount_rewards /= np.std(discount_rewards)

        return list(discount_rewards)

    def train(self, episode, batch):
        """Training.

        Arguments:
            episode: number of episodes
            batch: number of episodes per batch; the gradients are updated once per batch

        Returns:
            history: training history
        """
        self.model.compile(loss=self.loss, optimizer=Adam(lr=0.01))

        history = {'episode': [], 'Batch_reward': [], 'Episode_reward': [], 'Loss': []}

        episode_reward = 0
        states = []
        actions = []
        rewards = []
        discount_rewards = []

        for i in range(episode):
            observation = self.env.reset()
            erewards = []

            while True:
                x = observation.reshape(-1, 4)
                prob = self.model.predict(x)[0][0]
                # Sample an action (left or right) according to the predicted probability,
                # then collect the feedback from executing it.
                action = np.random.choice(np.array(range(2)), size=1, p=[1 - prob, prob])[0]

                observation, reward, done, _ = self.env.step(action)

                # Record the data produced during this episode.
                states.append(x[0])
                actions.append(action)
                erewards.append(reward)
                rewards.append(reward)

                if done:
                    # Compute the discounted rewards once the episode ends.
                    discount_rewards.extend(self.discount_reward(erewards))
                    break

            # After storing a batch of episodes, use the data to update the model.
            if i != 0 and i % batch == 0:
                batch_reward = sum(rewards)
                episode_reward = batch_reward / batch

                # X holds the states; y holds the actions and discounted rewards,
                # which are combined with the predicted prob to compute the loss.
                X = np.array(states)
                y = np.array(list(zip(actions, discount_rewards)))

                loss = self.model.train_on_batch(X, y)

                history['episode'].append(i)
                history['Batch_reward'].append(batch_reward)
                history['Episode_reward'].append(episode_reward)
                history['Loss'].append(loss)

                print('Episode: {} | Batch reward: {} | Episode reward: {} | loss: {:.3f}'.format(i, batch_reward, episode_reward, loss))

                episode_reward = 0
                states = []
                actions = []
                rewards = []
                discount_rewards = []

        self.model.save_weights('dpg.h5')

        return history

    def play(self):
        """Play the game with the trained model."""
        observation = self.env.reset()

        count = 0
        reward_sum = 0
        random_episodes = 0

        while random_episodes < 10:
            self.env.render()

            x = observation.reshape(-1, 4)
            prob = self.model.predict(x)[0][0]
            action = 1 if prob > 0.5 else 0
            observation, reward, done, _ = self.env.step(action)

            count += 1
            reward_sum += reward

            if done:
                print("Reward for this episode was: {}, turns was: {}".format(reward_sum, count))
                random_episodes += 1
                reward_sum = 0
                count = 0
                observation = self.env.reset()


if __name__ == '__main__':
    model = DPG()

    history = model.train(5000, 5)
    model.play()
The training and test results are shown below. As training progresses, the reward obtained by the DPG model keeps increasing and the loss keeps decreasing. After 5000 episodes of training, the model is tested: compared with random play, DPG reaches a reward of 200, and since the game also ends once 200 reward is reached, DPG can be said to have solved this problem.
However, in my experiments DPG training is not very stable, and the initialization of the model parameters has a large influence on the result, so several runs may be needed. Sometimes the reward converges for a while and then drops sharply again in a roughly periodic pattern; the instability of the training process is also visible in the figure.
Episode: 5 | Batch reward: 120.0 | Episode reward: 24.0 | loss: -0.325
Episode: 10 | Batch reward: 67.0 | Episode reward: 13.4 | loss: -0.300
Episode: 15 | Batch reward: 128.0 | Episode reward: 25.6 | loss: -0.326
Episode: 20 | Batch reward: 117.0 | Episode reward: 23.4 | loss: -0.332
Episode: 25 | Batch reward: 122.0 | Episode reward: 24.4 | loss: -0.330
Episode: 30 | Batch reward: 97.0 | Episode reward: 19.4 | loss: -0.339
Episode: 35 | Batch reward: 120.0 | Episode reward: 24.0 | loss: -0.331
......
Episode: 4960 | Batch reward: 973.0 | Episode reward: 194.6 | loss: -0.228
Episode: 4965 | Batch reward: 1000.0 | Episode reward: 200.0 | loss: -0.224
Episode: 4970 | Batch reward: 881.0 | Episode reward: 176.2 | loss: -0.238
Episode: 4975 | Batch reward: 1000.0 | Episode reward: 200.0 | loss: -0.213
Episode: 4980 | Batch reward: 974.0 | Episode reward: 194.8 | loss: -0.229
Episode: 4985 | Batch reward: 862.0 | Episode reward: 172.4 | loss: -0.235
Episode: 4990 | Batch reward: 914.0 | Episode reward: 182.8 | loss: -0.233
Episode: 4995 | Batch reward: 737.0 | Episode reward: 147.4 | loss: -0.254
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 200.0, turns was: 200

(figure: DPG training curves)
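Curves like the ones above can be produced from the history dict returned by train(). A minimal sketch, assuming matplotlib is available (the plotting code is not part of the original class):
import matplotlib.pyplot as plt

def plot_history(history):
    # history is the dict returned by DPG.train() (or DQN.train()),
    # e.g. history = model.train(5000, 5)
    fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
    ax1.plot(history['episode'], history['Episode_reward'])
    ax1.set_ylabel('Episode reward')
    ax2.plot(history['episode'], history['Loss'])
    ax2.set_ylabel('Loss')
    ax2.set_xlabel('Episode')
    plt.show()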
DQN
DQN is a typical temporal-difference method. Unlike DPG, DQN learns from the data at step n and step n+1, which gives it lower variance than Monte Carlo methods. The commonly used variant is Nature DQN, proposed in 2015, and that is the version used here.
The original DQN uses a single network both to select actions and to compute the target Q values. Nature DQN uses two networks with identical structure: a main network that selects actions and whose parameters are updated, and a target network that computes the target Q values. The target network's parameters are not updated iteratively; they are copied from the main network at fixed intervals (a delayed update), which reduces the correlation between the target Q values and the current Q values. Apart from using this extra, identically structured target network to compute the targets, Nature DQN is essentially the same as DQN.
The Nature DQN implementation proceeds as follows:
(1) First build two neural networks, a main network and a target network. Both take the observation as input and output the Q value of each action.
(2) When an episode ends (win or death), reset the env so the observation returns to its initial state. Choose an action from the observation with the ε-greedy policy, execute it, and obtain next_observation, reward, and the done flag. Put [observation, action, reward, next_observation, done] into the experience replay buffer. The buffer has a fixed capacity and discards the oldest data.
(3) Randomly sample a batch of data from the replay buffer and compute the Q values of observation as Q_target. For transitions where done is False, use reward and next_observation to compute the discounted return, and write it back into Q_target (see the sketch after this list).
(4) Perform one gradient-descent update per action, with MSE as the loss function. Note that unlike DPG, the parameters are updated at every step of the game, not only when an episode ends.
(5) After each batch, update epsilon: the ε of ε-greedy keeps shrinking, so the amount of random exploration keeps decreasing.
(6) Every fixed number of steps, copy the parameters from the main network into the target network.
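The target computation in step (3) is the standard one-step TD target: for each sampled transition the target is reward if done, and reward + gamma * max_a' Q_target(next_observation, a') otherwise. A small NumPy-only sketch; the names build_q_targets, main_q and target_q are made up for illustration, and the real version is the process_batch method in the code below:
import numpy as np

def build_q_targets(main_q, target_q, actions, rewards, dones, gamma=0.95):
    # main_q:   Q(s, .) from the main network,   shape (batch, 2)
    # target_q: Q(s', .) from the target network, shape (batch, 2)
    y = main_q.copy()
    for i in range(len(actions)):
        target = rewards[i]
        if not dones[i]:
            # Bootstrap from the target network's best action value.
            target += gamma * np.max(target_q[i])
        # Only the Q value of the action actually taken is replaced,
        # so the MSE loss is zero for the other action.
        y[i][actions[i]] = target
    return y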
A Keras implementation of Nature DQN is shown below:
# -*- coding: utf-8 -*-
import os
import random
from collections import deque

import gym
import numpy as np
from keras.layers import Input, Dense
from keras.models import Model
from keras.optimizers import Adam


class DQN:
    def __init__(self):
        self.model = self.build_model()
        self.target_model = self.build_model()
        self.update_target_model()

        if os.path.exists('dqn.h5'):
            self.model.load_weights('dqn.h5')

        # Experience replay buffer.
        self.memory_buffer = deque(maxlen=2000)
        # Discount rate for the Q values, used to compute the discounted future reward.
        self.gamma = 0.95
        # Degree of random exploration in the ε-greedy policy.
        self.epsilon = 1.0
        # Decay rate of epsilon.
        self.epsilon_decay = 0.995
        # Minimum exploration probability.
        self.epsilon_min = 0.01

        self.env = gym.make('CartPole-v0')

    def build_model(self):
        """Basic network structure."""
        inputs = Input(shape=(4,))
        x = Dense(16, activation='relu')(inputs)
        x = Dense(16, activation='relu')(x)
        x = Dense(2, activation='linear')(x)

        model = Model(inputs=inputs, outputs=x)

        return model

    def update_target_model(self):
        """Copy the weights of the main network into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def egreedy_action(self, state):
        """Choose an action with the ε-greedy policy.

        Arguments:
            state: state

        Returns:
            action: action
        """
        if np.random.rand() <= self.epsilon:
            return random.randint(0, 1)
        else:
            q_values = self.model.predict(state)[0]
            return np.argmax(q_values)

    def remember(self, state, action, reward, next_state, done):
        """Add a transition to the replay buffer.

        Arguments:
            state: state
            action: action
            reward: reward
            next_state: next state
            done: whether the episode is over
        """
        item = (state, action, reward, next_state, done)
        self.memory_buffer.append(item)

    def update_epsilon(self):
        """Decay epsilon."""
        if self.epsilon >= self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def process_batch(self, batch):
        """Prepare a training batch.

        Arguments:
            batch: batch size

        Returns:
            X: states
            y: [Q_value1, Q_value2]
        """
        # Randomly sample a batch from the replay buffer.
        data = random.sample(self.memory_buffer, batch)

        # Build Q_target.
        states = np.array([d[0] for d in data])
        next_states = np.array([d[3] for d in data])

        y = self.model.predict(states)
        q = self.target_model.predict(next_states)

        for i, (_, action, reward, _, done) in enumerate(data):
            target = reward
            if not done:
                target += self.gamma * np.amax(q[i])
            y[i][action] = target

        return states, y

    def train(self, episode, batch):
        """Training.

        Arguments:
            episode: number of episodes
            batch: batch size

        Returns:
            history: training history
        """
        self.model.compile(loss='mse', optimizer=Adam(1e-3))

        history = {'episode': [], 'Episode_reward': [], 'Loss': []}

        count = 0
        for i in range(episode):
            observation = self.env.reset()
            reward_sum = 0
            loss = np.infty
            done = False

            while not done:
                # Choose an action with the ε-greedy policy.
                x = observation.reshape(-1, 4)
                action = self.egreedy_action(x)
                observation, reward, done, _ = self.env.step(action)
                # Add the transition to the replay buffer.
                reward_sum += reward
                self.remember(x[0], action, reward, observation, done)

                if len(self.memory_buffer) > batch:
                    # Train on a sampled batch.
                    X, y = self.process_batch(batch)
                    loss = self.model.train_on_batch(X, y)

                    count += 1
                    # Decay the epsilon of the ε-greedy policy.
                    self.update_epsilon()

                    # Update the target network at fixed intervals.
                    if count != 0 and count % 20 == 0:
                        self.update_target_model()

            if i % 5 == 0:
                history['episode'].append(i)
                history['Episode_reward'].append(reward_sum)
                history['Loss'].append(loss)

                print('Episode: {} | Episode reward: {} | loss: {:.3f} | e:{:.2f}'.format(i, reward_sum, loss, self.epsilon))

        self.model.save_weights('dqn.h5')

        return history

    def play(self):
        """Play the game with the trained model."""
        observation = self.env.reset()

        count = 0
        reward_sum = 0
        random_episodes = 0

        while random_episodes < 10:
            self.env.render()

            x = observation.reshape(-1, 4)
            q_values = self.model.predict(x)[0]
            action = np.argmax(q_values)
            observation, reward, done, _ = self.env.step(action)

            count += 1
            reward_sum += reward

            if done:
                print("Reward for this episode was: {}, turns was: {}".format(reward_sum, count))
                random_episodes += 1
                reward_sum = 0
                count = 0
                observation = self.env.reset()

        self.env.close()


if __name__ == '__main__':
    model = DQN()

    history = model.train(600, 32)
    model.play()
The training and test results are shown below. As training progresses, the reward obtained by the DQN model keeps increasing and the loss keeps decreasing. After 500 episodes of training with batch=32, the model is tested: DQN also performs well, and with further training it should reach the same level as DPG.
Compared with DPG, DQN trains more stably. However, DQN has one problem: convergence of the Q network is not guaranteed, i.e. we may not obtain converged Q-network parameters, which can leave us with a poorly performing model. So here too it takes several attempts to pick the best model.
Episode: 0 | Episode reward: 11.0 | loss: inf | e:1.00
Episode: 5 | Episode reward: 23.0 | loss: 0.816 | e:0.67
Episode: 10 | Episode reward: 18.0 | loss: 2.684 | e:0.46
Episode: 15 | Episode reward: 11.0 | loss: 3.662 | e:0.34
Episode: 20 | Episode reward: 16.0 | loss: 2.702 | e:0.23
Episode: 25 | Episode reward: 10.0 | loss: 4.092 | e:0.18
Episode: 30 | Episode reward: 12.0 | loss: 3.734 | e:0.13
...
Episode: 460 | Episode reward: 111.0 | loss: 6.325 | e:0.01
Episode: 465 | Episode reward: 180.0 | loss: 0.046 | e:0.01
Episode: 470 | Episode reward: 141.0 | loss: 0.136 | e:0.01
Episode: 475 | Episode reward: 169.0 | loss: 0.110 | e:0.01
Episode: 480 | Episode reward: 200.0 | loss: 0.095 | e:0.01
Episode: 485 | Episode reward: 200.0 | loss: 0.024 | e:0.01
Episode: 490 | Episode reward: 200.0 | loss: 0.066 | e:0.01
Episode: 495 | Episode reward: 146.0 | loss: 0.022 | e:0.01
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 196.0, turns was: 196
Reward for this episode was: 198.0, turns was: 198
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 199.0, turns was: 199
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 193.0, turns was: 193
Reward for this episode was: 200.0, turns was: 200
Reward for this episode was: 189.0, turns was: 189
Reward for this episode was: 200.0, turns was: 200

(figure: DQN training curves)
Comparison
(1) DPG can handle continuous actions, while DQN only handles discrete problems by enumerating the actions; continuous actions must be discretized before DQN can be applied.
(2) DPG chooses an action at random according to the predicted action probability, whereas DQN chooses actions with the ε-greedy policy (a condensed sketch follows this list).
(3) DQN updates on one reward at a time, i.e. the current target only involves the immediately neighbouring step; DPG stores all the rewards of an episode, corrects them by discounting, standardizes them, and then performs the update.
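The difference in point (2) can be seen directly in the two action-selection routines. A condensed sketch: the helper names dpg_act and dqn_act are made up for illustration, and they simply condense the play() and egreedy_action() methods shown above; model is assumed to be the respective trained Keras network.
import numpy as np

def dpg_act(model, state):
    # DPG: the network outputs P(action=1); sample the action from that probability.
    prob = model.predict(state.reshape(-1, 4))[0][0]
    return np.random.choice([0, 1], p=[1 - prob, prob])

def dqn_act(model, state, epsilon=0.01):
    # DQN: the network outputs one Q value per action; explore with probability epsilon,
    # otherwise act greedily on the predicted Q values.
    if np.random.rand() <= epsilon:
        return np.random.randint(0, 2)
    return int(np.argmax(model.predict(state.reshape(-1, 4))[0]))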