Source code for protopipe.mva.utils

import numpy as np
import pandas as pd
import pickle
import gzip
from sklearn.metrics import auc, roc_curve


[docs]def save_obj(obj, name): """Save object in binary""" with gzip.open(name, "wb") as f: pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
[docs]def load_obj(name): """Load object in binary""" with gzip.open(name, "rb") as f: return pickle.load(f)
[docs]def prepare_data(ds, cuts, label=None): """Add variables in data frame""" ds["log10_hillas_intensity"] = np.log10( ds["hillas_intensity_reco"] ) # THIS SHOULDN'T BE HARDCODED!!! ds["log10_impact_dist"] = np.log10(ds["impact_dist"]) ds["log10_true_energy"] = np.log10(ds["true_energy"]) try: # for classification ds["log10_reco_energy"] = np.log10(ds["reco_energy"]) ds["log10_reco_energy_tel"] = np.log10(ds["reco_energy_tel"]) ds["label"] = np.full(len(ds), label) except: pass ds = ds.query(cuts) return ds
[docs]def make_cut_list(cuts): cut_list = "" for idx, cut in enumerate(cuts): cut_list += cut if idx != len(cuts) - 1: cut_list += " and " return cut_list
[docs]def split_train_test(ds, train_fraction, feature_name_list, target_name): # Get the run id corresponding to 70% of the statistics # (and sort to avoid troubles...) obs_ids = np.sort(pd.unique(ds["obs_id"])) max_train_obs_idx = int(train_fraction * len(obs_ids)) run_max_train = obs_ids[max_train_obs_idx] # Split the data for training data_train = ds.query("obs_id < {}".format(run_max_train)) y_train = data_train[target_name] X_train = data_train[feature_name_list] data_train = data_train.set_index(["obs_id", "event_id"]) # Split the data for training data_test = ds.query("obs_id >= {}".format(run_max_train)) y_test = data_test[target_name] X_test = data_test[feature_name_list] data_test = data_test.set_index(["obs_id", "event_id"]) return X_train, X_test, y_train, y_test, data_train, data_test
[docs]def get_evt_subarray_model_output( data, weight_name=None, keep_cols=["reco_energy"], model_output_name="score_img", model_output_name_evt="score", ): """ Returns DataStore with keepcols + score/target columns of model at the level-subarray-event. Parameters ---------- data: `~pandas.DataFrame` Data frame weight_name: `str` Variable name in data frame to weight events with keep_cols: `list`, optional List of variables to keep in resulting data frame model_output_name: `str`, optional Name of model output (image level) model_output_name_evt: `str`, optional Name of averaged model output (event level) Returns -------- data: `~pandas.DataFrame` Data frame """ keep_cols += [model_output_name] keep_cols += [weight_name] new_data = data[keep_cols].copy(deep=True) new_data[model_output_name_evt] = np.zeros(len(new_data)) new_data.set_index(["tel_id"], append=True, inplace=True) new_data[model_output_name_evt] = new_data.groupby(["obs_id", "event_id"]).apply( lambda g: np.average(g[model_output_name], weights=g[weight_name]) ) # Remove columns new_data = new_data.drop(columns=[model_output_name]) # Remove duplicates new_data = new_data[~new_data.index.duplicated(keep="first")] return new_data
[docs]def get_evt_model_output( data_dict, weight_name=None, reco_energy_label="reco_energy", model_output_name="score_img", model_output_name_evt="score", ): """ Returns DataStore with reco energy + score/target columns of model at the level-event. Parameters ---------- data: `~pandas.DataFrame` Data frame with at least (obs_id, evt_id), image score/proba and label weight_name: `str` Variable name in data frame to weight events with reco_energy_label: `list`, optional List of variables to keep in resulting data frame model_output_name: `str`, optional Name of model output (image level) model_output_name: `str`, optional Name of averaged model output (event level) Returns -------- data: `~pandas.DataFrame` Data frame Warnings -------- Need optimisation, too much loops. """ # Get list of observations obs_ids = np.array([]) for cam_id in data_dict: obs_ids = np.hstack((obs_ids, data_dict[cam_id].index.get_level_values(0))) obs_ids = np.unique(obs_ids) obs_id_list = [] evt_id_list = [] model_output_list = [] reco_energy_list = [] label_list = [] # Loop on observation Id for obs_id in obs_ids: # Get joint list of events evt_ids = np.array([]) for cam_id in data_dict: evt_ids = np.hstack( (evt_ids, data_dict[cam_id].loc[(obs_id,)].index.get_level_values(0)) ) evt_ids = np.unique(evt_ids) for evt_id in evt_ids: output = np.array([]) weight = np.array([]) for cam_id in data_dict: try: # Stack camera information data = data_dict[cam_id].xs(obs_id).xs(evt_id) output = np.hstack((output, data[model_output_name])) weight = np.hstack((weight, data[weight_name])) reco_energy = np.hstack((np.array([]), data[reco_energy_label]))[ 0 ] # single entry is enough label = np.hstack((np.array([]), data["label"]))[ 0 ] # single entry is enough except: pass # No information for this type of camera obs_id_list.append(obs_id) evt_id_list.append(evt_id) model_output_list.append(np.sum((output * weight)) / np.sum(weight)) reco_energy_list.append(reco_energy) label_list.append(label) data = { "obs_id": obs_id_list, "event_id": evt_id_list, model_output_name_evt: model_output_list, reco_energy_label: reco_energy_list, "label": label_list, } return pd.DataFrame(data=data)
[docs]def plot_roc_curve(ax, model_output, y, **kwargs): """Plot ROC curve for a given set of model outputs and labels""" fpr, tpr, _ = roc_curve(y_score=model_output, y_true=y) roc_auc = auc(fpr, tpr) label = "{} (area={:.2f})".format(kwargs.pop("label"), roc_auc) # Remove label ax.plot(fpr, tpr, label=label, **kwargs) return ax
[docs]def plot_hist( ax, data, nbin, limit, norm=False, yerr=False, hist_kwargs={}, error_kw={} ): """Utility function to plot histogram""" bin_edges = np.linspace(limit[0], limit[-1], nbin + 1, True) y, tmp = np.histogram(data, bins=bin_edges) weights = np.ones_like(y) if norm is True: weights = weights / float(np.sum(y)) if yerr is True: yerr = np.sqrt(y) * weights else: yerr = np.zeros(len(y)) centers = 0.5 * (bin_edges[1:] + bin_edges[:-1]) width = bin_edges[1:] - bin_edges[:-1] ax.bar( centers, y * weights, width=width, yerr=yerr, error_kw=error_kw, **hist_kwargs ) return ax
[docs]def plot_distributions( feature_list, data_list, nbin=30, hist_kwargs_list={}, error_kw_list={}, ncols=2 ): """Plot feature distributions for several data set. Returns list of axes.""" import matplotlib.pyplot as plt n_feature = len(feature_list) nrows = ( int(n_feature / ncols) if n_feature % ncols == 0 else int((n_feature + 1) / ncols) ) fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(5 * ncols, 5 * nrows)) if nrows == 1 and ncols == 1: axes = [axes] else: axes = axes.flatten() for i, colname in enumerate(feature_list): ax = axes[i] # Range for binning range_min = min([data[colname].min() for data in data_list]) range_max = max([data[colname].max() for data in data_list]) myrange = [range_min, range_max] for j, data in enumerate(data_list): ax = plot_hist( ax=ax, data=data[colname], nbin=nbin, limit=myrange, norm=True, yerr=True, hist_kwargs=hist_kwargs_list[j], error_kw=error_kw_list[j], ) ax.set_xlabel(colname) ax.set_ylabel("Arbitrary units") ax.legend(loc="upper left") ax.grid() plt.tight_layout() return fig, axes
[docs]def plot_profile(ax, data, xcol, ycol, nbin, limit, hist_kwargs={}): """Plot profile of a distribution""" bin_edges = np.linspace(limit[0], limit[-1], nbin + 1, True) bin_center = 0.5 * (bin_edges[1:] + bin_edges[:-1]) bin_width = bin_edges[1:] - bin_edges[:-1] y = [] yerr = [] for idx in range(len(bin_center)): counts = data[ (data[xcol] > bin_edges[idx]) & (data[xcol] <= bin_edges[idx + 1]) ][ycol] y.append(counts.mean()) yerr.append(counts.std() / np.sqrt(len(counts))) print(y) print(yerr) ax.errorbar(x=bin_center, y=y, xerr=bin_width / 2.0, yerr=yerr, **hist_kwargs) return ax