Source code for aces.holography.bptable

#!/usr/bin/env python3
import sys
import os
import argparse
import numpy as np
from casacore.tables import *
import shutil
from aces.askapdata.schedblock import SchedulingBlock
import string


[docs]class BandpassTable:
[docs] def getcol(self, inTable, colName, antNum, beamNum, nPol=4): """Get column of BP table :param inTable: Name of table on disk :type inTable: str :param colName: Name of column 'BANDPASS' or 'BPLEAKAGE' :type colName: str :param antNum: Antenna index :type antNum: int :param beamNum: Beam index :type beamNum: int :param nPol: Number of polarisations, defaults to 4 :type nPol: int, optional """ with table(inTable, readonly=True, ack=False) as t: self.bp = t.getcol(colName, startrow=0, nrow=-1, rowincr=1) # Read the flag table colName = colName + '_VALID' self.bpf = t.getcol(colName, startrow=0, nrow=-1, rowincr=1) # Get some properties handy: time = t.getcol('TIME', startrow=0, nrow=-1, rowincr=1) self.time = (time - time[0])*24.0*60.0 # In minutes from start self.nTime = self.bp.shape[0] self.nbeams = self.bp.shape[1] self.nante = self.bp.shape[2] self.nchan = self.bp.shape[3]//nPol self.chan_arr = np.linspace(0, self.nchan-1, self.nchan) self.ant_arr = np.linspace(0, self.nante-1, self.nante) # x-pol: self.fx = np.array( self.bpf[0, beamNum, antNum, 0:nPol*self.nchan:2].astype( dtype="float32", casting='same_kind' ) ) self.xr = np.multiply( self.bp[0, beamNum, antNum, 0:nPol*self.nchan:2].real, self.fx ) self.xi = np.multiply( self.bp[0, beamNum, antNum, 0:nPol*self.nchan:2].imag, self.fx ) self.xa = abs( np.array( self.xr + 1j*self.xi ) ) self.xp = np.angle( np.array( self.xr + 1j*self.xi ) )*180.0/np.pi # Now for y-pol:' self.fy = np.array( self.bpf[0, beamNum, antNum, 1:nPol*self.nchan:2].astype( dtype="float32", casting='same_kind' ) ) self.yr = np.multiply( self.bp[0, beamNum, antNum, 1:nPol*self.nchan:2].real, self.fy ) self.yi = np.multiply( self.bp[0, beamNum, antNum, 1:nPol*self.nchan:2].imag, self.fy ) self.ya = abs( np.array( self.yr + 1j*self.yi ) ) self.yp = np.angle( np.array( self.yr + 1j*self.yi ) )*180.0/np.pi
[docs] def putcol(self, outTable, colName, antNum, beamNum, nPol=4): """Write column to BPTable :param outTable: Output table :type outTable: str :param colName: Column to overwrite - 'BANDPASS' or 'BPLEAKAGE' :type colName: str :param antNum: Antenna index :type antNum: int :param beamNum: Beam index :type beamNum: int :param nPol: Number of polarisations, defaults to 4 :type nPol: int, optional """ with table(outTable, readonly=False, ack=False) as t: bp = t.getcol(colName, startrow=0, nrow=-1, rowincr=1) # Read the flag table colNameF = colName + '_VALID' bpf = t.getcol(colNameF, startrow=0, nrow=-1, rowincr=1) new_bp = np.copy(bp) # x-pol: x = self.xr + 1j*self.xi new_bp[0, beamNum, antNum, 0:nPol*self.nchan:2] = x # Now for y-pol: y = self.yr + 1j*self.yi new_bp[0, beamNum, antNum, 1:nPol*self.nchan:2] = y t.putcol(colName, new_bp, startrow=0, nrow=-1, rowincr=1)
[docs]def copy_and_overwrite(from_path, to_path): """Copy and overwrite directory Args: from_path (str): Source path to_path (str): Destination path """ if os.path.exists(to_path): shutil.rmtree(to_path) shutil.copytree(from_path, to_path)
[docs]def main(bs_tab, hl_tab, beam, mySBID): """Main script :param bs_tab: Boresight bandpass table :type bs_tab: str :param hl_tab: Holography bandpass table :type hl_tab: str :param beam: Beam index :type beam: int :param mySBID: SBID of holography observation :type mySBID: int :return: Combined bandpass table :rtype: str """ sb = SchedulingBlock(mySBID) myRA = int(''.join(filter(lambda char: char in string.digits, sb.get_parameters()['holography.ref_antenna']))) iRA = myRA - 1 ms_comb = bs_tab.replace('boresight', 'combined') copy_and_overwrite(hl_tab, ms_comb) bpTable = BandpassTable() for col in ['BANDPASS', 'BPLEAKAGE']: bpTable.getcol(bs_tab, col, iRA, beam, nPol=4) bpTable.putcol(ms_comb, col, iRA, beam, nPol=4) return ms_comb
[docs]def cli(): descStr = """ Combine bandpass tables for holography processing. """ # Parse the command line options parser = argparse.ArgumentParser( description=descStr, formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument( 'bs_tab', metavar='bs_tab', type=str, help='Boresight bandpass table.' ) parser.add_argument( 'hl_tab', metavar='hl_tab', type=str, help='Holography bandpass table.' ) parser.add_argument( 'beam', metavar='beam', type=int, help='Beam of bandpass table.' ) parser.add_argument( 'sbid', metavar='sbid', type=int, help='SBID of holography observation.' ) args = parser.parse_args() main( args.bs_tab, args.hl_tab, args.beam, args.sbid, )
if __name__ == "__main__": sys.exit(cli())