# Source code for aces.operations.actions

"""This contains some bespoke operations related to the general orchastration 
of a workflow, including:

- copying measurement sets
"""

import logging
import shutil
from glob import glob
from pathlib import Path
from typing import Optional

from aces.exceptions import MissingMSError

# Module-level logger, named after this module so its level/handlers can be
# configured independently of the rest of the package.
logger = logging.getLogger(__name__)
# TODO: Should these find tasks be consolidated into one that instead takes the glob pattern as an option?
def find_sbid_ms_in_dir(
    sbid: int, target_dir: Path, expected_no: Optional[int] = None
) -> list[Path]:
    """Searches for measurement sets of a particular SBID in a target directory

    Measurement sets are matched with the glob pattern ``*{sbid}*[0-9].ms``
    directly inside ``target_dir``. They are returned sorted by the integer that
    follows the last underscore in the file name (or the whole file stem when
    there is no underscore), e.g. ``SB1234_11.ms`` sorts as 11.

    Args:
        sbid (int): The SBID of interest
        target_dir (Path): The directory expected to contain the MS
        expected_no (Optional[int], optional): Number of expected MSs to find. Defaults to None.

    Raises:
        MissingMSError: Raised when expected_no is not None and that many MSs are not found
        ValueError: Raised when the text after the last underscore of a matched
            file name is not an integer

    Returns:
        list[Path]: Location to the MSs that were found in the target_dir
    """
    # escape() stops glob metacharacters (e.g. ``[``) in the directory name
    # from being interpreted as part of the pattern.
    from glob import escape as glob_escape

    def _ms_index(ms: str) -> int:
        # Parse the file name only, so underscores or ".ms" appearing in the
        # parent directories cannot corrupt the extracted number.
        return int(Path(ms).stem.rpartition("_")[2])

    mslist = sorted(
        glob(f"{glob_escape(str(target_dir))}/*{sbid}*[0-9].ms"),
        key=_ms_index,
    )

    # Useful if calling code can only deal with an expected number, which
    # is currently the case for the holography and SEFD code -- one MS for
    # each beam
    if expected_no is not None and len(mslist) != expected_no:
        raise MissingMSError(
            f"Expected {expected_no} MSs, found {len(mslist)} for {target_dir=} and {sbid=}"
        )

    return [Path(ms) for ms in mslist]
def find_ms_in_dir(
    target_dir: Path, expected_no: Optional[int] = None
) -> list[Path]:
    """Searches for measurement sets in a target directory

    Any entry directly inside ``target_dir`` matching ``*[0-9].ms`` is treated as
    a measurement set. Matches are returned sorted by the integer that follows
    the last underscore in the file name (or the whole file stem when there is
    no underscore), e.g. ``2022-04-13_034937_11.ms`` sorts as 11.

    Args:
        target_dir (Path): The directory expected to contain the MS
        expected_no (Optional[int], optional): Number of expected MSs to find. Defaults to None.

    Raises:
        MissingMSError: Raised when expected_no is not None and that many MSs are not found
        ValueError: Raised when the text after the last underscore of a matched
            file name is not an integer

    Returns:
        list[Path]: Location to the MSs that were found in the target_dir
    """
    # escape() stops glob metacharacters (e.g. ``[``) in the directory name
    # from being interpreted as part of the pattern.
    from glob import escape as glob_escape

    def _ms_index(ms: str) -> int:
        # Parse the file name only, so underscores or ".ms" appearing in the
        # parent directories cannot corrupt the extracted number.
        return int(Path(ms).stem.rpartition("_")[2])

    mslist = sorted(
        glob(f"{glob_escape(str(target_dir))}/*[0-9].ms"),
        key=_ms_index,
    )

    # Useful if calling code can only deal with an expected number, which
    # is currently the case for the holography and SEFD code -- one MS for
    # each beam
    if expected_no is not None and len(mslist) != expected_no:
        raise MissingMSError(
            f"Expected {expected_no} MSs, found {len(mslist)} for {target_dir=}."
        )

    return [Path(ms) for ms in mslist]
def copy_measurement_set(
    target_ms: Path, output_dir: Path, overwrite: bool = False
) -> Path:
    """Copy a measurement set from a source location to the working directory where
    the holography pipeline is being executed. Although measurement sets are
    strictly directories, this function could be expanded for measurement
    specific tasks.

    Args:
        target_ms (Path): Full path to the measurement set to be copied
        output_dir (Path): Location to save the measurement set. It must already exist.
        overwrite (bool, optional): Overwrite the measurement set if it exists. Defaults to False.

    Raises:
        AssertionError: Raised when output_dir is not an existing directory, or when
            target_ms is not an existing directory

    Returns:
        Path: The name of the copied measurement set in the target directory
    """
    # Accept plain strings as well as Path objects for the destination
    output_dir = Path(output_dir)

    # This function can be called in parallel. It is therefore unsafe to trust it
    # to create the output directory should it not exist
    assert (
        output_dir.exists() and output_dir.is_dir()
    ), f"Output location {output_dir} is not a directory or does not exist. "

    target_ms = Path(target_ms)
    assert (
        target_ms.exists() and target_ms.is_dir()
    ), f"Target measurement set {target_ms=} is not available. "

    copy_path = output_dir / target_ms.name

    if copy_path.exists():
        if not overwrite:
            # Keep the existing copy untouched and hand it back to the caller
            logger.warning(f"File {copy_path} exists. Not copying. ")
            return copy_path

        logger.warning(f"Output MS {copy_path} exists and {overwrite=}. Removing.")
        shutil.rmtree(copy_path)

    # Only announce the copy once we know it is actually going to happen
    logger.info(f"Copying {target_ms=} to {copy_path=}")
    shutil.copytree(target_ms, copy_path)
    logger.info(f"Copying {target_ms=} finished")

    return copy_path