Source code for batorch.utils.pipelines

import torch
import numpy as np

from tqdm import tqdm

from batorch.sgmcmc.samplers import Sampler
from batorch.utils.data import DataModule

from sklearn.metrics import r2_score

import logging
logger = logging.getLogger(__name__)

def activate_dropout(model):
    """Function that enables dropout.

    :param model: A torch module.
    """
    for m in model.modules():
        if m.__class__.__name__.startswith('Dropout'):
            m.train()

def deactivate_dropout(model):
    """Function that disables dropout.

    :param model: A torch module.
    """
    for m in model.modules():
        if m.__class__.__name__.startswith('Dropout'):
            m.eval()

def r2_scores(stage: str, y_true, y_pred):
    r"""Function that computes the R2 score defind as:

    .. math::
        R^2(y,\hat{y}) = 1 - \frac{\mathrm{MSE}(y,\hat{y})}{\mathrm{Var}(y)}

    where :math:`\hat{y}` denotes the prediction made by the surrogate model. Note that this function computes separate :math:`R^2` scores
    for each column in the matrix :math:`\hat{y}`.
    
    This function is used after each epoch and it is assumed that `y_true` and `y_pred` are a list of tensors or dicts that have been appended during
    the epoch. Each entry of the list corresponds to a batch. These lists are automatically concatenated in this function.

    :param stage: String that can be equal to train, val, or test.
    :param y_true: List of target tensors or list of dicts
    :param y_pred: 
    :return: Dict with keys of the form stage_variableName_colNumber. If `y_true` and `y_pred` are dicts, then their keys are used for variableName. If `y_true` and `y_pred` are tensors, then variableName is replaced by the string "output".
    """
    scores = dict()
    if isinstance(y_true[0],torch.Tensor) and isinstance(y_pred[0],torch.Tensor):
        y_true = torch.concat(y_true,dim=0).detach()
        y_pred = torch.concat(y_pred,dim=0).detach()
        for col in range(y_pred.size(1)):
            scores[stage + "_" + "output_" + str(col)] = r2_score(y_true[:,col].detach().cpu().numpy(), y_pred[:,col].detach().cpu().numpy())
    elif isinstance(y_true[0],dict) and isinstance(y_pred[0],dict):
        y_true_concat=dict()
        y_pred_concat=dict()
        for key in set(y_true[0]).intersection(set(y_pred[0])):
            y_true_concat[key] = []
            y_pred_concat[key] = []
            for i in range(len(y_true)):
                y_true_concat[key].append(y_true[i][key])
                y_pred_concat[key].append(y_pred[i][key])
            y_true_concat[key] = torch.concat(y_true_concat[key])
            y_pred_concat[key] = torch.concat(y_pred_concat[key])
            y_true_concat[key] = y_true_concat[key].reshape(y_pred_concat[key].size())
            for col in range(y_true_concat[key].size(1)):
                scores[stage + "_" + key + "_" + str(col)] = r2_score(y_true_concat[key][:,col].detach().cpu().numpy(), y_pred_concat[key][:,col].detach().cpu().numpy())
    return scores

[docs]def train_loop(num_epoch, model, train_loader, val_loader, optimizer, scheduler, loglike, writer, device, fold=1): """Function that performs the a basic training loop for estimating the maximum likelihood (MLE) or maximum a posteriori (MAP). :param num_epoch: Number of sweeps through the dataset. :param model: Neural network. :param train_loader: Dataloader that sweeps through the training dataset. :param val_loader: Dataloader that sweeps through the validation dataset. :param optimizer: Optimizer. :param scheduler: Step size scheduler :param loglike: Negative log likelihood function :param writer: Tensorboard summary writer :param device: Device to be used (cpu or cuda) :param fold: The fold number, defaults to 1 :return: Dictionnary with losses """ # pbar = tqdm(range(num_epoch)) pbar = range(num_epoch) mean_loss_val=None mean_loss_train_hist = [] mean_loss_val_hist = [] for epoch in pbar: model.train() mean_loss_train=0 y_train_pred = [] y_train = [] for batch_idx, (x, y) in enumerate(train_loader): # global_step+=1 x, y = x.to(device), y.to(device) optimizer.zero_grad() output = model(x) y_train_pred.append(output) y_train.append(y.reshape(output.size())) loss = loglike(output, y.reshape(output.size())) loss.backward() optimizer.step() mean_loss_train+=loss.detach().item() mean_loss_train/=(batch_idx+1) mean_loss_train_hist.append(mean_loss_train) if writer is not None: r2_train = r2_scores("train_fold_" + str(fold), y_train, y_train_pred) writer.add_scalars(main_tag="r2", tag_scalar_dict=r2_train, global_step=epoch) # scheduler_metric = mean_loss_train if val_loader is not None: model.eval() mean_loss_val=0 y_val_pred = [] y_val = [] for batch_idx, (x, y) in enumerate(val_loader): x, y = x.to(device), y.to(device) output = model(x) y_val_pred.append(output) y_val.append(y.reshape(output.size())) loss = loglike(output, y.reshape(output.size())) mean_loss_val+=loss.detach().item() mean_loss_val/=(batch_idx+1) mean_loss_val_hist.append(mean_loss_val) if writer is not None: r2_val = r2_scores("val_fold_" + str(fold), y_val, y_val_pred) writer.add_scalars(main_tag="r2", tag_scalar_dict=r2_val, global_step=epoch) # scheduler_metric = mean_loss_val writer.add_scalars(main_tag="Loss", tag_scalar_dict={"Train_fold_" + str(fold): mean_loss_train, "Val_fold_" + str(fold): mean_loss_val}, global_step=epoch) else: if writer is not None: writer.add_scalars(main_tag="Loss", tag_scalar_dict={"Train_fold_" + str(fold): mean_loss_train}, global_step=epoch) # scheduler.step(scheduler_metric) if scheduler is not None: scheduler.step() # pbar.set_description(f"Epoch {epoch} | Train loss: {mean_loss_train}| Val loss: {mean_loss_val}") return {"train_epoch_loss": np.array(mean_loss_train_hist), "val_epoch_loss": np.array(mean_loss_val_hist)}
[docs]def test_loop(model, test_loader, loglike, writer, device, fold=1, enable_dropout=False): """Function that sweeps through a test dataset and outputs the model predictions. :param model: Neural network :param test_loader: Dataloader that sweeps through a test dataset :param loglike: Negative log likelihood function :param writer: Tensorboard SummaryWriter :param device: Device to be used (cpu or cuda) :return: Dictionnary with losses and predictions """ if enable_dropout: activate_dropout(model) mean_loss_test=0 y_pred=[] for batch_idx, (x, y) in enumerate(test_loader): x, y = x.to(device), y.to(device) output = model(x) loss = loglike(output, y.reshape(output.size())) mean_loss_test+=loss.detach().item() y_pred.append(output) mean_loss_test/=batch_idx+1 if writer is not None: writer.add_scalar("Loss/test_fold_" + str(fold), mean_loss_test) if isinstance(y_pred[0],torch.Tensor): y_pred_concat = torch.concat(y_pred,dim=0).detach().cpu().numpy() elif isinstance(y_pred[0],dict): y_pred_concat=dict() for key in y_pred[0].keys(): y_pred_concat[key] = [] for i in range(len(y_pred)): y_pred_concat[key].append(y_pred[i][key]) y_pred_concat[key] = torch.concat(y_pred_concat[key]).detach().cpu().numpy() # np.save(os.path.join(os.getcwd(),"test_predictions.npy"), test_predictions) if enable_dropout: deactivate_dropout(model) return {"epoch_loss": mean_loss_test, "preds": y_pred_concat}
[docs]def mcmc(sampler: Sampler, datamodule: DataModule, max_burnin_iterations: int, max_sampling_iterations: int, device: torch.device, quantization=None): """Pipeline for running SGMCMC algorithms. :param sampler: SGMCMC sampler. :param datamodule: DataModule holding at least a training dataset and a test dataset. :param max_burnin_iterations: Number of burnin iterations. :param max_sampling_iterations: Number of iterations after burnin. :param device: Device (cuda or cpu). :param quantization: Quantization object, default is None. """ logger.info(f"[1] Starting burn-in phase with {max_burnin_iterations} iterations!") num_burnin_iterations=0 def burnin_generator(): while num_burnin_iterations<max_burnin_iterations: yield logger.info(f"Memory allocated: {torch.cuda.memory_allocated()}, Memory reserved : {torch.cuda.memory_reserved()}") pbar = tqdm(burnin_generator()) for _ in pbar: for batch_idx, (x, y) in enumerate(datamodule.train_dataloader()): x, y = x.to(device), y.to(device) loss = sampler.step(x, y) num_burnin_iterations+=1 pbar.set_description(f"Iteration {num_burnin_iterations} | Stochastic log-posterior: {loss}") logger.info(f"[2] Starting sampling phase! with {max_sampling_iterations} iterations") num_sampling_iterations=0 def generator(): while num_sampling_iterations<max_sampling_iterations: yield logger.info(f"Memory allocated: {torch.cuda.memory_allocated()}, Memory reserved : {torch.cuda.memory_reserved()}") samples=[] pbar = tqdm(generator()) for _ in pbar: for batch_idx, (x, y) in enumerate(datamodule.train_dataloader()): x, y = x.to(device), y.to(device) loss = sampler.step(x, y) params = sampler.get_params() samples.append(params) num_sampling_iterations+=1 pbar.set_description(f"Iteration {num_sampling_iterations} | Stochastic log-posterior: {loss}") samples = torch.stack(samples,dim=0) logger.info(f"[2] Done.") logger.info(f"Memory allocated: {torch.cuda.memory_allocated()}, Memory reserved : {torch.cuda.memory_reserved()}") if quantization is not None: logger.info(f"[3] Starting the selected quantization method!") pi = quantization.select({"samples": samples}, memory_is_bottleneck=False, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")) samples = torch.stack([samples[i] for i in pi],dim=0) logger.info(f"[5] Predicting!") predictions = [] with torch.no_grad(): sampler.negloglikelihood.eval() for i in range(len(samples)): sampler.set_params(samples[i]) predictions_i = [] for batch_idx, (x, y) in enumerate(datamodule.test_dataloader()): x, y = x.to(device), y.to(device) prediction_batch = sampler.negloglikelihood.model(x) predictions_i.append(prediction_batch) predictions_i = torch.concat(predictions_i, dim=0) predictions.append(predictions_i) predictions = torch.stack(predictions,dim=0).cpu().detach().numpy() # np.save("test_predictions.npy", predictions) logger.info(f"[6] Computing statistics") from batorch.utils.misc import compute_stats_regression prob_levels = np.linspace(start=0.05, stop=0.95, num=19) sd_noise=datamodule.test_dataset.sig_noise x_test=datamodule.test_dataset.x.cpu().detach().numpy().squeeze() y_test=datamodule.test_dataset.y.cpu().detach().numpy().squeeze() y_true=datamodule.test_dataset.y_true_mean.cpu().detach().numpy().squeeze() list_statistics = [] for prob_level in prob_levels: statistics = compute_stats_regression(probability_level_ci=prob_level, sd_noise=sd_noise, x_test=x_test, y_test=y_test, y_true=y_true, predictions=predictions) re = statistics["ratio_in_epistemic"] ra = statistics["ratio_in_aleatoric"] logger.info(f"Prob. level: {prob_level}, Ratio Epistemic: {re}, Ratio Aleatoric: {ra}") list_statistics.append(statistics) # np.save("statistics.npy", list_statistics) return samples, predictions, list_statistics