服务好的武进网站建设,中搜网站提交,手机网站模板 网址,开平小学学生做平网站文章目录 强化学习概念实现qLearning基于这个思路,那么解决这个问题的代码如下 强化学习概念
强化学习有一个非常直观的表现#xff0c;就是从出发点到目标之间存在着一个连续的状态转换#xff0c;比如说从状态一到状态456#xff0c;而每一个状态都有多种的行为#xff… 文章目录 强化学习概念实现qLearning基于这个思路,那么解决这个问题的代码如下 强化学习概念
强化学习有一个非常直观的表现就是从出发点到目标之间存在着一个连续的状态转换比如说从状态一到状态456而每一个状态都有多种的行为这些行为会有相应的惩罚和奖励。比如走迷宫问题,每上下左右,或者静止走一步都距离出口进入了不同的状态 我们可以根据每一步进入的状态得到不同的奖励,从而找到出口.最后就得到了我们无论进入哪一个状态而预知的有益的下一步行动的奖励表。 这样就叫做环境状态感知,将一个智能体随机放入当前环境任何状态,他都可以根据学到的决策表选择最佳行动 接下来我们来实现这个走迷宫的问题。来具体的实践 qlearning的基础思想。
实现
状态矩阵,如下矩阵中的每一行都代表了从出发点到目标之间的的每一个状态,分别是状态一,状态二状态三状态4而每一个状态都有5种行动分别是上下左右静止,这个是四个状态上下左右行为的状态奖励表,0表示行动不可以进行,-1表示行动可以进行但是不是终点10则表示是终点。-10是陷阱 解读第一行就是状态一无法向上,得到0分,向下扣10分,无法向左得0分,向右扣1分,静止扣1分
reward np.array([[0, -10, 0, -1, -1],[0, 10, -1, 0, -1],[-1, 0, 0, 10, -10],[-1, 0, -10, 0, 10]])状态转移矩阵,这个矩阵表示的是,哪一个状态可以向哪一个状态进行迁移,分别是上下左右静止所进入的状态,负一表示无法进入新的状态,零表示进入状态0,一表示进入状态一二表示进入状态二,三表示进入状态三。 对应这四个状态的五种行动,分别会进入到什么状态中 解读第一行就是状态一向上无法进入新状态,向下进入状态2,向左无法进入新状态,向右进入状态1,静止待在状态0
transition_matrix np.array([[-1, 2, -1, 1, 0],[-1, 3, 0, -1, 1],[0, -1, -1, 3, 2],[1, -1, 2, -1, 3]])有效行动矩阵这个矩阵表示了每一个状态可行的行为。对应上面的状态转移矩阵。由此智能体无论落在哪一个状态他都可以选择有效的行动。
valid_actions np.array([[1, 3, 4],[1, 2, 4],[0, 3, 4],[0, 2, 4]])综上所述我们定义了,智能体从出发到终点,每一个状态的有效的行为,以及智能体如何进行状态转移。那么我们接下来所要求的是智能体无论落在哪一个状态他都要快速而有效的找到终点,得到每一个状态有效的行动奖励表 我们初始化这个行动表,并设置收益系数为0.8,即是用来计算智能体在他可预见的行为里每一个行为可以得到的收益.
q_matrix np.zeros((4, 5))
gamma 0.8然后我们随机将智能体放在任意的状态上并让他随机地选择行动我们来查看一下, Ta.所统计出的行动奖励表以及每一个行动以及所得到的收益。
for i in range(10):start_state np.random.choice([0, 1, 2], size1)[0] # 随机初始起点print(start_state:{}.format(start_state))current_state start_statewhile current_state ! 3: # 判断是否到达终点action random.choice(valid_actions[current_state]) # greedy 随机选择当前状态下的有效动作next_state transition_matrix[current_state][action] # 通过选择的动作得到下一个状态future_rewards []for action_nxt in valid_actions[next_state]:future_rewards.append(q_matrix[next_state][action_nxt]) # 得到下一个状态所有可能动作的奖励q_state reward[current_state][action] gamma * max(future_rewards) # bellman equationq_matrix[current_state][action] q_state # 更新 q 矩阵current_state next_state # 将下一个状态变成当前状态print(episode: {}, q matrix: \n{}.format(i, q_matrix))print()我们上面这个程序是让每一个智能体随机的走直到走出终点为止然后对他所走过的路给出评分评价。我们可以先看到,第一次智能体落在了状态0,它尝试向右走,得到了-1分来到了状态1,然后向左走来到状态0,又是-1分,从状态0又向下走到状态2,得到了-10分,然后选择静止一次,扣了10分,最后从状态2向右走到终点状态3,得到10分,然后结束第一次尝试,更新记忆表 我怎么看出来的? 我已经说了,每行代表的是每个状态,每列代表的是上下左右静止5个动作,你从状态0去看,看看走哪个状态会得到-10和-1,结合那个迷宫走向,根据得分就可以看出来
qLearning
从上面的基础模型中我们知道,如果要寻找最佳行动策略,需要的事件数据有,当前环境状态,可以进行的行为,当前环境每个行为的下一步奖励,最后得到一个状态奖励表,表示从这个状态里如何走才是最佳策略 问题是,现实世界不简单啊,前面迷宫的状态是有限的,格子总是有限的,动作和状态还可以录入,但是现实中状态太多了. 我们将走迷宫问题改成下棋问题比如围棋我们每走一步都有几百种落点可供选择预示着几百个落点后棋盘会处于多少种不同的状态然后再次走一步继续是几百种不同的状态我们要求ai下棋赢过我们,让ai可以估计接下来的10步如何走才能形成优势,让ai记忆棋谱是行不通的,状态太多了,状态可以逼近了万亿种状态,几乎无穷无尽 如果可以让ai自己探索,自己去学习和观察其中有益的动作状态,自己给自己喂数据,自己训练自己,最后只根据结果完成的好不好来判定,那么就好了 神经网络则提出了一种新的方式,不必知道所有的环境状态,只需要让神经网络学习曾经经历过的有益的状态就可以了,就像人类一样,形成经验,我们可以把当前围棋环境参数输入神经网络,让神经网络自主学习如何走出下一步,估计下一步得分,只要对局足够多,神经网络就会学习的足够多,它就会根据有益的下棋经历,形成一套自己的经验价值奖励表,至于其从对局中学习到了什么,我们是无从得知的,但是从实际结果上看,从aphaGO击败柯洁后,几乎围棋领域已经无人敢称尊了 那么我们不必告诉神经网络我们的环境中有多少种状态,我们只需要告诉他环境参数是如何的,有几种行动就可以了,让它自己找到最佳策略 接下来我们来解决一下gym里经典的摆锤问题: 有一辆小车和一个竖杆,初始状态下,杆子是倾斜的,要求智能体左右移动小车让竖杆保持直立 如果竖杆偏移小车中心2个单位智能体就会失败,相反杆子持续竖着,智能体坚持时间越长奖励越多 那么接下来就需要设计一下,如何让神经网络完成这个目标 先说一下目标,我们希望得到一个模型,这个模型可以根据环境状态给出每个行动状态的下一步状态的奖励预测,从而选择最佳状态去行动 那么我们这个神经网络学习的目标就是如何准确预测当前状态采取的每个行动所得到的奖励,即是q值,这是我们的优化目标
正常来说,需要一个神经网络A,根据当前的环境数据,动作数据,以及跟随动作的下一步环境状态反馈奖励数据,让这个A不断的自我学习,预测出的行动奖励和真实的环境奖励贴近就可以了 但是实际训练中,往往不会直接使用真实的环境奖励,而是采取了缓步更新的方式 这种方式是这样的: 策略网络A:根据当前环境给出估计的下一步最优行动,给出估计奖励值Q1 目标网络B:根据当前环境给出下一步行动的估计奖励值Q2 A与B的网络结构一样,A负责接收环境数据进行训练更新参数, 目标网络B不进行训练,其参数来自于A的复制,但是其估算值Q2和当前环境下一步实际奖励值Q进行一起计算得到Q3,Q3用于对A进行训练,作为A的优化目标值进行A的网络结构更新 B的参数随着A的训练缓步更新,B的参数复制更新总是迟滞于A,Q1向着Q3优化 这样使用同一个网络结构,B来引导A进行训练 这主要是基于以下几个问题考虑: 1.稳定性使用目标网络可以提供一种稳定的目标值来更新在策略网络的Q值。如果直接使用策略网络的Q值来更新那么每次更新都可能导致目标值发生变化这会使得学习过程更加不稳定。通过使用目标网络我们可以确保在一段时间内目标值是稳定的这有助于算法的收敛。 2.减少过拟合目标网络的作用类似于深度学习中的dropout或正则化技术它可以减少在线网络对特定样本的过拟合。如果每次更新都直接依赖于当前策略网络的输出那么策略网络可能会过于拟合当前的数据导致泛化能力下降。 3.解耦目标和当前策略使用目标网络可以解耦目标Q值的计算和当前策略的选择。这意味着我们可以独立地更新目标网络而不需要在每个时间步都重新计算目标Q值。这提高了算法的效率。 4.平滑学习目标网络通过保持一定的稳定性使得学习过程更加平滑。在每次更新时我们不会引入太大的变化这有助于算法逐步逼近最优解。 5.避免陷入局部最优通过引入目标网络算法能够在一定程度上避免陷入局部最优解。因为目标网络提供的目标值是基于过去的策略和Q值估计这有助于引导策略网络探索更多的状态空间
基于这个思路,那么解决这个问题的代码如下
import time
import matplotlib.pyplot as plt
from IPython import display
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymfrom gym import wrappersenv gym.make(CartPole-v1,render_modehuman)
# set up matplotlib
is_ipython inline in matplotlib.get_backend()
if is_ipython:from IPython import display
# 开启交互式绘图模式。动态展示训练过程
plt.ion()# if GPU is to be used
device torch.device(cuda if torch.cuda.is_available() else cpu)
# 命名元组,状态转移变量,state: 表示当前状态。
# action: 表示采取的动作。
# next_state: 表示转移到的下一个状态。
# reward: 表示从当前状态到下一个状态的奖励
Transition namedtuple(Transition,(state, action, next_state, reward))# 经验回放内存类
class ReplayMemory(object):# 经验回放内存的容量。双向队列长度,maxlen 参数限制了队列的最大长度确保内存不会无限增长def __init__(self, capacity):self.memory deque([], maxlencapacity)# 保存一次状态转移,添加到内存队列def push(self, *args):Save a transitionself.memory.append(Transition(*args))# 从内存中随机采样一批状态转移def sample(self, batch_size):return random.sample(self.memory, batch_size)# 获取内存中状态转移的数量def __len__(self):return len(self.memory)# 这个 DQN 模型是一个具有两个隐藏层的前馈神经网络用于近似 Q-值函数
class DQN(nn.Module):# n_observations表示输入状态的维度观测值的数量。# n_actions表示输出动作的维度动作的数量def __init__(self, n_observations, n_actions):super(DQN, self).__init__()# 创建了一个线性层nn.Linear将输入状态映射到一个具有 128 个神经元的隐藏层。self.layer1 nn.Linear(n_observations, 128)# 创建了第二个具有 128 个神经元的隐藏层self.layer2 nn.Linear(128, 128)# 创建了一个线性层将隐藏层的输出映射到动作空间的维度。self.layer3 nn.Linear(128, n_actions)# Called with either one element to determine next action, or a batch# during optimization. Returns tensor([[left0exp,right0exp]...]).# 调用这个方法来获取模型的输出。# 将输入 x 通过 ReLU 激活函数传递给第一个隐藏层然后再传递给第二个隐藏层。最后我们返回输出层的结果表示不同动作的预期值。def forward(self, x):x F.relu(self.layer1(x))x F.relu(self.layer2(x))return self.layer3(x)# 每次训练时从经验回放内存中随机选择的状态转移的数量
BATCH_SIZE 128
# 计算未来奖励的折现值。较大的折扣因子意味着更重视未来奖励
GAMMA 0.99
# ε-贪心策略 中的参数。在训练初期智能体更倾向于探索新的动作ε 较大。
# 随着训练的进行它逐渐减少探索更倾向于选择当前估计最优的动作ε 较小。
EPS_START 0.9
EPS_END 0.05
# 探索率ε的衰减速率。每隔一定步数ε 会减小一次以平衡探索和利用。
EPS_DECAY 1000
# 目标网络 更新的参数。目标网络用于计算目标 Q-值通过软更新滑动平均来更新。较小的 TAU 值意味着更频繁的更新目标网络。
TAU 0.005
# 调整模型权重的更新步长
LR 1e-4
# 获取了 OpenAI Gym 环境中的动作数量n_actions然后获取了状态观测值的数量n_observations
# Get number of actions from gym action space
n_actions env.action_space.n
# Get the number of state observations
state, info env.reset()
n_observations len(state)
# 主要模型用于预测每个动作的 Q 值。它接受状态观测值作为输入并输出每个动作的预期值。
policy_net DQN(n_observations, n_actions).to(device)
# 目标网络用于计算目标 Q 值。在训练过程中它的参数会被软更新滑动平均以稳定训练。
target_net DQN(n_observations, n_actions).to(device)
# 用于将 policy_net 的参数加载到 target_net 中确保两者初始状态相同。可以使用 policy_net 进行训练和推断同时使用 target_net 计算目标 Q 值
target_net.load_state_dict(policy_net.state_dict())
# 使用了 AdamW 优化器来训练模型并创建了一个经验回放内存用于存储状态转移数据
optimizer optim.AdamW(policy_net.parameters(), lrLR, amsgradTrue)
memory ReplayMemory(1000)steps_done 0# 负责根据ε-贪心策略选择动作
# 探索和利用之间的平衡由**探索率epsilon**控制。
# epsilon 越大越倾向于探索随机选择动作。
# epsilon 值随着时间衰减以鼓励智能体在学习过程中更多地进行利用。
def select_action(state):global steps_done# 生成一个介于 0 和 1 之间的随机样本。sample random.random()# 根据当前 epsilon 值计算探索阈值。eps_threshold EPS_END (EPS_START - EPS_END) * \math.exp(-1. * steps_done / EPS_DECAY)steps_done 1# 如果样本大于阈值选择具有最高 Q 值的动作利用。if sample eps_threshold:with torch.no_grad():# t.max(1) will return the largest column value of each row.# second column on max result is index of where max element was# found, so we pick action with the larger expected reward.return policy_net(state).max(1).indices.view(1, 1)# 否则从动作空间中随机采样一个动作探索。else:return torch.tensor([[env.action_space.sample()]], devicedevice, dtypetorch.long)episode_durations []# 绘制训练过程中的回合持续时间Duration,用于绘图
def plot_durations(show_resultFalse):# 创建一个名为 plt.figure(1) 的图形。plt.figure(1)# 将回合持续时间episode_durations转换为张量 durations_t。durations_t torch.tensor(episode_durations, dtypetorch.float)# 如果 show_result 为真标题设置为“Result”否则标题设置为“Training…”。if show_result:plt.title(Result)else:plt.clf()plt.title(Training...)# 横轴表示回合数纵轴表示持续时间。plt.xlabel(Episode)plt.ylabel(Duration)plt.plot(durations_t.numpy())# 绘制了回合持续时间的折线图并计算了最近 100 个回合的平均值。如果回合数超过 100还会绘制平均值的折线图。# 帮助我们可视化训练过程中的回合持续时间# Take 100 episode averages and plot them tooif len(durations_t) 100:means durations_t.unfold(0, 100, 1).mean(1).view(-1)means torch.cat((torch.zeros(99), means))plt.plot(means.numpy())plt.pause(0.001) # pause a bit so that plots are updatedif is_ipython:if not show_result:display.display(plt.gcf())display.clear_output(waitTrue)else:display.display(plt.gcf())# Deep Q-Network (DQN) 训练过程中的关键部分
def optimize_model():# 如果回放内存的大小小于批次大小 (BATCH_SIZE)则提前返回不进行优化if len(memory) BATCH_SIZE:return# 否则从回放内存中随机采样一批过渡数据。transitions memory.sample(BATCH_SIZE)# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for# detailed explanation). This converts batch-array of Transitions# to Transition of batch-arrays.# 转置批处理(参见https://stackoverflow.com/a/19343/3343043# 详细解释)。这将转换转换的批处理数组# to批量数组的转换。batch Transition(*zip(*transitions))# Compute a mask of non-final states and concatenate the batch elements# (a final state wouldve been the one after which simulation ended)# 计算非最终状态的掩码并连接批处理元素# (最终状态是模拟结束后的状态)non_final_mask torch.tensor(tuple(map(lambda s: s is not None,batch.next_state)), devicedevice, dtypetorch.bool)non_final_next_states torch.cat([s for s in batch.next_stateif s is not None])state_batch torch.cat(batch.state)action_batch torch.cat(batch.action)reward_batch torch.cat(batch.reward)# 使用 policy_net 计算当前状态-动作对的 Q 值。# Compute Q(s_t, a) - the model computes Q(s_t), then we select the# columns of actions taken. These are the actions which wouldve been taken# for each batch state according to policy_net# 计算Q(s_t, a) -模型计算Q(s_t)然后我们选择# 所采取的行动列。这些都是应该采取的行动# 根据policy_net对应每个批处理状态state_action_values policy_net(state_batch).gather(1, action_batch)# Compute V(s_{t1}) for all next states.# Expected values of actions for non_final_next_states are computed based# on the older target_net; selecting their best reward with max(1).values# This is merged based on the mask, such that well have either the expected# state value or 0 in case the state was final.# 计算所有下一个状态的V(s_{t1})# non_final_next_states的动作期望值是基于计算的# 在“旧的”target_net上;选择最大(1)个值的最佳奖励# 这是基于掩码合并的这样我们就会有预期的# state值如果状态为final则为0。# 使用 target_net 计算下一状态的预期 Q 值。next_state_values torch.zeros(BATCH_SIZE, devicedevice)with torch.no_grad():next_state_values[non_final_mask] target_net(non_final_next_states).max(1).values# Compute the expected Q values# 计算预期 Q 值与当前 Q 值之间的均方误差作为损失。通过反向传播更新 policy_net 的参数。# 这个函数使用从回放内存中采样的一批过渡数据来优化 policy_netexpected_state_action_values (next_state_values * GAMMA) reward_batch# SmoothL1Loss 是一种用于回归任务的损失函数它在绝对误差和平方误差之间取得平衡。# 在 DQN 训练中我们使用它来计算 Q 值的损失。这段代码计算了损失并通过反向传播更新了模型的参数# Compute Huber losscriterion nn.SmoothL1Loss()loss criterion(state_action_values, expected_state_action_values.unsqueeze(1))# Optimize the modeloptimizer.zero_grad()loss.backward()# In-place gradient clippingtorch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)optimizer.step()if torch.cuda.is_available():num_episodes 6000
else:num_episodes 500for i_episode in range(num_episodes):# Initialize the environment and get its state# 初始化环境并获取初始状态。env.render()state, info env.reset()state torch.tensor(state, dtypetorch.float32, devicedevice).unsqueeze(0)for t in count():# 在每个回合中根据当前状态选择动作。action select_action(state)# 执行动作获取奖励和下一状态。observation, reward, terminated, truncated, _ env.step(action.item())reward torch.tensor([reward], devicedevice)done terminated or truncatedif terminated:next_state Noneelse:next_state torch.tensor(observation, dtypetorch.float32, devicedevice).unsqueeze(0)# 将过渡数据存储到回放内存中。# Store the transition in memorymemory.push(state, action, next_state, reward)# Move to the next statestate next_state# Perform one step of the optimization (on the policy network)optimize_model()# 执行一步优化更新策略网络的参数。# 软更新目标网络的权重。# Soft update of the target networks weights# θ′ ← τ θ (1 −τ )θ′target_net_state_dict target_net.state_dict()policy_net_state_dict policy_net.state_dict()for key in policy_net_state_dict:target_net_state_dict[key] policy_net_state_dict[key] * TAU target_net_state_dict[key] * (1 - TAU)target_net.load_state_dict(target_net_state_dict)# 如果回合结束记录回合持续时间并绘制图表。if done:episode_durations.append(t 1)plot_durations()print(Episode finished after {} timesteps.format(t 1))breakprint(Complete)
plot_durations(show_resultTrue)
plt.ioff()
plt.show()