Source code for protopipe.pipeline.utils

import yaml
import argparse
import math

import astropy.units as u
import matplotlib.pyplot as plt
import os.path as path

from ctapipe.io import event_source


[docs]class bcolors: """Color definitions for standard and debug printing.""" HEADER = "\033[95m" OKBLUE = "\033[94m" OKGREEN = "\033[92m" WARNING = "\033[93m" BOLDWARNING = "\033[1m\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" BOLDGREEN = "\033[1m\033[92m" REVERSED = "\033[7m" PURPLE = "\033[95m"
[docs]def save_fig(outdir, name, fig=None): """Save a figure in multiple formats.""" for ext in ["pdf", "png"]: if fig: fig.savefig(path.join(outdir, f"{name}.{ext}")) else: plt.savefig(path.join(outdir, f"{name}.{ext}")) return None
[docs]def load_config(name): """Load YAML configuration file.""" try: with open(name, "r") as stream: cfg = yaml.load(stream, Loader=yaml.FullLoader) except FileNotFoundError as e: print(e) raise return cfg
[docs]class SignalHandler: """ handles ctrl+c signals; set up via `signal_handler = SignalHandler() `signal.signal(signal.SIGINT, signal_handler)` # or for two step interupt: `signal.signal(signal.SIGINT, signal_handler.stop_drawing)` """ def __init__(self): self.stop = False self.draw = True
[docs] def __call__(self, signal, frame): if self.stop: print("you pressed Ctrl+C again -- exiting NOW") exit(-1) print("you pressed Ctrl+C!") print("exiting after current event") self.stop = True
[docs] def stop_drawing(self, signal, frame): if self.stop: print("you pressed Ctrl+C again -- exiting NOW") exit(-1) if self.draw: print("you pressed Ctrl+C!") print("turn off drawing") self.draw = False else: print("you pressed Ctrl+C!") print("exiting after current event") self.stop = True
[docs]def str2bool(v): if v.lower() in ("yes", "true", "t", "y", "1"): return True elif v.lower() in ("no", "false", "f", "n", "0"): return False else: raise argparse.ArgumentTypeError("Boolean value expected.")
[docs]def make_argparser(): from os.path import expandvars parser = argparse.ArgumentParser(description="") # Add configuration file parser.add_argument("--config_file", type=str, required=True) parser.add_argument("-o", "--outfile", type=str, required=True) parser.add_argument( "-m", "--max_events", type=int, default=None, help="maximum number of events considered per file", ) parser.add_argument( "-i", "--indir", type=str, default=expandvars("$CTA_DATA/Prod3b/Paranal") ) parser.add_argument( "-f", "--infile_list", type=str, default="", nargs="*", help="give a specific list of files to run on", ) parser.add_argument( "--cam_ids", type=str, default=["LSTCam", "NectarCam"], nargs="*", help="give the specific list of camera types to run on", ) parser.add_argument( "--wave_dir", type=str, default=None, help="directory where to find mr_filter. " "if not set look in $PATH", ) parser.add_argument( "--wave_temp_dir", type=str, default="/dev/shm/", help="directory where mr_filter to store the temporary fits" " files", ) mode_group = parser.add_mutually_exclusive_group() mode_group.add_argument( "--wave", dest="mode", action="store_const", const="wave", default="tail", help="if set, use wavelet cleaning -- default", ) mode_group.add_argument( "--tail", dest="mode", action="store_const", const="tail", help="if set, use tail cleaning, otherwise wavelets", ) return parser
[docs]def final_array_to_use(sim_array, array, subarrays=None): """Infer IDs of telescopes and cameras with equivalent focal lengths. This is an helper function for utils.prod3b_array. Parameters ---------- subarrays : dict, optional Dictionary of subarray names linked to lists of tel_ids automatically extracted by telescope type. If set, it will extract tel_ids from there, otherwise from the custom list given by 'array'. sim_array : ctapipe.instrument.SubarrayDescription Full simulated array from the first event. array : list, str Custom list of telescope IDs that the user wants to use or name of specific subarray. Returns ------- tel_ids : list List of telescope IDs to use in a format readable by ctapipe. cams_and_foclens : dict Dictionary containing the IDs of the involved cameras as inferred from the involved telescopes IDs, together with the equivalent focal lengths of the telescopes. The camera IDs will feed both the estimators and the image cleaning. The equivalent focal lengths will be used to calculate the radius of the camera on which cut for truncated images. subarray : ctapipe.instrument.SubarrayDescription Complete subarray information of the final array/subarray selected. """ if subarrays: tel_ids = subarrays[array] subarray = sim_array.select_subarray("", tel_ids) else: subarray = sim_array.select_subarray("", array) tel_ids = subarray.tel_ids tel_types = subarray.telescope_types cams_and_foclens = { tel_types[i] .camera.camera_name: tel_types[i] .optics.equivalent_focal_length.value for i in range(len(tel_types)) } return set(tel_ids), cams_and_foclens, subarray
[docs]def prod3b_array(fileName, site, array): """Return tel IDs and involved cameras from configuration and simtel file. The initial check (and the too-high cyclomatic complexity) will disappear with the advent of the final array layouts. Currently not very performant: it is necessary get at least the first event of the simtel file to read the simulated array information. Parameters ---------- first_fileName : str Name of the first file of the list of files given by the user. array : str or list Name of the subarray or - if not supported - a custom list of telescope IDs that the user wants to use site : str Can be only "north" or "south". Currently relevant only for baseline simulations. For non-baseline simulations only custom lists of IDs matter. Returns ------- tel_ids : list List of telescope IDs to use in a format readable by ctapipe. cameras : list List of camera types inferred from tel_ids. This will feed both the estimators and the image cleaning. """ source = event_source(input_url=fileName, max_events=1) for event in source: # get only first event pass sim_array = source.subarray # get simulated array # Dictionaries of subarray names for BASELINE simulations subarrays_N = { # La Palma has only 2 cameras "subarray_LSTs": sim_array.get_tel_ids_for_type("LST_LST_LSTCam"), "subarray_MSTs": sim_array.get_tel_ids_for_type("MST_MST_NectarCam"), "full_array": sim_array.tel_ids, } subarrays_S = { # Paranal has only 3 cameras "subarray_LSTs": sim_array.get_tel_ids_for_type("LST_LST_LSTCam"), "subarray_MSTs": sim_array.get_tel_ids_for_type("MST_MST_FlashCam"), "subarray_SSTs": sim_array.get_tel_ids_for_type("SST_GCT_CHEC"), "full_array": sim_array.tel_ids, } if site.lower() == "north": if sim_array.num_tels > 19: # this means non-baseline simulation.. if ( sim_array.num_tels > 125 # Paranal non-baseline or sim_array.num_tels == 99 # Paranal baseline or sim_array.num_tels == 98 # gamma_test_large ): raise ValueError( "\033[91m ERROR: infile and site uncorrelated! \033[0m" ) if type(array) == str and array != "full_array": raise ValueError( "\033[91m ERROR: Only 'full_array' supported for this production.\n\ Please, use that or define a custom array with a list of tel_ids. \033[0m" ) elif array == "full_array": return final_array_to_use(sim_array, array, subarrays_N) elif ( type(array) == list ): # ..for which only custom lists are currently supported return final_array_to_use(sim_array, array) else: raise ValueError( f"\033[91m ERROR: array {array} not supported. \033[0m" ) else: # this is a baseline simulation if type(array) == str: if array not in subarrays_N.keys(): raise ValueError( "\033[91m ERROR: requested missing camera from simtel file. \033[0m" ) else: return final_array_to_use(sim_array, array, subarrays_N) elif type(array) == list: if any((tel_id < 1 or tel_id > 19) for tel_id in array): raise ValueError( "\033[91m ERROR: non-existent telescope ID. \033[0m" ) return final_array_to_use(sim_array, array) else: raise ValueError( f"\033[91m ERROR: array {array} not supported. \033[0m" ) elif site.lower() == "south": if sim_array.num_tels > 99: # this means non-baseline simulation.. if sim_array.num_tels < 126: raise ValueError( "\033[91m ERROR: infile and site uncorrelated! \033[0m" ) if type(array) == str and array != "full_array": raise ValueError( "\033[91m ERROR: Only 'full_array' supported for this production.\n\ Please, use that or define a custom array with a list of tel_ids. \033[0m" ) if array == "full_array": return final_array_to_use(sim_array, array, subarrays_S) elif ( type(array) == list ): # ..for which only custom lists are currently supported return final_array_to_use(sim_array, array) else: raise ValueError( f"\033[91m ERROR: Array {array} not supported. \033[0m" ) else: # this is a baseline simulation if sim_array.num_tels == 19: raise ValueError( "\033[91m ERROR: infile and site uncorrelated! \033[0m" ) if type(array) == str: if array not in subarrays_S.keys(): raise ValueError( "\033[91m ERROR: requested missing camera from simtel file. \033[0m" ) else: if sim_array.num_tels == 98: # this is gamma_test_large subarrays_S["subarray_SSTs"] = sim_array.get_tel_ids_for_type( "SST_ASTRI_ASTRICam" # in this file SSTs are ASTRI ) return final_array_to_use(sim_array, array, subarrays_S) elif type(array) == list: if any((tel_id < 1 or tel_id > 99) for tel_id in array): raise ValueError( "\033[91m ERROR: non-existent telescope ID. \033[0m" ) return final_array_to_use(sim_array, array) else: raise ValueError( f"\033[91m ERROR: Array {array} not supported. \033[0m" )
[docs]def effective_focal_lengths(camera_name): """Provide the effective focal length for the required camera. Data comes from Konrad's table. Parameters ---------- camera_name : str Name of the camera from ctapipe.instrument.CameraDescription Returns ---------- eff_foc_len : float Effective focal length in meters """ effective_focal_lengths = { "LSTCam": 29.30565 * u.m, "NectarCam": 16.44505 * u.m, "FlashCam": 16.44505 * u.m, "ASTRICam": 2.15191 * u.m, "SCTCam": 5.58310 * u.m, "CHEC": 2.30913 * u.m, "DigiCam": 5.69705 * u.m, } eff_foc_len = effective_focal_lengths[camera_name] return eff_foc_len
[docs]def camera_radius(camid_to_efl, cam_id="all"): """Get camera radii. Inspired from pywi-cta CTAMarsCriteria, CTA Mars like preselection cuts. This should be replaced by a function in ctapipe getting the radius either from the pixel poisitions or from an external database Notes ----- average_camera_radius_meters = math.tan(math.radians(average_camera_radius_degree)) * foclen The average camera radius values are, in degrees : - LST: 2.31 - Nectar: 4.05 - Flash: 3.95 - SST-1M: 4.56 - GCT-CHEC-S: 3.93 - ASTRI: 4.67 """ average_camera_radii_deg = { "ASTRICam": 4.67, "CHEC": 3.93, "DigiCam": 4.56, "FlashCam": 3.95, "NectarCam": 4.05, "LSTCam": 2.31, "SCTCam": 4.0, # dummy value } if cam_id in camid_to_efl.keys(): foclen_meters = camid_to_efl[cam_id] average_camera_radius_meters = ( math.tan(math.radians(average_camera_radii_deg[cam_id])) * foclen_meters ) elif cam_id == "all": print("Available camera radii in meters:") for cam_id in camid_to_efl.keys(): print(f"* {cam_id} : {camera_radius(camid_to_efl, cam_id)}") average_camera_radius_meters = 0 else: raise ValueError("Unknown camid", cam_id) return average_camera_radius_meters
[docs]def CTAMARS_radii(camera_name): """Radii of the cameras as defined in CTA-MARS. These values are defined in the code of CTA-MARS. They correspond to the radius of an equivalent FOV covering the same solid angle. Parameters ---------- camera_name : str Name of the camera. Returns ---------- average_camera_radii_deg : dict Dictionary containing the hard-coded values. """ average_camera_radii_deg = { "ASTRICam": 4.67, "CHEC": 3.93, "DigiCam": 4.56, "FlashCam": 3.95, "NectarCam": 4.05, "LSTCam": 2.31, "SCTCam": 4.0, # dummy value } return average_camera_radii_deg[camera_name]