Skip to content

actor

frequenz.dispatch.actor ¤

The dispatch actor.

Attributes¤

Classes¤

frequenz.dispatch.actor.DispatchingActor ¤

Bases: Actor

Dispatch actor.

This actor is responsible for handling dispatches for a microgrid.

This means staying in sync with the API and scheduling dispatches as necessary.

Source code in frequenz/dispatch/actor.py
class DispatchingActor(Actor):
    """Dispatch actor.

    This actor is responsible for handling dispatches for a microgrid.

    This means staying in sync with the API and scheduling
    dispatches as necessary.
    """

    @dataclass(order=True)
    class QueueItem:
        """An entry of the scheduled-events queue.

        Entries compare by `time` first and by `dispatch_id` second; the
        `dispatch` object itself is excluded from comparisons.
        """

        # When the event is due.
        time: datetime
        # ID of the dispatch, copied from `dispatch.id` on construction.
        dispatch_id: int
        # The dispatch this event belongs to (not part of the ordering).
        dispatch: Dispatch = field(compare=False)

        def __init__(self, time: datetime, dispatch: Dispatch) -> None:
            """Create an entry for `dispatch` that is due at `time`.

            Args:
                time: When the event is due.
                dispatch: The dispatch the event belongs to.
            """
            self.dispatch = dispatch
            self.dispatch_id = dispatch.id
            self.time = time

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        microgrid_id: int,
        client: Client,
        lifecycle_updates_sender: Sender[DispatchEvent],
        running_state_change_sender: Sender[Dispatch],
    ) -> None:
        """Set up the dispatch actor.

        Args:
            microgrid_id: ID of the microgrid whose dispatches are handled.
            client: Client used to list and stream dispatches.
            lifecycle_updates_sender: Sender on which dispatch lifecycle
                events (created/updated/deleted) are published.
            running_state_change_sender: Sender on which dispatches are
                published whenever their running state changes.
        """
        super().__init__(name="dispatch")

        self._microgrid_id = microgrid_id
        self._client = client
        self._lifecycle_updates_sender = lifecycle_updates_sender
        self._running_state_change_sender = running_state_change_sender

        self._dispatches: dict[int, Dispatch] = {}
        """The currently known dispatches, keyed by dispatch ID."""

        placeholder_interval = timedelta(seconds=100)
        self._next_event_timer = Timer(
            placeholder_interval, SkipMissedAndResync(), auto_start=False
        )
        """The timer that fires when the next scheduled event is due.

        The initial interval is only a placeholder: the timer is not started
        automatically and is reset whenever the schedule is updated.
        """

        self._scheduled_events: list["DispatchingActor.QueueItem"] = []
        """The scheduled events, kept as a heap ordered by time.

        Each entry is a `QueueItem` holding the due time and the dispatch.
        The list is maintained with `heapq`, so the next event is always
        found at index 0.
        """

    async def _run(self) -> None:
        """Run the actor."""
        _logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)

        # Initial fetch
        await self._fetch()

        stream = self._client.stream(microgrid_id=self._microgrid_id)

        # Streaming updates
        async for selected in select(self._next_event_timer, stream):
            if selected_from(selected, self._next_event_timer):
                if not self._scheduled_events:
                    continue
                _logger.debug(
                    "Executing scheduled event: %s", self._scheduled_events[0].dispatch
                )
                await self._execute_scheduled_event(
                    heappop(self._scheduled_events).dispatch
                )
            elif selected_from(selected, stream):
                _logger.debug("Received dispatch event: %s", selected.message)
                dispatch = Dispatch(selected.message.dispatch)
                match selected.message.event:
                    case Event.CREATED:
                        self._dispatches[dispatch.id] = dispatch
                        await self._update_dispatch_schedule_and_notify(dispatch, None)
                        await self._lifecycle_updates_sender.send(
                            Created(dispatch=dispatch)
                        )
                    case Event.UPDATED:
                        await self._update_dispatch_schedule_and_notify(
                            dispatch, self._dispatches[dispatch.id]
                        )
                        self._dispatches[dispatch.id] = dispatch
                        await self._lifecycle_updates_sender.send(
                            Updated(dispatch=dispatch)
                        )
                    case Event.DELETED:
                        self._dispatches.pop(dispatch.id)
                        await self._update_dispatch_schedule_and_notify(None, dispatch)

                        await self._lifecycle_updates_sender.send(
                            Deleted(dispatch=dispatch)
                        )

    async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
        """Handle a scheduled event that has become due.

        Sends a running state change for the dispatch, queues its follow-up
        event and re-arms the timer.

        Args:
            dispatch: The dispatch whose event is due.
        """
        await self._send_running_state_change(dispatch)

        # The timer always fires slightly late, so ask the dispatch whether
        # it is supposed to be running now (it is assumed not to have been
        # running before, as all checks are done before scheduling).
        # Running -> queue the stop event; not running -> queue the next start.
        schedule_follow_up = (
            self._schedule_stop if dispatch.started else self._schedule_start
        )
        schedule_follow_up(dispatch)

        self._update_timer()

    async def _fetch(self) -> None:
        """Fetch all relevant dispatches using list.

        Replaces the known dispatches with the ones returned by the client
        and, by comparing against the previously known ones, updates the
        schedule and sends `Created`, `Updated` and `Deleted` lifecycle
        events accordingly.

        If listing fails with a gRPC error, the error is logged and nothing
        is treated as deleted: previously known dispatches are kept, updated
        with whatever was already fetched before the failure.
        """
        # Entries are popped from `old_dispatches` as they show up in the
        # listing, so what remains at the end no longer exists.
        old_dispatches = self._dispatches
        self._dispatches = {}

        try:
            _logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
            async for page in self._client.list(microgrid_id=self._microgrid_id):
                for client_dispatch in page:
                    dispatch = Dispatch(client_dispatch)

                    # Store the same instance that is scheduled and announced,
                    # as the streaming path in `_run` does.
                    self._dispatches[dispatch.id] = dispatch
                    old_dispatch = old_dispatches.pop(dispatch.id, None)
                    if not old_dispatch:
                        _logger.info("New dispatch: %s", dispatch)
                        await self._update_dispatch_schedule_and_notify(dispatch, None)
                        await self._lifecycle_updates_sender.send(
                            Created(dispatch=dispatch)
                        )
                    elif dispatch.update_time != old_dispatch.update_time:
                        _logger.info("Updated dispatch: %s", dispatch)
                        await self._update_dispatch_schedule_and_notify(
                            dispatch, old_dispatch
                        )
                        await self._lifecycle_updates_sender.send(
                            Updated(dispatch=dispatch)
                        )

        except grpc.aio.AioRpcError as error:
            _logger.error("Error fetching dispatches: %s", error)
            # `old_dispatches` only holds the entries not yet seen in this
            # listing. Merge in what was fetched (and already scheduled and
            # announced) so that no known dispatch is forgotten.
            self._dispatches = old_dispatches | self._dispatches
            return

        for dispatch in old_dispatches.values():
            _logger.info("Deleted dispatch: %s", dispatch)
            # Cancels the scheduled event, marks the dispatch as deleted and
            # sends a running state change if it was running.
            await self._update_dispatch_schedule_and_notify(None, dispatch)
            # Announce the deletion exactly once, after the schedule update,
            # in the same order as the streaming path in `_run`.
            await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))

    async def _update_dispatch_schedule_and_notify(
        self, dispatch: Dispatch | None, old_dispatch: Dispatch | None
    ) -> None:
        """Update the schedule for a dispatch.

        Schedules, reschedules or cancels the dispatch events
        based on the start_time and active status.

        Sends a running state change notification if necessary.

        For example:
            * when the start_time changes, the dispatch is rescheduled
            * when the dispatch is deactivated, the dispatch is cancelled

        The combination of arguments selects the operation:
            * only `old_dispatch`: the dispatch was deleted
            * only `dispatch`: the dispatch was created
            * both: the dispatch was updated

        Args:
            dispatch: The dispatch to update the schedule for.
            old_dispatch: The old dispatch, if available.
        """
        # If dispatch is None, the dispatch was deleted
        # and we need to cancel any existing event for it
        if not dispatch and old_dispatch:
            self._remove_scheduled(old_dispatch)

            # Read `started` before marking the dispatch as deleted, so the
            # value reflects the state prior to the deletion.
            was_running = old_dispatch.started
            old_dispatch._set_deleted()  # pylint: disable=protected-access

            # If the dispatch was running, we need to notify
            if was_running:
                await self._send_running_state_change(old_dispatch)

        # A new dispatch was created
        elif dispatch and not old_dispatch:
            # Do the removal outside of the `assert`: assert statements are
            # stripped under `python -O`, which would silently skip the call.
            already_scheduled = self._remove_scheduled(dispatch)
            assert not already_scheduled, "New dispatch already scheduled?!"
            # If its currently running, send notification right away
            if dispatch.started:
                await self._send_running_state_change(dispatch)

                self._schedule_stop(dispatch)
            # Otherwise, if it's enabled but not yet running, schedule it
            else:
                self._schedule_start(dispatch)

        # Dispatch was updated
        elif dispatch and old_dispatch:
            # Remove potentially existing scheduled event
            self._remove_scheduled(old_dispatch)

            # Check if the change requires an immediate notification
            if self._update_changed_running_state(dispatch, old_dispatch):
                await self._send_running_state_change(dispatch)

            if dispatch.started:
                self._schedule_stop(dispatch)
            else:
                self._schedule_start(dispatch)

        # We modified the schedule, so we need to reset the timer
        self._update_timer()

    def _update_timer(self) -> None:
        """Re-arm the timer for the earliest scheduled event.

        Does nothing when no events are scheduled.
        """
        if not self._scheduled_events:
            return

        # The heap keeps the earliest event at index 0.
        next_item = self._scheduled_events[0]
        remaining = next_item.time - datetime.now(timezone.utc)
        self._next_event_timer.reset(interval=remaining)
        _logger.debug("Next event scheduled at %s", next_item.time)

    def _remove_scheduled(self, dispatch: Dispatch) -> bool:
        """Remove a dispatch from the scheduled events.

        Only the first event found for the dispatch is removed.

        Args:
            dispatch: The dispatch to remove.

        Returns:
            True if the dispatch was found and removed, False otherwise.
        """
        # Imported locally so this method does not depend on which `heapq`
        # names the module imports at the top.
        from heapq import heapify  # pylint: disable=import-outside-toplevel

        for idx, item in enumerate(self._scheduled_events):
            if dispatch.id == item.dispatch.id:
                self._scheduled_events.pop(idx)
                # Removing an arbitrary element can break the heap invariant
                # that `heappush`/`heappop` rely on, so restore it; otherwise
                # index 0 may no longer be the earliest event.
                heapify(self._scheduled_events)
                return True

        return False

    def _schedule_start(self, dispatch: Dispatch) -> None:
        """Queue the next start event of a dispatch.

        Inactive dispatches and dispatches without a next run are not
        queued. A `ValueError` raised while scheduling is logged instead of
        being propagated.

        Args:
            dispatch: The dispatch to schedule.
        """
        # Inactive dispatches are never scheduled.
        if not dispatch.active:
            return

        try:
            next_run = dispatch.next_run
            if not next_run:
                _logger.debug("Dispatch %s has no next run", dispatch.id)
            else:
                start_event = self.QueueItem(next_run, dispatch)
                heappush(self._scheduled_events, start_event)
                _logger.debug(
                    "Scheduled dispatch %s to start at %s", dispatch.id, next_run
                )
        except ValueError as error:
            _logger.error("Error scheduling dispatch %s: %s", dispatch.id, error)

    def _schedule_stop(self, dispatch: Dispatch) -> None:
        """Queue the stop event of a dispatch.

        Nothing is queued when the dispatch has no duration or a
        non-positive one.

        Args:
            dispatch: The dispatch to schedule.
        """
        duration = dispatch.duration
        # Without a positive duration there is no stop time to schedule.
        if not (duration and duration > timedelta(seconds=0)):
            return

        stop_at = dispatch.until
        assert stop_at is not None
        stop_event = self.QueueItem(stop_at, dispatch)
        heappush(self._scheduled_events, stop_event)
        _logger.debug("Scheduled dispatch %s to stop at %s", dispatch, stop_at)

    def _update_changed_running_state(
        self, updated_dispatch: Dispatch, previous_dispatch: Dispatch
    ) -> bool:
        """Tell whether an update changed the running state of a dispatch.

        Compares the attributes that make up the running state (`started`,
        `type`, `target`, `duration`, `dry_run` and `payload`) between the
        two versions of the dispatch. A difference in any of them means a
        running state change message needs to be sent.

        Args:
            updated_dispatch: The new dispatch
            previous_dispatch: The old dispatch

        Returns:
            True if the running state has changed, False otherwise.
        """
        runtime_state_attributes = (
            "started",
            "type",
            "target",
            "duration",
            "dry_run",
            "payload",
        )

        # Stops at the first attribute that differs.
        return any(
            getattr(updated_dispatch, name) != getattr(previous_dispatch, name)
            for name in runtime_state_attributes
        )

    async def _send_running_state_change(self, dispatch: Dispatch) -> None:
        """Publish a dispatch on the running state change channel.

        Args:
            dispatch: The dispatch whose running state changed.
        """
        sender = self._running_state_change_sender
        await sender.send(dispatch)
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Classes¤
QueueItem dataclass ¤

A queue item for the scheduled events.

Source code in frequenz/dispatch/actor.py
@dataclass(order=True)
class QueueItem:
    """An entry of the scheduled-events queue.

    Entries compare by `time` first and by `dispatch_id` second; the
    `dispatch` object itself is excluded from comparisons.
    """

    # When the event is due.
    time: datetime
    # ID of the dispatch, copied from `dispatch.id` on construction.
    dispatch_id: int
    # The dispatch this event belongs to (not part of the ordering).
    dispatch: Dispatch = field(compare=False)

    def __init__(self, time: datetime, dispatch: Dispatch) -> None:
        """Create an entry for `dispatch` that is due at `time`.

        Args:
            time: When the event is due.
            dispatch: The dispatch the event belongs to.
        """
        self.dispatch = dispatch
        self.dispatch_id = dispatch.id
        self.time = time
Functions¤
__init__ ¤
__init__(time: datetime, dispatch: Dispatch) -> None

Initialize the queue item.

Source code in frequenz/dispatch/actor.py
def __init__(self, time: datetime, dispatch: Dispatch) -> None:
    """Create a queue item for `dispatch` that is due at `time`.

    Args:
        time: When the event is due.
        dispatch: The dispatch the event belongs to; its `id` is stored
            as `dispatch_id`.
    """
    self.dispatch = dispatch
    self.dispatch_id = dispatch.id
    self.time = time
Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Make this background service awaitable.

    Awaiting the service is the same as awaiting `self.wait()`.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    waiter = self.wait()
    return waiter.__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service, passing a
    message that identifies the deleted instance.
    """
    # The f-prefix is required: without it the literal text
    # "{self!r} was deleted" is passed as the cancellation message.
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(
    microgrid_id: int,
    client: Client,
    lifecycle_updates_sender: Sender[DispatchEvent],
    running_state_change_sender: Sender[Dispatch],
) -> None

Initialize the actor.

PARAMETER DESCRIPTION
microgrid_id

The microgrid ID to handle dispatches for.

TYPE: int

client

The client to use for fetching dispatches.

TYPE: Client

lifecycle_updates_sender

A sender for dispatch lifecycle events.

TYPE: Sender[DispatchEvent]

running_state_change_sender

A sender for dispatch running state changes.

TYPE: Sender[Dispatch]

Source code in frequenz/dispatch/actor.py
def __init__(
    self,
    microgrid_id: int,
    client: Client,
    lifecycle_updates_sender: Sender[DispatchEvent],
    running_state_change_sender: Sender[Dispatch],
) -> None:
    """Set up the dispatch actor.

    Args:
        microgrid_id: ID of the microgrid whose dispatches are handled.
        client: Client used to list and stream dispatches.
        lifecycle_updates_sender: Sender on which dispatch lifecycle
            events (created/updated/deleted) are published.
        running_state_change_sender: Sender on which dispatches are
            published whenever their running state changes.
    """
    super().__init__(name="dispatch")

    self._microgrid_id = microgrid_id
    self._client = client
    self._lifecycle_updates_sender = lifecycle_updates_sender
    self._running_state_change_sender = running_state_change_sender

    self._dispatches: dict[int, Dispatch] = {}
    """The currently known dispatches, keyed by dispatch ID."""

    placeholder_interval = timedelta(seconds=100)
    self._next_event_timer = Timer(
        placeholder_interval, SkipMissedAndResync(), auto_start=False
    )
    """The timer that fires when the next scheduled event is due.

    The initial interval is only a placeholder: the timer is not started
    automatically and is reset whenever the schedule is updated.
    """

    self._scheduled_events: list["DispatchingActor.QueueItem"] = []
    """The scheduled events, kept as a heap ordered by time.

    Each entry is a `QueueItem` holding the due time and the dispatch.
    The list is maintained with `heapq`, so the next event is always
    found at index 0.
    """
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Build the debug representation of this instance.

    Returns:
        The class name followed by the `repr()` of the name and of the
        tasks, e.g. `ClassName(name='x', tasks=set())`.
    """
    class_name = type(self).__name__
    return f"{class_name}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Build the short, human-readable representation of this instance.

    Returns:
        The class name followed by the name in brackets, e.g.
        `ClassName[x]`.
    """
    class_name = type(self).__name__
    return f"{class_name}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    Spawns a single task running `_run_loop()` and makes it the only
    tracked task. If this actor is already running, this method does
    nothing.
    """
    if not self.is_running:
        # Forget tasks of a previous run before tracking the new one.
        self._tasks.clear()
        run_loop_task = asyncio.create_task(self._run_loop())
        self._tasks.add(run_loop_task)
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

Functions¤