Skip to content

actor

frequenz.dispatch.actor ¤

The dispatch actor.

Attributes¤

Classes¤

frequenz.dispatch.actor.DispatchingActor ¤

Bases: Actor

Dispatch actor.

This actor is responsible for handling dispatches for a microgrid.

This means staying in sync with the API and scheduling dispatches as necessary.

Source code in frequenz/dispatch/actor.py
class DispatchingActor(Actor):
    """Dispatch actor.

    This actor is responsible for handling dispatches for a microgrid.

    This means staying in sync with the API and scheduling
    dispatches as necessary.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        microgrid_id: int,
        client: Client,
        lifecycle_updates_sender: Sender[DispatchEvent],
        running_state_change_sender: Sender[Dispatch],
        poll_interval: timedelta = _DEFAULT_POLL_INTERVAL,
    ) -> None:
        """Initialize the actor.

        Args:
            microgrid_id: The microgrid ID to handle dispatches for.
            client: The client to use for fetching dispatches.
            lifecycle_updates_sender: A sender for dispatch lifecycle events.
            running_state_change_sender: A sender for dispatch running state changes.
            poll_interval: The interval to poll the API for dispatch changes.
        """
        super().__init__(name="dispatch")

        self._client = client
        # Last known dispatches, keyed by dispatch ID.
        self._dispatches: dict[int, Dispatch] = {}
        # Pending schedule tasks, keyed by dispatch ID.
        self._scheduled: dict[int, asyncio.Task[None]] = {}
        self._microgrid_id = microgrid_id
        self._lifecycle_updates_sender = lifecycle_updates_sender
        self._running_state_change_sender = running_state_change_sender
        self._poll_timer = Timer(poll_interval, SkipMissedAndDrift())

    async def _run(self) -> None:
        """Run the actor.

        Fetches the dispatches on every tick of the poll timer. When the
        actor is cancelled, all pending schedule tasks are cancelled too
        before the cancellation is propagated.
        """
        self._poll_timer.reset()
        try:
            async for _ in self._poll_timer:
                await self._fetch()
        except asyncio.CancelledError:
            for task in self._scheduled.values():
                task.cancel()
            raise

    async def _fetch(self) -> None:
        """Fetch all relevant dispatches.

        Compares the freshly listed dispatches with the previously known
        ones and emits `Created`, `Updated` and `Deleted` lifecycle events
        as well as running state change notifications where needed.

        If the API call fails, the previously known state is kept (merged
        with whatever was already processed during this fetch) and no
        deletions are reported.
        """
        old_dispatches = self._dispatches
        self._dispatches = {}

        try:
            _logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
            async for client_dispatch in self._client.list(
                microgrid_id=self._microgrid_id
            ):
                dispatch = Dispatch(client_dispatch)

                # Store the very same instance that is scheduled and sent out,
                # so state set on it later (e.g. the "running status notified"
                # marker) is visible through `self._dispatches` as well.
                self._dispatches[dispatch.id] = dispatch
                old_dispatch = old_dispatches.pop(dispatch.id, None)
                if not old_dispatch:
                    self._update_dispatch_schedule(dispatch, None)
                    _logger.info("New dispatch: %s", dispatch)
                    await self._lifecycle_updates_sender.send(
                        Created(dispatch=dispatch)
                    )
                elif dispatch.update_time != old_dispatch.update_time:
                    self._update_dispatch_schedule(dispatch, old_dispatch)
                    _logger.info("Updated dispatch: %s", dispatch)
                    await self._lifecycle_updates_sender.send(
                        Updated(dispatch=dispatch)
                    )

                    if self._running_state_change(dispatch, old_dispatch):
                        await self._send_running_state_change(dispatch)

        except grpc.aio.AioRpcError as error:
            _logger.error("Error fetching dispatches: %s", error)
            # Entries processed before the failure were already popped from
            # `old_dispatches`; merge both maps so nothing is forgotten and
            # re-announced as "created" on the next successful poll.
            self._dispatches = {**old_dispatches, **self._dispatches}
            return

        # Whatever is left in `old_dispatches` was not listed anymore.
        for dispatch in old_dispatches.values():
            _logger.info("Deleted dispatch: %s", dispatch)
            dispatch._set_deleted()  # pylint: disable=protected-access
            await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
            if task := self._scheduled.pop(dispatch.id, None):
                task.cancel()

            if self._running_state_change(None, dispatch):
                await self._send_running_state_change(dispatch)

    def _update_dispatch_schedule(
        self, dispatch: Dispatch, old_dispatch: Dispatch | None
    ) -> None:
        """Update the schedule for a dispatch.

        Schedules, reschedules or cancels the dispatch based on the start_time
        and active status.

        For example:
            * when the start_time changes, the dispatch is rescheduled
            * when the dispatch is deactivated, the dispatch is cancelled

        Args:
            dispatch: The dispatch to update the schedule for.
            old_dispatch: The old dispatch, if available.
        """
        if (
            old_dispatch
            and old_dispatch.active
            and (
                # Deactivated dispatches must not fire anymore.
                not dispatch.active
                or old_dispatch.start_time != dispatch.start_time
            )
        ):
            if task := self._scheduled.pop(dispatch.id, None):
                task.cancel()

        if dispatch.active and dispatch.id not in self._scheduled:
            self._scheduled[dispatch.id] = asyncio.create_task(
                self._schedule_task(dispatch)
            )

    async def _schedule_task(self, dispatch: Dispatch) -> None:
        """Wait for a dispatch to become ready.

        Waits for the dispatch's next run and then notifies that it is ready.
        This repeats for as long as the dispatch reports a next run.

        Args:
            dispatch: The dispatch to schedule.
        """

        def next_run_info() -> tuple[datetime, datetime] | None:
            """Return `(now, next_run)` or `None` if there is no next run."""
            now = datetime.now(tz=timezone.utc)
            next_run = dispatch.next_run_after(now)

            if next_run is None:
                return None

            return now, next_run

        while pair := next_run_info():
            now, next_time = pair

            # Never sleep longer than the maximum look-ahead in one go;
            # re-evaluate the next run after each such chunk.
            if next_time - now > _MAX_AHEAD_SCHEDULE:
                await asyncio.sleep(_MAX_AHEAD_SCHEDULE.total_seconds())
                continue

            _logger.info("Dispatch %s scheduled for %s", dispatch.id, next_time)
            await asyncio.sleep((next_time - now).total_seconds())

            _logger.info("Dispatch ready: %s", dispatch)
            await self._running_state_change_sender.send(dispatch)

        _logger.info("Dispatch finished: %s", dispatch)
        # Tolerate the entry having been removed already.
        self._scheduled.pop(dispatch.id, None)

    def _running_state_change(
        self, updated_dispatch: Dispatch | None, previous_dispatch: Dispatch | None
    ) -> bool:
        """Check if the running state of a dispatch has changed.

        Checks if any of the running state changes to the dispatch
        require a new message to be sent to the actor so that it can potentially
        change its runtime configuration or start/stop itself.

        Also checks if a dispatch update was not sent due to connection issues
        in which case we need to send the message now.

        Args:
            updated_dispatch: The new dispatch, if available.
            previous_dispatch: The old dispatch, if available.

        Returns:
            True if the running state has changed, False otherwise.
        """
        # Deleted dispatch: only relevant if it was running.
        if updated_dispatch is None:
            assert previous_dispatch is not None
            return (
                previous_dispatch.running(previous_dispatch.type)
                == RunningState.RUNNING
            )

        # New dispatch: notify only if the client was not informed yet.
        if previous_dispatch is None:
            # pylint: disable-next=protected-access
            return not updated_dispatch._running_status_notified

        # `running` is called with the dispatch type (as in the deleted branch
        # above), so compare the evaluated running states rather than the
        # callables themselves, which would typically differ between distinct
        # instances regardless of state.
        if updated_dispatch.running(updated_dispatch.type) != previous_dispatch.running(
            previous_dispatch.type
        ):
            return True

        # If any of the runtime attributes changed, we need to send a message
        runtime_state_attributes = (
            "type",
            "selector",
            "duration",
            "dry_run",
            "payload",
        )

        return any(
            getattr(updated_dispatch, attribute)
            != getattr(previous_dispatch, attribute)
            for attribute in runtime_state_attributes
        )

    async def _send_running_state_change(self, dispatch: Dispatch) -> None:
        """Send a running state change message.

        Args:
            dispatch: The dispatch that changed.
        """
        await self._running_state_change_sender.send(dispatch)
        # Update the last sent notification time
        # so we know if this change was already sent
        dispatch._set_running_status_notified()  # pylint: disable=protected-access
Attributes¤
RESTART_DELAY class-attribute instance-attribute ¤
RESTART_DELAY: timedelta = timedelta(seconds=2)

The delay to wait between restarts of this actor.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Make this background service awaitable.

    Awaiting the service is equivalent to awaiting `self.wait()`, i.e. it
    waits for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    wait_coro = self.wait()
    return wait_coro.__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service, passing a
    message that identifies the deleted instance.
    """
    # The message must be an f-string, otherwise the literal placeholder
    # text "{self!r}" is passed to the tasks instead of the instance repr.
    self.cancel(f"{self!r} was deleted")
__init__ ¤
__init__(
    microgrid_id: int,
    client: Client,
    lifecycle_updates_sender: Sender[DispatchEvent],
    running_state_change_sender: Sender[Dispatch],
    poll_interval: timedelta = _DEFAULT_POLL_INTERVAL,
) -> None

Initialize the actor.

PARAMETER DESCRIPTION
microgrid_id

The microgrid ID to handle dispatches for.

TYPE: int

client

The client to use for fetching dispatches.

TYPE: Client

lifecycle_updates_sender

A sender for dispatch lifecycle events.

TYPE: Sender[DispatchEvent]

running_state_change_sender

A sender for dispatch running state changes.

TYPE: Sender[Dispatch]

poll_interval

The interval to poll the API for dispatch changes.

TYPE: timedelta DEFAULT: _DEFAULT_POLL_INTERVAL

Source code in frequenz/dispatch/actor.py
def __init__(
    self,
    microgrid_id: int,
    client: Client,
    lifecycle_updates_sender: Sender[DispatchEvent],
    running_state_change_sender: Sender[Dispatch],
    poll_interval: timedelta = _DEFAULT_POLL_INTERVAL,
) -> None:
    """Set up the actor's state; nothing is fetched or scheduled yet.

    Args:
        microgrid_id: The microgrid ID to handle dispatches for.
        client: The client to use for fetching dispatches.
        lifecycle_updates_sender: A sender for dispatch lifecycle events.
        running_state_change_sender: A sender for dispatch running state changes.
        poll_interval: The interval to poll the API for dispatch changes.
    """
    super().__init__(name="dispatch")

    # What to fetch and where from.
    self._microgrid_id = microgrid_id
    self._client = client

    # Where to report to.
    self._lifecycle_updates_sender = lifecycle_updates_sender
    self._running_state_change_sender = running_state_change_sender

    # Bookkeeping, both keyed by dispatch ID.
    self._dispatches: dict[int, Dispatch] = {}
    self._scheduled: dict[int, asyncio.Task[None]] = {}

    self._poll_timer = Timer(poll_interval, SkipMissedAndDrift())
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Build a debug representation including the name and the tasks.

    Returns:
        A string representation of this instance.
    """
    cls_name = type(self).__name__
    return f"{cls_name}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Build a short representation of the form `ClassName[name]`.

    Returns:
        A string representation of this instance.
    """
    cls_name = type(self).__name__
    return f"{cls_name}[{self._name}]"
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
start ¤
start() -> None

Start this actor.

If this actor is already running, this method does nothing.

Source code in frequenz/sdk/actor/_actor.py
def start(self) -> None:
    """Start this actor.

    If this actor is already running, this method does nothing. Otherwise
    any stale tasks are dropped and a single new task running the actor's
    run loop is created.
    """
    if not self.is_running:
        self._tasks.clear()
        run_loop_task = asyncio.create_task(self._run_loop())
        self._tasks.add(run_loop_task)
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception. Note that the CancelledError of a cancelled task is also collected into the exception group; stop() filters it out before re-raising.

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )