"""Common prefect related utilities that can be used between flows."""
from __future__ import annotations
import base64
from pathlib import Path
from typing import Any, Callable, Iterable, ParamSpec, TypeVar
from uuid import UUID
from prefect import task
from prefect.artifacts import create_markdown_artifact
from flint.archive import copy_sbid_files_archive, create_sbid_tar_archive
from flint.logging import logger
from flint.naming import (
add_timestamp_to_path,
get_fits_cube_from_paths,
get_sbid_from_path,
rename_linear_to_stokes,
)
from flint.options import ArchiveOptions
from flint.summary import (
create_beam_summary,
create_field_summary,
update_field_summary,
)
# Generic type variables used in the task signatures of this module.
# ``T`` is referenced by several annotations and must be defined here.
T = TypeVar("T")
C = TypeVar("C", bound=Callable)

# Image file suffixes (without the leading dot) accepted by
# ``upload_image_as_artifact``.
SUPPORTED_IMAGE_TYPES = ("png",)
@task
def task_create_object(object: C, **kwargs) -> C:
    """Call ``object`` with the supplied keyword arguments inside a prefect task.

    Typically ``object`` is a class, so this constructs an instance of it.

    Args:
        object (C): The callable (e.g. a class) to invoke.

    Returns:
        C: Whatever ``object(**kwargs)`` returns.
    """
    factory = object
    created = factory(**kwargs)
    return created
@task
def task_getattr(
    item: object,
    attribute: str,
    /,
) -> Any:
    """Look up a named attribute on ``item`` via :func:`getattr`.

    Args:
        item (object): The instance or structure holding the attribute.
        attribute (str): Name of the attribute to fetch.

    Returns:
        Any: The value stored under ``attribute``.
    """
    logger.debug(f"Pulling {attribute=}")
    value = getattr(item, attribute)
    return value
@task
def task_sorted(
    iterable: Iterable[T],
    /,
    *,
    key: Any = None,
    reverse: bool = False,
) -> list[T]:
    """Return a new list with the items of ``iterable`` in sorted order.

    Behaves like the builtin :func:`sorted`, exposed as a prefect task.

    Args:
        iterable (Iterable[T]): Items to sort.
        key (Any, optional): One-argument function producing a comparison key. Defaults to None.
        reverse (bool, optional): Sort in descending order when True. Defaults to False.

    Returns:
        list[T]: A newly built, sorted list.
    """
    ordered = list(iterable)
    ordered.sort(key=key, reverse=reverse)
    return ordered
@task
def task_zip_list_of_list(list_of_list: list[list[T]]) -> list[tuple[T, ...]]:
    """Transpose a list of lists by zipping its inner lists together.

    Entry ``i`` of the result holds the ``i``-th item of every inner list.
    As with :func:`zip`, the output stops at the shortest inner list.

    Args:
        list_of_list (list[list[T]]): Inner lists to zip together.

    Returns:
        list[tuple[T, ...]]: One tuple per position across the inner lists.
    """
    transposed = zip(*list_of_list)
    return [*transposed]
def upload_image_as_artifact(
    image_path: Path, description: str | None = None
) -> UUID | None:
    """Create and submit a markdown artifact tracked by prefect for an
    input image. Currently supporting png formatted images.

    The input image is converted to a base64 encoding, and embedded directly
    within the markdown string as a data URI. Therefore, be mindful of the
    image size as this is tracked in the postgres database.

    Args:
        image_path (Path): Path to the image to upload. A ``str`` path is also accepted.
        description (str | None, optional): A description passed to the markdown artifact. Defaults to None.

    Returns:
        UUID | None: Generated UUID of the registered artifact. If the upload fails ``None`` is returned.

    Raises:
        AssertionError: If ``image_path`` does not exist, or its suffix is not one of
            ``SUPPORTED_IMAGE_TYPES`` (only when assertions are enabled).
    """
    from prefect.exceptions import PrefectHTTPStatusError

    image_path = Path(image_path)
    # Case-insensitive so that e.g. ``.PNG`` is accepted alongside ``.png``
    image_type = image_path.suffix.replace(".", "").lower()

    assert image_path.exists(), f"{image_path} does not exist"
    assert image_type in SUPPORTED_IMAGE_TYPES, (
        f"{image_path} has type {image_type}, and is not supported. Supported types are {SUPPORTED_IMAGE_TYPES}"
    )

    logger.info(f"Encoding {image_path} in base64")
    image_base64 = base64.b64encode(image_path.read_bytes()).decode()

    logger.info("Creating markdown tag")
    # Embed the encoded image inline as a data URI so the artifact is self-contained
    markdown = f"![{image_path.stem}](data:image/{image_type};base64,{image_base64})"

    image_uuid: UUID | None = None
    try:
        logger.info("Registering artifact")
        image_uuid = create_markdown_artifact(
            markdown=markdown, description=description
        )
    except PrefectHTTPStatusError as e:
        # Best-effort upload: HTTP errors are logged and ``None`` is returned
        logger.warning("Failed to register artefact. ")
        logger.warning(f"{e=}")

    return image_uuid
# Prefect task versions of plain flint helpers, each created by wrapping the
# helper with prefect's ``task`` decorator.

# Summary helpers (from flint.summary)
task_create_field_summary = task(create_field_summary)
task_update_field_summary = task(update_field_summary)
task_create_beam_summary = task(create_beam_summary)

# Naming helpers (from flint.naming)
task_get_fits_cube_from_paths = task(get_fits_cube_from_paths)
task_rename_linear_to_stokes = task(rename_linear_to_stokes)
@task
def task_archive_sbid(
    science_folder_path: Path,
    archive_path: Path | None = None,
    copy_path: Path | None = None,
    max_round: int | None = None,
    update_archive_options: dict[str, Any] | None = None,
) -> Path:
    """Create a tarball of files, or copy files, from a processing folder.

    Args:
        science_folder_path (Path): Path that contains the images produced
        archive_path (Path | None, optional): Location to create and store the tar ball at. If None no tarball is created. Defaults to None.
        copy_path (Path | None, optional): Location to copy selected files into. If None no files are copied. Defaults to None.
        max_round (int | None, optional): The last self-calibration round performed. If provided, patterns matching this round's image FITS files and zipped/tarred measurement sets (assuming wsclean naming) are added to the tarball file patterns. If None, the default file patterns in ArchiveOptions are used. Defaults to None.
        update_archive_options (dict[str, Any] | None, optional): Additional options to provide to ArchiveOptions. Defaults to None.

    Returns:
        Path: The science folder files were copied from
    """
    sbid = get_sbid_from_path(path=science_folder_path)

    archive_options = ArchiveOptions()
    if update_archive_options:
        logger.info(f"Updating archive options with {update_archive_options=}")
        archive_options = archive_options.with_options(**update_archive_options)

    # TODO: What should this be? Just general new regexs passed through,
    # or is this fine?
    # Compare against None so that round 0 is honoured like any other round,
    # matching the documented None sentinel.
    if max_round is not None:
        updated_file_patterns = (
            *archive_options.tar_file_re_patterns,
            rf".*beam[0-9]+\.round{max_round}-.*-image\.fits",
            rf".*beam[0-9]+\.round{max_round}\.ms\.(zip|tar)",
        )
        archive_options = archive_options.with_options(
            tar_file_re_patterns=updated_file_patterns
        )

    if archive_path:
        tar_file_name = add_timestamp_to_path(Path(archive_path) / f"SB{sbid}.tar")
        create_sbid_tar_archive(
            tar_out_path=tar_file_name,
            base_path=science_folder_path,
            archive_options=archive_options,
        )

    if copy_path:
        copy_sbid_files_archive(
            copy_out_path=copy_path,
            base_path=science_folder_path,
            archive_options=archive_options,
        )

    return science_folder_path
@task
def task_update_with_options(input_object: T, **kwargs) -> T:
    """Update any object via its ``.with_options()`` interface.

    All keyword arguments other than ``input_object`` are passed through
    to ``input_object.with_options()``.

    Args:
        input_object (T): The object that has a ``.with_options`` method that will be updated

    Returns:
        T: The updated object returned by ``.with_options()``

    Raises:
        AssertionError: If ``input_object`` has no callable ``with_options``
            attribute (only when assertions are enabled).
    """
    # A direct attribute lookup (instead of scanning ``dir()``) also honours
    # ``__getattr__`` and confirms the attribute is actually callable.
    assert callable(getattr(input_object, "with_options", None)), (
        f"{type(input_object)=} does not have a with_options method"
    )
    updated_object = input_object.with_options(**kwargs)  # type: ignore
    return updated_object
@task
def task_get_attributes(item: Any, attribute: str) -> Any:
    """Retrieve an attribute from an input instance of a class or structure.

    This is intended to be used when dealing with a prefect future object that
    has yet to be evaluated or is otherwise not immediately accessible.

    The instance ``__dict__`` is consulted first. Attributes that do not live
    there (e.g. ``NamedTuple`` fields, ``__slots__`` members, properties or
    class attributes) are resolved with :func:`getattr` instead.

    Args:
        item (Any): The item that has the input class or structure
        attribute (str): The attribute to extract

    Returns:
        Any: Value of the requested attribute

    Raises:
        KeyError: If ``attribute`` cannot be found on ``item``.
    """
    logger.debug(f"Pulling {attribute=}")

    instance_attributes = getattr(item, "__dict__", {})
    if attribute in instance_attributes:
        return instance_attributes[attribute]

    try:
        return getattr(item, attribute)
    except AttributeError as error:
        # Missing attributes raise KeyError (not AttributeError) so the
        # error contract of the ``__dict__`` lookup is unchanged for callers.
        raise KeyError(attribute) from error
@task
def task_flatten(to_flatten: list[list[T]]) -> list[T]:
    """Concatenate a list of lists into a single flat list.

    Handy when a task-decorated function returns a list per call and those
    results need merging. Only one level of nesting is removed.

    Args:
        to_flatten (list[list[T]]): Sub-lists to merge, in order.

    Returns:
        list[T]: Every item of every sub-list, preserving order.
    """
    logger.debug(f"Received {len(to_flatten)} to flatten.")

    flattened: list[T] = []
    for sublist in to_flatten:
        flattened.extend(sublist)

    logger.debug(f"Flattened list {len(flattened)}")
    return flattened