"""Utilies to get cluster configuration files
"""
import pkg_resources
from pathlib import Path
from typing import Any
from glob import glob
import yaml
from prefect_dask import DaskTaskRunner
[docs]def list_packaged_clusters() -> list[str]:
"""Return a list of cluster names that are available in the packaged set of
dask_jobqueue specification YAML files.
Returns:
list[str]: A list of preinstalled dask_jobqueue cluster specification files
"""
yaml_files_dir = pkg_resources.resource_filename("aces", "cluster_configs/")
yaml_files = glob(f"{yaml_files_dir}/*yaml")
clusters = [Path(f).stem for f in yaml_files]
return clusters
[docs]def get_cluster_spec(cluster: str) -> dict[Any, Any]:
"""
Given a cluster name, obtain the appropriate SLURM configuration
file appropriate for use with SLURMCluster.
This cluster spec is expected to be consistent with the cluster_class
and cluster_kwargs parameters that are used by dask_jobqueue based
specifications.
Args:
cluster (str): Name of cluster to look up for processing
Raises:
ValueError: Raised when cluster is not in KNOWN_CLUSTERS and has not corresponding YAML file.
Returns:
dict[Any, Any]: Dictionary of know options/parameters for dask_jobqueue.SLURMCluster
"""
KNOWN_CLUSTERS = ("galaxy",)
yaml_file = None
if Path(cluster).exists():
yaml_file = cluster
elif cluster == "galaxy":
yaml_file = pkg_resources.resource_filename(
"aces", "cluster_configs/galaxy_small.yaml"
)
else:
yaml_file = pkg_resources.resource_filename(
"aces", f"cluster_configs/{cluster}.yaml"
)
if yaml_file is None or not Path(yaml_file).exists():
raise ValueError(
f"{cluster=} is not known, or its YAML file could not be loaded. Known clusters are {KNOWN_CLUSTERS}"
)
with open(yaml_file, "r") as in_file:
spec = yaml.load(in_file, Loader=yaml.Loader)
return spec
[docs]def get_dask_runner(cluster: str = "galaxy_small", extra_cluster_kwargs: dict=None) -> DaskTaskRunner:
"""Creates and returns a DaskTaskRunner configured to establised a SLURMCluster instance
to manage a set of dask-workers. The SLURMCluster is currently configured only for Galaxy.
Keyword Args:
cluster (str): The cluster name that will be used to search for a cluster specification file.
This could be the name of a known cluster, or the name of a yaml file installed
amoung the `cluster_configs` directory of the aces module.
Returns:
DaskTaskRunner: A dask task runner capable of being used as a task_runner for a prefect flow
"""
spec = get_cluster_spec(cluster)
if extra_cluster_kwargs is not None:
spec["cluster_kwargs"].update(extra_cluster_kwargs)
task_runner = DaskTaskRunner(**spec)
return task_runner