Source code for ctapipe.tools.compute_irf

"""Tool to generate IRFs"""

from importlib.util import find_spec

if find_spec("pyirf") is None:
    from ..exceptions import OptionalDependencyMissing

    raise OptionalDependencyMissing("pyirf") from None

import operator
from functools import partial

import astropy.units as u
import numpy as np
from astropy.io import fits
from astropy.table import vstack
from pyirf.cuts import evaluate_binned_cut
from pyirf.io import create_rad_max_hdu

from ..core import Provenance, Tool, ToolConfigurationError, traits
from ..core.traits import AstroQuantity, Bool, Integer, classes_with_traits, flag
from ..io.dl2_tables_preprocessing import (
    DL2EventLoader,
    DL2EventQualityQuery,
)
from ..irf import (
    OptimizationResult,
    Spectra,
    check_bins_in_range,
)
from ..irf.benchmarks import (
    AngularResolutionMakerBase,
    EnergyBiasResolutionMakerBase,
    SensitivityMakerBase,
)
from ..irf.irfs import (
    BackgroundRateMakerBase,
    EffectiveAreaMakerBase,
    EnergyDispersionMakerBase,
    PSFMakerBase,
)

__all__ = ["IrfTool"]


[docs] class IrfTool(Tool): "Tool to create IRF files in GADF format" name = "ctapipe-compute-irf" description = __doc__ examples = """ ctapipe-compute-irf \\ --cuts cuts.fits \\ --gamma-file gamma.dl2.h5 \\ --proton-file proton.dl2.h5 \\ --electron-file electron.dl2.h5 \\ --output irf.fits.gz \\ --benchmark-output benchmarks.fits.gz """ do_background = Bool( True, help="Compute background rate using supplied files.", ).tag(config=True) range_check_error = Bool( False, help="Raise error if asking for IRFs outside range where cut optimisation is valid.", ).tag(config=True) cuts_file = traits.Path( exists=True, directory_ok=False, help="Path to optimized cuts input file.", ).tag(config=True) gamma_file = traits.Path( exists=True, directory_ok=False, help="Gamma input filename and path." ).tag(config=True) gamma_target_spectrum = traits.UseEnum( Spectra, default_value=Spectra.CRAB_HEGRA, help="Name of the spectrum used for weights of gamma events.", ).tag(config=True) proton_file = traits.Path( exists=True, directory_ok=False, help="Proton input filename and path.", ).tag(config=True) proton_target_spectrum = traits.UseEnum( Spectra, default_value=Spectra.IRFDOC_PROTON_SPECTRUM, help="Name of the spectrum used for weights of proton events.", ).tag(config=True) electron_file = traits.Path( default_value=None, allow_none=True, exists=True, directory_ok=False, help="Electron input filename and path.", ).tag(config=True) electron_target_spectrum = traits.UseEnum( Spectra, default_value=Spectra.IRFDOC_ELECTRON_SPECTRUM, help="Name of the spectrum used for weights of electron events.", ).tag(config=True) chunk_size = Integer( default_value=100000, allow_none=True, help="How many subarray events to load at once while selecting.", ).tag(config=True) output_path = traits.Path( directory_ok=False, help="Output file", ).tag(config=True) benchmarks_output_path = traits.Path( default_value=None, allow_none=True, directory_ok=False, help="Optional second output file for benchmarks.", ).tag(config=True) obs_time = AstroQuantity( default_value=u.Quantity(50, u.hour), physical_type=u.physical.time, help=( "Observation time in the form ``<value> <unit>``." " This is used for flux normalization and estimating a background rate." ), ).tag(config=True) energy_dispersion_maker_name = traits.ComponentName( EnergyDispersionMakerBase, default_value="EnergyDispersion2dMaker", help="The parameterization of the energy dispersion to be used.", ).tag(config=True) effective_area_maker_name = traits.ComponentName( EffectiveAreaMakerBase, default_value="EffectiveArea2dMaker", help="The parameterization of the effective area to be used.", ).tag(config=True) psf_maker_name = traits.ComponentName( PSFMakerBase, default_value="PSF3DMaker", help="The parameterization of the point spread function to be used.", ).tag(config=True) background_maker_name = traits.ComponentName( BackgroundRateMakerBase, default_value="BackgroundRate2dMaker", help="The parameterization of the background rate to be used.", ).tag(config=True) energy_bias_resolution_maker_name = traits.ComponentName( EnergyBiasResolutionMakerBase, default_value="EnergyBiasResolution2dMaker", help=( "The parameterization of the bias and resolution benchmark " "for the energy prediction." ), ).tag(config=True) angular_resolution_maker_name = traits.ComponentName( AngularResolutionMakerBase, default_value="AngularResolution2dMaker", help="The parameterization of the angular resolution benchmark.", ).tag(config=True) sensitivity_maker_name = traits.ComponentName( SensitivityMakerBase, default_value="Sensitivity2dMaker", help="The parameterization of the point source sensitivity benchmark.", ).tag(config=True) spatial_selection_applied = Bool( False, help=( "Compute an IRF after applying a direction cut (``SpatialSelection=RAD_MAX``) " ), ).tag(config=True) aliases = { "cuts": "IrfTool.cuts_file", "gamma-file": "IrfTool.gamma_file", "proton-file": "IrfTool.proton_file", "electron-file": "IrfTool.electron_file", "output": "IrfTool.output_path", "benchmark-output": "IrfTool.benchmarks_output_path", "chunk_size": "IrfTool.chunk_size", } flags = { **flag( "do-background", "IrfTool.do_background", "Compute background rate.", "Do not compute background rate.", ), **flag( "spatial-selection-applied", "IrfTool.spatial_selection_applied", "Compute an IRF after applying a direction cut (``SpatialSelection=RAD_MAX``).", "Compute an IRF without any direction cut (``SpatialSelection=None``).", ), } classes = ( [ DL2EventLoader, ] + classes_with_traits(BackgroundRateMakerBase) + classes_with_traits(EffectiveAreaMakerBase) + classes_with_traits(EnergyDispersionMakerBase) + classes_with_traits(PSFMakerBase) + classes_with_traits(AngularResolutionMakerBase) + classes_with_traits(EnergyBiasResolutionMakerBase) + classes_with_traits(SensitivityMakerBase) ) def _check_config(self): if self.gamma_file is None: self.log.critical( "Setting gamma_file is required (via --gamma-file or a config file)." ) self.exit(1) if self.cuts_file is None: self.log.critical( "Setting cuts_file is required (via --cuts or a config file)." ) self.exit(1) if self.output_path is None: self.log.critical( "Setting output_path is required (via --output or a config file)." ) self.exit(1)
[docs] def setup(self): """ Initialize components from config and load g/h (and theta) cuts. """ self._check_config() self.opt_result = OptimizationResult.read(self.cuts_file) if ( self.spatial_selection_applied and self.opt_result.spatial_selection_table is None ): raise ToolConfigurationError( f"{self.cuts_file} does not contain any direction cut, " "but --spatial-selection-applied was given." ) check_e_bins = partial( check_bins_in_range, valid_range=self.opt_result.valid_energy, raise_error=self.range_check_error, ) self.event_loaders = { "gammas": DL2EventLoader( parent=self, file=self.gamma_file, target_spectrum=self.gamma_target_spectrum, ), } if self.do_background: if not self.proton_file or ( self.proton_file and not self.proton_file.exists() ): raise ValueError( "At least a proton file required when specifying `do_background`." ) self.event_loaders["protons"] = DL2EventLoader( parent=self, file=self.proton_file, target_spectrum=self.proton_target_spectrum, ) if self.electron_file and self.electron_file.exists(): self.event_loaders["electrons"] = DL2EventLoader( parent=self, file=self.electron_file, target_spectrum=self.electron_target_spectrum, ) else: self.log.warning("Estimating background without electron file.") self.background_maker = BackgroundRateMakerBase.from_name( self.background_maker_name, parent=self ) check_e_bins( bins=self.background_maker.reco_energy_bins, source="background reco energy", ) self.energy_dispersion_maker = EnergyDispersionMakerBase.from_name( self.energy_dispersion_maker_name, parent=self ) self.effective_area_maker = EffectiveAreaMakerBase.from_name( self.effective_area_maker_name, parent=self ) self.psf_maker = PSFMakerBase.from_name(self.psf_maker_name, parent=self) if self.benchmarks_output_path is not None: self.angular_resolution_maker = AngularResolutionMakerBase.from_name( self.angular_resolution_maker_name, parent=self ) if self.angular_resolution_maker.use_reco_energy: check_e_bins( bins=self.angular_resolution_maker.reco_energy_bins, source="Angular resolution energy", ) self.bias_resolution_maker = EnergyBiasResolutionMakerBase.from_name( self.energy_bias_resolution_maker_name, parent=self ) self.sensitivity_maker = SensitivityMakerBase.from_name( self.sensitivity_maker_name, parent=self ) check_e_bins( bins=self.sensitivity_maker.reco_energy_bins, source="Sensitivity reco energy", )
[docs] def calculate_selections(self, reduced_events: dict) -> dict: """ Add the selection columns to the signal and optionally background tables. Parameters ---------- reduced_events: dict dict containing the signal (``"gammas"``) and optionally background tables (``"protons"``, ``"electrons"``) Returns ------- dict ``reduced_events`` with selection columns added. """ reduced_events["gammas"]["selected_gh"] = evaluate_binned_cut( reduced_events["gammas"]["gh_score"], reduced_events["gammas"]["reco_energy"], self.opt_result.gh_cuts, operator.ge, ) if self.spatial_selection_applied: reduced_events["gammas"]["selected_theta"] = evaluate_binned_cut( reduced_events["gammas"]["theta"], reduced_events["gammas"]["reco_energy"], self.opt_result.spatial_selection_table, operator.le, ) reduced_events["gammas"]["selected"] = ( reduced_events["gammas"]["selected_theta"] & reduced_events["gammas"]["selected_gh"] ) else: reduced_events["gammas"]["selected"] = reduced_events["gammas"][ "selected_gh" ] if self.do_background: backgrounds = ( ["protons", "electrons"] if self.electron_file else ["protons"] ) n_sel = {"protons": 0, "electrons": 0} for bkg_type in backgrounds: reduced_events[bkg_type]["selected_gh"] = evaluate_binned_cut( reduced_events[bkg_type]["gh_score"], reduced_events[bkg_type]["reco_energy"], self.opt_result.gh_cuts, operator.ge, ) n_sel[bkg_type] = np.count_nonzero( reduced_events[bkg_type]["selected_gh"] ) self.log.info( "Keeping %d signal, %d proton events, and %d electron events" % ( np.count_nonzero(reduced_events["gammas"]["selected"]), n_sel["protons"], n_sel["electrons"], ) ) else: self.log.info( "Keeping %d signal events" % (np.count_nonzero(reduced_events["gammas"]["selected"])) ) return reduced_events
def _make_signal_irf_hdus(self, hdus, sim_info): hdus.append( self.effective_area_maker( events=self.signal_events[self.signal_events["selected"]], spatial_selection_applied=self.spatial_selection_applied, signal_is_point_like=self.signal_is_point_like, sim_info=sim_info, ) ) hdus.append( self.energy_dispersion_maker( events=self.signal_events[self.signal_events["selected"]], spatial_selection_applied=self.spatial_selection_applied, ) ) hdus.append( self.psf_maker(events=self.signal_events[self.signal_events["selected_gh"]]) ) if self.spatial_selection_applied: # TODO: Support fov binning self.log.debug( "Currently multiple fov bins is not supported for RAD_MAX. " "Using `fov_offset_bins = [valid_offset.min, valid_offset.max]`." ) hdus.append( create_rad_max_hdu( rad_max=self.opt_result.spatial_selection_table["cut"].reshape( -1, 1 ), reco_energy_bins=np.append( self.opt_result.spatial_selection_table["low"], self.opt_result.spatial_selection_table["high"][-1], ), fov_offset_bins=u.Quantity( [ self.opt_result.valid_offset.min, self.opt_result.valid_offset.max, ] ).reshape(-1), ) ) return hdus def _make_benchmark_hdus(self, hdus): hdus.append( self.bias_resolution_maker( events=self.signal_events[self.signal_events["selected"]], ) ) hdus.append( self.angular_resolution_maker( events=self.signal_events[self.signal_events["selected_gh"]], ) ) if self.do_background: if self.opt_result.spatial_selection_table is None: raise ValueError( "Calculating the point-source sensitivity requires " f"theta cuts, but {self.cuts_file} does not contain any." ) hdus.append( self.sensitivity_maker( signal_events=self.signal_events[self.signal_events["selected"]], background_events=self.background_events[ self.background_events["selected_gh"] ], spatial_selection_table=self.opt_result.spatial_selection_table, gamma_spectrum=self.gamma_target_spectrum, ) ) return hdus
[docs] def start(self): """ Load events and calculate the irf (and the benchmarks). """ reduced_events = dict() for particle_type, loader in self.event_loaders.items(): if loader.epp.gammaness_classifier != self.opt_result.clf_prefix: raise RuntimeError( "G/H cuts are only valid for gammaness scores predicted by " "the same classifier model. Requested model: %s. " "Model used for g/h cuts: %s." % ( loader.epp.gammaness_classifier, self.opt_result.clf_prefix, ) ) if ( loader.epp.quality_query.quality_criteria != self.opt_result.quality_query.quality_criteria ): self.log.warning( "Quality criteria are different from quality criteria used for " "calculating g/h / theta cuts. Provided quality criteria:\n%s. " "\nUsing the same quality criteria as g/h / theta cuts:\n%s. " % ( loader.epp.quality_query.to_table(functions=True)[ "criteria", "func" ], self.opt_result.quality_query.to_table(functions=True)[ "criteria", "func" ], ) ) loader.epp.quality_query = DL2EventQualityQuery( parent=loader, quality_criteria=self.opt_result.quality_query.quality_criteria, ) self.log.debug( "%s Quality criteria: %s" % (particle_type, loader.epp.quality_query.quality_criteria) ) events, count, meta = loader.load_preselected_events( self.chunk_size, self.obs_time ) # Only calculate event weights if background or sensitivity should be calculated. if self.do_background: # Sensitivity is only calculated, if do_background is true # and benchmarks_output_path is given. if self.benchmarks_output_path is not None: events = loader.make_event_weights( events, meta["spectrum"], particle_type, self.sensitivity_maker.fov_offset_bins, ) # If only background should be calculated, # only calculate weights for protons and electrons. elif particle_type in ("protons", "electrons"): events = loader.make_event_weights( events, meta["spectrum"], particle_type ) reduced_events[particle_type] = events reduced_events[f"{particle_type}_count"] = count reduced_events[f"{particle_type}_meta"] = meta self.log.debug( "Loaded %d %s events" % (reduced_events[f"{particle_type}_count"], particle_type) ) if particle_type == "gammas": self.signal_is_point_like = ( meta["sim_info"].viewcone_max - meta["sim_info"].viewcone_min ).value == 0 if self.signal_is_point_like: errormessage = """The gamma input file contains point-like simulations. Therefore, the IRF can only be calculated at a single point in the FoV, but `fov_offset_n_bins > 1`.""" if ( self.energy_dispersion_maker.fov_offset_n_bins > 1 or self.effective_area_maker.fov_offset_n_bins > 1 ): raise ToolConfigurationError(errormessage) if ( not self.spatial_selection_applied and self.psf_maker.fov_offset_n_bins > 1 ): raise ToolConfigurationError(errormessage) if self.do_background and self.background_maker.fov_offset_n_bins > 1: raise ToolConfigurationError(errormessage) if self.benchmarks_output_path is not None and ( self.angular_resolution_maker.fov_offset_n_bins > 1 or self.bias_resolution_maker.fov_offset_n_bins > 1 or self.sensitivity_maker.fov_offset_n_bins > 1 ): raise ToolConfigurationError(errormessage) reduced_events = self.calculate_selections(reduced_events) self.signal_events = reduced_events["gammas"] if self.do_background: if self.electron_file: self.background_events = vstack( [reduced_events["protons"], reduced_events["electrons"]] ) else: self.background_events = reduced_events["protons"] hdus = [fits.PrimaryHDU()] hdus = self._make_signal_irf_hdus( hdus, reduced_events["gammas_meta"]["sim_info"] ) if self.do_background: hdus.append( self.background_maker( self.background_events[self.background_events["selected_gh"]], self.obs_time, ) ) if "protons" in reduced_events.keys(): hdus.append( self.effective_area_maker( events=reduced_events["protons"][ reduced_events["protons"]["selected_gh"] ], spatial_selection_applied=self.spatial_selection_applied, signal_is_point_like=False, sim_info=reduced_events["protons_meta"]["sim_info"], extname="EFFECTIVE AREA PROTONS", ) ) if "electrons" in reduced_events.keys(): hdus.append( self.effective_area_maker( events=reduced_events["electrons"][ reduced_events["electrons"]["selected_gh"] ], spatial_selection_applied=self.spatial_selection_applied, signal_is_point_like=False, sim_info=reduced_events["electrons_meta"]["sim_info"], extname="EFFECTIVE AREA ELECTRONS", ) ) self.hdus = hdus if self.benchmarks_output_path is not None: b_hdus = [fits.PrimaryHDU()] b_hdus = self._make_benchmark_hdus(b_hdus) self.b_hdus = b_hdus
[docs] def finish(self): """ Write the irf (and the benchmarks) to the (respective) output file(s). """ self.log.info("Writing outputfile '%s'" % self.output_path) fits.HDUList(self.hdus).writeto( self.output_path, overwrite=self.overwrite, ) Provenance().add_output_file(self.output_path, role="IRF") if self.benchmarks_output_path is not None: self.log.info( "Writing benchmark file to '%s'" % self.benchmarks_output_path ) fits.HDUList(self.b_hdus).writeto( self.benchmarks_output_path, overwrite=self.overwrite, ) Provenance().add_output_file(self.benchmarks_output_path, role="Benchmark")
def main(): tool = IrfTool() tool.run() if __name__ == "main": main()