Skip to content

types

frequenz.client.dispatch.types ¤

Type wrappers for the generated protobuf messages.

Attributes¤

frequenz.client.dispatch.types.TargetComponents module-attribute ¤

TargetComponents = list[int] | list[ComponentCategory]

One or more target components specifying which components a dispatch targets.

It can be a list of component IDs or a list of categories.

Classes¤

frequenz.client.dispatch.types.Dispatch dataclass ¤

Represents a dispatch operation within a microgrid system.

Source code in frequenz/client/dispatch/types.py
@dataclass(kw_only=True, frozen=True)
class Dispatch:  # pylint: disable=too-many-instance-attributes
    """Represents a dispatch operation within a microgrid system."""

    id: int
    """The unique identifier for the dispatch."""

    type: str
    """User-defined information about the type of dispatch.

    This is understood and processed by downstream applications."""

    start_time: datetime
    """The start time of the dispatch in UTC."""

    duration: timedelta | None
    """The duration of the dispatch, represented as a timedelta."""

    target: TargetComponents
    """The target components of the dispatch."""

    active: bool
    """Indicates whether the dispatch is active and eligible for processing."""

    dry_run: bool
    """Indicates if the dispatch is a dry run.

    Executed for logging and monitoring without affecting actual component states."""

    payload: dict[str, Any]
    """The dispatch payload containing arbitrary data.

    It is structured as needed for the dispatch operation."""

    recurrence: RecurrenceRule
    """The recurrence rule for the dispatch.

    Defining any repeating patterns or schedules."""

    create_time: datetime
    """The creation time of the dispatch in UTC. Set when a dispatch is created."""

    update_time: datetime
    """The last update time of the dispatch in UTC. Set when a dispatch is modified."""

    @property
    def started(self) -> bool:
        """Check if the dispatch has started.

        A dispatch is considered started if the current time is after the start
        time but before the end time.

        Recurring dispatches are considered started if the current time is after
        the start time of the last occurrence but before the end time of the
        last occurrence.
        """
        if not self.active:
            return False

        now = datetime.now(tz=timezone.utc)

        if now < self.start_time:
            return False

        # A dispatch without duration is always running, once it started
        if self.duration is None:
            return True

        if until := self._until(now):
            return now < until

        return False

    @property
    def until(self) -> datetime | None:
        """Time when the dispatch should end.

        Returns the time that a running dispatch should end.
        If the dispatch is not running, None is returned.

        Returns:
            The time when the dispatch should end or None if the dispatch is not running.
        """
        if not self.active:
            return None

        now = datetime.now(tz=timezone.utc)
        return self._until(now)

    @property
    def next_run(self) -> datetime | None:
        """Calculate the next run of a dispatch.

        Returns:
            The next run of the dispatch or None if the dispatch is finished.
        """
        return self.next_run_after(datetime.now(tz=timezone.utc))

    def next_run_after(self, after: datetime) -> datetime | None:
        """Calculate the next run of a dispatch.

        Args:
            after: The time to calculate the next run from.

        Returns:
            The next run of the dispatch or None if the dispatch is finished.
        """
        if (
            not self.recurrence.frequency
            or self.recurrence.frequency == Frequency.UNSPECIFIED
            or self.duration is None  # Infinite duration
        ):
            if after > self.start_time:
                return None
            return self.start_time

        # Make sure no weekday is UNSPECIFIED
        if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
            return None

        # No type information for rrule, so we need to cast
        return cast(
            datetime | None,
            self.recurrence._as_rrule(  # pylint: disable=protected-access
                self.start_time
            ).after(after, inc=True),
        )

    def _until(self, now: datetime) -> datetime | None:
        """Calculate the time when the dispatch should end.

        If no previous run is found, None is returned.

        Args:
            now: The current time.

        Returns:
            The time when the dispatch should end or None if the dispatch is not running.

        Raises:
            ValueError: If the dispatch has no duration.
        """
        if self.duration is None:
            raise ValueError("_until: Dispatch has no duration")

        if (
            not self.recurrence.frequency
            or self.recurrence.frequency == Frequency.UNSPECIFIED
        ):
            return self.start_time + self.duration

        latest_past_start: datetime | None = (
            self.recurrence._as_rrule(  # pylint: disable=protected-access
                self.start_time
            ).before(now, inc=True)
        )

        if not latest_past_start:
            return None

        return latest_past_start + self.duration

    @classmethod
    def from_protobuf(cls, pb_object: PBDispatch) -> "Dispatch":
        """Convert a protobuf dispatch to a dispatch.

        Args:
            pb_object: The protobuf dispatch to convert.

        Returns:
            The converted dispatch.
        """
        return Dispatch(
            id=pb_object.metadata.dispatch_id,
            type=pb_object.data.type,
            create_time=to_datetime(pb_object.metadata.create_time),
            update_time=to_datetime(pb_object.metadata.modification_time),
            start_time=to_datetime(pb_object.data.start_time),
            duration=(
                timedelta(seconds=pb_object.data.duration)
                if pb_object.data.duration
                else None
            ),
            target=_target_components_from_protobuf(pb_object.data.target),
            active=pb_object.data.is_active,
            dry_run=pb_object.data.is_dry_run,
            payload=MessageToDict(pb_object.data.payload),
            recurrence=RecurrenceRule.from_protobuf(pb_object.data.recurrence),
        )

    def to_protobuf(self) -> PBDispatch:
        """Convert a dispatch to a protobuf dispatch.

        Returns:
            The converted protobuf dispatch.
        """
        payload = Struct()
        payload.update(self.payload)

        return PBDispatch(
            metadata=DispatchMetadata(
                dispatch_id=self.id,
                create_time=to_timestamp(self.create_time),
                modification_time=to_timestamp(self.update_time),
            ),
            data=DispatchData(
                type=self.type,
                start_time=to_timestamp(self.start_time),
                duration=(
                    round(self.duration.total_seconds()) if self.duration else None
                ),
                target=_target_components_to_protobuf(self.target),
                is_active=self.active,
                is_dry_run=self.dry_run,
                payload=payload,
                recurrence=self.recurrence.to_protobuf() if self.recurrence else None,
            ),
        )
Attributes¤
active instance-attribute ¤
active: bool

Indicates whether the dispatch is active and eligible for processing.

create_time instance-attribute ¤
create_time: datetime

The creation time of the dispatch in UTC. Set when a dispatch is created.

dry_run instance-attribute ¤
dry_run: bool

Indicates if the dispatch is a dry run.

Executed for logging and monitoring without affecting actual component states.

duration instance-attribute ¤
duration: timedelta | None

The duration of the dispatch, represented as a timedelta.

id instance-attribute ¤
id: int

The unique identifier for the dispatch.

next_run property ¤
next_run: datetime | None

Calculate the next run of a dispatch.

RETURNS DESCRIPTION
datetime | None

The next run of the dispatch or None if the dispatch is finished.

payload instance-attribute ¤
payload: dict[str, Any]

The dispatch payload containing arbitrary data.

It is structured as needed for the dispatch operation.

recurrence instance-attribute ¤
recurrence: RecurrenceRule

The recurrence rule for the dispatch.

Defining any repeating patterns or schedules.

start_time instance-attribute ¤
start_time: datetime

The start time of the dispatch in UTC.

started property ¤
started: bool

Check if the dispatch has started.

A dispatch is considered started if the current time is after the start time but before the end time.

Recurring dispatches are considered started if the current time is after the start time of the last occurrence but before the end time of the last occurrence.

target instance-attribute ¤

The target components of the dispatch.

type instance-attribute ¤
type: str

User-defined information about the type of dispatch.

This is understood and processed by downstream applications.

until property ¤
until: datetime | None

Time when the dispatch should end.

Returns the time that a running dispatch should end. If the dispatch is not running, None is returned.

RETURNS DESCRIPTION
datetime | None

The time when the dispatch should end or None if the dispatch is not running.

update_time instance-attribute ¤
update_time: datetime

The last update time of the dispatch in UTC. Set when a dispatch is modified.

Functions¤
from_protobuf classmethod ¤
from_protobuf(pb_object: Dispatch) -> Dispatch

Convert a protobuf dispatch to a dispatch.

PARAMETER DESCRIPTION
pb_object

The protobuf dispatch to convert.

TYPE: Dispatch

RETURNS DESCRIPTION
Dispatch

The converted dispatch.

Source code in frequenz/client/dispatch/types.py
@classmethod
def from_protobuf(cls, pb_object: PBDispatch) -> "Dispatch":
    """Convert a protobuf dispatch to a dispatch.

    Args:
        pb_object: The protobuf dispatch to convert.

    Returns:
        The converted dispatch.
    """
    return Dispatch(
        id=pb_object.metadata.dispatch_id,
        type=pb_object.data.type,
        create_time=to_datetime(pb_object.metadata.create_time),
        update_time=to_datetime(pb_object.metadata.modification_time),
        start_time=to_datetime(pb_object.data.start_time),
        duration=(
            timedelta(seconds=pb_object.data.duration)
            if pb_object.data.duration
            else None
        ),
        target=_target_components_from_protobuf(pb_object.data.target),
        active=pb_object.data.is_active,
        dry_run=pb_object.data.is_dry_run,
        payload=MessageToDict(pb_object.data.payload),
        recurrence=RecurrenceRule.from_protobuf(pb_object.data.recurrence),
    )
next_run_after ¤
next_run_after(after: datetime) -> datetime | None

Calculate the next run of a dispatch.

PARAMETER DESCRIPTION
after

The time to calculate the next run from.

TYPE: datetime

RETURNS DESCRIPTION
datetime | None

The next run of the dispatch or None if the dispatch is finished.

Source code in frequenz/client/dispatch/types.py
def next_run_after(self, after: datetime) -> datetime | None:
    """Calculate the next run of a dispatch.

    Args:
        after: The time to calculate the next run from.

    Returns:
        The next run of the dispatch or None if the dispatch is finished.
    """
    if (
        not self.recurrence.frequency
        or self.recurrence.frequency == Frequency.UNSPECIFIED
        or self.duration is None  # Infinite duration
    ):
        if after > self.start_time:
            return None
        return self.start_time

    # Make sure no weekday is UNSPECIFIED
    if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
        return None

    # No type information for rrule, so we need to cast
    return cast(
        datetime | None,
        self.recurrence._as_rrule(  # pylint: disable=protected-access
            self.start_time
        ).after(after, inc=True),
    )
to_protobuf ¤
to_protobuf() -> Dispatch

Convert a dispatch to a protobuf dispatch.

RETURNS DESCRIPTION
Dispatch

The converted protobuf dispatch.

Source code in frequenz/client/dispatch/types.py
def to_protobuf(self) -> PBDispatch:
    """Convert a dispatch to a protobuf dispatch.

    Returns:
        The converted protobuf dispatch.
    """
    payload = Struct()
    payload.update(self.payload)

    return PBDispatch(
        metadata=DispatchMetadata(
            dispatch_id=self.id,
            create_time=to_timestamp(self.create_time),
            modification_time=to_timestamp(self.update_time),
        ),
        data=DispatchData(
            type=self.type,
            start_time=to_timestamp(self.start_time),
            duration=(
                round(self.duration.total_seconds()) if self.duration else None
            ),
            target=_target_components_to_protobuf(self.target),
            is_active=self.active,
            is_dry_run=self.dry_run,
            payload=payload,
            recurrence=self.recurrence.to_protobuf() if self.recurrence else None,
        ),
    )

frequenz.client.dispatch.types.DispatchEvent dataclass ¤

Represents an event that occurred during a dispatch operation.

Source code in frequenz/client/dispatch/types.py
@dataclass(kw_only=True, frozen=True)
class DispatchEvent:
    """Represents an event that occurred during a dispatch operation."""

    dispatch: Dispatch
    """The dispatch associated with the event."""

    event: Event
    """The type of event that occurred."""

    @classmethod
    def from_protobuf(
        cls, pb_object: StreamMicrogridDispatchesResponse
    ) -> "DispatchEvent":
        """Convert a protobuf dispatch event to a dispatch event.

        Args:
            pb_object: The protobuf dispatch event to convert.

        Returns:
            The converted dispatch event.
        """
        return DispatchEvent(
            dispatch=Dispatch.from_protobuf(pb_object.dispatch),
            event=Event(pb_object.event),
        )
Attributes¤
dispatch instance-attribute ¤
dispatch: Dispatch

The dispatch associated with the event.

event instance-attribute ¤
event: Event

The type of event that occurred.

Functions¤
from_protobuf classmethod ¤
from_protobuf(
    pb_object: StreamMicrogridDispatchesResponse,
) -> DispatchEvent

Convert a protobuf dispatch event to a dispatch event.

PARAMETER DESCRIPTION
pb_object

The protobuf dispatch event to convert.

TYPE: StreamMicrogridDispatchesResponse

RETURNS DESCRIPTION
DispatchEvent

The converted dispatch event.

Source code in frequenz/client/dispatch/types.py
@classmethod
def from_protobuf(
    cls, pb_object: StreamMicrogridDispatchesResponse
) -> "DispatchEvent":
    """Convert a protobuf dispatch event to a dispatch event.

    Args:
        pb_object: The protobuf dispatch event to convert.

    Returns:
        The converted dispatch event.
    """
    return DispatchEvent(
        dispatch=Dispatch.from_protobuf(pb_object.dispatch),
        event=Event(pb_object.event),
    )

frequenz.client.dispatch.types.Event ¤

Bases: IntEnum

Enum representing the type of event that occurred during a dispatch operation.

Source code in frequenz/client/dispatch/types.py
class Event(IntEnum):
    """Enum representing the type of event that occurred during a dispatch operation."""

    UNSPECIFIED = StreamMicrogridDispatchesResponse.Event.EVENT_UNSPECIFIED
    CREATED = StreamMicrogridDispatchesResponse.Event.EVENT_CREATED
    UPDATED = StreamMicrogridDispatchesResponse.Event.EVENT_UPDATED
    DELETED = StreamMicrogridDispatchesResponse.Event.EVENT_DELETED

frequenz.client.dispatch.types.TimeIntervalFilter dataclass ¤

Filter for a time interval.

Source code in frequenz/client/dispatch/types.py
@dataclass(frozen=True, kw_only=True)
class TimeIntervalFilter:
    """Filter for a time interval."""

    start_from: datetime | None
    """Filter by start_time >= start_from."""

    start_to: datetime | None
    """Filter by start_time < start_to."""

    end_from: datetime | None
    """Filter by end_time >= end_from."""

    end_to: datetime | None
    """Filter by end_time < end_to."""
Attributes¤
end_from instance-attribute ¤
end_from: datetime | None

Filter by end_time >= end_from.

end_to instance-attribute ¤
end_to: datetime | None

Filter by end_time < end_to.

start_from instance-attribute ¤
start_from: datetime | None

Filter by start_time >= start_from.

start_to instance-attribute ¤
start_to: datetime | None

Filter by start_time < start_to.

Functions¤