frequenz.sdk.actor ¤

A base class for creating simple composable actors.

Classes¤

frequenz.sdk.actor.Actor ¤

Bases: BackgroundService, ABC

A primitive unit of computation that runs autonomously.

From Wikipedia, an actor is:

[...] the basic building block of concurrent computation. In response to a message it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other indirectly through messaging (removing the need for lock-based synchronization).

Channels can be used to implement communication between actors, as shown in the examples below.

To implement an actor, subclasses must implement the _run() method, which should run the actor's logic. The _run() method is called by the base class when the actor is started, and is expected to run until the actor is stopped.

If an unhandled exception is raised in the _run() method, the actor will be restarted automatically. Unhandled BaseExceptions will cause the actor to stop immediately and will be re-raised.

Warning

As actors manage asyncio.Task objects, a reference to them must be held for as long as the actor is expected to be running, otherwise its tasks will be cancelled and the actor will stop. For more information, please refer to the Python asyncio documentation.

Example: Example of an actor receiving from two receivers

```python
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels.util import select, selected_from
from frequenz.sdk.actor import Actor

class EchoActor(Actor):
    def __init__(
        self,
        recv1: Receiver[bool],
        recv2: Receiver[bool],
        output: Sender[bool],
    ) -> None:
        super().__init__()
        self._recv1 = recv1
        self._recv2 = recv2
        self._output = output

    async def _run(self) -> None:
        async for selected in select(self._recv1, self._recv2):
            if selected_from(selected, self._recv1):
                await self._output.send(selected.value)
            elif selected_from(selected, self._recv2):
                await self._output.send(selected.value)
            else:
                assert False, "Unknown selected channel"


input_channel_1 = Broadcast[bool]("input_channel_1")
input_channel_2 = Broadcast[bool]("input_channel_2")
input_channel_2_sender = input_channel_2.new_sender()

echo_channel = Broadcast[bool]("EchoChannel")
echo_receiver = echo_channel.new_receiver()

async with EchoActor(
    input_channel_1.new_receiver(),
    input_channel_2.new_receiver(),
    echo_channel.new_sender(),
):
    await input_channel_2_sender.send(True)
    print(await echo_receiver.receive())
```

Example: Example of composing two actors

```python
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.sdk.actor import Actor

class Actor1(Actor):
    def __init__(
        self,
        recv: Receiver[bool],
        output: Sender[bool],
    ) -> None:
        super().__init__()
        self._recv = recv
        self._output = output

    async def _run(self) -> None:
        async for msg in self._recv:
            await self._output.send(msg)


class Actor2(Actor):
    def __init__(
        self,
        recv: Receiver[bool],
        output: Sender[bool],
    ) -> None:
        super().__init__()
        self._recv = recv
        self._output = output

    async def _run(self) -> None:
        async for msg in self._recv:
            await self._output.send(msg)

input_channel: Broadcast[bool] = Broadcast("Input to Actor1")
middle_channel: Broadcast[bool] = Broadcast("Actor1 -> Actor2 stream")
output_channel: Broadcast[bool] = Broadcast("Actor2 output")

input_sender = input_channel.new_sender()
output_receiver = output_channel.new_receiver()

async with (
    Actor1(input_channel.new_receiver(), middle_channel.new_sender()),
    Actor2(middle_channel.new_receiver(), output_channel.new_sender()),
):
    await input_sender.send(True)
    print(await output_receiver.receive())
```
Source code in frequenz/sdk/actor/_actor.py
class Actor(BackgroundService, abc.ABC):
    """A primitive unit of computation that runs autonomously.

    From [Wikipedia](https://en.wikipedia.org/wiki/Actor_model), an actor is:

    > [...] the basic building block of concurrent computation. In response to
    > a message it receives, an actor can: make local decisions, create more actors,
    > send more messages, and determine how to respond to the next message received.
    > Actors may modify their own private state, but can only affect each other
    > indirectly through messaging (removing the need for lock-based synchronization).

    [Channels](https://github.com/frequenz-floss/frequenz-channels-python/) can be used
    to implement communication between actors, as shown in the examples below.

    To implement an actor, subclasses must implement the `_run()` method, which should
    run the actor's logic. The `_run()` method is called by the base class when the
    actor is started, and is expected to run until the actor is stopped.

    If an unhandled exception is raised in the `_run()` method, the actor will be
    restarted automatically. Unhandled [`BaseException`][]s will cause the actor to stop
    immediately and will be re-raised.

    !!! warning

        As actors manage [`asyncio.Task`][] objects, a reference to them must be held
        for as long as the actor is expected to be running, otherwise its tasks will be
        cancelled and the actor will stop. For more information, please refer to the
        [Python `asyncio`
        documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).

    Example: Example of an actor receiving from two receivers

        ```python
        from frequenz.channels import Broadcast, Receiver, Sender
        from frequenz.channels.util import select, selected_from
        from frequenz.sdk.actor import Actor

        class EchoActor(Actor):
            def __init__(
                self,
                recv1: Receiver[bool],
                recv2: Receiver[bool],
                output: Sender[bool],
            ) -> None:
                super().__init__()
                self._recv1 = recv1
                self._recv2 = recv2
                self._output = output

            async def _run(self) -> None:
                async for selected in select(self._recv1, self._recv2):
                    if selected_from(selected, self._recv1):
                        await self._output.send(selected.value)
                    elif selected_from(selected, self._recv2):
                        await self._output.send(selected.value)
                    else:
                        assert False, "Unknown selected channel"


        input_channel_1 = Broadcast[bool]("input_channel_1")
        input_channel_2 = Broadcast[bool]("input_channel_2")
        input_channel_2_sender = input_channel_2.new_sender()

        echo_channel = Broadcast[bool]("EchoChannel")
        echo_receiver = echo_channel.new_receiver()

        async with EchoActor(
            input_channel_1.new_receiver(),
            input_channel_2.new_receiver(),
            echo_channel.new_sender(),
        ):
            await input_channel_2_sender.send(True)
            print(await echo_receiver.receive())
        ```

    Example: Example of composing two actors

        ```python
        from frequenz.channels import Broadcast, Receiver, Sender
        from frequenz.sdk.actor import Actor

        class Actor1(Actor):
            def __init__(
                self,
                recv: Receiver[bool],
                output: Sender[bool],
            ) -> None:
                super().__init__()
                self._recv = recv
                self._output = output

            async def _run(self) -> None:
                async for msg in self._recv:
                    await self._output.send(msg)


        class Actor2(Actor):
            def __init__(
                self,
                recv: Receiver[bool],
                output: Sender[bool],
            ) -> None:
                super().__init__()
                self._recv = recv
                self._output = output

            async def _run(self) -> None:
                async for msg in self._recv:
                    await self._output.send(msg)

        input_channel: Broadcast[bool] = Broadcast("Input to Actor1")
        middle_channel: Broadcast[bool] = Broadcast("Actor1 -> Actor2 stream")
        output_channel: Broadcast[bool] = Broadcast("Actor2 output")

        input_sender = input_channel.new_sender()
        output_receiver = output_channel.new_receiver()

        async with (
            Actor1(input_channel.new_receiver(), middle_channel.new_sender()),
            Actor2(middle_channel.new_receiver(), output_channel.new_sender()),
        ):
            await input_sender.send(True)
            print(await output_receiver.receive())
        ```
    """

    _restart_limit: int | None = None
    """The number of times actors can be restarted when they are stopped by unhandled exceptions.

    If this is bigger than 0 or `None`, the actor will be restarted when there is an
    unhandled exception in the `_run()` method.

    If `None`, the actor will be restarted an unlimited number of times.

    !!! note

        This is mostly used for testing purposes and shouldn't be set in production.
    """

    def start(self) -> None:
        """Start this actor.

        If this actor is already running, this method does nothing.
        """
        if self.is_running:
            return
        self._tasks.clear()
        self._tasks.add(asyncio.create_task(self._run_loop()))

    @abc.abstractmethod
    async def _run(self) -> None:
        """Run this actor's logic."""

    async def _run_loop(self) -> None:
        """Run this actor's task in a loop until `_restart_limit` is reached.

        Raises:
            asyncio.CancelledError: If this actor's `_run()` gets cancelled.
            Exception: If this actor's `_run()` raises any other `Exception` and reached
                the maximum number of restarts.
            BaseException: If this actor's `_run()` raises any other `BaseException`.
        """
        _logger.info("Actor %s: Started.", self)
        n_restarts = 0
        while True:
            try:
                await self._run()
                _logger.info("Actor %s: _run() returned without error.", self)
            except asyncio.CancelledError:
                _logger.info("Actor %s: Cancelled.", self)
                raise
            except Exception:  # pylint: disable=broad-except
                _logger.exception("Actor %s: Raised an unhandled exception.", self)
                limit_str = "∞" if self._restart_limit is None else self._restart_limit
                limit_str = f"({n_restarts}/{limit_str})"
                if self._restart_limit is None or n_restarts < self._restart_limit:
                    n_restarts += 1
                    _logger.info("Actor %s: Restarting %s...", self._name, limit_str)
                    continue
                _logger.info(
                    "Actor %s: Maximum restarts attempted %s, bailing out...",
                    self,
                    limit_str,
                )
                raise
            except BaseException:  # pylint: disable=broad-except
                _logger.exception("Actor %s: Raised a BaseException.", self)
                raise
            break

        _logger.info("Actor %s: Stopped.", self)
Functions¤
start() ¤

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))

frequenz.sdk.actor.BackgroundService ¤

Bases: ABC

A background service that can be started and stopped.

A background service is a service that runs in the background spawning one or more tasks. The service can be started and stopped and can work as an async context manager to provide deterministic cleanup.

To implement a background service, subclasses must implement the start() method, which should start the background tasks needed by the service, and add them to the _tasks protected attribute.

If you need to collect results or handle exceptions of the tasks when stopping the service, then you need to also override the stop() method, as the base implementation does not collect any results and re-raises all exceptions.

Warning

As background services manage asyncio.Task objects, a reference to them must be held for as long as the background service is expected to be running, otherwise its tasks will be cancelled and the service will stop. For more information, please refer to the Python asyncio documentation.

Example

```python
import datetime
import asyncio

class Clock(BackgroundService):
    def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self._resolution_s = resolution_s

    def start(self) -> None:
        self._tasks.add(asyncio.create_task(self._tick()))

    async def _tick(self) -> None:
        while True:
            await asyncio.sleep(self._resolution_s)
            print(datetime.datetime.now())

async def main() -> None:
    # As an async context manager
    async with Clock(resolution_s=1):
        await asyncio.sleep(5)

    # Manual start/stop (only use if necessary, as cleanup is more complicated)
    clock = Clock(resolution_s=1)
    clock.start()
    await asyncio.sleep(5)
    await clock.stop()

asyncio.run(main())
```
Source code in frequenz/sdk/actor/_background_service.py
class BackgroundService(abc.ABC):
    """A background service that can be started and stopped.

    A background service is a service that runs in the background spawning one or more
    tasks. The service can be [started][frequenz.sdk.actor.BackgroundService.start]
    and [stopped][frequenz.sdk.actor.BackgroundService.stop] and can work as an
    async context manager to provide deterministic cleanup.

    To implement a background service, subclasses must implement the
    [`start()`][frequenz.sdk.actor.BackgroundService.start] method, which should
    start the background tasks needed by the service, and add them to the `_tasks`
    protected attribute.

    If you need to collect results or handle exceptions of the tasks when stopping the
    service, then you need to also override the
    [`stop()`][frequenz.sdk.actor.BackgroundService.stop] method, as the base
    implementation does not collect any results and re-raises all exceptions.

    !!! warning

        As background services manage [`asyncio.Task`][] objects, a reference to them
        must be held for as long as the background service is expected to be running,
        otherwise its tasks will be cancelled and the service will stop. For more
        information, please refer to the [Python `asyncio`
        documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).

    Example:
        ```python
        import datetime
        import asyncio

        class Clock(BackgroundService):
            def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
                super().__init__(name=name)
                self._resolution_s = resolution_s

            def start(self) -> None:
                self._tasks.add(asyncio.create_task(self._tick()))

            async def _tick(self) -> None:
                while True:
                    await asyncio.sleep(self._resolution_s)
                    print(datetime.datetime.now())

        async def main() -> None:
            # As an async context manager
            async with Clock(resolution_s=1):
                await asyncio.sleep(5)

            # Manual start/stop (only use if necessary, as cleanup is more complicated)
            clock = Clock(resolution_s=1)
            clock.start()
            await asyncio.sleep(5)
            await clock.stop()

        asyncio.run(main())
        ```
    """

    def __init__(self, *, name: str | None = None) -> None:
        """Initialize this BackgroundService.

        Args:
            name: The name of this background service. If `None`, `str(id(self))` will
                be used. This is used mostly for debugging purposes.
        """
        self._name: str = str(id(self)) if name is None else name
        self._tasks: set[asyncio.Task[Any]] = set()

    @abc.abstractmethod
    def start(self) -> None:
        """Start this background service."""

    @property
    def name(self) -> str:
        """The name of this background service.

        Returns:
            The name of this background service.
        """
        return self._name

    @property
    def tasks(self) -> collections.abc.Set[asyncio.Task[Any]]:
        """Return the set of running tasks spawned by this background service.

        Users typically should not modify the tasks in the returned set and only use
        them for informational purposes.

        !!! danger

            Changing the returned tasks may lead to unexpected behavior; don't do it
            unless the class explicitly documents it is safe to do so.

        Returns:
            The set of running tasks spawned by this background service.
        """
        return self._tasks

    @property
    def is_running(self) -> bool:
        """Return whether this background service is running.

        A service is considered running when at least one task is running.

        Returns:
            Whether this background service is running.
        """
        return any(not task.done() for task in self._tasks)

    def cancel(self, msg: str | None = None) -> None:
        """Cancel all running tasks spawned by this background service.

        Args:
            msg: The message to be passed to the tasks being cancelled.
        """
        for task in self._tasks:
            task.cancel(msg)

    async def stop(self, msg: str | None = None) -> None:
        """Stop this background service.

        This method cancels all running tasks spawned by this service and waits for them
        to finish.

        Args:
            msg: The message to be passed to the tasks being cancelled.

        Raises:
            BaseExceptionGroup: If any of the tasks spawned by this service raised an
                exception.

        [//]: # (# noqa: DAR401 rest)
        """
        if not self._tasks:
            return
        self.cancel(msg)
        try:
            await self.wait()
        except BaseExceptionGroup as exc_group:
            # We want to ignore CancelledError here as we explicitly cancelled all the
            # tasks.
            _, rest = exc_group.split(asyncio.CancelledError)
            if rest is not None:
                # We are filtering out from an exception group, we really don't want to
                # add the exceptions we just filtered by adding a from clause here.
                raise rest  # pylint: disable=raise-missing-from

    async def __aenter__(self) -> Self:
        """Enter an async context.

        Start this background service.

        Returns:
            This background service.
        """
        self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit an async context.

        Stop this background service.

        Args:
            exc_type: The type of the exception raised, if any.
            exc_val: The exception raised, if any.
            exc_tb: The traceback of the exception raised, if any.
        """
        await self.stop()

    async def wait(self) -> None:
        """Wait for this background service to finish.

        Wait until all background service tasks are finished.

        Raises:
            BaseExceptionGroup: If any of the tasks spawned by this service raised an
                exception (`CancelledError` is not considered an error and not returned in
                the exception group).
        """
        # We need to account for tasks that were created between when we started
        # awaiting and we finished awaiting.
        while self._tasks:
            done, pending = await asyncio.wait(self._tasks)
            assert not pending

            # We remove the done tasks, but there might be new ones created after we
            # started waiting.
            self._tasks = self._tasks - done

            exceptions: list[BaseException] = []
            for task in done:
                try:
                    # This will raise a CancelledError if the task was cancelled or any
                    # other exception if the task raised one.
                    _ = task.result()
                except BaseException as error:  # pylint: disable=broad-except
                    exceptions.append(error)
            if exceptions:
                raise BaseExceptionGroup(
                    f"Error while stopping background service {self}", exceptions
                )

    def __await__(self) -> collections.abc.Generator[None, None, None]:
        """Await this background service.

        An awaited background service will wait for all its tasks to finish.

        Returns:
            An implementation-specific generator for the awaitable.
        """
        return self.wait().__await__()

    def __del__(self) -> None:
        """Destroy this instance.

        Cancel all running tasks spawned by this background service.
        """
        self.cancel(f"{self!r} was deleted")

    def __repr__(self) -> str:
        """Return a string representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"

    def __str__(self) -> str:
        """Return a string representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return f"{type(self).__name__}[{self._name}]"
Attributes¤
is_running: bool property ¤

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.
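
The "at least one task is running" rule can be reproduced with plain `asyncio` tasks. The snippet below is a minimal sketch and does not use the SDK; the local `is_running()` helper is a hypothetical stand-in for the property.

```python
import asyncio


async def main() -> tuple[bool, bool]:
    # Two short-lived tasks stand in for a service's background tasks.
    tasks = {
        asyncio.create_task(asyncio.sleep(0.01)),
        asyncio.create_task(asyncio.sleep(0)),
    }

    def is_running() -> bool:
        # Running means: at least one task has not finished yet.
        return any(not task.done() for task in tasks)

    before = is_running()
    await asyncio.gather(*tasks)
    after = is_running()
    return before, after


before, after = asyncio.run(main())
print(before, after)  # True False
```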

name: str property ¤

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks: collections.abc.Set[asyncio.Task[Any]] property ¤

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior; don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__() async ¤

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__(exc_type, exc_val, exc_tb) async ¤

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__() ¤

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.
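
Delegating `__await__()` to a coroutine's own `__await__()` is what makes an object directly awaitable. The following is a minimal sketch of that pattern with a hypothetical `Waitable` class; it is not the SDK class.

```python
import asyncio
from collections.abc import Generator


class Waitable:
    """Hypothetical object that can be awaited directly."""

    def __init__(self) -> None:
        self.finished = False

    async def wait(self) -> None:
        await asyncio.sleep(0)
        self.finished = True

    def __await__(self) -> Generator[None, None, None]:
        # Reuse the coroutine's awaitable protocol instead of writing one by hand.
        return self.wait().__await__()


async def main() -> bool:
    obj = Waitable()
    await obj  # Equivalent to `await obj.wait()`.
    return obj.finished


result = asyncio.run(main())
```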

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__() ¤

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__init__(*, name=None) ¤

Initialize this BackgroundService.

PARAMETER DESCRIPTION
name

The name of this background service. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def __init__(self, *, name: str | None = None) -> None:
    """Initialize this BackgroundService.

    Args:
        name: The name of this background service. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
    """
    self._name: str = str(id(self)) if name is None else name
    self._tasks: set[asyncio.Task[Any]] = set()
__repr__() ¤

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__() ¤

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel(msg=None) ¤

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start() abstractmethod ¤

Start this background service.

Source code in frequenz/sdk/actor/_background_service.py
@abc.abstractmethod
def start(self) -> None:
    """Start this background service."""
stop(msg=None) async ¤

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.

    [//]: # (# noqa: DAR401 rest)
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait() async ¤

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.actor.ChannelRegistry ¤

Dynamically creates, owns, and provides access to channels.

It can be used by actors to dynamically establish a communication channel between each other. Channels are identified by string names.
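
The idea of channels that are created on first use and looked up by string key can be sketched without the SDK. In the example below, `QueueRegistry` is a hypothetical stand-in that uses `asyncio.Queue` instead of `Broadcast`, so it only illustrates the lookup pattern, not the real channel semantics.

```python
import asyncio


class QueueRegistry:
    """Hypothetical stand-in: maps string keys to lazily created queues."""

    def __init__(self, *, name: str) -> None:
        self._name = name
        self._queues: dict[str, asyncio.Queue[int]] = {}

    def get(self, key: str) -> asyncio.Queue[int]:
        # Create the queue on first use and reuse it afterwards.
        if key not in self._queues:
            self._queues[key] = asyncio.Queue()
        return self._queues[key]


async def main() -> tuple[bool, int]:
    registry = QueueRegistry(name="demo")
    # Two parties that only share the key end up on the same queue.
    producer_side = registry.get("power")
    consumer_side = registry.get("power")
    await producer_side.put(42)
    return producer_side is consumer_side, await consumer_side.get()


same, value = asyncio.run(main())
```

Because both sides only need to agree on the key, neither has to hold a reference to the other, which is the property the registry offers to actors.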

Source code in frequenz/sdk/actor/_channel_registry.py
class ChannelRegistry:
    """Dynamically creates, owns, and provides access to channels.

    It can be used by actors to dynamically establish a communication channel
    between each other.  Channels are identified by string names.
    """

    def __init__(self, *, name: str) -> None:
        """Create a `ChannelRegistry` instance.

        Args:
            name: A unique name for the registry.
        """
        self._name = name
        self._channels: Dict[str, Broadcast[Any]] = {}

    def new_sender(self, key: str) -> Sender[Any]:
        """Get a sender to a dynamically created channel with the given key.

        Args:
            key: A key to identify the channel.

        Returns:
            A sender to a dynamically created channel with the given key.
        """
        if key not in self._channels:
            self._channels[key] = Broadcast(f"{self._name}-{key}")
        return self._channels[key].new_sender()

    def new_receiver(self, key: str) -> Receiver[Any]:
        """Get a receiver to a dynamically created channel with the given key.

        Args:
            key: A key to identify the channel.

        Returns:
            A receiver for a dynamically created channel with the given key.
        """
        if key not in self._channels:
            self._channels[key] = Broadcast(f"{self._name}-{key}")
        return self._channels[key].new_receiver()

    async def _close_channel(self, key: str) -> None:
        """Close a channel with the given key.

        This method is private and should only be used in special cases.

        Args:
            key: A key to identify the channel.
        """
        if key in self._channels:
            if channel := self._channels.pop(key, None):
                await channel.close()
Functions¤
__init__(*, name) ¤

Create a ChannelRegistry instance.

PARAMETER DESCRIPTION
name

A unique name for the registry.

TYPE: str

Source code in frequenz/sdk/actor/_channel_registry.py
def __init__(self, *, name: str) -> None:
    """Create a `ChannelRegistry` instance.

    Args:
        name: A unique name for the registry.
    """
    self._name = name
    self._channels: Dict[str, Broadcast[Any]] = {}
new_receiver(key) ¤

Get a receiver to a dynamically created channel with the given key.

PARAMETER DESCRIPTION
key

A key to identify the channel.

TYPE: str

RETURNS DESCRIPTION
Receiver[Any]

A receiver for a dynamically created channel with the given key.

Source code in frequenz/sdk/actor/_channel_registry.py
def new_receiver(self, key: str) -> Receiver[Any]:
    """Get a receiver to a dynamically created channel with the given key.

    Args:
        key: A key to identify the channel.

    Returns:
        A receiver for a dynamically created channel with the given key.
    """
    if key not in self._channels:
        self._channels[key] = Broadcast(f"{self._name}-{key}")
    return self._channels[key].new_receiver()
new_sender(key) ¤

Get a sender to a dynamically created channel with the given key.

PARAMETER DESCRIPTION
key

A key to identify the channel.

TYPE: str

RETURNS DESCRIPTION
Sender[Any]

A sender to a dynamically created channel with the given key.

Source code in frequenz/sdk/actor/_channel_registry.py
def new_sender(self, key: str) -> Sender[Any]:
    """Get a sender to a dynamically created channel with the given key.

    Args:
        key: A key to identify the channel.

    Returns:
        A sender to a dynamically created channel with the given key.
    """
    if key not in self._channels:
        self._channels[key] = Broadcast(f"{self._name}-{key}")
    return self._channels[key].new_sender()

frequenz.sdk.actor.ComponentMetricRequest dataclass ¤

A request object to start streaming a metric for a component.

Source code in frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py
@dataclass
class ComponentMetricRequest:
    """A request object to start streaming a metric for a component."""

    namespace: str
    """The namespace that this request belongs to.

    Metric requests with a shared namespace enable the reuse of channels within
    that namespace.

    If, for example, an actor making multiple requests uses the name of the
    actor as the namespace, then channels for requests from that actor will be
    reused when possible.
    """

    component_id: int
    """The ID of the requested component."""

    metric_id: ComponentMetricId
    """The ID of the requested component's metric."""

    start_time: Optional[datetime]
    """The start time from which data is required.

    When None, we will stream only live data.
    """

    def get_channel_name(self) -> str:
        """Return a channel name constructed from Self.

        This channel name can be used by the sending and receiving sides to
        identify the right channel from the ChannelRegistry.

        Returns:
            A string denoting a channel name.
        """
        return (
            f"component-stream::{self.component_id}::{self.metric_id.name}::"
            f"{self.start_time}::{self.namespace}"
        )
Attributes¤
component_id: int instance-attribute ¤

The ID of the requested component.

metric_id: ComponentMetricId instance-attribute ¤

The ID of the requested component's metric.

namespace: str instance-attribute ¤

The namespace that this request belongs to.

Metric requests with a shared namespace enable the reuse of channels within that namespace.

If, for example, an actor making multiple requests uses the name of the actor as the namespace, then channels for requests from that actor will be reused when possible.

start_time: Optional[datetime] instance-attribute ¤

The start time from which data is required.

When None, we will stream only live data.

Functions¤
get_channel_name() ¤

Return a channel name constructed from Self.

This channel name can be used by the sending and receiving sides to identify the right channel from the ChannelRegistry.

RETURNS DESCRIPTION
str

A string denoting a channel name.
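
To make the name format concrete, here is a minimal sketch that reuses the same format string on stand-in types. `MetricId` and `MetricRequest` are hypothetical substitutes for `ComponentMetricId` and `ComponentMetricRequest`, and the field values are made up:

```python
import dataclasses
import enum
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


class MetricId(enum.Enum):
    """Hypothetical stand-in for `ComponentMetricId`."""

    ACTIVE_POWER = "active_power"


@dataclass
class MetricRequest:
    """Hypothetical stand-in with the same fields as `ComponentMetricRequest`."""

    namespace: str
    component_id: int
    metric_id: MetricId
    start_time: Optional[datetime]

    def get_channel_name(self) -> str:
        # Same format string as the documented method.
        return (
            f"component-stream::{self.component_id}::{self.metric_id.name}::"
            f"{self.start_time}::{self.namespace}"
        )


request = MetricRequest("my-actor", 4, MetricId.ACTIVE_POWER, None)
same = MetricRequest("my-actor", 4, MetricId.ACTIVE_POWER, None)
other = dataclasses.replace(request, namespace="other-actor")

print(request.get_channel_name())
# component-stream::4::ACTIVE_POWER::None::my-actor
```

Two requests with identical fields map to the same name, while changing only the namespace yields a different one, which is what keeps channels separate across namespaces.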

Source code in frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py
def get_channel_name(self) -> str:
    """Return a channel name constructed from Self.

    This channel name can be used by the sending and receiving sides to
    identify the right channel from the ChannelRegistry.

    Returns:
        A string denoting a channel name.
    """
    return (
        f"component-stream::{self.component_id}::{self.metric_id.name}::"
        f"{self.start_time}::{self.namespace}"
    )

frequenz.sdk.actor.ComponentMetricsResamplingActor ¤

Bases: Actor

An actor to resample microgrid component metrics.

Source code in frequenz/sdk/actor/_resampling.py
class ComponentMetricsResamplingActor(Actor):
    """An actor to resample microgrid component metrics."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        channel_registry: ChannelRegistry,
        data_sourcing_request_sender: Sender[ComponentMetricRequest],
        resampling_request_receiver: Receiver[ComponentMetricRequest],
        config: ResamplerConfig,
        name: str | None = None,
    ) -> None:
        """Initialize an instance.

        Args:
            channel_registry: The channel registry used to get senders and
                receivers for data sourcing subscriptions.
            data_sourcing_request_sender: The sender used to send requests to
                the [`DataSourcingActor`][frequenz.sdk.actor.DataSourcingActor]
                to subscribe to component metrics.
            resampling_request_receiver: The receiver to use to receive new
                resampling subscription requests.
            config: The configuration for the resampler.
            name: The name of the actor. If `None`, `str(id(self))` will be used. This
                is used mostly for debugging purposes.
        """
        super().__init__(name=name)
        self._channel_registry: ChannelRegistry = channel_registry
        self._data_sourcing_request_sender: Sender[
            ComponentMetricRequest
        ] = data_sourcing_request_sender
        self._resampling_request_receiver: Receiver[
            ComponentMetricRequest
        ] = resampling_request_receiver
        self._resampler: Resampler = Resampler(config)
        self._active_req_channels: set[str] = set()

    async def _subscribe(self, request: ComponentMetricRequest) -> None:
        """Request data for a component metric.

        Args:
            request: The request for component metric data.
        """
        request_channel_name = request.get_channel_name()

        # If we are already handling this request, there is nothing to do.
        if request_channel_name in self._active_req_channels:
            return

        self._active_req_channels.add(request_channel_name)

        data_source_request = dataclasses.replace(
            request, namespace=request.namespace + ":Source"
        )
        data_source_channel_name = data_source_request.get_channel_name()
        await self._data_sourcing_request_sender.send(data_source_request)
        receiver = self._channel_registry.new_receiver(data_source_channel_name)

        # This is a temporary hack until the Sender implementation uses
        # exceptions to report errors.
        sender = self._channel_registry.new_sender(request_channel_name)

        async def sink_adapter(sample: Sample[Quantity]) -> None:
            await sender.send(sample)

        self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter)

    async def _process_resampling_requests(self) -> None:
        """Process resampling data requests."""
        async for request in self._resampling_request_receiver:
            await self._subscribe(request)

    async def _run(self) -> None:
        """Resample known component metrics and process resampling requests.

        If there is a resampling error while resampling some component metric,
        then that metric will be discarded and not resampled any more. Any
        other error will be propagated (most likely ending in the actor being
        restarted).

        This method creates 2 main tasks:

        - One task to process incoming subscription requests to resample new metrics.
        - One task to run the resampler.

        Raises:
            RuntimeError: If there is some unexpected error while resampling or
                handling requests.

        [//]: # (# noqa: DAR401 error)
        """
        tasks_to_cancel: set[asyncio.Task[None]] = set()
        try:
            subscriptions_task = asyncio.create_task(
                self._process_resampling_requests()
            )
            tasks_to_cancel.add(subscriptions_task)

            while True:
                resampling_task = asyncio.create_task(self._resampler.resample())
                tasks_to_cancel.add(resampling_task)
                done, _ = await asyncio.wait(
                    [resampling_task, subscriptions_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if subscriptions_task in done:
                    tasks_to_cancel.remove(subscriptions_task)
                    raise RuntimeError(
                        "There was a problem with the subscriptions channel."
                    )

                if resampling_task in done:
                    tasks_to_cancel.remove(resampling_task)
                    # The resampler shouldn't end without an exception
                    error = resampling_task.exception()
                    assert (
                        error is not None
                    ), "The resample() function shouldn't exit normally."

                    # We don't know what to do with something other than
                    # ResamplingError, so propagate the exception if that is the
                    # case.
                    if not isinstance(error, ResamplingError):
                        raise error
                    for source, source_error in error.exceptions.items():
                        _logger.error(
                            "Error resampling source %s, removing source...", source
                        )
                        removed = self._resampler.remove_timeseries(source)
                        if not removed:
                            _logger.warning(
                                "Got an exception from an unknown source: "
                                "source=%r, exception=%r",
                                source,
                                source_error,
                            )
                    # The resampling_task will be re-created if we reached this point
        finally:
            await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])
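
The supervision loop in `_run()` can be reduced to a small `asyncio` pattern: wait for whichever task finishes first, handle a recoverable failure, then recreate the worker. The following is a minimal sketch using only the standard library; `RecoverableError`, `worker` and `supervise` are hypothetical names, and the example stops at the first non-recoverable error instead of re-raising it, only so that it terminates cleanly:

```python
import asyncio


class RecoverableError(Exception):
    """Hypothetical error the supervisor knows how to handle."""


async def worker(attempt: int) -> None:
    """Fail recoverably on the first two attempts, then fatally."""
    await asyncio.sleep(0)
    if attempt < 2:
        raise RecoverableError(f"attempt {attempt} failed")
    raise RuntimeError("fatal")


async def supervise() -> list[str]:
    log: list[str] = []
    # A long-running companion task, like the subscriptions task above.
    companion = asyncio.create_task(asyncio.sleep(3600))
    attempt = 0
    try:
        while True:
            task = asyncio.create_task(worker(attempt))
            done, _ = await asyncio.wait(
                [task, companion], return_when=asyncio.FIRST_COMPLETED
            )
            if companion in done:
                raise RuntimeError("companion task ended unexpectedly")
            error = task.exception()
            if not isinstance(error, RecoverableError):
                # The actor would re-raise here; the sketch just records it.
                log.append(f"propagating: {error}")
                break
            # Recoverable: record it and let the loop recreate the worker.
            log.append(f"recovered: {error}")
            attempt += 1
    finally:
        companion.cancel()
        await asyncio.gather(companion, return_exceptions=True)
    return log


log = asyncio.run(supervise())
```

The `finally` block plays the role of `tasks_to_cancel`: whatever path leaves the loop, no task is left running in the background.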
Functions¤
__init__(*, channel_registry, data_sourcing_request_sender, resampling_request_receiver, config, name=None) ¤

Initialize an instance.

PARAMETER DESCRIPTION
channel_registry

The channel registry used to get senders and receivers for data sourcing subscriptions.

TYPE: ChannelRegistry

data_sourcing_request_sender

The sender used to send requests to the DataSourcingActor to subscribe to component metrics.

TYPE: Sender[ComponentMetricRequest]

resampling_request_receiver

The receiver to use to receive new resampling subscription requests.

TYPE: Receiver[ComponentMetricRequest]

config

The configuration for the resampler.

TYPE: ResamplerConfig

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_resampling.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    *,
    channel_registry: ChannelRegistry,
    data_sourcing_request_sender: Sender[ComponentMetricRequest],
    resampling_request_receiver: Receiver[ComponentMetricRequest],
    config: ResamplerConfig,
    name: str | None = None,
) -> None:
    """Initialize an instance.

    Args:
        channel_registry: The channel registry used to get senders and
            receivers for data sourcing subscriptions.
        data_sourcing_request_sender: The sender used to send requests to
            the [`DataSourcingActor`][frequenz.sdk.actor.DataSourcingActor]
            to subscribe to component metrics.
        resampling_request_receiver: The receiver to use to receive new
            resampling subscription requests.
        config: The configuration for the resampler.
        name: The name of the actor. If `None`, `str(id(self))` will be used. This
            is used mostly for debugging purposes.
    """
    super().__init__(name=name)
    self._channel_registry: ChannelRegistry = channel_registry
    self._data_sourcing_request_sender: Sender[
        ComponentMetricRequest
    ] = data_sourcing_request_sender
    self._resampling_request_receiver: Receiver[
        ComponentMetricRequest
    ] = resampling_request_receiver
    self._resampler: Resampler = Resampler(config)
    self._active_req_channels: set[str] = set()

frequenz.sdk.actor.ConfigManagingActor ¤

Bases: Actor

An actor that monitors a TOML configuration file for updates.

When the file is updated, the new configuration is sent, as a dict, to the output sender.

When the actor is started, if a configuration file already exists, then it will be read and sent to the output sender before the actor starts monitoring the file for updates. This way users can rely on the actor to do the initial configuration reading too.

Source code in frequenz/sdk/actor/_config_managing.py
class ConfigManagingActor(Actor):
    """An actor that monitors a TOML configuration file for updates.

    When the file is updated, the new configuration is sent, as a [`dict`][], to the
    `output` sender.

    When the actor is started, if a configuration file already exists, then it will be
    read and sent to the `output` sender before the actor starts monitoring the file
    for updates. This way users can rely on the actor to do the initial configuration
    reading too.
    """

    def __init__(
        self,
        config_path: pathlib.Path | str,
        output: Sender[Config],
        event_types: abc.Set[FileWatcher.EventType] = frozenset(FileWatcher.EventType),
        *,
        name: str | None = None,
    ) -> None:
        """Initialize this instance.

        Args:
            config_path: The path to the TOML file with the configuration.
            output: The sender to send the config to.
            event_types: The set of event types to monitor.
            name: The name of the actor. If `None`, `str(id(self))` will
                be used. This is used mostly for debugging purposes.
        """
        super().__init__(name=name)
        self._config_path: pathlib.Path = (
            config_path
            if isinstance(config_path, pathlib.Path)
            else pathlib.Path(config_path)
        )
        # FileWatcher can't watch for non-existing files, so we need to watch for the
        # parent directory instead just in case a configuration file doesn't exist yet
        # or it is deleted and recreated again.
        self._file_watcher: FileWatcher = FileWatcher(
            paths=[self._config_path.parent], event_types=event_types
        )
        self._output: Sender[Config] = output

    def _read_config(self) -> dict[str, Any]:
        """Read the contents of the configuration file.

        Returns:
            A dictionary containing configuration variables.

        Raises:
            ValueError: If config file cannot be read.
        """
        try:
            with self._config_path.open("rb") as toml_file:
                return tomllib.load(toml_file)
        except ValueError as err:
            logging.error("%s: Can't read config file, err: %s", self, err)
            raise

    async def send_config(self) -> None:
        """Send the configuration to the output sender."""
        conf_vars = self._read_config()
        config = Config(conf_vars)
        await self._output.send(config)

    async def _run(self) -> None:
        """Monitor for and send configuration file updates.

        At startup, the Config Manager sends the current config so that it
        can be cached in the Broadcast channel and served to receivers even if
        there hasn't been any change to the config file itself.
        """
        await self.send_config()

        async for event in self._file_watcher:
            # Since we are watching the whole parent directory, we need to make sure
            # we only react to events related to the configuration file.
            if event.path != self._config_path:
                continue

            match event.type:
                case FileWatcher.EventType.CREATE:
                    _logger.info(
                        "%s: The configuration file %s was created, sending new config...",
                        self,
                        self._config_path,
                    )
                    await self.send_config()
                case FileWatcher.EventType.MODIFY:
                    _logger.info(
                        "%s: The configuration file %s was modified, sending update...",
                        self,
                        self._config_path,
                    )
                    await self.send_config()
                case FileWatcher.EventType.DELETE:
                    _logger.info(
                        "%s: The configuration file %s was deleted, ignoring...",
                        self,
                        self._config_path,
                    )
                case _:
                    assert_never(event.type)
Functions¤
__init__(config_path, output, event_types=frozenset(FileWatcher.EventType), *, name=None) ¤

Initialize this instance.

PARAMETER DESCRIPTION
config_path

The path to the TOML file with the configuration.

TYPE: Path | str

output

The sender to send the config to.

TYPE: Sender[Config]

event_types

The set of event types to monitor.

TYPE: Set[EventType] DEFAULT: frozenset(EventType)

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_config_managing.py
def __init__(
    self,
    config_path: pathlib.Path | str,
    output: Sender[Config],
    event_types: abc.Set[FileWatcher.EventType] = frozenset(FileWatcher.EventType),
    *,
    name: str | None = None,
) -> None:
    """Initialize this instance.

    Args:
        config_path: The path to the TOML file with the configuration.
        output: The sender to send the config to.
        event_types: The set of event types to monitor.
        name: The name of the actor. If `None`, `str(id(self))` will
            be used. This is used mostly for debugging purposes.
    """
    super().__init__(name=name)
    self._config_path: pathlib.Path = (
        config_path
        if isinstance(config_path, pathlib.Path)
        else pathlib.Path(config_path)
    )
    # FileWatcher can't watch for non-existing files, so we need to watch for the
    # parent directory instead just in case a configuration file doesn't exist yet
    # or it is deleted and recreated again.
    self._file_watcher: FileWatcher = FileWatcher(
        paths=[self._config_path.parent], event_types=event_types
    )
    self._output: Sender[Config] = output
send_config() async ¤

Send the configuration to the output sender.

Source code in frequenz/sdk/actor/_config_managing.py
async def send_config(self) -> None:
    """Send the configuration to the output sender."""
    conf_vars = self._read_config()
    config = Config(conf_vars)
    await self._output.send(config)

frequenz.sdk.actor.DataSourcingActor ¤

Bases: Actor

An actor that provides data streams of metrics as time series.

Source code in frequenz/sdk/actor/_data_sourcing/data_sourcing.py
class DataSourcingActor(Actor):
    """An actor that provides data streams of metrics as time series."""

    def __init__(
        self,
        request_receiver: Receiver[ComponentMetricRequest],
        registry: ChannelRegistry,
        *,
        name: str | None = None,
    ) -> None:
        """Create a `DataSourcingActor` instance.

        Args:
            request_receiver: A channel receiver to accept metric requests from.
            registry: A channel registry.  To be replaced by a singleton
                instance.
            name: The name of the actor. If `None`, `str(id(self))` will be used. This
                is used mostly for debugging purposes.
        """
        super().__init__(name=name)
        self._request_receiver = request_receiver
        self._microgrid_api_source = MicrogridApiSource(registry)

    async def _run(self) -> None:
        """Run the actor."""
        async for request in self._request_receiver:
            await self._microgrid_api_source.add_metric(request)
Functions¤
__init__(request_receiver, registry, *, name=None) ¤

Create a DataSourcingActor instance.

PARAMETER DESCRIPTION
request_receiver

A channel receiver to accept metric requests from.

TYPE: Receiver[ComponentMetricRequest]

registry

A channel registry. To be replaced by a singleton instance.

TYPE: ChannelRegistry

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_data_sourcing/data_sourcing.py
def __init__(
    self,
    request_receiver: Receiver[ComponentMetricRequest],
    registry: ChannelRegistry,
    *,
    name: str | None = None,
) -> None:
    """Create a `DataSourcingActor` instance.

    Args:
        request_receiver: A channel receiver to accept metric requests from.
        registry: A channel registry.  To be replaced by a singleton
            instance.
        name: The name of the actor. If `None`, `str(id(self))` will be used. This
            is used mostly for debugging purposes.
    """
    super().__init__(name=name)
    self._request_receiver = request_receiver
    self._microgrid_api_source = MicrogridApiSource(registry)

frequenz.sdk.actor.ResamplerConfig dataclass ¤

Resampler configuration.

Source code in frequenz/sdk/timeseries/_resampling.py
@dataclass(frozen=True)
class ResamplerConfig:
    """Resampler configuration."""

    resampling_period: timedelta
    """The resampling period.

    This is the time that passes between consecutive calculations of resampled data.

    It must be a positive time span.
    """

    max_data_age_in_periods: float = 3.0
    """The maximum age a sample can have to be considered *relevant* for resampling.

    Expressed in number of periods, where period is the `resampling_period`
    if we are downsampling (resampling period bigger than the input period) or
    the input sampling period if we are upsampling (input period bigger than
    the resampling period).

    It must be at least 1.0.

    Example:
        If `resampling_period` is 3 seconds, the input sampling period is
        1 second and `max_data_age_in_periods` is 2, then data older than
        3*2 = 6 seconds will be discarded when creating a new sample and
        never passed to the resampling function.

        If `resampling_period` is 3 seconds, the input sampling period is
        5 seconds and `max_data_age_in_periods` is 2, then data older than
        5*2 = 10 seconds will be discarded when creating a new sample and
        never passed to the resampling function.
    """

    resampling_function: ResamplingFunction = average
    """The resampling function.

    This function will be applied to the sequence of relevant samples at
    a given time. The result of the function is what is sent as the resampled
    value.
    """

    initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
    """The initial length of the resampling buffer.

    The buffer could grow or shrink depending on the source properties,
    like sampling rate, to make sure all the requested past sampling periods
    can be stored.

    It must be at least 1 and at most `max_buffer_len`.
    """

    warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
    """The minimum length of the resampling buffer that will emit a warning.

    If a buffer grows bigger than this value, it will emit a warning in the
    logs, so buffers don't grow too big inadvertently.

    It must be at least 1 and less than `max_buffer_len`.
    """

    max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
    """The maximum length of the resampling buffer.

    Buffers won't be allowed to grow beyond this point even if it would be
    needed to keep all the requested past sampling periods. An error will be
    emitted in the logs if the buffer length needs to be truncated to this
    value.

    It must be bigger than `warn_buffer_len`.
    """

    align_to: datetime | None = UNIX_EPOCH
    """The time to align the resampling period to.

    The resampling period will be aligned to this time, so the first resampled
    sample will be at the first multiple of `resampling_period` starting from
    `align_to`. It must be an aware datetime and can be in the future too.

    If `align_to` is `None`, the resampling period will be aligned to the
    time the resampler is created.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.resampling_period.total_seconds() <= 0.0:
            raise ValueError(
                f"resampling_period ({self.resampling_period}) must be positive"
            )
        if self.max_data_age_in_periods < 1.0:
            raise ValueError(
                f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
            )
        if self.warn_buffer_len < 1:
            raise ValueError(
                f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
            )
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                f"max_buffer_len ({self.max_buffer_len}) should "
                f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
            )

        if self.initial_buffer_len < 1:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
            )
        if self.initial_buffer_len > self.max_buffer_len:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
                f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
                "initial_buffer_len or a bigger max_buffer_len"
            )
        if self.initial_buffer_len > self.warn_buffer_len:
            _logger.warning(
                "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
                self.initial_buffer_len,
                self.warn_buffer_len,
            )
        if self.align_to is not None and self.align_to.tzinfo is None:
            raise ValueError(
                f"align_to ({self.align_to}) should be a timezone aware datetime"
            )
Attributes¤
align_to: datetime | None = UNIX_EPOCH class-attribute instance-attribute ¤

The time to align the resampling period to.

The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of resampling_period starting from align_to. It must be an aware datetime and can be in the future too.

If align_to is None, the resampling period will be aligned to the time the resampler is created.
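
The alignment rule can be worked through with plain `datetime` arithmetic. How the resampler computes its ticks internally is not shown on this page, so the sketch below is only one possible reading of the rule, assuming the first tick is the earliest aligned instant at or after the current time; `first_aligned_tick` is a hypothetical helper:

```python
from datetime import datetime, timedelta, timezone


def first_aligned_tick(
    now: datetime, align_to: datetime, period: timedelta
) -> datetime:
    """Return the first multiple of `period` from `align_to` that is >= `now`."""
    # Python's % on timedeltas always yields a non-negative remainder for a
    # positive period, so this also works when `align_to` is in the future.
    remainder = (now - align_to) % period
    if remainder == timedelta(0):
        return now
    return now + (period - remainder)


epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
now = epoch + timedelta(seconds=7)
tick = first_aligned_tick(now, epoch, timedelta(seconds=5))
future_tick = first_aligned_tick(
    now, epoch + timedelta(seconds=9), timedelta(seconds=5)
)
```

With a 5-second period aligned to the Unix epoch and a current time 7 seconds past it, the first tick falls at 10 seconds; aligned to 9 seconds past the epoch instead, it falls at 9 seconds.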

initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT class-attribute instance-attribute ¤

The initial length of the resampling buffer.

The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.

It must be at least 1 and at most max_buffer_len.

max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX class-attribute instance-attribute ¤

The maximum length of the resampling buffer.

Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.

It must be bigger than warn_buffer_len.

max_data_age_in_periods: float = 3.0 class-attribute instance-attribute ¤

The maximum age a sample can have to be considered relevant for resampling.

Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).

It must be at least 1.0.

Example

If resampling_period is 3 seconds, the input sampling period is 1 second and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.

If resampling_period is 3 seconds, the input sampling period is 5 seconds and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.
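
The two cases above share one rule: the relevant period is the larger of the resampling period and the input period. The following is a minimal sketch of that arithmetic; `max_data_age` is a hypothetical helper, not part of the SDK:

```python
from datetime import timedelta


def max_data_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Return the age beyond which samples are considered irrelevant."""
    # Downsampling uses the resampling period, upsampling the input period;
    # in both cases that is the larger of the two.
    relevant_period = max(resampling_period, input_period)
    return relevant_period * max_data_age_in_periods


downsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=1), 2)
upsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=5), 2)
print(downsampling.total_seconds(), upsampling.total_seconds())
# 6.0 10.0
```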

resampling_function: ResamplingFunction = average class-attribute instance-attribute ¤

The resampling function.

This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.

resampling_period: timedelta instance-attribute ¤

The resampling period.

This is the time that passes between consecutive calculations of resampled data.

It must be a positive time span.

warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN class-attribute instance-attribute ¤

The minimum length of the resampling buffer that will emit a warning.

If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.

It must be at least 1 and less than max_buffer_len.

Functions¤
__post_init__() ¤

Check that config values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.
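
The ordering constraints between the three buffer lengths can be summarised as `1 <= warn_buffer_len < max_buffer_len` and `1 <= initial_buffer_len <= max_buffer_len`. The following is a minimal sketch of those checks on a stand-in dataclass; `BufferLens`, `is_valid` and the default values are hypothetical and do not reflect the SDK's actual defaults:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BufferLens:
    """Hypothetical stand-in holding only the three buffer-length fields."""

    initial_buffer_len: int = 16  # illustrative default, not the SDK's
    warn_buffer_len: int = 128  # illustrative default, not the SDK's
    max_buffer_len: int = 1024  # illustrative default, not the SDK's

    def __post_init__(self) -> None:
        if self.warn_buffer_len < 1:
            raise ValueError("warn_buffer_len should be at least 1")
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                "max_buffer_len should be bigger than warn_buffer_len"
            )
        if not 1 <= self.initial_buffer_len <= self.max_buffer_len:
            raise ValueError(
                "initial_buffer_len should be in [1, max_buffer_len]"
            )


def is_valid(**kwargs: int) -> bool:
    """Return whether the given lengths pass validation."""
    try:
        BufferLens(**kwargs)
    except ValueError:
        return False
    return True
```

An `initial_buffer_len` above `warn_buffer_len` is accepted here, matching the documented behaviour where that case only logs a warning.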

Source code in frequenz/sdk/timeseries/_resampling.py
def __post_init__(self) -> None:
    """Check that config values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.resampling_period.total_seconds() <= 0.0:
        raise ValueError(
            f"resampling_period ({self.resampling_period}) must be positive"
        )
    if self.max_data_age_in_periods < 1.0:
        raise ValueError(
            f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
        )
    if self.warn_buffer_len < 1:
        raise ValueError(
            f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
        )
    if self.max_buffer_len <= self.warn_buffer_len:
        raise ValueError(
            f"max_buffer_len ({self.max_buffer_len}) should "
            f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
        )

    if self.initial_buffer_len < 1:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
        )
    if self.initial_buffer_len > self.max_buffer_len:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
            f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
            "initial_buffer_len or a bigger max_buffer_len"
        )
    if self.initial_buffer_len > self.warn_buffer_len:
        _logger.warning(
            "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
            self.initial_buffer_len,
            self.warn_buffer_len,
        )
    if self.align_to is not None and self.align_to.tzinfo is None:
        raise ValueError(
            f"align_to ({self.align_to}) should be a timezone aware datetime"
        )
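
The validation pattern used above can be tried out with a reduced stand-in. This sketch assumes the config is a dataclass, as the use of `__post_init__` suggests. The `BufferLimits` class, its default values and the `is_valid` helper are hypothetical and cover only the buffer-length checks.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BufferLimits:
    """Hypothetical, reduced stand-in for the buffer-related fields."""

    # Default values are illustrative only, not the SDK's defaults.
    initial_buffer_len: int = 16
    warn_buffer_len: int = 128
    max_buffer_len: int = 1024

    def __post_init__(self) -> None:
        # Runs right after the generated __init__, so an invalid
        # instance can never be constructed.
        if self.warn_buffer_len < 1:
            raise ValueError("warn_buffer_len should be at least 1")
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                "max_buffer_len should be bigger than warn_buffer_len"
            )
        if not 1 <= self.initial_buffer_len <= self.max_buffer_len:
            raise ValueError(
                "initial_buffer_len should be between 1 and max_buffer_len"
            )


def is_valid(**kwargs: int) -> bool:
    """Return whether the given values pass validation."""
    try:
        BufferLimits(**kwargs)
    except ValueError:
        return False
    return True
```

Because the checks run at construction time, callers find out about a bad value where the config is created rather than later, when the buffers are in use.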

Functions¤

frequenz.sdk.actor.run(*actors) async ¤

Await the completion of all actors.

| PARAMETER | DESCRIPTION |
| --- | --- |
| actors | the actors to be awaited. TYPE: Actor DEFAULT: () |

Source code in frequenz/sdk/actor/_run_utils.py
async def run(*actors: Actor) -> None:
    """Await the completion of all actors.

    Args:
        actors: the actors to be awaited.
    """
    _logger.info("Starting %s actor(s)...", len(actors))

    for actor in actors:
        if actor.is_running:
            _logger.info("Actor %s: Already running, skipping start.", actor)
        else:
            _logger.info("Actor %s: Starting...", actor)
            actor.start()

    # Wait until all actors are done
    pending_tasks = set(asyncio.create_task(a.wait(), name=str(a)) for a in actors)
    while pending_tasks:
        done_tasks, pending_tasks = await asyncio.wait(
            pending_tasks, return_when=asyncio.FIRST_COMPLETED
        )

        # This should always be only one task, but we handle many for extra safety
        for task in done_tasks:
            # Cancellation needs to be checked first, otherwise the other methods
            # could raise a CancelledError
            if task.cancelled():
                _logger.info("Actor %s: Cancelled while running.", task.get_name())
            elif exception := task.exception():
                _logger.error(
                    "Actor %s: Raised an exception while running.",
                    task.get_name(),
                    exc_info=exception,
                )
            else:
                _logger.info("Actor %s: Finished normally.", task.get_name())

    _logger.info("All %s actor(s) finished.", len(actors))
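
The waiting loop above can be explored in isolation with plain asyncio tasks. This is a minimal sketch that does not use the SDK: the `worker` coroutine is a hypothetical stand-in for an actor's `wait()`, and outcomes are collected in a list instead of being logged.

```python
import asyncio


async def worker(delay: float, fail: bool = False) -> None:
    """Hypothetical stand-in for an actor's wait() coroutine."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError("worker failed")


async def wait_all() -> list[str]:
    """Wait for all workers and record how each one finished."""
    outcomes: list[str] = []
    to_cancel = asyncio.create_task(worker(10.0), name="cancelled")
    to_cancel.cancel()
    pending = {
        asyncio.create_task(worker(0.01), name="ok"),
        asyncio.create_task(worker(0.02, fail=True), name="failing"),
        to_cancel,
    }
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            # Check cancellation first: task.exception() raises
            # CancelledError when called on a cancelled task.
            if task.cancelled():
                outcomes.append(f"{task.get_name()}: cancelled")
            elif task.exception() is not None:
                outcomes.append(f"{task.get_name()}: raised")
            else:
                outcomes.append(f"{task.get_name()}: finished")
    return outcomes


results = asyncio.run(wait_all())
```

Using `FIRST_COMPLETED` in a loop lets each outcome be reported as soon as its task ends, instead of only after the slowest one finishes.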