雨宿り

だいぶスローペース

PPOの実装(ネットワーク共有版)

はじめに

PolicyとValueのネットワークをタイプのものを全然見かけなかったので、 勉強も兼ねてPyTorchで実装してみました。
コードは以下に置いてあります。
https://github.com/tsubame-mz/machine_learning/blob/master/RL/ppo.py

学習モデル

特に解説することも無いですが、ネットワークを途中から枝分かれさせ、 PolicyとValueをそれぞれ出力するようにしています。

class PVNet(nn.Module):
    def __init__(self, in_features, out_features, hid_num, droprate):
        super(PVNet, self).__init__()

        # 共通ネットワーク
        self.layer = nn.Sequential(
            nn.Linear(in_features, hid_num),
            nn.ReLU(inplace=True),
            nn.Dropout(p=droprate),
            nn.Linear(hid_num, hid_num),
            nn.ReLU(inplace=True),
            nn.Dropout(p=droprate),
        )

        # Policy : 各行動の確率を出力
        self.policy = nn.Sequential(nn.Linear(hid_num, out_features), nn.Softmax(dim=-1))

        # Value : 状態の価値を出力
        self.value = nn.Sequential(nn.Linear(hid_num, 1))

        # ネットワークの重みを初期化
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        h = self.layer(x)
        return self.policy(h), self.value(h)

Memory

メモリには1ステップ毎に以下を記録していきます。

  • 状態(State)
  • 行動(Action)
  • 報酬(Reward)
  • 状態価値(Value)
  • 行動の対数尤度(Likelihood)

また、学習前に以下を計算します(finish_path)。

  • 割引報酬和(Return)
  • アドバンテージ(GAE)

finish_path()

ここでは前述のとおり、割引報酬和とアドバンテージを計算します。

割引報酬和( R_t)は以下の式で計算できます。
 r_tは報酬、 \gamma (\lt 1)は割引率です。 実装上は報酬を終端から逆順に辿って加算していっています。

 \displaystyle R_t = \sum_{i=t}^{T} \gamma ^i r_i

last_return = self.reward_lst[t] + gamma * last_return
self.return_lst[t] = last_return

アドバンテージ( A_t)はGAE(generalized advantage estimation)を使用します。 アドバンテージ自体の理解が怪しいのですが、状態行動価値(≒報酬)から状態価値を減算することで、 その行動の相対的な価値を計算しているそうです。

GAEは以下の式で計算できます。
 V_tは状態価値で、ネットワークから出力された値を使用します。

 \displaystyle
\begin{eqnarray}
A_t = \sum_{i=t}^{T} (\gamma\lambda) ^i \delta_i \\
\delta_t = r_t + \gamma V_{t+1} - V_t
\end{eqnarray}

delta = self.reward_lst[t] + gamma * value_lst[t + 1] - value_lst[t]
last_delta = delta + (gamma * lam) * last_delta
self.advantage_lst[t] = last_delta

get_batch()

メモリに溜まっているいるデータをPyTorchのテンソルにして返し、メモリ内をからっぽに初期化します。 アドバンテージは標準化(平均0、分散1)となるように補正をかけています。

advantage_tsr = torch.FloatTensor(self.advantage_lst)
advantage_tsr = (advantage_tsr - advantage_tsr.mean()) / (advantage_tsr.std() + adv_eps)

Agent

行動のサンプリングと学習の進行を行わせます。

get_action()

行動をサンプリングします。 学習に使うための価値と対数尤度もまとめて返しています。 楽するためにカテゴリカル分布を表現しているCategoricalを使用しました。
sample()で確率に従って値をひとつ取得、log_prob()で行動に対する対数尤度を計算してくれます。 エントロピーの計算もしてくれるので、後述の学習時に使用しています。

pi, value = self.model(state)
c = Categorical(pi)
action = c.sample()
return action.squeeze().item(), value.item(), c.log_prob(action).item()

update()

モデルの学習を行います。

最初にメモリから取得した状態をまとめてモデルに放り込み、確率と価値を計算させます。 そしてget_action()と同様に行動に対する対数尤度を計算し、メモリにある尤度との比を計算します。 比は以下の式で確率の除算となってますが、ここでは対数をとっているので減算の指数をとれば同じ意味になります(両辺の対数をとれば変形できる)。

 \displaystyle
\begin{eqnarray}
ratio = \frac{\pi (a|s)_{new}}{\pi (a|s)_{old}}
\end{eqnarray}

pi, value = self.model(state_tsr)
c = Categorical(pi)
new_log_pi = c.log_prob(action_tsr)
ratio = (new_log_pi - log_pi_tsr).exp()  # pi(a|s) / pi_old(a|s)

上記のratioとアドバンテージを掛けたものがPolicy側のlossになりますが、 ここでPPOの肝となるクリッピングを行います。 lossの限界をclip_ratioで制限しています(clip_ratio=0.2なら比は0.8~1.2倍まで)

Value側のlossは (R_t - V_t)^2で計算します。

最後にこれらとエントロピーを全て足し合わせたものが全体のlossになります。 これをbackward()して学習を勧めます。

clip_adv = torch.where(
    advantage_tsr >= 0, (1 + clip_ratio) * advantage_tsr, (1 - clip_ratio) * advantage_tsr
)
pi_loss = -(torch.min(ratio * advantage_tsr, clip_adv)).mean()
v_loss = v_loss_c * (return_tsr - value).pow(2).mean()
entropy = ent_c * c.entropy().mean()
toral_loss = pi_loss + v_loss + entropy

学習は同じデータを使いまして何回か連続で行いますが、 学習に使っている確率 \pi (a|s) (の対数)が今のネットワークの出力と離れてきたら そのデータでの学習は打ち切るようにしています。 (厳密にはKLダイバージェンスではないですが)

kl = (new_log_pi - log_pi_tsr).mean().item()  # KL-divergence
if kl > 1.5 * target_kl:
    break

学習ループ

前述のAgentやMemoryを使用してループを回します。 今回は簡単に1エピソード毎に1回学習というふうにし、学習の早期完了もなしとしてます。 また報酬はそのままだとうまくいかなかったので、CartPole用に調整しています
(195ステップ以上で報酬1、転んだら0。それ以外は0)。

だいたい200エピソードを超え始めると200ステップ立ち続けるかと思います。

env = gym.make(args.env)
in_features = env.observation_space.shape[0]
out_features = env.action_space.n

model = PVNet(in_features, out_features, args.hid_num, args.droprate)
agent = Agent(model)
optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)
memory = Memory()

for episode in range(args.max_episodes):
    model.eval()  # 評価モード
    state = env.reset()
    memory.initialize()

    done = False
    step = 0
    while not done:
        state_tsr = torch.from_numpy(state).reshape(1, -1).float()  # バッチ形式のテンソル化
        action, value, log_pi = agent.get_action(state_tsr)
        next_state, _, done, _ = env.step(action)

        reward = reward_modify(step, done)  # 報酬を手動調整

        memory.add(state, action, reward, value, log_pi)

        state = next_state
        step += 1

    memory.finish_path(args.gamma, args.lam)
    model.train()  # 学習モード
    batch = memory.get_batch(args.adv_eps)
    pi_loss, v_loss, entropy, total_loss = agent.update(
        batch, optimizer, args.clip_ratio, args.v_loss_c, args.ent_c, args.target_kl, args.train_iters
    )

    print(
        "Episode[{:3d}], Step[{:3d}], Loss(P/V/E/T)[{:+.6f}/{:+.6f}/{:+.6f}/{:+.6f}]".format(
            episode + 1, step, pi_loss, v_loss, entropy, total_loss
        )
    )

おわりに

最低限動くものだけなので、だいぶシンプルな実装になったかと思います。
もう少しまともにするにはメモリ周りの修正(DQNでよくみるリングバッファとか…)が必要になると思いますが、 それはまた気が向いたときにでも。

参考