"""Procedure to calibrate bandpass observation"""
from __future__ import annotations
from argparse import ArgumentParser
from pathlib import Path
import numpy as np
from casacore.tables import table, taql
from flint.calibrate.aocalibrate import AOSolutions, calibrate_apply_ms
from flint.flagging import flag_ms_aoflagger
from flint.logging import logger
from flint.ms import describe_ms, get_field_id_for_field, preprocess_askap_ms
from flint.naming import create_ms_name
from flint.options import MS
from flint.sky_model import KNOWN_1934_FILES, get_1934_model
[docs]
def plot_solutions(solutions_path: Path, ref_ant: int | None = 0) -> None:
"""Plot solutions for AO-style solutions
Args:
solutions_path (Path): Path to the solutions file
ref_ant (Optional[int], optional): Reference antenna to use. If None is specified there is no division by a reference antenna. Defaults to 0.
"""
logger.info(f"Plotting {solutions_path}")
ao_sols = AOSolutions.load(path=solutions_path)
_ = ao_sols.plot_solutions(ref_ant=ref_ant)
[docs]
def flag_bandpass_offset_pointings(ms: MS | Path) -> MS:
"""The typical bandpass style observation in ASKAP will shift each beam
so that it is centred on the bandpass-calibration object (here B1934-638).
During each offset position all beams are recording data still. The trick
here is that all 36 fields are still recorded in the measurement set, and
are generally of the form:
>>> ['B1934-638_beam0', 'B1934-638_beam1', 'B1934-638_beam10', 'B1934-638_beam11',
>>> 'B1934-638_beam12', 'B1934-638_beam13', 'B1934-638_beam14', 'B1934-638_beam15',
>>> 'B1934-638_beam16', 'B1934-638_beam17', 'B1934-638_beam18', 'B1934-638_beam19',
>>> 'B1934-638_beam2', 'B1934-638_beam20', 'B1934-638_beam21', 'B1934-638_beam22',
>>> 'B1934-638_beam23', 'B1934-638_beam24', 'B1934-638_beam25', 'B1934-638_beam26',
>>> 'B1934-638_beam27', 'B1934-638_beam28', 'B1934-638_beam29', 'B1934-638_beam3',
>>> 'B1934-638_beam30', 'B1934-638_beam31', 'B1934-638_beam32', 'B1934-638_beam33',
>>> 'B1934-638_beam34', 'B1934-638_beam35', 'B1934-638_beam4', 'B1934-638_beam5',
>>> 'B1934-638_beam6', 'B1934-638_beam7', 'B1934-638_beam8', 'B1934-638_beam9']
This function will attempt to deduce the intended field name for the beam
in question, and then flag all other fields.
Args:
ms (Union[MS, Path]): Path or instance of MS describing the measurement set to flag all other bandpass field.
Returns:
MS: A description of the ms
"""
ms = MS(path=ms) if isinstance(ms, Path) else ms
ms_summary = describe_ms(ms, verbose=False)
good_field_name = f"B1934-638_beam{ms_summary.beam}"
logger.info(f"The B1934-638 field name is {good_field_name}. ")
logger.info("Will attempt to flag other fields. ")
with table(f"{ms.path!s}/FIELD", readonly=True, ack=False) as tab:
# The ID is _position_ of the matching row in the table.
field_names = tab.getcol("NAME")
field_idx = np.argwhere([fn == good_field_name for fn in field_names])[0]
assert len(field_idx) == 1, (
f"More than one matching field name found. This should not happen. {good_field_name=} {field_names=}"
)
field_idx = field_idx[0]
logger.info(f"{good_field_name} FIELD_ID is {field_idx}")
with table(f"{ms.path!s}", readonly=False, ack=False) as tab:
field_idxs = tab.getcol("FIELD_ID")
field_mask = field_idxs != field_idx
logger.info(
f"Found {np.sum(field_mask)} rows not matching FIELD_ID={field_idx}"
)
# This is asserting that the stored polarisations are all XX, XY, YX, YY
flag_row = np.array([True, True, True, True])
flags = tab.getcol("FLAG")
flags[field_mask] = flag_row
# Update the column again
tab.putcol("FLAG", flags)
return ms
[docs]
def extract_correct_bandpass_pointing(
ms: MS | Path,
source_name_prefix: str = "B1934-638",
ms_out_dir: Path | None = None,
) -> MS:
"""The typical bandpass style observation in ASKAP will shift each beam
so that it is centred on the bandpass-calibration object (here B1934-638).
During each offset position all beams are recording data still. The trick
here is that all 36 fields are still recorded in the measurement set, and are
generally of the form:
>>> ['B1934-638_beam0', 'B1934-638_beam1', 'B1934-638_beam10', 'B1934-638_beam11',
>>> 'B1934-638_beam12', 'B1934-638_beam13', 'B1934-638_beam14', 'B1934-638_beam15',
>>> 'B1934-638_beam16', 'B1934-638_beam17', 'B1934-638_beam18', 'B1934-638_beam19',
>>> 'B1934-638_beam2', 'B1934-638_beam20', 'B1934-638_beam21', 'B1934-638_beam22',
>>> 'B1934-638_beam23', 'B1934-638_beam24', 'B1934-638_beam25', 'B1934-638_beam26',
>>> 'B1934-638_beam27', 'B1934-638_beam28', 'B1934-638_beam29', 'B1934-638_beam3',
>>> 'B1934-638_beam30', 'B1934-638_beam31', 'B1934-638_beam32', 'B1934-638_beam33',
>>> 'B1934-638_beam34', 'B1934-638_beam35', 'B1934-638_beam4', 'B1934-638_beam5',
>>> 'B1934-638_beam6', 'B1934-638_beam7', 'B1934-638_beam8', 'B1934-638_beam9']
This function will attempt to deduce the intended field name for the beam
in question, and then create a new measurement set with just these data. It
internally uses `taql` to create a selection statement:
>>> with table(ms_path) as tab:
>>> sub_ms = taql("select * from $tab where FIELD_ID==field_idx")
>>> sub_ms.copy(out_path, deep=True)
Therefore, some properties from other tables (e.g. FIELDS) may still
contain references to other fields.
Args:
ms (Union[MS, Path]): Path or instance of MS describing the measurement set to flag all other bandpass field.
source_name_prefix (str, optional): The beginning of the source name stored in the NAME column of the FIELD table. Field names are of the form B1934-638_beam1, where B1934-638 would be the prefix name, and beam is constructed based on the beam among the 36 observing the target source (for example).
ms_out_dir (Optional[Path], optional): If not None, place the split field measurement sets into this directory. Defaults to None.
Returns:
MS: A description of the new measurement set created with the file name ending .beamN.ms.
"""
ms = MS.cast(ms)
ms_summary = describe_ms(ms, verbose=False)
logger.info(f"Checking for unique fields in {ms.path!s} data table.")
with table(str(ms.path)) as tab:
fields = np.unique(tab.getcol("FIELD_ID"))
if len(fields) == 1:
logger.info(
f"Only a single field {fields} found. MS likely already split. "
)
return ms.with_options(beam=ms_summary.beam)
good_field_name = f"{source_name_prefix}_beam{ms_summary.beam}"
field_id = get_field_id_for_field(ms=ms, field_name=good_field_name)
out_name = create_ms_name(ms_path=ms.path, field=f"{source_name_prefix}")
out_path = Path("./")
# Handle writing out to elected output directory.
# TODO: Move this to a helper utility.
if ms_out_dir:
if not ms_out_dir.exists():
logger.info(f"Will create {ms_out_dir=}")
try:
ms_out_dir.mkdir(parents=True)
except Exception as e:
logger.warning(f"Exception caught when making {ms_out_dir=}.")
logger.warning(f"{e}")
pass
out_path = ms_out_dir / Path(out_name).name
logger.info(f"Will create a MS, writing to {out_path}")
with table(f"{ms.path!s}") as tab:
field_ms = taql(f"select * from $tab where FIELD_ID=={field_id}")
field_ms.copy(str(out_path), deep=True)
return ms.with_options(path=out_path, beam=ms_summary.beam)
[docs]
def calibrate_bandpass(
ms_path: Path,
data_column: str,
mode: str,
calibrate_container: Path,
plot: bool = True,
aoflagger_container: Path | None = None,
ms_out_dir: Path | None = None,
) -> MS:
"""Entry point to extract the appropriate field from a bandpass observation,
run AO-style calibrate, and plot results. In its current form a new measurement
set will be created container just the appropriate field to calibrate.
Args:
ms_path (Path): Path the the measurement set containing bandpass observations of B1934-638
data_column (str): The column that will be calibrated.
mode (str): The calibration approach to use. Currently only `calibrate` is supported.
calibrate_container (Path): The path to the singularity container that holds the appropriate software.
plot (bool, optional): Whether plotting should be performed. Defaults to True.
aoflagger_container (Path): The path to the singularity container that holds aoflagger. If this is not `None` that flagging will be performed on the extracted field-specific measurement set.
ms_out_dir (Optional[Path], optional): If not None, place the split field measurement sets into this directory. Defaults to None.
Returns:
MS: The calibrated measurement set with nominated column
"""
logger.info(f"Will calibrate {ms_path!s}, column {data_column}")
# TODO: Check to make sure only 1934-638
model_path: Path = get_1934_model(mode=mode)
ms = extract_correct_bandpass_pointing(ms=ms_path, ms_out_dir=ms_out_dir)
describe_ms(ms=ms)
ms = preprocess_askap_ms(ms)
if aoflagger_container is not None:
for i in range(3):
logger.info("Will run flagger on extracted measurement set. ")
flag_ms_aoflagger(
ms=ms.with_options(column="DATA"), container=aoflagger_container
)
apply_solutions = calibrate_apply_ms(
ms_path=ms.path,
model_path=model_path,
container=calibrate_container,
data_column=data_column,
)
if plot:
plot_solutions(solutions_path=apply_solutions.solution_path)
describe_ms(ms=apply_solutions.ms)
return ms
[docs]
def get_parser() -> ArgumentParser:
parser = ArgumentParser(
description="Bandpass calibration procedure to calibrate a 1934-638 measurement set. "
)
subparser = parser.add_subparsers(
dest="mode", help="Operation mode of flint_bandpass"
)
band_parser = subparser.add_parser("calibrate", help="Perform bandpass calibration")
supported_models = list(KNOWN_1934_FILES.keys())
band_parser.add_argument(
"ms", type=Path, help="Path to the 1934-638 measurement set. "
)
band_parser.add_argument(
"--data-column",
type=str,
default="DATA",
help="Column containing the data to calibrate. ",
)
band_parser.add_argument(
"--mode",
type=str,
default="calibrate",
choices=supported_models,
help=f"Set the style of the 1934-638 calibration model to use, which depends on the calibration software. available models: {supported_models}. ",
)
band_parser.add_argument(
"--calibrate-container",
type=Path,
default=Path("calibrate.sif"),
help="Path to container that is capable or running and apply calibration solutions for desired mode. ",
)
band_parser.add_argument(
"--aoflagger-container",
type=Path,
default=None,
help="Path to container container aoflagger. If provided guided automated flagging with aoflagger will be performed on the extracted measurement set. ",
)
band_parser.add_argument(
"--ms-out-dir",
type=Path,
default=None,
help="Location to write the output MSs to. ",
)
plot_parser = subparser.add_parser("plot", help="Plot a previous bandpass solution")
plot_parser.add_argument(
"solutions", type=Path, help="Path to the AO-style solutions file to plot"
)
plot_parser.add_argument(
"--plot-path",
type=Path,
default=None,
help="Path to write plot to. If None, it is automatically assigned based on solutions name. ",
)
return parser
[docs]
def cli() -> None:
import logging
logger.setLevel(logging.INFO)
parser = get_parser()
args = parser.parse_args()
if args.mode == "calibrate":
calibrate_bandpass(
ms_path=args.ms,
data_column=args.data_column,
mode=args.mode,
calibrate_container=args.calibrate_container,
aoflagger_container=args.aoflagger_container,
ms_out_dir=args.ms_out_dir,
)
elif args.mode == "plot":
plot_solutions(solutions_path=args.solutions)
else:
logger.warning("This should not have happened. ")
if __name__ == "__main__":
cli()