Source code for flint.archive

"""Operations around preserving files and products from an flint run"""

from __future__ import annotations

import re
import shlex
import shutil
import subprocess
import tarfile
from argparse import ArgumentParser
from pathlib import Path
from typing import Any, Collection

from flint.configuration import get_options_from_strategy
from flint.exceptions import TarArchiveError
from flint.logging import logger
from flint.options import (
    ArchiveOptions,
    add_options_to_parser,
    create_options_from_parser,
)


[docs] def resolve_glob_expressions( base_path: Path, file_re_patterns: Collection[str] ) -> tuple[Path, ...]: """Collect a set of files given a base directory and a set of glob expressions. Unique paths are returned. Args: base_path (Path): The base folder with files to consider file_re_patterns (Collection[str]): An iterable with a set of regular-expression patterns to evaluate Returns: Tuple[Path,...]: Unique collection of paths """ base_path = Path(base_path) resolved_files: list[Path] = [] logger.info(f"Searching {base_path=}") all_files = list(base_path.iterdir()) logger.info(f"{len(all_files)} total files and {len(file_re_patterns)} to consider") for reg_expression in file_re_patterns: logger.info(f"Using expression: {reg_expression}") resolved_files.extend( [f for f in all_files if re.search(reg_expression, str(f.name))] ) logger.info( f"Resolved {len(resolved_files)} files from {len(file_re_patterns)} expressions in {base_path=}" ) return tuple(sorted([Path(p) for p in set(resolved_files)]))
[docs] def copy_files_into(copy_out_path: Path, files_to_copy: Collection[Path]) -> Path: """Copy a set of specified files into an output directory, If a file happens to be a folder then it will be copied over. Args: copy_out_path (Path): Path to copy files into files_to_copy (Collection[Path]): Files that shall be copied Returns: Path: The path files were copied into """ copy_out_path = Path(copy_out_path) copy_out_path.mkdir(parents=True, exist_ok=True) total = len(files_to_copy) not_copied: list[Path] = [] logger.info(f"Copying {total} files into {copy_out_path}") for count, file in enumerate(files_to_copy): if file.is_file(): logger.info(f"{count + 1} of {total}, copying file {file}") shutil.copy(file, copy_out_path) elif file.is_dir(): # TODO: Add an option to tar folders into the final location logger.info(f"{count + 1} of {total}, copying folder {file}") shutil.copytree(file, copy_out_path / file.name) else: not_copied.append(file) logger.critical(f"{file} is not a file. Skipping. ") continue if not_copied: logger.critical(f"Did not copy {len(not_copied)} files, {not_copied=}") return copy_out_path
[docs] def verify_tarball( tarball: Path, ) -> bool: """Verify that a tarball was created properly by examining its table. Internally this calls ``tar`` through a subprocess call. Hence, ``tar`` needs to be available on the system PATH. Args: tarball (Path): The tarball to examine Returns: bool: True if the ``tar`` exit code is 0, False otherwise """ tarball = Path(tarball) # trust nothing assert tarball.exists() and tarball.is_file(), ( f"{tarball} is not a file or does not exist" ) assert tarball.suffix == ".tar", f"{tarball=} appears to not have a .tar extension" cmd = f"tar -tvf {tarball!s}" logger.info(f"Verifying {tarball=}") popen = subprocess.Popen(shlex.split(cmd), stderr=subprocess.PIPE) with popen.stderr: # type: ignore for line in iter(popen.stderr.readline, b""): # type: ignore logger.error(line.decode().strip()) exitcode = popen.wait() return exitcode == 0
# TODO: Add a clobber option
[docs] def tar_files_into( tar_out_path: Path, files_to_tar: Collection[Path], verify: bool = True ) -> Path: """Create a tar file given a desired output path and list of files to tar. Args: tar_out_path (Path): The output path of the tarball. The parent directory will be created if necessary. files_to_tar (Collection[Path]): All the files to tarball up verify (bool, optional): Verify that the tarball was correctly formed. Defaults to True. Raises: FileExistsError: The path of the tarball created Returns: Path: There exists a tarball of the same name """ tar_out_path = Path(tar_out_path) if tar_out_path.exists(): raise FileExistsError(f"{tar_out_path} already exists. ") # Create the output directory in case it does not exist tar_out_path.parent.mkdir(parents=True, exist_ok=True) total = len(files_to_tar) logger.info(f"Taring {total} files") logger.info(f"Opening {tar_out_path}") with tarfile.open(tar_out_path, "w") as tar: for count, file in enumerate(files_to_tar): logger.info(f"{count + 1} of {total}, adding {file!s}") tar.add(file, arcname=file.name) logger.info(f"Created {tar_out_path}") if verify: tar_success = verify_tarball(tarball=tar_out_path) if not tar_success: raise TarArchiveError(f"Failed to verify {tar_out_path=}") logger.info(f"{tar_out_path=} appears to be correctly formed") return tar_out_path
[docs] def create_sbid_tar_archive( tar_out_path: Path, base_path: Path, archive_options: ArchiveOptions ) -> Path: """Create a tar file of key products in a SBID folder. Args: tar_out_path (Path): The output location of the tarball to write base_path (Path): The base directory that contains files to archive archive_options (ArchiveOptions): Options relating to how files are found and archived Returns: Path: Output tarball directory """ files_to_tar = resolve_glob_expressions( base_path=base_path, file_re_patterns=archive_options.tar_file_re_patterns ) tar_out_path = tar_files_into(tar_out_path=tar_out_path, files_to_tar=files_to_tar) return tar_out_path
[docs] def copy_sbid_files_archive( copy_out_path: Path, base_path: Path, archive_options: ArchiveOptions ) -> Path: """Copy files from an SBID processing folder into a final location. Uses the `copy_file_globs` set of expressions to identify files to copy. Args: copy_out_path (Path): The output location of the tarball to write base_path (Path): The base directory that contains files to archive archive_options (ArchiveOptions): Options relating to how files are found and archived Returns: Path: Output tarball directory """ files_to_copy = resolve_glob_expressions( base_path=base_path, file_re_patterns=archive_options.copy_file_re_patterns ) copy_out_path = copy_files_into( copy_out_path=copy_out_path, files_to_copy=files_to_copy ) return copy_out_path
[docs] def get_archive_options_from_yaml(strategy_yaml_path: Path) -> dict[str, Any]: """Load the archive options from a specified strategy file Args: strategy_yaml_path (Path): The path to the strategy yaml file containing archive options Returns: Dict[str, Any]: Loaded options for ArchiveOptions """ archive_options = get_options_from_strategy( strategy=strategy_yaml_path, mode="archive", round_info=0, operation="selfcal" ) logger.info(f"{archive_options=}") return archive_options
[docs] def get_parser() -> ArgumentParser: parser = ArgumentParser( description="Operations around archiving. Patterns are specified as regular expressions (not globs). " ) subparser = parser.add_subparsers( dest="mode", help="Operation mode of flint_archive" ) list_parser = subparser.add_parser( "list", help="List the files that would be copied" ) list_parser.add_argument( "--base-path", type=Path, default=Path("."), help="Base directory to perform glob expressions", ) list_parser.add_argument( "--strategy-yaml-path", type=Path, default=None, help="Path to a strategy file with a archive section. Overrides any --file-patterns. ", ) list_parser.add_argument( "--list-mode", choices=("create", "copy"), default="copy", help="Which set of RE patterns to present, those for the tarball (create) or those for copy", ) list_parser = add_options_to_parser( parser=list_parser, options_class=ArchiveOptions ) create_parser = subparser.add_parser("create", help="Create a tarfile archive") create_parser.add_argument( "tar_out_path", type=Path, help="Path of the output tar file to be created" ) create_parser.add_argument( "--base-path", type=Path, default=Path("."), help="Base directory to perform glob expressions", ) create_parser = add_options_to_parser( parser=create_parser, options_class=ArchiveOptions ) create_parser.add_argument( "--strategy-yaml-path", type=Path, default=None, help="Path to a strategy file with a archive section. Overrides any --file-patterns. ", ) copy_parser = subparser.add_parser( "copy", help="Copy a set of files into a output directory" ) copy_parser.add_argument( "copy_out_path", type=Path, help="Path of the output folder that files will be copied into", ) copy_parser.add_argument( "--base-path", type=Path, default=Path("."), help="Base directory to perform glob expressions", ) copy_parser = add_options_to_parser( parser=copy_parser, options_class=ArchiveOptions ) copy_parser.add_argument( "--strategy-yaml-path", type=Path, default=None, help="Path to a strategy file with a archive section. Overrides any --file-patterns. ", ) return parser
[docs] def cli() -> None: parser = get_parser() args = parser.parse_args() if args.mode == "list": archive_options = ( ArchiveOptions( **get_archive_options_from_yaml( strategy_yaml_path=args.strategy_yaml_path ) ) if args.strategy_yaml_path else create_options_from_parser( parser_namespace=args, options_class=ArchiveOptions ) ) files = resolve_glob_expressions( base_path=args.base_path, file_re_patterns=( archive_options.tar_file_re_patterns if args.list_mode == "create" else archive_options.copy_file_re_patterns ), ) for count, file in enumerate(sorted(files)): logger.info(f"{count} of {len(files)}, {file}") logger.info(f"{len(files)} for mode={args.list_mode}") elif args.mode == "create": archive_options = ( ArchiveOptions( **get_archive_options_from_yaml( strategy_yaml_path=args.strategy_yaml_path ) ) if args.strategy_yaml_path else create_options_from_parser( parser_namespace=args, options_class=ArchiveOptions ) ) update_options_create: dict[str, Any] = ( get_archive_options_from_yaml(strategy_yaml_path=args.strategy_yaml_path) if args.strategy_yaml_path else dict(tar_file_re_patterhs=args.file_patterns) ) archive_options = ArchiveOptions(**update_options_create) create_sbid_tar_archive( tar_out_path=args.tar_out_path, base_path=args.base_path, archive_options=archive_options, ) elif args.mode == "copy": archive_options = ( ArchiveOptions( **get_archive_options_from_yaml( strategy_yaml_path=args.strategy_yaml_path ) ) if args.strategy_yaml_path else create_options_from_parser( parser_namespace=args, options_class=ArchiveOptions ) ) copy_sbid_files_archive( copy_out_path=args.copy_out_path, base_path=args.base_path, archive_options=archive_options, ) else: parser.print_help()
if __name__ == "__main__": cli()