#!/usr/bin/env python
from __future__ import print_function
import time
import argparse as ap
import sys
import os
import re
import numpy as np
import matplotlib as mpl
mpl.use('Agg') # this line must precede pylab import to suppress display
import matplotlib.pylab as plt # noqa
import aces.obsplan.tile as tile # noqa
from aces.obsplan.sphere_plotting import SphereView # noqa
from askap.footprint import Skypos # noqa
from askap.parset import ParameterSet # noqa
from aces.obsplan.config import ACESConfig # noqa
from aces.obsplan import logger # noqa
from askap import logging # noqa
from askap.coordinates import parse_direction # noqa
from aces.askapdata.schedblock import SchedulingBlock # noqa
[docs]explanation = """
Explanation
tile_plot.py produces three files for displaying pointing information from an observing parset or from the SBID of an
observation. They are:
- <xxx>.png
- <xxx>.ann
- <xxx>_footprint.sh
where <xxx> is either the parset name or the SBID. For example:
> tile_plot.py RACS_test3_1.05
access the parset RACS_test3_1.05.parset (which must exist in the working directory) and yields:
- RACS_test3_1.05.png
- RACS_test3_1.05.ann
- RACS_test3_1.05_footprint.sh
whereas
> tile_plot.py 8545
yields: SBID_8545.png, SBID_8545.ann, SBID_8545_footprint.sh
The .png file is a plot of the whole sky with tiles marked.
The .ann fle is an overlay file suitable for kvis
The .reg fle is an overlay file using DS9 syntax
The x_footprint.sh file is a bash script that will execute footprint-plan.py for each tile to generate lists of
beam positions and kvis overlay files, one per tile, showing the beam positions.
"""
[docs]description = """
Description goes here
"""
# todo: sort plotting projections
[docs]timestr = time.strftime("%Y%m%d-%H%M%S")
logging.basicConfig(filename='survey-plan-{}.log'.format(timestr), level=logging.DEBUG,
filemode='w', format=FORMAT_FILE)
[docs]console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter_console)
logging.getLogger('').addHandler(console)
# logger.addHandler(console)
[docs]def arg_init():
aces_cfg = ACESConfig()
# fp_factory = aces_cfg.footprint_factory
# fp_names = [f for f in fp_factory.get_footprint_names() if f.startswith('ak:')]
# Build some help information:
epilog = '\nSee -x for more explanation'
north_lim = 48.2
parser = ap.ArgumentParser(prog='tile_plot', formatter_class=ap.RawDescriptionHelpFormatter,
description=description,
epilog=epilog)
parser.add_argument('label', nargs='+', help="Survey label or SB integer identifier")
parser.add_argument('-x', '--explain', action='store_true', help="Give an expanded explanation")
parser.add_argument('-v', '--verbose', action='store_true', help="Print extra information to the screen")
parser.add_argument('-t', '--transform', type=int, default=0, choices=[0, 1, 2, 3, 4],
help="Projection [%(default)d]")
parser.add_argument('-H', '--horiz', nargs=2, type=float, default=[0., 360.], help="Horizontal limits (degrees)")
parser.add_argument('-V', '--vertical', nargs='*', type=float, default=[-90., north_lim],
help="Vertical limits (degrees)")
# parser.add_argument('-a', '--show_all', action='store_true', help="Show whole tiling")
parser.add_argument('-s', '--shift', action='store_true', help="Shift tiles for tile-interleave")
parser.add_argument('-i', '--show_ident', action='store_true', help="Label each tile with ID")
parser.add_argument('-u', '--subpoint', metavar="'RA,Dec'", help="J2000 position projected centre",
default=[0.0, 0.0], action=Celpos)
parser.add_argument('-p', '--polygons', default="", help="File defining polygons to draw")
parser.add_argument('-l', '--interleaf', default='ABCD', help='Interleaf to plot : all plotted if -l omitted')
return parser, aces_cfg
[docs]class Celpos(ap.Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
if nargs is not None:
raise ValueError("nargs not allowed")
super(Celpos, self).__init__(option_strings, dest, **kwargs)
[docs] def __call__(self, parser, namespace, values, option_string=None):
logger.debug('ACTION : %r %r %r' % (namespace, values, option_string))
a, b = parse_direction(values)
rp = [np.radians(a), np.radians(b)]
# noinspection PyUnresolvedReferences
setattr(namespace, self.dest, rp)
[docs]class WidHt(ap.Action):
[docs] def __call__(self, parser, namespace, values, option_string=None):
logger.debug('ACTION : %r %r %r' % (namespace, values, option_string))
# print 'WidHt : ', values
a, b = [q for q in values.split(",") if len(q) > 0]
rp = [int(a), int(b)]
# noinspection PyUnresolvedReferences
setattr(namespace, self.dest, rp)
[docs]def float_pair(arg):
return [float(a) for a in arg.split(',')]
[docs]class TwoFloats(ap.Action):
[docs] def __call__(self, parser, namespace, values, option_string=None):
logger.debug('ACTION : %r %r %r' % (namespace, values, option_string))
a, b = [q for q in values.split(",") if len(q) > 0]
rp = [float(a), float(b)]
# noinspection PyUnresolvedReferences
setattr(namespace, self.dest, rp)
[docs]def prefix(word, pfx):
if word.startswith(pfx):
return word
else:
return pfx + word
[docs]def prepare_tiles(idents, tile_positions, do_label=True):
n_tiles = 0
t_pos = []
tile_corners = []
fpcmd = []
over_ann = []
over_ds9 = []
# print('prepare_tiles (idents) : ', idents)
print(len(tile_positions))
for ident in idents:
tp = tile_positions[ident]
# print (id, tp)
ti = tile.Tile(ident, tp[0], tp[1], tp[2])
n_tiles += 1
# print('# ', id, ti.name, ti.skypos, ti.posang)
t_pos.append(ti.j2000)
cs = ti.get_corners()
cs = np.concatenate((cs, cs[:1]))
tile_corners.append(cs)
fpcmd.append(ti.get_footprint_cmd())
over_ann.append(ti.get_overlay(kind='kvis'))
over_ds9.append(ti.get_overlay(kind='ds9'))
if do_label:
over_ds9.append(ti.get_overlay_label("", kind='ds9'))
return t_pos, tile_corners, fpcmd, over_ann, over_ds9
[docs]def get_idents(targets):
ks0 = [a.split('.')[0] for a in targets.keys()]
ks = []
for a in ks0:
if a not in ks:
if re.match("src[0-9]+", a):
ks.append(a)
return [int(a[3:]) for a in ks]
[docs]def main():
# parse command line options
parser, aces_cfg = arg_init()
args = parser.parse_args()
if args.verbose:
print("ARGS = ", args)
if args.explain:
print(explanation)
return
# Establish output file names, and initialise
user_input = args.label
parset = ParameterSet()
if user_input[0].isdigit():
sbid = int(user_input[0])
try:
sb = SchedulingBlock(sbid)
# except Ice.ConnectTimeoutException:
except:
print('No connection to SchedBlocks')
return 0
par = sb.get_parameters()
var = sb.get_variables()
for k, v in var.items():
if 'xec' in k:
print(k, v)
for k, v in par.items():
parset.set_value(k, v)
label = "SBID_{:d}".format(sbid)
else:
label = args.label[0]
parset_name = "{}.parset".format(label)
parset = ParameterSet(parset_name)
polygons = []
if args.polygons:
poly_file = args.polygons
if os.path.exists(poly_file):
polygons = tile.get_polygons(poly_file)
plot_interleaf = args.interleaf
# todo: Decide whether tile selection is a function to be supported.
hr = np.radians(np.array(args.horiz))
vr = np.radians(np.array(args.vertical))
if hr[0] < 0.0:
hr[0] += 2.0 * np.pi
if hr[1] < 0.0:
hr[1] += 2.0 * np.pi
show_all = True
show_selected = False
show_ident = args.show_ident
kvis_ann_name = '{}.ann'.format(label)
ds9_name = '{}.reg'.format(label)
footprint_cmd_name = '{}_footprint.sh'.format(label)
plot_file_name = '{}.png'.format(label)
ann = open(kvis_ann_name, 'w')
s1 = 'COLOR GREEN'
ann.write(s1 + '\n')
ds9 = open(ds9_name, 'w')
fpcmds = open(footprint_cmd_name, 'w')
fp_factory = aces_cfg.footprint_factory
name = parset['common.target.src%d.footprint.name']
beam_pitch = parset['common.target.src%d.footprint.pitch'] * np.pi / 180.0
bf_pa = parset['common.target.src%d.footprint.rotation']
name = prefix(name, 'ak:')
fp = fp_factory.make_footprint(name, beam_pitch)
if 'common.targets' in parset:
targets = parset['common.targets']
else:
ks = parset.to_dict()['common']['target']
targets = [a for a in ks.keys() if '%' not in a]
tiling = {'idents': [], 'tiles': []}
for target in targets:
ident = int(re.findall('[0-9]+', target)[0])
fd = parset['common.target.{}.field_direction'.format(target)]
pa = parset['common.target.{}.pol_axis'.format(target)]
na = parset['common.target.{}.field_name'.format(target)]
il = na[-1]
# If field names lack interleaf designator, select what would have been 'A'.
# (See tile.py get_parset_entry)
if il in '0123456789':
il = 'A'
if il in plot_interleaf:
tpos = Skypos(fd[0], fd[1])
ra = tpos.ra
dec = tpos.dec
tpa = bf_pa + float(pa[1])
tiling['tiles'].append((ra, dec, tpa))
tiling['idents'].append(ident)
# Assume the following defaults
origin = Skypos(0.0, 0.0)
bf_pa = 45.0
coord = 'J2000'
tile.Tile.initialise(origin, label, fp, bf_pa, coord=coord)
tile_pos_list = tiling['tiles']
tile_idents = tiling['idents']
tile_positions = {}
idents = []
for ident, tp in zip(tile_idents, tile_pos_list):
tile_positions[ident] = tp
h, v, p = tp
if ident == 20:
print("{:d} {:f} {:f}".format(ident, h, v))
h = (h + 2.0 * np.pi) % (2.0 * np.pi)
# print 'for tile_ident ... ', tp, hr, vr
if hr[0] < hr[1]:
h_range = hr[0] <= h <= hr[1]
else:
h_range = hr[0] <= h or h < hr[1]
if h_range and vr[0] <= tp[1] <= vr[1]:
idents.append(ident)
t_pos, tile_corners, fpcmd, o_ann, o_ds9 = prepare_tiles(idents, tile_positions, do_label=show_ident)
print('All tiles : ', len(t_pos))
for cmd in fpcmd:
fpcmds.write(cmd)
fpcmds.close()
for cmd in o_ann:
ann.write(cmd + '\n')
ann.close()
for cmd in o_ds9:
ds9.write(cmd + '\n')
ds9.close()
# Produce a png plot
fig = plt.Figure(figsize=[8., 8.])
# ax.set_aspect(1.0)
proj = args.transform
projname = {0: 'CAR', 1: 'ORTH', 2: 'MOLL', 3: 'aitoff', 4: 'hammer'}
sphere = SphereView(projection=projname[proj])
subp = Skypos(args.subpoint[0], args.subpoint[1])
sphere.set_subpoint(subp)
sphere.draw_outline()
sphere.draw_coord_grid()
sphere.axis.set_axis_off()
# sphere.draw_label()
# rgb = [0.99197232, 0.54131488, 0.23197232]
rgb = [0.1, 0.4, 0.7]
style = {"alpha": 0.3, 'color': rgb}
poly_style = {"lw": 0.5, 'color': rgb, "ls": "-."}
for cs in tile_corners:
sphere.join_gc(cs)
sphere.join_gc_fill(cs, **style)
if show_all:
if show_ident and not show_selected:
txt_style = {"fontsize": 4}
for i, p in zip(idents, t_pos):
sphere.draw_text(p, "{:d}".format(i), **txt_style)
if show_selected:
if show_ident:
for i, p in zip(idents, t_pos):
sphere.draw_text(p, "{:d}".format(i))
for poly in polygons:
sphere.join_gc(poly, close=True, **poly_style)
plt.savefig(plot_file_name, dpi=300, bbox_inches='tight', transparent=True)
print('trans = True')
n_tiles = len(idents)
print("{:d} tiles selected for requested coverage".format(n_tiles))
if __name__ == "__main__":
sys.exit(main())