"""Utility function to create a one figure multi panel validation plot
for continuum imaging of RACS data
"""
from __future__ import annotations
from argparse import ArgumentParser
from pathlib import Path
from typing import NamedTuple
import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from astropy.wcs import WCS
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from scipy import stats
from flint.catalogue import Catalogue, get_reference_catalogue
from flint.logging import logger
from flint.naming import processed_ms_format
from flint.summary import BeamSummary, FieldSummary
from flint.utils import estimate_image_centre, get_packaged_resource_path
[docs]
MAX_SOURCES_TO_PLOT = 10000
[docs]
class ValidationCatalogues(NamedTuple):
"""Container for all the catalogues that are loaded in and
used throughout validation processing"""
"""NVSS catalogue"""
"""SUMSS catalogue"""
"""ICRF catalogue"""
"""ASKAP catalogue"""
[docs]
racs_high: Catalogue | None = None
"""RACS high catalogue"""
[docs]
racs_mid: Catalogue | None = None
"""RACS mid catalogue"""
[docs]
tgss: Catalogue | None = None
"""TGSS catalogue"""
[docs]
vlass: Catalogue | None = None
"""VLASS catalogue"""
[docs]
class Tables(NamedTuple):
"""Container for all the tables that are loaded in"""
"""NVSS catalogue"""
"""SUMSS catalogue"""
"""ICRF catalogue"""
"""ASKAP catalogue"""
[docs]
racs_high: Table | None = None
"""RACS high catalogue"""
[docs]
racs_mid: Table | None = None
"""RACS mid catalogue"""
[docs]
tgss: Table | None = None
"""TGSS catalogue"""
[docs]
vlass: Table | None = None
"""VLASS catalogue"""
[docs]
class XMatchTables(NamedTuple):
"""Container for all the cross matched tables"""
"""NVSS catalogue"""
"""SUMSS catalogue"""
"""ICRF catalogue"""
[docs]
racs_high: Path | None = None
"""RACS high catalogue"""
[docs]
racs_mid: Path | None = None
"""RACS mid catalogue"""
[docs]
tgss: Path | None = None
"""TGSS catalogue"""
[docs]
vlass: Path | None = None
"""VLASS catalogue"""
[docs]
class ValidationTables(NamedTuple):
"""Container for all the tables that are generated by the validation routine"""
"""Path to the PSF table"""
"""Path to the statistics table"""
[docs]
xmatch_tables: XMatchTables
"""Cross-matched tables"""
[docs]
class ValidatorLayout(NamedTuple):
"""Simple container for all the matplotlib axes objects"""
"""Axes for the RMS of the field"""
"""Container for basic SBID information"""
"""Axes for the PSF of the image"""
"""Axes for quick look source counts"""
"""Axes to compare brightness of sources from the first catalogue"""
"""Axes to compare brightness of sources from the first catalogue"""
"""Axes to compare astrometry of sources"""
"""Axes to compare astrometry of sources from the first catalogue"""
"""Axes to compare astromnetry of sources from the first catalogue"""
"""Axes to summarise the flagging of the MS"""
[docs]
class RMSImageInfo(NamedTuple):
"""Class to hold basic RMS Image information, excluding the actual raw image data"""
"""Path to the RMS fits image"""
"""Header from the FITS image"""
"""Dimension of the image"""
"""Number of valid pixels in the image"""
"""The area of valid sky, in degrees squared"""
"""The WCS of the image"""
"""The centre of the image"""
"""The median value of the image"""
"""The minimum value of the image"""
"""The maximum value of the image"""
"""The standard deviation of the image"""
"""The mode of the image"""
[docs]
class SourceCounts(NamedTuple):
"""A small container to pass around source count information"""
"""The bin edges of the source counts in Jy"""
"""The bin centers to use in Jy"""
[docs]
counts_per_bin: np.ndarray
"""The counts of sources per flux bin"""
[docs]
counts_per_bin_err: np.ndarray
"""The rough estimate on the errors per bin"""
[docs]
euclid_counts: np.ndarray
"""Euclidean normalised source counts"""
[docs]
euclid_counts_err: np.ndarray
"""Rough estimate of error on the euclidean normalised source counts"""
"""The area in square degrees that the sources cover, i.e. image footprint sky-area"""
[docs]
area_fraction: np.ndarray | None = None
"""The fraction of the image that was above a sigma level per flux bin. This may be used as a rough term to scale the Euclidean Normalised source counts. This is not intended to be a robust way of correcting the source counts - just quick"""
[docs]
class MatchResult(NamedTuple):
"""Simple container to hold results of a quick cross match"""
"""Name of the first survey"""
"""Name of the second survey"""
"""Sky-positions of sources in the first survey"""
"""Sky-positions of sources in the second survey"""
"""Frequency in Hz of the first survey"""
"""Frequency in Hz of the second survey"""
"""The indices of the matched sources from the original input table 1"""
"""The indices of the matched sources from the original input table 2"""
[docs]
flux1: np.ndarray | None = None
"""Brightness in Jy of source in the first survey"""
[docs]
flux2: np.ndarray | None = None
"""Brightness in Jy of source in the second survey"""
[docs]
def get_known_catalogue_info(name: str) -> Catalogue:
"""Return the parameters of a recognised catalogue.
These are currently hardcoded.
The structure returneed outlines the name of columns of interest:
- RA
- Dec
- Integrated flux
- Major / Minor / PA
Args:
name (str): The survey name of interest
Raises:
ValueError: Raised when an unrecongised catalogue is provided
Returns:
Catalogue: Information of the survey catalogue
"""
# TODO: This should be expanded to include the units as well
# TODO: Catalogues here need to be packaged somehow
name = name.upper()
if name == "NVSS":
catalogue = Catalogue(
survey="NVSS",
file_name="VIII_65_nvss.dat_CH_2.fits",
name_col="NVSS",
freq=1.4e9,
ra_col="RA",
dec_col="Dec",
flux_col="S1.4",
maj_col="MajAxis",
min_col="MinAxis",
pa_col="PA",
)
elif name == "SUMSS":
catalogue = Catalogue(
survey="SUMSS",
file_name="sumsscat.Mar-11-2008_CLH.fits",
freq=8.43e8,
ra_col="RA",
dec_col="DEC",
name_col="Mosaic",
flux_col="St",
maj_col="dMajAxis",
min_col="dMinAxis",
pa_col="dPA",
)
elif name == "ICRF":
catalogue = Catalogue(
survey="ICRF",
file_name="icrf.csv",
freq=1e9,
ra_col="RA",
dec_col="Dec",
name_col="IERS Des.",
flux_col="None",
maj_col="None",
min_col="None",
pa_col="None",
)
else:
raise ValueError(f"Unknown catalogue {name}")
return catalogue
[docs]
def load_known_catalogue(
name: str, reference_catalogue_directory: Path
) -> tuple[Table, Catalogue]:
"""Load in a known catalogue table
Args:
name (str): Name of the survey to load
reference_catalogue_directory (Path): The directory location with the reference catalogues installed
Returns:
Tuple[Table,Catalogue]: The loaded table and Catalogue structure describing the columns
"""
table, catalogue = get_reference_catalogue(
reference_directory=reference_catalogue_directory, survey=name
)
return table, catalogue
[docs]
def get_rms_image_info(rms_path: Path, extract_fraction: float = 0.5) -> RMSImageInfo:
"""Extract information about the RMS image and construct a representative structure
When computing the pixel statistics an inner region of the RMS map is extracted
to avoid the primary beam roll off. Other properties of the ``RMSImageInfo`` are
computed on the whole image (e.g. area)
Args:
rms_path (Path): The RMS image that will be presented
extract_fraction (float, optional): Extract the inner region of the base RMS image map. Defaults to 0.5.
Returns:
RMSImageInfo: Extracted RMS image information
"""
with fits.open(rms_path) as rms_image:
rms_header = rms_image[0].header # type: ignore
rms_data = rms_image[0].data # type: ignore
logger.info(f"Extracting inner region of {extract_fraction=} for RMS statistics")
sub_rms_data = extract_inner_image_array_region(
image=rms_data, fraction=extract_fraction
)
valid_pixels = np.sum(np.isfinite(rms_data) & (rms_data != 0))
ra_cell_size = rms_header["CDELT1"]
dec_cell_size = rms_header["CDELT2"]
pixel_area = np.abs(ra_cell_size * dec_cell_size)
area = pixel_area * valid_pixels
centre_sky = estimate_image_centre(image_path=rms_path)
wcs = WCS(fits.getheader(rms_path))
rms_image_info = RMSImageInfo(
path=rms_path,
header=rms_header,
shape=rms_data.shape,
no_valid_pixels=valid_pixels,
area=area,
wcs=wcs,
centre=centre_sky,
median=float(np.nanmedian(sub_rms_data)),
minimum=float(np.nanmin(sub_rms_data)),
maximum=float(np.nanmax(sub_rms_data)),
std=float(np.nanstd(sub_rms_data)),
mode=stats.mode(sub_rms_data, keepdims=False).mode[0],
)
return rms_image_info
[docs]
def make_validator_axes_layout(fig: Figure, rms_path: Path) -> ValidatorLayout:
"""Create the figure layout to use for the quick look validation plot.
Args:
fig (Figure): The figure canvas to add the axes to
rms_path (Path): Path to the RMS image that will be presented. Loaded to access the WCS
Returns:
ValidatorLayout: Representation of all axes objects
"""
# My friendship with gridspec is over, now subplot_mosaic is my best friend
# Using the following encoding:
# T = text / field info
# F = flux / source counts
# A = astrometry
# P = PSF
# B = BANE noise map
# S = SUMMS flux comparison
# s = SUMMS astrometry comparison
# N = NVSS flux comparison
# n = NVSS astrometry comparison
# f = Flagging summary
rms_wcs = WCS(fits.getheader(rms_path)).celestial
ax_dict = fig.subplot_mosaic(
"""
TFPA
BBSs
BBNn
ffff
""",
per_subplot_kw={ # type: ignore
"B": {"projection": rms_wcs},
},
subplot_kw={
"aspect": "equal",
},
)
for spine in ax_dict["T"].spines.values():
spine.set_edgecolor("none")
_ = ax_dict["T"].axes.yaxis.set_visible(False)
_ = ax_dict["T"].axes.xaxis.set_visible(False)
# Set the axes that are shared
_ = ax_dict["N"].sharex(ax_dict["S"])
_ = ax_dict["n"].sharex(ax_dict["s"])
validator_layout = ValidatorLayout(
ax_rms=ax_dict["B"],
ax_legend=ax_dict["T"],
ax_psf=ax_dict["P"],
ax_counts=ax_dict["F"],
ax_brightness1=ax_dict["N"],
ax_brightness2=ax_dict["S"],
ax_astrometry=ax_dict["A"],
ax_astrometry1=ax_dict["n"],
ax_astrometry2=ax_dict["s"],
ax_flag_summary=ax_dict["f"],
)
return validator_layout
[docs]
def plot_flag_summary(
fig: Figure,
ax: Axes,
field_summary: FieldSummary,
) -> Axes:
"""Plot the percentage of the spectrum that is flagged for
each measurement set
Args:
fig (Figure): The figure canvas that is being plotted to
ax (Axes): The axes object that is being plotted to
field_summary (FieldSummary): An active field summary object with the collection of MSSummary structures
Returns:
Axes: The axes object with the plotted RMS image
"""
if field_summary.ms_summaries is None:
logger.info("No measurement summaries to plot")
ax.text(
0.5,
0.5,
"No data to plot",
va="center",
ha="center",
transform=ax.transAxes,
)
return ax
ms_summaries = sorted(
[mss for mss in field_summary.ms_summaries], key=lambda x: x.beam
)
accumulated_flags = 0
accumulated_samples = 0
for ms_summary in ms_summaries:
flag_spectrum = ms_summary.flag_spectrum
accumulated_flags += np.sum(np.nan_to_num(flag_spectrum))
accumulated_samples += len(flag_spectrum)
ax.plot(
np.arange(len(flag_spectrum)),
flag_spectrum,
label=f"{ms_summary.beam:02d}",
lw=0.5,
)
percent_sbid_flagged = accumulated_flags / accumulated_samples * 100.0
ax.legend(ncols=18, title="Beam Number", loc="lower left")
ax.grid()
ax.axhline(0.0, color="black")
ax.set(
xlabel="Channel",
ylabel="Flagged fraction",
ylim=(-0.35, 1.1),
aspect="auto",
title=f"Flagging summary - {percent_sbid_flagged:.2f}% of {field_summary.sbid} flagged",
)
return ax
[docs]
def plot_rms_map(
fig: Figure, ax: Axes, rms_path: Path, source_positions: SkyCoord | None = None
) -> Axes:
"""Add the RMS image to the figure
Args:
fig (Figure): Figure that contains the axes object
ax (Axes): The axes that will be plotted
rms_path (Path): Location of the RMS image
source_positions (Optional[SkyCoord], optionals): Sources in the ASKAP catalogue to overlay onto the RMS image. Defaults to None.
Returns:
Axes: The axes object with the plotted RMS image
"""
# Convert Jy/beam to uJy/beam
rms_data = fits.getdata(rms_path) * 1e6 # type: ignore
# Pirate no believes get below 10uJy/beam mateeey. Percentile a little more
# robust to outliers but much of a much
floor: float = max(1, np.floor(np.log10(np.nanpercentile(rms_data, 16)))) # type: ignore
im = ax.imshow(
np.log10(rms_data),
vmin=floor,
vmax=floor + 1.0,
origin="lower",
cmap="YlOrRd",
)
if source_positions is not None:
# If there are too many sources on the RMS map nothing can be seen
if len(source_positions) > MAX_SOURCES_TO_PLOT:
logger.info(
f"{len(source_positions)} sources, too many to plot. Skipping. "
)
else:
ax.scatter(
source_positions.ra, # type: ignore
source_positions.dec, # type: ignore
marker="o",
edgecolor="black",
facecolor="none",
s=0.1,
transform=ax.get_transform("fk5"), # type: ignore
)
ax.grid(color="0.5", ls="solid")
ax.set_xlabel("RA (J2000)")
ax.set_ylabel("Dec (J2000)")
# These offsets are pretty and correspond to 10, 20, 30,m 50, 100 where floor is 10 uJy/bea,
yt = floor + np.array([0.0, 0.30103, 0.47712125, 0.69897, 1.0])
ytl = [f"{10**lx:.0f}" for lx in yt]
cbar = fig.colorbar(
im,
ax=ax,
use_gridspec=True,
pad=0.09,
shrink=0.5,
orientation="vertical",
)
cbar.set_label(r"Image rms ($\mu$Jy/beam)", fontsize=F_SMALL)
cbar.set_ticks(yt)
cbar.set_ticklabels(ytl)
gal_col = "b"
overlay = ax.get_coords_overlay("galactic")
overlay.grid(color=gal_col, ls="dashed", alpha=0.6)
overlay[0].tick_params(colors=gal_col)
overlay[1].tick_params(colors=gal_col)
overlay[0].set_axislabel("Galactic long.", color=gal_col)
overlay[1].set_axislabel("Galactic lat.", color=gal_col)
ax.set_title("Image noise", loc="left", fontsize=F_MED)
return ax
[docs]
def calculate_area_correction_per_flux(
rms_image_path: Path, flux_bin_centre: np.ndarray, sigma: float = 5
) -> np.ndarray:
"""This derives a rough correction to the area term when calculating
the source counts. This is not intended to correct for completeness,
although they are closely related.
The RMS image is read in and a CDF is calculated. This is used to
return a scaling term that indicates what fraction a flux-bin was
`sigma` times the RMS, which could be used to scale the area.
Args:
rms_image_path (Path): RMS image that will be use to calculate the area correction
flux_bin_centre (np.ndarray): Fluxes to calculate the area fraction at
sigma (float, optional): Mutliplicate term indicating what sigma level source finding cropped at. Defaults to 5.
Returns:
np.ndarray: Fraction of the image that was available for source finding at a flux density
"""
# Filter out the data
rms_data = fits.getdata(rms_image_path)
assert rms_data is not None, f"{rms_data=}, which should not happen"
rms_data = rms_data[np.isfinite(rms_data)].flatten()
# Calculate the cumulative distribution and normalise it to 0..1
rms_hist, rms_bins = np.histogram(rms_data, bins=500)
rms_cdf = np.cumsum(rms_hist)
rms_cdf = rms_cdf / rms_cdf[-1]
assert np.isclose(rms_cdf[-1], 1), "Normalisation of the RMS CDF failed"
# Calculate the bin centers of the histogram / CDF
rms_bin_centre = 0.5 * (rms_bins[1:] + rms_bins[:-1])
# Now calculate the area available per flux bin that is available
# for source finding. Since the source finding is often performed
# with some flux / rms cut, the area available is a multiple of
# the rms. The scaling term here is a generic value that most
# pirates use.
area_fraction_per_bin = np.interp(
flux_bin_centre, rms_bin_centre * sigma, rms_cdf, left=0.0, right=1.0
)
return area_fraction_per_bin
[docs]
def get_source_counts(
fluxes: np.ndarray,
area: float,
minlogf: float = -4,
maxlogf: float = 2,
Nbins: int = 81,
rms_image_path: Path | None = None,
) -> SourceCounts:
"""Derive source counts for a set of fluxes and known area
Args:
fluxes (np.ndarray): The fluxes in Jy to count
area (float): Area over which the sources were collected
minlogf (float, optional): The minimum bin edge, in Jy. Defaults to -4.
maxlogf (float, optional): The maximum bin edgem, in Jy. Defaults to 2.
Nbins (int, optional): Number of bins to include in the source counts. Defaults to 81.
rms_image_path (Optional[Path], optional): Path to the RMS image. If not None, it is used to calculate a rough area correction. Defaults to None.
Returns:
SourceCounts: Source counts and their properties
"""
logger.info(
f"Computing source counts for {len(fluxes)} sources, {Nbins=}, {area=:.2f} sq.deg."
)
logf = np.linspace(minlogf, maxlogf, Nbins)
f = np.power(10.0, logf)
counts, bins = np.histogram(fluxes, f)
binsmid = 0.5 * ((bins[:-1]) + (bins[1:]))
ds = bins[1:] - bins[:-1]
area_sky_total = 360.0 * 360.0 / np.pi
solid_angle = 4 * np.pi * area / area_sky_total
counts_err = np.array(np.sqrt(counts), dtype=int)
scount = np.power(binsmid, 2.5) * counts / (ds * solid_angle)
scount_err = np.power(binsmid, 2.5) * 1.0 * counts_err / (ds * solid_angle)
area_fraction = (
calculate_area_correction_per_flux(
rms_image_path=rms_image_path, flux_bin_centre=binsmid, sigma=5
)
if rms_image_path
else None
)
source_counts = SourceCounts(
bins=bins,
bin_center=0.5 * (bins[1:] + bins[:-1]),
counts_per_bin=counts,
counts_per_bin_err=counts_err,
euclid_counts=scount,
euclid_counts_err=scount_err,
area=area,
area_fraction=area_fraction,
)
return source_counts
[docs]
def plot_source_counts(
catalogue: Table,
rms_info: RMSImageInfo,
ax: Axes,
freq: float | None = None,
dezotti: Table | None = None,
skads: Table | None = None,
) -> Axes:
"""Create a figure of source counts from a astropy Table. If
`freq` and either `dezotti` / `skads` are supplied then these
precomputed source count tables are also included in the
panel.
When computing the source counts for `catalogue`, only a
minimumal set of corrections are derived and applied.
Args:
catalogue (Table): The catalogue to derive source counts for
rms_info (RMSImageInfo): Look up information from the RMS file that catalogue was constructed against
ax (Axes): The axes panel the counts will be plottedd on
freq (Optional[float], optional): Frequency that the source catalogue. Used to scale the Dezotti and SKADS tables. Defaults to None.
dezotti (Optional[Table], optional): Loaded reference table of Dezotti source counts. Defaults to None.
skads (Optional[Table], optional): Loaded reference table of SKADS source counts. Defaults to None.
Returns:
Axes: The axes object used for plotting
"""
# TODO: Is the freq column needed anymore
fluxes = catalogue["int_flux"]
source_counts = get_source_counts(
fluxes=fluxes, area=rms_info.area, rms_image_path=rms_info.path
)
ax.errorbar(
source_counts.bin_center * 1e3,
source_counts.euclid_counts,
yerr=source_counts.euclid_counts_err,
fmt=".",
color="darkred",
label="Raw Component Catalogue",
)
# Since area_fraction could be a numpy array we have to
# explicitly test to ensure it is not None, otherwise it
# is ambiguous
if source_counts.area_fraction is not None:
ax.errorbar(
source_counts.bin_center * 1e3,
source_counts.euclid_counts / source_counts.area_fraction,
yerr=source_counts.euclid_counts_err / source_counts.area_fraction,
fmt="x",
color="darkred",
label="Area Corrected Catalogue",
)
if dezotti is not None and freq is not None:
spectral_index_scale = (freq / 1.4e9) ** -0.8
ax.errorbar(
np.power(10.0, dezotti["col1"]) * 1.0e3 * spectral_index_scale,
dezotti["col2"] * np.power(spectral_index_scale, 1.5),
yerr=[
dezotti["col4"] * np.power(spectral_index_scale, 1.5),
dezotti["col3"] * np.power(spectral_index_scale, 1.5),
],
fmt="d",
color="grey",
alpha=0.2,
label="de Zotti et al. 2010",
)
if skads is not None and freq is not None:
spectral_index_scale = (freq / 1.4e9) ** -0.8
ax.errorbar(
skads["fl_bin_mid"] * 1.0e3 * spectral_index_scale,
skads["SC"] * np.power(spectral_index_scale, 1.5),
yerr=skads["SC_err"] * np.power(spectral_index_scale, 1.5),
fmt=":",
color="k",
label="Wilman et al. 2008",
)
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel("Flux Density (mJy)")
ax.set_ylabel(r"$\frac{dN}{dS} S^{\frac{5}{2}}$ (Jy$^{\frac{3}{2}}$ sr$^{-1}$)")
ax.set_title(f"{len(fluxes)} sources")
ax.set_xlim(0.2, 1.0e4)
ax.set_ylim(5.0e-2, 1.0e4)
ax.grid()
ax.legend(loc="lower right", fontsize=F_SMALL)
return ax
[docs]
def match_nearest_neighbour(
table1: Table,
table2: Table,
catalogue1: Catalogue,
catalogue2: Catalogue,
radius: float = 10,
) -> MatchResult:
"""Match two catalogues together, and construct common properties.
Args:
table1 (Table): The catalogue table from survey one
table2 (Table): The catalogue table from survey two
catalogue1 (Catalogue): Catalogue metadata for survey one
catalogue2 (Catalogue): Catalogue metadata for survey two
radius (float, optional): Maximum matching radius. Defaults to 10.
Returns:
MatchResult: Object containing source matches and common properties
"""
pos1 = SkyCoord(
table1[catalogue1.ra_col], table1[catalogue1.dec_col], unit="deg,deg"
)
pos2 = SkyCoord(
table2[catalogue2.ra_col], table2[catalogue2.dec_col], unit="deg,deg"
)
idx, sep, _ = pos1.match_to_catalog_sky(catalogcoord=pos2, nthneighbor=1)
mask = sep < (radius * u.arcsecond)
idx = idx[mask]
match_result = MatchResult(
name1=catalogue1.survey,
name2=catalogue2.survey,
pos1=pos1[mask],
pos2=pos2[idx],
freq1=catalogue1.freq,
freq2=catalogue2.freq,
idx1=np.argwhere(mask)[:, 0],
idx2=idx,
flux1=(
table1[catalogue1.flux_col].value[mask]
if catalogue1.flux_col != "None"
else None
),
flux2=(
table2[catalogue2.flux_col].value[idx]
if catalogue2.flux_col != "None"
else None
),
)
return match_result
[docs]
def plot_astrometry_comparison(
fig: Figure, ax: Axes, match_result: MatchResult
) -> Axes:
"""Plot the astrometry of cross matches from a match result set
Args:
fig (Figure): The figure canvas plotting on
ax (Axes): The Axes being plotted on
match_result (MatchResult): The set of sources cross-matched and found in common
Returns:
Axes: The Axes plotted on
"""
logger.info(
f"Plotting astrometry match between {match_result.name1} and {match_result.name2}"
)
if len(match_result.pos1) == 0:
ax.set_xlim(-8.0, 8.0)
ax.set_ylim(-8.0, 8.0)
ax.text(0.0, 0.0, f"No data for {match_result.name2}", va="center", ha="center")
return ax
pos1, pos2 = match_result.pos1, match_result.pos2
# Get the separations between two points, and their angle between them
seps = pos1.separation(pos2)
pas = pos1.position_angle(pos2)
# Compute the detlaRA and deltaDec. Straight subtraction is not correct
err_x = (seps * -np.sin(pas.radian)).to(u.arcsec).value
err_y = (seps * np.cos(pas.radian)).to(u.arcsec).value
mean_x, mean_y = err_x.mean(), err_y.mean()
std_x, std_y = err_x.std(), err_y.std()
ax.plot(err_x, err_y, ".r", zorder=0, ms=2)
ax.errorbar(mean_x, mean_y, xerr=std_x, yerr=std_y, fmt="ok")
ax.grid()
ax.axvline(0, color="black", ls=":")
ax.axhline(0, color="black", ls=":")
ax.text(
-7.0,
7.0,
match_result.name2,
fontsize=F_MED,
bbox=dict(facecolor="white", edgecolor="none", alpha=0.8),
)
ax.text(
-7.0,
7.0,
f"{match_result.name2} - {len(err_x)} matches",
fontsize=F_MED,
bbox=dict(facecolor="white", edgecolor="none", alpha=0.8),
)
ax.set(
xlim=(-8, 8), ylim=(-8, 8), xlabel="Offset (arcsec)", ylabel="Offset (arcsec)"
)
le_a1 = rf" $\epsilon_{{SU}} : ({{{mean_x:.1f}}}\pm{{{std_x:.1f}}},{{{mean_y:.1f}}}\pm{{{std_y:.1f}}})$"
ax.text(
7.6,
7.6,
le_a1,
va="top",
ha="right",
fontsize=F_MED,
bbox=dict(facecolor="white", edgecolor="none", alpha=0.8),
)
return ax
[docs]
def plot_flux_comparison(fig: Figure, ax: Axes, match_result: MatchResult) -> Axes:
"""Create a flux comparison plot showing the flux densities from two catalogues compared
to one another.
Args:
fig (Figure): The figure canvas that the axes is on
ax (Axes): The axes object that will be render the plot
match_result (MatchResult): A result set of the cross-match between two catalogues
Returns:
Axes: The aces object that was used for plotting
"""
if len(match_result.pos1) == 0:
ax.loglog([2.0, 2000.0], [2.0, 2000.0], "ow")
ax.text(1e0, 1e2, f"No data for {match_result.name2}", va="center", ha="center")
return ax
flux1 = match_result.flux1
flux2 = match_result.flux2
one2one = np.array([0.002, 8])
spectral_index_scale = (match_result.freq2 / match_result.freq1) ** -0.8
ax.loglog(flux1, flux2, "ok", ms=2)
ax.loglog(one2one, one2one, "-", c="r", label="Raw")
ax.loglog(one2one, one2one * spectral_index_scale, "--", c="r", label="Scaled")
ax.set(
ylabel=f"{match_result.name2} Integrated Flux (Jy)",
xlabel=f"{match_result.name1} Integrated Flux (Jy)",
)
ax.legend()
ax.grid()
return ax
[docs]
def scale_flux_alpha(
flux: float | np.ndarray,
freq: float | np.ndarray,
ref_freq: float,
alpha: float = -0.8,
) -> float | np.ndarray:
"""Scale a flux density to a reference frequency using a spectral index
Args:
flux (float): The flux density to scale
freq (float): The frequency of the flux density
ref_freq (float): The reference frequency to scale to
alpha (float, optional): The spectral index to use. Defaults to -0.8.
Returns:
float: The scaled flux density
"""
return flux * (freq / ref_freq) ** alpha
[docs]
def make_xmatch_table(
table1: Table,
table2: Table,
catalogue1: Catalogue,
catalogue2: Catalogue,
match_result: MatchResult,
output_path: Path,
) -> tuple[Table, Path]:
"""Create a simple cross match table between two catalogues
Args:
table1 (Table): The catalogue table from survey one
table2 (Table): The catalogue table from survey two
catalogue1 (Catalogue): Catalogue metadata for survey one
catalogue2 (Catalogue): Catalogue metadata for survey two
match_result (MatchResult): A result set of the cross-match between two catalogues
output_path (Path): Location to save the table to
Returns:
Tuple[Table, Path]: The table and the output path
"""
askap_flux = match_result.flux1
# Some catalogues (like the ICFS) do not have a flux column
external_flux = (
match_result.flux2
if match_result.flux2 is not None
else np.ones_like(match_result.idx2) * -1
)
askap_flux_scaled = scale_flux_alpha(
flux=askap_flux,
freq=match_result.freq1,
ref_freq=match_result.freq2,
)
xmatch_table = Table(
{
"Component_ID": table1[catalogue1.name_col][match_result.idx1],
"RA": table1[catalogue1.ra_col][match_result.idx1],
"DEC": table1[catalogue1.dec_col][match_result.idx1],
"ASKAP_flux": table1[catalogue1.flux_col][match_result.idx1],
"ASKAP_flux_SNR": table1[catalogue1.flux_col][match_result.idx1]
/ table1["local_rms"][match_result.idx1],
"RA_comp": table2[catalogue2.ra_col][match_result.idx2],
"DEC_comp": table2[catalogue2.dec_col][match_result.idx2],
"Comp_flux": external_flux,
"Flux_Ratio_AlphaCorr": askap_flux_scaled / external_flux,
"Flux_Ratio_NOAlphaCorr": askap_flux / external_flux,
}
)
suffix = f"{catalogue2.survey}_{catalogue1.survey}"
output_file = output_path / f"cat_match_{suffix}.csv"
xmatch_table.write(output_file, format="csv", overwrite=True)
return xmatch_table, output_file
[docs]
def plot_psf(fig: Figure, ax: Axes, rms_info: RMSImageInfo) -> Axes:
"""Create a plot highlighting the synthesised beam recorded in the
RMS image header
Args:
fig (Figure): Fogire canvas being used
ax (Axes): The axes object that will be used for plotting
rms_info (RMSImageInfo): Extracted information from the RMS image
Returns:
Axes: The aces object used for plotting
"""
bmaj = rms_info.header["BMAJ"] * 3600
bmin = rms_info.header["BMIN"] * 3600
ax.plot(bmin, bmaj, "ok", ms=3)
ax.set(
xlim=(5.0, 20.0),
ylim=(5.0, 40.0),
xlabel="PSF minor axis (arcsec)",
ylabel="PSF major axis (arcsec)",
)
ax.grid()
return ax
[docs]
def plot_field_info(
fig: Figure,
ax: Axes,
field_summary: FieldSummary,
rms_info: RMSImageInfo,
askap_table: Table,
) -> Axes:
# TODO: Add the field information to the plot
# Namely:
# - Field name
# - J2000 RA / Dec
# - Galactic l / b
# - SBID
# - CAL_SBID
# - Start time
# - Integration time
# - Hour angle range
# - Elevation range
# - Median rms
# - Number of sources
# - Processing date
hour_angles = field_summary.hour_angles
elevations = field_summary.elevations
pol_axis_str = (
f"{field_summary.pol_axis:.3f} rad" if field_summary.pol_axis else "Unknown"
)
field_text = "\n".join(
(
f"Field name: {field_summary.field_name}",
f"- J2000 RA / Dec : {rms_info.centre.icrs.to_string(style='hmsdms', precision=1)}", # type: ignore
f"- Galactic l / b : {rms_info.centre.galactic.to_string(style='decimal')}", # type: ignore
f"- SBID : {field_summary.sbid}",
f"- CAL_SBID : {field_summary.cal_sbid}",
f"- Holorgraphy file : {field_summary.holography_path.name if field_summary.holography_path else 'N/A'}",
f"- Start time : {field_summary.ms_times[0].utc.fits}", # type: ignore
f"- Integration time : {field_summary.integration_time * u.second:latex_inline}", # type: ignore
f"- Hour angle range : {hour_angles.min().to_string(precision=2, format='latex_inline')} - {hour_angles.max().to_string(precision=2, format='latex_inline')}", # type: ignore
f"- Elevation range : {elevations.min().to_string(precision=2, format='latex_inline')} - {elevations.max().to_string(precision=2, format='latex_inline')}", # type: ignore
f"- Median rms uJy : {rms_info.median * 1e6:.1f}",
f"- Components : {len(askap_table)}",
f"- Processing date : {Time.now().fits}",
f"- Pol. axis : {pol_axis_str}",
)
)
props = dict(boxstyle="square", facecolor="none", edgecolor="tab:red", pad=2)
ax.text(
0.0,
0.0,
field_text,
family="monospace",
fontdict={"fontsize": F_LARGE},
wrap=True,
bbox=props,
ha="left",
)
return ax
[docs]
def load_catalogues(
source_catalogue_path: Path,
reference_catalogue_directory: Path,
askap_survey_name: str,
rms_info: RMSImageInfo,
) -> tuple[ValidationCatalogues, Tables]:
"""Load in all the catalogues that are required for the validation.
Args:
source_catalogue_path (Path): The source catalogue to load
reference_catalogue_directory (Path): The directory that contains the reference ICRF, NVSS and SUMSS catalogues.
askap_survey_name (str): The name that will be given to the ASKAP field data
rms_info (RMSImageInfo): The extracted information from the RMS image
Returns:
Tuple[ValidationCatalogues, Tables]: The loaded catalogues and tables
"""
logger.info(f"Loading {source_catalogue_path=}")
askap_table = Table.read(source_catalogue_path)
askap_cata = Catalogue(
survey=askap_survey_name,
file_name=source_catalogue_path.name,
freq=rms_info.header["CRVAL3"],
ra_col="ra",
dec_col="dec",
name_col="source",
flux_col="int_flux",
maj_col="a",
min_col="b",
pa_col="pa",
)
icrf_table, icrf_catalogue = load_known_catalogue(
name="ICRF", reference_catalogue_directory=reference_catalogue_directory
)
sumss_table, sumss_catalogue = load_known_catalogue(
name="SUMSS", reference_catalogue_directory=reference_catalogue_directory
)
nvss_table, nvss_catalogue = load_known_catalogue(
name="NVSS", reference_catalogue_directory=reference_catalogue_directory
)
return (
ValidationCatalogues(
askap=askap_cata,
icrf=icrf_catalogue,
sumss=sumss_catalogue,
nvss=nvss_catalogue,
),
Tables(
askap=askap_table,
icrf=icrf_table,
sumss=sumss_table,
nvss=nvss_table,
),
)
[docs]
def make_field_stats_table(
field_summary: FieldSummary,
rms_info: RMSImageInfo,
output_path: Path,
) -> Path:
# Columns are:
# FLD_NAME,SBID,NPIXV,MINIMUM,MAXIMUM,MED_RMS_uJy,MODE_RMS_uJy,STD_RMS_uJy,MIN_RMS_uJy,MAX_RMS_uJy
# TODO: Get the min / max from the image (not rms)
if field_summary.linmos_image is not None:
with fits.open(field_summary.linmos_image) as linmos_image:
image_data = linmos_image[0].data
min_image_data = np.nanmin(image_data)
max_image_data = np.nanmax(image_data)
else:
logger.warning("The linmos image in the provided field summary is empty.")
min_image_data = np.nan
max_image_data = np.nan
stats_table = Table(
{
"FLD_NAME": [field_summary.field_name],
"SBID": [field_summary.sbid],
"NPIXV": [rms_info.no_valid_pixels],
"MINIMUM": [min_image_data],
"MAXIMUM": [max_image_data],
"MED_RMS_uJy": [rms_info.median * 1e6],
"MODE_RMS_uJy": [rms_info.mode * 1e6],
"STD_RMS_uJy": [rms_info.std * 1e6],
"MIN_RMS_uJy": [rms_info.minimum * 1e6],
"MAX_RMS_uJy": [rms_info.maximum * 1e6],
}
)
# statistics_{SBID}-{FIELD_NAME}.csv
outfile = (
output_path / f"statistics_{field_summary.sbid}-{field_summary.field_name}.csv"
)
stats_table.write(outfile, format="csv", overwrite=True)
return outfile
[docs]
class PSFTableRow(NamedTuple):
"""Something that is only used to internally
represent the PSF information of a beam image.
Currently no reason to use this.
"""
[docs]
def _make_beam_psf_row(beam_summary: BeamSummary) -> PSFTableRow:
"""Collects the information required of a single beam image
and measurement set for entry into the PSF table.
Not intended for usage other than int he creation of the PSF table.
Args:
beam_summary (BeamSummary): The input set of collected properties
Returns:
PSFTableRow: Extracted information
"""
name_components = processed_ms_format(in_name=beam_summary.ms_summary.path)
vis_total = beam_summary.ms_summary.flagged + beam_summary.ms_summary.unflagged
vis_flagged = beam_summary.ms_summary.flagged
assert beam_summary.image_set is not None, (
f"{beam_summary.image_set=}, which should not happen"
)
image_file = list(beam_summary.image_set.image)[-1]
with fits.open(image_file) as image:
bmaj = image[0].header["BMAJ"] # type: ignore
bmin = image[0].header["BMIN"] # type: ignore
bpa = image[0].header["BPA"] # type: ignore
coord = estimate_image_centre(image_path=image_file)
assert name_components is not None, f"{name_components=}, which should not happen"
return PSFTableRow(
beam=int(name_components.beam) if name_components.beam else -999,
vis_flagged=vis_flagged,
vis_total=vis_total,
image_name=image_file.name,
bmaj=bmaj,
bmin=bmin,
bpa=bpa,
ra=coord.ra.deg,
dec=coord.dec.deg,
l=coord.galactic.l.deg,
b=coord.galactic.b.deg,
)
[docs]
def make_psf_table(field_summary: FieldSummary, output_path: Path) -> Path:
# TODO: This will likely need changes to the
# FieldSummary structure to handle holding MSSummary objects
# Columns are:
# BEAM_NUM,BEAM_TIME,RA_DEG,DEC_DEG,GAL_LONG,GAL_LAT,PSF_MAJOR,PSF_MINOR,PSF_ANGLE,VIS_TOTAL,VIS_FLAGGED
assert field_summary.beam_summaries is not None, (
f"{field_summary.beam_summaries=}, which should not happen"
)
psf_table_rows = [
_make_beam_psf_row(beam_summary=beam_summary)
for beam_summary in field_summary.beam_summaries
]
psf_table = Table(
{
"BEAM_NUM": [row.beam for row in psf_table_rows],
"VIS_TOTAL": [row.vis_total for row in psf_table_rows],
"VIS_FLAGGED": [row.vis_total for row in psf_table_rows],
"IMAGE_NAME": [row.image_name for row in psf_table_rows],
"PSF_MAJOR": [row.bmaj for row in psf_table_rows],
"PSF_MINOR": [row.bmin for row in psf_table_rows],
"PSF_ANGLE": [row.bpa for row in psf_table_rows],
}
)
psf_table.sort("BEAM_NUM")
# beam_inf_{SBID}-{FIELD_NAME}.csv
outfile = (
output_path / f"beam_inf_{field_summary.sbid}-{field_summary.field_name}.csv"
)
psf_table.write(outfile, format="csv", overwrite=True)
return outfile
[docs]
def create_validation_tables(
field_summary: FieldSummary,
rms_image_path: Path,
source_catalogue_path: Path,
output_path: Path,
reference_catalogue_directory: Path,
) -> ValidationTables:
"""Create a set of validation tables that can be used to assess the
correctness of an image and associated source catalogue.
Args:
field_summary (FieldSummary): A description of the key properties of the field
rms_image_path (Path): The RMS fits image the source catalogue was constructed against.
source_catalogue_path (Path): The source catalogue.
output_path (Path): The output path of the figure to create
reference_catalogue_directory (Path): The directory that contains the reference catalogues installed
Returns:
ValidationTables: The tables that were created
"""
logger.info("Creating validation tables")
rms_info = get_rms_image_info(rms_path=rms_image_path)
askap_survey_name = f"{field_summary.sbid}-{field_summary.field_name}"
# Load in the catalogues
catalogues, tables = load_catalogues(
source_catalogue_path=source_catalogue_path,
reference_catalogue_directory=reference_catalogue_directory,
askap_survey_name=askap_survey_name,
rms_info=rms_info,
)
# Loop over all the catalogues and cross match them to the ASKAP catalogue
_tables: dict[str, Table] = {}
for survey in catalogues._fields:
if survey == "askap":
continue
# The index method of a named tuple is the same as tuple[i], where
# i is an int. This is a round about way of getting the key name
# from a named attributed stored in a parameter
survey_catalogue = catalogues.__getattribute__(survey)
survey_table = tables.__getattribute__(survey)
if survey_catalogue is None:
continue
logger.info(f"Processing {survey=}")
match_result = match_nearest_neighbour(
table1=tables.askap,
table2=survey_table,
catalogue1=catalogues.askap,
catalogue2=survey_catalogue,
)
xmatch_table, xmatch_file = make_xmatch_table(
table1=tables.askap,
table2=survey_table,
catalogue1=catalogues.askap,
catalogue2=survey_catalogue,
match_result=match_result,
output_path=output_path,
)
_tables[survey] = xmatch_file
xmatch_tables = XMatchTables(
**_tables,
)
stats_table = make_field_stats_table(
field_summary=field_summary,
rms_info=rms_info,
output_path=output_path,
)
psf_table = make_psf_table(
field_summary=field_summary,
output_path=output_path,
)
return ValidationTables(
xmatch_tables=xmatch_tables,
stats_table_path=stats_table,
psf_table_path=psf_table,
)
[docs]
def create_validation_plot(
field_summary: FieldSummary,
rms_image_path: Path,
source_catalogue_path: Path,
output_path: Path,
reference_catalogue_directory: Path,
) -> Path:
"""Create a simple multi-panel validation figure intended to asses
the correctness of an image and associated source catalogue.
The image described by `rms_image_path` should be a FITS file. The
WCS of this file is used for plotting and rreading the synthesised
beam information using the standard CRVAL/BMAJ/BMIN keywords.
The source catalogue is read using astropy.table.Table. This routine
also expects that some level of units are embedded in the catalogue.
For Aegean produced catalogues this is the case.
The reference_catalogue_directory sets the directory to look into when
searching for the reference ICRF, NVSS and SUMSS cataloues.
Args:
field_summary (FieldSummary): A description of the key properties of the field
rms_image_path (Path): The RMS fits image the source catalogue was constructed against.
source_catalogue_path (Path): The source catalogue.
output_path (Path): The output path of the figure to create
reference_catalogue_directory (Path): The directory that contains the reference ICRF, NVSS and SUMSS catalogues.
Returns:
Path: The output path of the figure
"""
logger.info("Creating validation plot")
rms_info = get_rms_image_info(rms_path=rms_image_path)
dezotti_path = get_packaged_resource_path(
package="flint.data.source_counts", filename="de_zotti_1p4.txt"
)
skads_path = get_packaged_resource_path(
package="flint.data.source_counts", filename="SKADS_1p4GHz.fits"
)
logger.info(f"Loading {dezotti_path}")
dezotti = Table.read(dezotti_path, format="ascii")
logger.info(f"Loading {skads_path=}")
skads = Table.read(skads_path)
askap_survey_name = f"{field_summary.sbid}-{field_summary.field_name}"
# Load in the catalogues
catalogues, tables = load_catalogues(
source_catalogue_path=source_catalogue_path,
reference_catalogue_directory=reference_catalogue_directory,
askap_survey_name=askap_survey_name,
rms_info=rms_info,
)
height = 16.0
width = height * np.sqrt(3.0)
fig = plt.figure(figsize=(width, height))
validator_layout = make_validator_axes_layout(fig=fig, rms_path=rms_image_path)
plot_source_counts(
catalogue=tables.askap,
rms_info=rms_info,
ax=validator_layout.ax_counts,
freq=rms_info.header["CRVAL3"],
dezotti=dezotti,
skads=skads,
)
askap_sources = SkyCoord(
tables.askap[catalogues.askap.ra_col],
tables.askap[catalogues.askap.dec_col],
unit="deg,deg",
)
plot_rms_map(
fig=fig,
ax=validator_layout.ax_rms,
rms_path=rms_info.path,
source_positions=askap_sources,
)
plot_psf(fig=fig, ax=validator_layout.ax_psf, rms_info=rms_info)
ierf_match = match_nearest_neighbour(
table1=tables.askap,
table2=tables.icrf,
catalogue1=catalogues.askap,
catalogue2=catalogues.icrf,
)
plot_astrometry_comparison(
fig=fig, ax=validator_layout.ax_astrometry, match_result=ierf_match
)
sumss_match = match_nearest_neighbour(
table1=tables.askap,
table2=tables.sumss,
catalogue1=catalogues.askap,
catalogue2=catalogues.sumss,
)
plot_astrometry_comparison(
fig=fig, ax=validator_layout.ax_astrometry1, match_result=sumss_match
)
plot_flux_comparison(
fig=fig, ax=validator_layout.ax_brightness1, match_result=sumss_match
)
nvss_match = match_nearest_neighbour(
table1=tables.askap,
table2=tables.nvss,
catalogue1=catalogues.askap,
catalogue2=catalogues.nvss,
)
plot_astrometry_comparison(
fig=fig, ax=validator_layout.ax_astrometry2, match_result=nvss_match
)
plot_flux_comparison(
fig=fig, ax=validator_layout.ax_brightness2, match_result=nvss_match
)
plot_field_info(
fig=fig,
ax=validator_layout.ax_legend,
field_summary=field_summary,
rms_info=rms_info,
askap_table=tables.askap,
)
plot_flag_summary(
fig=fig,
ax=validator_layout.ax_flag_summary,
field_summary=field_summary,
)
fig.suptitle(
rf"$\it{{Flint}}$ summary for Field: {field_summary.field_name} - SBID: {field_summary.sbid} - Round: {field_summary.round}",
fontsize=F_HUGE,
)
fig.tight_layout()
output_file = output_path / f"validation_{askap_survey_name}.png"
fig.savefig(str(output_file), dpi=300, bbox_inches="tight")
return output_file
[docs]
def get_parser() -> ArgumentParser:
"""Create the argument parser for the validation plot creation
Returns:
ArgumentParser: CLI entry point
"""
parser = ArgumentParser(
description="Create a validation figure to highlight the reliability of some process continuum field"
)
parser.add_argument(
"processed_ms_paths",
nargs="+",
type=Path,
help="Paths to the processed MeasurementSets.",
)
parser.add_argument(
"rms_image_path", type=Path, help="Path to the RMS image of the field"
)
parser.add_argument(
"source_catalogue_path",
type=Path,
help="Path to the source catalogue. At present on Aegean formatted component catalogues supported. ",
)
parser.add_argument(
"output_path", type=Path, help="Directory to place the output files. "
)
parser.add_argument(
"--reference-catalogue-directory",
type=Path,
default=Path("."),
help="Directory container the reference ICFS, NVSS and SUMSS catalogues. These are known catalogues and expect particular file names. ",
)
parser.add_argument(
"--bandpass-directory",
type=Path,
default=None,
help="Path to the directory containing the bandpass measurement sets and solutions",
)
parser.add_argument(
"--holography-path",
type=Path,
default=None,
help="Path to the holography fits cube used to form the linmos image",
)
return parser
[docs]
def cli() -> None:
"""CLI entry point for validation plot creation"""
parser = get_parser()
args = parser.parse_args()
from astropy.io import fits
from flint.source_finding.aegean import AegeanOutputs
from flint.summary import create_field_summary
rms_image_path = args.rms_image_path
try:
rms_header = fits.getheader(rms_image_path)
rms_beam = (
rms_header["BMAJ"],
rms_header["BMIN"],
rms_header["BPA"],
)
except KeyError:
rms_beam = (1.0, 1.0, 1.0)
logger.warning(
f"Beam keywords not found in {rms_image_path=}. Setting to default {rms_beam}"
)
aegean_outputs = AegeanOutputs(
bkg=rms_image_path,
rms=rms_image_path,
comp=args.source_catalogue_path,
beam_shape=rms_beam,
image=rms_image_path,
)
field_summary = create_field_summary(
mss=args.processed_ms_paths,
aegean_outputs=aegean_outputs,
holography_path=args.holography_path,
cal_sbid_path=args.bandpass_directory,
)
create_validation_plot(
field_summary=field_summary,
rms_image_path=args.rms_image_path,
source_catalogue_path=args.source_catalogue_path,
output_path=args.output_path,
reference_catalogue_directory=args.reference_catalogue_directory,
)
validation_tables = create_validation_tables(
field_summary=field_summary,
rms_image_path=args.rms_image_path,
source_catalogue_path=args.source_catalogue_path,
output_path=args.output_path,
reference_catalogue_directory=args.reference_catalogue_directory,
)
logger.info(f"\n{validation_tables}")
if __name__ == "__main__":
cli()