Source code for flint.prefect.common.ms

"""Common prefect tasks around interacting with measurement sets"""

from __future__ import annotations

from pathlib import Path
from typing import ParamSpec, TypeVar

from prefect import Task, task

from flint.imager.wsclean import WSCleanResult
from flint.logging import logger
from flint.ms import subtract_model_from_data_column
from flint.predict.addmodel import AddModelOptions, add_model

[docs] P = ParamSpec("P")
[docs] R = TypeVar("R")
[docs] task_subtract_model_from_ms = task(subtract_model_from_data_column)
# TODO: This can be a dispatcher type function should # other modes be added
[docs] def add_model_source_list_to_ms( wsclean_command: WSCleanResult, calibrate_container: Path | None = None ) -> WSCleanResult: logger.info("Updating MODEL_DATA with source list") ms = wsclean_command.ms assert wsclean_command.image_set is not None, ( f"{wsclean_command.image_set=}, which is not allowed" ) source_list_path = wsclean_command.image_set.source_list if source_list_path is None: logger.info(f"{source_list_path=}, so not updating") return wsclean_command assert source_list_path.exists(), f"{source_list_path=} does not exist" if calibrate_container is None: logger.info(f"{calibrate_container=}, so not updating") return wsclean_command assert calibrate_container.exists(), f"{calibrate_container=} does not exist" add_model_options = AddModelOptions( model_path=source_list_path, ms_path=ms.path, mode="c", datacolumn="MODEL_DATA", ) add_model( add_model_options=add_model_options, container=calibrate_container, remove_datacolumn=True, ) return wsclean_command
[docs] task_add_model_source_list_to_ms: Task[P, R] = task(add_model_source_list_to_ms)