Programming language: Python
- Package: PyTorch
Prof. Hung-yi Lee - DRL Lecture 1: Policy Gradient (Review)
Introduction: the Policy Gradient concept
Notation
Symbol | Meaning |
---|---|
\(s\in S\) | State |
\(a\in A\) | Action |
\(r\in R\) | Reward |
\(\tau\) | Trajectory: the full record of playing one episode, from start to finish |
\(s_t,a_t,r_t\) | The state, action, and reward at time step \(t\) of a trajectory |
\(\gamma\) | Discount factor; penalizes the uncertainty of future rewards; \(0<\gamma \leq 1\) |
\(G_t\) | Return, i.e. the cumulative discounted reward; \(G_t=\sum _{k=0}^\infty \gamma ^k r_{t+k+1}\) |
\(P(s{'},r|s,a)\) | Probability of transitioning to the next state \(s{'}\) with reward \(r\) after taking action \(a\) in state \(s\) |
\(\pi(a|s)\) | Stochastic policy; \(\pi _\theta(s)\) denotes a policy parameterized by \(\theta\) |
\(\mu (s)\) | Deterministic policy; written as \(\mu (s)\) to distinguish it from the stochastic policy \(\pi (s)\) |
\(V(s)\) | State-value function, the expected return of state \(s\); \(V_w\) denotes a state-value function parameterized by \(w\) |
\(V^{\pi}(s)\) | Expected return of state \(s\) when following policy \(\pi\); \(V^\pi(s)=\mathbb{E}_{a\sim \pi}[G_t|s_t=s]\) |
\(Q(s,a)\) | State-action value function; like \(V(s)\), but it measures the expected return after taking action \(a\) in state \(s\); \(Q_w\) denotes a state-action value function parameterized by \(w\) |
\(Q^{\pi}(s,a)\) | Analogous to \(V^{\pi}(s)\): the expected return after taking action \(a\) in state \(s\) and then following policy \(\pi\); \(Q^\pi(s,a)=\mathbb{E}_{a\sim \pi}[G_t|s_t=s,a_t=a]\) |
\(A(s,a)\) | Advantage function, \(A(s,a)=Q(s,a)-V(s)\); an improved version of \(Q(s,a)\) that uses \(V(s)\) as a baseline and therefore has lower variance (proof) |
\(d^{\pi}(s)\) | Stationary distribution of the Markov chain induced by \(\pi _\theta\) (the on-policy state distribution under \(\pi\)) |
Basic idea
$$
\begin{align*}
\overline{R}_\theta &= \sum _\tau R(\tau)\, p_\theta(\tau) \\
\nabla \overline{R}_\theta &= \sum _\tau R(\tau)\, \nabla p_\theta(\tau) \\
&= \sum _\tau R(\tau)\, p_\theta(\tau) \frac{\nabla p_\theta(\tau)}{p_\theta(\tau)} \\
&= \sum _\tau R(\tau)\, p_\theta(\tau)\, \nabla \log p_\theta(\tau) \qquad \because \nabla \log f(x) = \frac{\nabla f(x)}{f(x)} \\
&= \mathbb{E}_\tau \left [ R(\tau)\, \nabla \log p_\theta(\tau) \right ]
\end{align*}
$$
$$
\begin{align*}
p_\theta(\tau) &= p(s_1)\,\pi_\theta(a_1|s_1)\,p(s_2|a_1,s_1)\,\pi_\theta(a_2|s_2)\,p(s_3|a_2,s_2)\cdots \\
&= p(s_1)\prod_{t=1}^{T}\pi_\theta(a_t|s_t)\,p(s_{t+1}|a_t,s_t) \\
\log p_\theta(\tau) &= \log p(s_1) + \sum_{t=1}^{T}\left (\log \pi_\theta(a_t|s_t) + \log p(s_{t+1}|a_t,s_t) \right ) \\
\nabla \log p_\theta(\tau) &= \sum_{t=1}^{T}\nabla \log \pi_\theta(a_t|s_t) \\
\nabla \overline{R}_\theta &= \mathbb{E}_\tau \left [ R(\tau)\, \nabla \log p_\theta(\tau) \right ] \\
&= \mathbb{E}_\tau \left [ R(\tau) \sum_{t=1}^{T}\nabla \log \pi_\theta(a_t|s_t) \right ]
\end{align*}
$$
Once the derivation is written as an expectation, the omitted \(p_\theta(\tau)\) is accounted for by the way the program collects training data:
trajectories with higher probability appear more often and are therefore collected more often (a minimal sampling-based sketch follows below).
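To make the last line concrete, here is a minimal REINFORCE-style sketch (separate from the A2C code further below); `policy`, `states`, `actions`, and `total_return` are hypothetical names used only for illustration. Sampling trajectories from \(\pi_\theta\) plays the role of the omitted \(p_\theta(\tau)\): averaging this loss over sampled trajectories is a Monte Carlo estimate of \(\nabla \overline{R}_\theta\).

```python
import torch

def reinforce_loss(policy, states, actions, total_return):
    """Surrogate loss whose gradient is R(tau) * sum_t grad log pi_theta(a_t|s_t).

    Assumes `policy(states)` returns a torch.distributions.Distribution over
    actions and `total_return` is the scalar R(tau) of the sampled trajectory.
    """
    dist = policy(states)               # pi_theta(. | s_t) for every time step
    log_probs = dist.log_prob(actions)  # log pi_theta(a_t | s_t)
    # Trajectories were sampled from p_theta, so averaging this loss over
    # sampled data already accounts for the p_theta(tau) factor dropped above.
    return -(total_return * log_probs.sum())
```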
The objective function \(J(\theta)\)
Proof of the Policy Gradient Theorem
The whole goal is to maximize the return.
Simplest form
$$ J(\theta) = Q^{\pi}(s, a) $$
Common form
$$ J(\theta)=\sum_{s \in \mathcal{S}} d^{\pi}(s) V^{\pi}(s)=\sum_{s \in \mathcal{S}} d^{\pi}(s) \sum_{a \in \mathcal{A}} \pi_{\theta}(a | s) Q^{\pi}(s, a)= \mathbb{E}_{s \sim d^{\pi}, a \sim \pi_{\theta}}[Q^{\pi}(s, a)] $$
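Taking the gradient of this objective gives the policy gradient theorem (stated here for reference, consistent with the sampled form derived above; see the proof linked earlier):

$$
\nabla_\theta J(\theta) = \mathbb{E}_{s \sim d^{\pi},\, a \sim \pi_\theta}\left[ Q^{\pi}(s, a)\, \nabla_\theta \log \pi_\theta(a|s) \right]
$$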
Common reward signals
The more raw the reward signal used in the update, the higher its variance and the lower its bias (a small sketch comparing the common estimators follows below).
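As a rough illustration of that spectrum, a minimal sketch comparing three common signals, from high-variance/low-bias to low-variance/high-bias; the `rewards`/`values` inputs are assumptions for illustration, not taken from the code below:

```python
import numpy as np

def return_estimates(rewards, values, gamma=0.99):
    """Compare reward signals along the bias-variance spectrum.

    Assumes `rewards` holds the per-step rewards of one trajectory and
    `values` holds value estimates V(s_0), ..., V(s_T) (one extra entry
    at the end for bootstrapping).
    """
    T = len(rewards)

    # 1) Total discounted return R(tau): unbiased, highest variance
    total_return = sum(gamma ** t < 1 and gamma ** t * r or gamma ** t * r for t, r in enumerate(rewards))
    total_return = sum(gamma ** t * r for t, r in enumerate(rewards))

    # 2) Reward-to-go G_t: still unbiased, somewhat lower variance
    reward_to_go = np.zeros(T)
    running = 0.0
    for t in reversed(range(T)):
        running = rewards[t] + gamma * running
        reward_to_go[t] = running

    # 3) One-step TD residual r_t + gamma * V(s_{t+1}) - V(s_t):
    #    low variance, but biased by the value estimate
    td_residual = np.array(
        [rewards[t] + gamma * values[t + 1] - values[t] for t in range(T)]
    )
    return total_return, reward_to_go, td_residual
```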
Intuition behind the update rule
A general form of policy gradient methods. (Image source: Schulman et al., 2016)
Supplement: proof of GAE
$$
\begin{align*}
\delta_t^V &= r_t + \gamma V(s_{t+1}) - V(s_t) \\
\hat{A}_t^{GAE(\gamma,\lambda,L)} &= \sum_{l=0}^{L-1} (\gamma \lambda)^l \delta_{t+l}^{V}
\end{align*}
$$
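A minimal sketch of computing this truncated estimator over one segment; the `rewards`/`values` arrays are assumptions for illustration, with one extra value entry for bootstrapping:

```python
import numpy as np

def gae(rewards, values, gamma=0.99, lam=0.95):
    """Truncated GAE over a segment of length L.

    `rewards` has L entries; `values` has L + 1 entries (V(s_t) ... V(s_{t+L})),
    so the last value bootstraps the tail of the segment.
    """
    L = len(rewards)
    # delta_t^V = r_t + gamma * V(s_{t+1}) - V(s_t)
    deltas = [rewards[l] + gamma * values[l + 1] - values[l] for l in range(L)]

    advantages = np.zeros(L)
    running = 0.0
    # A_t = sum_l (gamma * lambda)^l * delta_{t+l}, accumulated backwards in O(L)
    for t in reversed(range(L)):
        running = deltas[t] + gamma * lam * running
        advantages[t] = running
    return advantages
```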
Common terminology
More definitions
- On Policy vs Off Policy
- Whether the policy used for the update differs from the policy that collected the data: if it does, the method is off-policy; otherwise it is on-policy.
Example: the design usually splits into two networks, a target net and a train (eval) net.
During updates, the target net is used to compute the targets for updating the train net, while samples are still collected with the train net.
After a period of time, the train net's parameters are copied into the target net (a minimal sketch of both update styles follows below).
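A minimal sketch of the two update styles just described; the `train_net`/`target_net` names are placeholders, and the A2C code below uses the soft variant in `updateTarget`:

```python
import copy
import torch

train_net = torch.nn.Linear(4, 2)      # stand-in for the network being trained
target_net = copy.deepcopy(train_net)  # the target starts as an exact copy

def hard_update(target, source):
    # Copy the train parameters into the target every N steps
    target.load_state_dict(source.state_dict())

def soft_update(target, source, tau=0.001):
    # Polyak averaging: target <- (1 - tau) * target + tau * source
    with torch.no_grad():
        for p_t, p_s in zip(target.parameters(), source.parameters()):
            p_t.mul_(1.0 - tau).add_(tau * p_s)
```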
A2C.py
```python
import torch
import torch.nn.functional as F
from torch.distributions import Normal
from memory import MemoryDataset
from collections import namedtuple

torch.manual_seed(500)  # Fix the random seed for reproducibility

Trajectory = namedtuple(
    "Transition", ("state", "action", "reward", "done", "next_state")
)


class A2C:
    def __init__(
        self,
        n_actions,
        n_actionRange,
        n_features,
        learning_rate=0.01,
        gamma=0.9,
        tau=0.001,
        mSize=10000,
        batchSize=200,
    ):
        self.n_actionRange = torch.tensor(list(n_actionRange))
        self.actorCriticEval = ActorCriticNet(n_actions, n_features)
        self.actorCriticTarget = ActorCriticNet(n_actions, n_features)
        print(self.actorCriticEval)
        print(self.actorCriticTarget)
        print("max action range:", self.n_actionRange[:, 0])

        self.memory = MemoryDataset(mSize)
        self.batchSize = batchSize

        self.lr = learning_rate
        # Reward discount factor
        self.gamma = gamma
        self.tau = tau

        # Optimizers: pass in each sub-network's parameters and the learning rate
        self.optimizerCritic = torch.optim.Adam(
            self.actorCriticEval.critic.parameters(), lr=self.lr
        )
        self.optimizerActor = torch.optim.Adam(
            self.actorCriticEval.actor.parameters(), lr=self.lr
        )

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        mean, std = self.actorCriticEval.action(state)
        # Sample an action from the Gaussian policy and scale it to the action range
        action = torch.normal(mean, std)
        action = action * self.n_actionRange[:, 0]

        return action.detach().numpy()

    def store_trajectory(self, s, a, r, done, s_):
        self.memory.add(s, a, r, done, s_)

    # Actor update: push the policy toward actions the critic rates highly
    # (the gradient flows through the reparameterized sample)
    def trainActor(self):
        if len(self.memory) < self.batchSize * 10:
            return

        batch = Trajectory(*zip(*self.memory.sample(self.batchSize)))

        s = torch.FloatTensor(batch.state)
        # a = torch.FloatTensor(batch.action)
        # r = torch.unsqueeze(torch.FloatTensor(batch.reward), dim=1)
        # done = torch.FloatTensor(batch.done)
        # s_ = torch.FloatTensor(batch.next_state)

        mean, std = self.actorCriticEval.action(s)
        gauss = Normal(mean, std)
        a = gauss.rsample()
        qVal = self.actorCriticEval.qValue(s, a)

        loss = -qVal.sum()
        self.optimizerActor.zero_grad()
        loss.backward(retain_graph=True)
        self.optimizerActor.step()

        # print(loss.item())
        # print(list(self.actorCriticEval.actor.parameters()))
        # print("=============================================")

    # Critic update: one-step TD target computed with the target network
    def trainCriticTD(self):
        if len(self.memory) < self.batchSize * 10:
            return

        batch = Trajectory(*zip(*self.memory.sample(self.batchSize)))

        s = torch.FloatTensor(batch.state)
        a = torch.FloatTensor(batch.action)
        r = torch.FloatTensor(batch.reward)
        # done = torch.FloatTensor(batch.done)
        s_ = torch.FloatTensor(batch.next_state)

        mean, std = self.actorCriticTarget.action(s_)
        a_ = torch.normal(mean, std) * self.n_actionRange[:, 0]
        futureVal = torch.squeeze(self.actorCriticTarget.qValue(s_, a_))
        val = r + self.gamma * futureVal
        target = val.detach()
        predict = torch.squeeze(self.actorCriticEval.qValue(s, a))

        self.optimizerCritic.zero_grad()
        loss = F.smooth_l1_loss(predict, target)
        loss.backward()
        self.optimizerCritic.step()

        # print(list(self.actorCriticEval.critic.parameters()))
        # print("=============================================")

    # Gradually (softly) update the target network toward the eval network
    def updateTarget(self):
        for paramEval, paramTarget in zip(
            self.actorCriticEval.parameters(), self.actorCriticTarget.parameters()
        ):
            # Polyak averaging: target <- target + tau * (eval - target)
            paramTarget.data = paramTarget.data + self.tau * (
                paramEval.data - paramTarget.data
            )


class ActorNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(ActorNet, self).__init__()
        # Two small heads: one for the mean, one for the std of the Gaussian policy
        self.fcMean1 = torch.nn.Linear(n_features, 5)
        self.fcMean2 = torch.nn.Linear(5, 3)
        self.fcMean3 = torch.nn.Linear(3, n_actions)

        self.fcStd1 = torch.nn.Linear(n_features, 5)
        self.fcStd2 = torch.nn.Linear(5, 3)
        self.fcStd3 = torch.nn.Linear(3, n_actions)

    def forward(self, x):
        # Forward pass: map the state to the mean and std of the action distribution
        x_m = F.relu(self.fcMean1(x))
        x_m = F.relu(self.fcMean2(x_m))
        mean = self.fcMean3(x_m)

        x_s = F.relu(self.fcStd1(x))
        x_s = F.relu(self.fcStd2(x_s))
        # Add 1e-14 to keep std strictly positive
        std = F.relu(self.fcStd3(x_s)) + 1e-14

        return mean, std


class CriticNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(CriticNet, self).__init__()
        # Separate encoders for state and action, merged before the Q-value head
        self.fcVal1_s = torch.nn.Linear(n_features, 256)
        self.fcVal2_s = torch.nn.Linear(256, 128)
        self.fcVal1_a = torch.nn.Linear(n_actions, 128)
        self.fcVal3 = torch.nn.Linear(256, 128)
        self.fcVal4 = torch.nn.Linear(128, 1)

    def forward(self, x, a):
        # Forward pass: combine state and action features to estimate Q(s, a)
        x_v = F.relu(self.fcVal1_s(x))
        x_v = F.relu(self.fcVal2_s(x_v))
        x_a = F.relu(self.fcVal1_a(a))
        x = torch.cat((x_v, x_a), dim=1)
        x = F.relu(self.fcVal3(x))
        qVal = self.fcVal4(x)

        return qVal


class ActorCriticNet(torch.nn.Module):
    def __init__(self, n_actions, n_features):
        super(ActorCriticNet, self).__init__()
        # Actor and critic sub-networks
        self.actor = ActorNet(n_actions, n_features)
        self.critic = CriticNet(n_actions, n_features)

    def forward(self, x, a):
        # Return the action distribution parameters and Q(s, a) in one call
        mean, std = self.actor(x)
        qVal = self.critic(x, a)

        return mean, std, qVal

    def action(self, x):
        mean, std = self.actor(x)
        return mean, std

    def qValue(self, x, a):
        qVal = self.critic(x, a)
        return qVal
```
memory.py
```python
import random
from collections import deque

from torch.utils.data import Dataset


class MemoryDataset(Dataset):
    def __init__(self, size, transform=None):
        self.memory = deque(maxlen=size)
        self.transform = transform

    def __len__(self):
        return len(self.memory)

    def __getitem__(self, idx):
        sample = self.memory[idx]
        if self.transform:
            sample = self.transform(sample)
        return sample

    def add(self, s, a, r, done, s_):
        """
        Adds one transition to the memory buffer.
        :param s: current state
        :param a: action taken
        :param r: reward received
        :param done: whether the episode finished
        :param s_: next state
        :return:
        """
        self.memory.append([s, a, r, done, s_])

    def sample(self, batchSize):
        """
        Samples a random batch from the replay memory buffer.
        :param batchSize: batch size
        :return: batch (list of transitions)
        """
        batchSize = min(batchSize, self.__len__())
        batch = random.sample(self.memory, batchSize)
        return batch
```
train.py
```python
import gym
import matplotlib.pyplot as plt
import torch

from A2C import A2C

RENDER = False  # Rendering slows training down; enable it once learning looks good

env = gym.make("Pendulum-v0")
env.seed(1)  # Fix the random seed for reproducibility
# env = env.unwrapped  # Remove the episode step limit

print("actions", env.action_space)
print("actions high", env.action_space.high)
print("actions low", env.action_space.low)
print("observations", env.observation_space)
print("observations high", env.observation_space.high)
print("observations low", env.observation_space.low)

agent = A2C(
    n_actions=env.action_space.shape[0],
    n_actionRange=zip(env.action_space.high, env.action_space.low),
    n_features=env.observation_space.shape[0],
    learning_rate=0.001,
    gamma=0.99,
    tau=0.001,
    mSize=10000,
    batchSize=100,
)

reward_history = []


def plot_durations():
    y_t = torch.FloatTensor(reward_history)
    plt.figure(1)
    plt.clf()
    plt.title("Training...")
    plt.xlabel("Episode")
    plt.ylabel("Reward")
    plt.plot(y_t.numpy())
    # Take 100-episode averages and plot them too
    if len(reward_history) >= 100:
        means = y_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated


for n_episode in range(3000):
    state = env.reset()
    sumR = 0
    for t in range(3000):  # Don't loop forever while learning
        if RENDER:
            env.render()

        action = agent.choose_action(state)
        state_, reward, done, _ = env.step(action)

        if not done:
            agent.store_trajectory(state, action, reward, done, state_)

        agent.trainCriticTD()
        agent.trainActor()

        sumR += reward
        if done:
            break

        state = state_

    agent.updateTarget()

    reward_history.append(sumR)
    if RENDER:
        plot_durations()

    avgR = sum(reward_history[:-11:-1]) / 10
    print(
        "episode: {:4d} duration: {:4d} Reward: {:5.1f} avgR: {:5.1f}".format(
            n_episode, t, sumR, avgR
        )
    )

    # Stop once training is considered successful
    if avgR > -100 and n_episode > 10:
        break

# Save the model parameters
torch.save(agent.actorCriticEval.state_dict(), "params.pkl")
```
References
Policy Gradient Algorithms
Going Deeper Into Reinforcement Learning: Fundamentals of Policy Gradients
Notes on the Generalized Advantage Estimation Paper
vy007vikas/PyTorch-ActorCriticRL
floodsung/DDPG
An Intuitive Explanation of Policy Gradient
https://zhuanlan.zhihu.com/p/26882898
Deep Deterministic Policy Gradient (DDPG) (Tensorflow)
Deep Reinforcement Learning - 1. DDPG原理和算法
The Complete Reinforcement Learning Dictionary
Reinforcement Learning: An Introduction, Sutton & Barto, 2017
强化学习—DDPG算法原理详解