REINFORCE算法是策略梯度算法最原始的实现算法,这里采用tensorflow2.0进行实现
import tensorflow as tf
import gym
from matplotlib import pyplot as plt
import numpy as np
def PGReinforce_run(PGReinforce_agent=None, episode=1000):
PGReinforce_agent = PGReinforce_agent.PGReinforce(n_actions=2, n_features=4)
PGReinforce_agent.net_init()
score = []
env = gym.make('CartPole-v1')
bias = 5
for i_episode in range(episode):
# 初始化,
observation = env.reset()
done = False
t = 0
while not done:
env.render()
action = PGReinforce_agent.choose_action(observation)
PGReinforce_agent.traj_store(observation, action)
observation_, reward, done, info = env.step(action)
x, x_dot, theta, theta_dot = observation
r2 = - abs(theta)*5
# r1 = - abs(x)
PGReinforce_agent.r_calculate(reward + r2)
observation = observation_
t += 1
# PGReinforce_agent.loss_calculate()
print("Episode finished after {} time steps".format(t + 1))
score.append(t + 1)
PGReinforce_agent.learn(5)
if (i_episode + 1) % 100 == 0:
plt.plot(score) # 绘制波形
# plt.draw()
plt.savefig(f"RL_algorithm_package/img/pic_{0}_bias-{i_episode + 1}.png")
class PGReinforce:
def __init__(self, n_actions, n_features, gamma=0.9, learning_rate=0.01):
self.gamma = gamma
self.n_actions = n_actions
self.n_features = n_features
# 轨迹
self.traj = []
# 网络
self.pg_model = None
self.net_init()
# 一个轨迹序列的总回报
self.r = []
# 优化器
self.opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)
def choose_action(self, s):
"""
注意,这里动作的选择与之前DQN不同,DQN是选择最大的可能性的动作,而这里要对输出进行采样,来选择动作
:param s:
:return:
"""
s = s.reshape(1, 4)
action_value = self.pg_model.predict(np.array(s))
action = np.random.choice(np.arange(action_value.shape[1]), p=action_value[0])
return action
def r_calculate(self, r):
"""
每次输入r,累加算计每一个回合的总回报值r
:param r: 回报值
:return:
"""
self.r.append(r)
def loss_calculate(self, bias):
a_list = []
s_list = []
G_list = []
for index in range(len(self.r)):
G_list.append([self.gamma ** index * sum(self.r[index:])])
for index, item in enumerate(self.traj):
a_list.append(self.traj[index][1])
s_list.append(self.traj[index][0])
a_one_hot = tf.one_hot(a_list, self.n_actions)
s = np.array(s_list)
G = np.matmul(np.array(G_list), np.ones([1, 2]))
out_put_a = self.pg_model(s)
# 注意loss函数的计算。
log_pro = tf.reduce_sum(a_one_hot * tf.math.log(out_put_a), axis=1)
# 因为是计算真实的动作和网络输出动作的相似程度,需要采用reduce_mean函数来进行计算loss
# 这里loss就一个值,因为是一个回合进行更新一次。意思是这个回合的相似程度,这里相当于是一个分类问题
loss = - tf.reduce_mean((G - bias)*log_pro)
self.r.clear()
self.traj.clear()
return loss
def traj_store(self, s, a):
"""
轨迹存储函数,将s,a存储到列表中,done以及r不必存储
:param s: 状态
:param a: 动作
:return:
"""
s_list = []
for index in range(self.n_features):
s_list.append(s[index])
self.traj.append([s_list, a])
def net_init(self):
inputs = tf.keras.Input(shape=(self.n_features,))
d1 = tf.keras.layers.Dense(32, activation='relu')(inputs)
output = tf.keras.layers.Dense(self.n_actions, activation='softmax')(d1)
self.pg_model = tf.keras.Model(inputs=inputs, outputs=output)
def learn(self, bias):
# 更新梯度
with tf.GradientTape() as Tape:
loss = self.loss_calculate(bias)
grads = Tape.gradient(loss, self.pg_model.trainable_variables)
w = self.pg_model.get_weights()
# print(f"w_before = {w}")
self.opt.apply_gradients(zip(grads, self.pg_model.trainable_variables))
w = self.pg_model.get_weights()
# print(f"w_after = {w}")
①这里需要注意的点是REINFORCE程序的动作选择是通过对输出采样得到的,而不是采用最大化的算法。
def choose_action(self, s):
"""
注意,这里动作的选择与之前DQN不同,DQN是选择最大的可能性的动作,而这里要对输出进行采样,来选择动作
:param s:
:return:
"""
s = s.reshape(1, 4)
action_value = self.pg_model.predict(np.array(s))
action = np.random.choice(np.arange(action_value.shape[1]), p=action_value[0])
return action
②这里是蒙特卡洛方法,在一轮游戏结束后,才会对神经网络进行更新。所以这里的回报值也是这一轮游戏的回报值。
def r_calculate(self, r):
"""
每次输入r,累加算计每一个回合的总回报值r
:param r: 回报值
:return:
"""
self.r.append(r)
③loss计算
def loss_calculate(self, bias):
a_list = []
s_list = []
G_list = []
for index in range(len(self.r)):
G_list.append([self.gamma ** index * sum(self.r[index:])])
for index, item in enumerate(self.traj):
a_list.append(self.traj[index][1])
s_list.append(self.traj[index][0])
a_one_hot = tf.one_hot(a_list, self.n_actions)
s = np.array(s_list)
G = np.matmul(np.array(G_list), np.ones([1, 2]))
out_put_a = self.pg_model(s)
# 注意loss函数的计算。
log_pro = tf.reduce_sum(a_one_hot * tf.math.log(out_put_a), axis=1)
# 因为是计算真实的动作和网络输出动作的相似程度,需要采用reduce_mean函数来进行计算loss
# 这里loss就一个值,因为是一个回合进行更新一次。意思是这个回合的相似程度,这里相当于是一个分类问题
loss = - tf.reduce_mean((G - bias)*log_pro)
self.r.clear()
self.traj.clear()
return loss
④在训练过程中,感觉会出现过拟合的现象。网络到达一个比较好的参数之后,如果继续进行训练,系统的性能反而会下降。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)