"""Summary containers intended to hold general information obtained throughout a processing run."""
from __future__ import ( # Used for mypy/pylance to like the return type of MS.with_options
annotations,
)
from pathlib import Path
from typing import NamedTuple
import astropy.units as u
from astropy.coordinates import (
AltAz,
EarthLocation,
Latitude,
Longitude,
SkyCoord,
concatenate,
)
from astropy.table import Table
from astropy.time import Time
# Addressing some time interval IERS issue with astropy.
from astropy.utils.iers import conf
from flint.coadd.linmos import LinmosResult
from flint.imager.wsclean import ImageSet, WSCleanResult
from flint.logging import logger
from flint.ms import (
MS,
MSSummary,
describe_ms,
get_pol_axis_from_ms,
get_telescope_location_from_ms,
get_times_from_ms,
)
from flint.naming import get_sbid_from_path, processed_ms_format
from flint.source_finding.aegean import AegeanOutputs
from flint.utils import estimate_skycoord_centre
conf.auto_max_age = None
[docs]
class FieldSummary(NamedTuple):
"""The main information about a processed field. This structure is
intended to store critical components that might be accumulated throughout
processing of a pipeline, and may be most useful when data-products
are zipped (or removed) throughout. Its intended usage is to hold key
components that might be used in stages like validation plotting. It is not
intended to become a catch-all to replace passing through items into
functions directly.
There are known issues around serialising astropy units within a dask/prefect environment,
for example: https://github.com/astropy/astropy/issues/11317,
This could become important if an instance of this object is exchanged
between many prefect or dask like delayed tasks. Ye be warned.
"""
"""SBID of processed field"""
"""SBID of the bandpass calibrator"""
"""The name of the field"""
[docs]
ms_summaries: tuple[MSSummary, ...] | None = None
"""Summaries of measurement sets used in the processing of the filed"""
[docs]
centre: SkyCoord | None = None
"""Centre of the field, which is calculated as the mean position of all phase directions of the `mss` measurement sets"""
[docs]
integration_time: int | None = None
"""The integration time of the observation (seconds)"""
[docs]
no_components: int | None = None
"""Number of components found from the source finder"""
[docs]
holography_path: Path | None = None
"""Path to the file used for holography"""
[docs]
round: int | None = None
"""The self-cal round"""
[docs]
location: EarthLocation | None = None
"""The location of the telescope stored as (X,Y,Z) in meters"""
[docs]
ms_times: Time | None = None
"""The unique scan times of integrations stored in the measurement set"""
[docs]
hour_angles: Longitude | None = None
"""Computed hour-angles of the field"""
[docs]
elevations: Latitude | None = None
"""Computed elevations of the field"""
"""The meanian RMS computed from an RMS image"""
[docs]
beam_summaries: list[BeamSummary] | None = None
"""Summary information from each beam. Contains MSSummary, ImageSet and other information."""
[docs]
linmos_image: Path | None = None
"""The path to the linmos image of all beams"""
[docs]
pol_axis: float | None = None
"""The orientation of the ASKAP third-axis in radians. """
[docs]
def with_options(self, **kwargs) -> FieldSummary:
prop = self._asdict()
prop.update(**kwargs)
return FieldSummary(**prop)
[docs]
def _get_pol_axis_as_rad(ms: MS | Path) -> float:
"""Helper to get the appropriate pol_axis out of a MS. Prioritises the instrumental third-axis imprinted from fixms"""
ms = MS.cast(ms=ms)
# The INSTRUMENT_RECEPTOR_ANGLE comes from fixms and is
# inserted to preserve the original orientation.
try:
pol_axis = get_pol_axis_from_ms(ms=ms, col="INSTRUMENT_RECEPTOR_ANGLE")
logger.info(f"INSTRUMENT_RECEPTOR_ANGLE obtained pol_axis={pol_axis}")
except ValueError:
pol_axis = get_pol_axis_from_ms(ms=ms, col="RECEPTOR_ANGLE")
# The prefect logger (or maybe the logger in general) does not render the quantity
logger.info(f"RECEPTOR_ANGLE obtained pol_axis={pol_axis}")
return pol_axis.to(u.rad).value
# TODO: Need to establise a MSLike type
[docs]
def add_ms_summaries(field_summary: FieldSummary, mss: list[MS]) -> FieldSummary:
"""Obtain a MSSummary instance to add to a FieldSummary
Quantities derived from the field centre (hour angles, elevations) are
also calculated. The field centre position is estimated by taking the
mean position of all phase directions.
See `flint.utils.estimate_skycoord_centre`
Args:
field_summary (FieldSummary): Existing field summary object to update
mss (List[MS]): Set of measurement sets to describe
Returns:
Tuple[MSSummary]: Results from the inspected set of measurement sets
"""
logger.info("Adding MS summaries")
ms_summaries = tuple(map(describe_ms, mss))
centres_list = [ms_summary.phase_dir for ms_summary in ms_summaries]
if len(centres_list) == 0:
raise ValueError("No phase directions found in the MSs")
elif len(centres_list) == 1:
centre = centres_list[0]
else:
centres = concatenate(centres_list)
centre = estimate_skycoord_centre(sky_positions=centres)
telescope = field_summary.location
ms_times = field_summary.ms_times
centre_altaz = centre.transform_to(AltAz(obstime=ms_times, location=telescope))
hour_angles = centre_altaz.az.to(u.hourangle) # type: ignore
elevations = centre_altaz.alt.to(u.deg) # type: ignore
# The INSTRUMENT_RECEPTOR_ANGLE comes from fixms and is
# inserted to preserve the original orientation.
pol_axis = _get_pol_axis_as_rad(ms=mss[0])
field_summary = field_summary.with_options(
ms_summaries=ms_summaries,
centre=centre,
hour_angles=hour_angles,
elevations=elevations,
pol_axis=pol_axis,
)
return field_summary
[docs]
def add_linmos_fits_image(
field_summary: FieldSummary, linmos_command: LinmosResult
) -> FieldSummary:
"""Extract the path of the linmos fits image from the LinmosResult
the co-added the field
Args:
field_summary (FieldSummary): Existing field summary to update
linmos_command (LinmosResult): Instance of a completed linmos command that coadded a field
Returns:
FieldSummary: The updated field summary object with the linmos fits image added
"""
assert isinstance(linmos_command, LinmosResult), (
f"{linmos_command=} is type {type(linmos_command)}, expected LinmosResult"
)
image_fits = linmos_command.image_fits
field_summary = field_summary.with_options(linmos_image=image_fits)
return field_summary
[docs]
def update_field_summary(
field_summary: FieldSummary,
aegean_outputs: AegeanOutputs | None = None,
mss: list[MS] | None = None,
linmos_command: LinmosResult | None = None,
**kwargs,
) -> FieldSummary:
"""Update an existing `FieldSummary` instance with additional information.
If special steps are required to be carried out based on a known input they will be.
Otherwise all additional keyword arguments are passed through to the `FieldSummary.with_options`.
Args:
field_summary (FieldSummary): Field summary object to update
aegean_outputs (Optional[AegeanOutputs], optional): Will add RMS and aegean related properties. Defaults to None.
mss (Optional[Collection[MS]], optionals): Set of measurement sets to describe
linmos_command (Optional[LinmosResult], optional): The linmos command created when co-adding all beam images together
Returns:
FieldSummary: An updated field summary objects
"""
if aegean_outputs:
field_summary = add_rms_information(
field_summary=field_summary, aegean_outputs=aegean_outputs
)
if mss:
field_summary = add_ms_summaries(field_summary=field_summary, mss=mss)
if linmos_command:
field_summary = add_linmos_fits_image(
field_summary=field_summary, linmos_command=linmos_command
)
field_summary = field_summary.with_options(**kwargs)
logger.info(f"Updated {field_summary=}")
return field_summary
[docs]
def create_field_summary(
mss: list[MS | Path],
cal_sbid_path: Path | None = None,
holography_path: Path | None = None,
aegean_outputs: AegeanOutputs | None = None,
**kwargs,
) -> FieldSummary:
"""Create a field summary object using a measurement set.
All other keyword arguments are passed directly through to `FieldSummary`
Args:
ms (Union[MS, Path]): Measurement set information will be pulled from
cal_sbid_path (Optional[Path], optional): Path to an example of a bandpass measurement set. Defaults to None.
holography_path (Optional[Path], optional): The holography fits cube used (or will be) to linmos. Defaults to None.
aegean_outputs (Optional[AegeanOutputs], optional): Should RMS / source information be added to the instance. Defaults to None.
mss (Optional[Collection[MS]], optionals): Set of measurement sets to describe
Returns:
FieldSummary: A summary of a field
"""
logger.info("Creating field summary object")
mss = [MS.cast(ms=ms) for ms in mss]
# TODO: A check here to ensure all MSs are in a consistent format
# and are from the same field
ms = MS.cast(ms=mss[0])
ms_components = processed_ms_format(in_name=ms.path)
assert ms_components is not None, (
f"MS ({ms.path}) is not in a flint-processed format!"
)
sbid = str(ms_components.sbid)
field = ms_components.field
assert field is not None, f"Field name is empty in {ms_components=} from {ms.path=}"
try:
cal_sbid = (
str(get_sbid_from_path(path=cal_sbid_path)) if cal_sbid_path else None
)
except ValueError:
cal_sbid = "-9999"
logger.info(f"Extracting SBID from {cal_sbid_path=} failed. Using {cal_sbid=}")
ms_times = get_times_from_ms(ms=ms)
integration = ms_times.ptp().to(u.second).value
location = get_telescope_location_from_ms(ms=ms)
pol_axis = _get_pol_axis_as_rad(ms=ms)
field_summary = FieldSummary(
sbid=sbid,
field_name=field,
cal_sbid=f"{cal_sbid}", # could be None, and that's OK
location=location,
integration_time=integration,
holography_path=holography_path,
ms_times=Time([ms_times.min(), ms_times.max()]),
pol_axis=pol_axis,
**kwargs,
)
field_summary = add_ms_summaries(
field_summary=field_summary, mss=[MS.cast(ms=ms) for ms in mss]
)
if aegean_outputs:
field_summary = add_rms_information(
field_summary=field_summary, aegean_outputs=aegean_outputs
)
return field_summary
[docs]
class BeamSummary(NamedTuple):
"""Structure intended to collect components from a measurement set
as it is being processed through a continuum imaging pipeline
"""
"""A summary object of a measurement set"""
[docs]
image_set: ImageSet | None = None
"""A set of images that have been created from the measurement set represented by `summary`"""
[docs]
components: AegeanOutputs | None = None
"""The source finding components from the aegean source finder"""
[docs]
def with_options(self, **kwargs) -> BeamSummary:
prop = self._asdict()
prop.update(**kwargs)
return BeamSummary(**prop)
[docs]
def create_beam_summary(
ms: MS | Path,
image_set: ImageSet | WSCleanResult | None = None,
components: AegeanOutputs | None = None,
) -> BeamSummary:
"""Create a summary of a beam
Args:
ms (Union[MS, Path]): The measurement set being considered
image_set (Optional[ImageSet], optional): Images produced from an imager. Defaults to None.
components (Optional[AegeanOutputs], optional): Source finding output components. Defaults to None.
Returns:
BeamSummary: Summary object of a beam
"""
ms = MS.cast(ms=ms)
logger.info(f"Creating BeamSummary for {ms.path=}")
ms_summary = describe_ms(ms=ms)
# TODO: Another example where a .cast type method could be useful
# or where a standardised set of attributes with a HasImageSet type
if image_set:
image_set = (
image_set if isinstance(image_set, ImageSet) else image_set.image_set
)
beam_summary = BeamSummary(
ms_summary=ms_summary, image_set=image_set, components=components
)
return beam_summary