power_distributing

frequenz.sdk.actor.power_distributing.power_distributing ¤

Actor to distribute power between batteries.

When a charge or discharge method is called, the power should be distributed so that the SoC of all batteries stays at the same level. Distributing power this way avoids loading only one battery, which would raise its temperature, and maximizes the total power available to charge or discharge.

The purpose of this actor is to keep the SoC level of all components equal.

Attributes¤

Classes¤

frequenz.sdk.actor.power_distributing.power_distributing.PowerDistributingActor ¤

Bases: Actor

Actor to distribute the power between batteries in a microgrid.

The purpose of this tool is to keep an equal SoC level in all batteries. The PowerDistributingActor can have many concurrent users, which currently must be known at construction time.

For each user a bidirectional channel needs to be created through which they can send and receive requests and responses.

It is recommended to wait for the PowerDistributingActor output with a timeout. Otherwise, if the processing function fails, the response will never come. The timeout should be Result:request_timeout + time for processing the request.
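The recommendation above can be sketched with plain `asyncio`. This is a minimal illustration, not SDK code: the `asyncio.Queue` stands in for the results channel, and `REQUEST_TIMEOUT_S` and `PROCESSING_MARGIN_S` are hypothetical values chosen for the demo.

```python
import asyncio
from typing import Optional

REQUEST_TIMEOUT_S = 0.05    # hypothetical: the timeout carried by the request
PROCESSING_MARGIN_S = 0.05  # hypothetical: allowance for processing the request


async def wait_for_result(results: "asyncio.Queue[str]") -> Optional[str]:
    """Return the next result, or None if nothing arrives in time."""
    try:
        return await asyncio.wait_for(
            results.get(), timeout=REQUEST_TIMEOUT_S + PROCESSING_MARGIN_S
        )
    except asyncio.TimeoutError:
        # The processing side failed or is too slow; stop waiting.
        return None


async def demo() -> tuple:
    answered: asyncio.Queue = asyncio.Queue()
    answered.put_nowait("SUCCESS")
    silent: asyncio.Queue = asyncio.Queue()  # nothing is ever sent here
    return await wait_for_result(answered), await wait_for_result(silent)


outcome = asyncio.run(demo())
```

Without the timeout, the second call would block forever, which is the failure mode the paragraph above warns about.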

Edge cases:

  • If there are 2 requests to be processed for the same subset of batteries, then only the latest request will be processed. The older request will be ignored, and the user who sent it will get a response with Result.Status.IGNORED.

  • If there are 2 requests whose subsets of batteries differ but overlap (they have at least one battery in common), then both requests will be processed. However, this is not expected, so an error will be logged.
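The two edge cases can be illustrated with a small, self-contained sketch. It is not the SDK's implementation: `resolve_pending` and the `(request_id, battery_ids)` tuples are hypothetical names used only to show the rules (latest request wins for an identical battery set; overlapping sets are both processed but flagged).

```python
from itertools import combinations
from typing import Dict, FrozenSet, Iterable, List, Tuple


def resolve_pending(
    requests: Iterable[Tuple[str, Iterable[int]]],
) -> Tuple[List[str], List[str], List[Tuple[str, str]]]:
    """Split pending requests into processed, ignored, and overlapping pairs.

    `requests` is given in arrival order, oldest first.
    """
    latest: Dict[FrozenSet[int], str] = {}
    ignored: List[str] = []
    for request_id, battery_ids in requests:
        key = frozenset(battery_ids)
        if key in latest:
            # Same battery set: the older request is superseded.
            ignored.append(latest[key])
        latest[key] = request_id
    # Different sets that share a battery are both kept, but reported.
    overlapping = [
        (latest[a], latest[b]) for a, b in combinations(latest, 2) if a & b
    ]
    return list(latest.values()), ignored, overlapping


to_process, ignored, overlapping = resolve_pending(
    [("r1", {1, 2}), ("r2", {1, 2}), ("r3", {2, 3})]
)
```

Here `r1` is superseded by `r2`, while `r2` and `r3` are both processed and reported as an overlapping pair.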
Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
class PowerDistributingActor(Actor):
    # pylint: disable=too-many-instance-attributes
    """Actor to distribute the power between batteries in a microgrid.

    The purpose of this tool is to keep an equal SoC level in all batteries.
    The PowerDistributingActor can have many concurrent users which at this time
    need to be known at construction time.

    For each user a bidirectional channel needs to be created through which
    they can send and receive requests and responses.

    It is recommended to wait for PowerDistributingActor output with a timeout.
    Otherwise, if the processing function fails, the response will never come.
    The timeout should be Result:request_timeout + time for processing the request.

    Edge cases:
    * If there are 2 requests to be processed for the same subset of batteries, then
    only the latest request will be processed. The older request will be ignored, and
    the user who sent it will get a response with Result.Status.IGNORED.

    * If there are 2 requests whose subsets of batteries differ but overlap (they
    have at least one battery in common), then both requests will be processed.
    However, this is not expected, so an error will be logged.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        requests_receiver: Receiver[Request],
        results_sender: Sender[Result],
        component_pool_status_sender: Sender[ComponentPoolStatus],
        wait_for_data_sec: float,
        *,
        component_category: ComponentCategory,
        component_type: ComponentType | None = None,
        name: str | None = None,
    ) -> None:
        """Create class instance.

        Args:
            requests_receiver: Receiver for receiving power requests from the power
                manager.
            results_sender: Sender for sending results to the power manager.
            component_pool_status_sender: Channel for sending information about which
                components are expected to be working.
            wait_for_data_sec: How long the actor should wait before processing the
                first request. This is the time needed to collect the first
                component data.
            component_category: The category of the components that this actor is
                responsible for.
            component_type: The type of the component of the given category that this
                actor is responsible for.  This is used only when the component category
                is not enough to uniquely identify the component.  For example, when the
                category is `ComponentCategory.INVERTER`, the type is needed to identify
                the inverter as a solar inverter or a battery inverter.  This can be
                `None` when the component category is enough to uniquely identify the
                component.
            name: The name of the actor. If `None`, `str(id(self))` will be used. This
                is used mostly for debugging purposes.

        Raises:
            ValueError: If the given component category is not supported.
        """
        super().__init__(name=name)
        self._component_category = component_category
        self._component_type = component_type
        self._requests_receiver = requests_receiver
        self._result_sender = results_sender
        self._wait_for_data_sec = wait_for_data_sec

        self._component_manager: ComponentManager
        if component_category == ComponentCategory.BATTERY:
            self._component_manager = BatteryManager(
                component_pool_status_sender, results_sender
            )
        elif component_category == ComponentCategory.EV_CHARGER:
            self._component_manager = EVChargerManager(
                component_pool_status_sender, results_sender
            )
        elif (
            component_category == ComponentCategory.INVERTER
            and component_type == InverterType.SOLAR
        ):
            self._component_manager = PVManager(
                component_pool_status_sender, results_sender
            )
        else:
            raise ValueError(
                f"PowerDistributor doesn't support controlling: {component_category}"
            )

    async def _run(self) -> None:  # pylint: disable=too-many-locals
        """Run actor main function.

        Wait for new requests from the requests receiver, process each one, and
        send `set_power` requests with the distributed power.
        The output of the `set_power` method is processed.
        Every battery and inverter that failed or didn't respond in time will be marked
        as broken for some time.
        """
        await self._component_manager.start()

        # Wait a few seconds to get data from the channels created above.
        await asyncio.sleep(self._wait_for_data_sec)

        async for request in self._requests_receiver:
            await self._component_manager.distribute_power(request)

    async def stop(self, msg: str | None = None) -> None:
        """Stop this actor.

        Args:
            msg: The message to be passed to the tasks being cancelled.
        """
        await self._component_manager.stop()
        await super().stop(msg)
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
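A short sketch of how the async context manager protocol above behaves in practice. `DemoService` is a hypothetical stand-in that mirrors only the `start()`/`stop()` contract shown in the source; it is not the SDK class.

```python
import asyncio


class DemoService:
    """Hypothetical stand-in mirroring the start/stop protocol."""

    def __init__(self) -> None:
        self._tasks: set = set()

    @property
    def is_running(self) -> bool:
        # Running means at least one task has not finished yet.
        return any(not task.done() for task in self._tasks)

    def start(self) -> None:
        self._tasks.add(asyncio.create_task(asyncio.sleep(3600)))

    async def stop(self) -> None:
        for task in self._tasks:
            task.cancel()
        # Collect the cancellations instead of letting them propagate.
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()

    async def __aenter__(self) -> "DemoService":
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.stop()


async def demo() -> tuple:
    async with DemoService() as service:
        inside = service.is_running
    return inside, service.is_running


states = asyncio.run(demo())
```

The service is running inside the `async with` block and stopped once the block exits, even if the body raises.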
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(
    requests_receiver: Receiver[Request],
    results_sender: Sender[Result],
    component_pool_status_sender: Sender[
        ComponentPoolStatus
    ],
    wait_for_data_sec: float,
    *,
    component_category: ComponentCategory,
    component_type: ComponentType | None = None,
    name: str | None = None
) -> None

Create class instance.

PARAMETER DESCRIPTION
requests_receiver

Receiver for receiving power requests from the power manager.

TYPE: Receiver[Request]

results_sender

Sender for sending results to the power manager.

TYPE: Sender[Result]

component_pool_status_sender

Channel for sending information about which components are expected to be working.

TYPE: Sender[ComponentPoolStatus]

wait_for_data_sec

How long the actor should wait before processing the first request. This is the time needed to collect the first component data.

TYPE: float

component_category

The category of the components that this actor is responsible for.

TYPE: ComponentCategory

component_type

The type of the component of the given category that this actor is responsible for. This is used only when the component category is not enough to uniquely identify the component. For example, when the category is ComponentCategory.INVERTER, the type is needed to identify the inverter as a solar inverter or a battery inverter. This can be None when the component category is enough to uniquely identify the component.

TYPE: ComponentType | None DEFAULT: None

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
ValueError

If the given component category is not supported.

Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    requests_receiver: Receiver[Request],
    results_sender: Sender[Result],
    component_pool_status_sender: Sender[ComponentPoolStatus],
    wait_for_data_sec: float,
    *,
    component_category: ComponentCategory,
    component_type: ComponentType | None = None,
    name: str | None = None,
) -> None:
    """Create class instance.

    Args:
        requests_receiver: Receiver for receiving power requests from the power
            manager.
        results_sender: Sender for sending results to the power manager.
        component_pool_status_sender: Channel for sending information about which
            components are expected to be working.
        wait_for_data_sec: How long the actor should wait before processing the
            first request. This is the time needed to collect the first
            component data.
        component_category: The category of the components that this actor is
            responsible for.
        component_type: The type of the component of the given category that this
            actor is responsible for.  This is used only when the component category
            is not enough to uniquely identify the component.  For example, when the
            category is `ComponentCategory.INVERTER`, the type is needed to identify
            the inverter as a solar inverter or a battery inverter.  This can be
            `None` when the component category is enough to uniquely identify the
            component.
        name: The name of the actor. If `None`, `str(id(self))` will be used. This
            is used mostly for debugging purposes.

    Raises:
        ValueError: If the given component category is not supported.
    """
    super().__init__(name=name)
    self._component_category = component_category
    self._component_type = component_type
    self._requests_receiver = requests_receiver
    self._result_sender = results_sender
    self._wait_for_data_sec = wait_for_data_sec

    self._component_manager: ComponentManager
    if component_category == ComponentCategory.BATTERY:
        self._component_manager = BatteryManager(
            component_pool_status_sender, results_sender
        )
    elif component_category == ComponentCategory.EV_CHARGER:
        self._component_manager = EVChargerManager(
            component_pool_status_sender, results_sender
        )
    elif (
        component_category == ComponentCategory.INVERTER
        and component_type == InverterType.SOLAR
    ):
        self._component_manager = PVManager(
            component_pool_status_sender, results_sender
        )
    else:
        raise ValueError(
            f"PowerDistributor doesn't support controlling: {component_category}"
        )
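The selection logic in the constructor can be summarized with a self-contained sketch. The enums and `pick_manager` below are hypothetical stand-ins for `ComponentCategory`, `InverterType`, and the manager classes; they only mirror the branching shown in the source above.

```python
from enum import Enum
from typing import Optional


class Category(Enum):
    """Hypothetical stand-in for ComponentCategory."""

    BATTERY = 1
    EV_CHARGER = 2
    INVERTER = 3
    METER = 4


class InverterKind(Enum):
    """Hypothetical stand-in for InverterType."""

    SOLAR = 1
    BATTERY = 2


def pick_manager(category: Category, kind: Optional[InverterKind] = None) -> str:
    """Return the name of the manager the constructor would create."""
    if category is Category.BATTERY:
        return "BatteryManager"
    if category is Category.EV_CHARGER:
        return "EVChargerManager"
    if category is Category.INVERTER and kind is InverterKind.SOLAR:
        return "PVManager"
    # Everything else, including non-solar inverters, is unsupported.
    raise ValueError(f"PowerDistributor doesn't support controlling: {category}")
```

Note that, following the branching in the source, an inverter category without a solar type falls through to the `ValueError` branch.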
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
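The "already running" guard can be checked with a small stand-in. `DemoActor` is hypothetical and copies only the `start()` logic shown above, with a long sleep in place of the real run loop.

```python
import asyncio


class DemoActor:
    """Hypothetical stand-in that copies the start() guard."""

    def __init__(self) -> None:
        self._tasks: set = set()

    @property
    def is_running(self) -> bool:
        return any(not task.done() for task in self._tasks)

    def start(self) -> None:
        if self.is_running:
            return  # a second call is a no-op
        self._tasks.clear()
        self._tasks.add(asyncio.create_task(self._run_loop()))

    async def _run_loop(self) -> None:
        await asyncio.sleep(3600)


async def demo() -> int:
    actor = DemoActor()
    actor.start()
    actor.start()  # does not spawn a second task
    count = len(actor._tasks)
    for task in actor._tasks:
        task.cancel()
    await asyncio.gather(*actor._tasks, return_exceptions=True)
    return count


task_count = asyncio.run(demo())
```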
stop async ¤
stop(msg: str | None = None) -> None

Stop this actor.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this actor.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    await self._component_manager.stop()
    await super().stop(msg)
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and not returned
            in the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )
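The core of the loop above, waiting for a set of tasks and gathering whatever they raised, can be sketched in isolation. This simplified version returns the errors as a list instead of raising a `BaseExceptionGroup` (which requires Python 3.11 or later), and `collect_errors` is a hypothetical helper name.

```python
import asyncio
from typing import List, Set


async def succeed() -> int:
    return 1


async def fail() -> None:
    raise RuntimeError("boom")


async def collect_errors(tasks: Set[asyncio.Task]) -> List[BaseException]:
    """Wait for all tasks and return the exceptions they raised."""
    done, _pending = await asyncio.wait(tasks)
    errors: List[BaseException] = []
    for task in done:
        try:
            task.result()  # re-raises whatever the task raised
        except BaseException as error:  # deliberately broad, as in the source
            errors.append(error)
    return errors


async def demo() -> List[str]:
    tasks = {asyncio.create_task(succeed()), asyncio.create_task(fail())}
    return [type(error).__name__ for error in await collect_errors(tasks)]


error_names = asyncio.run(demo())
```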