#!/usr/bin/env python
"""
This writes the input parset for the casdaupload script.
See https://www.atnf.csiro.au/computing/software/askapsoft/sdp/docs/current/utils/casdaupload.html
-------------------------------------------------------------
"""
import sys
import os
import re
import time
import argparse as ap
from pathlib import Path
import glob
from aces.racs.racs_survey import ASKAP_Survey_factory
import aces.racs.survey_utils as sut
from astropy.time import Time
from astropy.io import fits
import tarfile
[docs]casda_production = '/group/casda/prd/'
[docs]casda_test = '/group/casda/at'
[docs]general = """# General
outputdir = "{}"
useAbsolutePaths = true
telescope = ASKAP
sbid = {:d}
# No other sbids provided.
obsprogram = RACS
writeREADYfile = true
"""
[docs]image = {'cont_restored_T0': ['mosaic_fcor_tt0', 'mosaic_cres_fcor_tt0'],
'cont_weight_T0': ['weights_fcor_tt0', 'weights_cres_fcor_tt0'],
'cont_restored_T1': ['mosaic_fcor_tt1', 'mosaic_cres_fcor_tt1'],
'cont_weight_T1': ['weights_fcor_tt1', 'weights_cres_fcor_tt1'],
'cont_noise_T0': ['mosaic_fcor_iqr'],
'cont_components_T0': ['mosaic_components_tt0'],
'cont_fitresidual_T0': ['mosaic_fitresidual_tt0']}
[docs]catalogue = {'continuum-component': 'selavy-components_cres_fcor',
'continuum-island': 'selavy-islands_cres_fcor'}
[docs]im_art = """
# Images
images.artifactlist = {}"""
[docs]ca_art = """
# Source catalogues
catalogues.artifactlist = {}"""
[docs]ms_art = """
# Measurement sets
measurementsets.artifactlist = {}"""
[docs]ev_art = """
# Evaluation reports
evaluation.artifactlist = {}"""
[docs]ke_fil = {'im': "image{:d}.filename = {}",
'ca': "cat{:d}.filename = {}",
'ms': "ms{:d}.filename = {}",
'ev': "eval{:d}.filename = {}"}
[docs]ke_typ = {'im': "image{:d}.type = {}",
'ca': "cat{:d}.type = {}"}
[docs]ke_pro = {'im': "image{:d}.project = {}",
'ca': "cat{:d}.project = {}",
'ms': "ms{:d}.project = {}",
'ev': "eval{:d}.project = {}"}
[docs]ke_fmt = {'ev': "eval{:d}.format = {}"}
[docs]ke_art = {'im': im_art, 'ca': ca_art, 'ms': ms_art, 'ev': ev_art}
"""
# The following uses an original code by Matthew Whiting <Matthew.Whiting@csiro.au>
# As described below. Here the essentials are extracted as a subroutine to be called by the main racs_to_casda
#
# A python script to update the header keywords of FITS files. The
# headers able to be updated are a small defined set: PROJECT, SBID,
# DATE-OBS, DURATION.
#
# It also allows the specification of HISTORY statments, by giving a
# list of strings following the arguments for the above
#
# Example Usage:
# updateFITSheaders.py --project=AS034 --SBID=1234 --DATE-OBS="2017-01-20T12:34:45" --DURATION=12345.6 "Made by me" "Not by you"
#
#
# @copyright (c) 2017 CSIRO
# Australia Telescope National Facility (ATNF)
# Commonwealth Scientific and Industrial Research Organisation (CSIRO)
# PO Box 76, Epping NSW 1710, Australia
# atnf-enquiries@csiro.au
#
# This file is part of the ASKAP software distribution.
#
# The ASKAP software distribution is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the License,
# or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# @author Matthew Whiting <Matthew.Whiting@csiro.au>
#
import argparse
import astropy.io.fits as fits
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--fitsfile', type=str, help='FITS file to update')
parser.add_argument('--telescope', type=str, default="", help='TELESCOP keyword')
parser.add_argument('--project', type=str, default="", help='OPAL project ID for this observation')
parser.add_argument('--sbid', type=str, default="", help='Scheduling block ID for this observation')
parser.add_argument('--dateobs', type=str, default="", help='DATE-OBS string (YYYY-MM-DDTHH:MM:SS) for this observation')
parser.add_argument('--duration', type=str, default="", help='Length of this observation in sec')
parser.add_argument('history', metavar='hist', type=str, default="", nargs='*', help='A HISTORY statement')
options = parser.parse_args()
"""
[docs]required_fits_keys = ['NAXIS{:d}', 'CTYPE{:d}', 'CRVAL{:d}', 'CDELT{:d}', 'CRPIX{:d}']
[docs]class Options(object):
def __init__(self, fitsfile, sbid, dateobs, duration, telescope='ASKAP', project='AS110'):
self.fitsfile = fitsfile
self.telescope = telescope
self.project = project
self.sbid = sbid
self.dateobs = dateobs
self.duration = duration
[docs]def arg_init():
parser = ap.ArgumentParser(prog='survey_to_casda.py', formatter_class=ap.RawDescriptionHelpFormatter,
description=__doc__,
epilog='See -x for more explanation')
# noinspection PyTypeChecker
parser.add_argument('sbid', type=int, help='RACS SBID; no default.')
parser.add_argument('-t', '--test', action='store_true', help="Upload to the CASDA test environment")
parser.add_argument('-f', '--field', default=None, help="Field to select")
parser.add_argument('-e', '--epoch', default=0, type=int, help="Survey observation epoch")
parser.add_argument('-v', '--verbose', action='store_true')
parser.add_argument('-x', '--explain', action='store_true', help="give an expanded explanation")
return parser
[docs]def t_string(db_time):
time_val = Time(db_time / 3600.0 / 24.0, format='mjd', scale='utc')
time_val.format = 'iso'
return "{}".format(time_val)
[docs]def my_grep(file_name, txt):
lines = open(file_name, 'r').readlines()
p = False
for li in lines:
q = True
for t in txt:
q = q and bool(re.search(t, li))
p = p or q
return p
[docs]def find_parset(root, row):
r = row
mos_time = r['MOSAIC_TIME']
field = r['FIELD_NAME']
if 'test4' not in field:
field = field.replace('RACS_', 'RACS_test4_1.05_')
sbid, csb = r['SBID', 'CAL_SBID']
mos_before = mos_time - 86400.0
mos_after = mos_time + 86400.0
day0 = t_string(mos_before).split(' ')[0]
day1 = t_string(mos_time).split(' ')[0]
day2 = t_string(mos_after).split(' ')[0]
# print(' ', day0, day1)
gtmp0 = "{}/{:d}/slurmOutput/racs_pipeline*{:d}*{}*.sh".format(root, csb, sbid, day0)
gtmp1 = "{}/{:d}/slurmOutput/racs_pipeline*{:d}*{}*.sh".format(root, csb, sbid, day1)
gtmp2 = "{}/{:d}/slurmOutput/racs_pipeline*{:d}*{}*.sh".format(root, csb, sbid, day2)
parfiles = glob.glob(gtmp0) + glob.glob(gtmp1) + glob.glob(gtmp2)
key = 'FIELD_SELECTION_SCIENCE'
# print("Found {:d} possible .sh files".format(len(parfiles)))
# print(parfiles)
parfile = None
if len(parfiles) == 0:
print("G0 {}".format(gtmp0))
print("G1 {}".format(gtmp1))
elif len(parfiles) == 1:
parfile = parfiles[0]
else:
flags = []
for pf in parfiles:
val = get_input(pf, key)
if val is None:
sp = (False, False)
else:
val = get_input(pf, key).replace('"', '')
# print("({}) ({})".format(val, field))
if val == field:
sp = (True, True)
else:
sp = (True, False)
flags.append(sp)
for pf, sp in zip(parfiles[::-1], flags[::-1]):
if (sp == (True, True)) or (sp == (False, False)):
parfile = pf
if parfile is None:
for pf in parfiles:
print("field {} -- {}".format(field, pf))
return parfile
[docs]def get_history(parfile):
lines = open(parfile, 'r').readlines()
lines = [li.strip() for li in lines]
history = []
for li in lines:
if li.startswith('# Processed'):
history.append(li[2:])
return history
[docs]def check_files_exist(files):
ret = True
for f in files:
if not Path(f).exists():
ret = False
print("missing {}".format(f))
return ret
[docs]def pack_flag_cal(root, row):
# Find and pack into a tar file certain flag summaries and cal tables.
"""
BPCAL/calparameters_1934_bp_SB{cal_sbid}.tab/
{fld}/cont_gains_cal_SB{sbid}_{fld}.beam??.tab/
diagnostics/Flagging_Summaries/scienceData_SB{sbid}_{fld}.beam??_averaged.ms.flagSummary
"""
csb, sb, fld = row['CAL_SBID', 'SBID', 'FIELD_NAME']
fld_test4 = fld.replace('RACS_', 'RACS_test4_1.05_')
da = root.joinpath("{:d}".format(csb))
bp = da.joinpath('BPCAL')
g_bpcal = str(bp.joinpath("calparameters_1934_bp_SB{:d}.tab".format(csb)))
g_gains = str(da.joinpath("{}".format(fld))) + "/cont_gains_cal_SB{:d}_{}.beam??.tab".format(sb, fld)
g_flags = str(da.joinpath('diagnostics').joinpath('Flagging_Summaries')) + \
"/scienceData_SB{:d}_{}.beam??_averaged.ms.flagSummary".format(sb, fld)
g_flagt = str(da.joinpath('diagnostics').joinpath('Flagging_Summaries')) + \
"/scienceData_SB{:d}_{}.beam??_averaged.ms.flagSummary".format(sb, fld_test4)
bpc = glob.glob(g_bpcal)
gai = glob.glob(g_gains)
fla = glob.glob(g_flags) + glob.glob(g_flagt)
tfile = da.joinpath("calibration-flags_SB{:d}_{}.tar".format(sb, fld))
tar = tarfile.TarFile(name=str(tfile), mode='w', dereference=True)
# tar = tarfile.open(str(tfile), "w")
for name in bpc + gai + fla:
tar.add(name)
tar.close()
return tfile
[docs]def main():
args = arg_init().parse_args()
run_date = time.strftime("%Y%m%d-%H%M%S")
if args.explain:
print(explanation)
sys.exit()
print("")
print("Running survey_to_casda.py with the following input:")
print("ARGS = ", args)
print("")
print(run_date)
print("")
os.system("module load askaputils")
print("Loading RACS database ...")
sbid = args.sbid
epoch = args.epoch
aks = ASKAP_Survey_factory(epoch=epoch)
if args.field is None:
print("Determining field names")
tab = aks.f_table
msk = tab['CAL_SBID'] == sbid
fields = list(tab[msk]['FIELD_NAME'])
else:
fields = [args.field]
casda_env = casda_production
if args.test:
casda_env = casda_test
projId = 'AS110'
images = ['cont_restored_T0', 'cont_weight_T0', 'cont_noise_T0',
'cont_components_T0', 'cont_fitresidual_T0',
'cont_restored_T1', 'cont_weight_T1']
catalogues = ['continuum-component', 'continuum-island']
ta = aks.f_table
root = aks.get_data_root()
mk = (ta['SBID'] == sbid) & (ta['SELECT'] == 1)
tab = ta[mk]
cal_sbid = tab[0]['CAL_SBID']
csb_dir = root.joinpath("{:d}".format(cal_sbid))
print(str(csb_dir))
os.chdir(csb_dir)
print("len(tab) = ", len(tab))
image_lines = []
cat_lines = []
ms_lines = []
ev_lines = []
im_arti = []
ca_arti = []
ms_arti = []
ev_arti = []
all_files = []
fits_files = []
all_history = []
# Collect the eval files first so that the history can be recovered.
j = 0
k = 'ev'
# Ensure that no parfiles are entered moree than once.
parfile_list = []
for r in tab:
# Now find the pipeline parset:
parfile = find_parset(root, r)
if parfile == None:
print("No param file found - FATAL")
exit(1)
else:
all_history.append(get_history(parfile))
if parfile not in parfile_list:
parfile_list.append(parfile)
fn = csb_dir.joinpath(parfile)
fnn = str(fn).replace('test4_1.05_', '')
ev_arti.append('eval{:d}'.format(j))
ev_lines.append(ke_fil[k].format(j, fnn))
ev_lines.append(ke_fmt[k].format(j, 'txt'))
ev_lines.append(ke_pro[k].format(j, projId))
j += 1
all_files.append(fnn)
# now the validation tar
fn = str(csb_dir.joinpath("validation_SB{:d}_{}.tar".format(sbid, r['FIELD_NAME'])))
fnn = str(fn).replace('test4_1.05_', '')
ev_arti.append('eval{:d}'.format(j))
ev_lines.append(ke_fil[k].format(j, fnn))
ev_lines.append(ke_fmt[k].format(j, 'validation-report'))
ev_lines.append(ke_pro[k].format(j, projId))
j += 1
all_files.append(fnn)
tfile = str(pack_flag_cal(root, r))
ev_arti.append('eval{:d}'.format(j))
ev_lines.append(ke_fil[k].format(j, tfile))
ev_lines.append(ke_fmt[k].format(j, 'calibration'))
ev_lines.append(ke_pro[k].format(j, projId))
j += 1
all_files.append(tfile)
j = 0
k = 'im'
for r, history in zip(tab, all_history):
r_inx = r['INDEX']
for im in images:
for which in image[im]:
fn = sut.get_product_name(aks, r_inx, which)
if fn is not None:
fnn = str(fn).replace('test4_1.05_', '')
im_arti.append('image{:d}'.format(j))
image_lines.append(ke_fil[k].format(j, fnn))
image_lines.append(ke_typ[k].format(j, im))
image_lines.append(ke_pro[k].format(j, projId))
j += 1
all_files.append(fnn)
fits_files.append(fnn)
else:
print(im, 'absent')
update_FITS_headers(fits_files, r, history)
print('Checking FITS file headers ...')
all_good = True
for f_file in fits_files:
h_good, err_mess = fits_header_check(f_file)
if not h_good:
all_good = False
print("{} has bad header".format(f_file))
for e in err_mess:
print(" {}".format(e))
if all_good:
print(" all OK\n")
else:
return 1
# catalogues
j = 0
k = 'ca'
for r in tab:
r_inx = r['INDEX']
for ca in catalogues:
which = catalogue[ca]
fn = sut.get_product_name(aks, r_inx, which)
if fn is not None:
fnn = str(fn).replace('test4_1.05_', '')
ca_arti.append('cat{:d}'.format(j))
cat_lines.append(ke_fil[k].format(j, fnn))
cat_lines.append(ke_typ[k].format(j, ca))
cat_lines.append(ke_pro[k].format(j, projId))
j += 1
all_files.append(fnn)
else:
print(ca, 'absent')
j = 0
k = 'ms'
for r in tab:
r_inx = r['INDEX']
if got_leakage:
which = 'ms_av_cal_lk'
else:
which = 'ms_av_cal'
fns = sut.get_product_name(aks, r_inx, which)
for fn in fns:
if fn is not None:
fnn = str(fn).replace('test4_1.05_', '')
ms_arti.append('ms{:d}'.format(j))
ms_lines.append(ke_fil[k].format(j, fnn))
ms_lines.append(ke_pro[k].format(j, projId))
j += 1
all_files.append(fnn)
if args.verbose:
print('---------\n')
print('[{}]'.format(','.join(im_arti)))
for li in image_lines:
print(li)
print('\n')
print('[{}]'.format(','.join(ca_arti)))
for li in cat_lines:
print(li)
print('[{}]'.format(','.join(ms_arti)))
for li in ms_lines:
print(li)
in_prov = 'casda_upload_{:d}.in_provisional'.format(sbid)
in_final = 'casda_upload_{:d}.in'.format(sbid)
fout = open(in_prov, 'w')
fout.write(general.format(casda_env, sbid) + '\n')
fout.write(im_art.format('[{}]'.format(','.join(im_arti)) + '\n'))
for li in image_lines:
fout.write(li + '\n')
fout.write(ca_art.format('[{}]'.format(','.join(ca_arti)) + '\n'))
for li in cat_lines:
fout.write(li + '\n')
fout.write(ms_art.format('[{}]'.format(','.join(ms_arti)) + '\n'))
for li in ms_lines:
fout.write(li + '\n')
fout.write(ev_art.format('[{}]'.format(','.join(ev_arti)) + '\n'))
for li in ev_lines:
fout.write(li + '\n')
fout.close()
all_present = check_files_exist(all_files)
if all_present:
os.rename(in_prov, in_final)
print("casdaupload input file {}".format(in_final))
else:
print("There are missing files.\n casdaupload input file {}".format(in_prov))
if __name__ == "__main__":
sys.exit(main())