frequenz.sdk.config ¤

Configuration management.

Overview¤

To provide dynamic configurations to an application, you can use the ConfigManager class. This class provides a convenient interface to manage configurations from multiple config files and receive updates when the configurations change. Users can create a receiver to receive configurations from the manager.

Setup¤

To use the ConfigManager, you need to create an instance of it and pass the paths to the configuration files. The configuration files must be in the TOML format.

When specifying multiple files, order matters: the configuration is read and updated in the order of the paths, so the last path overrides the configuration set by the previous paths. Dict keys are merged recursively, but other objects (like lists) are replaced by the value in the last path.

from frequenz.sdk.config import ConfigManager

async with ConfigManager(["base-config.toml", "overrides.toml"]) as config_manager:
    ...

Logging¤

The ConfigManager can also instantiate a LoggingConfigUpdatingActor to monitor logging configurations. This actor will listen for logging configuration changes and update the logging configuration accordingly.

This feature is enabled by default using the key logging in the configuration file. To disable it you can pass logging_config_key=None to the ConfigManager.

Receiving configurations¤

To receive configurations, you can create a receiver using the new_receiver() method. The receiver will receive configurations from the manager for a particular key, and validate and load the configurations to a dataclass using marshmallow_dataclass.

If the key is a sequence of strings, it will be treated as a nested key and the receiver will receive the configuration under the nested key. For example ["key", "subkey"] will get only config["key"]["subkey"].

Besides a configuration instance, the receiver can also receive exceptions if there are errors loading the configuration (typically a ValidationError), or None if there is no configuration for the key.

The value under key must be another mapping, otherwise an InvalidValueForKeyError instance will be sent to the receiver.

If there were any errors loading the configuration, the error will be logged too.

from dataclasses import dataclass
from frequenz.sdk.config import ConfigManager

@dataclass(frozen=True, kw_only=True)
class AppConfig:
    test: int

async with ConfigManager("config.toml") as config_manager:
    receiver = config_manager.new_receiver("app", AppConfig)
    app_config = await receiver.receive()
    match app_config:
        case AppConfig(test=42):
            print("App configured with 42")
        case Exception() as error:
            print(f"Error loading configuration: {error}")
        case None:
            print("There is no configuration for the app key")

Validation and loading¤

The configuration class used to create the configuration instance is expected to be a dataclasses.dataclass, which is used to create a marshmallow.Schema via the marshmallow_dataclass.class_schema function.

This means you can customize the schema derived from the configuration dataclass using marshmallow_dataclass to specify extra validation and options via field metadata.

Customization can also be done via a base_schema. By default BaseConfigSchema is used to provide support for some extra commonly used fields (like quantities) and to exclude unknown fields by default.

import marshmallow.validate
from dataclasses import dataclass, field

@dataclass(frozen=True, kw_only=True)
class Config:
    test: int = field(
        metadata={"validate": marshmallow.validate.Range(min=0)},
    )

Additional arguments can be passed to marshmallow.Schema.load using the marshmallow_load_kwargs keyword arguments.

When marshmallow.EXCLUDE is used, a warning will be logged if there are extra fields in the configuration that are excluded. This is useful, for example, to catch typos in the configuration file.

Skipping superfluous updates¤

If there is a burst of configuration updates, the receiver will only receive the last configuration; older configurations will be ignored.

If skip_unchanged is set to True, then a configuration that didn't change compared to the last one received will be ignored and not sent to the receiver. The comparison is done using the raw dict to determine if the configuration has changed.

Error handling¤

The value under key must be another mapping, otherwise an error will be logged and a frequenz.sdk.config.InvalidValueForKeyError instance will be sent to the receiver.

Configurations that don't pass the validation will be logged as an error and the ValidationError sent to the receiver.

Any other unexpected error raised during the configuration loading will be logged as an error and the error instance sent to the receiver.

Further customization¤

If you have special needs for receiving the configurations (for example validating using marshmallow doesn't fit your needs), you can create a custom receiver using config_channel.new_receiver() directly. Please bear in mind that this provides low-level access to the whole config in the file as a raw Python mapping.

Actors that need to be reconfigured should take a configuration manager and a key to receive configurations updates, and instantiate the new receiver themselves. This allows actors to have full control over how the configuration is loaded (for example providing a custom base schema or marshmallow options).

Passing the key explicitly also allows the application to structure the configuration in whatever way is most convenient.

Actors can use the wait_for_first() function to wait for the first configuration to be received, and cache the configuration for later use and in case the actor is restarted. If the configuration is not received after some timeout, an asyncio.TimeoutError will be raised (and if uncaught, the actor will be automatically restarted after some delay).
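
The idea of waiting for a first usable configuration with a timeout can be sketched with the standard library alone. This is not the SDK's wait_for_first(): `wait_for_first_sketch` is a hypothetical function that reads from an `asyncio.Queue` instead of a channel receiver, and skipping exceptions (and `None` unless `allow_none` is set) is an assumption made for this example.

```python
import asyncio
from typing import Any


async def wait_for_first_sketch(
    queue: asyncio.Queue, *, timeout: float, allow_none: bool = False
) -> Any:
    """Return the first usable item from `queue` or raise `asyncio.TimeoutError`."""

    async def first_usable() -> Any:
        while True:
            item = await queue.get()
            if isinstance(item, Exception):
                continue  # Invalid update: keep waiting for a valid one.
            if item is None and not allow_none:
                continue  # An unset configuration is not acceptable here.
            return item

    return await asyncio.wait_for(first_usable(), timeout=timeout)


async def demo() -> Any:
    queue: asyncio.Queue = asyncio.Queue()
    for item in (None, ValueError("bad value"), {"test": 42}):
        queue.put_nowait(item)
    return await wait_for_first_sketch(queue, timeout=1.0)


first_config = asyncio.run(demo())
print(first_config)
```

With `allow_none=True`, the same call would return the leading `None` immediately, which matches the case of an actor that can fall back to a default configuration.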

Actor that can run without a configuration (using a default configuration)
actor.py
import dataclasses
import logging
from collections.abc import Sequence
from datetime import timedelta
from typing import assert_never

from frequenz.channels import select, selected_from
from frequenz.channels.event import Event

from frequenz.sdk.actor import Actor
from frequenz.sdk.config import ConfigManager, wait_for_first

_logger = logging.getLogger(__name__)

@dataclasses.dataclass(frozen=True, kw_only=True)
class MyActorConfig:
    some_config: timedelta = dataclasses.field(
        default=timedelta(seconds=42), # (1)!
        metadata={"metadata": {"description": "Some optional configuration"}},
    )

class MyActor(Actor):
    def __init__(
        self,
        config_manager: ConfigManager,
        /,
        *,
        config_key: str | Sequence[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._config_manager = config_manager
        self._config_key = config_key
        self._config: MyActorConfig = MyActorConfig() # (2)!

    async def _run(self) -> None:
        config_receiver = self._config_manager.new_receiver(
            self._config_key, MyActorConfig
        )
        self._update_config(
            await wait_for_first(
                config_receiver, receiver_name=str(self), allow_none=True # (3)!
            )
        )

        other_receiver = Event()

        async for selected in select(config_receiver, other_receiver):
            if selected_from(selected, config_receiver):
                self._update_config(selected.message)
            elif selected_from(selected, other_receiver):
                # Do something else
                ...

    def _update_config(self, config_update: MyActorConfig | Exception | None) -> None:
        match config_update:
            case MyActorConfig() as config:
                _logger.info("New configuration received, updating.")
                self._reconfigure(config)
            case None:
                _logger.info("Configuration was unset, resetting to the default")
                self._reconfigure(MyActorConfig()) # (4)!
            case Exception():
                _logger.info( # (5)!
                    "New configuration has errors, keeping the old configuration."
                )
            case unexpected:
                assert_never(unexpected)

    def _reconfigure(self, config: MyActorConfig) -> None:
        self._config = config
        # Do something with the new configuration
  1. This is different when the actor requires a configuration to run. Here, the config has a default value.
  2. This is different when the actor requires a configuration to run. Here, the actor can just instantiate a default configuration.
  3. This is different when the actor requires a configuration to run. Here, the actor can accept a None configuration.
  4. This is different when the actor requires a configuration to run. Here, the actor can reset to a default configuration.
  5. There is no need to log the error itself, the configuration manager will log it automatically.
Actor that requires a configuration to run
actor.py
import dataclasses
import logging
from collections.abc import Sequence
from datetime import timedelta
from typing import assert_never

from frequenz.channels import select, selected_from
from frequenz.channels.event import Event

from frequenz.sdk.actor import Actor
from frequenz.sdk.config import ConfigManager, wait_for_first

_logger = logging.getLogger(__name__)

@dataclasses.dataclass(frozen=True, kw_only=True)
class MyActorConfig:
    some_config: timedelta = dataclasses.field( # (1)!
        metadata={"metadata": {"description": "Some required configuration"}},
    )

class MyActor(Actor):
    def __init__(
        self,
        config_manager: ConfigManager,
        /,
        *,
        config_key: str | Sequence[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._config_manager = config_manager
        self._config_key = config_key
        self._config: MyActorConfig # (2)!

    async def _run(self) -> None:
        config_receiver = self._config_manager.new_receiver(
            self._config_key, MyActorConfig
        )
        self._update_config(
            await wait_for_first(config_receiver, receiver_name=str(self)) # (3)!
        )

        other_receiver = Event()

        async for selected in select(config_receiver, other_receiver):
            if selected_from(selected, config_receiver):
                self._update_config(selected.message)
            elif selected_from(selected, other_receiver):
                # Do something else
                ...

    def _update_config(self, config_update: MyActorConfig | Exception | None) -> None:
        match config_update:
            case MyActorConfig() as config:
                _logger.info("New configuration received, updating.")
                self._reconfigure(config)
            case None:
                _logger.info("Configuration was unset, keeping the old configuration.") # (4)!
            case Exception():
                _logger.info( # (5)!
                    "New configuration has errors, keeping the old configuration."
                )
            case unexpected:
                assert_never(unexpected)

    def _reconfigure(self, config: MyActorConfig) -> None:
        self._config = config
        # Do something with the new configuration
  1. This is different when the actor can use a default configuration. Here, the field is required, so there is no default configuration possible.
  2. This is different when the actor can use a default configuration. Here, the assignment of the configuration is delayed to the _run() method.
  3. This is different when the actor can use a default configuration. Here, the actor doesn't accept None as a valid configuration as it can't create a default configuration.
  4. This is different when the actor can use a default configuration. Here, the actor doesn't accept None as a valid configuration as it can't create a default configuration, so it needs to keep the old configuration.
  5. There is no need to log the error itself, the configuration manager will log it automatically.
Application

The pattern used by the application is very similar to the one used by actors. In this case the application requires a configuration to run, but if it could also use a default configuration, the changes would be the same as in the actor examples.

app.py
import asyncio
import dataclasses
import logging
import pathlib
from collections.abc import Sequence
from datetime import timedelta
from typing import assert_never

from frequenz.sdk.actor import Actor
from frequenz.sdk.config import ConfigManager, wait_for_first

_logger = logging.getLogger(__name__)

class MyActor(Actor): # (1)!
    def __init__(
        self, config_manager: ConfigManager, /, *, config_key: str | Sequence[str]
    ) -> None:
        super().__init__()
        self._config_manager = config_manager
        self._config_key = config_key
    async def _run(self) -> None: ...

@dataclasses.dataclass(frozen=True, kw_only=True)
class AppConfig:
    enable_actor: bool = dataclasses.field(
        metadata={"metadata": {"description": "Whether to enable the actor"}},
    )

class App:
    def __init__(self, *, config_paths: Sequence[pathlib.Path]):
        self._config_manager = ConfigManager(config_paths)
        self._config_receiver = self._config_manager.new_receiver("app", AppConfig)
        self._actor = MyActor(self._config_manager, config_key="actor")

    async def _update_config(self, config_update: AppConfig | Exception | None) -> None:
        match config_update:
            case AppConfig() as config:
                _logger.info("New configuration received, updating.")
                await self._reconfigure(config)
            case None:
                _logger.info("Configuration was unset, keeping the old configuration.")
            case Exception():
                _logger.info("New configuration has errors, keeping the old configuration.")
            case unexpected:
                assert_never(unexpected)

    async def _reconfigure(self, config: AppConfig) -> None:
        if config.enable_actor:
            self._actor.start()
        else:
            await self._actor.stop()

    async def run(self) -> None:
        _logger.info("Starting App...")

        async with self._config_manager:
            await self._update_config(
                await wait_for_first(self._config_receiver, receiver_name="app")
            )

            _logger.info("Waiting for configuration updates...")
            async for config_update in self._config_receiver:
                await self._update_config(config_update)

if __name__ == "__main__":
    asyncio.run(App(config_paths=[pathlib.Path("config.toml")]).run())
  1. See the actor examples above for a proper implementation of the actor.

Example configuration file:

config.toml
[app]
enable_actor = true

[actor]
some_config = 10

[logging.root_logger]
level = "DEBUG"

Classes¤

frequenz.sdk.config.BaseConfigSchema ¤

Bases: QuantitySchema

A base schema for configuration classes.

This schema provides validation for quantities and ignores unknown fields by default.

Source code in frequenz/sdk/config/_base_schema.py
class BaseConfigSchema(QuantitySchema):
    """A base schema for configuration classes.

    This schema provides validation for quantities and ignores unknown fields by
    default.
    """

    class Meta:
        """Meta options for the schema."""

        unknown = marshmallow.EXCLUDE
Classes¤
Meta ¤

Meta options for the schema.

Source code in frequenz/sdk/config/_base_schema.py
class Meta:
    """Meta options for the schema."""

    unknown = marshmallow.EXCLUDE
Functions¤
__init__ ¤
__init__(
    *args: Any,
    serialize_as_string_default: bool = False,
    **kwargs: Any
) -> None

Initialize the schema with a default serialization format.

PARAMETER DESCRIPTION
*args

Additional positional arguments.

TYPE: Any DEFAULT: ()

serialize_as_string_default

Default serialization format for quantities. If True, quantities are serialized as strings with units. If False, quantities are serialized as floats.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Source code in frequenz/quantities/experimental/marshmallow.py
def __init__(
    self, *args: Any, serialize_as_string_default: bool = False, **kwargs: Any
) -> None:
    """
    Initialize the schema with a default serialization format.

    Args:
        *args: Additional positional arguments.
        serialize_as_string_default: Default serialization format for quantities.
            If True, quantities are serialized as strings with units.
            If False, quantities are serialized as floats.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.context["serialize_as_string_default"] = serialize_as_string_default

frequenz.sdk.config.ConfigManager ¤

Bases: BackgroundService

A manager for configuration files.

This class reads configuration files and sends the configuration to the receivers, providing configuration key filtering and value validation.

For a more in-depth introduction and examples, please read the module documentation.

Source code in frequenz/sdk/config/_manager.py
class ConfigManager(BackgroundService):
    """A manager for configuration files.

    This class reads configuration files and sends the configuration to the receivers,
    providing configuration key filtering and value validation.

    For a more in-depth introduction and examples, please read the [module
    documentation][frequenz.sdk.config].
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        config_paths: str | pathlib.Path | Sequence[pathlib.Path | str],
        /,
        *,
        force_polling: bool = True,
        logging_config_key: str | Sequence[str] | None = "logging",
        name: str | None = None,
        polling_interval: timedelta = timedelta(seconds=1),
    ) -> None:
        """Initialize this config manager.

        Args:
            config_paths: The paths to the TOML files with the configuration. Order
                matters, as the configuration will be read and updated in the order
                of the paths, so the last path will override the configuration set by
                the previous paths. Dict keys will be merged recursively, but other
                objects (like lists) will be replaced by the value in the last path.
            force_polling: Whether to force file polling to check for changes.
            logging_config_key: The key to use for the logging configuration. If `None`,
                logging configuration will not be managed.  If a key is provided, the
                manager updates the logging configuration whenever the configuration
                changes.
            name: A name to use when creating actors. If `None`, `str(id(self))` will
                be used. This is used mostly for debugging purposes.
            polling_interval: The interval to poll for changes. Only relevant if
                polling is enabled.
        """
        super().__init__(name=name)

        self.config_channel: Final[Broadcast[Mapping[str, Any]]] = Broadcast(
            name=f"{self}_config", resend_latest=True
        )
        """The channel used for sending configuration updates (resends the latest value).

        This is the channel used to communicate with the
        [`ConfigManagingActor`][frequenz.sdk.config.ConfigManager.config_actor] and will
        receive the complete raw configuration as a mapping.
        """

        self.config_actor: Final[ConfigManagingActor] = ConfigManagingActor(
            config_paths,
            self.config_channel.new_sender(),
            name=self.name,
            force_polling=force_polling,
            polling_interval=polling_interval,
        )
        """The actor that manages the configuration for this manager."""

        # pylint: disable-next=import-outside-toplevel,cyclic-import
        from ._logging_actor import LoggingConfigUpdatingActor

        self.logging_actor: Final[LoggingConfigUpdatingActor | None] = (
            None
            if logging_config_key is None
            else LoggingConfigUpdatingActor(
                self, config_key=logging_config_key, name=self.name
            )
        )
        """The actor that manages the logging configuration for this manager."""

    @override
    def start(self) -> None:
        """Start this config manager."""
        self.config_actor.start()
        if self.logging_actor:
            self.logging_actor.start()

    @property
    @override
    def is_running(self) -> bool:
        """Whether this config manager is running."""
        return self.config_actor.is_running or (
            self.logging_actor is not None and self.logging_actor.is_running
        )

    @override
    def cancel(self, msg: str | None = None) -> None:
        """Cancel all running tasks and actors spawned by this config manager.

        Args:
            msg: The message to be passed to the tasks being cancelled.
        """
        if self.logging_actor:
            self.logging_actor.cancel(msg)
        self.config_actor.cancel(msg)

    @override
    async def wait(self) -> None:
        """Wait for this config manager to finish.

        Wait until all tasks and actors are finished.

        Raises:
            BaseExceptionGroup: If any of the tasks spawned by this service raised an
                exception (`CancelledError` is not considered an error and not returned in
                the exception group).
        """
        exceptions: list[BaseException] = []
        if self.logging_actor:
            try:
                await self.logging_actor
            except BaseExceptionGroup as err:  # pylint: disable=try-except-raise
                exceptions.append(err)

        try:
            await self.config_actor
        except BaseExceptionGroup as err:  # pylint: disable=try-except-raise
            exceptions.append(err)

        if exceptions:
            raise BaseExceptionGroup(f"Error while stopping {self!r}", exceptions)

    @override
    def __repr__(self) -> str:
        """Return a string representation of this config manager."""
        logging_actor = (
            f"logging_actor={self.logging_actor!r}, " if self.logging_actor else ""
        )
        return (
            f"<{self.__class__.__name__}: "
            f"name={self.name!r}, "
            f"config_channel={self.config_channel!r}, "
            + logging_actor
            + f"config_actor={self.config_actor!r}>"
        )

    def new_receiver(  # pylint: disable=too-many-arguments
        self,
        # This is tricky, because a str is also a Sequence[str], if we would use only
        # Sequence[str], then a regular string would also be accepted and taken as
        # a sequence, like "key" -> ["k", "e", "y"]. We should never remove the str from
        # the allowed types without changing Sequence[str] to something more specific,
        # like list[str] or tuple[str] (but both have their own problems).
        key: str | Sequence[str],
        config_class: type[DataclassT],
        /,
        *,
        skip_unchanged: bool = True,
        base_schema: type[Schema] | None = BaseConfigSchema,
        marshmallow_load_kwargs: dict[str, Any] | None = None,
    ) -> Receiver[DataclassT | Exception | None]:
        """Create a new receiver for receiving the configuration for a particular key.

        This method has a lot of features and functionalities to make it easier to
        receive configurations, but it also imposes some restrictions on how the
        configurations are received. If you need more control over the configuration
        receiver, you can create a receiver directly using
        [`config_channel.new_receiver()`][frequenz.sdk.config.ConfigManager.config_channel].

        For a more in-depth introduction and examples, please read the [module
        documentation][frequenz.sdk.config].

        Args:
            key: The configuration key to be read by the receiver. If a sequence of
                strings is used, it is used as a sub-key.
            config_class: The class object to use to instantiate a configuration. The
                configuration will be validated against this type too using
                [`marshmallow_dataclass`][].
            skip_unchanged: Whether to skip sending the configuration if it hasn't
                changed compared to the last one received.
            base_schema: An optional class to be used as a base schema for the
                configuration class. This allows using custom fields for example. Will be
                passed to [`marshmallow_dataclass.class_schema`][].
            marshmallow_load_kwargs: Additional arguments to be passed to
                [`marshmallow.Schema.load`][].

        Returns:
            The receiver for the configuration.
        """
        _validate_load_kwargs(marshmallow_load_kwargs)

        # We disable warning on overflow, because we are only interested in the latest
        # configuration, it is completely fine to drop old configuration updates.
        receiver = self.config_channel.new_receiver(
            name=f"{self}:{key}", limit=1, warn_on_overflow=False
        ).map(
            lambda config: _load_config_with_logging_and_errors(
                config,
                config_class,
                key=key,
                base_schema=base_schema,
                marshmallow_load_kwargs=marshmallow_load_kwargs,
            )
        )

        if skip_unchanged:
            # For some reason the type argument for WithPrevious is not inferred
            # correctly, so we need to specify it explicitly.
            return receiver.filter(
                WithPrevious[DataclassT | Exception | None](
                    lambda old, new: _not_equal_with_logging(
                        key=key, old_value=old, new_value=new
                    )
                )
            )

        return receiver
Attributes¤
config_actor instance-attribute ¤
config_actor: Final[ConfigManagingActor] = (
    ConfigManagingActor(
        config_paths,
        new_sender(),
        name=name,
        force_polling=force_polling,
        polling_interval=polling_interval,
    )
)

The actor that manages the configuration for this manager.

config_channel instance-attribute ¤
config_channel: Final[Broadcast[Mapping[str, Any]]] = (
    Broadcast(name=f"{self}_config", resend_latest=True)
)

The channel used for sending configuration updates (resends the latest value).

This is the channel used to communicate with the ConfigManagingActor and will receive the complete raw configuration as a mapping.

is_running property ¤
is_running: bool

Whether this config manager is running.

logging_actor instance-attribute ¤
logging_actor: Final[LoggingConfigUpdatingActor | None] = (
    None
    if logging_config_key is None
    else LoggingConfigUpdatingActor(
        self, config_key=logging_config_key, name=name
    )
)

The actor that manages the logging configuration for this manager.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(
    config_paths: str | Path | Sequence[Path | str],
    /,
    *,
    force_polling: bool = True,
    logging_config_key: (
        str | Sequence[str] | None
    ) = "logging",
    name: str | None = None,
    polling_interval: timedelta = timedelta(seconds=1),
) -> None

Initialize this config manager.

PARAMETER DESCRIPTION
config_paths

The paths to the TOML files with the configuration. Order matters, as the configuration will be read and updated in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path.

TYPE: str | Path | Sequence[Path | str]

force_polling

Whether to force file polling to check for changes.

TYPE: bool DEFAULT: True

logging_config_key

The key to use for the logging configuration. If None, logging configuration will not be managed. If a key is provided, the manager updates the logging configuration whenever the configuration changes.

TYPE: str | Sequence[str] | None DEFAULT: 'logging'

name

A name to use when creating actors. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

polling_interval

The interval to poll for changes. Only relevant if polling is enabled.

TYPE: timedelta DEFAULT: timedelta(seconds=1)

Source code in frequenz/sdk/config/_manager.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    config_paths: str | pathlib.Path | Sequence[pathlib.Path | str],
    /,
    *,
    force_polling: bool = True,
    logging_config_key: str | Sequence[str] | None = "logging",
    name: str | None = None,
    polling_interval: timedelta = timedelta(seconds=1),
) -> None:
    """Initialize this config manager.

    Args:
        config_paths: The paths to the TOML files with the configuration. Order
            matters, as the configuration will be read and updated in the order
            of the paths, so the last path will override the configuration set by
            the previous paths. Dict keys will be merged recursively, but other
            objects (like lists) will be replaced by the value in the last path.
        force_polling: Whether to force file polling to check for changes.
        logging_config_key: The key to use for the logging configuration. If `None`,
            logging configuration will not be managed. If a key is provided, the
            manager updates the logging configuration whenever the configuration
            changes.
        name: A name to use when creating actors. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
        polling_interval: The interval to poll for changes. Only relevant if
            polling is enabled.
    """
    super().__init__(name=name)

    self.config_channel: Final[Broadcast[Mapping[str, Any]]] = Broadcast(
        name=f"{self}_config", resend_latest=True
    )
    """The channel used for sending configuration updates (resends the latest value).

    This is the channel used to communicate with the
    [`ConfigManagingActor`][frequenz.sdk.config.ConfigManager.config_actor] and will
    receive the complete raw configuration as a mapping.
    """

    self.config_actor: Final[ConfigManagingActor] = ConfigManagingActor(
        config_paths,
        self.config_channel.new_sender(),
        name=self.name,
        force_polling=force_polling,
        polling_interval=polling_interval,
    )
    """The actor that manages the configuration for this manager."""

    # pylint: disable-next=import-outside-toplevel,cyclic-import
    from ._logging_actor import LoggingConfigUpdatingActor

    self.logging_actor: Final[LoggingConfigUpdatingActor | None] = (
        None
        if logging_config_key is None
        else LoggingConfigUpdatingActor(
            self, config_key=logging_config_key, name=self.name
        )
    )
    """The actor that manages the logging configuration for this manager."""
__repr__ ¤
__repr__() -> str

Return a string representation of this config manager.

Source code in frequenz/sdk/config/_manager.py
@override
def __repr__(self) -> str:
    """Return a string representation of this config manager."""
    logging_actor = (
        f"logging_actor={self.logging_actor!r}, " if self.logging_actor else ""
    )
    return (
        f"<{self.__class__.__name__}: "
        f"name={self.name!r}, "
        f"config_channel={self.config_channel!r}, "
        + logging_actor
        + f"config_actor={self.config_actor!r}>"
    )
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks and actors spawned by this config manager.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/config/_manager.py
@override
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks and actors spawned by this config manager.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    if self.logging_actor:
        self.logging_actor.cancel(msg)
    self.config_actor.cancel(msg)
new_receiver ¤
new_receiver(
    key: str | Sequence[str],
    config_class: type[DataclassT],
    /,
    *,
    skip_unchanged: bool = True,
    base_schema: type[Schema] | None = BaseConfigSchema,
    marshmallow_load_kwargs: dict[str, Any] | None = None,
) -> Receiver[DataclassT | Exception | None]

Create a new receiver for receiving the configuration for a particular key.

This method has a lot of features and functionalities to make it easier to receive configurations, but it also imposes some restrictions on how the configurations are received. If you need more control over the configuration receiver, you can create a receiver directly using config_channel.new_receiver().

For a more in-depth introduction and examples, please read the module documentation.

PARAMETER DESCRIPTION
key

The configuration key to be read by the receiver. If a sequence of strings is used, it is treated as a nested key, so ["key", "subkey"] reads only config["key"]["subkey"].

TYPE: str | Sequence[str]

config_class

The class object to use to instantiate a configuration. The configuration will be validated against this type too using marshmallow_dataclass.

TYPE: type[DataclassT]

skip_unchanged

Whether to skip sending the configuration if it hasn't changed compared to the last one received.

TYPE: bool DEFAULT: True

base_schema

An optional class to be used as a base schema for the configuration class. This allows using custom fields, for example. Will be passed to marshmallow_dataclass.class_schema.

TYPE: type[Schema] | None DEFAULT: BaseConfigSchema

marshmallow_load_kwargs

Additional arguments to be passed to marshmallow.Schema.load.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
Receiver[DataclassT | Exception | None]

The receiver for the configuration.

Source code in frequenz/sdk/config/_manager.py
def new_receiver(  # pylint: disable=too-many-arguments
    self,
    # This is tricky, because a str is also a Sequence[str], if we would use only
    # Sequence[str], then a regular string would also be accepted and taken as
    # a sequence, like "key" -> ["k", "e", "y"]. We should never remove the str from
    # the allowed types without changing Sequence[str] to something more specific,
    # like list[str] or tuple[str] (but both have their own problems).
    key: str | Sequence[str],
    config_class: type[DataclassT],
    /,
    *,
    skip_unchanged: bool = True,
    base_schema: type[Schema] | None = BaseConfigSchema,
    marshmallow_load_kwargs: dict[str, Any] | None = None,
) -> Receiver[DataclassT | Exception | None]:
    """Create a new receiver for receiving the configuration for a particular key.

    This method has a lot of features and functionalities to make it easier to
    receive configurations, but it also imposes some restrictions on how the
    configurations are received. If you need more control over the configuration
    receiver, you can create a receiver directly using
    [`config_channel.new_receiver()`][frequenz.sdk.config.ConfigManager.config_channel].

    For a more in-depth introduction and examples, please read the [module
    documentation][frequenz.sdk.config].

    Args:
        key: The configuration key to be read by the receiver. If a sequence of
            strings is used, it is treated as a nested key.
        config_class: The class object to use to instantiate a configuration. The
            configuration will be validated against this type too using
            [`marshmallow_dataclass`][].
        skip_unchanged: Whether to skip sending the configuration if it hasn't
            changed compared to the last one received.
        base_schema: An optional class to be used as a base schema for the
            configuration class. This allows using custom fields, for example. Will be
            passed to [`marshmallow_dataclass.class_schema`][].
        marshmallow_load_kwargs: Additional arguments to be passed to
            [`marshmallow.Schema.load`][].

    Returns:
        The receiver for the configuration.
    """
    _validate_load_kwargs(marshmallow_load_kwargs)

    # We disable warning on overflow, because we are only interested in the latest
    # configuration, it is completely fine to drop old configuration updates.
    receiver = self.config_channel.new_receiver(
        name=f"{self}:{key}", limit=1, warn_on_overflow=False
    ).map(
        lambda config: _load_config_with_logging_and_errors(
            config,
            config_class,
            key=key,
            base_schema=base_schema,
            marshmallow_load_kwargs=marshmallow_load_kwargs,
        )
    )

    if skip_unchanged:
        # For some reason the type argument for WithPrevious is not inferred
        # correctly, so we need to specify it explicitly.
        return receiver.filter(
            WithPrevious[DataclassT | Exception | None](
                lambda old, new: _not_equal_with_logging(
                    key=key, old_value=old, new_value=new
                )
            )
        )

    return receiver
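
The effect of `skip_unchanged` can be approximated without any channel machinery. The sketch below is a standard-library analogy, not the SDK's implementation: `skip_unchanged()` here is a hypothetical generator that compares each value with the previous one and drops repeats. It assumes the first value is always passed through.

```python
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")
_NO_PREVIOUS = object()  # Sentinel: nothing has been received yet.


def skip_unchanged(values: Iterable[T]) -> Iterator[T]:
    """Yield each value only if it differs from the previously received one."""
    previous: object = _NO_PREVIOUS
    for value in values:
        # The first value always passes (an assumption of this sketch).
        if previous is _NO_PREVIOUS or value != previous:
            yield value
        previous = value


updates = [{"level": "INFO"}, {"level": "INFO"}, {"level": "DEBUG"}, None, None]
filtered = list(skip_unchanged(updates))
print(filtered)
```

Because the comparison uses `!=`, a configuration dataclass needs value-based equality (the default for dataclasses) for unchanged configurations to be recognized as such.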
start ¤
start() -> None

Start this config manager.

Source code in frequenz/sdk/config/_manager.py
@override
def start(self) -> None:
    """Start this config manager."""
    self.config_actor.start()
    if self.logging_actor:
        self.logging_actor.start()
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this config manager to finish.

Wait until all tasks and actors are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and is not returned in the exception group).

Source code in frequenz/sdk/config/_manager.py
@override
async def wait(self) -> None:
    """Wait for this config manager to finish.

    Wait until all tasks and actors are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and not returned in
            the exception group).
    """
    exceptions: list[BaseException] = []
    if self.logging_actor:
        try:
            await self.logging_actor
        except BaseExceptionGroup as err:  # pylint: disable=try-except-raise
            exceptions.append(err)

    try:
        await self.config_actor
    except BaseExceptionGroup as err:  # pylint: disable=try-except-raise
        exceptions.append(err)

    if exceptions:
        raise BaseExceptionGroup(f"Error while stopping {self!r}", exceptions)

frequenz.sdk.config.ConfigManagingActor ¤

Bases: Actor

An actor that monitors TOML configuration files for updates.

When the actor is started the configuration files will be read and sent to the output sender. Then the actor will start monitoring the files for updates. If any file is updated, all the configuration files will be re-read and sent to the output sender.

If no configuration file can be read, the actor logs an error and skips sending a configuration update.

The configuration files are read in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path.

Example

If config1.toml contains:

var1 = [1, 2]
var2 = 2
[section]
var3 = [1, 3]

And config2.toml contains:

var2 = "hello" # Can override with a different type too
var3 = 4
[section]
var3 = 5
var4 = 5

Then the final configuration will be:

{
    "var1": [1, 2],
    "var2": "hello",
    "var3": 4,
    "section": {
        "var3": 5,
        "var4": 5,
    },
}
Source code in frequenz/sdk/config/_managing_actor.py
class ConfigManagingActor(Actor):
    """An actor that monitors TOML configuration files for updates.

    When the actor is started the configuration files will be read and sent to the
    output sender. Then the actor will start monitoring the files for updates. If any
    file is updated, all the configuration files will be re-read and sent to the output
    sender.

    If no configuration file can be read, the actor logs an error and skips sending
    a configuration update.

    The configuration files are read in the order of the paths, so the last path will
    override the configuration set by the previous paths. Dict keys will be merged
    recursively, but other objects (like lists) will be replaced by the value in the
    last path.

    Example:
        If `config1.toml` contains:

        ```toml
        var1 = [1, 2]
        var2 = 2
        [section]
        var3 = [1, 3]
        ```

        And `config2.toml` contains:

        ```toml
        var2 = "hello" # Can override with a different type too
        var3 = 4
        [section]
        var3 = 5
        var4 = 5
        ```

        Then the final configuration will be:

        ```py
        {
            "var1": [1, 2],
            "var2": "hello",
            "var3": 4,
            "section": {
                "var3": 5,
                "var4": 5,
            },
        }
        ```
    """

    # pylint: disable-next=too-many-arguments
    def __init__(
        self,
        config_paths: str | pathlib.Path | abc.Sequence[pathlib.Path | str],
        output: Sender[abc.Mapping[str, Any]],
        *,
        name: str | None = None,
        force_polling: bool = True,
        polling_interval: timedelta = timedelta(seconds=1),
    ) -> None:
        """Initialize this instance.

        Args:
            config_paths: The paths to the TOML files with the configuration. Order
                matters, as the configuration will be read and updated in the order
                of the paths, so the last path will override the configuration set by
                the previous paths. Dict keys will be merged recursively, but other
                objects (like lists) will be replaced by the value in the last path.
            output: The sender to send the configuration to.
            name: The name of the actor. If `None`, `str(id(self))` will
                be used. This is used mostly for debugging purposes.
            force_polling: Whether to force file polling to check for changes.
            polling_interval: The interval to poll for changes. Only relevant if
                polling is enabled.

        Raises:
            ValueError: If no configuration path is provided.
        """
        super().__init__(name=name)
        match config_paths:
            case str():
                self._config_paths = [pathlib.Path(config_paths)]
            case pathlib.Path():
                self._config_paths = [config_paths]
            case abc.Sequence() as seq if len(seq) == 0:
                raise ValueError("At least one config path is required.")
            case abc.Sequence():
                self._config_paths = [
                    (
                        config_path
                        if isinstance(config_path, pathlib.Path)
                        else pathlib.Path(config_path)
                    )
                    for config_path in config_paths
                ]
            case unexpected:
                assert_never(unexpected)
        self._output: Sender[abc.Mapping[str, Any]] = output
        self._force_polling: bool = force_polling
        self._polling_interval: timedelta = polling_interval

    def _read_config(self) -> abc.Mapping[str, Any] | None:
        """Read the contents of the configuration file.

        Returns:
            A dictionary containing configuration variables, or `None` if none of
                the configuration files could be read.
        """
        error_count = 0
        config: dict[str, Any] = {}

        for config_path in self._config_paths:
            _logger.info(
                "[%s] Reading configuration file %r...", self.name, str(config_path)
            )
            try:
                with config_path.open("rb") as toml_file:
                    data = tomllib.load(toml_file)
                    _logger.info(
                        "[%s] Configuration file %r read successfully.",
                        self.name,
                        str(config_path),
                    )
                    config = _recursive_update(config, data)
            except ValueError as err:
                _logger.error("[%s] Can't read config file, err: %s", self.name, err)
                error_count += 1
            except OSError as err:
                # It is OK for a config file not to exist.
                _logger.error(
                    "[%s] Error reading config file %r (%s). Ignoring it.",
                    self.name,
                    str(config_path),
                    err,
                )
                error_count += 1

        if error_count == len(self._config_paths):
            _logger.error(
                "[%s] Can't read any of the config files, ignoring config update.", self
            )
            return None

        _logger.info(
            "[%s] Read %s/%s configuration files successfully.",
            self.name,
            len(self._config_paths) - error_count,
            len(self._config_paths),
        )
        return config

    async def send_config(self) -> None:
        """Send the configuration to the output sender."""
        config = self._read_config()
        if config is not None:
            await self._output.send(config)

    async def _run(self) -> None:
        """Monitor for and send configuration file updates.

        At startup, the Config Manager sends the current config so that it
        can be cached in the Broadcast channel and served to receivers even if
        there hasn't been any change to the config file itself.
        """
        await self.send_config()

        parent_paths = {p.parent for p in self._config_paths}

        # FileWatcher can't watch for non-existing files, so we need to watch for the
        # parent directories instead just in case a configuration file doesn't exist yet
        # or it is deleted and recreated again.
        file_watcher = FileWatcher(
            paths=list(parent_paths),
            event_types={EventType.CREATE, EventType.MODIFY},
            force_polling=self._force_polling,
            polling_interval=self._polling_interval,
        )

        try:
            async for event in file_watcher:
                if not event.path.exists():
                    _logger.error(
                        "[%s] Received event %s, but the watched path %s doesn't exist.",
                        self.name,
                        event,
                        event.path,
                    )
                    continue
                # Since we are watching the whole parent directories, we need to make
                # sure we only react to events related to the configuration files we
                # are interested in.
                #
                # pathlib.Path.samefile raises an error if any path doesn't exist, so
                # we need to make sure the paths exist before calling it. This can
                # happen because not all config files are required to exist; only one
                # is required, but we don't know which.
                if not any(
                    event.path.samefile(p) for p in self._config_paths if p.exists()
                ):
                    continue

                match event.type:
                    case EventType.CREATE:
                        _logger.info(
                            "[%s] The configuration file %s was created, sending new config...",
                            self.name,
                            event.path,
                        )
                        await self.send_config()
                    case EventType.MODIFY:
                        _logger.info(
                            "[%s] The configuration file %s was modified, sending update...",
                            self.name,
                            event.path,
                        )
                        await self.send_config()
                    case EventType.DELETE:
                        _logger.error(
                            "[%s] Unexpected DELETE event for path %s. Please report this "
                            "issue to Frequenz.",
                            self.name,
                            event.path,
                        )
                    case _:
                        assert_never(event.type)
        finally:
            del file_watcher
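
Because whole parent directories are watched, events for unrelated files have to be filtered out. The sketch below isolates that check using only `pathlib`; `is_watched()` is a hypothetical helper name, and the temporary files exist only for the demonstration.

```python
import pathlib
import tempfile


def is_watched(event_path: pathlib.Path, config_paths: list[pathlib.Path]) -> bool:
    """Return whether `event_path` refers to one of the existing config files."""
    # Path.samefile() raises if a path doesn't exist, so missing files are skipped.
    return any(event_path.samefile(p) for p in config_paths if p.exists())


with tempfile.TemporaryDirectory() as tmp:
    directory = pathlib.Path(tmp)
    config = directory / "config.toml"
    missing = directory / "overrides.toml"  # Deliberately never created.
    unrelated = directory / "notes.txt"
    config.write_text("var = 1\n")
    unrelated.write_text("not a config\n")
    (directory / "sub").mkdir()

    watched = [config, missing]
    # The same file, reached through a different spelling of the path.
    alias = directory / "sub" / ".." / "config.toml"
    results = (is_watched(alias, watched), is_watched(unrelated, watched))

print(results)
```

Comparing with `samefile()` instead of `==` means an event is matched even when the watcher reports the file under a different but equivalent path.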
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
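
Delegating `__await__` to a coroutine is what makes `await service` equivalent to `await service.wait()`. A minimal sketch of the same delegation, using a hypothetical `AwaitableService` class:

```python
import asyncio
from collections.abc import Generator


class AwaitableService:
    """A toy object that can be awaited directly."""

    def __init__(self) -> None:
        self.finished = False

    async def wait(self) -> None:
        await asyncio.sleep(0)  # Stand-in for waiting on real tasks.
        self.finished = True

    def __await__(self) -> Generator[None, None, None]:
        # Reuse the coroutine's own awaitable protocol.
        return self.wait().__await__()


async def main() -> bool:
    service = AwaitableService()
    await service  # Same effect as `await service.wait()`.
    return service.finished


finished = asyncio.run(main())
print(finished)
```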
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(
    config_paths: str | Path | Sequence[Path | str],
    output: Sender[Mapping[str, Any]],
    *,
    name: str | None = None,
    force_polling: bool = True,
    polling_interval: timedelta = timedelta(seconds=1)
) -> None

Initialize this instance.

PARAMETER DESCRIPTION
config_paths

The paths to the TOML files with the configuration. Order matters, as the configuration will be read and updated in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path.

TYPE: str | Path | Sequence[Path | str]

output

The sender to send the configuration to.

TYPE: Sender[Mapping[str, Any]]

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

force_polling

Whether to force file polling to check for changes.

TYPE: bool DEFAULT: True

polling_interval

The interval to poll for changes. Only relevant if polling is enabled.

TYPE: timedelta DEFAULT: timedelta(seconds=1)

RAISES DESCRIPTION
ValueError

If no configuration path is provided.

Source code in frequenz/sdk/config/_managing_actor.py
def __init__(
    self,
    config_paths: str | pathlib.Path | abc.Sequence[pathlib.Path | str],
    output: Sender[abc.Mapping[str, Any]],
    *,
    name: str | None = None,
    force_polling: bool = True,
    polling_interval: timedelta = timedelta(seconds=1),
) -> None:
    """Initialize this instance.

    Args:
        config_paths: The paths to the TOML files with the configuration. Order
            matters, as the configuration will be read and updated in the order
            of the paths, so the last path will override the configuration set by
            the previous paths. Dict keys will be merged recursively, but other
            objects (like lists) will be replaced by the value in the last path.
        output: The sender to send the configuration to.
        name: The name of the actor. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
        force_polling: Whether to force file polling to check for changes.
        polling_interval: The interval to poll for changes. Only relevant if
            polling is enabled.

    Raises:
        ValueError: If no configuration path is provided.
    """
    super().__init__(name=name)
    match config_paths:
        case str():
            self._config_paths = [pathlib.Path(config_paths)]
        case pathlib.Path():
            self._config_paths = [config_paths]
        case abc.Sequence() as seq if len(seq) == 0:
            raise ValueError("At least one config path is required.")
        case abc.Sequence():
            self._config_paths = [
                (
                    config_path
                    if isinstance(config_path, pathlib.Path)
                    else pathlib.Path(config_path)
                )
                for config_path in config_paths
            ]
        case unexpected:
            assert_never(unexpected)
    self._output: Sender[abc.Mapping[str, Any]] = output
    self._force_polling: bool = force_polling
    self._polling_interval: timedelta = polling_interval
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
send_config async ¤
send_config() -> None

Send the configuration to the output sender.

Source code in frequenz/sdk/config/_managing_actor.py
async def send_config(self) -> None:
    """Send the configuration to the output sender."""
    config = self._read_config()
    if config is not None:
        await self._output.send(config)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
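
The idempotent behavior of `start()` can be demonstrated with a toy class. `ToyActor` is hypothetical, and its `is_running` check (any task not yet done) is an assumption of this sketch rather than the SDK's exact definition.

```python
import asyncio


class ToyActor:
    """A toy actor whose start() does nothing while a task is still running."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task[None]] = set()

    @property
    def is_running(self) -> bool:
        # Assumption: running means at least one task has not finished.
        return any(not task.done() for task in self._tasks)

    async def _run_loop(self) -> None:
        await asyncio.sleep(3600)  # Stand-in for the actor's real work.

    def start(self) -> None:
        if self.is_running:
            return
        self._tasks.clear()
        self._tasks.add(asyncio.create_task(self._run_loop()))


async def main() -> tuple[bool, bool]:
    actor = ToyActor()
    actor.start()
    first = next(iter(actor._tasks))
    actor.start()  # No-op: the first task is still running.
    same_task = actor._tasks == {first}
    first.cancel()
    await asyncio.gather(first, return_exceptions=True)
    return same_task, actor.is_running


result = asyncio.run(main())
print(result)
```

Calling `start()` again after the task has finished would clear the finished task and create a new one, which is what allows a stopped actor to be restarted.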
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and is not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.config.InvalidValueForKeyError ¤

Bases: ValueError

An error indicating that the value under the specified key is invalid.
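As an illustration of the kind of situation this error describes, here is a minimal sketch of a nested key lookup that fails when an intermediate value is not a mapping. The class below is a stand-in that mirrors the constructor documented here, and `get_nested` is a hypothetical helper; how the SDK itself decides when to raise the error is not shown in this section.

```python
from collections.abc import Mapping, Sequence
from typing import Any


class InvalidValueForKeyError(ValueError):
    """Stand-in with the same constructor as the documented error."""

    def __init__(self, msg: str, *, key: Sequence[str], value: Any) -> None:
        super().__init__(msg)
        self.key = key
        self.value = value


def get_nested(config: Mapping[str, Any], key: Sequence[str]) -> Any:
    """Hypothetical helper: follow `key` into `config`, or return `None` if missing."""
    current: Any = config
    for depth, part in enumerate(key):
        if not isinstance(current, Mapping):
            # We cannot descend any further: report where and what we found.
            raise InvalidValueForKeyError(
                f"Value under {list(key[:depth])} is not a mapping",
                key=key[:depth],
                value=current,
            )
        if part not in current:
            return None
        current = current[part]
    return current


try:
    get_nested({"logging": "verbose"}, ["logging", "root_logger"])
except InvalidValueForKeyError as error:
    print(f"{error} (key={error.key!r}, value={error.value!r})")
```

Carrying `key` and `value` on the exception lets the caller log exactly which part of the configuration was malformed.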

Source code in frequenz/sdk/config/_manager.py
class InvalidValueForKeyError(ValueError):
    """An error indicating that the value under the specified key is invalid."""

    def __init__(self, msg: str, *, key: Sequence[str], value: Any) -> None:
        """Initialize this error.

        Args:
            msg: The error message.
            key: The key that has an invalid value.
            value: The actual value that was found that is not a mapping.
        """
        super().__init__(msg)

        self.key: Final[Sequence[str]] = key
        """The key that has an invalid value."""

        self.value: Final[Any] = value
        """The actual value that was found that is not a mapping."""
Attributes¤
key instance-attribute ¤
key: Final[Sequence[str]] = key

The key that has an invalid value.

value instance-attribute ¤
value: Final[Any] = value

The actual value that was found that is not a mapping.

Functions¤
__init__ ¤
__init__(
    msg: str, *, key: Sequence[str], value: Any
) -> None

Initialize this error.

PARAMETER DESCRIPTION
msg

The error message.

TYPE: str

key

The key that has an invalid value.

TYPE: Sequence[str]

value

The actual value that was found that is not a mapping.

TYPE: Any

Source code in frequenz/sdk/config/_manager.py
def __init__(self, msg: str, *, key: Sequence[str], value: Any) -> None:
    """Initialize this error.

    Args:
        msg: The error message.
        key: The key that has an invalid value.
        value: The actual value that was found that is not a mapping.
    """
    super().__init__(msg)

    self.key: Final[Sequence[str]] = key
    """The key that has an invalid value."""

    self.value: Final[Any] = value
    """The actual value that was found that is not a mapping."""

frequenz.sdk.config.LoggerConfig dataclass ¤

A configuration for a logger.

Source code in frequenz/sdk/config/_logging_actor.py
@dataclass(frozen=True, kw_only=True)
class LoggerConfig:
    """A configuration for a logger."""

    level: LogLevel = field(
        default="NOTSET",
        metadata={
            "metadata": {
                "description": "Log level for the logger. Uses standard logging levels."
            },
        },
    )
    """The log level for the logger."""
Attributes¤
level class-attribute instance-attribute ¤
level: LogLevel = field(
    default="NOTSET",
    metadata={
        "metadata": {
            "description": "Log level for the logger. Uses standard logging levels."
        }
    },
)

The log level for the logger.

frequenz.sdk.config.LoggingConfig dataclass ¤

A configuration for the logging system.

Source code in frequenz/sdk/config/_logging_actor.py
@dataclass(frozen=True, kw_only=True)
class LoggingConfig:
    """A configuration for the logging system."""

    root_logger: LoggerConfig = field(
        default_factory=lambda: LoggerConfig(level="INFO"),
        metadata={
            "metadata": {
                "description": "Default default configuration for all loggers.",
            },
        },
    )
    """The default log level."""

    loggers: dict[str, LoggerConfig] = field(
        default_factory=dict,
        metadata={
            "metadata": {
                "description": "Configuration for a logger (the key is the logger name)."
            },
        },
    )
    """The list of loggers configurations."""
Attributes¤
loggers class-attribute instance-attribute ¤
loggers: dict[str, LoggerConfig] = field(
    default_factory=dict,
    metadata={
        "metadata": {
            "description": "Configuration for a logger (the key is the logger name)."
        }
    },
)

The configurations for individual loggers, keyed by logger name.

root_logger class-attribute instance-attribute ¤
root_logger: LoggerConfig = field(
    default_factory=lambda: LoggerConfig(level="INFO"),
    metadata={
        "metadata": {
            "description": "Default default configuration for all loggers."
        }
    },
)

The default log level.

frequenz.sdk.config.LoggingConfigUpdatingActor ¤

Bases: Actor

Actor that listens for logging configuration changes and sets them.

Example

config.toml file:

[logging.root_logger]
level = "INFO"

[logging.loggers."frequenz.sdk.actor.power_distributing"]
level = "DEBUG"

[logging.loggers."frequenz.channels"]
level = "DEBUG"

import asyncio

from frequenz.sdk.config import ConfigManager, LoggingConfigUpdatingActor
from frequenz.sdk.actor import run as run_actors

async def run() -> None:
    config_manager: ConfigManager = ...
    await run_actors(LoggingConfigUpdatingActor(config_manager))

asyncio.run(run())

Now whenever the config.toml file is updated, the logging configuration will be updated as well.

Source code in frequenz/sdk/config/_logging_actor.py
class LoggingConfigUpdatingActor(Actor):
    """Actor that listens for logging configuration changes and sets them.

    Example:
        `config.toml` file:
        ```toml
        [logging.root_logger]
        level = "INFO"

        [logging.loggers."frequenz.sdk.actor.power_distributing"]
        level = "DEBUG"

        [logging.loggers."frequenz.channels"]
        level = "DEBUG"
        ```

        ```python
        import asyncio

        from frequenz.sdk.config import ConfigManager, LoggingConfigUpdatingActor
        from frequenz.sdk.actor import run as run_actors

        async def run() -> None:
            config_manager: ConfigManager = ...
            await run_actors(LoggingConfigUpdatingActor(config_manager))

        asyncio.run(run())
        ```

        Now whenever the `config.toml` file is updated, the logging configuration
        will be updated as well.
    """

    # pylint: disable-next=too-many-arguments
    def __init__(
        self,
        config_manager: ConfigManager,
        /,
        *,
        config_key: str | Sequence[str] = "logging",
        log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z",
        log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s",
        name: str | None = None,
    ):
        """Initialize this instance.

        Args:
            config_manager: The configuration manager to use.
            config_key: The key to use to retrieve the configuration from the
                configuration manager.  If `None`, the whole configuration will be used.
            log_datefmt: Use the specified date/time format in logs.
            log_format: Use the specified format string in logs.
            name: The name of this actor. If `None`, `str(id(self))` will be used. This
                is used mostly for debugging purposes.

        Note:
            The `log_format` and `log_datefmt` parameters are used in a call to
            `logging.basicConfig()`. If logging has already been configured elsewhere
            in the application (through a previous `basicConfig()` call), then the format
            settings specified here will be ignored.
        """
        self._config_receiver = config_manager.new_receiver(
            config_key, LoggingConfig, base_schema=None
        )

        # Setup default configuration.
        # This ensures logging is configured even if actor fails to start or
        # if the configuration cannot be loaded.
        self._current_config: LoggingConfig = LoggingConfig()

        super().__init__(name=name)

        logging.basicConfig(
            format=log_format,
            datefmt=log_datefmt,
            level=logging.INFO,
        )
        _logger.info("Applying initial default logging configuration...")
        self._reconfigure(self._current_config)

    async def _run(self) -> None:
        """Listen for configuration changes and update logging."""
        self._reconfigure(
            await wait_for_first(
                self._config_receiver, receiver_name=str(self), allow_none=True
            )
        )
        async for config_update in self._config_receiver:
            self._reconfigure(config_update)

    def _reconfigure(self, config_update: LoggingConfig | Exception | None) -> None:
        """Update the logging configuration.

        Args:
            config_update: The new configuration, or an exception if there was an error
                parsing the configuration, or `None` if the configuration was unset.
        """
        match config_update:
            case LoggingConfig():
                _logger.info(
                    "New configuration received, updating logging configuration."
                )
                self._update_logging(config_update)
            case None:
                _logger.info(
                    "Configuration was unset, resetting to the default "
                    "logging configuration."
                )
                self._update_logging(LoggingConfig())
            case Exception():
                _logger.info(
                    "New configuration has errors, keeping the old logging "
                    "configuration."
                )
            case unexpected:
                assert_never(unexpected)

    def _update_logging(self, config: LoggingConfig) -> None:
        """Configure the logging level."""
        # If the logger is not in the new config, set it to NOTSET
        loggers_to_unset = self._current_config.loggers.keys() - config.loggers.keys()
        for logger_id in loggers_to_unset:
            _logger.debug("Unsetting log level for logger '%s'", logger_id)
            logging.getLogger(logger_id).setLevel(logging.NOTSET)

        self._current_config = config
        _logger.info(
            "Setting root logger level to '%s'", self._current_config.root_logger.level
        )
        logging.getLogger().setLevel(self._current_config.root_logger.level)

        # For each logger in the new config, set the log level
        for logger_id, logger_config in self._current_config.loggers.items():
            _logger.info(
                "Setting log level for logger '%s' to '%s'",
                logger_id,
                logger_config.level,
            )
            logging.getLogger(logger_id).setLevel(logger_config.level)

        _logger.info("Logging config update completed.")
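The reset step in `_update_logging()` can be tried in isolation. This is a simplified sketch that uses plain `dict[str, str]` mappings instead of `LoggingConfig`; `apply_levels` and the `example.*` logger names are hypothetical.

```python
import logging


def apply_levels(previous: dict[str, str], new: dict[str, str]) -> None:
    """Apply `new` logger levels and reset loggers that were only in `previous`."""
    # dict.keys() views support set operations, so this yields the removed names.
    for name in previous.keys() - new.keys():
        logging.getLogger(name).setLevel(logging.NOTSET)
    for name, level in new.items():
        logging.getLogger(name).setLevel(level)


first = {"example.a": "DEBUG", "example.b": "WARNING"}
second = {"example.b": "ERROR"}
apply_levels({}, first)
apply_levels(first, second)  # "example.a" is no longer configured.
```

After the second call, `example.a` is back to `NOTSET`, so it inherits its effective level from its ancestors again.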
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior. Don't do it unless the class explicitly documents that it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
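The combination of `start()`, `stop()`, the async context manager methods and `__await__` can be sketched with a small stand-in class. `TinyService` is hypothetical and much simpler than the SDK's background service (no restarts, no error reporting); it only shows how the pieces fit together.

```python
import asyncio
from collections.abc import Generator
from typing import Any


class TinyService:
    """Hypothetical stand-in with the same start/stop/await shape."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task[None]] = set()
        self.ticks = 0

    async def _run(self) -> None:
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    def start(self) -> None:
        if any(not task.done() for task in self._tasks):
            return  # Already running: do nothing.
        self._tasks = {asyncio.create_task(self._run())}

    async def stop(self) -> None:
        for task in self._tasks:
            task.cancel()
        # Cancellations are expected here, so they are collected and ignored.
        await asyncio.gather(*self._tasks, return_exceptions=True)

    async def __aenter__(self) -> "TinyService":
        self.start()
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self.stop()

    def __await__(self) -> Generator[Any, None, Any]:
        # Awaiting the service waits for all of its tasks to finish.
        return asyncio.gather(*self._tasks, return_exceptions=True).__await__()


async def main() -> int:
    async with TinyService() as service:
        await asyncio.sleep(0.05)  # The service runs in the background meanwhile.
    return service.ticks
```

Running `asyncio.run(main())` starts the service on entering the `async with` block and stops it on exit, even if the body raises.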
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(
    config_manager: ConfigManager,
    /,
    *,
    config_key: str | Sequence[str] = "logging",
    log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z",
    log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s",
    name: str | None = None,
)

Initialize this instance.

PARAMETER DESCRIPTION
config_manager

The configuration manager to use.

TYPE: ConfigManager

config_key

The key to use to retrieve the configuration from the configuration manager. If None, the whole configuration will be used.

TYPE: str | Sequence[str] DEFAULT: 'logging'

log_datefmt

Use the specified date/time format in logs.

TYPE: str DEFAULT: '%Y-%m-%dT%H:%M:%S%z'

log_format

Use the specified format string in logs.

TYPE: str DEFAULT: '%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s'

name

The name of this actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Note

The log_format and log_datefmt parameters are used in a call to logging.basicConfig(). If logging has already been configured elsewhere in the application (through a previous basicConfig() call), then the format settings specified here will be ignored.
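The behavior described in this note comes from the standard library: `logging.basicConfig()` does nothing when the root logger already has handlers, unless `force=True` is passed. The sketch below demonstrates it with a hypothetical helper, `effective_format`, that temporarily clears the root handlers and restores them afterwards.

```python
import logging


def effective_format() -> str:
    """Call `basicConfig()` twice and return a line formatted by the root handler."""
    root = logging.getLogger()
    saved_handlers = root.handlers[:]
    saved_level = root.level
    root.handlers.clear()  # Start from an unconfigured root logger for the demo.
    try:
        logging.basicConfig(format="first: %(message)s")
        # Ignored, because the root logger already has a handler.
        logging.basicConfig(format="second: %(message)s")
        record = logging.makeLogRecord({"msg": "hello"})
        return root.handlers[0].format(record)
    finally:
        root.handlers[:] = saved_handlers  # Restore the previous configuration.
        root.setLevel(saved_level)
```

The returned line uses the first format, which is why an application that configures logging before creating the actor keeps its own format settings.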

Source code in frequenz/sdk/config/_logging_actor.py
def __init__(
    self,
    config_manager: ConfigManager,
    /,
    *,
    config_key: str | Sequence[str] = "logging",
    log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z",
    log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s",
    name: str | None = None,
):
    """Initialize this instance.

    Args:
        config_manager: The configuration manager to use.
        config_key: The key to use to retrieve the configuration from the
            configuration manager.  If `None`, the whole configuration will be used.
        log_datefmt: Use the specified date/time format in logs.
        log_format: Use the specified format string in logs.
        name: The name of this actor. If `None`, `str(id(self))` will be used. This
            is used mostly for debugging purposes.

    Note:
        The `log_format` and `log_datefmt` parameters are used in a call to
        `logging.basicConfig()`. If logging has already been configured elsewhere
        in the application (through a previous `basicConfig()` call), then the format
        settings specified here will be ignored.
    """
    self._config_receiver = config_manager.new_receiver(
        config_key, LoggingConfig, base_schema=None
    )

    # Setup default configuration.
    # This ensures logging is configured even if actor fails to start or
    # if the configuration cannot be loaded.
    self._current_config: LoggingConfig = LoggingConfig()

    super().__init__(name=name)

    logging.basicConfig(
        format=log_format,
        datefmt=log_datefmt,
        level=logging.INFO,
    )
    _logger.info("Applying initial default logging configuration...")
    self._reconfigure(self._current_config)
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )
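The `while self._tasks` loop in `wait()` exists because a task may spawn further tasks while the wait is in progress. The following stdlib-only sketch shows that effect with a hypothetical `wait_all` helper that returns the collected errors instead of raising an exception group.

```python
import asyncio


async def wait_all(tasks: set[asyncio.Task[None]]) -> list[BaseException]:
    """Wait until `tasks` is empty and return the errors raised by the tasks."""
    errors: list[BaseException] = []
    while tasks:
        done, _pending = await asyncio.wait(tasks)
        tasks -= done  # In-place: tasks added while we were waiting stay in the set.
        for task in done:
            try:
                task.result()
            except BaseException as error:  # Includes CancelledError.
                errors.append(error)
    return errors


async def main() -> list[str]:
    tasks: set[asyncio.Task[None]] = set()
    finished: list[str] = []

    async def child() -> None:
        finished.append("child")

    async def parent() -> None:
        tasks.add(asyncio.create_task(child()))  # Spawned during the wait.
        finished.append("parent")

    tasks.add(asyncio.create_task(parent()))
    await wait_all(tasks)
    return finished
```

A single `asyncio.wait()` call would only cover the tasks that existed when it was called, so the child task would be missed without the loop.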

Functions¤

frequenz.sdk.config.load_config ¤

load_config(
    cls: type[DataclassT],
    config: Mapping[str, Any],
    /,
    *,
    base_schema: type[Schema] | None = BaseConfigSchema,
    marshmallow_load_kwargs: dict[str, Any] | None = None,
) -> DataclassT

Load a configuration from a dictionary into an instance of a configuration class.

The configuration class is expected to be a dataclasses.dataclass, which is used to create a marshmallow.Schema schema to validate the configuration dictionary using marshmallow_dataclass.class_schema (which in turn uses the marshmallow.Schema.load method to do the validation and deserialization).

To customize the schema derived from the configuration dataclass, you can use the metadata key in dataclasses.field to pass extra options to marshmallow_dataclass to be used during validation and deserialization.

Additional arguments can be passed to marshmallow.Schema.load using the marshmallow_load_kwargs keyword argument.

Note

This function raises marshmallow.ValidationError if the configuration dictionary is invalid. Keep in mind that all the gotchas of marshmallow and marshmallow_dataclass apply when using this function, so it is recommended to read the documentation of these libraries carefully.

PARAMETER DESCRIPTION
cls

The configuration class.

TYPE: type[DataclassT]

config

The configuration dictionary.

TYPE: Mapping[str, Any]

base_schema

An optional class to be used as a base schema for the configuration class. This allows using custom fields, for example. It will be passed to marshmallow_dataclass.class_schema.

TYPE: type[Schema] | None DEFAULT: BaseConfigSchema

marshmallow_load_kwargs

Additional arguments to be passed to marshmallow.Schema.load.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
DataclassT

The loaded configuration as an instance of the configuration class.

Source code in frequenz/sdk/config/_util.py
def load_config(
    cls: type[DataclassT],
    config: Mapping[str, Any],
    /,
    *,
    base_schema: type[Schema] | None = BaseConfigSchema,
    marshmallow_load_kwargs: dict[str, Any] | None = None,
) -> DataclassT:
    """Load a configuration from a dictionary into an instance of a configuration class.

    The configuration class is expected to be a [`dataclasses.dataclass`][], which is
    used to create a [`marshmallow.Schema`][] schema to validate the configuration
    dictionary using [`marshmallow_dataclass.class_schema`][] (which in turn uses the
    [`marshmallow.Schema.load`][] method to do the validation and deserialization).

    To customize the schema derived from the configuration dataclass, you can use the
    `metadata` key in [`dataclasses.field`][] to pass extra options to
    [`marshmallow_dataclass`][] to be used during validation and deserialization.

    Additional arguments can be passed to [`marshmallow.Schema.load`][] using the
    `marshmallow_load_kwargs` keyword argument.

    Note:
        This function raises [`marshmallow.ValidationError`][] if the configuration
        dictionary is invalid. Keep in mind that all the gotchas of
        [`marshmallow`][] and [`marshmallow_dataclass`][] apply when using this
        function.  It is recommended to carefully read the documentation of these
        libraries.

    Args:
        cls: The configuration class.
        config: The configuration dictionary.
        base_schema: An optional class to be used as a base schema for the configuration
            class. This allows using custom fields, for example. Will be passed to
            [`marshmallow_dataclass.class_schema`][].
        marshmallow_load_kwargs: Additional arguments to be passed to
            [`marshmallow.Schema.load`][].

    Returns:
        The loaded configuration as an instance of the configuration class.
    """
    _validate_load_kwargs(marshmallow_load_kwargs)

    instance = class_schema(cls, base_schema)().load(
        config, **(marshmallow_load_kwargs or {})
    )
    # We need to cast because `.load()` comes from marshmallow and doesn't know which
    # type is returned.
    return cast(DataclassT, instance)

frequenz.sdk.config.wait_for_first async ¤

wait_for_first(
    receiver: Receiver[DataclassT | Exception | None],
    /,
    *,
    receiver_name: str | None = None,
    allow_none: Literal[False] = False,
    timeout: timedelta = timedelta(minutes=1),
) -> DataclassT
wait_for_first(
    receiver: Receiver[DataclassT | Exception | None],
    /,
    *,
    receiver_name: str | None = None,
    allow_none: Literal[True] = True,
    timeout: timedelta = timedelta(minutes=1),
) -> DataclassT | None
wait_for_first(
    receiver: Receiver[DataclassT | Exception | None],
    /,
    *,
    receiver_name: str | None = None,
    allow_none: bool = False,
    timeout: timedelta = timedelta(minutes=1),
) -> DataclassT | None

Wait for and receive the first configuration.

For a more in-depth introduction and examples, please read the module documentation.

PARAMETER DESCRIPTION
receiver

The receiver to receive the first configuration from.

TYPE: Receiver[DataclassT | Exception | None]

receiver_name

The name of the receiver, used for logging. If None, the string representation of the receiver will be used.

TYPE: str | None DEFAULT: None

allow_none

Whether to consider a None value as a valid configuration.

TYPE: bool DEFAULT: False

timeout

The maximum time to wait for the first configuration.

TYPE: timedelta DEFAULT: timedelta(minutes=1)

RETURNS DESCRIPTION
DataclassT | None

The first configuration received.

RAISES DESCRIPTION
TimeoutError

If the first configuration is not received within the timeout.

ReceiverStoppedError

If the receiver is stopped before the first configuration is received.

Source code in frequenz/sdk/config/_manager.py
async def wait_for_first(
    receiver: Receiver[DataclassT | Exception | None],
    /,
    *,
    receiver_name: str | None = None,
    allow_none: bool = False,
    timeout: timedelta = timedelta(minutes=1),
) -> DataclassT | None:
    """Wait for and receive the first configuration.

    For a more in-depth introduction and examples, please read the [module
    documentation][frequenz.sdk.config].

    Args:
        receiver: The receiver to receive the first configuration from.
        receiver_name: The name of the receiver, used for logging. If `None`, the
            string representation of the receiver will be used.
        allow_none: Whether to consider a `None` value as a valid configuration.
        timeout: The maximum time to wait for the first configuration.

    Returns:
        The first configuration received.

    Raises:
        asyncio.TimeoutError: If the first configuration is not received within the
            timeout.
        ReceiverStoppedError: If the receiver is stopped before the first configuration
            is received.
    """
    if receiver_name is None:
        receiver_name = str(receiver)

    # We need this type guard because we can't use a TypeVar for isinstance checks or
    # match cases.
    def is_config_class(value: DataclassT | Exception | None) -> TypeGuard[DataclassT]:
        return is_dataclass(value) if value is not None else False

    _logger.info(
        "%s: Waiting %s seconds for the first configuration to arrive...",
        receiver_name,
        timeout.total_seconds(),
    )
    try:
        async with asyncio.timeout(timeout.total_seconds()):
            async for config in receiver:
                match config:
                    case None:
                        if allow_none:
                            return None
                        _logger.error(
                            "%s: Received empty configuration, waiting again for "
                            "a first configuration to be set.",
                            receiver_name,
                        )
                    case Exception() as error:
                        _logger.error(
                            "%s: Error while receiving the first configuration, "
                            "will keep waiting for an update: %s.",
                            receiver_name,
                            error,
                        )
                    case config if is_config_class(config):
                        _logger.info("%s: Received first configuration.", receiver_name)
                        return config
                    case unexpected:
                        assert (
                            False
                        ), f"{receiver_name}: Unexpected value received: {unexpected!r}."
    except asyncio.TimeoutError:
        _logger.error("%s: No configuration received in time.", receiver_name)
        raise
    raise ReceiverStoppedError(receiver)