config
frequenz.sdk.config ¤
Configuration management.
Overview¤
To provide dynamic configurations to an application, you can use the
ConfigManager
class. This class provides
a convenient interface to manage configurations from multiple config files and receive
updates when the configurations change. Users can create a receiver to receive
configurations from the manager.
Setup¤
To use the ConfigManager
, you need to create an instance of it and pass the
paths to the configuration files. The configuration files must be in the TOML
format.
When specifying multiple files order matters, as the configuration will be read and updated in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path.
from frequenz.sdk.config import ConfigManager
async with ConfigManager(["base-config.toml", "overrides.toml"]) as config_manager:
...
Logging¤
The ConfigManager
can also instantiate
a LoggingConfigUpdatingActor
to
monitor logging configurations. This actor will listen for logging configuration changes
and update the logging configuration accordingly.
This feature is enabled by default using the key logging
in the configuration file. To
disable it you can pass logging_config_key=None
to the ConfigManager
.
Receiving configurations¤
To receive configurations, you can create a receiver using the [new_receiver()
][
frequenz.sdk.config.ConfigManager.new_receiver] method. The receiver will receive
configurations from the manager for a particular key, and validate and load the
configurations to a dataclass using marshmallow_dataclass
.
If the key is a sequence of strings, it will be treated as a nested key and the
receiver will receive the configuration under the nested key. For example
["key", "subkey"]
will get only config["key"]["subkey"]
.
Besides a configuration instance, the receiver can also receive exceptions if there are
errors loading the configuration (typically
a ValidationError
), or None
if there is no
configuration for the key.
The value under key
must be another mapping, otherwise
a InvalidValueForKeyError
instance will
be sent to the receiver.
If there were any errors loading the configuration, the error will be logged too.
from dataclasses import dataclass
from frequenz.sdk.config import ConfigManager
@dataclass(frozen=True, kw_only=True)
class AppConfig:
test: int
async with ConfigManager("config.toml") as config_manager:
receiver = config_manager.new_receiver("app", AppConfig)
app_config = await receiver.receive()
match app_config:
case AppConfig(test=42):
print("App configured with 42")
case Exception() as error:
print(f"Error loading configuration: {error}")
case None:
print("There is no configuration for the app key")
Validation and loading¤
The configuration class used to create the configuration instance is expected to be
a dataclasses.dataclass
, which is used to create a marshmallow.Schema
via
the marshmallow_dataclass.class_schema
function.
This means you can customize the schema derived from the configuration
dataclass using marshmallow_dataclass
to specify extra validation and
options via field metadata.
Customization can also be done via a base_schema
. By default
BaseConfigSchema
is used to provide support
for some extra commonly used fields (like quantities) and to
exclude unknown fields by default.
import marshmallow.validate
from dataclasses import dataclass, field
@dataclass(frozen=True, kw_only=True)
class Config:
test: int = field(
metadata={"validate": marshmallow.validate.Range(min=0)},
)
Additional arguments can be passed to marshmallow.Schema.load
using
the marshmallow_load_kwargs
keyword arguments.
When marshmallow.EXCLUDE
is used, a warning will be logged if there are extra
fields in the configuration that are excluded. This is useful, for example, to catch
typos in the configuration file.
Skipping superfluous updates¤
If there is a burst of configuration updates, the receiver will only receive the last configuration, older configurations will be ignored.
If skip_unchanged
is set to True
, then a configuration that didn't change
compared to the last one received will be ignored and not sent to the receiver.
The comparison is done using the raw dict
to determine if the configuration
has changed.
Error handling¤
The value under key
must be another mapping, otherwise an error
will be logged and a frequenz.sdk.config.InvalidValueForKeyError
instance
will be sent to the receiver.
Configurations that don't pass the validation will be logged as an error and
the ValidationError
sent to the receiver.
Any other unexpected error raised during the configuration loading will be logged as an error and the error instance sent to the receiver.
Further customization¤
If you have special needs for receiving the configurations (for example validating using
marshmallow
doesn't fit your needs), you can create a custom receiver using
config_channel.new_receiver()
directly. Please bear in mind that this provides a low-level access to the whole config
in the file as a raw Python mapping.
Recommended usage¤
Actors that need to be reconfigured should take a configuration manager and a key to receive configurations updates, and instantiate the new receiver themselves. This allows actors to have full control over how the configuration is loaded (for example providing a custom base schema or marshmallow options).
Passing the key explicitly too allows application to structure the configuration in whatever way is most convenient for the application.
Actors can use the wait_for_first()
function to
wait for the first configuration to be received, and cache the configuration for later
use and in case the actor is restarted. If the configuration is not received after some
timeout, a asyncio.TimeoutError
will be raised (and if uncaught, the actor will
be automatically restarted after some delay).
Actor that can run without a configuration (using a default configuration)
import dataclasses
import logging
from collections.abc import Sequence
from datetime import timedelta
from typing import assert_never
from frequenz.channels import select, selected_from
from frequenz.channels.event import Event
from frequenz.sdk.actor import Actor
from frequenz.sdk.config import ConfigManager, wait_for_first
_logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True, kw_only=True)
class MyActorConfig:
some_config: timedelta = dataclasses.field(
default=timedelta(seconds=42), # (1)!
metadata={"metadata": {"description": "Some optional configuration"}},
)
class MyActor(Actor):
def __init__(
self,
config_manager: ConfigManager,
/,
*,
config_key: str | Sequence[str],
name: str | None = None,
) -> None:
super().__init__(name=name)
self._config_manager = config_manager
self._config_key = config_key
self._config: MyActorConfig = MyActorConfig() # (2)!
async def _run(self) -> None:
config_receiver = self._config_manager.new_receiver(
self._config_key, MyActorConfig
)
self._update_config(
await wait_for_first(
config_receiver, receiver_name=str(self), allow_none=True # (3)!
)
)
other_receiver = Event()
async for selected in select(config_receiver, other_receiver):
if selected_from(selected, config_receiver):
self._update_config(selected.message)
elif selected_from(selected, other_receiver):
# Do something else
...
def _update_config(self, config_update: MyActorConfig | Exception | None) -> None:
match config_update:
case MyActorConfig() as config:
_logger.info("New configuration received, updating.")
self._reconfigure(config)
case None:
_logger.info("Configuration was unset, resetting to the default")
self._reconfigure(MyActorConfig()) # (4)!
case Exception():
_logger.info( # (5)!
"New configuration has errors, keeping the old configuration."
)
case unexpected:
assert_never(unexpected)
def _reconfigure(self, config: MyActorConfig) -> None:
self._config = config
# Do something with the new configuration
- This is different when the actor requires a configuration to run. Here, the config has a default value.
- This is different when the actor requires a configuration to run. Here, the actor can just instantiate a default configuration.
- This is different when the actor requires a configuration to run. Here, the actor
can accept a
None
configuration. - This is different when the actor requires a configuration to run. Here, the actor can reset to a default configuration.
- There is no need to log the error itself, the configuration manager will log it automatically.
Actor that requires a configuration to run
import dataclasses
import logging
from collections.abc import Sequence
from datetime import timedelta
from typing import assert_never
from frequenz.channels import select, selected_from
from frequenz.channels.event import Event
from frequenz.sdk.actor import Actor
from frequenz.sdk.config import ConfigManager, wait_for_first
_logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True, kw_only=True)
class MyActorConfig:
some_config: timedelta = dataclasses.field( # (1)!
metadata={"metadata": {"description": "Some required configuration"}},
)
class MyActor(Actor):
def __init__(
self,
config_manager: ConfigManager,
/,
*,
config_key: str | Sequence[str],
name: str | None = None,
) -> None:
super().__init__(name=name)
self._config_manager = config_manager
self._config_key = config_key
self._config: MyActorConfig # (2)!
async def _run(self) -> None:
config_receiver = self._config_manager.new_receiver(
self._config_key, MyActorConfig
)
self._update_config(
await wait_for_first(config_receiver, receiver_name=str(self)) # (3)!
)
other_receiver = Event()
async for selected in select(config_receiver, other_receiver):
if selected_from(selected, config_receiver):
self._update_config(selected.message)
elif selected_from(selected, other_receiver):
# Do something else
...
def _update_config(self, config_update: MyActorConfig | Exception | None) -> None:
match config_update:
case MyActorConfig() as config:
_logger.info("New configuration received, updating.")
self._reconfigure(config)
case None:
_logger.info("Configuration was unset, keeping the old configuration.") # (4)!
case Exception():
_logger.info( # (5)!
"New configuration has errors, keeping the old configuration."
)
case unexpected:
assert_never(unexpected)
def _reconfigure(self, config: MyActorConfig) -> None:
self._config = config
# Do something with the new configuration
- This is different when the actor can use a default configuration. Here, the field is required, so there is no default configuration possible.
- This is different when the actor can use a default configuration. Here, the
assignment of the configuration is delayed to the
_run()
method. - This is different when the actor can use a default configuration. Here, the actor
doesn't accept
None
as a valid configuration as it can't create a default configuration. - This is different when the actor can use a default configuration. Here, the actor
doesn't accept
None
as a valid configuration as it can't create a default configuration, so it needs to keep the old configuration. - There is no need to log the error itself, the configuration manager will log it automatically.
Application
The pattern used by the application is very similar to the one used by actors. In this case the application requires a configuration to run, but if it could also use a default configuration, the changes would be the same as in the actor examples.
import asyncio
import dataclasses
import logging
import pathlib
from collections.abc import Sequence
from datetime import timedelta
from typing import Sequence, assert_never
from frequenz.sdk.actor import Actor
from frequenz.sdk.config import ConfigManager, wait_for_first
_logger = logging.getLogger(__name__)
class MyActor(Actor): # (1)!
def __init__(
self, config_manager: ConfigManager, /, *, config_key: str | Sequence[str]
) -> None:
super().__init__()
self._config_manager = config_manager
self._config_key = config_key
async def _run(self) -> None: ...
@dataclasses.dataclass(frozen=True, kw_only=True)
class AppConfig:
enable_actor: bool = dataclasses.field(
metadata={"metadata": {"description": "Whether to enable the actor"}},
)
class App:
def __init__(self, *, config_paths: Sequence[pathlib.Path]):
self._config_manager = ConfigManager(config_paths)
self._config_receiver = self._config_manager.new_receiver("app", AppConfig)
self._actor = MyActor(self._config_manager, config_key="actor")
async def _update_config(self, config_update: AppConfig | Exception | None) -> None:
match config_update:
case AppConfig() as config:
_logger.info("New configuration received, updating.")
await self._reconfigure(config)
case None:
_logger.info("Configuration was unset, keeping the old configuration.")
case Exception():
_logger.info("New configuration has errors, keeping the old configuration.")
case unexpected:
assert_never(unexpected)
async def _reconfigure(self, config: AppConfig) -> None:
if config.enable_actor:
self._actor.start()
else:
await self._actor.stop()
async def run(self) -> None:
_logger.info("Starting App...")
async with self._config_manager:
await self._update_config(
await wait_for_first(self._config_receiver, receiver_name="app")
)
_logger.info("Waiting for configuration updates...")
async for config_update in self._config_receiver:
await self._reconfigure(config_update)
if __name__ == "__main__":
asyncio.run(App(config_paths="config.toml").run())
- Look for the actor examples for a proper implementation of the actor.
Example configuration file:
Classes¤
frequenz.sdk.config.BaseConfigSchema ¤
Bases: QuantitySchema
A base schema for configuration classes.
This schema provides validation for quantities and ignores unknown fields by default.
Source code in frequenz/sdk/config/_base_schema.py
Classes¤
Meta ¤
Functions¤
__init__ ¤
Initialize the schema with a default serialization format.
PARAMETER | DESCRIPTION |
---|---|
*args
|
Additional positional arguments.
TYPE:
|
serialize_as_string_default
|
Default serialization format for quantities. If True, quantities are serialized as strings with units. If False, quantities are serialized as floats.
TYPE:
|
**kwargs
|
Additional keyword arguments.
TYPE:
|
Source code in frequenz/quantities/experimental/marshmallow.py
frequenz.sdk.config.ConfigManager ¤
Bases: BackgroundService
A manager for configuration files.
This class reads configuration files and sends the configuration to the receivers, providing configuration key filtering and value validation.
For a more in-depth introduction and examples, please read the module documentation.
Source code in frequenz/sdk/config/_manager.py
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
|
Attributes¤
config_actor
instance-attribute
¤
config_actor: Final[ConfigManagingActor] = (
ConfigManagingActor(
config_paths,
new_sender(),
name=name,
force_polling=force_polling,
polling_interval=polling_interval,
)
)
The actor that manages the configuration for this manager.
config_channel
instance-attribute
¤
config_channel: Final[Broadcast[Mapping[str, Any]]] = (
Broadcast(name=f"{self}_config", resend_latest=True)
)
The channel used for sending configuration updates (resends the latest value).
This is the channel used to communicate with the
ConfigManagingActor
and will
receive the complete raw configuration as a mapping.
logging_actor
instance-attribute
¤
logging_actor: Final[LoggingConfigUpdatingActor | None] = (
None
if logging_config_key is None
else LoggingConfigUpdatingActor(
self, config_key=logging_config_key, name=name
)
)
The actor that manages the logging configuration for this manager.
name
property
¤
name: str
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str
|
The name of this background service. |
tasks
property
¤
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]]
|
The set of running tasks spawned by this background service. |
Functions¤
__aenter__
async
¤
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self
|
This background service. |
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
__await__ ¤
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
Source code in frequenz/sdk/actor/_background_service.py
__del__ ¤
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__ ¤
__init__(
config_paths: str | Path | Sequence[Path | str],
/,
*,
force_polling: bool = True,
logging_config_key: (
str | Sequence[str] | None
) = "logging",
name: str | None = None,
polling_interval: timedelta = timedelta(seconds=1),
) -> None
Initialize this config manager.
PARAMETER | DESCRIPTION |
---|---|
config_paths
|
The paths to the TOML files with the configuration. Order matters, as the configuration will be read and updated in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path. |
force_polling
|
Whether to force file polling to check for changes.
TYPE:
|
logging_config_key
|
The key to use for the logging configuration. If |
name
|
A name to use when creating actors. If
TYPE:
|
polling_interval
|
The interval to poll for changes. Only relevant if polling is enabled. |
Source code in frequenz/sdk/config/_manager.py
__repr__ ¤
__repr__() -> str
Return a string representation of this config manager.
Source code in frequenz/sdk/config/_manager.py
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
cancel ¤
cancel(msg: str | None = None) -> None
Cancel all running tasks and actors spawned by this config manager.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
Source code in frequenz/sdk/config/_manager.py
new_receiver ¤
new_receiver(
key: str | Sequence[str],
config_class: type[DataclassT],
/,
*,
skip_unchanged: bool = True,
base_schema: type[Schema] | None = BaseConfigSchema,
marshmallow_load_kwargs: dict[str, Any] | None = None,
) -> Receiver[DataclassT | Exception | None]
Create a new receiver for receiving the configuration for a particular key.
This method has a lot of features and functionalities to make it easier to
receive configurations, but it also imposes some restrictions on how the
configurations are received. If you need more control over the configuration
receiver, you can create a receiver directly using
config_channel.new_receiver()
.
For a more in-depth introduction and examples, please read the module documentation.
PARAMETER | DESCRIPTION |
---|---|
key
|
The configuration key to be read by the receiver. If a sequence of strings is used, it is used as a sub-key. |
config_class
|
The class object to use to instantiate a configuration. The
configuration will be validated against this type too using
TYPE:
|
skip_unchanged
|
Whether to skip sending the configuration if it hasn't changed compared to the last one received.
TYPE:
|
base_schema
|
An optional class to be used as a base schema for the
configuration class. This allow using custom fields for example. Will be
passed to
TYPE:
|
marshmallow_load_kwargs
|
Additional arguments to be passed to
|
RETURNS | DESCRIPTION |
---|---|
Receiver[DataclassT | Exception | None]
|
The receiver for the configuration. |
Source code in frequenz/sdk/config/_manager.py
start ¤
stop
async
¤
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/sdk/actor/_background_service.py
wait
async
¤
Wait this config manager to finish.
Wait until all tasks and actors are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in frequenz/sdk/config/_manager.py
frequenz.sdk.config.ConfigManagingActor ¤
Bases: Actor
An actor that monitors a TOML configuration files for updates.
When the actor is started the configuration files will be read and sent to the output sender. Then the actor will start monitoring the files for updates. If any file is updated, all the configuration files will be re-read and sent to the output sender.
If no configuration file could be read, the actor will raise an exception.
The configuration files are read in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path.
Example
If config1.toml
contains:
And config2.toml
contains:
Then the final configuration will be:
Source code in frequenz/sdk/config/_managing_actor.py
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
|
Attributes¤
RESTART_DELAY
class-attribute
instance-attribute
¤
The delay to wait between restarts of this actor.
is_running
property
¤
is_running: bool
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether this background service is running. |
name
property
¤
name: str
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str
|
The name of this background service. |
tasks
property
¤
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]]
|
The set of running tasks spawned by this background service. |
Functions¤
__aenter__
async
¤
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self
|
This background service. |
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
__await__ ¤
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
Source code in frequenz/sdk/actor/_background_service.py
__del__ ¤
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__ ¤
__init__(
config_paths: str | Path | Sequence[Path | str],
output: Sender[Mapping[str, Any]],
*,
name: str | None = None,
force_polling: bool = True,
polling_interval: timedelta = timedelta(seconds=1)
) -> None
Initialize this instance.
PARAMETER | DESCRIPTION |
---|---|
config_paths
|
The paths to the TOML files with the configuration. Order matters, as the configuration will be read and updated in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path. |
output
|
The sender to send the configuration to. |
name
|
The name of the actor. If
TYPE:
|
force_polling
|
Whether to force file polling to check for changes.
TYPE:
|
polling_interval
|
The interval to poll for changes. Only relevant if polling is enabled. |
RAISES | DESCRIPTION |
---|---|
ValueError
|
If no configuration path is provided. |
Source code in frequenz/sdk/config/_managing_actor.py
__repr__ ¤
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
cancel ¤
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
send_config
async
¤
start ¤
Start this actor.
If this actor is already running, this method does nothing.
stop
async
¤
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/sdk/actor/_background_service.py
wait
async
¤
Wait this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in frequenz/sdk/actor/_background_service.py
frequenz.sdk.config.InvalidValueForKeyError ¤
Bases: ValueError
An error indicating that the value under the specified key is invalid.
Source code in frequenz/sdk/config/_manager.py
Attributes¤
value
instance-attribute
¤
The actual value that was found that is not a mapping.
Functions¤
__init__ ¤
Initialize this error.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The error message.
TYPE:
|
key
|
The key that has an invalid value. |
value
|
The actual value that was found that is not a mapping.
TYPE:
|
Source code in frequenz/sdk/config/_manager.py
frequenz.sdk.config.LoggerConfig
dataclass
¤
A configuration for a logger.
Source code in frequenz/sdk/config/_logging_actor.py
frequenz.sdk.config.LoggingConfig
dataclass
¤
A configuration for the logging system.
Source code in frequenz/sdk/config/_logging_actor.py
Attributes¤
loggers
class-attribute
instance-attribute
¤
loggers: dict[str, LoggerConfig] = field(
default_factory=dict,
metadata={
"metadata": {
"description": "Configuration for a logger (the key is the logger name)."
}
},
)
The list of loggers configurations.
root_logger
class-attribute
instance-attribute
¤
root_logger: LoggerConfig = field(
default_factory=lambda: LoggerConfig(level="INFO"),
metadata={
"metadata": {
"description": "Default default configuration for all loggers."
}
},
)
The default log level.
frequenz.sdk.config.LoggingConfigUpdatingActor ¤
Bases: Actor
Actor that listens for logging configuration changes and sets them.
Example
config.toml
file:
[logging.root_logger]
level = "INFO"
[logging.loggers."frequenz.sdk.actor.power_distributing"]
level = "DEBUG"
[logging.loggers."frequenz.channels"]
level = "DEBUG"
import asyncio
from frequenz.sdk.config import LoggingConfigUpdatingActor
from frequenz.sdk.actor import run as run_actors
async def run() -> None:
config_manager: ConfigManager = ...
await run_actors(LoggingConfigUpdatingActor(config_manager))
asyncio.run(run())
Now whenever the config.toml
file is updated, the logging configuration
will be updated as well.
Source code in frequenz/sdk/config/_logging_actor.py
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
|
Attributes¤
RESTART_DELAY
class-attribute
instance-attribute
¤
The delay to wait between restarts of this actor.
is_running
property
¤
is_running: bool
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether this background service is running. |
name
property
¤
name: str
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str
|
The name of this background service. |
tasks
property
¤
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]]
|
The set of running tasks spawned by this background service. |
Functions¤
__aenter__
async
¤
__aenter__() -> Self
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self
|
This background service. |
__aexit__
async
¤
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type
|
The type of the exception raised, if any.
TYPE:
|
exc_val
|
The exception raised, if any.
TYPE:
|
exc_tb
|
The traceback of the exception raised, if any.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
__await__ ¤
__await__() -> Generator[None, None, None]
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
None
|
An implementation-specific generator for the awaitable. |
Source code in frequenz/sdk/actor/_background_service.py
__del__ ¤
Destroy this instance.
Cancel all running tasks spawned by this background service.
__init__ ¤
__init__(
config_manager: ConfigManager,
/,
*,
config_key: str | Sequence[str] = "logging",
log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z",
log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s",
name: str | None = None,
)
Initialize this instance.
PARAMETER | DESCRIPTION |
---|---|
config_manager
|
The configuration manager to use.
TYPE:
|
config_key
|
The key to use to retrieve the configuration from the
configuration manager. If |
log_datefmt
|
Use the specified date/time format in logs.
TYPE:
|
log_format
|
Use the specified format string in logs.
TYPE:
|
name
|
The name of this actor. If
TYPE:
|
Note
The log_format
and log_datefmt
parameters are used in a call to
logging.basicConfig()
. If logging has already been configured elsewhere
in the application (through a previous basicConfig()
call), then the format
settings specified here will be ignored.
Source code in frequenz/sdk/config/_logging_actor.py
__repr__ ¤
__repr__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
__str__ ¤
__str__() -> str
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
cancel ¤
cancel(msg: str | None = None) -> None
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
Source code in frequenz/sdk/actor/_background_service.py
start ¤
Start this actor.
If this actor is already running, this method does nothing.
stop
async
¤
stop(msg: str | None = None) -> None
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg
|
The message to be passed to the tasks being cancelled.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an exception. |
Source code in frequenz/sdk/actor/_background_service.py
wait
async
¤
Wait this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup
|
If any of the tasks spawned by this service raised an
exception ( |
Source code in frequenz/sdk/actor/_background_service.py
Functions¤
frequenz.sdk.config.load_config ¤
load_config(
cls: type[DataclassT],
config: Mapping[str, Any],
/,
*,
base_schema: type[Schema] | None = BaseConfigSchema,
marshmallow_load_kwargs: dict[str, Any] | None = None,
) -> DataclassT
Load a configuration from a dictionary into an instance of a configuration class.
The configuration class is expected to be a dataclasses.dataclass
, which is
used to create a marshmallow.Schema
schema to validate the configuration
dictionary using marshmallow_dataclass.class_schema
(which in turn uses the
marshmallow.Schema.load
method to do the validation and deserialization).
To customize the schema derived from the configuration dataclass, you can use the
metadata
key in dataclasses.field
to pass extra options to
marshmallow_dataclass
to be used during validation and deserialization.
Additional arguments can be passed to marshmallow.Schema.load
using keyword
arguments marshmallow_load_kwargs
.
Note
This method will raise marshmallow.ValidationError
if the configuration
dictionary is invalid and you have to have in mind all of the gotchas of
marshmallow
and marshmallow_dataclass
applies when using this
function. It is recommended to carefully read the documentation of these
libraries.
PARAMETER | DESCRIPTION |
---|---|
cls
|
The configuration class.
TYPE:
|
config
|
The configuration dictionary. |
base_schema
|
An optional class to be used as a base schema for the configuration
class. This allow using custom fields for example. Will be passed to
TYPE:
|
marshmallow_load_kwargs
|
Additional arguments to be passed to
|
RETURNS | DESCRIPTION |
---|---|
DataclassT
|
The loaded configuration as an instance of the configuration class. |
Source code in frequenz/sdk/config/_util.py
frequenz.sdk.config.wait_for_first
async
¤
wait_for_first(
receiver: Receiver[DataclassT | Exception | None],
/,
*,
receiver_name: str | None = None,
allow_none: bool = False,
timeout: timedelta = timedelta(minutes=1),
) -> DataclassT | None
Wait for and receive the the first configuration.
For a more in-depth introduction and examples, please read the module documentation.
PARAMETER | DESCRIPTION |
---|---|
receiver
|
The receiver to receive the first configuration from. |
receiver_name
|
The name of the receiver, used for logging. If
TYPE:
|
allow_none
|
Whether consider a
TYPE:
|
timeout
|
The timeout in seconds to wait for the first configuration. |
RETURNS | DESCRIPTION |
---|---|
DataclassT | None
|
The first configuration received. |
RAISES | DESCRIPTION |
---|---|
TimeoutError
|
If the first configuration is not received within the timeout. |
ReceiverStoppedError
|
If the receiver is stopped before the first configuration is received. |