In [ ]:
#-------------------------------------------------------------------------
#
# Code by J.D. Correa obaozai@astropema.com www.astropema.com March 2025
#
#-------------------------------------------------------------------------
import gym
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import matplotlib.pyplot as plt
import imageio # For saving video
# Fix for numpy bool8 issue
if not hasattr(np, "bool8"):
    np.bool8 = np.bool_
# Set up LunarLander environment
env = gym.make('LunarLander-v2')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Hyperparameters
GAMMA = 0.99
LR = 5e-4
BATCH_SIZE = 128
MEMORY_SIZE = 50000
EPSILON_START = 1.0
EPSILON_END = 0.01
EPSILON_DECAY = 5000
TARGET_UPDATE = 5
NUM_EPISODES = 1000
MAX_STEPS = 1000 # Steps per episode
# Epsilon decay function
def epsilon_by_frame(frame_idx):
    return EPSILON_END + (EPSILON_START - EPSILON_END) * np.exp(-1. * frame_idx / EPSILON_DECAY)
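# Optional sanity check of the schedule (illustrative frame indices, not part of
# the training loop): epsilon should decay from ~1.0 toward EPSILON_END.
for f in (0, 1000, 5000, 20000):
    print(f"frame {f}: epsilon = {epsilon_by_frame(f):.3f}")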
# Dueling DQN class
class DuelingDQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DuelingDQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.advantage = nn.Linear(256, output_dim)  # advantage stream A(s, a)
        self.value = nn.Linear(256, 1)               # state-value stream V(s)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        adv = self.advantage(x)
        val = self.value(x)
        # Combine the streams; subtract the per-sample advantage mean so the
        # decomposition Q = V + A is identifiable
        return val + (adv - adv.mean(dim=1, keepdim=True))
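# A minimal shape check (a sketch, not required for training): LunarLander has
# 8-dimensional observations and 4 discrete actions, so a batch of 5 states
# should map to a (5, 4) tensor of Q-values.
_q = DuelingDQN(8, 4)(torch.zeros(5, 8))
assert _q.shape == (5, 4)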
# Experience Replay Buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), actions, rewards, np.array(next_states), dones

    def __len__(self):
        return len(self.buffer)
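# Brief usage sketch (illustrative only): push a few dummy transitions and draw
# a mini-batch to confirm that sampled states come back batched as arrays.
_buf = ReplayBuffer(capacity=10)
for _ in range(3):
    _buf.push(np.zeros(8), 0, 0.0, np.zeros(8), False)
_s, _a, _r, _ns, _d = _buf.sample(2)
print("sampled states shape:", _s.shape)  # expected: (2, 8)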
# Initialize Networks
num_actions = env.action_space.n
obs_dim = env.observation_space.shape[0]
policy_net = DuelingDQN(obs_dim, num_actions).to(device)
target_net = DuelingDQN(obs_dim, num_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.Adam(policy_net.parameters(), lr=LR)
memory = ReplayBuffer(MEMORY_SIZE)
episode_rewards = []
epsilon = EPSILON_START
# Training loop
for episode in range(NUM_EPISODES):
    state, _ = env.reset()
    episode_reward = 0
    for t in range(MAX_STEPS):
        # Epsilon-greedy action selection
        epsilon = epsilon_by_frame(episode * MAX_STEPS + t)
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
            with torch.no_grad():
                action = policy_net(state_tensor).max(1)[1].item()
        # Newer gym API returns separate terminated/truncated flags
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        memory.push(state, action, reward, next_state, done)
        state = next_state
        episode_reward += reward
        # Learn from a random mini-batch once the buffer holds enough transitions
        if len(memory) > BATCH_SIZE:
            states, actions, rewards, next_states, dones = memory.sample(BATCH_SIZE)
            states = torch.FloatTensor(states).to(device)
            next_states = torch.FloatTensor(next_states).to(device)
            actions = torch.LongTensor(actions).to(device)
            rewards = torch.FloatTensor(rewards).to(device)
            dones = torch.FloatTensor(dones).to(device)
            # Q(s, a) for the actions actually taken
            q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
            # Bootstrapped one-step target from the frozen target network
            next_q_values = target_net(next_states).max(1)[0]
            expected_q_values = rewards + GAMMA * next_q_values * (1 - dones)
            loss = F.mse_loss(q_values, expected_q_values.detach())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        if done:
            break
    # Periodically sync the target network with the policy network
    if episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())
    episode_rewards.append(episode_reward)
    print(f"Episode {episode}: Reward: {episode_reward:.2f} Epsilon: {epsilon:.4f}")
# Save trained model
torch.save(policy_net.state_dict(), "lunar_lander_dqn.pth")
print("Model saved as 'lunar_lander_dqn.pth'.")
env.close()
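# A minimal reload sketch (assumes the checkpoint written above): restore the
# trained weights into a fresh network for later evaluation without retraining.
loaded_net = DuelingDQN(obs_dim, num_actions).to(device)
loaded_net.load_state_dict(torch.load("lunar_lander_dqn.pth", map_location=device))
loaded_net.eval()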
# Plot rewards
plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('Lunar Lander Training Progress')
plt.show()
# Training Summary
print("\nTraining Summary:")
print(f"Total Episodes: {NUM_EPISODES}")
print(f"Average Reward: {np.mean(episode_rewards):.2f}")
print(f"Best Episode Reward: {np.max(episode_rewards):.2f}")
print(f"Final Episode Reward: {episode_rewards[-1]:.2f}")
# Visualization of trained agent with video saving
def visualize_agent():
    env = gym.make('LunarLander-v2', render_mode='rgb_array')
    state, _ = env.reset()
    frames = []  # Store rendered frames for the video
    for _ in range(1000):
        frame = env.render()
        frames.append(frame)  # Capture frame for video
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        with torch.no_grad():
            action = policy_net(state_tensor).max(1)[1].item()
        next_state, _, terminated, truncated, _ = env.step(action)
        state = next_state
        if terminated or truncated:
            break
    env.close()
    # Save the collected frames as a video
    video_filename = "lunar_lander_simulation.mp4"
    imageio.mimsave(video_filename, frames, fps=30)
    print(f"Video saved as {video_filename}")

# Call visualize_agent() to watch the trained model and save the video
visualize_agent()
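# Optional evaluation sketch (illustrative; the evaluate() helper is not part of
# the original script): run a few greedy episodes without rendering and report
# the average return of the trained policy.
def evaluate(n_episodes=5):
    eval_env = gym.make('LunarLander-v2')
    returns = []
    for _ in range(n_episodes):
        s, _ = eval_env.reset()
        total, done = 0.0, False
        while not done:
            with torch.no_grad():
                a = policy_net(torch.FloatTensor(s).unsqueeze(0).to(device)).max(1)[1].item()
            s, r, terminated, truncated, _ = eval_env.step(a)
            total += r
            done = terminated or truncated
        returns.append(total)
    eval_env.close()
    print(f"Average greedy return over {n_episodes} episodes: {np.mean(returns):.2f}")

evaluate()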