Source code for aces.holography.beamset_from_ms

"""
Get holography data from a measurement set and pack into a beamset object
"""
import argparse
import logging
import string
import sys

import askap.time as at
import casacore.tables as pt
import numpy as np
import pytz
from tqdm import tqdm

import aces.beamset.mapset as ms
from aces.askapdata.schedblock import SchedulingBlock
from aces.holography import holo_filenames as hf

[docs]log = logging.getLogger(__name__)
[docs]def cli(): descStr = """ Get holography data from a measurement set and pack into a beamset object """ # Parse the command line options parser = argparse.ArgumentParser( description=descStr, formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument( 'mySBID', metavar='mySBID', type=int, help='Holography SBID to process.' ) parser.add_argument( 'mslist', metavar='mslist', type=str, help='Input holography measurement sets.', nargs='+' ) parser.add_argument( 'workdir', metavar='workdir', type=str, help='Working directory.' ) parser.add_argument( '-c', '--correct', action='store_false', help="Read from CORRECTED_DATA column (default True). If set, will read from DATA column" ) args = parser.parse_args() main( mySBID=args.mySBID, mslist=args.mslist, workdir=args.workdir, correct=args.correct )
# TODO: Revert this code back to being functional. OOP style is not required. # This is a mess caused by me trying to parallelise in a dumb way in the past. # TODO: This is also the slowest part of the code. We can use Dask for parallelisation
[docs]class BeamSetter: def __init__(self, mySBID, mslist, workdir, correct): sb = SchedulingBlock(mySBID) myRA = int(''.join(filter(lambda char: char in string.digits, sb.get_parameters()['holography.ref_antenna']))) myWeights = sb.get_weights_prefix() if myWeights[0:2] == "SB": myWeights = int(''.join(filter( lambda char: char in string.digits, sb.get_weights_prefix().split('_')[0]))) elif myWeights == 'auto': myWeights = int(sb.get_variables()[ 'schedblock.scan000.weights.id']) myFootprint = sb.get_footprint_name() myPitch = float(sb.get_footprint_pitch()) myRotation = float(sb.get_footprint_rotation()) myGridSize = int(sb.get_parameters()['holography.grid_size']) myGridStep = float(sb.get_parameters()['holography.grid_step']) # bsb = SchedulingBlock(myWeights) # dt = at.isoUTC2datetime(bsb.get_variables()['executive.start_time']) # dta = dt.replace(tzinfo=pytz.UTC) # bfTime = at.utcDt2mjd(dta) bfAngle = float(''.join(filter(lambda char: char in string.digits + '.', sb.get_parameters()['common.target.src%d.pol_axis']))) log.info(f"Reference Antenna = {myRA}") log.info(f"Holography SBID = {mySBID}") log.info(f"Weights ID = {myWeights}") # Need to set the beams to process manually for now beams = [i for i in range(1, len(mslist)+1)] beam_arr = np.array(beams) log.info(f"The beams that are loaded {beams=}") assert len(mslist) == len( beams), "Unexpected number of measurement sets" myMS = mslist[0] # beam 0 with pt.table(myMS + "/ANTENNA") as tba: antNames = tba.getcol("NAME") ants = [] for ant in antNames: ants.append(int(ant.replace('ak', ''))) with pt.table(myMS + "/SPECTRAL_WINDOW") as tbs: freqNames = tbs.getcol("CHAN_FREQ") freqs = [] for freq in freqNames[0]: freqs.append(float(freq / 1000000.0)) dt = at.isoUTC2datetime(sb.get_variables()['executive.start_time']) dta = dt.replace(tzinfo=pytz.UTC) holoTime = at.utcDt2mjd(dta) offsets = [] # Reproduced the logic from the observing procedure, without bulk offset support for x in range(myGridSize): offInDeg = ((x - (myGridSize - 1) / 2.) * myGridStep) offsets.append((np.pi / 180.0) * offInDeg) try: cast_myWeights = int(myWeights) except: cast_myWeights = str(myWeights) log.info(f"Casted {type(cast_myWeights)=} from {type(myWeights)=}") # 'beamformingEpoch': bfTime, metadata = {'times': [holoTime], 'antennas': ants, 'beams': beams, 'frequencies': freqs, 'polarizations': ['XX', 'XY', 'YX', 'YY'], 'fp_name': myFootprint, 'fp_pitch': myPitch, 'fp_angle': myRotation, 'beamformingPA': bfAngle, 'beamformingSBID': cast_myWeights, 'holographyEpoch': holoTime, 'holographySBID': mySBID, 'payloadshape': [myGridSize, myGridSize], 'xAxis': offsets, 'yAxis': offsets} bm = ms.MapSet(metadata=metadata) mds = bm.get_container_shape() bm.flags = np.zeros(mds, np.bool_) self.mslist = mslist self.mds = mds self.metadata = metadata self.myRA = myRA self.beams = beams self.ants = ants self.freqs = freqs self.myGridSize = myGridSize self.beam_arr = beam_arr self.bm = bm self.correct = correct self.workdir = workdir self.mySBID = mySBID
[docs] def get_mslist(self): return self.mslist
[docs] def read_data(self, myMS): # Extract the needed data from the measurement set with pt.table(myMS) as tb: iRA = self.myRA - 1 # Get the ID of the antenna under test and reject auto-correlations with pt.taql( 'select from $tb where ANTENNA1 = $iRA or ANTENNA2 = $iRA' ) as tbs: ANTENNA1 = tbs.getcol("ANTENNA1") ANTENNA2 = tbs.getcol("ANTENNA2") FEED1 = tbs.getcol("FEED1") SCAN = tbs.getcol("SCAN_NUMBER") if self.correct: DATA = tbs.getcol("CORRECTED_DATA") else: DATA = tbs.getcol("DATA") FLAG = tbs.getcol("FLAG") # Conjugate if ANTENNA2 is reference -- ACES-584 DATA[ANTENNA2 == iRA] = np.conjugate(DATA[ANTENNA2 == iRA]) return iRA, ANTENNA1, ANTENNA2, FEED1, SCAN, DATA, FLAG
@staticmethod
[docs] def update(iRA, ANTENNA1, ANTENNA2, FEED1, SCAN, DATA, FLAG, temp, ints, flgs, beam_arr): ant_1_idx = np.logical_and(iRA == ANTENNA1, iRA != ANTENNA2) ant_2_idx = np.logical_and(iRA == ANTENNA2, iRA != ANTENNA1) TANT = np.ones_like(ANTENNA1) * np.nan TANT[ant_1_idx] = ANTENNA2[ant_1_idx] TANT[ant_2_idx] = ANTENNA1[ant_2_idx] TANT = TANT.astype(np.int16) BEAM = beam_arr[FEED1] - 1 # CASA docs: Data are flagged bad if the FLAG array element is True. flags = np.array([False in flag for flag in FLAG]) TANT, BEAM, SCAN, DATA, FLAG = TANT[flags], BEAM[flags], SCAN[flags], DATA[flags], FLAG[flags] for a, b, s, data, flag in zip(TANT, BEAM, SCAN, DATA, FLAG): if a < 0: continue else: temp[0, a, b, :, :, s] += data.T ints[0, a, b, :, :, s] += 1 flgs[0, a, b, :, :, s] += flag.T return temp, ints, flgs
[docs] def ms_to_arr(self, mslist): temp = np.zeros(self.mds + [len(self.metadata['xAxis']) * len(self.metadata['yAxis'])], np.complex64) ints = np.zeros(self.mds + [len(self.metadata['xAxis']) * len(self.metadata['yAxis'])], np.complex64) flgs = np.zeros(self.mds + [len(self.metadata['xAxis']) * len(self.metadata['yAxis'])], bool) # Open the measurement set for c, myMS in tqdm(enumerate(mslist), desc='Processing MS'): log.info(f"Processing {c+1} of {len(mslist)} - {myMS}") iRA, ANTENNA1, ANTENNA2, FEED1, SCAN, DATA, FLAG = self.read_data( myMS ) temp, ints, flgs = self.update( iRA, ANTENNA1, ANTENNA2, FEED1, SCAN, DATA, FLAG, temp, ints, flgs, self.beam_arr ) del ANTENNA1, ANTENNA2, FEED1, SCAN, DATA, FLAG # Regrids to same final shape as data # This is different to old scheme! # -- Gives a flag per pixel (not per image) shape = [i for i in flgs.shape[:-1]] + [self.myGridSize, self.myGridSize] flgs = flgs.reshape(shape) self.temp = temp self.ints = ints self.bm.flags = flgs
[docs] def normalise(self): # Normalise by integration count and write to HDF5 structure print('Normalising by count') new_shape = list(self.temp.shape[:-1]) + [self.myGridSize, self.myGridSize] self.bm.data = (self.temp / self.ints).reshape(new_shape)
[docs] def write_bm(self): # Write to disk outname = hf.make_file_name(self.bm, kind='hdf5') self.bm.write_to_hdf5(f'{self.workdir}/{outname}')
[docs]def main(mySBID, mslist, workdir, correct=True): """Main script Args: mySBID (int): SBID of holography mslist (list): workdir (str): Working directory correct (bool): Whether to use the raw data or the column that has been corrected for source structure """ setter = BeamSetter(mySBID, mslist, workdir, correct) mslist = setter.get_mslist() setter.ms_to_arr(mslist) setter.normalise() setter.write_bm() del setter
if __name__ == "__main__": sys.exit(cli())