"""A prefect based pipeline that:
- will perform bandpass calibration with PKS B1934-638 data, or from a derived sky-model
- copy and apply to science field
- image and self-calibration the science fields
- run aegean source finding
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
from configargparse import ArgumentParser
from prefect import flow, tags, unmapped
from flint.calibrate.aocalibrate import find_existing_solutions
from flint.catalogue import verify_reference_catalogues
from flint.coadd.linmos import LinmosResult
from flint.configuration import (
Strategy,
get_options_from_strategy,
load_and_copy_strategy,
)
from flint.logging import logger
from flint.masking import consider_beam_mask_round
from flint.ms import find_mss
from flint.naming import (
CASDANameComponents,
add_timestamp_to_path,
extract_components_from_name,
get_sbid_from_path,
)
from flint.options import (
FieldOptions,
add_options_to_parser,
create_options_from_parser,
dump_field_options_to_yaml,
)
from flint.prefect.clusters import get_dask_runner
from flint.prefect.common.imaging import (
create_convol_linmos_images,
create_convolve_linmos_cubes,
task_copy_and_preprocess_casda_askap_ms,
task_create_apply_solutions_cmd,
task_create_image_mask_model,
task_flag_ms_aoflagger,
task_gaincal_applycal_ms,
task_potato_peel,
task_preprocess_askap_ms,
task_rename_column_in_ms,
task_run_bane_and_aegean,
task_select_solution_for_ms,
task_split_by_field,
task_wsclean_imager,
task_zip_ms,
validation_items,
)
from flint.prefect.common.ms import (
task_add_model_source_list_to_ms,
task_jolly_roger_tractor,
)
from flint.prefect.common.utils import (
task_archive_sbid,
task_create_beam_summary,
task_create_field_summary,
task_flatten,
task_update_field_summary,
task_update_with_options,
)
from flint.selfcal.utils import consider_skip_selfcal_on_round
[docs]
def _check_field_options(field_options: FieldOptions) -> None:
run_aegean = (
False if field_options.aegean_container is None else field_options.run_aegean
)
if (
field_options.imaging_strategy is not None
and not field_options.imaging_strategy.exists()
):
raise ValueError(
f"Imaging strategy file {field_options.imaging_strategy} is set, but the path does not exist"
)
if field_options.use_beam_masks is True and run_aegean is False:
raise ValueError(
"run_aegean and aegean container both need to be set is beam masks is being used. "
)
if field_options.reference_catalogue_directory:
if not verify_reference_catalogues(
reference_directory=field_options.reference_catalogue_directory
):
raise ValueError(
f"{field_options.reference_catalogue_directory=} does not appear to be valid. Check for reference catalogues"
)
if field_options.rounds is not None:
if field_options.rounds >= 1 and field_options.casa_container is None:
raise ValueError(
"CASA Container needs to be set if self-calibration is to be performed"
)
if field_options.coadd_cubes:
if (
field_options.yandasoft_container is None
or not field_options.yandasoft_container
):
raise ValueError(
"Unable to create linmos cubes without a yandasoft container"
)
[docs]
def _check_create_output_split_science_path(
science_path: Path, split_path: Path, check_exists: bool = True
) -> Path:
"""Create the output path that the science MSs will be placed.
Args:
science_path (Path): The directory that contains the MSs for science processing
split_path (Path): Where the output MSs will be written to and processed
check_exists (bool, optional): Should we check to make sure output directory does not exist. Defaults to True.
Raises:
ValueError: Raised when the output directory exists
Returns:
Path: The output directory
"""
science_folder_name = science_path.name
assert str(science_folder_name).isdigit(), (
f"We require the parent directory to be the SBID (all digits), got {science_folder_name=}"
)
output_split_science_path = (
Path(split_path / science_folder_name).absolute().resolve()
)
if check_exists and output_split_science_path.exists():
logger.critical(
f"{output_split_science_path=} already exists. It should not. Exiting. "
)
raise ValueError("Output science directory already exists. ")
logger.info(f"Creating {output_split_science_path!s}")
output_split_science_path.mkdir(parents=True)
return output_split_science_path
@flow(name="Flint Continuum Pipeline")
[docs]
def process_science_fields(
science_path: Path,
split_path: Path,
field_options: FieldOptions,
bandpass_path: Path | None = None,
) -> None:
# Verify no nasty incompatible options
_check_field_options(field_options=field_options)
# Get some placeholder names
run_aegean = (
False if field_options.aegean_container is None else field_options.run_aegean
) # This is also in check_field_options
run_validation = field_options.reference_catalogue_directory is not None
science_mss = find_mss(
mss_parent_path=science_path, expected_ms_count=field_options.expected_ms
)
output_split_science_path = _check_create_output_split_science_path(
science_path=science_path, split_path=split_path, check_exists=True
)
dump_field_options_to_yaml(
output_path=add_timestamp_to_path(
input_path=output_split_science_path / "field_options.yaml"
),
field_options=field_options,
)
archive_wait_for: list[Any] = []
strategy: Strategy | None = load_and_copy_strategy(
output_split_science_path=output_split_science_path,
imaging_strategy=field_options.imaging_strategy,
)
logger.info(f"{field_options=}")
logger.info(f"Found the following raw measurement sets: {science_mss}")
# TODO: This feels a little too much like that feeling of being out
# at sea for to long. Should refactor (or mask a EMU only).
if isinstance(
extract_components_from_name(name=science_mss[0].path), CASDANameComponents
):
preprocess_science_mss = task_copy_and_preprocess_casda_askap_ms.map(
casda_ms=science_mss,
casa_container=field_options.casa_container,
output_directory=output_split_science_path,
)
if field_options.flagger_container is not None:
preprocess_science_mss = task_flag_ms_aoflagger.map( # type: ignore
ms=preprocess_science_mss, container=field_options.flagger_container
)
else:
# TODO: This will likely need to be expanded should any
# other calibration strategies get added
# Scan the existing bandpass directory for the existing solutions
assert bandpass_path, f"{bandpass_path=}, it needs to be set"
calibrate_cmds = find_existing_solutions(
bandpass_directory=bandpass_path,
use_preflagged=field_options.use_preflagger,
use_smoothed=field_options.use_smoothed,
)
logger.info(f"Constructed the following {calibrate_cmds=}")
split_science_mss = task_split_by_field.map(
ms=science_mss,
field=None,
out_dir=unmapped(output_split_science_path),
column=unmapped("DATA"),
)
# This will block until resolved
flat_science_mss = task_flatten.submit(split_science_mss).result()
solutions_paths = task_select_solution_for_ms.map(
calibrate_cmds=unmapped(calibrate_cmds), ms=flat_science_mss
)
apply_solutions_cmds = task_create_apply_solutions_cmd.map(
ms=flat_science_mss,
solutions_file=solutions_paths,
container=field_options.calibrate_container,
)
if field_options.flagger_container is not None:
flagged_mss = task_flag_ms_aoflagger.map(
ms=apply_solutions_cmds, container=field_options.flagger_container
)
column_rename_mss = task_rename_column_in_ms.map(
ms=flagged_mss,
original_column_name=unmapped("DATA"),
new_column_name=unmapped("INSTRUMENT_DATA"),
)
preprocess_science_mss = task_preprocess_askap_ms.map(
ms=column_rename_mss,
data_column=unmapped("CORRECTED_DATA"),
instrument_column=unmapped("DATA"),
overwrite=True,
)
if field_options.no_imaging:
logger.info(
f"No imaging will be performed, as requested by {field_options.no_imaging=}"
)
return
if field_options.wsclean_container is None:
logger.info("No wsclean container provided. Returning. ")
return
if field_options.potato_container:
# The call into potato peel task has two potential update option keywords.
# So for the moment we will not use the task decorated version.
potato_wsclean_init = get_options_from_strategy(
strategy=strategy, mode="wsclean", round_info=0, operation="selfcal"
)
potato_peel_options = get_options_from_strategy(
strategy=strategy, mode="potatopeel", round_info=0, operation="selfcal"
)
preprocess_science_mss = task_potato_peel.map(
ms=preprocess_science_mss,
potato_container=field_options.potato_container,
update_wsclean_options=unmapped(potato_wsclean_init),
update_potato_peel_options=unmapped(potato_peel_options),
)
if field_options.use_jolly_tukey_tractor:
# TODO: Need to consider expanding the task below to
# handle MS column?
tukey_tractor_options = get_options_from_strategy(
strategy=strategy, mode="tukeytractor", round_info=0, operation="selfcal"
)
preprocess_science_mss = task_jolly_roger_tractor.map(
ms=preprocess_science_mss,
update_tukey_tractor_options=unmapped(tukey_tractor_options),
)
# Some preprocessing stages (temporarily) modify the MS name.
# Run the field summary here to avoid attemptign to read at
# poor timem when MS is renamed, ya see dog
field_summary = task_create_field_summary.submit(
mss=preprocess_science_mss,
cal_sbid_path=bandpass_path,
holography_path=field_options.holofile,
) # type: ignore
logger.info(f"{field_summary=}")
# The stokes-v mss are updated throughout the self-calibration loop
# as the file names change
stokes_v_mss = preprocess_science_mss
wsclean_results = task_wsclean_imager.map(
in_ms=preprocess_science_mss,
wsclean_container=field_options.wsclean_container,
update_wsclean_options=unmapped(
get_options_from_strategy(
strategy=strategy,
mode="wsclean",
round_info=0,
operation="selfcal",
)
),
) # type: ignore
wsclean_results = (
task_add_model_source_list_to_ms.map(
wsclean_command=wsclean_results,
calibrate_container=field_options.calibrate_container,
)
if field_options.update_model_data_with_source_list
else wsclean_results
)
beam_summaries = task_create_beam_summary.map(
ms=preprocess_science_mss, image_set=wsclean_results
)
archive_wait_for.extend(beam_summaries)
archive_wait_for.extend(wsclean_results)
beam_aegean_outputs = None
if run_aegean:
update_bane_options = get_options_from_strategy(
strategy=strategy,
mode="bane",
round_info=0,
operation="selfcal",
)
update_aegean_options = get_options_from_strategy(
strategy=strategy,
mode="aegean",
round_info=0,
operation="selfcal",
)
beam_aegean_outputs = task_run_bane_and_aegean.map(
image=wsclean_results,
aegean_container=unmapped(field_options.aegean_container),
update_bane_options=unmapped(update_bane_options),
update_aegean_options=unmapped(update_aegean_options),
)
beam_summaries = task_update_with_options.map(
input_object=beam_summaries, components=beam_aegean_outputs
)
field_summary = task_update_with_options.submit(
input_object=field_summary, beam_summaries=beam_summaries
)
if field_options.yandasoft_container:
parsets = create_convol_linmos_images(
wsclean_results=wsclean_results,
field_options=field_options,
field_summary=field_summary,
additional_linmos_suffix_str="noselfcal", # indicate in output linmos name no selfcal
)
archive_wait_for.extend(parsets)
parset = parsets[-1]
if run_aegean:
aegean_field_output = task_run_bane_and_aegean.submit(
image=parset, aegean_container=unmapped(field_options.aegean_container)
) # type: ignore
field_summary = task_update_field_summary.submit(
field_summary=field_summary,
aegean_outputs=aegean_field_output,
linmos_command=parset,
) # type: ignore
archive_wait_for.append(field_summary)
if run_validation and field_options.reference_catalogue_directory:
validation_items(
field_summary=field_summary,
aegean_outputs=aegean_field_output,
reference_catalogue_directory=field_options.reference_catalogue_directory,
)
# Set up the default value should the user activated mask option is not set
fits_beam_masks = None
for current_round in range(1, field_options.rounds + 1):
with tags(f"selfcal-{current_round}"):
final_round = current_round == field_options.rounds
skip_gaincal_current_round = consider_skip_selfcal_on_round(
current_round=current_round,
skip_selfcal_on_rounds=field_options.skip_selfcal_on_rounds,
)
update_gain_options = get_options_from_strategy(
strategy=strategy,
mode="gaincal",
round_info=current_round,
operation="selfcal",
)
cal_mss = task_gaincal_applycal_ms.map(
ms=wsclean_results,
selfcal_round=current_round,
archive_input_ms=field_options.zip_ms,
skip_selfcal=skip_gaincal_current_round,
rename_ms=field_options.rename_ms,
archive_cal_table=True,
casa_container=field_options.casa_container,
update_gain_cal_options=unmapped(update_gain_options),
wait_for=[
field_summary,
# To make sure field summary is created with unzipped MSs
*beam_summaries,
# to make sure gaincal does not run before the beam summary
],
) # type: ignore
stokes_v_mss = cal_mss
fits_beam_masks = None
if consider_beam_mask_round(
current_round=current_round,
mask_rounds=(
field_options.use_beam_masks_rounds
if field_options.use_beam_masks_rounds
else field_options.use_beam_masks_from
),
allow_beam_masks=field_options.use_beam_masks,
):
# TODO: the aegean outputs are only needed should
# the signal image be needed or per beam sources are needed
update_bane_options = get_options_from_strategy(
strategy=strategy,
mode="bane",
round_info=current_round,
operation="selfcal",
)
update_aegean_options = get_options_from_strategy(
strategy=strategy,
mode="aegean",
round_info=current_round,
operation="selfcal",
)
beam_aegean_outputs = (
task_run_bane_and_aegean.map(
image=wsclean_results,
aegean_container=unmapped(field_options.aegean_container),
update_bane_options=unmapped(update_bane_options),
update_aegean_options=unmapped(update_aegean_options),
)
if (current_round >= 2 or not beam_aegean_outputs)
else beam_aegean_outputs
)
# Early versions of the masking procedure required aegean outputs
# to construct the sginal images. Since aegean is run outside of
# this self-cal loop once already, we can skip their creation on
# the first loop
update_masking_options = get_options_from_strategy(
strategy=strategy,
mode="masking",
round_info=current_round,
operation="selfcal",
)
fits_beam_masks = task_create_image_mask_model.map(
image=wsclean_results,
image_products=beam_aegean_outputs,
update_masking_options=unmapped(update_masking_options),
) # type: ignore
update_wsclean_options = get_options_from_strategy(
strategy=strategy,
mode="wsclean",
operation="selfcal",
round_info=current_round,
)
wsclean_results = task_wsclean_imager.map(
in_ms=cal_mss,
wsclean_container=field_options.wsclean_container,
fits_mask=fits_beam_masks,
update_wsclean_options=unmapped(update_wsclean_options),
)
wsclean_results = (
task_add_model_source_list_to_ms.map(
wsclean_command=wsclean_results,
calibrate_container=field_options.calibrate_container,
)
if field_options.update_model_data_with_source_list
else wsclean_results
)
archive_wait_for.extend(wsclean_results)
# Do source finding on the last round of self-cal'ed images
if round == field_options.rounds and run_aegean:
task_run_bane_and_aegean.map(
image=wsclean_results,
aegean_container=unmapped(field_options.aegean_container),
)
parsets_self: None | list[LinmosResult] = None # Without could be unbound
if field_options.yandasoft_container:
parsets_self = create_convol_linmos_images(
wsclean_results=wsclean_results,
field_options=field_options,
field_summary=field_summary,
)
archive_wait_for.extend(parsets_self)
if final_round and run_aegean and parsets_self:
aegean_outputs = task_run_bane_and_aegean.submit(
image=parsets_self[-1],
aegean_container=unmapped(field_options.aegean_container),
) # type: ignore
field_summary = task_update_field_summary.submit(
field_summary=field_summary,
aegean_outputs=aegean_outputs,
round=current_round,
) # type: ignore
if run_validation:
assert field_options.reference_catalogue_directory, (
f"Reference catalogue directory should be set when {run_validation=}"
)
val_results = validation_items(
field_summary=field_summary,
aegean_outputs=aegean_outputs,
reference_catalogue_directory=field_options.reference_catalogue_directory,
)
archive_wait_for.append(val_results)
if field_options.coadd_cubes:
with tags("cubes"):
cube_parset = create_convolve_linmos_cubes(
wsclean_results=wsclean_results, # type: ignore
field_options=field_options,
current_round=(field_options.rounds if field_options.rounds else None),
additional_linmos_suffix_str="cube",
)
archive_wait_for.append(cube_parset)
if field_options.stokes_v_imaging:
with tags("stokes-v"):
stokes_v_wsclean_options = get_options_from_strategy(
strategy=strategy, mode="wsclean", operation="stokesv"
)
wsclean_results = task_wsclean_imager.map(
in_ms=stokes_v_mss,
wsclean_container=field_options.wsclean_container,
update_wsclean_options=unmapped(stokes_v_wsclean_options),
fits_mask=fits_beam_masks,
wait_for=wsclean_results, # Ensure that measurement sets are doubled up during imaging
)
if field_options.yandasoft_container:
parsets = create_convol_linmos_images(
wsclean_results=wsclean_results,
field_options=field_options.with_options(linmos_residuals=False),
field_summary=field_summary,
)
archive_wait_for.extend(parsets)
# zip up the final measurement set, which is not included in the above loop
if field_options.zip_ms:
archive_results = task_zip_ms.map(
in_item=wsclean_results,
wait_for=wsclean_results, # can zip MS once all imaging done
)
archive_wait_for.extend(archive_results)
if field_options.sbid_archive_path or field_options.sbid_copy_path:
update_archive_options = get_options_from_strategy(
strategy=strategy, mode="archive", round_info=0, operation="selfcal"
)
task_archive_sbid.submit(
science_folder_path=output_split_science_path,
archive_path=field_options.sbid_archive_path,
copy_path=field_options.sbid_copy_path,
max_round=field_options.rounds if field_options.rounds else None,
update_archive_options=update_archive_options,
wait_for=archive_wait_for,
)
[docs]
def setup_run_process_science_field(
cluster_config: str | Path,
science_path: Path,
split_path: Path,
field_options: FieldOptions,
bandpass_path: Path | None = None,
) -> None:
science_sbid = get_sbid_from_path(path=science_path)
if field_options.sbid_copy_path:
updated_sbid_copy_path = field_options.sbid_copy_path / f"{science_sbid}"
logger.info(f"Updating archive copy path to {updated_sbid_copy_path=}")
field_options = field_options.with_options(
sbid_copy_path=updated_sbid_copy_path
)
dask_task_runner = get_dask_runner(cluster=cluster_config)
process_science_fields.with_options(
name=f"Flint Continuum Pipeline - {science_sbid}", task_runner=dask_task_runner
)(
science_path=science_path,
bandpass_path=bandpass_path,
split_path=split_path,
field_options=field_options,
)
# TODO: Put the archive stuff here?
[docs]
def get_parser() -> ArgumentParser:
parser = ArgumentParser(description=__doc__)
parser.add_argument(
"--cli-config", is_config_file=True, help="Path to configuration file"
)
parser.add_argument(
"science_path",
type=Path,
help="Path to directories containing the beam-wise science measurementsets that will have solutions copied over and applied.",
)
parser.add_argument(
"--split-path",
type=Path,
default=Path("."),
help="Location to write field-split MSs to. Will attempt to use the parent name of a directory when writing out a new MS. ",
)
parser.add_argument(
"--calibrated-bandpass-path",
type=Path,
default=None,
help="Path to directory containing the uncalibrated beam-wise measurement sets that contain the bandpass calibration source. If None then the '--sky-model-directory' should be provided. ",
)
parser.add_argument(
"--cluster-config",
type=str,
default="petrichor",
help="Path to a cluster configuration file, or a known cluster name. ",
)
parser = add_options_to_parser(parser=parser, options_class=FieldOptions)
return parser
[docs]
def cli() -> None:
import logging
# logger = logging.getLogger("flint")
logger.setLevel(logging.INFO)
parser = get_parser()
args = parser.parse_args()
field_options: FieldOptions = create_options_from_parser(
parser_namespace=args,
options_class=FieldOptions,
)
setup_run_process_science_field(
cluster_config=args.cluster_config,
science_path=args.science_path,
bandpass_path=args.calibrated_bandpass_path,
split_path=args.split_path,
field_options=field_options,
)
if __name__ == "__main__":
cli()