frequenz.dispatch ¤

A high-level interface for the dispatch API.

An overview of the most important classes in this module follows.

Attributes¤

frequenz.dispatch.DispatchEvent module-attribute ¤

DispatchEvent = Created | Updated | Deleted

Type that is sent over the channel for dispatch updates.

This type is used to send dispatches that were created, updated or deleted over the channel.
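
A minimal sketch of consuming these events, assuming a receiver of DispatchEvent such as the one returned by Dispatcher.new_lifecycle_events_receiver (see the Dispatcher examples further below):

from frequenz.channels import Receiver
from frequenz.dispatch import Created, Deleted, DispatchEvent, Updated

async def handle_events(receiver: Receiver[DispatchEvent]) -> None:
    # Each message is one of Created, Updated or Deleted.
    async for event in receiver:
        match event:
            case Created(dispatch):
                print(f"Dispatch {dispatch.id} was created")
            case Updated(dispatch):
                print(f"Dispatch {dispatch.id} was updated")
            case Deleted(dispatch):
                print(f"Dispatch {dispatch.id} was deleted")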

Classes¤

frequenz.dispatch.ActorDispatcher ¤

Bases: BackgroundService

Helper class to manage actors based on dispatches.

Example usage:

import os
import asyncio
from typing import Any, Self, override
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
from frequenz.client.dispatch.types import TargetComponents
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast, select, selected_from
from frequenz.sdk.actor import Actor, run

class MyActor(Actor):
    def __init__(
            self,
            *,
            name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None
        self._dry_run: bool = False
        self._options: dict[str, Any] = {}

    @classmethod
    def new_with_dispatch(
            cls,
            initial_dispatch: DispatchInfo,
            dispatch_updates_receiver: Receiver[DispatchInfo],
            *,
            name: str | None = None,
    ) -> "Self":
        self = cls(name=name)
        self._dispatch_updates_receiver = dispatch_updates_receiver
        self._update_dispatch_information(initial_dispatch)
        return self

    @override
    async def _run(self) -> None:
        other_recv: Receiver[Any] = ...

        if self._dispatch_updates_receiver is None:
            async for msg in other_recv:
                # do stuff
                ...
        else:
            await self._run_with_dispatch(other_recv)

    async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
        async for selected in select(self._dispatch_updates_receiver, other_recv):
            if selected_from(selected, self._dispatch_updates_receiver):
                self._update_dispatch_information(selected.message)
            elif selected_from(selected, other_recv):
                # do stuff
                ...
            else:
                assert False, f"Unexpected selected receiver: {selected}"

    def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None:
        print("Received update:", dispatch_update)
        self._dry_run = dispatch_update.dry_run
        self._options = dispatch_update.options
        match dispatch_update.components:
            case []:
                print("Dispatch: Using all components")
            case list() as ids if isinstance(ids[0], int):
                component_ids = ids
            case [ComponentCategory.BATTERY, *_]:
                component_category = ComponentCategory.BATTERY
            case unsupported:
                print(
                    "Dispatch: Requested an unsupported selector %r, "
                    "but only component IDs or category BATTERY are supported.",
                    unsupported,
                )

async def main():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    async with Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    ) as dispatcher:
        status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

        managing_actor = ActorDispatcher(
            actor_factory=MyActor.new_with_dispatch,
            running_status_receiver=status_receiver,
        )

        await run(managing_actor)
Source code in frequenz/dispatch/_actor_dispatcher.py
class ActorDispatcher(BackgroundService):
    """Helper class to manage actors based on dispatches.

    Example usage:

    ```python
    import os
    import asyncio
    from typing import Any, Self, override
    from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
    from frequenz.client.dispatch.types import TargetComponents
    from frequenz.client.common.microgrid.components import ComponentCategory
    from frequenz.channels import Receiver, Broadcast, select, selected_from
    from frequenz.sdk.actor import Actor, run

    class MyActor(Actor):
        def __init__(
                self,
                *,
                name: str | None = None,
        ) -> None:
            super().__init__(name=name)
            self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None
            self._dry_run: bool = False
            self._options: dict[str, Any] = {}

        @classmethod
        def new_with_dispatch(
                cls,
                initial_dispatch: DispatchInfo,
                dispatch_updates_receiver: Receiver[DispatchInfo],
                *,
                name: str | None = None,
        ) -> "Self":
            self = cls(name=name)
            self._dispatch_updates_receiver = dispatch_updates_receiver
            self._update_dispatch_information(initial_dispatch)
            return self

        @override
        async def _run(self) -> None:
            other_recv: Receiver[Any] = ...

            if self._dispatch_updates_receiver is None:
                async for msg in other_recv:
                    # do stuff
                    ...
            else:
                await self._run_with_dispatch(other_recv)

        async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
            async for selected in select(self._dispatch_updates_receiver, other_recv):
                if selected_from(selected, self._dispatch_updates_receiver):
                    self._update_dispatch_information(selected.message)
                elif selected_from(selected, other_recv):
                    # do stuff
                    ...
                else:
                    assert False, f"Unexpected selected receiver: {selected}"

        def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None:
            print("Received update:", dispatch_update)
            self._dry_run = dispatch_update.dry_run
            self._options = dispatch_update.options
            match dispatch_update.components:
                case []:
                    print("Dispatch: Using all components")
                case list() as ids if isinstance(ids[0], int):
                    component_ids = ids
                case [ComponentCategory.BATTERY, *_]:
                    component_category = ComponentCategory.BATTERY
                case unsupported:
                    print(
                        "Dispatch: Requested an unsupported selector %r, "
                        "but only component IDs or category BATTERY are supported.",
                        unsupported,
                    )

    async def main():
        url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
        key  = os.getenv("DISPATCH_API_KEY", "some-key")

        microgrid_id = 1

        async with Dispatcher(
            microgrid_id=microgrid_id,
            server_url=url,
            key=key
        ) as dispatcher:
            status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

            managing_actor = ActorDispatcher(
                actor_factory=MyActor.new_with_dispatch,
                running_status_receiver=status_receiver,
            )

            await run(managing_actor)
    ```
    """

    class FailedDispatchesRetrier(BackgroundService):
        """Manages the retring of failed dispatches."""

        def __init__(self, retry_interval: timedelta) -> None:
            """Initialize the retry manager.

            Args:
                retry_interval: The interval between retries.
            """
            super().__init__()
            self._retry_interval = retry_interval
            self._channel = Broadcast[Dispatch](name="retry_channel")
            self._sender = self._channel.new_sender()

        def start(self) -> None:
            """Start the background service.

            This is a no-op.
            """

        def new_receiver(self) -> Receiver[Dispatch]:
            """Create a new receiver for dispatches to retry.

            Returns:
                The receiver.
            """
            return self._channel.new_receiver()

        def retry(self, dispatch: Dispatch) -> None:
            """Retry a dispatch.

            Args:
                dispatch: The dispatch information to retry.
            """
            task = asyncio.create_task(self._retry_after_delay(dispatch))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.remove)

        async def _retry_after_delay(self, dispatch: Dispatch) -> None:
            """Retry a dispatch after a delay.

            Args:
                dispatch: The dispatch information to retry.
            """
            _logger.info(
                "Will retry dispatch %s after %s",
                dispatch.id,
                self._retry_interval,
            )
            await asyncio.sleep(self._retry_interval.total_seconds())
            _logger.info("Retrying dispatch %s now", dispatch.id)
            await self._sender.send(dispatch)

    def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
        self,
        actor_factory: Callable[
            [DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
        ],
        running_status_receiver: Receiver[Dispatch],
        dispatch_identity: Callable[[Dispatch], int] | None = None,
        retry_interval: timedelta = timedelta(seconds=60),
    ) -> None:
        """Initialize the dispatch handler.

        Args:
            actor_factory: A callable that creates an actor with some initial dispatch
                information.
            running_status_receiver: The receiver for dispatch running status changes.
            dispatch_identity: A function to identify to which actor a dispatch refers.
                By default, it uses the dispatch ID.
            retry_interval: The interval between retries.
        """
        super().__init__()
        self._dispatch_identity: Callable[[Dispatch], int] = (
            dispatch_identity if dispatch_identity else lambda d: d.id
        )

        self._dispatch_rx = running_status_receiver
        self._actor_factory = actor_factory
        self._actors: dict[int, Actor] = {}
        self._updates_channel = Broadcast[DispatchInfo](
            name="dispatch_updates_channel", resend_latest=True
        )
        self._updates_sender = self._updates_channel.new_sender()
        self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)

    def start(self) -> None:
        """Start the background service."""
        self._tasks.add(asyncio.create_task(self._run()))

    async def _start_actor(self, dispatch: Dispatch) -> None:
        """Start all actors."""
        dispatch_update = DispatchInfo(
            components=dispatch.target,
            dry_run=dispatch.dry_run,
            options=dispatch.payload,
        )

        identity = self._dispatch_identity(dispatch)
        actor: Actor | None = self._actors.get(identity)

        if actor:
            sent_str = ""
            if self._updates_sender is not None:
                sent_str = ", sent a dispatch update instead of creating a new actor"
                await self._updates_sender.send(dispatch_update)
            _logger.info(
                "Actor for dispatch type %r is already running%s",
                dispatch.type,
                sent_str,
            )
        else:
            try:
                _logger.info("Starting actor for dispatch type %r", dispatch.type)
                actor = await self._actor_factory(
                    dispatch_update,
                    self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
                )

                actor.start()

            except Exception as e:  # pylint: disable=broad-except
                _logger.error(
                    "Failed to start actor for dispatch type %r",
                    dispatch.type,
                    exc_info=e,
                )
                self._retrier.retry(dispatch)
            else:
                # No exception occurred, so we can add the actor to the list
                self._actors[identity] = actor

    async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
        """Stop all actors.

        Args:
            stopping_dispatch: The dispatch that is stopping the actor.
            msg: The message to be passed to the actors being stopped.
        """
        identity = self._dispatch_identity(stopping_dispatch)

        if actor := self._actors.pop(identity, None):
            await actor.stop(msg)
        else:
            _logger.warning(
                "Actor for dispatch type %r is not running", stopping_dispatch.type
            )

    async def _run(self) -> None:
        """Run the background service."""
        async with self._retrier:
            retry_recv = self._retrier.new_receiver()

            async for selected in select(retry_recv, self._dispatch_rx):
                if retry_recv.triggered(selected):
                    self._retrier.retry(selected.message)
                elif self._dispatch_rx.triggered(selected):
                    await self._handle_dispatch(selected.message)

    async def _handle_dispatch(self, dispatch: Dispatch) -> None:
        """Handle a dispatch.

        Args:
            dispatch: The dispatch to handle.
        """
        if dispatch.started:
            await self._start_actor(dispatch)
        else:
            await self._stop_actor(dispatch, "Dispatch stopped")
Attributes¤
is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Classes¤
FailedDispatchesRetrier ¤

Bases: BackgroundService

Manages the retrying of failed dispatches.
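
A minimal usage sketch, assuming a Dispatch instance to retry; in normal operation ActorDispatcher drives this helper internally:

from datetime import timedelta

from frequenz.dispatch import ActorDispatcher, Dispatch

async def retry_once(failed_dispatch: Dispatch) -> None:
    retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval=timedelta(seconds=30))
    async with retrier:
        receiver = retrier.new_receiver()
        # Schedule a resend of the dispatch after the retry interval.
        retrier.retry(failed_dispatch)
        dispatch = await receiver.receive()
        print("Retrying dispatch", dispatch.id)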

Source code in frequenz/dispatch/_actor_dispatcher.py
class FailedDispatchesRetrier(BackgroundService):
    """Manages the retring of failed dispatches."""

    def __init__(self, retry_interval: timedelta) -> None:
        """Initialize the retry manager.

        Args:
            retry_interval: The interval between retries.
        """
        super().__init__()
        self._retry_interval = retry_interval
        self._channel = Broadcast[Dispatch](name="retry_channel")
        self._sender = self._channel.new_sender()

    def start(self) -> None:
        """Start the background service.

        This is a no-op.
        """

    def new_receiver(self) -> Receiver[Dispatch]:
        """Create a new receiver for dispatches to retry.

        Returns:
            The receiver.
        """
        return self._channel.new_receiver()

    def retry(self, dispatch: Dispatch) -> None:
        """Retry a dispatch.

        Args:
            dispatch: The dispatch information to retry.
        """
        task = asyncio.create_task(self._retry_after_delay(dispatch))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.remove)

    async def _retry_after_delay(self, dispatch: Dispatch) -> None:
        """Retry a dispatch after a delay.

        Args:
            dispatch: The dispatch information to retry.
        """
        _logger.info(
            "Will retry dispatch %s after %s",
            dispatch.id,
            self._retry_interval,
        )
        await asyncio.sleep(self._retry_interval.total_seconds())
        _logger.info("Retrying dispatch %s now", dispatch.id)
        await self._sender.send(dispatch)
Attributes¤
is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(retry_interval: timedelta) -> None

Initialize the retry manager.

PARAMETER DESCRIPTION
retry_interval

The interval between retries.

TYPE: timedelta

Source code in frequenz/dispatch/_actor_dispatcher.py
def __init__(self, retry_interval: timedelta) -> None:
    """Initialize the retry manager.

    Args:
        retry_interval: The interval between retries.
    """
    super().__init__()
    self._retry_interval = retry_interval
    self._channel = Broadcast[Dispatch](name="retry_channel")
    self._sender = self._channel.new_sender()
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
new_receiver ¤
new_receiver() -> Receiver[Dispatch]

Create a new receiver for dispatches to retry.

RETURNS DESCRIPTION
Receiver[Dispatch]

The receiver.

Source code in frequenz/dispatch/_actor_dispatcher.py
def new_receiver(self) -> Receiver[Dispatch]:
    """Create a new receiver for dispatches to retry.

    Returns:
        The receiver.
    """
    return self._channel.new_receiver()
retry ¤
retry(dispatch: Dispatch) -> None

Retry a dispatch.

PARAMETER DESCRIPTION
dispatch

The dispatch information to retry.

TYPE: Dispatch

Source code in frequenz/dispatch/_actor_dispatcher.py
def retry(self, dispatch: Dispatch) -> None:
    """Retry a dispatch.

    Args:
        dispatch: The dispatch information to retry.
    """
    task = asyncio.create_task(self._retry_after_delay(dispatch))
    self._tasks.add(task)
    task.add_done_callback(self._tasks.remove)
start ¤
start() -> None

Start the background service.

This is a no-op.

Source code in frequenz/dispatch/_actor_dispatcher.py
def start(self) -> None:
    """Start the background service.

    This is a no-op.
    """
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )
Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(
    actor_factory: Callable[
        [DispatchInfo, Receiver[DispatchInfo]],
        Awaitable[Actor],
    ],
    running_status_receiver: Receiver[Dispatch],
    dispatch_identity: (
        Callable[[Dispatch], int] | None
    ) = None,
    retry_interval: timedelta = timedelta(seconds=60),
) -> None

Initialize the dispatch handler.

PARAMETER DESCRIPTION
actor_factory

A callable that creates an actor with some initial dispatch information.

TYPE: Callable[[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]]

running_status_receiver

The receiver for dispatch running status changes.

TYPE: Receiver[Dispatch]

dispatch_identity

A function to identify to which actor a dispatch refers. By default, it uses the dispatch ID.

TYPE: Callable[[Dispatch], int] | None DEFAULT: None

retry_interval

The interval between retries.

TYPE: timedelta DEFAULT: timedelta(seconds=60)

Source code in frequenz/dispatch/_actor_dispatcher.py
def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
    self,
    actor_factory: Callable[
        [DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
    ],
    running_status_receiver: Receiver[Dispatch],
    dispatch_identity: Callable[[Dispatch], int] | None = None,
    retry_interval: timedelta = timedelta(seconds=60),
) -> None:
    """Initialize the dispatch handler.

    Args:
        actor_factory: A callable that creates an actor with some initial dispatch
            information.
        running_status_receiver: The receiver for dispatch running status changes.
        dispatch_identity: A function to identify to which actor a dispatch refers.
            By default, it uses the dispatch ID.
        retry_interval: The interval between retries.
    """
    super().__init__()
    self._dispatch_identity: Callable[[Dispatch], int] = (
        dispatch_identity if dispatch_identity else lambda d: d.id
    )

    self._dispatch_rx = running_status_receiver
    self._actor_factory = actor_factory
    self._actors: dict[int, Actor] = {}
    self._updates_channel = Broadcast[DispatchInfo](
        name="dispatch_updates_channel", resend_latest=True
    )
    self._updates_sender = self._updates_channel.new_sender()
    self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)
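
A minimal construction sketch with a custom dispatch_identity; grouping dispatches by hash(dispatch.type) is only an illustration, the default identity uses the dispatch ID:

from collections.abc import Awaitable, Callable
from datetime import timedelta

from frequenz.channels import Receiver
from frequenz.dispatch import ActorDispatcher, Dispatch, DispatchInfo
from frequenz.sdk.actor import Actor

def build_actor_dispatcher(
    actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]],
    status_receiver: Receiver[Dispatch],
) -> ActorDispatcher:
    # One actor per dispatch type instead of one per dispatch ID.
    return ActorDispatcher(
        actor_factory=actor_factory,
        running_status_receiver=status_receiver,
        dispatch_identity=lambda dispatch: hash(dispatch.type),
        retry_interval=timedelta(seconds=30),
    )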
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start the background service.

Source code in frequenz/dispatch/_actor_dispatcher.py
def start(self) -> None:
    """Start the background service."""
    self._tasks.add(asyncio.create_task(self._run()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.dispatch.Created dataclass ¤

A dispatch created event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Created:
    """A dispatch created event."""

    dispatch: Dispatch
    """The dispatch that was created."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was created.

frequenz.dispatch.Deleted dataclass ¤

A dispatch deleted event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Deleted:
    """A dispatch deleted event."""

    dispatch: Dispatch
    """The dispatch that was deleted."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was deleted.

frequenz.dispatch.Dispatch dataclass ¤

Bases: Dispatch

Dispatch type with extra functionality.
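
A minimal sketch of the extra helpers, assuming dispatches received e.g. from Dispatcher.new_running_state_event_receiver:

from frequenz.channels import Receiver
from frequenz.dispatch import Dispatch

async def inspect_dispatches(receiver: Receiver[Dispatch]) -> None:
    async for dispatch in receiver:
        if dispatch.deleted:
            print(f"Dispatch {dispatch.id} was deleted")
        elif dispatch.started:
            print(f"Dispatch {dispatch.id} is running until {dispatch.until}")
        else:
            print(f"Dispatch {dispatch.id} is not running, next run: {dispatch.next_run}")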

Source code in frequenz/dispatch/_dispatch.py
@dataclass(frozen=True)
class Dispatch(BaseDispatch):
    """Dispatch type with extra functionality."""

    deleted: bool = False
    """Whether the dispatch is deleted."""

    def __init__(
        self,
        client_dispatch: BaseDispatch,
        deleted: bool = False,
    ):
        """Initialize the dispatch.

        Args:
            client_dispatch: The client dispatch.
            deleted: Whether the dispatch is deleted.
        """
        super().__init__(**client_dispatch.__dict__)
        # Work around frozen to set deleted
        object.__setattr__(self, "deleted", deleted)

    def _set_deleted(self) -> None:
        """Mark the dispatch as deleted."""
        object.__setattr__(self, "deleted", True)

    @property
    def started(self) -> bool:
        """Check if the dispatch is started.

        Returns:
            True if the dispatch is started, False otherwise.
        """
        if self.deleted:
            return False

        return super().started

    # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
    # value needs documenting
    def missed_runs(self, since: datetime) -> Iterator[datetime]:  # noqa: DOC405
        """Yield all missed runs of a dispatch.

        Yields all missed runs of a dispatch.

        Args:
            since: The time to start checking for missed runs.

        Returns:
            A generator that yields all missed runs of a dispatch.

        Yields:
            datetime: The missed run.
        """
        now = datetime.now(tz=timezone.utc)

        while (next_run := self.next_run_after(since)) and next_run < now:
            yield next_run
            since = next_run
Attributes¤
active instance-attribute ¤
active: bool

Indicates whether the dispatch is active and eligible for processing.

create_time instance-attribute ¤
create_time: datetime

The creation time of the dispatch in UTC. Set when a dispatch is created.

deleted class-attribute instance-attribute ¤
deleted: bool = False

Whether the dispatch is deleted.

dry_run instance-attribute ¤
dry_run: bool

Indicates if the dispatch is a dry run.

It is executed for logging and monitoring without affecting actual component states.

duration instance-attribute ¤
duration: timedelta | None

The duration of the dispatch, represented as a timedelta.

id instance-attribute ¤
id: int

The unique identifier for the dispatch.

next_run property ¤
next_run: datetime | None

Calculate the next run of a dispatch.

RETURNS DESCRIPTION
datetime | None

The next run of the dispatch or None if the dispatch is finished.

payload instance-attribute ¤
payload: dict[str, Any]

The dispatch payload containing arbitrary data.

It is structured as needed for the dispatch operation.

recurrence instance-attribute ¤
recurrence: RecurrenceRule

The recurrence rule for the dispatch.

It defines any repeating patterns or schedules.

start_time instance-attribute ¤
start_time: datetime

The start time of the dispatch in UTC.

started property ¤
started: bool

Check if the dispatch is started.

RETURNS DESCRIPTION
bool

True if the dispatch is started, False otherwise.

target instance-attribute ¤
target: TargetComponents

The target components of the dispatch.

type instance-attribute ¤
type: str

User-defined information about the type of dispatch.

This is understood and processed by downstream applications.

until property ¤
until: datetime | None

Time when the dispatch should end.

Returns the time that a running dispatch should end. If the dispatch is not running, None is returned.

RETURNS DESCRIPTION
datetime | None

The time when the dispatch should end or None if the dispatch is not running.
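
A small helper sketch deriving the remaining runtime from this property (the helper name is illustrative):

from datetime import datetime, timezone

from frequenz.dispatch import Dispatch

def remaining_seconds(dispatch: Dispatch) -> float | None:
    # None if the dispatch is not currently running.
    end = dispatch.until
    if end is None:
        return None
    return (end - datetime.now(tz=timezone.utc)).total_seconds()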

update_time instance-attribute ¤
update_time: datetime

The last update time of the dispatch in UTC. Set when a dispatch is modified.

Functions¤
__init__ ¤
__init__(client_dispatch: Dispatch, deleted: bool = False)

Initialize the dispatch.

PARAMETER DESCRIPTION
client_dispatch

The client dispatch.

TYPE: Dispatch

deleted

Whether the dispatch is deleted.

TYPE: bool DEFAULT: False

Source code in frequenz/dispatch/_dispatch.py
def __init__(
    self,
    client_dispatch: BaseDispatch,
    deleted: bool = False,
):
    """Initialize the dispatch.

    Args:
        client_dispatch: The client dispatch.
        deleted: Whether the dispatch is deleted.
    """
    super().__init__(**client_dispatch.__dict__)
    # Work around frozen to set deleted
    object.__setattr__(self, "deleted", deleted)
from_protobuf classmethod ¤
from_protobuf(pb_object: Dispatch) -> Dispatch

Convert a protobuf dispatch to a dispatch.

PARAMETER DESCRIPTION
pb_object

The protobuf dispatch to convert.

TYPE: Dispatch

RETURNS DESCRIPTION
Dispatch

The converted dispatch.

Source code in frequenz/client/dispatch/types.py
@classmethod
def from_protobuf(cls, pb_object: PBDispatch) -> "Dispatch":
    """Convert a protobuf dispatch to a dispatch.

    Args:
        pb_object: The protobuf dispatch to convert.

    Returns:
        The converted dispatch.
    """
    return Dispatch(
        id=pb_object.metadata.dispatch_id,
        type=pb_object.data.type,
        create_time=to_datetime(pb_object.metadata.create_time),
        update_time=to_datetime(pb_object.metadata.modification_time),
        start_time=to_datetime(pb_object.data.start_time),
        duration=(
            timedelta(seconds=pb_object.data.duration)
            if pb_object.data.HasField("duration")
            else None
        ),
        target=_target_components_from_protobuf(pb_object.data.target),
        active=pb_object.data.is_active,
        dry_run=pb_object.data.is_dry_run,
        payload=MessageToDict(pb_object.data.payload),
        recurrence=RecurrenceRule.from_protobuf(pb_object.data.recurrence),
    )
missed_runs ¤
missed_runs(since: datetime) -> Iterator[datetime]

Yield all missed runs of a dispatch.

Yields all missed runs of a dispatch.

PARAMETER DESCRIPTION
since

The time to start checking for missed runs.

TYPE: datetime

RETURNS DESCRIPTION
Iterator[datetime]

A generator that yields all missed runs of a dispatch.

YIELDS DESCRIPTION
datetime

The missed run.

TYPE:: datetime

Source code in frequenz/dispatch/_dispatch.py
def missed_runs(self, since: datetime) -> Iterator[datetime]:  # noqa: DOC405
    """Yield all missed runs of a dispatch.

    Yields all missed runs of a dispatch.

    Args:
        since: The time to start checking for missed runs.

    Returns:
        A generator that yields all missed runs of a dispatch.

    Yields:
        datetime: The missed run.
    """
    now = datetime.now(tz=timezone.utc)

    while (next_run := self.next_run_after(since)) and next_run < now:
        yield next_run
        since = next_run
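
A minimal sketch of using missed_runs, e.g. to log runs skipped since a last-checked time (the one-hour window is illustrative):

from datetime import datetime, timedelta, timezone

from frequenz.dispatch import Dispatch

def log_missed_runs(dispatch: Dispatch) -> None:
    last_checked = datetime.now(tz=timezone.utc) - timedelta(hours=1)
    for missed in dispatch.missed_runs(last_checked):
        print(f"Dispatch {dispatch.id} missed its run scheduled for {missed}")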
next_run_after ¤
next_run_after(after: datetime) -> datetime | None

Calculate the next run of a dispatch.

PARAMETER DESCRIPTION
after

The time to calculate the next run from.

TYPE: datetime

RETURNS DESCRIPTION
datetime | None

The next run of the dispatch or None if the dispatch is finished.

Source code in frequenz/client/dispatch/types.py
def next_run_after(self, after: datetime) -> datetime | None:
    """Calculate the next run of a dispatch.

    Args:
        after: The time to calculate the next run from.

    Returns:
        The next run of the dispatch or None if the dispatch is finished.
    """
    if (
        not self.recurrence.frequency
        or self.recurrence.frequency == Frequency.UNSPECIFIED
        or self.duration is None  # Infinite duration
    ):
        if after > self.start_time:
            return None
        return self.start_time

    # Make sure no weekday is UNSPECIFIED
    if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
        return None

    # No type information for rrule, so we need to cast
    return cast(
        datetime | None,
        self.recurrence._as_rrule(  # pylint: disable=protected-access
            self.start_time
        ).after(after, inc=True),
    )
to_protobuf ¤
to_protobuf() -> Dispatch

Convert a dispatch to a protobuf dispatch.

RETURNS DESCRIPTION
Dispatch

The converted protobuf dispatch.

Source code in frequenz/client/dispatch/types.py
def to_protobuf(self) -> PBDispatch:
    """Convert a dispatch to a protobuf dispatch.

    Returns:
        The converted protobuf dispatch.
    """
    payload = Struct()
    payload.update(self.payload)

    return PBDispatch(
        metadata=DispatchMetadata(
            dispatch_id=self.id,
            create_time=to_timestamp(self.create_time),
            modification_time=to_timestamp(self.update_time),
        ),
        data=DispatchData(
            type=self.type,
            start_time=to_timestamp(self.start_time),
            duration=(
                None
                if self.duration is None
                else round(self.duration.total_seconds())
            ),
            target=_target_components_to_protobuf(self.target),
            is_active=self.active,
            is_dry_run=self.dry_run,
            payload=payload,
            recurrence=self.recurrence.to_protobuf() if self.recurrence else None,
        ),
    )

frequenz.dispatch.DispatchInfo dataclass ¤

Event emitted when the dispatch changes.

Source code in frequenz/dispatch/_actor_dispatcher.py
@dataclass(frozen=True, kw_only=True)
class DispatchInfo:
    """Event emitted when the dispatch changes."""

    components: TargetComponents
    """Components to be used."""

    dry_run: bool
    """Whether this is a dry run."""

    options: dict[str, Any]
    """Additional options."""
Attributes¤
components instance-attribute ¤
components: TargetComponents

Components to be used.

dry_run instance-attribute ¤
dry_run: bool

Whether this is a dry run.

options instance-attribute ¤
options: dict[str, Any]

Additional options.
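
A minimal construction sketch; DispatchInfo is a frozen, keyword-only dataclass, and the values below are purely illustrative:

from frequenz.dispatch import DispatchInfo

info = DispatchInfo(
    components=[8, 9],           # component IDs to act on (illustrative)
    dry_run=True,                # log and monitor only, do not affect components
    options={"power_kw": 50},    # arbitrary per-dispatch options (illustrative)
)
print(info.components, info.dry_run, info.options)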

frequenz.dispatch.Dispatcher ¤

Bases: BackgroundService

A high-level interface for the dispatch API.

This class provides a high-level interface to the dispatch API. It offers receivers for various events and manages actors based on dispatches.

The receivers, briefly explained:

  • Lifecycle events receiver: Receives an event whenever a dispatch is created, updated or deleted.
  • Running status change receiver: Receives an event whenever the running status of a dispatch changes. The running status of a dispatch can change for a variety of reasons, such as, but not limited to, the dispatch being started, stopped, modified or deleted, or reaching its scheduled start or end time.

    Any change that could potentially require the consumer to start, stop or reconfigure itself will cause a message to be sent.

Managing an actor
import os
from frequenz.dispatch import Dispatcher, MergeByType
from unittest.mock import MagicMock

async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
    return MagicMock(dispatch=dispatch, receiver=receiver)

async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    async with Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    ) as dispatcher:
        dispatcher.start_managing(
            dispatch_type="DISPATCH_TYPE",
            actor_factory=create_actor,
            merge_strategy=MergeByType(),
        )

        await dispatcher
Processing running state change dispatches
import os
from frequenz.dispatch import Dispatcher
from unittest.mock import MagicMock

async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    async with Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    ) as dispatcher:
        actor = MagicMock() # replace with your actor

        rs_receiver = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")

        async for dispatch in rs_receiver:
            if dispatch.started:
                print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
                if actor.is_running:
                    actor.reconfigure(
                        components=dispatch.target,
                        run_parameters=dispatch.payload, # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )  # this will reconfigure the actor
                else:
                    # this will start a new actor with the given components
                    # and run it for the duration of the dispatch
                    actor.start(
                        components=dispatch.target,
                        run_parameters=dispatch.payload, # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )
            else:
                actor.stop()  # this will stop the actor
Getting notification about dispatch lifecycle events
import os
from typing import assert_never

from frequenz.dispatch import Created, Deleted, Dispatcher, Updated

async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    async with Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key,
    ) as dispatcher:
        events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")

        async for event in events_receiver:
            match event:
                case Created(dispatch):
                    print(f"A dispatch was created: {dispatch}")
                case Deleted(dispatch):
                    print(f"A dispatch was deleted: {dispatch}")
                case Updated(dispatch):
                    print(f"A dispatch was updated: {dispatch}")
                case _ as unhandled:
                    assert_never(unhandled)
Creating a new dispatch and then modifying it.

Note that this uses the lower-level Client class to create and update the dispatch.

import os
from datetime import datetime, timedelta, timezone

from frequenz.client.common.microgrid.components import ComponentCategory

from frequenz.dispatch import Dispatcher

async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    async with Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key,
    ) as dispatcher:
        # Create a new dispatch
        new_dispatch = await dispatcher.client.create(
            microgrid_id=microgrid_id,
            type="ECHO_FREQUENCY",  # replace with your own type
            start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
            duration=timedelta(minutes=5),
            target=ComponentCategory.INVERTER,
            payload={"font": "Times New Roman"},  # Arbitrary payload data
        )

        # Modify the dispatch
        await dispatcher.client.update(
            microgrid_id=microgrid_id,
            dispatch_id=new_dispatch.id,
            new_fields={"duration": timedelta(minutes=10)}
        )

        # Validate the modification
        modified_dispatch = await dispatcher.client.get(
            microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
        )
        assert modified_dispatch.duration == timedelta(minutes=10)
Source code in frequenz/dispatch/_dispatcher.py
class Dispatcher(BackgroundService):
    """A highlevel interface for the dispatch API.

    This class provides a high-level interface to the dispatch API.
    It provides receivers for various events and management of actors based on
    dispatches.

    The receivers, briefly explained:

    * [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
        Receives an event whenever a dispatch is created, updated or deleted.
    * [Running status change
        receiver][frequenz.dispatch.Dispatcher.new_running_state_event_receiver]:
        Receives an event whenever the running status of a dispatch changes.
        The running status of a dispatch can change due to a variety of reasons,
        such as but not limited to the dispatch being started, stopped, modified
        or deleted or reaching its scheduled start or end time.

        Any change that could potentially require the consumer to start, stop or
        reconfigure itself will cause a message to be sent.

    Example: Managing an actor
        ```python
        import os
        from frequenz.dispatch import Dispatcher, MergeByType
        from unittest.mock import MagicMock

        async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
            return MagicMock(dispatch=dispatch, receiver=receiver)

        async def run():
            url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
            key  = os.getenv("DISPATCH_API_KEY", "some-key")

            microgrid_id = 1

            async with Dispatcher(
                microgrid_id=microgrid_id,
                server_url=url,
                key=key
            ) as dispatcher:
                dispatcher.start_managing(
                    dispatch_type="DISPATCH_TYPE",
                    actor_factory=create_actor,
                    merge_strategy=MergeByType(),
                )

                await dispatcher
        ```

    Example: Processing running state change dispatches
        ```python
        import os
        from frequenz.dispatch import Dispatcher
        from unittest.mock import MagicMock

        async def run():
            url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
            key  = os.getenv("DISPATCH_API_KEY", "some-key")

            microgrid_id = 1

            async with Dispatcher(
                microgrid_id=microgrid_id,
                server_url=url,
                key=key
            ) as dispatcher:
                actor = MagicMock() # replace with your actor

                rs_receiver = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")

                async for dispatch in rs_receiver:
                    if dispatch.started:
                        print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
                        if actor.is_running:
                            actor.reconfigure(
                                components=dispatch.target,
                                run_parameters=dispatch.payload, # custom actor parameters
                                dry_run=dispatch.dry_run,
                                until=dispatch.until,
                            )  # this will reconfigure the actor
                        else:
                            # this will start a new actor with the given components
                            # and run it for the duration of the dispatch
                            actor.start(
                                components=dispatch.target,
                                run_parameters=dispatch.payload, # custom actor parameters
                                dry_run=dispatch.dry_run,
                                until=dispatch.until,
                            )
                    else:
                        actor.stop()  # this will stop the actor
        ```

    Example: Getting notification about dispatch lifecycle events
        ```python
        import os
        from typing import assert_never

        from frequenz.dispatch import Created, Deleted, Dispatcher, Updated

        async def run():
            url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
            key  = os.getenv("DISPATCH_API_KEY", "some-key")

            microgrid_id = 1

            async with Dispatcher(
                microgrid_id=microgrid_id,
                server_url=url,
                key=key,
            ) as dispatcher:
                events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")

                async for event in events_receiver:
                    match event:
                        case Created(dispatch):
                            print(f"A dispatch was created: {dispatch}")
                        case Deleted(dispatch):
                            print(f"A dispatch was deleted: {dispatch}")
                        case Updated(dispatch):
                            print(f"A dispatch was updated: {dispatch}")
                        case _ as unhandled:
                            assert_never(unhandled)
        ```

    Example: Creating a new dispatch and then modifying it.
        Note that this uses the lower-level `Client` class to create and update the dispatch.

        ```python
        import os
        from datetime import datetime, timedelta, timezone

        from frequenz.client.common.microgrid.components import ComponentCategory

        from frequenz.dispatch import Dispatcher

        async def run():
            url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
            key  = os.getenv("DISPATCH_API_KEY", "some-key")

            microgrid_id = 1

            async with Dispatcher(
                microgrid_id=microgrid_id,
                server_url=url,
                key=key,
            ) as dispatcher:
                # Create a new dispatch
                new_dispatch = await dispatcher.client.create(
                    microgrid_id=microgrid_id,
                    type="ECHO_FREQUENCY",  # replace with your own type
                    start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
                    duration=timedelta(minutes=5),
                    target=ComponentCategory.INVERTER,
                    payload={"font": "Times New Roman"},  # Arbitrary payload data
                )

                # Modify the dispatch
                await dispatcher.client.update(
                    microgrid_id=microgrid_id,
                    dispatch_id=new_dispatch.id,
                    new_fields={"duration": timedelta(minutes=10)}
                )

                # Validate the modification
                modified_dispatch = await dispatcher.client.get(
                    microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
                )
                assert modified_dispatch.duration == timedelta(minutes=10)
        ```
    """

    def __init__(
        self,
        *,
        microgrid_id: int,
        server_url: str,
        key: str,
    ):
        """Initialize the dispatcher.

        Args:
            microgrid_id: The microgrid id.
            server_url: The URL of the dispatch service.
            key: The key to access the service.
        """
        super().__init__()

        self._client = Client(server_url=server_url, key=key)
        self._bg_service = DispatchScheduler(
            microgrid_id,
            self._client,
        )
        self._actor_dispatchers: dict[str, ActorDispatcher] = {}
        self._empty_event = Event()
        self._empty_event.set()

    @override
    def start(self) -> None:
        """Start the local dispatch service."""
        self._bg_service.start()

    @property
    @override
    def is_running(self) -> bool:
        """Whether the local dispatch service is running."""
        return self._bg_service.is_running

    @override
    async def wait(self) -> None:
        """Wait until all actor dispatches are stopped."""
        await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())

        self._actor_dispatchers.clear()

    @override
    def cancel(self, msg: str | None = None) -> None:
        """Stop the local dispatch service."""
        self._bg_service.cancel(msg)

        for instance in self._actor_dispatchers.values():
            instance.cancel()

    async def wait_for_initialization(self) -> None:
        """Wait until the background service is initialized."""
        await self._bg_service.wait_for_initialization()

    def is_managed(self, dispatch_type: str) -> bool:
        """Check if the dispatcher is managing actors for a given dispatch type.

        Args:
            dispatch_type: The type of the dispatch to check.

        Returns:
            True if the dispatcher is managing actors for the given dispatch type.
        """
        return dispatch_type in self._actor_dispatchers

    async def start_managing(
        self,
        dispatch_type: str,
        *,
        actor_factory: Callable[
            [DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
        ],
        merge_strategy: MergeStrategy | None = None,
        retry_interval: timedelta = timedelta(seconds=60),
    ) -> None:
        """Manage actors for a given dispatch type.

        Creates and manages an
        [`ActorDispatcher`][frequenz.dispatch.ActorDispatcher] for the given type that will
        start, stop and reconfigure actors based on received dispatches.

        You can await the `Dispatcher` instance to block until all types
        registered with `start_managing()` have been stopped using
        `stop_managing()`.

        "Merging" means that when multiple dispatches are active at the same time,
        the intervals are merged into one.

        This also decides how instances are mapped from dispatches to actors:

        * [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to
        one single instance identified by the dispatch type.
        * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A
        dispatch maps to an instance identified by the dispatch type and target.
        So different dispatches with equal type and target will map to the same
        instance.
        * `None` — No merging, each dispatch maps to a separate instance.

        Args:
            dispatch_type: The type of the dispatch to manage.
            actor_factory: The factory to create actors.
            merge_strategy: The strategy to merge running intervals.
            retry_interval: Retry interval for when actor creation fails.
        """
        dispatcher = self._actor_dispatchers.get(dispatch_type)

        if dispatcher is not None:
            _logger.debug(
                "Ignoring duplicate actor dispatcher request for %r", dispatch_type
            )
            return

        self._empty_event.clear()

        def id_identity(dispatch: Dispatch) -> int:
            return dispatch.id

        dispatcher = ActorDispatcher(
            actor_factory=actor_factory,
            running_status_receiver=await self.new_running_state_event_receiver(
                dispatch_type,
                merge_strategy=merge_strategy,
            ),
            dispatch_identity=(
                id_identity if merge_strategy is None else merge_strategy.identity
            ),
            retry_interval=retry_interval,
        )

        self._actor_dispatchers[dispatch_type] = dispatcher
        dispatcher.start()

    async def stop_managing(self, dispatch_type: str) -> None:
        """Stop managing actors for a given dispatch type.

        Args:
            dispatch_type: The type of the dispatch to stop managing.
        """
        dispatcher = self._actor_dispatchers.pop(dispatch_type, None)
        if dispatcher is not None:
            await dispatcher.stop()

        if not self._actor_dispatchers:
            self._empty_event.set()

    @property
    def client(self) -> Client:
        """Return the client."""
        return self._client

    @override
    async def __aenter__(self) -> Self:
        """Enter an async context.

        Start this background service.

        Returns:
            This background service.
        """
        await super().__aenter__()
        await self.wait_for_initialization()
        return self

    def new_lifecycle_events_receiver(
        self, dispatch_type: str
    ) -> Receiver[DispatchEvent]:
        """Return new, updated or deleted dispatches receiver.

        Args:
            dispatch_type: The type of the dispatch to listen for.

        Returns:
            A new receiver for dispatch lifecycle events.
        """
        return self._bg_service.new_lifecycle_events_receiver(dispatch_type)

    async def new_running_state_event_receiver(
        self,
        dispatch_type: str,
        *,
        merge_strategy: MergeStrategy | None = None,
    ) -> Receiver[Dispatch]:
        """Return running state event receiver.

        This receiver will receive a message whenever the current running
        status of a dispatch changes.

        Usually, one message per scheduled run is to be expected.
        However, things get more involved when a dispatch is modified:

        If it is currently running and the modification says it should no
        longer be running, or should be running with different parameters,
        then a message will be sent.

        In other words: Any change that is expected to make an actor start, stop
        or adjust itself according to new dispatch options causes a message to be
        sent.

        A non-exhaustive list of possible changes that will cause a message to be sent:
         - The normal scheduled start_time has been reached
         - The duration of the dispatch has been modified
         - The start_time has been modified to be in the future
         - The component selection changed
         - The active status changed
         - The dry_run status changed
         - The payload changed
         - The dispatch was deleted

        `merge_strategy` is an instance of a class derived from
        [`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available
        strategies are:

        * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
          of the same type
        * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
          dispatches of the same type and target
        * `None` — no merging, just send all events (default)

        Running intervals from multiple dispatches will be merged, according to
        the chosen strategy.

        While merging, stop events are ignored as long as at least one
        merge-criteria-matching dispatch remains active.

        Args:
            dispatch_type: The type of the dispatch to listen for.
            merge_strategy: The strategy used to merge running intervals.

        Returns:
            A new receiver for dispatches whose running status changed.
        """
        return await self._bg_service.new_running_state_event_receiver(
            dispatch_type,
            merge_strategy=merge_strategy,
        )
Attributes¤
client property ¤
client: Client

Return the client.

is_running property ¤
is_running: bool

Whether the local dispatch service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users should typically not modify the tasks in the returned set and should only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior; don't do it unless the class explicitly documents that it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/dispatch/_dispatcher.py
@override
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    await super().__aenter__()
    await self.wait_for_initialization()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(*, microgrid_id: int, server_url: str, key: str)

Initialize the dispatcher.

PARAMETER DESCRIPTION
microgrid_id

The microgrid id.

TYPE: int

server_url

The URL of the dispatch service.

TYPE: str

key

The key to access the service.

TYPE: str

Source code in frequenz/dispatch/_dispatcher.py
def __init__(
    self,
    *,
    microgrid_id: int,
    server_url: str,
    key: str,
):
    """Initialize the dispatcher.

    Args:
        microgrid_id: The microgrid id.
        server_url: The URL of the dispatch service.
        key: The key to access the service.
    """
    super().__init__()

    self._client = Client(server_url=server_url, key=key)
    self._bg_service = DispatchScheduler(
        microgrid_id,
        self._client,
    )
    self._actor_dispatchers: dict[str, ActorDispatcher] = {}
    self._empty_event = Event()
    self._empty_event.set()
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Stop the local dispatch service.

Source code in frequenz/dispatch/_dispatcher.py
@override
def cancel(self, msg: str | None = None) -> None:
    """Stop the local dispatch service."""
    self._bg_service.cancel(msg)

    for instance in self._actor_dispatchers.values():
        instance.cancel()
is_managed ¤
is_managed(dispatch_type: str) -> bool

Check if the dispatcher is managing actors for a given dispatch type.

PARAMETER DESCRIPTION
dispatch_type

The type of the dispatch to check.

TYPE: str

RETURNS DESCRIPTION
bool

True if the dispatcher is managing actors for the given dispatch type.

Source code in frequenz/dispatch/_dispatcher.py
def is_managed(self, dispatch_type: str) -> bool:
    """Check if the dispatcher is managing actors for a given dispatch type.

    Args:
        dispatch_type: The type of the dispatch to check.

    Returns:
        True if the dispatcher is managing actors for the given dispatch type.
    """
    return dispatch_type in self._actor_dispatchers
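
A small sketch of how this check can guard against registering the same type twice. The function name `ensure_managed` and the dispatch type "MY_TYPE" are hypothetical placeholders; the factory signature mirrors the one accepted by `start_managing()`:

```python
from collections.abc import Awaitable, Callable

from frequenz.channels import Receiver
from frequenz.dispatch import Dispatcher, DispatchInfo
from frequenz.sdk.actor import Actor


async def ensure_managed(
    dispatcher: Dispatcher,
    actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]],
) -> None:
    """Start managing "MY_TYPE" only if it is not already managed (sketch)."""
    # "MY_TYPE" is a placeholder dispatch type used for illustration only.
    if not dispatcher.is_managed("MY_TYPE"):
        await dispatcher.start_managing("MY_TYPE", actor_factory=actor_factory)
```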
new_lifecycle_events_receiver ¤
new_lifecycle_events_receiver(
    dispatch_type: str,
) -> Receiver[DispatchEvent]

Return a receiver for new, updated or deleted dispatches.

PARAMETER DESCRIPTION
dispatch_type

The type of the dispatch to listen for.

TYPE: str

RETURNS DESCRIPTION
Receiver[DispatchEvent]

A new receiver for dispatch lifecycle events.

Source code in frequenz/dispatch/_dispatcher.py
def new_lifecycle_events_receiver(
    self, dispatch_type: str
) -> Receiver[DispatchEvent]:
    """Return new, updated or deleted dispatches receiver.

    Args:
        dispatch_type: The type of the dispatch to listen for.

    Returns:
        A new receiver for dispatch lifecycle events.
    """
    return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
new_running_state_event_receiver async ¤
new_running_state_event_receiver(
    dispatch_type: str,
    *,
    merge_strategy: MergeStrategy | None = None
) -> Receiver[Dispatch]

Return running state event receiver.

This receiver will receive a message whenever the current running status of a dispatch changes.

Usually, one message per scheduled run is to be expected. However, things get more involved when a dispatch is modified:

If it is currently running and the modification says it should no longer be running, or should be running with different parameters, then a message will be sent.

In other words: Any change that is expected to make an actor start, stop or adjust itself according to new dispatch options causes a message to be sent.

A non-exhaustive list of possible changes that will cause a message to be sent:
  • The normal scheduled start_time has been reached
  • The duration of the dispatch has been modified
  • The start_time has been modified to be in the future
  • The component selection changed
  • The active status changed
  • The dry_run status changed
  • The payload changed
  • The dispatch was deleted

merge_strategy is an instance of a class derived from MergeStrategy. Available strategies are:

  • MergeByType — merges all dispatches of the same type
  • MergeByTypeTarget — merges all dispatches of the same type and target
  • None — no merging, just send all events (default)

Running intervals from multiple dispatches will be merged, according to the chosen strategy.

While merging, stop events are ignored as long as at least one merge-criteria-matching dispatch remains active.

PARAMETER DESCRIPTION
dispatch_type

The type of the dispatch to listen for.

TYPE: str

merge_strategy

The strategy used to merge running intervals.

TYPE: MergeStrategy | None DEFAULT: None

RETURNS DESCRIPTION
Receiver[Dispatch]

A new receiver for dispatches whose running status changed.

Source code in frequenz/dispatch/_dispatcher.py
async def new_running_state_event_receiver(
    self,
    dispatch_type: str,
    *,
    merge_strategy: MergeStrategy | None = None,
) -> Receiver[Dispatch]:
    """Return running state event receiver.

    This receiver will receive a message whenever the current running
    status of a dispatch changes.

    Usually, one message per scheduled run is to be expected.
    However, things get more involved when a dispatch is modified:

    If it is currently running and the modification says it should no
    longer be running, or should be running with different parameters,
    then a message will be sent.

    In other words: Any change that is expected to make an actor start, stop
    or adjust itself according to new dispatch options causes a message to be
    sent.

    A non-exhaustive list of possible changes that will cause a message to be sent:
     - The normal scheduled start_time has been reached
     - The duration of the dispatch has been modified
     - The start_time has been modified to be in the future
     - The component selection changed
     - The active status changed
     - The dry_run status changed
     - The payload changed
     - The dispatch was deleted

    `merge_strategy` is an instance of a class derived from
    [`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available
    strategies are:

    * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
      of the same type
    * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
      dispatches of the same type and target
    * `None` — no merging, just send all events (default)

    Running intervals from multiple dispatches will be merged, according to
    the chosen strategy.

    While merging, stop events are ignored as long as at least one
    merge-criteria-matching dispatch remains active.

    Args:
        dispatch_type: The type of the dispatch to listen for.
        merge_strategy: The strategy used to merge running intervals.

    Returns:
        A new receiver for dispatches whose running status changed.
    """
    return await self._bg_service.new_running_state_event_receiver(
        dispatch_type,
        merge_strategy=merge_strategy,
    )
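
For illustration, here is a minimal sketch (assuming an already-initialized `dispatcher` and a placeholder dispatch type "MY_TYPE") that subscribes with `MergeByType`, so overlapping dispatches of the same type are reported as a single running interval:

```python
from frequenz.dispatch import Dispatcher, MergeByType


async def watch_running_state(dispatcher: Dispatcher) -> None:
    """Print running-state changes for a placeholder dispatch type (sketch)."""
    receiver = await dispatcher.new_running_state_event_receiver(
        "MY_TYPE", merge_strategy=MergeByType()
    )
    async for dispatch in receiver:
        # With MergeByType, a "stopped" message only arrives once no dispatch
        # of this type is running anymore.
        print(f"Dispatch {dispatch.id} started: {dispatch.started}")
```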
start ¤
start() -> None

Start the local dispatch service.

Source code in frequenz/dispatch/_dispatcher.py
@override
def start(self) -> None:
    """Start the local dispatch service."""
    self._bg_service.start()
start_managing async ¤
start_managing(
    dispatch_type: str,
    *,
    actor_factory: Callable[
        [DispatchInfo, Receiver[DispatchInfo]],
        Awaitable[Actor],
    ],
    merge_strategy: MergeStrategy | None = None,
    retry_interval: timedelta = timedelta(seconds=60)
) -> None

Manage actors for a given dispatch type.

Creates and manages an ActorDispatcher for the given type that will start, stop and reconfigure actors based on received dispatches.

You can await the Dispatcher instance to block until all types registered with start_managing() have been stopped using stop_managing().

"Merging" means that when multiple dispatches are active at the same time, the intervals are merged into one.

This also decides how instances are mapped from dispatches to actors:

  • MergeByType — All dispatches map to one single instance identified by the dispatch type.
  • MergeByTypeTarget — A dispatch maps to an instance identified by the dispatch type and target. So different dispatches with equal type and target will map to the same instance.
  • None — No merging, each dispatch maps to a separate instance.

PARAMETER DESCRIPTION
dispatch_type

The type of the dispatch to manage.

TYPE: str

actor_factory

The factory to create actors.

TYPE: Callable[[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]]

merge_strategy

The strategy to merge running intervals.

TYPE: MergeStrategy | None DEFAULT: None

retry_interval

Retry interval for when actor creation fails.

TYPE: timedelta DEFAULT: timedelta(seconds=60)

Source code in frequenz/dispatch/_dispatcher.py
async def start_managing(
    self,
    dispatch_type: str,
    *,
    actor_factory: Callable[
        [DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
    ],
    merge_strategy: MergeStrategy | None = None,
    retry_interval: timedelta = timedelta(seconds=60),
) -> None:
    """Manage actors for a given dispatch type.

    Creates and manages an
    [`ActorDispatcher`][frequenz.dispatch.ActorDispatcher] for the given type that will
    start, stop and reconfigure actors based on received dispatches.

    You can await the `Dispatcher` instance to block until all types
    registered with `start_managing()` have been stopped using
    `stop_managing()`.

    "Merging" means that when multiple dispatches are active at the same time,
    the intervals are merged into one.

    This also decides how instances are mapped from dispatches to actors:

    * [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to
    one single instance identified by the dispatch type.
    * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A
    dispatch maps to an instance identified by the dispatch type and target.
    So different dispatches with equal type and target will map to the same
    instance.
    * `None` — No merging, each dispatch maps to a separate instance.

    Args:
        dispatch_type: The type of the dispatch to manage.
        actor_factory: The factory to create actors.
        merge_strategy: The strategy to merge running intervals.
        retry_interval: Retry interval for when actor creation fails.
    """
    dispatcher = self._actor_dispatchers.get(dispatch_type)

    if dispatcher is not None:
        _logger.debug(
            "Ignoring duplicate actor dispatcher request for %r", dispatch_type
        )
        return

    self._empty_event.clear()

    def id_identity(dispatch: Dispatch) -> int:
        return dispatch.id

    dispatcher = ActorDispatcher(
        actor_factory=actor_factory,
        running_status_receiver=await self.new_running_state_event_receiver(
            dispatch_type,
            merge_strategy=merge_strategy,
        ),
        dispatch_identity=(
            id_identity if merge_strategy is None else merge_strategy.identity
        ),
        retry_interval=retry_interval,
    )

    self._actor_dispatchers[dispatch_type] = dispatcher
    dispatcher.start()
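
As a usage sketch (assuming an initialized `dispatcher`, a placeholder dispatch type "MY_TYPE" and an actor factory like the one in the class-level example), management can be started with a per-type-and-target merge and released again later:

```python
from collections.abc import Awaitable, Callable

from frequenz.channels import Receiver
from frequenz.dispatch import Dispatcher, DispatchInfo, MergeByTypeTarget
from frequenz.sdk.actor import Actor


async def manage_my_type(
    dispatcher: Dispatcher,
    actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]],
) -> None:
    """Start, then later stop, managing actors for a placeholder type (sketch)."""
    # One managed actor instance per (type, target) combination.
    await dispatcher.start_managing(
        "MY_TYPE",
        actor_factory=actor_factory,
        merge_strategy=MergeByTypeTarget(),
    )
    ...
    # Release the type again; awaiting the dispatcher afterwards returns
    # once no managed types remain.
    await dispatcher.stop_managing("MY_TYPE")
```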
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
stop_managing async ¤
stop_managing(dispatch_type: str) -> None

Stop managing actors for a given dispatch type.

PARAMETER DESCRIPTION
dispatch_type

The type of the dispatch to stop managing.

TYPE: str

Source code in frequenz/dispatch/_dispatcher.py
async def stop_managing(self, dispatch_type: str) -> None:
    """Stop managing actors for a given dispatch type.

    Args:
        dispatch_type: The type of the dispatch to stop managing.
    """
    dispatcher = self._actor_dispatchers.pop(dispatch_type, None)
    if dispatcher is not None:
        await dispatcher.stop()

    if not self._actor_dispatchers:
        self._empty_event.set()
wait async ¤
wait() -> None

Wait until all actor dispatches are stopped.

Source code in frequenz/dispatch/_dispatcher.py
@override
async def wait(self) -> None:
    """Wait until all actor dispatches are stopped."""
    await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())

    self._actor_dispatchers.clear()
wait_for_initialization async ¤
wait_for_initialization() -> None

Wait until the background service is initialized.

Source code in frequenz/dispatch/_dispatcher.py
async def wait_for_initialization(self) -> None:
    """Wait until the background service is initialized."""
    await self._bg_service.wait_for_initialization()

frequenz.dispatch.MergeByType ¤

Bases: MergeStrategy

Merge running intervals based on the dispatch type.

Source code in frequenz/dispatch/_merge_strategies.py
class MergeByType(MergeStrategy):
    """Merge running intervals based on the dispatch type."""

    @override
    def identity(self, dispatch: Dispatch) -> int:
        """Identity function for the merge criteria."""
        return hash(dispatch.type)

    @override
    def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
        """Filter dispatches based on the merge strategy.

        Keeps start events.
        Keeps stop events only if no other dispatches matching the
        strategy's criteria are running.
        """
        if dispatch.started:
            logging.debug("Keeping start event %s", dispatch.id)
            return True

        other_dispatches_running = any(
            existing_dispatch.started
            for existing_dispatch in dispatches.values()
            if (
                self.identity(existing_dispatch) == self.identity(dispatch)
                and existing_dispatch.id != dispatch.id
            )
        )

        logging.debug(
            "stop event %s because other_dispatches_running=%s",
            dispatch.id,
            other_dispatches_running,
        )
        return not other_dispatches_running
Functions¤
filter ¤
filter(
    dispatches: Mapping[int, Dispatch], dispatch: Dispatch
) -> bool

Filter dispatches based on the merge strategy.

Keeps start events. Keeps stop events only if no other dispatches matching the strategy's criteria are running.

Source code in frequenz/dispatch/_merge_strategies.py
@override
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
    """Filter dispatches based on the merge strategy.

    Keeps start events.
    Keeps stop events only if no other dispatches matching the
    strategy's criteria are running.
    """
    if dispatch.started:
        logging.debug("Keeping start event %s", dispatch.id)
        return True

    other_dispatches_running = any(
        existing_dispatch.started
        for existing_dispatch in dispatches.values()
        if (
            self.identity(existing_dispatch) == self.identity(dispatch)
            and existing_dispatch.id != dispatch.id
        )
    )

    logging.debug(
        "stop event %s because other_dispatches_running=%s",
        dispatch.id,
        other_dispatches_running,
    )
    return not other_dispatches_running
identity ¤
identity(dispatch: Dispatch) -> int

Identity function for the merge criteria.

Source code in frequenz/dispatch/_merge_strategies.py
@override
def identity(self, dispatch: Dispatch) -> int:
    """Identity function for the merge criteria."""
    return hash(dispatch.type)

frequenz.dispatch.MergeByTypeTarget ¤

Bases: MergeByType

Merge running intervals based on the dispatch type and target.

Source code in frequenz/dispatch/_merge_strategies.py
class MergeByTypeTarget(MergeByType):
    """Merge running intervals based on the dispatch type and target."""

    @override
    def identity(self, dispatch: Dispatch) -> int:
        """Identity function for the merge criteria."""
        return hash((dispatch.type, tuple(dispatch.target)))
Functions¤
filter ¤
filter(
    dispatches: Mapping[int, Dispatch], dispatch: Dispatch
) -> bool

Filter dispatches based on the merge strategy.

Keeps start events. Keeps stop events only if no other dispatches matching the strategy's criteria are running.

Source code in frequenz/dispatch/_merge_strategies.py
@override
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
    """Filter dispatches based on the merge strategy.

    Keeps start events.
    Keeps stop events only if no other dispatches matching the
    strategy's criteria are running.
    """
    if dispatch.started:
        logging.debug("Keeping start event %s", dispatch.id)
        return True

    other_dispatches_running = any(
        existing_dispatch.started
        for existing_dispatch in dispatches.values()
        if (
            self.identity(existing_dispatch) == self.identity(dispatch)
            and existing_dispatch.id != dispatch.id
        )
    )

    logging.debug(
        "stop event %s because other_dispatches_running=%s",
        dispatch.id,
        other_dispatches_running,
    )
    return not other_dispatches_running
identity ¤
identity(dispatch: Dispatch) -> int

Identity function for the merge criteria.

Source code in frequenz/dispatch/_merge_strategies.py
@override
def identity(self, dispatch: Dispatch) -> int:
    """Identity function for the merge criteria."""
    return hash((dispatch.type, tuple(dispatch.target)))

frequenz.dispatch.MergeStrategy ¤

Bases: ABC

Base class for strategies to merge running intervals.

Source code in frequenz/dispatch/_bg_service.py
class MergeStrategy(ABC):
    """Base class for strategies to merge running intervals."""

    @abstractmethod
    def identity(self, dispatch: Dispatch) -> int:
        """Identity function for the merge criteria."""

    @abstractmethod
    def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
        """Filter dispatches based on the strategy.

        Args:
            dispatches: All dispatches, available as context.
            dispatch: The dispatch to filter.

        Returns:
            True if the dispatch should be included, False otherwise.
        """
Functions¤
filter abstractmethod ¤
filter(
    dispatches: Mapping[int, Dispatch], dispatch: Dispatch
) -> bool

Filter dispatches based on the strategy.

PARAMETER DESCRIPTION
dispatches

All dispatches, available as context.

TYPE: Mapping[int, Dispatch]

dispatch

The dispatch to filter.

TYPE: Dispatch

RETURNS DESCRIPTION
bool

True if the dispatch should be included, False otherwise.

Source code in frequenz/dispatch/_bg_service.py
@abstractmethod
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
    """Filter dispatches based on the strategy.

    Args:
        dispatches: All dispatches, available as context.
        dispatch: The dispatch to filter.

    Returns:
        True if the dispatch should be included, False otherwise.
    """
identity abstractmethod ¤
identity(dispatch: Dispatch) -> int

Identity function for the merge criteria.

Source code in frequenz/dispatch/_bg_service.py
@abstractmethod
def identity(self, dispatch: Dispatch) -> int:
    """Identity function for the merge criteria."""

frequenz.dispatch.Updated dataclass ¤

A dispatch updated event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Updated:
    """A dispatch updated event."""

    dispatch: Dispatch
    """The dispatch that was updated."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was updated.