1. 程式人生 > >強化學習系列5:有模型的策略迭代方法

強化學習系列5:有模型的策略迭代方法

1. 策略迭代演算法

這裡策略迭代使用的是表格法,基本步驟是:

  • 用字典儲存每個s的v值
  • 根據v值來選骰子

策略迭代的步驟為:

  1. 初始化 V V Π \Pi
  2. 進行一定次數的迭代。
    2.1 每次首先進行策略評估,不斷按照 Π \Pi 更新 V V 直至收斂,計算公式為 V
    = Π P ( R + γ V
    ) V'=\Pi P(R+\gamma V)
    (當然也可以直接解方程進行計算,但是求逆比較費時間,一般不這麼做);
    2.2 然後進行策略更新,遍歷所有可能action,按照更新後的 V V 計算最優的策略,矩陣形式為 Π = arg max a P ( R + γ V ) \Pi = \arg \max_{a} P(R+\gamma V)

2. 一些額外的程式碼

繼續使用上一節蛇棋的例子。首先定義一個智慧體類,將遊戲操作的一些方法進行封裝。由於問題是有模型的,因此智慧體知道環境env的資訊。

class TableAgent(object):
    def __init__(self, env):
        self.s_len = env.observation_space.n # |S|
        self.a_len = env.action_space.n # |A|
        self.r = [env.reward(s) for s in range(0, self.s_len)] # R
        self.pi = np.array([0 for s in range(0, self.s_len)]) # π
        self.p = env.p # P
        self.value_pi = np.zeros((self.s_len)) # V
        self.value_q = np.zeros((self.s_len, self.a_len)) # Q
        self.gamma = 0.8 # γ

    def play(self, state):
        return self.pi[state]

順便把無模型的智慧體也定義一下,相比上面有模型的智慧體,少了p和r兩個值:

class ModelFreeAgent(object):
    def __init__(self, env):
        self.s_len = env.observation_space.n
        self.a_len = env.action_space.n
        self.pi = np.array([0 for s in range(0, self.s_len)])
        self.value_q = np.zeros((self.s_len, self.a_len))
        self.value_n = np.zeros((self.s_len, self.a_len))
        self.gamma = 0.8

    def play(self, state, epsilon = 0):
        if np.random.rand() < epsilon:
            return np.random.randint(self.a_len)
        else:
            return self.pi[state]

其次定義一個評估時間的方法:

import time
def timer(name):
    start = time.time()
    yield
    end = time.time()
    print('{} COST:{}'.format(name, end - start))

最後定義評價遊戲策略效果的方法:

def eval_game(env, policy):
    state = env.reset()
    return_val = 0
    # 有兩種play的方法,一種是用我們定義的智慧體去玩,另一種是直接指定每個s的a。
    while True:
        if isinstance(policy, TableAgent) or isinstance(policy, ModelFreeAgent):
            act = policy.play(state)
        elif isinstance(policy, list):
            act = policy[state]
        else:
            raise Error('Illegal policy')
        state, reward, terminate, _ = env.step(act) # 不斷遊戲直至結束
        return_val += reward
        if terminate:
          break
    return return_val

我們先假設沒有梯子,然後設計3種直接策略來測試一下這個評估函式的效果:

policy_opt = [1] * 97 + [0] * 3 # 最優策略
policy_0 = [0] * 100 # 全部都投擲第一個骰子(1~3)
policy_1 = [1] * 100 # 全部都投擲第二個骰子(1~6)
np.random.seed(0)
sum_opt = 0
sum_0 = 0
sum_1 = 0
env = SnakeEnv(0, [3, 6])
for i in range(10000):
    sum_opt += eval_game(env, policy_opt)
    sum_0 += eval_game(env, policy_0)
    sum_1 += eval_game(env, policy_1)
print('opt avg={}'.format(sum_opt / 10000.0))
print('0 avg={}'.format(sum_0 / 10000.0))
print('1 avg={}'.format(sum_1 / 10000.0))

輸出結果為:

ladders info:
{}
opt avg=70.5498
0 avg=49.6654
1 avg=68.0072

3. 策略迭代方法的實現

下面按照第一節的步驟定義策略迭代:

class PolicyIteration(object):
    # 迭代計算V直至收斂
    def policy_evaluation(self, agent, max_iter = -1):
        iteration = 0
        while True:
            iteration += 1
            new_value_pi = agent.value_pi.copy()
            for i in range(1, agent.s_len): 
                value_sas = []
                ac = agent.pi[i]
                transition = agent.p[ac, i, :]
                value_sa = np.dot(transition, agent.r + agent.gamma * agent.value_pi)
                new_value_pi[i] = value_sa
            diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
            if diff < 1e-6:
                break
            else:
                agent.value_pi = new_value_pi
            if iteration == max_iter:
                break
    
    # 根據V更新π
    def policy_improvement(self, agent):
        new_policy = np.zeros_like(agent.pi)
        for i in range(1, agent.s_len):
            for j in range(0, agent.a_len):
                agent.value_q[i,j] = np.dot(agent.p[j,i,:], agent.r + agent.gamma * agent.value_pi)
            max_act = np.argmax(agent.value_q[i,:])
            new_policy[i] = max_act
        if np.all(np.equal(new_policy, agent.pi)):
            return False
        else:
            agent.pi = new_policy
            return True
	 
	 # 大框架:進行一定次數的迭代,每次先策略評估,再策略改善
    def policy_iteration(self, agent):
        iteration = 0
        while True:
            iteration += 1
            self.policy_evaluation(agent)
            ret = self.policy_improvement(agent)
            if not ret:
                break
        print('Iter {} rounds converge'.format(iteration))

我們來看一下在沒有梯子的時候,這個演算法的效果:

env = SnakeEnv(0, [3,6])
agent = TableAgent(env)
pi_algo = PolicyIteration()
pi_algo.policy_iteration(agent)
print('return_pi={}'.format(eval_game(env, agent)))
print(agent.pi)

輸出結果為:

ladders info:
{}
Iter 2 rounds converge
return_pi=71
[0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0]

兩輪迭代就收斂,獲得了和最優策略一樣的結果,V值也和真實的V值非常接近。

4. 一些分析

4.1 10個梯子的策略迭代法

我們首先來看一下有10個梯子時,第二節的3種策略和策略迭代方法給出策略的計算結果:

env = SnakeEnv(10, [3,6])
agent = TableAgent(env)
agent.pi[:]=0
print('0 avg={}'.format(eval_game(env,agent)))
agent.pi[:]=1
print('1 avg={}'.format(eval_game(env,agent)))
agent.pi[97:100]=0
print('opt avg={}'.format(eval_game(env,agent)))
pi_algo = PolicyIteration()
pi_algo.policy_iteration(agent)
print('pi avg={}'.format(eval_game(env,agent)))
print(agent.pi)

輸出結果為:

ladders info:
{78: 38, 11: 13, 27: 25, 55: 55, 41: 59, 21: 10, 76: 74, 10: 21, 43: 35, 50: 53, 38: 78, 13: 21, 25: 27, 59: 41, 74: 76, 35: 43, 53: 50}
0 avg=62
1 avg=57
opt avg=81
Iter 3 rounds converge
pi avg=85
[0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0]

在這個案例中,policy iteration經過3輪迭代後收斂,在測試案例上的效果最好。但注意梯子是隨機生成的,而且policy iteration並不一定是收斂到了全域性最優,因此在不同的case下並不總是policy iteration的效果最好。

4.2 策略迭代法的V值變化分析

我們來看下狀態50在策略評估階段的V值變化情況:
在這裡插入圖片描述

可以大致看出一些結論:1. 值函式在最開始的迭代過程中波動非常大;2. 每輪迭代收斂時的迭代次數越來越少。

4.3 限制策略迭代的輪數

策略迭代方法的iteration是比較費時間的,我們將允許迭代的最大次數進行限制,然後看一下執行效果:

ladders info:
{42: 45, 10: 62, 34: 81, 40: 35, 38: 72, 44: 45, 48: 73, 12: 26, 60: 51, 70: 66, 45: 44, 62: 10, 81: 34, 35: 40, 72: 38, 73: 48, 26: 12, 51: 60, 66: 70}
Iter 3 rounds converge
Timer PolicyIter COST:0.26311492919921875
return_pi=75
Iter 3 rounds converge
Timer PolicyIter COST:0.15500688552856445
return_pi=81
Iter 5 rounds converge
Timer PolicyIter COST:0.06501197814941406
return_pi=77
Iter 11 rounds converge
Timer PolicyIter COST:0.04662799835205078
return_pi=88

先不管結果,光看時間的話,限制迭代次數為1時,雖然總迭代次數最多(7次),但是由於每次迭代花的策略評估時間大幅降低,因此總耗時也大幅降低了。
然後再直觀看下s=50對應的值函式在每種情況下不同迭代輪數時的變化圖:
在這裡插入圖片描述