"""
Definition of the `CameraCalibrator` class, providing all steps needed to apply
calibration and image extraction, as well as supporting algorithms.
"""
from functools import cache
import astropy.units as u
import numpy as np
from numba import float32, float64, guvectorize, int64
from ctapipe.containers import DL0CameraContainer, DL1CameraContainer, PixelStatus
from ctapipe.core import TelescopeComponent
from ctapipe.core.env import CTAPIPE_DISABLE_NUMBA_CACHE
from ctapipe.core.traits import (
BoolTelescopeParameter,
ComponentName,
TelescopeParameter,
)
from ctapipe.image.extractor import ImageExtractor, VarianceExtractor
from ctapipe.image.invalid_pixels import InvalidPixelHandler
from ctapipe.image.reducer import DataVolumeReducer
__all__ = ["CameraCalibrator"]
@cache
def _get_pixel_index(n_pixels):
"""Cached version of ``np.arange(n_pixels)``"""
return np.arange(n_pixels)
[docs]
class CameraCalibrator(TelescopeComponent):
"""
Calibrator to handle the full camera calibration chain, in order to fill
the DL1 data level in the event container.
Attributes
----------
data_volume_reducer_type: str
The name of the DataVolumeReducer subclass to be used
for data volume reduction
image_extractor_type: str
The name of the ImageExtractor subclass to be used for image extraction
"""
data_volume_reducer_type = ComponentName(
DataVolumeReducer, default_value="NullDataVolumeReducer"
).tag(config=True)
image_extractor_type = TelescopeParameter(
trait=ComponentName(ImageExtractor, default_value="NeighborPeakWindowSum"),
default_value="NeighborPeakWindowSum",
help="Name of the ImageExtractor subclass to be used.",
).tag(config=True)
invalid_pixel_handler_type = ComponentName(
InvalidPixelHandler,
default_value="NeighborAverage",
help="Name of the InvalidPixelHandler to use",
allow_none=True,
).tag(config=True)
apply_waveform_time_shift = BoolTelescopeParameter(
default_value=False,
help=(
"Apply waveform time shift corrections."
" The minimal integer shift to synchronize waveforms is applied"
" before peak extraction if this option is True"
),
).tag(config=True)
apply_peak_time_shift = BoolTelescopeParameter(
default_value=True,
help=(
"Apply peak time shift corrections."
" Apply the remaining absolute and fractional time shift corrections"
" to the peak time after pulse extraction."
" If `apply_waveform_time_shift` is False, this will apply the full time shift"
),
).tag(config=True)
def __init__(
self,
subarray,
config=None,
parent=None,
image_extractor=None,
data_volume_reducer=None,
**kwargs,
):
"""
Parameters
----------
subarray: ctapipe.instrument.SubarrayDescription
Description of the subarray. Provides information about the
camera which are useful in calibration. Also required for
configuring the TelescopeParameter traitlets.
config: traitlets.loader.Config
Configuration specified by config file or cmdline arguments.
Used to set traitlet values.
This is mutually exclusive with passing a ``parent``.
parent: ctapipe.core.Component or ctapipe.core.Tool
Parent of this component in the configuration hierarchy,
this is mutually exclusive with passing ``config``
data_volume_reducer: ctapipe.image.reducer.DataVolumeReducer
The DataVolumeReducer to use.
This is used to override the options from the config system
and to enable passing a preconfigured reducer.
image_extractor: ctapipe.image.extractor.ImageExtractor
The ImageExtractor to use. If None, the default via the
configuration system will be constructed.
"""
super().__init__(subarray=subarray, config=config, parent=parent, **kwargs)
self.subarray = subarray
self._r1_empty_warn = False
self._dl0_empty_warn = False
self.image_extractors = {}
if image_extractor is None:
for _, _, name in self.image_extractor_type:
self.image_extractors[name] = ImageExtractor.from_name(
name, subarray=self.subarray, parent=self
)
else:
name = image_extractor.__class__.__name__
self.image_extractor_type = [("type", "*", name)]
self.image_extractors[name] = image_extractor
if data_volume_reducer is None:
self.data_volume_reducer = DataVolumeReducer.from_name(
self.data_volume_reducer_type, subarray=self.subarray, parent=self
)
else:
self.data_volume_reducer = data_volume_reducer
self.invalid_pixel_handler = None
if self.invalid_pixel_handler_type is not None:
self.invalid_pixel_handler = InvalidPixelHandler.from_name(
self.invalid_pixel_handler_type,
subarray=self.subarray,
parent=self,
)
def _check_r1_empty(self, waveforms):
if waveforms is None:
if not self._r1_empty_warn:
self.log.debug(
"Encountered an event with no R1 data. "
"DL0 is unchanged in this circumstance."
)
self._r1_empty_warn = True
return True
else:
return False
def _check_dl0_empty(self, waveforms):
if waveforms is None:
if not self._dl0_empty_warn:
self.log.warning(
"Encountered an event with no DL0 data. "
"DL1 is unchanged in this circumstance."
)
self._dl0_empty_warn = True
return True
else:
return False
def _calibrate_dl0(self, event, tel_id):
r1 = event.r1.tel[tel_id]
if self._check_r1_empty(r1.waveform):
return
signal_pixels = self.data_volume_reducer(
r1.waveform,
tel_id=tel_id,
selected_gain_channel=r1.selected_gain_channel,
)
dl0_waveform = r1.waveform.copy()
dl0_waveform[:, ~signal_pixels] = 0
dl0_pixel_status = r1.pixel_status.copy()
# set dvr pixel bit in pixel_status for pixels kept by DVR
dl0_pixel_status[signal_pixels] |= np.uint8(PixelStatus.DVR_STATUS)
# unset dvr bits for removed pixels
dl0_pixel_status[~signal_pixels] &= ~np.uint8(PixelStatus.DVR_STATUS)
event.dl0.tel[tel_id] = DL0CameraContainer(
event_type=r1.event_type,
event_time=r1.event_time,
waveform=dl0_waveform,
selected_gain_channel=r1.selected_gain_channel,
pixel_status=dl0_pixel_status,
first_cell_id=r1.first_cell_id,
calibration_monitoring_id=r1.calibration_monitoring_id,
)
def _calibrate_dl1(self, event, tel_id):
waveforms = event.dl0.tel[tel_id].waveform
if self._check_dl0_empty(waveforms):
return
n_channels, n_pixels, n_samples = waveforms.shape
calib = event.monitoring.tel[tel_id].camera.coefficients
selected_gain_channel = event.dl0.tel[tel_id].selected_gain_channel
pixel_index = _get_pixel_index(n_pixels)
pedestal = calib.pedestal_offset
factor = calib.factor
time_shift = calib.time_shift
if selected_gain_channel is not None:
if factor is not None:
factor = factor[selected_gain_channel, pixel_index]
if time_shift is not None:
time_shift = time_shift[selected_gain_channel, pixel_index]
readout = self.subarray.tel[tel_id].camera.readout
# subtract any remaining pedestal before extraction
if pedestal is not None:
if selected_gain_channel is not None:
pedestal = pedestal[selected_gain_channel, pixel_index]
# this copies intentionally, we don't want to modify the dl0 data
# waveforms have shape (n_channels, n_pixel, n_samples), pedestals (n_pixels)
waveforms = waveforms.copy()
waveforms -= np.atleast_2d(pedestal)[..., np.newaxis]
if n_samples == 1:
# To handle ASTRI and dst
# TODO: Improved handling of ASTRI and dst
# - dst with custom EventSource?
# - Read into dl1 container directly?
# - Don't do anything if dl1 container already filled
# - Update on SST review decision
dl1 = DL1CameraContainer(
image=np.squeeze(waveforms).astype(np.float32),
peak_time=np.zeros(n_pixels, dtype=np.float32),
is_valid=True,
)
else:
# shift waveforms if time_shift is available
remaining_shift = None
if time_shift is not None:
if self.apply_waveform_time_shift.tel[tel_id]:
sampling_rate = readout.sampling_rate.to_value(u.GHz)
time_shift_samples = time_shift * sampling_rate
waveforms, remaining_shift = shift_waveforms(
waveforms, time_shift_samples
)
remaining_shift /= sampling_rate
else:
remaining_shift = time_shift
extractor = self.image_extractors[self.image_extractor_type.tel[tel_id]]
dl1 = extractor(
waveforms,
tel_id=tel_id,
selected_gain_channel=selected_gain_channel,
broken_pixels=calib.outlier_mask,
)
# correct non-integer remainder of the shift if given
if (
dl1.peak_time is not None
and self.apply_peak_time_shift.tel[tel_id]
and remaining_shift is not None
):
dl1.peak_time -= remaining_shift
# Calibrate extracted charge
if factor is not None:
if n_samples > 1 and isinstance(extractor, VarianceExtractor):
factor = factor**2
dl1.image *= factor
# handle invalid pixels
if self.invalid_pixel_handler is not None:
dl1.image, dl1.peak_time = self.invalid_pixel_handler(
tel_id,
dl1.image,
dl1.peak_time,
calib.outlier_mask,
)
# store the results in the event structure
event.dl1.tel[tel_id] = dl1
[docs]
def __call__(self, event):
"""
Perform the full camera calibration from R1 to DL1. Any calibration
relating to data levels before the data level the file is read into
will be skipped.
Parameters
----------
event : container
A `~ctapipe.containers.ArrayEventContainer` event container
"""
# TODO: How to handle different calibrations depending on tel_id?
tel = event.r1.tel or event.dl0.tel or event.dl1.tel
for tel_id in tel.keys():
self._calibrate_dl0(event, tel_id)
self._calibrate_dl1(event, tel_id)
def shift_waveforms(waveforms, time_shift_samples):
"""
Shift the waveforms by the mean integer shift to mediate
time differences between pixels.
The remaining shift (mean + fractional part) is returned
to be applied later to the extracted peak time.
Parameters
----------
waveforms: ndarray of shape (n_channels, n_pixels, n_samples)
The waveforms to shift
time_shift_samples: ndarray
The shift to apply in units of samples.
Waveforms are shifted to the left by the smallest integer
that minimizes inter-pixel differences.
Returns
-------
shifted_waveforms: ndarray of shape (n_channels, n_pixels, n_samples)
The shifted waveforms
remaining_shift: ndarray
The remaining shift after applying the integer shift to the waveforms.
"""
mean_shift = time_shift_samples.mean(axis=-1, keepdims=True)
integer_shift = np.round(time_shift_samples - mean_shift).astype("int16")
remaining_shift = time_shift_samples - integer_shift
shifted_waveforms = _shift_waveforms_by_integer(waveforms, integer_shift)
return shifted_waveforms, remaining_shift
@guvectorize(
[(float64[:], int64, float64[:]), (float32[:], int64, float32[:])],
"(s),()->(s)",
nopython=True,
cache=not CTAPIPE_DISABLE_NUMBA_CACHE,
)
def _shift_waveforms_by_integer(waveforms, integer_shift, shifted_waveforms):
n_samples = waveforms.size
for new_sample_idx in range(n_samples):
# repeat first value if out ouf bounds to the left
# repeat last value if out ouf bounds to the right
sample_idx = min(max(new_sample_idx + integer_shift, 0), n_samples - 1)
shifted_waveforms[new_sample_idx] = waveforms[sample_idx]