Skip to content

Index

frequenz.sdk.actor.power_distributing ¤

This module provides feature to set power between many batteries.

Distributing power is very important to keep the microgrid ready for the power requirements. This module provides PowerDistributingActor that knows how to distribute power. It also provides all the secondary features that should be used to communicate with PowerDistributingActor and send requests for charging or discharging power.

Attributes¤

frequenz.sdk.actor.power_distributing.Result module-attribute ¤

Power distribution result.

Example: Handling power distribution results

```python
from typing import assert_never

from frequenz.sdk.actor.power_distributing import (
    Error,
    OutOfBounds,
    PartialFailure,
    Result,
    Success,
)
from frequenz.sdk.actor.power_distributing.request import Request
from frequenz.sdk.actor.power_distributing.result import PowerBounds
from frequenz.sdk.timeseries._quantities import Power

def handle_power_request_result(result: Result) -> None:
    match result:
        case Success() as success:
            print(f"Power request was successful: {success}")
        case PartialFailure() as partial_failure:
            print(f"Power request was partially successful: {partial_failure}")
        case OutOfBounds() as out_of_bounds:
            print(f"Power request was out of bounds: {out_of_bounds}")
        case Error() as error:
            print(f"Power request failed: {error}")
        case _ as unreachable:
            assert_never(unreachable)

request = Request(
    namespace="TestChannel",
    power=Power.from_watts(123.4),
    component_ids={8, 18},
)

results: list[Result] = [
    Success(
        request,
        succeeded_power=Power.from_watts(123.4),
        succeeded_components={8, 18},
        excess_power=Power.zero(),
    ),
    PartialFailure(
        request,
        succeeded_power=Power.from_watts(103.4),
        succeeded_components={8},
        excess_power=Power.zero(),
        failed_components={18},
        failed_power=Power.from_watts(20.0),
    ),
    OutOfBounds(request, bounds=PowerBounds(0, 0, 0, 800)),
    Error(request, msg="The components are not available"),
]

for r in results:
    handle_power_request_result(r)
```

Classes¤

frequenz.sdk.actor.power_distributing.ComponentPoolStatus dataclass ¤

Status of all components of a certain category in the microgrid.

Source code in frequenz/sdk/actor/power_distributing/_component_status/_component_status.py
@dataclass
class ComponentPoolStatus:
    """Status of all components of a certain category in the microgrid."""

    working: set[int]
    """Set of working component ids."""

    uncertain: set[int]
    """Set of components to be used only when there are none known to be working."""

    def get_working_components(self, components: abc.Set[int]) -> set[int]:
        """From the given set of components return the working ones.

        Args:
            components: Set of components.

        Returns:
            Subset with working components.
        """
        working = self.working.intersection(components)
        if len(working) > 0:
            return working
        return self.uncertain.intersection(components)
Attributes¤
uncertain instance-attribute ¤
uncertain: set[int]

Set of components to be used only when there are none known to be working.

working instance-attribute ¤
working: set[int]

Set of working component ids.

Functions¤
get_working_components ¤
get_working_components(components: Set[int]) -> set[int]

From the given set of components return the working ones.

PARAMETER DESCRIPTION
components

Set of components.

TYPE: Set[int]

RETURNS DESCRIPTION
set[int]

Subset with working components.

Source code in frequenz/sdk/actor/power_distributing/_component_status/_component_status.py
def get_working_components(self, components: abc.Set[int]) -> set[int]:
    """From the given set of components return the working ones.

    Args:
        components: Set of components.

    Returns:
        Subset with working components.
    """
    working = self.working.intersection(components)
    if len(working) > 0:
        return working
    return self.uncertain.intersection(components)

frequenz.sdk.actor.power_distributing.Error dataclass ¤

Bases: _BaseResultMixin

Result returned when an error occurred and power was not set at all.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class Error(_BaseResultMixin):
    """Result returned when an error occurred and power was not set at all."""

    msg: str
    """The error message explaining why error happened."""
Attributes¤
msg instance-attribute ¤
msg: str

The error message explaining why error happened.

request instance-attribute ¤
request: Request

The user's request to which this message responds.

frequenz.sdk.actor.power_distributing.OutOfBounds dataclass ¤

Bases: _BaseResultMixin

Result returned when the power was not set because it was out of bounds.

This result happens when the originating request was done with adjust_power = False and the requested power is not within the available bounds.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class OutOfBounds(_BaseResultMixin):
    """Result returned when the power was not set because it was out of bounds.

    This result happens when the originating request was done with
    `adjust_power = False` and the requested power is not within the available bounds.
    """

    bounds: PowerBounds
    """The power bounds for the requested components.

    If the requested power negative, then this value is the lower bound.
    Otherwise it is upper bound.
    """
Attributes¤
bounds instance-attribute ¤
bounds: PowerBounds

The power bounds for the requested components.

If the requested power negative, then this value is the lower bound. Otherwise it is upper bound.

request instance-attribute ¤
request: Request

The user's request to which this message responds.

frequenz.sdk.actor.power_distributing.PartialFailure dataclass ¤

Bases: _BaseSuccessMixin, _BaseResultMixin

Result returned when some of the components had an error setting the power.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class PartialFailure(_BaseSuccessMixin, _BaseResultMixin):
    """Result returned when some of the components had an error setting the power."""

    failed_power: Power
    """The part of the requested power that failed to be set."""

    failed_components: abc.Set[int]
    """The subset of batteries for which the request failed."""
Attributes¤
excess_power instance-attribute ¤
excess_power: Power

The part of the requested power that could not be fulfilled.

This happens when the requested power is outside the available power bounds.

failed_components instance-attribute ¤
failed_components: Set[int]

The subset of batteries for which the request failed.

failed_power instance-attribute ¤
failed_power: Power

The part of the requested power that failed to be set.

request instance-attribute ¤
request: Request

The user's request to which this message responds.

succeeded_components instance-attribute ¤
succeeded_components: Set[int]

The subset of components for which power was set successfully.

succeeded_power instance-attribute ¤
succeeded_power: Power

The part of the requested power that was successfully set.

frequenz.sdk.actor.power_distributing.PowerDistributingActor ¤

Bases: Actor

Actor to distribute the power between batteries in a microgrid.

The purpose of this tool is to keep an equal SoC level in all batteries. The PowerDistributingActor can have many concurrent users which at this time need to be known at construction time.

For each user a bidirectional channel needs to be created through which they can send and receive requests and responses.

It is recommended to wait for PowerDistributingActor output with timeout. Otherwise if the processing function fails then the response will never come. The timeout should be Result:request_timeout + time for processing the request.

Edge cases: * If there are 2 requests to be processed for the same subset of batteries, then only the latest request will be processed. Older request will be ignored. User with older request will get response with Result.Status.IGNORED.

  • If there are 2 requests and their subset of batteries is different but they overlap (they have at least one common battery), then then both batteries will be processed. However it is not expected so the proper error log will be printed.
Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
class PowerDistributingActor(Actor):
    # pylint: disable=too-many-instance-attributes
    """Actor to distribute the power between batteries in a microgrid.

    The purpose of this tool is to keep an equal SoC level in all batteries.
    The PowerDistributingActor can have many concurrent users which at this time
    need to be known at construction time.

    For each user a bidirectional channel needs to be created through which
    they can send and receive requests and responses.

    It is recommended to wait for PowerDistributingActor output with timeout. Otherwise if
    the processing function fails then the response will never come.
    The timeout should be Result:request_timeout + time for processing the request.

    Edge cases:
    * If there are 2 requests to be processed for the same subset of batteries, then
    only the latest request will be processed. Older request will be ignored. User with
    older request will get response with Result.Status.IGNORED.

    * If there are 2 requests and their subset of batteries is different but they
    overlap (they have at least one common battery), then then both batteries
    will be processed. However it is not expected so the proper error log will be
    printed.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        requests_receiver: Receiver[Request],
        results_sender: Sender[Result],
        component_pool_status_sender: Sender[ComponentPoolStatus],
        wait_for_data_sec: float,
        *,
        component_category: ComponentCategory,
        component_type: ComponentType | None = None,
        name: str | None = None,
    ) -> None:
        """Create class instance.

        Args:
            requests_receiver: Receiver for receiving power requests from the power
                manager.
            results_sender: Sender for sending results to the power manager.
            component_pool_status_sender: Channel for sending information about which
                components are expected to be working.
            wait_for_data_sec: How long actor should wait before processing first
                request. It is a time needed to collect first components data.
            component_category: The category of the components that this actor is
                responsible for.
            component_type: The type of the component of the given category that this
                actor is responsible for.  This is used only when the component category
                is not enough to uniquely identify the component.  For example, when the
                category is `ComponentCategory.INVERTER`, the type is needed to identify
                the inverter as a solar inverter or a battery inverter.  This can be
                `None` when the component category is enough to uniquely identify the
                component.
            name: The name of the actor. If `None`, `str(id(self))` will be used. This
                is used mostly for debugging purposes.

        Raises:
            ValueError: If the given component category is not supported.
        """
        super().__init__(name=name)
        self._component_category = component_category
        self._component_type = component_type
        self._requests_receiver = requests_receiver
        self._result_sender = results_sender
        self._wait_for_data_sec = wait_for_data_sec

        self._component_manager: ComponentManager
        if component_category == ComponentCategory.BATTERY:
            self._component_manager = BatteryManager(
                component_pool_status_sender, results_sender
            )
        elif component_category == ComponentCategory.EV_CHARGER:
            self._component_manager = EVChargerManager(
                component_pool_status_sender, results_sender
            )
        elif (
            component_category == ComponentCategory.INVERTER
            and component_type == InverterType.SOLAR
        ):
            self._component_manager = PVManager(
                component_pool_status_sender, results_sender
            )
        else:
            raise ValueError(
                f"PowerDistributor doesn't support controlling: {component_category}"
            )

    async def _run(self) -> None:  # pylint: disable=too-many-locals
        """Run actor main function.

        It waits for new requests in task_queue and process it, and send
        `set_power` request with distributed power.
        The output of the `set_power` method is processed.
        Every battery and inverter that failed or didn't respond in time will be marked
        as broken for some time.
        """
        await self._component_manager.start()

        # Wait few seconds to get data from the channels created above.
        await asyncio.sleep(self._wait_for_data_sec)

        async for request in self._requests_receiver:
            await self._component_manager.distribute_power(request)

    async def stop(self, msg: str | None = None) -> None:
        """Stop this actor.

        Args:
            msg: The message to be passed to the tasks being cancelled.
        """
        await self._component_manager.stop()
        await super().stop(msg)
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(
    requests_receiver: Receiver[Request],
    results_sender: Sender[Result],
    component_pool_status_sender: Sender[
        ComponentPoolStatus
    ],
    wait_for_data_sec: float,
    *,
    component_category: ComponentCategory,
    component_type: ComponentType | None = None,
    name: str | None = None
) -> None

Create class instance.

PARAMETER DESCRIPTION
requests_receiver

Receiver for receiving power requests from the power manager.

TYPE: Receiver[Request]

results_sender

Sender for sending results to the power manager.

TYPE: Sender[Result]

component_pool_status_sender

Channel for sending information about which components are expected to be working.

TYPE: Sender[ComponentPoolStatus]

wait_for_data_sec

How long actor should wait before processing first request. It is a time needed to collect first components data.

TYPE: float

component_category

The category of the components that this actor is responsible for.

TYPE: ComponentCategory

component_type

The type of the component of the given category that this actor is responsible for. This is used only when the component category is not enough to uniquely identify the component. For example, when the category is ComponentCategory.INVERTER, the type is needed to identify the inverter as a solar inverter or a battery inverter. This can be None when the component category is enough to uniquely identify the component.

TYPE: ComponentType | None DEFAULT: None

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
ValueError

If the given component category is not supported.

Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    requests_receiver: Receiver[Request],
    results_sender: Sender[Result],
    component_pool_status_sender: Sender[ComponentPoolStatus],
    wait_for_data_sec: float,
    *,
    component_category: ComponentCategory,
    component_type: ComponentType | None = None,
    name: str | None = None,
) -> None:
    """Create class instance.

    Args:
        requests_receiver: Receiver for receiving power requests from the power
            manager.
        results_sender: Sender for sending results to the power manager.
        component_pool_status_sender: Channel for sending information about which
            components are expected to be working.
        wait_for_data_sec: How long actor should wait before processing first
            request. It is a time needed to collect first components data.
        component_category: The category of the components that this actor is
            responsible for.
        component_type: The type of the component of the given category that this
            actor is responsible for.  This is used only when the component category
            is not enough to uniquely identify the component.  For example, when the
            category is `ComponentCategory.INVERTER`, the type is needed to identify
            the inverter as a solar inverter or a battery inverter.  This can be
            `None` when the component category is enough to uniquely identify the
            component.
        name: The name of the actor. If `None`, `str(id(self))` will be used. This
            is used mostly for debugging purposes.

    Raises:
        ValueError: If the given component category is not supported.
    """
    super().__init__(name=name)
    self._component_category = component_category
    self._component_type = component_type
    self._requests_receiver = requests_receiver
    self._result_sender = results_sender
    self._wait_for_data_sec = wait_for_data_sec

    self._component_manager: ComponentManager
    if component_category == ComponentCategory.BATTERY:
        self._component_manager = BatteryManager(
            component_pool_status_sender, results_sender
        )
    elif component_category == ComponentCategory.EV_CHARGER:
        self._component_manager = EVChargerManager(
            component_pool_status_sender, results_sender
        )
    elif (
        component_category == ComponentCategory.INVERTER
        and component_type == InverterType.SOLAR
    ):
        self._component_manager = PVManager(
            component_pool_status_sender, results_sender
        )
    else:
        raise ValueError(
            f"PowerDistributor doesn't support controlling: {component_category}"
        )
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this actor.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this actor.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    await self._component_manager.stop()
    await super().stop(msg)
wait async ¤
wait() -> None

Wait this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.actor.power_distributing.Request dataclass ¤

Request to set power to the PowerDistributingActor.

Source code in frequenz/sdk/actor/power_distributing/request.py
@dataclasses.dataclass
class Request:
    """Request to set power to the `PowerDistributingActor`."""

    power: Power
    """The requested power."""

    component_ids: abc.Set[int]
    """The component ids of the components to be used for this request."""

    request_timeout: timedelta = timedelta(seconds=5.0)
    """The maximum amount of time to wait for the request to be fulfilled."""

    adjust_power: bool = True
    """Whether to adjust the power to match the bounds.

    If `True`, the power will be adjusted (lowered) to match the bounds, so
    only the reduced power will be set.

    If `False` and the power is outside the available bounds, the request will
    fail and be replied to with an `OutOfBound` result.
    """
Attributes¤
adjust_power class-attribute instance-attribute ¤
adjust_power: bool = True

Whether to adjust the power to match the bounds.

If True, the power will be adjusted (lowered) to match the bounds, so only the reduced power will be set.

If False and the power is outside the available bounds, the request will fail and be replied to with an OutOfBound result.

component_ids instance-attribute ¤
component_ids: Set[int]

The component ids of the components to be used for this request.

power instance-attribute ¤
power: Power

The requested power.

request_timeout class-attribute instance-attribute ¤
request_timeout: timedelta = timedelta(seconds=5.0)

The maximum amount of time to wait for the request to be fulfilled.

frequenz.sdk.actor.power_distributing.Success dataclass ¤

Bases: _BaseSuccessMixin, _BaseResultMixin

Result returned when setting the power was successful for all components.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class Success(_BaseSuccessMixin, _BaseResultMixin):  # Order matters here. See above.
    """Result returned when setting the power was successful for all components."""
Attributes¤
excess_power instance-attribute ¤
excess_power: Power

The part of the requested power that could not be fulfilled.

This happens when the requested power is outside the available power bounds.

request instance-attribute ¤
request: Request

The user's request to which this message responds.

succeeded_components instance-attribute ¤
succeeded_components: Set[int]

The subset of components for which power was set successfully.

succeeded_power instance-attribute ¤
succeeded_power: Power

The part of the requested power that was successfully set.