frequenz.sdk.actor ¤

Actors are a primitive unit of computation that runs autonomously.

Actor Programming Model¤

From Wikipedia

The actor model in computer science is a mathematical model of concurrent computation that treats an actor as the basic building block of concurrent computation. In response to a message it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other indirectly through messaging (removing the need for lock-based synchronization).
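As a rough illustration of the definition quoted above, the following sketch builds a toy actor using only the standard library. It is not how the SDK implements actors: `CounterActor`, its `mailbox` queue and the `None` stop message are all hypothetical names chosen for this example. The point is that the actor's state is private and is changed only in response to messages, so no locks are needed.

```python
import asyncio


class CounterActor:
    """A toy actor: private state plus a mailbox, no locks (illustrative only)."""

    def __init__(self) -> None:
        self.mailbox: asyncio.Queue[int | None] = asyncio.Queue()
        self._total = 0  # Private state, only touched by the actor itself.

    async def run(self, replies: asyncio.Queue[int]) -> None:
        while True:
            msg = await self.mailbox.get()
            if msg is None:  # Hypothetical "stop" message.
                break
            self._total += msg  # Local decision based on the received message.
            await replies.put(self._total)  # Respond by sending another message.


async def main() -> list[int]:
    actor = CounterActor()
    replies: asyncio.Queue[int] = asyncio.Queue()
    task = asyncio.create_task(actor.run(replies))
    for value in (1, 2, 3):
        await actor.mailbox.put(value)
    await actor.mailbox.put(None)  # Ask the actor to finish.
    await task
    return [replies.get_nowait() for _ in range(3)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The caller never reads or writes `_total` directly; it only observes the running totals that the actor chooses to send back.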

We won't get into much more detail here because it is outside the scope of this documentation. However, if you are interested in learning more about the actor programming model, here are some useful resources:

Frequenz SDK Actors¤

The Actor class serves as the foundation for creating concurrent tasks and all actors in the SDK inherit from it. This class provides a straightforward way to implement actors. It shares similarities with the traditional actor programming model but also has some unique features:

  • Message Passing: Like traditional actors, our Actor class communicates through message passing. Although no particular message passing mechanism is enforced, the SDK actors use frequenz.channels for communication.

  • Automatic Restart: If an unhandled exception occurs in an actor's logic (_run() method), the actor will be automatically restarted. This ensures robustness in the face of errors.

  • Simplified Task Management: Actors manage asynchronous tasks using asyncio. You can create and manage tasks within the actor, and the Actor class handles task cancellation and cleanup.

  • Simplified lifecycle management: Actors are async context managers, and a run() function is provided to easily run a group of actors and wait for them to finish.

Lifecycle¤

Actors are not started when they are created. There are 3 main ways to start an actor (from most to least recommended):

  1. By using the run() function.
  2. By using the actor as an async context manager.
  3. By using the start() method.

Warning

  1. If an actor raises an unhandled exception, it will be restarted automatically.

  2. Actors manage asyncio.Task objects, so a reference to the actor object must be held for as long as the actor is expected to be running, otherwise its tasks will be cancelled and the actor will stop. For more information, please refer to the Python asyncio documentation.

The run() Function¤

The run() function can start many actors at once and waits for them to finish. If any of the actors is stopped with errors, the errors will be logged.

Example
import asyncio
from frequenz.sdk.actor import Actor, run

class MyActor(Actor):
    async def _run(self) -> None:
        while True:
            print("Hello World!")
            await asyncio.sleep(1)

await run(MyActor()) # (1)!
  1. This line will block until the actor completes its execution or is manually stopped.
Async Context Manager¤

When an actor is used as an async context manager, it is started when the async with block is entered and stopped automatically when the block is exited (even if an unhandled exception is raised).

Example
from frequenz.sdk.actor import Actor

class MyActor(Actor):
    async def _run(self) -> None:
        print("Hello World!")

async with MyActor() as actor: # (1)!
    print("The actor is running")
# (2)!
  1. start() is called automatically when entering the async with block.
  2. stop() is called automatically when exiting the async with block.
The start() Method¤

When using the start() method, the actor is started in the background and the method returns immediately. The actor will continue to run until it is manually stopped or until it completes its execution.

Example
from frequenz.sdk.actor import Actor

class MyActor(Actor):
    async def _run(self) -> None:
        print("Hello World!")

actor = MyActor() # (1)!
actor.start() # (2)!
print("The actor is running") # (3)!
await actor.stop() # (4)!
  1. The actor is created but not started yet.
  2. The actor is started manually and keeps running in the background.
  3. Danger

    If this function raises an exception, the actor will never be stopped!

  4. Until the actor is stopped manually.

Warning

This method is not recommended because it is easy to forget to stop the actor manually, especially in error conditions.
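If start() has to be used anyway, one common way to reduce the risk described above is to wrap the code that runs while the actor is active in try/finally. The sketch below uses a hypothetical stand-in class, `Ticker`, built only on asyncio so that it runs without the SDK. It assumes nothing about the real Actor beyond the start() and stop() methods described in this section.

```python
import asyncio


class Ticker:
    """Hypothetical stand-in for an actor exposing start() and stop()."""

    def __init__(self) -> None:
        self._task: asyncio.Task[None] | None = None

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass  # Expected: we cancelled the task ourselves.

    async def _run(self) -> None:
        while True:
            await asyncio.sleep(0.01)


async def use_actor(actor: Ticker) -> None:
    actor.start()
    try:
        raise RuntimeError("simulated failure while the actor is running")
    finally:
        await actor.stop()  # Runs even though the line above raised.


async def main() -> bool:
    ticker = Ticker()
    try:
        await use_actor(ticker)
    except RuntimeError:
        pass  # The failure still propagates to the caller.
    return ticker.is_running


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This is essentially what the async context manager does for you, which is why the context manager is the more convenient choice.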

Communication¤

The Actor class doesn't impose any specific way to communicate between actors. However, frequenz.channels are always used as the communication mechanism between actors in the SDK.

Implementing an Actor¤

To implement an actor, you must inherit from the Actor class and implement an initializer and the abstract _run() method.

Initialization¤

The initializer is called when the actor is created. The Actor class initializer (__init__) should always be called first in the class we are implementing to make sure actors are properly initialized.

Actor.__init__() takes one optional argument, a name that will be used to identify the actor in logs. If no name is provided, a default name will be generated, but it is recommended that Actor subclasses also accept a name as an argument to make it easier to identify individual instances in logs.

The actor initializer normally also accepts as arguments the receivers for its input channels and the senders for its output channels. These channels should be created outside the actor and passed to it as arguments to ensure actors can be composed easily.

Example echo actor
from frequenz.channels import Receiver, Sender
from frequenz.sdk.actor import Actor

class EchoActor(Actor):  # (1)!
    def __init__(
            self,
            receiver: Receiver[int],  # (2)!
            output: Sender[int],  # (3)!
            name: str | None = None,  # (4)!
    ) -> None:
        super().__init__(name=name) # (5)!
        self._input: Receiver[int] = receiver  # (6)!
        self._output: Sender[int] = output  # (7)!
  1. We define a new actor class called EchoActor that inherits from Actor.

  2. We accept a receiver argument that will be used to receive messages from a channel.

  3. We accept an output argument that will be used to send messages to a channel.
  4. We accept an optional name argument that will be used to identify the actor in logs.
  5. We call Actor.__init__() to make sure the actor is properly initialized.
  6. We store the receiver argument in a private attribute to use it later.
  7. We store the output argument in a private attribute to use it later.
The _run() Method¤

The abstract _run() method is called automatically by the base class when the actor is started.

Normally an actor should run forever (or until it is stopped), so it is very common to have an infinite loop in the _run() method, typically receiving messages from one or more channels (receivers), processing them and sending the results to other channels (senders). However, it is worth noting that an actor can also be designed for a one-time execution or a limited number of runs, terminating upon completion.

Example echo actor
from frequenz.channels import Receiver, Sender
from frequenz.sdk.actor import Actor

class EchoActor(Actor):
    def __init__(
            self,
            receiver: Receiver[int],
            output: Sender[int],
            name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._input: Receiver[int] = receiver
        self._output: Sender[int] = output

    async def _run(self) -> None:  # (1)!
        async for msg in self._input:  # (2)!
            await self._output.send(msg)  # (3)!
  1. We implement the abstract _run() method.
  2. We receive messages from the receiver one by one.
  3. We send the received message to the output channel.
Stopping¤

By default, the stop() method will call the cancel() method (which will cancel all the tasks created by the actor) and will wait for them to finish.

This means that when an actor is stopped, the _run() method will receive a CancelledError exception. Keep this in mind when implementing your actor, and handle this exception if you need to do any cleanup.

The actor will handle the CancelledError exception automatically if it is not handled in the _run() method, so if there is no need for extra cleanup, you don't need to worry about it.
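The cleanup pattern described above can be sketched with a plain asyncio task. The `worker()` coroutine and the `log` list are hypothetical and stand in for the body of a _run() method; inside a real actor the same try/except would simply live in _run().

```python
import asyncio


async def worker(log: list[str]) -> None:
    """Stand-in for a _run() method that owns a resource."""
    log.append("resource acquired")
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("resource released")  # Cleanup on cancellation.
        raise  # Re-raise so the cancellation is not swallowed.


async def main() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)  # Let the worker start and run for a while.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("task cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Re-raising the CancelledError after the cleanup keeps the task in the cancelled state, which is what code waiting on it normally expects.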

If an unhandled exception is raised in the _run() method, the actor will re-run the _run() method automatically. This ensures robustness in the face of errors, but keep it in mind if you need to do any cleanup, so that the re-run doesn't cause any problems.

Tip

You could implement your own stop() method to, for example, send a message to a channel to notify that the actor should stop, or any other more graceful way to stop the actor if you need to make sure it can't be interrupted at any await point.
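One possible shape for such a graceful stop is sketched below with plain asyncio. Everything here is an assumption made for illustration: `GracefulWorker`, the `_STOP` sentinel and the use of asyncio.Queue as the message channel are not part of the SDK. The idea is that stop() asks the loop to finish instead of cancelling it, so messages queued before the stop request are still processed.

```python
import asyncio

_STOP = object()  # Hypothetical sentinel meaning "please finish".


class GracefulWorker:
    """Stand-in for an actor whose stop() avoids cancellation."""

    def __init__(self) -> None:
        self._inbox: asyncio.Queue[object] = asyncio.Queue()
        self._task: asyncio.Task[None] | None = None
        self.processed: list[object] = []

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def send(self, msg: object) -> None:
        await self._inbox.put(msg)

    async def stop(self) -> None:
        """Ask the loop to finish and wait for it, without cancelling."""
        if self._task is None:
            return
        await self._inbox.put(_STOP)
        await self._task

    async def _run(self) -> None:
        while True:
            msg = await self._inbox.get()
            if msg is _STOP:
                break
            await asyncio.sleep(0)  # An await point that is not interrupted.
            self.processed.append(msg)


async def main() -> list[object]:
    worker = GracefulWorker()
    worker.start()
    await worker.send("a")
    await worker.send("b")
    await worker.stop()  # Both messages are handled before this returns.
    return worker.processed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off is that a stop request now waits behind any queued messages, so a slow or stuck loop delays shutdown.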

Spawning Extra Tasks¤

Actors run at least one background task, created automatically by the Actor class. But Actor inherits from BackgroundService, which provides a few methods to create and manage extra tasks.

If your actor needs to spawn extra tasks, you can use BackgroundService facilities to manage the tasks, so they are also automatically stopped when the actor is stopped.

All you need to do is add the newly spawned tasks to the actor's tasks set.

Example
import asyncio
from frequenz.sdk.actor import Actor

class MyActor(Actor):
    async def _run(self) -> None:
        extra_task = asyncio.create_task(self._extra_task())  # (1)!
        self.tasks.add(extra_task)  # (2)!
        while True:  # (3)!
            print("_run() running")
            await asyncio.sleep(1)

    async def _extra_task(self) -> None:
        while True:  # (4)!
            print("_extra_task() running")
            await asyncio.sleep(1.1)

async with MyActor() as actor:  # (5)!
    await asyncio.sleep(3)  # (6)!
# (7)!
  1. We create a new task using asyncio.create_task().
  2. We add the task to the actor's tasks set. This ensures the task will be cancelled and cleaned up when the actor is stopped.
  3. We leave the actor running forever.
  4. The extra task will also run forever.
  5. The actor is started.
  6. We wait for 3 seconds; while it is running, the actor should print a series of "_run() running" and "_extra_task() running" messages.
  7. The actor is stopped and the extra task is cancelled automatically.
Examples¤

Here are a few simple but complete examples to demonstrate how to create actors and connect them using channels.

Tip

The code examples are annotated with numbered markers, each of which has a step-by-step explanation of what's going on below the code. The annotations are numbered according to the order of execution.

Composing actors¤

This example shows how to create two actors and connect them using broadcast channels.

compose.py
import asyncio

from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.sdk.actor import Actor

class Actor1(Actor):  # (1)!
    def __init__(
        self,
        receiver: Receiver[str],
        output: Sender[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver = receiver
        self._output = output

    async def _run(self) -> None:
        async for msg in self._receiver:
            await self._output.send(f"Actor1 forwarding: {msg!r}")  # (8)!


class Actor2(Actor):
    def __init__(
        self,
        receiver: Receiver[str],
        output: Sender[str],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver = receiver
        self._output = output

    async def _run(self) -> None:
        async for msg in self._receiver:
            await self._output.send(f"Actor2 forwarding: {msg!r}")  # (9)!


async def main() -> None:  # (2)!
    # (4)!
    input_channel: Broadcast[str] = Broadcast("Input to Actor1")
    middle_channel: Broadcast[str] = Broadcast("Actor1 -> Actor2 stream")
    output_channel: Broadcast[str] = Broadcast("Actor2 output")

    input_sender = input_channel.new_sender()
    output_receiver = output_channel.new_receiver()

    async with (  # (5)!
        Actor1(input_channel.new_receiver(), middle_channel.new_sender(), "actor1"),
        Actor2(middle_channel.new_receiver(), output_channel.new_sender(), "actor2"),
    ):
        await input_sender.send("Hello")  # (6)!
        msg = await output_receiver.receive()  # (7)!
        print(msg)  # (10)!
    # (11)!

if __name__ == "__main__":  # (3)!
    asyncio.run(main())
  1. We define 2 actors: Actor1 and Actor2 that will just forward a message from an input channel to an output channel, adding some text.

  2. We define an async main() function with the main logic of our asyncio program.

  3. We start the main() function in the async loop using asyncio.run().

  4. We create a bunch of broadcast channels to connect our actors.

    • input_channel is the input channel for Actor1.
    • middle_channel is the channel that connects Actor1 and Actor2.
    • output_channel is the output channel for Actor2.
  5. We create the two actors, Actor1 and Actor2, use them as async context managers, and connect them by creating new senders and receivers from the channels.

    Note

    We don't use the run() function here because we want to stop the actors when we are done with them, but the actors will run forever (as long as the channel is not closed). So the async context manager is a better fit for this example.

  6. We schedule the sending of the message Hello to Actor1 via input_channel.

  7. We receive (await) the response from Actor2 via output_channel. Between this and the previous steps the async calls in the actors will be executed.

  8. Actor1 sends the re-formatted message (Actor1 forwarding: 'Hello') to Actor2 via the middle_channel.

  9. Actor2 sends the re-formatted message (Actor2 forwarding: "Actor1 forwarding: 'Hello'") to the output_channel.

  10. Finally, we print the received message, which will still be Actor2 forwarding: "Actor1 forwarding: 'Hello'".

  11. The actors are stopped and cleaned up automatically when the async with block ends.

The expected output is:

Actor2 forwarding: "Actor1 forwarding: 'Hello'"
Receiving from multiple channels¤

This example shows how to create an actor that receives messages from multiple broadcast channels using select().

select.py
import asyncio

from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels.util import select, selected_from
from frequenz.sdk.actor import Actor, run


class EchoActor(Actor):  # (1)!
    def __init__(
        self,
        receiver_1: Receiver[bool],
        receiver_2: Receiver[bool],
        output: Sender[bool],
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._receiver_1 = receiver_1
        self._receiver_2 = receiver_2
        self._output = output

    async def _run(self) -> None:  # (2)!
        async for selected in select(self._receiver_1, self._receiver_2):  # (10)!
            if selected_from(selected, self._receiver_1):  # (11)!
                print(f"Received from receiver_1: {selected.value}")
                await self._output.send(selected.value)
                if not selected.value:  # (12)!
                    break
            elif selected_from(selected, self._receiver_2):  # (13)!
                print(f"Received from receiver_2: {selected.value}")
                await self._output.send(selected.value)
                if not selected.value:  # (14)!
                    break
            else:
                assert False, "Unknown selected channel"
        print("EchoActor finished")
    # (15)!


# (3)!
input_channel_1 = Broadcast[bool]("input_channel_1")
input_channel_2 = Broadcast[bool]("input_channel_2")
echo_channel = Broadcast[bool]("echo_channel")

echo_actor = EchoActor(  # (4)!
    input_channel_1.new_receiver(),
    input_channel_2.new_receiver(),
    echo_channel.new_sender(),
    "echo-actor",
)

echo_receiver = echo_channel.new_receiver()  # (5)!

async def main() -> None:  # (6)!
    # (8)!
    await input_channel_1.new_sender().send(True)
    await input_channel_2.new_sender().send(False)

    await run(echo_actor)  # (9)!

    await echo_channel.close()  # (16)!

    async for message in echo_receiver:  # (17)!
        print(f"Received {message=}")


if __name__ == "__main__":  # (7)!
    asyncio.run(main())
  1. We define an EchoActor that receives messages from two channels and sends them to another channel.

  2. We implement the _run() method that will receive messages from the two channels using select() and send them to the output channel. The _run() method will stop if a False message is received.

  3. We create the channels that will be used with the actor.

  4. We create the actor and connect it to the channels by creating new receivers and senders from the channels.

  5. We create a receiver for the echo_channel to eventually receive the messages sent by the actor.

  6. We define the main() function that will run the actor.

  7. We start the main() function in the async loop using asyncio.run().

  8. We send a message to each of the input channels. These messages will be queued in the channels until they are consumed by the actor.

  9. We start the actor and wait for it to finish using the run() function.

  10. The select() function will get the first message available from the two channels. The order in which they will be handled is unknown, but in this example we assume that the first message will be from input_channel_1 (True) and the second from input_channel_2 (False).

  11. The selected_from() function will return True for the input_channel_1 receiver. selected.value holds the received message, so "Received from receiver_1: True" will be printed and True will be sent to the output channel.

  12. Since selected.value is True, the loop will continue, going back to the select() function.

  13. The selected_from() function will return False for the input_channel_1 receiver and True for the input_channel_2 receiver. The message stored in selected.value will now be False, so "Received from receiver_2: False" will be printed and False will be sent to the output channel.

  14. Since selected.value is False, the loop will break.

  15. The _run() method will finish normally and the actor will be stopped, so the run() function will return.

  16. We close the echo_channel to make sure the echo_receiver will stop receiving messages after all the queued messages are consumed (otherwise step 17 will never end!).

  17. We receive the messages sent by the actor to the echo_channel one by one and print them. It should print first Received message=True and then Received message=False.

The expected output is:

Received from receiver_1: True
Received from receiver_2: False
Received message=True
Received message=False
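For readers who want to see the idea behind select() without installing frequenz.channels, the following sketch does something similar for asyncio.Queue objects using asyncio.wait(). It is not the implementation of select(): `select_queues()` is a hypothetical helper, and it yields `(index, value)` pairs instead of a selected object. When several queues are ready at once, this sketch handles them in argument order so that the result is deterministic.

```python
import asyncio
from collections.abc import AsyncIterator


async def select_queues(
    *queues: asyncio.Queue[bool],
) -> AsyncIterator[tuple[int, bool]]:
    """Yield (queue index, message) as messages become available."""
    # One pending get() task per queue, mapped back to the queue index.
    pending = {asyncio.create_task(q.get()): i for i, q in enumerate(queues)}
    try:
        while True:
            done, _ = await asyncio.wait(
                pending.keys(), return_when=asyncio.FIRST_COMPLETED
            )
            for task in sorted(done, key=pending.__getitem__):
                index = pending.pop(task)
                # Re-arm the queue so that later messages are not missed.
                pending[asyncio.create_task(queues[index].get())] = index
                yield index, task.result()
    finally:
        for task in pending:
            task.cancel()  # Do not leave get() tasks behind.


async def main() -> list[tuple[int, bool]]:
    queue_1: asyncio.Queue[bool] = asyncio.Queue()
    queue_2: asyncio.Queue[bool] = asyncio.Queue()
    await queue_1.put(True)
    await queue_2.put(False)

    received: list[tuple[int, bool]] = []
    selector = select_queues(queue_1, queue_2)
    async for index, value in selector:
        received.append((index, value))
        if not value:  # Same stop condition as in the example above.
            break
    await selector.aclose()  # Runs the generator's cleanup right away.
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

As in the example above, the loop ends as soon as a False message is seen, regardless of which queue it came from.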

Classes¤

frequenz.sdk.actor.Actor ¤

Bases: BackgroundService, ABC

A primitive unit of computation that runs autonomously.

To implement an actor, subclasses must implement the _run() method, which should run the actor's logic. The _run() method is called by the base class when the actor is started, and is expected to run until the actor is stopped.

Info

Please read the actor module documentation for a more comprehensive guide on how to use and implement actors properly.

Source code in frequenz/sdk/actor/_actor.py
class Actor(BackgroundService, abc.ABC):
    """A primitive unit of computation that runs autonomously.

    To implement an actor, subclasses must implement the
    [`_run()`][frequenz.sdk.actor--the-_run-method] method, which should run the actor's
    logic. The [`_run()`][frequenz.sdk.actor--the-_run-method] method is called by the
    base class when the actor is started, and is expected to run until the actor is
    stopped.

    !!! info

        Please read the [`actor` module documentation][frequenz.sdk.actor] for a more
        comprehensive guide on how to use and implement actors properly.
    """

    RESTART_DELAY: timedelta = timedelta(seconds=2)
    """The delay to wait between restarts of this actor."""

    _restart_limit: int | None = None
    """The number of times actors can be restarted when they are stopped by unhandled exceptions.

    If this is bigger than 0 or `None`, the actor will be restarted when there is an
    unhandled exception in the `_run()` method.

    If `None`, the actor will be restarted an unlimited number of times.

    !!! note

        This is mostly used for testing purposes and shouldn't be set in production.
    """

    def start(self) -> None:
        """Start this actor.

        If this actor is already running, this method does nothing.
        """
        if self.is_running:
            return
        self._tasks.clear()
        self._tasks.add(asyncio.create_task(self._run_loop()))

    @abc.abstractmethod
    async def _run(self) -> None:
        """Run this actor's logic."""

    async def _delay_if_restart(self, iteration: int) -> None:
        """Delay the restart of this actor's n'th iteration.

        Args:
            iteration: The current iteration of the restart.
        """
        # NB: I think it makes sense (in the future) to think about diminishing returns
        # the longer the actor has been running.
        # Not just for the restart-delay but actually for the n_restarts counter as well.
        if iteration > 0:
            delay = self.RESTART_DELAY.total_seconds()
            _logger.info("Actor %s: Waiting %s seconds...", self, delay)
            await asyncio.sleep(delay)

    async def _run_loop(self) -> None:
        """Run the actor's task continuously, managing restarts, cancellation, and termination.

        This method handles the execution of the actor's task, including
        restarts for unhandled exceptions, cancellation, or normal termination.

        Raises:
            asyncio.CancelledError: If the actor's `_run()` method is cancelled.
            Exception: If the actor's `_run()` method raises any other exception.
            BaseException: If the actor's `_run()` method raises any base exception.
        """
        _logger.info("Actor %s: Started.", self)
        n_restarts = 0
        while True:
            try:
                await self._delay_if_restart(n_restarts)
                await self._run()
                _logger.info("Actor %s: _run() returned without error.", self)
            except asyncio.CancelledError:
                _logger.info("Actor %s: Cancelled.", self)
                raise
            except Exception:  # pylint: disable=broad-except
                _logger.exception("Actor %s: Raised an unhandled exception.", self)
                limit_str = "∞" if self._restart_limit is None else self._restart_limit
                limit_str = f"({n_restarts}/{limit_str})"
                if self._restart_limit is None or n_restarts < self._restart_limit:
                    n_restarts += 1
                    _logger.info("Actor %s: Restarting %s...", self._name, limit_str)
                    continue
                _logger.info(
                    "Actor %s: Maximum restarts attempted %s, bailing out...",
                    self,
                    limit_str,
                )
                raise
            except BaseException:  # pylint: disable=broad-except
                _logger.exception("Actor %s: Raised a BaseException.", self)
                raise
            break

        _logger.info("Actor %s: Stopped.", self)
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(*, name: str | None = None) -> None

Initialize this BackgroundService.

PARAMETER DESCRIPTION
name

The name of this background service. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def __init__(self, *, name: str | None = None) -> None:
    """Initialize this BackgroundService.

    Args:
        name: The name of this background service. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
    """
    self._name: str = str(id(self)) if name is None else name
    self._tasks: set[asyncio.Task[Any]] = set()
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.actor.BackgroundService ¤

Bases: ABC

A background service that can be started and stopped.

A background service is a service that runs in the background spawning one or more tasks. The service can be started and stopped and can work as an async context manager to provide deterministic cleanup.

To implement a background service, subclasses must implement the start() method, which should start the background tasks needed by the service, and add them to the _tasks protected attribute.

If you need to collect results or handle exceptions of the tasks when stopping the service, then you need to also override the stop() method, as the base implementation does not collect any results and re-raises all exceptions.

Warning

As background services manage asyncio.Task objects, a reference to them must be held for as long as the background service is expected to be running, otherwise its tasks will be cancelled and the service will stop. For more information, please refer to the Python asyncio documentation.

Example
import asyncio
import datetime

from frequenz.sdk.actor import BackgroundService

class Clock(BackgroundService):
    def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self._resolution_s = resolution_s

    def start(self) -> None:
        self._tasks.add(asyncio.create_task(self._tick()))

    async def _tick(self) -> None:
        while True:
            await asyncio.sleep(self._resolution_s)
            print(datetime.datetime.now())

async def main() -> None:
    # As an async context manager
    async with Clock(resolution_s=1):
        await asyncio.sleep(5)

    # Manual start/stop (only use if necessary, as cleanup is more complicated)
    clock = Clock(resolution_s=1)
    clock.start()
    await asyncio.sleep(5)
    await clock.stop()

asyncio.run(main())
Source code in frequenz/sdk/actor/_background_service.py
class BackgroundService(abc.ABC):
    """A background service that can be started and stopped.

    A background service is a service that runs in the background spawning one or more
    tasks. The service can be [started][frequenz.sdk.actor.BackgroundService.start]
    and [stopped][frequenz.sdk.actor.BackgroundService.stop] and can work as an
    async context manager to provide deterministic cleanup.

    To implement a background service, subclasses must implement the
    [`start()`][frequenz.sdk.actor.BackgroundService.start] method, which should
    start the background tasks needed by the service, and add them to the `_tasks`
    protected attribute.

    If you need to collect results or handle exceptions of the tasks when stopping the
    service, then you need to also override the
    [`stop()`][frequenz.sdk.actor.BackgroundService.stop] method, as the base
    implementation does not collect any results and re-raises all exceptions.

    !!! warning

        As background services manage [`asyncio.Task`][] objects, a reference to them
        must be held for as long as the background service is expected to be running,
        otherwise its tasks will be cancelled and the service will stop. For more
        information, please refer to the [Python `asyncio`
        documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).

    Example:
        ```python
        import asyncio
        import datetime

        from frequenz.sdk.actor import BackgroundService

        class Clock(BackgroundService):
            def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
                super().__init__(name=name)
                self._resolution_s = resolution_s

            def start(self) -> None:
                self._tasks.add(asyncio.create_task(self._tick()))

            async def _tick(self) -> None:
                while True:
                    await asyncio.sleep(self._resolution_s)
                    print(datetime.datetime.now())

        async def main() -> None:
            # As an async context manager
            async with Clock(resolution_s=1):
                await asyncio.sleep(5)

            # Manual start/stop (only use if necessary, as cleanup is more complicated)
            clock = Clock(resolution_s=1)
            clock.start()
            await asyncio.sleep(5)
            await clock.stop()

        asyncio.run(main())
        ```
    """

    def __init__(self, *, name: str | None = None) -> None:
        """Initialize this BackgroundService.

        Args:
            name: The name of this background service. If `None`, `str(id(self))` will
                be used. This is used mostly for debugging purposes.
        """
        self._name: str = str(id(self)) if name is None else name
        self._tasks: set[asyncio.Task[Any]] = set()

    @abc.abstractmethod
    def start(self) -> None:
        """Start this background service."""

    @property
    def name(self) -> str:
        """The name of this background service.

        Returns:
            The name of this background service.
        """
        return self._name

    @property
    def tasks(self) -> collections.abc.Set[asyncio.Task[Any]]:
        """Return the set of running tasks spawned by this background service.

        Users typically should not modify the tasks in the returned set and only use
        them for informational purposes.

        !!! danger

            Changing the returned tasks may lead to unexpected behavior; don't do it
            unless the class explicitly documents that it is safe to do so.

        Returns:
            The set of running tasks spawned by this background service.
        """
        return self._tasks

    @property
    def is_running(self) -> bool:
        """Return whether this background service is running.

        A service is considered running when at least one task is running.

        Returns:
            Whether this background service is running.
        """
        return any(not task.done() for task in self._tasks)

    def cancel(self, msg: str | None = None) -> None:
        """Cancel all running tasks spawned by this background service.

        Args:
            msg: The message to be passed to the tasks being cancelled.
        """
        for task in self._tasks:
            task.cancel(msg)

    # We need the noqa because pydoclint can't figure out `rest` is
    # a `BaseExceptionGroup` instance.
    async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
        """Stop this background service.

        This method cancels all running tasks spawned by this service and waits for them
        to finish.

        Args:
            msg: The message to be passed to the tasks being cancelled.

        Raises:
            BaseExceptionGroup: If any of the tasks spawned by this service raised an
                exception.
        """
        if not self._tasks:
            return
        self.cancel(msg)
        try:
            await self.wait()
        except BaseExceptionGroup as exc_group:
            # We want to ignore CancelledError here as we explicitly cancelled all the
            # tasks.
            _, rest = exc_group.split(asyncio.CancelledError)
            if rest is not None:
                # We are filtering out from an exception group, we really don't want to
                # add the exceptions we just filtered by adding a from clause here.
                raise rest  # pylint: disable=raise-missing-from

    async def __aenter__(self) -> Self:
        """Enter an async context.

        Start this background service.

        Returns:
            This background service.
        """
        self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit an async context.

        Stop this background service.

        Args:
            exc_type: The type of the exception raised, if any.
            exc_val: The exception raised, if any.
            exc_tb: The traceback of the exception raised, if any.
        """
        await self.stop()

    async def wait(self) -> None:
        """Wait for this background service to finish.

        Wait until all background service tasks are finished.

        Raises:
            BaseExceptionGroup: If any of the tasks spawned by this service raised an
                exception. A cancelled task counts too: its `CancelledError` is
                included in the exception group (`stop()` filters it out).
        """
        # We need to account for tasks that were created between when we started
        # awaiting and we finished awaiting.
        while self._tasks:
            done, pending = await asyncio.wait(self._tasks)
            assert not pending

            # We remove the done tasks, but there might be new ones created after we
            # started waiting.
            self._tasks = self._tasks - done

            exceptions: list[BaseException] = []
            for task in done:
                try:
                    # This will raise a CancelledError if the task was cancelled or any
                    # other exception if the task raised one.
                    _ = task.result()
                except BaseException as error:  # pylint: disable=broad-except
                    exceptions.append(error)
            if exceptions:
                raise BaseExceptionGroup(
                    f"Error while stopping background service {self}", exceptions
                )

    def __await__(self) -> collections.abc.Generator[None, None, None]:
        """Await this background service.

        An awaited background service will wait for all its tasks to finish.

        Returns:
            An implementation-specific generator for the awaitable.
        """
        return self.wait().__await__()

    def __del__(self) -> None:
        """Destroy this instance.

        Cancel all running tasks spawned by this background service.
        """
        self.cancel(f"{self!r} was deleted")

    def __repr__(self) -> str:
        """Return a string representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"

    def __str__(self) -> str:
        """Return a string representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return f"{type(self).__name__}[{self._name}]"
Attributes¤
is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.
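As a rough illustration of that rule, the sketch below applies the same check to a plain set of `asyncio` tasks. The `is_running()` helper is a hypothetical stand-in written for this example, not part of the SDK.

```python
import asyncio


def is_running(tasks: set[asyncio.Task[None]]) -> bool:
    """Return True while at least one task in the set has not finished."""
    return any(not task.done() for task in tasks)


async def main() -> tuple[bool, bool, bool]:
    tasks = {
        asyncio.create_task(asyncio.sleep(0.01)),
        asyncio.create_task(asyncio.sleep(0)),
    }
    before = is_running(tasks)  # Both tasks are still pending here.
    await asyncio.wait(tasks)
    after = is_running(tasks)  # Every task is done now.
    empty = is_running(set())  # No tasks at all means "not running".
    return before, after, empty
```

Note that a service with no tasks at all is reported as not running, which matches the state before `start()` is called.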

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior; don't do it unless the class explicitly documents that it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.
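The delegation pattern used here can be tried out in isolation. The following is a minimal sketch with a hypothetical `Job` class (not an SDK type) whose `__await__()` forwards to its own `wait()` coroutine, so that `await job` and `await job.wait()` behave the same.

```python
import asyncio
from collections.abc import Generator


class Job:
    """Hypothetical awaitable object, used only for illustration."""

    def __init__(self) -> None:
        self.finished = False

    async def wait(self) -> None:
        await asyncio.sleep(0)
        self.finished = True

    def __await__(self) -> Generator[None, None, None]:
        # Reuse the coroutine's own awaitable protocol.
        return self.wait().__await__()


async def main() -> bool:
    job = Job()
    await job  # Equivalent to `await job.wait()`.
    return job.finished
```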

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(*, name: str | None = None) -> None

Initialize this BackgroundService.

PARAMETER DESCRIPTION
name

The name of this background service. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def __init__(self, *, name: str | None = None) -> None:
    """Initialize this BackgroundService.

    Args:
        name: The name of this background service. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
    """
    self._name: str = str(id(self)) if name is None else name
    self._tasks: set[asyncio.Task[Any]] = set()
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start abstractmethod ¤
start() -> None

Start this background service.

Source code in frequenz/sdk/actor/_background_service.py
@abc.abstractmethod
def start(self) -> None:
    """Start this background service."""
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception. A cancelled task counts too: its CancelledError is included in the exception group (stop() filters it out).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception. A cancelled task counts too: its `CancelledError` is
            included in the exception group (`stop()` filters it out).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )
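
The reason `wait()` loops instead of awaiting once is that a task may register new tasks while it is being awaited. The sketch below reproduces that situation with plain `asyncio`; `wait_all()`, `parent()` and `child()` are hypothetical names used only for this illustration.

```python
import asyncio


async def child() -> str:
    await asyncio.sleep(0.01)
    return "child"


async def parent(tasks: set[asyncio.Task[str]]) -> str:
    # Registers a new task while the caller is already waiting.
    tasks.add(asyncio.create_task(child()))
    return "parent"


async def wait_all(tasks: set[asyncio.Task[str]]) -> list[str]:
    """Keep waiting until the set is empty, including late additions."""
    results: list[str] = []
    while tasks:
        done, _ = await asyncio.wait(tasks)
        tasks -= done  # Tasks added in the meantime stay in the set.
        results.extend(task.result() for task in done)
    return results


async def main() -> list[str]:
    tasks: set[asyncio.Task[str]] = set()
    tasks.add(asyncio.create_task(parent(tasks)))
    return await wait_all(tasks)
```

A single `asyncio.wait()` call would return as soon as `parent()` is done and miss the child task; the second loop iteration picks it up.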

frequenz.sdk.actor.ResamplerConfig dataclass ¤

Resampler configuration.

Source code in frequenz/sdk/timeseries/_resampling.py
@dataclass(frozen=True)
class ResamplerConfig:
    """Resampler configuration."""

    resampling_period: timedelta
    """The resampling period.

    This is the time that passes between consecutive resampled samples.

    It must be a positive time span.
    """

    max_data_age_in_periods: float = 3.0
    """The maximum age a sample can have to be considered *relevant* for resampling.

    Expressed in number of periods, where period is the `resampling_period`
    if we are downsampling (resampling period bigger than the input period) or
    the *input sampling period* if we are upsampling (input period bigger than
    the resampling period).

    It must be at least 1.0.

    Example:
        If `resampling_period` is 3 seconds, the input sampling period is
        1 second and `max_data_age_in_periods` is 2, then data older than 3*2
        = 6 seconds will be discarded when creating a new sample and never
        passed to the resampling function.

        If `resampling_period` is 3 seconds, the input sampling period is
        5 seconds and `max_data_age_in_periods` is 2, then data older than 5*2
        = 10 seconds will be discarded when creating a new sample and never
        passed to the resampling function.
    """

    resampling_function: ResamplingFunction = average
    """The resampling function.

    This function will be applied to the sequence of relevant samples at
    a given time. The result of the function is what is sent as the resampled
    value.
    """

    initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
    """The initial length of the resampling buffer.

    The buffer could grow or shrink depending on the source properties,
    like sampling rate, to make sure all the requested past sampling periods
    can be stored.

    It must be at least 1 and at most `max_buffer_len`.
    """

    warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
    """The minimum length of the resampling buffer that will emit a warning.

    If a buffer grows bigger than this value, it will emit a warning in the
    logs, so buffers don't grow too big inadvertently.

    It must be at least 1 and smaller than `max_buffer_len`.
    """

    max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
    """The maximum length of the resampling buffer.

    Buffers won't be allowed to grow beyond this point even if it would be
    needed to keep all the requested past sampling periods. An error will be
    emitted in the logs if the buffer length needs to be truncated to this
    value.

    It must be bigger than `warn_buffer_len`.
    """

    align_to: datetime | None = UNIX_EPOCH
    """The time to align the resampling period to.

    The resampling period will be aligned to this time, so the first resampled
    sample will be at the first multiple of `resampling_period` starting from
    `align_to`. It must be an aware datetime and can be in the future too.

    If `align_to` is `None`, the resampling period will be aligned to the
    time the resampler is created.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.resampling_period.total_seconds() <= 0.0:
            raise ValueError(
                f"resampling_period ({self.resampling_period}) must be positive"
            )
        if self.max_data_age_in_periods < 1.0:
            raise ValueError(
                f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
            )
        if self.warn_buffer_len < 1:
            raise ValueError(
                f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
            )
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                f"max_buffer_len ({self.max_buffer_len}) should "
                f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
            )

        if self.initial_buffer_len < 1:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
            )
        if self.initial_buffer_len > self.max_buffer_len:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
                f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
                "initial_buffer_len or a bigger max_buffer_len"
            )
        if self.initial_buffer_len > self.warn_buffer_len:
            _logger.warning(
                "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
                self.initial_buffer_len,
                self.warn_buffer_len,
            )
        if self.align_to is not None and self.align_to.tzinfo is None:
            raise ValueError(
                f"align_to ({self.align_to}) should be a timezone aware datetime"
            )
Attributes¤
align_to class-attribute instance-attribute ¤
align_to: datetime | None = UNIX_EPOCH

The time to align the resampling period to.

The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of resampling_period starting from align_to. It must be an aware datetime and can be in the future too.

If align_to is None, the resampling period will be aligned to the time the resampler is created.

initial_buffer_len class-attribute instance-attribute ¤
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT

The initial length of the resampling buffer.

The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.

It must be at least 1 and at most max_buffer_len.

max_buffer_len class-attribute instance-attribute ¤
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX

The maximum length of the resampling buffer.

Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.

It must be bigger than warn_buffer_len.

max_data_age_in_periods class-attribute instance-attribute ¤
max_data_age_in_periods: float = 3.0

The maximum age a sample can have to be considered relevant for resampling.

Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).

It must be at least 1.0.

Example

If resampling_period is 3 seconds, the input sampling period is 1 second and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.

If resampling_period is 3 seconds, the input sampling period is 5 seconds and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.
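
The arithmetic in both cases boils down to multiplying the larger of the two periods by `max_data_age_in_periods`. A small sketch, using a hypothetical `max_data_age()` helper that is not part of the SDK:

```python
from datetime import timedelta


def max_data_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Return the age beyond which samples are no longer considered relevant."""
    # Downsampling uses the resampling period, upsampling the input period;
    # in both cases that is the larger of the two.
    period = max(resampling_period, input_period)
    return period * max_data_age_in_periods


downsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=1), 2.0)
upsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=5), 2.0)
print(downsampling.total_seconds(), upsampling.total_seconds())  # 6.0 10.0
```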

resampling_function class-attribute instance-attribute ¤
resampling_function: ResamplingFunction = average

The resampling function.

This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.

resampling_period instance-attribute ¤
resampling_period: timedelta

The resampling period.

This is the time that passes between consecutive resampled samples.

It must be a positive time span.

warn_buffer_len class-attribute instance-attribute ¤
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN

The minimum length of the resampling buffer that will emit a warning.

If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.

It must be at least 1 and smaller than max_buffer_len.

Functions¤
__post_init__ ¤
__post_init__() -> None

Check that config values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.
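
Because the dataclass is frozen, validation has to happen in `__post_init__()`, right after the generated `__init__()` has set the fields. The sketch below shows the same pattern on a reduced, hypothetical `BufferLimits` class; the default values are made up for the example and are not the SDK defaults.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BufferLimits:
    """Hypothetical subset of the buffer-length options, for illustration only."""

    initial_buffer_len: int = 16
    warn_buffer_len: int = 128
    max_buffer_len: int = 1024

    def __post_init__(self) -> None:
        if self.warn_buffer_len < 1:
            raise ValueError("warn_buffer_len should be at least 1")
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError("max_buffer_len should be bigger than warn_buffer_len")
        if not 1 <= self.initial_buffer_len <= self.max_buffer_len:
            raise ValueError("initial_buffer_len should be in [1, max_buffer_len]")


def is_valid(**kwargs: int) -> bool:
    """Return whether a configuration passes validation."""
    try:
        BufferLimits(**kwargs)
    except ValueError:
        return False
    return True
```

An invalid combination never yields an instance, so code that receives a configuration object can rely on the constraints holding.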

Source code in frequenz/sdk/timeseries/_resampling.py
def __post_init__(self) -> None:
    """Check that config values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.resampling_period.total_seconds() <= 0.0:
        raise ValueError(
            f"resampling_period ({self.resampling_period}) must be positive"
        )
    if self.max_data_age_in_periods < 1.0:
        raise ValueError(
            f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
        )
    if self.warn_buffer_len < 1:
        raise ValueError(
            f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
        )
    if self.max_buffer_len <= self.warn_buffer_len:
        raise ValueError(
            f"max_buffer_len ({self.max_buffer_len}) should "
            f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
        )

    if self.initial_buffer_len < 1:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
        )
    if self.initial_buffer_len > self.max_buffer_len:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
            f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
            "initial_buffer_len or a bigger max_buffer_len"
        )
    if self.initial_buffer_len > self.warn_buffer_len:
        _logger.warning(
            "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
            self.initial_buffer_len,
            self.warn_buffer_len,
        )
    if self.align_to is not None and self.align_to.tzinfo is None:
        raise ValueError(
            f"align_to ({self.align_to}) should be a timezone aware datetime"
        )

Functions¤

frequenz.sdk.actor.run async ¤

run(*actors: Actor) -> None

Await the completion of all actors.

Info

Please read the actor module documentation for a more comprehensive guide on how to use and implement actors properly.

PARAMETER DESCRIPTION
*actors

The actors to be awaited.

TYPE: Actor DEFAULT: ()

Source code in frequenz/sdk/actor/_run_utils.py
async def run(*actors: Actor) -> None:
    """Await the completion of all actors.

    !!! info

        Please read the [`actor` module documentation][frequenz.sdk.actor] for a more
        comprehensive guide on how to use and implement actors properly.

    Args:
        *actors: The actors to be awaited.
    """
    _logger.info("Starting %s actor(s)...", len(actors))

    for actor in actors:
        if actor.is_running:
            _logger.info("Actor %s: Already running, skipping start.", actor)
        else:
            _logger.info("Actor %s: Starting...", actor)
            actor.start()

    # Wait until all actors are done
    pending_tasks = {asyncio.create_task(a.wait(), name=str(a)) for a in actors}
    while pending_tasks:
        done_tasks, pending_tasks = await asyncio.wait(
            pending_tasks, return_when=asyncio.FIRST_COMPLETED
        )

        # This should always be only one task, but we handle many for extra safety
        for task in done_tasks:
            # Cancellation needs to be checked first, otherwise the other methods
            # could raise a CancelledError
            if task.cancelled():
                _logger.info("Actor %s: Cancelled while running.", task.get_name())
            elif exception := task.exception():
                _logger.error(
                    "Actor %s: Raised an exception while running.",
                    task.get_name(),
                    exc_info=exception,
                )
            else:
                _logger.info("Actor %s: Finished normally.", task.get_name())

    _logger.info("All %s actor(s) finished.", len(actors))
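
The supervision loop above can be tried out without any actors. The following sketch replaces them with plain coroutines and records outcomes in a dictionary instead of logging them; `supervise()` and the three coroutines are hypothetical names introduced for this example.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


async def ok() -> None:
    await asyncio.sleep(0.01)


async def bad() -> None:
    raise RuntimeError("failed")


async def gone() -> None:
    raise asyncio.CancelledError()


async def supervise(**jobs: Coroutine[Any, Any, None]) -> dict[str, str]:
    """Wait for all jobs and report how each one ended."""
    pending = {asyncio.create_task(coro, name=name) for name, coro in jobs.items()}
    outcomes: dict[str, str] = {}
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            # Check cancellation first: `exception()` raises on cancelled tasks.
            if task.cancelled():
                outcomes[task.get_name()] = "cancelled"
            elif task.exception() is not None:
                outcomes[task.get_name()] = "failed"
            else:
                outcomes[task.get_name()] = "finished"
    return outcomes


async def main() -> dict[str, str]:
    return await supervise(ok=ok(), bad=bad(), gone=gone())
```

Waiting with `FIRST_COMPLETED` lets each outcome be reported as soon as it happens, rather than only after the slowest job has ended.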