frequenz.client.reporting ¤

Client to connect to the Reporting API.

This package provides a low-level interface for interacting with the Reporting API.
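
A minimal usage sketch; the server URL, API key, IDs, metric choice, and the frequenz.client.common.metric import path are placeholder assumptions, not part of this reference:

import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.client.common.metric import Metric  # assumed import path
from frequenz.client.reporting import ReportingApiClient


async def main() -> None:
    # Connect, stream one hour of history followed by live data, print samples.
    async with ReportingApiClient(
        "grpc://reporting.example.com:443", auth_key="MY_API_KEY"
    ) as client:
        receiver = client.receive_single_component_data(
            microgrid_id=1,
            component_id=42,
            metrics=Metric.AC_ACTIVE_POWER,
            start_time=datetime.now(timezone.utc) - timedelta(hours=1),
            end_time=None,  # no end: keep streaming indefinitely
            resampling_period=timedelta(seconds=15),
        )
        async for sample in receiver:
            print(sample)


asyncio.run(main())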

Classes¤

frequenz.client.reporting.ReportingApiClient ¤

Bases: BaseApiClient[ReportingStub]

A client for the Reporting service.

Source code in frequenz/client/reporting/_client.py
class ReportingApiClient(BaseApiClient[ReportingStub]):
    """A client for the Reporting service."""

    # pylint: disable-next=too-many-arguments
    def __init__(
        self,
        server_url: str,
        *,
        auth_key: str | None = None,
        sign_secret: str | None = None,
        connect: bool = True,
        channel_defaults: ChannelOptions = ChannelOptions(),  # default options
    ) -> None:
        """Create a new Reporting client.

        Args:
            server_url: The URL of the Reporting service.
            auth_key: The API key for the authorization.
            sign_secret: The secret to use for HMAC signing the message.
            connect: Whether to connect to the server immediately.
            channel_defaults: The default channel options.
        """
        super().__init__(
            server_url,
            ReportingStub,
            connect=connect,
            channel_defaults=channel_defaults,
            auth_key=auth_key,
            sign_secret=sign_secret,
        )

        self._components_data_streams: dict[
            tuple[
                tuple[
                    tuple[int, tuple[int, ...]], ...
                ],  # microgrid_components as a tuple of tuples
                tuple[str, ...],  # metric names
                float | None,  # start_time timestamp
                float | None,  # end_time timestamp
                int | None,  # resampling period in seconds
                bool,  # include_states
                bool,  # include_bounds
            ],
            GrpcStreamBroadcaster[
                PBReceiveMicrogridComponentsDataStreamResponse, ComponentsDataBatch
            ],
        ] = {}
        self._sensors_data_streams: dict[
            tuple[
                tuple[
                    tuple[int, tuple[int, ...]], ...
                ],  # microgrid_sensors as a tuple of tuples
                tuple[str, ...],  # metric names
                float | None,  # start_time timestamp
                float | None,  # end_time timestamp
                int | None,  # resampling period in seconds
                bool,  # include_states
            ],
            GrpcStreamBroadcaster[
                PBReceiveMicrogridSensorsDataStreamResponse, SensorsDataBatch
            ],
        ] = {}
        self._aggregated_data_streams: dict[
            tuple[
                int,  # microgrid_id
                str,  # metric name
                str,  # aggregation_formula
                float | None,  # start_time timestamp
                float | None,  # end_time timestamp
                int | None,  # resampling period in seconds
            ],
            GrpcStreamBroadcaster[PBAggregatedStreamResponse, MetricSample],
        ] = {}

    @property
    def stub(self) -> ReportingStub:
        """The gRPC stub for the API."""
        if self.channel is None or self._stub is None:
            raise ClientNotConnected(server_url=self.server_url, operation="stub")
        return self._stub

    # pylint: disable=too-many-arguments
    def receive_single_component_data(
        self,
        *,
        microgrid_id: int,
        component_id: int,
        metrics: Metric | list[Metric],
        start_time: datetime | None,
        end_time: datetime | None,
        resampling_period: timedelta | None,
        include_states: bool = False,
        include_bounds: bool = False,
    ) -> Receiver[MetricSample]:
        """Iterate over the data for a single metric.

        Args:
            microgrid_id: The microgrid ID.
            component_id: The component ID.
            metrics: The metric name or list of metric names.
            start_time: start datetime, if None, the earliest available data will be used
            end_time: end datetime, if None starts streaming indefinitely from start_time
            resampling_period: The period for resampling the data.
            include_states: Whether to include the state data.
            include_bounds: Whether to include the bound data.

        Returns:
            A receiver of `MetricSample`s.
        """
        receiver = self._receive_microgrid_components_data_batch(
            microgrid_components=[(microgrid_id, [component_id])],
            metrics=[metrics] if isinstance(metrics, Metric) else metrics,
            start_time=start_time,
            end_time=end_time,
            resampling_period=resampling_period,
            include_states=include_states,
            include_bounds=include_bounds,
        )

        return BatchUnrollReceiver(receiver)

    # pylint: disable=too-many-arguments
    def receive_microgrid_components_data(
        self,
        *,
        microgrid_components: list[tuple[int, list[int]]],
        metrics: Metric | list[Metric],
        start_time: datetime | None,
        end_time: datetime | None,
        resampling_period: timedelta | None,
        include_states: bool = False,
        include_bounds: bool = False,
    ) -> Receiver[MetricSample]:
        """Iterate over the data for multiple microgrids and components.

        Args:
            microgrid_components: List of tuples where each tuple contains
                                  microgrid ID and corresponding component IDs.
            metrics: The metric name or list of metric names.
            start_time: start datetime, if None, the earliest available data will be used.
            end_time: end datetime, if None starts streaming indefinitely from start_time.
            resampling_period: The period for resampling the data.
            include_states: Whether to include the state data.
            include_bounds: Whether to include the bound data.

        Returns:
            A receiver of `MetricSample`s.
        """
        receiver = self._receive_microgrid_components_data_batch(
            microgrid_components=microgrid_components,
            metrics=[metrics] if isinstance(metrics, Metric) else metrics,
            start_time=start_time,
            end_time=end_time,
            resampling_period=resampling_period,
            include_states=include_states,
            include_bounds=include_bounds,
        )

        return BatchUnrollReceiver(receiver)

    # pylint: disable=too-many-arguments
    # pylint: disable=too-many-locals
    def _receive_microgrid_components_data_batch(
        self,
        *,
        microgrid_components: list[tuple[int, list[int]]],
        metrics: list[Metric],
        start_time: datetime | None,
        end_time: datetime | None,
        resampling_period: timedelta | None,
        include_states: bool = False,
        include_bounds: bool = False,
    ) -> Receiver[ComponentsDataBatch]:
        """Return a Receiver for the microgrid component data stream."""
        stream_key = (
            tuple((mid, tuple(cids)) for mid, cids in microgrid_components),
            tuple(metric.name for metric in metrics),
            start_time.timestamp() if start_time else None,
            end_time.timestamp() if end_time else None,
            round(resampling_period.total_seconds()) if resampling_period else None,
            include_states,
            include_bounds,
        )

        if (
            stream_key not in self._components_data_streams
            or not self._components_data_streams[stream_key].is_running
        ):
            microgrid_components_pb = [
                PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids)
                for mid, cids in microgrid_components
            ]

            def dt2ts(dt: datetime) -> PBTimestamp:
                ts = PBTimestamp()
                ts.FromDatetime(dt)
                return ts

            time_filter = PBTimeFilter(
                start=dt2ts(start_time) if start_time else None,
                end=dt2ts(end_time) if end_time else None,
            )

            incl_states = (
                PBFilterOption.FILTER_OPTION_INCLUDE
                if include_states
                else PBFilterOption.FILTER_OPTION_EXCLUDE
            )
            incl_bounds = (
                PBFilterOption.FILTER_OPTION_INCLUDE
                if include_bounds
                else PBFilterOption.FILTER_OPTION_EXCLUDE
            )
            include_options = (
                PBReceiveMicrogridComponentsDataStreamRequest.IncludeOptions(
                    bounds=incl_bounds,
                    states=incl_states,
                )
            )

            stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
                time_filter=time_filter,
                resampling_options=PBResamplingOptions(
                    resolution=(
                        round(resampling_period.total_seconds())
                        if resampling_period
                        else None
                    )
                ),
                include_options=include_options,
            )

            metric_conns_pb = [
                PBMetricConnections(metric=metric.to_proto(), connections=[])
                for metric in metrics
            ]

            request = PBReceiveMicrogridComponentsDataStreamRequest(
                microgrid_components=microgrid_components_pb,
                metrics=metric_conns_pb,
                filter=stream_filter,
            )

            def transform_response(
                response: PBReceiveMicrogridComponentsDataStreamResponse,
            ) -> ComponentsDataBatch:
                return ComponentsDataBatch(response)

            def stream_method() -> (
                AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse]
            ):
                call_iterator = self.stub.ReceiveMicrogridComponentsDataStream(
                    request,
                )
                return cast(
                    AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse],
                    call_iterator,
                )

            self._components_data_streams[stream_key] = GrpcStreamBroadcaster(
                stream_name="microgrid-components-data-stream",
                stream_method=stream_method,
                transform=transform_response,
                retry_strategy=None,
            )

        return self._components_data_streams[stream_key].new_receiver()

    # pylint: disable=too-many-arguments
    def receive_single_sensor_data(
        self,
        *,
        microgrid_id: int,
        sensor_id: int,
        metrics: Metric | list[Metric],
        start_time: datetime | None,
        end_time: datetime | None,
        resampling_period: timedelta | None,
        include_states: bool = False,
    ) -> Receiver[MetricSample]:
        """Iterate over the data for a single sensor and metric.

        Args:
            microgrid_id: The microgrid ID.
            sensor_id: The sensor ID.
            metrics: The metric name or list of metric names.
            start_time: start datetime, if None, the earliest available data will be used.
            end_time: end datetime, if None starts streaming indefinitely from start_time.
            resampling_period: The period for resampling the data.
            include_states: Whether to include the state data.

        Returns:
            A receiver of `MetricSample`s.
        """
        receiver = self._receive_microgrid_sensors_data_batch(
            microgrid_sensors=[(microgrid_id, [sensor_id])],
            metrics=[metrics] if isinstance(metrics, Metric) else metrics,
            start_time=start_time,
            end_time=end_time,
            resampling_period=resampling_period,
            include_states=include_states,
        )
        return BatchUnrollReceiver(receiver)

    # pylint: disable=too-many-arguments
    def receive_microgrid_sensors_data(
        self,
        *,
        microgrid_sensors: list[tuple[int, list[int]]],
        metrics: Metric | list[Metric],
        start_time: datetime | None,
        end_time: datetime | None,
        resampling_period: timedelta | None,
        include_states: bool = False,
    ) -> Receiver[MetricSample]:
        """Iterate over the data for multiple sensors in a microgrid.

        Args:
            microgrid_sensors: List of tuples where each tuple contains
                                microgrid ID and corresponding sensor IDs.
            metrics: The metric name or list of metric names.
            start_time: start datetime, if None, the earliest available data will be used.
            end_time: end datetime, if None starts streaming indefinitely from start_time.
            resampling_period: The period for resampling the data.
            include_states: Whether to include the state data.

        Returns:
            A receiver of `MetricSample`s.
        """
        receiver = self._receive_microgrid_sensors_data_batch(
            microgrid_sensors=microgrid_sensors,
            metrics=[metrics] if isinstance(metrics, Metric) else metrics,
            start_time=start_time,
            end_time=end_time,
            resampling_period=resampling_period,
            include_states=include_states,
        )
        return BatchUnrollReceiver(receiver)

    # pylint: disable=too-many-arguments
    # pylint: disable=too-many-locals
    def _receive_microgrid_sensors_data_batch(
        self,
        *,
        microgrid_sensors: list[tuple[int, list[int]]],
        metrics: list[Metric],
        start_time: datetime | None,
        end_time: datetime | None,
        resampling_period: timedelta | None,
        include_states: bool = False,
    ) -> Receiver[SensorsDataBatch]:
        """Iterate over the sensor data batches in the stream using GrpcStreamBroadcaster.

        Args:
            microgrid_sensors: A list of tuples of microgrid IDs and sensor IDs.
            metrics: A list of metrics.
            start_time: start datetime, if None, the earliest available data will be used.
            end_time: end datetime, if None starts streaming indefinitely from start_time.
            resampling_period: The period for resampling the data.
            include_states: Whether to include the state data.

        Returns:
            A receiver of `SensorsDataBatch`s.
        """
        stream_key = (
            tuple((mid, tuple(sids)) for mid, sids in microgrid_sensors),
            tuple(metric.name for metric in metrics),
            start_time.timestamp() if start_time else None,
            end_time.timestamp() if end_time else None,
            round(resampling_period.total_seconds()) if resampling_period else None,
            include_states,
        )

        if (
            stream_key not in self._sensors_data_streams
            or not self._sensors_data_streams[stream_key].is_running
        ):

            microgrid_sensors_pb = [
                PBMicrogridSensorIDs(microgrid_id=mid, sensor_ids=sids)
                for mid, sids in microgrid_sensors
            ]

            def dt2ts(dt: datetime) -> PBTimestamp:
                ts = PBTimestamp()
                ts.FromDatetime(dt)
                return ts

            time_filter = PBTimeFilter(
                start=dt2ts(start_time) if start_time else None,
                end=dt2ts(end_time) if end_time else None,
            )

            incl_states = (
                PBFilterOption.FILTER_OPTION_INCLUDE
                if include_states
                else PBFilterOption.FILTER_OPTION_EXCLUDE
            )
            include_options = PBReceiveMicrogridSensorsDataStreamRequest.IncludeOptions(
                states=incl_states,
            )

            stream_filter = PBReceiveMicrogridSensorsDataStreamRequest.StreamFilter(
                time_filter=time_filter,
                resampling_options=PBResamplingOptions(
                    resolution=(
                        round(resampling_period.total_seconds())
                        if resampling_period is not None
                        else None
                    )
                ),
                include_options=include_options,
            )

            metric_conns_pb = [
                PBMetricConnections(
                    metric=metric.to_proto(),
                    connections=[],
                )
                for metric in metrics
            ]

            request = PBReceiveMicrogridSensorsDataStreamRequest(
                microgrid_sensors=microgrid_sensors_pb,
                metrics=metric_conns_pb,
                filter=stream_filter,
            )

            def transform_response(
                response: PBReceiveMicrogridSensorsDataStreamResponse,
            ) -> SensorsDataBatch:
                return SensorsDataBatch(response)

            def stream_method() -> (
                AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse]
            ):
                call_iterator = self.stub.ReceiveMicrogridSensorsDataStream(request)
                return cast(
                    AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse],
                    call_iterator,
                )

            self._sensors_data_streams[stream_key] = GrpcStreamBroadcaster(
                stream_name="microgrid-sensors-data-stream",
                stream_method=stream_method,
                transform=transform_response,
            )

        return self._sensors_data_streams[stream_key].new_receiver()

    def receive_aggregated_data(
        self,
        *,
        microgrid_id: int,
        metric: Metric,
        aggregation_formula: str,
        start_time: datetime | None,
        end_time: datetime | None,
        resampling_period: timedelta,
    ) -> Receiver[MetricSample]:
        """Iterate over aggregated data for a single metric using GrpcStreamBroadcaster.

        For now this only supports a single metric and aggregation formula.

        Args:
            microgrid_id: The microgrid ID.
            metric: The metric name.
            aggregation_formula: The aggregation formula.
            start_time: start datetime, if None, the earliest available data will be used.
            end_time: end datetime, if None starts streaming indefinitely from start_time.
            resampling_period: The period for resampling the data.

        Returns:
            A receiver of `MetricSample`s.

        Raises:
            ValueError: If the resampling_period is not provided.
        """
        stream_key = (
            microgrid_id,
            metric.name,
            aggregation_formula,
            start_time.timestamp() if start_time else None,
            end_time.timestamp() if end_time else None,
            round(resampling_period.total_seconds()) if resampling_period else None,
        )
        if (
            stream_key not in self._aggregated_data_streams
            or not self._aggregated_data_streams[stream_key].is_running
        ):

            if not resampling_period:
                raise ValueError("resampling_period must be provided")

            aggregation_config = PBAggregationConfig(
                microgrid_id=microgrid_id,
                metric=metric.to_proto(),
                aggregation_formula=aggregation_formula,
            )

            def dt2ts(dt: datetime) -> PBTimestamp:
                ts = PBTimestamp()
                ts.FromDatetime(dt)
                return ts

            time_filter = PBTimeFilter(
                start=dt2ts(start_time) if start_time else None,
                end=dt2ts(end_time) if end_time else None,
            )

            stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
                time_filter=time_filter,
                resampling_options=PBResamplingOptions(
                    resolution=round(resampling_period.total_seconds())
                ),
            )

            request = PBAggregatedStreamRequest(
                aggregation_configs=[aggregation_config],
                filter=stream_filter,
            )

            def transform_response(
                response: PBAggregatedStreamResponse,
            ) -> MetricSample:
                return AggregatedMetric(response).sample()

            def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
                call_iterator = (
                    self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
                        request,
                    )
                )

                return cast(AsyncIterable[PBAggregatedStreamResponse], call_iterator)

            self._aggregated_data_streams[stream_key] = GrpcStreamBroadcaster(
                stream_name="aggregated-microgrid-data-stream",
                stream_method=stream_method,
                transform=transform_response,
                retry_strategy=None,
            )

        return self._aggregated_data_streams[stream_key].new_receiver()
Attributes¤
channel property ¤
channel: Channel

The underlying gRPC channel used to communicate with the server.

Warning

This channel is provided as a last resort for advanced users. It is not recommended to use this property directly unless you know what you are doing and you don't care about being tied to a specific gRPC library.

RAISES DESCRIPTION
ClientNotConnected

If the client is not connected to the server.

channel_defaults property ¤
channel_defaults: ChannelOptions

The default options for the gRPC channel.

is_connected property ¤
is_connected: bool

Whether the client is connected to the server.

server_url property ¤
server_url: str

The URL of the server.

stub property ¤
stub: ReportingStub

The gRPC stub for the API.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter a context manager.

Source code in frequenz/client/base/client.py
async def __aenter__(self) -> Self:
    """Enter a context manager."""
    self.connect()
    return self
__aexit__ async ¤
__aexit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None

Exit a context manager.

Source code in frequenz/client/base/client.py
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: Any | None,
) -> bool | None:
    """Exit a context manager."""
    if self._channel is None:
        return None
    result = await self._channel.__aexit__(_exc_type, _exc_val, _exc_tb)
    self._channel = None
    self._stub = None
    return result
__init__ ¤
__init__(
    server_url: str,
    *,
    auth_key: str | None = None,
    sign_secret: str | None = None,
    connect: bool = True,
    channel_defaults: ChannelOptions = ChannelOptions()
) -> None

Create a new Reporting client.

PARAMETER DESCRIPTION
server_url

The URL of the Reporting service.

TYPE: str

auth_key

The API key for the authorization.

TYPE: str | None DEFAULT: None

sign_secret

The secret to use for HMAC signing the message.

TYPE: str | None DEFAULT: None

connect

Whether to connect to the server immediately.

TYPE: bool DEFAULT: True

channel_defaults

The default channel options.

TYPE: ChannelOptions DEFAULT: ChannelOptions()

Source code in frequenz/client/reporting/_client.py
def __init__(
    self,
    server_url: str,
    *,
    auth_key: str | None = None,
    sign_secret: str | None = None,
    connect: bool = True,
    channel_defaults: ChannelOptions = ChannelOptions(),  # default options
) -> None:
    """Create a new Reporting client.

    Args:
        server_url: The URL of the Reporting service.
        auth_key: The API key for the authorization.
        sign_secret: The secret to use for HMAC signing the message.
        connect: Whether to connect to the server immediately.
        channel_defaults: The default channel options.
    """
    super().__init__(
        server_url,
        ReportingStub,
        connect=connect,
        channel_defaults=channel_defaults,
        auth_key=auth_key,
        sign_secret=sign_secret,
    )

    self._components_data_streams: dict[
        tuple[
            tuple[
                tuple[int, tuple[int, ...]], ...
            ],  # microgrid_components as a tuple of tuples
            tuple[str, ...],  # metric names
            float | None,  # start_time timestamp
            float | None,  # end_time timestamp
            int | None,  # resampling period in seconds
            bool,  # include_states
            bool,  # include_bounds
        ],
        GrpcStreamBroadcaster[
            PBReceiveMicrogridComponentsDataStreamResponse, ComponentsDataBatch
        ],
    ] = {}
    self._sensors_data_streams: dict[
        tuple[
            tuple[
                tuple[int, tuple[int, ...]], ...
            ],  # microgrid_sensors as a tuple of tuples
            tuple[str, ...],  # metric names
            float | None,  # start_time timestamp
            float | None,  # end_time timestamp
            int | None,  # resampling period in seconds
            bool,  # include_states
        ],
        GrpcStreamBroadcaster[
            PBReceiveMicrogridSensorsDataStreamResponse, SensorsDataBatch
        ],
    ] = {}
    self._aggregated_data_streams: dict[
        tuple[
            int,  # microgrid_id
            str,  # metric name
            str,  # aggregation_formula
            float | None,  # start_time timestamp
            float | None,  # end_time timestamp
            int | None,  # resampling period in seconds
        ],
        GrpcStreamBroadcaster[PBAggregatedStreamResponse, MetricSample],
    ] = {}
connect ¤
connect(
    server_url: str | None = None,
    *,
    auth_key: str | None | EllipsisType = ...,
    sign_secret: str | None | EllipsisType = ...
) -> None

Connect to the server, possibly using a new URL.

If the client is already connected and the URL is the same as the previous URL, this method does nothing. If you want to force a reconnection, you can call disconnect() first.

PARAMETER DESCRIPTION
server_url

The URL of the server to connect to. If not provided, the previously used URL is used.

TYPE: str | None DEFAULT: None

auth_key

The API key to use when connecting to the service. If an Ellipsis is provided, the previously used auth_key is used.

TYPE: str | None | EllipsisType DEFAULT: ...

sign_secret

The secret to use when creating message HMAC. If an Ellipsis is provided, the previously used sign_secret is used.

TYPE: str | None | EllipsisType DEFAULT: ...

Source code in frequenz/client/base/client.py
def connect(
    self,
    server_url: str | None = None,
    *,
    auth_key: str | None | EllipsisType = ...,
    sign_secret: str | None | EllipsisType = ...,
) -> None:
    """Connect to the server, possibly using a new URL.

    If the client is already connected and the URL is the same as the previous URL,
    this method does nothing. If you want to force a reconnection, you can call
    [disconnect()][frequenz.client.base.client.BaseApiClient.disconnect] first.

    Args:
        server_url: The URL of the server to connect to. If not provided, the
            previously used URL is used.
        auth_key: The API key to use when connecting to the service. If an Ellipsis
            is provided, the previously used auth_key is used.
        sign_secret: The secret to use when creating message HMAC. If an Ellipsis is
            provided, the previously used sign_secret is used.
    """
    reconnect = False
    if server_url is not None and server_url != self._server_url:  # URL changed
        self._server_url = server_url
        reconnect = True
    if auth_key is not ... and auth_key != self._auth_key:
        self._auth_key = auth_key
        reconnect = True
    if sign_secret is not ... and sign_secret != self._sign_secret:
        self._sign_secret = sign_secret
        reconnect = True
    if self.is_connected and not reconnect:  # Desired connection already exists
        return

    interceptors: list[ClientInterceptor] = []
    if self._auth_key is not None:
        interceptors += [
            AuthenticationInterceptorUnaryUnary(self._auth_key),  # type: ignore [list-item]
            AuthenticationInterceptorUnaryStream(self._auth_key),  # type: ignore [list-item]
        ]
    if self._sign_secret is not None:
        interceptors += [
            SigningInterceptorUnaryUnary(self._sign_secret),  # type: ignore [list-item]
            SigningInterceptorUnaryStream(self._sign_secret),  # type: ignore [list-item]
        ]

    self._channel = parse_grpc_uri(
        self._server_url,
        interceptors,
        defaults=self._channel_defaults,
    )
    self._stub = self._create_stub(self._channel)
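
For example, a client can be created without connecting and connected explicitly later; a sketch (URL and key are placeholders):

import asyncio

from frequenz.client.reporting import ReportingApiClient


async def main() -> None:
    client = ReportingApiClient(
        "grpc://reporting.example.com:443",
        auth_key="MY_API_KEY",
        connect=False,  # defer the connection
    )
    client.connect()  # connects using the URL and auth_key given above
    assert client.is_connected
    try:
        ...  # use the client here
    finally:
        await client.disconnect()


asyncio.run(main())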
disconnect async ¤
disconnect() -> None

Disconnect from the server.

If the client is not connected, this method does nothing.

Source code in frequenz/client/base/client.py
async def disconnect(self) -> None:
    """Disconnect from the server.

    If the client is not connected, this method does nothing.
    """
    await self.__aexit__(None, None, None)
receive_aggregated_data ¤
receive_aggregated_data(
    *,
    microgrid_id: int,
    metric: Metric,
    aggregation_formula: str,
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta
) -> Receiver[MetricSample]

Iterate over aggregated data for a single metric using GrpcStreamBroadcaster.

For now this only supports a single metric and aggregation formula.

PARAMETER DESCRIPTION
microgrid_id

The microgrid ID.

TYPE: int

metric

The metric name.

TYPE: Metric

aggregation_formula

The aggregation formula.

TYPE: str

start_time

start datetime, if None, the earliest available data will be used.

TYPE: datetime | None

end_time

end datetime, if None starts streaming indefinitely from start_time.

TYPE: datetime | None

resampling_period

The period for resampling the data.

TYPE: timedelta

RETURNS DESCRIPTION
Receiver[MetricSample]

A receiver of MetricSamples.

RAISES DESCRIPTION
ValueError

If the resampling_period is not provided.

Source code in frequenz/client/reporting/_client.py
def receive_aggregated_data(
    self,
    *,
    microgrid_id: int,
    metric: Metric,
    aggregation_formula: str,
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta,
) -> Receiver[MetricSample]:
    """Iterate over aggregated data for a single metric using GrpcStreamBroadcaster.

    For now this only supports a single metric and aggregation formula.

    Args:
        microgrid_id: The microgrid ID.
        metric: The metric name.
        aggregation_formula: The aggregation formula.
        start_time: start datetime, if None, the earliest available data will be used.
        end_time: end datetime, if None starts streaming indefinitely from start_time.
        resampling_period: The period for resampling the data.

    Returns:
        A receiver of `MetricSample`s.

    Raises:
        ValueError: If the resampling_period is not provided.
    """
    stream_key = (
        microgrid_id,
        metric.name,
        aggregation_formula,
        start_time.timestamp() if start_time else None,
        end_time.timestamp() if end_time else None,
        round(resampling_period.total_seconds()) if resampling_period else None,
    )
    if (
        stream_key not in self._aggregated_data_streams
        or not self._aggregated_data_streams[stream_key].is_running
    ):

        if not resampling_period:
            raise ValueError("resampling_period must be provided")

        aggregation_config = PBAggregationConfig(
            microgrid_id=microgrid_id,
            metric=metric.to_proto(),
            aggregation_formula=aggregation_formula,
        )

        def dt2ts(dt: datetime) -> PBTimestamp:
            ts = PBTimestamp()
            ts.FromDatetime(dt)
            return ts

        time_filter = PBTimeFilter(
            start=dt2ts(start_time) if start_time else None,
            end=dt2ts(end_time) if end_time else None,
        )

        stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
            time_filter=time_filter,
            resampling_options=PBResamplingOptions(
                resolution=round(resampling_period.total_seconds())
            ),
        )

        request = PBAggregatedStreamRequest(
            aggregation_configs=[aggregation_config],
            filter=stream_filter,
        )

        def transform_response(
            response: PBAggregatedStreamResponse,
        ) -> MetricSample:
            return AggregatedMetric(response).sample()

        def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
            call_iterator = (
                self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
                    request,
                )
            )

            return cast(AsyncIterable[PBAggregatedStreamResponse], call_iterator)

        self._aggregated_data_streams[stream_key] = GrpcStreamBroadcaster(
            stream_name="aggregated-microgrid-data-stream",
            stream_method=stream_method,
            transform=transform_response,
            retry_strategy=None,
        )

    return self._aggregated_data_streams[stream_key].new_receiver()
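
A usage sketch for aggregated streaming; the aggregation formula syntax is service-defined, so the "sum" string, IDs, and metric below are illustrative assumptions:

from datetime import timedelta

from frequenz.client.common.metric import Metric  # assumed import path
from frequenz.client.reporting import ReportingApiClient


async def stream_aggregate(client: ReportingApiClient) -> None:
    # `client` is assumed to be connected already.
    receiver = client.receive_aggregated_data(
        microgrid_id=1,
        metric=Metric.AC_ACTIVE_POWER,
        aggregation_formula="sum",  # hypothetical formula string
        start_time=None,  # earliest available data
        end_time=None,  # keep streaming indefinitely
        resampling_period=timedelta(minutes=1),  # required for this method
    )
    async for sample in receiver:
        print(sample)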
receive_microgrid_components_data ¤
receive_microgrid_components_data(
    *,
    microgrid_components: list[tuple[int, list[int]]],
    metrics: Metric | list[Metric],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool = False,
    include_bounds: bool = False
) -> Receiver[MetricSample]

Iterate over the data for multiple microgrids and components.

PARAMETER DESCRIPTION
microgrid_components

List of tuples where each tuple contains microgrid ID and corresponding component IDs.

TYPE: list[tuple[int, list[int]]]

metrics

The metric name or list of metric names.

TYPE: Metric | list[Metric]

start_time

start datetime, if None, the earliest available data will be used.

TYPE: datetime | None

end_time

end datetime, if None starts streaming indefinitely from start_time.

TYPE: datetime | None

resampling_period

The period for resampling the data.

TYPE: timedelta | None

include_states

Whether to include the state data.

TYPE: bool DEFAULT: False

include_bounds

Whether to include the bound data.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Receiver[MetricSample]

A receiver of MetricSamples.

Source code in frequenz/client/reporting/_client.py
def receive_microgrid_components_data(
    self,
    *,
    microgrid_components: list[tuple[int, list[int]]],
    metrics: Metric | list[Metric],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool = False,
    include_bounds: bool = False,
) -> Receiver[MetricSample]:
    """Iterate over the data for multiple microgrids and components.

    Args:
        microgrid_components: List of tuples where each tuple contains
                              microgrid ID and corresponding component IDs.
        metrics: The metric name or list of metric names.
        start_time: start datetime, if None, the earliest available data will be used.
        end_time: end datetime, if None starts streaming indefinitely from start_time.
        resampling_period: The period for resampling the data.
        include_states: Whether to include the state data.
        include_bounds: Whether to include the bound data.

    Returns:
        A receiver of `MetricSample`s.
    """
    receiver = self._receive_microgrid_components_data_batch(
        microgrid_components=microgrid_components,
        metrics=[metrics] if isinstance(metrics, Metric) else metrics,
        start_time=start_time,
        end_time=end_time,
        resampling_period=resampling_period,
        include_states=include_states,
        include_bounds=include_bounds,
    )

    return BatchUnrollReceiver(receiver)
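
A sketch for streaming several components across microgrids; the IDs and metrics are placeholder assumptions:

from datetime import datetime, timedelta, timezone

from frequenz.client.common.metric import Metric  # assumed import path
from frequenz.client.reporting import ReportingApiClient


async def stream_components(client: ReportingApiClient) -> None:
    # `client` is assumed to be connected already.
    receiver = client.receive_microgrid_components_data(
        microgrid_components=[(1, [42, 43]), (2, [7])],  # (microgrid, components)
        metrics=[Metric.AC_ACTIVE_POWER, Metric.AC_REACTIVE_POWER],
        start_time=datetime.now(timezone.utc) - timedelta(hours=6),
        end_time=None,  # keep streaming live data
        resampling_period=timedelta(minutes=5),
        include_states=True,  # also request state data
    )
    async for sample in receiver:
        print(sample)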
receive_microgrid_sensors_data ¤
receive_microgrid_sensors_data(
    *,
    microgrid_sensors: list[tuple[int, list[int]]],
    metrics: Metric | list[Metric],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool = False
) -> Receiver[MetricSample]

Iterate over the data for multiple sensors in a microgrid.

PARAMETER DESCRIPTION
microgrid_sensors

List of tuples where each tuple contains microgrid ID and corresponding sensor IDs.

TYPE: list[tuple[int, list[int]]]

metrics

The metric name or list of metric names.

TYPE: Metric | list[Metric]

start_time

start datetime, if None, the earliest available data will be used.

TYPE: datetime | None

end_time

end datetime, if None starts streaming indefinitely from start_time.

TYPE: datetime | None

resampling_period

The period for resampling the data.

TYPE: timedelta | None

include_states

Whether to include the state data.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Receiver[MetricSample]

A receiver of MetricSamples.

Source code in frequenz/client/reporting/_client.py
def receive_microgrid_sensors_data(
    self,
    *,
    microgrid_sensors: list[tuple[int, list[int]]],
    metrics: Metric | list[Metric],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool = False,
) -> Receiver[MetricSample]:
    """Iterate over the data for multiple sensors in a microgrid.

    Args:
        microgrid_sensors: List of tuples where each tuple contains
                            microgrid ID and corresponding sensor IDs.
        metrics: The metric name or list of metric names.
        start_time: start datetime, if None, the earliest available data will be used.
        end_time: end datetime, if None starts streaming indefinitely from start_time.
        resampling_period: The period for resampling the data.
        include_states: Whether to include the state data.

    Returns:
        A receiver of `MetricSample`s.
    """
    receiver = self._receive_microgrid_sensors_data_batch(
        microgrid_sensors=microgrid_sensors,
        metrics=[metrics] if isinstance(metrics, Metric) else metrics,
        start_time=start_time,
        end_time=end_time,
        resampling_period=resampling_period,
        include_states=include_states,
    )
    return BatchUnrollReceiver(receiver)
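
A sketch of a bounded historical query for sensors; the IDs and metric are placeholder assumptions:

from datetime import datetime, timedelta, timezone

from frequenz.client.common.metric import Metric  # assumed import path
from frequenz.client.reporting import ReportingApiClient


async def dump_sensor_history(client: ReportingApiClient) -> None:
    # `client` is assumed to be connected already.
    end = datetime.now(timezone.utc)
    receiver = client.receive_microgrid_sensors_data(
        microgrid_sensors=[(1, [10, 11])],  # (microgrid, sensors)
        metrics=Metric.TEMPERATURE,
        start_time=end - timedelta(days=1),
        end_time=end,  # bounded query: the stream ends at end_time
        resampling_period=timedelta(minutes=15),
    )
    async for sample in receiver:
        print(sample)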
receive_single_component_data ¤
receive_single_component_data(
    *,
    microgrid_id: int,
    component_id: int,
    metrics: Metric | list[Metric],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool = False,
    include_bounds: bool = False
) -> Receiver[MetricSample]

Iterate over the data for a single component.

PARAMETER DESCRIPTION
microgrid_id

The microgrid ID.

TYPE: int

component_id

The component ID.

TYPE: int

metrics

The metric name or list of metric names.

TYPE: Metric | list[Metric]

start_time

start datetime, if None, the earliest available data will be used.

TYPE: datetime | None

end_time

end datetime, if None starts streaming indefinitely from start_time.

TYPE: datetime | None

resampling_period

The period for resampling the data.

TYPE: timedelta | None

include_states

Whether to include the state data.

TYPE: bool DEFAULT: False

include_bounds

Whether to include the bound data.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Receiver[MetricSample]

A receiver of MetricSamples.

Source code in frequenz/client/reporting/_client.py
def receive_single_component_data(
    self,
    *,
    microgrid_id: int,
    component_id: int,
    metrics: Metric | list[Metric],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool = False,
    include_bounds: bool = False,
) -> Receiver[MetricSample]:
    """Iterate over the data for a single metric.

    Args:
        microgrid_id: The microgrid ID.
        component_id: The component ID.
        metrics: The metric name or list of metric names.
        start_time: start datetime, if None, the earliest available data will be used
        end_time: end datetime, if None starts streaming indefinitely from start_time
        resampling_period: The period for resampling the data.
        include_states: Whether to include the state data.
        include_bounds: Whether to include the bound data.

    Returns:
        A receiver of `MetricSample`s.
    """
    receiver = self._receive_microgrid_components_data_batch(
        microgrid_components=[(microgrid_id, [component_id])],
        metrics=[metrics] if isinstance(metrics, Metric) else metrics,
        start_time=start_time,
        end_time=end_time,
        resampling_period=resampling_period,
        include_states=include_states,
        include_bounds=include_bounds,
    )

    return BatchUnrollReceiver(receiver)
receive_single_sensor_data ¤
receive_single_sensor_data(
    *,
    microgrid_id: int,
    sensor_id: int,
    metrics: Metric | list[Metric],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool = False
) -> Receiver[MetricSample]

Iterate over the data for a single sensor.

PARAMETER DESCRIPTION
microgrid_id

The microgrid ID.

TYPE: int

sensor_id

The sensor ID.

TYPE: int

metrics

The metric name or list of metric names.

TYPE: Metric | list[Metric]

start_time

start datetime, if None, the earliest available data will be used.

TYPE: datetime | None

end_time

end datetime, if None starts streaming indefinitely from start_time.

TYPE: datetime | None

resampling_period

The period for resampling the data.

TYPE: timedelta | None

include_states

Whether to include the state data.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Receiver[MetricSample]

A receiver of MetricSamples.

Source code in frequenz/client/reporting/_client.py
def receive_single_sensor_data(
    self,
    *,
    microgrid_id: int,
    sensor_id: int,
    metrics: Metric | list[Metric],
    start_time: datetime | None,
    end_time: datetime | None,
    resampling_period: timedelta | None,
    include_states: bool = False,
) -> Receiver[MetricSample]:
    """Iterate over the data for a single sensor and metric.

    Args:
        microgrid_id: The microgrid ID.
        sensor_id: The sensor ID.
        metrics: The metric name or list of metric names.
        start_time: start datetime, if None, the earliest available data will be used.
        end_time: end datetime, if None starts streaming indefinitely from start_time.
        resampling_period: The period for resampling the data.
        include_states: Whether to include the state data.

    Returns:
        A receiver of `MetricSample`s.
    """
    receiver = self._receive_microgrid_sensors_data_batch(
        microgrid_sensors=[(microgrid_id, [sensor_id])],
        metrics=[metrics] if isinstance(metrics, Metric) else metrics,
        start_time=start_time,
        end_time=end_time,
        resampling_period=resampling_period,
        include_states=include_states,
    )
    return BatchUnrollReceiver(receiver)
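
A sketch of a live single-sensor stream with state data included; the IDs and metric are placeholder assumptions:

from datetime import timedelta

from frequenz.client.common.metric import Metric  # assumed import path
from frequenz.client.reporting import ReportingApiClient


async def watch_sensor(client: ReportingApiClient) -> None:
    # `client` is assumed to be connected already.
    receiver = client.receive_single_sensor_data(
        microgrid_id=1,
        sensor_id=10,
        metrics=[Metric.TEMPERATURE],
        start_time=None,  # earliest available data
        end_time=None,  # live stream, no end
        resampling_period=timedelta(seconds=30),
        include_states=True,  # state samples arrive on the same receiver
    )
    async for sample in receiver:
        print(sample)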