Source code for flint.validation

"""Utility function to create a one figure multi panel validation plot
for continuum imaging of RACS data
"""

from __future__ import annotations

from argparse import ArgumentParser
from pathlib import Path
from typing import NamedTuple

import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from astropy.wcs import WCS
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from scipy import stats

from flint.catalogue import Catalogue, get_reference_catalogue
from flint.logging import logger
from flint.naming import processed_ms_format
from flint.summary import BeamSummary, FieldSummary
from flint.utils import estimate_image_centre, get_packaged_resource_path

[docs] F_SMALL = 7
[docs] F_MED = 8
[docs] F_LARGE = 12
[docs] F_HUGE = 20
[docs] MAX_SOURCES_TO_PLOT = 10000
[docs] class ValidationCatalogues(NamedTuple): """Container for all the catalogues that are loaded in and used throughout validation processing"""
[docs] nvss: Catalogue
"""NVSS catalogue"""
[docs] sumss: Catalogue
"""SUMSS catalogue"""
[docs] icrf: Catalogue
"""ICRF catalogue"""
[docs] askap: Catalogue
"""ASKAP catalogue"""
[docs] racs_high: Catalogue | None = None
"""RACS high catalogue"""
[docs] racs_mid: Catalogue | None = None
"""RACS mid catalogue"""
[docs] tgss: Catalogue | None = None
"""TGSS catalogue"""
[docs] vlass: Catalogue | None = None
"""VLASS catalogue"""
[docs] class Tables(NamedTuple): """Container for all the tables that are loaded in"""
[docs] nvss: Table
"""NVSS catalogue"""
[docs] sumss: Table
"""SUMSS catalogue"""
[docs] icrf: Table
"""ICRF catalogue"""
[docs] askap: Table
"""ASKAP catalogue"""
[docs] racs_high: Table | None = None
"""RACS high catalogue"""
[docs] racs_mid: Table | None = None
"""RACS mid catalogue"""
[docs] tgss: Table | None = None
"""TGSS catalogue"""
[docs] vlass: Table | None = None
"""VLASS catalogue"""
[docs] class XMatchTables(NamedTuple): """Container for all the cross matched tables"""
[docs] nvss: Path
"""NVSS catalogue"""
[docs] sumss: Path
"""SUMSS catalogue"""
[docs] icrf: Path
"""ICRF catalogue"""
[docs] racs_high: Path | None = None
"""RACS high catalogue"""
[docs] racs_mid: Path | None = None
"""RACS mid catalogue"""
[docs] tgss: Path | None = None
"""TGSS catalogue"""
[docs] vlass: Path | None = None
"""VLASS catalogue"""
[docs] class ValidationTables(NamedTuple): """Container for all the tables that are generated by the validation routine"""
[docs] psf_table_path: Path
"""Path to the PSF table"""
[docs] stats_table_path: Path
"""Path to the statistics table"""
[docs] xmatch_tables: XMatchTables
"""Cross-matched tables"""
[docs] class ValidatorLayout(NamedTuple): """Simple container for all the matplotlib axes objects"""
[docs] ax_rms: Axes
"""Axes for the RMS of the field"""
[docs] ax_legend: Axes
"""Container for basic SBID information"""
[docs] ax_psf: Axes
"""Axes for the PSF of the image"""
[docs] ax_counts: Axes
"""Axes for quick look source counts"""
[docs] ax_brightness1: Axes
"""Axes to compare brightness of sources from the first catalogue"""
[docs] ax_brightness2: Axes
"""Axes to compare brightness of sources from the first catalogue"""
[docs] ax_astrometry: Axes
"""Axes to compare astrometry of sources"""
[docs] ax_astrometry1: Axes
"""Axes to compare astrometry of sources from the first catalogue"""
[docs] ax_astrometry2: Axes
"""Axes to compare astromnetry of sources from the first catalogue"""
[docs] ax_flag_summary: Axes
"""Axes to summarise the flagging of the MS"""
[docs] class RMSImageInfo(NamedTuple): """Class to hold basic RMS Image information, excluding the actual raw image data"""
[docs] path: Path
"""Path to the RMS fits image"""
[docs] header: fits.Header
"""Header from the FITS image"""
[docs] shape: tuple[int, int]
"""Dimension of the image"""
[docs] no_valid_pixels: int
"""Number of valid pixels in the image"""
[docs] area: float
"""The area of valid sky, in degrees squared"""
[docs] wcs: WCS
"""The WCS of the image"""
[docs] centre: SkyCoord
"""The centre of the image"""
[docs] median: float
"""The median value of the image"""
[docs] minimum: float
"""The minimum value of the image"""
[docs] maximum: float
"""The maximum value of the image"""
[docs] std: float
"""The standard deviation of the image"""
[docs] mode: float
"""The mode of the image"""
[docs] class SourceCounts(NamedTuple): """A small container to pass around source count information"""
[docs] bins: np.ndarray
"""The bin edges of the source counts in Jy"""
[docs] bin_center: np.ndarray
"""The bin centers to use in Jy"""
[docs] counts_per_bin: np.ndarray
"""The counts of sources per flux bin"""
[docs] counts_per_bin_err: np.ndarray
"""The rough estimate on the errors per bin"""
[docs] euclid_counts: np.ndarray
"""Euclidean normalised source counts"""
[docs] euclid_counts_err: np.ndarray
"""Rough estimate of error on the euclidean normalised source counts"""
[docs] area: float
"""The area in square degrees that the sources cover, i.e. image footprint sky-area"""
[docs] area_fraction: np.ndarray | None = None
"""The fraction of the image that was above a sigma level per flux bin. This may be used as a rough term to scale the Euclidean Normalised source counts. This is not intended to be a robust way of correcting the source counts - just quick"""
[docs] class MatchResult(NamedTuple): """Simple container to hold results of a quick cross match"""
[docs] name1: str
"""Name of the first survey"""
[docs] name2: str
"""Name of the second survey"""
[docs] pos1: SkyCoord
"""Sky-positions of sources in the first survey"""
[docs] pos2: SkyCoord
"""Sky-positions of sources in the second survey"""
[docs] freq1: float
"""Frequency in Hz of the first survey"""
[docs] freq2: float
"""Frequency in Hz of the second survey"""
[docs] idx1: np.ndarray
"""The indices of the matched sources from the original input table 1"""
[docs] idx2: np.ndarray
"""The indices of the matched sources from the original input table 2"""
[docs] flux1: np.ndarray | None = None
"""Brightness in Jy of source in the first survey"""
[docs] flux2: np.ndarray | None = None
"""Brightness in Jy of source in the second survey"""
[docs] def extract_inner_image_array_region(image: np.ndarray, fraction: float) -> np.ndarray: """Extract an inner region of an image array. The size of the extracted array is specified via the `fraction` parameter, which should be in the range of 0 to 1. Args: image (np.ndarray): The image to extract pixels from fraction (float): The size of the region to extraction. Returns: np.ndarray: The extracted sub-region """ assert 0.0 < fraction <= 1.0, f"{fraction=}, but has to be between 0.0 to 1.0" image_shape = image.shape yx_shape = np.array(image_shape[-2:]) yx_center = np.ceil(yx_shape / 2).astype(int) box_size = (yx_shape * fraction).astype(int) box_delta = np.ceil(box_size / 2).astype(int) min_y = max(0, yx_center[0] - box_delta[0]) max_y = min(yx_shape[0], yx_center[0] + box_delta[0]) min_x = max(0, yx_center[1] - box_delta[1]) max_x = min(yx_shape[1], yx_center[1] + box_delta[1]) sub_image_array = image[..., min_y:max_y, min_x:max_x] return sub_image_array
[docs] def get_known_catalogue_info(name: str) -> Catalogue: """Return the parameters of a recognised catalogue. These are currently hardcoded. The structure returneed outlines the name of columns of interest: - RA - Dec - Integrated flux - Major / Minor / PA Args: name (str): The survey name of interest Raises: ValueError: Raised when an unrecongised catalogue is provided Returns: Catalogue: Information of the survey catalogue """ # TODO: This should be expanded to include the units as well # TODO: Catalogues here need to be packaged somehow name = name.upper() if name == "NVSS": catalogue = Catalogue( survey="NVSS", file_name="VIII_65_nvss.dat_CH_2.fits", name_col="NVSS", freq=1.4e9, ra_col="RA", dec_col="Dec", flux_col="S1.4", maj_col="MajAxis", min_col="MinAxis", pa_col="PA", ) elif name == "SUMSS": catalogue = Catalogue( survey="SUMSS", file_name="sumsscat.Mar-11-2008_CLH.fits", freq=8.43e8, ra_col="RA", dec_col="DEC", name_col="Mosaic", flux_col="St", maj_col="dMajAxis", min_col="dMinAxis", pa_col="dPA", ) elif name == "ICRF": catalogue = Catalogue( survey="ICRF", file_name="icrf.csv", freq=1e9, ra_col="RA", dec_col="Dec", name_col="IERS Des.", flux_col="None", maj_col="None", min_col="None", pa_col="None", ) else: raise ValueError(f"Unknown catalogue {name}") return catalogue
[docs] def load_known_catalogue( name: str, reference_catalogue_directory: Path ) -> tuple[Table, Catalogue]: """Load in a known catalogue table Args: name (str): Name of the survey to load reference_catalogue_directory (Path): The directory location with the reference catalogues installed Returns: Tuple[Table,Catalogue]: The loaded table and Catalogue structure describing the columns """ table, catalogue = get_reference_catalogue( reference_directory=reference_catalogue_directory, survey=name ) return table, catalogue
[docs] def get_rms_image_info(rms_path: Path, extract_fraction: float = 0.5) -> RMSImageInfo: """Extract information about the RMS image and construct a representative structure When computing the pixel statistics an inner region of the RMS map is extracted to avoid the primary beam roll off. Other properties of the ``RMSImageInfo`` are computed on the whole image (e.g. area) Args: rms_path (Path): The RMS image that will be presented extract_fraction (float, optional): Extract the inner region of the base RMS image map. Defaults to 0.5. Returns: RMSImageInfo: Extracted RMS image information """ with fits.open(rms_path) as rms_image: rms_header = rms_image[0].header # type: ignore rms_data = rms_image[0].data # type: ignore logger.info(f"Extracting inner region of {extract_fraction=} for RMS statistics") sub_rms_data = extract_inner_image_array_region( image=rms_data, fraction=extract_fraction ) valid_pixels = np.sum(np.isfinite(rms_data) & (rms_data != 0)) ra_cell_size = rms_header["CDELT1"] dec_cell_size = rms_header["CDELT2"] pixel_area = np.abs(ra_cell_size * dec_cell_size) area = pixel_area * valid_pixels centre_sky = estimate_image_centre(image_path=rms_path) wcs = WCS(fits.getheader(rms_path)) rms_image_info = RMSImageInfo( path=rms_path, header=rms_header, shape=rms_data.shape, no_valid_pixels=valid_pixels, area=area, wcs=wcs, centre=centre_sky, median=float(np.nanmedian(sub_rms_data)), minimum=float(np.nanmin(sub_rms_data)), maximum=float(np.nanmax(sub_rms_data)), std=float(np.nanstd(sub_rms_data)), mode=stats.mode(sub_rms_data, keepdims=False).mode[0], ) return rms_image_info
[docs] def make_validator_axes_layout(fig: Figure, rms_path: Path) -> ValidatorLayout: """Create the figure layout to use for the quick look validation plot. Args: fig (Figure): The figure canvas to add the axes to rms_path (Path): Path to the RMS image that will be presented. Loaded to access the WCS Returns: ValidatorLayout: Representation of all axes objects """ # My friendship with gridspec is over, now subplot_mosaic is my best friend # Using the following encoding: # T = text / field info # F = flux / source counts # A = astrometry # P = PSF # B = BANE noise map # S = SUMMS flux comparison # s = SUMMS astrometry comparison # N = NVSS flux comparison # n = NVSS astrometry comparison # f = Flagging summary rms_wcs = WCS(fits.getheader(rms_path)).celestial ax_dict = fig.subplot_mosaic( """ TFPA BBSs BBNn ffff """, per_subplot_kw={ # type: ignore "B": {"projection": rms_wcs}, }, subplot_kw={ "aspect": "equal", }, ) for spine in ax_dict["T"].spines.values(): spine.set_edgecolor("none") _ = ax_dict["T"].axes.yaxis.set_visible(False) _ = ax_dict["T"].axes.xaxis.set_visible(False) # Set the axes that are shared _ = ax_dict["N"].sharex(ax_dict["S"]) _ = ax_dict["n"].sharex(ax_dict["s"]) validator_layout = ValidatorLayout( ax_rms=ax_dict["B"], ax_legend=ax_dict["T"], ax_psf=ax_dict["P"], ax_counts=ax_dict["F"], ax_brightness1=ax_dict["N"], ax_brightness2=ax_dict["S"], ax_astrometry=ax_dict["A"], ax_astrometry1=ax_dict["n"], ax_astrometry2=ax_dict["s"], ax_flag_summary=ax_dict["f"], ) return validator_layout
[docs] def plot_flag_summary( fig: Figure, ax: Axes, field_summary: FieldSummary, ) -> Axes: """Plot the percentage of the spectrum that is flagged for each measurement set Args: fig (Figure): The figure canvas that is being plotted to ax (Axes): The axes object that is being plotted to field_summary (FieldSummary): An active field summary object with the collection of MSSummary structures Returns: Axes: The axes object with the plotted RMS image """ if field_summary.ms_summaries is None: logger.info("No measurement summaries to plot") ax.text( 0.5, 0.5, "No data to plot", va="center", ha="center", transform=ax.transAxes, ) return ax ms_summaries = sorted( [mss for mss in field_summary.ms_summaries], key=lambda x: x.beam ) accumulated_flags = 0 accumulated_samples = 0 for ms_summary in ms_summaries: flag_spectrum = ms_summary.flag_spectrum accumulated_flags += np.sum(np.nan_to_num(flag_spectrum)) accumulated_samples += len(flag_spectrum) ax.plot( np.arange(len(flag_spectrum)), flag_spectrum, label=f"{ms_summary.beam:02d}", lw=0.5, ) percent_sbid_flagged = accumulated_flags / accumulated_samples * 100.0 ax.legend(ncols=18, title="Beam Number", loc="lower left") ax.grid() ax.axhline(0.0, color="black") ax.set( xlabel="Channel", ylabel="Flagged fraction", ylim=(-0.35, 1.1), aspect="auto", title=f"Flagging summary - {percent_sbid_flagged:.2f}% of {field_summary.sbid} flagged", ) return ax
[docs] def plot_rms_map( fig: Figure, ax: Axes, rms_path: Path, source_positions: SkyCoord | None = None ) -> Axes: """Add the RMS image to the figure Args: fig (Figure): Figure that contains the axes object ax (Axes): The axes that will be plotted rms_path (Path): Location of the RMS image source_positions (Optional[SkyCoord], optionals): Sources in the ASKAP catalogue to overlay onto the RMS image. Defaults to None. Returns: Axes: The axes object with the plotted RMS image """ # Convert Jy/beam to uJy/beam rms_data = fits.getdata(rms_path) * 1e6 # type: ignore # Pirate no believes get below 10uJy/beam mateeey. Percentile a little more # robust to outliers but much of a much floor: float = max(1, np.floor(np.log10(np.nanpercentile(rms_data, 16)))) # type: ignore im = ax.imshow( np.log10(rms_data), vmin=floor, vmax=floor + 1.0, origin="lower", cmap="YlOrRd", ) if source_positions is not None: # If there are too many sources on the RMS map nothing can be seen if len(source_positions) > MAX_SOURCES_TO_PLOT: logger.info( f"{len(source_positions)} sources, too many to plot. Skipping. " ) else: ax.scatter( source_positions.ra, # type: ignore source_positions.dec, # type: ignore marker="o", edgecolor="black", facecolor="none", s=0.1, transform=ax.get_transform("fk5"), # type: ignore ) ax.grid(color="0.5", ls="solid") ax.set_xlabel("RA (J2000)") ax.set_ylabel("Dec (J2000)") # These offsets are pretty and correspond to 10, 20, 30,m 50, 100 where floor is 10 uJy/bea, yt = floor + np.array([0.0, 0.30103, 0.47712125, 0.69897, 1.0]) ytl = [f"{10**lx:.0f}" for lx in yt] cbar = fig.colorbar( im, ax=ax, use_gridspec=True, pad=0.09, shrink=0.5, orientation="vertical", ) cbar.set_label(r"Image rms ($\mu$Jy/beam)", fontsize=F_SMALL) cbar.set_ticks(yt) cbar.set_ticklabels(ytl) gal_col = "b" overlay = ax.get_coords_overlay("galactic") overlay.grid(color=gal_col, ls="dashed", alpha=0.6) overlay[0].tick_params(colors=gal_col) overlay[1].tick_params(colors=gal_col) overlay[0].set_axislabel("Galactic long.", color=gal_col) overlay[1].set_axislabel("Galactic lat.", color=gal_col) ax.set_title("Image noise", loc="left", fontsize=F_MED) return ax
[docs] def calculate_area_correction_per_flux( rms_image_path: Path, flux_bin_centre: np.ndarray, sigma: float = 5 ) -> np.ndarray: """This derives a rough correction to the area term when calculating the source counts. This is not intended to correct for completeness, although they are closely related. The RMS image is read in and a CDF is calculated. This is used to return a scaling term that indicates what fraction a flux-bin was `sigma` times the RMS, which could be used to scale the area. Args: rms_image_path (Path): RMS image that will be use to calculate the area correction flux_bin_centre (np.ndarray): Fluxes to calculate the area fraction at sigma (float, optional): Mutliplicate term indicating what sigma level source finding cropped at. Defaults to 5. Returns: np.ndarray: Fraction of the image that was available for source finding at a flux density """ # Filter out the data rms_data = fits.getdata(rms_image_path) assert rms_data is not None, f"{rms_data=}, which should not happen" rms_data = rms_data[np.isfinite(rms_data)].flatten() # Calculate the cumulative distribution and normalise it to 0..1 rms_hist, rms_bins = np.histogram(rms_data, bins=500) rms_cdf = np.cumsum(rms_hist) rms_cdf = rms_cdf / rms_cdf[-1] assert np.isclose(rms_cdf[-1], 1), "Normalisation of the RMS CDF failed" # Calculate the bin centers of the histogram / CDF rms_bin_centre = 0.5 * (rms_bins[1:] + rms_bins[:-1]) # Now calculate the area available per flux bin that is available # for source finding. Since the source finding is often performed # with some flux / rms cut, the area available is a multiple of # the rms. The scaling term here is a generic value that most # pirates use. area_fraction_per_bin = np.interp( flux_bin_centre, rms_bin_centre * sigma, rms_cdf, left=0.0, right=1.0 ) return area_fraction_per_bin
[docs] def get_source_counts( fluxes: np.ndarray, area: float, minlogf: float = -4, maxlogf: float = 2, Nbins: int = 81, rms_image_path: Path | None = None, ) -> SourceCounts: """Derive source counts for a set of fluxes and known area Args: fluxes (np.ndarray): The fluxes in Jy to count area (float): Area over which the sources were collected minlogf (float, optional): The minimum bin edge, in Jy. Defaults to -4. maxlogf (float, optional): The maximum bin edgem, in Jy. Defaults to 2. Nbins (int, optional): Number of bins to include in the source counts. Defaults to 81. rms_image_path (Optional[Path], optional): Path to the RMS image. If not None, it is used to calculate a rough area correction. Defaults to None. Returns: SourceCounts: Source counts and their properties """ logger.info( f"Computing source counts for {len(fluxes)} sources, {Nbins=}, {area=:.2f} sq.deg." ) logf = np.linspace(minlogf, maxlogf, Nbins) f = np.power(10.0, logf) counts, bins = np.histogram(fluxes, f) binsmid = 0.5 * ((bins[:-1]) + (bins[1:])) ds = bins[1:] - bins[:-1] area_sky_total = 360.0 * 360.0 / np.pi solid_angle = 4 * np.pi * area / area_sky_total counts_err = np.array(np.sqrt(counts), dtype=int) scount = np.power(binsmid, 2.5) * counts / (ds * solid_angle) scount_err = np.power(binsmid, 2.5) * 1.0 * counts_err / (ds * solid_angle) area_fraction = ( calculate_area_correction_per_flux( rms_image_path=rms_image_path, flux_bin_centre=binsmid, sigma=5 ) if rms_image_path else None ) source_counts = SourceCounts( bins=bins, bin_center=0.5 * (bins[1:] + bins[:-1]), counts_per_bin=counts, counts_per_bin_err=counts_err, euclid_counts=scount, euclid_counts_err=scount_err, area=area, area_fraction=area_fraction, ) return source_counts
[docs] def plot_source_counts( catalogue: Table, rms_info: RMSImageInfo, ax: Axes, freq: float | None = None, dezotti: Table | None = None, skads: Table | None = None, ) -> Axes: """Create a figure of source counts from a astropy Table. If `freq` and either `dezotti` / `skads` are supplied then these precomputed source count tables are also included in the panel. When computing the source counts for `catalogue`, only a minimumal set of corrections are derived and applied. Args: catalogue (Table): The catalogue to derive source counts for rms_info (RMSImageInfo): Look up information from the RMS file that catalogue was constructed against ax (Axes): The axes panel the counts will be plottedd on freq (Optional[float], optional): Frequency that the source catalogue. Used to scale the Dezotti and SKADS tables. Defaults to None. dezotti (Optional[Table], optional): Loaded reference table of Dezotti source counts. Defaults to None. skads (Optional[Table], optional): Loaded reference table of SKADS source counts. Defaults to None. Returns: Axes: The axes object used for plotting """ # TODO: Is the freq column needed anymore fluxes = catalogue["int_flux"] source_counts = get_source_counts( fluxes=fluxes, area=rms_info.area, rms_image_path=rms_info.path ) ax.errorbar( source_counts.bin_center * 1e3, source_counts.euclid_counts, yerr=source_counts.euclid_counts_err, fmt=".", color="darkred", label="Raw Component Catalogue", ) # Since area_fraction could be a numpy array we have to # explicitly test to ensure it is not None, otherwise it # is ambiguous if source_counts.area_fraction is not None: ax.errorbar( source_counts.bin_center * 1e3, source_counts.euclid_counts / source_counts.area_fraction, yerr=source_counts.euclid_counts_err / source_counts.area_fraction, fmt="x", color="darkred", label="Area Corrected Catalogue", ) if dezotti is not None and freq is not None: spectral_index_scale = (freq / 1.4e9) ** -0.8 ax.errorbar( np.power(10.0, dezotti["col1"]) * 1.0e3 * spectral_index_scale, dezotti["col2"] * np.power(spectral_index_scale, 1.5), yerr=[ dezotti["col4"] * np.power(spectral_index_scale, 1.5), dezotti["col3"] * np.power(spectral_index_scale, 1.5), ], fmt="d", color="grey", alpha=0.2, label="de Zotti et al. 2010", ) if skads is not None and freq is not None: spectral_index_scale = (freq / 1.4e9) ** -0.8 ax.errorbar( skads["fl_bin_mid"] * 1.0e3 * spectral_index_scale, skads["SC"] * np.power(spectral_index_scale, 1.5), yerr=skads["SC_err"] * np.power(spectral_index_scale, 1.5), fmt=":", color="k", label="Wilman et al. 2008", ) ax.set_xscale("log") ax.set_yscale("log") ax.set_xlabel("Flux Density (mJy)") ax.set_ylabel(r"$\frac{dN}{dS} S^{\frac{5}{2}}$ (Jy$^{\frac{3}{2}}$ sr$^{-1}$)") ax.set_title(f"{len(fluxes)} sources") ax.set_xlim(0.2, 1.0e4) ax.set_ylim(5.0e-2, 1.0e4) ax.grid() ax.legend(loc="lower right", fontsize=F_SMALL) return ax
[docs] def match_nearest_neighbour( table1: Table, table2: Table, catalogue1: Catalogue, catalogue2: Catalogue, radius: float = 10, ) -> MatchResult: """Match two catalogues together, and construct common properties. Args: table1 (Table): The catalogue table from survey one table2 (Table): The catalogue table from survey two catalogue1 (Catalogue): Catalogue metadata for survey one catalogue2 (Catalogue): Catalogue metadata for survey two radius (float, optional): Maximum matching radius. Defaults to 10. Returns: MatchResult: Object containing source matches and common properties """ pos1 = SkyCoord( table1[catalogue1.ra_col], table1[catalogue1.dec_col], unit="deg,deg" ) pos2 = SkyCoord( table2[catalogue2.ra_col], table2[catalogue2.dec_col], unit="deg,deg" ) idx, sep, _ = pos1.match_to_catalog_sky(catalogcoord=pos2, nthneighbor=1) mask = sep < (radius * u.arcsecond) idx = idx[mask] match_result = MatchResult( name1=catalogue1.survey, name2=catalogue2.survey, pos1=pos1[mask], pos2=pos2[idx], freq1=catalogue1.freq, freq2=catalogue2.freq, idx1=np.argwhere(mask)[:, 0], idx2=idx, flux1=( table1[catalogue1.flux_col].value[mask] if catalogue1.flux_col != "None" else None ), flux2=( table2[catalogue2.flux_col].value[idx] if catalogue2.flux_col != "None" else None ), ) return match_result
[docs] def plot_astrometry_comparison( fig: Figure, ax: Axes, match_result: MatchResult ) -> Axes: """Plot the astrometry of cross matches from a match result set Args: fig (Figure): The figure canvas plotting on ax (Axes): The Axes being plotted on match_result (MatchResult): The set of sources cross-matched and found in common Returns: Axes: The Axes plotted on """ logger.info( f"Plotting astrometry match between {match_result.name1} and {match_result.name2}" ) if len(match_result.pos1) == 0: ax.set_xlim(-8.0, 8.0) ax.set_ylim(-8.0, 8.0) ax.text(0.0, 0.0, f"No data for {match_result.name2}", va="center", ha="center") return ax pos1, pos2 = match_result.pos1, match_result.pos2 # Get the separations between two points, and their angle between them seps = pos1.separation(pos2) pas = pos1.position_angle(pos2) # Compute the detlaRA and deltaDec. Straight subtraction is not correct err_x = (seps * -np.sin(pas.radian)).to(u.arcsec).value err_y = (seps * np.cos(pas.radian)).to(u.arcsec).value mean_x, mean_y = err_x.mean(), err_y.mean() std_x, std_y = err_x.std(), err_y.std() ax.plot(err_x, err_y, ".r", zorder=0, ms=2) ax.errorbar(mean_x, mean_y, xerr=std_x, yerr=std_y, fmt="ok") ax.grid() ax.axvline(0, color="black", ls=":") ax.axhline(0, color="black", ls=":") ax.text( -7.0, 7.0, match_result.name2, fontsize=F_MED, bbox=dict(facecolor="white", edgecolor="none", alpha=0.8), ) ax.text( -7.0, 7.0, f"{match_result.name2} - {len(err_x)} matches", fontsize=F_MED, bbox=dict(facecolor="white", edgecolor="none", alpha=0.8), ) ax.set( xlim=(-8, 8), ylim=(-8, 8), xlabel="Offset (arcsec)", ylabel="Offset (arcsec)" ) le_a1 = rf" $\epsilon_{{SU}} : ({{{mean_x:.1f}}}\pm{{{std_x:.1f}}},{{{mean_y:.1f}}}\pm{{{std_y:.1f}}})$" ax.text( 7.6, 7.6, le_a1, va="top", ha="right", fontsize=F_MED, bbox=dict(facecolor="white", edgecolor="none", alpha=0.8), ) return ax
[docs] def plot_flux_comparison(fig: Figure, ax: Axes, match_result: MatchResult) -> Axes: """Create a flux comparison plot showing the flux densities from two catalogues compared to one another. Args: fig (Figure): The figure canvas that the axes is on ax (Axes): The axes object that will be render the plot match_result (MatchResult): A result set of the cross-match between two catalogues Returns: Axes: The aces object that was used for plotting """ if len(match_result.pos1) == 0: ax.loglog([2.0, 2000.0], [2.0, 2000.0], "ow") ax.text(1e0, 1e2, f"No data for {match_result.name2}", va="center", ha="center") return ax flux1 = match_result.flux1 flux2 = match_result.flux2 one2one = np.array([0.002, 8]) spectral_index_scale = (match_result.freq2 / match_result.freq1) ** -0.8 ax.loglog(flux1, flux2, "ok", ms=2) ax.loglog(one2one, one2one, "-", c="r", label="Raw") ax.loglog(one2one, one2one * spectral_index_scale, "--", c="r", label="Scaled") ax.set( ylabel=f"{match_result.name2} Integrated Flux (Jy)", xlabel=f"{match_result.name1} Integrated Flux (Jy)", ) ax.legend() ax.grid() return ax
[docs] def scale_flux_alpha( flux: float | np.ndarray, freq: float | np.ndarray, ref_freq: float, alpha: float = -0.8, ) -> float | np.ndarray: """Scale a flux density to a reference frequency using a spectral index Args: flux (float): The flux density to scale freq (float): The frequency of the flux density ref_freq (float): The reference frequency to scale to alpha (float, optional): The spectral index to use. Defaults to -0.8. Returns: float: The scaled flux density """ return flux * (freq / ref_freq) ** alpha
[docs] def make_xmatch_table( table1: Table, table2: Table, catalogue1: Catalogue, catalogue2: Catalogue, match_result: MatchResult, output_path: Path, ) -> tuple[Table, Path]: """Create a simple cross match table between two catalogues Args: table1 (Table): The catalogue table from survey one table2 (Table): The catalogue table from survey two catalogue1 (Catalogue): Catalogue metadata for survey one catalogue2 (Catalogue): Catalogue metadata for survey two match_result (MatchResult): A result set of the cross-match between two catalogues output_path (Path): Location to save the table to Returns: Tuple[Table, Path]: The table and the output path """ askap_flux = match_result.flux1 # Some catalogues (like the ICFS) do not have a flux column external_flux = ( match_result.flux2 if match_result.flux2 is not None else np.ones_like(match_result.idx2) * -1 ) askap_flux_scaled = scale_flux_alpha( flux=askap_flux, freq=match_result.freq1, ref_freq=match_result.freq2, ) xmatch_table = Table( { "Component_ID": table1[catalogue1.name_col][match_result.idx1], "RA": table1[catalogue1.ra_col][match_result.idx1], "DEC": table1[catalogue1.dec_col][match_result.idx1], "ASKAP_flux": table1[catalogue1.flux_col][match_result.idx1], "ASKAP_flux_SNR": table1[catalogue1.flux_col][match_result.idx1] / table1["local_rms"][match_result.idx1], "RA_comp": table2[catalogue2.ra_col][match_result.idx2], "DEC_comp": table2[catalogue2.dec_col][match_result.idx2], "Comp_flux": external_flux, "Flux_Ratio_AlphaCorr": askap_flux_scaled / external_flux, "Flux_Ratio_NOAlphaCorr": askap_flux / external_flux, } ) suffix = f"{catalogue2.survey}_{catalogue1.survey}" output_file = output_path / f"cat_match_{suffix}.csv" xmatch_table.write(output_file, format="csv", overwrite=True) return xmatch_table, output_file
[docs] def plot_psf(fig: Figure, ax: Axes, rms_info: RMSImageInfo) -> Axes: """Create a plot highlighting the synthesised beam recorded in the RMS image header Args: fig (Figure): Fogire canvas being used ax (Axes): The axes object that will be used for plotting rms_info (RMSImageInfo): Extracted information from the RMS image Returns: Axes: The aces object used for plotting """ bmaj = rms_info.header["BMAJ"] * 3600 bmin = rms_info.header["BMIN"] * 3600 ax.plot(bmin, bmaj, "ok", ms=3) ax.set( xlim=(5.0, 20.0), ylim=(5.0, 40.0), xlabel="PSF minor axis (arcsec)", ylabel="PSF major axis (arcsec)", ) ax.grid() return ax
[docs] def plot_field_info( fig: Figure, ax: Axes, field_summary: FieldSummary, rms_info: RMSImageInfo, askap_table: Table, ) -> Axes: # TODO: Add the field information to the plot # Namely: # - Field name # - J2000 RA / Dec # - Galactic l / b # - SBID # - CAL_SBID # - Start time # - Integration time # - Hour angle range # - Elevation range # - Median rms # - Number of sources # - Processing date hour_angles = field_summary.hour_angles elevations = field_summary.elevations pol_axis_str = ( f"{field_summary.pol_axis:.3f} rad" if field_summary.pol_axis else "Unknown" ) field_text = "\n".join( ( f"Field name: {field_summary.field_name}", f"- J2000 RA / Dec : {rms_info.centre.icrs.to_string(style='hmsdms', precision=1)}", # type: ignore f"- Galactic l / b : {rms_info.centre.galactic.to_string(style='decimal')}", # type: ignore f"- SBID : {field_summary.sbid}", f"- CAL_SBID : {field_summary.cal_sbid}", f"- Holorgraphy file : {field_summary.holography_path.name if field_summary.holography_path else 'N/A'}", f"- Start time : {field_summary.ms_times[0].utc.fits}", # type: ignore f"- Integration time : {field_summary.integration_time * u.second:latex_inline}", # type: ignore f"- Hour angle range : {hour_angles.min().to_string(precision=2, format='latex_inline')} - {hour_angles.max().to_string(precision=2, format='latex_inline')}", # type: ignore f"- Elevation range : {elevations.min().to_string(precision=2, format='latex_inline')} - {elevations.max().to_string(precision=2, format='latex_inline')}", # type: ignore f"- Median rms uJy : {rms_info.median * 1e6:.1f}", f"- Components : {len(askap_table)}", f"- Processing date : {Time.now().fits}", f"- Pol. axis : {pol_axis_str}", ) ) props = dict(boxstyle="square", facecolor="none", edgecolor="tab:red", pad=2) ax.text( 0.0, 0.0, field_text, family="monospace", fontdict={"fontsize": F_LARGE}, wrap=True, bbox=props, ha="left", ) return ax
[docs] def load_catalogues( source_catalogue_path: Path, reference_catalogue_directory: Path, askap_survey_name: str, rms_info: RMSImageInfo, ) -> tuple[ValidationCatalogues, Tables]: """Load in all the catalogues that are required for the validation. Args: source_catalogue_path (Path): The source catalogue to load reference_catalogue_directory (Path): The directory that contains the reference ICRF, NVSS and SUMSS catalogues. askap_survey_name (str): The name that will be given to the ASKAP field data rms_info (RMSImageInfo): The extracted information from the RMS image Returns: Tuple[ValidationCatalogues, Tables]: The loaded catalogues and tables """ logger.info(f"Loading {source_catalogue_path=}") askap_table = Table.read(source_catalogue_path) askap_cata = Catalogue( survey=askap_survey_name, file_name=source_catalogue_path.name, freq=rms_info.header["CRVAL3"], ra_col="ra", dec_col="dec", name_col="source", flux_col="int_flux", maj_col="a", min_col="b", pa_col="pa", ) icrf_table, icrf_catalogue = load_known_catalogue( name="ICRF", reference_catalogue_directory=reference_catalogue_directory ) sumss_table, sumss_catalogue = load_known_catalogue( name="SUMSS", reference_catalogue_directory=reference_catalogue_directory ) nvss_table, nvss_catalogue = load_known_catalogue( name="NVSS", reference_catalogue_directory=reference_catalogue_directory ) return ( ValidationCatalogues( askap=askap_cata, icrf=icrf_catalogue, sumss=sumss_catalogue, nvss=nvss_catalogue, ), Tables( askap=askap_table, icrf=icrf_table, sumss=sumss_table, nvss=nvss_table, ), )
[docs] def make_field_stats_table( field_summary: FieldSummary, rms_info: RMSImageInfo, output_path: Path, ) -> Path: # Columns are: # FLD_NAME,SBID,NPIXV,MINIMUM,MAXIMUM,MED_RMS_uJy,MODE_RMS_uJy,STD_RMS_uJy,MIN_RMS_uJy,MAX_RMS_uJy # TODO: Get the min / max from the image (not rms) if field_summary.linmos_image is not None: with fits.open(field_summary.linmos_image) as linmos_image: image_data = linmos_image[0].data min_image_data = np.nanmin(image_data) max_image_data = np.nanmax(image_data) else: logger.warning("The linmos image in the provided field summary is empty.") min_image_data = np.nan max_image_data = np.nan stats_table = Table( { "FLD_NAME": [field_summary.field_name], "SBID": [field_summary.sbid], "NPIXV": [rms_info.no_valid_pixels], "MINIMUM": [min_image_data], "MAXIMUM": [max_image_data], "MED_RMS_uJy": [rms_info.median * 1e6], "MODE_RMS_uJy": [rms_info.mode * 1e6], "STD_RMS_uJy": [rms_info.std * 1e6], "MIN_RMS_uJy": [rms_info.minimum * 1e6], "MAX_RMS_uJy": [rms_info.maximum * 1e6], } ) # statistics_{SBID}-{FIELD_NAME}.csv outfile = ( output_path / f"statistics_{field_summary.sbid}-{field_summary.field_name}.csv" ) stats_table.write(outfile, format="csv", overwrite=True) return outfile
[docs] class PSFTableRow(NamedTuple): """Something that is only used to internally represent the PSF information of a beam image. Currently no reason to use this. """
[docs] beam: int
[docs] vis_total: int
[docs] vis_flagged: int
[docs] image_name: str
[docs] bmaj: float
[docs] bmin: float
[docs] bpa: float
[docs] ra: float
[docs] dec: float
[docs] l: float # noqa: E741
[docs] b: float
[docs] def _make_beam_psf_row(beam_summary: BeamSummary) -> PSFTableRow: """Collects the information required of a single beam image and measurement set for entry into the PSF table. Not intended for usage other than int he creation of the PSF table. Args: beam_summary (BeamSummary): The input set of collected properties Returns: PSFTableRow: Extracted information """ name_components = processed_ms_format(in_name=beam_summary.ms_summary.path) vis_total = beam_summary.ms_summary.flagged + beam_summary.ms_summary.unflagged vis_flagged = beam_summary.ms_summary.flagged assert beam_summary.image_set is not None, ( f"{beam_summary.image_set=}, which should not happen" ) image_file = list(beam_summary.image_set.image)[-1] with fits.open(image_file) as image: bmaj = image[0].header["BMAJ"] # type: ignore bmin = image[0].header["BMIN"] # type: ignore bpa = image[0].header["BPA"] # type: ignore coord = estimate_image_centre(image_path=image_file) assert name_components is not None, f"{name_components=}, which should not happen" return PSFTableRow( beam=int(name_components.beam) if name_components.beam else -999, vis_flagged=vis_flagged, vis_total=vis_total, image_name=image_file.name, bmaj=bmaj, bmin=bmin, bpa=bpa, ra=coord.ra.deg, dec=coord.dec.deg, l=coord.galactic.l.deg, b=coord.galactic.b.deg, )
[docs] def make_psf_table(field_summary: FieldSummary, output_path: Path) -> Path: # TODO: This will likely need changes to the # FieldSummary structure to handle holding MSSummary objects # Columns are: # BEAM_NUM,BEAM_TIME,RA_DEG,DEC_DEG,GAL_LONG,GAL_LAT,PSF_MAJOR,PSF_MINOR,PSF_ANGLE,VIS_TOTAL,VIS_FLAGGED assert field_summary.beam_summaries is not None, ( f"{field_summary.beam_summaries=}, which should not happen" ) psf_table_rows = [ _make_beam_psf_row(beam_summary=beam_summary) for beam_summary in field_summary.beam_summaries ] psf_table = Table( { "BEAM_NUM": [row.beam for row in psf_table_rows], "VIS_TOTAL": [row.vis_total for row in psf_table_rows], "VIS_FLAGGED": [row.vis_total for row in psf_table_rows], "IMAGE_NAME": [row.image_name for row in psf_table_rows], "PSF_MAJOR": [row.bmaj for row in psf_table_rows], "PSF_MINOR": [row.bmin for row in psf_table_rows], "PSF_ANGLE": [row.bpa for row in psf_table_rows], } ) psf_table.sort("BEAM_NUM") # beam_inf_{SBID}-{FIELD_NAME}.csv outfile = ( output_path / f"beam_inf_{field_summary.sbid}-{field_summary.field_name}.csv" ) psf_table.write(outfile, format="csv", overwrite=True) return outfile
[docs] def create_validation_tables( field_summary: FieldSummary, rms_image_path: Path, source_catalogue_path: Path, output_path: Path, reference_catalogue_directory: Path, ) -> ValidationTables: """Create a set of validation tables that can be used to assess the correctness of an image and associated source catalogue. Args: field_summary (FieldSummary): A description of the key properties of the field rms_image_path (Path): The RMS fits image the source catalogue was constructed against. source_catalogue_path (Path): The source catalogue. output_path (Path): The output path of the figure to create reference_catalogue_directory (Path): The directory that contains the reference catalogues installed Returns: ValidationTables: The tables that were created """ logger.info("Creating validation tables") rms_info = get_rms_image_info(rms_path=rms_image_path) askap_survey_name = f"{field_summary.sbid}-{field_summary.field_name}" # Load in the catalogues catalogues, tables = load_catalogues( source_catalogue_path=source_catalogue_path, reference_catalogue_directory=reference_catalogue_directory, askap_survey_name=askap_survey_name, rms_info=rms_info, ) # Loop over all the catalogues and cross match them to the ASKAP catalogue _tables: dict[str, Table] = {} for survey in catalogues._fields: if survey == "askap": continue # The index method of a named tuple is the same as tuple[i], where # i is an int. This is a round about way of getting the key name # from a named attributed stored in a parameter survey_catalogue = catalogues.__getattribute__(survey) survey_table = tables.__getattribute__(survey) if survey_catalogue is None: continue logger.info(f"Processing {survey=}") match_result = match_nearest_neighbour( table1=tables.askap, table2=survey_table, catalogue1=catalogues.askap, catalogue2=survey_catalogue, ) xmatch_table, xmatch_file = make_xmatch_table( table1=tables.askap, table2=survey_table, catalogue1=catalogues.askap, catalogue2=survey_catalogue, match_result=match_result, output_path=output_path, ) _tables[survey] = xmatch_file xmatch_tables = XMatchTables( **_tables, ) stats_table = make_field_stats_table( field_summary=field_summary, rms_info=rms_info, output_path=output_path, ) psf_table = make_psf_table( field_summary=field_summary, output_path=output_path, ) return ValidationTables( xmatch_tables=xmatch_tables, stats_table_path=stats_table, psf_table_path=psf_table, )
[docs] def create_validation_plot( field_summary: FieldSummary, rms_image_path: Path, source_catalogue_path: Path, output_path: Path, reference_catalogue_directory: Path, ) -> Path: """Create a simple multi-panel validation figure intended to asses the correctness of an image and associated source catalogue. The image described by `rms_image_path` should be a FITS file. The WCS of this file is used for plotting and rreading the synthesised beam information using the standard CRVAL/BMAJ/BMIN keywords. The source catalogue is read using astropy.table.Table. This routine also expects that some level of units are embedded in the catalogue. For Aegean produced catalogues this is the case. The reference_catalogue_directory sets the directory to look into when searching for the reference ICRF, NVSS and SUMSS cataloues. Args: field_summary (FieldSummary): A description of the key properties of the field rms_image_path (Path): The RMS fits image the source catalogue was constructed against. source_catalogue_path (Path): The source catalogue. output_path (Path): The output path of the figure to create reference_catalogue_directory (Path): The directory that contains the reference ICRF, NVSS and SUMSS catalogues. Returns: Path: The output path of the figure """ logger.info("Creating validation plot") rms_info = get_rms_image_info(rms_path=rms_image_path) dezotti_path = get_packaged_resource_path( package="flint.data.source_counts", filename="de_zotti_1p4.txt" ) skads_path = get_packaged_resource_path( package="flint.data.source_counts", filename="SKADS_1p4GHz.fits" ) logger.info(f"Loading {dezotti_path}") dezotti = Table.read(dezotti_path, format="ascii") logger.info(f"Loading {skads_path=}") skads = Table.read(skads_path) askap_survey_name = f"{field_summary.sbid}-{field_summary.field_name}" # Load in the catalogues catalogues, tables = load_catalogues( source_catalogue_path=source_catalogue_path, reference_catalogue_directory=reference_catalogue_directory, askap_survey_name=askap_survey_name, rms_info=rms_info, ) height = 16.0 width = height * np.sqrt(3.0) fig = plt.figure(figsize=(width, height)) validator_layout = make_validator_axes_layout(fig=fig, rms_path=rms_image_path) plot_source_counts( catalogue=tables.askap, rms_info=rms_info, ax=validator_layout.ax_counts, freq=rms_info.header["CRVAL3"], dezotti=dezotti, skads=skads, ) askap_sources = SkyCoord( tables.askap[catalogues.askap.ra_col], tables.askap[catalogues.askap.dec_col], unit="deg,deg", ) plot_rms_map( fig=fig, ax=validator_layout.ax_rms, rms_path=rms_info.path, source_positions=askap_sources, ) plot_psf(fig=fig, ax=validator_layout.ax_psf, rms_info=rms_info) ierf_match = match_nearest_neighbour( table1=tables.askap, table2=tables.icrf, catalogue1=catalogues.askap, catalogue2=catalogues.icrf, ) plot_astrometry_comparison( fig=fig, ax=validator_layout.ax_astrometry, match_result=ierf_match ) sumss_match = match_nearest_neighbour( table1=tables.askap, table2=tables.sumss, catalogue1=catalogues.askap, catalogue2=catalogues.sumss, ) plot_astrometry_comparison( fig=fig, ax=validator_layout.ax_astrometry1, match_result=sumss_match ) plot_flux_comparison( fig=fig, ax=validator_layout.ax_brightness1, match_result=sumss_match ) nvss_match = match_nearest_neighbour( table1=tables.askap, table2=tables.nvss, catalogue1=catalogues.askap, catalogue2=catalogues.nvss, ) plot_astrometry_comparison( fig=fig, ax=validator_layout.ax_astrometry2, match_result=nvss_match ) plot_flux_comparison( fig=fig, ax=validator_layout.ax_brightness2, match_result=nvss_match ) plot_field_info( fig=fig, ax=validator_layout.ax_legend, field_summary=field_summary, rms_info=rms_info, askap_table=tables.askap, ) plot_flag_summary( fig=fig, ax=validator_layout.ax_flag_summary, field_summary=field_summary, ) fig.suptitle( rf"$\it{{Flint}}$ summary for Field: {field_summary.field_name} - SBID: {field_summary.sbid} - Round: {field_summary.round}", fontsize=F_HUGE, ) fig.tight_layout() output_file = output_path / f"validation_{askap_survey_name}.png" fig.savefig(str(output_file), dpi=300, bbox_inches="tight") return output_file
[docs] def get_parser() -> ArgumentParser: """Create the argument parser for the validation plot creation Returns: ArgumentParser: CLI entry point """ parser = ArgumentParser( description="Create a validation figure to highlight the reliability of some process continuum field" ) parser.add_argument( "processed_ms_paths", nargs="+", type=Path, help="Paths to the processed MeasurementSets.", ) parser.add_argument( "rms_image_path", type=Path, help="Path to the RMS image of the field" ) parser.add_argument( "source_catalogue_path", type=Path, help="Path to the source catalogue. At present on Aegean formatted component catalogues supported. ", ) parser.add_argument( "output_path", type=Path, help="Directory to place the output files. " ) parser.add_argument( "--reference-catalogue-directory", type=Path, default=Path("."), help="Directory container the reference ICFS, NVSS and SUMSS catalogues. These are known catalogues and expect particular file names. ", ) parser.add_argument( "--bandpass-directory", type=Path, default=None, help="Path to the directory containing the bandpass measurement sets and solutions", ) parser.add_argument( "--holography-path", type=Path, default=None, help="Path to the holography fits cube used to form the linmos image", ) return parser
[docs] def cli() -> None: """CLI entry point for validation plot creation""" parser = get_parser() args = parser.parse_args() from astropy.io import fits from flint.source_finding.aegean import AegeanOutputs from flint.summary import create_field_summary rms_image_path = args.rms_image_path try: rms_header = fits.getheader(rms_image_path) rms_beam = ( rms_header["BMAJ"], rms_header["BMIN"], rms_header["BPA"], ) except KeyError: rms_beam = (1.0, 1.0, 1.0) logger.warning( f"Beam keywords not found in {rms_image_path=}. Setting to default {rms_beam}" ) aegean_outputs = AegeanOutputs( bkg=rms_image_path, rms=rms_image_path, comp=args.source_catalogue_path, beam_shape=rms_beam, image=rms_image_path, ) field_summary = create_field_summary( mss=args.processed_ms_paths, aegean_outputs=aegean_outputs, holography_path=args.holography_path, cal_sbid_path=args.bandpass_directory, ) create_validation_plot( field_summary=field_summary, rms_image_path=args.rms_image_path, source_catalogue_path=args.source_catalogue_path, output_path=args.output_path, reference_catalogue_directory=args.reference_catalogue_directory, ) validation_tables = create_validation_tables( field_summary=field_summary, rms_image_path=args.rms_image_path, source_catalogue_path=args.source_catalogue_path, output_path=args.output_path, reference_catalogue_directory=args.reference_catalogue_directory, ) logger.info(f"\n{validation_tables}")
if __name__ == "__main__": cli()