frequenz.sdk.actor ¤

Actors are primitive units of computation that run autonomously.

Actor Programming Model¤

From Wikipedia

The actor model in computer science is a mathematical model of concurrent computation that treats an actor as the basic building block of concurrent computation. In response to a message it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other indirectly through messaging (removing the need for lock-based synchronization).

We won't go into much more detail here because it is outside the scope of this documentation, but there are many good resources available online if you are interested in learning more about the actor programming model.

Frequenz SDK Actors¤

The Actor class serves as the foundation for creating concurrent tasks and all actors in the SDK inherit from it. This class provides a straightforward way to implement actors. It shares similarities with the traditional actor programming model but also has some unique features:

  • Message Passing: Like traditional actors, our Actor class communicates through message passing. Although no particular message-passing mechanism is enforced, the SDK actors use frequenz.channels for communication.

  • Automatic Restart: If an unhandled exception occurs in an actor's logic (_run() method), the actor will be automatically restarted. This ensures robustness in the face of errors.

  • Simplified Task Management: Actors manage asynchronous tasks using asyncio. You can create and manage tasks within the actor, and the Actor class handles task cancellation and cleanup.

  • Simplified lifecycle management: Actors are async context managers, and a run() function is provided to easily run a group of actors and wait for them to finish.

Lifecycle¤

Actors are not started when they are created. There are 3 main ways to start an actor (from most to least recommended):

  1. By using the run() function.
  2. By using the actor as an async context manager.
  3. By using the start() method.

Warning

  1. If an actor raises an unhandled exception, it will be restarted automatically.

  2. Actors manage asyncio.Task objects, so a reference to the actor object must be held for as long as the actor is expected to be running, otherwise its tasks will be cancelled and the actor will stop. For more information, please refer to the Python asyncio documentation.

The run() Function¤

The run() function can start many actors at once and wait for them to finish. If any of the actors stops with an error, the error will be logged.

Example
from frequenz.sdk.actor import Actor, run

class MyActor(Actor):
    async def _run(self) -> None:
        print("Hello World!")

await run(MyActor()) # (1)!
  1. This line will block until the actor completes its execution or is manually stopped.
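
As a sketch, since run() can start many actors at once, several instances can be started and awaited together (the actor names are illustrative):

from frequenz.sdk.actor import Actor, run

class MyActor(Actor):
    async def _run(self) -> None:
        print(f"Hello from {self.name}!")

# Start both actors and wait for both of them to finish.
await run(MyActor(name="actor-1"), MyActor(name="actor-2"))
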
Async Context Manager¤

When an actor is used as an async context manager, it is started when the async with block is entered and stopped automatically when the block is exited (even if an unhandled exception is raised).

Example
from frequenz.sdk.actor import Actor

class MyActor(Actor):
    async def _run(self) -> None:
        print("Hello World!")

async with MyActor() as actor: # (1)!
    print("The actor is running")
# (2)!
  1. start() is called automatically when entering the async with block.
  2. stop() is called automatically when exiting the async with block.
The start() Method¤

When using the start() method, the actor is started in the background and the method returns immediately. The actor will continue to run until it is manually stopped or until it completes its execution.

Example
from frequenz.sdk.actor import Actor

class MyActor(Actor):
    async def _run(self) -> None:
        print("Hello World!")

actor = MyActor() # (1)!
actor.start() # (2)!
print("The actor is running") # (3)!
await actor.stop() # (4)!
  1. The actor is created but not started yet.
  2. The actor is started manually; it keeps running in the background.
  3. Danger

    If this code raised an exception, the actor would never be stopped!

  4. The actor is stopped manually.

Warning

This method is not recommended because it is easy to forget to stop the actor manually, especially in error conditions.

Communication¤

The Actor class doesn't impose any specific way to communicate between actors. However, frequenz.channels are always used as the communication mechanism between actors in the SDK.
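
For illustration, a minimal sketch of how a channel is typically created outside the actors and split into a sender and a receiver that are then passed to the actors' initializers (the channel and variable names are illustrative):

from frequenz.channels import Broadcast

numbers_channel: Broadcast[int] = Broadcast("numbers")  # created outside the actors

producer_sender = numbers_channel.new_sender()      # passed to the producing actor
consumer_receiver = numbers_channel.new_receiver()  # passed to the consuming actor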

Implementing an Actor¤

To implement an actor, you must inherit from the Actor class and implement an initializer and the abstract _run() method.

Initialization¤

The initializer is called when the actor is created. The Actor class initializer (__init__) should always be called first in the subclass initializer to make sure actors are properly initialized.

The Actor.__init__() takes one optional argument, a name that will be used to identify the actor in logs. If no name is provided, a default name will be generated, but it is recommended that Actor subclasses also accept a name argument, so that individual instances are easier to identify in logs.

The actor initializer normally also accepts the receivers for its input channels and the senders for its output channels as arguments, which will be used for communication. These channels should be created outside the actor and passed to it as arguments to ensure actors can be composed easily.

Example echo actor
from frequenz.channels import Receiver, Sender
from frequenz.sdk.actor import Actor

class EchoActor(Actor):  # (1)!
    def __init__(
            self,
            input: Receiver[int],  # (2)!
            output: Sender[int],  # (3)!
            name: str | None = None,  # (4)!
    ) -> None:
        super().__init__(name=name) # (5)!
        self._input: Receiver[int] = input  # (6)!
        self._output: Sender[int] = output  # (7)!
  1. We define a new actor class called EchoActor that inherits from Actor.

  2. We accept an input argument that will be used to receive messages from a channel.

  3. We accept an output argument that will be used to send messages to a channel.
  4. We accept an optional name argument that will be used to identify the actor in logs.
  5. We call Actor.__init__() to make sure the actor is properly initialized.
  6. We store the input argument in a private attribute to use it later.
  7. We store the output argument in a private attribute to use it later.
The _run() Method¤

The abstract _run() method is called automatically by the base class when the actor is started.

Normally an actor should run forever (or until it is stopped), so it is very common to have an infinite loop in the _run() method, typically receiving messages from one or more channels (receivers), processing them and sending the results to other channels (senders). However, it is worth noting that an actor can also be designed for a one-time execution or a limited number of runs, terminating upon completion.

Example echo actor
from frequenz.channels import Receiver, Sender
from frequenz.sdk.actor import Actor

class EchoActor(Actor):
    def __init__(
            self,
            input: Receiver[int],
            output: Sender[int],
            name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._input: Receiver[int] = input
        self._output: Sender[int] = output

    async def _run(self) -> None:  # (1)!
        async for msg in self._input:  # (2)!
            await self._output.send(msg)  # (3)!
  1. We implement the abstract _run() method.
  2. We receive messages from the input channel one by one.
  3. We send the received message to the output channel.
Stopping¤

By default, the stop() method will call the cancel() method (which will cancel all the tasks created by the actor) and will wait for them to finish.

This means that when an actor is stopped, the _run() method will receive a CancelledError exception. Keep this in mind when implementing your actor and make sure to handle this exception properly if you need to do any cleanup.

The actor will handle the CancelledError exception automatically if it is not handled in the _run() method, so if there is no need for extra cleanup, you don't need to worry about it.

If an unhandled exception is raised in the _run() method, the actor will re-run the _run() method automatically. This ensures robustness in the face of errors, but keep it in mind if you need to do any cleanup, to make sure the re-run doesn't cause any problems.
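
For example, here is a minimal sketch of cleanup code that runs both when the actor is stopped and before an automatic re-run (the resource is illustrative):

import asyncio

from frequenz.sdk.actor import Actor

class CleanupActor(Actor):
    async def _run(self) -> None:
        resource = open("state.txt", "w", encoding="utf-8")  # illustrative resource needing cleanup
        try:
            while True:
                resource.write("tick\n")
                await asyncio.sleep(1)
        finally:
            # Reached when stop() cancels _run() (CancelledError) and also when an
            # unhandled exception triggers an automatic re-run of _run().
            resource.close()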

Tip

You could override the stop() method to, for example, send a message to a channel notifying the actor that it should stop, or use any other more graceful shutdown mechanism, if you need to make sure the actor can't be interrupted at an arbitrary await point.
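
As a sketch of that idea (using an asyncio.Event instead of a channel, purely to keep the example short), a custom stop() can let _run() finish its current iteration instead of being cancelled:

import asyncio

from frequenz.sdk.actor import Actor

class GracefulActor(Actor):
    def __init__(self, name: str | None = None) -> None:
        super().__init__(name=name)
        self._stop_requested = asyncio.Event()

    async def _run(self) -> None:
        while not self._stop_requested.is_set():
            # Illustrative unit of work that shouldn't be interrupted mid-way.
            await asyncio.sleep(1)

    async def stop(self, msg: str | None = None) -> None:
        self._stop_requested.set()  # ask _run() to exit at its next checkpoint
        await self.wait()  # wait for the actor's tasks to finish without cancelling them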

Spawning Extra Tasks¤

Actors run at least one background task, which is created automatically by the Actor class. Actor also inherits from BackgroundService, which provides a few facilities to create and manage extra tasks.

If your actor needs to spawn extra tasks, you can use BackgroundService facilities to manage the tasks, so they are also automatically stopped when the actor is stopped.

All you need to do is add the newly spawned tasks to the actor's tasks set.

Example
import asyncio
from frequenz.sdk.actor import Actor

class MyActor(Actor):
    async def _run(self) -> None:
        extra_task = asyncio.create_task(self._extra_task())  # (1)!
        self.tasks.add(extra_task)  # (2)!
        while True:  # (3)!
            print("_run() running")
            await asyncio.sleep(1)

    async def _extra_task(self) -> None:
        while True:  # (4)!
            print("_extra_task() running")
            await asyncio.sleep(1.1)

async with MyActor() as actor:  # (5)!
    await asyncio.sleep(3)  # (6)!
# (7)!
  1. We create a new task using asyncio.create_task().
  2. We add the task to the actor's tasks set. This ensures the task will be cancelled and cleaned up when the actor is stopped.
  3. We leave the actor running forever.
  4. The extra task will also run forever.
  5. The actor is started.
  6. We wait for 3 seconds; while it's running, the actor should print a bunch of "_run() running" and "_extra_task() running" messages.
  7. The actor is stopped and the extra task is cancelled automatically.
Examples¤

Here are a few simple but complete examples to demonstrate how to create actors and connect them using channels.

Tip

The code examples are annotated with numbered markers; you can click on them to see a step-by-step explanation of what's going on. The annotations are numbered according to the order of execution.

Composing actors¤

This example shows how to create two actors and connect them using broadcast channels.

compose.py
import asyncio

from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.sdk.actor import Actor

class Actor1(Actor):  # (1)!
    def __init__(
        self,
        receiver: Receiver[str],
        output: Sender[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver = receiver
        self._output = output

    async def _run(self) -> None:
        async for msg in self._receiver:
            await self._output.send(f"Actor1 forwarding: {msg!r}")  # (8)!


class Actor2(Actor):
    def __init__(
        self,
        receiver: Receiver[str],
        output: Sender[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver = receiver
        self._output = output

    async def _run(self) -> None:
        async for msg in self._receiver:
            await self._output.send(f"Actor2 forwarding: {msg!r}")  # (9)!


async def main() -> None:  # (2)!
    # (4)!
    input_channel: Broadcast[str] = Broadcast("Input to Actor1")
    middle_channel: Broadcast[str] = Broadcast("Actor1 -> Actor2 stream")
    output_channel: Broadcast[str] = Broadcast("Actor2 output")

    input_sender = input_channel.new_sender()
    output_receiver = output_channel.new_receiver()

    async with (  # (5)!
        Actor1(input_channel.new_receiver(), middle_channel.new_sender(), "actor1"),
        Actor2(middle_channel.new_receiver(), output_channel.new_sender(), "actor2"),
    ):
        await input_sender.send("Hello")  # (6)!
        msg = await output_receiver.receive()  # (7)!
        print(msg)  # (10)!
    # (11)!

if __name__ == "__main__":  # (3)!
    asyncio.run(main())
  1. We define 2 actors: Actor1 and Actor2 that will just forward a message from an input channel to an output channel, adding some text.

  2. We define an async main() function with the main logic of our asyncio program.

  3. We start the main() function in the async loop using asyncio.run().

  4. We create a bunch of broadcast channels to connect our actors.

    • input_channel is the input channel for Actor1.
    • middle_channel is the channel that connects Actor1 and Actor2.
    • output_channel is the output channel for Actor2.
  5. We create the two actors, Actor1 and Actor2, use them as async context managers, and connect them by creating new senders and receivers from the channels.

    Note

    We don't use the run() function here because the actors run forever (as long as the channel is not closed), so run() would never return. Since we want to stop the actors once we are done with them, the async context manager is a better fit for this example.

  6. We schedule the sending of the message Hello to Actor1 via input_channel.

  7. We receive (await) the response from Actor2 via output_channel. Between this step and the previous one, the async calls in the actors will be executed.

  8. Actor1 sends the re-formatted message (Actor1 forwarding: Hello) to Actor2 via the middle_channel.

  9. Actor2 sends the re-formatted message (Actor2 forwarding: "Actor1 forwarding: 'Hello'") to the output_channel.

  10. Finally, we print the received message, which will be Actor2 forwarding: "Actor1 forwarding: 'Hello'".

  11. The actors are stopped and cleaned up automatically when the async with block ends.

The expected output is:

Actor2 forwarding: "Actor1 forwarding: 'Hello'"
Receiving from multiple channels¤

This example shows how to create an actor that receives messages from multiple broadcast channels using select().

select.py
import asyncio

from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels.util import select, selected_from
from frequenz.sdk.actor import Actor, run


class EchoActor(Actor):  # (1)!
    def __init__(
        self,
        receiver_1: Receiver[bool],
        receiver_2: Receiver[bool],
        output: Sender[bool],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver_1 = receiver_1
        self._receiver_2 = receiver_2
        self._output = output

    async def _run(self) -> None:  # (2)!
        async for selected in select(self._receiver_1, self._receiver_2):  # (10)!
            if selected_from(selected, self._receiver_1):  # (11)!
                print(f"Received from receiver_1: {selected.value}")
                await self._output.send(selected.value)
                if not selected.value:  # (12)!
                    break
            elif selected_from(selected, self._receiver_2):  # (13)!
                print(f"Received from receiver_2: {selected.value}")
                await self._output.send(selected.value)
                if not selected.value:  # (14)!
                    break
            else:
                assert False, "Unknown selected channel"
        print("EchoActor finished")
    # (15)!


# (3)!
input_channel_1 = Broadcast[bool]("input_channel_1")
input_channel_2 = Broadcast[bool]("input_channel_2")
echo_channel = Broadcast[bool]("echo_channel")

echo_actor = EchoActor(  # (4)!
    input_channel_1.new_receiver(),
    input_channel_2.new_receiver(),
    echo_channel.new_sender(),
    "echo-actor",
)

echo_receiver = echo_channel.new_receiver()  # (5)!

async def main() -> None:  # (6)!
    # (8)!
    await input_channel_1.new_sender().send(True)
    await input_channel_2.new_sender().send(False)

    await run(echo_actor)  # (9)!

    await echo_channel.close()  # (16)!

    async for message in echo_receiver:  # (17)!
        print(f"Received {message=}")


if __name__ == "__main__":  # (7)!
    asyncio.run(main())
  1. We define an EchoActor that receives messages from two channels and sends them to another channel.

  2. We implement the _run() method that will receive messages from the two channels using select() and send them to the output channel. The _run() method will stop if a False message is received.

  3. We create the channels that will be used with the actor.

  4. We create the actor and connect it to the channels by creating new receivers and senders from the channels.

  5. We create a receiver for the echo_channel to eventually receive the messages sent by the actor.

  6. We define the main() function that will run the actor.

  7. We start the main() function in the async loop using asyncio.run().

  8. We send a message to each of the input channels. These messages will be queued in the channels until they are consumed by the actor.

  9. We start the actor and wait for it to finish using the run() function.

  10. The select() function will get the first message available from the two channels. The order in which they will be handled is unknown, but in this example we assume that the first message will be from input_channel_1 (True) and the second from input_channel_2 (False).

  11. The selected_from() function will return True for the input_channel_1 receiver. selected.value holds the received message, so "Received from receiver_1: True" will be printed and True will be sent to the output channel.

  12. Since selected.value is True, the loop will continue, going back to the select() function.

  13. The selected_from() function will return False for the input_channel_1 receiver and True for the input_channel_2 receiver. The message stored in selected.value will now be False, so "Received from receiver_2: False" will be printed and False will be sent to the output channel.

  14. Since selected.value is False, the loop will break.

  15. The _run() method will finish normally and the actor will be stopped, so the run() function will return.

  16. We close the echo_channel to make sure the echo_receiver will stop receiving messages after all the queued messages are consumed (otherwise step 17 would never end!).

  17. We receive the messages sent by the actor to the echo_channel one by one and print them; it should print Received message=True first and then Received message=False.

The expected output is:

Received from receiver_1: True
Received from receiver_2: False
Received message=True
Received message=False

Classes¤

frequenz.sdk.actor.Actor ¤

Bases: BackgroundService, ABC

A primitive unit of computation that runs autonomously.

To implement an actor, subclasses must implement the _run() method, which should run the actor's logic. The _run() method is called by the base class when the actor is started, and is expected to run until the actor is stopped.

Info

Please read the actor module documentation for a more comprehensive guide on how to use and implement actors properly.

Source code in frequenz/sdk/actor/_actor.py
class Actor(BackgroundService, abc.ABC):
    """A primitive unit of computation that runs autonomously.

    To implement an actor, subclasses must implement the
    [`_run()`][frequenz.sdk.actor--the-_run-method] method, which should run the actor's
    logic. The [`_run()`][frequenz.sdk.actor--the-_run-method] method is called by the
    base class when the actor is started, and is expected to run until the actor is
    stopped.

    !!! info

        Please read the [`actor` module documentation][frequenz.sdk.actor] for more
        comprehensive guide on how to use and implement actors properly.
    """

    RESTART_DELAY: timedelta = timedelta(seconds=2)
    """The delay to wait between restarts of this actor."""

    _restart_limit: int | None = None
    """The number of times actors can be restarted when they are stopped by unhandled exceptions.

    If this is bigger than 0 or `None`, the actor will be restarted when there is an
    unhanded exception in the `_run()` method.

    If `None`, the actor will be restarted an unlimited number of times.

    !!! note

        This is mostly used for testing purposes and shouldn't be set in production.
    """

    def start(self) -> None:
        """Start this actor.

        If this actor is already running, this method does nothing.
        """
        if self.is_running:
            return
        self._tasks.clear()
        self._tasks.add(asyncio.create_task(self._run_loop()))

    @abc.abstractmethod
    async def _run(self) -> None:
        """Run this actor's logic."""

    async def _delay_if_restart(self, iteration: int) -> None:
        """Delay the restart of this actor's n'th iteration.

        Args:
            iteration: The current iteration of the restart.
        """
        # NB: I think it makes sense (in the future) to think about deminishing returns
        # the longer the actor has been running.
        # Not just for the restart-delay but actually for the n_restarts counter as well.
        if iteration > 0:
            delay = self.RESTART_DELAY.total_seconds()
            _logger.info("Actor %s: Waiting %s seconds...", self, delay)
            await asyncio.sleep(delay)

    async def _run_loop(self) -> None:
        """Run the actor's task continuously, managing restarts, cancellation, and termination.

        This method handles the execution of the actor's task, including
        restarts for unhandled exceptions, cancellation, or normal termination.

        Raises:
            asyncio.CancelledError: If the actor's `_run()` method is cancelled.
            Exception: If the actor's `_run()` method raises any other exception.
            BaseException: If the actor's `_run()` method raises any base exception.
        """
        _logger.info("Actor %s: Started.", self)
        n_restarts = 0
        while True:
            try:
                await self._delay_if_restart(n_restarts)
                await self._run()
                _logger.info("Actor %s: _run() returned without error.", self)
            except asyncio.CancelledError:
                _logger.info("Actor %s: Cancelled.", self)
                raise
            except Exception:  # pylint: disable=broad-except
                _logger.exception("Actor %s: Raised an unhandled exception.", self)
                limit_str = "∞" if self._restart_limit is None else self._restart_limit
                limit_str = f"({n_restarts}/{limit_str})"
                if self._restart_limit is None or n_restarts < self._restart_limit:
                    n_restarts += 1
                    _logger.info("Actor %s: Restarting %s...", self._name, limit_str)
                    continue
                _logger.info(
                    "Actor %s: Maximum restarts attempted %s, bailing out...",
                    self,
                    limit_str,
                )
                raise
            except BaseException:  # pylint: disable=broad-except
                _logger.exception("Actor %s: Raised a BaseException.", self)
                raise
            break

        _logger.info("Actor %s: Stopped.", self)
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.
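
For example (a sketch, with an illustrative subclass name), the delay can be tuned by overriding this class attribute:

from datetime import timedelta

from frequenz.sdk.actor import Actor

class SlowRestartActor(Actor):
    RESTART_DELAY = timedelta(seconds=30)  # wait longer between automatic restarts

    async def _run(self) -> None:
        ...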

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(*, name: str | None = None) -> None

Initialize this BackgroundService.

PARAMETER DESCRIPTION
name

The name of this background service. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def __init__(self, *, name: str | None = None) -> None:
    """Initialize this BackgroundService.

    Args:
        name: The name of this background service. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
    """
    self._name: str = str(id(self)) if name is None else name
    self._tasks: set[asyncio.Task[Any]] = set()
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.actor.BackgroundService ¤

Bases: ABC

A background service that can be started and stopped.

A background service is a service that runs in the background spawning one or more tasks. The service can be started and stopped and can work as an async context manager to provide deterministic cleanup.

To implement a background service, subclasses must implement the start() method, which should start the background tasks needed by the service, and add them to the _tasks protected attribute.

If you need to collect results or handle exceptions of the tasks when stopping the service, then you need to also override the stop() method, as the base implementation does not collect any results and re-raises all exceptions.

Warning

As background services manage asyncio.Task objects, a reference to them must be held for as long as the background service is expected to be running, otherwise its tasks will be cancelled and the service will stop. For more information, please refer to the Python asyncio documentation.

Example
import datetime
import asyncio

class Clock(BackgroundService):
    def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self._resolution_s = resolution_s

    def start(self) -> None:
        self._tasks.add(asyncio.create_task(self._tick()))

    async def _tick(self) -> None:
        while True:
            await asyncio.sleep(self._resolution_s)
            print(datetime.datetime.now())

async def main() -> None:
    # As an async context manager
    async with Clock(resolution_s=1):
        await asyncio.sleep(5)

    # Manual start/stop (only use if necessary, as cleanup is more complicated)
    clock = Clock(resolution_s=1)
    clock.start()
    await asyncio.sleep(5)
    await clock.stop()

asyncio.run(main())
Source code in frequenz/sdk/actor/_background_service.py
class BackgroundService(abc.ABC):
    """A background service that can be started and stopped.

    A background service is a service that runs in the background spawning one or more
    tasks. The service can be [started][frequenz.sdk.actor.BackgroundService.start]
    and [stopped][frequenz.sdk.actor.BackgroundService.stop] and can work as an
    async context manager to provide deterministic cleanup.

    To implement a background service, subclasses must implement the
    [`start()`][frequenz.sdk.actor.BackgroundService.start] method, which should
    start the background tasks needed by the service, and add them to the `_tasks`
    protected attribute.

    If you need to collect results or handle exceptions of the tasks when stopping the
    service, then you need to also override the
    [`stop()`][frequenz.sdk.actor.BackgroundService.stop] method, as the base
    implementation does not collect any results and re-raises all exceptions.

    !!! warning

        As background services manage [`asyncio.Task`][] objects, a reference to them
        must be held for as long as the background service is expected to be running,
        otherwise its tasks will be cancelled and the service will stop. For more
        information, please refer to the [Python `asyncio`
        documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).

    Example:
        ```python
        import datetime
        import asyncio

        class Clock(BackgroundService):
            def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
                super().__init__(name=name)
                self._resolution_s = resolution_s

            def start(self) -> None:
                self._tasks.add(asyncio.create_task(self._tick()))

            async def _tick(self) -> None:
                while True:
                    await asyncio.sleep(self._resolution_s)
                    print(datetime.datetime.now())

        async def main() -> None:
            # As an async context manager
            async with Clock(resolution_s=1):
                await asyncio.sleep(5)

            # Manual start/stop (only use if necessary, as cleanup is more complicated)
            clock = Clock(resolution_s=1)
            clock.start()
            await asyncio.sleep(5)
            await clock.stop()

        asyncio.run(main())
        ```
    """

    def __init__(self, *, name: str | None = None) -> None:
        """Initialize this BackgroundService.

        Args:
            name: The name of this background service. If `None`, `str(id(self))` will
                be used. This is used mostly for debugging purposes.
        """
        self._name: str = str(id(self)) if name is None else name
        self._tasks: set[asyncio.Task[Any]] = set()

    @abc.abstractmethod
    def start(self) -> None:
        """Start this background service."""

    @property
    def name(self) -> str:
        """The name of this background service.

        Returns:
            The name of this background service.
        """
        return self._name

    @property
    def tasks(self) -> collections.abc.Set[asyncio.Task[Any]]:
        """Return the set of running tasks spawned by this background service.

        Users typically should not modify the tasks in the returned set and only use
        them for informational purposes.

        !!! danger

            Changing the returned tasks may lead to unexpected behavior, don't do it
            unless the class explicitly documents it is safe to do so.

        Returns:
            The set of running tasks spawned by this background service.
        """
        return self._tasks

    @property
    def is_running(self) -> bool:
        """Return whether this background service is running.

        A service is considered running when at least one task is running.

        Returns:
            Whether this background service is running.
        """
        return any(not task.done() for task in self._tasks)

    def cancel(self, msg: str | None = None) -> None:
        """Cancel all running tasks spawned by this background service.

        Args:
            msg: The message to be passed to the tasks being cancelled.
        """
        for task in self._tasks:
            task.cancel(msg)

    async def stop(self, msg: str | None = None) -> None:
        """Stop this background service.

        This method cancels all running tasks spawned by this service and waits for them
        to finish.

        Args:
            msg: The message to be passed to the tasks being cancelled.

        Raises:
            BaseExceptionGroup: If any of the tasks spawned by this service raised an
                exception.
        """
        if not self._tasks:
            return
        self.cancel(msg)
        try:
            await self.wait()
        except BaseExceptionGroup as exc_group:
            # We want to ignore CancelledError here as we explicitly cancelled all the
            # tasks.
            _, rest = exc_group.split(asyncio.CancelledError)
            if rest is not None:
                # We are filtering out from an exception group, we really don't want to
                # add the exceptions we just filtered by adding a from clause here.
                raise rest  # pylint: disable=raise-missing-from

    async def __aenter__(self) -> Self:
        """Enter an async context.

        Start this background service.

        Returns:
            This background service.
        """
        self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit an async context.

        Stop this background service.

        Args:
            exc_type: The type of the exception raised, if any.
            exc_val: The exception raised, if any.
            exc_tb: The traceback of the exception raised, if any.
        """
        await self.stop()

    async def wait(self) -> None:
        """Wait this background service to finish.

        Wait until all background service tasks are finished.

        Raises:
            BaseExceptionGroup: If any of the tasks spawned by this service raised an
                exception (`CancelError` is not considered an error and not returned in
                the exception group).
        """
        # We need to account for tasks that were created between when we started
        # awaiting and we finished awaiting.
        while self._tasks:
            done, pending = await asyncio.wait(self._tasks)
            assert not pending

            # We remove the done tasks, but there might be new ones created after we
            # started waiting.
            self._tasks = self._tasks - done

            exceptions: list[BaseException] = []
            for task in done:
                try:
                    # This will raise a CancelledError if the task was cancelled or any
                    # other exception if the task raised one.
                    _ = task.result()
                except BaseException as error:  # pylint: disable=broad-except
                    exceptions.append(error)
            if exceptions:
                raise BaseExceptionGroup(
                    f"Error while stopping background service {self}", exceptions
                )

    def __await__(self) -> collections.abc.Generator[None, None, None]:
        """Await this background service.

        An awaited background service will wait for all its tasks to finish.

        Returns:
            An implementation-specific generator for the awaitable.
        """
        return self.wait().__await__()

    def __del__(self) -> None:
        """Destroy this instance.

        Cancel all running tasks spawned by this background service.
        """
        self.cancel("{self!r} was deleted")

    def __repr__(self) -> str:
        """Return a string representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"

    def __str__(self) -> str:
        """Return a string representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return f"{type(self).__name__}[{self._name}]"
Attributes¤
is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(*, name: str | None = None) -> None

Initialize this BackgroundService.

PARAMETER DESCRIPTION
name

The name of this background service. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def __init__(self, *, name: str | None = None) -> None:
    """Initialize this BackgroundService.

    Args:
        name: The name of this background service. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
    """
    self._name: str = str(id(self)) if name is None else name
    self._tasks: set[asyncio.Task[Any]] = set()
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start abstractmethod ¤
start() -> None

Start this background service.

Source code in frequenz/sdk/actor/_background_service.py
@abc.abstractmethod
def start(self) -> None:
    """Start this background service."""
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.actor.ChannelRegistry ¤

Dynamically creates, owns, and provides access to broadcast channels.

It can be used by actors to dynamically establish a communication channel between each other.

The registry is responsible for creating channels when they are first requested via the get_or_create() method.

The registry also stores type information to make sure that the same channel is not used for different message types.

Since the registry owns the channels, it is also responsible for closing them when they are no longer needed. There is no way to remove a channel without closing it.

Note

This registry stores Broadcast channels.
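
A minimal usage sketch based on the methods documented below (the registry name, key, and message type are illustrative):

from frequenz.sdk.actor import ChannelRegistry

registry = ChannelRegistry(name="example-registry")

# Creates the channel on first use; later calls with the same key and message
# type return the same channel.
channel = registry.get_or_create(int, "meter-readings")

sender = channel.new_sender()
receiver = channel.new_receiver()

# The registry owns the channel, so it is also responsible for closing it.
await registry.close_and_remove("meter-readings")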

Source code in frequenz/sdk/actor/_channel_registry.py
class ChannelRegistry:
    """Dynamically creates, own and provide access to broadcast channels.

    It can be used by actors to dynamically establish a communication channel
    between each other.

    The registry is responsible for creating channels when they are first requested via
    the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method.

    The registry also stores type information to make sure that the same channel is not
    used for different message types.

    Since the registry owns the channels, it is also responsible for closing them when
    they are no longer needed. There is no way to remove a channel without closing it.

    Note:
        This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels.
    """

    def __init__(self, *, name: str) -> None:
        """Initialize this registry.

        Args:
            name: A name to identify the registry in the logs. This name is also used as
                a prefix for the channel names.
        """
        self._name = name
        self._channels: dict[str, _Entry] = {}

    @property
    def name(self) -> str:
        """The name of this registry."""
        return self._name

    def message_type(self, key: str) -> type:
        """Get the message type of the channel for the given key.

        Args:
            key: The key to identify the channel.

        Returns:
            The message type of the channel.

        Raises:
            KeyError: If the channel does not exist.
        """
        entry = self._channels.get(key)
        if entry is None:
            raise KeyError(f"No channel for key {key!r} exists.")
        return entry.message_type

    def __contains__(self, key: str) -> bool:
        """Check whether the channel for the given `key` exists."""
        return key in self._channels

    def get_or_create(self, message_type: type[_T], key: str) -> Broadcast[_T]:
        """Get or create a channel for the given key.

        If a channel for the given key already exists, the message type of the existing
        channel is checked against the requested message type. If they do not match,
        a `ValueError` is raised.

        Note:
            The types have to match exactly, it doesn't do a subtype check due to
            technical limitations. In the future subtype checks might be supported.

        Args:
            message_type: The type of the message that is sent through the channel.
            key: The key to identify the channel.

        Returns:
            The channel for the given key.

        Raises:
            ValueError: If the channel exists and the message type does not match.
        """
        if key not in self._channels:
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    "Creating a new channel for key %r with type %s at:\n%s",
                    key,
                    message_type,
                    "".join(traceback.format_stack(limit=10)[:9]),
                )
            self._channels[key] = _Entry(
                message_type, Broadcast(name=f"{self._name}-{key}")
            )

        entry = self._channels[key]
        if entry.message_type is not message_type:
            exception = ValueError(
                f"Type mismatch, a channel for key {key!r} exists and the requested "
                f"message type {message_type} is not the same as the existing "
                f"message type {entry.message_type}."
            )
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    "%s at:\n%s",
                    str(exception),
                    # We skip the last frame because it's this method, and limit the
                    # stack to 9 frames to avoid adding too much noise.
                    "".join(traceback.format_stack(limit=10)[:9]),
                )
            raise exception

        return cast(Broadcast[_T], entry.channel)

    async def close_and_remove(self, key: str) -> None:
        """Remove the channel for the given key.

        Args:
            key: The key to identify the channel.

        Raises:
            KeyError: If the channel does not exist.
        """
        entry = self._channels.pop(key, None)
        if entry is None:
            raise KeyError(f"No channel for key {key!r} exists.")
        await entry.channel.close()
Attributes¤
name property ¤
name: str

The name of this registry.

Functions¤
__contains__ ¤
__contains__(key: str) -> bool

Check whether the channel for the given key exists.

Source code in frequenz/sdk/actor/_channel_registry.py
def __contains__(self, key: str) -> bool:
    """Check whether the channel for the given `key` exists."""
    return key in self._channels
__init__ ¤
__init__(*, name: str) -> None

Initialize this registry.

PARAMETER DESCRIPTION
name

A name to identify the registry in the logs. This name is also used as a prefix for the channel names.

TYPE: str

Source code in frequenz/sdk/actor/_channel_registry.py
def __init__(self, *, name: str) -> None:
    """Initialize this registry.

    Args:
        name: A name to identify the registry in the logs. This name is also used as
            a prefix for the channel names.
    """
    self._name = name
    self._channels: dict[str, _Entry] = {}
close_and_remove async ¤
close_and_remove(key: str) -> None

Remove the channel for the given key.

PARAMETER DESCRIPTION
key

The key to identify the channel.

TYPE: str

RAISES DESCRIPTION
KeyError

If the channel does not exist.

Source code in frequenz/sdk/actor/_channel_registry.py
async def close_and_remove(self, key: str) -> None:
    """Remove the channel for the given key.

    Args:
        key: The key to identify the channel.

    Raises:
        KeyError: If the channel does not exist.
    """
    entry = self._channels.pop(key, None)
    if entry is None:
        raise KeyError(f"No channel for key {key!r} exists.")
    await entry.channel.close()
get_or_create ¤
get_or_create(
    message_type: type[_T], key: str
) -> Broadcast[_T]

Get or create a channel for the given key.

If a channel for the given key already exists, the message type of the existing channel is checked against the requested message type. If they do not match, a ValueError is raised.

Note

The types have to match exactly; a subtype check is not done due to technical limitations. Subtype checks might be supported in the future.

PARAMETER DESCRIPTION
message_type

The type of the message that is sent through the channel.

TYPE: type[_T]

key

The key to identify the channel.

TYPE: str

RETURNS DESCRIPTION
Broadcast[_T]

The channel for the given key.

RAISES DESCRIPTION
ValueError

If the channel exists and the message type does not match.

Source code in frequenz/sdk/actor/_channel_registry.py
def get_or_create(self, message_type: type[_T], key: str) -> Broadcast[_T]:
    """Get or create a channel for the given key.

    If a channel for the given key already exists, the message type of the existing
    channel is checked against the requested message type. If they do not match,
    a `ValueError` is raised.

    Note:
        The types have to match exactly, it doesn't do a subtype check due to
        technical limitations. In the future subtype checks might be supported.

    Args:
        message_type: The type of the message that is sent through the channel.
        key: The key to identify the channel.

    Returns:
        The channel for the given key.

    Raises:
        ValueError: If the channel exists and the message type does not match.
    """
    if key not in self._channels:
        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "Creating a new channel for key %r with type %s at:\n%s",
                key,
                message_type,
                "".join(traceback.format_stack(limit=10)[:9]),
            )
        self._channels[key] = _Entry(
            message_type, Broadcast(name=f"{self._name}-{key}")
        )

    entry = self._channels[key]
    if entry.message_type is not message_type:
        exception = ValueError(
            f"Type mismatch, a channel for key {key!r} exists and the requested "
            f"message type {message_type} is not the same as the existing "
            f"message type {entry.message_type}."
        )
        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "%s at:\n%s",
                str(exception),
                # We skip the last frame because it's this method, and limit the
                # stack to 9 frames to avoid adding too much noise.
                "".join(traceback.format_stack(limit=10)[:9]),
            )
        raise exception

    return cast(Broadcast[_T], entry.channel)
message_type ¤
message_type(key: str) -> type

Get the message type of the channel for the given key.

PARAMETER DESCRIPTION
key

The key to identify the channel.

TYPE: str

RETURNS DESCRIPTION
type

The message type of the channel.

RAISES DESCRIPTION
KeyError

If the channel does not exist.

Source code in frequenz/sdk/actor/_channel_registry.py
def message_type(self, key: str) -> type:
    """Get the message type of the channel for the given key.

    Args:
        key: The key to identify the channel.

    Returns:
        The message type of the channel.

    Raises:
        KeyError: If the channel does not exist.
    """
    entry = self._channels.get(key)
    if entry is None:
        raise KeyError(f"No channel for key {key!r} exists.")
    return entry.message_type
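
Example

A minimal usage sketch of the registry, based only on the methods documented above (get_or_create(), message_type(), __contains__() and close_and_remove()); the key and message type are illustrative.

from frequenz.channels import Broadcast

from frequenz.sdk.actor import ChannelRegistry


async def example() -> None:
    registry = ChannelRegistry(name="example-registry")

    # Create (or fetch) the broadcast channel identified by the key "my-key".
    channel: Broadcast[int] = registry.get_or_create(int, "my-key")
    receiver = channel.new_receiver()
    sender = channel.new_sender()

    assert "my-key" in registry
    assert registry.message_type("my-key") is int

    await sender.send(42)
    print(await receiver.receive())  # 42

    # Requesting the same key with a different message type raises ValueError.
    try:
        registry.get_or_create(str, "my-key")
    except ValueError as error:
        print(f"Type mismatch: {error}")

    # Close the underlying channel and drop it from the registry.
    await registry.close_and_remove("my-key")
    assert "my-key" not in registry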

frequenz.sdk.actor.ComponentMetricRequest dataclass ¤

A request object to start streaming a metric for a component.

Source code in frequenz/sdk/actor/_data_sourcing/_component_metric_request.py
@dataclass
class ComponentMetricRequest:
    """A request object to start streaming a metric for a component."""

    namespace: str
    """The namespace that this request belongs to.

    Metric requests with a shared namespace enable the reuse of channels within
    that namespace.

    If, for example, an actor making multiple requests uses its own name as
    the namespace, then the channels for those requests will be reused when
    possible.
    """

    component_id: int
    """The ID of the requested component."""

    metric_id: ComponentMetricId
    """The ID of the requested component's metric."""

    start_time: datetime | None
    """The start time from which data is required.

    When None, we will stream only live data.
    """

    def get_channel_name(self) -> str:
        """Return a channel name constructed from Self.

        This channel name can be used by the sending side and receiving sides to
        identify the right channel from the ChannelRegistry.

        Returns:
            A string denoting a channel name.
        """
        return (
            f"component-stream::{self.component_id}::{self.metric_id.name}::"
            f"{self.start_time}::{self.namespace}"
        )
Attributes¤
component_id instance-attribute ¤
component_id: int

The ID of the requested component.

metric_id instance-attribute ¤
metric_id: ComponentMetricId

The ID of the requested component's metric.

namespace instance-attribute ¤
namespace: str

The namespace that this request belongs to.

Metric requests with a shared namespace enable the reuse of channels within that namespace.

If, for example, an actor making multiple requests uses its own name as the namespace, then the channels for those requests will be reused when possible.

start_time instance-attribute ¤
start_time: datetime | None

The start time from which data is required.

When None, we will stream only live data.

Functions¤
get_channel_name ¤
get_channel_name() -> str

Return a channel name constructed from this request.

This channel name can be used by the sending and receiving sides to identify the right channel from the ChannelRegistry.

RETURNS DESCRIPTION
str

A string denoting a channel name.

Source code in frequenz/sdk/actor/_data_sourcing/_component_metric_request.py
def get_channel_name(self) -> str:
    """Return a channel name constructed from Self.

    This channel name can be used by the sending side and receiving sides to
    identify the right channel from the ChannelRegistry.

    Returns:
        A string denoting a channel name.
    """
    return (
        f"component-stream::{self.component_id}::{self.metric_id.name}::"
        f"{self.start_time}::{self.namespace}"
    )
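
Example

A short sketch of how a request maps to a channel name, following the format used by get_channel_name() above. The import path for ComponentMetricId and the ACTIVE_POWER member are assumptions used only for illustration.

from frequenz.sdk.actor import ComponentMetricRequest
from frequenz.sdk.microgrid.component import ComponentMetricId  # assumed import path

request = ComponentMetricRequest(
    namespace="my-actor",
    component_id=7,  # illustrative component ID
    metric_id=ComponentMetricId.ACTIVE_POWER,  # assumed member
    start_time=None,  # stream only live data
)

# Both the sending and receiving sides can derive the same channel key:
print(request.get_channel_name())
# component-stream::7::ACTIVE_POWER::None::my-actor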

frequenz.sdk.actor.ComponentMetricsResamplingActor ¤

Bases: Actor

An actor to resample microgrid component metrics.

Source code in frequenz/sdk/actor/_resampling.py
class ComponentMetricsResamplingActor(Actor):
    """An actor to resample microgrid component metrics."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        channel_registry: ChannelRegistry,
        data_sourcing_request_sender: Sender[ComponentMetricRequest],
        resampling_request_receiver: Receiver[ComponentMetricRequest],
        config: ResamplerConfig,
        name: str | None = None,
    ) -> None:
        """Initialize an instance.

        Args:
            channel_registry: The channel registry used to get senders and
                receivers for data sourcing subscriptions.
            data_sourcing_request_sender: The sender used to send requests to
                the [`DataSourcingActor`][frequenz.sdk.actor.DataSourcingActor]
                to subscribe to component metrics.
            resampling_request_receiver: The receiver to use to receive new
                resampling subscription requests.
            config: The configuration for the resampler.
            name: The name of the actor. If `None`, `str(id(self))` will be used. This
                is used mostly for debugging purposes.
        """
        super().__init__(name=name)
        self._channel_registry: ChannelRegistry = channel_registry
        self._data_sourcing_request_sender: Sender[ComponentMetricRequest] = (
            data_sourcing_request_sender
        )
        self._resampling_request_receiver: Receiver[ComponentMetricRequest] = (
            resampling_request_receiver
        )
        self._resampler: Resampler = Resampler(config)
        self._active_req_channels: set[str] = set()

    async def _subscribe(self, request: ComponentMetricRequest) -> None:
        """Request data for a component metric.

        Args:
            request: The request for component metric data.
        """
        request_channel_name = request.get_channel_name()

        # If we are already handling this request, there is nothing to do.
        if request_channel_name in self._active_req_channels:
            return

        self._active_req_channels.add(request_channel_name)

        data_source_request = dataclasses.replace(
            request, namespace=request.namespace + ":Source"
        )
        data_source_channel_name = data_source_request.get_channel_name()
        await self._data_sourcing_request_sender.send(data_source_request)
        receiver = self._channel_registry.get_or_create(
            Sample[Quantity], data_source_channel_name
        ).new_receiver()

        # This is a temporary hack until the Sender implementation uses
        # exceptions to report errors.
        sender = self._channel_registry.get_or_create(
            Sample[Quantity], request_channel_name
        ).new_sender()

        self._resampler.add_timeseries(request_channel_name, receiver, sender.send)

    async def _process_resampling_requests(self) -> None:
        """Process resampling data requests."""
        async for request in self._resampling_request_receiver:
            await self._subscribe(request)

    async def _run(self) -> None:
        """Resample known component metrics and process resampling requests.

        If there is a resampling error while resampling some component metric,
        then that metric will be discarded and not resampled any more. Any
        other error will be propagated (most likely ending in the actor being
        restarted).

        This method creates 2 main tasks:

        - One task to process incoming subscription requests to resample new metrics.
        - One task to run the resampler.

        Raises:
            RuntimeError: If there is some unexpected error while resampling or
                handling requests.
        """
        tasks_to_cancel: set[asyncio.Task[None]] = set()
        try:
            subscriptions_task = asyncio.create_task(
                self._process_resampling_requests()
            )
            tasks_to_cancel.add(subscriptions_task)

            while True:
                resampling_task = asyncio.create_task(self._resampler.resample())
                tasks_to_cancel.add(resampling_task)
                done, _ = await asyncio.wait(
                    [resampling_task, subscriptions_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if subscriptions_task in done:
                    tasks_to_cancel.remove(subscriptions_task)
                    raise RuntimeError(
                        "There was a problem with the subscriptions channel."
                    )

                if resampling_task in done:
                    tasks_to_cancel.remove(resampling_task)
                    # The resampler shouldn't end without an exception
                    error = resampling_task.exception()
                    assert (
                        error is not None
                    ), "The resample() function shouldn't exit normally."

                    # We don't know what to do with something other than
                    # ResamplingError, so propagate the exception if that is the
                    # case.
                    if not isinstance(error, ResamplingError):
                        raise error
                    for source, source_error in error.exceptions.items():
                        _logger.error(
                            "Error resampling source %s, removing source...", source
                        )
                        removed = self._resampler.remove_timeseries(source)
                        if not removed:
                            _logger.warning(
                                "Got an exception from an unknown source: "
                                "source=%r, exception=%r",
                                source,
                                source_error,
                            )
                    # The resampling_task will be re-created if we reached this point
        finally:
            await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])
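
Example

A wiring sketch based on the constructor documented above: a ChannelRegistry shared with a DataSourcingActor, one channel for data sourcing requests and one for resampling requests. It assumes the microgrid API connection has already been initialized elsewhere; the channel names and the 1-second resampling period are illustrative.

from datetime import timedelta

from frequenz.channels import Broadcast

from frequenz.sdk.actor import (
    ChannelRegistry,
    ComponentMetricRequest,
    ComponentMetricsResamplingActor,
    DataSourcingActor,
    ResamplerConfig,
    run,
)


async def main() -> None:
    registry = ChannelRegistry(name="data-pipeline")

    # Requests forwarded to the DataSourcingActor.
    data_source_requests = Broadcast[ComponentMetricRequest](name="data-source-requests")
    # Resampling subscription requests handled by this actor.
    resampling_requests = Broadcast[ComponentMetricRequest](name="resampling-requests")

    data_sourcing_actor = DataSourcingActor(
        request_receiver=data_source_requests.new_receiver(),
        registry=registry,
    )
    resampling_actor = ComponentMetricsResamplingActor(
        channel_registry=registry,
        data_sourcing_request_sender=data_source_requests.new_sender(),
        resampling_request_receiver=resampling_requests.new_receiver(),
        config=ResamplerConfig(resampling_period=timedelta(seconds=1)),
    )

    # Run both actors until they are stopped.
    await run(data_sourcing_actor, resampling_actor)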
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(
    *,
    channel_registry: ChannelRegistry,
    data_sourcing_request_sender: Sender[
        ComponentMetricRequest
    ],
    resampling_request_receiver: Receiver[
        ComponentMetricRequest
    ],
    config: ResamplerConfig,
    name: str | None = None
) -> None

Initialize an instance.

PARAMETER DESCRIPTION
channel_registry

The channel registry used to get senders and receivers for data sourcing subscriptions.

TYPE: ChannelRegistry

data_sourcing_request_sender

The sender used to send requests to the DataSourcingActor to subscribe to component metrics.

TYPE: Sender[ComponentMetricRequest]

resampling_request_receiver

The receiver to use to receive new resampling subscription requests.

TYPE: Receiver[ComponentMetricRequest]

config

The configuration for the resampler.

TYPE: ResamplerConfig

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_resampling.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    *,
    channel_registry: ChannelRegistry,
    data_sourcing_request_sender: Sender[ComponentMetricRequest],
    resampling_request_receiver: Receiver[ComponentMetricRequest],
    config: ResamplerConfig,
    name: str | None = None,
) -> None:
    """Initialize an instance.

    Args:
        channel_registry: The channel registry used to get senders and
            receivers for data sourcing subscriptions.
        data_sourcing_request_sender: The sender used to send requests to
            the [`DataSourcingActor`][frequenz.sdk.actor.DataSourcingActor]
            to subscribe to component metrics.
        resampling_request_receiver: The receiver to use to receive new
            resampling subscription requests.
        config: The configuration for the resampler.
        name: The name of the actor. If `None`, `str(id(self))` will be used. This
            is used mostly for debugging purposes.
    """
    super().__init__(name=name)
    self._channel_registry: ChannelRegistry = channel_registry
    self._data_sourcing_request_sender: Sender[ComponentMetricRequest] = (
        data_sourcing_request_sender
    )
    self._resampling_request_receiver: Receiver[ComponentMetricRequest] = (
        resampling_request_receiver
    )
    self._resampler: Resampler = Resampler(config)
    self._active_req_channels: set[str] = set()
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and is not included in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )
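
Example

A small lifecycle sketch using the start(), is_running and stop() members documented above. SleepingActor is a hypothetical minimal actor used only for illustration; the error handling shows how leftover exceptions are delivered as a BaseExceptionGroup.

import asyncio

from frequenz.sdk.actor import Actor


class SleepingActor(Actor):  # hypothetical actor, for illustration only
    async def _run(self) -> None:
        while True:
            await asyncio.sleep(1.0)


async def main() -> None:
    actor = SleepingActor()
    actor.start()
    assert actor.is_running

    await asyncio.sleep(5.0)  # let the actor do some work

    try:
        # Cancels all the actor's tasks and waits for them to finish.
        await actor.stop()
    except BaseExceptionGroup as group:
        # CancelledError is filtered out by stop(); anything left is a real error.
        for error in group.exceptions:
            print(f"Actor task failed: {error!r}")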

frequenz.sdk.actor.ConfigManagingActor ¤

Bases: Actor

An actor that monitors a TOML configuration file for updates.

When the file is updated, the new configuration is sent, as a dict, to the output sender.

When the actor is started, if a configuration file already exists, then it will be read and sent to the output sender before the actor starts monitoring the file for updates. This way users can rely on the actor to do the initial configuration reading too.

Source code in frequenz/sdk/actor/_config_managing.py
class ConfigManagingActor(Actor):
    """An actor that monitors a TOML configuration file for updates.

    When the file is updated, the new configuration is sent, as a [`dict`][], to the
    `output` sender.

    When the actor is started, if a configuration file already exists, then it will be
    read and sent to the `output` sender before the actor starts monitoring the file
    for updates. This way users can rely on the actor to do the initial configuration
    reading too.
    """

    def __init__(
        self,
        config_path: pathlib.Path | str,
        output: Sender[Config],
        event_types: abc.Set[EventType] = frozenset(EventType),
        *,
        name: str | None = None,
    ) -> None:
        """Initialize this instance.

        Args:
            config_path: The path to the TOML file with the configuration.
            output: The sender to send the config to.
            event_types: The set of event types to monitor.
            name: The name of the actor. If `None`, `str(id(self))` will
                be used. This is used mostly for debugging purposes.
        """
        super().__init__(name=name)
        self._config_path: pathlib.Path = (
            config_path
            if isinstance(config_path, pathlib.Path)
            else pathlib.Path(config_path)
        )
        # FileWatcher can't watch for non-existing files, so we need to watch for the
        # parent directory instead just in case a configuration file doesn't exist yet
        # or it is deleted and recreated again.
        self._file_watcher: FileWatcher = FileWatcher(
            paths=[self._config_path.parent], event_types=event_types
        )
        self._output: Sender[Config] = output

    def _read_config(self) -> dict[str, Any]:
        """Read the contents of the configuration file.

        Returns:
            A dictionary containing configuration variables.

        Raises:
            ValueError: If config file cannot be read.
        """
        try:
            with self._config_path.open("rb") as toml_file:
                return tomllib.load(toml_file)
        except ValueError as err:
            _logger.error("%s: Can't read config file, err: %s", self, err)
            raise

    async def send_config(self) -> None:
        """Send the configuration to the output sender."""
        conf_vars = self._read_config()
        config = Config(conf_vars)
        await self._output.send(config)

    async def _run(self) -> None:
        """Monitor for and send configuration file updates.

        At startup, the Config Manager sends the current config so that it
        can be cached in the Broadcast channel and served to receivers even if
        there hasn't been any change to the config file itself.
        """
        await self.send_config()

        async for event in self._file_watcher:
            # Since we are watching the whole parent directory, we need to make sure
            # we only react to events related to the configuration file.
            if event.path != self._config_path:
                continue

            match event.type:
                case EventType.CREATE:
                    _logger.info(
                        "%s: The configuration file %s was created, sending new config...",
                        self,
                        self._config_path,
                    )
                    await self.send_config()
                case EventType.MODIFY:
                    _logger.info(
                        "%s: The configuration file %s was modified, sending update...",
                        self,
                        self._config_path,
                    )
                    await self.send_config()
                case EventType.DELETE:
                    _logger.info(
                        "%s: The configuration file %s was deleted, ignoring...",
                        self,
                        self._config_path,
                    )
                case _:
                    assert_never(event.type)
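
Example

A minimal wiring sketch based on the constructor documented above: a Broadcast channel carries each new Config to receivers, and the configuration file is read once at startup and then again on every change. The frequenz.sdk.config.Config import path and the file name are assumptions for illustration.

from frequenz.channels import Broadcast

from frequenz.sdk.actor import ConfigManagingActor
from frequenz.sdk.config import Config  # assumed import path


async def main() -> None:
    config_channel = Broadcast[Config](name="config")
    receiver = config_channel.new_receiver()

    async with ConfigManagingActor(
        "config.toml",  # path to the TOML configuration file (illustrative)
        config_channel.new_sender(),
    ):
        # The current config is sent at startup and on every file change.
        async for config in receiver:
            print("Got a new configuration:", config)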
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(
    config_path: Path | str,
    output: Sender[Config],
    event_types: Set[EventType] = frozenset(EventType),
    *,
    name: str | None = None
) -> None

Initialize this instance.

PARAMETER DESCRIPTION
config_path

The path to the TOML file with the configuration.

TYPE: Path | str

output

The sender to send the config to.

TYPE: Sender[Config]

event_types

The set of event types to monitor.

TYPE: Set[EventType] DEFAULT: frozenset(EventType)

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_config_managing.py
def __init__(
    self,
    config_path: pathlib.Path | str,
    output: Sender[Config],
    event_types: abc.Set[EventType] = frozenset(EventType),
    *,
    name: str | None = None,
) -> None:
    """Initialize this instance.

    Args:
        config_path: The path to the TOML file with the configuration.
        output: The sender to send the config to.
        event_types: The set of event types to monitor.
        name: The name of the actor. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
    """
    super().__init__(name=name)
    self._config_path: pathlib.Path = (
        config_path
        if isinstance(config_path, pathlib.Path)
        else pathlib.Path(config_path)
    )
    # FileWatcher can't watch for non-existing files, so we need to watch for the
    # parent directory instead just in case a configuration file doesn't exist yet
    # or it is deleted and recreated again.
    self._file_watcher: FileWatcher = FileWatcher(
        paths=[self._config_path.parent], event_types=event_types
    )
    self._output: Sender[Config] = output
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
send_config async ¤
send_config() -> None

Send the configuration to the output sender.

Source code in frequenz/sdk/actor/_config_managing.py
async def send_config(self) -> None:
    """Send the configuration to the output sender."""
    conf_vars = self._read_config()
    config = Config(conf_vars)
    await self._output.send(config)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and is not included in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.actor.DataSourcingActor ¤

Bases: Actor

An actor that provides data streams of metrics as time series.

Source code in frequenz/sdk/actor/_data_sourcing/data_sourcing.py
class DataSourcingActor(Actor):
    """An actor that provides data streams of metrics as time series."""

    def __init__(
        self,
        request_receiver: Receiver[ComponentMetricRequest],
        registry: ChannelRegistry,
        *,
        name: str | None = None,
    ) -> None:
        """Create a `DataSourcingActor` instance.

        Args:
            request_receiver: A channel receiver to accept metric requests from.
            registry: A channel registry.  To be replaced by a singleton
                instance.
            name: The name of the actor. If `None`, `str(id(self))` will be used. This
                is used mostly for debugging purposes.
        """
        super().__init__(name=name)
        self._request_receiver = request_receiver
        self._microgrid_api_source = MicrogridApiSource(registry)

    async def _run(self) -> None:
        """Run the actor."""
        async for request in self._request_receiver:
            await self._microgrid_api_source.add_metric(request)
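
Example

A sketch of the request flow: a ComponentMetricRequest is sent to the actor, and the resulting stream is read from the channel registry under the name returned by get_channel_name(). It assumes the microgrid API connection is already initialized; the ComponentMetricId, Sample and Quantity import paths and the component ID are assumptions for illustration.

from frequenz.channels import Broadcast

from frequenz.sdk.actor import (
    ChannelRegistry,
    ComponentMetricRequest,
    DataSourcingActor,
)
from frequenz.sdk.microgrid.component import ComponentMetricId  # assumed import path
from frequenz.sdk.timeseries import Quantity, Sample  # assumed import path


async def main() -> None:
    registry = ChannelRegistry(name="data-sourcing")
    requests = Broadcast[ComponentMetricRequest](name="data-sourcing-requests")

    request = ComponentMetricRequest(
        namespace="example",
        component_id=7,  # illustrative component ID
        metric_id=ComponentMetricId.ACTIVE_POWER,  # assumed member
        start_time=None,  # live data only
    )

    async with DataSourcingActor(requests.new_receiver(), registry):
        await requests.new_sender().send(request)
        # The data arrives on the registry channel named after the request.
        receiver = registry.get_or_create(
            Sample[Quantity], request.get_channel_name()
        ).new_receiver()
        print(await receiver.receive())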
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(
    request_receiver: Receiver[ComponentMetricRequest],
    registry: ChannelRegistry,
    *,
    name: str | None = None
) -> None

Create a DataSourcingActor instance.

PARAMETER DESCRIPTION
request_receiver

A channel receiver to accept metric requests from.

TYPE: Receiver[ComponentMetricRequest]

registry

A channel registry. To be replaced by a singleton instance.

TYPE: ChannelRegistry

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_data_sourcing/data_sourcing.py
def __init__(
    self,
    request_receiver: Receiver[ComponentMetricRequest],
    registry: ChannelRegistry,
    *,
    name: str | None = None,
) -> None:
    """Create a `DataSourcingActor` instance.

    Args:
        request_receiver: A channel receiver to accept metric requests from.
        registry: A channel registry.  To be replaced by a singleton
            instance.
        name: The name of the actor. If `None`, `str(id(self))` will be used. This
            is used mostly for debugging purposes.
    """
    super().__init__(name=name)
    self._request_receiver = request_receiver
    self._microgrid_api_source = MicrogridApiSource(registry)
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and is not included in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.actor.ResamplerConfig dataclass ¤

Resampler configuration.

Source code in frequenz/sdk/timeseries/_resampling.py
@dataclass(frozen=True)
class ResamplerConfig:
    """Resampler configuration."""

    resampling_period: timedelta
    """The resampling period.

    This is the time that passes between consecutive calculations of resampled data.

    It must be a positive time span.
    """

    max_data_age_in_periods: float = 3.0
    """The maximum age a sample can have to be considered *relevant* for resampling.

    Expressed in number of periods, where period is the `resampling_period`
    if we are downsampling (resampling period bigger than the input period) or
    the input sampling period if we are upsampling (input period bigger than
    the resampling period).

    It must be bigger than 1.0.

    Example:
        If `resampling_period` is 3 seconds, the input sampling period is
        1 and `max_data_age_in_periods` is 2, then data older than 3*2
        = 6 seconds will be discarded when creating a new sample and never
        passed to the resampling function.

        If `resampling_period` is 3 seconds, the input sampling period is
        5 and `max_data_age_in_periods` is 2, then data older than 5*2
        = 10 seconds will be discarded when creating a new sample and never
        passed to the resampling function.
    """

    resampling_function: ResamplingFunction = average
    """The resampling function.

    This function will be applied to the sequence of relevant samples at
    a given time. The result of the function is what is sent as the resampled
    value.
    """

    initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
    """The initial length of the resampling buffer.

    The buffer could grow or shrink depending on the source properties,
    like sampling rate, to make sure all the requested past sampling periods
    can be stored.

    It must be at least 1 and at most `max_buffer_len`.
    """

    warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
    """The minimum length of the resampling buffer that will emit a warning.

    If a buffer grows bigger than this value, it will emit a warning in the
    logs, so buffers don't grow too big inadvertently.

    It must be at least 1 and at most `max_buffer_len`.
    """

    max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
    """The maximum length of the resampling buffer.

    Buffers won't be allowed to grow beyond this point even if it would be
    needed to keep all the requested past sampling periods. An error will be
    emitted in the logs if the buffer length needs to be truncated to this
    value.

    It must be bigger than `warn_buffer_len`.
    """

    align_to: datetime | None = UNIX_EPOCH
    """The time to align the resampling period to.

    The resampling period will be aligned to this time, so the first resampled
    sample will be at the first multiple of `resampling_period` starting from
    `align_to`. It must be an aware datetime and can be in the future too.

    If `align_to` is `None`, the resampling period will be aligned to the
    time the resampler is created.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.resampling_period.total_seconds() < 0.0:
            raise ValueError(
                f"resampling_period ({self.resampling_period}) must be positive"
            )
        if self.max_data_age_in_periods < 1.0:
            raise ValueError(
                f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
            )
        if self.warn_buffer_len < 1:
            raise ValueError(
                f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
            )
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                f"max_buffer_len ({self.max_buffer_len}) should "
                f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
            )

        if self.initial_buffer_len < 1:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) should at least 1"
            )
        if self.initial_buffer_len > self.max_buffer_len:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
                f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
                "initial_buffer_len or a bigger max_buffer_len"
            )
        if self.initial_buffer_len > self.warn_buffer_len:
            _logger.warning(
                "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
                self.initial_buffer_len,
                self.warn_buffer_len,
            )
        if self.align_to is not None and self.align_to.tzinfo is None:
            raise ValueError(
                f"align_to ({self.align_to}) should be a timezone aware datetime"
            )
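
Example

A minimal construction sketch illustrating the documented fields and the validation performed in __post_init__(). The period and values are illustrative.

from datetime import timedelta

from frequenz.sdk.actor import ResamplerConfig

# Resample to one value every 3 seconds; with max_data_age_in_periods=2.0,
# input samples older than 3 s * 2 = 6 s are never passed to the
# resampling function (when downsampling).
config = ResamplerConfig(
    resampling_period=timedelta(seconds=3),
    max_data_age_in_periods=2.0,
)
print(config.resampling_function)  # defaults to `average`

# Out-of-range values are rejected:
try:
    ResamplerConfig(
        resampling_period=timedelta(seconds=3),
        max_data_age_in_periods=0.5,  # should be at least 1.0
    )
except ValueError as error:
    print(error)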
Attributes¤
align_to class-attribute instance-attribute ¤
align_to: datetime | None = UNIX_EPOCH

The time to align the resampling period to.

The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of resampling_period starting from align_to. It must be an aware datetime and can be in the future too.

If align_to is None, the resampling period will be aligned to the time the resampler is created.

initial_buffer_len class-attribute instance-attribute ¤
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT

The initial length of the resampling buffer.

The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.

It must be at least 1 and at most max_buffer_len.

max_buffer_len class-attribute instance-attribute ¤
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX

The maximum length of the resampling buffer.

Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.

It must be bigger than warn_buffer_len.

max_data_age_in_periods class-attribute instance-attribute ¤
max_data_age_in_periods: float = 3.0

The maximum age a sample can have to be considered relevant for resampling.

Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).

It must be bigger than 1.0.

Example

If resampling_period is 3 seconds, the input sampling period is 1 second and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.

If resampling_period is 3 seconds, the input sampling period is 5 seconds and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.
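The cutoff used in both cases can be written as a small helper (an illustrative sketch only; max_sample_age is not part of the SDK):

from datetime import timedelta

def max_sample_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    # The relevant period is the longer of the two: the resampling period
    # when downsampling, the input sampling period when upsampling.
    period = max(resampling_period, input_period)
    return period * max_data_age_in_periods

# Reproduces the two cases above: 3 s * 2 = 6 s and 5 s * 2 = 10 s.
assert max_sample_age(
    timedelta(seconds=3), timedelta(seconds=1), 2.0
) == timedelta(seconds=6)
assert max_sample_age(
    timedelta(seconds=3), timedelta(seconds=5), 2.0
) == timedelta(seconds=10)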

resampling_function class-attribute instance-attribute ¤
resampling_function: ResamplingFunction = average

The resampling function.

This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.
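Conceptually, such a function reduces the relevant samples of one resampling window to a single value. The sketch below aggregates by taking the maximum instead of the default average; note that the exact call signature required by the ResamplingFunction protocol (it may also receive the resampler configuration and source properties) is not reproduced here and should be checked against _resampling.py:

from collections.abc import Sequence

def take_maximum(values: Sequence[float]) -> float:
    # Reduce the sample values of one resampling window to a single
    # resampled value, here the largest observed value. Assumes a
    # non-empty window; the real protocol may pass additional arguments.
    return max(values)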

resampling_period instance-attribute ¤
resampling_period: timedelta

The resampling period.

This is the time that passes between consecutive calculations of resampled data.

It must be a positive time span.

warn_buffer_len class-attribute instance-attribute ¤
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN

The length of the resampling buffer above which a warning will be emitted.

If a buffer grows bigger than this value, a warning will be emitted in the logs, so buffers don't grow too big inadvertently.

It must be at least 1 and less than max_buffer_len.

Functions¤
__post_init__ ¤
__post_init__() -> None

Check that config values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in frequenz/sdk/timeseries/_resampling.py
def __post_init__(self) -> None:
    """Check that config values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.resampling_period.total_seconds() < 0.0:
        raise ValueError(
            f"resampling_period ({self.resampling_period}) must be positive"
        )
    if self.max_data_age_in_periods < 1.0:
        raise ValueError(
            f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
        )
    if self.warn_buffer_len < 1:
        raise ValueError(
            f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
        )
    if self.max_buffer_len <= self.warn_buffer_len:
        raise ValueError(
            f"max_buffer_len ({self.max_buffer_len}) should "
            f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
        )

    if self.initial_buffer_len < 1:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) should at least 1"
        )
    if self.initial_buffer_len > self.max_buffer_len:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
            f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
            "initial_buffer_len or a bigger max_buffer_len"
        )
    if self.initial_buffer_len > self.warn_buffer_len:
        _logger.warning(
            "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
            self.initial_buffer_len,
            self.warn_buffer_len,
        )
    if self.align_to is not None and self.align_to.tzinfo is None:
        raise ValueError(
            f"align_to ({self.align_to}) should be a timezone aware datetime"
        )
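
As a sketch of this validation in use (ResamplerConfig import path assumed as above), an out-of-range value is rejected at construction time:

from datetime import timedelta

from frequenz.sdk.timeseries import ResamplerConfig  # import path assumed

try:
    # A negative resampling period is out of range, so __post_init__ raises.
    ResamplerConfig(resampling_period=timedelta(seconds=-1))
except ValueError as error:
    print(f"rejected: {error}")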

Functions¤

frequenz.sdk.actor.run async ¤

run(*actors: Actor) -> None

Await the completion of all actors.

Info

Please read the actor module documentation for a more comprehensive guide on how to use and implement actors properly.

PARAMETER DESCRIPTION
*actors

the actors to be awaited.

TYPE: Actor DEFAULT: ()

Source code in frequenz/sdk/actor/_run_utils.py
async def run(*actors: Actor) -> None:
    """Await the completion of all actors.

    !!! info

        Please read the [`actor` module documentation][frequenz.sdk.actor] for a more
        comprehensive guide on how to use and implement actors properly.

    Args:
        *actors: the actors to be awaited.
    """
    _logger.info("Starting %s actor(s)...", len(actors))

    for actor in actors:
        if actor.is_running:
            _logger.info("Actor %s: Already running, skipping start.", actor)
        else:
            _logger.info("Actor %s: Starting...", actor)
            actor.start()

    # Wait until all actors are done
    pending_tasks = {asyncio.create_task(a.wait(), name=str(a)) for a in actors}
    while pending_tasks:
        done_tasks, pending_tasks = await asyncio.wait(
            pending_tasks, return_when=asyncio.FIRST_COMPLETED
        )

        # This should always be only one task, but we handle many for extra safety
        for task in done_tasks:
            # Cancellation needs to be checked first, otherwise the other methods
            # could raise a CancelledError
            if task.cancelled():
                _logger.info("Actor %s: Cancelled while running.", task.get_name())
            elif exception := task.exception():
                _logger.error(
                    "Actor %s: Raised an exception while running.",
                    task.get_name(),
                    exc_info=exception,
                )
            else:
                _logger.info("Actor %s: Finished normally.", task.get_name())

    _logger.info("All %s actor(s) finished.", len(actors))