Programming language: Python
- Package: PyTorch
Hung-yi Lee (李宏毅) - DRL Lecture 1: Policy Gradient (Review)
Introduction: Policy Gradient concepts
Notation
| Symbol | Meaning |
|---|---|
| \(s\in S\) | State |
| \(a\in A\) | Action |
| \(r\in R\) | Reward |
| \(\tau\) | Trajectory: all the data from one play-through of the game, from start to finish |
| \(s_t,a_t,r_t\) | State, action, and reward at time step \(t\) of a trajectory |
| \(\gamma\) | Discount factor; penalizes the uncertainty of future rewards; \(0<\gamma \leq 1\) |
| \(G_t\) | Return (cumulative discounted reward); \(G_t=\sum _{k=0}^\infty \gamma ^k r_{t+k+1}\) |
| \(P(s{'},r|s,a)\) | Probability of transitioning to the next state \(s'\) with reward \(r\) after taking action \(a\) in state \(s\) |
| \(\pi(a|s)\) | Stochastic policy; \(\pi _\theta(s)\) denotes the policy parameterized by \(\theta\) |
| \(\mu (s)\) | Deterministic policy; written \(\mu (s)\) to distinguish it from the stochastic policy \(\pi (s)\) |
| \(V(s)\) | State-value function: the expected return of state \(s\); \(V_w\) denotes the state-value function parameterized by \(w\) |
| \(V^{\pi}(s)\) | Expected return of state \(s\) when following policy \(\pi\); \(V^\pi(s)=\mathbb{E}_{a\sim \pi}[G_t|s_t=s]\) |
| \(Q(s,a)\) | Action-value function: like \(V(s)\), but measures the expected return after taking action \(a\) in state \(s\); \(Q_w\) denotes the action-value function parameterized by \(w\) |
| \(Q^{\pi}(s,a)\) | Like \(V^{\pi}(s)\): the expected return after taking action \(a\) in state \(s\) when following policy \(\pi\); \(Q^\pi(s,a)=\mathbb{E}_{a\sim \pi}[G_t|s_t=s,a_t=a]\) |
| \(A(s,a)\) | Advantage function, \(A(s,a)=Q(s,a)-V(s)\); an enhanced version of \(Q(s,a)\) that uses \(V(s)\) as a baseline, which gives it lower variance (proof) |
| \(d^{\pi}(s)\) | Stationary distribution of the Markov chain induced by \(\pi _\theta\) (the on-policy state distribution under \(\pi\)) |
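These quantities are tied together by a few standard identities (written here in the notation of the table), which the derivations below use implicitly:
$$
\begin{align*}
G_t &= r_{t+1} + \gamma G_{t+1} \\
V^{\pi}(s) &= \mathbb{E}_{a \sim \pi}\left[Q^{\pi}(s,a)\right] \\
A^{\pi}(s,a) &= Q^{\pi}(s,a) - V^{\pi}(s)
\end{align*}
$$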
Basic idea
$$
\begin{align*}
\overline{R}_\theta &= \sum_\tau R(\tau)\, p_\theta(\tau) \\
\nabla \overline{R}_\theta &= \sum_\tau R(\tau)\, \nabla p_\theta(\tau) \\
&= \sum_\tau R(\tau)\, p_\theta(\tau) \frac{\nabla p_\theta(\tau)}{p_\theta(\tau)} \\
&= \sum_\tau R(\tau)\, p_\theta(\tau)\, \nabla \log p_\theta(\tau) && \because \nabla \log f(x) = \frac{\nabla f(x)}{f(x)} \\
&= \mathbb{E}_{\tau \sim p_\theta}\left[ R(\tau)\, \nabla \log p_\theta(\tau) \right]
\end{align*}
$$
$$
\begin{align*}
p_\theta(\tau) &= p(s_1)\,\pi_\theta(a_1|s_1)\,p(s_2|s_1,a_1)\,\pi_\theta(a_2|s_2)\,p(s_3|s_2,a_2)\cdots \\
&= p(s_1)\prod_{t=1}^{T}\pi_\theta(a_t|s_t)\,p(s_{t+1}|s_t,a_t) \\
\log p_\theta(\tau) &= \log p(s_1) + \sum_{t=1}^{T}\left(\log \pi_\theta(a_t|s_t) + \log p(s_{t+1}|s_t,a_t)\right) \\
\nabla \log p_\theta(\tau) &= \sum_{t=1}^{T}\nabla \log \pi_\theta(a_t|s_t) \\
\nabla \overline{R}_\theta &= \mathbb{E}_{\tau \sim p_\theta}\left[ R(\tau)\, \nabla \log p_\theta(\tau) \right] \\
&= \mathbb{E}_{\tau \sim p_\theta}\left[ R(\tau) \sum_{t=1}^{T}\nabla \log \pi_\theta(a_t|s_t) \right]
\end{align*}
$$
Once the gradient is rewritten as an expectation, the \(p_\theta(\tau)\) factor no longer appears explicitly; it shows up instead in how the program collects training data: trajectories with higher probability occur more often and are therefore collected more often.
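As a concrete illustration, here is a minimal sketch of how this sampled estimate typically becomes a loss in PyTorch. The `policy` module (assumed to map a state tensor to a `torch.distributions` object) and the trajectory format are assumptions for this sketch only, not part of the A2C code later in this post:

```python
import torch

def reinforce_loss(policy, trajectory):
    """Surrogate loss whose gradient is R(tau) * sum_t grad log pi(a_t|s_t).

    `trajectory` is a list of (state, action, reward) tuples collected by
    running the current policy, so the sampling already reflects p_theta(tau).
    """
    states, actions, rewards = zip(*trajectory)
    R = sum(rewards)  # R(tau): total return of the whole trajectory
    log_probs = torch.stack([
        policy(torch.as_tensor(s, dtype=torch.float32)).log_prob(torch.as_tensor(a))
        for s, a in zip(states, actions)
    ])
    # Gradient ascent on R(tau) * sum_t log pi(a_t|s_t) == descent on its negative.
    return -(R * log_probs.sum())
```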
The objective function \(J(\theta)\)
Proof of the Policy Gradient Theorem
The entire goal is to maximize the return.
Simplest form
$$ J(\theta) = Q^{\pi}(s, a) $$
Common form
$$ J(\theta)=\sum_{s \in \mathcal{S}} d^{\pi}(s) V^{\pi}(s)=\sum_{s \in \mathcal{S}} d^{\pi}(s) \sum_{a \in \mathcal{A}} \pi_{\theta}(a | s) Q^{\pi}(s, a)= \mathbb{E}_{s \sim d^{\pi}, a \sim \pi_{\theta}}[Q^{\pi}(s, a)] $$
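Differentiating this objective is exactly what the policy gradient theorem provides (stated here in the notation above); the convenient part is that no gradient of the state distribution \(d^{\pi}(s)\) is required:
$$ \nabla_\theta J(\theta) = \mathbb{E}_{s \sim d^{\pi},\, a \sim \pi_\theta}\left[ Q^{\pi}(s, a)\, \nabla_\theta \log \pi_\theta(a|s) \right] $$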
Common reward terms
The more raw the reward term used in the update, the higher its variance and the lower its bias.
Intuition behind the update equations

A general form of policy gradient methods. (Image source: Schulman et al., 2016)
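In that general form the gradient is \(\nabla_\theta J(\theta)=\mathbb{E}\big[\sum_t \Psi_t\, \nabla_\theta \log \pi_\theta(a_t|s_t)\big]\), and the commonly listed choices of \(\Psi_t\) in that figure, roughly ordered from high variance / low bias to low variance / high bias (written here with the discount \(\gamma\) used elsewhere in this post), are:
$$
\Psi_t \in \left\{\;
\sum_{t'} r_{t'},\quad
\sum_{t' \geq t} r_{t'},\quad
\sum_{t' \geq t} r_{t'} - b(s_t),\quad
Q^{\pi}(s_t, a_t),\quad
A^{\pi}(s_t, a_t),\quad
r_t + \gamma V^{\pi}(s_{t+1}) - V^{\pi}(s_t)
\;\right\}
$$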
Supplement: GAE (Generalized Advantage Estimation)
$$ \begin{align*} \delta_t^V &= r_t + \gamma V(s_{t+1}) - V(s_t) \\ \hat{A}_t^{GAE(\gamma,\lambda,L)} &= \sum_{l=0}^{L-1} (\gamma \lambda)^l \delta_{t+l}^{V} \end{align*} $$
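A minimal NumPy sketch of this truncated estimator; the function and argument names are illustrative only and are not part of the training code below:

```python
import numpy as np

def gae(rewards, values, gamma=0.99, lam=0.95):
    """Truncated GAE: rewards[t], values[t] for t = 0..T-1, plus values[T] for bootstrapping."""
    T = len(rewards)
    deltas = [rewards[t] + gamma * values[t + 1] - values[t] for t in range(T)]
    advantages = np.zeros(T)
    running = 0.0
    for t in reversed(range(T)):
        # Recursive form of the truncated sum above, with L = T - t.
        running = deltas[t] + gamma * lam * running
        advantages[t] = running
    return advantages
```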
Definitions of common terms
More definitions
- On Policy vs Off Policy
- Whether the policy being updated differs from the policy used to collect the data; if it does, the method is off-policy, otherwise it is on-policy
Example: a typical design uses two networks, a target net and a train net.
During updates, the target net is used to compute the targets for updating the train net, while samples are still collected with the train net.
After some time, the train net's parameters are copied into the target net (a minimal sketch follows below).
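A minimal sketch of both update styles; the `torch.nn.Linear` here is just a stand-in for an actual network:

```python
import copy
import torch

train_net = torch.nn.Linear(4, 2)      # stand-in for the network being trained
target_net = copy.deepcopy(train_net)  # target starts out as an exact copy

# Hard update: copy the train parameters into the target every N steps.
target_net.load_state_dict(train_net.state_dict())

# Soft (Polyak) update: nudge the target slightly toward the train net each step.
tau = 0.001
for p_target, p_train in zip(target_net.parameters(), train_net.parameters()):
    p_target.data.copy_(tau * p_train.data + (1.0 - tau) * p_target.data)
```

The soft form is what `updateTarget` in `A2C.py` below does.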
A2C.py
import torch
import torch.nn.functional as F
from torch.distributions import Normal
from memory import MemoryDataset
from collections import namedtuple

torch.manual_seed(500)  # fix the random seed for reproducibility

Trajectory = namedtuple(
    "Trajectory", ("state", "action", "reward", "done", "next_state")
)


class A2C:
    def __init__(
        self,
        n_actions,
        n_actionRange,
        n_features,
        learning_rate=0.01,
        gamma=0.9,
        tau=0.001,
        mSize=10000,
        batchSize=200,
    ):
        self.n_actionRange = torch.tensor(list(n_actionRange))
        self.actorCriticEval = ActorCriticNet(n_actions, n_features)
        self.actorCriticTarget = ActorCriticNet(n_actions, n_features)
        print(self.actorCriticEval)
        print(self.actorCriticTarget)
        print("max action range:", self.n_actionRange[:, 0])

        self.memory = MemoryDataset(mSize)
        self.batchSize = batchSize
        self.lr = learning_rate
        # reward discount factor
        self.gamma = gamma
        self.tau = tau

        # optimizers used for training
        # pass in each sub-network's parameters and the learning rate
        self.optimizerCritic = torch.optim.Adam(
            self.actorCriticEval.critic.parameters(), lr=self.lr
        )
        self.optimizerActor = torch.optim.Adam(
            self.actorCriticEval.actor.parameters(), lr=self.lr
        )

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        mean, std = self.actorCriticEval.action(state)
        # print(mean, std)
        action = torch.normal(mean, std)
        action = action * self.n_actionRange[:, 0]

        return action.detach().numpy()

    def store_trajectory(self, s, a, r, done, s_):
        self.memory.add(s, a, r, done, s_)

    # actor update: maximize the critic's Q-value of actions sampled from the current policy
    def trainActor(self):
        if len(self.memory) < self.batchSize * 10:
            return

        batch = Trajectory(*zip(*self.memory.sample(self.batchSize)))

        s = torch.FloatTensor(batch.state)
        # a = torch.FloatTensor(batch.action)
        # r = torch.unsqueeze(torch.FloatTensor(batch.reward), dim=1)
        # done = torch.FloatTensor(batch.done)
        # s_ = torch.FloatTensor(batch.next_state)

        mean, std = self.actorCriticEval.action(s)
        gauss = Normal(mean, std)
        a = gauss.rsample()
        qVal = self.actorCriticEval.qValue(s, a)

        loss = -qVal.sum()
        self.optimizerActor.zero_grad()
        loss.backward(retain_graph=True)
        self.optimizerActor.step()

        # print(loss.item())
        # print(list(self.actorCriticEval.actor.parameters()))
        # print("=============================================")

    # critic update: fit Q(s, a) to the TD target
    def trainCriticTD(self):
        if len(self.memory) < self.batchSize * 10:
            return

        batch = Trajectory(*zip(*self.memory.sample(self.batchSize)))

        s = torch.FloatTensor(batch.state)
        a = torch.FloatTensor(batch.action)
        r = torch.FloatTensor(batch.reward)
        # done = torch.FloatTensor(batch.done)
        s_ = torch.FloatTensor(batch.next_state)

        mean, std = self.actorCriticTarget.action(s_)
        a_ = torch.normal(mean, std) * self.n_actionRange[:, 0]
        futureVal = torch.squeeze(self.actorCriticTarget.qValue(s_, a_))
        val = r + self.gamma * futureVal
        target = val.detach()
        predict = torch.squeeze(self.actorCriticEval.qValue(s, a))

        self.optimizerCritic.zero_grad()
        loss = F.smooth_l1_loss(predict, target)
        loss.backward()
        self.optimizerCritic.step()

        # print(list(self.actorCriticEval.critic.parameters()))
        # print("=============================================")

    # soft-update the target network toward the eval network
    def updateTarget(self):
        for paramEval, paramTarget in zip(
            self.actorCriticEval.parameters(), self.actorCriticTarget.parameters()
        ):
            paramTarget.data.copy_(
                self.tau * paramEval.data + (1.0 - self.tau) * paramTarget.data
            )


class ActorNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(ActorNet, self).__init__()
        # define the layers
        self.fcMean1 = torch.nn.Linear(n_features, 5)
        self.fcMean2 = torch.nn.Linear(5, 3)
        self.fcMean3 = torch.nn.Linear(3, n_actions)

        self.fcStd1 = torch.nn.Linear(n_features, 5)
        self.fcStd2 = torch.nn.Linear(5, 3)
        self.fcStd3 = torch.nn.Linear(3, n_actions)

    def forward(self, x):  # the Module's forward pass
        # forward-propagate the input to get the outputs
        x_m = F.relu(self.fcMean1(x))
        x_m = F.relu(self.fcMean2(x_m))
        mean = self.fcMean3(x_m)

        x_s = F.relu(self.fcStd1(x))
        x_s = F.relu(self.fcStd2(x_s))
        # add 1e-14 to keep std strictly positive
        std = F.relu(self.fcStd3(x_s)) + 1e-14

        return mean, std


class CriticNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(CriticNet, self).__init__()
        # define the layers
        self.fcVal1_s = torch.nn.Linear(n_features, 256)
        self.fcVal2_s = torch.nn.Linear(256, 128)
        self.fcVal1_a = torch.nn.Linear(n_actions, 128)
        self.fcVal3 = torch.nn.Linear(256, 128)
        self.fcVal4 = torch.nn.Linear(128, 1)

    def forward(self, x, a):  # the Module's forward pass
        # forward-propagate the input to get the outputs
        x_v = F.relu(self.fcVal1_s(x))
        x_v = F.relu(self.fcVal2_s(x_v))
        x_a = F.relu(self.fcVal1_a(a))

        x = torch.cat((x_v, x_a), dim=1)
        x = F.relu(self.fcVal3(x))
        qVal = self.fcVal4(x)

        return qVal


class ActorCriticNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(ActorCriticNet, self).__init__()
        # define the sub-networks
        self.actor = ActorNet(n_actions, n_features)
        self.critic = CriticNet(n_actions, n_features)

    def forward(self, x, a):  # the Module's forward pass
        # forward-propagate the input to get the outputs
        mean, std = self.actor(x)
        qVal = self.critic(x, a)

        return mean, std, qVal

    def action(self, x):
        mean, std = self.actor(x)

        return mean, std

    def qValue(self, x, a):
        qVal = self.critic(x, a)

        return qVal
memory.py
import torch
import random
from torch.utils.data import Dataset
from collections import deque


class MemoryDataset(Dataset):
    def __init__(self, size, transform=None):
        self.memory = deque(maxlen=size)
        self.transform = transform

    def __len__(self):
        return len(self.memory)

    def __getitem__(self, idx):
        sample = self.memory[idx]
        if self.transform:
            sample = self.transform(sample)

        return sample

    def add(self, s, a, r, done, s_):
        """
        adds a particular transition to the memory buffer
        :param s: current state
        :param a: action taken
        :param r: reward received
        :param done: env finish
        :param s_: next state
        :return:
        """
        self.memory.append([s, a, r, done, s_])

    def sample(self, batchSize):
        """
        samples a random batch from the replay memory buffer
        :param batchSize: batch size
        :return: batch (list of transitions)
        """
        batchSize = min(batchSize, self.__len__())
        batch = random.sample(self.memory, batchSize)

        return batch
train.py
import gym
from A2C import A2C
import matplotlib.pyplot as plt
import torch

RENDER = False  # rendering slows training down; turn it on once learning looks good

env = gym.make("Pendulum-v0")
env.seed(1)  # fix the random seed for reproducibility
# env = env.unwrapped  # remove the episode-length limit

print("actions", env.action_space)
print("actions high", env.action_space.high)
print("actions low", env.action_space.low)
print("observations", env.observation_space)
print("observations high", env.observation_space.high)
print("observations low", env.observation_space.low)

agent = A2C(
    n_actions=env.action_space.shape[0],
    n_actionRange=zip(env.action_space.high, env.action_space.low),
    n_features=env.observation_space.shape[0],
    learning_rate=0.001,
    gamma=0.99,
    tau=0.001,
    mSize=10000,
    batchSize=100,
)

reward_history = []


def plot_durations():
    y_t = torch.FloatTensor(reward_history)
    plt.figure(1)
    plt.clf()
    plt.title("Training...")
    plt.xlabel("Episode")
    plt.ylabel("Reward")
    plt.plot(y_t.numpy())
    # Take 100 episode averages and plot them too
    if len(reward_history) >= 100:
        means = y_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated


for n_episode in range(3000):
    state = env.reset()
    sumR = 0
    for t in range(3000):  # Don't infinite loop while learning
        if RENDER:
            env.render()

        action = agent.choose_action(state)
        state_, reward, done, _ = env.step(action)

        if not done:
            agent.store_trajectory(state, action, reward, done, state_)

        agent.trainCriticTD()
        agent.trainActor()

        sumR += reward
        if done:
            break

        state = state_

    agent.updateTarget()

    reward_history.append(sumR)
    if RENDER:
        plot_durations()

    avgR = sum(reward_history[:-11:-1]) / 10
    print(
        "episode: {:4d} duration: {:4d} Reward: {:5.1f} avgR: {:5.1f}".format(
            n_episode, t, sumR, avgR
        )
    )

    # stop once training is considered successful
    if avgR > -100 and n_episode > 10:
        break

# save the model parameters
torch.save(agent.actorCriticEval.state_dict(), "params.pkl")
References
Policy Gradient Algorithms
Going Deeper Into Reinforcement Learning: Fundamentals of Policy Gradients
Notes on the Generalized Advantage Estimation Paper
vy007vikas/PyTorch-ActorCriticRL
floodsung/DDPG
An Intuitive Explanation of Policy Gradient
https://zhuanlan.zhihu.com/p/26882898
Deep Deterministic Policy Gradient (DDPG) (Tensorflow)
Deep Reinforcement Learning - 1. DDPG原理和算法
The Complete Reinforcement Learning Dictionary
Reinforcement Learning: An Introduction, Sutton & Barto, 2017
强化学习—DDPG算法原理详解

