Spectral line and time-step imaging¶
flint has the beginnings of a spectral line workflow that operates against continuum-subtracted visibilities. This mode requires that continuum imaging has been performed using wsclean’s --save-source-list option, which outputs a text file describing each clean component constructed by wsclean and their parameterisation (e.g. location and spatial/spectral shape).
This workflow will first generate model visibilities at the spectral resolution of the data (note that wsclean will generate model visibilities that are constant over sub-band intervals). There are two ways to do this:
- using addmodel from the calibrate container
- using the crystalball python package
When using the addmodel approach a sub-flow is created, allowing a different set of computing resources to be described. addmodel uses threads to accelerate compute, which it does extremely well. However, it is not able to spread across nodes to achieve higher throughput. By specifying a different dask cluster compute profile a larger set of resources (e.g. more CPUs, longer wall times) may be requested, thereby speeding up the model prediction.
Alternatively, we also include an approach that uses the crystalball python package. crystalball uses dask to achieve parallelism. Since flint configures prefect to use dask as its compute backend, crystalball is able to use the same infrastructure. This allows the model prediction process to seamlessly scale across many nodes. For most intents and purposes this crystalball approach should be preferred.
Crystalball dask configuration¶
The best way to achieve throughput when using crystalball is to spawn many workers with a small compute footprint. With such a configuration a batch-based system (e.g. slurm) can still allocate workers even when it is congested, and the distributed nature of dask can be leveraged. In practice we found that the following settings (set in the subtract_cube_pipeline) were able to improve the stability of the distributed cluster:
import dask

# These improve the stability of the distributed dask cluster, particularly around
# the usage of crystalball prediction
dask.config.set({"distributed.comm.retry.count": 20})
dask.config.set({"distributed.comm.timeouts.connect": 30})
dask.config.set({"distributed.worker.memory.terminate": False})
Dask allows these settings to be configured in a number of ways, including via environment variables. In the future this alternative approach may be used, but for the moment they are hardcoded directly in that workflow.
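As a rough illustration of the environment-variable route, the sketch below converts the dotted keys into the names dask looks for: a `DASK_` prefix, upper-case text, and a double underscore for each level of nesting. The helper `to_dask_env_var`, the function `as_export_lines` and the `SETTINGS` dictionary are hypothetical names used only for this example; they are not part of flint.

```python
# Hypothetical helper (not part of flint): map dotted dask config keys to the
# environment variable names that dask reads when its configuration is loaded.
SETTINGS = {
    "distributed.comm.retry.count": 20,
    "distributed.comm.timeouts.connect": 30,
    "distributed.worker.memory.terminate": False,
}


def to_dask_env_var(key: str) -> str:
    """Return the DASK_* environment variable name for a dotted config key."""
    # dask lower-cases the variable name and treats "__" as nested access,
    # so the reverse mapping is: upper-case, "." -> "__", "-" -> "_".
    return "DASK_" + key.upper().replace(".", "__").replace("-", "_")


def as_export_lines(settings: dict) -> list[str]:
    """Render shell-style export lines, e.g. for a job submission script."""
    return [f"export {to_dask_env_var(k)}={v}" for k, v in settings.items()]


for line in as_export_lines(SETTINGS):
    print(line)
```

The printed lines could be placed in a job script ahead of the worker start-up, so that every worker picks up the same values without any change to the workflow code.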
Imaging axis¶
The subtract_cube_pipeline can image along either the channel (spectral) or the scan (temporal) axis. By default the imaging is done per-channel, and only one axis can be imaged within a single execution of this workflow.
In either case, the imaging is performed at the native resolution of that dimension in the measurement set.
Achieving parallelism¶
After a set of model visibilities has been predicted for each measurement set, the continuum subtraction leaves what should be noise across all channels (of course, sharp spectral features should also remain). Since there is no benefit to multi-frequency synthesis imaging, each individual channel may be imaged in isolation from the others. With this in mind the general approach is to configure dask to allocate many workers that each individually require a small set of compute resources.
A field image is produced at each channel by a separate linmos process invocation. That is to say, if there are 288 channels in the collection of measurement sets, there will be 288 separate linmos processes throughout the flow. Once all field images have been produced they are concatenated together (in an asynchronous and memory efficient way) into a single FITS cube. See the fitscube python package for more information.
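The per-channel pattern can be pictured as a fan-out over channels followed by an ordered gather. The sketch below is only an analogy: it uses Python's standard `concurrent.futures` rather than the prefect and dask machinery that flint relies on, and `make_field_image` is a hypothetical stand-in for the imaging and linmos steps that simply returns a made-up file name.

```python
from concurrent.futures import ThreadPoolExecutor


def make_field_image(channel: int) -> str:
    """Hypothetical stand-in for imaging + linmos of a single channel.

    A real task would call out to wsclean and linmos; here we only return
    an assumed (made-up) output file name.
    """
    return f"field.ch{channel:04d}.linmos.fits"


def image_all_channels(n_channels: int, max_workers: int = 8) -> list[str]:
    """Fan out one independent task per channel, gather results in order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves input order, so the list is already sorted by
        # channel and ready to be concatenated into a cube.
        return list(pool.map(make_field_image, range(n_channels)))


field_images = image_all_channels(288)
print(len(field_images), field_images[0], field_images[-1])
```

Because no task depends on any other, the number of workers only affects throughput, which is why many small workers suit this stage.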
Processes vs Threads¶
A dask-worker can be assigned a set of CPU cores that are to be used throughout computation. Further, a dask-worker can be configured to execute work using either a ProcessPool or a ThreadPool, each with different advantages and disadvantages. Should many external compiled applications be called upon (as is the case throughout the self-calibration and imaging pipeline), a process-backed work pool is advised:
cluster_kwargs:
    cores: 1
    processes: 1
    job_cpu: 4
    name: 'flint-worker'
    memory: "32GB"
Here, 4 cores and 32GB of RAM are provided to each dask worker, and each dask worker can only execute a single task at a time. Should that task call out to a compiled application that can take advantage of multiple cores (e.g. wsclean), all 4 cores are available to it.
Alternatively for the subtract_cube_pipeline the following may be useful (or even preferred) if crystalball is being used:
cluster_kwargs:
    cores: 4
    processes: 1
    job_cpu: 4
    name: 'flint-worker'
    memory: "32GB"
This will configure each dask-worker to use 4 threads at a time, meaning 4 tasks can be executing concurrently. In the case of crystalball this modification may lower inter-process communication load (including network reads/writes between nodes) as dask can now consider data locality as a component of the scheduling. Should this be a driving consideration in your configuration? No. Maybe. Unlikely. But it is still something to be aware of.
In the context of the subtract_cube_pipeline, should multiple wsclean and linmos tasks be invoked concurrently, those applications would share the 4 assigned cores and the 32GB memory pool. Over-subscribing in this way, provided out-of-memory limits are not reached, may improve overall utilisation of the compute allocation as these applications rarely use all available cores in a sustained sense.
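To make the difference between the two configurations concrete, the sketch below works out how many tasks a single job may run at once. It assumes the dask-jobqueue convention that `cores` is shared evenly between the `processes` of a job, so each worker process receives `cores // processes` threads. The function name `concurrent_tasks` is hypothetical.

```python
def concurrent_tasks(cores: int, processes: int) -> dict:
    """Summarise task concurrency for one job (assumed dask-jobqueue semantics)."""
    threads_per_process = cores // processes
    return {
        "worker_processes": processes,
        "threads_per_process": threads_per_process,
        "tasks_at_once": processes * threads_per_process,
    }


# The process-style configuration (cores: 1) and the thread-style one (cores: 4).
process_style = concurrent_tasks(cores=1, processes=1)
thread_style = concurrent_tasks(cores=4, processes=1)

print("cores=1:", process_style)
print("cores=4:", thread_style)
```

In both cases `job_cpu: 4` means the job still holds 4 CPUs; what changes is whether one task has them to itself or up to four tasks share them.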
Caution
Should configurations like $MEMDIR be used, be sure to consider overlapping data and side-effects. For instance, should wsclean -temp-dir $SLURM_TEMP_SPACE be used, be mindful of the clean-up operation when the wsclean task finishes. Should $SLURM_TEMP_SPACE (an imagined variable for this discussion) be used concurrently by many threads in a single dask-worker, the final move from a single thread will affect data in others still working. Hence, consider -temp-dir $SLURM_TEMP_SPACE/$FLINT_UUID, where $FLINT_UUID will trigger a unique identifier to be appended, thus avoiding side-effects.
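The idea behind a per-task temporary directory can be sketched in plain Python. This illustrates the principle only and is not how flint expands `$FLINT_UUID`; `unique_tmp_dir` is a hypothetical helper, and the throwaway base directory stands in for the imagined `$SLURM_TEMP_SPACE`.

```python
import shutil
import tempfile
import uuid
from pathlib import Path


def unique_tmp_dir(base: Path) -> Path:
    """Create and return a task-private directory beneath a shared base."""
    task_dir = base / uuid.uuid4().hex
    task_dir.mkdir(parents=True, exist_ok=False)
    return task_dir


# A throwaway base directory plays the role of $SLURM_TEMP_SPACE here.
base = Path(tempfile.mkdtemp())
dir_a = unique_tmp_dir(base)
dir_b = unique_tmp_dir(base)
(dir_b / "partial.tmp").write_text("still in use by another task")

# Cleaning up one task's directory leaves the other task's files alone.
shutil.rmtree(dir_a)
still_present = (dir_b / "partial.tmp").exists()
print(dir_a.exists(), still_present)

# Remove the throwaway base once the demonstration is finished.
shutil.rmtree(base)
```

Had both tasks written directly into the shared base, the first clean-up would have removed files the second task was still using.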
Output data¶
The principal result of this workflow is a spectral cube of the observed field at the native spectral resolution of the input collection of measurement sets (including the corresponding weight map produced by linmos). Intermediary files created throughout the workflow are deleted once they are no longer needed in order to preserve disk space.
Accessing via the CLI¶
The primary entry point for the continuum subtraction and spectral imaging pipeline in flint is the flint_flow_subtract_cube_pipeline:
This is a workflow to subtract a continuum model and image the channel-wise data
Unlike the continuum imaging and self-calibration pipeline, this flow currently expects that all measurement sets are in the flint format. Other than the naming scheme, this means that they have been preprocessed to place them in the IAU frame and have had their fields table updated. That is to say, they have already been preprocessed and fixed.
usage: flint_flow_subtract_cube_pipeline [-h] [--cli-config CLI_CONFIG]
[--cluster-config CLUSTER_CONFIG]
[--subtract-model-data]
[--data-column DATA_COLUMN]
[--expected-ms EXPECTED_MS]
[--imaging-strategy IMAGING_STRATEGY]
[--holofile HOLOFILE]
[--linmos-residuals]
[--beam-cutoff BEAM_CUTOFF]
[--pb-cutoff PB_CUTOFF]
[--stagger-delay-seconds STAGGER_DELAY_SECONDS]
[--attempt-subtract]
[--subtract-data-column SUBTRACT_DATA_COLUMN]
[--predict-wsclean-model]
[--use-addmodel] [--use-crystalball]
[--subtract-only] [--timestep-image]
[--channelwise-image]
[--max-intervals MAX_INTERVALS]
[--fitscube-remove-original-images]
[--wsclean-pol-mode WSCLEAN_POL_MODE [WSCLEAN_POL_MODE ...]]
[--calibrate-container CALIBRATE_CONTAINER]
[--addmodel-cluster-config ADDMODEL_CLUSTER_CONFIG]
[--crystallball-wsclean-pol-mode CRYSTALLBALL_WSCLEAN_POL_MODE [CRYSTALLBALL_WSCLEAN_POL_MODE ...]]
[--row-chunks ROW_CHUNKS]
[--model-chunks MODEL_CHUNKS]
[--memory-fraction MEMORY_FRACTION]
science_path wsclean_container
yandasoft_container
Positional Arguments¶
- science_path
Path to the directory containing the beam-wise measurement sets
Named Arguments¶
- --cli-config
Path to configuration file
- --cluster-config
Path to a cluster configuration file, or a known cluster name.
Default:
'petrichor'
Inputs for SubtractFieldOptions¶
- wsclean_container
Path to the container with wsclean
Default:
PydanticUndefined
- yandasoft_container
Path to the container with yandasoft
Default:
PydanticUndefined
- --subtract-model-data
Subtract the MODEL_DATA column from the nominated data column
Default:
False
- --data-column
Describe the column that should be imaged and, if requested, have the model subtracted from it
Default:
'CORRECTED_DATA'
- --expected-ms
The number of measurement sets that should exist
Default:
36
- --imaging-strategy
Path to a FLINT imaging yaml file that contains settings to use throughout imaging
- --holofile
Path to the holography FITS cube that will be used when co-adding beams
- --linmos-residuals
Linmos the cleaning residuals together into a field image
Default:
False
- --beam-cutoff
Cutoff in arcseconds to use when calculating the common beam to convolve to
Default:
150
- --pb-cutoff
Primary beam attenuation cutoff to use during linmos
Default:
0.1
- --stagger-delay-seconds
The delay, in seconds, that should be used when submitting items in batches (e.g. looping over channels)
- --attempt-subtract
Attempt to subtract the model column from the nominated data column
Default:
False
- --subtract-data-column
Should the continuum model be subtracted, where to store the output. This will update the column to be imaged.
Default:
'DATA'
- --predict-wsclean-model
Search for the continuum model produced by wsclean and subtract
Default:
False
- --use-addmodel
Invoke the addmodel visibility prediction, including the search for the wsclean source list
Default:
False
- --use-crystalball
Attempt to predict the model visibilities using crystalball
Default:
False
- --subtract-only
Only perform the continuum subtraction
Default:
False
- --timestep-image
Perform timestep imaging after subtraction
Default:
False
- --channelwise-image
Perform channel-wise imaging of the residuals
Default:
False
- --max-intervals
The maximum number of scans/channels to consider
Default:
500
- --fitscube-remove-original-images
Remove the images that go into forming the fitscube
Default:
False
Inputs for AddModelSubtractFieldOptions¶
- --wsclean-pol-mode
The polarisation of the wsclean model that was generated
Default:
['i']
- --calibrate-container
Path to the container with the calibrate software (including addmodel)
- --addmodel-cluster-config
Specify a new cluster configuration file different to the preferred one. If None, it is drawn from the preferred cluster config
Inputs for CrystalBallOptions¶
- --crystallball-wsclean-pol-mode
The polarisation of the wsclean model that was generated
Default:
['i']
- --row-chunks
Number of rows of input MS that are processed in a single chunk. If 0 it will be set automatically. Default is 0.
Default:
0
- --model-chunks
Number of sky model components that are processed in a single chunk. If 0 it will be set automatically. Default is 0.
Default:
0
- --memory-fraction
The fraction of available memory to use to define the target chunk size
Default:
0.75
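As a hedged illustration of how the arguments above fit together, the sketch below assembles a command line for a crystalball-based, channel-wise run. The paths are placeholders and the particular combination of flags is an assumption made for this example rather than a recommended recipe; `flint_flow_subtract_cube_pipeline -h` remains the authoritative reference. The command is only printed, not executed.

```python
import shlex
from pathlib import Path

# Placeholder paths: substitute real locations before use.
science_path = Path("/path/to/science")
wsclean_container = Path("/path/to/wsclean.sif")
yandasoft_container = Path("/path/to/yandasoft.sif")

command = [
    "flint_flow_subtract_cube_pipeline",
    "--cluster-config", "petrichor",
    "--data-column", "CORRECTED_DATA",
    # Assumed flag combination for a crystalball prediction followed by
    # subtraction and channel-wise imaging.
    "--predict-wsclean-model",
    "--use-crystalball",
    "--attempt-subtract",
    "--channelwise-image",
    # Positional arguments, in the order given by the usage message.
    str(science_path),
    str(wsclean_container),
    str(yandasoft_container),
]

# shlex.join quotes each element so the line can be pasted into a shell.
print(shlex.join(command))
```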
The SubtractFieldOptions class¶
Embedded below is the flint Options class used to drive the flint_flow_subtract_cube_pipeline workflow. Input values are validated by pydantic to ensure they are appropriately typed.
class SubtractFieldOptions(BaseOptions):
    """Container for options related to the
    continuum-subtracted pipeline"""

    wsclean_container: Path
    """Path to the container with wsclean"""
    yandasoft_container: Path
    """Path to the container with yandasoft"""
    subtract_model_data: bool = False
    """Subtract the MODEL_DATA column from the nominated data column"""
    data_column: str = "CORRECTED_DATA"
    """Describe the column that should be imaged and, if requested, have the model subtracted from it"""
    expected_ms: int = 36
    """The number of measurement sets that should exist"""
    imaging_strategy: Path | None = None
    """Path to a FLINT imaging yaml file that contains settings to use throughout imaging"""
    holofile: Path | None = None
    """Path to the holography FITS cube that will be used when co-adding beams"""
    linmos_residuals: bool = False
    """Linmos the cleaning residuals together into a field image"""
    beam_cutoff: float = 150
    """Cutoff in arcseconds to use when calculating the common beam to convolve to"""
    pb_cutoff: float = 0.1
    """Primary beam attenuation cutoff to use during linmos"""
    stagger_delay_seconds: float | None = None
    """The delay, in seconds, that should be used when submitting items in batches (e.g. looping over channels)"""
    attempt_subtract: bool = False
    """Attempt to subtract the model column from the nominated data column"""
    subtract_data_column: str = "DATA"
    """Should the continuum model be subtracted, where to store the output. This will update the column to be imaged."""
    predict_wsclean_model: bool = False
    """Search for the continuum model produced by wsclean and subtract"""
    use_addmodel: bool = False
    """Invoke the ``addmodel`` visibility prediction, including the search for the ``wsclean`` source list"""
    use_crystalball: bool = False
    """Attempt to predict the model visibilities using ``crystalball``"""
    subtract_only: bool = False
    """Only perform the continuum subtraction"""
    timestep_image: bool = False
    """Perform timestep imaging after subtraction"""
    channelwise_image: bool = False
    """Perform channel-wise imaging of the residuals"""
    max_intervals: int = 500
    """The maximum number of scans/channels to consider"""
    fitscube_remove_original_images: bool = False
    """Remove the images that go into forming the fitscube"""
The AddModelSubtractFieldOptions class¶
Embedded below is the flint Options class used to drive the flint_flow_subtract_cube_pipeline workflow. Input values are validated by pydantic to ensure they are appropriately typed.
class AddModelSubtractFieldOptions(BaseOptions):
    """Options related to predicting a continuum model during the SubtractFieldOptions workflow.
    Specifically these options deal with identifying the wsclean produced source list model, which
    may be used by ``addmodel`` to predict model visibilities. See utilities around the ``aocalibrate``
    functions and routines."""

    wsclean_pol_mode: list[str] = ["i"]
    """The polarisation of the wsclean model that was generated"""
    calibrate_container: Path | None = None
    """Path to the container with the calibrate software (including addmodel)"""
    addmodel_cluster_config: Path | None = None
    """Specify a new cluster configuration file different to the preferred one. If None, it is drawn from the preferred cluster config"""