Source code for protopipe.mva.diagnostic

import numpy as np
from scipy.optimize import curve_fit
from sklearn.metrics import accuracy_score

from .utils import plot_hist, plot_distributions, plot_roc_curve

__all__ = [
    "ModelDiagnostic",
    "RegressorDiagnostic",
    "ClassifierDiagnostic",
    "BoostedDecisionTreeDiagnostic",
]


[docs]class ModelDiagnostic(object): """ Base class for model diagnostics. Parameters ---------- model: `~sklearn.base.BaseEstimator` Best model feature_name_list: list List of the features used to buil the model target_name: str Name of the target (e.g. score, gamaness, energy, etc.) """ def __init__(self, model, feature_name_list, target_name): self.model = model self.feature_name_list = feature_name_list self.target_name = target_name
[docs] def plot_feature_importance(self, ax, **kwargs): """ Plot importance of features Parameters ---------- ax: `~matplotlib.axes.Axes` Axis """ if ax is None: import matplotlib.pyplot as plt ax = plt.gca() importance = self.model.feature_importances_ importance, feature_labels = zip( *sorted(zip(importance, self.feature_name_list), reverse=True) ) bin_edges = np.arange(0, len(importance) + 1) bin_width = bin_edges[1:] - bin_edges[:-1] - 0.1 ax.bar(bin_edges[:-1], importance, width=bin_width, **kwargs) ax.set_xticks(np.arange(0, len(importance) + 1)) ax.set_xticklabels(feature_labels, rotation=75) return ax
[docs] def plot_features( self, data_list, nbin=30, hist_kwargs_list={}, error_kw_list={}, ncols=2 ): """ Plot model features for different data set (e.g. training and test samples). Parameters ---------- data_list: list List of data nbin: int Number of bin hist_kwargs_list: dict Dictionary with histogram options error_kw_list: dict Dictionary with error bar options ncols: int Number of columns """ return plot_distributions( self.feature_name_list, data_list, nbin, hist_kwargs_list, error_kw_list, ncols, )
[docs] def add_image_model_output(self): raise NotImplementedError("Please Implement this method")
[docs]class RegressorDiagnostic(ModelDiagnostic): """ Class to plot several diagnostic plot for regression Parameters ---------- model: sklearn.base.BaseEstimator Scikit model feature_name_list: str List of features target_name: str Name of target (e.g. `mc_energy`) data_train: `~pandas.DataFrame` Data frame data_test: `~pandas.DataFrame` Data frame """ def __init__( self, model, feature_name_list, target_name, data_train, data_test, output_name ): super().__init__(model, feature_name_list, target_name) self.data_train = data_train self.data_test = data_test self.target_estimation_name = self.target_name self.output_name = output_name self.output_name_img = output_name + "_img" # Compute and add target estimation self.data_train = self.add_image_model_output( self.data_train, col_name=self.output_name_img ) self.data_test = self.add_image_model_output( self.data_test, col_name=self.output_name_img )
[docs] @staticmethod def plot_resolution_distribution( ax, y_true, y_reco, nbin=100, fit_range=[-3, 3], fit_kwargs={}, hist_kwargs={} ): """ Compute bias and resolution with a gaussian fit and returns a plot with the fit results and the migration distribution """ def gauss(x, ampl, mean, std): return ampl * np.exp(-0.5 * ((x - mean) / std) ** 2) if ax is None: import matplotlib.pyplot as plt ax = plt.gca() migration = (y_reco - y_true) / y_true bin_edges = np.linspace(fit_range[0], fit_range[-1], nbin + 1, True) y, tmp = np.histogram(migration, bins=bin_edges) x = (bin_edges[:-1] + bin_edges[1:]) / 2 try: param, cov = curve_fit(gauss, x, y) except: param = [-1, -1, -1] cov = [[]] print("Not enough stat ? (#evts={})".format(len(y_true))) plot_hist( ax=ax, data=migration, nbin=nbin, yerr=False, norm=False, limit=fit_range, hist_kwargs=hist_kwargs, ) ax.plot(x, gauss(x, param[0], param[1], param[2]), **fit_kwargs) return ax, param, cov
[docs] def add_image_model_output(self, data, col_name): data[col_name] = self.model.predict(data[self.feature_name_list]) return data
[docs]class ClassifierDiagnostic(ModelDiagnostic): """ Class to plot several diagnostic plot for classification. Assume that positives and negatives are respectively labeled as 1 and 0. Parameters ---------- model: sklearn.base.BaseEstimator Scikit model feature_name_list: list List of features model_output_name: str Name of output is_output_proba: bool If false, `decision_function` will be called, otherwise, predict_proba. In the last case we only consider the probability for signal event """ def __init__( self, model, feature_name_list, target_name, data_train, data_test, model_output_name="score", is_output_proba=False, ): super().__init__(model, feature_name_list, target_name) self.data_train = data_train self.data_test = data_test self.model_output_name = model_output_name self.is_output_proba = is_output_proba # Compute and add model output self.data_train = self.add_image_model_output( self.data_train, col_name=self.model_output_name ) self.data_test = self.add_image_model_output( self.data_test, col_name=self.model_output_name )
[docs] def add_image_model_output(self, data, col_name): """Add model output column""" if self.is_output_proba is False: data[col_name] = self.model.decision_function(data[self.feature_name_list]) else: # Interested in signal probability data[col_name] = self.model.predict_proba(data[self.feature_name_list])[ :, 1 ] return data
[docs] def plot_image_model_output_distribution( self, cut=None, nbin=30, hist_kwargs_list=[ { "edgecolor": "blue", "color": "blue", "label": "Gamma training sample", "alpha": 0.2, "fill": True, "ls": "-", "lw": 2, }, { "edgecolor": "blue", "color": "blue", "label": "Gamma test sample", "alpha": 1, "fill": False, "ls": "--", "lw": 2, }, { "edgecolor": "red", "color": "red", "label": "Proton training sample", "alpha": 0.2, "fill": True, "ls": "-", "lw": 2, }, { "edgecolor": "red", "color": "red", "label": "Proton test sample", "alpha": 1, "fill": False, "ls": "--", "lw": 2, }, ], error_kw_list=[ dict(ecolor="blue", lw=2, capsize=3, capthick=2, alpha=0.2), dict(ecolor="blue", lw=2, capsize=3, capthick=2, alpha=1), dict(ecolor="red", lw=2, capsize=3, capthick=2, alpha=0.2), dict(ecolor="red", lw=2, capsize=3, capthick=2, alpha=1), ], ): """Plot output distribution. Need more output column""" if cut is not None: data_test = self.data_test.query(cut) data_train = self.data_train.query(cut) else: data_test = self.data_test data_train = self.data_train return plot_distributions( [self.model_output_name], [ data_train.query("label==1"), data_test.query("label==1"), data_train.query("label==0"), data_test.query("label==0"), ], nbin, hist_kwargs_list, error_kw_list, 1, )
# def plot_evt_model_output_distribution(self, # data_train, # data_test, # nbin=30, # hist_kwargs_list=[ # {'edgecolor': 'blue', 'color': 'blue', 'label': 'Gamma training sample', # 'alpha': 0.2, 'fill': True, 'ls': '-', 'lw': 2}, # {'edgecolor': 'blue', 'color': 'blue', 'label': 'Gamma test sample', # 'alpha': 1, 'fill': False, 'ls': '--', 'lw': 2}, # {'edgecolor': 'red', 'color': 'red', 'label': 'Proton training sample', # 'alpha': 0.2, 'fill': True, 'ls': '-', 'lw': 2}, # {'edgecolor': 'red', 'color': 'red', 'label': 'Proton test sample', # 'alpha': 1, 'fill': False, 'ls': '--', 'lw': 2} # ], # error_kw_list=[ # dict(ecolor='blue', lw=2, capsize=3, capthick=2, alpha=0.2), # dict(ecolor='blue', lw=2, capsize=3, capthick=2, alpha=1), # dict(ecolor='red', lw=2, capsize=3, capthick=2, alpha=0.2), # dict(ecolor='red', lw=2, capsize=3, capthick=2, alpha=1) # ]): # return plot_distributions( # [self.model_output_name], # [data_train.query('label==1'), data_test.query('label==1'), # data_train.query('label==0'), data_test.query('label==0')], # nbin, # hist_kwargs_list, # error_kw_list, # 1 # ) # # def plot_evt_model_output_distribution_variation(self, data_train, data_test, cut_list, # nbin=30, ncols=2, hist_kwargs_list={}, # error_kw_list={}): # """ # Plot model output distribution for several data set for a list of cut # # Parameters # ---------- # data_train: `~pandas.DataFrame` # Data frame for training sample # data_test:`~pandas.DataFrame` # Data frame for test sample # cut_list: list # List of cuts # nbin: int # Number of bins # ncols: int # Number of column to display variation # hist_kwargs_list: list # list of kwargs # error_kw_list: # list of error_dict # # Returns # ------- # fig: `matplotlib.figure.Figure` # Figure object # axes: list # List of `~matplotlib.axes.Axes` # """ # import matplotlib.pyplot as plt # n_feature = len(cut_list) # nrows = int(n_feature / ncols) if n_feature % ncols == 0 else int( # (n_feature + 1) / ncols) # fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(5 * ncols, 3 * nrows)) # if nrows == 1 and ncols == 1: # axes = [axes] # else: # axes = axes.flatten() # # data_list = [data_train.query('label==1'), data_test.query('label==1'), # data_train.query('label==0'), data_test.query('label==0')] # # for i, colname in enumerate(cut_list): # ax = axes[i] # # # Range for binning # range_min = np.array([data.query(cut_list[i])[self.model_output_name].min() for data in data_list]) # range_min = range_min[np.where(np.isfinite(range_min))[0]] # range_min = min(range_min) # # range_max = np.array([data.query(cut_list[i])[self.model_output_name].max() for data in data_list]) # range_max = range_max[np.where(np.isfinite(range_max))[0]] # range_max = min(range_max) # # # myrange = [range_min, range_max] # myrange = [0,1] # # for j, data in enumerate(data_list): # if len(data) == 0: # continue # # ax = plot_hist( # ax=ax, data=data.query(cut_list[i])[self.model_output_name], # nbin=nbin, limit=myrange, # norm=True, yerr=True, # hist_kwargs=hist_kwargs_list[j], # error_kw=error_kw_list[j] # ) # # ax.set_xlim(myrange) # ax.set_xlabel(self.model_output_name) # ax.set_ylabel('Arbitrary units') # ax.legend(loc='best', fontsize='x-small') # ax.set_title(cut_list[i]) # ax.grid() # plt.tight_layout() # # return fig, axes # # def plot_evt_roc_curve_variation(self, ax, data_test, cut_list): # """ # # Parameters # ---------- # ax: `~matplotlib.axes.Axes` # Axis # data_test: `~pd.DataFrame` # Test data # cut_list: `list` # Cut list # # Returns # ------- # ax: `~matplotlib.axes.Axes` # Axis # """ # color = 1. # step_color = 1. / (len(cut_list)) # for i, cut in enumerate(cut_list): # c = color - (i + 1) * step_color # # data = data_test.query(cut) # if len(data) == 0: # continue # # opt = dict(color=str(c), lw=2, label='{}'.format(cut.replace('reco_energy', 'E'))) # plot_roc_curve(ax, data[self.model_output_name], data['label'], **opt) # ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--') # # return ax
[docs]class BoostedDecisionTreeDiagnostic(object): """ Class producing diagnostic plot for the BDT method """
[docs] @classmethod def plot_error_rate(cls, ax, model, data_scikit, **kwargs): """Diagnostic plot showing error rate as a function of the specialisation""" import matplotlib.pyplot as plt if ax is None: ax = plt.gca() test_errors = [] for test_predict in model.staged_predict(data_scikit["X_test"]): test_errors.append( 1.0 - accuracy_score(test_predict, data_scikit["y_test"]) ) ntrees = len(model) ax.plot(range(1, ntrees + 1), test_errors, **kwargs) ax.set_xlabel("Number of Trees") ax.set_ylabel("Error rate") ax.grid() plt.tight_layout() return ax
[docs] @classmethod def plot_tree_error_rate(cls, ax, model, **kwargs): """Diagnostic plot showing tree error rate""" import matplotlib.pyplot as plt if ax is None: ax = plt.gca() ntrees = len(model) estimator_errors = model.estimator_errors_[:ntrees] ntrees = len(model) ax.plot(range(1, ntrees + 1), estimator_errors, **kwargs) ax.set_xlabel("Tree number") ax.set_ylabel("Error rate / tree") ax.grid() plt.tight_layout() return ax
# class RandomForestDiagnostic(object): # """ # Class producing diagnostic plot for the RF method # """ # # @classmethod # def plot_error_rate