config

frequenz.sdk.config ¤

Read and update config variables.

Classes¤

frequenz.sdk.config.ConfigManagingActor ¤

Bases: Actor

An actor that monitors TOML configuration files for updates.

When the actor is started the configuration files will be read and sent to the output sender. Then the actor will start monitoring the files for updates. If any file is updated, all the configuration files will be re-read and sent to the output sender.

If no configuration file could be read, the actor will raise an exception.

The configuration files are read in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path.

Example

If config1.toml contains:

var1 = [1, 2]
var2 = 2
[section]
var3 = [1, 3]

And config2.toml contains:

var2 = "hello" # Can override with a different type too
var3 = 4
[section]
var3 = 5
var4 = 5

Then the final configuration will be:

{
    "var1": [1, 2],
    "var2": "hello",
    "var3": 4,
    "section": {
        "var3": 5,
        "var4": 5,
    },
}
Source code in frequenz/sdk/config/_config_managing.py
class ConfigManagingActor(Actor):
    """An actor that monitors TOML configuration files for updates.

    When the actor is started the configuration files will be read and sent to the
    output sender. Then the actor will start monitoring the files for updates. If any
    file is updated, all the configuration files will be re-read and sent to the output
    sender.

    If no configuration file could be read, the actor will raise an exception.

    The configuration files are read in the order of the paths, so the last path will
    override the configuration set by the previous paths. Dict keys will be merged
    recursively, but other objects (like lists) will be replaced by the value in the
    last path.

    Example:
        If `config1.toml` contains:

        ```toml
        var1 = [1, 2]
        var2 = 2
        [section]
        var3 = [1, 3]
        ```

        And `config2.toml` contains:

        ```toml
        var2 = "hello" # Can override with a different type too
        var3 = 4
        [section]
        var3 = 5
        var4 = 5
        ```

        Then the final configuration will be:

        ```py
        {
            "var1": [1, 2],
            "var2": "hello",
            "var3": 4,
            "section": {
                "var3": 5,
                "var4": 5,
            },
        }
        ```
    """

    # pylint: disable-next=too-many-arguments
    def __init__(
        self,
        config_paths: abc.Sequence[pathlib.Path | str],
        output: Sender[abc.Mapping[str, Any]],
        event_types: abc.Set[EventType] = frozenset(EventType),
        *,
        name: str | None = None,
        force_polling: bool = True,
        polling_interval: timedelta = timedelta(seconds=1),
    ) -> None:
        """Initialize this instance.

        Args:
            config_paths: The paths to the TOML files with the configuration. Order
                matters, as the configuration will be read and updated in the order
                of the paths, so the last path will override the configuration set by
                the previous paths. Dict keys will be merged recursively, but other
                objects (like lists) will be replaced by the value in the last path.
            output: The sender to send the configuration to.
            event_types: The set of event types to monitor.
            name: The name of the actor. If `None`, `str(id(self))` will
                be used. This is used mostly for debugging purposes.
            force_polling: Whether to force file polling to check for changes.
            polling_interval: The interval to poll for changes. Only relevant if
                polling is enabled.
        """
        super().__init__(name=name)
        self._config_paths: list[pathlib.Path] = [
            (
                config_path
                if isinstance(config_path, pathlib.Path)
                else pathlib.Path(config_path)
            )
            for config_path in config_paths
        ]
        self._output: Sender[abc.Mapping[str, Any]] = output
        self._event_types: abc.Set[EventType] = event_types
        self._force_polling: bool = force_polling
        self._polling_interval: timedelta = polling_interval

    def _read_config(self) -> abc.Mapping[str, Any]:
        """Read and merge the contents of the configuration files.

        Returns:
            A dictionary containing configuration variables.

        Raises:
            ValueError: If none of the configuration files can be read.
        """
        error_count = 0
        config: dict[str, Any] = {}

        for config_path in self._config_paths:
            try:
                with config_path.open("rb") as toml_file:
                    data = tomllib.load(toml_file)
                    config = _recursive_update(config, data)
            except ValueError as err:
                _logger.error("%s: Can't read config file, err: %s", self, err)
                error_count += 1

        if error_count == len(self._config_paths):
            raise ValueError(f"{self}: Can't read any of the config files")

        return config

    async def send_config(self) -> None:
        """Send the configuration to the output sender."""
        config = self._read_config()
        await self._output.send(config)

    async def _run(self) -> None:
        """Monitor for and send configuration file updates.

        At startup, the Config Manager sends the current config so that it
        can be cached in the Broadcast channel and served to receivers even if
        there hasn't been any change to the config file itself.
        """
        await self.send_config()

        parent_paths = {p.parent for p in self._config_paths}

        # FileWatcher can't watch for non-existing files, so we need to watch for the
        # parent directories instead just in case a configuration file doesn't exist yet
        # or it is deleted and recreated again.
        file_watcher = FileWatcher(
            paths=list(parent_paths),
            event_types=self._event_types,
            force_polling=self._force_polling,
            polling_interval=self._polling_interval,
        )

        try:
            async for event in file_watcher:
                # Since we are watching the whole parent directories, we need to make
                # sure we only react to events related to the configuration files we
                # are interested in.
                if not any(event.path.samefile(p) for p in self._config_paths):
                    continue

                match event.type:
                    case EventType.CREATE:
                        _logger.info(
                            "%s: The configuration file %s was created, sending new config...",
                            self,
                            event.path,
                        )
                        await self.send_config()
                    case EventType.MODIFY:
                        _logger.info(
                            "%s: The configuration file %s was modified, sending update...",
                            self,
                            event.path,
                        )
                        await self.send_config()
                    case EventType.DELETE:
                        _logger.info(
                            "%s: The configuration file %s was deleted, ignoring...",
                            self,
                            event.path,
                        )
                    case _:
                        assert_never(event.type)
        finally:
            del file_watcher
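
The directory-watching approach used in `_run` can be illustrated without a file watcher. This is a sketch under assumptions: both function names are hypothetical, and it compares resolved paths instead of calling `samefile()`, so it also works for files that do not exist.

```python
import pathlib


def watched_directories(config_paths: list[pathlib.Path]) -> set[pathlib.Path]:
    """Return the parent directories that need to be watched."""
    return {path.parent for path in config_paths}


def is_config_event(event_path: pathlib.Path, config_paths: list[pathlib.Path]) -> bool:
    """Tell whether an event path refers to one of the configuration files."""
    # resolve() works for missing files; samefile() needs both files to exist.
    target = event_path.resolve()
    return any(target == path.resolve() for path in config_paths)


paths = [pathlib.Path("conf/base.toml"), pathlib.Path("conf/site.toml")]
```

Here both files share one parent directory, so a single directory is watched and events for unrelated files in it are filtered out.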
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior. Don't do it unless the class explicitly documents that it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
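
The `__aenter__`/`__aexit__` pair means a service can be scoped with `async with`. The sketch below shows the pattern with `MiniService`, a hypothetical stand-in written for this example. It is not the SDK's background service class and manages a single task only.

```python
import asyncio


class MiniService:
    """Hypothetical stand-in for a background service (not the SDK class)."""

    def __init__(self) -> None:
        self._task: asyncio.Task[None] | None = None

    async def _run(self) -> None:
        while True:
            await asyncio.sleep(0.01)

    def start(self) -> None:
        # Starting an already running service does nothing.
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass  # Expected: we cancelled the task ourselves.

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    async def __aenter__(self) -> "MiniService":
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.stop()


async def demo() -> tuple[bool, bool]:
    async with MiniService() as service:
        await asyncio.sleep(0.03)
        running_inside = service.is_running
    # Leaving the block stopped the service.
    return running_inside, service.is_running
```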
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
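
Delegating `__await__` to a coroutine is what makes the object itself awaitable. A small sketch of the technique, using a hypothetical `Job` class rather than the SDK's own:

```python
import asyncio
from collections.abc import Generator
from typing import Any


class Job:
    """Hypothetical awaitable object; awaiting it runs `wait()`."""

    def __init__(self) -> None:
        self.finished = False

    async def wait(self) -> None:
        await asyncio.sleep(0.01)
        self.finished = True

    def __await__(self) -> Generator[Any, None, None]:
        # Delegate to the coroutine returned by wait().
        return self.wait().__await__()


async def demo() -> bool:
    job = Job()
    await job  # Equivalent to `await job.wait()`.
    return job.finished
```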
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(
    config_paths: Sequence[Path | str],
    output: Sender[Mapping[str, Any]],
    event_types: Set[EventType] = frozenset(EventType),
    *,
    name: str | None = None,
    force_polling: bool = True,
    polling_interval: timedelta = timedelta(seconds=1)
) -> None

Initialize this instance.

PARAMETER DESCRIPTION
config_paths

The paths to the TOML files with the configuration. Order matters, as the configuration will be read and updated in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path.

TYPE: Sequence[Path | str]

output

The sender to send the configuration to.

TYPE: Sender[Mapping[str, Any]]

event_types

The set of event types to monitor.

TYPE: Set[EventType] DEFAULT: frozenset(EventType)

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

force_polling

Whether to force file polling to check for changes.

TYPE: bool DEFAULT: True

polling_interval

The interval to poll for changes. Only relevant if polling is enabled.

TYPE: timedelta DEFAULT: timedelta(seconds=1)

Source code in frequenz/sdk/config/_config_managing.py
def __init__(
    self,
    config_paths: abc.Sequence[pathlib.Path | str],
    output: Sender[abc.Mapping[str, Any]],
    event_types: abc.Set[EventType] = frozenset(EventType),
    *,
    name: str | None = None,
    force_polling: bool = True,
    polling_interval: timedelta = timedelta(seconds=1),
) -> None:
    """Initialize this instance.

    Args:
        config_paths: The paths to the TOML files with the configuration. Order
            matters, as the configuration will be read and updated in the order
            of the paths, so the last path will override the configuration set by
            the previous paths. Dict keys will be merged recursively, but other
            objects (like lists) will be replaced by the value in the last path.
        output: The sender to send the configuration to.
        event_types: The set of event types to monitor.
        name: The name of the actor. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
        force_polling: Whether to force file polling to check for changes.
        polling_interval: The interval to poll for changes. Only relevant if
            polling is enabled.
    """
    super().__init__(name=name)
    self._config_paths: list[pathlib.Path] = [
        (
            config_path
            if isinstance(config_path, pathlib.Path)
            else pathlib.Path(config_path)
        )
        for config_path in config_paths
    ]
    self._output: Sender[abc.Mapping[str, Any]] = output
    self._event_types: abc.Set[EventType] = event_types
    self._force_polling: bool = force_polling
    self._polling_interval: timedelta = polling_interval
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
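
The `msg` argument is forwarded to `asyncio.Task.cancel()`, so it arrives in the `CancelledError` raised inside each task. A stdlib-only sketch of that behavior (the `worker` and `main` names are illustrative):

```python
import asyncio


async def worker(seen: list[str]) -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError as exc:
        # The cancellation message is available as the first argument.
        seen.append(str(exc.args[0]) if exc.args else "")
        raise


async def main() -> list[str]:
    seen: list[str] = []
    task = asyncio.create_task(worker(seen))
    await asyncio.sleep(0)  # Let the worker start and reach its sleep.
    task.cancel("shutting down")
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen
```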
send_config async ¤
send_config() -> None

Send the configuration to the output sender.

Source code in frequenz/sdk/config/_config_managing.py
async def send_config(self) -> None:
    """Send the configuration to the output sender."""
    config = self._read_config()
    await self._output.send(config)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and is not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and is not
            returned in the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )
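
The collection loop in `wait()` can be tried in isolation. This sketch, with hypothetical `ok` and `fails` coroutines, waits for a set of tasks and gathers whatever they raised instead of stopping at the first error:

```python
import asyncio


async def ok() -> str:
    return "fine"


async def fails() -> None:
    raise RuntimeError("task failed")


async def wait_all() -> list[BaseException]:
    tasks = {asyncio.create_task(ok()), asyncio.create_task(fails())}
    done, pending = await asyncio.wait(tasks)
    assert not pending  # No timeout was given, so every task is done.
    errors: list[BaseException] = []
    for task in done:
        try:
            task.result()  # Re-raises the task's exception, if any.
        except BaseException as error:  # pylint: disable=broad-except
            errors.append(error)
    return errors
```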

frequenz.sdk.config.LoggerConfig ¤

A configuration for a logger.

Source code in frequenz/sdk/config/_logging_config_updater.py
@dataclass
class LoggerConfig:
    """A configuration for a logger."""

    level: LogLevel = field(
        default="NOTSET",
        metadata={
            "metadata": {
                "description": "Log level for the logger. Uses standard logging levels."
            },
            "required": False,
        },
    )
    """The log level for the logger."""
Attributes¤
level class-attribute instance-attribute ¤
level: LogLevel = field(
    default="NOTSET",
    metadata={
        "metadata": {
            "description": "Log level for the logger. Uses standard logging levels."
        },
        "required": False,
    },
)

The log level for the logger.

frequenz.sdk.config.LoggingConfig ¤

A configuration for the logging system.

Source code in frequenz/sdk/config/_logging_config_updater.py
@dataclass
class LoggingConfig:
    """A configuration for the logging system."""

    root_logger: LoggerConfig = field(
        default_factory=LoggerConfig,
        metadata={
            "metadata": {
                "description": "Default configuration for all loggers.",
            },
            "required": False,
        },
    )
    """The default log level."""

    loggers: dict[str, LoggerConfig] = field(
        default_factory=dict,
        metadata={
            "metadata": {
                "description": "Configuration for a logger (the key is the logger name)."
            },
            "required": False,
        },
    )
    """The configurations of individual loggers, keyed by logger name."""

    @classmethod
    def load(cls, configs: Mapping[str, Any]) -> Self:  # noqa: DOC502
        """Load and validate configs from a dictionary.

        Args:
            configs: The configuration to validate.

        Returns:
            The configuration if it is valid.

        Raises:
            ValidationError: If the configuration is invalid.
        """
        schema = class_schema(cls)()
        return cast(Self, schema.load(configs, unknown=RAISE))
Attributes¤
loggers class-attribute instance-attribute ¤
loggers: dict[str, LoggerConfig] = field(
    default_factory=dict,
    metadata={
        "metadata": {
            "description": "Configuration for a logger (the key is the logger name)."
        },
        "required": False,
    },
)

The configurations of individual loggers, keyed by logger name.

root_logger class-attribute instance-attribute ¤
root_logger: LoggerConfig = field(
    default_factory=LoggerConfig,
    metadata={
        "metadata": {
            "description": "Default configuration for all loggers."
        },
        "required": False,
    },
)

The default log level.

Functions¤
load classmethod ¤
load(configs: Mapping[str, Any]) -> Self

Load and validate configs from a dictionary.

PARAMETER DESCRIPTION
configs

The configuration to validate.

TYPE: Mapping[str, Any]

RETURNS DESCRIPTION
Self

The configuration if it is valid.

RAISES DESCRIPTION
ValidationError

If the configuration is invalid.

Source code in frequenz/sdk/config/_logging_config_updater.py
@classmethod
def load(cls, configs: Mapping[str, Any]) -> Self:  # noqa: DOC502
    """Load and validate configs from a dictionary.

    Args:
        configs: The configuration to validate.

    Returns:
        The configuration if it is valid.

    Raises:
        ValidationError: If the configuration is invalid.
    """
    schema = class_schema(cls)()
    return cast(Self, schema.load(configs, unknown=RAISE))

frequenz.sdk.config.LoggingConfigUpdater ¤

Bases: Actor

Actor that listens for logging configuration changes and sets them.

Example

config.toml file:

[logging.root_logger]
level = "INFO"

[logging.loggers."frequenz.sdk.actor.power_distributing"]
level = "DEBUG"

[logging.loggers."frequenz.channels"]
level = "DEBUG"

import asyncio
from collections.abc import Mapping
from typing import Any

from frequenz.channels import Broadcast
from frequenz.sdk.config import LoggingConfigUpdater, ConfigManagingActor
from frequenz.sdk.actor import run as run_actors

async def run() -> None:
    config_channel = Broadcast[Mapping[str, Any]](name="config", resend_latest=True)
    actors = [
        ConfigManagingActor(config_paths=["config.toml"], output=config_channel.new_sender()),
        LoggingConfigUpdater(
            config_recv=config_channel.new_receiver(limit=1).map(
                lambda app_config: app_config.get("logging", {})
            )
        ),
    ]
    await run_actors(*actors)

asyncio.run(run())

Now whenever the config.toml file is updated, the logging configuration will be updated as well.

Source code in frequenz/sdk/config/_logging_config_updater.py
class LoggingConfigUpdater(Actor):
    """Actor that listens for logging configuration changes and sets them.

    Example:
        `config.toml` file:
        ```toml
        [logging.root_logger]
        level = "INFO"

        [logging.loggers."frequenz.sdk.actor.power_distributing"]
        level = "DEBUG"

        [logging.loggers."frequenz.channels"]
        level = "DEBUG"
        ```

        ```python
        import asyncio
        from collections.abc import Mapping
        from typing import Any

        from frequenz.channels import Broadcast
        from frequenz.sdk.config import LoggingConfigUpdater, ConfigManagingActor
        from frequenz.sdk.actor import run as run_actors

        async def run() -> None:
            config_channel = Broadcast[Mapping[str, Any]](name="config", resend_latest=True)
            actors = [
                ConfigManagingActor(config_paths=["config.toml"], output=config_channel.new_sender()),
                LoggingConfigUpdater(
                    config_recv=config_channel.new_receiver(limit=1).map(
                        lambda app_config: app_config.get("logging", {})
                    )
                ),
            ]
            await run_actors(*actors)

        asyncio.run(run())
        ```

        Now whenever the `config.toml` file is updated, the logging configuration
        will be updated as well.
    """

    def __init__(
        self,
        config_recv: Receiver[Mapping[str, Any]],
        log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s",
        log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z",
    ):
        """Initialize this instance.

        Args:
            config_recv: The receiver to listen for configuration changes.
            log_format: Use the specified format string in logs.
            log_datefmt: Use the specified date/time format in logs.
        """
        super().__init__()
        self._config_recv = config_recv
        self._format = log_format
        self._datefmt = log_datefmt

        # Setup default configuration.
        # This ensures logging is configured even if actor fails to start or
        # if the configuration cannot be loaded.
        self._current_config: LoggingConfig = LoggingConfig()
        self._update_logging(self._current_config)

    async def _run(self) -> None:
        """Listen for configuration changes and update logging."""
        async for message in self._config_recv:
            try:
                new_config = LoggingConfig.load(message)
            except marshmallow.ValidationError:
                _logger.exception(
                    "Invalid logging configuration received. Skipping config update"
                )
                continue

            if new_config != self._current_config:
                self._update_logging(new_config)

    def _update_logging(self, config: LoggingConfig) -> None:
        """Configure the logging level."""
        # If the logger is not in the new config, set it to NOTSET
        loggers_to_unset = self._current_config.loggers.keys() - config.loggers.keys()
        for logger_id in loggers_to_unset:
            _logger.debug("Unsetting log level for logger '%s'", logger_id)
            logging.getLogger(logger_id).setLevel(logging.NOTSET)

        self._current_config = config
        logging.basicConfig(
            format=self._format,
            level=self._current_config.root_logger.level,
            datefmt=self._datefmt,
        )

        # For each logger in the new config, set the log level
        for logger_id, logger_config in self._current_config.loggers.items():
            _logger.debug(
                "Setting log level for logger '%s' to '%s'",
                logger_id,
                logger_config.level,
            )
            logging.getLogger(logger_id).setLevel(logger_config.level)

        _logger.info("Logging config changed to: %s", self._current_config)
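
The set-then-unset logic of `_update_logging` can be sketched with plain dictionaries in place of `LoggingConfig`. `apply_levels` and the `demo.*` logger names are hypothetical and exist only for this illustration.

```python
import logging


def apply_levels(previous: dict[str, str], new: dict[str, str]) -> None:
    """Apply `new` logger levels and reset loggers that were dropped."""
    # Loggers that disappeared from the config go back to NOTSET.
    for name in previous.keys() - new.keys():
        logging.getLogger(name).setLevel(logging.NOTSET)
    for name, level in new.items():
        logging.getLogger(name).setLevel(level)  # Accepts names like "DEBUG".


apply_levels({}, {"demo.a": "DEBUG", "demo.b": "WARNING"})
apply_levels({"demo.a": "DEBUG", "demo.b": "WARNING"}, {"demo.b": "ERROR"})
```

After the second call `demo.a` is no longer configured explicitly, so it inherits its effective level from its parent logger again.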
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior. Don't do it unless the class explicitly documents that it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(
    config_recv: Receiver[Mapping[str, Any]],
    log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s",
    log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z",
)

Initialize this instance.

PARAMETER DESCRIPTION
config_recv

The receiver to listen for configuration changes.

TYPE: Receiver[Mapping[str, Any]]

log_format

Use the specified format string in logs.

TYPE: str DEFAULT: '%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s'

log_datefmt

Use the specified date/time format in logs.

TYPE: str DEFAULT: '%Y-%m-%dT%H:%M:%S%z'

Source code in frequenz/sdk/config/_logging_config_updater.py
def __init__(
    self,
    config_recv: Receiver[Mapping[str, Any]],
    log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s",
    log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z",
):
    """Initialize this instance.

    Args:
        config_recv: The receiver to listen for configuration changes.
        log_format: Use the specified format string in logs.
        log_datefmt: Use the specified date/time format in logs.
    """
    super().__init__()
    self._config_recv = config_recv
    self._format = log_format
    self._datefmt = log_datefmt

    # Setup default configuration.
    # This ensures logging is configured even if actor fails to start or
    # if the configuration cannot be loaded.
    self._current_config: LoggingConfig = LoggingConfig()
    self._update_logging(self._current_config)
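To see what the default `log_format` and `log_datefmt` values produce, here is a small sketch using only the standard `logging` module. It assumes the two strings end up in a standard `logging.Formatter`, which the listing above does not show; the logger name, line number, and message are made up for illustration.

```python
import logging

# Default values copied from the __init__ signature.
LOG_FORMAT = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s"
LOG_DATEFMT = "%Y-%m-%dT%H:%M:%S%z"

formatter = logging.Formatter(fmt=LOG_FORMAT, datefmt=LOG_DATEFMT)

# Build a record by hand so the example does not touch global logging state.
record = logging.LogRecord(
    name="demo.module",  # hypothetical logger name
    level=logging.INFO,
    pathname="demo.py",
    lineno=42,
    msg="config %s",
    args=("loaded",),
    exc_info=None,
)

line = formatter.format(record)
# The timestamp contains no spaces, so the first space separates it from the rest.
timestamp, rest = line.split(" ", 1)
print(line)
```

The `%(levelname)-8s` placeholder left-aligns the level name and pads it to 8 characters, so messages line up regardless of the level.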
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and is not included in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and is not
            included in the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

Functions¤

frequenz.sdk.config.load_config ¤

load_config(
    cls: type[DataclassT],
    config: Mapping[str, Any],
    /,
    base_schema: type[Schema] | None = None,
    **marshmallow_load_kwargs: Any,
) -> DataclassT

Load a configuration from a dictionary into an instance of a configuration class.

The configuration class is expected to be a dataclasses.dataclass, which is used to create a marshmallow.Schema schema to validate the configuration dictionary using marshmallow_dataclass.class_schema (which in turn uses the marshmallow.Schema.load method to do the validation and deserialization).

To customize the schema derived from the configuration dataclass, you can use the metadata key in dataclasses.field to pass extra options to marshmallow_dataclass to be used during validation and deserialization.

Additional arguments can be passed to marshmallow.Schema.load using keyword arguments marshmallow_load_kwargs.

Note

This function raises marshmallow.ValidationError if the configuration dictionary is invalid. All the gotchas of marshmallow and marshmallow_dataclass apply when using this function, so reading the documentation of these libraries carefully is recommended.

PARAMETER DESCRIPTION
cls

The configuration class.

TYPE: type[DataclassT]

config

The configuration dictionary.

TYPE: Mapping[str, Any]

base_schema

An optional class to be used as a base schema for the configuration class. This allows using custom fields, for example. Will be passed to marshmallow_dataclass.class_schema.

TYPE: type[Schema] | None DEFAULT: None

**marshmallow_load_kwargs

Additional arguments to be passed to marshmallow.Schema.load.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
DataclassT

The loaded configuration as an instance of the configuration class.

Source code in frequenz/sdk/config/_util.py
def load_config(
    cls: type[DataclassT],
    config: Mapping[str, Any],
    /,
    base_schema: type[Schema] | None = None,
    **marshmallow_load_kwargs: Any,
) -> DataclassT:
    """Load a configuration from a dictionary into an instance of a configuration class.

    The configuration class is expected to be a [`dataclasses.dataclass`][], which is
    used to create a [`marshmallow.Schema`][] schema to validate the configuration
    dictionary using [`marshmallow_dataclass.class_schema`][] (which in turn uses the
    [`marshmallow.Schema.load`][] method to do the validation and deserialization).

    To customize the schema derived from the configuration dataclass, you can use the
    `metadata` key in [`dataclasses.field`][] to pass extra options to
    [`marshmallow_dataclass`][] to be used during validation and deserialization.

    Additional arguments can be passed to [`marshmallow.Schema.load`][] using keyword
    arguments `marshmallow_load_kwargs`.

    Note:
        This function raises [`marshmallow.ValidationError`][] if the configuration
        dictionary is invalid. All the gotchas of [`marshmallow`][] and
        [`marshmallow_dataclass`][] apply when using this function, so reading the
        documentation of these libraries carefully is recommended.

    Args:
        cls: The configuration class.
        config: The configuration dictionary.
        base_schema: An optional class to be used as a base schema for the configuration
            class. This allows using custom fields, for example. Will be passed to
            [`marshmallow_dataclass.class_schema`][].
        **marshmallow_load_kwargs: Additional arguments to be passed to
            [`marshmallow.Schema.load`][].

    Returns:
        The loaded configuration as an instance of the configuration class.
    """
    instance = class_schema(cls, base_schema)().load(config, **marshmallow_load_kwargs)
    # We need to cast because `.load()` comes from marshmallow and doesn't know which
    # type is returned.
    return cast(DataclassT, instance)