Source code for flint.prefect.clusters

"""Some utility functions around the creation of Prefect task funners.

For this work we will be using Dask backed workers to perform the compute
operations.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

import yaml
from prefect_dask import DaskTaskRunner

from flint.utils import get_packaged_resource_path


[docs] def get_cluster_spec(cluster: str | Path) -> dict[Any, Any]: """ Given a cluster name, obtain the appropriate SLURM configuration file appropriate for use with SLURMCluster. This cluster spec is expected to be consistent with the cluster_class and cluster_kwargs parameters that are used by dask_jobqueue based specifications. Args: cluster (Union[str,Path]): Name of cluster or path to a configuration to look up for processing Raises: ValueError: Raised when cluster is not in KNOWN_CLUSTERS and has not corresponding YAML file. Returns: dict[Any, Any]: Dictionary of know options/parameters for dask_jobqueue.SLURMCluster """ yaml_file = None if Path(cluster).exists(): yaml_file = cluster else: yaml_file = get_packaged_resource_path( package="flint.data.cluster_configs", filename=f"{cluster}.yaml" ) if yaml_file is None or not Path(yaml_file).exists(): raise ValueError( f"{cluster=} is not known, or its YAML file could not be loaded." ) with open(yaml_file) as in_file: spec = yaml.load(in_file, Loader=yaml.Loader) return spec
[docs] def get_dask_runner( cluster: str | Path = "galaxy_small", extra_cluster_kwargs: dict[str, Any] | None = None, ) -> DaskTaskRunner: """Creates and returns a DaskTaskRunner configured to established a SLURMCluster instance to manage a set of dask-workers. The SLURMCluster is currently configured only for Galaxy. Keyword Args: cluster (Union[str,Path]): The cluster name that will be used to search for a cluster specification file. This could be the name of a known cluster, or the name of a yaml file installed among the `cluster_configs` directory of the aces module. Returns: DaskTaskRunner: A dask task runner capable of being used as a task_runner for a prefect flow """ spec = get_cluster_spec(cluster) if extra_cluster_kwargs is not None: spec["cluster_kwargs"].update(extra_cluster_kwargs) task_runner = DaskTaskRunner(**spec) return task_runner