Source code for flint.prefect.common.predict
"""Prefect tasks around model visibility prediction"""
from __future__ import annotations
from prefect import task
from flint.logging import logger
from flint.ms import MS
from flint.options import AddModelSubtractFieldOptions
from flint.predict.crystalball import CrystalBallOptions
@task
def task_crystalball_to_ms(ms: MS, crystalball_options: CrystalBallOptions) -> MS:
    """Predict model visibilities into a measurement set with ``crystalball``.

    The prediction relies on a previously constructed sky model (see
    ``wsclean -save-source-list``). The dask client is obtained through
    ``prefect_dask.get_dask_client`` and handed to ``crystalball_predict``,
    so the prediction uses the client backing this flow's task runner.

    Visibilities are predicted into the MS's ``MODEL_DATA`` column.

    Args:
        ms (MS): The measurement set where model visibilities will be predicted into.
        crystalball_options (CrystalBallOptions): Options around the crystal ball operation

    Returns:
        MS: The value returned by ``crystalball_predict``, i.e. the MS with the model column set
    """
    # Function-scope imports, kept in the same order as before.
    from prefect_dask import get_dask_client

    from flint.predict.crystalball import crystalball_predict
    from flint.prefect.helpers import enable_loguru_support

    # crystalball logs through loguru, so try to attach a handler before it runs
    enable_loguru_support()

    # set_as_default=False: the client is only used explicitly below
    with get_dask_client(set_as_default=False) as dask_client:
        logger.info("Obtained the Client supporting the DaskTaskRunner.")
        predicted_ms = crystalball_predict(
            ms=ms,
            crystalball_options=crystalball_options,
            dask_client=dask_client,
            output_column="MODEL_DATA",
        )

    return predicted_ms
@task
def task_addmodel_to_ms(
    ms: MS,
    addmodel_subtract_options: AddModelSubtractFieldOptions,
) -> MS:
    """Predict model visibilities into a measurement set using ``addmodel``.

    For every polarisation listed in ``addmodel_subtract_options.wsclean_pol_mode``
    the matching wsclean source list is looked up from the MS path and passed to
    ``add_model``, which writes into the ``MODEL_DATA`` column. The first
    polarisation is run with mode ``"c"`` and ``remove_datacolumn=True``; any
    later polarisation is run with mode ``"a"`` and ``remove_datacolumn=False``
    (presumably "create" then "add" -- confirm against ``AddModelOptions``).

    Args:
        ms (MS): The measurement set where model visibilities will be predicted into.
        addmodel_subtract_options (AddModelSubtractFieldOptions): Supplies the
            polarisation modes to process and the container given to ``add_model``.

    Returns:
        MS: The result of ``ms.with_options(model_column="MODEL_DATA")``

    Raises:
        AssertionError: If an expected wsclean source list does not exist, or
            if ``calibrate_container`` is unset.
    """
    from flint.imager.wsclean import get_wsclean_output_source_list_path
    from flint.predict.addmodel import AddModelOptions, add_model

    logger.info(f"Searching for wsclean source list for {ms.path}")

    for pol_index, pol in enumerate(addmodel_subtract_options.wsclean_pol_mode):
        is_first_pol = pol_index == 0

        wsclean_source_list_path = get_wsclean_output_source_list_path(
            name_path=ms.path, pol=pol
        )
        assert wsclean_source_list_path.exists(), (
            f"{wsclean_source_list_path=} was requested, but does not exist"
        )

        # This should attempt to add model of different polarisations together.
        # But to this point it is a future proof and is not tested.
        if is_first_pol:
            addmodel_mode = "c"
        else:
            addmodel_mode = "a"

        addmodel_options = AddModelOptions(
            model_path=wsclean_source_list_path,
            ms_path=ms.path,
            mode=addmodel_mode,
            datacolumn="MODEL_DATA",
        )

        assert addmodel_subtract_options.calibrate_container is not None, (
            f"{addmodel_subtract_options.calibrate_container=}, which should not happen"
        )

        # Only the first pass asks add_model to remove the data column.
        add_model(
            add_model_options=addmodel_options,
            container=addmodel_subtract_options.calibrate_container,
            remove_datacolumn=is_first_pol,
        )

    return ms.with_options(model_column="MODEL_DATA")