This notebook is meant for the students of Basics of Applied Mathematics, in the master of Mathematics in Data and Technology, Freiburg Univsersity, 2025

# Programming Exercise for Homework 10: Training a Classifier with Stochastic Gradient Descent in PyTorch


### Your task

This notebook contains some code snippets with missing lines that you will have to fill in! Please also try to read and understand the parts that you do not have to fill in to understand better how the code works.

### Description

In this exercise, we will train a simple image classifier on the CIFAR10 dataset (https://www.cs.toronto.edu/~kriz/cifar.html) using Stochastic Gradient Descent (SGD) in PyTorch.

This exercise is inspired from the tutorial in https://docs.pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html

The idea is to train a neural network to classify images from the CIFAR10 dataset into 10 different classes (airplane, automobile, bird, cat, deer, dog, frog, horse, ship, truck).

The optimization problem takes the form:
$$
\min_{x} \frac{1}{N} \sum_{i=1}^{N} \text{criterion}(\phi(\text{image}_i; x), \text{label}_i)
$$
where the function \text{criterion}(\text{prediction}, \text{label}) is the cross-entropy loss function, that compares the prediction vector (of size 10) with the true label (an integer between 0 and 9).

The model $\phi(\text{image}; x)$ is a Neural Network with parameters $x$ that takes as input an image (a tensor of size 3x32x32) and outputs a prediction vector (a tensor of size 10).
This model is given already, and is defined in pytorch.
The gradient of the objective is already given in a function called `compute_gradient`.

Your task is to implement the SGD algorithm in the function `SGD` that uses the training dataset to optimize the parameters of the model.

Ultimately, the performance is evaluated on a test dataset that is not used during training.

You will also have to tune hyperparameters such that the learning is successful and fast.

### Imports

In [None]:
import numpy as np          # Package for numerical computing
import matplotlib.pyplot as plt # Package for plotting
from time import time # function to check runtime

# Jupyter magic command to make the plots a bit nicer
%matplotlib inline
                     # use "%matplotlib inline" if one uses VS code UI
                     # use "%matplotlib notebook" for Jupyter UI

# You need to have pytorch and torchvision on your machine.
# If you are using conda and struggle with the installation, consider creating a new conda environment dedicated to pytorch.
import torch
import torchvision

### Write helper functions for printing

In [None]:
from IPython.display import Markdown, display 
def nice_print(s): # this is for a better rendering of logs
    display(Markdown(s))

# Write a function to visualize some images along with their ground truth and predicted labels
def show_images(images, labels, names, predicted_labels=None, image_treatment='gaussian', n_rows=1):
    assert len(images) == len(labels), "Number of images and labels must be the same"
    num = len(labels)
    n_col = (num+n_rows-1) // n_rows
    fig, axs = plt.subplots(n_rows, n_col, figsize=(n_col*1.5, n_rows*1.5))

    indices = labels.argsort()  # indices to sort the images by their labels
    for i, ax in enumerate(axs.flatten()):
        if i < num:
            img = images[indices[i]]
            label = labels[indices[i]]
            ax.axis('off')
            ax.imshow(img, interpolation=image_treatment)
            ax.set_title(f"{names[label]} \n", fontsize=10) 
            if predicted_labels is not None:
                predicted_label = predicted_labels[indices[i]]
                if predicted_label == label:
                    ax.text(5, -2.1, f"pred: {names[predicted_label]}", color="green", fontsize=8)    
                else:
                    ax.text(5, -2.1, f"pred: {names[predicted_label]}", color="red", fontsize=8)
        else:
            ax.set_visible(False)
    fig.tight_layout()
    return fig

colors = ["black", "brown", "C0", "C1", "C2", "C3", "C4", "C5", "C6", "C7", "C8", "C9"]
# Function to plot the losses and misclassification rates
def plot_losses(list_infos, examples):     
    fig, axs = plt.subplots(2, figsize=(10, 6))
    axs[0].set_ylabel('Loss')
    axs[1].set_ylabel('Misclassification rate')
    axs[1].set_ylim(0., 1.)
    idx_color = 0
    for (name, val) in examples:
        axs[1].axhline(val, color=colors[idx_color], linestyle='--', label=name)
        idx_color += 1
    for (label, epochs, losses, misclassification_rates) in list_infos:
        axs[0].plot(epochs, losses, 'o-', label=label, color=colors[idx_color])
        axs[1].plot(epochs, misclassification_rates, 'o-', label=label, color=colors[idx_color])
        idx_color += 1
    for ax in axs:
        ax.grid()
        ax.set_xlabel('Number of epochs')
    axs[0].legend()
    return fig

### Load the data

In [None]:
train_dataset = torchvision.datasets.CIFAR10(root='./data', download=True, train=True)
test_dataset = torchvision.datasets.CIFAR10(root='./data', download=True, train=False)
names = list(train_dataset.classes)

imgs_train = train_dataset.data # shape (50000, 32, 32, 3)
labels_train = torch.tensor(train_dataset.targets) # shape (50000)
print(f'The train dataset has {imgs_train.shape[0]} images of shape {imgs_train.shape[1:]}, classified into {len(names)} classes.')

imgs_test = test_dataset.data # shape (10000, 32, 32, 3)
labels_test = torch.tensor(test_dataset.targets) # shape (10000)
print(f'The test dataset has {imgs_test.shape[0]} images of shape {imgs_test.shape[1:]}, classified into {len(names)} classes.')

del train_dataset, test_dataset # free some memory

# Function to convert images to torch tensors and scale them to [-1, 1]
def img_to_tensor(imgs):
    # imgs: (N, 32, 32, 3) Reorder to (N, 3, 32, 32), scale to [-1, 1], convert to torch tensor
    imgs_ = np.transpose(imgs, (0, 3, 1, 2)).copy()          # (N, 3, 32, 32)
    imgs_ = 2. * imgs_ / 255.0  - 1.0        # scale to [-1, 1]
    return torch.from_numpy(imgs_).float()

In [None]:
# Visualize a couple of images from the training dataset
rng = np.random.default_rng(42)
indices = rng.choice(imgs_train.shape[0], size=15, replace=False)
fig = show_images(imgs_train[indices], labels_train[indices], names, n_rows=3)
plt.show()

### Define the neural network functions used for predicting the labels

In [None]:
from torch.nn.utils import parameters_to_vector, vector_to_parameters

# Create a class for a pytorch neural network with two hidden layers
class NeuralNet(torch.nn.Module):
    def __init__(self, model_complexity=100, lambda_penalty=1e-4):
        """
            Define a neural network with two hidden layers of 'model_complexity' neurons each.
        """
        super().__init__()
        self.dim_input = 3 * 32 * 32  # input dimension (3 color channels)
        m = model_complexity # number of neurons in the hidden layers
        n_output = 10
        torch.manual_seed(0) # for reproducibility

        self.nonlin_F = torch.nn.Softplus() # this function is like a smooth ReLU: f(x) = log(1 + exp(x))
        self.lin1 = torch.nn.Linear(self.dim_input, m)  # 3*32*32 --> m  (with m = model_complexity)
        self.lin2 = torch.nn.Linear(m, m)
        self.lin3 = torch.nn.Linear(m, n_output)  # m --> n_output

        self.x0 = parameters_to_vector(self.parameters()).detach()  # initial parameter vector
        # Also store the crtiterion here
        self.criterion = torch.nn.CrossEntropyLoss()
        self.lambda_penalty = lambda_penalty # regularization parameter for weight decay

    def forward(self, input):
        # input tensor has shape [N, 3, 32, 32]
        x = input.reshape(-1, self.dim_input)  # flatten the input --> [N, 3*32*32]
        layer1 = self.nonlin_F(self.lin1(x))  # --> [N, m]
        layer2 = self.nonlin_F(self.lin2(layer1))  # --> [N, m]
        output = self.lin3(layer2)  # --> [N, n_output]
        return output  # raw scores for each class
    
    def compute_predictions(self, x, input):
        vector_to_parameters(x, self.parameters())
        with torch.no_grad():
            return self(input)

    def compute_gradient(self, x, input, output):
        vector_to_parameters(x, self.parameters())
        self.zero_grad() # reinitialize gradients to zero, else they will accumulate
        prediction = self(input)
        loss = self.criterion(prediction, output)
        loss.backward()
        grad = parameters_to_vector([p.grad for p in self.parameters() if p.grad is not None]).detach()
        grad.add_(x, alpha=self.lambda_penalty)  # add gradient of the penalty term
        return grad
        
    def loss_and_accuracy(self, x, inputs, outputs):
        predictions = self.compute_predictions(x, inputs)
        loss = self.criterion(predictions, outputs).item()
        accuracy = (outputs == torch.argmax(predictions, dim=1)).sum().item() / outputs.size(0)
        return loss, accuracy
  

### Perform Stochastic Gradient Descent (SGD) with momentum

<hr/>
<div class="alert alert-block alert-info">
   
**Task:**

Complete the following code to randomly split the indices $\{0, \ldots, N-1\}$ into mini-batches of size `batch_size`. In the function, `rng` is a numpy random generator that you can use to generate random numbers.
</div>


<hr/>
<div class="alert alert-success">
   
**Tips:**
One way to do this is to first shuffle and then split.
Note that the last batch may be smaller than `batch_size` if `N` is not divisible by `batch_size`.
</div>

In [None]:
def random_split(N, batch_size, rng):
    """
        This function returns a list of arrays, each containing the indices of a mini-batch. All mini-batch except for the last one have size `batch_size`.
    """
    # ----- YOUR CODE
    ...
    # -----END YOUR CODE
    return list_indices_batch

<hr/>
<div class="alert alert-block alert-info">
   
**Task:**

Complete the following code to implement min-batch Stochastic Gradient Descent (SGD) with momentum:
$$
    p_{k+1} = \gamma p_{k} + (1-\gamma) g_k(x_k) \\
    x_{k+1} = x_k + \alpha_k p_k
$$
with a decreasing step-size $\alpha_k = \frac{\alpha_0}{\sqrt{k_{ep.}}}$, where $k_{ep.}$ is the epoch number.
The gradient estimate $g_k(x_k)$ is computed on a mini-batch of size $n_{batch}$:
$$
g_k(x_k) = \frac{1}{| B_k |} \sum_{i \in B_k} \nabla f_i(x_k)
$$
where $B_k \subset \{1, \dots, N\}$ is a random subset of indices of size $n_{batch}$.
</div>


<hr/>
<div class="alert alert-success">
   
**Tips:**
You can use the syntax `grad = neural_net.compute_gradient(x, input_batch, output_batch)` to compute the gradient of the loss function for the input tensors `input_batch` and `output_batch`, at the parameter value `x`.

You can also use the randomly generated initial parameters `x0` as a starting point for the optimization.
</div>


<hr/>
<div class="alert alert-success">
   
**Tips:**
For a better performance, you might have a look at the methods , `tensor.mul_()`, `tensor.add_()` and `tensor.lerp_()` that perform in-place operations on tensors instead of creating new ones.
</div>

In [None]:
# Main function to perform SGD training
def SGD(images, outputs, num_epochs=2, batch_size=1, alpha0=0.1, gamma=0.):
    """
            Perform SGD training with mini-batches of size batch_size for num_epochs epochs, momentum parameter gamma, and decreasing step-size alpha = alpha0 / sqrt(k_epoch).
            The function returns the list of of couples (k_e, x) where $k$ is the number of epoch and $x$ is the solution vector at that epoch.
    """
    print("\n \n  ----------")
    name_optimizer = r"SGD ; $n_{batch} = %s$ ; $\alpha=\frac{%s}{\sqrt{k_{ep.}}}$, with momentum: $\gamma = %s$" % (batch_size, alpha0, gamma)
    nice_print(name_optimizer)
    t0 = time()
    neural_net = NeuralNet()
    inputs = img_to_tensor(images)
    rng = np.random.default_rng(42) # random generator for the mini-batches
    # ----- YOUR CODE (initialize some variables if needed)
    ...
    # --------- END YOUR CODE
    print(f"       Number of epoch = 0/{num_epochs}", end=" ")
    for n_epoch in range(num_epochs):
        iterates.append((n_epoch, x.clone()))  # store the iterate
        list_indices_batch = random_split(N, batch_size, rng)
        for indices in list_indices_batch:
            # ----- YOUR CODE (perform one SGD update)
            ...
            # --------- END YOUR CODE
        print(f"{n_epoch+1}/{num_epochs}, ", end=" ")
    iterates.append((num_epochs, x.clone())) # store final iterate
    print(f"\n   ... training finished! Runtime = {time() - t0 :.2f} sec.")

    return name_optimizer, iterates

<hr/>
<div class="alert alert-block alert-info">
   
**Task:**

Complete the following code to run the SGD algorithm with the hyperparameters of your choice.
The code after plots the training loss and the test accuracy over the iterations, which should help you to choose good hyperparameters.
</div>

In [None]:
all_iterations = [] # should store couples (name, iterations), i.e. outputs of the SGD functions

# ----- YOUR CODE (perform SGD for some hyperparameters)
name, iterations = SGD(...)
all_iterations.append((name, iterations))

name, iterations = SGD(...)
all_iterations.append((name, iterations))

...
# ----- END YOUR CODE

### Test the solutions on the test dataset

In [None]:
# Function to compute some statistics about the performance on the test dataset for the solutions obtained
def test_solutions(neural_net, x_solutions, inputs, outputs):
    test_losses, accuracies = [], []
    for x in x_solutions:
        test_loss, accuracy = neural_net.loss_and_accuracy(x, inputs, outputs)
        test_losses.append(test_loss)
        accuracies.append(accuracy)
    return test_losses, accuracies

In [None]:
all_infos = []
solutions = [] 
neural_net = NeuralNet()
inputs_test = img_to_tensor(imgs_test)
for name_opti, iterations in all_iterations:
        epochs, iterates = zip(*iterations)
        solution = iterates[-1].clone()
        test_losses, accuracies = test_solutions(neural_net, iterates, inputs_test, labels_test)
        solutions.append( (name_opti, solution) )
        all_infos.append( (name_opti, epochs, test_losses, accuracies) )
        nice_print(name_opti)
        print(f"    loss on test data: {test_losses[-1]:.2e}  accuracy on test data {accuracies[-1]*100:.2f}%")
del all_iterations # free some memory

In [None]:
examples = [("random assignment", 1/len(names)), (r"50% accuracy", 0.5)]
fig = plot_losses(all_infos, examples)
plt.show()

In [None]:
# Now let us have a look at the predictions on some of the images from the dataset
nam_opti, x_sol = solutions[0]  # use the solution found by the best the first optimizer
inputs_test = img_to_tensor(imgs_test)
neural_net = NeuralNet()
prediction = neural_net.compute_predictions(x_sol, inputs_test)
predicted_labels = torch.argmax(prediction, dim=1)
correctness = predicted_labels == labels_test

# Now compute the counts for each class separately
for i, name in enumerate(names):
    indices_class = (labels_test == i)
    num_in_class = indices_class.sum().item()
    num_correct_in_class = (correctness & indices_class).sum().item()
    print(f"   Class '{name:10s}': {num_correct_in_class:5d} correct / {num_in_class} --> accuracy: {num_correct_in_class/num_in_class*100:.2f}%")

print(f"Total accuracy on test set: {correctness.sum().item()/correctness.size(0)*100:.2f}%")


In [None]:
# Visualize a couple of images from the training dataset
rng = np.random.default_rng(50)
indices = rng.choice(imgs_test.shape[0], size=15, replace=False)
fig = show_images(imgs_test[indices], labels_test[indices], names, predicted_labels=predicted_labels[indices], n_rows=3)
plt.show()