Source code for flint.prefect.helpers
"""Assorted bits and pieces to help with prefect interactions.
Not intended to contain any common task decorated functions
"""
from __future__ import annotations
from prefect import get_run_logger
# This function has been lifted from
# https://gist.github.com/anna-geller/0b9e6ecbde45c355af425cd5b97e303d
# like any good pirate why make when you can take
[docs]
def enable_loguru_support() -> None:
"""Redirect loguru logging messages to the prefect run logger.
This function should be called from within a Prefect task or flow before calling any module that uses loguru.
This function can be safely called multiple times.
Example Usage:
>>> from prefect import flow
>>> from loguru import logger
>>> from prefect_utils import enable_loguru_support # import this function in your flow from your module
>>> @flow()
>>> def myflow():
>>> logger.info("This is hidden from the Prefect UI")
>>> enable_loguru_support()
>>> logger.info("This shows up in the Prefect UI")
"""
# import here for distributed execution because loguru cannot be pickled.
from loguru import logger # pylint: disable=import-outside-toplevel
run_logger = get_run_logger()
logger.remove()
log_format = "{name}:{function}:{line} - {message}"
logger.add(
run_logger.debug,
filter=lambda record: record["level"].name == "DEBUG",
level="TRACE",
format=log_format,
)
logger.add(
run_logger.warning,
filter=lambda record: record["level"].name == "WARNING",
level="TRACE",
format=log_format,
)
logger.add(
run_logger.error,
filter=lambda record: record["level"].name == "ERROR",
level="TRACE",
format=log_format,
)
logger.add(
run_logger.critical,
filter=lambda record: record["level"].name == "CRITICAL",
level="TRACE",
format=log_format,
)
logger.add(
run_logger.info,
filter=lambda record: record["level"].name
not in ["DEBUG", "WARNING", "ERROR", "CRITICAL"],
level="TRACE",
format=log_format,
)