Source code for process_holography

#!/usr/bin/env python3

"""A prefect2 based pipeline to process holography data taken by ASKAP. It will use components of the aces module,
which will be parallelised in a beam-wise manner where possible. The output data products are FITS files that 
are compatible with the linmos package of yandasoft. 
"""
import argparse
import logging
import os
import shutil
import sys
from glob import glob
from pathlib import Path
from typing import Optional, Union

import pkg_resources
from casacore.tables import table
from prefect import flow, get_run_logger, task

from aces.clink.tasks import task_create_emit_clink_event
from aces.cluster_configs.clusters import get_dask_runner
from aces.cluster_configs.environment import log_environment
from aces.holography import (
    beamset_from_ms,
    clean_holography,
    divmodel,
    dump_point,
    grid_holography,
    merge_holography,
    plot_summary,
)
from aces.operations.actions import find_ms_in_dir
from aces.operations.directory import setup_chdir_workdir
from aces.operations.flows import flow_find_copy_measurement_sets

# Known objects with a model available to correct the holography data with
[docs]KNOWN_HOLO_MODELS: tuple[str] = ("virgo",)
[docs]logger = logging.getLogger(__name__)
[docs]def get_model_resource(holo_src: str) -> str: """Look up and return a path to a model file that corresponds to a known bright source that holography was performed against. Args: holo_src (str): Name of the target source, as retreived from the field table of a measurement set Raises: ValueError: Raised when a model file is expected but not found through package resources. Returns: str: Path to a model file to use for a known source. """ model = None if holo_src == "virgo": model = pkg_resources.resource_filename("aces.holography", "VirgoA.mod") if model is None: raise ValueError( f"Unable to resolve {holo_src} to known model. Known sources are {KNOWN_HOLO_MODELS}. " ) return model
[docs]def get_holography_src(ms: Union[str, Path]) -> str: """Identifies the source that holography was carried out against by examining a measurement set. This intended to be used fo any additional processing/corrections (such as the case of virgo) Args: ms (Union[str,Path]): A measurement set to inspect to identify the target source holography was carried out against Returns: str: Name of the target source """ # table() does not play well with Paths with table(f"{str(ms)}/FIELD") as field: fields = list(set(field.getcol("NAME"))) assert ( len(fields) == 1 ), f"Expected a single field to be present, instead found {fields=}" return fields[0]
@task(name="Generate beamset")
[docs]def task_beamset_from_ms(*args, **kwargs) -> None: """A wrapper around the beamset_from_ms main entrypoint that converts MSs to a beamset. All args and kwargs are passed into aces.holography.beamset_from_ms.main. """ logger = get_run_logger() logger.info("Converting measurement set to beamset") beamset_from_ms.main(*args, **kwargs)
@task(name="Normalising MS based on model")
[docs]def task_divmodel(*args, **kwargs) -> None: """A wrapper around the divmodel main entry point that corrects holography data. All args and kwargs are passed into aces.holography.divmodel.main. """ logger = get_run_logger() logger.info("Inserting expected source model") divmodel.main(*args, **kwargs)
@task(name="Get pointing data")
[docs]def task_dump_point(*args, **kwargs) -> None: """A wrapper around the dump_point main entry point that extracts pointing information from an MS. All args and kwargs are passed into aces.holography.dump_point.main. """ logger = get_run_logger() logger.info("Extracting the point model") dump_point.main(*args, **kwargs)
@task(name="Regrid holography")
[docs]def task_grid_holography(beam: int, sbid: int, holo_dir: str, ant: int) -> None: """A wrapper around the grid_holography main entrypoint, which will grid and interpolate holography data. Args: beam (int): The beam number that will be processed from the beamset sbid (int): The SBID number of the holography data to process holo_dir (str): The path of the data that will be processed ant (int): Reference antenna used throughout the processing """ logger = get_run_logger() logger.info(f"Gridding the holography data for {sbid=} {beam=}") grid_holography.main(beam=beam, sbid=sbid, holo_dir=holo_dir, refant=ant)
@task(name="Merge beamsets")
[docs]def task_merge_holography(*args, **kwargs) -> None: """A wrapper around the merge_holography main entry point that collates holgraphy results together. All args and kwargs are passed into aces.holography.merge_holography.main. """ logger = get_run_logger() logger.info("Mering the holography data together") merge_holography.main(*args, **kwargs)
@task(name="Clean holography")
[docs]def task_clean_holography(*args, **kwargs) -> None: """A wrapper around the cleaan_holography main entry point that corrects holography data from RFI. All args and kwargs are passed into aces.holography.clean_holography.main. """ logger = get_run_logger() logger.info("Cleaning the merged holography data") clean_holography.main(*args, **kwargs)
@task(name="Plot holography")
[docs]def task_plot_summary(*args, **kwargs) -> None: """A wrapper around the plot_summary main entry point to create visualisations of the holograhy data. data. All args and kwargs are passed into aces.holography.plot_summmary.main. """ logger = get_run_logger() logger.info("Creating summary plots") plot_summary.main(*args, **kwargs)
@task(name="Get calibrated holography measurement sets")
[docs]def get_cal_holo_mslist(sbid: int, workdir: str) -> list[str]: """Get list of calibrated MSs from holography dir Args: sbid (int): Holography SBID workdir (str): Working directory Returns: list[str]: Calibrated holography MSs """ mslist = sorted( glob(f"{workdir}/{sbid}/*[0-9].cal.ms"), key=lambda x: int(x[-x[::-1].find("_") : x.find(".cal.ms")]), ) return mslist
@task(name="Copying final data products")
[docs]def task_copy_final_beams(final_path: Path, sbid: int, dry_run: bool = False) -> None: """Copy the constructed beams from the working directory to a final location Args: final_path (Path): Path to install the constructed beams into. This must exists and be a directory. sbid (int): The SBID of the holography observation, which is used when constructing the files to copy. dry_run (bool, optional): If True, the copying process is only reported, and no actual copying is performed. Defaults to False. """ logger = get_run_logger() beams = glob(f"akpb.iquv.*.SB{sbid}.*.fits") assert ( len(beams) == 2 ), f"Expected to copy two beam fits, instead found {len(beams)=}" assert ( final_path.exists() and final_path.is_dir() ), f"The {final_path=} directory does not exist. It should be created before attempting to run the pipeline. " final_path = final_path.absolute() logger.info(f"The beams will be installed into {final_path=}") if dry_run: logger.info( "The final beam data products will not be copied into place as the dry-run option has been specified. " ) for beam in beams: beam_file = Path(beam).absolute() dest_name = final_path / beam_file.name if dest_name.exists(): logger.warn(f"{dest_name} already exists. Skipping the copy. ") continue logger.info(f"Copying {beam_file} to {final_path}") if not dry_run: shutil.copy(beam_file, final_path)
# Define flow @flow()
[docs]def flow_holography( workdir: str, sbid: int, grid_ant: Optional[int] = None, beam_path: Path = None, ) -> None: """The prefect workflow to execute the holography processing and analysis pipeline. This flow brings together the main stages that are implemented in other scripts, and wraps them up with the prefect logic to allow them to be called. The workflow expects that in the current working directory there to be a folder whose name is the sbid to be processed, and the measurement sets to be processed have been copied there already Args: workdir (str): Where the data processing will be carried out, including creation of products sbid (int): SBID number of the target holography observation to be procesed grid_ant (Optional[int], optional): Reference antenna used in observation. If None, attempts are made to dderive it from the data. Defaults to None. beam_path (Path, optional): The final location to copy the output beams to. If None, no copying is performed. Defaults to None. """ logger = get_run_logger() # Make this a Path. Some of the aces code _really_ expects strings and does its own # path wranggling that I do not waant to fix workdir_path = Path(workdir) # Get MS list from holography. Submit is ommitted to block for result # The majority of the holography code expects strings to be passed, and # not Path objects mslist_holo = [ str(ms) for ms in find_ms_in_dir(workdir_path / str(sbid), expected_no=36) ] # Dump pointing data to disk task_dump_point.submit(sbid=sbid, mslist=mslist_holo, workdir=workdir) holo_src = get_holography_src(mslist_holo[0]) logger.info(f"The holography target source is {holo_src=}") if holo_src in KNOWN_HOLO_MODELS: logger.info( f"Will be normalising visibility data specific to {holo_src} processing. " ) model = get_model_resource(holo_src) future_divmodel = task_divmodel.map(sbid, mslist_holo, model) else: logger.info( f"The extracted {holo_src=} is not in {KNOWN_HOLO_MODELS=}, so no models will be inserted. " ) future_divmodel = None # Produce beams in HDF5 future_beamset = task_beamset_from_ms.submit( sbid, mslist_holo, workdir, correct=False, wait_for=future_divmodel ) logger.info(f"{sbid=}") logger.info(f"{workdir=}") logger.info(f"{grid_ant=}") logger.info(f"{mslist_holo=}") future_grid = task_grid_holography.map( [beam_no for beam_no, _ in enumerate(mslist_holo)], sbid, workdir, grid_ant, wait_for=future_beamset, ) # type: ignore future_merge = task_merge_holography.submit( sbid=sbid, holo_dir=workdir, wait_for=future_grid ) # type: ignore # post-process to FITS future_clean = task_clean_holography.submit( sbid=sbid, holo_dir=workdir, remake_stokes=True, max_order=5, param="BIC", snr_limit=3, wait_for=future_merge, ) # Plot future_summary = task_plot_summary.submit( sbid=sbid, holo_dir=workdir, wait_for=future_clean ) if beam_path is not None: task_copy_final_beams.submit(beam_path, sbid, wait_for=future_summary) # type: ignore logger.info("Finished. Wrapping up. ")
@flow(name="Holography Main Flow")
[docs]def main( sbid: int, workdir: Optional[Path] = None, grid_ant: Optional[int] = None, cluster: str = "galaxy_small", beam_path: Path = None, schedule_block_path: Path = None, emit_clink: bool = False, ): """The main driver of the holography pipeline procedure. This entrypoint is responsible for - creating a dask task runner for use between subflows - run the sub-flow to find and copy measurement sets, if requested - run the sub-flow to perform the actual holography processing - emit a clink even if requested on completion of the two sub-flows Args: sbid (int): SBID of the observation used to collect the holography data workdir (Optional[Path], optional): The directory that the holography data will be processed in. There is an attempt to change to this directory. Defaults to None. grid_ant (Optional[int], optional): Antenna number that was the 'reference' antenna during the holography observation. Defaults to None. cluster (str, optional): The name of the cluster configuration file to load. Defaults to "galaxy_small". beam_path (Path, optional): Path to install the constructed holography beams into. If None, the beams are not copied. Defaults to None. schedule_block_path (Path, optional): Path to the space on disk where the Measurement sets are stored (inside a SBID folder). If not None and the SBID folder exists with the 36 measurementsets, these are copied over and delete the measurments in the workdir, if they exists. Defaults to None. emit_clink (bool, optional): Whether a clink event message will be raised to process manager """ logger = get_run_logger() logger.info("The main flow of the ASKAP holography processing pipeline") # Directory changed here in case agent deployment is used workdir = setup_chdir_workdir(workdir=workdir) if beam_path is not None: assert ( beam_path.exists() and beam_path.is_dir() ), f"The {beam_path=} directory does not exist. It should be created before attempting to run the pipeline. " log_environment() # This is a common dask backed runner that will be used for separate flows logger.info("Acquiring dask worker") task_runner = get_dask_runner( cluster, extra_cluster_kwargs=dict(local_directory=os.getcwd()) ) # Copy the measureement sets into place if requested if schedule_block_path is not None: src_path = schedule_block_path / str(sbid) out_path = Path(workdir) / str(sbid) logger.debug(f"Searching {src_path} to copy MS to {out_path}") flow_find_copy_measurement_sets.with_options( name=f"Copy measurement sets -- {sbid}", task_runner=task_runner )(src_path, out_path, expected_no=36) # Do the holography magix. Same dask runner used. # Sub-flows likst the one above are blocking flow_holography.with_options( name=f"Processing holography -- {sbid}", task_runner=task_runner, )( str(workdir), sbid, grid_ant=grid_ant, beam_path=beam_path, ) # Sub-flows, including flow_holography, are blocking. No need # to wait_for= here if emit_clink: task_create_emit_clink_event.submit( "clink-holo", sbid, workdir, "holography", state="completed" ) # type: ignore
[docs]def cli(): descStr = """ Process holography data from a measurement set, pack into a beamset HDF5 and Stokes FITS cube. In your working directory have a directory containing the holography measurement sets (and optionally the 1934 bandpass MSs) under the name of its SBID. """ # Parse the command line options parser = argparse.ArgumentParser( description=descStr, formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument("workdir", type=Path, help="Working directory.") parser.add_argument("sbid", type=int, help="SBID of holography.") parser.add_argument( "-a", "--ant", type=int, default=None, help="Antenna to use for regrid reference - specify to override [ref. ant. + 1].", ) parser.add_argument( "-v", "--verbosity", action="count", help="Increase output verbosity", default=0 ) parser.add_argument( "-c", "--cluster", type=str, default="galaxy_small", help="Name of the cluster specification file to use for interacting with a compute cluster. Inputs may be a resolved path to a yaml file, or a name of a known cluster file packaged with the aces repository (without the trailing .yaml extension)", ) parser.add_argument( "-b", "--beam-path", type=Path, default=None, help="Path that the final beam data products will be copied to. If None, beams are not copied. ", ) parser.add_argument( "-s", "--schedule-block-path", type=Path, default=None, help="The directory to the location of SBID folders with raw measurement sets. If this is not None and is valid, the data will be copied fresh, clobbering the existing SBID folder in the desired work directory, should it exist. ", ) parser.add_argument( "-e", "--emit-clink", default=False, action="store_true", help="Issue a CLINK event to inform process manager of completion. ", ) args = parser.parse_args() main( args.sbid, workdir=args.workdir, grid_ant=args.ant, cluster=args.cluster, beam_path=args.beam_path, schedule_block_path=args.schedule_block_path, emit_clink=args.emit_clink, )
if __name__ == "__main__": sys.exit(cli())