强化学习(实践):REINFORCE,AC,PPO

强化学习(实践):REINFORCE,AC,PPO,第1张

1,REINFORCE

在车杆环境中进行 REINFORCE 算法的实验:

import gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import rl_utils

首先定义策略网络 PolicyNet,其输入是某个状态,输出则是该状态下的动作概率分布,这里采用在离散动作空间上的 softmax()函数来实现一个可学习的多项分布。

class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)

再定义我们的 REINFORCE 算法。在函数take_action()函数中,我们通过动作概率分布对离散的动作进行采样。在更新过程中,我们按照算法将损失函数写为策略回报的负数,即,对求导后就可以通过梯度下降来更新策略。

class REINFORCE:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, device):
        self.policy_net = PolicyNet(state_dim, hidden_dim,action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(),lr=learning_rate)  # 使用Adam优化器
        self.gamma = gamma  # 折扣因子
        self.device = device

    def take_action(self, state):  # 根据动作概率分布随机采样
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.policy_net(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        reward_list = transition_dict['rewards']
        state_list = transition_dict['states']
        action_list = transition_dict['actions']

        G = 0
        self.optimizer.zero_grad()
        for i in reversed(range(len(reward_list))):  # 从最后一步算起
            reward = reward_list[i]
            state = torch.tensor([state_list[i]],dtype=torch.float).to(self.device)
            action = torch.tensor([action_list[i]]).view(-1, 1).to(self.device)
            log_prob = torch.log(self.policy_net(state).gather(1, action))
            G = self.gamma * G + reward
            loss = -log_prob * G  # 每一步的损失函数
            loss.backward()  # 反向传播计算梯度
        self.optimizer.step()  # 梯度下降
learning_rate = 1e-3
num_episodes = 1000
hidden_dim = 128
gamma = 0.98
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = "CartPole-v0"
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = REINFORCE(state_dim, hidden_dim, action_dim, learning_rate, gamma,device)

return_list = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            transition_dict = {
                'states': [],
                'actions': [],
                'next_states': [],
                'rewards': [],
                'dones': []
            }
            state = env.reset()
            env.render()
            done = False
            while not done:
                action = agent.take_action(state)
                next_state, reward, done, _ = env.step(action)
                transition_dict['states'].append(state)
                transition_dict['actions'].append(action)
                transition_dict['next_states'].append(next_state)
                transition_dict['rewards'].append(reward)
                transition_dict['dones'].append(done)
                state = next_state
                episode_return += reward
            return_list.append(episode_return)
            agent.update(transition_dict)
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode':
                    '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return':
                    '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

在 CartPole-v0 环境中,满分就是 200 分,我们发现 REINFORCE 算法效果很好,可以达到 200 分。接下来我们绘制训练过程中每一条轨迹的回报变化图。由于回报抖动比较大,往往会进行平滑处理。

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('REINFORCE on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('REINFORCE on {}'.format(env_name))
plt.show()

可以看到,随着收集到的轨迹越来越多,REINFORCE 算法有效地学习到了最优策略。不过,相比于前面的 DQN 算法,REINFORCE 算法使用了更多的序列,这是因为 REINFORCE 算法是一个在线策略算法,之前收集到的轨迹数据不会被再次利用。此外,REINFORCE 算法的性能也有一定程度的波动,这主要是因为每条采样轨迹的回报值波动比较大,这也是 REINFORCE 算法主要的不足。

2,Actor-Critic算法

仍然在 Cartpole 环境上进行 Actor-Critic 算法的实验。

import gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import rl_utils

定义我们的策略网络 PolicyNet,与 REINFORCE 算法中一样。

class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return  F.softmax(self.fc2(x),dim=1)

Actor-Critic 算法中额外引入一个价值网络,接下来的代码定义我们的价值网络 ValueNet,输入是状态,输出状态的价值。

class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)

再定义我们的 ActorCritic 算法。主要包含采取动作和更新网络参数两个函数。

class ActorCritic:
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device) # 价值网络
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr) # 价值网络优化器
        self.gamma = gamma

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1)

        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones) # 时序差分目标
        td_delta = td_target - self.critic(states) # 时序差分误差
        log_probs = torch.log(self.actor(states).gather(1, actions))
        actor_loss = torch.mean(-log_probs * td_delta.detach())
        critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach())) # 均方误差损失函数
        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()
        actor_loss.backward() # 计算策略网络的梯度
        critic_loss.backward() # 计算价值网络的梯度
        self.actor_optimizer.step() # 更新策略网络参数
        self.critic_optimizer.step() # 更新价值网络参数
actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 1000
hidden_dim = 128
gamma = 0.98
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = ActorCritic(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, gamma, device)

return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Actor-Critic on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Actor-Critic on {}'.format(env_name))
plt.show()

根据实验结果我们发现,Actor-Critic 算法很快便能收敛到最优策略,并且训练过程非常稳定,抖动情况相比 REINFORCE 算法有了明显的改进,这多亏了价值函数的引入减小了方差。

3,TRPO 3.1,算法概述

基于策略的方法:策略梯度和AC算法。这些算法虽然简单、直观,但在实际应用过程中会遇到训练不稳定的情况。基于策略的方法中参数化智能体策略,并设计衡量策略好坏的目标函数,通过梯度上升的方法来最大化这个目标函数,使得策略最优。具体来说:假设  表示策略  的参数,定义 ,基于策略的方法的目标是找到 ,策略梯度算法主要沿着  方向迭代更新策略参数 。但是这种算法有一个明显的缺点:当策略网络是深度模型时,沿着梯度策略梯度更新参数  ,很有可能由于步长太长,策略突然显著变差,进而影响训练结果。

针对上面的问题,考虑在更新时找到一块信任区域(Trust Region),在这个区域上更新策略时能够得到某种策略性能的安全保证,这就是信任区域策略优化(Truest Region Policy Optimization,TRPO)算法的主要思想。TRPO算法在2015年被提出,它在理论上能够保证策略学习的性能单调性,并在实际应用中取得了比策略梯度算法更好的效果。

3.2,策略目标

假设当前策略为 ,参数为 。考虑如何借助当前的  找到一个更优的参数 ,是使得 。具体来说,由于初始状态  的分布和策略无关,因此上述策略  下的优化目标  可以写成新策略  下的期望形式:

基于以上等式,可以推导出新旧策略的目标函数之间的差距:

将时序差分残差定义为优势函数 (确定性策略中的一种思想):

最后一个等号的成立用到了状态访问分布的定义:,所以只需要找个一个策略,使得 ,就能保证策略性能单调递增,即 。

但是直接求解非常困难,因为  是我们需要求解的策略,但我们又要用它来收集样本。把所有的新策略都拿来收集数据,然后判断哪个策略满足上述条件的做法显然是不现实的。于是TRPO做了一步近似 *** 作,对状态访问分布进行了相应的处理。具体而言,忽略两个策略之间的状态访问分布变化,直接采用旧的策略  的状态分布,定义如下替代优化目标:

当新旧策略非常接近时,状态访问分布变化很小,这样的近似是合理的。其中,动作仍然使用新策略  采样得到,可以用重要性采样对动作分布进行处理:

这样,我们就可以基于旧策略  已经采样出的数据来估计并优化新策略  了。为了保证新旧策略足够接近,TRPO使用了库尔贝克-莱布勒(KL)散度来衡量策略之间的距离,并给出了整体优化公式:

 s.t 

这里不等式约束定义了策略空间中的一个库尔贝克-莱布勒球,称为信任区域。在这个区域中,可以认为当前学习策略和环境交互的状态分布与上一轮策略最后采样的状态分布一致,进而可以基于一步动作的重要性采样方法使当前学习策略稳定上升。

3.3,近似求解

直接求解上述带约束的优化问题比较麻烦,TRPO在其具体实现中做了一步近似 *** 作来快速求解。为了方便起见,在接下来公式中使用  代替之前的  ,表示第  次迭代之后的策略。首先对目标函数和约束在  进行泰勒展开,分别用 1阶、2阶进行近似:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/728987.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存