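"""Train a TD3 agent for robot navigation.

The script runs episodes in SIM_ENV, stores transitions in a replay buffer,
trains the TD3 networks every few episodes, and evaluates the policy once per
epoch, logging evaluation metrics through model.writer.
"""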
from models.TD3.TD3 import TD3
import torch
import numpy as np
from sim import SIM_ENV
from utils import get_buffer


def main(args=None):
    """Main training function"""
    action_dim = 2  # number of actions produced by the model
    max_action = 1  # maximum absolute value of output actions
    state_dim = 25  # number of input values in the neural network (vector length of the state input)
    device = torch.device(
        "cuda" if torch.cuda.is_available() else "cpu"
    )  # use cuda if it is available, cpu otherwise
    nr_eval_episodes = 10  # how many episodes to run during evaluation
    max_epochs = 60  # max number of epochs
    epoch = 0  # starting epoch number
    episodes_per_epoch = 70  # how many episodes to run in a single epoch
    episode = 0  # starting episode number
    train_every_n = 2  # train and update network parameters every n episodes
    training_iterations = 80  # how many batches to use for a single training cycle
    batch_size = 64  # batch size for each training iteration
    max_steps = 300  # maximum number of steps in a single episode
    steps = 0  # starting step number
    load_saved_buffer = False  # whether to load experiences from assets/data.yml
    pretrain = False  # whether to use the loaded experiences to pre-train the model (load_saved_buffer must be True)
    pretraining_iterations = (
        10  # number of training iterations to run during pre-training
    )
    save_every = 10  # save the model every n training cycles

    model = TD3(
        state_dim=state_dim,
        action_dim=action_dim,
        max_action=max_action,
        device=device,
        save_every=save_every,
        load_model=False,
    )  # instantiate a model
    sim = SIM_ENV()  # instantiate environment
    replay_buffer = get_buffer(
        model,
        sim,
        load_saved_buffer,
        pretrain,
        pretraining_iterations,
        training_iterations,
        batch_size,
    )
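
    # Both sim.step() and sim.reset() are unpacked into the same tuple:
    # (latest_scan, distance, cos, sin, collision, goal, a, reward) -- assumed
    # to be the laser scan, goal distance/heading, collision and goal-reached
    # flags, the last action, and the step reward.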
    latest_scan, distance, cos, sin, collision, goal, a, reward = sim.step(
        lin_velocity=0.0, ang_velocity=0.0
    )  # get the initial step state
    while epoch < max_epochs:  # train until max_epochs is reached
        state, terminal = model.prepare_state(
            latest_scan, distance, cos, sin, collision, goal, a
        )  # get a state representation from the data returned by the environment
        action = model.get_action(np.array(state), True)  # get an action from the model
        a_in = [
            (action[0] + 1) / 4,
            action[1],
        ]  # scale the linear velocity into the [0, 0.5] m/s range
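        # action[0] lies in [-1, 1] (max_action = 1 bounds the policy output),
        # so (action[0] + 1) / 4 maps it linearly onto [0, 0.5]: action[0] = -1
        # gives 0.0 m/s and action[0] = 1 gives 0.5 m/s. The angular velocity
        # action[1] is passed through unscaled.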

        latest_scan, distance, cos, sin, collision, goal, a, reward = sim.step(
            lin_velocity=a_in[0], ang_velocity=a_in[1]
        )  # get data from the environment
        next_state, terminal = model.prepare_state(
            latest_scan, distance, cos, sin, collision, goal, a
        )  # get the next state representation
        replay_buffer.add(
            state, action, reward, terminal, next_state
        )  # add the experience to the replay buffer

        if (
            terminal or steps == max_steps
        ):  # reset the environment if a terminal state was reached or max_steps were taken
            latest_scan, distance, cos, sin, collision, goal, a, reward = sim.reset()
            episode += 1
            if episode % train_every_n == 0:
                model.train(
                    replay_buffer=replay_buffer,
                    iterations=training_iterations,
                    batch_size=batch_size,
                )  # train the model and update its parameters
            steps = 0
        else:
            steps += 1

        if (
            episode + 1
        ) % episodes_per_epoch == 0:  # if the epoch is concluded, run evaluation
            episode = 0
            epoch += 1
            evaluate(model, epoch, sim, eval_episodes=nr_eval_episodes)


def evaluate(model, epoch, sim, eval_episodes=10):
    """Evaluate the policy for eval_episodes episodes and log the average
    reward, collision rate, and goal rate."""
    print("..............................................")
    print(f"Epoch {epoch}. Evaluating scenarios")
    avg_reward = 0.0
    col = 0
    goals = 0
    for _ in range(eval_episodes):
        count = 0
        latest_scan, distance, cos, sin, collision, goal, a, reward = sim.reset()
        done = False
        while not done and count < 501:  # cap the episode length during evaluation
            state, terminal = model.prepare_state(
                latest_scan, distance, cos, sin, collision, goal, a
            )
            action = model.get_action(np.array(state), False)  # the False flag is assumed to disable exploration noise
            a_in = [(action[0] + 1) / 4, action[1]]  # same action scaling as in training
            latest_scan, distance, cos, sin, collision, goal, a, reward = sim.step(
                lin_velocity=a_in[0], ang_velocity=a_in[1]
            )
            avg_reward += reward
            count += 1
            if collision:
                col += 1
            if goal:
                goals += 1
            done = collision or goal
    avg_reward /= eval_episodes
    avg_col = col / eval_episodes
    avg_goal = goals / eval_episodes
    print(f"Average Reward: {avg_reward}")
    print(f"Average Collision rate: {avg_col}")
    print(f"Average Goal rate: {avg_goal}")
    print("..............................................")
    model.writer.add_scalar("eval/avg_reward", avg_reward, epoch)
    model.writer.add_scalar("eval/avg_col", avg_col, epoch)
    model.writer.add_scalar("eval/avg_goal", avg_goal, epoch)


if __name__ == "__main__":
    main()