In [ ]:
#-------------------------------------------------------------------------
#
# Code by J.D. Correa obaozai@astropema.com www.astropema.com March 2025
#
#-------------------------------------------------------------------------
import gym
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import matplotlib.pyplot as plt
import imageio # For video saving
# Set up CartPole environment
env = gym.make('CartPole-v1')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# **Hyperparameters**
GAMMA = 0.99 # Discount factor for future rewards
LR = 5e-4 # Learning rate
BATCH_SIZE = 128 # Number of samples per training batch
MEMORY_SIZE = 20000 # Experience replay memory capacity
EPSILON_START = 1.0 # Initial exploration rate
EPSILON_END = 0.01 # Minimum exploration rate
EPSILON_DECAY = 1000 # Rate of decay for epsilon
TARGET_UPDATE = 5 # Number of episodes before updating target network
NUM_EPISODES = 1000 # Total training episodes
MAX_STEPS = 500 # Maximum steps per episode
def epsilon_by_frame(frame_idx):
    """Calculate decayed epsilon value for exploration-exploitation tradeoff."""
    return EPSILON_END + (EPSILON_START - EPSILON_END) * np.exp(-1.0 * frame_idx / EPSILON_DECAY)
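# Reference values for this schedule (computed from the function above, with EPSILON_DECAY = 1000):
# epsilon starts at 1.0, falls to roughly 0.37 after ~1,000 frames, ~0.06 after ~3,000 frames,
# and ~0.02 after ~5,000 frames, approaching the EPSILON_END floor of 0.01.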
# **Dueling Deep Q-Network (Dueling DQN)**
class DuelingDQN(nn.Module):
    """Dueling DQN with separate Advantage and Value streams for improved stability."""
    def __init__(self, input_dim, output_dim):
        super(DuelingDQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 256)  # First hidden layer
        self.fc2 = nn.Linear(256, 256)        # Second hidden layer
        # Separate advantage and value streams
        self.advantage = nn.Linear(256, output_dim)
        self.value = nn.Linear(256, 1)

    def forward(self, x):
        """Forward pass of the network."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        adv = self.advantage(x)
        val = self.value(x)
        # Combine streams; subtract the per-sample mean advantage (not the batch mean)
        return val + (adv - adv.mean(dim=1, keepdim=True))
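# The dueling head estimates Q(s, a) = V(s) + A(s, a) - mean_a A(s, a); subtracting the mean
# advantage per sample keeps the value/advantage split identifiable. Quick shape sanity check
# (illustrative only, not part of training):
#   DuelingDQN(4, 2)(torch.zeros(8, 4)).shape   # -> torch.Size([8, 2])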
# **Replay Memory for Experience Replay**
class ReplayBuffer:
    """Stores past experiences (state, action, reward, next_state, done) for training."""
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Save a transition experience to memory."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Randomly sample a batch of experiences."""
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), actions, rewards, np.array(next_states), dones

    def __len__(self):
        return len(self.buffer)
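# Usage sketch (illustrative only; the real buffer is filled by the training loop below):
#   buf = ReplayBuffer(100)
#   buf.push(np.zeros(4), 0, 1.0, np.zeros(4), False)
#   states, actions, rewards, next_states, dones = buf.sample(1)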
# **Initialize Networks**
num_actions = env.action_space.n # Number of possible actions
obs_dim = env.observation_space.shape[0] # Observation space size
# Main Policy Network
policy_net = DuelingDQN(obs_dim, num_actions).to(device)
# Target Network (used for stability)
target_net = DuelingDQN(obs_dim, num_actions).to(device)
target_net.load_state_dict(policy_net.state_dict()) # Sync target network
target_net.eval()
# Optimizer and Experience Replay Memory
optimizer = optim.Adam(policy_net.parameters(), lr=LR)
memory = ReplayBuffer(MEMORY_SIZE)
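# Loop structure below: each episode runs an epsilon-greedy rollout that fills the replay
# buffer; after the rollout, one batch update is performed once the buffer holds more than
# BATCH_SIZE transitions, and the target network is re-synced every TARGET_UPDATE episodes.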
# **Training Loop**
epsilon = EPSILON_START  # Initialize exploration rate
episode_rewards = []     # Store rewards per episode

for episode in range(NUM_EPISODES):
    state, _ = env.reset()  # Gym 0.26+ reset() returns (observation, info)
    episode_reward = 0      # Track total episode reward

    for t in range(MAX_STEPS):
        epsilon = epsilon_by_frame(episode * MAX_STEPS + t)  # Decay epsilon
        if random.random() < epsilon:
            action = env.action_space.sample()  # Random action (exploration)
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
            with torch.no_grad():
                action = policy_net(state_tensor).max(1)[1].item()  # Best action (exploitation)

        # Gym 0.26+ step() returns (obs, reward, terminated, truncated, info)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        memory.push(state, action, reward, next_state, done)  # Store experience
        state = next_state
        episode_reward += reward

        if done:
            break  # End episode if a terminal state is reached

    # **Train the model once enough samples have been collected**
    if len(memory) > BATCH_SIZE:
        states, actions, rewards, next_states, dones = memory.sample(BATCH_SIZE)
        states = torch.FloatTensor(states).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        actions = torch.LongTensor(actions).to(device)
        rewards = torch.FloatTensor(rewards).to(device)
        dones = torch.FloatTensor(dones).to(device)

        # Compute Q-values and target Q-values
        q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        next_q_values = target_net(next_states).max(1)[0]
        expected_q_values = rewards + GAMMA * next_q_values * (1 - dones)

        # Compute loss and update the model
        loss = F.mse_loss(q_values, expected_q_values.detach())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Update the target network every few episodes
    if episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(episode_reward)
    print(f"Episode {episode}: Reward: {episode_reward:.2f} Epsilon: {epsilon:.4f}")
# Save trained model
torch.save(policy_net.state_dict(), "cartpole_dqn.pth")
print("Model saved as 'cartpole_dqn.pth'.")
env.close()
# **Plot Training Progress**
plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('CartPole Training Progress')
plt.show()
# **Print Training Summary**
print("\nTraining Summary:")
print(f"Total Episodes: {NUM_EPISODES}")
print(f"Average Reward: {np.mean(episode_rewards):.2f}")
print(f"Best Episode Reward: {np.max(episode_rewards):.2f}")
print(f"Final Episode Reward: {episode_rewards[-1]:.2f}")
# **Visualizing the Trained Agent and Saving a Video**
def visualize_agent():
    """Runs the trained model greedily and saves a video of one episode."""
    env = gym.make('CartPole-v1', render_mode='rgb_array')
    state, _ = env.reset()
    frames = []  # Store rendered frames for the video

    for _ in range(MAX_STEPS):
        frames.append(env.render())  # Capture frame for the video
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        with torch.no_grad():
            action = policy_net(state_tensor).max(1)[1].item()
        next_state, _, terminated, truncated, _ = env.step(action)
        state = next_state
        if terminated or truncated:
            break
    env.close()

    # **Save the video** (writing .mp4 typically requires the imageio-ffmpeg plugin)
    video_filename = "cartpole_simulation.mp4"
    imageio.mimsave(video_filename, frames, fps=30)
    print(f"Video saved as {video_filename}")
# Call visualize_agent() to run the trained model and save the video
visualize_agent()
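# Minimal sketch for reusing the saved weights later (assumes the training above has already
# written 'cartpole_dqn.pth'); standard torch.load / load_state_dict usage:
eval_net = DuelingDQN(obs_dim, num_actions).to(device)
eval_net.load_state_dict(torch.load("cartpole_dqn.pth", map_location=device))
eval_net.eval()  # inference mode for greedy rollouts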