#!/usr/bin/env python3
""" Clean holography data """
import copy
import logging
import os
import string
import sys
from typing import Dict, Any
import numpy as np
import numpy.fft as fft
import pkg_resources
import scipy.interpolate as sp
from astropy.io import fits
from scipy.stats import iqr
from astropy.time import Time
import aces.beamset.beamfactory as bf
from aces import __version__ as aces_version
from aces.askapdata.schedblock import SchedulingBlock
from aces.beamset.beamset import BeamSet
from aces.beamset.mapset import MapSet
from aces.holography import holo_filenames as hf
[docs]log = logging.getLogger(__name__)
# Catch annoying warnings
np.seterr(divide="ignore", invalid="ignore")
[docs]def tft(t):
return tuple(t) == (True, False, True)
[docs]def report_flags(flags, tag):
n_payloads = np.product(flags.shape)
n_set = flags.sum()
print("{:20s} {:d} of {:d} flags set".format(tag, n_set, n_payloads))
[docs]def robust_poly_fit(spec, poly_ord=2, sigma_lim=0.8, max_iter=20, low_fact=10.):
"""Fit a polynomial to data with outliers
The test for the presence of outliers is a comparison between standard deviation (std) and
the inter-quartile range (IQR). The IQR is a robust measure of the distribution width,
and for a gaussian distribution std/IQR = approx 0.6. The value of std/IQR at which to cease
the search for outliers is sigma_lim.
:param spec: Spectrum to fit
:type spec: np.ndarray
:param poly_ord: Order of polynomial, defaults to 5
:type poly_ord: int, optional
:param sigma_lim: Sigma limit, defaults to 0.8
:type sigma_lim: float, optional
:param max_iter: Maxium interations, defaults to 20
:type max_iter: int, optional
:return: TODO
:rtype: TODO
"""
ss = spec.shape
if len(ss) == 1:
n_spec = 1
spec2 = spec[np.newaxis, :]
else:
n_spec = ss[0]
spec2 = spec
len_spec = spec2.shape[1]
x0 = np.arange(len_spec)
# First estimate the width of the distribution of first differences
dymc = np.ma.masked_array([])
for s in range(n_spec):
y = spec2[s]
dym = np.ma.masked_invalid(np.diff(y, append=0.0))
dymc = np.ma.concatenate([dymc, dym])
iqr_diff = iqr(dymc.compressed())
# Use this measure for flagging places where first differences are large
xmc = np.ma.masked_array([])
ymc = np.ma.masked_array([])
dymc = np.ma.masked_array([])
for s in range(n_spec):
y = spec2[s]
x = np.ma.masked_array(x0, mask=y.mask)
dym = np.ma.masked_invalid(np.diff(y, append=0.0))
dmax = iqr_diff * 2.0
xm = np.ma.masked_array(x, np.abs(dym) > dmax)
for i in range(1, len(xm.mask)):
if tft(xm.mask[i - 1:i + 2]):
xm.mask[i] = True
ym = np.ma.masked_array(y, xm.mask)
xmc = np.ma.concatenate([xmc, xm])
ymc = np.ma.concatenate([ymc, ym])
dymc = np.ma.concatenate([dymc, dym])
plots = []
resid = []
fac = low_fact
result = {'masked_indep': np.ma.masked_array(x0, mask=True, shrink=False)}
# Hide 'full' parameter; can be unhidden for debugging or analysis of fits
full = False
if xmc.count() > poly_ord + 1:
# start with yu = ymc
yu = ymc
qq = np.polyfit(xmc.compressed(), yu.compressed(), poly_ord, full=full)
if full:
q, residuals, rank, singular_values, rcond = qq
else:
q = qq
singular_values = None
residuals = None
yf = np.polyval(q, xmc)
for n in range(max_iter):
yd = ymc - yf
plots.append(yd)
iq = iqr(yd.compressed())
sd = yd.std()
med = np.ma.median(yd)
resid.append(iq)
lower, upper = med - fac * iq, med + fac * iq
fac = max(1.0, fac * 0.7)
std_excess = sd / iq
if std_excess < sigma_lim:
# print('break ',snr)
break
condition = np.logical_or(yd < lower, yd > upper)
ymc = np.ma.masked_where(condition, ymc, copy=False)
xmc = np.ma.masked_array(xmc, ymc.mask)
if xmc.count() <= poly_ord:
break
q = np.polyfit(xmc.compressed(), ymc.compressed(), poly_ord)
yf = np.polyval(q, xmc)
resid = np.array(resid)
result['masked_indep'] = xmc
result['masked_input'] = ymc
result['y_fit'] = np.polyval(q, x0)
result['resid'] = resid
result['coeffs'] = q
if full:
result['singular_vals'] = singular_values
result['residuals'] = residuals
return result
[docs]def fit_spec(spec, poly_ord=5, sigma_lim=3.0, max_iter=50):
"""Fit a polynomial to data with outliers
:param spec: Spectrum to fit
:type spec: np.ndarray
:param poly_ord: Order of polynomial, defaults to 5
:type poly_ord: int, optional
:param sigma_lim: Sigma limit, defaults to 3.0
:type sigma_lim: float, optional
:param max_iter: Maxium interations, defaults to 50
:type max_iter: int, optional
:return: TODO
:rtype: TODO
"""
y = spec
ym = np.ma.masked_array(y.copy())
resid = []
ims = []
snrs = []
x = range(y.shape[0])
xm = np.ma.masked_array(x, ym.mask)
if xm.count() > 3:
q = np.polyfit(xm.compressed(), ym.compressed(), poly_ord)
yf = np.polyval(q, x)
for n in range(max_iter):
yd = ym - yf
resid.append(yd.std())
ymax = np.abs(yd).max()
snr = ymax / yd.std()
snrs.append(snr)
if snr < sigma_lim:
break
im = np.where(np.abs(yd) == ymax)[0][0]
ims.append(im)
ym.mask[im] = True
xm = np.ma.masked_array(x, ym.mask)
q = np.polyfit(xm.compressed(), ym.compressed(), poly_ord)
yf = np.polyval(q, x)
resid = np.array(resid)
return y, ym, yf, resid, ims, snrs
else:
return y, y, y, resid, ims, snrs
[docs]def fit_all_spectra(hm, poly_ord=3):
"""
Determines the best fitting models and generates that model for a given mean cube
Args:
hm ([array]): data cube (masked array)
poly_ord (int):
Returns:
[array]: Returns the best fitting model over all pixels and the associated frequencies.
"""
ret = np.zeros((hm.shape[0], hm.shape[1], hm.shape[2],
hm.shape[3], hm.shape[4])) # create output array of same dimension as hm cube
# TODO: Replace these loops - they are inefficient
for js in range(hm.shape[1]): # loop over stokes
log.info(f"Stokes parameter {js+1} of {hm.shape[1]}")
for jb in range(hm.shape[0]): # loop over beams
log.info(f"\tBeam number {jb+1} of {hm.shape[0]}")
for jx in range(hm.shape[-2]): # loops over x pixels
for jy in range(hm.shape[-1]): # loops over y pixels
spec = hm[jb, js, :, jx, jy] # creates slice of hm cube
result = robust_poly_fit(spec, poly_ord=poly_ord)
# save out model of slice
if 'y_fit' in result:
ret[jb, js, :, jx, jy] = result['y_fit']
return ret
# # # Some (temporary?) routines for interpolation needed for holography sampling-grid error.
[docs]def write_to_fits(
data_clean: np.ndarray, mapset: MapSet, interp_factor: float, output_dir: str
):
"""Write data cube to FITS:
:param data_clean: Cleaned array (with Stokes axis).
:type data_clean: np.ndarray
:param mapset: Input holography dataset
:type mapset: MapSet
:param interp_factor: Intperpolation grid used in deg
:type interp_factor: float
:param output_dir: Directory to save file to
:type output_dir: str
"""
freq = mapset.frequencies
hdu = fits.PrimaryHDU(data_clean.astype(np.float32))
hdu.header['CTYPE1'] = 'RA---SIN'
hdu.header['CRVAL1'] = 0.0
hdu.header['CDELT1'] = -interp_factor
hdu.header['CRPIX1'] = data_clean.shape[4] // 2 + 1.0
hdu.header['CTYPE2'] = 'DEC--SIN'
hdu.header['CRVAL2'] = 0.0
hdu.header['CDELT2'] = interp_factor
hdu.header['CRPIX2'] = data_clean.shape[3] // 2 + 1.0
hdu.header['CTYPE3'] = 'FREQ'
hdu.header['CRVAL3'] = freq[0] * 1e6
hdu.header['CDELT3'] = np.diff(freq)[0] * 1e6
hdu.header['CRPIX3'] = 1.0
hdu.header['CTYPE4'] = 'STOKES'
hdu.header['CRPIX4'] = 1.0
hdu.header['CDELT4'] = 1.0
hdu.header['CRVAL4'] = 1.0
hdu.header['CTYPE5'] = 'BEAM'
hdu.header['CRVAL5'] = 0.0
hdu.header['CDELT5'] = 1.0
hdu.header['CRPIX5'] = 1.0
hdu.header['EQUINOX'] = 2.000000000000E+03
hdu.header['RADESYS'] = 'FK5 '
hdu.header['LONPOLE'] = 1.800000000000E+02
hdu.header['LATPOLE'] = 0.000000000000E+00
hdu.header['SPECSYS'] = 'TOPOCENT'
add_metadata_to_header(
hdu.header, mapset.metadata
)
log.debug('Output header:')
log.debug(repr(hdu.header))
fits_name = f'{output_dir}/{hf.make_file_name(mapset, kind="cube.fits")}'
log.info(f'Saving to {fits_name}')
hdu.writeto(fits_name, overwrite=True, output_verify='fix+warn')
[docs]def write_tt_to_fits(
beamtt: np.ndarray,
mapset: MapSet,
fref: float,
bw: float,
dxy:float,
output_dir: float
):
"""Write Taylor Terms to FITS:
:param beamtt: Taylor term beams
:type beamtt: np.ndarray
:param mapset: Input holography dataset
:type mapset: MapSet
:param fref: Reference frequency for Taylor terms
:type fref: float
:param bw; The observing bandwidth that is recorded to the header
:type bw: float
:param dxy: Interpolation grid in deg
:type dxy: float
:param output_dir: Directory to save file to
:type output_dir: str
"""
# beamtt axes are: beam, taylor, stokes, freq, dec, ra
# need to swap to: beam, stokes, taylor, freq, dec, ra for LINMOS
beamtt = beamtt.swapaxes(1, 2)
hdu = fits.PrimaryHDU(beamtt.astype(np.float32))
hdu.header['CTYPE1'] = 'RA---SIN'
hdu.header['CRVAL1'] = 0.0
hdu.header['CDELT1'] = -dxy
hdu.header['CRPIX1'] = beamtt.shape[-1] // 2 + 1.0
hdu.header['CTYPE2'] = 'DEC--SIN'
hdu.header['CRVAL2'] = 0.0
hdu.header['CDELT2'] = dxy
hdu.header['CRPIX2'] = beamtt.shape[-2] // 2 + 1.0
hdu.header['CTYPE3'] = 'FREQ'
hdu.header['CRVAL3'] = fref
hdu.header['CDELT3'] = bw
hdu.header['CRPIX3'] = 1.0
hdu.header['CTYPE4'] = 'TAYLOR'
hdu.header['CRVAL4'] = 0.0
hdu.header['CDELT4'] = 1.0
hdu.header['CRPIX4'] = 1.0
hdu.header['CTYPE5'] = 'STOKES'
hdu.header['CRVAL5'] = 1.0 # Start at Stokes I
hdu.header['CDELT5'] = 1.0 # One Stokes per pixel
hdu.header['CRPIX5'] = 1.0 # Start at pixel 1 (fortran convention)
hdu.header['CTYPE6'] = 'BEAM'
hdu.header['CRVAL6'] = 0.0
hdu.header['CDELT6'] = 1.0
hdu.header['CRPIX6'] = 1.0
hdu.header['EQUINOX'] = 2.000000000000E+03
hdu.header['RADESYS'] = 'FK5 '
hdu.header['SPECSYS'] = 'TOPOCENT'
hdu.header['RESTFRQ'] = fref
add_metadata_to_header(
hdu.header, mapset.metadata
)
log.debug(repr(hdu.header))
# need to add header items for spatial and spectral axes
out_name = f'{output_dir}/{hf.make_file_name(mapset, kind="taylor.fits")}'
hdu.writeto(out_name, overwrite=True, output_verify='fix+warn')
[docs]def interp2D_cmplx(z, pixpos, k=2):
"""
:param z: (ndarray) Complex cube of shape Nt,Na,Nb,Np,Nf,nx,ny
:param pixpos: (ndarray) Array of shape (nbeams,2) giving beam positions in pixels
:param k: (int) Order of spline interpolation used.
:return interp (ndarray) Interpolated values at beam positions - shape Nt,Na,Nb,Np,Nf
"""
# Note that RectBivariateSpline expects the data array to have
# shape (x.size,y.size). Elsewhere, including matplotlib, shapes of
# 2D arrays are expected to be (y.size,x.size). Grr.
nt, na, nb, nq, nf, nx, ny = z.shape
newshape = [nt * na * nb * nq * nf, nx, ny]
t = np.reshape(z, newshape)
outr = pixpos[:, np.newaxis, :]
outr = np.repeat(outr, nq * nf, axis=1)[np.newaxis]
outr = np.repeat(outr, na, axis=0).reshape([nt * na * nb * nq * nf, 2])
inx = range(nx)
iny = range(ny)
fn_real = [sp.RectBivariateSpline(
iny, inx, np.real(zi), kx=k, ky=k) for zi in t]
fn_imag = [sp.RectBivariateSpline(
iny, inx, np.imag(zi), kx=k, ky=k) for zi in t]
interp = np.array([fri(out[1], out[0]) + 1j * fii(out[1], out[0])
for fri, fii, out in zip(fn_real, fn_imag, outr)])
return interp.reshape([nt, na, nb, nq, nf])
[docs]def normalise(v, bpos, refant):
"""
Given an array of raw holography visibilities v, normalise to give
the response of each antenna relative to its response to a point source
at beam centre. Each map value at beam centre is determined by
interpolation: vxx0, vyy0. The normalised quantities are computed as
Nxx = vxx/vxx0, Nxy = vxy/vxx0, Nyx = vyx/vyy0, Nyy = vyy/vyy0
:param v:(numpy.ndarray) visibilities
:param bpos: (np.ndarray) beam position in pixels (nb,2)
:param refant: (int) Reference antenna number (0 based)
:return N: (numpy.ndarray) normalised visibilities
NOTE: This scheme works provided the reference antenna has a lower index than all target antennas
For cases where the correlation is Target * Reference we need:
scale_xy = scale_yy
scale_yx = scale_xx
"""
vxx = v[:, :, :, :1, :, :, :]
vxx0 = interp2D_cmplx(vxx, bpos)
vyy = v[:, :, :, -1:, :, :, :]
vyy0 = interp2D_cmplx(vyy, bpos)
vxy0 = vxx0
vyx0 = vyy0
scales = 1.0 / np.concatenate((vxx0, vxy0, vyx0, vyy0), axis=3)
if refant > 0:
sh = list(scales.shape)
sh1 = sh[:3] + [2, 2] + sh[4:]
scales = scales.reshape(sh1)
scales[:, :refant] = np.einsum('ijkmln', scales[:, :refant])
scales = scales.reshape(sh)
log.info("Scales found")
N = np.einsum('ijklmno,ijklm->ijklmno', v, scales)
return N
[docs]def volt_to_power(v):
"""
Given a Jones matrix v, return the corresponding brightness
matrix b as
b = v * v(h) where (h) is the hermitian transpose.
In both v and b, the polarisation axis is the 4th of 7 axes:
sh = (nt,ne,nb,4,nf,nx,ny)
:param v: (numpy.ndarray) voltage vector of shape sh
:return b: (numpy.ndarray) brightness vector of shape sh
"""
sh = list(v.shape)
new_shape = sh[:3] + [2, 2] + sh[4:]
J = np.reshape(v, new_shape)
# produce the hermitian
JH = np.conjugate(J).transpose((0, 1, 2, 4, 3, 5, 6, 7))
b = np.einsum('tabij...,tabjk...->tabik...', J, JH)
b = np.reshape(b, sh)
return b
[docs]def feed_to_stokes(f):
"""
Given a matrix (voltage or power) in the feed frame, return the corresponding
Stokes matrix. Do this for ASKAP, assuming the measurements were made with the
antenna rotated +45 degrees.
In both f and s, the polarisation axis is the 4th of 7 axes:
sh = (nt,ne,nb,4,nf,nx,ny)
:param f: (numpy.ndarray) vector in feed frame: XX,XY,YX,YY
:return s: (numpy.ndarray) vector in Stokes frame: I,Q,U,V
"""
i = 0 + 1j
# Referring to ASKAP Science Observation Guide (Appendix C) that sets out the
# Mueller matrix for ASKAP with its X-Y probes rotated 45 degrees:
# T = np.array([[1, 0, 0, 1], [s, c, c, -s],[-c, s, s, c], [0, -i, i, 0]])
# where s = sin(2*PA), c = cos(2*PA)
#
# for PA = -45deg (used in the holography observations)
T = np.array([[1, 0, 0, 1], [-1, 0, 0, 1], [0, -1, -1, 0], [0, -i, i, 0]])
s = np.real(np.einsum('ji,tabi...->tabj...', T, f))
return s
[docs]def raw_to_stokes(raw_file_name, refant):
"""
Reads holography data in hdf5 MapSet format. Performs operations on the
data hyper-cube:
* transforms to axes aligned with celestial coords
* determines map values at beam positions in Vxx, Vyy
* derives normalising factors for Vxx, Vxy, Vyx, Vyy visibilities
* normalises the complex data
* determines brightness vector XX, XY, YX, YY
* converts to Stokes I Q U V
* saves both Stokes vector and scale values in hdf5 MapSet format
:param raw_file_name: (str) Name of hdf5 holography data file
:param refant: (int) Reference antenna
:return stokes_obj (MapSet) Holography maps, normalised and in Stokes brightness I Q U V
:return centre_obj (MapSet) Normalising scales - visibilities at beam positions (interpolated).
"""
# Load holography data: mapset.data in ndarray
mso = bf.load_beamset_class(raw_file_name)
log.info("Got data")
# SKY_TRANSFORM - products are XX,XY,YX,YY
products = MapSet.sky_transform_hyper(mso.data)
log.info("Transformed")
# beam positions
pixoff = mso.get_beam_positions()
prod_norm = normalise(products, pixoff, refant)
log.info("Normalised")
# We now have (assuming a good model for the reference source), the polarised response to a source
# with I,Q,U,V = 1,0,0,0 in the complex correlator products Vxx, Vxy, Vyx, Vyy (a Jones matrix)
# Now convert to brightness matrix Bxy
Bxy = volt_to_power(prod_norm)
# and form the stokes brightness vector Bstokes
Bstokes = feed_to_stokes(Bxy)
log.info("Stokes formed")
md = copy.deepcopy(mso.metadata)
md['polarizations'] = ['I', 'Q', 'U', 'V']
# md['payloadshape'] = stokes_cube[0, 0, 0, 0, 0].shape
n_data = MapSet.sky_transform_hyper(Bstokes)
n_flags = np.zeros(Bstokes.shape[:-2], dtype=int)
log.debug(f'n flags sum {n_flags.sum()}')
mso_stokes = MapSet(metadata=md,
data=n_data,
flags=n_flags,
filename=None
)
mso_stokes.add_to_history('Normalised, converted to Stokes')
return mso_stokes
[docs]def get_quality_flags(mapset, amp_max=1.4):
"""
Take MapSet object holding Holography power patterns in the IQUV
Stokes parameters;
look for and flag image planes whose maximum stokes I value significantly exceeds the expected value
of 2.0.
Typically, the large value flags arise from planes that were effected by RFI and produce large
values at random locations away from the beam peak:
:param mapset: MapSet object holding cube of beam images
:type mapset: aces.beamset.mapset.MapSet
:param amp_max: Maximum allowed amplitude
:type amp_max: float, optional
:return: result - a dict holding flags and correspondingly masked data
:rtype: dict
"""
data = MapSet.sky_transform_hyper(mapset.data)
flags = (np.array(mapset.flags)).astype(bool)
report_flags(flags, 'in get_quality_flags')
if flags.all():
log.warning('All flags are True - this is not expected. Setting all flags to False')
flags = np.zeros(flags.shape, dtype=bool)
nt, na, nb, nq, nf = flags.shape
nx = data.shape[5]
# Form a masked version of the data using the flags derived earlier
# the flags array from MapSet object are given two additional axes to
# allow the construction of a masked array from the data.
a = np.repeat(flags[:, :, :, :, :, np.newaxis], nx, axis=-1)
flg = np.repeat(a[:, :, :, :, :, :, np.newaxis], nx, axis=-1)
fdata = np.ma.masked_array(data, mask=flg)
# First flag image planes with an unrealistic maximum in Stokes I
fmax = fdata[0, :, :, 0, :].max(axis=3).max(axis=3)
# Factor of 2 required because we have normalised XX and YY and formed sum.
mk_max = fmax > 2.0 * amp_max
# and remask data buffer
a = np.repeat(mk_max[:, :, np.newaxis, :], nq, axis=2)
flags[0] = np.logical_or(flags[0], np.array(a))
a = np.repeat(flags[:, :, :, :, :, np.newaxis], nx, axis=-1)
flg = np.repeat(a[:, :, :, :, :, :, np.newaxis], nx, axis=-1)
fdata = np.ma.masked_invalid(np.ma.masked_array(data, mask=flg))
result = {'flags': flags, 'masked_data': fdata}
return result
[docs]def save_mean(mean_cube, mapset, out_name):
"""
Takes the array-wide mean cube and writes it to an hdf5 file in the BeamSet format
:param mean_cube: Data cube of shape (nb, npol, nf, nx, ny)
:param mapset: BeamSet object from full Stokes cube; used to derive new metadata for output
:param out_name: Name of output file
:type out_name: str
:return:
"""
in_shape = mean_cube.shape
out_shape = [1, 1] + list(in_shape)
mdc = copy.deepcopy(mapset.metadata)
mdc['antennas'] = [0]
p_data = np.zeros(out_shape, dtype=float)
p_data[0, 0] = mean_cube.data
p_data = MapSet.sky_transform_hyper(p_data)
p_flags = np.zeros(out_shape[:-2], dtype=bool)
if isinstance(mean_cube, np.ma.core.MaskedArray):
p_flags[0, 0] = mean_cube.mask.all(axis=(-2, -1))
mso_mean = MapSet(metadata=mdc,
data=p_data,
flags=p_flags,
filename=None
)
mso_mean.add_to_history('mean obj made')
mso_mean.write_to_hdf5(out_name)
[docs]def centre_beams(mean_cube, mapset, kxy=5, nxy=101):
"""Takes the mean holography cube (averaged over antennas) and
interpolates each beam onto a grid centred on its nominal position
and with the given grid spacing
:param mean_cube: shape Nb,Np,Nf,Nx,Ny
:type mean_cube: np.ndarray
:param mapset: parent object - for metadata
:type mapset: [type]
:param kxy: Degrees of bivariate spline used in RecBivariateSpline, defaults to 5
:type kxy: int, optional
:param nxy: Size of output grid (must be odd integer)., defaults to 101
:type nxy: int, optional
:return: TODO
:rtype: TODO
"""
big = 100.
pixpos = mapset.get_beam_positions()
# Determine size from minimum position to edge distance
nb = mapset.Nb
xstep = mapset.xs[1] - mapset.xs[0]
x0, y0 = 0, 0
x1, y1 = mapset.metadata['payloadshape']
minx, miny = big, big
for ib in range(nb):
px, py = pixpos[ib]
minx = min(minx, abs(px - x0), abs(px - x1))
miny = min(miny, abs(py - y0), abs(py - y1))
semi_size = min(minx, miny)
nxy_out = nxy
xh = np.linspace(pixpos[:, 0] - semi_size,
pixpos[:, 0] + semi_size, nxy_out)
yh = np.linspace(pixpos[:, 1] - semi_size,
pixpos[:, 1] + semi_size, nxy_out)
nb, nq, nf, nx, ny = mean_cube.shape
# Change shape from beams x pols x channels to channels x pols x beams
work = mean_cube.transpose((2, 1, 0, 3, 4))
work = np.reshape(work, [nq * nf, nb, nx, ny])
inx = range(nx)
iny = range(ny)
res = []
for ib in range(nb):
xy = np.array([xh[:, ib], yh[:, ib]])
outr = xy
fn = [sp.RectBivariateSpline(iny, inx, w, kx=kxy, ky=kxy)
for w in work[:, ib]]
interp = np.array([fni(outr[1], outr[0]) for fni in fn])
res.append(interp)
# Set result shape to be beams x channels x pols
res = np.array(res).reshape((nb, nf, nq, nxy_out, nxy_out))
res = np.array(res)
dxy = np.degrees((xh[1] - xh[0]) * xstep)
return res, dxy[0]
[docs]def tt(fstart, beam_cube, nterm):
"""For correction of continuum images we turn the frequency axis
into a taylor term axis.
- Note this requires picking a centre frequency f0, for which the
Taylor coefficients are computed
:param fstart: Starting frequency in MHz
:type fstart: float
:param beam_cube: Beam maps, averaged over antennas
:type beam_cube: np.ndarray
:param nterm: Number of Taylor terms
:type nterm: int
:return: TODO
:rtype: TODO
"""
nb, nstokes, nchan, nx, ny = beam_cube.shape
# Assuming 1MHz channels
chanw = 1.e6
fend = fstart + (nchan - 1) * chanw
f0 = (fstart + fend) / 2
f = np.arange(fstart, fend + chanw / 10, chanw)
bw = fend - fstart
# frequency weights
w = (f - f0) / f0
wv = np.array([w / w, w, w * w][:nterm])
# Hessian
H = np.zeros((nterm, nterm))
for i in range(nterm):
for j in range(nterm):
H[i, j] = np.sum(w ** (i + j))
# Inverse hessian
Hinv = np.linalg.inv(H)
# t -1 t
# Calculating (A A) A b, with A = frequency weights vector
speclist = []
pblist = []
for s in range(nstokes):
b = np.tensordot(wv, beam_cube[:, s], axes=([1], [1]))
pbt = np.tensordot(Hinv, b, axes=([1], [0]))
pbt = pbt.swapaxes(0, 1)
pblist.append(pbt)
# evaluate terms
ttspectra = np.tensordot(pbt, wv, axes=([1], [0]))
ttspectra = np.moveaxis(ttspectra, -1, 1)
speclist.append(ttspectra)
# Stack the taylor terms
pbcube = np.stack(pblist, axis=2)
tt_cube = np.stack(speclist, axis=2)
# Axes are: beam, taylor, stokes, freq, dec, ra
# Add empty axis for dummy frequency
pbcube = np.expand_dims(pbcube, axis=3)
tt_cube = np.expand_dims(tt_cube, axis=3)
return pbcube, tt_cube, f0, bw
[docs]class BeamModel(object):
"""
Class - to be improved 1. hold data in hdf5 file, not pickle.
"""
def __init__(self, bs_obj):
pkg = self._radial_gau_gen()
xk, ref_freq, model, fn = pkg
self.xs = xk
xax = np.degrees(bs_obj.metadata['xAxis'])
self.nperside = len(xax)
self.centre = (self.nperside - 1) / 2.0
self.delta_x = xax[1] - xax[0]
self.edge = xax[-1]
self.ref_freq = ref_freq
self.model = model
self.bv_func = fn
[docs] def get(self, pixoff, freq):
"""TODO
:param pixoff: TODO
:type pixoff: TODO
:param freq: TODO
:type freq: TODO
:return: TODO
:rtype: TODO
"""
px, py = self.delta_x * (pixoff - self.centre) * freq / self.ref_freq
end = self.edge * freq / self.ref_freq
xm = np.linspace(-end - px, end - px, self.nperside)
ym = np.linspace(-end - py, end - py, self.nperside)
return self.bv_func(ym, xm)
@staticmethod
[docs] def _radial_gau_gen():
"""
Parameters used here were derived from a 'good set' of mid-band beams (SBID=16881),
the Gaussian parameter (0.6945) minimises the difference between model gaussian and measured
average beam shape at 1438.5 MHz.
"""
ref_freq = 1438.5
xg = np.linspace(-4.2, 4.2, 211)
yg = xg
xh, yh = np.meshgrid(xg, yg)
rh = np.sqrt(xh ** 2 + yh ** 2)
ga = np.exp(-(rh / 0.6945) ** 2) * 2
fn2 = sp.RectBivariateSpline(xg, yg, ga, kx=2, ky=2)
pkg2 = (xg, ref_freq, ga, fn2)
return pkg2
[docs]def get_beampos_errs(mapset, bmodel):
"""TODO
:param mapset: TODO
:type mapset: TODO
:param bmodel: TODO
:type bmodel: TODO
:return: TODO
:rtype: TODO
"""
na, nb, nf = mapset.Na, mapset.Nb, mapset.Nf
shiftss = np.ones((na, nb, nf, 2))
resid = np.zeros([na, nb, nf])
amps = np.zeros([na, nb, nf])
pixoff = mapset.get_beam_positions()
cube_s = MapSet.sky_transform_hyper(mapset.data)[0]
flags = mapset.flags[0]
ants = range(na)
for chan in range(nf):
for beam in range(nb):
ref = bmodel.get(pixoff[beam], mapset.frequencies[chan])
for ant in ants:
if flags[ant, beam, 0, chan]:
shiftss[ant, beam, chan] = [np.nan, np.nan]
resid[ant, beam, chan] = np.nan
else:
refa = np.abs(cube_s[ant, beam, 0, chan])
gs_ret = get_shift(mapset, ref, refa, frac=0.25)
shiftss[ant, beam, chan] = gs_ret['shift']
amps[ant, beam, chan] = gs_ret['amp_ratio']
result = gs_ret['result']
if len(result[1]) == 0:
resid[ant, beam, chan] = np.nan
else:
resid[ant, beam, chan] = result[1][0] / result[4]
return shiftss, resid, amps
[docs]def get_shift(mapset, ref_im, target_im, frac=0.25):
"""Compute the shift of target_im relative to ref_im.
Use Fourier shift theorem
The sense of the returned shifts:
If the returned value shift = (dl, dm), and if the position of reference and target
beams are (lr, mr) and (lt, mt) respectively, then dl > 0.0 indicates lt > lr
and if dm > 0.0 then tm > rm. Coordinate $l$ increases towards the west; $m$ increases towards the north.
:param mapset: Primary mapset
:type mapset: [type]
:param ref_im: Reference image ndim=2
:type ref_im: np.ndarray
:param target_im: Target image
:type target_im: np.ndarray
:param frac: Mask transform values < ampMax * frac in phase fit, defaults to 0.25
:type frac: float, optional
:return: TODO
:rtype: TODO
"""
n = ref_im.shape[0]
n2 = n // 2
f = fft.fftshift(fft.fft2(ref_im))
fa = fft.fftshift(fft.fft2(target_im))
zf = np.angle(fa / f).flatten()
amp = np.abs(fa)
clip = amp.max() * frac
zm = np.ma.masked_array(zf, amp < clip)
x = np.arange(0, n, 1) - n2
y = np.arange(0, n, 1) - n2
xg, yg = np.meshgrid(x, y)
xf = xg.flatten()
yf = yg.flatten()
zf = zm[~zm.mask]
xf = xf[~zm.mask]
yf = yf[~zm.mask]
zf = np.expand_dims(zf, 1)
npts = zf.shape[0]
A = np.c_[xf, yf]
result = np.linalg.lstsq(A, zf, rcond=None)
coeff, r, rank, s = result
result_ret = result + (npts,)
xs = mapset.metadata['xAxis']
span = xs[-1] - xs[0]
shift = -1.0 * coeff[:, 0] / (2.0 * np.pi) * span
if len(r) > 0:
qual = np.ma.masked_array(np.abs(fa / f), amp < clip).mean()
else:
qual = np.nan
result = {'shift': shift,
'result': result_ret,
'G': f,
'H': fa,
'amp_ratio': qual}
return result
[docs]def centre_mass(im, xaxis):
xg, yg = np.meshgrid(xaxis, xaxis)
imx = im * xg
imy = im * yg
sh = np.array([imx.sum() / im.sum(), imy.sum() / im.sum()])
return sh
[docs]def get_beampos_errs_from_fits(fits_header, fits_data):
fh = fits_header
fd = fits_data
na, nb, nf = 1, fd.shape[0], 1
shiftss = np.ones((na, nb, nf, 2))
resid = np.zeros([na, nb, nf])
amps = np.zeros([na, nb, nf])
dx = fh['CDELT2']
xax = (np.arange(0.0, 101., 1.0) - (fh['CRPIX1'] - 1.0)) * dx
cube_s = np.expand_dims(fd, axis=0)
ants = range(na)
for chan in range(nf):
for beam in range(nb):
for ant in ants:
refa = np.abs(cube_s[ant, beam, chan])
sh = centre_mass(refa, xax)
shiftss[ant, beam, chan] = np.radians(sh)
return shiftss, resid, amps
[docs]def shifts_to_mapset(mapset, shifts, resids, amps):
"""
Package the beam shift data into a Mapset object
:param mapset: Input mapset object, holdng the holography maps
:param shifts: Stokes I beam position offsets
:param resids: Residuals to offset fit
:param amps:
:return: Mapset object holdong shift data
"""
mds = copy.deepcopy(mapset.metadata)
mds['polarizations'] = ['I']
mds['payloadshape'] = (4,)
mds['xAxis'] = [0.0]
mds['yAxis'] = [0.0]
sdata = np.zeros([1, mapset.Na, mapset.Nb, 1, mapset.Nf, 4])
sdata[0, :, :, 0, :, :2] = shifts
sdata[0, :, :, 0, :, 2] = resids
sdata[0, :, :, 0, :, 3] = amps
sflags = np.array(mapset.flags)
bso_shift = BeamSet(metadata=mds,
data=sdata,
flags=sflags,
filename=None
)
bso_shift.add_to_history('Beam position errs, resid, amps')
return bso_shift
[docs]def save_shifts(beamset, holo_dir):
"""
Save a BeamSet object holding beam errors to an hdf5 file:
:param beamset: BeamSet object holding beam pos errors
:type beamset: BeamSet
:param holo_dir: Directory to receive hdf5 file
:type holo_dir: str
"""
outnam = f'{holo_dir}/{hf.make_file_name(beamset, "beam_shifts.hdf5")}'
print('Save shifts to {}'.format(outnam))
beamset.write_to_hdf5(outnam)
[docs]def design_mat(x, op):
N = len(x)
M = op
a = np.zeros([N, M])
for j in range(op):
a[:, M - 1 - j] = x ** j
return a
[docs]def pfit(x, y, op):
a = design_mat(x, op + 1)
yc = y.filled(fill_value=0.0)
am = a.copy()
am[y.mask] = 0.0
coeff, r, rank, s = np.linalg.lstsq(am, yc, rcond=None)
return coeff, r, a
[docs]def get_posfit_flags(resid, amps, res_limits, amp_limits):
"""TODO
:param resid: TODO
:type resid: TODO
:param amps: TODO
:type amps: TODO
:return: n_flags New flags of shape to match obj.flags; True for bad data
:rtype: TODO
"""
na, nb, nf = resid.shape
xs = np.arange(0, nf, 1.0)
coeffs, upper = res_limits
poly_order = coeffs.shape[0]
A = design_mat(xs, poly_order)
base = np.dot(A, coeffs).T
upper_div = upper[:, None] / base
resid_div = np.ma.masked_invalid(resid / base[None, :, :]).filled(fill_value=-1.0)
compr = np.logical_or(resid_div > 1.0 + upper_div[None, :, :], resid_div < 0.0)
amp_lo, amp_hi = amp_limits
compa = np.logical_or(amps < amp_lo, amps > amp_hi)
comp = np.logical_or(compr, compa)
# print('compr sum ', compr.sum())
# print('compa sum ', compa.sum())
# print('comp sum ', comp.sum())
n_flags = np.repeat(comp[:, :, None, :], 4, axis=2)
# print('n_flags sum ', n_flags.sum())
return n_flags[np.newaxis]
[docs]def set_resid_lim(resids, order=2, iqr_fac=2.0):
# Look at statistics to guess a fair limit above which to flag.
# Do this separately for each beam
# At some frequencies, the shift residuals are quite beam and frequency
# dependent. The current (new as of 2022June) approach is to fit a low-order
# polynomial to the array-wide spectrum for each beam, robustly, excluding outliers.
# This routine returns, for each beam, coefficients and lower and upper increments:
# the intention is that, for any antenna,beam spectrum of residuals, points outside
# the range [poly-lower,poly+upper] should be flagged.
na, nb, nf = resids.shape
coeffs = np.zeros([order + 1, nb])
upper = np.zeros(nb)
for beam in range(nb):
ys = np.ma.masked_invalid(resids[:, beam])
rpf = robust_poly_fit(ys, poly_ord=order, sigma_lim=0.8, max_iter=17)
ymsk = rpf['masked_input']
yfit = rpf['y_fit']
yres = rpf['resid']
coeff = rpf['coeffs']
coeffs[:, beam] = coeff
upper[beam] = rpf['resid'][-1] * iqr_fac
return coeffs, upper
[docs]def get_shift_flags(shifts, shift_fit_flags):
"""TODO
:param shifts:
:param shift_fit_flags:
:return: n_flags New flags of shape to match obj.flags; True for bad data
:rtype: TODO
"""
abshifts = np.ma.masked_array(np.sqrt(shifts[:, :, :, 0] ** 2 + shifts[:, :, :, 1] ** 2),
mask=shift_fit_flags[0, :, :, 0])
na, nb, nf = abshifts.shape
flagged = np.ones([na, nb, nf], dtype=bool)
for a in range(na):
for b in range(nb):
spec = abshifts[a, b]
if spec.count() > 0:
spf = robust_poly_fit(spec, poly_ord=3)
flagged[a, b] = spf['masked_indep'].mask
n_flags = np.repeat(flagged[:, :, None, :], 4, axis=2)
return n_flags[np.newaxis]
[docs]def main(sbid,
holo_dir='.',
remake_stokes=False,
max_order=5,
param='BIC',
max_iter=50,
snr_limit=3):
"""Main script
:param sbid: SBID
:type sbid: int
:param holo_dir: Directory containing holo data, defaults to '.'
:type holo_dir: str, optional
:param remake_stokes: Remake Stokes parameters, defaults to False
:type remake_stokes: bool, optional
:param max_order: Maximum polynomial to fit, defaults to 5
:type max_order: int, optional
:param param: Criterion to evaluate fit, defaults to 'BIC'
:type param: str, optional
:param max_iter: Maximum interations for fitting, defaults to 50
:type max_iter: int, optional
:param snr_limit: SNR limit for fitting, defaults to 3
:type snr_limit: int, optional
"""
# Stage 1:
# Raw holography visibility data to Stokes hypercube and Scales data (both in MapSet format)
# Either construct the Stokes cube, or read it from a previous determination.
# Clean up dir
if holo_dir[-1] == '/':
holo_dir = holo_dir[:-1]
try:
sb = SchedulingBlock(sbid)
myRA = int(''.join(filter(lambda char: char in string.digits,
sb.get_parameters()['holography.ref_antenna'])))
except:
myRA = 1
refant = myRA - 1 # 0-based
if remake_stokes:
# Construct Stokes cube from raw holography data
in_name_g = hf.find_holo_file(holo_dir, pol='xxyy', sbid=sbid, kind='grid.hdf5')
if in_name_g is not None:
in_name = in_name_g
log.info("Using regridded data : {}".format(in_name_g))
else:
log.warning("No regridded data found : {}".format(in_name_g))
in_name = hf.find_holo_file(holo_dir, pol='xxyy', sbid=sbid, kind='hdf5')
mso_stokes = raw_to_stokes(in_name, refant)
report_flags(mso_stokes.flags, 'raw to stokes')
# Estimate, from Stokes I images, the beam positions relative to nominal
model_file = pkg_resources.resource_filename("aces.holography", "mean_beam_model.pkl")
if os.path.exists(model_file):
bmodel = BeamModel(mso_stokes)
qf = get_quality_flags(mso_stokes)
quality_flags = qf['flags']
report_flags(quality_flags, 'get_quality_flags')
log.debug(quality_flags.shape, np.product(quality_flags.shape), quality_flags.sum())
if quality_flags.all():
log.warning('All quality flags are True - this is not expected! Not updating flags.')
else:
mso_stokes.flags = quality_flags
mso_stokes.add_to_history('Flags updated')
shifts, resids, amps = get_beampos_errs(mso_stokes, bmodel)
bso_shift = shifts_to_mapset(mso_stokes, shifts, resids, amps)
save_shifts(bso_shift, holo_dir)
res_lims = set_resid_lim(resids, order=3, iqr_fac=3.0)
amp_lims = [0.8, 1.25]
shift_fit_flags = get_posfit_flags(resids, amps, res_lims, amp_lims)
report_flags(shift_fit_flags, 'after posfit flags')
shift_flags = get_shift_flags(shifts, shift_fit_flags)
report_flags(shift_flags, 'after get shift flags')
log.debug(f'new_flags sum {shift_flags.sum()}')
if shift_flags.all():
log.warning('All shift flags are True - this is not expected! Not updating flags.')
else:
mso_stokes.flags = shift_flags
mso_stokes.add_to_history('Flags updated')
log.info("Shifts and flags done")
else:
log.warning("No beam model file found {}".format(model_file))
report_flags(mso_stokes.flags, 'on writing')
out_name = f'{holo_dir}/{hf.make_file_name(mso_stokes, kind="stokes.hdf5")}'
mso_stokes.write_to_hdf5(out_name)
else:
# Read the Stokes cube from hdf5 file
in_name = hf.find_holo_file(holo_dir, pol='iquv', sbid=sbid, kind='hdf5')
# Load holography data: obj.data in ndarray
mso_stokes = bf.load_beamset_class(in_name)
# Stage 2:
# prepare_mean: find good/bad antennas, form mean over good.
# Interpolate spectra over bad channels
# Shift each beam image to the centre of its grid, and write FITS file for LINMOS.
mean_cube = form_mean(mso_stokes)
log.info("Mean prepared")
mean_name = f'{holo_dir}/{hf.make_file_name(mso_stokes, kind="mean.hdf5")}'
save_mean(mean_cube, mso_stokes, mean_name)
mean_cube_interp = fit_all_spectra(mean_cube, poly_ord=max_order)
mean_name_int = f'{holo_dir}/{hf.make_file_name(mso_stokes, kind="mean_interp.hdf5")}'
save_mean(mean_cube_interp, mso_stokes, mean_name_int)
log.info("Spectral interpolation done")
kxy = 5
res, dxy = centre_beams(mean_cube_interp, mso_stokes, kxy=kxy)
log.info("Beams centred")
res = res.transpose((0, 2, 1, 3, 4))
# Correct for Stokes I falloff
i_res = res[:, 0]
res[:, 1:] /= i_res[:, np.newaxis]
log.info("QUV divided by I")
write_to_fits(
res,
mso_stokes,
dxy,
holo_dir,
)
# Finally, produce a Taylor Term cube
nterm = 3
fstart = mso_stokes.frequencies[0] * 1.0e6
(beamtt, beamcubefit, f0, bw) = tt(fstart, res, nterm)
write_tt_to_fits(beamtt, mso_stokes, f0, bw, dxy, holo_dir)
log.info("All finished")
[docs]def cli():
import argparse
"""
Command line interface
"""
# Help string to be shown using the -h option
descStr = """
Clean holography data (2022Aug10)
"""
epilog_text = """
"""
# Parse the command line options
parser = argparse.ArgumentParser(description=descStr,
epilog=epilog_text,
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(
'sbid',
metavar='sbid',
type=int,
help='SBID of processed holography (stored in HDF5) [no default].')
parser.add_argument(
'-d',
dest='holo_dir',
type=str,
default='.',
help='Directory containing holography data [./].')
parser.add_argument(
"-r",
dest="remake_stokes",
action="store_true",
help="Remake averaged Stokes cube [False]."
)
parser.add_argument(
"--snr_limit",
type=float,
default=3,
help="SNR clipping level on model fitting [3]."
)
parser.add_argument(
"--max_order",
type=int,
default=5,
help="Maximum polymomial to fit to spectra [5]."
)
parser.add_argument(
"--param",
type=str,
choices=['AIC', 'BIC'],
default='BIC',
help="How to evaluation fits [BIC]."
)
parser.add_argument(
"-v",
"--verbosity",
action="count",
help="Increase output verbosity",
default=0
)
args = parser.parse_args()
if args.verbosity == 1:
log.basicConfig(
level=log.INFO,
format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
elif args.verbosity >= 2:
log.basicConfig(
level=log.DEBUG,
format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
main(sbid=args.sbid,
holo_dir=args.holo_dir,
remake_stokes=args.remake_stokes,
max_order=args.max_order,
param=args.param,
snr_limit=args.snr_limit,
)
if __name__ == "__main__":
sys.exit(cli())