[Reinforcement Learning] The Sarsa Algorithm Explained and Applied to 2D Space Exploration [Python Implementation]


The Sarsa Algorithm

Sarsa is based on the Q-Learning algorithm; the modification it makes is actually very small.

This post builds on the earlier Q-Learning project. If anything is unclear, see the two posts below:

  • [Reinforcement Learning] The Q-Learning Algorithm Explained, with a Python Implementation [80 Lines of Code]
  • [Reinforcement Learning] Q-Learning Applied to 2D Space Exploration [Python Implementation]

Sarsa Algorithm Details

Essentially, Sarsa also maintains a Q-table; only the update rule is slightly modified.

  • Sarsa's update rule is:

The general update rule is
Q[S, A] = (1 - \alpha) * Q[S, A] + \alpha * (R + \gamma * Q[S_{next}, A_{next}])

When the next step is a terminal state, the update rule is
Q[S, A] = (1 - \alpha) * Q[S, A] + \alpha * R

  • Q-Learning's update rule is:

The general update rule is
Q[S, A] = (1 - \alpha) * Q[S, A] + \alpha * (R + \gamma * max Q[S_{next}, :])

When the next step is a terminal state, the update rule is
Q[S, A] = (1 - \alpha) * Q[S, A] + \alpha * R

The concrete change is that

max Q[S_{next}, :] becomes Q[S_{next}, A_{next}]

In other words, the algorithm becomes a bit more transparent: the update uses the value of the step that is actually taken after the state transition, rather than bootstrapping directly on the max value.

  • To some extent this also improves on Q-Learning's generalization and helps the agent avoid getting stuck in a local optimum. A minimal comparison of the two update targets is sketched below.
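
To make that difference concrete, here is a minimal sketch (not from the original post) that computes both update targets for a toy Q-table; the array values and the names q_table, s_next, and a_next are hypothetical and used only for illustration.

import numpy as np

# Toy Q-table: rows are states, columns are four actions (all values hypothetical).
q_table = np.array([
    [0.0, 0.5, 0.1, 0.2],   # state 0
    [0.3, 0.0, 0.4, 0.1],   # state 1
])

alpha, gamma = 0.1, 0.9
s, a, r = 0, 1, 0.0          # current state, action taken, reward received
s_next, a_next = 1, 3        # next state and the action actually chosen there

# Q-Learning target: bootstrap on the best action in the next state.
target_q_learning = r + gamma * np.max(q_table[s_next, :])   # 0.9 * 0.4 = 0.36

# Sarsa target: bootstrap on the action that will really be taken next.
target_sarsa = r + gamma * q_table[s_next, a_next]           # 0.9 * 0.1 = 0.09

# The update form is identical in both algorithms; only the target differs.
q_table[s, a] = (1 - alpha) * q_table[s, a] + alpha * target_sarsa
print(target_q_learning, target_sarsa, q_table[s, a])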

Code Implementation

It is actually quite simple: just make small changes to the previous project. Only the main script and the RL brain's learn method were modified.

  • [Reinforcement Learning] Q-Learning Applied to 2D Space Exploration [Python Implementation]

As before, just run the treasure_maze_main.py file (the only dependencies are numpy and pandas).

  • RL_Brain.py
import pandas as pd
import numpy as np


class RLBrain(object):
    def __init__(self, actions, lr=0.1, gamma=0.9, epsilon=0.9):
        self.actions = actions
        self.q_table = pd.DataFrame(
            [],
            columns=self.actions,
            dtype=np.float64  # keep Q-values numeric so max/argmax behave as expected
        )
        self.lr, self.gamma, self.epsilon = lr, gamma, epsilon

    def check_state(self, s):
        # Add an all-zero row for state s the first time it is visited.
        # (DataFrame.append was removed in pandas 2.0, so pd.concat is used instead.)
        if s not in self.q_table.index:
            new_row = pd.DataFrame(
                [[0.0] * len(self.actions)],
                columns=self.actions,
                index=[s]
            )
            self.q_table = pd.concat([self.q_table, new_row])

    def choose_action(self, s):
        self.check_state(s)
        state_table = self.q_table.loc[s, :]

        # Epsilon-greedy: explore with probability 1 - epsilon, or whenever the state
        # has no learned values yet; otherwise pick one of the best actions at random.
        if (np.random.uniform() >= self.epsilon) or (state_table == 0).all():
            return np.random.choice(self.actions)
        else:
            return np.random.choice(state_table[state_table == np.max(state_table)].index)

    def learn(self, s, s_, a, r, done, a_):
        self.check_state(s_)
        q_old = self.q_table.loc[s, a]
        if done:
            # Terminal transition: the target is just the reward.
            q_new = r
        else:
            # Sarsa target: bootstrap on the action a_ that will actually be taken in s_.
            q_new = r + self.gamma * self.q_table.loc[s_, a_]
        self.q_table.loc[s, a] += self.lr * (q_new - q_old)
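
As a quick sanity check (not part of the original project), the brain can be exercised on its own; the state strings and rewards below are made up purely for illustration, and the snippet assumes RL_Brain.py is on the import path.

from RL_Brain import RLBrain

brain = RLBrain(actions=['u', 'd', 'l', 'r'])
a = brain.choose_action('[0, 0]')                     # epsilon-greedy action for the start state
brain.learn('[0, 0]', '[0, 1]', a, 0, False, 'r')     # non-terminal step, bootstraps on Q['[0, 1]', 'r']
brain.learn('[0, 1]', '[0, 2]', 'r', 1, True, None)   # terminal step, the target is just the reward
print(brain.q_table)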

  • treasure_maze_main.py
from RL_Brain import RLBrain
from env import Maze

if __name__ == '__main__':
    ALPHA = 0.1
    GAMMA = 0.9
    EPSILON = 0.9
    MAX_EPISODE = 15

    env = Maze(shape=(3, 4))
    RL = RLBrain(actions=list(env.actions.keys()),
                 lr=ALPHA, gamma=GAMMA, epsilon=EPSILON)
    for episode in range(MAX_EPISODE):
        env.refresh()

        step_counter = 0
        done = False

        env.update(done, episode, step_counter)

        s = env.point
        a = RL.choose_action(str(s))            # Sarsa: pick the first action before the loop
        while not done:
            s_, r, done = env.get_env_feedback(a)
            if not done:
                a_ = RL.choose_action(str(s_))  # choose the next action first ...
            else:
                a_ = None
            RL.learn(str(s), str(s_), a, r, done, a_)  # ... then update with (S, A, R, S', A')
            s = s_
            a = a_                              # the chosen a_ is the action actually executed next
            step_counter += 1
            env.update(done, episode, step_counter, r)
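
One optional addition (not in the original script): appending the lines below at the end of the if __name__ == '__main__': block prints the learned Q-table and the greedy action for each visited state.

    # Optional: inspect the learned values after training.
    print(RL.q_table)
    # Greedy action per visited state (ties broken by column order).
    print(RL.q_table.idxmax(axis=1))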

  • env.py
import time
import numpy as np


class Maze(object):
    def __init__(self, shape=None, hell_num=2):
        if (shape is None) or (not isinstance(shape, (tuple, list))) or (len(shape) != 2):
            shape = (5, 5)
        self.shape = shape
        self.map = np.zeros(shape)
        self.actions = {
            'u': [-1, 0],
            'd': [1, 0],
            'l': [0, -1],
            'r': [0, 1]
        }

        # Place hell_num trap cells (-1) and one treasure cell (+1) at random.
        for _ in range(hell_num):
            self._random_num(shape, -1)
        self._random_num(shape, 1)

        self.point = None
        self.refresh()

    def _random_num(self, shape, v):
        # Write value v into a randomly chosen empty cell.
        n = shape[0] * shape[1]
        while True:
            rd_num = np.random.randint(0, n)  # upper bound is exclusive, so every cell can be picked
            y = rd_num // shape[0]
            x = rd_num % shape[0]
            if self.map[x][y] == 0:
                self.map[x][y] = v
                break

    def refresh(self):
        self.point = [0, 0]

    def point_check(self, point):
        flags = [0, 1]
        for f in flags:
            if (point[f] < 0) or (point[f] >= self.shape[f]):
                return False
        return True

    def get_env_feedback(self, A):
        # Apply action A and return (next position, reward, done).
        if A not in self.actions:
            raise Exception("Wrong Action")
        A = self.actions[A]
        point_ = [
            self.point[0] + A[0],
            self.point[1] + A[1]
        ]
        if self.point_check(point_):
            self.point = point_
            R = self.map[self.point[0]][self.point[1]]
            done = (R != 0)  # the episode ends on a trap (-1) or the treasure (+1)
        else:
            R, done = -1, False  # hitting a wall is penalized but does not end the episode
        return self.point, R, done

    def show_matrix(self, m):
        for x in m:
            print(' '.join(list(map(lambda i: str(int(i)) if not isinstance(i, str) else i, x))))

    def update(self, done, episode, step, r=None):
        # os.system("cls")
        m = self.map.tolist()
        m[self.point[0]][self.point[1]] = 'x'
        self.show_matrix(m)
        print("==========")
        if done:
            print("episode: %s; step: %s; reward: %s" % (episode, step, r))
            time.sleep(3)
        else:
            time.sleep(0.3)
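
To watch the environment's feedback in isolation (not part of the original post), you can step through a Maze manually; the action sequence below is an arbitrary illustration.

from env import Maze

env = Maze(shape=(3, 4))
env.show_matrix(env.map.tolist())   # -1 cells are traps, 1 is the treasure, 0 is empty

done = False
for action in ['r', 'r', 'd', 'd', 'r']:   # arbitrary illustrative actions
    if done:
        break
    point, reward, done = env.get_env_feedback(action)
    print(action, point, reward, done)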
