import torch
import numpy as np
from tqdm import tqdm
from batorch.sgmcmc.samplers import Sampler
from batorch.utils.data import DataModule
from sklearn.metrics import r2_score
import logging
logger = logging.getLogger(__name__)
def activate_dropout(model):
"""Function that enables dropout.
:param model: A torch module.
"""
for m in model.modules():
if m.__class__.__name__.startswith('Dropout'):
m.train()
def deactivate_dropout(model):
"""Function that disables dropout.
:param model: A torch module.
"""
for m in model.modules():
if m.__class__.__name__.startswith('Dropout'):
m.eval()
def r2_scores(stage: str, y_true, y_pred):
r"""Function that computes the R2 score defind as:
.. math::
R^2(y,\hat{y}) = 1 - \frac{\mathrm{MSE}(y,\hat{y})}{\mathrm{Var}(y)}
where :math:`\hat{y}` denotes the prediction made by the surrogate model. Note that this function computes separate :math:`R^2` scores
for each column in the matrix :math:`\hat{y}`.
This function is used after each epoch and it is assumed that `y_true` and `y_pred` are a list of tensors or dicts that have been appended during
the epoch. Each entry of the list corresponds to a batch. These lists are automatically concatenated in this function.
:param stage: String that can be equal to train, val, or test.
:param y_true: List of target tensors or list of dicts
:param y_pred:
:return: Dict with keys of the form stage_variableName_colNumber. If `y_true` and `y_pred` are dicts, then their keys are used for variableName. If `y_true` and `y_pred` are tensors, then variableName is replaced by the string "output".
"""
scores = dict()
if isinstance(y_true[0],torch.Tensor) and isinstance(y_pred[0],torch.Tensor):
y_true = torch.concat(y_true,dim=0).detach()
y_pred = torch.concat(y_pred,dim=0).detach()
for col in range(y_pred.size(1)):
scores[stage + "_" + "output_" + str(col)] = r2_score(y_true[:,col].detach().cpu().numpy(), y_pred[:,col].detach().cpu().numpy())
elif isinstance(y_true[0],dict) and isinstance(y_pred[0],dict):
y_true_concat=dict()
y_pred_concat=dict()
for key in set(y_true[0]).intersection(set(y_pred[0])):
y_true_concat[key] = []
y_pred_concat[key] = []
for i in range(len(y_true)):
y_true_concat[key].append(y_true[i][key])
y_pred_concat[key].append(y_pred[i][key])
y_true_concat[key] = torch.concat(y_true_concat[key])
y_pred_concat[key] = torch.concat(y_pred_concat[key])
y_true_concat[key] = y_true_concat[key].reshape(y_pred_concat[key].size())
for col in range(y_true_concat[key].size(1)):
scores[stage + "_" + key + "_" + str(col)] = r2_score(y_true_concat[key][:,col].detach().cpu().numpy(), y_pred_concat[key][:,col].detach().cpu().numpy())
return scores
[docs]def train_loop(num_epoch, model, train_loader, val_loader, optimizer, scheduler, loglike, writer, device, fold=1):
"""Function that performs the a basic training loop for estimating the maximum likelihood (MLE) or maximum a posteriori (MAP).
:param num_epoch: Number of sweeps through the dataset.
:param model: Neural network.
:param train_loader: Dataloader that sweeps through the training dataset.
:param val_loader: Dataloader that sweeps through the validation dataset.
:param optimizer: Optimizer.
:param scheduler: Step size scheduler
:param loglike: Negative log likelihood function
:param writer: Tensorboard summary writer
:param device: Device to be used (cpu or cuda)
:param fold: The fold number, defaults to 1
:return: Dictionnary with losses
"""
# pbar = tqdm(range(num_epoch))
pbar = range(num_epoch)
mean_loss_val=None
mean_loss_train_hist = []
mean_loss_val_hist = []
for epoch in pbar:
model.train()
mean_loss_train=0
y_train_pred = []
y_train = []
for batch_idx, (x, y) in enumerate(train_loader):
# global_step+=1
x, y = x.to(device), y.to(device)
optimizer.zero_grad()
output = model(x)
y_train_pred.append(output)
y_train.append(y.reshape(output.size()))
loss = loglike(output, y.reshape(output.size()))
loss.backward()
optimizer.step()
mean_loss_train+=loss.detach().item()
mean_loss_train/=(batch_idx+1)
mean_loss_train_hist.append(mean_loss_train)
if writer is not None:
r2_train = r2_scores("train_fold_" + str(fold), y_train, y_train_pred)
writer.add_scalars(main_tag="r2", tag_scalar_dict=r2_train, global_step=epoch)
# scheduler_metric = mean_loss_train
if val_loader is not None:
model.eval()
mean_loss_val=0
y_val_pred = []
y_val = []
for batch_idx, (x, y) in enumerate(val_loader):
x, y = x.to(device), y.to(device)
output = model(x)
y_val_pred.append(output)
y_val.append(y.reshape(output.size()))
loss = loglike(output, y.reshape(output.size()))
mean_loss_val+=loss.detach().item()
mean_loss_val/=(batch_idx+1)
mean_loss_val_hist.append(mean_loss_val)
if writer is not None:
r2_val = r2_scores("val_fold_" + str(fold), y_val, y_val_pred)
writer.add_scalars(main_tag="r2", tag_scalar_dict=r2_val, global_step=epoch)
# scheduler_metric = mean_loss_val
writer.add_scalars(main_tag="Loss", tag_scalar_dict={"Train_fold_" + str(fold): mean_loss_train, "Val_fold_" + str(fold): mean_loss_val}, global_step=epoch)
else:
if writer is not None:
writer.add_scalars(main_tag="Loss", tag_scalar_dict={"Train_fold_" + str(fold): mean_loss_train}, global_step=epoch)
# scheduler.step(scheduler_metric)
if scheduler is not None:
scheduler.step()
# pbar.set_description(f"Epoch {epoch} | Train loss: {mean_loss_train}| Val loss: {mean_loss_val}")
return {"train_epoch_loss": np.array(mean_loss_train_hist), "val_epoch_loss": np.array(mean_loss_val_hist)}
[docs]def test_loop(model, test_loader, loglike, writer, device, fold=1, enable_dropout=False):
"""Function that sweeps through a test dataset and outputs the model predictions.
:param model: Neural network
:param test_loader: Dataloader that sweeps through a test dataset
:param loglike: Negative log likelihood function
:param writer: Tensorboard SummaryWriter
:param device: Device to be used (cpu or cuda)
:return: Dictionnary with losses and predictions
"""
if enable_dropout:
activate_dropout(model)
mean_loss_test=0
y_pred=[]
for batch_idx, (x, y) in enumerate(test_loader):
x, y = x.to(device), y.to(device)
output = model(x)
loss = loglike(output, y.reshape(output.size()))
mean_loss_test+=loss.detach().item()
y_pred.append(output)
mean_loss_test/=batch_idx+1
if writer is not None:
writer.add_scalar("Loss/test_fold_" + str(fold), mean_loss_test)
if isinstance(y_pred[0],torch.Tensor):
y_pred_concat = torch.concat(y_pred,dim=0).detach().cpu().numpy()
elif isinstance(y_pred[0],dict):
y_pred_concat=dict()
for key in y_pred[0].keys():
y_pred_concat[key] = []
for i in range(len(y_pred)):
y_pred_concat[key].append(y_pred[i][key])
y_pred_concat[key] = torch.concat(y_pred_concat[key]).detach().cpu().numpy()
# np.save(os.path.join(os.getcwd(),"test_predictions.npy"), test_predictions)
if enable_dropout:
deactivate_dropout(model)
return {"epoch_loss": mean_loss_test, "preds": y_pred_concat}
[docs]def mcmc(sampler: Sampler, datamodule: DataModule, max_burnin_iterations: int, max_sampling_iterations: int, device: torch.device, quantization=None):
"""Pipeline for running SGMCMC algorithms.
:param sampler: SGMCMC sampler.
:param datamodule: DataModule holding at least a training dataset and a test dataset.
:param max_burnin_iterations: Number of burnin iterations.
:param max_sampling_iterations: Number of iterations after burnin.
:param device: Device (cuda or cpu).
:param quantization: Quantization object, default is None.
"""
logger.info(f"[1] Starting burn-in phase with {max_burnin_iterations} iterations!")
num_burnin_iterations=0
def burnin_generator():
while num_burnin_iterations<max_burnin_iterations:
yield
logger.info(f"Memory allocated: {torch.cuda.memory_allocated()}, Memory reserved : {torch.cuda.memory_reserved()}")
pbar = tqdm(burnin_generator())
for _ in pbar:
for batch_idx, (x, y) in enumerate(datamodule.train_dataloader()):
x, y = x.to(device), y.to(device)
loss = sampler.step(x, y)
num_burnin_iterations+=1
pbar.set_description(f"Iteration {num_burnin_iterations} | Stochastic log-posterior: {loss}")
logger.info(f"[2] Starting sampling phase! with {max_sampling_iterations} iterations")
num_sampling_iterations=0
def generator():
while num_sampling_iterations<max_sampling_iterations:
yield
logger.info(f"Memory allocated: {torch.cuda.memory_allocated()}, Memory reserved : {torch.cuda.memory_reserved()}")
samples=[]
pbar = tqdm(generator())
for _ in pbar:
for batch_idx, (x, y) in enumerate(datamodule.train_dataloader()):
x, y = x.to(device), y.to(device)
loss = sampler.step(x, y)
params = sampler.get_params()
samples.append(params)
num_sampling_iterations+=1
pbar.set_description(f"Iteration {num_sampling_iterations} | Stochastic log-posterior: {loss}")
samples = torch.stack(samples,dim=0)
logger.info(f"[2] Done.")
logger.info(f"Memory allocated: {torch.cuda.memory_allocated()}, Memory reserved : {torch.cuda.memory_reserved()}")
if quantization is not None:
logger.info(f"[3] Starting the selected quantization method!")
pi = quantization.select({"samples": samples}, memory_is_bottleneck=False, device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))
samples = torch.stack([samples[i] for i in pi],dim=0)
logger.info(f"[5] Predicting!")
predictions = []
with torch.no_grad():
sampler.negloglikelihood.eval()
for i in range(len(samples)):
sampler.set_params(samples[i])
predictions_i = []
for batch_idx, (x, y) in enumerate(datamodule.test_dataloader()):
x, y = x.to(device), y.to(device)
prediction_batch = sampler.negloglikelihood.model(x)
predictions_i.append(prediction_batch)
predictions_i = torch.concat(predictions_i, dim=0)
predictions.append(predictions_i)
predictions = torch.stack(predictions,dim=0).cpu().detach().numpy()
# np.save("test_predictions.npy", predictions)
logger.info(f"[6] Computing statistics")
from batorch.utils.misc import compute_stats_regression
prob_levels = np.linspace(start=0.05, stop=0.95, num=19)
sd_noise=datamodule.test_dataset.sig_noise
x_test=datamodule.test_dataset.x.cpu().detach().numpy().squeeze()
y_test=datamodule.test_dataset.y.cpu().detach().numpy().squeeze()
y_true=datamodule.test_dataset.y_true_mean.cpu().detach().numpy().squeeze()
list_statistics = []
for prob_level in prob_levels:
statistics = compute_stats_regression(probability_level_ci=prob_level, sd_noise=sd_noise, x_test=x_test, y_test=y_test, y_true=y_true, predictions=predictions)
re = statistics["ratio_in_epistemic"]
ra = statistics["ratio_in_aleatoric"]
logger.info(f"Prob. level: {prob_level}, Ratio Epistemic: {re}, Ratio Aleatoric: {ra}")
list_statistics.append(statistics)
# np.save("statistics.npy", list_statistics)
return samples, predictions, list_statistics