Skip to content

Index

frequenz.dispatch ¤

A highlevel interface for the dispatch API.

A small overview of the most important classes in this module:

Attributes¤

frequenz.dispatch.DispatchEvent module-attribute ¤

DispatchEvent = Created | Updated | Deleted

Type that is sent over the channel for dispatch updates.

This type is used to send dispatches that were created, updated or deleted over the channel.

Classes¤

frequenz.dispatch.Created dataclass ¤

A dispatch created event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Created:
    """A dispatch created event."""

    dispatch: Dispatch
    """The dispatch that was created."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was created.

frequenz.dispatch.Deleted dataclass ¤

A dispatch deleted event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Deleted:
    """A dispatch deleted event."""

    dispatch: Dispatch
    """The dispatch that was deleted."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was deleted.

frequenz.dispatch.Dispatch dataclass ¤

Bases: Dispatch

Dispatch type with extra functionality.

Source code in frequenz/dispatch/_dispatch.py
@dataclass(frozen=True)
class Dispatch(BaseDispatch):
    """Dispatch type with extra functionality."""

    deleted: bool = False
    """Whether the dispatch is deleted."""

    running_state_change_synced: datetime | None = None
    """The last time a message was sent about the running state change."""

    def __init__(
        self,
        client_dispatch: BaseDispatch,
        deleted: bool = False,
        running_state_change_synced: datetime | None = None,
    ):
        """Initialize the dispatch.

        Args:
            client_dispatch: The client dispatch.
            deleted: Whether the dispatch is deleted.
            running_state_change_synced: Timestamp of the last running state change message.
        """
        super().__init__(**client_dispatch.__dict__)
        # Work around frozen to set deleted
        object.__setattr__(self, "deleted", deleted)
        object.__setattr__(
            self,
            "running_state_change_synced",
            running_state_change_synced,
        )

    def _set_deleted(self) -> None:
        """Mark the dispatch as deleted."""
        object.__setattr__(self, "deleted", True)

    @property
    def _running_status_notified(self) -> bool:
        """Check that the latest running state change notification was sent.

        Returns:
            True if the latest running state change notification was sent, False otherwise.
        """
        return self.running_state_change_synced == self.update_time

    def _set_running_status_notified(self) -> None:
        """Mark the latest running state change notification as sent."""
        object.__setattr__(self, "running_state_change_synced", self.update_time)

    def running(self, type_: str) -> RunningState:
        """Check if the dispatch is currently supposed to be running.

        Args:
            type_: The type of the dispatch that should be running.

        Returns:
            RUNNING if the dispatch is running,
            STOPPED if it is stopped,
            DIFFERENT_TYPE if it is for a different type.
        """
        if self.type != type_:
            return RunningState.DIFFERENT_TYPE

        if not self.active or self.deleted:
            return RunningState.STOPPED

        now = datetime.now(tz=timezone.utc)

        if now < self.start_time:
            return RunningState.STOPPED
        # A dispatch without duration is always running once it started
        if self.duration is None:
            return RunningState.RUNNING

        if until := self._until(now):
            return RunningState.RUNNING if now < until else RunningState.STOPPED

        return RunningState.STOPPED

    @property
    def until(self) -> datetime | None:
        """Time when the dispatch should end.

        Returns the time that a running dispatch should end.
        If the dispatch is not running, None is returned.

        Returns:
            The time when the dispatch should end or None if the dispatch is not running.
        """
        if not self.active or self.deleted:
            return None

        now = datetime.now(tz=timezone.utc)
        return self._until(now)

    @property
    # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
    # value needs documenting
    def missed_runs(self) -> Iterator[datetime]:  # noqa: DOC405
        """Yield all missed runs of a dispatch.

        Yields all missed runs of a dispatch.

        If a running state change notification was not sent in time
        due to connection issues, this method will yield all missed runs
        since the last sent notification.

        Returns:
            A generator that yields all missed runs of a dispatch.
        """
        if self.update_time == self.running_state_change_synced:
            return

        from_time = self.update_time
        now = datetime.now(tz=timezone.utc)

        while (next_run := self.next_run_after(from_time)) and next_run < now:
            yield next_run
            from_time = next_run

    @property
    def next_run(self) -> datetime | None:
        """Calculate the next run of a dispatch.

        Returns:
            The next run of the dispatch or None if the dispatch is finished.
        """
        return self.next_run_after(datetime.now(tz=timezone.utc))

    def next_run_after(self, after: datetime) -> datetime | None:
        """Calculate the next run of a dispatch.

        Args:
            after: The time to calculate the next run from.

        Returns:
            The next run of the dispatch or None if the dispatch is finished.
        """
        if (
            not self.recurrence.frequency
            or self.recurrence.frequency == Frequency.UNSPECIFIED
            or self.duration is None  # Infinite duration
        ):
            if after > self.start_time:
                return None
            return self.start_time

        # Make sure no weekday is UNSPECIFIED
        if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
            _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
            return None

        # No type information for rrule, so we need to cast
        return cast(datetime | None, self._prepare_rrule().after(after, inc=True))

    def _prepare_rrule(self) -> rrule.rrule:
        """Prepare the rrule object.

        Returns:
            The rrule object.

        Raises:
            ValueError: If the interval is invalid.
        """
        count, until = (None, None)
        if end := self.recurrence.end_criteria:
            count = end.count
            until = end.until

        if self.recurrence.interval is None or self.recurrence.interval < 1:
            raise ValueError("Interval must be at least 1")

        rrule_obj = rrule.rrule(
            freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
            dtstart=self.start_time,
            count=count,
            until=until,
            byminute=self.recurrence.byminutes or None,
            byhour=self.recurrence.byhours or None,
            byweekday=[
                _RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays
            ]
            or None,
            bymonthday=self.recurrence.bymonthdays or None,
            bymonth=self.recurrence.bymonths or None,
            interval=self.recurrence.interval,
        )

        return rrule_obj

    def _until(self, now: datetime) -> datetime | None:
        """Calculate the time when the dispatch should end.

        If no previous run is found, None is returned.

        Args:
            now: The current time.

        Returns:
            The time when the dispatch should end or None if the dispatch is not running.

        Raises:
            ValueError: If the dispatch has no duration.
        """
        if self.duration is None:
            raise ValueError("_until: Dispatch has no duration")

        if (
            not self.recurrence.frequency
            or self.recurrence.frequency == Frequency.UNSPECIFIED
        ):
            return self.start_time + self.duration

        latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True)

        if not latest_past_start:
            return None

        return latest_past_start + self.duration
Attributes¤
active instance-attribute ¤
active: bool

Indicates whether the dispatch is active and eligible for processing.

create_time instance-attribute ¤
create_time: datetime

The creation time of the dispatch in UTC. Set when a dispatch is created.

deleted class-attribute instance-attribute ¤
deleted: bool = False

Whether the dispatch is deleted.

dry_run instance-attribute ¤
dry_run: bool

Indicates if the dispatch is a dry run.

Executed for logging and monitoring without affecting actual component states.

duration instance-attribute ¤
duration: timedelta | None

The duration of the dispatch, represented as a timedelta.

id instance-attribute ¤
id: int

The unique identifier for the dispatch.

missed_runs property ¤
missed_runs: Iterator[datetime]

Yield all missed runs of a dispatch.

Yields all missed runs of a dispatch.

If a running state change notification was not sent in time due to connection issues, this method will yield all missed runs since the last sent notification.

RETURNS DESCRIPTION
Iterator[datetime]

A generator that yields all missed runs of a dispatch.

next_run property ¤
next_run: datetime | None

Calculate the next run of a dispatch.

RETURNS DESCRIPTION
datetime | None

The next run of the dispatch or None if the dispatch is finished.

payload instance-attribute ¤
payload: dict[str, Any]

The dispatch payload containing arbitrary data.

It is structured as needed for the dispatch operation.

recurrence instance-attribute ¤
recurrence: RecurrenceRule

The recurrence rule for the dispatch.

Defining any repeating patterns or schedules.

running_state_change_synced class-attribute instance-attribute ¤
running_state_change_synced: datetime | None = None

The last time a message was sent about the running state change.

start_time instance-attribute ¤
start_time: datetime

The start time of the dispatch in UTC.

started property ¤
started: bool

Check if the dispatch has started.

A dispatch is considered started if the current time is after the start time but before the end time.

Recurring dispatches are considered started if the current time is after the start time of the last occurrence but before the end time of the last occurrence.

target instance-attribute ¤
target: TargetComponents

The target components of the dispatch.

type instance-attribute ¤
type: str

User-defined information about the type of dispatch.

This is understood and processed by downstream applications.

until property ¤
until: datetime | None

Time when the dispatch should end.

Returns the time that a running dispatch should end. If the dispatch is not running, None is returned.

RETURNS DESCRIPTION
datetime | None

The time when the dispatch should end or None if the dispatch is not running.

update_time instance-attribute ¤
update_time: datetime

The last update time of the dispatch in UTC. Set when a dispatch is modified.

Functions¤
__init__ ¤
__init__(
    client_dispatch: Dispatch,
    deleted: bool = False,
    running_state_change_synced: datetime | None = None,
)

Initialize the dispatch.

PARAMETER DESCRIPTION
client_dispatch

The client dispatch.

TYPE: Dispatch

deleted

Whether the dispatch is deleted.

TYPE: bool DEFAULT: False

running_state_change_synced

Timestamp of the last running state change message.

TYPE: datetime | None DEFAULT: None

Source code in frequenz/dispatch/_dispatch.py
def __init__(
    self,
    client_dispatch: BaseDispatch,
    deleted: bool = False,
    running_state_change_synced: datetime | None = None,
):
    """Initialize the dispatch.

    Args:
        client_dispatch: The client dispatch.
        deleted: Whether the dispatch is deleted.
        running_state_change_synced: Timestamp of the last running state change message.
    """
    super().__init__(**client_dispatch.__dict__)
    # Work around frozen to set deleted
    object.__setattr__(self, "deleted", deleted)
    object.__setattr__(
        self,
        "running_state_change_synced",
        running_state_change_synced,
    )
from_protobuf classmethod ¤
from_protobuf(pb_object: Dispatch) -> Dispatch

Convert a protobuf dispatch to a dispatch.

PARAMETER DESCRIPTION
pb_object

The protobuf dispatch to convert.

TYPE: Dispatch

RETURNS DESCRIPTION
Dispatch

The converted dispatch.

Source code in frequenz/client/dispatch/types.py
@classmethod
def from_protobuf(cls, pb_object: PBDispatch) -> "Dispatch":
    """Convert a protobuf dispatch to a dispatch.

    Args:
        pb_object: The protobuf dispatch to convert.

    Returns:
        The converted dispatch.
    """
    return Dispatch(
        id=pb_object.metadata.dispatch_id,
        type=pb_object.data.type,
        create_time=to_datetime(pb_object.metadata.create_time),
        update_time=to_datetime(pb_object.metadata.modification_time),
        start_time=to_datetime(pb_object.data.start_time),
        duration=(
            timedelta(seconds=pb_object.data.duration)
            if pb_object.data.duration
            else None
        ),
        target=_target_components_from_protobuf(pb_object.data.target),
        active=pb_object.data.is_active,
        dry_run=pb_object.data.is_dry_run,
        payload=MessageToDict(pb_object.data.payload),
        recurrence=RecurrenceRule.from_protobuf(pb_object.data.recurrence),
    )
next_run_after ¤
next_run_after(after: datetime) -> datetime | None

Calculate the next run of a dispatch.

PARAMETER DESCRIPTION
after

The time to calculate the next run from.

TYPE: datetime

RETURNS DESCRIPTION
datetime | None

The next run of the dispatch or None if the dispatch is finished.

Source code in frequenz/dispatch/_dispatch.py
def next_run_after(self, after: datetime) -> datetime | None:
    """Calculate the next run of a dispatch.

    Args:
        after: The time to calculate the next run from.

    Returns:
        The next run of the dispatch or None if the dispatch is finished.
    """
    if (
        not self.recurrence.frequency
        or self.recurrence.frequency == Frequency.UNSPECIFIED
        or self.duration is None  # Infinite duration
    ):
        if after > self.start_time:
            return None
        return self.start_time

    # Make sure no weekday is UNSPECIFIED
    if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
        _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
        return None

    # No type information for rrule, so we need to cast
    return cast(datetime | None, self._prepare_rrule().after(after, inc=True))
running ¤
running(type_: str) -> RunningState

Check if the dispatch is currently supposed to be running.

PARAMETER DESCRIPTION
type_

The type of the dispatch that should be running.

TYPE: str

RETURNS DESCRIPTION
RunningState

RUNNING if the dispatch is running,

RunningState

STOPPED if it is stopped,

RunningState

DIFFERENT_TYPE if it is for a different type.

Source code in frequenz/dispatch/_dispatch.py
def running(self, type_: str) -> RunningState:
    """Check if the dispatch is currently supposed to be running.

    Args:
        type_: The type of the dispatch that should be running.

    Returns:
        RUNNING if the dispatch is running,
        STOPPED if it is stopped,
        DIFFERENT_TYPE if it is for a different type.
    """
    if self.type != type_:
        return RunningState.DIFFERENT_TYPE

    if not self.active or self.deleted:
        return RunningState.STOPPED

    now = datetime.now(tz=timezone.utc)

    if now < self.start_time:
        return RunningState.STOPPED
    # A dispatch without duration is always running once it started
    if self.duration is None:
        return RunningState.RUNNING

    if until := self._until(now):
        return RunningState.RUNNING if now < until else RunningState.STOPPED

    return RunningState.STOPPED
to_protobuf ¤
to_protobuf() -> Dispatch

Convert a dispatch to a protobuf dispatch.

RETURNS DESCRIPTION
Dispatch

The converted protobuf dispatch.

Source code in frequenz/client/dispatch/types.py
def to_protobuf(self) -> PBDispatch:
    """Convert a dispatch to a protobuf dispatch.

    Returns:
        The converted protobuf dispatch.
    """
    payload = Struct()
    payload.update(self.payload)

    return PBDispatch(
        metadata=DispatchMetadata(
            dispatch_id=self.id,
            create_time=to_timestamp(self.create_time),
            modification_time=to_timestamp(self.update_time),
        ),
        data=DispatchData(
            type=self.type,
            start_time=to_timestamp(self.start_time),
            duration=(
                round(self.duration.total_seconds()) if self.duration else None
            ),
            target=_target_components_to_protobuf(self.target),
            is_active=self.active,
            is_dry_run=self.dry_run,
            payload=payload,
            recurrence=self.recurrence.to_protobuf() if self.recurrence else None,
        ),
    )

frequenz.dispatch.DispatchManagingActor ¤

Bases: Actor

Helper class to manage actors based on dispatches.

Example usage:

import os
import asyncio
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
from frequenz.client.dispatch.types import TargetComponents
from frequenz.client.common.microgrid.components import ComponentCategory

from frequenz.channels import Receiver, Broadcast

class MyActor(Actor):
    def __init__(self, updates_channel: Receiver[DispatchUpdate]):
        super().__init__()
        self._updates_channel = updates_channel
        self._dry_run: bool
        self._options : dict[str, Any]

    async def _run(self) -> None:
        while True:
            update = await self._updates_channel.receive()
            print("Received update:", update)

            self.set_components(update.components)
            self._dry_run = update.dry_run
            self._options = update.options

    def set_components(self, components: TargetComponents) -> None:
        match components:
            case [int(), *_] as component_ids:
                print("Dispatch: Setting components to %s", components)
            case [ComponentCategory.BATTERY, *_]:
                print("Dispatch: Using all battery components")
            case unsupported:
                print(
                    "Dispatch: Requested an unsupported target component %r, "
                    "but only component IDs or category BATTERY are supported.",
                    unsupported,
                )

async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    dispatcher = Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    )

    # Create update channel to receive dispatch update events pre-start and mid-run
    dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")

    # Start actor and give it an dispatch updates channel receiver
    my_actor = MyActor(dispatch_updates_channel.new_receiver())

    status_receiver = dispatcher.running_status_change.new_receiver()

    managing_actor = DispatchManagingActor(
        actor=my_actor,
        dispatch_type="EXAMPLE",
        running_status_receiver=status_receiver,
        updates_sender=dispatch_updates_channel.new_sender(),
    )

    await asyncio.gather(dispatcher.start(), managing_actor.start())
Source code in frequenz/dispatch/_managing_actor.py
class DispatchManagingActor(Actor):
    """Helper class to manage actors based on dispatches.

    Example usage:

    ```python
    import os
    import asyncio
    from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
    from frequenz.client.dispatch.types import TargetComponents
    from frequenz.client.common.microgrid.components import ComponentCategory

    from frequenz.channels import Receiver, Broadcast

    class MyActor(Actor):
        def __init__(self, updates_channel: Receiver[DispatchUpdate]):
            super().__init__()
            self._updates_channel = updates_channel
            self._dry_run: bool
            self._options : dict[str, Any]

        async def _run(self) -> None:
            while True:
                update = await self._updates_channel.receive()
                print("Received update:", update)

                self.set_components(update.components)
                self._dry_run = update.dry_run
                self._options = update.options

        def set_components(self, components: TargetComponents) -> None:
            match components:
                case [int(), *_] as component_ids:
                    print("Dispatch: Setting components to %s", components)
                case [ComponentCategory.BATTERY, *_]:
                    print("Dispatch: Using all battery components")
                case unsupported:
                    print(
                        "Dispatch: Requested an unsupported target component %r, "
                        "but only component IDs or category BATTERY are supported.",
                        unsupported,
                    )

    async def run():
        url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
        key  = os.getenv("DISPATCH_API_KEY", "some-key")

        microgrid_id = 1

        dispatcher = Dispatcher(
            microgrid_id=microgrid_id,
            server_url=url,
            key=key
        )

        # Create update channel to receive dispatch update events pre-start and mid-run
        dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")

        # Start actor and give it an dispatch updates channel receiver
        my_actor = MyActor(dispatch_updates_channel.new_receiver())

        status_receiver = dispatcher.running_status_change.new_receiver()

        managing_actor = DispatchManagingActor(
            actor=my_actor,
            dispatch_type="EXAMPLE",
            running_status_receiver=status_receiver,
            updates_sender=dispatch_updates_channel.new_sender(),
        )

        await asyncio.gather(dispatcher.start(), managing_actor.start())
    ```
    """

    def __init__(
        self,
        actor: Actor | Set[Actor],
        dispatch_type: str,
        running_status_receiver: Receiver[Dispatch],
        updates_sender: Sender[DispatchUpdate] | None = None,
    ) -> None:
        """Initialize the dispatch handler.

        Args:
            actor: A set of actors or a single actor to manage.
            dispatch_type: The type of dispatches to handle.
            running_status_receiver: The receiver for dispatch running status changes.
            updates_sender: The sender for dispatch events
        """
        super().__init__()
        self._dispatch_rx = running_status_receiver
        self._actors = frozenset([actor] if isinstance(actor, Actor) else actor)
        self._dispatch_type = dispatch_type
        self._updates_sender = updates_sender

    def _start_actors(self) -> None:
        """Start all actors."""
        for actor in self._actors:
            if actor.is_running:
                _logger.warning("Actor %s is already running", actor.name)
            else:
                actor.start()

    async def _stop_actors(self, msg: str) -> None:
        """Stop all actors.

        Args:
            msg: The message to be passed to the actors being stopped.
        """
        for actor in self._actors:
            if actor.is_running:
                await actor.stop(msg)
            else:
                _logger.warning("Actor %s is not running", actor.name)

    async def _run(self) -> None:
        """Wait for dispatches and handle them."""
        async for dispatch in self._dispatch_rx:
            await self._handle_dispatch(dispatch=dispatch)

    async def _handle_dispatch(self, dispatch: Dispatch) -> None:
        """Handle a dispatch.

        Args:
            dispatch: The dispatch to handle.
        """
        running = dispatch.running(self._dispatch_type)
        match running:
            case RunningState.STOPPED:
                _logger.info("Stopped by dispatch %s", dispatch.id)
                await self._stop_actors("Dispatch stopped")
            case RunningState.RUNNING:
                if self._updates_sender is not None:
                    _logger.info("Updated by dispatch %s", dispatch.id)
                    await self._updates_sender.send(
                        DispatchUpdate(
                            components=dispatch.target,
                            dry_run=dispatch.dry_run,
                            options=dispatch.payload,
                        )
                    )

                _logger.info("Started by dispatch %s", dispatch.id)
                self._start_actors()
            case RunningState.DIFFERENT_TYPE:
                _logger.debug(
                    "Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
                )
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel("{self!r} was deleted")
__init__ ¤
__init__(
    actor: Actor | Set[Actor],
    dispatch_type: str,
    running_status_receiver: Receiver[Dispatch],
    updates_sender: Sender[DispatchUpdate] | None = None,
) -> None

Initialize the dispatch handler.

PARAMETER DESCRIPTION
actor

A set of actors or a single actor to manage.

TYPE: Actor | Set[Actor]

dispatch_type

The type of dispatches to handle.

TYPE: str

running_status_receiver

The receiver for dispatch running status changes.

TYPE: Receiver[Dispatch]

updates_sender

The sender for dispatch events

TYPE: Sender[DispatchUpdate] | None DEFAULT: None

Source code in frequenz/dispatch/_managing_actor.py
def __init__(
    self,
    actor: Actor | Set[Actor],
    dispatch_type: str,
    running_status_receiver: Receiver[Dispatch],
    updates_sender: Sender[DispatchUpdate] | None = None,
) -> None:
    """Initialize the dispatch handler.

    Args:
        actor: A set of actors or a single actor to manage.
        dispatch_type: The type of dispatches to handle.
        running_status_receiver: The receiver for dispatch running status changes.
        updates_sender: The sender for dispatch events
    """
    super().__init__()
    self._dispatch_rx = running_status_receiver
    self._actors = frozenset([actor] if isinstance(actor, Actor) else actor)
    self._dispatch_type = dispatch_type
    self._updates_sender = updates_sender
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing.
    """
    if self.is_running:
        return
    self._tasks.clear()
    self._tasks.add(asyncio.create_task(self._run_loop()))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.dispatch.DispatchUpdate dataclass ¤

Event emitted when the dispatch changes.

Source code in frequenz/dispatch/_managing_actor.py
@dataclass(frozen=True, kw_only=True)
class DispatchUpdate:
    """Event emitted when the dispatch changes."""

    components: TargetComponents
    """Components to be used."""

    dry_run: bool
    """Whether this is a dry run."""

    options: dict[str, Any]
    """Additional options."""
Attributes¤
components instance-attribute ¤
components: TargetComponents

Components to be used.

dry_run instance-attribute ¤
dry_run: bool

Whether this is a dry run.

options instance-attribute ¤
options: dict[str, Any]

Additional options.

frequenz.dispatch.Dispatcher ¤

A highlevel interface for the dispatch API.

This class provides a highlevel interface to the dispatch API. It provides two channels:

Lifecycle events

A channel that sends a dispatch event message whenever a dispatch is created, updated or deleted.

Running status change

Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the consumer to start, stop or reconfigure itself.

Processing running state change dispatches
import os
from frequenz.dispatch import Dispatcher, RunningState
from unittest.mock import MagicMock

async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    dispatcher = Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    )
    await dispatcher.start()

    actor = MagicMock() # replace with your actor

    changed_running_status = dispatcher.running_status_change.new_receiver()

    async for dispatch in changed_running_status:
        match dispatch.running("DEMO_TYPE"):
            case RunningState.RUNNING:
                print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
                if actor.is_running:
                    actor.reconfigure(
                        components=dispatch.target,
                        run_parameters=dispatch.payload, # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )  # this will reconfigure the actor
                else:
                    # this will start a new actor with the given components
                    # and run it for the duration of the dispatch
                    actor.start(
                        components=dispatch.target,
                        run_parameters=dispatch.payload, # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )
            case RunningState.STOPPED:
                actor.stop()  # this will stop the actor
            case RunningState.DIFFERENT_TYPE:
                pass  # dispatch not for this type
Getting notification about dispatch lifecycle events
import os
from typing import assert_never

from frequenz.dispatch import Created, Deleted, Dispatcher, Updated

async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    dispatcher = Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    )
    await dispatcher.start()  # this will start the actor

    events_receiver = dispatcher.lifecycle_events.new_receiver()

    async for event in events_receiver:
        match event:
            case Created(dispatch):
                print(f"A dispatch was created: {dispatch}")
            case Deleted(dispatch):
                print(f"A dispatch was deleted: {dispatch}")
            case Updated(dispatch):
                print(f"A dispatch was updated: {dispatch}")
            case _ as unhandled:
                assert_never(unhandled)
Creating a new dispatch and then modifying it.

Note that this uses the lower-level Client class to create and update the dispatch.

import os
from datetime import datetime, timedelta, timezone

from frequenz.client.common.microgrid.components import ComponentCategory

from frequenz.dispatch import Dispatcher

async def run():
    url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
    key  = os.getenv("DISPATCH_API_KEY", "some-key")

    microgrid_id = 1

    dispatcher = Dispatcher(
        microgrid_id=microgrid_id,
        server_url=url,
        key=key
    )
    await dispatcher.start()  # this will start the actor

    # Create a new dispatch
    new_dispatch = await dispatcher.client.create(
        microgrid_id=microgrid_id,
        type="ECHO_FREQUENCY",  # replace with your own type
        start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
        duration=timedelta(minutes=5),
        target=ComponentCategory.INVERTER,
        payload={"font": "Times New Roman"},  # Arbitrary payload data
    )

    # Modify the dispatch
    await dispatcher.client.update(
        microgrid_id=microgrid_id,
        dispatch_id=new_dispatch.id,
        new_fields={"duration": timedelta(minutes=10)}
    )

    # Validate the modification
    modified_dispatch = await dispatcher.client.get(
        microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
    )
    assert modified_dispatch.duration == timedelta(minutes=10)
Source code in frequenz/dispatch/_dispatcher.py
class Dispatcher:
    """A highlevel interface for the dispatch API.

    This class provides a highlevel interface to the dispatch API.
    It provides two channels:

    Lifecycle events:
        A channel that sends a dispatch event message whenever a dispatch
        is created, updated or deleted.

    Running status change:
        Sends a dispatch message whenever a dispatch is ready
        to be executed according to the schedule or the running status of the
        dispatch changed in a way that could potentially require the consumer to start,
        stop or reconfigure itself.

    Example: Processing running state change dispatches
        ```python
        import os
        from frequenz.dispatch import Dispatcher, RunningState
        from unittest.mock import MagicMock

        async def run():
            url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
            key  = os.getenv("DISPATCH_API_KEY", "some-key")

            microgrid_id = 1

            dispatcher = Dispatcher(
                microgrid_id=microgrid_id,
                server_url=url,
                key=key
            )
            await dispatcher.start()

            actor = MagicMock() # replace with your actor

            changed_running_status = dispatcher.running_status_change.new_receiver()

            async for dispatch in changed_running_status:
                match dispatch.running("DEMO_TYPE"):
                    case RunningState.RUNNING:
                        print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
                        if actor.is_running:
                            actor.reconfigure(
                                components=dispatch.target,
                                run_parameters=dispatch.payload, # custom actor parameters
                                dry_run=dispatch.dry_run,
                                until=dispatch.until,
                            )  # this will reconfigure the actor
                        else:
                            # this will start a new actor with the given components
                            # and run it for the duration of the dispatch
                            actor.start(
                                components=dispatch.target,
                                run_parameters=dispatch.payload, # custom actor parameters
                                dry_run=dispatch.dry_run,
                                until=dispatch.until,
                            )
                    case RunningState.STOPPED:
                        actor.stop()  # this will stop the actor
                    case RunningState.DIFFERENT_TYPE:
                        pass  # dispatch not for this type
        ```

    Example: Getting notification about dispatch lifecycle events
        ```python
        import os
        from typing import assert_never

        from frequenz.dispatch import Created, Deleted, Dispatcher, Updated

        async def run():
            url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
            key  = os.getenv("DISPATCH_API_KEY", "some-key")

            microgrid_id = 1

            dispatcher = Dispatcher(
                microgrid_id=microgrid_id,
                server_url=url,
                key=key
            )
            await dispatcher.start()  # this will start the actor

            events_receiver = dispatcher.lifecycle_events.new_receiver()

            async for event in events_receiver:
                match event:
                    case Created(dispatch):
                        print(f"A dispatch was created: {dispatch}")
                    case Deleted(dispatch):
                        print(f"A dispatch was deleted: {dispatch}")
                    case Updated(dispatch):
                        print(f"A dispatch was updated: {dispatch}")
                    case _ as unhandled:
                        assert_never(unhandled)
        ```

    Example: Creating a new dispatch and then modifying it.
        Note that this uses the lower-level `Client` class to create and update the dispatch.

        ```python
        import os
        from datetime import datetime, timedelta, timezone

        from frequenz.client.common.microgrid.components import ComponentCategory

        from frequenz.dispatch import Dispatcher

        async def run():
            url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
            key  = os.getenv("DISPATCH_API_KEY", "some-key")

            microgrid_id = 1

            dispatcher = Dispatcher(
                microgrid_id=microgrid_id,
                server_url=url,
                key=key
            )
            await dispatcher.start()  # this will start the actor

            # Create a new dispatch
            new_dispatch = await dispatcher.client.create(
                microgrid_id=microgrid_id,
                type="ECHO_FREQUENCY",  # replace with your own type
                start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
                duration=timedelta(minutes=5),
                target=ComponentCategory.INVERTER,
                payload={"font": "Times New Roman"},  # Arbitrary payload data
            )

            # Modify the dispatch
            await dispatcher.client.update(
                microgrid_id=microgrid_id,
                dispatch_id=new_dispatch.id,
                new_fields={"duration": timedelta(minutes=10)}
            )

            # Validate the modification
            modified_dispatch = await dispatcher.client.get(
                microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
            )
            assert modified_dispatch.duration == timedelta(minutes=10)
        ```
    """

    def __init__(
        self,
        *,
        microgrid_id: int,
        server_url: str,
        key: str,
    ):
        """Initialize the dispatcher.

        Args:
            microgrid_id: The microgrid id.
            server_url: The URL of the dispatch service.
            key: The key to access the service.
        """
        self._running_state_channel = Broadcast[Dispatch](name="running_state_change")
        self._lifecycle_events_channel = Broadcast[DispatchEvent](
            name="lifecycle_events"
        )
        self._client = Client(server_url=server_url, key=key)
        self._actor = DispatchingActor(
            microgrid_id,
            self._client,
            self._lifecycle_events_channel.new_sender(),
            self._running_state_channel.new_sender(),
        )

    async def start(self) -> None:
        """Start the actor."""
        self._actor.start()

    @property
    def client(self) -> Client:
        """Return the client."""
        return self._client

    @property
    def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]:
        """Return new, updated or deleted dispatches receiver fetcher.

        Returns:
            A new receiver for new dispatches.
        """
        return self._lifecycle_events_channel

    @property
    def running_status_change(self) -> ReceiverFetcher[Dispatch]:
        """Return running status change receiver fetcher.

        This receiver will receive a message whenever the current running
        status of a dispatch changes.

        Usually, one message per scheduled run is to be expected.
        However, things get complicated when a dispatch was modified:

        If it was currently running and the modification now says
        it should not be running or running with different parameters,
        then a message will be sent.

        In other words: Any change that is expected to make an actor start, stop
        or reconfigure itself with new parameters causes a message to be
        sent.

        A non-exhaustive list of possible changes that will cause a message to be sent:
         - The normal scheduled start_time has been reached
         - The duration of the dispatch has been modified
         - The start_time has been modified to be in the future
         - The component selection changed
         - The active status changed
         - The dry_run status changed
         - The payload changed
         - The dispatch was deleted

        Note: Reaching the end time (start_time + duration) will not
        send a message, except when it was reached by modifying the duration.


        Returns:
            A new receiver for dispatches whose running status changed.
        """
        return self._running_state_channel
Attributes¤
client property ¤
client: Client

Return the client.

lifecycle_events property ¤
lifecycle_events: ReceiverFetcher[DispatchEvent]

Return new, updated or deleted dispatches receiver fetcher.

RETURNS DESCRIPTION
ReceiverFetcher[DispatchEvent]

A new receiver for new dispatches.

running_status_change property ¤
running_status_change: ReceiverFetcher[Dispatch]

Return running status change receiver fetcher.

This receiver will receive a message whenever the current running status of a dispatch changes.

Usually, one message per scheduled run is to be expected. However, things get complicated when a dispatch was modified:

If it was currently running and the modification now says it should not be running or running with different parameters, then a message will be sent.

In other words: Any change that is expected to make an actor start, stop or reconfigure itself with new parameters causes a message to be sent.

A non-exhaustive list of possible changes that will cause a message to be sent
  • The normal scheduled start_time has been reached
  • The duration of the dispatch has been modified
  • The start_time has been modified to be in the future
  • The component selection changed
  • The active status changed
  • The dry_run status changed
  • The payload changed
  • The dispatch was deleted

Note: Reaching the end time (start_time + duration) will not send a message, except when it was reached by modifying the duration.

RETURNS DESCRIPTION
ReceiverFetcher[Dispatch]

A new receiver for dispatches whose running status changed.

Functions¤
__init__ ¤
__init__(*, microgrid_id: int, server_url: str, key: str)

Initialize the dispatcher.

PARAMETER DESCRIPTION
microgrid_id

The microgrid id.

TYPE: int

server_url

The URL of the dispatch service.

TYPE: str

key

The key to access the service.

TYPE: str

Source code in frequenz/dispatch/_dispatcher.py
def __init__(
    self,
    *,
    microgrid_id: int,
    server_url: str,
    key: str,
):
    """Initialize the dispatcher.

    Args:
        microgrid_id: The microgrid id.
        server_url: The URL of the dispatch service.
        key: The key to access the service.
    """
    self._running_state_channel = Broadcast[Dispatch](name="running_state_change")
    self._lifecycle_events_channel = Broadcast[DispatchEvent](
        name="lifecycle_events"
    )
    self._client = Client(server_url=server_url, key=key)
    self._actor = DispatchingActor(
        microgrid_id,
        self._client,
        self._lifecycle_events_channel.new_sender(),
        self._running_state_channel.new_sender(),
    )
start async ¤
start() -> None

Start the actor.

Source code in frequenz/dispatch/_dispatcher.py
async def start(self) -> None:
    """Start the actor."""
    self._actor.start()

frequenz.dispatch.ReceiverFetcher ¤

Bases: Protocol[ReceivedT_co]

An interface that just exposes a new_receiver method.

Source code in frequenz/dispatch/_dispatcher.py
class ReceiverFetcher(Protocol[ReceivedT_co]):
    """An interface that just exposes a `new_receiver` method."""

    @abc.abstractmethod
    def new_receiver(
        self, *, name: str | None = None, limit: int = 50
    ) -> Receiver[ReceivedT_co]:
        """Get a receiver from the channel.

        Args:
            name: A name to identify the receiver in the logs.
            limit: The maximum size of the receiver.

        Returns:
            A receiver instance.
        """
Functions¤
new_receiver abstractmethod ¤
new_receiver(
    *, name: str | None = None, limit: int = 50
) -> Receiver[ReceivedT_co]

Get a receiver from the channel.

PARAMETER DESCRIPTION
name

A name to identify the receiver in the logs.

TYPE: str | None DEFAULT: None

limit

The maximum size of the receiver.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[ReceivedT_co]

A receiver instance.

Source code in frequenz/dispatch/_dispatcher.py
@abc.abstractmethod
def new_receiver(
    self, *, name: str | None = None, limit: int = 50
) -> Receiver[ReceivedT_co]:
    """Get a receiver from the channel.

    Args:
        name: A name to identify the receiver in the logs.
        limit: The maximum size of the receiver.

    Returns:
        A receiver instance.
    """

frequenz.dispatch.RunningState ¤

Bases: Enum

The running state of a dispatch.

Source code in frequenz/dispatch/_dispatch.py
class RunningState(Enum):
    """The running state of a dispatch."""

    RUNNING = "RUNNING"
    """The dispatch is running."""

    STOPPED = "STOPPED"
    """The dispatch is stopped."""

    DIFFERENT_TYPE = "DIFFERENT_TYPE"
    """The dispatch is for a different type."""
Attributes¤
DIFFERENT_TYPE class-attribute instance-attribute ¤
DIFFERENT_TYPE = 'DIFFERENT_TYPE'

The dispatch is for a different type.

RUNNING class-attribute instance-attribute ¤
RUNNING = 'RUNNING'

The dispatch is running.

STOPPED class-attribute instance-attribute ¤
STOPPED = 'STOPPED'

The dispatch is stopped.

frequenz.dispatch.Updated dataclass ¤

A dispatch updated event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Updated:
    """A dispatch updated event."""

    dispatch: Dispatch
    """The dispatch that was updated."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was updated.