Source code for aces.survey.survey_leakage

#!/usr/bin/env python

import numpy as np

from astropy.io import fits
from astropy.table import Table
from astropy.coordinates import SkyCoord
from astropy import units as u
from astropy.wcs import WCS
from astropy.stats import sigma_clip

[docs]KEEP_COLUMNS = { "ra_deg_cont": "StokesI_RA", "dec_deg_cont": "StokesI_DEC", "flux_peak": "StokesI_flux_peak", "flux_int": "StokesI_flux_int", "rms_image": "StokesI_image_rms", "component_id": "Component_ID" }
[docs]class StokesData(): """Class to extract and hold Stokes Q, U, or V leakage data.""" def __init__(self, catalogue_i, image_p, ra_key="ra", dec_key="dec", multiplier=1.): """ Parameters ---------- catalogue_i : str Filename for Stokes I catalogue. Normally the Selavy catalogue. image_p : str Filename for Stokes Q, U, or V image. ra_key : str, optional Name of RA key in catalogue. [Default 'ra'] dec_key : str, optional Name of dec. key in catalogue. [Default 'dec'] multiplier : float, optional Value to multiply catalogue flux densities by. Note for Selavy this should be set to 1e-3 as Selavy by default reports in mJy. [Default 1.0] """ if catalogue_i.lower().endswith(".fits"): table_format = "fits" elif catalogue_i.lower().endswith(".vot") or catalogue_i.lower().endswith(".xml"): table_format = "votable" else: table_format = None self.catalogue_i = Table.read(catalogue_i, format=table_format) self.image_p = fits.open(image_p) self.header = self.image_p[0].header self.wcs = WCS(self.header).celestial self.data = np.squeeze(self.image_p[0].data) self.multiplier = multiplier abs_clipped = sigma_clip(np.abs(self.data), sigma=5, maxiters=3, masked=False, axis=(0, 1) ) self.data_clipped = self.data.copy() self.data_clipped[np.isnan(abs_clipped)] = np.nan self.noise = np.nanstd(self.data_clipped) print("Stokes V sigma-clipped map stddev.: {}".format( self.noise )) try: self.coords = SkyCoord( ra=self.catalogue_i[ra_key]*u.deg, dec=self.catalogue_i[dec_key]*u.deg ) except Exception: self.coords = SkyCoord( ra=self.catalogue_i[ra_key], dec=self.catalogue_i[dec_key] )
[docs] def get_isolated(self, zone=0.0055): """Get isolated sources. A clipped version of the original catalogue is made. Parameters ---------- zone : float, optional Sources within `zone` of each other are excluded, in degrees. [Default 0.0055] """ idx, seps, _ = self.coords.match_to_catalog_sky(self.coords, nthneighbor=2 ) isolated = np.where(seps.value > zone)[0] print("Isolated fraction: {}".format( len(isolated) / len(self.catalogue_i) )) self.catalogue_i = self.catalogue_i[isolated] self.coords = self.coords[isolated]
[docs] def get_compact(self, ratio=(0.83, 1.2), int_flux="int_flux", peak_flux="peak_flux" ): """Get compact sources. A clipped version of the catalogue is made. Parameters ---------- ratio : tuple, float, optional Tuple of minimum and maximum flux density ratios. [Default (0.83, 1.2)] int_flux : str, optional Column name for integrated/total flux density in Stokes I catalogue. [Default 'int_flux'] peak_flux : str, optional Column name for peak flux in Stokes I catalogue. [Default 'peak_flux'] """ self.peak_flux = peak_flux self.int_flux = int_flux over_cond = self.catalogue_i[int_flux] / self.catalogue_i[peak_flux] > ratio[0] under_cond = self.catalogue_i[int_flux] / self.catalogue_i[peak_flux] < ratio[1] compact = np.where(over_cond & under_cond)[0] self.catalogue_i = self.catalogue_i[compact] self.coords = self.coords[compact]
[docs] def get_xycoords(self): """Get sources coordinates in pixel x, y coordinates.""" self.x, self.y = self.wcs.all_world2pix(self.coords.ra, self.coords.dec, 0) self.x = self.x.astype(int) self.y = self.y.astype(int)
[docs] def get_leakage(self, local_rms="local_rms", sigma=0, search_box=1, noise_box=20): """Get leakage from polarization image. Parameters ---------- local_rms : str, optional Column name for image rms. [Default 'local_rms'] sigma : int, optional Minimum Stokes I SNR required. [Default 10] search_box : int, optional Look for peak Stokes Q/U/V in expanded box around Stokes I position. [Default 1] noise_box : int, optional Use for noise calculations and reject sources <= `noise_box` of a NaN pixel, likely image edge. [Default 20] """ self.local_rms = local_rms self.sigma = sigma p_over_i = [] pols = [] rmses = [] for i in range(len(self.coords)): try: if np.isnan( self.data[ self.y[i]-noise_box:self.y[i]+noise_box, self.x[i]-noise_box:self.x[i]+noise_box ].flatten()).any(): raise ValueError cutout = self.data[ self.y[i]-search_box:self.y[i]+search_box, self.x[i]-search_box:self.x[i]+search_box ].flatten() cutout_noise = self.data_clipped[ self.y[i]-noise_box:self.y[i]+noise_box, self.x[i]-noise_box:self.x[i]+noise_box ].flatten() pol = cutout[np.nanargmax(np.abs(cutout))] rms = np.nanstd(cutout_noise) except (IndexError, ValueError): pol = np.nan rms = np.nan if self.catalogue_i[self.peak_flux][i] / self.catalogue_i[local_rms][i] > sigma: pi = pol / (self.catalogue_i[self.peak_flux][i] * self.multiplier) else: pi = np.nan p_over_i.append(pi) pols.append(pol) rmses.append(rms) self.pols = np.asarray(pols) / self.multiplier self.p_over_i = np.asarray(p_over_i) self.rms = np.asarray(rmses) / self.multiplier
[docs] def get_table_leakage(self, outname=None, remove_cols=True, prefix="", pol="v"): """Add leakage columns to a table, clip NaNs, writeout? Parameters ---------- outname : str, optional Output filename. If supplied a .csv file is written. [Default None] remove_cols : bool, optional Remove all columns from the subject catalogue that do not belong to the KEEP_COLUMNs set. [Default True] prefic : str, optional If this prefix exists for the name of a column, it is ignored when deciding whether to remove columns. [Default ""] """ polname = "Stokes{}".format(pol.upper()) if remove_cols: catalogue_columns = self.catalogue_i.columns.copy() for col in catalogue_columns: if col.replace(prefix, "") not in KEEP_COLUMNS.keys(): self.catalogue_i.remove_column(col) else: self.catalogue_i[col].name = KEEP_COLUMNS[col.replace(prefix, "")] self.catalogue_i.add_columns( cols=[self.x, self.y, self.p_over_i, self.pols, self.rms], names=[ "StokesI_x", "StokesI_y", polname + "_leakage", polname + "_flux_peak", polname + "_image_rms" ], ) self.catalogue_i = self.catalogue_i[np.where(np.isfinite(self.catalogue_i[polname + "_leakage"]))[0]] if outname is not None: self.catalogue_i.write(outname, format="ascii", delimiter=",", overwrite=True)