#!/usr/bin/env python
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
import argparse as ap
import sys
import time
import matplotlib.pylab as plt
import numpy as np
from numpy import pi
import pickle
import scipy.special as ssp
from scipy.interpolate import interp1d, interp2d, griddata
import glob
import itertools
from aces.obsplan.config import ACESConfig
[docs]fp_factory = aces_cfg.footprint_factory
[docs]HELP_START = """This estimates the ASKAP survey speed for an optimum setting of the beam separation.
It does this for any available footprint, and gives results as a function of frequency.
The results are saved into python pickle files and used to construct some summary plots.
The -p option will do the plotting without re-doing the time-consuming optimising calculations.
The Fov_method options (chosen with '-m') are:
sefd USe recent SEFD measures across footprint
image Use recent estimates from image noise across mosaic
old Use the original measurement made with BETA (Mark I PAF)
V 2April2019
"""
[docs]explanation = "\n" + HELP_START + "\n\n"
[docs]announce = "\ns_optimise.py\n"
[docs]fp_choices = [a for a in fp_factory.get_footprint_names() if a.startswith("ak:") or a.startswith("xx:")]
[docs]def arg_init():
"""Define the interpretation of command line arguments.
"""
parser = ap.ArgumentParser(prog='s_optimise',
formatter_class=ap.RawDescriptionHelpFormatter,
description=HELP_START,
epilog='See -x for more explanation .')
parser.add_argument('-o', '--output_suffix', default='', help='Optional suffix for output plot files')
parser.add_argument('-f', '--fpnames', nargs='*', choices=fp_choices, default='ak:square_6x6',
help='Footprint name')
parser.add_argument('-m', '--fov_method', choices=('old', 'sefd', 'image'), default='sefd',
help='FoV method to use')
parser.add_argument('-q', '--frequencies', nargs='*', type=float, default=1000., help='Frequencies')
parser.add_argument('-p', '--plot_only', action='store_true', help="Only plot previous results")
parser.add_argument('-v', '--verbose', action='store_true')
parser.add_argument('-x', '--explain', action='store_true', help="Give an expanded explanation")
return parser
[docs]class intList(ap.Action):
[docs] def __call__(self, parser, namespace, values, option_string=None):
# print 'ACTION : %r %r %r' % (namespace, values, option_string)
safe_dict = {'range': range}
rp = eval(values, safe_dict)
if isinstance(rp, int):
rp = [rp]
setattr(namespace, self.dest, rp)
[docs]def bsect(x, z):
n = len(x)
j1 = 0
j2 = n
j = j1
while (j2 - j1) > 1:
j = (j1 + j2) / 2
if x[j] == z:
break
if x[j] > z:
j2 = j
else:
j1 = j
if x[j] > z:
j = j - 1
return j
[docs]class Timer:
def __init__(self):
self.t0 = 0.
self.cpu0 = 0.
self.t1 = 0.
self.cpu1 = 0.
self.reset()
[docs] def reset(self):
self.t0 = time.time()
self.cpu0 = time.clock()
self.t1 = self.t0
self.cpu1 = self.cpu0
[docs] def getTotal(self):
return time.time() - self.t0
[docs] def getDelta(self):
t1 = time.time()
delta = t1 - self.t1
self.t1 = t1
return delta
def fit_parabola(x, y):
"""
Fit a parabola to three points
:param x: the three abscissae
:param y: the three ordinates
:return: the coefficients of x**0, x**1, x**2
"""
a = np.zeros([3, 3])
a[:, 0] = 1.0
a[:, 1] = x
a[:, 2] = x**2
ai = np.linalg.inv(a)
b = np.dot(ai, y)
return b
[docs]def fit_parabola(x, y):
"""
Fit a parabola to the given points. If len(x) == 3, find the unique coefficients; otherwise
perform a least-squares fit.
:param x: the three abscissae
:param y: the three ordinates
:return: the coefficients of x**0, x**1, x**2
"""
N = len(x)
a = np.zeros([N,3])
a[:,0] = 1.0
a[:,1] = x
a[:,2] = x**2
if N == 3:
ai = np.linalg.inv(a)
coeff = np.dot(ai,y)
else:
coeff, r, rank, s = np.linalg.lstsq(a, y)
return coeff
[docs]def airy(x):
# Return airy function value at x
# x in units of lambda/D
return 2 * ssp.jn(1, x * pi) / x / pi
[docs]def airy1d(xgrid, xc, wid=1.0):
wf = 0.7095 * wid
aix = airy((xgrid - xc) / wf)
aix[np.isnan(aix)] = 1.0
return aix
[docs]def airy2d(xgrid, xc, yc, wid=1.0):
wf = 0.7095 * wid
x, y = np.meshgrid(xgrid, xgrid)
r = np.sqrt((x - xc) * (x - xc) + (y - yc) * (y - yc))
aix = airy(r / wf)
aix[np.isnan(aix)] = 1.0
return aix
[docs]def gauss1d(M, dxy, xc, wid=1.0):
x0 = - M / 2 * dxy
x1 = M / 2 * dxy
x = np.arange(x0, x1, dxy)
ag2 = wid ** 2 / (4 * np.log(2))
r2 = (x - xc) ** 2 / ag2
z = np.exp(-r2)
return x, z
[docs]def gauss2d(xgrid, xc, yc, wid=1.0):
x, y = np.meshgrid(xgrid, xgrid)
ag2 = wid ** 2 / (4 * np.log(2))
r2 = ((x - xc) ** 2 + (y - yc) ** 2) / ag2
z = np.exp(-r2)
return x, z
[docs]def empiricalFWHM(freq):
# return the empirically deermined FWHM as a function of frequency in MHz.
factor = 1.09
D = 12.0
fwhm = 299.8 / freq / D * factor
return fwhm
[docs]def getGrid(M, w):
xg = np.linspace(-w, w, M)
return xg
[docs]def makeBeams(xgrid, offsets, fwhm):
# compute beam values on given grid
# at offsets given
# Produces voltage beams with the given FWHM once converted to power.
wi = 1.368
wia = wi * fwhm
Nb = offsets.shape[0]
tbeams = None
for i in range(Nb):
xci, yci = offsets[i]
ai0 = airy2d(xgrid, xci, yci, wid=wia)
if i == 0:
tbeams = ai0
else:
tbeams = np.dstack((tbeams, ai0))
if Nb == 1:
tbeams = np.expand_dims(tbeams, axis=2)
abeams = np.transpose(tbeams, (2, 0, 1))
return abeams
[docs]def calcCov(abeams):
Nb = abeams.shape[0]
for i in range(Nb):
psai = []
ai0 = abeams[i, :, :]
d0 = (ai0**2).sum()
for j in range(Nb):
aia = abeams[j, :, :]
num = (ai0*aia).sum()**2
den = d0 * (aia**2).sum()
psai.append(num/den)
ra = np.array(psai)
if i == 0:
ra2 = ra
else:
ra2 = np.vstack((ra2, ra))
cov = np.matrix(ra2)
return cov
# def calcCov(abeams):
# Nb = abeams.shape[0]
# ra2 = None
# for i in range(Nb):
# psai = []
# ai0 = abeams[i, :, :]
# for j in range(Nb):
# aia = abeams[j, :, :]
# psai.append((ai0 * aia).sum())
# psai = np.array(psai)
# ra = psai ** 2 / (ai0 ** 2).sum() ** 2
# if i == 0:
# ra2 = ra
# else:
# ra2 = np.vstack((ra2, ra))
# cov = np.matrix(ra2)
# # print ('cov min,max ', cov.min(), cov.max())
# return cov
#
[docs]def get_cross_sep(offsets):
N = offsets.shape[0]
ret = np.zeros([N, N])
for i in range(N):
for j in range(i, N):
sep = np.linalg.norm(offsets[j] - offsets[i])
ret[i, j] = sep
ret[j, i] = sep
return ret
[docs]def corr_func(x):
# An empirically determined function giving beam to beam correlation coefficient depending on
# beam separation
# x - array of beam-to-beam separations
xf = x.flatten()
# p = [0.8715995, 0.48246021, 0.1284005]
p = [0.98044135, 0.57611785, 0.0]
return np.reshape(p[0] * np.exp(-(xf / p[1]) ** 2) + p[2], x.shape)
[docs]def cv_modelled(offsets, fwhm):
cs = get_cross_sep(offsets) / fwhm
cv = corr_func(cs)
return cv
[docs]def polyval2d(x, y, m):
order = int(np.sqrt(len(m))) - 1
ij = itertools.product(range(order + 1), range(order + 1))
z = np.zeros_like(x)
for a, (i, j) in zip(m, ij):
z += a * x ** i * y ** j
return z
[docs]def fnc(x, a, e):
# note the pattern trimming limit of 4.2 degrees
y = 1.0 / (1 + (x / a) ** e)
y[abs(x) > 4.2] = 0.0
return y
[docs]def designMat_2(X, Y, b):
al = [X, Y]
for f in [0.71, 1.0, 1.41, 2.0, 2.82]:
b1 = f * b
for i in [2, 4, 8]:
al.append(fnc(X, b1, i) * fnc(Y, b1, i))
A = np.array(al).T
return A
[docs]def apodizePAF(xgrid, centre=None, posang=0.0, unit=False, fov_method='sefd'):
if fov_method == 'sefd':
return apodizePAF_new(xgrid, centre, posang, unit, from_image=False)
elif fov_method == 'image':
return apodizePAF_new(xgrid, centre, posang, unit, from_image=True)
elif fov_method == 'old':
return apodizePAF_old(xgrid, centre, posang, unit)
else:
print ("apodizePAF : unknown FoV method {}".format(fov_method))
sys.exit()
[docs]def apodizePAF_old(xgrid, centre=None, posang=0.0, unit=False):
# Define an apodizing function that mimics the observed sensitivity variation across the PAF.
# formed from fit to a set of rational functions, parameters in variable m
# Expects xgrid (in radians), centre and PA (position angle) both in radians.
# Needs access to functions designMat and fnc
# print ("apodizePAF_old ", xgrid.shape, xgrid[:3])
if centre is None:
centre = [0.0, 0.0]
if unit:
apod = np.ones((xgrid.shape[0], xgrid.shape[0]))
else:
designMat = designMat_2
# coeff = np.array([1.28107531e+01, 4.56911754e-01, 5.17826052e-01,-1.51419796e+02,-1.49542178e+01,
# -1.58078435e+00, 6.77485394e+02, 4.67563591e+01, -6.80814573e+00,-1.41500519e+03,
# 1.23011095e+02, 4.87414906e+01, 1.10677965e+03, -5.81216252e+02, 1.55424512e+02])
# These parameters were derived from SBIDs around ~2024 (2016-September) with 48MHz bandwidth
# near 1304 MHz, using antennas :
# ant4,ant5,ant10,ant12,ant13,ant14,ant16,ant24,ant27,ant28,ant30
coeff = np.array([5.09633567e-04, -6.67986801e-04, -1.53786901e+01, -1.90039866e+00,
-5.02886076e-01, 1.10132646e+02, 1.07159428e+01, 9.09866150e-01,
-3.56162780e+02, -1.44262434e+01, 3.35535219e+00, 6.17456821e+02,
-1.26227972e+02, -2.47940423e+01, -4.38376122e+02, 4.48199146e+02,
-2.12011984e+02])
b = 3.84
SMALL = 1.0e-4
cosa, sina = np.cos(posang), np.sin(posang)
xg, yg = np.meshgrid(xgrid, xgrid)
Xr = np.degrees(xg.flatten())
Yr = np.degrees(yg.flatten())
X = cosa * Xr - sina * Yr
Y = sina * Xr + cosa * Yr
xc, yc = centre
xcd = np.degrees(cosa * xc - sina * yc)
ycd = np.degrees(sina * xc + cosa * yc)
xr = (X - xcd)
yr = (Y - ycd)
A = designMat(xr, yr, b)
apod = np.reshape(np.dot(A, coeff), xg.shape)
# trimming the pattern is done in fnc(x,a,e) with a hard coded value!
rlim = np.radians(4.7)
# xylim = np.radians(4.0)
apod[(xg - xc) ** 2 + (yg - yc) ** 2 > rlim ** 2] = SMALL
apod[apod < 0.0] = SMALL
apod /= apod.max()
return apod
[docs]def apodizePAF_new(xgrid, centre=None, posang=0.0, unit=False, from_image=True):
# Define an apodizing function that mimics the observed sensitivity variation across the PAF.
# This version is deduced from image sensitivity across 36-beam mosaics.
# See notebook "FoV_from_rms_tiles"
# Expects xgrid (in radians), centre and PA (position angle) both in radians.
# (centre and PA ignored at present.)
if centre is None:
centre = [0.0, 0.0]
SMALL = 1.0e-4
if unit:
apod = np.ones((xgrid.shape[0], xgrid.shape[0]))
else:
if from_image:
p = open('/Users/mcc381/software/jupyter/FoV/fov_1.05.pkl', 'rb')
else:
p = open('/Users/mcc381/software/jupyter/FoV/fov_sefd_1.05.pkl', 'rb')
package = pickle.load(p)
pitch, newp, newm = package
cosa, sina = np.cos(posang), np.sin(posang)
xg, yg = np.meshgrid(xgrid, xgrid)
Xr = np.degrees(xg.flatten())
Yr = np.degrees(yg.flatten())
X = cosa * Xr - sina * Yr
Y = sina * Xr + cosa * Yr
xc, yc = centre
xcd = np.degrees(cosa * xc - sina * yc)
ycd = np.degrees(sina * xc + cosa * yc)
xr = (X - xcd)
yr = (Y - ycd)
xr = xr.reshape(xg.shape)
yr = yr.reshape(yg.shape)
zti = griddata(newp, newm, (xr, yr), method='linear', fill_value=SMALL)
# apod = np.sqrt(zti/zti.max())
apod = zti / zti.max()
return apod
[docs]def draw_apod(fov_meth, levels=None, grid_hwid=3.0):
M = 81
xgrid = getGrid(M, np.radians(grid_hwid))
xm, ym = np.meshgrid(xgrid, xgrid)
apod = apodizePAF(xgrid, fov_method=fov_meth)
xymax = np.where(apod == apod.max())
xmax = xm[xymax[0][0], xymax[1][0]]
ymax = ym[xymax[0][0], xymax[1][0]]
if levels:
levs = levels
else:
levs = [0.95 * 0.9 ** p for p in range(8)]
CS = plt.contour(np.degrees(xm), np.degrees(ym), apod, levels=levs)
plt.clabel(CS, CS.levels, inline=True, fmt="%4.2f", fontsize=10)
plt.plot([xmax], [ymax], '+', ms=20)
"""
Define PAF map routines
"""
# def make_paf_map(c0, beams, sefds=None):
# # Expect Matrix C0, the normalised covariance matrix (computed from model beam weights in calCov)
# # beams[M,M,Nb] being the power response of each beam on an MxM grid.
# # If sefds is defined, it is an array of measured SEFD values (in Jy) in beam order.
# # Returns PAF map s in signal variance.
# # s is for the given beam covariance matrix
# M = beams.shape[1]
# Nb = beams.shape[0]
# W = np.matrix([1.0] * Nb).transpose()
# WT = W.transpose()
#
# if sefds is None:
# # var = 1900.0 ** 2
# var = 1.0 ** 2
# v = np.array([var] * Nb)
# else:
# v = sefds ** 2
#
# C = np.matrix(np.zeros([Nb, Nb]))
#
# for i in range(Nb):
# for j in range(Nb):
# C[i, j] = c0[i, j] * np.sqrt(v[i] * v[j])
#
# C1 = np.matrix(np.zeros([Nb, Nb]))
#
# s = np.zeros([M, M])
# for p in range(M):
# for q in range(M):
# for i in range(Nb):
# for j in range(Nb):
# C1[i, j] = C[i, j] / (beams[i, p, q] * beams[j, p, q])
# Ci = np.linalg.inv(C1)
# # si = (WT * Ci * W) ** -1
# si = Ci.sum() ** -1
# s[p, q] = si
# # The following step is necessary to supress negative values that can appear around the edge
# # of the field, presumably from numerically extreme values that violate assumptions about the
# # floating point arithmetic.
# s[s < 0.0] = s.max()
# return s
[docs]def make_paf_map(cov, beams, sefds=None):
"""
Generate a sensitivity map over the PAF field of view
:param cov: the normalised covariance matrix (computed from model beam weights in calCov)
:param beams: the power response of each beam on an MxM grid; shape = (Nb, M, M)
:param sefds: If defined, is an array of measured SEFD values (in Jy) in beam order.
:return: s: the variance across the smae grid defined in beams.
"""
M = beams.shape[1]
Nb = beams.shape[0]
if sefds is None:
var = 1.0
v = np.array([var]*Nb)
else:
v = sefds**2
C = np.matrix(np.zeros([Nb, Nb]))
for i in range(Nb):
for j in range(Nb):
C[i, j] = cov[i, j]*np.sqrt(v[i]*v[j])
covi = np.linalg.inv(C)
s = np.zeros([M, M])
for p in range(M):
for q in range(M):
b = beams[:, p, q].T
si = 1.0/np.dot(b.T, np.dot(covi, b).T)
s[p, q] = si
s[s<0.0] = s.max()
return s
# def makePAFmap2(C0, beams):
# # Expect Matrix C0, the normalised covariance matrix (computed from model beam weights in calCov)
# # beams[M,M,Nb] being the powe response of each beam on an MxM grid.
# # Returns two PAF maps, s and t in signal variance.
# # t is for no correlation between beams
# # s is for the given beam covariance matrix
# M = beams.shape[1]
# Nb = beams.shape[0]
# W = np.matrix([1.0] * Nb).transpose()
# D0 = np.diag([1.0] * Nb)
#
# v = np.array([1.0] * Nb)
# C = np.matrix(np.zeros([Nb, Nb]))
# D = np.matrix(np.zeros([Nb, Nb]))
#
# for i in range(Nb):
# D[i, i] = D0[i, i] * v[i]
# for j in range(Nb):
# C[i, j] = C0[i, j] * np.sqrt(v[i] * v[j])
#
# C1 = np.matrix(np.zeros([Nb, Nb]))
# D1 = np.matrix(np.zeros([Nb, Nb]))
#
# s = np.zeros([M, M])
# t = np.zeros([M, M])
# for p in range(M):
# for q in range(M):
# for i in range(Nb):
# for j in range(Nb):
# C1[i, j] = C[i, j] / (beams[i, p, q] * beams[j, p, q])
# D1[i, j] = D[i, j] / (beams[i, p, q] * beams[j, p, q])
# Ci = np.linalg.inv(C1)
# si = (W.transpose() * Ci * W) ** -1
# s[p, q] = si
#
# Di = np.linalg.inv(D1)
# ti = (W.transpose() * Di * W) ** -1
# t[p, q] = ti
#
# return s, t
[docs]class pafmap(object):
def __init__(self, freq, xgrid, pitch):
# xgrid - 1D grid used to form 2D map
# fwhm - beam full width at half max
# fpname - footprint name
# pitch - beam separation
# freq - frequency (MHz)
#
# All angles in radians.
#
# self.fpname = fpname
self.xg = xgrid
self.nx = self.xg.shape[0]
self.nx2 = self.nx / 2
self.dx = xgrid[1] - xgrid[0]
self.da = self.dx ** 2
self.area = self.da * self.nx ** 2
# self.fwhm = fwhm
self.pitch = pitch
# self.pif = pitch / fwhm
self.freq = freq
self.hx = max(2, int(self.pitch / self.dx / 2 + 0.5))
self.maps = []
self.offsets = []
self.ripple = []
self.eqArea = []
self.centroids = []
self.mosaic = None
self.mosaic_ripple = 0.0
self.mosaic_centroid = []
self.areaContour = 0.0
self.equivArea = 0.0
self.survey_params = []
[docs] def addmap(self, var, offsets):
# expects the variance map and the footprint interleaving offsets
self.maps.append(var)
self.offsets.append(offsets)
xc, yc = self._calcCentroid(var)
self.centroids.append((xc, yc))
# for ripple calculation, centre the region on the footprint centre
# xc,yc = offsets
ixc, iyc = int(xc + 0.5), int(yc + 0.5)
ix, jx = max(0, ixc - self.hx), min(self.nx, ixc + self.hx)
iy, jy = max(0, iyc - self.hx), min(self.nx, iyc + self.hx)
cen = np.sqrt(1.0 / var[iy:jy, ix:jx])
ri = (cen.max() - cen.min()) / cen.mean()
self.ripple.append(ri)
iv = 1.0 / var
self.eqArea.append(iv.sum() / iv.max() * self.da)
[docs] def makeLinMos(self):
Ni = len(self.maps)
denom = 1.0 / self.maps[0]
for s in self.maps[1:]:
denom += 1.0 / s
self.mosaic = Ni / denom
xc, yc = self._calcCentroid(self.mosaic)
self.mosaic_centroid = (xc, yc)
# for ripple calculation of the mosaic, use the calculated centroid for the region
ixc, iyc = int(xc + 0.5), int(yc + 0.5)
ix, jx = max(0, ixc - self.hx), min(self.nx, ixc + self.hx)
iy, jy = max(0, iyc - self.hx), min(self.nx, iyc + self.hx)
# note ordering of indices (y,x) is correct for np arrays
cen = np.sqrt(1.0 / self.mosaic[iy:jy, ix:jx])
self.mosaic_ripple = (cen.max() - cen.min()) / cen.mean()
mosiv = 1.0 / self.mosaic
moscut = mosiv.max() / np.sqrt(2.0)
mmosiv = np.ma.masked_array(mosiv, mosiv < moscut)
self.areaContour = mmosiv.count() * self.da
self.equivArea = mosiv.sum() / mosiv.max() * self.da
[docs] def getRipple(self):
return self.ripple, self.mosaic_ripple
[docs] def getSS(self, sigma=1.0e-4, BW=288.e6, Na=36, npol=2):
# return survey speed in sq deg/hour for the given parmeters:
# sigma - target image rms (Jy)
# BW - bandwidth (Hz)
# Na - number of antennas
# npol - number of polarizations
ssFac = BW * npol * Na ** 2 * sigma ** 2 * 3600.
saFac = self.da * (180.0 / pi) ** 2 * ssFac
self.survey_params = [sigma, BW, Na, npol]
return (1.0 / self.maps[0]).sum() * saFac
[docs] def get_survey_params(self):
return self.survey_params
# def get_sa_fac(self, sigma=1.0e-4, BW=300.e6, Na=30, npol=2):
# # return survey speed in sq deg/hour for the given parmeters:
# # sigma - target image rms (Jy)
# # BW - bandwidth (Hz)
# # Na - number of antennas
# # npol - number of polarizations
# ssFac = BW * npol * Na ** 2 * sigma ** 2 * 3600.
# saFac = self.area * (180.0 / pi) ** 2 * ssFac
# return saFac
[docs] def getContArea(self):
return self.areaContour * (180.0 / pi) ** 2
[docs] def _calcCentroid(self, var):
nx, nx2 = self.nx, self.nx2
isig = np.sqrt(1.0 / var)
xfunc = isig.mean(axis=0)
yfunc = isig.mean(axis=1)
cntx = (xfunc * np.arange(0, nx, 1)).sum() / xfunc.sum()
cnty = (yfunc * np.arange(0, nx, 1)).sum() / yfunc.sum()
if np.isnan(cntx) or np.isnan(cnty):
print (var.min(), var.max())
return cntx, cnty
@staticmethod
[docs] def _findPeak(var):
"""
:param var:
:return:
"""
xy = np.where(var == var.min())
cntx = xy[0][0]
cnty = xy[1][0]
return cntx, cnty
[docs]def makeCircle(rad, x0, y0):
th = np.arange(0, 361, 1) * pi / 180
x = x0 + rad * np.cos(th)
y = y0 + rad * np.sin(th)
return x, y
[docs]def ASKAP_SEFD(freq):
# Evaluate the polynomial approximation to measured SEFD for ASKAP (on boresight) for any frequency
# in the range [700, 1800] MHz.
# The argument can be a scalar or an ndarray.
# The SEFD is returned in Jy, as a scalar or ndarray according to the input.
tsys = ASKAP_tsys_on_eff(freq)
SEFD = tsys * (1380.0 * 2) / 113.1
return SEFD
[docs]def ASKAP_tsys_on_eff(freq):
# Evaluate the polynomial approximation to measured Tsys/eff for ASKAP (on boresight) for any frequency
# in the range [700, 1800] MHz.
# The argument can be a scalar or an ndarray.
# The Tsys/eff is returned in K, as a scalar or ndarray according to the input.
# The following line is written by the fitting routine in python notebook SEFD/full_spectrum
coeff = [5.18075082126351e-34, -6.766881661087157e-30, 3.8245980771420834e-26,
-1.1889614543042915e-22, 2.0255289080361158e-19, -1.0077294751671198e-16,
-3.687374327312008e-13, 1.0505041199723573e-09, -1.4531507807579093e-06,
0.0012726466808857277, -0.737013079528667, 275.33155694172655, -60388.60533807194,
5926644.561754142]
tsys = np.polyval(coeff, freq)
return tsys
# Define pafmap class, subXaxis and 2d interp for guessing best pitch.
[docs]def subXaxis(ax1, scale, label):
ax2 = ax1.twiny()
# pitch = np.degrees(fwhm*np.array(pif))
pift = np.array(ax1.get_xticks())
pitcht = ["%4.2f" % a for a in pift * scale]
# Move twinned axis ticks and label from top to bottom
ax2.xaxis.set_ticks_position("bottom")
ax2.xaxis.set_label_position("bottom")
# Offset the twin axis below the host
ax2.spines["bottom"].set_position(("axes", -0.15))
# Turn on the frame for the twin axis, but then hide all
# but the bottom spine
ax2.set_frame_on(True)
ax2.patch.set_visible(False)
for sp in ax2.spines.values():
sp.set_visible(False)
ax2.spines["bottom"].set_visible(True)
ax2.set_xticks(pift)
ax2.set_xticklabels(pitcht)
ax2.set_xlabel(label)
# ax2.set_xlim(xl, xu)
# def find_best_pitch(fpname, xg, freqs, pitchesf, fov_meth, **kw):
#
# pitchesf = np.array(pitchesf)
# ret = {}
# tune_interp = {}
# for freq, pitches in zip(freqs, pitchesf):
# fwhm = empiricalFWHM(freq)
# print ("Freq = {:6.1f}, FWHM = {:5.2f} deg".format(freq, np.degrees(fwhm)))
# spds = []
# for pitch in pitches:
# fp = fp_factory.make_footprint(fpname, np.radians(pitch))
# offsets = np.array(fp.offsetsRect)
#
# ioff = np.array([0., 0.])
# pmap = pafmap(xg, freq)
# abeams = makeBeams(xg, offsets, fwhm)
# C0 = cv_modelled(offsets, fwhm)
# # C0 = calcCov(abeams)
# apod = apodizePAF(xg, fov_method=fov_meth)
# beams = apod * abeams ** 2
# beams[beams == 0.0] = 1.0e-6
# s = make_paf_map(C0, beams)
# pmap.addmap(s, ioff)
# pmap.makeLinMos()
# spds.append(pmap.getSS(**kw))
# spds = np.array(spds)
# fp = interp1d(pitches, spds, kind='cubic')
# pfine = np.linspace(pitchesf.min(), pitchesf.max(), 50)
# mfine = fp(pfine)
# ipk = np.where(mfine == mfine.max())[0][0]
# ret[freq] = pfine[ipk]
# tune_interp[freq] = (pfine, mfine)
# return ret, tune_interp
[docs]def find_best_pitch(fpname, freq, fov_meth):
# print ("find_best_pitch")
fp = fp_factory.make_footprint(fpname, 0.01)
n1 = np.sqrt(fp.n_beams)
fwhm = empiricalFWHM(freq)
# pitch0 = estimate_pitch(fpname, freq)
# lolim, hilim = 0.7 * pitch0, 1.3 * pitch0
# lolim = max(lolim, fwhm/3.0)
lolim, hilim = 0.65, 1.3
pitches = np.array([lolim, hilim])
M = 81
grid_size = np.radians(7.0)
xg = getGrid(M, grid_size)
ret = {}
tune_interp = {}
k = 0
p1, p3 = np.radians(pitches)
dp = p3 - p1
p2 = (p1 + p3)/2.
xp = np.array([])
yp = np.array([])
while k < 3:
k += 1
ea1 = get_eq_area(fpname, p1, freq, xg, fov_meth)
ea2 = get_eq_area(fpname, p2, freq, xg, fov_meth)
ea3 = get_eq_area(fpname, p3, freq, xg, fov_meth)
xp = np.concatenate((xp, [p1, p2, p3]))
yp = np.concatenate((yp, [ea1, ea2, ea3]))
c, b, a = fit_parabola(xp, yp)
p2 = -b / (2.0 * a)
print ('new peak at ', p2 * 180.0 / pi)
dp /= 2.0
p1, p3 = p2 - dp / 2, p2 + dp / 2.
xp *= 180.0/np.pi
snx = np.argsort(xp)
# print xp
# print xp[snx]
fp = interp1d(xp[snx], yp[snx], kind='cubic')
pfine = np.linspace(pitches[0], pitches[1], 50)
mfine = fp(pfine)
ipk = np.where(mfine == mfine.max())[0][0]
ret[freq] = pfine[ipk]
tune_interp[freq] = (pfine, mfine)
fig, ax = plt.subplots(1, 1)
ax.plot(pfine, mfine)
ax.plot(xp, yp, 'o')
fig.savefig('bp{:.0f}.png'.format(freq))
return ret, tune_interp
[docs]def get_eq_area(fpname, pitch, freq, xg, fov_meth):
# print ("****** get_eq_area")
# print (fpname, pitch*180.0/np.pi, freq, xg.shape, fov_meth)
tiny = 1.0e-9
fp = fp_factory.make_footprint(fpname, pitch)
fwhm = empiricalFWHM(freq)
offsets = np.array(fp.offsetsRect)
ioff = np.array([0., 0.])
pmap = pafmap(freq, xg, fp.pitch_scale)
abeams = makeBeams(xg, offsets, fwhm)
# C0 = cv_modelled(offsets, fwhm)
C0 = calcCov(abeams)
apod = apodizePAF(xg, fov_method=fov_meth)
beams = apod * abeams ** 2 + tiny
# beams[beams < 1.0e-6] = 1.0e-6
s = make_paf_map(C0, beams)
pmap.addmap(s, ioff)
pmap.makeLinMos()
# print (freq, pitch*180.0/np.pi, pmap.equivArea)
return pmap.equivArea
[docs]def estimate_pitch(fpname, freq):
nb = fp_factory.make_footprint(fpname, 1.0 * pi / 180.).n_beams
est = np.sqrt(32./nb)
est = 1.0
return est
[docs]def calcSummary(fpname, freq, fov_meth, **kw):
tmr = Timer()
bp, tune_intp = find_best_pitch(fpname, freq, fov_meth)
bestPitch = bp[freq]
print ("{:5.2f} {:6.2f}s".format(bestPitch, tmr.getDelta()))
pitch = np.radians(bestPitch)
fp = fp_factory.make_footprint(fpname, pitch)
iOffsets = np.vstack((np.array([[0.0, 0.0]]), fp.interOffsRect))
Nb = fp.n_beams
offsets = np.array(fp.offsetsRect)
fwhm = empiricalFWHM(freq)
grid_size = fp.tile_offsets[0][0] + fwhm/2.
M = 121
xg = getGrid(M, grid_size)
sefd = ASKAP_SEFD(freq)
sefds = np.array([sefd] * Nb)
pmap = pafmap(freq, xg, pitch)
for i, ioff in enumerate(iOffsets):
offsets_i = offsets + ioff
abeams = makeBeams(xg, offsets_i, fwhm)
# C0 = cv_modelled(offsets_i, fwhm)
C0 = calcCov(abeams)
apod = apodizePAF(xg, centre=ioff, fov_method=fov_meth)
fig, ax = plt.subplots(1, 1, figsize=(6., 6.))
ax.imshow(apod, origin='lower')
fig.savefig("temp_apod_{:d}.pdf".format(i))
# print ("apod range {:f} {:f}".format(apod.min(), apod.max()))
beams = apod * abeams ** 2
beams[beams < 1.0e-6] = 1.0e-6
fig, ax = plt.subplots(6, 6, figsize=(6., 6.))
for j, axi in zip(range(36), ax.flat):
axi.imshow(beams[j], origin='lower')
fig.savefig("temp_beams_{:d}.pdf".format(i))
# print ("beams range {:f} {:f}".format(beams.min(), beams.max()))
s = make_paf_map(C0, beams, sefds=sefds)
fp_off = np.array([bsect(xg, ioff[0]), bsect(xg, ioff[1])])
pmap.addmap(s, fp_off)
pmap.makeLinMos()
# print ("equiv area {:.2f}".format(pmap.equivArea*(180.0/np.pi)**2))
surveySpeed = pmap.getSS(**kw)
do_unit = False
if do_unit:
pmapU = pafmap(freq, xg, pitch)
for i, ioff in enumerate(iOffsets):
offsets_i = offsets + ioff
abeams = makeBeams(xg, offsets_i, fwhm)
# C0 = cv_modelled(offsets_i, fwhm)
C0 = calcCov(abeams)
apod = apodizePAF(xg, centre=ioff, unit=True, fov_method=fov_meth)
beams = apod * abeams ** 2
beams[beams < 1.0e-6] = 1.0e-6
s = make_paf_map(C0, beams, sefds=sefds)
fp_off = np.array([bsect(xg, ioff[0]), bsect(xg, ioff[1])])
pmapU.addmap(s, fp_off)
# print ("{:.2f} {:d} {:6.2f}s".format(pitch*180.0/np.pi, i, tmr.getDelta()))
pmapU.makeLinMos()
else:
pmapU = pmap
# Now compute a tiled mosaic from four interleaved observations. This is to show the level of FoV
# ripple.
tp = fp.tile_offsets
if len(tp) < 2:
tp = np.array([1.0,1.0])*bestPitch*6*pi/180.0
xshift, yshift = tp[0][0], tp[1][1]
v0 = pmap.mosaic
m = 1./v0
f2 = interp2d(xg, xg, m, kind='cubic')
xh = getGrid(161,np.radians(10))
u1 = f2(xh+xshift/2.,xh+yshift/2.)
u2 = f2(xh-xshift/2.,xh+yshift/2.)
u3 = f2(xh-xshift/2.,xh-yshift/2.)
u4 = f2(xh+xshift/2.,xh-yshift/2.)
tol = 1.0e-12
u1[u1<tol] = tol
u2[u2<tol] = tol
u3[u3<tol] = tol
u4[u4<tol] = tol
vmos = (u1 + u2 + u3 + u4)
smos = np.sqrt(vmos)
print ('Equiv area : ', pmap.equivArea * (180.0/np.pi)**2)
print ('Eq area : ', pmap.eqArea[0] * (180.0/np.pi)**2)
print ('ripple : ', pmap.ripple, pmapU.ripple)
print ('ripple (mosaiced) : ', pmap.mosaic_ripple, pmapU.mosaic_ripple)
return pmap, pmapU, fp, bestPitch, smos, tune_intp, surveySpeed
[docs]def plot_opt(suffix):
files = glob.glob("results_ak:*{}.pkl".format(suffix))
files += glob.glob("results_xx:*{}.pkl".format(suffix))
tmp = [a[8:-4] for a in files]
fps = []
for f in tmp:
if f.endswith(suffix):
j = f.index(suffix)
fps.append(f[:j])
else:
fps.append(f)
print (fps)
ssScale = 1.0
cols = {'ak:square_6x6': 'k', 'xx:square_8x8': 'b', 'ak:closepack36': 'r', 'closepack30': 'g', 'rectangle_6x6': 'b',
'xx:closepack70': 'g', 'xx:closepack56': 'c'}
mrkr = {'ak:square_6x6': 's', 'xx:square_8x8': 's', 'ak:closepack36': 'h', 'closepack30': 'h', 'rectangle_6x6': 'd',
'xx:closepack70': 'h', 'xx:closepack56': 'h'}
msize = 4
for k in fps:
if k not in cols:
cols[k] = '0.5'
mrkr[k] = 'x'
labs = {'bp': ['Best pitch', '\n', '(deg)'], 'ss': ['Survey speed', '\n', '(sq deg/hr)'],
'ri': ['Interleaved ripple'], 'ea': ['Equivalent area', '\n', '(sq deg)'],
'eq': ['Equivalent area', '\n', '(sq deg)'],
'rr': ['Ripple'], 'tr': ['Tile ripple']}
# pfunc = {'bp': ax.plot, 'ss': plt.plot, 'ri': plt.plot, 'ea': plt.plot, 'eq': plt.plot, 'rr': plt.plot}
# vks = ['bp', 'ss', 'eq', 'rr', 'ri', 'tr']
vks = ['bp', 'ss', 'eq', 'rr', 'ri']
# iknd = {'bp':'cubic','ss':'cubic','ri':'linear','ea':'cubic'}
ny = len(vks)
fig, axes = plt.subplots(ny, 1, figsize=[7., 12.])
fig.subplots_adjust(wspace=0.65, hspace=0.05)
for ax in axes.flat:
ax.label_outer()
for vk, ax in zip(vks, axes.flat):
# ax.grid()
for fpn in fps:
j = fpn.index(':')+1
flab = fpn[j:]
col = cols[fpn]
mk = mrkr[fpn]
fpname, resf = pickle.load(open("results_{}{}.pkl".format(fpn, suffix), 'rb'))
frs = []
vals = []
for fr in resf.keys():
res = resf[fr]
frs.append(fr)
if vk == 'rr':
vals.append(res[vk][0])
else:
vals.append(res[vk])
z = zip(frs, vals)
z.sort()
frs = np.array([a[0] for a in z])
vals = np.array([a[1] for a in z])
# if vk == 'bp':
# fwhm = 300./frs/12.*1.12 * 180./np.pi
# vals = vals/fwhm
if vk == 'ss':
vals = vals * ssScale
print ("%16s" % fpn, vk, frs, vals)
fn = interp1d(frs, vals, kind='quadratic')
frf = np.linspace(frs[0], frs[-1], 100)
vf = fn(frf)
if vk == 'ri' or vk == 'xrr':
ax.plot(frs, vals, '-', marker=mk, ms=msize, c=col, label=flab)
else:
ax.plot(frf, vf, '-', c=col)
ax.plot(frs[:-1], vals[:-1], marker=mk, ms=msize, ls='none', c=col, label=flab)
ax.set_xlim(680, 1805)
ax.grid()
ax.set_ylabel(' '.join(labs[vk]))
if vk == vks[-1]:
ax.set_xlabel('Frequency (MHz)')
if vk == vks[0]:
ax.legend(loc='center left', fontsize=10, bbox_to_anchor=(0.75, 0.75))
if vk == 'ss' and 'sp' in res:
sp = res['sp']
sig, bw, na, npol = sp
x, y = 1580., 200.
dy = y/10.
ax.text(x, y, "BW = {:.0f} MHz".format(bw * 1.e-6))
ax.text(x, y-dy, "{:d} antennas".format(na))
ax.text(x, y-dy*2, "{:d} polarizations".format(npol))
ax.text(x, y-dy*3, r'$\sigma$ = {:.3f} mJy'.format(sig * 1.e3))
ax.yaxis.set_label_coords(-0.075, 0.5)
pname = 'optimisedVals{}.pdf'.format(suffix)
fig.savefig(pname, dpi=300)
print ('Plotted to {}'.format(pname))
[docs]def plot_pitch_tuning(suffix):
files = glob.glob("results_ak:*{}.pkl".format(suffix))
files += glob.glob("results_xx:*{}.pkl".format(suffix))
tmp = [a[8:-4] for a in files]
fps = []
for f in tmp:
if f.endswith(suffix):
j = f.index(suffix)
fps.append(f[:j])
else:
fps.append(f)
cols = {'ak:square_6x6': 'k', 'xx:square_8x8': 'b', 'ak:closepack36': 'r', 'xx:closepack56': 'c', 'xx:closepack70': 'g'}
lwi = {'ak:square_6x6': 1.5, 'xx:square_8x8': 1.3, 'ak:closepack36': 1.5, 'xx:closepack56': 1.3, 'xx:closepack70': 1.3}
tuning = {}
xlim = [100., 0.]
ylim = [100., 0.]
freqs = []
for fpn in fps:
fpname, resf = pickle.load(open("results_{}{}.pkl".format(fpn, suffix), 'rb'))
freqs = sorted(resf.keys())[:-1:2]
dat = {}
dat['freq'] = freqs
xexs, yexs = [], []
for frq in freqs:
xex = resf[frq]['ex'][frq][0]
yex = resf[frq]['ex'][frq][1]
xexs.append(xex)
yexs.append(yex)
xlim[0] = min(xlim[0], xex.min())
xlim[1] = max(xlim[1], xex.max())
ylim[0] = min(ylim[0], yex.min())
ylim[1] = max(ylim[1], yex.max())
dat['xex'] = xexs
dat['yex'] = yexs
tuning[fpn] = dat
for k in tuning.keys():
if k not in cols:
cols[k] = '0.5'
lwi[k] = 1.0
ylims = {}
ymin = 1.0e3
ymax = 0.0
for fp in fps:
for i, frq in enumerate(freqs):
ymin = min(ymin, tuning[fp]['yex'][i].min())
ymax = max(ymax, tuning[fp]['yex'][i].max())
ylims[frq] = [ymin, ymax]
fpnames = tuning.keys()
ny, nx = (len(freqs)+1)/2, 2
fig, axes = plt.subplots(ny, nx, figsize=(8., 5.))
axt = axes.transpose()
fig.subplots_adjust(wspace=0.02, hspace=0.03)
for i, (frq, ax) in enumerate(zip(freqs, axt.flat)):
for fp in fpnames:
j = fp.index(':')+1
flab = fp[j:]
xex = tuning[fp]['xex'][i]
# yex = tuning[fp]['yex'][i] / ylims[frq][1]
yex = tuning[fp]['yex'][i]
yex /= yex.max()
ax.plot(xex, yex, c=cols[fp], label=flab, lw=lwi[fp])
# axes[0,0].plot(xex, yex, c=cols[fp], label=flab, lw=lwi[fp])
ax.set_xlim(xlim[0], xlim[1])
# ax.set_ylim(ylims[frq][0], ylims[frq][1])
ax.set_ylim(0.7, 1.04)
ax.grid()
ax.set_xlabel('Pitch (deg)')
ax.text(1.15, 0.75, "{:.0f} MHz".format(frq), ha='right', va='center')
if i == 0:
ax.set_ylabel('Relative' '\n' ' survey speed ', rotation='horizontal')
ax.yaxis.set_label_coords(-0.05, 1.01)
for ax in axt.flat:
ax.label_outer()
# ax.set_title('Pitch tuning curve at %d MHz'%frq,loc='left')
# ax.legend(loc='center left', fontsize=10, bbox_to_anchor=(0.45, 0.28))
ax.legend()
pname = 'pitch_tuning{}.pdf'.format(suffix)
fig.savefig(pname, dpi=300)
print ('Plotted to {}'.format(pname))
[docs]def main():
# parse command line options
print (announce)
args = arg_init().parse_args()
verbose = args.verbose
if args.explain:
print (explanation)
sys.exit(0)
if verbose:
print ("ARGS = ", args)
suffix = args.output_suffix
if len(suffix) > 0:
suffix = '_' + suffix
fov_meth = args.fov_method
do_calc = not args.plot_only
fp_names = args.fpnames
freqs = args.frequencies
kw = {'sigma': 100.0e-6, 'BW': 288.e6, 'Na': 36}
results = {}
if do_calc:
for fpname in fp_names:
resf = {}
pmaps = {}
for freq in freqs:
print (fpname, freq)
pmap, pmapU, fp, bestPitch, smos, tune_interp, surveySpeed = calcSummary(fpname, freq, fov_meth, **kw)
plotFootprint(pmap, fpname, fp, bestPitch, fov_meth)
pmaps[freq] = pmap
m = smos.shape[0]
cmin = smos[m-10:m+10, m-10:m+10].min()
smsk = np.ma.masked_array(smos, mask=(smos < cmin))
res = {}
res['fp'] = fpname
res['fr'] = freq
res['ss'] = surveySpeed
res['bp'] = bestPitch
res['ex'] = tune_interp
res['ri'] = pmapU.mosaic_ripple
res['rr'] = pmap.ripple
res['eq'] = pmap.eqArea[0] * (180 / pi) ** 2
res['ea'] = pmap.equivArea * (180 / pi) ** 2
res['tr'] = smsk.std() / smsk.mean()
res['sp'] = pmap.get_survey_params()
resf[freq] = res
plt.clf()
plt.imshow(smos)
plt.title("{} {}".format(fpname, freq))
plt.savefig("smos_{:d}.png".format(int(freq)))
results[fpname] = resf
pname = "results_{}{}.pkl".format(fpname, suffix)
pickle.dump((fpname, resf), open(pname, 'w'))
plot_opt(suffix)
plot_pitch_tuning(suffix)
if __name__ == "__main__":
sys.exit(main())