加入星计划,您可以享受以下权益:

  • 创作内容快速变现
  • 行业影响力扩散
  • 作品版权保护
  • 300W+ 专业用户
  • 1.5W+ 优质创作者
  • 5000+ 长期合作伙伴
立即加入
  • 正文
    • 安装和导入所需的库和环境
    • Q网络搭建
    • 经验回放实现
    • DQNAgent实现
    • 训练
  • 推荐器件
  • 相关推荐
  • 电子产业图谱
申请入驻 产业图谱

基于DQN和TensorFlow的LunarLander实现(全代码)

01/30 13:52
2544
阅读需 19 分钟
加入交流群
扫码加入
获取工程师必备礼包
参与热点资讯讨论

使用深度Q网络(Deep Q-Network, DQN)来训练一个在openai-gym的LunarLander-v2环境中的强化学习agent,让小火箭成功着陆。

下面代码直接扔到jupyter notebook或CoLab上就能跑起来。

安装和导入所需的库和环境

安装和设置所需的库和环境,使其能够在Jupyter Notebook中运行。

!pip install gym
!apt-get install xvfb -y
!pip install pyvirtualdisplay #用于在没有显示器的环境中创建虚拟显示
!pip install Pillow #一个图像处理
!pip install swig
!pip install "gym[box2d]"

创建并启动一个虚拟显示,在没有图形界面的服务器上运行强化学习环境:

from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()

引入所需库:

import gym
import time
import tqdm
import numpy as np
from IPython import display as ipydisplay
from PIL import Image

创建一个LunarLander-v2环境的DQN代理:

agent = DQNAgent('LunarLander-v2')

total_score, records = agent.simulate(visualize=True)
print(f'Total score {total_score:.2f}')
record_list = []
for i in tqdm.tqdm(range(100)):
total_score, _ = agent.simulate(visualize=False)
record_list.append(total_score)

print(f'Average score in 100 episode {np.mean(record_list):.2f}')

Q网络搭建

import tensorflow as tf

L = tf.keras.layers

def create_network_model(input_shape: np.ndarray,
action_space: np.ndarray,
learning_rate=0.001) -> tf.keras.Sequential:
model = tf.keras.Sequential([
L.Dense(512, input_shape=input_shape, activation="relu"),
L.Dense(256, input_shape=input_shape, activation="relu"),
L.Dense(action_space)
])
model.compile(loss="mse",
optimizer=tf.optimizers.Adam(lr=learning_rate))
return model

经验回放实现

经验回放是一种在深度强化学习中常用的技术,主要用于打破数据的相关性和减少过拟合。

在强化学习中,代理通常会在训练过程中与环境进行大量交互,经验回放允许代理存储这些经验,并在后续的训练中反复利用这些数据。这种机制有助于改善学习效率,减少数据样本间的时间相关性,提高训练过程的稳定性。

import random
import numpy as np
from collections import namedtuple

# 代表每一个样本的 namedtuple,方便存储和读取数据
Experience = namedtuple('Experience', ('state', 'action', 'reward', 'next_state', 'done'))

class ReplayMemory:

def __init__(self, max_size):
self.max_size = max_size
self.memory = []

def append(self, state, action, reward, next_state, done):
"""记录一个新的样本"""
sample = Experience(state, action, reward, next_state, done)
self.memory.append(sample)
# 只留下最新记录的 self.max_size 个样本
self.memory = self.memory[-self.max_size:]

def sample(self, batch_size):
"""按照给定批次大小取样"""
samples = random.sample(self.memory, batch_size)
batch = Experience(*zip(*samples))

# 转换数据为 numpy 张量返回
states = np.array(batch.state)
actions = np.array(batch.action)
rewards = np.array(batch.reward)
states_next = np.array(batch.next_state)
dones = np.array(batch.done)

return states, actions, rewards, states_next, dones

def __len__(self):
return len(self.memory)

DQNAgent实现

DQNAgent类是DQN算法的核心实现。它包含以下关键部分:

1、初始化:初始化环境、神经网络模型和经验回放缓存。
2、行为选择(choose_action):根据当前状态和ε-greedy策略选择行为。
3、经验回放(replay):从记忆中随机抽取小批量经验进行学习。
4、训练(train):进行多个episode的训练。

from IPython import display
from PIL import Image

# 定义超参数
LEARNING_RATE = 0.001
GAMMA = 0.99
EPSILON_DECAY = 0.995
EPSILON_MIN = 0.01

class DQNAgent:
def __init__(self, env_name):
self.env = gym.make(env_name)
self.observation_shape = self.env.observation_space.shape
self.action_count = self.env.action_space.n
self.model = create_network_model(self.observation_shape, self.action_count)
self.memory = ReplayMemory(500000)
self.epsilon = 1.0
self.batch_size = 64

def choose_action(self, state, epsilon=None):
"""
根据给定状态选择行为
- epsilon == 0 完全使用模型选择行为
- epsilon == 1 完全随机选择行为
"""
if epsilon is None:
epsilon = self.epsilon
if np.random.rand() < epsilon:
return np.random.randint(self.action_count)
else:
q_values = self.model.predict(np.expand_dims(state, axis=0))
return np.argmax(q_values[0])

def replay(self):
"""进行经验回放学习"""

# 如果当前经验池经验数量少于批次大小,则跳过
if len(self.memory) < self.batch_size:
return

states, actions, rewards, states_next, dones = self.memory.sample(self.batch_size)
q_pred = self.model.predict(states)

q_next = self.model.predict(states_next).max(axis=1)
q_next = q_next * (1 - dones)
q_update = rewards + GAMMA * q_next

indices = np.arange(self.batch_size)
q_pred[[indices], [actions]] = q_update

self.model.train_on_batch(states, q_pred)

def simulate(self, epsilon=None, visualize=True):
records = []
state = self.env.reset()
is_done = False
total_score = 0
total_step = 0
while not is_done:
action = self.choose_action(state, epsilon)
state, reward, is_done, info = self.env.step(action)
total_score += reward
total_step += 1

rgb_array = self.env.render(mode='rgb_array')
records.append((rgb_array, action, reward, total_score))

if visualize:
display.clear_output(wait=True)
img = Image.fromarray(rgb_array)
# 当前 Cell 中展示图片
display.display(img)
print(f'Action {action} Action reward {reward:.2f} | Total score {total_score:.2f} | Step {total_step}')

time.sleep(0.01)
self.env.close()
return total_score, records

def train(self, episode_count: int, log_dir: str):
"""
训练方法,按照给定 episode 数量进行训练,并记录训练过程关键参数到 TensorBoard
"""
# 初始化一个 TensorBoard 记录器
file_writer = tf.summary.create_file_writer(log_dir)
file_writer.set_as_default()

score_list = []
best_avg_score = -np.inf

for episode_index in range(episode_count):
state = self.env.reset()
score, step = 0, 0
is_done = False
while not is_done:
# 根据状态选择一个行为
action = self.choose_action(state)
# 执行行为,记录行为和结果到经验池
state_next, reward, is_done, info = self.env.step(action)
self.memory.append(state, action, reward, state_next, is_done)
score += reward

state = state_next
# 每 6 步进行一次回放训练
# 此处也可以选择每一步回放训练,但会降低训练速度,这个是一个经验技巧
if step % 1 == 0:
self.replay()
step += 1

# 记录当前 Episode 的得分,计算最后 100 Episode 的平均得分
score_list.append(score)
avg_score = np.mean(score_list[-100:])

# 记录当前 Episode 得分,epsilon 和最后 100 Episode 的平均得分到 TensorBoard
tf.summary.scalar('score', data=score, step=episode_index)
tf.summary.scalar('average score', data=avg_score, step=episode_index)
tf.summary.scalar('epsilon', data=self.epsilon, step=episode_index)

# 终端输出训练进度
print(f'Episode: {episode_index} Reward: {score:03.2f} '
f'Average Reward: {avg_score:03.2f} Epsilon: {self.epsilon:.3f}')

# 调整 epsilon 值,逐渐减少随机探索比例
if self.epsilon > EPSILON_MIN:
self.epsilon *= EPSILON_DECAY

# 如果当前平均得分比之前有改善,保存模型
# 确保提前创建目录 outputs/chapter_15
if avg_score > best_avg_score:
best_avg_score = avg_score
self.model.save(f'outputs/chapter_15/dqn_best_{episode_index}.h5')

训练

# 使用 LunarLander 初始化 Agent
agent = DQNAgent('LunarLander-v2')
import glob
# 读取现在已经记录的日志数量,避免日志重复记录
tf_log_index = len(glob.glob('tf_dir/lunar_lander/run_*'))
log_dir = f'tf_dir/lunar_lander/run_{tf_log_index}'

# 训练 2000 个 Episode
agent.train(20, log_dir)

agent.model.summary()

 

推荐器件

更多器件
器件型号 数量 器件厂商 器件描述 数据手册 ECAD模型 风险等级 参考价格 更多信息
FS32K146HAT0MLLT 1 NXP Semiconductors RISC Microcontroller

ECAD模型

下载ECAD模型
$11.53 查看
ATSAMD21G18A-MUT 1 Atmel Corporation RISC Microcontroller, 32-Bit, FLASH, CORTEX-M0 CPU, 48MHz, CMOS, MO-220VKKD-4, QFN-48

ECAD模型

下载ECAD模型
$3.52 查看
MK70FX512VMJ15 1 Freescale Semiconductor Kinetis K 32-bit MCU, ARM Cortex-M4 core, 512KB Flash, 150MHz, Graphics LCD, MAPBGA 256

ECAD模型

下载ECAD模型
$17.99 查看

相关推荐

电子产业图谱