"""Construct a leakge map between two polarisations, typically V/I"""
from __future__ import annotations
from argparse import ArgumentParser
from pathlib import Path
from typing import NamedTuple, Union
import astropy.units as u
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.table import Table
from astropy.wcs import WCS
from astropy.wcs.utils import skycoord_to_pixel
from flint.catalogue import guess_column_in_table
from flint.logging import logger
[docs]
TableOrPath = Union[Table, Path]
[docs]
class LeakageFilters(NamedTuple):
"""Description of the filtering options to apply to components
when characterising leakage"""
[docs]
isolation_radius_deg: float = 0.0155
"""The minimum distance to the nearest component"""
[docs]
upper_int_peak_ratio: float = 1.2
"""The upper limit on acceptable int/peak ratios"""
[docs]
lower_int_peak_ratio: float = 0.8
"""The lower limit on acceptable int/peak ratios"""
[docs]
search_box_size: int = 1
"""The size of a box to search for peak polarised signal in"""
[docs]
noise_box_size: int = 30
"""the size of a box to compute a local RMS noise measure from"""
[docs]
mean_box_size: int = 10
"""The size of abox to compute a local mean measure over"""
"""Minimum stokes-I signal-to-noise ratio"""
[docs]
class FITSImage(NamedTuple):
"""Container to couple FITS header, image and WCS"""
"""The data of the fits image"""
"""Header of the fits image"""
"""Celestial WCS of the fits image"""
"""Path of the loaded FITS image on disk"""
[docs]
class PixelCoords(NamedTuple):
"""Slim container to help collect and maintain pixel coordinates. Not intended for extensive use"""
"""The y-coordinate of a set of pixels"""
"""The x-coordinate of a set of pixels"""
[docs]
def _load_fits_image(fits_path: Path) -> FITSImage:
"""Load in a FITS image and package the components into a consistent
form. Not intended for extensive use.
Args:
fits_path (Path): The path of the FITS image to examining
Returns:
FITSImage: Loaded FITS properties
"""
assert fits_path.suffix == ".fits", (
f"Unexpected file type for {fits_path=}, expected fits"
)
logger.info(f"Opening {fits_path=}")
with fits.open(fits_path) as in_fits:
image_data = in_fits[0].data # type: ignore
header = dict(in_fits[0].header.items()) # type: ignore
wcs = WCS(header)
return FITSImage(data=image_data, header=header, wcs=wcs, path=fits_path)
[docs]
def _load_component_table(catalogue: TableOrPath) -> Table:
"""Return a table given either a loaded table or a path to a table on disk"""
component_table = (
Table.read(catalogue) if isinstance(catalogue, Path) else catalogue
)
logger.info(f"Loaded component table of {len(component_table)}")
return component_table
[docs]
def filter_components(
table: Table,
peak_col: str,
int_col: str,
int_err_col: str,
leakage_filters: LeakageFilters,
ra_col: str | None = None,
dec_col: str | None = None,
) -> Table:
"""Apply the pre-processing operations to catalogue components to select an
optimal sample of sources for leakage characterisation. Sources will be selected
based on:
* how isolated they are
* compactness, as traced by their int/peak
Args:
table (Table): Collection of sources, as produced from a source finder
peak_col (str): The column name describing the peak flux density
int_col (str): The column name describing integrated flux
int_err_col (str): The column container errors that correspond to `int_col` to use when computing signal-to-noise
leakage_filters (LeakageFilters): Criteria applied to the source components in the table
ra_col (Optional[str], optional): The RA column name. If None, it will be guessed. Defaults to None.
dec_col (Optional[str], optional): The Dec column name. If None, it will be guessed. Defaults to None.
Returns:
Table: A filtered table
"""
ra_col = guess_column_in_table(table=table, column="ra", guess_column=ra_col)
dec_col = guess_column_in_table(table=table, column="dec", guess_column=dec_col)
assert all(
[
col in table.colnames
for col in (ra_col, dec_col, peak_col, int_col, int_err_col)
]
), (
f"Supplied column names {ra_col=} {dec_col=} {peak_col=} {int_col=} partly missing from {table.colnames=}"
)
total_comps = len(table)
sky_coords = SkyCoord(table[ra_col], table[dec_col], unit=(u.deg, u.deg))
# The match_to_catalog_sky return idx, sep2d, sep3d. We care about separation, matey
isolation_mask = sky_coords.match_to_catalog_sky(sky_coords, nthneighbor=2)[1] > (
leakage_filters.isolation_radius_deg * u.deg
) # type: ignore
logger.info(
f"{np.sum(isolation_mask)} of {total_comps} sources are isolated with radius {float(leakage_filters.isolation_radius_deg):.5f} deg"
)
ratio = table[int_col] / table[peak_col] # type: ignore
ratio_mask = (leakage_filters.lower_int_peak_ratio < ratio) & (
ratio < leakage_filters.upper_int_peak_ratio
) # type: ignore
logger.info(
f"{np.sum(ratio_mask)} of {total_comps} sources are compact with {leakage_filters.lower_int_peak_ratio} < int/peak < {leakage_filters.upper_int_peak_ratio}"
)
signal_to_noise = table[int_col] / table[int_err_col] # type: ignore
signal_mask = signal_to_noise > leakage_filters.source_snr
logger.info(
f"{np.sum(signal_mask)} of {total_comps} sources have S/N above {leakage_filters.source_snr}"
)
mask = isolation_mask & ratio_mask & signal_mask
logger.info(
f"{np.sum(mask)} of {total_comps} sources are isolated, compact and bright"
)
table = table[mask] # type: ignore
return table
[docs]
def get_xy_pixel_coords(
table: Table,
wcs: WCS,
ra_col: str | None = None,
dec_col: str | None = None,
) -> PixelCoords:
"""Convert (RA, Dec) positions in a catalogue into (x, y)-pixels given an WCS
Args:
table (Table): The table containing sources to collect (x, y)-coordinates
wcs (WCS): The WCS description to use to resolve (RA, Dec) to (x, y)
ra_col (Optional[str], optional): The RA column name. If None, it will be guessed. Defaults to None.
dec_col (Optional[str], optional): The Dec column name. If None, it will be guessed. Defaults to None.
Returns:
PixelCoords: _description_
"""
ra_col = guess_column_in_table(table=table, column="ra")
dec_col = guess_column_in_table(table=table, column="dec")
sky_coord = SkyCoord(table[ra_col], table[dec_col], unit=(u.deg, u.deg))
x, y = skycoord_to_pixel(wcs=wcs.celestial, coords=sky_coord, origin=0)
return PixelCoords(y=np.floor(y).astype(int), x=np.floor(x).astype(int))
[docs]
def load_and_filter_components(
catalogue: TableOrPath, leakage_filters: LeakageFilters
) -> Table:
"""Load in a component catalogue table and apply filters to them. The
remaining components will be used to characterise leakage
Args:
catalogue (TableOrPath): The path to a component catalogue, or a loaded component catalogue
leakage_filters (LeakageFilters): Filtering options to find ideal components for leakage characterisation
Returns:
Table: Filtered component catalogue
"""
# TODO: Need to have a single guess function here
comp_table = _load_component_table(catalogue=catalogue)
ra_col = guess_column_in_table(table=comp_table, column="ra")
dec_col = guess_column_in_table(table=comp_table, column="dec")
peak_col = guess_column_in_table(table=comp_table, column="peakflux")
int_col = guess_column_in_table(table=comp_table, column="intflux")
int_err_col = guess_column_in_table(table=comp_table, column="intfluxerr")
# TODO: Need to use Catalogue here
comp_table = filter_components(
table=comp_table,
ra_col=ra_col,
dec_col=dec_col,
peak_col=peak_col,
int_col=int_col,
int_err_col=int_err_col,
leakage_filters=leakage_filters,
)
return comp_table
[docs]
class PolStatistics(NamedTuple):
"""Simple container for statistics around the extraction of leakage polarisation statistics"""
"""The peak pixel value"""
"""The standard deviation of pixels within a box"""
"""The mean of pixels within a box"""
[docs]
def _get_output_catalogue_path(
input_path: Path, pol: str, output_path: Path | None = None
) -> Path:
"""Create the output leakage catalogue name"""
# NOTE: This is a separate function to test against after a silly. Might move with the other named Pirates
assert isinstance(input_path, Path)
input_suffix = input_path.suffix
output_path = (
input_path.with_suffix(f".{pol}_leakage{input_suffix}")
if output_path is None
else output_path
)
assert output_path is not None, (
f"{output_path=} is empty, and no catalogue path provided"
)
return Path(output_path)
[docs]
def create_leakge_component_table(
pol_image: Path,
catalogue: Table | Path,
pol: str = "v",
output_path: Path | None = None,
) -> Path:
"""Create a component catalogue that includes enough information to describe the
polarisation fraction of sources across a field. This is intended to be used
for leakage characterisation.
New catalogue columns will be added:
* pol_fraction: The POL/I fraction. The peak flux is taken from the catalogue, using the appropriate column name
* pol_peak: The peak polarised signal in the nearby region of a component position
* pol_noise: The noise in the polarised image pixels around the component position
Args:
pol_image (Path): The polarised image that will be used to extract peak polarised flux from
catalogue (Union[Table, Path]): Component table describing positions to extract flux from
pol (str, optional): The polarisation stokes being considered. Defaults to "v".
output_path (Optional[Path], optional): The path of the new catalogue. If `None` it is derived from the input `catalogue` path. Defaults to None.
Returns:
Path: Path to the new catalogue use for leakage
"""
pol_fits = _load_fits_image(fits_path=pol_image)
leakage_filters = LeakageFilters()
components = load_and_filter_components(
catalogue=catalogue, leakage_filters=leakage_filters
)
peak_flux_col = guess_column_in_table(table=components, column="peakflux")
i_values = components[peak_flux_col]
pol_pixel_coords = get_xy_pixel_coords(table=components, wcs=pol_fits.wcs)
pol_stats = extract_pol_stats_in_box(
pol_image=pol_fits.data,
pixel_coords=pol_pixel_coords,
search_box_size=leakage_filters.search_box_size,
noise_box_size=leakage_filters.noise_box_size,
mean_box_size=leakage_filters.mean_box_size,
)
frac_values = pol_stats.peak / i_values
logger.info(f"{frac_values.shape=}")
components[f"{pol}_fraction"] = frac_values
components[f"{pol}_peak"] = pol_stats.peak
components[f"{pol}_noise"] = pol_stats.noise
components[f"{pol}_mean"] = pol_stats.mean
output_path = _get_output_catalogue_path(
input_path=catalogue if isinstance(catalogue, Path) else pol_image,
pol=pol,
output_path=output_path,
)
logger.info(f"Writing {output_path}")
components.write(output_path, overwrite=True)
return output_path
[docs]
def get_parser() -> ArgumentParser:
parser = ArgumentParser(description="Create a leakage catalogue and map")
parser.add_argument("pol_image", type=Path, help="Path to the polarisation image")
parser.add_argument(
"component_catalogue", type=Path, help="Path to the component catalogue"
)
parser.add_argument(
"--pol",
default="v",
choices=("q", "u", "v"),
help="The polarisation that is being considered",
)
return parser
[docs]
def cli() -> None:
parser = get_parser()
args = parser.parse_args()
create_leakge_component_table(
pol_image=args.pol_image,
catalogue=args.component_catalogue,
pol=args.pol,
)
if __name__ == "__main__":
cli()