PG-REINFORCE tensorflow 2.0

PG-REINFORCE tensorflow 2.0,第1张

REINFORCE 算法实现

REINFORCE算法是策略梯度算法最原始的实现算法,这里采用tensorflow2.0进行实现

import tensorflow as tf
import gym
from matplotlib import pyplot as plt
import numpy as np


def PGReinforce_run(PGReinforce_agent=None, episode=1000):
    PGReinforce_agent = PGReinforce_agent.PGReinforce(n_actions=2, n_features=4)
    PGReinforce_agent.net_init()
    score = []
    env = gym.make('CartPole-v1')
    bias = 5
    for i_episode in range(episode):
        # 初始化,
        observation = env.reset()
        done = False
        t = 0
        while not done:
            env.render()
            action = PGReinforce_agent.choose_action(observation)
            PGReinforce_agent.traj_store(observation, action)
            observation_, reward, done, info = env.step(action)
            x, x_dot, theta, theta_dot = observation
            r2 = - abs(theta)*5
            # r1 = - abs(x)
            PGReinforce_agent.r_calculate(reward + r2)
            observation = observation_
            t += 1
        # PGReinforce_agent.loss_calculate()
        print("Episode finished after {} time steps".format(t + 1))
        score.append(t + 1)
        PGReinforce_agent.learn(5)
        if (i_episode + 1) % 100 == 0:
            plt.plot(score)  # 绘制波形
            # plt.draw()
            plt.savefig(f"RL_algorithm_package/img/pic_{0}_bias-{i_episode + 1}.png")


class PGReinforce:
    def __init__(self, n_actions, n_features, gamma=0.9, learning_rate=0.01):
        self.gamma = gamma
        self.n_actions = n_actions
        self.n_features = n_features
        # 轨迹
        self.traj = []
        # 网络
        self.pg_model = None
        self.net_init()
        # 一个轨迹序列的总回报
        self.r = []
        # 优化器
        self.opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    def choose_action(self, s):
        """
        注意,这里动作的选择与之前DQN不同,DQN是选择最大的可能性的动作,而这里要对输出进行采样,来选择动作
        :param s:
        :return:
        """
        s = s.reshape(1, 4)
        action_value = self.pg_model.predict(np.array(s))
        action = np.random.choice(np.arange(action_value.shape[1]), p=action_value[0])
        return action

    def r_calculate(self, r):
        """
        每次输入r,累加算计每一个回合的总回报值r
        :param r: 回报值
        :return:
        """
        self.r.append(r)

    def loss_calculate(self, bias):
        a_list = []
        s_list = []
        G_list = []
        for index in range(len(self.r)):
            G_list.append([self.gamma ** index * sum(self.r[index:])])
        for index, item in enumerate(self.traj):
            a_list.append(self.traj[index][1])
            s_list.append(self.traj[index][0])
        a_one_hot = tf.one_hot(a_list, self.n_actions)
        s = np.array(s_list)
        G = np.matmul(np.array(G_list), np.ones([1, 2]))
        out_put_a = self.pg_model(s)
        # 注意loss函数的计算。
        log_pro = tf.reduce_sum(a_one_hot * tf.math.log(out_put_a), axis=1)
        # 因为是计算真实的动作和网络输出动作的相似程度,需要采用reduce_mean函数来进行计算loss
        # 这里loss就一个值,因为是一个回合进行更新一次。意思是这个回合的相似程度,这里相当于是一个分类问题
        loss = - tf.reduce_mean((G - bias)*log_pro)
        self.r.clear()
        self.traj.clear()
        return loss

    def traj_store(self, s, a):
        """
        轨迹存储函数,将s,a存储到列表中,done以及r不必存储
        :param s: 状态
        :param a: 动作
        :return:
        """
        s_list = []
        for index in range(self.n_features):
            s_list.append(s[index])
        self.traj.append([s_list, a])

    def net_init(self):
        inputs = tf.keras.Input(shape=(self.n_features,))
        d1 = tf.keras.layers.Dense(32, activation='relu')(inputs)
        output = tf.keras.layers.Dense(self.n_actions, activation='softmax')(d1)
        self.pg_model = tf.keras.Model(inputs=inputs, outputs=output)

    def learn(self, bias):
        # 更新梯度
        with tf.GradientTape() as Tape:
            loss = self.loss_calculate(bias)
            grads = Tape.gradient(loss, self.pg_model.trainable_variables)
            w = self.pg_model.get_weights()
            # print(f"w_before = {w}")
            self.opt.apply_gradients(zip(grads, self.pg_model.trainable_variables))
            w = self.pg_model.get_weights()
            # print(f"w_after = {w}")


①这里需要注意的点是REINFORCE程序的动作选择是通过对输出采样得到的,而不是采用最大化的算法。

    def choose_action(self, s):
        """
        注意,这里动作的选择与之前DQN不同,DQN是选择最大的可能性的动作,而这里要对输出进行采样,来选择动作
        :param s:
        :return:
        """
        s = s.reshape(1, 4)
        action_value = self.pg_model.predict(np.array(s))
        action = np.random.choice(np.arange(action_value.shape[1]), p=action_value[0])
        return action

②这里是蒙特卡洛方法,在一轮游戏结束后,才会对神经网络进行更新。所以这里的回报值也是这一轮游戏的回报值。

    def r_calculate(self, r):
        """
        每次输入r,累加算计每一个回合的总回报值r
        :param r: 回报值
        :return:
        """
        self.r.append(r)

③loss计算

def loss_calculate(self, bias):
        a_list = []
        s_list = []
        G_list = []
        for index in range(len(self.r)):
            G_list.append([self.gamma ** index * sum(self.r[index:])])
        for index, item in enumerate(self.traj):
            a_list.append(self.traj[index][1])
            s_list.append(self.traj[index][0])
        a_one_hot = tf.one_hot(a_list, self.n_actions)
        s = np.array(s_list)
        G = np.matmul(np.array(G_list), np.ones([1, 2]))
        out_put_a = self.pg_model(s)
        # 注意loss函数的计算。
        log_pro = tf.reduce_sum(a_one_hot * tf.math.log(out_put_a), axis=1)
        # 因为是计算真实的动作和网络输出动作的相似程度,需要采用reduce_mean函数来进行计算loss
        # 这里loss就一个值,因为是一个回合进行更新一次。意思是这个回合的相似程度,这里相当于是一个分类问题
        loss = - tf.reduce_mean((G - bias)*log_pro)
        self.r.clear()
        self.traj.clear()
        return loss

④在训练过程中,感觉会出现过拟合的现象。网络到达一个比较好的参数之后,如果继续进行训练,系统的性能反而会下降。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/798810.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存