Source code for aces.holography.grid_holography

#!/usr/bin/env python3
""" Grid holography data """
import sys
import numpy as np
import copy
import pkg_resources

from askap.footprint import Skypos
from aces.beamset.mapset import MapSet
from scipy.interpolate import Rbf
import aces.beamset.beamfactory as bf
import logging
from aces.holography import holo_filenames as hf
from aces.askapdata.schedblock import SchedulingBlock
import string

[docs]log = logging.getLogger(__name__)
from astropy.coordinates import SkyCoord, Angle, EarthLocation, AltAz from astropy.time import Time from astropy import units as u from astropy.utils.iers import conf conf.auto_max_age = None # Catch annoying warnings np.seterr(divide="ignore", invalid="ignore")
[docs]def get_antenna_locations(location_file): """Will parse a file the describes the location of the ASKAP antennas. This partticular files is newline delimited, with a separate line describing their wgs84 and itrf positions. This file returns their wgs84 position as an antenna ordered list of XYZ positions. :param location_file: Path to 'antenna_positions_itrf.txt' file (packaged with aces.holography submodule. ) :return: list of XYZ antenna positions, ordered by antenna number """ lines = open(location_file, 'r').readlines() lines = [li.strip() for li in lines] aa = [] a_loc = [] for li in lines: if 'wgs84' in li: d = li.split('.') a = int(d[2][3:]) i = li.index('[') d = li[i + 1:-1].split(',') lon = float(d[0]) lat = float(d[1]) # hgt = float(d[2]) ASKAP_longitude = Angle(lon, unit=u.deg) ASKAP_latitude = Angle(lat, unit=u.deg) a_loc.append(EarthLocation(lat=ASKAP_latitude, lon=ASKAP_longitude)) aa.append(a) list1, a_locs = zip(*sorted(zip(aa, a_loc))) return a_locs
[docs]def select_obs_radec(sbid): """ Read pointing data extracted from ms tables, locate one point per grid point and return az,el for ref ant (az0, el0) az,el for another ant (aza,ela), RA,Dec for each, and the time (UTC). :param sbid: The sbid of the observation being processed :return: The RA and Dec positions the beams were pointed towards, and the times int UTC they were there """ pointing = np.load("SB%d_beam00.pointing" % sbid, allow_pickle=True, encoding='bytes') time = np.load("SB%d_beam00.time" % sbid, allow_pickle=True, encoding='bytes') ra = pointing[:, :, 0] dec = pointing[:, :, 1] ij = ra.shape[0] // 36 ra = ra.reshape(ij, 36) dec = dec.reshape(ij, 36) ant = 0 tim = time.reshape(ij, 36)[:, ant] # Get one value for Time, RA, Dec from each scan wh = np.where(np.diff(dec[:, ant]) < -1.)[0] dw = 2 # return coordinate arrays with shapes [36, nx*nx] ra_s = ra[wh + dw].transpose() dec_s = dec[wh + dw].transpose() tim_s = tim[wh + dw] return ra_s, dec_s, tim_s
[docs]def dipa_to_lm(d, pa): """ Given a distance, position_angle offset relative to a sky position, return the equivalent (l,m) :param d: distance :param pa:position_angle :return: rectangular offset - orthographic proejction """ # x = np.sin(d) * np.sin(pa) y = np.sin(d) * np.cos(pa) return x, y
[docs]def get_true_azel(ra, dec, tim, ant_loc): """ Given a sequence of celestial coordinates (ra, dec) and times, and the observer location, calculate the topcentric coordinates of each. Coordinates should be for J2000. Times should be UTC. The routine uses astropy procedures. :param ra: Sequence of right ascensions in radians :param dec: Sequence of declinations in radians :param tim: Sequence of times in seconds of MJD :param ant_loc: observer's location, geocentric in metres :return: """ azt, elt = [], [] for r, d, t in zip(ra, dec, tim): sc = SkyCoord(Angle(r, unit=u.rad), Angle(d, unit=u.rad)) start_time = Time(t / 60.0 / 60.0 / 24.0, format='mjd', scale='utc', location=ant_loc) altaz = sc.transform_to(AltAz(obstime=start_time, location=ant_loc)) azt.append(altaz.az.rad) elt.append(altaz.alt.rad) return np.array(azt), np.array(elt)
[docs]def get_obs_grid(az, el, refant): """ :param az: :param el: :return: """ # Construct positions for reference source, and boresight over all cycles arr = np.array ae_ants = arr([[Skypos(a, e) for a, e in zip(ai, ei)] for ai, ei in zip(az, el)]) print('ae_ants shape ', ae_ants.shape) # compute bs position relative to ref in az-el frame # and 'x' and 'y' components x is horizontal, y is vertical # These, (bsx, bsy) over all cycles should fall on a square grid ae_ref = ae_ants[refant] bs_dpa = arr([[ref.d_pa(bs) for ref, bs in zip(ae_ref, ae_ant)] for ae_ant in ae_ants]) sb_dpa = arr([[bs.d_pa(ref) for ref, bs in zip(ae_ref, ae_ant)] for ae_ant in ae_ants]) sbx, sby = dipa_to_lm(sb_dpa[:, :, 0], sb_dpa[:, :, 1]) return -sbx, -sby
[docs]def grid_corr(data, mapset, refant, ant_loc): """ Regrids the sampled data onto the intended grid on the sky. The input data should be in its native form (not subjected to sky_transform) :param data: Holography complex voltages in mapset formatted array :param mapset: mapset object holding this dataset :param refant: Reference antenna; if None, retrieved from SB database :param ant_loc Antenna locations :return: regridded data """ sbid = mapset.metadata['holographySBID'] # Select an antenna that was used, but not the refant # The commanded pointing for all test antennas should be the same. antennas = [a - 1 for a in mapset.metadata['antennas']] ra, dec, tim = select_obs_radec(sbid) log.info("Got pointing data") n = len(ra) if np.sqrt(n) % 1.0 != 0.0: log.warning('Grid not square? Has {:d} points'.format(n)) data_i = None return data_i az_true, el_true = np.zeros(ra.shape), np.zeros(ra.shape) for a in antennas: ae = get_true_azel(ra[a], dec[a], tim, ant_loc[a]) az_true[a], el_true[a] = ae # Construct the intended sampling grid nx = mapset.payloadShape[0] x0, x1 = mapset.metadata['xAxis'][0], mapset.metadata['xAxis'][-1] y0, y1 = mapset.metadata['yAxis'][0], mapset.metadata['yAxis'][-1] xh = np.linspace(x0, x1, nx) yh = np.linspace(y0, y1, nx) # Note that we need to reverse (transpose) x and y here to align our grid points with the # sequence of measurement points. yg, xg = np.meshgrid(xh, yh) sbx, sby = get_obs_grid(az_true, el_true, refant) sh = data.shape Q = np.product(sh[2:5]) sh2 = [sh[1], Q, sh[-2], sh[-1]] data = data.reshape(sh2) data_i = np.zeros(sh2, dtype=complex) # Skip reference antenna ants = [a for a in range(sh2[0]) if a != refant] for ant in ants: for j in range(sh2[1]): im = data[ant, j] if np.isnan(im.sum()): pass else: interp_func = Rbf(sbx[ant], sby[ant], im, function='cubic', smooth=0) im_gr = interp_func(xg, yg) data_i[ant, j] = im_gr data_i = data_i.reshape(sh) return data_i
[docs]def raw_to_grid(raw_file_name, beam, refant, ant_loc): """ Reads holography data in hdf5 MapSet format. Performs operations on the data hyper-cube: * interpolates data onto regular grid * saves gridded data for the given beam in hdf5 MapSet format :param raw_file_name: (str) Name of hdf5 holography data file :param beam: (int) Beam number to process (0-based) :param refant: (int) Antenna number to use for grid reference (0-based) :param ant_loc Antenna locations """ # Load holography data: mso.data in ndarray mso = bf.load_beamset_class(raw_file_name) if mso.metadata['holographySBID'] == 21613: products_e = np.array(mso.data)[:, :, beam:beam + 1, :, :, ::-1, :].transpose() log.info("Got data for beam {:d} (21613)".format(beam)) products = grid_corr(products_e, mso, refant, ant_loc)[:, :, :, :, :, ::-1, :].transpose() else: products_e = np.array(mso.data)[:, :, beam:beam + 1] log.info("Got data for beam {:d}".format(beam)) products = grid_corr(products_e, mso, refant, ant_loc) del products_e # products = grid_corr(products_e, mso) flags_xy = np.array(mso.flags)[:, :, beam:beam + 1] if flags_xy.ndim == 7: flags = flags_xy.all(axis=(-2, -1)) else: flags = flags_xy del flags_xy log.debug(f'Flag shape: {flags.shape}') log.info("Grid corrected") md = copy.deepcopy(mso.metadata) md['beams'] = np.array([beam + 1]) mso_grid = MapSet(metadata=md, data=products, flags=flags, filename=None ) del products, flags, mso mso_grid.add_to_history('Resampled onto grid') return mso_grid
[docs]def main(sbid, beam, holo_dir='.', refant=None, ): """Main script :param sbid: SBID :type sbid: int :param beam: beam number :type beam: int :param holo_dir: directory holding holograophy data :type holo_dir: str :param refant: (int) Antenna number to use for grid reference (0-based) """ log.info(f"Starting grid_holography on SBID {sbid:d}") # Clean up dir if holo_dir[-1] == '/': holo_dir = holo_dir[:-1] if refant is None: sb = SchedulingBlock(sbid) ak_ref = int(''.join(filter(lambda char: char in string.digits, sb.get_parameters()['holography.ref_antenna']))) refant = ak_ref - 1 # 0-based log.info("Automatically selecting antenna AK{:02d} to use for grid definition".format(refant)) location_file = pkg_resources.resource_filename("aces.holography", "antenna_positions_itrf.txt") log.info(f"Loading {location_file=}") antenna_locations = get_antenna_locations(location_file) in_name = hf.find_holo_file(holo_dir, pol='xxyy', sbid=sbid, kind='hdf5') mso_grid = raw_to_grid(in_name, beam, refant, antenna_locations) log.debug(f'Flag shape: {mso_grid.flags.shape}') # out_name = f'{holo_dir}/{sbid:d}_Holo_grid_{beam:02d}.hdf5' kind = f'grid.beam{beam:02d}.hdf5' out_name = f'{holo_dir}/{hf.make_file_name(mso_grid, kind=kind)}' mso_grid.write_to_hdf5(out_name) del mso_grid log.info("All finished")
[docs]def cli(): import argparse """ Command line interface """ # Help string to be shown using the -h option descStr = """ Clean holography data """ epilog_text = """ """ # Parse the command line options parser = argparse.ArgumentParser(description=descStr, epilog=epilog_text, formatter_class=argparse.RawTextHelpFormatter) parser.add_argument( 'sbid', metavar='sbid', type=int, help='SBID of processed holography (stored in HDF5) [no default].') parser.add_argument( '-b', dest='beam', type=int, default=None, help='Beam to process. No default') parser.add_argument( '-d', dest='holo_dir', type=str, default='.', help='Directory containing holography data [./].') parser.add_argument( '-a', '--refant', type=int, default=None, help='Antenna to use for grid reference - specify to override [ref. ant. + 1].' ) parser.add_argument( "-v", "--verbosity", action="count", help="Increase output verbosity", default=0 ) args = parser.parse_args() if args.verbosity == 1: log.basicConfig( level=log.INFO, format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) elif args.verbosity >= 2: log.basicConfig( level=log.DEBUG, format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) main(sbid=args.sbid, beam=args.beam, holo_dir=args.holo_dir, refant=args.refant )
if __name__ == "__main__": sys.exit(cli())