PyTorch notes: TD3
2022-07-27 05:43:00 【UQI-LIUWJ】
Reference code: easy-rl/codes/TD3 at master · datawhalechina/easy-rl (github.com)
Theory: Reinforcement learning notes: Twin Delayed Deterministic Policy Gradient (TD3), UQI-LIUWJ's blog on CSDN
1 task1_train.py
1.1 Importing libraries
import sys,os
curr_path = os.path.dirname(__file__)
parent_path = os.path.dirname(curr_path)
sys.path.append(parent_path)  # add the parent directory to sys.path
import torch
import gym
import numpy as np
import datetime
from agent import TD3
from utils import save_results,make_dir,plot_rewards
curr_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# get the current time (used in the output paths)
1.2 TD3Config
Basic hyperparameters and settings for TD3.
class TD3Config:
    def __init__(self) -> None:
        self.algo = 'TD3'
        # algorithm name
        self.env_name = 'Pendulum-v1'
        # environment name
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # whether to use the GPU
        self.train_eps = 600
        # number of training episodes
        self.epsilon_start = 50
        # for episodes before this number, actions are sampled at random;
        # after it, actions are chosen by the actor
        self.eval_freq = 10
        # print an intermediate result every this many episodes
        self.max_timestep = 100000
        # maximum number of steps per episode
        self.expl_noise = 0.1
        # standard deviation of the Gaussian exploration noise
        self.batch_size = 256
        # batch size for both actor and critic
        self.gamma = 0.9
        # discount factor
        self.lr = 3e-4
        # learning rate
        self.soft = 0.0005
        # soft-update coefficient
        self.policy_noise = 0.2
        # when computing the target action (for the critic update), clipped Gaussian
        # noise is added; policy_noise is the factor multiplied onto N(0, 1)
        self.noise_clip = 0.3
        # when computing the target action, clipped Gaussian noise is added;
        # noise_clip is the clipping bound
        self.policy_freq = 2
        # the actor and the target networks are not updated every step,
        # only every policy_freq steps
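To make the policy_noise and noise_clip parameters concrete, here is a minimal sketch (not part of the original code) of the clipped Gaussian noise they describe; the real usage is in agent.update() in section 2.4, and the action value 1.5 below is just a made-up example:
import numpy as np

# scale N(0, 1) by policy_noise, clip to [-noise_clip, noise_clip],
# then add it to the target actor's action and clip to the valid action range
policy_noise, noise_clip, max_action = 0.2, 0.3, 2.0
target_action = 1.5                                   # hypothetical target-actor output
noise = np.clip(np.random.randn() * policy_noise, -noise_clip, noise_clip)
smoothed_action = np.clip(target_action + noise, -max_action, max_action)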
1.3 PlotConfig
Paths and switches for plotting and saving results.
class PlotConfig(TD3Config):
    def __init__(self) -> None:
        super().__init__()
        self.result_path = "./outputs/" + self.env_name + '/' + curr_time + '/results/'
        print(self.result_path)
        # path for saving results
        self.model_path = "./outputs/" + self.env_name + '/' + curr_time + '/models/'
        # path for saving models
        self.save = True
        # whether to save the plotted figures
1.4 train
def train(cfg, env, agent):
    print('Start training!')
    print(f'Environment: {cfg.env_name}, Algorithm: {cfg.algo}, Device: {cfg.device}')
    rewards = []
    # rewards of every episode
    ma_rewards = []
    # moving-average rewards of every episode
    for i_ep in range(int(cfg.train_eps)):
        ep_reward = 0
        ep_timesteps = 0
        state, done = env.reset(), False
        while not done:
            ep_timesteps += 1
            if i_ep < cfg.epsilon_start:
                action = env.action_space.sample()
                # sample a random action
            else:
                action = (
                    agent.choose_action(np.array(state))
                    + np.random.normal(0, max_action * cfg.expl_noise, size=n_actions)
                ).clip(-max_action, max_action)
                # choose the action with the actor, add Gaussian exploration noise,
                # then clip so the action stays inside the valid action range
                # (max_action and n_actions are module-level globals set in the __main__ block below)
            '''
            At the start the critics are still inaccurate, so letting them guide the
            actor right away tends to work poorly. Actions are therefore sampled at
            random at first to train the critics; once the critics have learned
            something, the actor is used to choose actions.
            '''
            next_state, reward, done, _ = env.step(action)
            # interact with the environment using this action
            done_bool = float(done) if ep_timesteps < env._max_episode_steps else 0
            # whether there really is no subsequent state:
            # if the episode ended only because the step limit was reached,
            # it is treated as non-terminal (done_bool = 0) for bootstrapping
            agent.memory.push(state, action, next_state, reward, done_bool)
            # store the transition in the replay buffer
            state = next_state
            ep_reward += reward
            # Train agent after collecting sufficient data
            if i_ep + 1 >= cfg.epsilon_start:
                agent.update()
                # before that point the actions are random and independent of the actor,
                # so there is no need to update it
        if (i_ep + 1) % cfg.eval_freq == 0:
            print('Episode: {}/{}, Reward: {:.2f}'.format(i_ep + 1, cfg.train_eps, ep_reward))
        rewards.append(ep_reward)
        if ma_rewards:
            ma_rewards.append(0.9 * ma_rewards[-1] + 0.1 * ep_reward)
        else:
            ma_rewards.append(ep_reward)
        # moving-average reward
    print('Finished training!')
    return rewards, ma_rewards
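The moving-average reward above is an exponential moving average with coefficients 0.9/0.1; a small standalone sketch of the same recurrence (not part of the original file):
# ma[t] = 0.9 * ma[t-1] + 0.1 * reward[t], with ma[0] = reward[0]
def moving_average(rewards, beta=0.9):
    ma = []
    for r in rewards:
        ma.append(beta * ma[-1] + (1 - beta) * r if ma else r)
    return ma

print(moving_average([1.0, 2.0, 3.0]))  # approximately [1.0, 1.1, 1.29]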
1.5 eval
def eval(env_name, agent, eval_episodes=10):
    eval_env = gym.make(env_name)
    rewards, ma_rewards = [], []
    for i_episode in range(eval_episodes):
        ep_reward = 0
        state, done = eval_env.reset(), False
        while not done:
            # eval_env.render()
            action = agent.choose_action(np.array(state))
            # choose the action with the learned actor
            state, reward, done, _ = eval_env.step(action)
            ep_reward += reward
        print(f"Episode:{i_episode+1}, Reward:{ep_reward:.3f}")
        rewards.append(ep_reward)
        # moving-average reward
        if ma_rewards:
            ma_rewards.append(0.9 * ma_rewards[-1] + 0.1 * ep_reward)
        else:
            ma_rewards.append(ep_reward)
    return rewards, ma_rewards
1.6 Main function
if __name__ == "__main__":
    cfg = TD3Config()
    # TD3 hyperparameters
    plot_cfg = PlotConfig()
    # plotting / saving paths
    env = gym.make(cfg.env_name)
    # create the environment
    env.seed(1)
    torch.manual_seed(1)
    np.random.seed(1)
    # set random seeds
    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.shape[0]
    # state and action dimensions, taken from the environment
    max_action = float(env.action_space.high[0])
    # largest value in the action space (2.0 for Pendulum-v1)
    agent = TD3(n_states, n_actions, max_action, cfg)
    # initialize the model
    # here the input dimension is 3 and the output dimension is 1
    rewards, ma_rewards = train(cfg, env, agent)
    # train TD3
    make_dir(plot_cfg.result_path, plot_cfg.model_path)
    # create the output directories
    agent.save(path=plot_cfg.model_path)
    # save the actor and the critic
    save_results(rewards, ma_rewards, tag='train', path=plot_cfg.result_path)
    # save the rewards and the moving-average rewards
    plot_rewards(rewards, ma_rewards, plot_cfg, tag="train")
    # plot
    ############################### evaluation ####################################
    eval_agent = TD3(n_states, n_actions, max_action, cfg)
    eval_agent.load(path=plot_cfg.model_path)
    rewards, ma_rewards = eval(cfg.env_name, eval_agent)
    make_dir(plot_cfg.result_path)
    # PlotConfig above does not define result_path_eval, so the eval results are
    # written to result_path as well; the 'eval' tag keeps the file names distinct
    save_results(rewards, ma_rewards, tag='eval', path=plot_cfg.result_path)
    plot_rewards(rewards, ma_rewards, plot_cfg, tag="eval")
    ################################################################################
2 agent.py

2.1 Importing libraries
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from TD3.memory import ReplayBuffer
2.2 Actor
class Actor(nn.Module):
    def __init__(self, input_dim, output_dim, max_action):
        '''
        Args:
            input_dim (int): input dimension, equal to n_states here
            output_dim (int): output dimension, equal to n_actions here
            max_action (float): maximum value of the action
        '''
        super(Actor, self).__init__()
        self.l1 = nn.Linear(input_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, output_dim)
        self.max_action = max_action

    def forward(self, state):
        a = F.relu(self.l1(state))
        a = F.relu(self.l2(a))
        return self.max_action * torch.tanh(self.l3(a))
        # tanh squashes the output into [-1, 1]; multiplying by max_action rescales it
        # to the action range the environment expects
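A quick sanity check of the output range (a sketch only, assuming the Pendulum-v1 dimensions used in this post: input_dim=3, output_dim=1, max_action=2.0):
import torch

actor = Actor(input_dim=3, output_dim=1, max_action=2.0)
state = torch.randn(4, 3)          # a batch of 4 states
action = actor(state)              # shape [4, 1], every entry lies in [-2.0, 2.0]
assert action.abs().max() <= 2.0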
2.3 Critic
class Critic(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Critic, self).__init__()
        # Q1 architecture
        self.l1 = nn.Linear(input_dim + output_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, 1)
        # Q2 architecture
        self.l4 = nn.Linear(input_dim + output_dim, 256)
        self.l5 = nn.Linear(256, 256)
        self.l6 = nn.Linear(256, 1)
        '''
        This implements value network 1 and value network 2 from the TD3 diagram
        in a single class.
        '''

    def forward(self, state, action):
        sa = torch.cat([state, action], 1)
        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)
        q2 = F.relu(self.l4(sa))
        q2 = F.relu(self.l5(q2))
        q2 = self.l6(q2)
        return q1, q2

    def Q1(self, state, action):
        sa = torch.cat([state, action], 1)
        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)
        return q1
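Similarly, a small shape check for the two Q heads (a sketch, again assuming input_dim=3 and output_dim=1; the batch size 5 is arbitrary):
import torch

critic = Critic(input_dim=3, output_dim=1)
s, a = torch.randn(5, 3), torch.randn(5, 1)       # a batch of 5 state-action pairs
q1, q2 = critic(s, a)                             # two independent Q estimates, each [5, 1]
q1_only = critic.Q1(s, a)                         # only the first Q head, [5, 1]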
2.4 TD3
class TD3(object):
    def __init__(
        self,
        input_dim,
        output_dim,
        max_action,
        cfg,
    ):
        self.max_action = max_action
        # largest value in the action space (2.0 for Pendulum-v1)
        self.gamma = cfg.gamma
        # discount factor
        self.lr = cfg.lr
        # learning rate
        self.soft = cfg.soft
        # soft-update coefficient
        self.policy_noise = cfg.policy_noise
        # when computing the target action, clipped Gaussian noise is added;
        # policy_noise is the factor multiplied onto N(0, 1)
        self.noise_clip = cfg.noise_clip
        # when computing the target action, clipped Gaussian noise is added;
        # noise_clip is the clipping bound
        self.policy_freq = cfg.policy_freq
        # the actor is not updated every step, only every policy_freq steps
        # (the critics are updated every step)
        self.batch_size = cfg.batch_size
        # batch size for both actor and critic
        self.device = cfg.device
        # device used for training
        self.total_it = 0
        # update counter (used to decide which steps update the actor and the target networks)
        self.actor = Actor(input_dim, output_dim, max_action).to(self.device)
        self.actor_target = copy.deepcopy(self.actor)
        # target policy network
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=self.lr)
        self.critic = Critic(input_dim, output_dim).to(self.device)
        self.critic_target = copy.deepcopy(self.critic)
        # target value networks (TD3's two target critics are merged into one module here)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=self.lr)
        self.memory = ReplayBuffer(input_dim, output_dim)
        # replay buffer

    def choose_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1)).to(self.device)
        # state: [3] --> [1, 3]
        return self.actor(state).cpu().data.numpy().flatten()
        # actor output [1, 1] --> flattened to [1]
    def update(self):
        self.total_it += 1
        # Sample from the replay buffer
        state, action, next_state, reward, not_done = self.memory.sample(self.batch_size)
        with torch.no_grad():
            # Select the action according to the target policy and add clipped noise
            noise = (
                torch.randn_like(action) * self.policy_noise
            ).clamp(-self.noise_clip, self.noise_clip)
            # clipped Gaussian noise
            next_action = (
                self.actor_target(next_state) + noise
            ).clamp(-self.max_action, self.max_action)
            # next_action with the clipped noise added:
            # the output of the target policy network (target policy smoothing)
            # Compute the target Q value
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)
            # outputs of the two target value networks
            target_Q = torch.min(target_Q1, target_Q2)
            # the smaller of the two is used as the next-state value in the TD target
            target_Q = reward + not_done * self.gamma * target_Q
            # TD target for the current step, taking termination into account
        current_Q1, current_Q2 = self.critic(state, action)
        # outputs of the two value networks
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # update both value networks together
        if self.total_it % self.policy_freq == 0:
            # the actor and the target networks are updated only every policy_freq steps
            # (the critics are updated every step)
            actor_loss = -self.critic.Q1(state, self.actor(state)).mean()
            # actor loss:
            # following TD3, only the output of value network 1 (Q1) is used here
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()
            # update the actor
            for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                target_param.data.copy_(self.soft * param.data + (1 - self.soft) * target_param.data)
            for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                target_param.data.copy_(self.soft * param.data + (1 - self.soft) * target_param.data)
            # soft-update the target networks
    def save(self, path):
        torch.save(self.critic.state_dict(), path + "td3_critic")
        torch.save(self.critic_optimizer.state_dict(), path + "td3_critic_optimizer")
        torch.save(self.actor.state_dict(), path + "td3_actor")
        torch.save(self.actor_optimizer.state_dict(), path + "td3_actor_optimizer")

    def load(self, path):
        self.critic.load_state_dict(torch.load(path + "td3_critic"))
        self.critic_optimizer.load_state_dict(torch.load(path + "td3_critic_optimizer"))
        self.critic_target = copy.deepcopy(self.critic)
        self.actor.load_state_dict(torch.load(path + "td3_actor"))
        self.actor_optimizer.load_state_dict(torch.load(path + "td3_actor_optimizer"))
        self.actor_target = copy.deepcopy(self.actor)
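The for-loops at the end of update() implement the soft (Polyak) update target <- cfg.soft * param + (1 - cfg.soft) * target, with cfg.soft = 0.0005; a minimal standalone sketch for a single parameter tensor:
import torch

def soft_update(param: torch.Tensor, target_param: torch.Tensor, tau: float = 0.0005) -> None:
    # target <- tau * online + (1 - tau) * target, applied in place
    target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

online, target = torch.ones(3), torch.zeros(3)
soft_update(online, target)        # target is now 0.0005 everywhere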
3 memory.py
import numpy as np
import torch

class ReplayBuffer(object):
    def __init__(self, n_states, n_actions, max_size=int(1e6)):
        self.max_size = max_size
        # maximum capacity of the replay buffer
        self.ptr = 0
        # current pointer (index where the next pushed transition will be written)
        self.size = 0
        # current number of stored transitions
        self.state = np.zeros((max_size, n_states))
        self.action = np.zeros((max_size, n_actions))
        self.next_state = np.zeros((max_size, n_states))
        self.reward = np.zeros((max_size, 1))
        self.not_done = np.zeros((max_size, 1))
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def push(self, state, action, next_state, reward, done):
        # store a transition in the replay buffer
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.not_done[self.ptr] = 1. - done
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        # sample batch_size transitions uniformly at random
        ind = np.random.randint(0, self.size, size=batch_size)
        return (
            torch.FloatTensor(self.state[ind]).to(self.device),
            torch.FloatTensor(self.action[ind]).to(self.device),
            torch.FloatTensor(self.next_state[ind]).to(self.device),
            torch.FloatTensor(self.reward[ind]).to(self.device),
            torch.FloatTensor(self.not_done[ind]).to(self.device)
        )
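A quick usage sketch of the buffer (hypothetical random transitions, assuming the Pendulum-v1 dimensions n_states=3, n_actions=1):
import numpy as np

buffer = ReplayBuffer(n_states=3, n_actions=1, max_size=1000)
for _ in range(10):
    buffer.push(state=np.random.randn(3), action=np.random.randn(1),
                next_state=np.random.randn(3), reward=0.0, done=False)
state, action, next_state, reward, not_done = buffer.sample(batch_size=4)
# state: [4, 3], action: [4, 1], reward and not_done: [4, 1], all torch.FloatTensor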
4 utils.py
import os
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
4.1 plot_rewards
def plot_rewards(rewards, ma_rewards, plot_cfg, tag='train'):
    sns.set()
    plt.figure()
    plt.title("learning curve on {} of {}".format(plot_cfg.device, 'TD3'))
    plt.xlabel('episodes')
    plt.plot(rewards, label='rewards')
    plt.plot(ma_rewards, label='ma rewards')
    # plot the per-episode rewards and the moving-average rewards
    plt.legend()
    if plot_cfg.save:
        plt.savefig(plot_cfg.result_path + "{}_rewards_curve".format(tag))
    plt.show()
4.2 save_results
def save_results(rewards, ma_rewards, tag='train', path='./results'):
    ''' save the rewards
    '''
    np.save(path + '{}_rewards.npy'.format(tag), rewards)
    np.save(path + '{}_ma_rewards.npy'.format(tag), ma_rewards)
    print('Results saved!')
4.3 make_dir
def make_dir(*paths):
    ''' create directories
    '''
    for path in paths:
        Path(path).mkdir(parents=True, exist_ok=True)
5 Output results
