Source code for flint.prefect.flows.bandpass_pipeline

"""Pipeline to calibrate a ASKAP style bandpass observation.

At the moment this is expected to be performed against observations taken
towards PKS1934-638. At the time of writing a typical ASKAP bandpass
observation will cycle each beam so that it is centred on this source.
This means that practically there are 36 separate fields at slightly
different field centres. The bandpass calibration process will first have
to split the correct field out before actually calibration.
"""

from __future__ import annotations

from argparse import ArgumentParser
from pathlib import Path
from typing import Collection

from prefect import flow, task, unmapped

from flint.bandpass import extract_correct_bandpass_pointing
from flint.calibrate.aocalibrate import (
    ApplySolutions,
    CalibrateCommand,
    create_apply_solutions_cmd,
    create_calibrate_cmd,
    flag_aosolutions,
    select_aosolution_for_ms,
)
from flint.flagging import flag_ms_aoflagger
from flint.logging import logger
from flint.ms import preprocess_askap_ms, split_by_field
from flint.naming import get_sbid_from_path
from flint.options import (
    MS,
    BandpassOptions,
    add_options_to_parser,
    create_options_from_parser,
)
from flint.prefect.clusters import get_dask_runner
from flint.prefect.common.utils import upload_image_as_artifact
from flint.sky_model import get_1934_model

# Generic worker functions wrapped directly as prefect tasks. Their inputs are
# standard enough that nothing needs to be unpacked, cast or checked before
# the underlying function is invoked.
task_extract_correct_bandpass_pointing = task(extract_correct_bandpass_pointing)
task_preprocess_askap_ms = task(preprocess_askap_ms)
task_flag_ms_aoflagger = task(flag_ms_aoflagger)
task_create_calibrate_cmd = task(create_calibrate_cmd)
task_split_by_field = task(split_by_field)
task_select_solution_for_ms = task(select_aosolution_for_ms)
task_create_apply_solutions_cmd = task(create_apply_solutions_cmd)
# The tasks below need some of their inputs cast or transformed into something
# the underlying worker functions understand before they can be called.


@task
def task_bandpass_create_apply_solutions_cmd(
    ms: MS,
    calibrate_cmd: CalibrateCommand,
    container: Path,
    output_column: str | None = None,
) -> ApplySolutions:
    """Apply the ao-calibrate style solutions described by ``calibrate_cmd`` to ``ms``.

    The solutions file is not passed in directly; its path is taken from
    ``calibrate_cmd.solution_path`` and handed to ``create_apply_solutions_cmd``.

    Args:
        ms (MS): The measurement set that will have the solutions applied
        calibrate_cmd (CalibrateCommand): Calibrate command whose ``solution_path`` names the solutions file to apply
        container (Path): Path to the singularity container that will apply the solutions
        output_column (str | None, optional): Name of the output column to create. Defaults to None.

    Returns:
        ApplySolutions: The apply solutions command and meta-data
    """
    solutions_file = calibrate_cmd.solution_path

    return create_apply_solutions_cmd(
        ms=ms,
        solutions_file=solutions_file,
        container=container,
        output_column=output_column,
    )
@task
def task_flag_solutions(
    calibrate_cmd: CalibrateCommand,
    **kwargs,
) -> CalibrateCommand:
    """Flag the calibration solutions referenced by ``calibrate_cmd``.

    Diagnostic plots go to a ``preflagger`` folder next to the measurement set
    described by ``calibrate_cmd.ms``. The folder is created when missing.
    Each resulting plot is passed to ``upload_image_as_artifact``.

    Args:
        calibrate_cmd (CalibrateCommand): Calibrate command whose ``solution_path`` is the solutions file to flag
        **kwargs: Forwarded to ``flag_aosolutions``. ``ref_ant`` (-1), ``flag_cut`` (3) and
            ``plot_dir`` are always set here, so passing them again raises ``TypeError``.

    Returns:
        CalibrateCommand: ``calibrate_cmd`` updated via ``with_options`` to point at the
        flagged solutions file and marked ``preflagged=True``
    """
    preflagger_dir = calibrate_cmd.ms.path.parent / Path("preflagger")

    if not preflagger_dir.exists():
        logger.info(f"Attempting to create {preflagger_dir}")
        try:
            preflagger_dir.mkdir(parents=True)
        except FileExistsError:
            # Another worker may create the folder between the existence check
            # and the mkdir call; that is harmless, so only warn.
            logger.warning(
                "Creating the directory failed. Likely already exists. Race conditions, me-hearty."
            )

    flagged = flag_aosolutions(
        solutions_path=calibrate_cmd.solution_path,
        ref_ant=-1,
        flag_cut=3,
        plot_dir=preflagger_dir,
        **kwargs,
    )

    for plot_path in flagged.plots:
        upload_image_as_artifact(image_path=plot_path, description=plot_path.name)

    return calibrate_cmd.with_options(solution_path=flagged.path, preflagged=True)
def run_bandpass_stage(
    bandpass_mss: Collection[MS],
    output_split_bandpass_path: Path,
    bandpass_options: BandpassOptions,
    model_path: Path,
    source_name_prefix: str = "B1934-638",
    skip_rotation: bool = False,
) -> list[CalibrateCommand]:
    """Execute the bandpass calibration (using ``calibrate``) against a set of input measurement sets.

    Each input measurement set goes through four steps: the field centred on the
    calibrator is extracted, the result is preprocessed, it is flagged, and it is
    then calibrated. When ``bandpass_options.flag_calibrate_rounds`` is greater
    than zero, each round does the following:

    1. Apply the current solutions into ``CORRECTED_DATA``.
    2. Re-flag.
    3. Derive new solutions from the ``DATA`` column.

    The final solutions are passed through ``task_flag_solutions``.

    Args:
        bandpass_mss (Collection[MS]): Set of bandpass measurement sets to calibrate
        output_split_bandpass_path (Path): Folder where the extracted field centred on the
            calibration source (typically PKS B1934-638) is written. Created if it does not exist.
        bandpass_options (BandpassOptions): Configurables that specify the bandpass calibration process
        model_path (Path): Path to the model used to calibrate against
        source_name_prefix (str, optional): Name of the field being calibrated. Defaults to "B1934-638".
        skip_rotation (bool, optional): If ``True``, skip rotating the ASKAP visibilities from the
            antenna frame to the sky frame. Defaults to False.

    Returns:
        list[CalibrateCommand]: Calibrate commands describing the final, pre-flagged solutions.
        NOTE(review): these come from prefect ``.map`` calls, so they are presumably prefect
        futures wrapping ``CalibrateCommand`` instances — confirm.

    Raises:
        AssertionError: If ``bandpass_options.flag_calibrate_rounds`` is negative.
    """
    assert bandpass_options.flag_calibrate_rounds >= 0, (
        f"Currently {bandpass_options.flag_calibrate_rounds=}, needs to be 0 or higher"
    )

    if not output_split_bandpass_path.exists():
        logger.info(f"Creating {output_split_bandpass_path!s}")
        # exist_ok guards against another process creating the folder between
        # the existence check and this call.
        output_split_bandpass_path.mkdir(parents=True, exist_ok=True)

    extract_bandpass_mss = task_extract_correct_bandpass_pointing.map(
        ms=bandpass_mss,
        source_name_prefix=source_name_prefix,
        ms_out_dir=output_split_bandpass_path,
    )
    preprocess_bandpass_mss = task_preprocess_askap_ms.map(
        ms=extract_bandpass_mss, skip_rotation=skip_rotation
    )
    flag_bandpass_mss = task_flag_ms_aoflagger.map(
        ms=preprocess_bandpass_mss,
        container=bandpass_options.flagger_container,
    )
    calibrate_cmds = task_create_calibrate_cmd.map(
        ms=flag_bandpass_mss,
        calibrate_model=model_path,
        container=bandpass_options.calibrate_container,
        update_calibrate_options=unmapped(dict(minuv=bandpass_options.minuv)),
    )

    for _ in range(bandpass_options.flag_calibrate_rounds):
        # Apply the current solutions, flag the corrected data, then recalibrate.
        # The calibrate commands are also passed as ``ms``; they carry the MS they
        # were derived from (``calibrate_cmd.ms``), which presumably is what gets
        # resolved downstream.
        apply_cmds = task_bandpass_create_apply_solutions_cmd.map(
            ms=calibrate_cmds,
            calibrate_cmd=calibrate_cmds,
            output_column="CORRECTED_DATA",
            container=bandpass_options.calibrate_container,
        )
        flag_bandpass_mss = task_flag_ms_aoflagger.map(
            ms=apply_cmds, container=bandpass_options.flagger_container
        )
        # Solve again from the raw DATA column, now with the refreshed flags.
        calibrate_cmds = task_create_calibrate_cmd.map(
            ms=flag_bandpass_mss,
            calibrate_model=model_path,
            container=bandpass_options.calibrate_container,
            calibrate_data_column="DATA",
            update_calibrate_options=unmapped(dict(minuv=bandpass_options.minuv)),
        )

    flag_calibrate_cmds = task_flag_solutions.map(
        calibrate_cmd=calibrate_cmds,
        smooth_solutions=bandpass_options.smooth_solutions,
        smooth_window_size=bandpass_options.smooth_window_size,
        smooth_polynomial_order=bandpass_options.smooth_polynomial_order,
        mean_ant_tolerance=bandpass_options.preflagger_ant_mean_tolerance,
        mesh_ant_flags=bandpass_options.preflagger_mesh_ant_flags,
        max_gain_amplitude=bandpass_options.preflagger_jones_max_amplitude,
    )

    return flag_calibrate_cmds
@flow
def calibrate_bandpass_flow(
    bandpass_path: Path,
    split_path: Path,
    bandpass_options: BandpassOptions,
) -> Path:
    """Create and run the prefect flow to calibrate a set of bandpass measurement sets.

    The measurement sets that will be calibrated are expected to:

    - follow the raw name format convention
    - reside in a directory whose name is the SBID of the observation

    The current bandpass procedure relies on Andre Offringa's ``calibrate``
    tool, with slight modifications from Emil Lenc. The well-known source
    PKS1934-638 is the only source supported for bandpass calibration. Its
    model is packaged inside ``flint`` in the AO-style format.

    Each measurement set will correspond to a solutions file once the
    ``calibrate`` tool has executed successfully. Keep these together: a
    single solutions file does not carry enough meta-data to recover the
    frequency, channels or beam number described in the measurement set.

    Args:
        bandpass_path (Path): Location of the folder containing the raw ASKAP bandpass
            measurement sets that will be calibrated
        split_path (Path): Location that will contain a folder, named after the folder
            holding the bandpass measurement sets (expected to be the SBID), that will
            contain the output bandpass measurement sets, solutions and plots
        bandpass_options (BandpassOptions): Options that specify configurables of the bandpass processing.

    Returns:
        Path: Directory that contains the extracted measurement sets and the ao-style gain solutions files.

    Raises:
        AssertionError: If ``bandpass_path`` is not an existing folder, or if the number of
            ``*.ms`` entries found does not equal ``bandpass_options.expected_ms``.
    """
    # is_dir() is False for a missing path, so it also covers the existence check.
    assert bandpass_path.is_dir(), (
        f"{bandpass_path!s} does not exist or is not a folder. "
    )

    # Sort so the measurement sets are processed in a deterministic order, as
    # are the tasks mapped over them; directory listing order is
    # filesystem dependent.
    bandpass_mss = [
        MS.cast(ms_path) for ms_path in sorted(bandpass_path.glob("*.ms"))
    ]
    assert len(bandpass_mss) == bandpass_options.expected_ms, (
        f"Expected to find {bandpass_options.expected_ms} in {bandpass_path!s}, found {len(bandpass_mss)}."
    )

    logger.info(
        f"Found the following bandpass measurement set: {[bp.path for bp in bandpass_mss]}."
    )

    # resolve() already makes the path absolute (and resolves symlinks).
    output_split_bandpass_path = (split_path / bandpass_path.name).resolve()
    logger.info(
        f"Will write extracted bandpass MSs to: {output_split_bandpass_path!s}."
    )

    # This is the model that we will calibrate the bandpass against.
    # At the time of writing 1934-638 is the only model that is supported,
    # not only by this pirate ship, but also the ASKAP telescope itself.
    model_path: Path = get_1934_model(mode="calibrate")
    source_name_prefix: str = "B1934-638"

    # NOTE(review): the prefect futures returned here are not explicitly waited
    # on; confirm the prefect version in use resolves them before the flow exits.
    run_bandpass_stage(
        bandpass_mss=bandpass_mss,
        output_split_bandpass_path=output_split_bandpass_path,
        bandpass_options=bandpass_options,
        model_path=model_path,
        source_name_prefix=source_name_prefix,
        skip_rotation=True,
    )

    return output_split_bandpass_path
def setup_run_bandpass_flow(
    bandpass_path: Path,
    split_path: Path,
    cluster_config: str | Path,
    bandpass_options: BandpassOptions,
) -> Path:
    """Configure and run the prefect flow that calibrates a set of bandpass measurement sets.

    A dask task runner is built from ``cluster_config``. The flow is renamed to
    include the SBID parsed from ``bandpass_path`` and is then executed. See
    ``calibrate_bandpass_flow`` for the expectations placed on the input
    measurement sets.

    Args:
        bandpass_path (Path): Location of the folder containing the raw ASKAP bandpass
            measurement sets that will be calibrated
        split_path (Path): Location that will contain a folder, named after the folder holding
            the bandpass measurement sets (expected to be the SBID), that will contain the output
            bandpass measurement sets, solutions and plots
        cluster_config (str | Path): Path to a cluster configuration file, or a known cluster
            name, used to configure a prefect dask task runner.
        bandpass_options (BandpassOptions): Options that specify configurables of the bandpass processing.

    Returns:
        Path: Directory that contains the extracted measurement sets and the ao-style gain
        solutions files, as returned by the flow. (Previously the input ``bandpass_path`` was
        returned, which contradicted this contract.)
    """
    dask_task_runner = get_dask_runner(cluster=cluster_config)

    bandpass_sbid = get_sbid_from_path(path=bandpass_path)

    configured_flow = calibrate_bandpass_flow.with_options(
        name=f"Flint Bandpass Pipeline -- {bandpass_sbid}", task_runner=dask_task_runner
    )

    # Calling a prefect flow returns the value returned by the flow function,
    # which here is the output directory holding the extracted MSs and solutions.
    output_split_bandpass_path = configured_flow(
        bandpass_path=bandpass_path,
        split_path=split_path,
        bandpass_options=bandpass_options,
    )

    return output_split_bandpass_path
def get_parser() -> ArgumentParser:
    """Build the command-line argument parser for the bandpass prefect workflow.

    Besides the arguments declared here, every option of ``BandpassOptions``
    is registered on the parser via ``add_options_to_parser``.

    Returns:
        ArgumentParser: CLI argument parser
    """
    parser = ArgumentParser(
        description="Perform bandpass calibration against an ASKAP SBID. "
    )

    # (flag, keyword arguments) pairs, registered in this order.
    argument_specs = (
        (
            "bandpass_path",
            dict(
                type=Path,
                help="Path to the directory containing the uncalibrated bandpass measurement sets. ",
            ),
        ),
        (
            "--split-path",
            dict(
                type=Path,
                default=Path("."),
                help="Location to write the field-split MSs. Will attempt to create a directory using the SBID of the bandpass observation. ",
            ),
        ),
        (
            "--cluster-config",
            dict(
                type=str,
                default="petrichor",
                help="Path to a cluster configuration file, or a known cluster name. ",
            ),
        ),
    )
    for flag, spec in argument_specs:
        parser.add_argument(flag, **spec)

    return add_options_to_parser(parser=parser, options_class=BandpassOptions)
def cli() -> None:
    """Command-line entry point: parse arguments and launch the bandpass pipeline."""
    import logging

    logger.setLevel(logging.INFO)

    args = get_parser().parse_args()
    bandpass_options = create_options_from_parser(
        parser_namespace=args, options_class=BandpassOptions
    )

    setup_run_bandpass_flow(
        bandpass_path=args.bandpass_path,
        split_path=args.split_path,
        cluster_config=args.cluster_config,
        bandpass_options=bandpass_options,
    )


if __name__ == "__main__":
    cli()