[ML] Introduction to Reinforcement Learning

ML: Reinforcement Learning
Package:
PyTorch
GitHub
Prof. Hung-yi Lee's course (李宏毅)

Introduction: Reinforcement Learning

AI = RL + DL
  • Episode
    One round from start to finish, \(\tau=\{ s_1,a_1,r_1,s_2,a_2,r_2,\cdots ,s_T,a_T,r_T\}\)
  • Agent
    The decision maker; it produces actions to interact with the environment
    • Actor
      The core of the Policy-based Approach; responsible for producing actions
    • Critic
      The core of the Value-based Approach; responsible for judging how good the actor is
  • Environment
    The environment; it returns a reward according to the agent's action
  • State
    The state observed by the agent
  • Reward
    How much reward the environment grants, decided from the action
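To make these terms concrete, below is a minimal sketch of the agent-environment loop (my own toy example, using the same classic gym API as the implementations later in this post); the random policy merely stands in for the actor.

import gym

# Minimal agent-environment loop for one episode of CartPole-v0.
# The "actor" here is just a random policy, only meant to illustrate the terms above.
env = gym.make("CartPole-v0")

state = env.reset()  # the initial state s_1 observed by the agent
trajectory, total_reward = [], 0.0

done = False
while not done:
    action = env.action_space.sample()  # the agent picks an action a_t
    state_, reward, done, _ = env.step(action)  # the environment returns r_t and s_{t+1}
    trajectory.append((state, action, reward))  # one step of the episode tau
    total_reward += reward
    state = state_

print("episode length:", len(trajectory), "total reward R(tau):", total_reward)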

Course notes
Policy-based Approach
Policy Gradient
Given an actor \(\pi _\theta\), the update rule is as follows $$ \begin{align*} \theta ^{new}&=\theta ^{old}+\eta \triangledown _\theta \bar{R}_{\theta^{old}}\\ \triangledown _\theta \bar{R}_\theta &\approx \frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}\left (R(\tau^n) -b \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ \end{align*}\\ $$ $$ R(\tau^n) \to \sum _{k=t}^{T_n}\gamma^{k-t} r_k^n \Rightarrow \frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}\left (\sum _{k=t}^{T_n}\gamma^{k-t} r_k^n -b \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ $$
In essence, we are designing an actor and making it better and better.
An episode can be viewed as a trajectory \(\tau=\{ s_1,a_1,r_1,s_2,a_2,r_2,\cdots ,s_T,a_T,r_T\} \)
whose total reward is \(R(\tau)=\sum ^T_{t=1}r_t \).
Even with the same agent, because actions are sampled stochastically, \(\tau\) still differs from run to run.
So under actor \(\pi\) with parameters \(\theta\), a given \(\tau\) occurs with probability \(P(\tau|\theta) \),
and the expected total reward is
$$ \bar{R}_\theta = E[R(\tau)|\theta] =\sum _\tau R(\tau)P(\tau|\theta) $$ Running \(\pi_\theta\) for N episodes yields \(\{\tau^1,\tau^2, \cdots, \tau^N\} \),
so the expectation can be approximated as
$$ \bar{R}_\theta = \sum _\tau R(\tau)P(\tau|\theta) \approx \frac{1}{N}\sum_{n=1}^NR(\tau^n) \\ $$ The problem can therefore be framed as
$$ \begin{align*} \theta ^* & =\underset{\theta}{\arg\max}\ \bar{R}_\theta\\ &=\underset{\theta}{\arg\max}\sum _\tau R(\tau)P(\tau|\theta) \\ \end{align*} $$ Update the parameters with gradient ascent:
$$ \theta ^{new}=\theta ^{old}+\eta \triangledown _\theta \bar{R}_{\theta^{old}} $$ Using \(\frac{\mathrm{d} \log f(x)}{\mathrm{d} x}=\frac{1}{f(x)}\frac{\mathrm{d} f(x)}{\mathrm{d} x}\), $$ \begin{align*} \triangledown _\theta \bar{R}_\theta &=\sum _\tau R(\tau)\triangledown _\theta P(\tau|\theta) \\ &=\sum _\tau R(\tau)P(\tau|\theta)\frac{\triangledown _\theta P(\tau|\theta)}{P(\tau|\theta)} \\ &=\sum _\tau R(\tau)P(\tau|\theta)\triangledown _\theta \log P(\tau|\theta) \\ &\approx \frac{1}{N}\sum ^N_{n=1}R(\tau^n)\triangledown _\theta \log P(\tau^n|\theta) \\ \end{align*}\\ $$ $$ \begin{align*} P(\tau|\theta) &= p(s_1)p(a_1|s_1,\theta )p(r_1,s_2|s_1,a_1)p(a_2|s_2,\theta )p(r_2,s_3|s_2,a_2)\cdots p(a_T|s_T,\theta )p(r_T,s_{T+1}|s_T,a_T)\\ &=p(s_1)\prod ^T_{t=1}p(a_t|s_t,\theta )p(r_t,s_{t+1}|s_t,a_t) \\ \log P(\tau|\theta) &= \log p(s_1) + \sum ^T_{t=1}\log p(a_t|s_t,\theta )+\sum ^T_{t=1}\log p(r_t,s_{t+1}|s_t,a_t)\\ \triangledown _\theta \log P(\tau|\theta) &= \sum ^T_{t=1}\triangledown _\theta \log p(a_t|s_t,\theta )\\ \end{align*}\\ $$ Hence
$$ \begin{align*} \triangledown _\theta \bar{R}_\theta &=\sum _\tau R(\tau)P(\tau|\theta)\triangledown _\theta \log P(\tau|\theta) \\ &\approx \frac{1}{N}\sum ^N_{n=1}R(\tau^n)\triangledown _\theta \log P(\tau^n|\theta) \\ &=\frac{1}{N}\sum ^N_{n=1}R(\tau^n)\sum ^{T_n}_{t=1}\triangledown _\theta \log p(a_t^n|s_t^n,\theta )\\ &=\frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}R(\tau^n)\triangledown _\theta \log p(a_t^n|s_t^n,\theta )\\ \end{align*}\\ $$ \(\triangledown _\theta \bar{R}_\theta \) can be read as follows: when \(\tau^n\) obtains a positive \(R(\tau^n)\), we want to raise the probability \(p(a_t^n|s_t^n,\theta)\) of choosing \(a_t^n\);
conversely, when \(R(\tau^n)\) is negative, we want to lower the probability \(p(a_t^n|s_t^n,\theta)\) of \(a_t^n\).
Note that \(R(\tau^n)\) is the reward of the whole trajectory \(\tau^n\), not the reward of a single action.
For example, in a game where shooting enemies is the only way to score, using the immediate reward would make the agent do nothing but shoot.

So why is the log taken in \(\triangledown _\theta \log p(a_t^n|s_t^n,\theta)\)?
Expanding the expression back out:
$$ \triangledown _\theta \log p(a_t^n|s_t^n,\theta)=\frac{\triangledown _\theta p(a_t^n|s_t^n,\theta)}{p(a_t^n|s_t^n,\theta)} $$ Without dividing by the probability \(p(a_t^n|s_t^n,\theta)\), an action that yields a better reward
but is sampled less often would end up being overlooked by the model.
Dividing by the probability therefore acts as a kind of normalization.
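As a quick sanity check of this identity (a toy sketch of my own, not part of the original derivation), autograd confirms that the gradient of \(\log p\) equals the gradient of \(p\) divided by \(p\) for a softmax policy:

import torch

logits = torch.tensor([1.0, 2.0, 0.5], requires_grad=True)
probs = torch.softmax(logits, dim=0)  # a toy 3-action policy

# gradient of p(a=0) w.r.t. the parameters (the logits)
grad_p = torch.autograd.grad(probs[0], logits, retain_graph=True)[0]
# gradient of log p(a=0) w.r.t. the parameters
grad_log_p = torch.autograd.grad(torch.log(probs[0]), logits)[0]

# grad log p == (grad p) / p, i.e. the "normalized" gradient discussed above
print(torch.allclose(grad_log_p, grad_p / probs[0].detach()))  # True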

There is also a problem when \(R(\tau^n)\) is always positive.
Ideally all three actions a, b, and c would be sampled, so the one whose probability is pushed up the least simply ends up relatively less likely.
In practice, however, we rely on sampling: if a never happens to be sampled, its probability keeps shrinking, even though a might be the better choice.
So a baseline \(b\) is subtracted, and it has to be designed by hand.
$$ \begin{align*} \triangledown _\theta \bar{R}_\theta &=\sum _\tau \left (R(\tau) -b \right )P(\tau|\theta)\triangledown _\theta \log P(\tau|\theta) \\ &\approx \frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}\left (R(\tau^n) -b \right )\triangledown _\theta \log p(a_t^n|s_t^n,\theta )\\ \end{align*}\\ $$ And \(p(a_t^n|s_t^n,\theta)\) is simply the probability of producing the action, which is exactly what the policy decides, so it equals \(\pi _\theta(a_t^n|s_t^n)\):
$$ \begin{align*} \triangledown _\theta \bar{R}_\theta &=\sum _\tau \left (R(\tau) -b \right )P(\tau|\theta)\triangledown _\theta \log P(\tau|\theta) \\ &\approx \frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}\left (R(\tau^n) -b \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ \end{align*}\\ $$ \(R(\tau^n)\) can be designed however you like; one idea is the following.
The current reward is caused by the current action, and past actions have little to do with it.
Equivalently, the further a reward lies in the future, the less the current action affects it, so a decay factor \(\gamma\) is added:
$$ R(\tau^n) \to \sum _{k=t}^{T_n}\gamma^{k-t} r_k^n \\ $$ Plugging this into the previous expression:
$$ \begin{align*} \triangledown _\theta \bar{R}_\theta &=\sum _\tau \left (R(\tau) -b \right )P(\tau|\theta)\triangledown _\theta \log P(\tau|\theta) \\ &\approx \frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}\left (R(\tau^n) -b \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ R(\tau^n) \to \sum _{k=t}^{T_n}\gamma^{k-t} r_k^n &\Rightarrow \frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}\left (\sum _{k=t}^{T_n}\gamma^{k-t} r_k^n -b \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ \end{align*}\\ $$
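As a small sanity check of the reward-to-go term \(\sum _{k=t}^{T}\gamma^{k-t} r_k\) (a standalone sketch of my own), it can be computed by running over the rewards backwards, which is exactly the recursion \(R_t = r_t + \gamma R_{t+1}\) used in PolicyGradient.train() below:

import torch


def rewards_to_go(rewards, gamma=0.99):
    # R_t = sum_{k>=t} gamma^(k-t) * r_k, via the backward recursion R_t = r_t + gamma * R_{t+1}
    returns, R = [], 0.0
    for r in reversed(rewards):
        R = r + gamma * R
        returns.insert(0, R)
    return torch.tensor(returns)


# three steps with rewards 1, 0, 2 and gamma = 0.9:
# R_3 = 2, R_2 = 0 + 0.9 * 2 = 1.8, R_1 = 1 + 0.9 * 1.8 = 2.62
print(rewards_to_go([1.0, 0.0, 2.0], gamma=0.9))  # tensor([2.6200, 1.8000, 2.0000])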
Course notes
Policy-based Approach: Implementation
  1. Initialize \(PolicyGradient(s)\)
  2. for each episode
    1. Initialize \(s\)
    2. for each step
      1. Use \(PolicyGradient(s)\) to pick \(a\)
      2. Feed \(a\) to the environment and obtain \(r\)
      3. Record \(r\)
      4. Repeat until the termination condition is met
    3. Train PolicyGradient(s)
      $$ \begin{align*} \triangledown _\theta\bar{R}_\theta &= \sum ^{T}_{t=1} \left(\sum _{k=t}^T\gamma^{k-t} r_k\right) \triangledown _\theta \log \pi _\theta(a_t|s_t)\\ \end{align*}\\ $$
A few adjustments are made to the reward.
We set \(b=0, N=1\); \(N=1\) means we do not collect several episodes before updating, relying instead on Adam, in the same spirit as SGD.
This finally gives
$$ \begin{align*} \triangledown _\theta\bar{R}_\theta &=\frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}\left (\sum _{k=t}^{T_n}\gamma^{k-t} r_k^n -b \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ b=0,N=1\ &\Rightarrow \sum ^{T}_{t=1}\left(\sum _{k=t}^{T}\gamma^{k-t} r_k\right) \triangledown _\theta \log \pi _\theta(a_t|s_t)\\ \end{align*}\\ $$ PolicyGradient.py
import numpy as np
import torch
from torch.distributions import Categorical

torch.manual_seed(500)  # fix the random seed for reproducibility


class PolicyGradient:
    def __init__(self, n_features, n_actions, learning_rate=0.01):
        self.net = Net(n_features, n_actions)
        print(self.net)

        self.lr = learning_rate
        # reward discount factor
        self.gamma = 0.99

        # the optimizer is the training tool
        self.optimizer = torch.optim.Adam(
            self.net.parameters(), lr=self.lr
        )  # pass in all of the net's parameters and the learning rate

        self.saved_log_probs = []
        self.rewards = []
        self.eps = np.finfo(np.float32).eps.item()

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        probs = self.net(state)

        m = Categorical(probs)
        action = m.sample()
        
        log_prob = m.log_prob(action)
        self.saved_log_probs.append(log_prob)
        # You could also compute the log-prob yourself, but numerical differences
        # mean the learning rate needs retuning before anything is learned.
        # Be careful to keep the computation graph intact, or the backward pass will break.
        # log_prob_m = torch.log(probs[action.item()])
        # self.saved_log_probs.append(log_prob_m)

        return action.item()

    def store_trajectory(self, s, a, r):
        self.rewards.append(r)

    def train(self):
        R = 0
        policy_loss = []
        rewards = []

        # The current reward is caused by the current action; past actions matter little.
        # The further a reward lies in the future, the less the current action affects it.
        # Looking only at the raw total reward cannot tell good actions from bad, which hurts learning.
        for r in self.rewards[::-1]:
            R = r + self.gamma * R
            rewards.insert(0, R)

        rewards = torch.tensor(rewards)
        # normalize the rewards and add machine epsilon (self.eps) to avoid division by zero
        rewards = (rewards - rewards.mean()) / (rewards.std() + self.eps)
        for log_prob, reward in zip(self.saved_log_probs, rewards):
            # we maximize, hence the minus sign
            policy_loss.append(-log_prob * reward)

        self.optimizer.zero_grad()
        policy_loss = torch.stack(policy_loss).sum()
        policy_loss.backward()
        self.optimizer.step()

        del self.rewards[:]
        del self.saved_log_probs[:]


class Net(torch.nn.Module):
    def __init__(self, n_features, n_actions):
        super(Net, self).__init__()
        # define each layer
        self.fc1 = torch.nn.Linear(n_features, 128)
        self.fc2 = torch.nn.Linear(128, n_actions)  # action scores (softmax applied in forward)

    def forward(self, x):  # this also serves as the Module's forward pass
        # forward-propagate the input; the network produces the output
        model = torch.nn.Sequential(
            self.fc1, torch.nn.ReLU(), self.fc2, torch.nn.Softmax(dim=-1)
        )
        return model(x)

run.py
import gym
from PolicyGradient import PolicyGradient
import matplotlib.pyplot as plt
import torch

RENDER = True  # rendering slows the run down; turn it on once the agent has roughly learned

env = gym.make("CartPole-v0")
env.seed(1)  # fix the random seed for reproducibility
# env = env.unwrapped  # remove the episode step limit

print(env.action_space)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)

agent = PolicyGradient(
    n_features=env.observation_space.shape[0],
    n_actions=env.action_space.n,
    learning_rate=0.005,
)

reward_history = []


def plot_durations():
    y_t = torch.FloatTensor(reward_history)
    plt.figure(1)
    plt.clf()
    plt.title("Training...")
    plt.xlabel("Episode")
    plt.ylabel("Reward")
    plt.plot(y_t.numpy())
    # Take 100 episode averages and plot them too
    if len(reward_history) >= 100:
        means = y_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated


for n_episode in range(3000):
    state = env.reset()
    sumR = 0
    for t in range(3000):  # Don't infinite loop while learning
        if RENDER:
            env.render()

        action = agent.choose_action(state)
        state_, reward, done, _ = env.step(action)
        agent.store_trajectory(state, action, reward)
        
        sumR += reward

        if done:
            break

        state = state_

    agent.train()
    
    reward_history.append(sumR)
    if RENDER:
        plot_durations()
    print("episode:", n_episode, "duration:", t, "Reward", sumR)

Course notes
Value-based Approach
Q-learning only works with discrete actions.
$$ Q^\pi(s_t,a_t)^{new} = Q^\pi(s_t,a_t)^{old} + \eta \left [\underset{Real(Target)}{\underbrace{r + \gamma \underset{a_{t+1}}{\max}\ Q^\pi(s_{t+1},a_{t+1})^{old}}}-\underset{estimate}{\underbrace{Q^\pi(s_t,a_t)^{old}}} \right ] $$
In essence, we design a critic that measures how good the actor is,
that is, it judges how much total future reward the actor \(\pi\) can obtain from the current state \(s\).
As a state value function this is written $$ V^\pi(s) $$ so even for the same state, different actors give different values of \(V\).

There are two ways to estimate it:

  • Monte-Carlo based approach (MC)
    • Training can only start once an episode has finished; the idea is as follows
    • Starting from \(s_a\) and playing until the episode ends yields \(G_a\)
    • Starting from \(s_b\) and playing until the episode ends yields \(G_b\)
    • Train accordingly $$\begin{align*} V^\pi(s_a) &= G_a \\ V^\pi(s_b) &= G_b \\ \end{align*}$$
    • Higher variance, since \(G\) sums up many random rewards
  • Temporal-difference approach (TD)
    • Training can start without waiting for the episode to finish; the idea is as follows
    • Take a segment \(\tau=\{\cdots s_t,a_t,r_t,s_{t+1}\cdots\}\)
    • \(t\) and \(t+1\) differ by exactly one reward \(r_t\)
    • Train accordingly \(V^\pi(s_t) = r_t + V^\pi(s_{t+1})  \)
    • Lower variance, although \(V^\pi(s_{t+1})\) itself may be estimated inaccurately
Example
Suppose there are eight episodes:
$$ \begin{align*} &s_a,r=0,s_b,r=0,\text{END}\\ &s_b,r=1,\text{END}\\ &s_b,r=1,\text{END}\\ &s_b,r=1,\text{END}\\ &s_b,r=1,\text{END}\\ &s_b,r=1,\text{END}\\ &s_b,r=1,\text{END}\\ &s_b,r=0,\text{END}\\ \end{align*} $$ With MC:
$$ \begin{align*} V^\pi(s_b)&=\frac{6}{8}=\frac{3}{4} \\ V^\pi(s_a)&=0 \\ \end{align*} $$ With TD:
$$ \begin{align*} V^\pi(s_b)&=\frac{6}{8}=\frac{3}{4} \\ V^\pi(s_a)&=V^\pi(s_b)+r=\frac{3}{4}+0=\frac{3}{4} \\ \end{align*} $$ But this alone cannot evaluate the actor, and it gives no way to choose actions,
so we switch to the state-action value function, i.e. Q-learning.
Because the actions must be enumerable, it only applies to discrete actions:
$$ Q^\pi(s,a) $$
The problem can then be framed as
$$ \pi(s)=\underset{a}{\arg\max}\ Q^\pi(s,a) $$ Now define \(G_t\), the total future return obtainable from the current state:
$$ G_t=r_t+\gamma r_{t+1}+\gamma ^2 r_{t+2}+\cdots =\sum _{k=0}^T \gamma ^k r_{t+k} $$ Rewards further from the present are discounted by \(\gamma \).
$$ \begin{align*} \underset{a_{t}}{\max}\ Q^\pi(s_t,a_t)&=G_t \\ &=\sum _{k=0}^T \gamma ^k r_{t+k} \\ &=r_t + \sum _{k=1}^T \gamma ^k r_{t+k} \\ &=r_t + \gamma \sum _{k=0}^T \gamma ^k r_{t+1+k} \\ &=r_t + \gamma G_{t+1} \\ &=r_t + \gamma \underset{a_{t+1}}{\max}\ Q^\pi(s_{t+1},a_{t+1}) \\ \end{align*} $$ The parameters are updated in the spirit of gradient descent, here using the TD idea:
$$ Q^\pi(s_t,a_t)^{new} = Q^\pi(s_t,a_t)^{old} + \eta \left [\underset{Real(Target)}{\underbrace{r + \gamma \underset{a_{t+1}}{\max}\ Q^\pi(s_{t+1},a_{t+1})^{old}}}-\underset{estimate}{\underbrace{Q^\pi(s_t,a_t)^{old}}} \right ] $$ The actor \(\pi\) itself is never updated;
it simply picks actions via \(\epsilon\)-greedy:
when a random number exceeds \(\epsilon\), a random action is taken, so that \(Q^\pi(s,a)\) has a chance to discover better options;
otherwise the action with the maximum \(Q^\pi(s,a)\) is chosen.
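Before the network version below, the update rule and the \(\epsilon\)-greedy choice can be illustrated with a tiny tabular sketch (my own toy example on a hypothetical 5-state, 2-action problem):

import numpy as np

n_states, n_actions = 5, 2
Q = np.zeros((n_states, n_actions))  # Q(s, a) stored as a table
eta, gamma, epsilon = 0.1, 0.99, 0.05


def choose_action(s):
    # epsilon-greedy: mostly exploit max Q, occasionally explore at random
    if np.random.random() < epsilon:
        return np.random.randint(n_actions)
    return int(np.argmax(Q[s]))


def update(s, a, r, s_, done):
    # target = r + gamma * max_a' Q(s', a'); no bootstrapping on terminal states
    target = r + gamma * np.max(Q[s_]) * (1 - done)
    Q[s, a] += eta * (target - Q[s, a])  # move the estimate toward the target


# one hypothetical transition: state 0, reward 1, next state 2, not terminal
update(0, choose_action(0), 1.0, 2, done=0)
print(Q[0])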
Value-based Approach: Implementation
QLearning
  1. Initialize \(Q^\pi(s,a)\)
  2. for each episode
    1. Initialize \(s\)
    2. for each step
      1. Use \(Q^\pi(s,a)\) to pick \(a\), e.g. via \(\epsilon -greedy\)
      2. Feed \(a\) to the environment and obtain \(r, {s}'\)
      3. Record \(s, a, r, done, {s}'\)
      4. Batch-update \(Q^\pi(s,a)\)
        Note that only the output node of the chosen action \(a\) is updated, not the nodes of all actions
        $$Q^\pi(s,a)^{new} = Q^\pi(s,a)^{old} + \eta \left [\underset{Real(Target)}{\underbrace{r + \gamma \underset{{a}'}{\max}\ Q^\pi({s}',{a}')^{old}}}-\underset{estimate}{\underbrace{Q^\pi(s,a)^{old}}} \right ] $$
      5. \(s = {s}'\)
      6. If \(s\) is not a terminal state, go back to step 1
The key points are the \(1-Done\) term and the ReplayMemory.
\(1-Done\) keeps the Target accurate instead of letting it grow without bound,
while ReplayMemory enables off-policy training and breaks the correlation between consecutive transitions.
QLearning.py
import numpy as np
import torch
from collections import namedtuple
import random

torch.manual_seed(500)  # fix the random seed for reproducibility

Trajectory = namedtuple(
    "Transition", ("state", "action", "reward", "done", "next_state")
)


# A crucial mechanism: without it, convergence is much harder.
# Try setting capacity & BATCH_SIZE to 1 and see what happens.
class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a trajectory."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Trajectory(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


class QLearning:
    def __init__(self, n_features, n_actions, learning_rate=0.01, gamma=0.9):
        self.n_actions = n_actions
        self.n_features = n_features
        self.net = Net(n_features, n_actions)
        print(self.net)

        self.lr = learning_rate
        # discount factor for Q
        self.gamma = gamma

        # the optimizer is the training tool
        self.optimizer = torch.optim.Adam(
            self.net.parameters(), lr=self.lr
        )  # pass in all of the net's parameters and the learning rate
        # loss function
        self.lossFun = torch.nn.MSELoss()

        self.trajectories = ReplayMemory(10000)
        self.BATCH_SIZE = 50

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        value = self.net(state)
        action_max_value, action = torch.max(value, 0)

        if np.random.random() >= 0.95:  # epsilon-greedy: explore 5% of the time
            action = np.random.choice(range(self.n_actions), 1)

        return action.item()

    def store_trajectory(self, s, a, r, done, s_):
        self.trajectories.push(s, a, r, done, s_)

    def train(self):
        if len(self.trajectories) < self.BATCH_SIZE:
            return

        trajectories = self.trajectories.sample(self.BATCH_SIZE)
        batch = Trajectory(*zip(*trajectories))

        s = batch.state
        s = torch.tensor(s).float()
        a = batch.action
        a = torch.tensor(a).long()
        a = torch.unsqueeze(a, 1)  # add a dimension at dim=1, e.g. (50,) => (50,1)
        r = batch.reward
        r = torch.tensor(r).float()
        done = batch.done
        done = torch.tensor(done).float()
        s_ = batch.next_state
        s_ = torch.tensor(s_).float()

        # gather values along dim=1 using a as the index
        qValue = self.net(s).gather(1, a).squeeze(1)
        qNext = self.net(s_).detach()  # detach from graph, don't backpropagate
        # done is one of the keys: leaving it out makes qNext mis-estimated.
        # It is also what lets qValue converge; otherwise the target keeps accumulating and drifts.
        target = r + self.gamma * qNext.max(1)[0] * (1 - done)

        self.optimizer.zero_grad()
        loss = self.lossFun(target.detach(), qValue)
        loss.backward()
        # torch.nn.utils.clip_grad_norm(self.net.parameters(), 0.5)
        self.optimizer.step()


class Net(torch.nn.Module):
    def __init__(self, n_features, n_actions):
        super(Net, self).__init__()
        # define each layer
        self.fc1 = torch.nn.Linear(n_features, 10)
        self.fc2 = torch.nn.Linear(10, n_actions)  # one Q value per action

    def forward(self, x):  # this also serves as the Module's forward pass
        # forward-propagate the input; the network produces the output
        model = torch.nn.Sequential(self.fc1, torch.nn.ReLU6(), self.fc2)
        return model(x)

run.py
import gym
from QLearning import QLearning
import matplotlib.pyplot as plt
import torch

RENDER = True  # rendering slows the run down; turn it on once the agent has roughly learned

env = gym.make("CartPole-v0")
env.seed(1)  # fix the random seed for reproducibility
# env = env.unwrapped  # remove the episode step limit

print(env.action_space)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)

agent = QLearning(
    n_features=env.observation_space.shape[0],
    n_actions=env.action_space.n,
    learning_rate=0.01,
    gamma=0.99,
)

reward_history = []


def plot_durations():
    y_t = torch.FloatTensor(reward_history)
    plt.figure(1)
    plt.clf()
    plt.title("Training...")
    plt.xlabel("Episode")
    plt.ylabel("Reward")
    plt.plot(y_t.numpy())
    # Take 100 episode averages and plot them too
    if len(reward_history) >= 100:
        means = y_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated


for n_episode in range(3000):
    state = env.reset()
    sumR = 0
    for t in range(3000):  # Don't infinite loop while learning
        if RENDER:
            env.render()

        action = agent.choose_action(state)
        state_, reward, done, _ = env.step(action)
        agent.store_trajectory(state, action, reward, done, state_)

        sumR += reward

        agent.train()

        if done:
            break

        state = state_

    reward_history.append(sumR)
    if RENDER:
        plot_durations()

    avgR = sum(reward_history[:-11:-1]) / 10
    print(
        "episode: {:4d} duration: {:4d} Reward: {:5.1f} avgR: {:5.1f}".format(
            n_episode, t, sumR, avgR
        )
    )
Course notes
Actor-Critic Approach
Advantage Actor-Critic (A2C)
Given an actor \(\pi _\theta\), the update rule is as follows $$ \begin{align*} \theta ^{new}&=\theta ^{old}+\eta \triangledown _\theta \bar{R}_{\theta^{old}}\\ \triangledown _\theta \bar{R}_\theta &\approx \frac{1}{N}\sum ^N_{n=1}\sum^{T_n}_{t=1}\left (r_t^n +V^{\pi_\theta}(s_{t+1}^n) -V^{\pi_\theta}(s_t^n) \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ \end{align*} $$
Starting from the earlier Policy Gradient result
$$ \triangledown _\theta \bar{R}_\theta \approx \frac{1}{N}\sum ^N_{n=1}\sum ^{T_n}_{t=1}\left (R(\tau^n) -b \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ $$ \(R(\tau^n)\) is obtained by random sampling and is therefore quite unstable, unless \(N\) is large enough.
In practice, however, \(N\) usually cannot be very large, and it is often simply set to \(N = 1\).
So can we estimate \(E[R(\tau^n)]\) instead?
That is exactly \(Q^{\pi_\theta}(s_t^n,a_t^n)\) from Q-learning,
and the baseline can be replaced by the state value function \(V^{\pi_\theta}(s_t^n) \).
The difference between the two measures how much the current action influences the future.
The expression can therefore be rewritten as
$$ \triangledown _\theta \bar{R}_\theta \approx \frac{1}{N}\sum ^N_{n=1}\sum^{T_n}_{t=1}\left (Q^{\pi_\theta}(s_t^n,a_t^n) -V^{\pi_\theta}(s_t^n) \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ $$ Implemented this way, however, two networks \(Q\) and \(V\) are needed,
and two networks usually mean larger estimation error.
Looking at it another way, the next state's total reward plus the current reward is roughly the reward that \(Q\) measures:
$$ Q^{\pi_\theta}(s_t^n,a_t^n) \approx r_t^n +V^{\pi_\theta}(s_{t+1}^n) $$ so
$$ Q^{\pi_\theta}(s_t^n,a_t^n) -V^{\pi_\theta}(s_t^n) \Rightarrow r_t^n +V^{\pi_\theta}(s_{t+1}^n) -V^{\pi_\theta}(s_t^n)\\ $$ and the expression can be rewritten once more as
$$ \triangledown _\theta \bar{R}_\theta \approx \frac{1}{N}\sum ^N_{n=1}\sum^{T_n}_{t=1}\left (r_t^n +V^{\pi_\theta}(s_{t+1}^n) -V^{\pi_\theta}(s_t^n) \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ $$
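A minimal sketch of this advantage term (my own, assuming a critic that returns both action probabilities and a state value, like the ActorCriticNet defined further down):

import torch


def advantage(critic, s, r, s_):
    # A_t = r_t + V(s_{t+1}) - V(s_t); the gamma^{k-t} discount across steps is applied separately, as in the text
    with torch.no_grad():  # the advantage is used only as a constant weight on log pi
        _, v_now = critic(torch.as_tensor(s, dtype=torch.float32))
        _, v_next = critic(torch.as_tensor(s_, dtype=torch.float32))
    return r + v_next - v_now


# the per-step actor loss is then  -log pi_theta(a_t | s_t) * advantage(critic, s, r, s_)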
Actor-Critic Approach: Implementation
Advantage Actor-Critic (A2C)
  1. Initialize A2C
  2. for each episode
    1. Initialize \(s\)
    2. for each step
      1. Use A2C's actor \(\pi_\theta (s)\) to pick \(a\)
      2. Feed \(a\) to the environment and obtain \(r, {s}'\)
      3. Update \(V^{\pi_\theta}(s)\) with TD
        $$V^{\pi_\theta}(s) \rightarrow r +V^{\pi_\theta}({s}') $$
      4. \(s={s}'\); repeat until the termination condition is met
    3. Update \(V^{\pi_\theta}(s)\) with MC
      $$V^{\pi_\theta}(s) \rightarrow R $$
    4. Train A2C's actor \(\pi_\theta (s)\)
      $$ \begin{align*} \theta ^{new}&=\theta ^{old}+\eta \triangledown _\theta \bar{R}_{\theta^{old}}\\ R_k &= r_k +V^{\pi_\theta}(s_{k+1}) -V^{\pi_\theta}(s_k)\\ \triangledown _\theta\bar{R}_\theta &= \sum ^{T}_{t=1} \left(\sum _{k=t}^T\gamma^{k-t} R_k\right) \triangledown _\theta \log \pi _\theta(a_t|s_t)\\ \end{align*} $$
    5. Update the target NN
A few modifications are made to the expression below:
$$ \triangledown _\theta \bar{R}_\theta \approx \frac{1}{N}\sum ^N_{n=1}\sum^{T_n}_{t=1}\left (r_t^n +V^{\pi_\theta}(s_{t+1}^n) -V^{\pi_\theta}(s_t^n) \right )\triangledown _\theta \log \pi _\theta(a_t^n|s_t^n)\\ $$ As with the earlier Policy Gradient, the idea of discounting is introduced:
the reward is caused by the current action, and the current action's influence on the future keeps shrinking.
Again, \(N=1\) means we do not collect several episodes before updating, relying instead on Adam, in the same spirit as SGD.
This finally gives
$$ \begin{align*} R_k &= r_k +V^{\pi_\theta}(s_{k+1}) -V^{\pi_\theta}(s_k)\\ \triangledown _\theta\bar{R}_\theta &= \sum ^{T}_{t=1} \left(\sum _{k=t}^T\gamma^{k-t} R_k\right) \triangledown _\theta \log \pi _\theta(a_t|s_t)\\ \end{align*} $$ In addition, the critic uses two NNs: one is updated along the way, the other provides the \(V^{\pi_\theta}\) estimates for the actor.
This way, updating the critic does not perturb the actor at the same time, which would make training unstable.
Moreover, the critic is updated with both TD and MC.
Personally I think TD alone could update every step, but in this example it makes the predicted value grow without bound, since the agent gets a reward just for staying alive.
So MC is added on top so that the value converges to something reasonable.
A2C.py
import torch
import torch.nn.functional as F
from torch.distributions import Categorical

torch.manual_seed(500)  # fix the random seed for reproducibility


class A2C:
    def __init__(self, n_actions, n_features, learning_rate=0.01, gamma=0.9):
        self.actorCriticEval = ActorCriticNet(n_actions, n_features)
        self.actorCriticTarget = ActorCriticNet(n_actions, n_features)
        print(self.actorCriticEval)
        print(self.actorCriticTarget)

        self.lr = learning_rate
        # reward discount factor
        self.gamma = gamma

        # the optimizer is the training tool
        # pass in all of the net's parameters and the learning rate
        self.optimizerActorCriticEval = torch.optim.Adam(
            self.actorCriticEval.parameters(), lr=self.lr
        )

        self.saved_log_probs = []
        self.rewards = []
        self.states = []

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        probs, _ = self.actorCriticEval(state)

        m = Categorical(probs)
        action = m.sample()

        log_prob = m.log_prob(action)
        self.saved_log_probs.append(log_prob)

        return action.item()

    def store_trajectory(self, s, a, r, s_):
        self.rewards.append(r)
        self.states.append(s)
        self.nextState = s_

    # episode train
    def trainActor(self):
        R = 0
        policy_loss = []
        rewards = []

        # The current reward is caused by the current action; past actions matter little.
        # The further a reward lies in the future, the less the current action affects it.
        # Looking only at the raw total reward cannot tell good actions from bad, which hurts learning.
        nextStates = self.states + [self.nextState]
        for r, s, s_ in zip(self.rewards[::-1], self.states[::-1], nextStates[::-1]):
            _, futureVal = self.actorCriticTarget(torch.tensor(s_).float())
            _, nowVal = self.actorCriticTarget(torch.tensor(s).float())
            R_now = r + futureVal.detach() - nowVal.detach()
            R = R_now + self.gamma * R
            rewards.insert(0, R)

        for log_prob, reward in zip(self.saved_log_probs, rewards):
            # we maximize, hence the minus sign
            policy_loss.append(-log_prob * reward)

        # Actor
        self.optimizerActorCriticEval.zero_grad()
        policy_loss = torch.stack(policy_loss).sum()
        policy_loss.backward()
        # gradient clipping, to avoid exploding gradients
        # torch.nn.utils.clip_grad_norm_(self.actorCriticEval.parameters(), 0.5)
        self.optimizerActorCriticEval.step()

        del self.rewards[:]
        del self.saved_log_probs[:]

        # print(list(self.actorCriticEval.parameters()))

    # step train
    def trainCriticTD(self):
        r = self.rewards[-1]

        _, futureVal = self.actorCriticTarget(torch.tensor(self.nextState).float())
        val = r + futureVal
        target = val.detach()
        _, predict = self.actorCriticEval(torch.tensor(self.states[-1]).float())
        # print(predict, futureVal)

        self.optimizerActorCriticEval.zero_grad()
        lossFun = torch.nn.MSELoss()
        loss = lossFun(target, predict)
        loss.backward()
        # gradient clipping, to avoid exploding gradients
        # torch.nn.utils.clip_grad_norm_(self.actorCriticEval.parameters(), 0.5)
        self.optimizerActorCriticEval.step()

        # print(list(self.actorCriticEval.parameters()))

    def trainCriticMC(self):
        R = 0

        for r, s in zip(self.rewards[::-1], self.states[::-1]):
            R = r + R
            target = torch.tensor(R).float()

            _, predict = self.actorCriticEval(torch.tensor(s).float())

            self.optimizerActorCriticEval.zero_grad()
            lossFun = torch.nn.MSELoss()
            loss = lossFun(target, predict)
            loss.backward()
            # gradient clipping, to avoid exploding gradients
            # torch.nn.utils.clip_grad_norm_(self.actorCriticEval.parameters(), 0.5)
            self.optimizerActorCriticEval.step()
        # print(predict.item(), target.item())

        # print(list(self.actorCriticEval.parameters()))

    # gradually update the target NN toward the eval NN
    def updateTarget(self):
        for paramEval, paramTarget in zip(
            self.actorCriticEval.parameters(), self.actorCriticTarget.parameters()
        ):
            paramTarget.data = paramTarget.data + 0.1 * (
                paramEval.data - paramTarget.data
            )


class ActorCriticNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(ActorCriticNet, self).__init__()
        # define each layer
        self.fc1 = torch.nn.Linear(n_features, 10)
        self.fc2 = torch.nn.Linear(10, n_actions)  # actor head: action probabilities

        self.fc3 = torch.nn.Linear(n_features, 10)
        self.fc4 = torch.nn.Linear(10, 1)  # critic head: state value V(s)

    def forward(self, x):  # this also serves as the Module's forward pass
        # forward-propagate the input; the network produces the output
        x_a = self.fc1(x)
        x_a = F.relu6(x_a)
        action = F.softmax(self.fc2(x_a), dim=-1)

        x_v = self.fc3(x)
        x_v = F.relu6(x_v)
        val = self.fc4(x_v)

        return action, val

run.py
import gym
from A2C import A2C
import matplotlib.pyplot as plt
import torch

RENDER = True  # rendering slows the run down; turn it on once the agent has roughly learned

env = gym.make("CartPole-v0")
env.seed(1)  # fix the random seed for reproducibility
# env = env.unwrapped  # remove the episode step limit

print(env.action_space)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)

agent = A2C(
    n_actions=env.action_space.n,
    n_features=env.observation_space.shape[0],
    learning_rate=0.01,
    gamma=0.9,
)

reward_history = []


def plot_durations():
    y_t = torch.FloatTensor(reward_history)
    plt.figure(1)
    plt.clf()
    plt.title("Training...")
    plt.xlabel("Episode")
    plt.ylabel("Reward")
    plt.plot(y_t.numpy())
    # Take 100 episode averages and plot them too
    if len(reward_history) >= 100:
        means = y_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated


for n_episode in range(3000):
    state = env.reset()
    sumR = 0
    for t in range(3000):  # Don't infinite loop while learning
        if RENDER:
            env.render()

        action = agent.choose_action(state)
        state_, reward, done, _ = env.step(action)
        agent.store_trajectory(state, action, reward, state_)

        agent.trainCriticTD()

        sumR += reward
        if done:
            break

        state = state_

    agent.trainCriticMC()
    agent.trainActor()
    agent.updateTarget()

    reward_history.append(sumR)
    if RENDER:
        plot_durations()

    avgR = sum(reward_history[:-11:-1]) / 10
    print(
        "episode: {:4d} duration: {:4d} Reward: {:5.1f} avgR: {:5.1f}".format(
            n_episode, t, sumR, avgR
        )
    )
Asynchronous Advantage Actor-Critic (A3C)

  1. Copy the global parameters to each worker (an A2C agent)
  2. Each worker initializes its environment with different parameters
  3. Each worker trains & updates
  4. Push the gradients up to the global network
    \(\theta ^{new}_{global}=\theta ^{old}_{global}+\eta \triangledown \theta_{worker}\)
In short, it is A2C with shadow clones (影分身之術):
A2C instances run simultaneously on different CPUs or machines and then update the top-level Global Network, as in the sketch below.
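Below is a minimal single-process sketch of step 4 (pushing one worker's gradients into the global network). Real A3C runs many such workers in parallel, e.g. with torch.multiprocessing, which is omitted here, and the function name is my own.

import torch


def push_worker_gradients(global_net, global_opt, worker_net, worker_loss):
    # compute gradients on the worker, then apply them to the global network
    global_opt.zero_grad()
    worker_net.zero_grad()
    worker_loss.backward()  # gradients land on the worker's parameters
    for w_param, g_param in zip(worker_net.parameters(), global_net.parameters()):
        if w_param.grad is not None:
            g_param.grad = w_param.grad.clone()  # copy the worker's gradients onto the global parameters
    global_opt.step()  # the global parameters move along the worker's gradients
    worker_net.load_state_dict(global_net.state_dict())  # step 1 again: sync the worker back to the global net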

