"""A set of utility routines for various beam modelling exercises"""
[docs]__author__ = 'Dave McConnell <david.mcconnell@csiro.au>'
import numpy as np
import scipy.special as ssp
import matplotlib.pylab as plt
import itertools
# todo : remove unused functions
[docs]def airy(x):
# Return airy function value at x
# x in units of lambda/D
return 2 * ssp.jn(1, x * pi) / x / pi
[docs]def airy1d(xgrid, xc, wid=1.0):
wf = 0.7095 * wid
aix = airy((xgrid - xc) / wf)
aix[np.isnan(aix)] = 1.0
return aix
[docs]def airy2d(xgrid, xc, yc, wid=1.0):
wf = 0.7095 * wid
x, y = np.meshgrid(xgrid, xgrid)
r = np.sqrt((x - xc) * (x - xc) + (y - yc) * (y - yc))
aix = airy(r / wf)
aix[np.isnan(aix)] = 1.0
return aix
[docs]def gauss1d(M, dxy, xc, wid=1.0):
x0 = - M / 2 * dxy
x1 = M / 2 * dxy
x = np.arange(x0, x1, dxy)
ag2 = wid ** 2 / (4 * np.log(2))
r2 = (x - xc) ** 2 / ag2
z = np.exp(-r2)
return x, z
[docs]def gauss2d(xgrid, xc, yc, wid=1.0):
x, y = np.meshgrid(xgrid, xgrid)
ag2 = wid ** 2 / (4 * np.log(2))
r2 = ((x - xc) ** 2 + (y - yc) ** 2) / ag2
z = np.exp(-r2)
return x, z
[docs]def empirical_fwhm(freq):
# return the empirically determined FWHM as a function of frequency in MHz.
factor = 1.12
diameter = 12.0
fwhm = 299.8 / freq / diameter * factor
return fwhm
[docs]def get_grid(M, w):
xg = np.linspace(-w, w, M)
return xg
[docs]def make_beams(xgrid, offsets, fwhm):
# compute beam values on given grid
# at offsets given
# Produces voltage beams with the given FWHM once converted to power.
wi = 1.368
wia = wi * fwhm
nb = offsets.shape[0]
tbeams = None
for i in range(nb):
xci, yci = offsets[i]
ai0 = airy2d(xgrid, xci, yci, wid=wia)
if i == 0:
tbeams = ai0
else:
tbeams = np.dstack((tbeams, ai0))
if nb == 1:
tbeams = np.expand_dims(tbeams, axis=2)
abeams = np.transpose(tbeams, (2, 0, 1))
return abeams
[docs]def calc_cov(abeams):
Nb = abeams.shape[0]
ra2 = None
for i in range(Nb):
psai = []
ai0 = abeams[i, :, :]
for j in range(Nb):
aia = abeams[j, :, :]
psai.append((ai0 * aia).sum())
psai = np.array(psai)
ra = psai ** 2 / (ai0 ** 2).sum() ** 2
if i == 0:
ra2 = ra
else:
ra2 = np.vstack((ra2, ra))
cov = np.matrix(ra2)
return cov
[docs]def apodize_paf_0(xgrid, centre=None, unit=False):
# define an apodizing function that mimics the observed sensitivity variation across the PAF.
# formed from quadratic fit, parameters in variable m
# No other manipulation of the function is made; see routine apodize_paf.
if centre is None:
centre = [0.0, 0.0]
if unit:
apod = np.ones((xgrid.shape[0], xgrid.shape[0]))
else:
m = [5.68171268e-04, -2.42077280e-04, -2.78766255e-02, 2.68446142e-04,
1.14317719e-03, -2.17087761e-01, -2.47329723e-02, 1.57161909e-01, -2.94909312e+01]
x, y = np.meshgrid(xgrid, xgrid)
xc, yc = centre
xr = (x - xc) / root2 - (y - yc) / root2
yr = (x - xc) / root2 + (y - yc) / root2
apod = polyval2d(xr, yr, m)
apod[apod < 0.0] = 0.0
apod /= apod.max()
return apod
[docs]def apodize_paf_posang_0(xgrid, centre=None, unit=False):
# define an apodizing function that mimics the observed sensitivity variation across the PAF.
# formed from quadratic fit, parameters in variable m
# No 45deg rotation is applied.
if centre is None:
centre = [0.0, 0.0]
if unit:
apod = np.ones((xgrid.shape[0], xgrid.shape[0]))
else:
m = [5.68171268e-04, -2.42077280e-04, -2.78766255e-02, 2.68446142e-04,
1.14317719e-03, -2.17087761e-01, -2.47329723e-02, 1.57161909e-01, -2.94909312e+01]
x, y = np.meshgrid(xgrid, xgrid)
xc, yc = centre
xr = (x - xc)
yr = (y - yc)
apod = polyval2d(xr, yr, m)
apod[apod < 0.0] = 0.0
apod /= apod.max()
return apod
[docs]def polyval2d(x, y, m):
order = int(np.sqrt(len(m))) - 1
ij = itertools.product(range(order + 1), range(order + 1))
z = np.zeros_like(x)
for a, (i, j) in zip(m, ij):
z += a * x ** i * y ** j
return z
[docs]def fnc(x, a, e):
# note the pattern trimming limit of 4.2 degrees
y = 1.0 / (1 + (x / a) ** e)
y[abs(x) > 4.2] = 0.0
return y
[docs]def design_mat_2(X, Y, b):
al = [X, Y]
for f in [0.71, 1.0, 1.41, 2.0, 2.82]:
b1 = f * b
for i in [2, 4, 8]:
al.append(fnc(X, b1, i) * fnc(Y, b1, i))
A = np.array(al).T
return A
[docs]def apodize_paf(xgrid, centre=None, posang=0.0, unit=False):
# Define an apodizing function that mimics the observed sensitivity variation across the PAF.
# formed from fit to a set of rational functions, parameters in variable m
# Expects xgrid (in radians), centre and PA (position angle) both in radians.
# Needs access to functions design_mat and fnc
if centre is None:
centre = [0.0, 0.0]
if unit:
apod = np.ones((xgrid.shape[0], xgrid.shape[0]))
else:
design_mat = design_mat_2
# coeff = np.array([1.28107531e+01, 4.56911754e-01, 5.17826052e-01,-1.51419796e+02,-1.49542178e+01,
# -1.58078435e+00, 6.77485394e+02, 4.67563591e+01, -6.80814573e+00,-1.41500519e+03,
# 1.23011095e+02, 4.87414906e+01, 1.10677965e+03, -5.81216252e+02, 1.55424512e+02])
coeff = np.array([5.09633567e-04, -6.67986801e-04, -1.53786901e+01, -1.90039866e+00,
-5.02886076e-01, 1.10132646e+02, 1.07159428e+01, 9.09866150e-01,
-3.56162780e+02, -1.44262434e+01, 3.35535219e+00, 6.17456821e+02,
-1.26227972e+02, -2.47940423e+01, -4.38376122e+02, 4.48199146e+02,
-2.12011984e+02])
b = 3.84
small_value = 1.0e-4
cosa, sina = np.cos(posang), np.sin(posang)
xg, yg = np.meshgrid(xgrid, xgrid)
Xr = np.degrees(xg.flatten())
Yr = np.degrees(yg.flatten())
X = cosa * Xr - sina * Yr
Y = sina * Xr + cosa * Yr
xc, yc = centre
xcd = np.degrees(cosa * xc - sina * yc)
ycd = np.degrees(sina * xc + cosa * yc)
xr = (X - xcd)
yr = (Y - ycd)
A = design_mat(xr, yr, b)
apod = np.reshape(np.dot(A, coeff), xg.shape)
# trimming the pattern is done in fnc(x,a,e) with a hard coded value!
rlim = np.radians(4.7)
# xylim = np.radians(4.0)
apod[(xg - xc) ** 2 + (yg - yc) ** 2 > rlim ** 2] = small_value
apod[apod < 0.0] = small_value
apod /= apod.max()
return apod
[docs]def draw_apod(grid_wid=3.0, levels=None, plot_grid=False):
M = 81
xgrid = get_grid(M, np.radians(grid_wid))
xm, ym = np.meshgrid(xgrid, xgrid)
apod = apodize_paf(xgrid)
xymax = np.where(apod == apod.max())
xmax = xm[xymax[0][0], xymax[1][0]]
ymax = ym[xymax[0][0], xymax[1][0]]
if levels:
levs = levels
else:
levs = [0.95 * 0.9 ** p for p in range(8)]
CS = plt.contour(np.degrees(xm), np.degrees(ym), apod, levels=levs)
plt.clabel(CS, CS.levels, inline=True, fmt="%4.2f", fontsize=10)
plt.plot([xmax], [ymax], '+', ms=20)
if plot_grid:
plt.grid()
# Define PAF map routines
[docs]def make_paf_map(cov, beams, sefds=None):
# Expect Matrix C0, the normalised covariance matrix (computed from model beam weights in calCov)
# beams[Nb,M,M] being the power response of each beam on an MxM grid.
# If sefds is defined, it is an array of measured SEFD values (in Jy) in beam order.
# Returns PAF map s in signal variance.
# s is for the given beam covariance matrix
M = beams.shape[1]
Nb = beams.shape[0]
if sefds is None:
# var = 1900.0**2
var = 1.0
v = np.array([var] * Nb)
else:
v = sefds ** 2
C = np.array(np.zeros([Nb, Nb]))
for i in range(Nb):
for j in range(Nb):
C[i, j] = cov[i, j] * np.sqrt(v[i] * v[j])
covi = np.linalg.inv(C)
s = np.zeros([M, M])
for p in range(M):
for q in range(M):
b = beams[:, p, q].T
si = 1.0 / np.dot(b.T, np.dot(covi, b).T)
s[p, q] = si
return s
[docs]def make_paf_map2(C0, beams):
# Expect Matrix C0, the normalised covariance matrix (computed from model beam weights in calCov)
# beams[M,M,Nb] being the powe response of each beam on an MxM grid.
# Returns two PAF maps, s and t in signal variance.
# t is for no correlation between beams
# s is for the given beam covariance matrix
M = beams.shape[1]
Nb = beams.shape[0]
W = np.array([1.0] * Nb).transpose()
D0 = np.diag([1.0] * Nb)
v = np.ones(Nb)
C = np.array(np.zeros([Nb, Nb]))
D = np.array(np.zeros([Nb, Nb]))
for i in range(Nb):
D[i, i] = D0[i, i] * v[i]
for j in range(Nb):
C[i, j] = C0[i, j] * np.sqrt(v[i] * v[j])
C1 = np.array(np.zeros([Nb, Nb]))
D1 = np.array(np.zeros([Nb, Nb]))
s = np.zeros([M, M])
t = np.zeros([M, M])
for p in range(M):
for q in range(M):
for i in range(Nb):
for j in range(Nb):
C1[i, j] = C[i, j] / (beams[i, p, q] * beams[j, p, q])
D1[i, j] = D[i, j] / (beams[i, p, q] * beams[j, p, q])
Ci = np.linalg.inv(C1)
si = (W.transpose() * Ci * W) ** -1
s[p, q] = si
Di = np.linalg.inv(D1)
ti = (W.transpose() * Di * W) ** -1
t[p, q] = ti
return s, t
[docs]class pafmap(object):
def __init__(self, xgrid, fwhm, fpname, pitch, freq):
# xgrid - 1D grid used to form 2D map
# fwhm - beam full width at half max
# fpname - footprint name
# pitch - beam separation
# freq - frequency (MHz)
#
# All angles in radians.
#
self.fp_name = fpname
self.xg = xgrid
self.nx = self.xg.shape[0]
self.nx2 = self.nx / 2
self.dx = xgrid[1] - xgrid[0]
self.da = self.dx ** 2
self.area = self.da * self.nx ** 2
self.fwhm = fwhm
self.pitch = pitch
self.pif = pitch / fwhm
self.freq = freq
self.hx = max(2, int(self.pitch / self.dx / 2 + 0.5))
self.maps = []
self.offsets = []
self.ripple = []
self.eqArea = []
self.centroids = []
self.mosaic = None
self.mosaic_ripple = 0.0
self.mosaic_centroid = []
self.areaContour = 0.0
self.equivArea = 0.0
[docs] def addmap(self, var, offsets):
# expects the variance map and the footprint interleaving offsets
self.maps.append(var)
self.offsets.append(offsets)
xc, yc = self._calc_centroid(var)
self.centroids.append((xc, yc))
# for ripple calculation, centre the region on the footprint centre
# xc,yc = offsets
ixc, iyc = int(xc + 0.5), int(yc + 0.5)
ix, jx = max(0, ixc - self.hx), min(self.nx, ixc + self.hx)
iy, jy = max(0, iyc - self.hx), min(self.nx, iyc + self.hx)
cen = np.sqrt(1.0 / var[iy:jy, ix:jx])
ri = (cen.max() - cen.min()) / cen.mean()
self.ripple.append(ri)
iv = 1.0 / var
self.eqArea.append(iv.sum() / iv.max() * self.da)
[docs] def make_lin_mos(self):
Ni = len(self.maps)
denom = 1.0 / self.maps[0]
for s in self.maps[1:]:
denom += 1.0 / s
self.mosaic = Ni / denom
xc, yc = self._calc_centroid(self.mosaic)
self.mosaic_centroid = (xc, yc)
# for ripple calculation of the mosaic, use the calculated centroid for the region
ixc, iyc = int(xc + 0.5), int(yc + 0.5)
ix, jx = max(0, ixc - self.hx), min(self.nx, ixc + self.hx)
iy, jy = max(0, iyc - self.hx), min(self.nx, iyc + self.hx)
# note ordering of indices (y,x) is correct for np arrays
cen = np.sqrt(1.0 / self.mosaic[iy:jy, ix:jx])
self.mosaic_ripple = (cen.max() - cen.min()) / cen.mean()
mosiv = 1.0 / self.mosaic
moscut = mosiv.max() / np.sqrt(2.0)
mmosiv = np.ma.masked_array(mosiv, mosiv < moscut)
self.areaContour = mmosiv.count() * self.da
self.equivArea = mosiv.sum() / mosiv.max() * self.da
[docs] def get_ripple(self):
return self.ripple, self.mosaic_ripple
[docs] def get_survey_speed(self, sigma=1.0e-4, bandwidth=300.e6, num_antennas=30, npol=2):
# return survey speed in sq deg/hour for the given parmeters:
# sigma - target image rms (Jy)
# bandwidth - bandwidth (Hz)
# Na - number of antennas
# npol - number of polarizations
ssFac = bandwidth * npol * num_antennas ** 2 * sigma ** 2 * 3600.
saFac = self.area * (180.0 / pi) ** 2 * ssFac
return (1.0 / self.mosaic).mean() * saFac
[docs] def get_cont_area(self):
return self.areaContour * (180.0 / pi) ** 2
[docs] def _calc_centroid(self, var):
nx, nx2 = self.nx, self.nx2
isig = np.sqrt(1.0 / var)
xfunc = isig.mean(axis=0)
yfunc = isig.mean(axis=1)
cntx = (xfunc * np.arange(0, nx, 1)).sum() / xfunc.sum()
cnty = (yfunc * np.arange(0, nx, 1)).sum() / yfunc.sum()
return cntx, cnty
@staticmethod
[docs] def _find_peak(var):
xy = np.where(var == var.min())
cntx = xy[0][0]
cnty = xy[1][0]
return cntx, cnty
[docs]def make_circle(rad, x0, y0):
th = np.arange(0, 361, 1) * pi / 180
x = x0 + rad * np.cos(th)
y = y0 + rad * np.sin(th)
return x, y