Skip to content

Index

frequenz.client.dispatch ¤

Dispatch API client for Python.

Classes¤

frequenz.client.dispatch.DispatchApiClient ¤

Bases: BaseApiClient[MicrogridDispatchServiceStub]

Dispatch API client.

Source code in frequenz/client/dispatch/_client.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
class DispatchApiClient(BaseApiClient[dispatch_pb2_grpc.MicrogridDispatchServiceStub]):
    """Dispatch API client."""

    # pylint: disable-next=too-many-arguments
    def __init__(
        self,
        *,
        server_url: str,
        key: str,
        connect: bool = True,
        call_timeout: timedelta = timedelta(seconds=60),
        stream_timeout: timedelta = timedelta(minutes=5),
    ) -> None:
        """Initialize the client.

        Args:
            server_url: The URL of the server to connect to.
            key: API key to use for authentication.
            connect: Whether to connect to the service immediately.
            call_timeout: Timeout for gRPC calls, default is 60 seconds.
            stream_timeout: Timeout for gRPC streams, default is 5 minutes.
        """
        super().__init__(
            server_url,
            dispatch_pb2_grpc.MicrogridDispatchServiceStub,
            connect=connect,
            channel_defaults=ChannelOptions(
                port=DEFAULT_DISPATCH_PORT,
                ssl=SslOptions(enabled=True),
            ),
        )
        self._metadata = (("key", key),)
        self._streams: dict[
            MicrogridId,
            GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent],
        ] = {}
        """A dictionary of streamers, keyed by microgrid_id."""

        self._call_timeout_seconds = call_timeout.total_seconds()
        self._stream_timeout_seconds = stream_timeout.total_seconds()

    @property
    def call_timeout(self) -> timedelta:
        """Get the call timeout."""
        return timedelta(seconds=self._call_timeout_seconds)

    @property
    def stream_timeout(self) -> timedelta:
        """Get the stream timeout."""
        return timedelta(seconds=self._stream_timeout_seconds)

    @property
    def stub(self) -> dispatch_pb2_grpc.MicrogridDispatchServiceAsyncStub:
        """The stub for the service."""
        if self._channel is None or self._stub is None:
            raise ClientNotConnected(server_url=self.server_url, operation="stub")
        # This type: ignore is needed because we need to cast the sync stub to
        # the async stub, but we can't use cast because the async stub doesn't
        # actually exists to the eyes of the interpreter, it only exists for the
        # type-checker, so it can only be used for type hints.
        return self._stub  # type: ignore

    # pylint: disable=too-many-arguments, too-many-locals
    async def list(
        self,
        microgrid_id: MicrogridId,
        *,
        target_components: Iterator[TargetComponents] = iter(()),
        start_from: datetime | None = None,
        start_to: datetime | None = None,
        end_from: datetime | None = None,
        end_to: datetime | None = None,
        active: bool | None = None,
        dry_run: bool | None = None,
        page_size: int | None = None,
    ) -> AsyncIterator[Iterator[Dispatch]]:
        """List dispatches.

        This function handles pagination internally and returns an async iterator
        over the dispatches. Pagination parameters like `page_size` and `page_token`
        can be used, but they are mutually exclusive.

        Example usage:

        ```python
        client = DispatchApiClient(
            key="key",
            server_url="grpc://dispatch.url.goes.here.example.com"
        )
        async for page in client.list(microgrid_id=MicrogridId(1)):
            for dispatch in page:
                print(dispatch)
        ```

        Args:
            microgrid_id: The microgrid_id to list dispatches for.
            target_components: optional, list of component ids or categories to filter by.
            start_from: optional, filter by start_time >= start_from.
            start_to: optional, filter by start_time < start_to.
            end_from: optional, filter by end_time >= end_from.
            end_to: optional, filter by end_time < end_to.
            active: optional, filter by active status.
            dry_run: optional, filter by dry_run status.
            page_size: optional, number of dispatches to return per page.

        Returns:
            An async iterator over pages of dispatches.

        Yields:
            A page of dispatches over which you can lazily iterate.
        """

        def to_interval(
            from_: datetime | None, to: datetime | None
        ) -> PBTimeIntervalFilter | None:
            return (
                PBTimeIntervalFilter(
                    from_time=to_timestamp(from_), to_time=to_timestamp(to)
                )
                if from_ or to
                else None
            )

        # Setup parameters
        start_time_interval = to_interval(start_from, start_to)
        end_time_interval = to_interval(end_from, end_to)
        targets = list(map(_target_components_to_protobuf, target_components))
        filters = DispatchFilter(
            targets=targets,
            start_time_interval=start_time_interval,
            end_time_interval=end_time_interval,
            is_active=active,
            is_dry_run=dry_run,
        )

        request = ListMicrogridDispatchesRequest(
            microgrid_id=int(microgrid_id),
            filter=filters,
            pagination_params=(
                PaginationParams(page_size=page_size) if page_size else None
            ),
        )

        while True:
            response = await cast(
                Awaitable[ListMicrogridDispatchesResponse],
                self.stub.ListMicrogridDispatches(
                    request, metadata=self._metadata, timeout=self._call_timeout_seconds
                ),
            )

            yield (Dispatch.from_protobuf(dispatch) for dispatch in response.dispatches)

            if len(response.pagination_info.next_page_token):
                request.pagination_params.CopyFrom(
                    PaginationParams(
                        page_token=response.pagination_info.next_page_token
                    )
                )
            else:
                break

    def stream(self, microgrid_id: MicrogridId) -> channels.Receiver[DispatchEvent]:
        """Receive a stream of dispatch events.

        This function returns a receiver channel that can be used to receive
        dispatch events.
        An event is one of [CREATE, UPDATE, DELETE].

        Example usage:

        ```
        client = DispatchApiClient(
            key="key",
            server_url="grpc://dispatch.url.goes.here.example.com"
        )
        async for message in client.stream(microgrid_id=1):
            print(message.event, message.dispatch)
        ```

        Args:
            microgrid_id: The microgrid_id to receive dispatches for.

        Returns:
            A receiver channel to receive the stream of dispatch events.
        """
        return self._get_stream(microgrid_id).new_receiver()

    def _get_stream(
        self, microgrid_id: MicrogridId
    ) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
        """Get an instance to the streaming helper."""
        broadcaster = self._streams.get(microgrid_id)
        if broadcaster is not None and not broadcaster.is_running:
            del self._streams[microgrid_id]
            broadcaster = None
        if broadcaster is None:
            request = StreamMicrogridDispatchesRequest(microgrid_id=int(microgrid_id))
            broadcaster = GrpcStreamBroadcaster(
                stream_name="StreamMicrogridDispatches",
                stream_method=lambda: cast(
                    AsyncIterator[StreamMicrogridDispatchesResponse],
                    self.stub.StreamMicrogridDispatches(
                        request,
                        metadata=self._metadata,
                        timeout=self._stream_timeout_seconds,
                    ),
                ),
                transform=DispatchEvent.from_protobuf,
                retry_strategy=LinearBackoff(interval=1, limit=None),
            )
            self._streams[microgrid_id] = broadcaster

        return broadcaster

    async def create(  # pylint: disable=too-many-positional-arguments
        self,
        microgrid_id: MicrogridId,
        type: str,  # pylint: disable=redefined-builtin
        start_time: datetime | Literal["NOW"],
        duration: timedelta | None,
        target: TargetComponents,
        *,
        active: bool = True,
        dry_run: bool = False,
        payload: dict[str, Any] | None = None,
        recurrence: RecurrenceRule | None = None,
    ) -> Dispatch:
        """Create a dispatch.

        Args:
            microgrid_id: The microgrid_id to create the dispatch for.
            type: User defined string to identify the dispatch type.
            start_time: The start time of the dispatch. Can be "NOW" for immediate start.
            duration: The duration of the dispatch. Can be `None` for infinite
                or no-duration dispatches (e.g. switching a component on).
            target: The component target for the dispatch.
            active: The active status of the dispatch.
            dry_run: The dry_run status of the dispatch.
            payload: The payload of the dispatch.
            recurrence: The recurrence rule of the dispatch.

        Returns:
            Dispatch: The created dispatch

        Raises:
            ValueError: If start_time is in the past.
        """
        if isinstance(start_time, datetime):
            if start_time <= datetime.now(tz=start_time.tzinfo):
                raise ValueError("start_time must not be in the past")

            # Raise if it's not UTC
            if (
                start_time.tzinfo is None
                or start_time.tzinfo.utcoffset(start_time) is None
            ):
                raise ValueError("start_time must be timezone aware")

        request = DispatchCreateRequest(
            microgrid_id=microgrid_id,
            type=type,
            start_time=start_time,
            duration=duration,
            target=target,
            active=active,
            dry_run=dry_run,
            payload=payload or {},
            recurrence=recurrence,
        )

        response = await cast(
            Awaitable[CreateMicrogridDispatchResponse],
            self.stub.CreateMicrogridDispatch(
                request.to_protobuf(),
                metadata=self._metadata,
                timeout=self._call_timeout_seconds,
            ),
        )

        return Dispatch.from_protobuf(response.dispatch)

    async def update(
        self,
        *,
        microgrid_id: MicrogridId,
        dispatch_id: DispatchId,
        new_fields: dict[str, Any],
    ) -> Dispatch:
        """Update a dispatch.

        The `new_fields` argument is a dictionary of fields to update. The keys are
        the field names, and the values are the new values for the fields.

        For recurrence fields, the keys are preceeded by "recurrence.".

        Note that updating `type` and `dry_run` is not supported.

        Args:
            microgrid_id: The microgrid_id to update the dispatch for.
            dispatch_id: The dispatch_id to update.
            new_fields: The fields to update.

        Returns:
            Dispatch: The updated dispatch.

        Raises:
            ValueError: If updating `type` or `dry_run`.
        """
        msg = UpdateMicrogridDispatchRequest(
            dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
        )

        for key, val in new_fields.items():
            path = key.split(".")

            match path[0]:
                case "start_time":
                    msg.update.start_time.CopyFrom(to_timestamp(val))
                case "duration":
                    if val is None:
                        msg.update.ClearField("duration")
                    else:
                        msg.update.duration = round(val.total_seconds())
                case "target":
                    msg.update.target.CopyFrom(_target_components_to_protobuf(val))
                case "is_active":
                    msg.update.is_active = val
                case "payload":
                    msg.update.payload.update(val)
                case "active":
                    msg.update.is_active = val
                    key = "is_active"
                case "recurrence":
                    match path[1]:
                        case "freq":
                            msg.update.recurrence.freq = val
                        # Proto uses "freq" instead of "frequency"
                        case "frequency":
                            msg.update.recurrence.freq = val
                            # Correct the key to "recurrence.freq"
                            key = "recurrence.freq"
                        case "interval":
                            msg.update.recurrence.interval = val
                        case "end_criteria":
                            msg.update.recurrence.end_criteria.CopyFrom(
                                val.to_protobuf()
                            )
                        case "byminutes":
                            msg.update.recurrence.byminutes.extend(val)
                        case "byhours":
                            msg.update.recurrence.byhours.extend(val)
                        case "byweekdays":
                            msg.update.recurrence.byweekdays.extend(val)
                        case "bymonthdays":
                            msg.update.recurrence.bymonthdays.extend(val)
                        case "bymonths":
                            msg.update.recurrence.bymonths.extend(val)
                        case _:
                            raise ValueError(f"Unknown recurrence field: {path[1]}")
                case _:
                    raise ValueError(f"Unknown field: {path[0]}")

            msg.update_mask.paths.append(key)

        response = await cast(
            Awaitable[UpdateMicrogridDispatchResponse],
            self.stub.UpdateMicrogridDispatch(
                msg, metadata=self._metadata, timeout=self._call_timeout_seconds
            ),
        )

        return Dispatch.from_protobuf(response.dispatch)

    async def get(
        self, *, microgrid_id: MicrogridId, dispatch_id: DispatchId
    ) -> Dispatch:
        """Get a dispatch.

        Args:
            microgrid_id: The microgrid_id to get the dispatch for.
            dispatch_id: The dispatch_id to get.

        Returns:
            Dispatch: The dispatch.
        """
        request = GetMicrogridDispatchRequest(
            dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
        )
        response = await cast(
            Awaitable[GetMicrogridDispatchResponse],
            self.stub.GetMicrogridDispatch(
                request, metadata=self._metadata, timeout=self._call_timeout_seconds
            ),
        )
        return Dispatch.from_protobuf(response.dispatch)

    async def delete(
        self, *, microgrid_id: MicrogridId, dispatch_id: DispatchId
    ) -> None:
        """Delete a dispatch.

        Args:
            microgrid_id: The microgrid_id to delete the dispatch for.
            dispatch_id: The dispatch_id to delete.
        """
        request = DeleteMicrogridDispatchRequest(
            dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
        )
        await cast(
            Awaitable[None],
            self.stub.DeleteMicrogridDispatch(
                request, metadata=self._metadata, timeout=self._call_timeout_seconds
            ),
        )
Attributes¤
call_timeout property ¤
call_timeout: timedelta

Get the call timeout.

channel property ¤
channel: Channel

The underlying gRPC channel used to communicate with the server.

Warning

This channel is provided as a last resort for advanced users. It is not recommended to use this property directly unless you know what you are doing and you don't care about being tied to a specific gRPC library.

RAISES DESCRIPTION
ClientNotConnected

If the client is not connected to the server.

channel_defaults property ¤
channel_defaults: ChannelOptions

The default options for the gRPC channel.

is_connected property ¤
is_connected: bool

Whether the client is connected to the server.

server_url property ¤
server_url: str

The URL of the server.

stream_timeout property ¤
stream_timeout: timedelta

Get the stream timeout.

stub property ¤

The stub for the service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter a context manager.

Source code in frequenz/client/base/client.py
async def __aenter__(self) -> Self:
    """Enter a context manager."""
    self.connect()
    return self
__aexit__ async ¤
__aexit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None

Exit a context manager.

Source code in frequenz/client/base/client.py
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None:
    """Exit a context manager."""
    if self._channel is None:
        return None
    result = await self._channel.__aexit__(_exc_type, _exc_val, _exc_tb)
    self._channel = None
    self._stub = None
    return result
__init__ ¤
__init__(
    *,
    server_url: str,
    key: str,
    connect: bool = True,
    call_timeout: timedelta = timedelta(seconds=60),
    stream_timeout: timedelta = timedelta(minutes=5)
) -> None

Initialize the client.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to.

TYPE: str

key

API key to use for authentication.

TYPE: str

connect

Whether to connect to the service immediately.

TYPE: bool DEFAULT: True

call_timeout

Timeout for gRPC calls, default is 60 seconds.

TYPE: timedelta DEFAULT: timedelta(seconds=60)

stream_timeout

Timeout for gRPC streams, default is 5 minutes.

TYPE: timedelta DEFAULT: timedelta(minutes=5)

Source code in frequenz/client/dispatch/_client.py
def __init__(
    self,
    *,
    server_url: str,
    key: str,
    connect: bool = True,
    call_timeout: timedelta = timedelta(seconds=60),
    stream_timeout: timedelta = timedelta(minutes=5),
) -> None:
    """Initialize the client.

    Args:
        server_url: The URL of the server to connect to.
        key: API key to use for authentication.
        connect: Whether to connect to the service immediately.
        call_timeout: Timeout for gRPC calls, default is 60 seconds.
        stream_timeout: Timeout for gRPC streams, default is 5 minutes.
    """
    super().__init__(
        server_url,
        dispatch_pb2_grpc.MicrogridDispatchServiceStub,
        connect=connect,
        channel_defaults=ChannelOptions(
            port=DEFAULT_DISPATCH_PORT,
            ssl=SslOptions(enabled=True),
        ),
    )
    self._metadata = (("key", key),)
    self._streams: dict[
        MicrogridId,
        GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent],
    ] = {}
    """A dictionary of streamers, keyed by microgrid_id."""

    self._call_timeout_seconds = call_timeout.total_seconds()
    self._stream_timeout_seconds = stream_timeout.total_seconds()
connect ¤
connect(
    server_url: str | None = None,
    *,
    auth_key: str | None | EllipsisType = ...,
    sign_secret: str | None | EllipsisType = ...
) -> None

Connect to the server, possibly using a new URL.

If the client is already connected and the URL is the same as the previous URL, this method does nothing. If you want to force a reconnection, you can call disconnect() first.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to. If not provided, the previously used URL is used.

TYPE: str | None DEFAULT: None

auth_key

The API key to use when connecting to the service. If an Ellipsis is provided, the previously used auth_key is used.

TYPE: str | None | EllipsisType DEFAULT: ...

sign_secret

The secret to use when creating message HMAC. If an Ellipsis is provided,

TYPE: str | None | EllipsisType DEFAULT: ...

Source code in frequenz/client/base/client.py
def connect(
    self,
    server_url: str | None = None,
    *,
    auth_key: str | None | EllipsisType = ...,
    sign_secret: str | None | EllipsisType = ...,
) -> None:
    """Connect to the server, possibly using a new URL.

    If the client is already connected and the URL is the same as the previous URL,
    this method does nothing. If you want to force a reconnection, you can call
    [disconnect()][frequenz.client.base.client.BaseApiClient.disconnect] first.

    Args:
        server_url: The URL of the server to connect to. If not provided, the
            previously used URL is used.
        auth_key: The API key to use when connecting to the service. If an Ellipsis
            is provided, the previously used auth_key is used.
        sign_secret: The secret to use when creating message HMAC. If an Ellipsis is
            provided,
    """
    reconnect = False
    if server_url is not None and server_url != self._server_url:  # URL changed
        self._server_url = server_url
        reconnect = True
    if auth_key is not ... and auth_key != self._auth_key:
        self._auth_key = auth_key
        reconnect = True
    if sign_secret is not ... and sign_secret != self._sign_secret:
        self._sign_secret = sign_secret
        reconnect = True
    if self.is_connected and not reconnect:  # Desired connection already exists
        return

    interceptors: list[ClientInterceptor] = []
    if self._auth_key is not None:
        interceptors += [
            AuthenticationInterceptorUnaryUnary(self._auth_key),  # type: ignore [list-item]
            AuthenticationInterceptorUnaryStream(self._auth_key),  # type: ignore [list-item]
        ]
    if self._sign_secret is not None:
        interceptors += [
            SigningInterceptorUnaryUnary(self._sign_secret),  # type: ignore [list-item]
            SigningInterceptorUnaryStream(self._sign_secret),  # type: ignore [list-item]
        ]

    self._channel = parse_grpc_uri(
        self._server_url,
        interceptors,
        defaults=self._channel_defaults,
    )
    self._stub = self._create_stub(self._channel)
create async ¤
create(
    microgrid_id: MicrogridId,
    type: str,
    start_time: datetime | Literal["NOW"],
    duration: timedelta | None,
    target: TargetComponents,
    *,
    active: bool = True,
    dry_run: bool = False,
    payload: dict[str, Any] | None = None,
    recurrence: RecurrenceRule | None = None
) -> Dispatch

Create a dispatch.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to create the dispatch for.

TYPE: MicrogridId

type

User defined string to identify the dispatch type.

TYPE: str

start_time

The start time of the dispatch. Can be "NOW" for immediate start.

TYPE: datetime | Literal['NOW']

duration

The duration of the dispatch. Can be None for infinite or no-duration dispatches (e.g. switching a component on).

TYPE: timedelta | None

target

The component target for the dispatch.

TYPE: TargetComponents

active

The active status of the dispatch.

TYPE: bool DEFAULT: True

dry_run

The dry_run status of the dispatch.

TYPE: bool DEFAULT: False

payload

The payload of the dispatch.

TYPE: dict[str, Any] | None DEFAULT: None

recurrence

The recurrence rule of the dispatch.

TYPE: RecurrenceRule | None DEFAULT: None

RETURNS DESCRIPTION
Dispatch

The created dispatch

TYPE: Dispatch

RAISES DESCRIPTION
ValueError

If start_time is in the past.

Source code in frequenz/client/dispatch/_client.py
async def create(  # pylint: disable=too-many-positional-arguments
    self,
    microgrid_id: MicrogridId,
    type: str,  # pylint: disable=redefined-builtin
    start_time: datetime | Literal["NOW"],
    duration: timedelta | None,
    target: TargetComponents,
    *,
    active: bool = True,
    dry_run: bool = False,
    payload: dict[str, Any] | None = None,
    recurrence: RecurrenceRule | None = None,
) -> Dispatch:
    """Create a dispatch.

    Args:
        microgrid_id: The microgrid_id to create the dispatch for.
        type: User defined string to identify the dispatch type.
        start_time: The start time of the dispatch. Can be "NOW" for immediate start.
        duration: The duration of the dispatch. Can be `None` for infinite
            or no-duration dispatches (e.g. switching a component on).
        target: The component target for the dispatch.
        active: The active status of the dispatch.
        dry_run: The dry_run status of the dispatch.
        payload: The payload of the dispatch.
        recurrence: The recurrence rule of the dispatch.

    Returns:
        Dispatch: The created dispatch

    Raises:
        ValueError: If start_time is in the past.
    """
    if isinstance(start_time, datetime):
        if start_time <= datetime.now(tz=start_time.tzinfo):
            raise ValueError("start_time must not be in the past")

        # Raise if it's not UTC
        if (
            start_time.tzinfo is None
            or start_time.tzinfo.utcoffset(start_time) is None
        ):
            raise ValueError("start_time must be timezone aware")

    request = DispatchCreateRequest(
        microgrid_id=microgrid_id,
        type=type,
        start_time=start_time,
        duration=duration,
        target=target,
        active=active,
        dry_run=dry_run,
        payload=payload or {},
        recurrence=recurrence,
    )

    response = await cast(
        Awaitable[CreateMicrogridDispatchResponse],
        self.stub.CreateMicrogridDispatch(
            request.to_protobuf(),
            metadata=self._metadata,
            timeout=self._call_timeout_seconds,
        ),
    )

    return Dispatch.from_protobuf(response.dispatch)
delete async ¤
delete(
    *, microgrid_id: MicrogridId, dispatch_id: DispatchId
) -> None

Delete a dispatch.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to delete the dispatch for.

TYPE: MicrogridId

dispatch_id

The dispatch_id to delete.

TYPE: DispatchId

Source code in frequenz/client/dispatch/_client.py
async def delete(
    self, *, microgrid_id: MicrogridId, dispatch_id: DispatchId
) -> None:
    """Delete a dispatch.

    Args:
        microgrid_id: The microgrid_id to delete the dispatch for.
        dispatch_id: The dispatch_id to delete.
    """
    request = DeleteMicrogridDispatchRequest(
        dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
    )
    await cast(
        Awaitable[None],
        self.stub.DeleteMicrogridDispatch(
            request, metadata=self._metadata, timeout=self._call_timeout_seconds
        ),
    )
disconnect async ¤
disconnect() -> None

Disconnect from the server.

If the client is not connected, this method does nothing.

Source code in frequenz/client/base/client.py
async def disconnect(self) -> None:
    """Disconnect from the server.

    If the client is not connected, this method does nothing.
    """
    await self.__aexit__(None, None, None)
get async ¤
get(
    *, microgrid_id: MicrogridId, dispatch_id: DispatchId
) -> Dispatch

Get a dispatch.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to get the dispatch for.

TYPE: MicrogridId

dispatch_id

The dispatch_id to get.

TYPE: DispatchId

RETURNS DESCRIPTION
Dispatch

The dispatch.

TYPE: Dispatch

Source code in frequenz/client/dispatch/_client.py
async def get(
    self, *, microgrid_id: MicrogridId, dispatch_id: DispatchId
) -> Dispatch:
    """Get a dispatch.

    Args:
        microgrid_id: The microgrid_id to get the dispatch for.
        dispatch_id: The dispatch_id to get.

    Returns:
        Dispatch: The dispatch.
    """
    request = GetMicrogridDispatchRequest(
        dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
    )
    response = await cast(
        Awaitable[GetMicrogridDispatchResponse],
        self.stub.GetMicrogridDispatch(
            request, metadata=self._metadata, timeout=self._call_timeout_seconds
        ),
    )
    return Dispatch.from_protobuf(response.dispatch)
list async ¤
list(
    microgrid_id: MicrogridId,
    *,
    target_components: Iterator[TargetComponents] = iter(
        ()
    ),
    start_from: datetime | None = None,
    start_to: datetime | None = None,
    end_from: datetime | None = None,
    end_to: datetime | None = None,
    active: bool | None = None,
    dry_run: bool | None = None,
    page_size: int | None = None
) -> AsyncIterator[Iterator[Dispatch]]

List dispatches.

This function handles pagination internally and returns an async iterator over the dispatches. Pagination parameters like page_size and page_token can be used, but they are mutually exclusive.

Example usage:

client = DispatchApiClient(
    key="key",
    server_url="grpc://dispatch.url.goes.here.example.com"
)
async for page in client.list(microgrid_id=MicrogridId(1)):
    for dispatch in page:
        print(dispatch)
PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to list dispatches for.

TYPE: MicrogridId

target_components

optional, list of component ids or categories to filter by.

TYPE: Iterator[TargetComponents] DEFAULT: iter(())

start_from

optional, filter by start_time >= start_from.

TYPE: datetime | None DEFAULT: None

start_to

optional, filter by start_time < start_to.

TYPE: datetime | None DEFAULT: None

end_from

optional, filter by end_time >= end_from.

TYPE: datetime | None DEFAULT: None

end_to

optional, filter by end_time < end_to.

TYPE: datetime | None DEFAULT: None

active

optional, filter by active status.

TYPE: bool | None DEFAULT: None

dry_run

optional, filter by dry_run status.

TYPE: bool | None DEFAULT: None

page_size

optional, number of dispatches to return per page.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
AsyncIterator[Iterator[Dispatch]]

An async iterator over pages of dispatches.

YIELDS DESCRIPTION
AsyncIterator[Iterator[Dispatch]]

A page of dispatches over which you can lazily iterate.

Source code in frequenz/client/dispatch/_client.py
async def list(
    self,
    microgrid_id: MicrogridId,
    *,
    target_components: Iterator[TargetComponents] = iter(()),
    start_from: datetime | None = None,
    start_to: datetime | None = None,
    end_from: datetime | None = None,
    end_to: datetime | None = None,
    active: bool | None = None,
    dry_run: bool | None = None,
    page_size: int | None = None,
) -> AsyncIterator[Iterator[Dispatch]]:
    """List dispatches.

    This function handles pagination internally and returns an async iterator
    over the dispatches. Pagination parameters like `page_size` and `page_token`
    can be used, but they are mutually exclusive.

    Example usage:

    ```python
    client = DispatchApiClient(
        key="key",
        server_url="grpc://dispatch.url.goes.here.example.com"
    )
    async for page in client.list(microgrid_id=MicrogridId(1)):
        for dispatch in page:
            print(dispatch)
    ```

    Args:
        microgrid_id: The microgrid_id to list dispatches for.
        target_components: optional, list of component ids or categories to filter by.
        start_from: optional, filter by start_time >= start_from.
        start_to: optional, filter by start_time < start_to.
        end_from: optional, filter by end_time >= end_from.
        end_to: optional, filter by end_time < end_to.
        active: optional, filter by active status.
        dry_run: optional, filter by dry_run status.
        page_size: optional, number of dispatches to return per page.

    Returns:
        An async iterator over pages of dispatches.

    Yields:
        A page of dispatches over which you can lazily iterate.
    """

    def to_interval(
        from_: datetime | None, to: datetime | None
    ) -> PBTimeIntervalFilter | None:
        return (
            PBTimeIntervalFilter(
                from_time=to_timestamp(from_), to_time=to_timestamp(to)
            )
            if from_ or to
            else None
        )

    # Setup parameters
    start_time_interval = to_interval(start_from, start_to)
    end_time_interval = to_interval(end_from, end_to)
    targets = list(map(_target_components_to_protobuf, target_components))
    filters = DispatchFilter(
        targets=targets,
        start_time_interval=start_time_interval,
        end_time_interval=end_time_interval,
        is_active=active,
        is_dry_run=dry_run,
    )

    request = ListMicrogridDispatchesRequest(
        microgrid_id=int(microgrid_id),
        filter=filters,
        pagination_params=(
            PaginationParams(page_size=page_size) if page_size else None
        ),
    )

    while True:
        response = await cast(
            Awaitable[ListMicrogridDispatchesResponse],
            self.stub.ListMicrogridDispatches(
                request, metadata=self._metadata, timeout=self._call_timeout_seconds
            ),
        )

        yield (Dispatch.from_protobuf(dispatch) for dispatch in response.dispatches)

        if len(response.pagination_info.next_page_token):
            request.pagination_params.CopyFrom(
                PaginationParams(
                    page_token=response.pagination_info.next_page_token
                )
            )
        else:
            break
stream ¤
stream(
    microgrid_id: MicrogridId,
) -> Receiver[DispatchEvent]

Receive a stream of dispatch events.

This function returns a receiver channel that can be used to receive dispatch events. An event is one of [CREATE, UPDATE, DELETE].

Example usage:

client = DispatchApiClient(
    key="key",
    server_url="grpc://dispatch.url.goes.here.example.com"
)
async for message in client.stream(microgrid_id=1):
    print(message.event, message.dispatch)
PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to receive dispatches for.

TYPE: MicrogridId

RETURNS DESCRIPTION
Receiver[DispatchEvent]

A receiver channel to receive the stream of dispatch events.

Source code in frequenz/client/dispatch/_client.py
def stream(self, microgrid_id: MicrogridId) -> channels.Receiver[DispatchEvent]:
    """Receive a stream of dispatch events.

    This function returns a receiver channel that can be used to receive
    dispatch events.
    An event is one of [CREATE, UPDATE, DELETE].

    Example usage:

    ```
    client = DispatchApiClient(
        key="key",
        server_url="grpc://dispatch.url.goes.here.example.com"
    )
    async for message in client.stream(microgrid_id=1):
        print(message.event, message.dispatch)
    ```

    Args:
        microgrid_id: The microgrid_id to receive dispatches for.

    Returns:
        A receiver channel to receive the stream of dispatch events.
    """
    return self._get_stream(microgrid_id).new_receiver()
update async ¤
update(
    *,
    microgrid_id: MicrogridId,
    dispatch_id: DispatchId,
    new_fields: dict[str, Any]
) -> Dispatch

Update a dispatch.

The new_fields argument is a dictionary of fields to update. The keys are the field names, and the values are the new values for the fields.

For recurrence fields, the keys are preceeded by "recurrence.".

Note that updating type and dry_run is not supported.

PARAMETER DESCRIPTION
microgrid_id

The microgrid_id to update the dispatch for.

TYPE: MicrogridId

dispatch_id

The dispatch_id to update.

TYPE: DispatchId

new_fields

The fields to update.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
Dispatch

The updated dispatch.

TYPE: Dispatch

RAISES DESCRIPTION
ValueError

If updating type or dry_run.

Source code in frequenz/client/dispatch/_client.py
async def update(
    self,
    *,
    microgrid_id: MicrogridId,
    dispatch_id: DispatchId,
    new_fields: dict[str, Any],
) -> Dispatch:
    """Update a dispatch.

    The `new_fields` argument is a dictionary of fields to update. The keys are
    the field names, and the values are the new values for the fields.

    For recurrence fields, the keys are preceeded by "recurrence.".

    Note that updating `type` and `dry_run` is not supported.

    Args:
        microgrid_id: The microgrid_id to update the dispatch for.
        dispatch_id: The dispatch_id to update.
        new_fields: The fields to update.

    Returns:
        Dispatch: The updated dispatch.

    Raises:
        ValueError: If updating `type` or `dry_run`.
    """
    msg = UpdateMicrogridDispatchRequest(
        dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
    )

    for key, val in new_fields.items():
        path = key.split(".")

        match path[0]:
            case "start_time":
                msg.update.start_time.CopyFrom(to_timestamp(val))
            case "duration":
                if val is None:
                    msg.update.ClearField("duration")
                else:
                    msg.update.duration = round(val.total_seconds())
            case "target":
                msg.update.target.CopyFrom(_target_components_to_protobuf(val))
            case "is_active":
                msg.update.is_active = val
            case "payload":
                msg.update.payload.update(val)
            case "active":
                msg.update.is_active = val
                key = "is_active"
            case "recurrence":
                match path[1]:
                    case "freq":
                        msg.update.recurrence.freq = val
                    # Proto uses "freq" instead of "frequency"
                    case "frequency":
                        msg.update.recurrence.freq = val
                        # Correct the key to "recurrence.freq"
                        key = "recurrence.freq"
                    case "interval":
                        msg.update.recurrence.interval = val
                    case "end_criteria":
                        msg.update.recurrence.end_criteria.CopyFrom(
                            val.to_protobuf()
                        )
                    case "byminutes":
                        msg.update.recurrence.byminutes.extend(val)
                    case "byhours":
                        msg.update.recurrence.byhours.extend(val)
                    case "byweekdays":
                        msg.update.recurrence.byweekdays.extend(val)
                    case "bymonthdays":
                        msg.update.recurrence.bymonthdays.extend(val)
                    case "bymonths":
                        msg.update.recurrence.bymonths.extend(val)
                    case _:
                        raise ValueError(f"Unknown recurrence field: {path[1]}")
            case _:
                raise ValueError(f"Unknown field: {path[0]}")

        msg.update_mask.paths.append(key)

    response = await cast(
        Awaitable[UpdateMicrogridDispatchResponse],
        self.stub.UpdateMicrogridDispatch(
            msg, metadata=self._metadata, timeout=self._call_timeout_seconds
        ),
    )

    return Dispatch.from_protobuf(response.dispatch)