# Source code for ctapipe.tools.process

"""
Generate DL1 (a or b) output files in HDF5 format from {R0,R1,DL0} inputs.
"""
# pylint: disable=W0201
import sys

from tqdm.auto import tqdm

from ..calib import CameraCalibrator, GainSelector
from ..core import QualityQuery, Tool, ToolConfigurationError
from ..core.traits import Bool, ComponentName, List, classes_with_traits, flag
from ..exceptions import InputMissing
from ..image import ImageCleaner, ImageModifier, ImageProcessor
from ..image.extractor import ImageExtractor
from ..image.muon import MuonProcessor
from ..instrument import SoftwareTrigger
from ..io import (
    DataLevel,
    DataWriter,
    EventSource,
    MonitoringSource,
    MonitoringType,
    metadata,
    write_table,
)
from ..io.datawriter import DATA_MODEL_VERSION
from ..io.hdf5dataformat import (
    DL1_IMAGE_STATISTICS_TABLE,
    DL2_EVENT_STATISTICS_GROUP,
)
from ..reco import Reconstructor, ShowerProcessor
from ..utils import EventTypeFilter

COMPATIBLE_DATALEVELS = [
    DataLevel.R1,
    DataLevel.DL0,
    DataLevel.DL1_IMAGES,
    DataLevel.DL1_PARAMETERS,
]

COMPATIBLE_MONITORINGTYPES = [
    MonitoringType.PIXEL_STATISTICS,
    MonitoringType.CAMERA_COEFFICIENTS,
    MonitoringType.TELESCOPE_POINTINGS,
]


__all__ = ["ProcessorTool"]


class ProcessorTool(Tool):
    """
    Process data from lower data levels up to DL1 and DL2, including image
    extraction and optionally image parameterization, as well as muon analysis
    and shower reconstruction.

    Note that the muon analysis and shower reconstruction both depend on
    parametrized images and therefore compute image parameters even if
    DataWriter.write_dl1_parameters=False, in case these are not already
    present in the input file.
    """

    name = "ctapipe-process"
    description = (
        __doc__ + f" This currently uses data model version {DATA_MODEL_VERSION}"
    )
    examples = """
    To process data with all default values:

    > ctapipe-process --input events.simtel.gz --output events.dl1.h5 --progress

    Or use an external configuration file, where you can specify all options:

    > ctapipe-process --config stage1_config.json --progress

    The config file should be in JSON or python format (see traitlets docs). For an
    example, see ctapipe/examples/stage1_config.json in the main code repo.
    """

    progress_bar = Bool(
        help="show progress bar during processing", default_value=False
    ).tag(config=True)

    force_recompute_dl1 = Bool(
        help="Enforce DL1 recomputation even if already present in the input file",
        default_value=False,
    ).tag(config=True)

    force_recompute_dl2 = Bool(
        help="Enforce DL2 recomputation even if already present in the input file",
        default_value=False,
    ).tag(config=True)

    monitoring_source_list = List(
        ComponentName(MonitoringSource),
        help=(
            "List of monitoring sources to use during processing "
            "if the calibration of the data is requested. Later "
            "MonitoringSource instances overwrite earlier ones if "
            "the monitoring types of the different instances overlap."
        ),
        default_value=[],
    ).tag(config=True)

    aliases = {
        ("i", "input"): "EventSource.input_url",
        ("o", "output"): "DataWriter.output_path",
        ("t", "allowed-tels"): "EventSource.allowed_tels",
        ("m", "max-events"): "EventSource.max_events",
        "monitoring-source": "ProcessorTool.monitoring_source_list",
        "reconstructor": "ShowerProcessor.reconstructor_types",
        "image-cleaner-type": "ImageProcessor.image_cleaner_type",
    }

    flags = {
        "overwrite": (
            {"DataWriter": {"overwrite": True}},
            "Overwrite output file if it exists",
        ),
        **flag(
            "progress",
            "ProcessorTool.progress_bar",
            "show a progress bar during event processing",
            "don't show a progress bar during event processing",
        ),
        **flag(
            "recompute-dl1",
            "ProcessorTool.force_recompute_dl1",
            "Enforce DL1 recomputation even if already present in the input file",
            "Only compute DL1 if there are no DL1b parameters in the file",
        ),
        **flag(
            "recompute-dl2",
            "ProcessorTool.force_recompute_dl2",
            "Enforce DL2 recomputation even if already present in the input file",
            "Only compute DL2 if there is no shower reconstruction in the file",
        ),
        **flag(
            "write-images",
            "DataWriter.write_dl1_images",
            "store DL1/Event/Telescope images in output",
            "don't store DL1/Event/Telescope images in output",
        ),
        **flag(
            "write-parameters",
            "DataWriter.write_dl1_parameters",
            "store DL1/Event/Telescope parameters in output",
            "don't store DL1/Event/Telescope parameters in output",
        ),
        **flag(
            "write-showers",
            "DataWriter.write_dl2",
            "store DL2/Event parameters in output",
            "don't store DL2/Event parameters in output",
        ),
        **flag(
            "write-muon-parameters",
            "DataWriter.write_muon_parameters",
            "store DL1/Event/Telescope muon parameters in output",
            "don't store DL1/Event/Telescope muon parameters in output",
        ),
        "camera-frame": (
            {"ImageProcessor": {"use_telescope_frame": False}},
            "Use camera frame for image parameters instead of telescope frame",
        ),
    }

    classes = (
        [
            CameraCalibrator,
            DataWriter,
            ImageProcessor,
            MuonProcessor,
            ShowerProcessor,
            metadata.Instrument,
            metadata.Contact,
            SoftwareTrigger,
        ]
        + classes_with_traits(EventSource)
        + classes_with_traits(MonitoringSource)
        + classes_with_traits(ImageCleaner)
        + classes_with_traits(ImageExtractor)
        + classes_with_traits(GainSelector)
        + classes_with_traits(QualityQuery)
        + classes_with_traits(ImageModifier)
        + classes_with_traits(EventTypeFilter)
        + classes_with_traits(Reconstructor)
    )
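
    # Configuration sketch (illustrative only, not shipped with ctapipe): the
    # aliases and flags above map onto component traits, so an equivalent JSON
    # config file could look like the following. "my_config.json" and the
    # chosen values are placeholders; see ctapipe/examples/stage1_config.json
    # for the maintained example.
    #
    #   {
    #       "DataWriter": {"overwrite": true, "write_dl1_images": true},
    #       "ImageProcessor": {"image_cleaner_type": "TailcutsImageCleaner"},
    #       "ShowerProcessor": {"reconstructor_types": ["HillasReconstructor"]}
    #   }
    #
    #   > ctapipe-process --config my_config.json --input events.simtel.gz \
    #       --output events.dl1.h5 --progress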

    def setup(self):
        # setup components:
        try:
            self.event_source = self.enter_context(EventSource(parent=self))
        except InputMissing:
            self.log.critical(
                "Specifying EventSource.input_url is required (via -i, --input or a config file)."
            )
            self.exit(1)

        if not self.event_source.has_any_datalevel(COMPATIBLE_DATALEVELS):
            self.log.critical(
                "%s needs the EventSource to provide at least one of these datalevels: %s"
                ", %s provides only %s",
                self.name,
                COMPATIBLE_DATALEVELS,
                self.event_source,
                self.event_source.datalevels,
            )
            sys.exit(1)

        subarray = self.event_source.subarray

        # Set up the monitoring sources
        self._monitoring_sources = []
        for mon_source_name in self.monitoring_source_list:
            mon_source = self.enter_context(
                MonitoringSource.from_name(
                    mon_source_name, subarray=subarray, parent=self
                )
            )
            # Check if the monitoring source provides compatible monitoring types
            if not mon_source.has_any_monitoring_types(COMPATIBLE_MONITORINGTYPES):
                msg = (
                    f"MonitoringSource '{mon_source_name}' needs to provide at least "
                    f"one of these monitoring types: {COMPATIBLE_MONITORINGTYPES}, "
                    f"but it provides only '{mon_source.monitoring_types}'. "
                    f"Please make sure '{mon_source_name}' and its input "
                    "are suitable for calibrating the data you are processing."
                )
                self.log.critical(msg)
                raise ToolConfigurationError(msg)
            # Keep the monitoring source only if it has compatible monitoring types
            self._monitoring_sources.append(mon_source)

        self.software_trigger = SoftwareTrigger(parent=self, subarray=subarray)
        self.calibrate = CameraCalibrator(parent=self, subarray=subarray)
        self.process_images = ImageProcessor(subarray=subarray, parent=self)
        self.process_shower = ShowerProcessor(
            subarray=subarray,
            atmosphere_profile=self.event_source.atmosphere_density_profile,
            parent=self,
        )
        self.write = self.enter_context(
            DataWriter(event_source=self.event_source, parent=self)
        )

        self.process_muons = None
        if self.should_compute_muon_parameters:
            self.process_muons = MuonProcessor(subarray=subarray, parent=self)

        self.event_type_filter = EventTypeFilter(parent=self)
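
    # Usage sketch for the monitoring-source hook set up above (the class name
    # "SomeMonitoringSource" is a placeholder, not a real ctapipe component):
    # each entry of ``monitoring_source_list`` is instantiated via
    # ``MonitoringSource.from_name`` and must provide at least one of
    # COMPATIBLE_MONITORINGTYPES, e.g. configured via a section such as
    #
    #   {"ProcessorTool": {"monitoring_source_list": ["SomeMonitoringSource"]}}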

    @property
    def should_compute_dl2(self):
        """returns true if we should compute DL2 info"""
        if self.force_recompute_dl2:
            return True
        return self.write.write_dl2

    @property
    def should_compute_dl1(self):
        """returns true if we should compute DL1 info"""
        if self.force_recompute_dl1:
            return True

        if DataLevel.DL1_PARAMETERS in self.event_source.datalevels:
            return False

        return (
            self.write.write_dl1_parameters
            or self.should_compute_dl2
            or self.should_compute_muon_parameters
        )

    @property
    def should_calibrate(self):
        """returns true if data should be calibrated"""
        if self.force_recompute_dl1:
            return True

        if (
            self.write.write_dl1_images
            and DataLevel.DL1_IMAGES not in self.event_source.datalevels
        ):
            return True

        if self.should_compute_dl1:
            return DataLevel.DL1_IMAGES not in self.event_source.datalevels

        return False

    @property
    def should_compute_muon_parameters(self):
        """returns true if we should compute muon parameters"""
        if self.write.write_muon_parameters:
            return True
        return False

    def _write_processing_statistics(self):
        """write out the event selection stats, etc."""
        # NOTE: don't remove this, not part of DataWriter
        if self.should_compute_dl1:
            image_stats = self.process_images.check_image.to_table(functions=True)
            write_table(
                image_stats,
                self.write.output_path,
                path=DL1_IMAGE_STATISTICS_TABLE,
                append=True,
            )

        if self.should_compute_dl2:
            reconstructors = self.process_shower.reconstructors
            reconstructor_names = self.process_shower.reconstructor_types
            for reconstructor_name, reconstructor in zip(
                reconstructor_names, reconstructors
            ):
                write_table(
                    reconstructor.quality_query.to_table(functions=True),
                    self.write.output_path,
                    f"{DL2_EVENT_STATISTICS_GROUP}/{reconstructor_name}",
                    append=True,
                )
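
    # Read-back sketch (assumes processing finished and the statistics above
    # were written; "events.dl1.h5" is a placeholder output file, and
    # ``read_table`` is the ctapipe helper for reading HDF5 tables):
    #
    #   from ctapipe.io import read_table
    #   image_stats = read_table("events.dl1.h5", DL1_IMAGE_STATISTICS_TABLE)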

    def start(self):
        """
        Process events
        """
        self.log.info("applying calibration: %s", self.should_calibrate)
        self.log.info("(re)compute DL1: %s", self.should_compute_dl1)
        self.log.info("(re)compute DL2: %s", self.should_compute_dl2)
        self.log.info(
            "compute muon parameters: %s", self.should_compute_muon_parameters
        )

        self.event_source.subarray.info(printer=self.log.info)

        for event in tqdm(
            self.event_source,
            desc=self.event_source.__class__.__name__,
            total=self.event_source.max_events,
            unit="ev",
            disable=not self.progress_bar,
        ):
            self.log.debug("Processing event_id=%s", event.index.event_id)

            if not self.event_type_filter(event):
                continue

            if not self.software_trigger(event):
                self.log.debug(
                    "Skipping event %i due to software trigger", event.index.event_id
                )
                continue

            if self.should_calibrate:
                # Fill monitoring containers needed for the calibration
                for mon_source in self._monitoring_sources:
                    self.log.debug("Filling monitoring container for '%s'", mon_source)
                    mon_source.fill_monitoring_container(event)
                self.calibrate(event)

            if self.should_compute_dl1:
                self.process_images(event)

            if self.should_compute_muon_parameters:
                self.process_muons(event)

            if self.should_compute_dl2:
                self.process_shower(event)

            self.write(event)

    def finish(self):
        """
        Last steps after processing events.
        """
        shower_dists = self.event_source.simulated_shower_distributions
        self.write.write_simulated_shower_distributions(shower_dists)
        self._write_processing_statistics()
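
# Programmatic-usage sketch (file names are placeholders; ``run(argv=...)`` is
# the generic ``ctapipe.core.Tool`` entry point, not specific to this tool):
#
#   tool = ProcessorTool()
#   tool.run(argv=["--input", "events.simtel.gz", "--output", "events.dl1.h5"])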


def main():
    """run the tool"""
    tool = ProcessorTool()
    tool.run()


if __name__ == "__main__":
    main()