Skip to content

Index

frequenz.client.dispatch ¤

Dispatch API client for Python.

Classes¤

frequenz.client.dispatch.Client ¤

Bases: BaseApiClient[MicrogridDispatchServiceStub]

Dispatch API client.

Source code in frequenz/client/dispatch/_client.py
class Client(BaseApiClient[dispatch_pb2_grpc.MicrogridDispatchServiceStub]):
    """Dispatch API client."""

    streams: dict[
        int, GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]
    ] = {}
    """A dictionary of streamers, keyed by microgrid_id."""

    def __init__(
        self,
        *,
        server_url: str,
        key: str,
        connect: bool = True,
    ) -> None:
        """Initialize the client.

        Args:
            server_url: The URL of the server to connect to.
            key: API key to use for authentication.
            connect: Whether to connect to the service immediately.
        """
        super().__init__(
            server_url,
            dispatch_pb2_grpc.MicrogridDispatchServiceStub,
            connect=connect,
            channel_defaults=ChannelOptions(
                port=DEFAULT_DISPATCH_PORT,
                ssl=SslOptions(
                    enabled=True,
                    root_certificates=Path(
                        str(
                            files("frequenz.client.dispatch").joinpath("certs/root.crt")
                        ),
                    ),
                ),
            ),
        )
        self._metadata = (("key", key),)

    # pylint: disable=too-many-arguments, too-many-locals
    async def list(
        self,
        microgrid_id: int,
        component_selectors: Iterator[ComponentSelector] = iter(()),
        start_from: datetime | None = None,
        start_to: datetime | None = None,
        end_from: datetime | None = None,
        end_to: datetime | None = None,
        active: bool | None = None,
        dry_run: bool | None = None,
        page_size: int | None = None,
    ) -> AsyncIterator[Iterator[Dispatch]]:
        """List dispatches.

        This function handles pagination internally and returns an async iterator
        over the dispatches. Pagination parameters like `page_size` and `page_token`
        can be used, but they are mutually exclusive.

        Example usage:

        ```python
        client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
        async for page in client.list(microgrid_id=1):
            for dispatch in page:
                print(dispatch)
        ```

        Args:
            microgrid_id: The microgrid_id to list dispatches for.
            component_selectors: optional, list of component ids or categories to filter by.
            start_from: optional, filter by start_time >= start_from.
            start_to: optional, filter by start_time < start_to.
            end_from: optional, filter by end_time >= end_from.
            end_to: optional, filter by end_time < end_to.
            active: optional, filter by active status.
            dry_run: optional, filter by dry_run status.
            page_size: optional, number of dispatches to return per page.

        Returns:
            An async iterator over pages of dispatches.

        Yields:
            A page of dispatches over which you can lazily iterate.
        """

        def to_interval(
            from_: datetime | None, to: datetime | None
        ) -> PBTimeIntervalFilter | None:
            return (
                PBTimeIntervalFilter(
                    **{"from": to_timestamp(from_)}, to=to_timestamp(to)
                )
                if from_ or to
                else None
            )

        # Setup parameters
        start_time_interval = to_interval(start_from, start_to)
        end_time_interval = to_interval(end_from, end_to)
        selectors = list(map(component_selector_to_protobuf, component_selectors))
        filters = DispatchFilter(
            selectors=selectors,
            start_time_interval=start_time_interval,
            end_time_interval=end_time_interval,
            is_active=active,
            is_dry_run=dry_run,
        )

        request = ListMicrogridDispatchesRequest(
            microgrid_id=microgrid_id,
            filter=filters,
            pagination_params=PaginationParams(page_size=page_size),
        )

        while True:
            response = await cast(
                Awaitable[ListMicrogridDispatchesResponse],
                self.stub.ListMicrogridDispatches(request, metadata=self._metadata),
            )

            yield (Dispatch.from_protobuf(dispatch) for dispatch in response.dispatches)

            if len(response.pagination_info.next_page_token):
                request.pagination_params.CopyFrom(
                    PaginationParams(
                        page_token=response.pagination_info.next_page_token
                    )
                )
            else:
                break

    def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
        """Receive a stream of dispatch events.

        This function returns a receiver channel that can be used to receive
        dispatch events.
        An event is one of [CREATE, UPDATE, DELETE].

        Example usage:

        ```
        client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
        async for message in client.stream(microgrid_id=1):
            print(message.event, message.dispatch)
        ```

        Args:
            microgrid_id: The microgrid_id to receive dispatches for.

        Returns:
            A receiver channel to receive the stream of dispatch events.
        """
        return self._get_stream(microgrid_id).new_receiver()

    def _get_stream(
        self, microgrid_id: int
    ) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
        """Get an instance to the streaming helper."""
        broadcaster = self.streams.get(microgrid_id)
        if broadcaster is None:
            request = StreamMicrogridDispatchesRequest(microgrid_id=microgrid_id)
            broadcaster = GrpcStreamBroadcaster(
                stream_name="StreamMicrogridDispatches",
                stream_method=lambda: cast(
                    AsyncIterator[StreamMicrogridDispatchesResponse],
                    self.stub.StreamMicrogridDispatches(
                        request, metadata=self._metadata
                    ),
                ),
                transform=DispatchEvent.from_protobuf,
            )
            self.streams[microgrid_id] = broadcaster

        return broadcaster

    async def create(
        self,
        microgrid_id: int,
        type: str,  # pylint: disable=redefined-builtin
        start_time: datetime,
        duration: timedelta,
        selector: ComponentSelector,
        active: bool = True,
        dry_run: bool = False,
        payload: dict[str, Any] | None = None,
        recurrence: RecurrenceRule | None = None,
    ) -> Dispatch:
        """Create a dispatch.

        Will try to return the created dispatch, identifying it by
        the same fields as the request.

        Args:
            microgrid_id: The microgrid_id to create the dispatch for.
            type: User defined string to identify the dispatch type.
            start_time: The start time of the dispatch.
            duration: The duration of the dispatch.
            selector: The component selector for the dispatch.
            active: The active status of the dispatch.
            dry_run: The dry_run status of the dispatch.
            payload: The payload of the dispatch.
            recurrence: The recurrence rule of the dispatch.

        Returns:
            Dispatch: The created dispatch

        Raises:
            ValueError: If start_time is in the past.
            ValueError: If the created dispatch could not be found.
        """
        if start_time <= datetime.now(tz=start_time.tzinfo):
            raise ValueError("start_time must not be in the past")

        # Raise if it's not UTC
        if start_time.tzinfo is None or start_time.tzinfo.utcoffset(start_time) is None:
            raise ValueError("start_time must be timezone aware")

        request = DispatchCreateRequest(
            microgrid_id=microgrid_id,
            type=type,
            start_time=start_time,
            duration=duration,
            selector=selector,
            active=active,
            dry_run=dry_run,
            payload=payload or {},
            recurrence=recurrence,
        )

        response = await cast(
            Awaitable[CreateMicrogridDispatchResponse],
            self.stub.CreateMicrogridDispatch(
                request.to_protobuf(), metadata=self._metadata
            ),
        )

        return Dispatch.from_protobuf(response.dispatch)

    async def update(
        self,
        *,
        microgrid_id: int,
        dispatch_id: int,
        new_fields: dict[str, Any],
    ) -> Dispatch:
        """Update a dispatch.

        The `new_fields` argument is a dictionary of fields to update. The keys are
        the field names, and the values are the new values for the fields.

        For recurrence fields, the keys are preceeded by "recurrence.".

        Note that updating `type` and `dry_run` is not supported.

        Args:
            microgrid_id: The microgrid_id to update the dispatch for.
            dispatch_id: The dispatch_id to update.
            new_fields: The fields to update.

        Returns:
            Dispatch: The updated dispatch.

        Raises:
            ValueError: If updating `type` or `dry_run`.
        """
        msg = UpdateMicrogridDispatchRequest(
            dispatch_id=dispatch_id, microgrid_id=microgrid_id
        )

        for key, val in new_fields.items():
            path = key.split(".")

            match path[0]:
                case "start_time":
                    msg.update.start_time.CopyFrom(to_timestamp(val))
                case "duration":
                    msg.update.duration = int(val.total_seconds())
                case "selector":
                    msg.update.selector.CopyFrom(component_selector_to_protobuf(val))
                case "is_active":
                    msg.update.is_active = val
                case "payload":
                    msg.update.payload.update(val)
                case "active":
                    msg.update.is_active = val
                    key = "is_active"
                case "recurrence":
                    match path[1]:
                        case "freq":
                            msg.update.recurrence.freq = val
                        # Proto uses "freq" instead of "frequency"
                        case "frequency":
                            msg.update.recurrence.freq = val
                            # Correct the key to "recurrence.freq"
                            key = "recurrence.freq"
                        case "interval":
                            msg.update.recurrence.interval = val
                        case "end_criteria":
                            msg.update.recurrence.end_criteria.CopyFrom(
                                val.to_protobuf()
                            )
                        case "byminutes":
                            msg.update.recurrence.byminutes.extend(val)
                        case "byhours":
                            msg.update.recurrence.byhours.extend(val)
                        case "byweekdays":
                            msg.update.recurrence.byweekdays.extend(val)
                        case "bymonthdays":
                            msg.update.recurrence.bymonthdays.extend(val)
                        case "bymonths":
                            msg.update.recurrence.bymonths.extend(val)
                        case _:
                            raise ValueError(f"Unknown recurrence field: {path[1]}")
                case _:
                    raise ValueError(f"Unknown field: {path[0]}")

            msg.update_mask.paths.append(key)

        response = await cast(
            Awaitable[UpdateMicrogridDispatchResponse],
            self.stub.UpdateMicrogridDispatch(msg, metadata=self._metadata),
        )

        return Dispatch.from_protobuf(response.dispatch)

    async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch:
        """Get a dispatch.

        Args:
            microgrid_id: The microgrid_id to get the dispatch for.
            dispatch_id: The dispatch_id to get.

        Returns:
            Dispatch: The dispatch.
        """
        request = GetMicrogridDispatchRequest(
            dispatch_id=dispatch_id, microgrid_id=microgrid_id
        )
        response = await cast(
            Awaitable[GetMicrogridDispatchResponse],
            self.stub.GetMicrogridDispatch(request, metadata=self._metadata),
        )
        return Dispatch.from_protobuf(response.dispatch)

    async def delete(self, *, microgrid_id: int, dispatch_id: int) -> None:
        """Delete a dispatch.

        Args:
            microgrid_id: The microgrid_id to delete the dispatch for.
            dispatch_id: The dispatch_id to delete.
        """
        request = DeleteMicrogridDispatchRequest(
            dispatch_id=dispatch_id, microgrid_id=microgrid_id
        )
        await cast(
            Awaitable[None],
            self.stub.DeleteMicrogridDispatch(request, metadata=self._metadata),
        )
Attributes¤
channel property ¤
channel: Channel

The underlying gRPC channel used to communicate with the server.

Warning

This channel is provided as a last resort for advanced users. It is not recommended to use this property directly unless you know what you are doing and you don't care about being tied to a specific gRPC library.

RAISES DESCRIPTION
ClientNotConnected

If the client is not connected to the server.

channel_defaults property ¤
channel_defaults: ChannelOptions

The default options for the gRPC channel.

is_connected property ¤
is_connected: bool

Whether the client is connected to the server.

server_url property ¤
server_url: str

The URL of the server.

streams class-attribute instance-attribute ¤
streams: dict[
    int,
    GrpcStreamBroadcaster[
        StreamMicrogridDispatchesResponse, DispatchEvent
    ],
] = {}

A dictionary of streamers, keyed by microgrid_id.

stub property ¤
stub: StubT

The underlying gRPC stub.

Warning

This stub is provided as a last resort for advanced users. It is not recommended to use this property directly unless you know what you are doing and you don't care about being tied to a specific gRPC library.

RAISES DESCRIPTION
ClientNotConnected

If the client is not connected to the server.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter a context manager.

Source code in frequenz/client/base/client.py
async def __aenter__(self) -> Self:
    """Enter a context manager."""
    self.connect()
    return self
__aexit__ async ¤
__aexit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None

Exit a context manager.

Source code in frequenz/client/base/client.py
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None:
    """Exit a context manager."""
    if self._channel is None:
        return None
    result = await self._channel.__aexit__(_exc_type, _exc_val, _exc_tb)
    self._channel = None
    self._stub = None
    return result
__init__ ¤
__init__(
    *, server_url: str, key: str, connect: bool = True
) -> None

Initialize the client.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to.

TYPE: str

key

API key to use for authentication.

TYPE: str

connect

Whether to connect to the service immediately.

TYPE: bool DEFAULT: True

Source code in frequenz/client/dispatch/_client.py
def __init__(
    self,
    *,
    server_url: str,
    key: str,
    connect: bool = True,
) -> None:
    """Initialize the client.

    Args:
        server_url: The URL of the server to connect to.
        key: API key to use for authentication.
        connect: Whether to connect to the service immediately.
    """
    super().__init__(
        server_url,
        dispatch_pb2_grpc.MicrogridDispatchServiceStub,
        connect=connect,
        channel_defaults=ChannelOptions(
            port=DEFAULT_DISPATCH_PORT,
            ssl=SslOptions(
                enabled=True,
                root_certificates=Path(
                    str(
                        files("frequenz.client.dispatch").joinpath("certs/root.crt")
                    ),
                ),
            ),
        ),
    )
    self._metadata = (("key", key),)
connect ¤
connect(server_url: str | None = None) -> None

Connect to the server, possibly using a new URL.

If the client is already connected and the URL is the same as the previous URL, this method does nothing. If you want to force a reconnection, you can call disconnect() first.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to. If not provided, the previously used URL is used.

TYPE: str | None DEFAULT: None

Source code in frequenz/client/base/client.py
def connect(self, server_url: str | None = None) -> None:
    """Connect to the server, possibly using a new URL.

    If the client is already connected and the URL is the same as the previous URL,
    this method does nothing. If you want to force a reconnection, you can call
    [disconnect()][frequenz.client.base.client.BaseApiClient.disconnect] first.

    Args:
        server_url: The URL of the server to connect to. If not provided, the
            previously used URL is used.
    """
    if server_url is not None and server_url != self._server_url:  # URL changed
        self._server_url = server_url
    elif self.is_connected:
        return
    self._channel = parse_grpc_uri(self._server_url, self._channel_defaults)
    self._stub = self._create_stub(self._channel)
create async ¤
create(
    microgrid_id: int,
    type: str,
    start_time: datetime,
    duration: timedelta,
    selector: ComponentSelector,
    active: bool = True,
    dry_run: bool = False,
    payload: dict[str, Any] | None = None,
    recurrence: RecurrenceRule | None = None,
) -> Dispatch

Create a dispatch.

Will try to return the created dispatch, identifying it by the same fields as the request.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to create the dispatch for.

TYPE: int

type

User defined string to identify the dispatch type.

TYPE: str

start_time

The start time of the dispatch.

TYPE: datetime

duration

The duration of the dispatch.

TYPE: timedelta

selector

The component selector for the dispatch.

TYPE: ComponentSelector

active

The active status of the dispatch.

TYPE: bool DEFAULT: True

dry_run

The dry_run status of the dispatch.

TYPE: bool DEFAULT: False

payload

The payload of the dispatch.

TYPE: dict[str, Any] | None DEFAULT: None

recurrence

The recurrence rule of the dispatch.

TYPE: RecurrenceRule | None DEFAULT: None

RETURNS DESCRIPTION
Dispatch

The created dispatch

TYPE: Dispatch

RAISES DESCRIPTION
ValueError

If start_time is in the past.

ValueError

If the created dispatch could not be found.

Source code in frequenz/client/dispatch/_client.py
async def create(
    self,
    microgrid_id: int,
    type: str,  # pylint: disable=redefined-builtin
    start_time: datetime,
    duration: timedelta,
    selector: ComponentSelector,
    active: bool = True,
    dry_run: bool = False,
    payload: dict[str, Any] | None = None,
    recurrence: RecurrenceRule | None = None,
) -> Dispatch:
    """Create a dispatch.

    Will try to return the created dispatch, identifying it by
    the same fields as the request.

    Args:
        microgrid_id: The microgrid_id to create the dispatch for.
        type: User defined string to identify the dispatch type.
        start_time: The start time of the dispatch.
        duration: The duration of the dispatch.
        selector: The component selector for the dispatch.
        active: The active status of the dispatch.
        dry_run: The dry_run status of the dispatch.
        payload: The payload of the dispatch.
        recurrence: The recurrence rule of the dispatch.

    Returns:
        Dispatch: The created dispatch

    Raises:
        ValueError: If start_time is in the past.
        ValueError: If the created dispatch could not be found.
    """
    if start_time <= datetime.now(tz=start_time.tzinfo):
        raise ValueError("start_time must not be in the past")

    # Raise if it's not UTC
    if start_time.tzinfo is None or start_time.tzinfo.utcoffset(start_time) is None:
        raise ValueError("start_time must be timezone aware")

    request = DispatchCreateRequest(
        microgrid_id=microgrid_id,
        type=type,
        start_time=start_time,
        duration=duration,
        selector=selector,
        active=active,
        dry_run=dry_run,
        payload=payload or {},
        recurrence=recurrence,
    )

    response = await cast(
        Awaitable[CreateMicrogridDispatchResponse],
        self.stub.CreateMicrogridDispatch(
            request.to_protobuf(), metadata=self._metadata
        ),
    )

    return Dispatch.from_protobuf(response.dispatch)
delete async ¤
delete(*, microgrid_id: int, dispatch_id: int) -> None

Delete a dispatch.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to delete the dispatch for.

TYPE: int

dispatch_id

The dispatch_id to delete.

TYPE: int

Source code in frequenz/client/dispatch/_client.py
async def delete(self, *, microgrid_id: int, dispatch_id: int) -> None:
    """Delete a dispatch.

    Args:
        microgrid_id: The microgrid_id to delete the dispatch for.
        dispatch_id: The dispatch_id to delete.
    """
    request = DeleteMicrogridDispatchRequest(
        dispatch_id=dispatch_id, microgrid_id=microgrid_id
    )
    await cast(
        Awaitable[None],
        self.stub.DeleteMicrogridDispatch(request, metadata=self._metadata),
    )
disconnect async ¤
disconnect() -> None

Disconnect from the server.

If the client is not connected, this method does nothing.

Source code in frequenz/client/base/client.py
async def disconnect(self) -> None:
    """Disconnect from the server.

    If the client is not connected, this method does nothing.
    """
    await self.__aexit__(None, None, None)
get async ¤
get(*, microgrid_id: int, dispatch_id: int) -> Dispatch

Get a dispatch.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to get the dispatch for.

TYPE: int

dispatch_id

The dispatch_id to get.

TYPE: int

RETURNS DESCRIPTION
Dispatch

The dispatch.

TYPE: Dispatch

Source code in frequenz/client/dispatch/_client.py
async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch:
    """Get a dispatch.

    Args:
        microgrid_id: The microgrid_id to get the dispatch for.
        dispatch_id: The dispatch_id to get.

    Returns:
        Dispatch: The dispatch.
    """
    request = GetMicrogridDispatchRequest(
        dispatch_id=dispatch_id, microgrid_id=microgrid_id
    )
    response = await cast(
        Awaitable[GetMicrogridDispatchResponse],
        self.stub.GetMicrogridDispatch(request, metadata=self._metadata),
    )
    return Dispatch.from_protobuf(response.dispatch)
list async ¤
list(
    microgrid_id: int,
    component_selectors: Iterator[ComponentSelector] = iter(
        ()
    ),
    start_from: datetime | None = None,
    start_to: datetime | None = None,
    end_from: datetime | None = None,
    end_to: datetime | None = None,
    active: bool | None = None,
    dry_run: bool | None = None,
    page_size: int | None = None,
) -> AsyncIterator[Iterator[Dispatch]]

List dispatches.

This function handles pagination internally and returns an async iterator over the dispatches. Pagination parameters like page_size and page_token can be used, but they are mutually exclusive.

Example usage:

client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
async for page in client.list(microgrid_id=1):
    for dispatch in page:
        print(dispatch)
PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to list dispatches for.

TYPE: int

component_selectors

optional, list of component ids or categories to filter by.

TYPE: Iterator[ComponentSelector] DEFAULT: iter(())

start_from

optional, filter by start_time >= start_from.

TYPE: datetime | None DEFAULT: None

start_to

optional, filter by start_time < start_to.

TYPE: datetime | None DEFAULT: None

end_from

optional, filter by end_time >= end_from.

TYPE: datetime | None DEFAULT: None

end_to

optional, filter by end_time < end_to.

TYPE: datetime | None DEFAULT: None

active

optional, filter by active status.

TYPE: bool | None DEFAULT: None

dry_run

optional, filter by dry_run status.

TYPE: bool | None DEFAULT: None

page_size

optional, number of dispatches to return per page.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
AsyncIterator[Iterator[Dispatch]]

An async iterator over pages of dispatches.

YIELDS DESCRIPTION
AsyncIterator[Iterator[Dispatch]]

A page of dispatches over which you can lazily iterate.

Source code in frequenz/client/dispatch/_client.py
async def list(
    self,
    microgrid_id: int,
    component_selectors: Iterator[ComponentSelector] = iter(()),
    start_from: datetime | None = None,
    start_to: datetime | None = None,
    end_from: datetime | None = None,
    end_to: datetime | None = None,
    active: bool | None = None,
    dry_run: bool | None = None,
    page_size: int | None = None,
) -> AsyncIterator[Iterator[Dispatch]]:
    """List dispatches.

    This function handles pagination internally and returns an async iterator
    over the dispatches. Pagination parameters like `page_size` and `page_token`
    can be used, but they are mutually exclusive.

    Example usage:

    ```python
    client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
    async for page in client.list(microgrid_id=1):
        for dispatch in page:
            print(dispatch)
    ```

    Args:
        microgrid_id: The microgrid_id to list dispatches for.
        component_selectors: optional, list of component ids or categories to filter by.
        start_from: optional, filter by start_time >= start_from.
        start_to: optional, filter by start_time < start_to.
        end_from: optional, filter by end_time >= end_from.
        end_to: optional, filter by end_time < end_to.
        active: optional, filter by active status.
        dry_run: optional, filter by dry_run status.
        page_size: optional, number of dispatches to return per page.

    Returns:
        An async iterator over pages of dispatches.

    Yields:
        A page of dispatches over which you can lazily iterate.
    """

    def to_interval(
        from_: datetime | None, to: datetime | None
    ) -> PBTimeIntervalFilter | None:
        return (
            PBTimeIntervalFilter(
                **{"from": to_timestamp(from_)}, to=to_timestamp(to)
            )
            if from_ or to
            else None
        )

    # Setup parameters
    start_time_interval = to_interval(start_from, start_to)
    end_time_interval = to_interval(end_from, end_to)
    selectors = list(map(component_selector_to_protobuf, component_selectors))
    filters = DispatchFilter(
        selectors=selectors,
        start_time_interval=start_time_interval,
        end_time_interval=end_time_interval,
        is_active=active,
        is_dry_run=dry_run,
    )

    request = ListMicrogridDispatchesRequest(
        microgrid_id=microgrid_id,
        filter=filters,
        pagination_params=PaginationParams(page_size=page_size),
    )

    while True:
        response = await cast(
            Awaitable[ListMicrogridDispatchesResponse],
            self.stub.ListMicrogridDispatches(request, metadata=self._metadata),
        )

        yield (Dispatch.from_protobuf(dispatch) for dispatch in response.dispatches)

        if len(response.pagination_info.next_page_token):
            request.pagination_params.CopyFrom(
                PaginationParams(
                    page_token=response.pagination_info.next_page_token
                )
            )
        else:
            break
stream ¤
stream(microgrid_id: int) -> Receiver[DispatchEvent]

Receive a stream of dispatch events.

This function returns a receiver channel that can be used to receive dispatch events. An event is one of [CREATE, UPDATE, DELETE].

Example usage:

client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
async for message in client.stream(microgrid_id=1):
    print(message.event, message.dispatch)
PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to receive dispatches for.

TYPE: int

RETURNS DESCRIPTION
Receiver[DispatchEvent]

A receiver channel to receive the stream of dispatch events.

Source code in frequenz/client/dispatch/_client.py
def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
    """Receive a stream of dispatch events.

    This function returns a receiver channel that can be used to receive
    dispatch events.
    An event is one of [CREATE, UPDATE, DELETE].

    Example usage:

    ```
    client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
    async for message in client.stream(microgrid_id=1):
        print(message.event, message.dispatch)
    ```

    Args:
        microgrid_id: The microgrid_id to receive dispatches for.

    Returns:
        A receiver channel to receive the stream of dispatch events.
    """
    return self._get_stream(microgrid_id).new_receiver()
update async ¤
update(
    *,
    microgrid_id: int,
    dispatch_id: int,
    new_fields: dict[str, Any]
) -> Dispatch

Update a dispatch.

The new_fields argument is a dictionary of fields to update. The keys are the field names, and the values are the new values for the fields.

For recurrence fields, the keys are preceeded by "recurrence.".

Note that updating type and dry_run is not supported.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to update the dispatch for.

TYPE: int

dispatch_id

The dispatch_id to update.

TYPE: int

new_fields

The fields to update.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
Dispatch

The updated dispatch.

TYPE: Dispatch

RAISES DESCRIPTION
ValueError

If updating type or dry_run.

Source code in frequenz/client/dispatch/_client.py
async def update(
    self,
    *,
    microgrid_id: int,
    dispatch_id: int,
    new_fields: dict[str, Any],
) -> Dispatch:
    """Update a dispatch.

    The `new_fields` argument is a dictionary of fields to update. The keys are
    the field names, and the values are the new values for the fields.

    For recurrence fields, the keys are preceeded by "recurrence.".

    Note that updating `type` and `dry_run` is not supported.

    Args:
        microgrid_id: The microgrid_id to update the dispatch for.
        dispatch_id: The dispatch_id to update.
        new_fields: The fields to update.

    Returns:
        Dispatch: The updated dispatch.

    Raises:
        ValueError: If updating `type` or `dry_run`.
    """
    msg = UpdateMicrogridDispatchRequest(
        dispatch_id=dispatch_id, microgrid_id=microgrid_id
    )

    for key, val in new_fields.items():
        path = key.split(".")

        match path[0]:
            case "start_time":
                msg.update.start_time.CopyFrom(to_timestamp(val))
            case "duration":
                msg.update.duration = int(val.total_seconds())
            case "selector":
                msg.update.selector.CopyFrom(component_selector_to_protobuf(val))
            case "is_active":
                msg.update.is_active = val
            case "payload":
                msg.update.payload.update(val)
            case "active":
                msg.update.is_active = val
                key = "is_active"
            case "recurrence":
                match path[1]:
                    case "freq":
                        msg.update.recurrence.freq = val
                    # Proto uses "freq" instead of "frequency"
                    case "frequency":
                        msg.update.recurrence.freq = val
                        # Correct the key to "recurrence.freq"
                        key = "recurrence.freq"
                    case "interval":
                        msg.update.recurrence.interval = val
                    case "end_criteria":
                        msg.update.recurrence.end_criteria.CopyFrom(
                            val.to_protobuf()
                        )
                    case "byminutes":
                        msg.update.recurrence.byminutes.extend(val)
                    case "byhours":
                        msg.update.recurrence.byhours.extend(val)
                    case "byweekdays":
                        msg.update.recurrence.byweekdays.extend(val)
                    case "bymonthdays":
                        msg.update.recurrence.bymonthdays.extend(val)
                    case "bymonths":
                        msg.update.recurrence.bymonths.extend(val)
                    case _:
                        raise ValueError(f"Unknown recurrence field: {path[1]}")
            case _:
                raise ValueError(f"Unknown field: {path[0]}")

        msg.update_mask.paths.append(key)

    response = await cast(
        Awaitable[UpdateMicrogridDispatchResponse],
        self.stub.UpdateMicrogridDispatch(msg, metadata=self._metadata),
    )

    return Dispatch.from_protobuf(response.dispatch)