Skip to content

Index

frequenz.dispatch ¤

A highlevel interface for the dispatch API.

A small overview of the most important classes in this module:

Attributes¤

frequenz.dispatch.DispatchEvent module-attribute ¤

DispatchEvent = Created | Updated | Deleted

Type that is sent over the channel for dispatch updates.

This type is used to send dispatches that were created, updated or deleted over the channel.

Classes¤

frequenz.dispatch.Created dataclass ¤

A dispatch created event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Created:
    """A dispatch created event."""

    dispatch: Dispatch
    """The dispatch that was created."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was created.

frequenz.dispatch.Deleted dataclass ¤

A dispatch deleted event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Deleted:
    """A dispatch deleted event."""

    dispatch: Dispatch
    """The dispatch that was deleted."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was deleted.

frequenz.dispatch.Dispatch dataclass ¤

Bases: Dispatch

Dispatch type with extra functionality.

Source code in frequenz/dispatch/_dispatch.py
@dataclass(frozen=True)
class Dispatch(BaseDispatch):
    """Dispatch type with extra functionality."""

    deleted: bool = False
    """Whether the dispatch is deleted."""

    running_state_change_synced: datetime | None = None
    """The last time a message was sent about the running state change."""

    def __init__(
        self,
        client_dispatch: BaseDispatch,
        deleted: bool = False,
        running_state_change_synced: datetime | None = None,
    ):
        """Initialize the dispatch.

        Args:
            client_dispatch: The client dispatch.
            deleted: Whether the dispatch is deleted.
            running_state_change_synced: Timestamp of the last running state change message.
        """
        super().__init__(**client_dispatch.__dict__)
        # Work around frozen to set deleted
        object.__setattr__(self, "deleted", deleted)
        object.__setattr__(
            self,
            "running_state_change_synced",
            running_state_change_synced,
        )

    def _set_deleted(self) -> None:
        """Mark the dispatch as deleted."""
        object.__setattr__(self, "deleted", True)

    @property
    def _running_status_notified(self) -> bool:
        """Check that the latest running state change notification was sent.

        Returns:
            True if the latest running state change notification was sent, False otherwise.
        """
        return self.running_state_change_synced == self.update_time

    def _set_running_status_notified(self) -> None:
        """Mark the latest running state change notification as sent."""
        object.__setattr__(self, "running_state_change_synced", self.update_time)

    def running(self, type_: str) -> RunningState:
        """Check if the dispatch is currently supposed to be running.

        Args:
            type_: The type of the dispatch that should be running.

        Returns:
            RUNNING if the dispatch is running,
            STOPPED if it is stopped,
            DIFFERENT_TYPE if it is for a different type.
        """
        if self.type != type_:
            return RunningState.DIFFERENT_TYPE

        if not self.active or self.deleted:
            return RunningState.STOPPED

        now = datetime.now(tz=timezone.utc)
        if until := self._until(now):
            return RunningState.RUNNING if now < until else RunningState.STOPPED

        return RunningState.STOPPED

    @property
    def until(self) -> datetime | None:
        """Time when the dispatch should end.

        Returns the time that a running dispatch should end.
        If the dispatch is not running, None is returned.

        Returns:
            The time when the dispatch should end or None if the dispatch is not running.
        """
        if not self.active or self.deleted:
            return None

        now = datetime.now(tz=timezone.utc)
        return self._until(now)

    @property
    # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
    # value needs documenting
    def missed_runs(self) -> Iterator[datetime]:  # noqa: DOC405
        """Yield all missed runs of a dispatch.

        Yields all missed runs of a dispatch.

        If a running state change notification was not sent in time
        due to connection issues, this method will yield all missed runs
        since the last sent notification.

        Returns:
            A generator that yields all missed runs of a dispatch.
        """
        if self.update_time == self.running_state_change_synced:
            return

        from_time = self.update_time
        now = datetime.now(tz=timezone.utc)

        while (next_run := self.next_run_after(from_time)) and next_run < now:
            yield next_run
            from_time = next_run

    @property
    def next_run(self) -> datetime | None:
        """Calculate the next run of a dispatch.

        Returns:
            The next run of the dispatch or None if the dispatch is finished.
        """
        return self.next_run_after(datetime.now(tz=timezone.utc))

    def next_run_after(self, after: datetime) -> datetime | None:
        """Calculate the next run of a dispatch.

        Args:
            after: The time to calculate the next run from.

        Returns:
            The next run of the dispatch or None if the dispatch is finished.
        """
        if (
            not self.recurrence.frequency
            or self.recurrence.frequency == Frequency.UNSPECIFIED
        ):
            if after > self.start_time:
                return None
            return self.start_time

        # Make sure no weekday is UNSPECIFIED
        if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
            _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
            return None

        # No type information for rrule, so we need to cast
        return cast(datetime | None, self._prepare_rrule().after(after, inc=True))

    def _prepare_rrule(self) -> rrule.rrule:
        """Prepare the rrule object.

        Returns:
            The rrule object.
        """
        count, until = (None, None)
        if end := self.recurrence.end_criteria:
            count = end.count
            until = end.until

        rrule_obj = rrule.rrule(
            freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
            dtstart=self.start_time,
            count=count,
            until=until,
            byminute=self.recurrence.byminutes,
            byhour=self.recurrence.byhours,
            byweekday=[
                _RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays
            ],
            bymonthday=self.recurrence.bymonthdays,
            bymonth=self.recurrence.bymonths,
            interval=self.recurrence.interval,
        )

        return rrule_obj

    def _until(self, now: datetime) -> datetime | None:
        """Calculate the time when the dispatch should end.

        If no previous run is found, None is returned.

        Args:
            now: The current time.

        Returns:
            The time when the dispatch should end or None if the dispatch is not running.
        """
        if (
            not self.recurrence.frequency
            or self.recurrence.frequency == Frequency.UNSPECIFIED
        ):
            return self.start_time + self.duration

        latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True)

        if not latest_past_start:
            return None

        return latest_past_start + self.duration
Attributes¤
active instance-attribute ¤
active: bool

Indicates whether the dispatch is active and eligible for processing.

create_time instance-attribute ¤
create_time: datetime

The creation time of the dispatch in UTC. Set when a dispatch is created.

deleted class-attribute instance-attribute ¤
deleted: bool = False

Whether the dispatch is deleted.

dry_run instance-attribute ¤
dry_run: bool

Indicates if the dispatch is a dry run.

Executed for logging and monitoring without affecting actual component states.

duration instance-attribute ¤
duration: timedelta

The duration of the dispatch, represented as a timedelta.

id instance-attribute ¤
id: int

The unique identifier for the dispatch.

microgrid_id instance-attribute ¤
microgrid_id: int

The identifier of the microgrid to which this dispatch belongs.

missed_runs property ¤
missed_runs: Iterator[datetime]

Yield all missed runs of a dispatch.

Yields all missed runs of a dispatch.

If a running state change notification was not sent in time due to connection issues, this method will yield all missed runs since the last sent notification.

RETURNS DESCRIPTION
Iterator[datetime]

A generator that yields all missed runs of a dispatch.

next_run property ¤
next_run: datetime | None

Calculate the next run of a dispatch.

RETURNS DESCRIPTION
datetime | None

The next run of the dispatch or None if the dispatch is finished.

payload instance-attribute ¤
payload: dict[str, Any]

The dispatch payload containing arbitrary data.

It is structured as needed for the dispatch operation.

recurrence instance-attribute ¤
recurrence: RecurrenceRule

The recurrence rule for the dispatch.

Defining any repeating patterns or schedules.

running_state_change_synced class-attribute instance-attribute ¤
running_state_change_synced: datetime | None = None

The last time a message was sent about the running state change.

selector instance-attribute ¤

The component selector specifying which components the dispatch targets.

start_time instance-attribute ¤
start_time: datetime

The start time of the dispatch in UTC.

type instance-attribute ¤
type: str

User-defined information about the type of dispatch.

This is understood and processed by downstream applications.

until property ¤
until: datetime | None

Time when the dispatch should end.

Returns the time that a running dispatch should end. If the dispatch is not running, None is returned.

RETURNS DESCRIPTION
datetime | None

The time when the dispatch should end or None if the dispatch is not running.

update_time instance-attribute ¤
update_time: datetime

The last update time of the dispatch in UTC. Set when a dispatch is modified.

Functions¤
__init__ ¤
__init__(
    client_dispatch: Dispatch,
    deleted: bool = False,
    running_state_change_synced: datetime | None = None,
)

Initialize the dispatch.

PARAMETER DESCRIPTION
client_dispatch

The client dispatch.

TYPE: Dispatch

deleted

Whether the dispatch is deleted.

TYPE: bool DEFAULT: False

running_state_change_synced

Timestamp of the last running state change message.

TYPE: datetime | None DEFAULT: None

Source code in frequenz/dispatch/_dispatch.py
def __init__(
    self,
    client_dispatch: BaseDispatch,
    deleted: bool = False,
    running_state_change_synced: datetime | None = None,
):
    """Initialize the dispatch.

    Args:
        client_dispatch: The client dispatch.
        deleted: Whether the dispatch is deleted.
        running_state_change_synced: Timestamp of the last running state change message.
    """
    super().__init__(**client_dispatch.__dict__)
    # Work around frozen to set deleted
    object.__setattr__(self, "deleted", deleted)
    object.__setattr__(
        self,
        "running_state_change_synced",
        running_state_change_synced,
    )
from_protobuf classmethod ¤
from_protobuf(pb_object: Dispatch) -> Dispatch

Convert a protobuf dispatch to a dispatch.

PARAMETER DESCRIPTION
pb_object

The protobuf dispatch to convert.

TYPE: Dispatch

RETURNS DESCRIPTION
Dispatch

The converted dispatch.

Source code in frequenz/client/dispatch/types.py
@classmethod
def from_protobuf(cls, pb_object: PBDispatch) -> "Dispatch":
    """Convert a protobuf dispatch to a dispatch.

    Args:
        pb_object: The protobuf dispatch to convert.

    Returns:
        The converted dispatch.
    """
    return Dispatch(
        id=pb_object.id,
        microgrid_id=pb_object.microgrid_id,
        type=pb_object.type,
        create_time=to_datetime(pb_object.create_time),
        update_time=to_datetime(pb_object.update_time),
        start_time=to_datetime(pb_object.start_time),
        duration=timedelta(seconds=pb_object.duration),
        selector=component_selector_from_protobuf(pb_object.selector),
        active=pb_object.is_active,
        dry_run=pb_object.is_dry_run,
        payload=MessageToDict(pb_object.payload),
        recurrence=RecurrenceRule.from_protobuf(pb_object.recurrence),
    )
next_run_after ¤
next_run_after(after: datetime) -> datetime | None

Calculate the next run of a dispatch.

PARAMETER DESCRIPTION
after

The time to calculate the next run from.

TYPE: datetime

RETURNS DESCRIPTION
datetime | None

The next run of the dispatch or None if the dispatch is finished.

Source code in frequenz/dispatch/_dispatch.py
def next_run_after(self, after: datetime) -> datetime | None:
    """Calculate the next run of a dispatch.

    Args:
        after: The time to calculate the next run from.

    Returns:
        The next run of the dispatch or None if the dispatch is finished.
    """
    if (
        not self.recurrence.frequency
        or self.recurrence.frequency == Frequency.UNSPECIFIED
    ):
        if after > self.start_time:
            return None
        return self.start_time

    # Make sure no weekday is UNSPECIFIED
    if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
        _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
        return None

    # No type information for rrule, so we need to cast
    return cast(datetime | None, self._prepare_rrule().after(after, inc=True))
running ¤
running(type_: str) -> RunningState

Check if the dispatch is currently supposed to be running.

PARAMETER DESCRIPTION
type_

The type of the dispatch that should be running.

TYPE: str

RETURNS DESCRIPTION
RunningState

RUNNING if the dispatch is running,

RunningState

STOPPED if it is stopped,

RunningState

DIFFERENT_TYPE if it is for a different type.

Source code in frequenz/dispatch/_dispatch.py
def running(self, type_: str) -> RunningState:
    """Check if the dispatch is currently supposed to be running.

    Args:
        type_: The type of the dispatch that should be running.

    Returns:
        RUNNING if the dispatch is running,
        STOPPED if it is stopped,
        DIFFERENT_TYPE if it is for a different type.
    """
    if self.type != type_:
        return RunningState.DIFFERENT_TYPE

    if not self.active or self.deleted:
        return RunningState.STOPPED

    now = datetime.now(tz=timezone.utc)
    if until := self._until(now):
        return RunningState.RUNNING if now < until else RunningState.STOPPED

    return RunningState.STOPPED
to_protobuf ¤
to_protobuf() -> Dispatch

Convert a dispatch to a protobuf dispatch.

RETURNS DESCRIPTION
Dispatch

The converted protobuf dispatch.

Source code in frequenz/client/dispatch/types.py
def to_protobuf(self) -> PBDispatch:
    """Convert a dispatch to a protobuf dispatch.

    Returns:
        The converted protobuf dispatch.
    """
    pb_dispatch = PBDispatch()

    pb_dispatch.id = self.id
    pb_dispatch.microgrid_id = self.microgrid_id
    pb_dispatch.type = self.type
    pb_dispatch.create_time.CopyFrom(to_timestamp(self.create_time))
    pb_dispatch.update_time.CopyFrom(to_timestamp(self.update_time))
    pb_dispatch.start_time.CopyFrom(to_timestamp(self.start_time))
    pb_dispatch.duration = int(self.duration.total_seconds())
    pb_dispatch.selector.CopyFrom(component_selector_to_protobuf(self.selector))
    pb_dispatch.is_active = self.active
    pb_dispatch.is_dry_run = self.dry_run
    pb_dispatch.payload.update(self.payload)
    pb_dispatch.recurrence.CopyFrom(self.recurrence.to_protobuf())

    return pb_dispatch

frequenz.dispatch.Dispatcher ¤

A highlevel interface for the dispatch API.

This class provides a highlevel interface to the dispatch API. It provides two channels:

Lifecycle events

A channel that sends a dispatch event message whenever a dispatch is created, updated or deleted.

Running status change

Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the consumer to start, stop or reconfigure itself.

Processing running state change dispatches
import os
import grpc.aio
from frequenz.dispatch import Dispatcher, RunningState
from unittest.mock import MagicMock

async def run():
    host = os.getenv("DISPATCH_API_HOST", "localhost")
    port = os.getenv("DISPATCH_API_PORT", "50051")

    service_address = f"{host}:{port}"
    grpc_channel = grpc.aio.insecure_channel(service_address)
    microgrid_id = 1
    dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
    await dispatcher.start()

    actor = MagicMock() # replace with your actor

    changed_running_status = dispatcher.running_status_change.new_receiver()

    async for dispatch in changed_running_status:
        match dispatch.running("DEMO_TYPE"):
            case RunningState.RUNNING:
                print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
                if actor.is_running:
                    actor.reconfigure(
                        components=dispatch.selector,
                        run_parameters=dispatch.payload, # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )  # this will reconfigure the actor
                else:
                    # this will start a new actor with the given components
                    # and run it for the duration of the dispatch
                    actor.start(
                        components=dispatch.selector,
                        run_parameters=dispatch.payload, # custom actor parameters
                        dry_run=dispatch.dry_run,
                        until=dispatch.until,
                    )
            case RunningState.STOPPED:
                actor.stop()  # this will stop the actor
            case RunningState.DIFFERENT_TYPE:
                pass  # dispatch not for this type
Getting notification about dispatch lifecycle events
import os
from typing import assert_never

import grpc.aio
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated

async def run():
    host = os.getenv("DISPATCH_API_HOST", "localhost")
    port = os.getenv("DISPATCH_API_PORT", "50051")

    service_address = f"{host}:{port}"
    grpc_channel = grpc.aio.insecure_channel(service_address)
    microgrid_id = 1
    dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
    dispatcher.start()  # this will start the actor

    events_receiver = dispatcher.lifecycle_events.new_receiver()

    async for event in events_receiver:
        match event:
            case Created(dispatch):
                print(f"A dispatch was created: {dispatch}")
            case Deleted(dispatch):
                print(f"A dispatch was deleted: {dispatch}")
            case Updated(dispatch):
                print(f"A dispatch was updated: {dispatch}")
            case _ as unhandled:
                assert_never(unhandled)
Creating a new dispatch and then modifying it.

Note that this uses the lower-level Client class to create and update the dispatch.

import os
from datetime import datetime, timedelta, timezone

import grpc.aio
from frequenz.client.common.microgrid.components import ComponentCategory

from frequenz.dispatch import Dispatcher

async def run():
    host = os.getenv("DISPATCH_API_HOST", "localhost")
    port = os.getenv("DISPATCH_API_PORT", "50051")

    service_address = f"{host}:{port}"
    grpc_channel = grpc.aio.insecure_channel(service_address)
    microgrid_id = 1
    dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
    await dispatcher.start()  # this will start the actor

    # Create a new dispatch
    new_dispatch = await dispatcher.client.create(
        microgrid_id=microgrid_id,
        _type="ECHO_FREQUENCY",  # replace with your own type
        start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
        duration=timedelta(minutes=5),
        selector=ComponentCategory.INVERTER,
        payload={"font": "Times New Roman"},  # Arbitrary payload data
    )

    # Modify the dispatch
    await dispatcher.client.update(
        dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
    )

    # Validate the modification
    modified_dispatch = await dispatcher.client.get(new_dispatch.id)
    assert modified_dispatch.duration == timedelta(minutes=10)
Source code in frequenz/dispatch/_dispatcher.py
class Dispatcher:
    """A highlevel interface for the dispatch API.

    This class provides a highlevel interface to the dispatch API.
    It provides two channels:

    Lifecycle events:
        A channel that sends a dispatch event message whenever a dispatch
        is created, updated or deleted.

    Running status change:
        Sends a dispatch message whenever a dispatch is ready
        to be executed according to the schedule or the running status of the
        dispatch changed in a way that could potentially require the consumer to start,
        stop or reconfigure itself.

    Example: Processing running state change dispatches
        ```python
        import os
        import grpc.aio
        from frequenz.dispatch import Dispatcher, RunningState
        from unittest.mock import MagicMock

        async def run():
            host = os.getenv("DISPATCH_API_HOST", "localhost")
            port = os.getenv("DISPATCH_API_PORT", "50051")

            service_address = f"{host}:{port}"
            grpc_channel = grpc.aio.insecure_channel(service_address)
            microgrid_id = 1
            dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
            await dispatcher.start()

            actor = MagicMock() # replace with your actor

            changed_running_status = dispatcher.running_status_change.new_receiver()

            async for dispatch in changed_running_status:
                match dispatch.running("DEMO_TYPE"):
                    case RunningState.RUNNING:
                        print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
                        if actor.is_running:
                            actor.reconfigure(
                                components=dispatch.selector,
                                run_parameters=dispatch.payload, # custom actor parameters
                                dry_run=dispatch.dry_run,
                                until=dispatch.until,
                            )  # this will reconfigure the actor
                        else:
                            # this will start a new actor with the given components
                            # and run it for the duration of the dispatch
                            actor.start(
                                components=dispatch.selector,
                                run_parameters=dispatch.payload, # custom actor parameters
                                dry_run=dispatch.dry_run,
                                until=dispatch.until,
                            )
                    case RunningState.STOPPED:
                        actor.stop()  # this will stop the actor
                    case RunningState.DIFFERENT_TYPE:
                        pass  # dispatch not for this type
        ```

    Example: Getting notification about dispatch lifecycle events
        ```python
        import os
        from typing import assert_never

        import grpc.aio
        from frequenz.dispatch import Created, Deleted, Dispatcher, Updated

        async def run():
            host = os.getenv("DISPATCH_API_HOST", "localhost")
            port = os.getenv("DISPATCH_API_PORT", "50051")

            service_address = f"{host}:{port}"
            grpc_channel = grpc.aio.insecure_channel(service_address)
            microgrid_id = 1
            dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
            dispatcher.start()  # this will start the actor

            events_receiver = dispatcher.lifecycle_events.new_receiver()

            async for event in events_receiver:
                match event:
                    case Created(dispatch):
                        print(f"A dispatch was created: {dispatch}")
                    case Deleted(dispatch):
                        print(f"A dispatch was deleted: {dispatch}")
                    case Updated(dispatch):
                        print(f"A dispatch was updated: {dispatch}")
                    case _ as unhandled:
                        assert_never(unhandled)
        ```

    Example: Creating a new dispatch and then modifying it.
        Note that this uses the lower-level `Client` class to create and update the dispatch.

        ```python
        import os
        from datetime import datetime, timedelta, timezone

        import grpc.aio
        from frequenz.client.common.microgrid.components import ComponentCategory

        from frequenz.dispatch import Dispatcher

        async def run():
            host = os.getenv("DISPATCH_API_HOST", "localhost")
            port = os.getenv("DISPATCH_API_PORT", "50051")

            service_address = f"{host}:{port}"
            grpc_channel = grpc.aio.insecure_channel(service_address)
            microgrid_id = 1
            dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
            await dispatcher.start()  # this will start the actor

            # Create a new dispatch
            new_dispatch = await dispatcher.client.create(
                microgrid_id=microgrid_id,
                _type="ECHO_FREQUENCY",  # replace with your own type
                start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
                duration=timedelta(minutes=5),
                selector=ComponentCategory.INVERTER,
                payload={"font": "Times New Roman"},  # Arbitrary payload data
            )

            # Modify the dispatch
            await dispatcher.client.update(
                dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
            )

            # Validate the modification
            modified_dispatch = await dispatcher.client.get(new_dispatch.id)
            assert modified_dispatch.duration == timedelta(minutes=10)
        ```
    """

    def __init__(
        self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
    ):
        """Initialize the dispatcher.

        Args:
            microgrid_id: The microgrid id.
            grpc_channel: The gRPC channel.
            svc_addr: The service address.
        """
        self._running_state_channel = Broadcast[Dispatch](name="running_state_change")
        self._lifecycle_events_channel = Broadcast[DispatchEvent](
            name="lifecycle_events"
        )
        self._client = Client(grpc_channel, svc_addr)
        self._actor = DispatchingActor(
            microgrid_id,
            self._client,
            self._lifecycle_events_channel.new_sender(),
            self._running_state_channel.new_sender(),
        )

    async def start(self) -> None:
        """Start the actor."""
        self._actor.start()

    @property
    def client(self) -> Client:
        """Return the client."""
        return self._client

    @property
    def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]:
        """Return new, updated or deleted dispatches receiver fetcher.

        Returns:
            A new receiver for new dispatches.
        """
        return self._lifecycle_events_channel

    @property
    def running_status_change(self) -> ReceiverFetcher[Dispatch]:
        """Return running status change receiver fetcher.

        This receiver will receive a message whenever the current running
        status of a dispatch changes.

        Usually, one message per scheduled run is to be expected.
        However, things get complicated when a dispatch was modified:

        If it was currently running and the modification now says
        it should not be running or running with different parameters,
        then a message will be sent.

        In other words: Any change that is expected to make an actor start, stop
        or reconfigure itself with new parameters causes a message to be
        sent.

        A non-exhaustive list of possible changes that will cause a message to be sent:
         - The normal scheduled start_time has been reached
         - The duration of the dispatch has been modified
         - The start_time has been modified to be in the future
         - The component selection changed
         - The active status changed
         - The dry_run status changed
         - The payload changed
         - The dispatch was deleted

        Note: Reaching the end time (start_time + duration) will not
        send a message, except when it was reached by modifying the duration.


        Returns:
            A new receiver for dispatches whose running status changed.
        """
        return self._running_state_channel
Attributes¤
client property ¤
client: Client

Return the client.

lifecycle_events property ¤
lifecycle_events: ReceiverFetcher[DispatchEvent]

Return new, updated or deleted dispatches receiver fetcher.

RETURNS DESCRIPTION
ReceiverFetcher[DispatchEvent]

A new receiver for new dispatches.

running_status_change property ¤
running_status_change: ReceiverFetcher[Dispatch]

Return running status change receiver fetcher.

This receiver will receive a message whenever the current running status of a dispatch changes.

Usually, one message per scheduled run is to be expected. However, things get complicated when a dispatch was modified:

If it was currently running and the modification now says it should not be running or running with different parameters, then a message will be sent.

In other words: Any change that is expected to make an actor start, stop or reconfigure itself with new parameters causes a message to be sent.

A non-exhaustive list of possible changes that will cause a message to be sent
  • The normal scheduled start_time has been reached
  • The duration of the dispatch has been modified
  • The start_time has been modified to be in the future
  • The component selection changed
  • The active status changed
  • The dry_run status changed
  • The payload changed
  • The dispatch was deleted

Note: Reaching the end time (start_time + duration) will not send a message, except when it was reached by modifying the duration.

RETURNS DESCRIPTION
ReceiverFetcher[Dispatch]

A new receiver for dispatches whose running status changed.

Functions¤
__init__ ¤
__init__(
    microgrid_id: int, grpc_channel: Channel, svc_addr: str
)

Initialize the dispatcher.

PARAMETER DESCRIPTION
microgrid_id

The microgrid id.

TYPE: int

grpc_channel

The gRPC channel.

TYPE: Channel

svc_addr

The service address.

TYPE: str

Source code in frequenz/dispatch/_dispatcher.py
def __init__(
    self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
):
    """Initialize the dispatcher.

    Args:
        microgrid_id: The microgrid id.
        grpc_channel: The gRPC channel.
        svc_addr: The service address.
    """
    self._running_state_channel = Broadcast[Dispatch](name="running_state_change")
    self._lifecycle_events_channel = Broadcast[DispatchEvent](
        name="lifecycle_events"
    )
    self._client = Client(grpc_channel, svc_addr)
    self._actor = DispatchingActor(
        microgrid_id,
        self._client,
        self._lifecycle_events_channel.new_sender(),
        self._running_state_channel.new_sender(),
    )
start async ¤
start() -> None

Start the actor.

Source code in frequenz/dispatch/_dispatcher.py
async def start(self) -> None:
    """Start the actor."""
    self._actor.start()

frequenz.dispatch.ReceiverFetcher ¤

Bases: Protocol[ReceivedT_co]

An interface that just exposes a new_receiver method.

Source code in frequenz/dispatch/_dispatcher.py
class ReceiverFetcher(Protocol[ReceivedT_co]):
    """An interface that just exposes a `new_receiver` method."""

    @abc.abstractmethod
    def new_receiver(
        self, *, name: str | None = None, limit: int = 50
    ) -> Receiver[ReceivedT_co]:
        """Get a receiver from the channel.

        Args:
            name: A name to identify the receiver in the logs.
            limit: The maximum size of the receiver.

        Returns:
            A receiver instance.
        """
Functions¤
new_receiver abstractmethod ¤
new_receiver(
    *, name: str | None = None, limit: int = 50
) -> Receiver[ReceivedT_co]

Get a receiver from the channel.

PARAMETER DESCRIPTION
name

A name to identify the receiver in the logs.

TYPE: str | None DEFAULT: None

limit

The maximum size of the receiver.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[ReceivedT_co]

A receiver instance.

Source code in frequenz/dispatch/_dispatcher.py
@abc.abstractmethod
def new_receiver(
    self, *, name: str | None = None, limit: int = 50
) -> Receiver[ReceivedT_co]:
    """Get a receiver from the channel.

    Args:
        name: A name to identify the receiver in the logs.
        limit: The maximum size of the receiver.

    Returns:
        A receiver instance.
    """

frequenz.dispatch.RunningState ¤

Bases: Enum

The running state of a dispatch.

Source code in frequenz/dispatch/_dispatch.py
class RunningState(Enum):
    """The running state of a dispatch."""

    RUNNING = "RUNNING"
    """The dispatch is running."""

    STOPPED = "STOPPED"
    """The dispatch is stopped."""

    DIFFERENT_TYPE = "DIFFERENT_TYPE"
    """The dispatch is for a different type."""
Attributes¤
DIFFERENT_TYPE class-attribute instance-attribute ¤
DIFFERENT_TYPE = 'DIFFERENT_TYPE'

The dispatch is for a different type.

RUNNING class-attribute instance-attribute ¤
RUNNING = 'RUNNING'

The dispatch is running.

STOPPED class-attribute instance-attribute ¤
STOPPED = 'STOPPED'

The dispatch is stopped.

frequenz.dispatch.Updated dataclass ¤

A dispatch updated event.

Source code in frequenz/dispatch/_event.py
@dataclass(frozen=True)
class Updated:
    """A dispatch updated event."""

    dispatch: Dispatch
    """The dispatch that was updated."""
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch that was updated.