import torch import torch.nn as nn import json import torch.optim as optim from torch.utils.data import Dataset, DataLoader import pandas as pd class Network(nn.Module): def __init__(self, input_dim=13, hidden_layers=[64, 64], dropout_rate=0.0, batch_norm=False, output_configs=None): self.input_dim = input_dim self.hidden_layers = hidden_layers self.dropout_rate = dropout_rate self.batch_norm = batch_norm super(Network, self).__init__() # Default output configuration if none provided if output_configs is None: output_configs = [ {"units": 2, "activation": "tanh"}, {"units": 1, "activation": "sigmoid"} ] # Build shared layers layers = [] prev_units = input_dim for units in hidden_layers: layers.append(nn.Linear(prev_units, units)) if batch_norm: layers.append(nn.BatchNorm1d(units)) layers.append(nn.ReLU()) if dropout_rate > 0: layers.append(nn.Dropout(dropout_rate)) prev_units = units self.shared_layers = nn.Sequential(*layers) # Map activation strings to PyTorch modules activation_map = { "softmax": lambda: nn.Softmax(dim=1), "sigmoid": lambda: nn.Sigmoid(), "relu": lambda: nn.ReLU(), "tanh": lambda: nn.Tanh(), "none": lambda: nn.Identity() } # Build output heads self.heads = nn.ModuleList() self.head_activations = nn.ModuleList() self.head_activation_names = [] for config in output_configs: self.heads.append(nn.Linear(prev_units, config["units"])) act_name = config.get("activation", "none") self.head_activation_names.append(act_name) #print(f"Activation function: {act_name}, Mapped to: {activation_map.get(act_name, nn.Identity)}") self.head_activations.append(activation_map.get(act_name, lambda: nn.Identity)()) def forward(self, x): x = self.shared_layers(x) outputs = [] for head, activation in zip(self.heads, self.head_activations): out = activation(head(x)) outputs.append(out) return torch.cat(outputs, dim=1) def build_network(input_dim=13, output_configs=None, hidden_layers=[64, 64], dropout_rate=0.0, batch_norm=False): """ Constructs the multi-head network. Parameters: input_dim: Number of input features (default 13). output_configs: List of dicts with keys 'units' and 'activation'. hidden_layers: Number of neurons per hidden layer. Repeat for multiple layers. dropout_rate: Dropout rate (default 0.0). batch_norm: Whether to use batch normalization. Returns: A PyTorch model instance. """ return Network(input_dim, hidden_layers, dropout_rate, batch_norm, output_configs) def save_network_to_json(model, filename): # Ensure the model is a Network instance if not isinstance(model, Network): raise ValueError("Expected a Network instance, but got {}".format(type(model))) # Create a dict following the expected JSON structure save_data = {} # Save input shape as a list (e.g., [13]) save_data["input_shape"] = [model.input_dim] # Extract all Linear layers from the shared_layers Sequential linear_layers = [m for m in model.shared_layers if isinstance(m, nn.Linear)] hidden_layers_data = [] for layer in linear_layers: layer_data = { "weights": layer.weight.t().tolist(), # transpose so shape becomes (in_features, out_features) "biases": layer.bias.tolist() } hidden_layers_data.append(layer_data) save_data["hidden_layers"] = hidden_layers_data # Save output heads: # Assume heads[0] is the direction_layer and heads[1] is the shoot_layer. direction_layer = model.heads[0] shoot_layer = model.heads[1] save_data["direction_layer"] = { "weights": direction_layer.weight.t().tolist(), "biases": direction_layer.bias.tolist() } save_data["shoot_layer"] = { "weights": shoot_layer.weight.t().tolist(), "biases": shoot_layer.bias.tolist() } # Write to file with open(filename, 'w') as f: json.dump(save_data, f) print(f"Network saved to {filename}") def load_network_from_json(dropout=0.2, filename="sl_checkpoint.json"): with open(filename, 'r') as f: data = json.load(f) # Extract configuration from the file input_dim = data["input_shape"][0] hidden_layers_list = [len(layer["biases"]) for layer in data["hidden_layers"]] output_configs = [ {"units": len(data["direction_layer"]["biases"]), "activation": "tanh"}, {"units": len(data["shoot_layer"]["biases"]), "activation": "sigmoid"} ] # Build the model using the extracted configuration model = build_network( input_dim=input_dim, output_configs=output_configs, hidden_layers=hidden_layers_list, dropout_rate=dropout, batch_norm=False ) # Reconstruct the state_dict from the file (transpose weights as needed) state_dict = {} # Find indices of Linear layers in model.shared_layers dynamically linear_indices = [idx for idx, m in enumerate(model.shared_layers) if isinstance(m, nn.Linear)] if len(linear_indices) != len(data["hidden_layers"]): print("Warning: Mismatch in number of Linear layers saved vs. model.") for saved_i, layer in enumerate(data["hidden_layers"]): # Use the actual index in model.shared_layers where the Linear layer is located. actual_idx = linear_indices[saved_i] weight_key = f"shared_layers.{actual_idx}.weight" bias_key = f"shared_layers.{actual_idx}.bias" state_dict[weight_key] = torch.tensor(layer["weights"]).t() state_dict[bias_key] = torch.tensor(layer["biases"]) # Load output head weights state_dict["heads.0.weight"] = torch.tensor(data["direction_layer"]["weights"]).t() state_dict["heads.0.bias"] = torch.tensor(data["direction_layer"]["biases"]) state_dict["heads.1.weight"] = torch.tensor(data["shoot_layer"]["weights"]).t() state_dict["heads.1.bias"] = torch.tensor(data["shoot_layer"]["biases"]) model.load_state_dict(state_dict) print(f"Network loaded from {filename}") return model class CSVTrainingDataset(Dataset): """ Reads a CSV file and extracts: - First 13 columns as input features. - Last 3 columns as output targets. """ def __init__(self, csv_file): # Load CSV without headers to avoid column name issues df = pd.read_csv(csv_file, header=None) # Ensure correct input-output separation self.X = torch.tensor(df.iloc[:, :13].values, dtype=torch.float32) self.y = torch.tensor(df.iloc[:, 13:].values, dtype=torch.float32) def __len__(self): return len(self.X) def __getitem__(self, idx): return self.X[idx], self.y[idx] def generate_csv(model, num_cases, filename): """ Generates a CSV file with num_cases rows. Each row contains 13 random inputs (uniformly sampled from -1 to 1) and the corresponding 3 outputs from the model. The CSV has no header row. """ model.eval() # Set model to evaluation mode cases = [] with torch.no_grad(): for _ in range(num_cases): # Generate random input: shape (1, 13) inputs = 2 * torch.rand(1, 13) - 1 # values in [-1, 1] outputs = model(inputs) # Expected shape (1, 3) # Flatten inputs and outputs to lists row = inputs.squeeze().tolist() + outputs.squeeze().tolist() cases.append(row) # Create DataFrame and save to CSV with no header or index. df = pd.DataFrame(cases) df.to_csv(filename, header=False, index=False) def train_network(model, dataloader, max_epochs, loss_threshold, learning_rate): """ Trains the given PyTorch model on a CSV file with 13 inputs and 3 outputs. Args: model: A PyTorch model that takes in 13 features and outputs 3 values. max_epochs (int): Maximum number of training epochs (default 50). loss_threshold (float): Early stopping threshold (default 1e-3). learning_rate (float): Learning rate for the optimizer (default 1e-3). Returns: model: The trained PyTorch model. """ # Move model to device (CPU or GPU) device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu") model.to(device) # Define an optimizer (Adam) and a loss function (MSE for demonstration) optimizer = optim.Adam(model.parameters(), lr=learning_rate) criterion_mse = nn.MSELoss() criterion_bce = nn.BCELoss() stopped_early = False for epoch in range(max_epochs): model.train() # Set model to training mode epoch_loss = 0.0 for inputs, targets in dataloader: inputs, targets = inputs.to(device), targets.to(device) # Forward pass optimizer.zero_grad() predictions = model(inputs) # Should be shape (batch_size, 3) # Split predictions and targets: tanh_pred = predictions[:, :2] # First two outputs (tanh activated) sigmoid_pred = predictions[:, 2].unsqueeze(1) # Last output (sigmoid activated) tanh_target = targets[:, :2] sigmoid_target = targets[:, 2].unsqueeze(1) # Ensure shape [batch, 1] # Compute separate losses: loss_tanh = criterion_mse(tanh_pred, tanh_target) loss_sigmoid = criterion_bce(sigmoid_pred, sigmoid_target) total_loss = loss_tanh + loss_sigmoid total_loss.backward() # Apply gradient clipping to avoid exploding gradients torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=3.5) optimizer.step() epoch_loss += total_loss.item() avg_loss = epoch_loss / len(dataloader) print(f"Epoch [{epoch+1}/{max_epochs}], Loss: {avg_loss:.6f}") # Early stopping if loss is below threshold if avg_loss < loss_threshold: print(f"Stopping early: loss < {loss_threshold}") stopped_early = True break return model, stopped_early def info(model_obj): # If a dictionary is provided, extract the config; otherwise, build it from the model instance. if isinstance(model_obj, dict): config = model_obj["config"] else: config = { "input_dim": model_obj.input_dim, "hidden_layers": model_obj.hidden_layers, "dropout_rate": model_obj.dropout_rate, "batch_norm": model_obj.batch_norm, "output_configs": [{"units": head.out_features, "activation": act} for head, act in zip(model_obj.heads, model_obj.head_activation_names)] } print("\n=== Network Description ===") print(f"Input Features: {config['input_dim']}") print(f"Hidden Layers: {len(config['hidden_layers'])}") for i, units in enumerate(config["hidden_layers"]): print(f" - Hidden Layer {i+1}: {units} neurons") print(f"Dropout Rate: {config['dropout_rate']}") print(f"Batch Normalization: {'Enabled' if config['batch_norm'] else 'Disabled'}") print("\nOutput Layers:") for i, output in enumerate(config["output_configs"]): units = output["units"] activation = output.get("activation", "none") print(f" - Output {i+1}: {units} neurons, Activation: {activation}") # Check gradients if available (after a backward pass) exploding_threshold = 3.4 # Adjust threshold based on your scenario vanishing_threshold = 1e-6 # Adjust threshold based on your scenario exploding = False vanishing = False grad_found = False low_count = 0 for param in model_obj.parameters(): if param.grad is not None: grad_found = True grad_norm = param.grad.data.norm(2).item() if grad_norm > exploding_threshold: exploding = True high = grad_norm if grad_norm < vanishing_threshold: low_count += 1 if low_count > 100: vanishing = True if not grad_found: print("\nNo gradients found. Make sure to run a backward pass before calling info().") else: print("\nGradient Analysis:") if exploding: print(f" Warning: Exploding gradients detected: {high:.2f}") elif vanishing: print(f" Warning: {low_count} vanishing gradients detected") else: print(" Gradients are within normal range.") print("==========================\n") if __name__ == "__main__": load = input("Load from sl_checkpoint.json? (default no): ").strip().lower() == 'y' if load: generate = input("Generate batch.csv training data? (default no): ").strip().lower() == 'y' if generate: num_cases = int(input("Number of cases to generate (default 1,000): ") or 1000) file = input("Training csv file (default 'batch'): ") or "batch" file += ".csv" epochs = int(input("How many epochs (x1,000)? (default 100): ") or 100) loss_threshold = float(input("Loss threshold for training stop (default 1e-4)") or 1e-4) dropout = float(input("Dropout rate - higher for resiliency (default 0.2): ") or 0.2) learning_rate = float(input("Learning rate (default 0.001): ") or 1e-3) if load: model = load_network_from_json(dropout=dropout, filename="sl_checkpoint.json") if generate: generate_csv(model, num_cases, filename="batch.csv") else: # Create a model instance with custom parameters num_hidden_layers = int(input("Enter number of hidden layers (default 4): ") or 4) neurons_per_layer = int(input("Enter number of neurons in first layer (default 32): ") or 32) pyramidal = input("Use pyramidal structure? (default no): ").strip().lower() == 'y' if pyramidal: hidden_layers = [max(1, int(neurons_per_layer * (0.8 ** i))) for i in range(num_hidden_layers)] else: hidden_layers = [neurons_per_layer] * num_hidden_layers model = build_network( input_dim=13, output_configs=[ {"units": 2, "activation": "tanh"}, {"units": 1, "activation": "sigmoid"} ], hidden_layers=hidden_layers, dropout_rate=dropout, batch_norm=False ) info(model) dataset = CSVTrainingDataset(file) dataloader = DataLoader(dataset, batch_size=64, shuffle=True, num_workers=4) for count in range(epochs): trained_model, stopped_early = train_network(model, dataloader, 1000, loss_threshold, learning_rate) save_network_to_json(trained_model, "sl_checkpoint.json") print(f"Checkpoint saved: sl_checkpoint.json after {count+1},000 epochs") model = trained_model learning_rate *= 0.9 info(model) if stopped_early: break