#!/usr/bin/env python3
"""Train a tank-driving policy for TankGame with REINFORCE (policy gradients) and export it to JSON."""
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['TK_SILENCE_DEPRECATION'] = '1'
import tensorflow as tf
import numpy as np
import math, random, json, sys
import pygame
import tkinter as tk
from tkinter import filedialog
import tensorflow_probability as tfp
from tensorflow import keras
from tensorflow.keras import layers, Model
from tank import TankGame

GREEN = "\033[92m"
RED = "\033[91m"
RESET = "\033[0m"


def create_output_folder(num_hidden_layers, num_neurons, lot_file='.lot_number', new=True):
    # Read the current lot number, or initialize to 0 if the file doesn't exist
    if os.path.exists(lot_file):
        with open(lot_file, 'r') as f:
            try:
                lot_number = int(f.read().strip())
            except ValueError:
                lot_number = 0
    else:
        lot_number = 0
    # Increment and update the file with the new lot number
    lot_number += 1
    with open(lot_file, 'w') as f:
        f.write(str(lot_number))
    # Format the lot number as a three-digit string
    lot_str = str(lot_number).zfill(3)
    folder_name = f"tank_{num_hidden_layers}_{num_neurons}_{lot_str}"
    if new:
        folder_name += "+"
    os.makedirs(folder_name, exist_ok=True)
    return folder_name


# Enable GPU memory growth if available
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except Exception as e:
        print(e)


# -------------------------------
# Define Policy Network
# -------------------------------
class PolicyNetwork(Model):
    def __init__(self, input_dim, num_hidden_layers, first_hidden_neurons, pyramidal=False):
        super(PolicyNetwork, self).__init__()
        self.hidden_layers = []
        neurons = first_hidden_neurons
        for i in range(num_hidden_layers):
            # Create hidden layers with ReLU and optional pyramidal reduction
            dense = layers.Dense(neurons, activation='relu')
            setattr(self, f"hidden_layer_{i}", dense)  # ensure the layer is tracked
            self.hidden_layers.append(dense)
            if pyramidal:
                neurons = max(1, int(neurons * 0.8))  # reduce by 20% each layer
        # Output: 2 directional outputs (tanh) and 1 shooting output (sigmoid)
        self.direction_layer = layers.Dense(2, activation='tanh')
        self.shoot_layer = layers.Dense(1, activation='sigmoid')

    def call(self, inputs):
        x = inputs
        for layer in self.hidden_layers:
            x = layer(x)
        direction = self.direction_layer(x)
        shoot = self.shoot_layer(x)
        return tf.concat([direction, shoot], axis=-1)


# -------------------------------
# Helper Function to Export Model to JSON (with input shape)
# -------------------------------
def export_model_to_json(model, input_shape, episode, output_folder):
    model_json = {
        "input_shape": list(input_shape),
        "hidden_layers": [],
        "direction_layer": {},
        "shoot_layer": {}
    }
    # Save hidden layers
    for layer in model.hidden_layers:
        weights, biases = layer.get_weights()
        model_json["hidden_layers"].append({
            "weights": weights.tolist(),
            "biases": biases.tolist()
        })
    # Save the direction output layer
    weights, biases = model.direction_layer.get_weights()
    model_json["direction_layer"] = {
        "weights": weights.tolist(),
        "biases": biases.tolist()
    }
    # Save the shoot output layer
    weights, biases = model.shoot_layer.get_weights()
    model_json["shoot_layer"] = {
        "weights": weights.tolist(),
        "biases": biases.tolist()
    }
    filename = os.path.join(output_folder, f"tank_{episode:03d}.json")
    with open(filename, "w") as f:
        json.dump(model_json, f)
    print(f"Model saved to {filename} after episode {episode}.")

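# For reference, export_model_to_json above produces JSON with this shape (illustrative
# values only; the actual dimensions depend on the chosen network configuration):
# {
#     "input_shape": [13],
#     "hidden_layers": [{"weights": [[...], ...], "biases": [...]}, ...],
#     "direction_layer": {"weights": [[...], ...], "biases": [...]},
#     "shoot_layer": {"weights": [[...], ...], "biases": [...]}
# }
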
"r") as f: model_json = json.load(f) except Exception as e: print(f"Failed to load model from {filename}: {e}") sys.exit(0) return model_json, filename @tf.function(reduce_retracing=True) def train_step(policy_net, optimizer, states, actions, returns, std, mask, clip = 3.5): with tf.GradientTape() as tape: # Forward pass: compute means for the entire batch means = policy_net(states) # states: [batch_size, input_dim] # Create a normal distribution for each sample dists = tfp.distributions.Normal(loc=means, scale=std) # Compute log probabilities and reduce over action dimensions log_probs = tf.reduce_sum(dists.log_prob(actions), axis=1) loss_per_step = -log_probs * returns # Apply mask: sum over valid steps and divide by number of valid steps masked_loss = tf.reduce_sum(loss_per_step * mask) / tf.reduce_sum(mask) grads = tape.gradient(masked_loss, policy_net.trainable_variables) grads, _ = tf.clip_by_global_norm(grads, clip) # gradient clipping optimizer.apply_gradients(zip(grads, policy_net.trainable_variables)) return masked_loss # ------------------------------- # Main Training Program # ------------------------------- def main(): print(tf.config.list_physical_devices('GPU')) load_existing = input("Load existing model? (y/n, default n): ").strip().lower() == 'y' if load_existing: model_json, model_path = load_model() print(f"Loaded model from {model_path}") input_dim = model_json.get("input_shape", [13])[0] hidden_layers = model_json["hidden_layers"] num_hidden_layers = len(hidden_layers) if num_hidden_layers > 0: # Extract neuron counts from each layer. neurons_counts = [len(layer["biases"]) for layer in hidden_layers] neurons_per_layer = neurons_counts[0] # If any layer differs, assume pyramidal. pyramidal = len(set(neurons_counts)) > 1 else: neurons_per_layer = 12 # default value pyramidal = False neurons_per_layer = len(model_json["hidden_layers"][0]["biases"]) if num_hidden_layers > 0 else 12 policy_net = PolicyNetwork(input_dim, num_hidden_layers, neurons_per_layer, pyramidal) dummy = tf.convert_to_tensor(np.zeros((1, input_dim), dtype=np.float32)) policy_net(dummy) # Load hidden layers weights for i, layer in enumerate(policy_net.hidden_layers): weights = np.array(model_json["hidden_layers"][i]["weights"]) biases = np.array(model_json["hidden_layers"][i]["biases"]) layer.set_weights([weights, biases]) # Load weights for the direction output layer weights = np.array(model_json["direction_layer"]["weights"]) biases = np.array(model_json["direction_layer"]["biases"]) policy_net.direction_layer.set_weights([weights, biases]) # Load weights for the shoot output layer weights = np.array(model_json["shoot_layer"]["weights"]) biases = np.array(model_json["shoot_layer"]["biases"]) policy_net.shoot_layer.set_weights([weights, biases]) output_folder = create_output_folder(num_hidden_layers, neurons_per_layer, new=False) else: num_hidden_layers = int(input("Enter number of hidden layers (default 2): ") or 2) neurons_per_layer = int(input("Enter number of neurons in first layer (default 12): ") or 12) pyramidal = input("Fully connected network? 
(default yes):").strip().lower() == 'n' input_dim = 13 policy_net = PolicyNetwork(input_dim, num_hidden_layers, neurons_per_layer, pyramidal) dummy = tf.convert_to_tensor(np.zeros((1, input_dim), dtype=np.float32)) policy_net(dummy) print("Creating folder") output_folder = create_output_folder(num_hidden_layers, neurons_per_layer, new=True) num_episodes = int(input("Number of training episodes (default 400, ~1 hour): ") or 400) epsilon = float(input("Exploration epsilon (default 1.0 random): ") or 1.0) decay_rate = float(input("Epsilon, decay rate per episode (default 0.995): ") or 0.995) learning_rate = float(input("Enter learning rate (gradient descent) rate (default 0.001): ") or 0.001) gamma = float(input("Gamma, future reward value, 0 is immediate (default 0.995): ") or 0.995) phase = float(input("Rewards: position[1], collision[2], shoot[3], All[default]: ") or 4) display_live = input("Display training live? (y/n, default y): ").strip().lower() != 'n' red_team = input("Play against Red Team? (y/n, default n): ").strip().lower() == 'y' if not red_team: difficulty = float(input("Enemy: avoid[1.0], disabled[0.5], random[0.7], seek[0.9], random[default]: ") or 0.0) else: difficulty = 0.0 min_epsilon = 0.05 avg_reward = 0.0 max_reward = 0.0 optimizer = tf.keras.optimizers.Adam(learning_rate) max_steps = 1250 env = TankGame() # create a new game environment if red_team: env.red_team_control = True env.load_neural_net(team="red") for episode in range(num_episodes): env.reset(difficulty) state = env.get_state(env.enemies[0], is_Player=True) total_reward = 0 step_count = 0 std = tf.convert_to_tensor(epsilon, dtype=tf.float32) states, actions, rewards = [], [], [] while not env.done and step_count < max_steps: states.append(state) state_tensor = tf.convert_to_tensor(state[None, :], dtype=tf.float32) mean = policy_net(state_tensor)[0] if tf.math.is_nan(mean).numpy().any(): print(f"WARNING: mean contains NaN! 
Value: {mean.numpy()}") dist = tfp.distributions.Normal(loc=mean, scale=std) action = dist.sample() actions.append(action) next_state, reward, done, _ = env.step(action.numpy(), phase) rewards.append(reward) total_reward += reward state = next_state step_count += 1 if display_live: pygame.event.pump() # force event handling env.process_events() # process events so the window updates env.render() # Compute discounted returns (REINFORCE style) returns = [] discounted_sum = 0 for r in rewards[::-1]: discounted_sum = r + gamma * discounted_sum returns.insert(0, discounted_sum) num_steps = len(states) if num_steps < max_steps: pad_steps = max_steps - num_steps mask_np = np.concatenate([np.ones(num_steps), np.zeros(pad_steps)]) states_np = np.array(states) # shape: (num_steps, input_dim) actions_np = np.array(actions) # shape: (num_steps, action_dim) returns_np = np.array(returns) # shape: (num_steps,) # Pad along the time dimension states_np = np.pad(states_np, ((0, pad_steps), (0, 0)), mode='constant') actions_np = np.pad(actions_np, ((0, pad_steps), (0, 0)), mode='constant') returns_np = np.pad(returns_np, ((0, pad_steps)), mode='constant') else: states_np = np.array(states) actions_np = np.array(actions) returns_np = np.array(returns) mask_np = np.ones(num_steps) # Normalize returns using only the valid steps (mask==1) valid = mask_np.astype(bool) mean_return = np.mean(returns_np[valid]) std_return = np.std(returns_np[valid]) + 1e-8 returns_np[valid] = (returns_np[valid] - mean_return) / std_return mask_tensor = tf.convert_to_tensor(mask_np, dtype=tf.float32) states_tensor = tf.convert_to_tensor(states_np, dtype=tf.float32) actions_tensor = tf.convert_to_tensor(actions_np, dtype=tf.float32) returns_tensor = tf.convert_to_tensor(returns_np, dtype=tf.float32) loss_value = abs(train_step(policy_net, optimizer, states_tensor, actions_tensor, returns_tensor, std, mask_tensor)) # Adjust epsilon, learning rate, clipping, and saving based on rewards avg_reward = (avg_reward * 39 + total_reward) / 40 color = GREEN if total_reward >= 0 else RED if avg_reward > max_reward: print(f"{GREEN}New average: {avg_reward:.2f}{RESET}") if epsilon < 0.35 and max_reward > 0.5: # Save if new max average and better than random export_model_to_json(policy_net, (input_dim,), episode + 1, output_folder) max_reward = max(max_reward, avg_reward) reward_limit = min(max_reward, 1.5) print(f"Episode {episode+1}/{num_episodes}: Reward: {color}{total_reward:.2f}{RESET} Epsilon: {epsilon:.2f} Avg reward: {avg_reward:.2f}/{max_reward:.2f} Loss: {loss_value.numpy():.2f}") if epsilon > 0.65 or max_reward > 0.2: # prevent reducing exploration too early epsilon = max(min_epsilon, epsilon * decay_rate) if epsilon < 0.65 and avg_reward < reward_limit - 1: # add exploration if poor rewards epsilon += 0.01 print(f" Test Phase: {phase:.0f} - Low rewards: {avg_reward:.2f}/{max_reward:.2f}") if epsilon < 0.4 and loss_value.numpy() > 3.5: epsilon += 0.05 if learning_rate > 1e-5: learning_rate *= 0.95 optimizer.learning_rate.assign(learning_rate) print(f"Learning rate: {learning_rate}") if (episode + 1) % 200 == 0 or not episode: export_model_to_json(policy_net, (input_dim,), episode + 1, output_folder) export_model_to_json(policy_net, (input_dim,), 0, ".") if __name__ == "__main__": main()