Source code for bob.ip.binseg.engine.inferencer

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import logging
import time
import datetime
import numpy as np
import torch
import pandas as pd
import torchvision.transforms.functional as VF
from tqdm import tqdm

import bob.io.base

from bob.ip.binseg.utils.metric import SmoothedValue, base_metrics
from bob.ip.binseg.utils.plot import precision_recall_f1iso_confintval
from bob.ip.binseg.utils.summary import summary



def batch_metrics(predictions, ground_truths, names, output_folder, logger):
    """
    Calculates metrics on the batch and saves them to disk

    Parameters
    ----------
    predictions : :py:class:`torch.Tensor`
        tensor with pixel-wise probabilities
    ground_truths : :py:class:`torch.Tensor`
        tensor with binary ground-truth
    names : list
        list of file names
    output_folder : str
        output path
    logger : :py:class:`logging.Logger`
        python logger

    Returns
    -------
    list
        list containing batch metrics:
        ``[name, threshold, precision, recall, specificity, accuracy, jaccard, f1_score]``
    """
    step_size = 0.01
    batch_metrics = []

    for j in range(predictions.size()[0]):
        # ground truth byte
        gts = ground_truths[j].byte()

        file_name = "{}.csv".format(names[j])
        logger.info("saving {}".format(file_name))

        with open(os.path.join(output_folder, file_name), "w+") as outfile:

            outfile.write("threshold, precision, recall, specificity, accuracy, jaccard, f1_score\n")

            for threshold in np.arange(0.0, 1.0, step_size):
                # threshold
                binary_pred = torch.gt(predictions[j], threshold).byte()

                # equals and not-equals
                equals = torch.eq(binary_pred, gts)  # tensor
                notequals = torch.ne(binary_pred, gts)  # tensor

                # true positives
                tp_tensor = (gts * binary_pred)  # tensor
                tp_count = torch.sum(tp_tensor).item()  # scalar

                # false positives
                fp_tensor = torch.eq((binary_pred + tp_tensor), 1)
                fp_count = torch.sum(fp_tensor).item()

                # true negatives
                tn_tensor = equals - tp_tensor
                tn_count = torch.sum(tn_tensor).item()

                # false negatives
                fn_tensor = notequals - fp_tensor
                fn_count = torch.sum(fn_tensor).item()

                # calc metrics
                metrics = base_metrics(tp_count, fp_count, tn_count, fn_count)

                # write to disk
                outfile.write("{:.2f},{:.5f},{:.5f},{:.5f},{:.5f},{:.5f},{:.5f} \n".format(threshold, *metrics))

                batch_metrics.append([names[j], threshold, *metrics])

    return batch_metrics

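# NOTE: the block below is an added illustrative sketch, not part of the original
# module.  It walks through the confusion-count arithmetic used by ``batch_metrics``
# on a tiny, made-up 2x2 prediction; the tensor values and the 0.5 threshold are
# assumptions chosen only for illustration.
def _example_confusion_counts():
    pred = torch.tensor([[0.9, 0.2], [0.6, 0.1]])             # fake probability map
    gts = torch.tensor([[1, 0], [0, 0]], dtype=torch.uint8)   # fake ground truth
    binary_pred = torch.gt(pred, 0.5).byte()                  # -> [[1, 0], [1, 0]]
    tp_tensor = gts * binary_pred                                     # 1 true positive (top-left)
    fp_tensor = torch.eq(binary_pred + tp_tensor, 1)                 # 1 false positive (bottom-left)
    tn_tensor = torch.eq(binary_pred, gts).byte() - tp_tensor        # 2 true negatives
    fn_tensor = torch.ne(binary_pred, gts).byte() - fp_tensor.byte() # 0 false negatives
    # base_metrics turns these raw counts into precision, recall, specificity,
    # accuracy, jaccard and f1_score, exactly as in the threshold loop above
    return base_metrics(
        torch.sum(tp_tensor).item(),
        torch.sum(fp_tensor).item(),
        torch.sum(tn_tensor).item(),
        torch.sum(fn_tensor).item())
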
def save_probability_images(predictions, names, output_folder, logger):
    """
    Saves probability maps as image in the same format as the test image

    Parameters
    ----------
    predictions : :py:class:`torch.Tensor`
        tensor with pixel-wise probabilities
    names : list
        list of file names
    output_folder : str
        output path
    logger : :py:class:`logging.Logger`
        python logger
    """
    images_subfolder = os.path.join(output_folder, 'images')
    for j in range(predictions.size()[0]):
        img = VF.to_pil_image(predictions.cpu().data[j])
        filename = '{}.png'.format(names[j].split(".")[0])
        fullpath = os.path.join(images_subfolder, filename)
        logger.info("saving {}".format(fullpath))
        fulldir = os.path.dirname(fullpath)
        if not os.path.exists(fulldir):
            os.makedirs(fulldir)
        img.save(fullpath)

def save_hdf(predictions, names, output_folder, logger):
    """
    Saves probability maps in HDF5 format

    Parameters
    ----------
    predictions : :py:class:`torch.Tensor`
        tensor with pixel-wise probabilities
    names : list
        list of file names
    output_folder : str
        output path
    logger : :py:class:`logging.Logger`
        python logger
    """
    hdf5_subfolder = os.path.join(output_folder, 'hdf5')
    if not os.path.exists(hdf5_subfolder):
        os.makedirs(hdf5_subfolder)
    for j in range(predictions.size()[0]):
        img = predictions.cpu().data[j].squeeze(0).numpy()
        filename = '{}.hdf5'.format(names[j].split(".")[0])
        fullpath = os.path.join(hdf5_subfolder, filename)
        logger.info("saving {}".format(filename))
        fulldir = os.path.dirname(fullpath)
        if not os.path.exists(fulldir):
            os.makedirs(fulldir)
        bob.io.base.save(img, fullpath)

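# NOTE: the block below is an added illustrative sketch, not part of the original
# module.  It shows how the two save helpers above could be exercised on their own;
# the batch tensor, file names and output folder are assumptions for illustration.
def _example_save_outputs(output_folder="/tmp/binseg-example"):
    logger = logging.getLogger(__name__)
    probabilities = torch.rand(2, 1, 64, 64)   # fake batch of two probability maps
    names = ["img_01.png", "img_02.png"]
    # writes <output_folder>/images/img_01.png and img_02.png
    save_probability_images(probabilities, names, output_folder, logger)
    # writes <output_folder>/hdf5/img_01.hdf5 and img_02.hdf5
    save_hdf(probabilities, names, output_folder, logger)
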
def do_inference(
    model,
    data_loader,
    device,
    output_folder=None
):
    """
    Run inference and calculate metrics

    Parameters
    ----------
    model : :py:class:`torch.nn.Module`
        neural network model (e.g. DRIU, HED, UNet)
    data_loader : :py:class:`torch.utils.data.DataLoader`
    device : str
        device to use, ``'cpu'`` or ``'cuda'``
    output_folder : str
        output path
    """
    logger = logging.getLogger("bob.ip.binseg.engine.inference")
    logger.info("Start evaluation")
    logger.info("Output folder: {}, Device: {}".format(output_folder, device))
    results_subfolder = os.path.join(output_folder, 'results')
    os.makedirs(results_subfolder, exist_ok=True)

    model.eval().to(device)
    # Sigmoid for probabilities
    sigmoid = torch.nn.Sigmoid()

    # Setup timers
    start_total_time = time.time()
    times = []

    # Collect overall metrics
    metrics = []

    for samples in tqdm(data_loader):
        names = samples[0]
        images = samples[1].to(device)
        ground_truths = samples[2].to(device)
        with torch.no_grad():
            start_time = time.perf_counter()

            outputs = model(images)

            # necessary check for hed architecture that uses several outputs
            # for loss calculation instead of just the last concatfuse block
            if isinstance(outputs, list):
                outputs = outputs[-1]

            probabilities = sigmoid(outputs)

            batch_time = time.perf_counter() - start_time
            times.append(batch_time)
            logger.info("Batch time: {:.5f} s".format(batch_time))

            b_metrics = batch_metrics(probabilities, ground_truths, names, results_subfolder, logger)
            metrics.extend(b_metrics)

            # Create probability images
            save_probability_images(probabilities, names, output_folder, logger)
            # Save hdf5
            save_hdf(probabilities, names, output_folder, logger)

    # DataFrame
    df_metrics = pd.DataFrame(metrics, columns=[
        "name",
        "threshold",
        "precision",
        "recall",
        "specificity",
        "accuracy",
        "jaccard",
        "f1_score"])

    # Report and Averages
    metrics_file = "Metrics.csv"
    metrics_path = os.path.join(results_subfolder, metrics_file)
    logger.info("Saving average over all input images: {}".format(metrics_file))

    avg_metrics = df_metrics.groupby('threshold').mean()
    std_metrics = df_metrics.groupby('threshold').std()

    # Uncomment below for F1-score calculation based on average precision and recall
    # instead of F1-scores of individual images. This method is in line with
    # Maninis et al. (2016).
    #avg_metrics["f1_score"] = (2 * avg_metrics["precision"] * avg_metrics["recall"]) / \
    #    (avg_metrics["precision"] + avg_metrics["recall"])

    avg_metrics["std_pr"] = std_metrics["precision"]
    avg_metrics["pr_upper"] = avg_metrics['precision'] + avg_metrics["std_pr"]
    avg_metrics["pr_lower"] = avg_metrics['precision'] - avg_metrics["std_pr"]
    avg_metrics["std_re"] = std_metrics["recall"]
    avg_metrics["re_upper"] = avg_metrics['recall'] + avg_metrics["std_re"]
    avg_metrics["re_lower"] = avg_metrics['recall'] - avg_metrics["std_re"]
    avg_metrics["std_f1"] = std_metrics["f1_score"]

    avg_metrics.to_csv(metrics_path)
    maxf1 = avg_metrics['f1_score'].max()
    optimal_f1_threshold = avg_metrics['f1_score'].idxmax()
    logger.info("Highest F1-score of {:.5f}, achieved at threshold {}".format(maxf1, optimal_f1_threshold))

    # Plotting
    np_avg_metrics = avg_metrics.to_numpy().T
    fig_name = "precision_recall.pdf"
    logger.info("saving {}".format(fig_name))
    fig = precision_recall_f1iso_confintval(
        [np_avg_metrics[0]],
        [np_avg_metrics[1]],
        [np_avg_metrics[7]],
        [np_avg_metrics[8]],
        [np_avg_metrics[10]],
        [np_avg_metrics[11]],
        [model.name, None],
        title=output_folder)
    fig_filename = os.path.join(results_subfolder, fig_name)
    fig.savefig(fig_filename)

    # Report times
    total_inference_time = str(datetime.timedelta(seconds=int(sum(times))))
    average_batch_inference_time = np.mean(times)
    total_evaluation_time = str(datetime.timedelta(seconds=int(time.time() - start_total_time)))

    logger.info("Average batch inference time: {:.5f}s".format(average_batch_inference_time))

    times_file = "Times.txt"
    logger.info("saving {}".format(times_file))

    with open(os.path.join(results_subfolder, times_file), "w+") as outfile:
        date = datetime.datetime.now()
        outfile.write("Date: {} \n".format(date.strftime("%Y-%m-%d %H:%M:%S")))
        outfile.write("Total evaluation run-time: {} \n".format(total_evaluation_time))
        outfile.write("Average batch inference time: {} \n".format(average_batch_inference_time))
        outfile.write("Total inference time: {} \n".format(total_inference_time))

    # Save model summary
    summary_file = 'ModelSummary.txt'
    logger.info("saving {}".format(summary_file))
    with open(os.path.join(results_subfolder, summary_file), "w+") as outfile:
        summary(model, outfile)

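# NOTE: the block below is an added illustrative sketch, not part of the original
# module.  ``build_model`` and ``build_test_loader`` are hypothetical placeholders
# (not part of this package) standing in for whatever constructs the network and a
# DataLoader yielding ``(names, images, ground_truths)`` batches, as ``do_inference``
# expects; the output folder is likewise an assumption for illustration.
def _example_do_inference(build_model, build_test_loader):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = build_model()              # e.g. a DRIU, HED or UNet instance
    data_loader = build_test_loader()  # test set with ground truth
    do_inference(model, data_loader, device, output_folder="/tmp/binseg-results")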