Skip to content

client

frequenz.sdk.microgrid.client ¤

Microgrid API client.

This package provides an easy way to connect to the microgrid API.

Classes¤

frequenz.sdk.microgrid.client.Connection ¤

Bases: NamedTuple

Metadata for a connection between microgrid components.

Source code in frequenz/sdk/microgrid/client/_connection.py
class Connection(NamedTuple):
    """Pair of endpoints describing a connection between microgrid components."""

    start: int
    end: int

    def is_valid(self) -> bool:
        """Tell whether the stored endpoints form a valid connection.

        Returns:
            `False` if `start` is negative, `end` is not positive, or both
                endpoints are equal; `True` otherwise.
        """
        # Reject out-of-range endpoints first, then rule out self-connections.
        if not (self.start >= 0 and self.end > 0):
            return False
        return self.start != self.end
Functions¤
is_valid() ¤

Check if this instance contains valid data.

RETURNS DESCRIPTION
bool

True if start >= 0, end > 0, and start != end, False otherwise.

Source code in frequenz/sdk/microgrid/client/_connection.py
def is_valid(self) -> bool:
    """Tell whether the stored endpoints form a valid connection.

    Returns:
        `False` if `start` is negative, `end` is not positive, or both
            endpoints are equal; `True` otherwise.
    """
    # Reject out-of-range endpoints first, then rule out self-connections.
    if not (self.start >= 0 and self.end > 0):
        return False
    return self.start != self.end

frequenz.sdk.microgrid.client.ExponentialBackoff ¤

Bases: RetryStrategy

Provides methods for calculating the exponential interval between retries.

Source code in frequenz/sdk/microgrid/client/_retry.py
class ExponentialBackoff(RetryStrategy):
    """Provides methods for calculating the exponential interval between retries.

    The n-th interval is `initial_interval * multiplier ** (n - 1)` plus a
    random jitter, capped at `max_interval`.
    """

    DEFAULT_INTERVAL = DEFAULT_RETRY_INTERVAL
    DEFAULT_MAX_INTERVAL = 60.0
    DEFAULT_MULTIPLIER = 2.0

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        initial_interval: float = DEFAULT_INTERVAL,
        max_interval: float = DEFAULT_MAX_INTERVAL,
        multiplier: float = DEFAULT_MULTIPLIER,
        jitter: float = DEFAULT_RETRY_JITTER,
        limit: Optional[int] = None,
    ) -> None:
        """Create an `ExponentialBackoff` instance.

        Args:
            initial_interval: time to wait for before the first retry, in
                seconds.
            max_interval: maximum interval, in seconds.
            multiplier: exponential increment for interval.
            jitter: a jitter to add to the retry interval.
            limit: max number of retries before giving up.  `None` means no
                limit, and `0` means no retry.
        """
        self._initial = initial_interval
        self._max = max_interval
        self._multiplier = multiplier
        self._jitter = jitter
        self._limit = limit

        # Number of intervals handed out so far.
        self._count = 0

    def next_interval(self) -> Optional[float]:
        """Return the time to wait before the next retry.

        Returns `None` if the retry limit has been reached, and no more retries
        are possible.

        The interval never exceeds the configured maximum, even when the
        exponential term grows beyond what a float can represent.

        Returns:
            Time until next retry when below retry limit, and None otherwise.
        """
        if self._limit is not None and self._count >= self._limit:
            return None
        self._count += 1
        try:
            interval = self._initial * self._multiplier ** (self._count - 1)
            interval += random.uniform(0.0, self._jitter)
        except OverflowError:
            # With no retry limit the exponent keeps growing, and Python raises
            # OverflowError once the power leaves the float range (for
            # instance `2.0 ** 1024`).  The uncapped interval is far beyond the
            # maximum by then, so saturate instead of crashing the retry loop.
            return self._max
        return min(interval, self._max)
Functions¤
__init__(initial_interval=DEFAULT_INTERVAL, max_interval=DEFAULT_MAX_INTERVAL, multiplier=DEFAULT_MULTIPLIER, jitter=DEFAULT_RETRY_JITTER, limit=None) ¤

Create an ExponentialBackoff instance.

PARAMETER DESCRIPTION
initial_interval

time to wait for before the first retry, in seconds.

TYPE: float DEFAULT: DEFAULT_INTERVAL

max_interval

maximum interval, in seconds.

TYPE: float DEFAULT: DEFAULT_MAX_INTERVAL

multiplier

exponential increment for interval.

TYPE: float DEFAULT: DEFAULT_MULTIPLIER

jitter

a jitter to add to the retry interval.

TYPE: float DEFAULT: DEFAULT_RETRY_JITTER

limit

max number of retries before giving up. None means no limit, and 0 means no retry.

TYPE: Optional[int] DEFAULT: None

Source code in frequenz/sdk/microgrid/client/_retry.py
def __init__(
    self,
    initial_interval: float = DEFAULT_INTERVAL,
    max_interval: float = DEFAULT_MAX_INTERVAL,
    multiplier: float = DEFAULT_MULTIPLIER,
    jitter: float = DEFAULT_RETRY_JITTER,
    limit: Optional[int] = None,
) -> None:
    """Set up the backoff parameters and reset the retry counter.

    Args:
        initial_interval: seconds to wait before the first retry.
        max_interval: upper cap for any returned interval, in seconds.
        multiplier: factor by which the interval grows on every retry.
        jitter: upper bound of the random amount added to each interval.
        limit: how many retries are allowed before giving up.  `None`
            disables the limit, while `0` disables retrying.
    """
    # Interval growth: starting point, cap and growth factor.
    self._initial, self._max = initial_interval, max_interval
    self._multiplier = multiplier

    # Randomisation and retry budget.
    self._jitter, self._limit = jitter, limit

    # No interval has been handed out yet.
    self._count = 0
next_interval() ¤

Return the time to wait before the next retry.

Returns None if the retry limit has been reached, and no more retries are possible.

RETURNS DESCRIPTION
Optional[float]

Time until next retry when below retry limit, and None otherwise.

Source code in frequenz/sdk/microgrid/client/_retry.py
def next_interval(self) -> Optional[float]:
    """Return the time to wait before the next retry.

    Returns `None` if the retry limit has been reached, and no more retries
    are possible.

    The interval never exceeds the configured maximum, even when the
    exponential term grows beyond what a float can represent.

    Returns:
        Time until next retry when below retry limit, and None otherwise.
    """
    if self._limit is not None and self._count >= self._limit:
        return None
    self._count += 1
    try:
        interval = self._initial * self._multiplier ** (self._count - 1)
        interval += random.uniform(0.0, self._jitter)
    except OverflowError:
        # With no retry limit the exponent keeps growing, and Python raises
        # OverflowError once the power leaves the float range (for instance
        # `2.0 ** 1024`).  The uncapped interval is far beyond the maximum by
        # then, so saturate instead of crashing the retry loop.
        return self._max
    return min(interval, self._max)

frequenz.sdk.microgrid.client.LinearBackoff ¤

Bases: RetryStrategy

Provides methods for calculating the interval between retries.

Source code in frequenz/sdk/microgrid/client/_retry.py
class LinearBackoff(RetryStrategy):
    """Retry strategy waiting a fixed interval, plus jitter, between retries."""

    def __init__(
        self,
        interval: float = DEFAULT_RETRY_INTERVAL,
        jitter: float = DEFAULT_RETRY_JITTER,
        limit: Optional[int] = None,
    ) -> None:
        """Set up the backoff parameters and reset the retry counter.

        Args:
            interval: seconds to wait before each retry.
            jitter: upper bound of the random amount added to each interval.
            limit: how many retries are allowed before giving up.  `None`
                disables the limit, while `0` disables retrying.
        """
        self._interval, self._jitter = interval, jitter
        self._limit = limit

        # No interval has been handed out yet.
        self._count = 0

    def next_interval(self) -> Optional[float]:
        """Compute how long to wait before the next retry.

        Each call that yields an interval counts as one retry towards the
        limit.

        Returns:
            The base interval plus a random jitter while retries remain, or
                `None` once the retry limit has been reached.
        """
        if self._limit is None or self._count < self._limit:
            self._count += 1
            return self._interval + random.uniform(0.0, self._jitter)
        return None
Functions¤
__init__(interval=DEFAULT_RETRY_INTERVAL, jitter=DEFAULT_RETRY_JITTER, limit=None) ¤

Create a LinearBackoff instance.

PARAMETER DESCRIPTION
interval

time to wait for before the next retry, in seconds.

TYPE: float DEFAULT: DEFAULT_RETRY_INTERVAL

jitter

a jitter to add to the retry interval.

TYPE: float DEFAULT: DEFAULT_RETRY_JITTER

limit

max number of retries before giving up. None means no limit, and 0 means no retry.

TYPE: Optional[int] DEFAULT: None

Source code in frequenz/sdk/microgrid/client/_retry.py
def __init__(
    self,
    interval: float = DEFAULT_RETRY_INTERVAL,
    jitter: float = DEFAULT_RETRY_JITTER,
    limit: Optional[int] = None,
) -> None:
    """Set up the backoff parameters and reset the retry counter.

    Args:
        interval: seconds to wait before each retry.
        jitter: upper bound of the random amount added to each interval.
        limit: how many retries are allowed before giving up.  `None`
            disables the limit, while `0` disables retrying.
    """
    self._interval, self._jitter = interval, jitter
    self._limit = limit

    # No interval has been handed out yet.
    self._count = 0
next_interval() ¤

Return the time to wait before the next retry.

Returns None if the retry limit has been reached, and no more retries are possible.

RETURNS DESCRIPTION
Optional[float]

Time until next retry when below retry limit, and None otherwise.

Source code in frequenz/sdk/microgrid/client/_retry.py
def next_interval(self) -> Optional[float]:
    """Compute how long to wait before the next retry.

    Each call that yields an interval counts as one retry towards the limit.

    Returns:
        The base interval plus a random jitter while retries remain, or
            `None` once the retry limit has been reached.
    """
    if self._limit is None or self._count < self._limit:
        self._count += 1
        return self._interval + random.uniform(0.0, self._jitter)
    return None

frequenz.sdk.microgrid.client.MicrogridApiClient ¤

Bases: ABC

Base interface for microgrid API clients to implement.

Source code in frequenz/sdk/microgrid/client/_client.py
class MicrogridApiClient(ABC):
    """Base interface for microgrid API clients to implement.

    Every method declared here is abstract and asynchronous.
    """

    @abstractmethod
    async def components(self) -> Iterable[Component]:
        """Fetch all the components present in the microgrid.

        Returns:
            Iterable whose elements are all the components in the microgrid.
        """

    @abstractmethod
    async def connections(
        self,
        starts: Optional[Set[int]] = None,
        ends: Optional[Set[int]] = None,
    ) -> Iterable[Connection]:
        """Fetch the connections between components in the microgrid.

        Args:
            starts: if set and non-empty, only include connections whose start
                value matches one of the provided component IDs.
            ends: if set and non-empty, only include connections whose end value
                matches one of the provided component IDs.

        Returns:
            Microgrid connections matching the provided start and end filters.
        """

    @abstractmethod
    async def meter_data(
        self,
        component_id: int,
        maxsize: int = RECEIVER_MAX_SIZE,
    ) -> Receiver[MeterData]:
        """Return a channel receiver that provides a `MeterData` stream.

        If only the latest value is required, the `Receiver` returned by this
        method can be converted into a `Peekable` with the `into_peekable`
        method on the `Receiver`.

        Args:
            component_id: id of the meter to get data for.
            maxsize: size of the receiver's buffer.

        Returns:
            A channel receiver that provides realtime meter data.
        """

    @abstractmethod
    async def battery_data(
        self,
        component_id: int,
        maxsize: int = RECEIVER_MAX_SIZE,
    ) -> Receiver[BatteryData]:
        """Return a channel receiver that provides a `BatteryData` stream.

        If only the latest value is required, the `Receiver` returned by this
        method can be converted into a `Peekable` with the `into_peekable`
        method on the `Receiver`.

        Args:
            component_id: id of the battery to get data for.
            maxsize: size of the receiver's buffer.

        Returns:
            A channel receiver that provides realtime battery data.
        """

    @abstractmethod
    async def inverter_data(
        self,
        component_id: int,
        maxsize: int = RECEIVER_MAX_SIZE,
    ) -> Receiver[InverterData]:
        """Return a channel receiver that provides an `InverterData` stream.

        If only the latest value is required, the `Receiver` returned by this
        method can be converted into a `Peekable` with the `into_peekable`
        method on the `Receiver`.

        Args:
            component_id: id of the inverter to get data for.
            maxsize: size of the receiver's buffer.

        Returns:
            A channel receiver that provides realtime inverter data.
        """

    @abstractmethod
    async def ev_charger_data(
        self,
        component_id: int,
        maxsize: int = RECEIVER_MAX_SIZE,
    ) -> Receiver[EVChargerData]:
        """Return a channel receiver that provides an `EVChargerData` stream.

        If only the latest value is required, the `Receiver` returned by this
        method can be converted into a `Peekable` with the `into_peekable`
        method on the `Receiver`.

        Args:
            component_id: id of the ev charger to get data for.
            maxsize: size of the receiver's buffer.

        Returns:
            A channel receiver that provides realtime ev charger data.
        """

    @abstractmethod
    async def set_power(self, component_id: int, power_w: float) -> None:
        """Send request to the Microgrid to set power for component.

        If `power_w > 0`, then component will be charged with this power.
        If `power_w < 0`, then component will be discharged with this power.
        If `power_w == 0`, then stop charging or discharging component.

        Args:
            component_id: id of the component to set power.
            power_w: power to set for the component (presumably in watts,
                judging by the `_w` suffix; the unit is not stated here).
        """

    @abstractmethod
    async def set_bounds(self, component_id: int, lower: float, upper: float) -> None:
        """Set the lower and upper bounds for a component.

        Args:
            component_id: ID of the component to set bounds for.
            lower: Lower bound to be set for the component.
            upper: Upper bound to be set for the component.
        """
Functions¤
battery_data(component_id, maxsize=RECEIVER_MAX_SIZE) abstractmethod async ¤

Return a channel receiver that provides a BatteryData stream.

If only the latest value is required, the Receiver returned by this method can be converted into a Peekable with the into_peekable method on the Receiver.

PARAMETER DESCRIPTION
component_id

id of the battery to get data for.

TYPE: int

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: RECEIVER_MAX_SIZE

RETURNS DESCRIPTION
Receiver[BatteryData]

A channel receiver that provides realtime battery data.

Source code in frequenz/sdk/microgrid/client/_client.py
@abstractmethod
async def battery_data(
    self,
    component_id: int,
    maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[BatteryData]:
    """Return a channel receiver that provides a `BatteryData` stream.

    If only the latest value is required, the `Receiver` returned by this
    method can be converted into a `Peekable` with the `into_peekable`
    method on the `Receiver`.

    Args:
        component_id: id of the battery to get data for.
        maxsize: size of the receiver's buffer.

    Returns:
        A channel receiver that provides realtime battery data.
    """
components() abstractmethod async ¤

Fetch all the components present in the microgrid.

RETURNS DESCRIPTION
Iterable[Component]

Iterable whose elements are all the components in the microgrid.

Source code in frequenz/sdk/microgrid/client/_client.py
@abstractmethod
async def components(self) -> Iterable[Component]:
    """Fetch all the components present in the microgrid.

    Returns:
        Iterable whose elements are all the components in the microgrid.
    """
connections(starts=None, ends=None) abstractmethod async ¤

Fetch the connections between components in the microgrid.

PARAMETER DESCRIPTION
starts

if set and non-empty, only include connections whose start value matches one of the provided component IDs

TYPE: Optional[Set[int]] DEFAULT: None

ends

if set and non-empty, only include connections whose end value matches one of the provided component IDs

TYPE: Optional[Set[int]] DEFAULT: None

RETURNS DESCRIPTION
Iterable[Connection]

Microgrid connections matching the provided start and end filters.

Source code in frequenz/sdk/microgrid/client/_client.py
@abstractmethod
async def connections(
    self,
    starts: Optional[Set[int]] = None,
    ends: Optional[Set[int]] = None,
) -> Iterable[Connection]:
    """Fetch the connections between components in the microgrid.

    Args:
        starts: if set and non-empty, only include connections whose start
            value matches one of the provided component IDs.
        ends: if set and non-empty, only include connections whose end value
            matches one of the provided component IDs.

    Returns:
        Microgrid connections matching the provided start and end filters.
    """
ev_charger_data(component_id, maxsize=RECEIVER_MAX_SIZE) abstractmethod async ¤

Return a channel receiver that provides an EVChargerData stream.

If only the latest value is required, the Receiver returned by this method can be converted into a Peekable with the into_peekable method on the Receiver.

PARAMETER DESCRIPTION
component_id

id of the ev charger to get data for.

TYPE: int

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: RECEIVER_MAX_SIZE

RETURNS DESCRIPTION
Receiver[EVChargerData]

A channel receiver that provides realtime ev charger data.

Source code in frequenz/sdk/microgrid/client/_client.py
@abstractmethod
async def ev_charger_data(
    self,
    component_id: int,
    maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[EVChargerData]:
    """Return a channel receiver that provides an `EVChargerData` stream.

    If only the latest value is required, the `Receiver` returned by this
    method can be converted into a `Peekable` with the `into_peekable`
    method on the `Receiver`.

    Args:
        component_id: id of the ev charger to get data for.
        maxsize: size of the receiver's buffer.

    Returns:
        A channel receiver that provides realtime ev charger data.
    """
inverter_data(component_id, maxsize=RECEIVER_MAX_SIZE) abstractmethod async ¤

Return a channel receiver that provides an InverterData stream.

If only the latest value is required, the Receiver returned by this method can be converted into a Peekable with the into_peekable method on the Receiver.

PARAMETER DESCRIPTION
component_id

id of the inverter to get data for.

TYPE: int

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: RECEIVER_MAX_SIZE

RETURNS DESCRIPTION
Receiver[InverterData]

A channel receiver that provides realtime inverter data.

Source code in frequenz/sdk/microgrid/client/_client.py
@abstractmethod
async def inverter_data(
    self,
    component_id: int,
    maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[InverterData]:
    """Return a channel receiver that provides an `InverterData` stream.

    If only the latest value is required, the `Receiver` returned by this
    method can be converted into a `Peekable` with the `into_peekable`
    method on the `Receiver`.

    Args:
        component_id: id of the inverter to get data for.
        maxsize: size of the receiver's buffer.

    Returns:
        A channel receiver that provides realtime inverter data.
    """
meter_data(component_id, maxsize=RECEIVER_MAX_SIZE) abstractmethod async ¤

Return a channel receiver that provides a MeterData stream.

If only the latest value is required, the Receiver returned by this method can be converted into a Peekable with the into_peekable method on the Receiver.

PARAMETER DESCRIPTION
component_id

id of the meter to get data for.

TYPE: int

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: RECEIVER_MAX_SIZE

RETURNS DESCRIPTION
Receiver[MeterData]

A channel receiver that provides realtime meter data.

Source code in frequenz/sdk/microgrid/client/_client.py
@abstractmethod
async def meter_data(
    self,
    component_id: int,
    maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[MeterData]:
    """Return a channel receiver that provides a `MeterData` stream.

    If only the latest value is required, the `Receiver` returned by this
    method can be converted into a `Peekable` with the `into_peekable`
    method on the `Receiver`.

    Args:
        component_id: id of the meter to get data for.
        maxsize: size of the receiver's buffer.

    Returns:
        A channel receiver that provides realtime meter data.
    """
set_bounds(component_id, lower, upper) abstractmethod async ¤

Set the lower and upper bounds for a component.

PARAMETER DESCRIPTION
component_id

ID of the component to set bounds for.

TYPE: int

lower

Lower bound to be set for the component.

TYPE: float

upper

Upper bound to be set for the component.

TYPE: float

Source code in frequenz/sdk/microgrid/client/_client.py
@abstractmethod
async def set_bounds(self, component_id: int, lower: float, upper: float) -> None:
    """Set the lower and upper bounds for a component.

    Args:
        component_id: ID of the component to set bounds for.
        lower: Lower bound to be set for the component.
        upper: Upper bound to be set for the component.
    """
set_power(component_id, power_w) abstractmethod async ¤

Send request to the Microgrid to set power for component.

If power > 0, then component will be charged with this power. If power < 0, then component will be discharged with this power. If power == 0, then stop charging or discharging component.

PARAMETER DESCRIPTION
component_id

id of the component to set power.

TYPE: int

power_w

power to set for the component.

TYPE: float

Source code in frequenz/sdk/microgrid/client/_client.py
@abstractmethod
async def set_power(self, component_id: int, power_w: float) -> None:
    """Send request to the Microgrid to set power for component.

    If `power_w > 0`, then component will be charged with this power.
    If `power_w < 0`, then component will be discharged with this power.
    If `power_w == 0`, then stop charging or discharging component.

    Args:
        component_id: id of the component to set power.
        power_w: power to set for the component (presumably in watts,
            judging by the `_w` suffix; the unit is not stated here).
    """

frequenz.sdk.microgrid.client.MicrogridGrpcClient ¤

Bases: MicrogridApiClient

Microgrid API client implementation using gRPC as the underlying protocol.

Source code in frequenz/sdk/microgrid/client/_client.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
class MicrogridGrpcClient(MicrogridApiClient):
    """Microgrid API client implementation using gRPC as the underlying protocol."""

    def __init__(
        self,
        grpc_channel: grpc.aio.Channel,
        target: str,
        retry_spec: RetryStrategy = LinearBackoff(),
    ) -> None:
        """Create a client on top of an existing gRPC channel.

        Args:
            grpc_channel: asyncio-capable gRPC channel used to build the API
                stub.
            target: address (host:port) of the API server the channel points
                to; within this class it is used to build error and log
                messages.
            retry_spec: Strategy describing how to retry when the connection
                to a streaming method gets lost.
        """
        # Per-component streaming state, keyed by component id.
        self._streaming_tasks: Dict[int, asyncio.Task[None]] = {}
        self._component_streams: Dict[int, Broadcast[Any]] = {}
        # Kept as a prototype: streaming tasks work on their own copy.
        self._retry_spec = retry_spec
        self.target = target
        self.api = MicrogridStub(grpc_channel)

    async def components(self) -> Iterable[Component]:
        """Fetch all the components present in the microgrid.

        Components reported with the sensor category are left out of the
        result.

        Returns:
            A lazy, single-pass iterator over the components in the microgrid.

        Raises:
            AioRpcError: if connection to Microgrid API cannot be established or
                when the api call exceeded timeout
        """
        try:
            # grpc.aio is missing types and mypy thinks this is not awaitable,
            # but it is
            component_list = await self.api.ListComponents(
                microgrid_pb.ComponentFilter(),
                timeout=DEFAULT_GRPC_CALL_TIMEOUT,  # type: ignore[arg-type]
            )  # type: ignore[misc]
        except grpc.aio.AioRpcError as err:
            msg = f"Failed to list components. Microgrid API: {self.target}. Err: {err.details()}"
            # Re-raise with the API target added to the details; chain
            # explicitly so the original error is reported as the cause.
            raise grpc.aio.AioRpcError(
                code=err.code(),
                initial_metadata=err.initial_metadata(),
                trailing_metadata=err.trailing_metadata(),
                details=msg,
                debug_error_string=err.debug_error_string(),
            ) from err

        sensor_category = components_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR
        # Compare categories by value (`!=`), not by identity: an identity
        # test on enum numbers only works by accident of integer caching.
        return (
            Component(
                c.id,
                _component_category_from_protobuf(c.category),
                _component_type_from_protobuf(c.category, c.inverter),
            )
            for c in component_list.components
            if c.category != sensor_category
        )

    async def connections(
        self,
        starts: Optional[Set[int]] = None,
        ends: Optional[Set[int]] = None,
    ) -> Iterable[Connection]:
        """Fetch the connections between components in the microgrid.

        Connections touching a component that `components()` does not return
        are dropped from the result.

        Args:
            starts: if set and non-empty, only include connections whose start
                value matches one of the provided component IDs
            ends: if set and non-empty, only include connections whose end value
                matches one of the provided component IDs

        Returns:
            A lazy, single-pass iterator over the microgrid connections
                matching the provided start and end filters.

        Raises:
            AioRpcError: if connection to Microgrid API cannot be established or
                when the api call exceeded timeout
        """
        connection_filter = microgrid_pb.ConnectionFilter(starts=starts, ends=ends)
        try:
            # Both requests are independent, so they are issued concurrently.
            valid_components, all_connections = await asyncio.gather(
                self.components(),
                # grpc.aio is missing types and mypy thinks this is not
                # awaitable, but it is
                cast(
                    Awaitable[microgrid_pb.ConnectionList],
                    self.api.ListConnections(
                        connection_filter,
                        timeout=DEFAULT_GRPC_CALL_TIMEOUT,  # type: ignore[arg-type]
                    ),
                ),
            )
        except grpc.aio.AioRpcError as err:
            msg = f"Failed to list connections. Microgrid API: {self.target}. Err: {err.details()}"
            # Re-raise with the API target added to the details; chain
            # explicitly so the original error is reported as the cause.
            raise grpc.aio.AioRpcError(
                code=err.code(),
                initial_metadata=err.initial_metadata(),
                trailing_metadata=err.trailing_metadata(),
                details=msg,
                debug_error_string=err.debug_error_string(),
            ) from err

        # Filter out the components filtered in `components` method.
        # id=0 is an exception indicating grid component.
        valid_ids = {c.component_id for c in valid_components}
        valid_ids.add(0)

        return (
            Connection(conn.start, conn.end)
            for conn in all_connections.connections
            if conn.start in valid_ids and conn.end in valid_ids
        )

    async def _component_data_task(
        self,
        component_id: int,
        transform: Callable[[microgrid_pb.ComponentData], _GenericComponentData],
        sender: Sender[_GenericComponentData],
    ) -> None:
        """Read data from the microgrid API and send to a channel.

        The stream is re-opened every time it ends or fails, waiting between
        attempts as dictated by a private copy of the client's retry strategy.
        gRPC errors are logged and trigger a retry; they are not propagated.
        The task returns once the retry strategy reports that no more retries
        are possible.

        Args:
            component_id: id of the component to get data for.
            transform: A method for transforming raw component data into the
                desired output type.
            sender: A channel sender, to send the component data to.
        """
        # Work on a copy so this task's retry state is independent from the
        # strategy instance stored on the client.
        retry_spec: RetryStrategy = self._retry_spec.copy()
        while True:
            # The log messages keep the `GetComponentData` label even though
            # the RPC used below is `StreamComponentData`.
            _logger.debug(
                "Making call to `GetComponentData`, for component_id=%d", component_id
            )
            try:
                call = self.api.StreamComponentData(
                    microgrid_pb.ComponentIdParam(id=component_id),
                )
                # grpc.aio is missing types and mypy thinks this is not
                # async iterable, but it is
                async for msg in call:  # type: ignore[attr-defined]
                    await sender.send(transform(msg))
            except grpc.aio.AioRpcError as err:
                api_details = f"Microgrid API: {self.target}."
                _logger.exception(
                    "`GetComponentData`, for component_id=%d: exception: %s api: %s",
                    component_id,
                    err,
                    api_details,
                )

            # Only `None` means the retry limit was reached; an interval of
            # 0.0 is a valid request to retry immediately.
            interval = retry_spec.next_interval()
            if interval is None:
                _logger.warning(
                    "`GetComponentData`, for component_id=%d: connection ended, "
                    "retry limit exceeded %s.",
                    component_id,
                    retry_spec.get_progress(),
                )
                break

            _logger.warning(
                "`GetComponentData`, for component_id=%d: connection ended, "
                "retrying %s in %0.3f seconds.",
                component_id,
                retry_spec.get_progress(),
                interval,
            )
            await asyncio.sleep(interval)

    def _get_component_data_channel(
        self,
        component_id: int,
        transform: Callable[[microgrid_pb.ComponentData], _GenericComponentData],
    ) -> Broadcast[_GenericComponentData]:
        """Look up, or lazily create, the broadcast channel of a component.

        The first request for a given component_id creates the channel and
        starts a task that reads the component's data from the microgrid api
        and sends it to that channel. Later requests get the same channel
        back.

        Args:
            component_id: id of the component to get data for.
            transform: A method for transforming raw component data into the
                desired output type.

        Returns:
            The channel for the given component_id.
        """
        existing = self._component_streams.get(component_id)
        if existing is not None:
            return existing

        name = f"raw-component-data-{component_id}"
        channel = Broadcast[_GenericComponentData](name)
        # Register the channel before starting the task that feeds it.
        self._component_streams[component_id] = channel

        reader = self._component_data_task(
            component_id,
            transform,
            channel.new_sender(),
        )
        self._streaming_tasks[component_id] = asyncio.create_task(reader, name=name)
        return channel

    async def _expect_category(
        self,
        component_id: int,
        expected_category: ComponentCategory,
    ) -> None:
        """Verify that a component exists and belongs to a given category.

        Args:
            component_id: Component id to check.
            expected_category: Component category that the given id is expected
                to have.

        Raises:
            ValueError: if the given id is unknown or has a different type.
        """
        known_components = await self.components()
        try:
            found = next(
                candidate
                for candidate in known_components
                if candidate.component_id == component_id
            )
        except StopIteration as exc:
            # No component carries the requested id.
            not_found = f"Unable to find component with id {component_id}"
            raise ValueError(not_found) from exc

        if found.category != expected_category:
            mismatch = (
                f"Component id {component_id} is a {found.category}"
                f", not a {expected_category}."
            )
            raise ValueError(mismatch)

    async def meter_data(
        self,
        component_id: int,
        maxsize: int = RECEIVER_MAX_SIZE,
    ) -> Receiver[MeterData]:
        """Return a channel receiver that provides a `MeterData` stream.

        If only the latest value is required, the `Receiver` returned by this
        method can be converted into a `Peekable` with the `into_peekable`
        method on the `Receiver`.

        Args:
            component_id: id of the meter to get data for.
            maxsize: Size of the receiver's buffer.

        Returns:
            A channel receiver that provides realtime meter data.

        Raises:
            ValueError: if the given id is unknown or has a different type.
        """
        # Reject unknown ids and non-meter components before streaming.
        await self._expect_category(component_id, ComponentCategory.METER)
        channel = self._get_component_data_channel(
            component_id, MeterData.from_proto
        )
        return channel.new_receiver(maxsize=maxsize)

    async def battery_data(
        self,
        component_id: int,
        maxsize: int = RECEIVER_MAX_SIZE,
    ) -> Receiver[BatteryData]:
        """Return a channel receiver that provides a `BatteryData` stream.

        If only the latest value is required, the `Receiver` returned by this
        method can be converted into a `Peekable` with the `into_peekable`
        method on the `Receiver`.

        Args:
            component_id: id of the battery to get data for.
            maxsize: Size of the receiver's buffer.

        Returns:
            A channel receiver that provides realtime battery data.

        Raises:
            ValueError: if the given id is unknown or has a different type.
        """
        # Reject unknown ids and non-battery components before streaming.
        await self._expect_category(component_id, ComponentCategory.BATTERY)
        channel = self._get_component_data_channel(
            component_id, BatteryData.from_proto
        )
        return channel.new_receiver(maxsize=maxsize)

    async def inverter_data(
        self,
        component_id: int,
        maxsize: int = RECEIVER_MAX_SIZE,
    ) -> Receiver[InverterData]:
        """Return a channel receiver that provides an `InverterData` stream.

        If only the latest value is required, the `Receiver` returned by this
        method can be converted into a `Peekable` with the `into_peekable`
        method on the `Receiver`.

        Args:
            component_id: id of the inverter to get data for.
            maxsize: Size of the receiver's buffer.

        Returns:
            A channel receiver that provides realtime inverter data.

        Raises:
            ValueError: if the given id is unknown or has a different type.
        """
        # Reject unknown ids and non-inverter components before streaming.
        await self._expect_category(component_id, ComponentCategory.INVERTER)
        channel = self._get_component_data_channel(
            component_id, InverterData.from_proto
        )
        return channel.new_receiver(maxsize=maxsize)

    async def ev_charger_data(
        self,
        component_id: int,
        maxsize: int = RECEIVER_MAX_SIZE,
    ) -> Receiver[EVChargerData]:
        """Return a channel receiver that provides an `EVChargerData` stream.

        If only the latest value is required, the `Receiver` returned by this
        method can be converted into a `Peekable` with the `into_peekable`
        method on the `Receiver`.

        Args:
            component_id: id of the ev charger to get data for.
            maxsize: Size of the receiver's buffer.

        Returns:
            A channel receiver that provides realtime ev charger data.

        Raises:
            ValueError: if the given id is unknown or has a different type.
        """
        # Reject unknown ids and non-EV-charger components before streaming.
        await self._expect_category(component_id, ComponentCategory.EV_CHARGER)
        channel = self._get_component_data_channel(
            component_id, EVChargerData.from_proto
        )
        return channel.new_receiver(maxsize=maxsize)

    async def set_power(self, component_id: int, power_w: float) -> None:
        """Send request to the Microgrid to set power for component.

        If power > 0, then component will be charged with this power.
        If power < 0, then component will be discharged with this power.
        If power == 0, then stop charging or discharging component.

        Args:
            component_id: id of the component to set power.
            power_w: power to set for the component.

        Raises:
            AioRpcError: if connection to Microgrid API cannot be established or
                when the api call exceeded timeout
        """
        request = microgrid_pb.SetPowerActiveParam(
            component_id=component_id, power=power_w
        )
        try:
            await self.api.SetPowerActive(
                request,
                timeout=DEFAULT_GRPC_CALL_TIMEOUT,  # type: ignore[arg-type]
            )  # type: ignore[misc]
        except grpc.aio.AioRpcError as err:
            msg = f"Failed to set power. Microgrid API: {self.target}. Err: {err.details()}"
            # Re-raise with the API target added to the details; chain
            # explicitly so the original error is reported as the cause.
            raise grpc.aio.AioRpcError(
                code=err.code(),
                initial_metadata=err.initial_metadata(),
                trailing_metadata=err.trailing_metadata(),
                details=msg,
                debug_error_string=err.debug_error_string(),
            ) from err

    async def set_bounds(
        self,
        component_id: int,
        lower: float,
        upper: float,
    ) -> None:
        """Send a request to set the active power bounds of a component.

        The bounds are validated locally first: the allowed range must
        contain 0, so `lower` may not be positive and `upper` may not be
        negative.

        Args:
            component_id: ID of the component to set bounds for.
            lower: Lower bound to be set for the component.
            upper: Upper bound to be set for the component.

        Raises:
            ValueError: when upper bound is less than 0, or when lower bound is
                greater than 0.
            grpc.aio.AioRpcError: if connection to Microgrid API cannot be established
                or when the api call exceeded timeout
        """
        if upper < 0:
            raise ValueError(f"Upper bound {upper} must be greater than or equal to 0.")
        if lower > 0:
            raise ValueError(f"Lower bound {lower} must be less than or equal to 0.")

        request = microgrid_pb.SetBoundsParam(
            component_id=component_id,
            # pylint: disable=no-member,line-too-long
            target_metric=microgrid_pb.SetBoundsParam.TargetMetric.TARGET_METRIC_POWER_ACTIVE,
            bounds=metrics_pb.Bounds(lower=lower, upper=upper),
        )
        try:
            # The call has to be awaited: otherwise its outcome is never
            # observed and a failure cannot reach the `except` clause below.
            await self.api.AddInclusionBounds(
                request,
                timeout=DEFAULT_GRPC_CALL_TIMEOUT,  # type: ignore[arg-type]
            )  # type: ignore[misc]
        except grpc.aio.AioRpcError as err:
            api_details = f"Microgrid API: {self.target}."
            _logger.error(
                "set_bounds write failed: %s, for message: %s, api: %s. Err: %s",
                err,
                request,
                api_details,
                err.details(),
            )
            raise
Functions¤
__init__(grpc_channel, target, retry_spec=LinearBackoff()) ¤

Initialize the class instance.

PARAMETER DESCRIPTION
grpc_channel

asyncio-supporting gRPC channel

TYPE: Channel

target

server (host:port) to be used for asyncio-supporting gRPC channel that the client should use to contact the API

TYPE: str

retry_spec

Specs on how to retry if the connection to a streaming method gets lost.

TYPE: RetryStrategy DEFAULT: LinearBackoff()

Source code in frequenz/sdk/microgrid/client/_client.py
def __init__(
    self,
    grpc_channel: grpc.aio.Channel,
    target: str,
    retry_spec: RetryStrategy = LinearBackoff(),
) -> None:
    """Create a client on top of an existing gRPC channel.

    Args:
        grpc_channel: asyncio-capable gRPC channel used to build the API
            stub.
        target: address (host:port) of the API server the channel points
            to; within this class it is used to build error and log
            messages.
        retry_spec: Strategy describing how to retry when the connection
            to a streaming method gets lost.
    """
    # Per-component streaming state, keyed by component id.
    self._streaming_tasks: Dict[int, asyncio.Task[None]] = {}
    self._component_streams: Dict[int, Broadcast[Any]] = {}
    # Kept as a prototype: streaming tasks work on their own copy.
    self._retry_spec = retry_spec
    self.target = target
    self.api = MicrogridStub(grpc_channel)
battery_data(component_id, maxsize=RECEIVER_MAX_SIZE) async ¤

Return a channel receiver that provides a BatteryData stream.

If only the latest value is required, the Receiver returned by this method can be converted into a Peekable with the into_peekable method on the Receiver.

RAISES DESCRIPTION
ValueError

if the given id is unknown or has a different type.

PARAMETER DESCRIPTION
component_id

id of the battery to get data for.

TYPE: int

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: RECEIVER_MAX_SIZE

RETURNS DESCRIPTION
Receiver[BatteryData]

A channel receiver that provides realtime battery data.

Source code in frequenz/sdk/microgrid/client/_client.py
async def battery_data(
    self,
    component_id: int,
    maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[BatteryData]:
    """Return a channel receiver that provides a `BatteryData` stream.

    If only the latest value is required, the `Receiver` returned by this
    method can be converted into a `Peekable` with the `into_peekable`
    method on the `Receiver`.

    Args:
        component_id: id of the battery to get data for.
        maxsize: Size of the receiver's buffer.

    Returns:
        A channel receiver that provides realtime battery data.

    Raises:
        ValueError: if the given id is unknown or has a different type.
    """
    # Reject unknown ids and non-battery components before streaming.
    await self._expect_category(component_id, ComponentCategory.BATTERY)
    channel = self._get_component_data_channel(
        component_id, BatteryData.from_proto
    )
    return channel.new_receiver(maxsize=maxsize)
components() async ¤

Fetch all the components present in the microgrid.

RETURNS DESCRIPTION
Iterable[Component]

Iterator whose elements are all the components in the microgrid.

RAISES DESCRIPTION
AioRpcError

if connection to Microgrid API cannot be established or when the api call exceeded timeout

Source code in frequenz/sdk/microgrid/client/_client.py
async def components(self) -> Iterable[Component]:
    """Fetch all the components present in the microgrid.

    Components reported with the sensor category are left out of the
    result.

    Returns:
        A lazy, single-pass iterator over the components in the microgrid.

    Raises:
        AioRpcError: if connection to Microgrid API cannot be established or
            when the api call exceeded timeout
    """
    try:
        # grpc.aio is missing types and mypy thinks this is not awaitable,
        # but it is
        component_list = await self.api.ListComponents(
            microgrid_pb.ComponentFilter(),
            timeout=DEFAULT_GRPC_CALL_TIMEOUT,  # type: ignore[arg-type]
        )  # type: ignore[misc]
    except grpc.aio.AioRpcError as err:
        msg = f"Failed to list components. Microgrid API: {self.target}. Err: {err.details()}"
        # Re-raise with the API target added to the details; chain
        # explicitly so the original error is reported as the cause.
        raise grpc.aio.AioRpcError(
            code=err.code(),
            initial_metadata=err.initial_metadata(),
            trailing_metadata=err.trailing_metadata(),
            details=msg,
            debug_error_string=err.debug_error_string(),
        ) from err

    sensor_category = components_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR
    # Compare categories by value (`!=`), not by identity: an identity
    # test on enum numbers only works by accident of integer caching.
    return (
        Component(
            c.id,
            _component_category_from_protobuf(c.category),
            _component_type_from_protobuf(c.category, c.inverter),
        )
        for c in component_list.components
        if c.category != sensor_category
    )
connections(starts=None, ends=None) async ¤

Fetch the connections between components in the microgrid.

PARAMETER DESCRIPTION
starts

if set and non-empty, only include connections whose start value matches one of the provided component IDs

TYPE: Optional[Set[int]] DEFAULT: None

ends

if set and non-empty, only include connections whose end value matches one of the provided component IDs

TYPE: Optional[Set[int]] DEFAULT: None

RETURNS DESCRIPTION
Iterable[Connection]

Microgrid connections matching the provided start and end filters.

RAISES DESCRIPTION
AioRpcError

if connection to Microgrid API cannot be established or when the api call exceeded timeout

Source code in frequenz/sdk/microgrid/client/_client.py
async def connections(
    self,
    starts: Optional[Set[int]] = None,
    ends: Optional[Set[int]] = None,
) -> Iterable[Connection]:
    """Fetch the connections between components in the microgrid.

    Connections touching a component that `components()` does not return
    are dropped from the result.

    Args:
        starts: if set and non-empty, only include connections whose start
            value matches one of the provided component IDs
        ends: if set and non-empty, only include connections whose end value
            matches one of the provided component IDs

    Returns:
        A lazy, single-pass iterator over the microgrid connections
            matching the provided start and end filters.

    Raises:
        AioRpcError: if connection to Microgrid API cannot be established or
            when the api call exceeded timeout
    """
    connection_filter = microgrid_pb.ConnectionFilter(starts=starts, ends=ends)
    try:
        # Both requests are independent, so they are issued concurrently.
        valid_components, all_connections = await asyncio.gather(
            self.components(),
            # grpc.aio is missing types and mypy thinks this is not
            # awaitable, but it is
            cast(
                Awaitable[microgrid_pb.ConnectionList],
                self.api.ListConnections(
                    connection_filter,
                    timeout=DEFAULT_GRPC_CALL_TIMEOUT,  # type: ignore[arg-type]
                ),
            ),
        )
    except grpc.aio.AioRpcError as err:
        msg = f"Failed to list connections. Microgrid API: {self.target}. Err: {err.details()}"
        # Re-raise with the API target added to the details; chain
        # explicitly so the original error is reported as the cause.
        raise grpc.aio.AioRpcError(
            code=err.code(),
            initial_metadata=err.initial_metadata(),
            trailing_metadata=err.trailing_metadata(),
            details=msg,
            debug_error_string=err.debug_error_string(),
        ) from err

    # Filter out the components filtered in `components` method.
    # id=0 is an exception indicating grid component.
    valid_ids = {c.component_id for c in valid_components}
    valid_ids.add(0)

    return (
        Connection(conn.start, conn.end)
        for conn in all_connections.connections
        if conn.start in valid_ids and conn.end in valid_ids
    )
ev_charger_data(component_id, maxsize=RECEIVER_MAX_SIZE) async ¤

Return a channel receiver that provides an EVChargerData stream.

If only the latest value is required, the Receiver returned by this method can be converted into a Peekable with the into_peekable method on the Receiver.

RAISES DESCRIPTION
ValueError

if the given id is unknown or has a different type.

PARAMETER DESCRIPTION
component_id

id of the ev charger to get data for.

TYPE: int

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: RECEIVER_MAX_SIZE

RETURNS DESCRIPTION
Receiver[EVChargerData]

A channel receiver that provides realtime ev charger data.

Source code in frequenz/sdk/microgrid/client/_client.py
async def ev_charger_data(
    self,
    component_id: int,
    maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[EVChargerData]:
    """Return a channel receiver that provides an `EVChargerData` stream.

    If only the latest value is required, the `Receiver` returned by this
    method can be converted into a `Peekable` with the `into_peekable`
    method on the `Receiver`.

    Args:
        component_id: id of the ev charger to get data for.
        maxsize: Size of the receiver's buffer.

    Returns:
        A channel receiver that provides realtime ev charger data.

    Raises:
        ValueError: if the given id is unknown or has a different type.
    """
    # Reject unknown ids and non-EV-charger components before streaming.
    await self._expect_category(component_id, ComponentCategory.EV_CHARGER)
    channel = self._get_component_data_channel(
        component_id, EVChargerData.from_proto
    )
    return channel.new_receiver(maxsize=maxsize)
inverter_data(component_id, maxsize=RECEIVER_MAX_SIZE) async ¤

Return a channel receiver that provides an InverterData stream.

If only the latest value is required, the Receiver returned by this method can be converted into a Peekable with the into_peekable method on the Receiver.

RAISES DESCRIPTION
ValueError

if the given id is unknown or has a different type.

PARAMETER DESCRIPTION
component_id

id of the inverter to get data for.

TYPE: int

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: RECEIVER_MAX_SIZE

RETURNS DESCRIPTION
Receiver[InverterData]

A channel receiver that provides realtime inverter data.

Source code in frequenz/sdk/microgrid/client/_client.py
async def inverter_data(
    self,
    component_id: int,
    maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[InverterData]:
    """Return a channel receiver that provides an `InverterData` stream.

    If only the latest value is required, the `Receiver` returned by this
    method can be converted into a `Peekable` with the `into_peekable`
    method on the `Receiver`.

    Args:
        component_id: id of the inverter to get data for.
        maxsize: Size of the receiver's buffer.

    Returns:
        A channel receiver that provides realtime inverter data.

    Raises:
        ValueError: if the given id is unknown or has a different type.
    """
    # Reject unknown ids and non-inverter components before streaming.
    await self._expect_category(component_id, ComponentCategory.INVERTER)
    channel = self._get_component_data_channel(
        component_id, InverterData.from_proto
    )
    return channel.new_receiver(maxsize=maxsize)
meter_data(component_id, maxsize=RECEIVER_MAX_SIZE) async ¤

Return a channel receiver that provides a MeterData stream.

If only the latest value is required, the Receiver returned by this method can be converted into a Peekable with the into_peekable method on the Receiver.

RAISES DESCRIPTION
ValueError

if the given id is unknown or has a different type.

PARAMETER DESCRIPTION
component_id

id of the meter to get data for.

TYPE: int

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: RECEIVER_MAX_SIZE

RETURNS DESCRIPTION
Receiver[MeterData]

A channel receiver that provides realtime meter data.

Source code in frequenz/sdk/microgrid/client/_client.py
async def meter_data(
    self,
    component_id: int,
    maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[MeterData]:
    """Return a channel receiver that provides a `MeterData` stream.

    If only the latest value is required, the `Receiver` returned by this
    method can be converted into a `Peekable` with the `into_peekable`
    method on the `Receiver`.

    Args:
        component_id: id of the meter to get data for.
        maxsize: Size of the receiver's buffer.

    Returns:
        A channel receiver that provides realtime meter data.

    Raises:
        ValueError: if the given id is unknown or has a different type.
    """
    # Reject unknown ids and non-meter components before streaming.
    await self._expect_category(component_id, ComponentCategory.METER)
    channel = self._get_component_data_channel(
        component_id, MeterData.from_proto
    )
    return channel.new_receiver(maxsize=maxsize)
set_bounds(component_id, lower, upper) async ¤

Send a SetBoundsParam request to the Microgrid API to set the active power bounds of a component.

PARAMETER DESCRIPTION
component_id

ID of the component to set bounds for.

TYPE: int

lower

Lower bound to be set for the component.

TYPE: float

upper

Upper bound to be set for the component.

TYPE: float

RAISES DESCRIPTION
ValueError

when upper bound is less than 0, or when lower bound is greater than 0.

AioRpcError

if connection to Microgrid API cannot be established or when the api call exceeded timeout

Source code in frequenz/sdk/microgrid/client/_client.py
async def set_bounds(
    self,
    component_id: int,
    lower: float,
    upper: float,
) -> None:
    """Send a request to set the active power bounds of a component.

    The bounds are validated locally first: the allowed range must
    contain 0, so `lower` may not be positive and `upper` may not be
    negative.

    Args:
        component_id: ID of the component to set bounds for.
        lower: Lower bound to be set for the component.
        upper: Upper bound to be set for the component.

    Raises:
        ValueError: when upper bound is less than 0, or when lower bound is
            greater than 0.
        grpc.aio.AioRpcError: if connection to Microgrid API cannot be established
            or when the api call exceeded timeout
    """
    if upper < 0:
        raise ValueError(f"Upper bound {upper} must be greater than or equal to 0.")
    if lower > 0:
        raise ValueError(f"Lower bound {lower} must be less than or equal to 0.")

    request = microgrid_pb.SetBoundsParam(
        component_id=component_id,
        # pylint: disable=no-member,line-too-long
        target_metric=microgrid_pb.SetBoundsParam.TargetMetric.TARGET_METRIC_POWER_ACTIVE,
        bounds=metrics_pb.Bounds(lower=lower, upper=upper),
    )
    try:
        # The call has to be awaited: otherwise its outcome is never
        # observed and a failure cannot reach the `except` clause below.
        await self.api.AddInclusionBounds(
            request,
            timeout=DEFAULT_GRPC_CALL_TIMEOUT,  # type: ignore[arg-type]
        )  # type: ignore[misc]
    except grpc.aio.AioRpcError as err:
        api_details = f"Microgrid API: {self.target}."
        _logger.error(
            "set_bounds write failed: %s, for message: %s, api: %s. Err: %s",
            err,
            request,
            api_details,
            err.details(),
        )
        raise
set_power(component_id, power_w) async ¤

Send request to the Microgrid to set power for component.

If power > 0, then component will be charged with this power. If power < 0, then component will be discharged with this power. If power == 0, then stop charging or discharging component.

PARAMETER DESCRIPTION
component_id

id of the component to set power.

TYPE: int

power_w

power to set for the component.

TYPE: float

RAISES DESCRIPTION
AioRpcError

if connection to Microgrid API cannot be established or when the api call exceeded timeout

Source code in frequenz/sdk/microgrid/client/_client.py
async def set_power(self, component_id: int, power_w: float) -> None:
    """Send request to the Microgrid to set power for component.

    If power > 0, then component will be charged with this power.
    If power < 0, then component will be discharged with this power.
    If power == 0, then stop charging or discharging component.

    Args:
        component_id: id of the component to set power.
        power_w: power to set for the component.

    Raises:
        AioRpcError: if connection to Microgrid API cannot be established or
            when the api call exceeded timeout. The raised error keeps the
            status code, metadata and debug string of the original error,
            replaces its details with a message naming the target, and is
            chained to the original error.
    """
    try:
        await self.api.SetPowerActive(
            microgrid_pb.SetPowerActiveParam(
                component_id=component_id, power=power_w
            ),
            timeout=DEFAULT_GRPC_CALL_TIMEOUT,  # type: ignore[arg-type]
        )  # type: ignore[misc]
    except grpc.aio.AioRpcError as err:
        msg = f"Failed to set power. Microgrid API: {self.target}. Err: {err.details()}"
        # Chain explicitly so the traceback shows the original gRPC error as
        # the direct cause rather than as an error raised while handling it.
        raise grpc.aio.AioRpcError(
            code=err.code(),
            initial_metadata=err.initial_metadata(),
            trailing_metadata=err.trailing_metadata(),
            details=msg,
            debug_error_string=err.debug_error_string(),
        ) from err

frequenz.sdk.microgrid.client.RetryStrategy ¤

Bases: ABC

Interface for implementing retry strategies.

Source code in frequenz/sdk/microgrid/client/_retry.py
class RetryStrategy(ABC):
    """Base interface that concrete retry strategies implement."""

    # Maximum number of retries; `None` is rendered as "∞" by `get_progress`.
    _limit: Optional[int]
    # Retry counter; set back to zero by `reset`.
    _count: int

    @abstractmethod
    def next_interval(self) -> Optional[float]:
        """Return how long to wait before the next retry.

        Returns:
            The time until the next retry, or `None` once the retry limit
                has been reached and no more retries are possible.
        """

    def get_progress(self) -> str:
        """Describe how far along the retries are.

        Returns:
            The progress formatted as "(count/limit)", with "∞" in place of
                the limit when no limit is set.
        """
        if self._limit is not None:
            return f"({self._count}/{self._limit})"

        return f"({self._count}/∞)"

    def reset(self) -> None:
        """Set the retry counter back to zero.

        To be called as soon as a connection is successful.
        """
        self._count = 0

    def copy(self) -> RetryStrategy:
        """Create an independent copy of this strategy.

        Returns:
            A deep copy of `self` whose retry counter has been reset.
        """
        clone = deepcopy(self)
        clone.reset()
        return clone

    def __iter__(self) -> Iterator[float]:
        """Iterate over the retry intervals until they are exhausted.

        Yields:
            Next retry interval in seconds.
        """
        # Only `None` ends the iteration; an interval of 0.0 is still yielded.
        interval = self.next_interval()
        while interval is not None:
            yield interval
            interval = self.next_interval()
Functions¤
__iter__() ¤

Return an iterator over the retry intervals.

YIELDS DESCRIPTION
float

Next retry interval in seconds.

Source code in frequenz/sdk/microgrid/client/_retry.py
def __iter__(self) -> Iterator[float]:
    """Iterate over the retry intervals until they are exhausted.

    Yields:
        Next retry interval in seconds.
    """
    # Only `None` ends the iteration; an interval of 0.0 is still yielded.
    interval = self.next_interval()
    while interval is not None:
        yield interval
        interval = self.next_interval()
copy() ¤

Create a new instance of self.

RETURNS DESCRIPTION
RetryStrategy

A deep copy of self with its retry counter reset.

Source code in frequenz/sdk/microgrid/client/_retry.py
def copy(self) -> RetryStrategy:
    """Create an independent copy of this strategy.

    Returns:
        A deep copy of `self` whose retry counter has been reset.
    """
    clone = deepcopy(self)
    clone.reset()
    return clone
get_progress() ¤

Return a string denoting the retry progress.

RETURNS DESCRIPTION
str

String denoting retry progress in the form "(count/limit)"

Source code in frequenz/sdk/microgrid/client/_retry.py
def get_progress(self) -> str:
    """Describe how far along the retries are.

    Returns:
        The progress formatted as "(count/limit)", with "∞" in place of
            the limit when no limit is set.
    """
    if self._limit is not None:
        return f"({self._count}/{self._limit})"

    return f"({self._count}/∞)"
next_interval() abstractmethod ¤

Return the time to wait before the next retry.

Returns None if the retry limit has been reached, and no more retries are possible.

RETURNS DESCRIPTION
Optional[float]

Time until next retry when below retry limit, and None otherwise.

Source code in frequenz/sdk/microgrid/client/_retry.py
@abstractmethod
def next_interval(self) -> Optional[float]:
    """Return the time to wait before the next retry.

    This method is abstract: concrete retry strategies must override it.

    Returns `None` if the retry limit has been reached, and no more retries
    are possible.

    Returns:
        Time until next retry when below retry limit, and `None` otherwise.
    """
reset() ¤

Reset the retry counter.

To be called as soon as a connection is successful.

Source code in frequenz/sdk/microgrid/client/_retry.py
def reset(self) -> None:
    """Reset the retry counter to zero.

    To be called as soon as a connection is successful.
    """
    self._count = 0