[ML] Policy Gradient

Programming language: Python
Package:
PyTorch
GitHub URL
Prof. Hung-yi Lee - DRL Lecture 1: Policy Gradient (Review)

Introduction: the concept of Policy Gradient

Symbol List

Symbol and meaning:
  • \(s\in S\): state
  • \(a\in A\): action
  • \(r\in R\): reward
  • \(\tau\): a trajectory, i.e. all the data from the start of one episode to its end
  • \(s_t,a_t,r_t\): the state, action, and reward at time step \(t\) of a trajectory
  • \(\gamma\): discount factor, used to penalize the uncertainty of future rewards; \(0<\gamma \leq 1\)
  • \(G_t\): cumulative return (cumulative discounted return); \(G_t=\sum _{k=0}^\infty \gamma ^k r_{t+k+1}\)
  • \(P(s',r|s,a)\): probability of transitioning to the next state \(s'\) with reward \(r\) after taking action \(a\) in the current state \(s\)
  • \(\pi(a|s)\): stochastic policy; \(\pi _\theta\) denotes a policy parameterized by \(\theta\)
  • \(\mu (s)\): deterministic policy; written as \(\mu (s)\) to distinguish it from the stochastic policy \(\pi (s)\)
  • \(V(s)\): state-value function, the expected cumulative return of state \(s\); \(V_w\) denotes a state-value function parameterized by \(w\)
  • \(V^{\pi}(s)\): expected cumulative return of state \(s\) under policy \(\pi\); \(V^\pi(s)=\mathbb{E}_{a\sim \pi}[G_t|s_t=s]\)
  • \(Q(s,a)\): state-action value function, similar to \(V(s)\) but measuring the expected cumulative return after taking action \(a\) in state \(s\); \(Q_w\) denotes a state-action value function parameterized by \(w\)
  • \(Q^{\pi}(s,a)\): similar to \(V^{\pi}(s)\), the expected cumulative return after taking action \(a\) in state \(s\) under policy \(\pi\); \(Q^\pi(s,a)=\mathbb{E}_{a\sim \pi}[G_t|s_t=s,a_t=a]\)
  • \(A(s,a)\): advantage function, \(A(s,a)=Q(s,a)-V(s)\); it can be viewed as a refined version of \(Q(s,a)\) that uses \(V(s)\) as a baseline, which gives it lower variance (proof)
  • \(d^{\pi}(s)\): stationary distribution of the Markov chain induced by \(\pi _\theta\) (the on-policy state distribution under \(\pi\))

Basic Idea

$$ \begin{align*} \overline{R}_\theta &= \sum _\tau R(\tau)\, p_\theta(\tau) \\ \nabla \overline{R}_\theta &= \sum _\tau R(\tau)\, \nabla p_\theta(\tau) \\ &= \sum _\tau R(\tau)\, p_\theta(\tau)\, \frac{\nabla p_\theta(\tau)}{p_\theta(\tau)} \\ &= \sum _\tau R(\tau)\, p_\theta(\tau)\, \nabla \log p_\theta(\tau) \qquad \because \nabla \log f(x) = \frac{\nabla f(x)}{f(x)} \\ &= \mathbb{E}_\tau \left [ R(\tau)\, \nabla \log p_\theta(\tau) \right ] \end{align*} $$
$$ \begin{align*} p_\theta(\tau) &= p(s_1)\pi_\theta(a_1|s_1)p(s_2|a_1,s_1)\pi_\theta(a_2|s_2)p(s_3|a_2,s_2)\cdots \\ &= p(s_1)\prod_{t=1}^{T}\pi_\theta(a_t|s_t)p(s_{t+1}|a_t,s_t) \\ \log p_\theta(\tau) &= \log p(s_1) + \sum_{t=1}^{T}\left (\log \pi_\theta(a_t|s_t) + \log p(s_{t+1}|a_t,s_t) \right ) \\ \nabla \log p_\theta(\tau) &= \sum_{t=1}^{T}\nabla \log \pi_\theta(a_t|s_t) \\ \nabla \overline{R}_\theta &= \mathbb{E}_\tau \left [ R(\tau)\, \nabla \log p_\theta(\tau) \right ] \\ &= \mathbb{E}_\tau \left [ R(\tau) \sum_{t=1}^{T}\nabla \log \pi_\theta(a_t|s_t) \right ] \end{align*} $$
Once the gradient is written as an expectation, the \(p_\theta(\tau)\) that drops out of the formula shows up in how the program collects training data:
trajectories with higher probability occur more often and are therefore collected more often.
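
In code, the expectation is approximated with trajectories sampled from \(\pi_\theta\) itself. Below is a minimal REINFORCE-style sketch (the function name and arguments are illustrative, not taken from the repository above): the surrogate loss \(-R(\tau)\sum_t \log\pi_\theta(a_t|s_t)\) is built from one sampled trajectory, and calling backward() on it yields exactly the gradient derived above.

import torch

def reinforce_loss(log_probs, trajectory_return):
    """Surrogate loss for one sampled trajectory (illustrative sketch).

    log_probs: list of log pi_theta(a_t|s_t) tensors collected while acting,
               e.g. via torch.distributions.Categorical(logits=...).log_prob(a)
    trajectory_return: scalar R(tau) of this trajectory
    """
    log_probs = torch.stack(log_probs)
    # minimizing the negative sum maximizes E[R(tau) * sum_t log pi_theta(a_t|s_t)]
    return -(trajectory_return * log_probs).sum()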

Objective function \(J(\theta)\)

Proof of the Policy Gradient Theorem
The entire goal is to maximize the return.
The simplest form:
$$ J(\theta) = Q^{\pi}(s, a) $$ A more common form:
$$ J(\theta)=\sum_{s \in \mathcal{S}} d^{\pi}(s) V^{\pi}(s)=\sum_{s \in \mathcal{S}} d^{\pi}(s) \sum_{a \in \mathcal{A}} \pi_{\theta}(a | s) Q^{\pi}(s, a)= \mathbb{E}_{s \sim d^{\pi}, a \sim \pi_{\theta}}[Q^{\pi}(s, a)] $$
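
In practice the expectation form is estimated from sampled data. A hedged sketch (the actor and critic call signatures here are illustrative and differ from the A2C.py code further down, where the actor outputs a Gaussian mean and std): maximizing \(J(\theta)=\mathbb{E}[Q^{\pi}(s,a)]\) amounts to minimizing the negative mean of the critic's estimate at actions proposed by the actor.

def actor_objective_loss(actor, critic, states):
    # states: batch of states visited while running the current policy (s ~ d^pi)
    actions = actor(states)                 # a ~ pi_theta (deterministic sketch)
    # J(theta) is approximated by the mean Q(s, a); minimize the negative to ascend it
    return -critic(states, actions).mean()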

Common reward formulations

The more raw the reward signal used (e.g., the full sampled return), the higher the variance and the lower the bias.
Intuition behind the update equation

A general form of policy gradient methods. (Image source: Schulman et al., 2016)
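
The figure from Schulman et al. (2016) lists several choices for the weighting term placed in front of \(\nabla \log \pi_\theta(a_t|s_t)\). As a small illustrative sketch (the function names are not from the repository), here are two of the simplest: weighting every step by the full trajectory return versus by the reward-to-go, where the latter already reduces variance without adding bias.

def full_return_weights(rewards, gamma=0.99):
    # every time step is weighted by the same total discounted return R(tau)
    g = sum((gamma ** k) * r for k, r in enumerate(rewards))
    return [g] * len(rewards)

def reward_to_go_weights(rewards, gamma=0.99):
    # each time step is weighted only by the rewards obtained from that step onward
    weights = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running = rewards[t] + gamma * running
        weights[t] = running
    return weights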
Supplement
Proof of GAE
$$ \begin{align*} \delta_t^V &= r_t + \gamma V(s_{t+1}) - V(s_t) \\ \hat{A}_t^{GAE(\gamma,\lambda,L)} &= \sum_{l=0}^{L-1} (\gamma \lambda)^l \delta_{t+l}^{V} \end{align*} $$
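
A hedged sketch of how the truncated estimator above can be computed (the function name and argument layout are illustrative). It uses the equivalent backward recursion \(\hat{A}_t = \delta_t^V + \gamma\lambda\hat{A}_{t+1}\), which reproduces the truncated sum when the recursion starts from zero at the end of the segment.

import torch

def gae_advantages(rewards, values, gamma=0.99, lam=0.95):
    """Truncated GAE over one trajectory segment (illustrative sketch).

    rewards: r_t for t = 0..T-1
    values:  V(s_t) for t = 0..T (includes a bootstrap value for the final state)
    """
    T = len(rewards)
    advantages = torch.zeros(T)
    gae = 0.0
    for t in reversed(range(T)):
        # delta_t^V = r_t + gamma * V(s_{t+1}) - V(s_t)
        delta = rewards[t] + gamma * values[t + 1] - values[t]
        # A_t = delta_t + gamma * lambda * A_{t+1}
        gae = delta + gamma * lam * gae
        advantages[t] = gae
    return advantages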

Common Terminology

More definitions
  • On Policy vs Off Policy
    • Whether the policy used for the update differs from the policy used to collect data: if they differ, the method is off-policy; otherwise it is on-policy.
      Example: in practice the design is usually split into two nets, a target net and a train net.
      During updates, the target net is used to compute the targets that update the train net, while samples are still collected with the train net (see the sketch after this list).
      After some time, the train net's parameters are copied into the target net.
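
A minimal sketch of that target/train split, using placeholder torch modules rather than the networks from the code below (which applies a soft variant of the same copy in its updateTarget method):

import copy
import torch

train_net = torch.nn.Linear(4, 2)       # placeholder "train" net that collects samples
target_net = copy.deepcopy(train_net)   # frozen copy used to compute update targets

def hard_update(target, source):
    # periodically copy the train parameters into the target net
    target.load_state_dict(source.state_dict())

def soft_update(target, source, tau=0.001):
    # alternative: nudge the target a small step toward the train net on every call
    for p_t, p_s in zip(target.parameters(), source.parameters()):
        p_t.data.copy_(p_t.data + tau * (p_s.data - p_t.data))

hard_update(target_net, train_net)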

DDPG Code

A2C.py
import torch
import torch.nn.functional as F
from torch.distributions import Normal
from memory import MemoryDataset
from collections import namedtuple

torch.manual_seed(500)  # fix the random seed for reproducibility

Trajectory = namedtuple(
    "Transition", ("state", "action", "reward", "done", "next_state")
)


class A2C:
    def __init__(
        self,
        n_actions,
        n_actionRange,
        n_features,
        learning_rate=0.01,
        gamma=0.9,
        tau=0.001,
        mSize=10000,
        batchSize=200,
    ):
        self.n_actionRange = torch.tensor(list(n_actionRange))
        self.actorCriticEval = ActorCriticNet(n_actions, n_features)
        self.actorCriticTarget = ActorCriticNet(n_actions, n_features)
        print(self.actorCriticEval)
        print(self.actorCriticTarget)
        print("max action range:", self.n_actionRange[:, 0])

        self.memory = MemoryDataset(mSize)
        self.batchSize = batchSize

        self.lr = learning_rate
        # reward discount factor
        self.gamma = gamma
        self.tau = tau

        # optimizers used for training;
        # pass in the network parameters and the learning rate
        self.optimizerCritic = torch.optim.Adam(
            self.actorCriticEval.critic.parameters(), lr=self.lr
        )
        self.optimizerActor = torch.optim.Adam(
            self.actorCriticEval.actor.parameters(), lr=self.lr
        )

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        mean, std = self.actorCriticEval.action(state)
        # print(mean, std)
        action = torch.normal(mean, std)
        action = action * self.n_actionRange[:, 0]

        return action.detach().numpy()

    def store_trajectory(self, s, a, r, done, s_):
        self.memory.add(s, a, r, done, s_)

    # episode train
    def trainActor(self):
        if len(self.memory) < self.batchSize * 10:
            return

        batch = Trajectory(*zip(*self.memory.sample(self.batchSize)))

        s = torch.FloatTensor(batch.state)
        # a = torch.FloatTensor(batch.action)
        # r = torch.unsqueeze(torch.FloatTensor(batch.reward), dim=1)
        # done = torch.FloatTensor(batch.done)
        # s_ = torch.FloatTensor(batch.next_state)

        mean, std = self.actorCriticEval.action(s)
        gauss = Normal(mean, std)
        a = gauss.rsample()
        qVal = self.actorCriticEval.qValue(s, a)
        loss = -qVal.sum()

        self.optimizerActor.zero_grad()
        loss.backward(retain_graph=True)
        self.optimizerActor.step()

        # print(loss.item())
        # print(list(self.actorCriticEval.actor.parameters()))
        # print("=============================================")

    # step train
    def trainCriticTD(self):
        if len(self.memory) < self.batchSize * 10:
            return

        batch = Trajectory(*zip(*self.memory.sample(self.batchSize)))

        s = torch.FloatTensor(batch.state)
        a = torch.FloatTensor(batch.action)
        r = torch.FloatTensor(batch.reward)
        # done = torch.FloatTensor(batch.done)
        s_ = torch.FloatTensor(batch.next_state)

        mean, std = self.actorCriticTarget.action(s_)
        a_ = torch.normal(mean, std) * self.n_actionRange[:, 0]

        futureVal = torch.squeeze(self.actorCriticTarget.qValue(s_, a_))
        val = r + self.gamma * futureVal
        target = val.detach()
        predict = torch.squeeze(self.actorCriticEval.qValue(s, a))

        self.optimizerCritic.zero_grad()
        loss = F.smooth_l1_loss(predict, target)
        loss.backward()
        self.optimizerCritic.step()

        # print(list(self.actorCriticEval.critic.parameters()))
        # print("=============================================")

    # update the target network from the eval network (with a small tau this is close to a full copy)
    def updateTarget(self):
        for paramEval, paramTarget in zip(
            self.actorCriticEval.parameters(), self.actorCriticTarget.parameters()
        ):
            paramTarget.data = paramEval.data + self.tau * (
                paramTarget.data - paramEval.data
            )


class ActorNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(ActorNet, self).__init__()
        # define the layers
        self.fcMean1 = torch.nn.Linear(n_features, 5)
        self.fcMean2 = torch.nn.Linear(5, 3)
        self.fcMean3 = torch.nn.Linear(3, n_actions)

        self.fcStd1 = torch.nn.Linear(n_features, 5)
        self.fcStd2 = torch.nn.Linear(5, 3)
        self.fcStd3 = torch.nn.Linear(3, n_actions)

    def forward(self, x):  # this is the Module's forward pass
        # propagate the input through the network to produce the outputs
        x_m = F.relu(self.fcMean1(x))
        x_m = F.relu(self.fcMean2(x_m))
        mean = self.fcMean3(x_m)

        x_s = F.relu(self.fcStd1(x))
        x_s = F.relu(self.fcStd2(x_s))
        # add 1e-14 to prevent std = 0
        std = F.relu(self.fcStd3(x_s)) + 1e-14

        return mean, std


class CriticNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(CriticNet, self).__init__()
        # define the layers
        self.fcVal1_s = torch.nn.Linear(n_features, 256)
        self.fcVal2_s = torch.nn.Linear(256, 128)

        self.fcVal1_a = torch.nn.Linear(n_actions, 128)

        self.fcVal3 = torch.nn.Linear(256, 128)
        self.fcVal4 = torch.nn.Linear(128, 1)

    def forward(self, x, a):  # this is the Module's forward pass
        # propagate the inputs through the network to produce the output
        x_v = F.relu(self.fcVal1_s(x))
        x_v = F.relu(self.fcVal2_s(x_v))

        x_a = F.relu(self.fcVal1_a(a))

        x = torch.cat((x_v, x_a), dim=1)
        x = F.relu(self.fcVal3(x))
        qVal = self.fcVal4(x)

        return qVal


class ActorCriticNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(ActorCriticNet, self).__init__()
        # define the actor and critic sub-networks
        self.actor = ActorNet(n_actions, n_features)
        self.critic = CriticNet(n_actions, n_features)

    def forward(self, x, a):  # this is the Module's forward pass
        # propagate the inputs through both networks to produce the outputs
        mean, std = self.actor(x)
        qVal = self.critic(x, a)

        return mean, std, qVal

    def action(self, x):
        mean, std = self.actor(x)
        return mean, std

    def qValue(self, x, a):
        qVal = self.critic(x, a)

        return qVal

memory.py
import torch
import random
from torch.utils.data import Dataset
from collections import deque


class MemoryDataset(Dataset):
    def __init__(self, size, transform=None):
        self.memory = deque(maxlen=size)
        self.transform = transform

    def __len__(self):
        return len(self.memory)

    def __getitem__(self, idx):
        sample = self.memory[idx]

        if self.transform:
            sample = self.transform(sample)

        return sample

    def add(self, s, a, r, done, s_):
        """
        adds a particular transition to the memory buffer
        :param s: current state
        :param a: action taken
        :param r: reward received
        :param done: whether the episode has finished
        :param s_: next state
        :return:
        """
        self.memory.append([s, a, r, done, s_])

    def sample(self, batchSize):
        """
        samples a random batch from the replay memory buffer
        :param batchSize: batch size
        :return: batch (list of sampled transitions)
        """
        batchSize = min(batchSize, self.__len__())
        batch = random.sample(self.memory, batchSize)

        return batch

train.py
import gym
from A2C import A2C
import matplotlib.pyplot as plt
import torch


RENDER = False  # rendering the simulation slows training; enable it once learning is mostly done

env = gym.make("Pendulum-v0")
env.seed(1)  # fix the random seed for reproducibility
# env = env.unwrapped  # remove the episode length limit

print("actions", env.action_space)
print("actions high", env.action_space.high)
print("actions low", env.action_space.low)
print("observartions", env.observation_space)
print("observartions high", env.observation_space.high)
print("observartions low", env.observation_space.low)

agent = A2C(
    n_actions=env.action_space.shape[0],
    n_actionRange=zip(env.action_space.high, env.action_space.low),
    n_features=env.observation_space.shape[0],
    learning_rate=0.001,
    gamma=0.99,
    tau=0.001,
    mSize=10000,
    batchSize=100,
)

reward_history = []


def plot_durations():
    y_t = torch.FloatTensor(reward_history)
    plt.figure(1)
    plt.clf()
    plt.title("Training...")
    plt.xlabel("Episode")
    plt.ylabel("Reward")
    plt.plot(y_t.numpy())
    # Take 100 episode averages and plot them too
    if len(reward_history) >= 100:
        means = y_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated


for n_episode in range(3000):
    state = env.reset()
    sumR = 0
    for t in range(3000):  # Don't infinite loop while learning
        if RENDER:
            env.render()

        action = agent.choose_action(state)
        state_, reward, done, _ = env.step(action)

        if not done:
            agent.store_trajectory(state, action, reward, done, state_)

        agent.trainCriticTD()
        agent.trainActor()

        sumR += reward
        if done:
            break

        state = state_

    agent.updateTarget()

    reward_history.append(sumR)
    if RENDER:
        plot_durations()

    avgR = sum(reward_history[:-11:-1]) / 10
    print(
        "episode: {:4d} duration: {:4d} Reward: {:5.1f} avgR: {:5.1f}".format(
            n_episode, t, sumR, avgR
        )
    )

    # training success criterion
    if avgR > -100 and n_episode > 10:
        break

# save the model parameters
torch.save(agent.actorCriticEval.state_dict(), "params.pkl")

References

策略梯度方法
Policy Gradient Algorithms
Going Deeper Into Reinforcement Learning: Fundamentals of Policy Gradients
Notes on the Generalized Advantage Estimation Paper
vy007vikas/PyTorch-ActorCriticRL
floodsung/DDPG
An Intuitive Explanation of Policy Gradient
https://zhuanlan.zhihu.com/p/26882898
Deep Deterministic Policy Gradient (DDPG) (Tensorflow)
Deep Reinforcement Learning - 1. DDPG原理和算法
The Complete Reinforcement Learning Dictionary
Reinforcement Learning: An Introduction, Sutton & Barto, 2017
强化学习—DDPG算法原理详解
