
Core Code and Comments for Playing Flappy Bird with DQN

Original article (reposted from):

http://lanbing510.info/2018/07/17/DQN.html

# File: FlappyBirdDQN.py

import sys
sys.path.append("game/")

import cv2
import numpy as np
import wrapped_flappy_bird as game
from BrainDQN_NIPS import BrainDQN


# Helper function: resize the frame to 80x80, convert it to grayscale and binarize it
def preprocess(observation):
    observation = cv2.cvtColor(cv2.resize(observation, (80, 80)), cv2.COLOR_BGR2GRAY)
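    # threshold at 1: pixels with grayscale value above 1 become 255 and all others become 0 (binary image)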
    ret, observation = cv2.threshold(observation,1,255,cv2.THRESH_BINARY)
    return np.reshape(observation,(80,80,1))

# Main function: initialize the DQN and the game, then run the game loop to train the network
def playFlappyBird():
    # Step 1:   initialize BrainDQN
    actions = 2
    brain = BrainDQN(actions)
    # Step 2:   initialize the Flappy Bird game
    flappyBird = game.GameState()
    # Step 3:   start the game

    # Step 3.1: obtain the initial state
    action0 = np.array([1,0])
    observation0, reward0, terminal = flappyBird.frame_step(action0)
    observation0 = cv2.cvtColor(cv2.resize(observation0, (80, 80)), cv2.COLOR_BGR2GRAY)
    ret, observation0 = cv2.threshold(observation0,1,255,cv2.THRESH_BINARY)
    brain.setInitState(observation0)

    # Step 3.2: run the game loop
    while True:
        # get an action from the brain (epsilon-greedy)
        action = brain.getAction()
        # execute the action in the game and receive the next frame, the reward and the terminal flag
        nextObservation,reward,terminal = flappyBird.frame_step(action)
        # grayscale and binarize the frame
        nextObservation = preprocess(nextObservation)
        # append the new frame to the current state to form newState, then store the transition
        # (currentState, action, reward, newState, terminal) in the replay memory
        brain.setPerception(nextObservation,action,reward,terminal)

def main():
    playFlappyBird()

if __name__ == '__main__':
    main()

 

# File: BrainDQN_NIPS.py

import tensorflow as tf 
import numpy as np 
import random
from collections import deque 

# Hyperparameters
FRAME_PER_ACTION = 1
GAMMA = 0.99 # discount factor for future rewards
OBSERVE = 100. # timesteps to observe before training
EXPLORE = 150000. # frames over which to anneal epsilon
FINAL_EPSILON = 0.0 # final value of epsilon
INITIAL_EPSILON = 0.9 # starting value of epsilon
REPLAY_MEMORY = 50000 # number of previous transitions to remember
BATCH_SIZE = 32 # size of minibatch

class BrainDQN:
    # Constructor
    def __init__(self,actions):
        # initialize the replay memory
        self.replayMemory = deque()
        # initialize parameters
        self.timeStep = 0
        self.epsilon = INITIAL_EPSILON
        self.actions = actions
        # build the Q-network
        self.createQNetwork()

    # Build the deep Q-network
    def createQNetwork(self):
        # network weights
        W_conv1 = self.weight_variable([8,8,4,32])
        b_conv1 = self.bias_variable([32])

        W_conv2 = self.weight_variable([4,4,32,64])
        b_conv2 = self.bias_variable([64])

        W_conv3 = self.weight_variable([3,3,64,64])
        b_conv3 = self.bias_variable([64])

        W_fc1 = self.weight_variable([1600,512])
        b_fc1 = self.bias_variable([512])

        W_fc2 = self.weight_variable([512,self.actions])
        b_fc2 = self.bias_variable([self.actions])

        # input layer
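        # each input sample is a stack of the last four preprocessed 80x80 frames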
        self.stateInput = tf.placeholder("float",[None,80,80,4])

        # hidden layers
        h_conv1 = tf.nn.relu(self.conv2d(self.stateInput,W_conv1,4) + b_conv1)
        h_pool1 = self.max_pool_2x2(h_conv1)

        h_conv2 = tf.nn.relu(self.conv2d(h_pool1,W_conv2,2) + b_conv2)

        h_conv3 = tf.nn.relu(self.conv2d(h_conv2,W_conv3,1) + b_conv3)

        h_conv3_flat = tf.reshape(h_conv3,[-1,1600])
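        # 1600 = 5*5*64: the 80x80 input becomes 20x20 after conv1 (stride 4), 10x10 after the
        # 2x2 max pooling, 5x5 after conv2 (stride 2), and stays 5x5 with 64 channels after conv3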
        h_fc1 = tf.nn.relu(tf.matmul(h_conv3_flat,W_fc1) + b_fc1)

        # Q-value layer: one output per action
        self.QValue = tf.matmul(h_fc1,W_fc2) + b_fc2

        # training setup
        self.actionInput = tf.placeholder("float",[None,self.actions])
        self.yInput = tf.placeholder("float", [None]) 
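        # actionInput is a one-hot encoding of the chosen action, so the masked sum below
        # extracts Q(s, a) for the action actually taken; the loss is the mean squared
        # error between this value and the target yInput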
        Q_action = tf.reduce_sum(tf.multiply(self.QValue, self.actionInput), reduction_indices = 1)
        self.cost = tf.reduce_mean(tf.square(self.yInput - Q_action))
        self.trainStep = tf.train.AdamOptimizer(1e-6).minimize(self.cost)

        # save and load the network
        self.saver = tf.train.Saver()
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())
        checkpoint = tf.train.get_checkpoint_state("saved_networks")
        if checkpoint and checkpoint.model_checkpoint_path:
            self.saver.restore(self.session, checkpoint.model_checkpoint_path)
            print("Successfully loaded:", checkpoint.model_checkpoint_path)
        else:
            print("Could not find old network weights")

    # Train the Q-network
    def trainQNetwork(self):
        # Step 1: randomly sample a minibatch of transitions from the replay memory
        minibatch = random.sample(self.replayMemory,BATCH_SIZE)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        nextState_batch = [data[3] for data in minibatch]

        # Step 2: compute the target values y
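        # standard DQN target: y = r for terminal transitions, otherwise y = r + GAMMA * max_a' Q(s', a'),
        # where the Q-values of the next states are taken from the network's own predictions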
        y_batch = []
        QValue_batch = self.QValue.eval(feed_dict={self.stateInput:nextState_batch})
        for i in range(0,BATCH_SIZE):
            terminal = minibatch[i][4]
            if terminal:
                y_batch.append(reward_batch[i])
            else:
                y_batch.append(reward_batch[i] + GAMMA * np.max(QValue_batch[i]))

        # Step 3: run one gradient-descent step on the minibatch
        self.trainStep.run(feed_dict={
            self.yInput : y_batch,
            self.actionInput : action_batch,
            self.stateInput : state_batch
            })

        # save the network every 10000 time steps
        if self.timeStep % 10000 == 0:
            self.saver.save(self.session, 'saved_networks/' + 'network' + '-dqn', global_step = self.timeStep)


    # Update the replay memory with the new transition; once the observation phase is over, call trainQNetwork to train
    def setPerception(self,nextObservation,action,reward,terminal):
        newState = np.append(self.currentState[:,:,1:],nextObservation,axis = 2)
        self.replayMemory.append((self.currentState,action,reward,newState,terminal))
        if len(self.replayMemory) > REPLAY_MEMORY:
            self.replayMemory.popleft()
        if self.timeStep > OBSERVE:
            self.trainQNetwork() # train the network
        self.currentState = newState
        self.timeStep += 1

    # Choose an action (epsilon-greedy)
    def getAction(self):
        QValue = self.QValue.eval(feed_dict= {self.stateInput:[self.currentState]})[0]
        action = np.zeros(self.actions)
        action_index = 0
        if self.timeStep % FRAME_PER_ACTION == 0:
            if random.random() <= self.epsilon:
                action_index = random.randrange(self.actions)
                action[action_index] = 1
            else:
                action_index = np.argmax(QValue)
                action[action_index] = 1
        else:
            action[0] = 1 
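        # once the observation phase is over, anneal epsilon linearly from INITIAL_EPSILON
        # to FINAL_EPSILON over EXPLORE frames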
        if self.epsilon > FINAL_EPSILON and self.timeStep > OBSERVE:
            self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON)/EXPLORE
        return action

    # Set the initial state
    def setInitState(self,observation):
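        # the initial state is the first frame repeated four times, giving the 80x80x4 stack the network expects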
        self.currentState = np.stack((observation, observation, observation, observation), axis = 2)

    # Helper: create a weight variable
    def weight_variable(self,shape):
        initial = tf.truncated_normal(shape, stddev = 0.01)
        return tf.Variable(initial)

    # Helper: create a bias variable
    def bias_variable(self,shape):
        initial = tf.constant(0.01, shape = shape)
        return tf.Variable(initial)

    # Helper: 2D convolution
    def conv2d(self,x, W, stride):
        return tf.nn.conv2d(x, W, strides = [1, stride, stride, 1], padding = "SAME")

    # Helper: 2x2 max pooling
    def max_pool_2x2(self,x):
        return tf.nn.max_pool(x, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = "SAME")

Once training is complete and the network has been saved, the trained agent can be run to play the game.
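As a rough sketch (not from the original article), a minimal play-only script could reuse the classes above with exploration disabled by setting epsilon to 0. The file name play.py and the greedy-play loop below are illustrative assumptions, not part of the original code:

# File: play.py (illustrative sketch, not part of the original article)

import sys
sys.path.append("game/")

import cv2
import numpy as np
import wrapped_flappy_bird as game
from BrainDQN_NIPS import BrainDQN

# same preprocessing as in FlappyBirdDQN.py
def preprocess(observation):
    observation = cv2.cvtColor(cv2.resize(observation, (80, 80)), cv2.COLOR_BGR2GRAY)
    _, observation = cv2.threshold(observation, 1, 255, cv2.THRESH_BINARY)
    return np.reshape(observation, (80, 80, 1))

def play():
    brain = BrainDQN(2)          # the constructor restores weights from saved_networks/ if present
    brain.epsilon = 0.0          # act greedily: no random exploration
    flappyBird = game.GameState()

    # build the initial 80x80x4 state from the first frame
    observation0, _, _ = flappyBird.frame_step(np.array([1, 0]))
    observation0 = cv2.cvtColor(cv2.resize(observation0, (80, 80)), cv2.COLOR_BGR2GRAY)
    _, observation0 = cv2.threshold(observation0, 1, 255, cv2.THRESH_BINARY)
    brain.setInitState(observation0)

    while True:
        action = brain.getAction()
        nextObservation, _, _ = flappyBird.frame_step(action)
        # update the state manually instead of calling setPerception, so no training happens
        brain.currentState = np.append(brain.currentState[:, :, 1:],
                                       preprocess(nextObservation), axis=2)

if __name__ == '__main__':
    play()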