frequenz.sdk.actor.power_distributing ¤

This module provides a feature to distribute power across multiple batteries.

Distributing power correctly keeps the microgrid ready to meet its power requirements. This module provides the PowerDistributingActor, which knows how to distribute power. It also provides the supporting types needed to communicate with the PowerDistributingActor and to send requests for charging or discharging power.

Classes¤

frequenz.sdk.actor.power_distributing.BatteryStatus dataclass ¤

Status of the batteries.

Source code in frequenz/sdk/actor/power_distributing/_battery_pool_status.py
@dataclass
class BatteryStatus:
    """Status of the batteries."""

    working: Set[int]
    """Set of working battery ids."""

    uncertain: Set[int]
    """Set of batteries that should be used only if there are no working batteries."""

    def get_working_batteries(self, batteries: abc.Set[int]) -> Set[int]:
        """From the given set of batteries return working batteries.

        Args:
            batteries: Set of batteries

        Returns:
            Subset with working batteries.
        """
        working = self.working.intersection(batteries)
        if len(working) > 0:
            return working
        return self.uncertain.intersection(batteries)
Attributes¤
uncertain: Set[int] instance-attribute ¤

Set of batteries that should be used only if there are no working batteries.

working: Set[int] instance-attribute ¤

Set of working battery ids.

Functions¤
get_working_batteries(batteries) ¤

From the given set of batteries return working batteries.

| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| batteries | Set[int] | Set of batteries |

| RETURNS | DESCRIPTION |
|---|---|
| Set[int] | Subset with working batteries. |
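
The fallback to uncertain batteries can be illustrated with a minimal sketch. It uses a stand-in copy of the dataclass so that it runs without the SDK installed, and the battery IDs are made up for illustration:

```python
from collections import abc
from dataclasses import dataclass
from typing import Set


@dataclass
class BatteryStatus:
    """Stand-in mirroring the BatteryStatus dataclass documented here."""

    working: Set[int]
    uncertain: Set[int]

    def get_working_batteries(self, batteries: abc.Set[int]) -> Set[int]:
        working = self.working.intersection(batteries)
        if working:
            return working
        # Fall back to uncertain batteries only if none of the requested
        # batteries is known to be working.
        return self.uncertain.intersection(batteries)


status = BatteryStatus(working={1, 2}, uncertain={3})

# At least one requested battery is working: uncertain ones are left out.
print(status.get_working_batteries({2, 3}))  # {2}

# No requested battery is working: the uncertain ones are used instead.
print(status.get_working_batteries({3, 4}))  # {3}
```

Note that uncertain batteries are never mixed with working ones; they are returned only when the working subset is empty.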

Source code in frequenz/sdk/actor/power_distributing/_battery_pool_status.py
def get_working_batteries(self, batteries: abc.Set[int]) -> Set[int]:
    """From the given set of batteries return working batteries.

    Args:
        batteries: Set of batteries

    Returns:
        Subset with working batteries.
    """
    working = self.working.intersection(batteries)
    if len(working) > 0:
        return working
    return self.uncertain.intersection(batteries)

frequenz.sdk.actor.power_distributing.Error dataclass ¤

Bases: Result

Result returned when an error occurred and power was not set at all.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class Error(Result):
    """Result returned when an error occurred and power was not set at all."""

    msg: str
    """The error message explaining why the error happened."""
Attributes¤
msg: str instance-attribute ¤

The error message explaining why the error happened.

frequenz.sdk.actor.power_distributing.OutOfBounds dataclass ¤

Bases: Result

Result returned when the power was not set because it was out of bounds.

This result happens when the originating request was made with adjust_power = False and the requested power is not within the batteries' bounds.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class OutOfBounds(Result):
    """Result returned when the power was not set because it was out of bounds.

    This result happens when the originating request was made with
    `adjust_power = False` and the requested power is not within the batteries' bounds.
    """

    bounds: PowerBounds
    """The power bounds for the requested batteries.

    If the requested power is negative, then this value is the lower bound.
    Otherwise it is the upper bound.
    """
Attributes¤
bounds: PowerBounds instance-attribute ¤

The power bounds for the requested batteries.

If the requested power is negative, then this value is the lower bound. Otherwise it is the upper bound.
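
As a rough sketch of when a request ends up with this result, the snippet below reimplements the range check from the actor's `_check_request` method in the PowerDistributingActor source on this page. The `PowerBounds` class here is a stand-in with plain float fields in watts, `is_out_of_bounds` is a hypothetical helper name, and the exact comparison with zero is a simplification of the `is_close_to_zero` helper used in the source:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PowerBounds:
    """Stand-in with the four bounds used by the actor, in watts."""

    inclusion_lower: float
    exclusion_lower: float
    exclusion_upper: float
    inclusion_upper: float


def is_out_of_bounds(power: float, bounds: PowerBounds, adjust_power: bool) -> bool:
    """Return True if a request for `power` would be rejected as out of bounds."""
    if power == 0.0:
        # Zero-power requests are always forwarded.
        return False
    if adjust_power:
        # The power can be reduced to fit the inclusion bounds, but it cannot
        # be increased to leave the exclusion zone around zero.
        return bounds.exclusion_lower < power < bounds.exclusion_upper
    in_lower_range = bounds.inclusion_lower <= power <= bounds.exclusion_lower
    in_upper_range = bounds.exclusion_upper <= power <= bounds.inclusion_upper
    return not (in_lower_range or in_upper_range)


bounds = PowerBounds(
    inclusion_lower=-1000.0,
    exclusion_lower=-100.0,
    exclusion_upper=100.0,
    inclusion_upper=1000.0,
)

print(is_out_of_bounds(1500.0, bounds, adjust_power=False))  # True
print(is_out_of_bounds(1500.0, bounds, adjust_power=True))   # False
print(is_out_of_bounds(50.0, bounds, adjust_power=True))     # True
```

With adjust_power enabled, only requests inside the exclusion zone are rejected, because a request above the inclusion bounds can still be scaled down.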

frequenz.sdk.actor.power_distributing.PartialFailure dataclass ¤

Bases: _BaseSuccessMixin, Result

Result returned when any battery failed to perform the request.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class PartialFailure(_BaseSuccessMixin, Result):
    """Result returned when any battery failed to perform the request."""

    failed_power: Power
    """The part of the requested power that failed to be set."""

    failed_batteries: set[int]
    """The subset of batteries for which the request failed."""
Attributes¤
failed_batteries: set[int] instance-attribute ¤

The subset of batteries for which the request failed.

failed_power: Power instance-attribute ¤

The part of the requested power that failed to be set.
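
A caller typically has to tell the result types apart. The sketch below shows one way to do that with `isinstance` checks. All classes are simplified stand-ins, not the SDK types: they omit the `request` and `excess_power` fields and use plain floats in watts instead of `Power`, and `describe` is a hypothetical helper:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Result:
    """Stand-in base class; the real results also carry the request."""


@dataclass
class _BaseSuccessMixin:
    succeeded_power: float  # watts; the SDK uses a Power type here
    succeeded_batteries: set[int]


@dataclass
class Success(_BaseSuccessMixin, Result):
    """Stand-in for a fully successful result."""


@dataclass
class PartialFailure(_BaseSuccessMixin, Result):
    failed_power: float
    failed_batteries: set[int]


@dataclass
class Error(Result):
    msg: str


def describe(result: Result) -> str:
    """Turn a result into a short, human-readable summary."""
    if isinstance(result, Success):
        return f"set {result.succeeded_power} W"
    if isinstance(result, PartialFailure):
        return (
            f"set {result.succeeded_power} W, but {result.failed_power} W "
            f"failed on batteries {sorted(result.failed_batteries)}"
        )
    if isinstance(result, Error):
        return f"error: {result.msg}"
    return f"unhandled result: {type(result).__name__}"


partial = PartialFailure(
    succeeded_power=300.0,
    succeeded_batteries={1},
    failed_power=200.0,
    failed_batteries={2},
)
print(describe(partial))
```

Because PartialFailure and Success share a mixin rather than inheriting from each other, the order of the `isinstance` checks does not matter here.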

frequenz.sdk.actor.power_distributing.PowerDistributingActor ¤

Bases: Actor

Actor to distribute the power between batteries in a microgrid.

The purpose of this tool is to keep an equal SoC level in all batteries. The PowerDistributingActor can have many concurrent users, which currently need to be known at construction time.

For each user a bidirectional channel needs to be created through which they can send and receive requests and responses.

It is recommended to wait for PowerDistributingActor output with a timeout. Otherwise, if the processing function fails, the response will never arrive. The timeout should be Request.request_timeout plus the time needed to process the request.

Edge cases:

  • If there are 2 requests to be processed for the same subset of batteries, then only the latest request will be processed. The older request will be ignored, and the user who sent it will get a response with Result.Status.IGNORED.

  • If there are 2 requests whose subsets of batteries are different but overlap (they have at least one battery in common), then both requests will be processed. However, this is not expected, so an error will be logged.
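
The recommendation to wait with a timeout could look roughly like the sketch below. It uses an `asyncio.Queue` as a stand-in for the result channel, and `wait_for_result` and `processing_margin` are hypothetical names introduced only for this example:

```python
from __future__ import annotations

import asyncio
from datetime import timedelta


async def wait_for_result(
    results: asyncio.Queue[str],
    request_timeout: timedelta,
    processing_margin: timedelta = timedelta(seconds=1),
) -> str | None:
    """Wait for one result, giving up after request_timeout + processing_margin."""
    timeout = (request_timeout + processing_margin).total_seconds()
    try:
        return await asyncio.wait_for(results.get(), timeout=timeout)
    except asyncio.TimeoutError:
        # No response arrived in time, for example because processing failed.
        return None


async def main() -> None:
    results: asyncio.Queue[str] = asyncio.Queue()

    # Nothing was sent: the wait gives up instead of blocking forever.
    missing = await wait_for_result(
        results, timedelta(milliseconds=50), timedelta(milliseconds=50)
    )
    print(missing)  # None

    # A result is already available: it is returned right away.
    results.put_nowait("Success")
    print(await wait_for_result(results, timedelta(milliseconds=50)))  # Success


asyncio.run(main())
```

Returning `None` on timeout is just one option; a caller could equally re-raise or retry the request.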
Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
class PowerDistributingActor(Actor):
    # pylint: disable=too-many-instance-attributes
    """Actor to distribute the power between batteries in a microgrid.

    The purpose of this tool is to keep an equal SoC level in all batteries.
    The PowerDistributingActor can have many concurrent users which at this time
    need to be known at construction time.

    For each user a bidirectional channel needs to be created through which
    they can send and receive requests and responses.

    It is recommended to wait for PowerDistributingActor output with a timeout.
    Otherwise, if the processing function fails, the response will never arrive.
    The timeout should be Request.request_timeout plus the time needed to process
    the request.

    Edge cases:
    * If there are 2 requests to be processed for the same subset of batteries, then
    only the latest request will be processed. The older request will be ignored, and
    the user who sent it will get a response with Result.Status.IGNORED.

    * If there are 2 requests whose subsets of batteries are different but overlap
    (they have at least one battery in common), then both requests will be processed.
    However, this is not expected, so an error will be logged.
    """

    def __init__(
        self,
        requests_receiver: Receiver[Request],
        channel_registry: ChannelRegistry,
        battery_status_sender: Sender[BatteryStatus],
        wait_for_data_sec: float = 2,
        *,
        name: str | None = None,
    ) -> None:
        """Create class instance.

        Args:
            requests_receiver: Receiver for receiving power requests from other actors.
            channel_registry: Channel registry for creating result channels dynamically
                for each request namespace.
            battery_status_sender: Channel for sending information about which
                batteries are working.
            wait_for_data_sec: How long the actor should wait before processing the
                first request. This is the time needed to collect the first
                component data.
            name: The name of the actor. If `None`, `str(id(self))` will be used. This
                is used mostly for debugging purposes.
        """
        super().__init__(name=name)
        self._requests_receiver = requests_receiver
        self._channel_registry = channel_registry
        self._wait_for_data_sec = wait_for_data_sec
        self._result_senders: Dict[str, Sender[Result]] = {}
        """Dictionary of result senders for each request namespace.

        They are for channels owned by the channel registry, we just hold a reference
        to their senders, for fast access.
        """

        # NOTE: power_distributor_exponent should be received from ConfigManager
        self.power_distributor_exponent: float = 1.0
        self.distribution_algorithm = DistributionAlgorithm(
            self.power_distributor_exponent
        )

        self._bat_inv_map, self._inv_bat_map = self._get_components_pairs(
            connection_manager.get().component_graph
        )
        self._battery_receivers: Dict[int, Peekable[BatteryData]] = {}
        self._inverter_receivers: Dict[int, Peekable[InverterData]] = {}

        self._all_battery_status = BatteryPoolStatus(
            battery_ids=set(self._bat_inv_map.keys()),
            battery_status_sender=battery_status_sender,
            max_blocking_duration_sec=30.0,
            max_data_age_sec=10.0,
        )

        self._cached_metrics: dict[int, _CacheEntry | None] = {
            bat_id: None for bat_id, _ in self._bat_inv_map.items()
        }

    def _get_bounds(
        self,
        pairs_data: list[InvBatPair],
    ) -> PowerBounds:
        """Get power bounds for given batteries.

        Args:
            pairs_data: list of battery and adjacent inverter data pairs.

        Returns:
            Power bounds for given batteries.
        """
        return PowerBounds(
            inclusion_lower=sum(
                max(
                    battery.power_inclusion_lower_bound,
                    inverter.active_power_inclusion_lower_bound,
                )
                for battery, inverter in pairs_data
            ),
            inclusion_upper=sum(
                min(
                    battery.power_inclusion_upper_bound,
                    inverter.active_power_inclusion_upper_bound,
                )
                for battery, inverter in pairs_data
            ),
            exclusion_lower=sum(
                min(
                    battery.power_exclusion_lower_bound,
                    inverter.active_power_exclusion_lower_bound,
                )
                for battery, inverter in pairs_data
            ),
            exclusion_upper=sum(
                max(
                    battery.power_exclusion_upper_bound,
                    inverter.active_power_exclusion_upper_bound,
                )
                for battery, inverter in pairs_data
            ),
        )

    async def _send_result(self, namespace: str, result: Result) -> None:
        """Send result to the user.

        Args:
            namespace: namespace of the sender, to identify the result channel with.
            result: Result to send out.
        """
        if namespace not in self._result_senders:
            self._result_senders[namespace] = self._channel_registry.new_sender(
                namespace
            )

        await self._result_senders[namespace].send(result)

    async def _run(self) -> None:
        """Run actor main function.

        It waits for new requests on the requests receiver, processes each one, and
        sends `set_power` requests with the distributed power.
        The output of the `set_power` calls is then processed.
        Every battery and inverter that failed or didn't respond in time will be marked
        as broken for some time.
        """
        await self._create_channels()

        api = connection_manager.get().api_client

        # Wait a few seconds to get data from the channels created above.
        await asyncio.sleep(self._wait_for_data_sec)

        async for request in self._requests_receiver:
            try:
                pairs_data: List[InvBatPair] = self._get_components_data(
                    request.batteries, request.include_broken_batteries
                )
            except KeyError as err:
                await self._send_result(
                    request.namespace, Error(request=request, msg=str(err))
                )
                continue

            if not pairs_data and not request.include_broken_batteries:
                error_msg = f"No data for the given batteries {str(request.batteries)}"
                await self._send_result(
                    request.namespace, Error(request=request, msg=str(error_msg))
                )
                continue

            error = self._check_request(request, pairs_data)
            if error:
                await self._send_result(request.namespace, error)
                continue

            try:
                distribution = self._get_power_distribution(request, pairs_data)
            except ValueError as err:
                _logger.exception("Couldn't distribute power")
                error_msg = f"Couldn't distribute power, error: {str(err)}"
                await self._send_result(
                    request.namespace, Error(request=request, msg=str(error_msg))
                )
                continue

            distributed_power_value = (
                request.power.as_watts() - distribution.remaining_power
            )
            # The distribution is keyed by inverter ID; map it back to battery IDs.
            battery_distribution = {
                self._inv_bat_map[inv_id]: dist
                for inv_id, dist in distribution.distribution.items()
            }
            _logger.debug(
                "Distributing power %d between the batteries %s",
                distributed_power_value,
                str(battery_distribution),
            )

            failed_power, failed_batteries = await self._set_distributed_power(
                api, distribution, request.request_timeout
            )

            response: Success | PartialFailure
            if len(failed_batteries) > 0:
                succeed_batteries = set(battery_distribution.keys()) - failed_batteries
                response = PartialFailure(
                    request=request,
                    succeeded_power=Power.from_watts(distributed_power_value),
                    succeeded_batteries=succeed_batteries,
                    failed_power=Power.from_watts(failed_power),
                    failed_batteries=failed_batteries,
                    excess_power=Power.from_watts(distribution.remaining_power),
                )
            else:
                succeed_batteries = set(battery_distribution.keys())
                response = Success(
                    request=request,
                    succeeded_power=Power.from_watts(distributed_power_value),
                    succeeded_batteries=succeed_batteries,
                    excess_power=Power.from_watts(distribution.remaining_power),
                )

            await asyncio.gather(
                *[
                    self._all_battery_status.update_status(
                        succeed_batteries, failed_batteries
                    ),
                    self._send_result(request.namespace, response),
                ]
            )

    async def _set_distributed_power(
        self,
        api: MicrogridApiClient,
        distribution: DistributionResult,
        timeout: timedelta,
    ) -> Tuple[float, Set[int]]:
        """Send distributed power to the inverters.

        Args:
            api: Microgrid API client.
            distribution: Distribution result.
            timeout: How long to wait for the response.

        Returns:
            Tuple where the first element is the total failed power, and the second
            element is the set of batteries that failed.
        """
        tasks = {
            inverter_id: asyncio.create_task(api.set_power(inverter_id, power))
            for inverter_id, power in distribution.distribution.items()
        }

        _, pending = await asyncio.wait(
            tasks.values(),
            timeout=timeout.total_seconds(),
            return_when=ALL_COMPLETED,
        )

        await self._cancel_tasks(pending)

        return self._parse_result(tasks, distribution.distribution, timeout)

    def _get_power_distribution(
        self, request: Request, inv_bat_pairs: List[InvBatPair]
    ) -> DistributionResult:
        """Get power distribution result for the batteries in the request.

        Args:
            request: the power request to process.
            inv_bat_pairs: the battery and adjacent inverter data pairs.

        Returns:
            the power distribution result.
        """
        available_bat_ids = {battery.component_id for battery, _ in inv_bat_pairs}
        unavailable_bat_ids = request.batteries - available_bat_ids
        unavailable_inv_ids = {
            self._bat_inv_map[battery_id] for battery_id in unavailable_bat_ids
        }

        if request.include_broken_batteries and not available_bat_ids:
            return self.distribution_algorithm.distribute_power_equally(
                request.power.as_watts(), unavailable_inv_ids
            )

        result = self.distribution_algorithm.distribute_power(
            request.power.as_watts(), inv_bat_pairs
        )

        if request.include_broken_batteries and unavailable_inv_ids:
            additional_result = self.distribution_algorithm.distribute_power_equally(
                result.remaining_power, unavailable_inv_ids
            )

            for inv_id, power in additional_result.distribution.items():
                result.distribution[inv_id] = power
            result.remaining_power = 0.0

        return result

    def _check_request(
        self,
        request: Request,
        pairs_data: List[InvBatPair],
    ) -> Optional[Result]:
        """Check whether the given request is correct.

        Args:
            request: request to check
            pairs_data: list of battery and adjacent inverter data pairs.

        Returns:
            Result for the user if the request is wrong, None otherwise.
        """
        if not request.batteries:
            return Error(request=request, msg="Empty battery IDs in the request")

        for battery in request.batteries:
            if battery not in self._battery_receivers:
                msg = (
                    f"No battery {battery}, available batteries: "
                    f"{list(self._battery_receivers.keys())}"
                )
                return Error(request=request, msg=msg)

        bounds = self._get_bounds(pairs_data)

        power = request.power.as_watts()

        # Zero power requests are always forwarded to the microgrid API, even if they
        # are outside the exclusion bounds.
        if is_close_to_zero(power):
            return None

        if request.adjust_power:
            # Automatic power adjustments can only bring the requested power down
            # to the inclusion bounds.
            #
            # If the requested power is in the exclusion bounds, it is NOT possible to
            # increase it so that it is outside the exclusion bounds.
            if bounds.exclusion_lower < power < bounds.exclusion_upper:
                return OutOfBounds(request=request, bounds=bounds)
        else:
            in_lower_range = bounds.inclusion_lower <= power <= bounds.exclusion_lower
            in_upper_range = bounds.exclusion_upper <= power <= bounds.inclusion_upper
            if not (in_lower_range or in_upper_range):
                return OutOfBounds(request=request, bounds=bounds)

        return None

    def _get_components_pairs(
        self, component_graph: ComponentGraph
    ) -> Tuple[Dict[int, int], Dict[int, int]]:
        """Create maps between battery and adjacent inverter.

        Args:
            component_graph: component graph

        Returns:
            Tuple where the first element is a map from each battery to its adjacent
                inverter, and the second element is a map from each inverter to its
                adjacent battery.
        """
        bat_inv_map: Dict[int, int] = {}
        inv_bat_map: Dict[int, int] = {}

        batteries: Iterable[Component] = component_graph.components(
            component_category={ComponentCategory.BATTERY}
        )

        for battery in batteries:
            inverters: List[Component] = [
                component
                for component in component_graph.predecessors(battery.component_id)
                if component.category == ComponentCategory.INVERTER
            ]

            if len(inverters) == 0:
                _logger.error("No inverters for battery %d", battery.component_id)
                continue

            if len(inverters) > 1:
                _logger.error(
                    "Battery %d has more than one inverter. It is not supported now.",
                    battery.component_id,
                )

            bat_inv_map[battery.component_id] = inverters[0].component_id
            inv_bat_map[inverters[0].component_id] = battery.component_id

        return bat_inv_map, inv_bat_map

    def _get_components_data(
        self, batteries: abc.Set[int], include_broken: bool
    ) -> List[InvBatPair]:
        """Get data for the given batteries and adjacent inverters.

        Args:
            batteries: Batteries that need data.
            include_broken: whether all batteries in the batteries set in the
                request must be used regardless of their status.

        Raises:
            KeyError: If any battery in the given set doesn't exist in the microgrid.

        Returns:
            Pairs of battery and adjacent inverter data.
        """
        pairs_data: List[InvBatPair] = []
        working_batteries = (
            batteries
            if include_broken
            else self._all_battery_status.get_working_batteries(batteries)
        )

        for battery_id in working_batteries:
            if battery_id not in self._battery_receivers:
                raise KeyError(
                    f"No battery {battery_id}, "
                    f"available batteries: {list(self._battery_receivers.keys())}"
                )

            inverter_id: int = self._bat_inv_map[battery_id]

            data = self._get_battery_inverter_data(battery_id, inverter_id)
            if not data and include_broken:
                cached_entry = self._cached_metrics[battery_id]
                if cached_entry and not cached_entry.has_expired():
                    data = cached_entry.inv_bat_pair
                else:
                    data = None
            if data is None:
                _logger.warning(
                    "Skipping battery %d because its message isn't correct.",
                    battery_id,
                )
                continue

            pairs_data.append(data)
        return pairs_data

    def _get_battery_inverter_data(
        self, battery_id: int, inverter_id: int
    ) -> Optional[InvBatPair]:
        """Get battery and inverter data if they are correct.

        Each float data from the microgrid can be "NaN".
        We can't do math operations on "NaN".
        So check all the metrics and:
        * if power bounds are NaN, then try to replace it with the corresponding
          power bounds from the adjacent component. If metric in the adjacent component
          is also NaN, then return None.
        * if other metrics are NaN then return None. We can't assume anything for other
          metrics.

        Args:
            battery_id: battery id
            inverter_id: inverter id

        Returns:
            Data for the battery and adjacent inverter without NaN values.
                Return None if we could not replace NaN values.
        """
        battery_data = self._battery_receivers[battery_id].peek()
        inverter_data = self._inverter_receivers[inverter_id].peek()

        # It means that nothing has been sent on these channels yet.
        # This should be handled by BatteryStatus. BatteryStatus should not return
        # these batteries as working.
        if battery_data is None or inverter_data is None:
            _logger.error(
                "Battery %d or inverter %d sent no data yet. They should not be used.",
                battery_id,
                inverter_id,
            )
            return None

        not_replaceable_metrics = [
            battery_data.soc,
            battery_data.soc_lower_bound,
            battery_data.soc_upper_bound,
            # We could replace capacity with 0, but it won't change distribution.
            # This battery will be ignored in distribution anyway.
            battery_data.capacity,
        ]
        if any(map(isnan, not_replaceable_metrics)):
            _logger.debug("Some metrics for battery %d are NaN", battery_id)
            return None

        replaceable_metrics = [
            battery_data.power_inclusion_lower_bound,
            battery_data.power_inclusion_upper_bound,
            inverter_data.active_power_inclusion_lower_bound,
            inverter_data.active_power_inclusion_upper_bound,
        ]

        # If all values are ok then return them.
        if not any(map(isnan, replaceable_metrics)):
            inv_bat_pair = InvBatPair(battery_data, inverter_data)
            self._cached_metrics[battery_id] = _CacheEntry.from_ttl(inv_bat_pair)
            return inv_bat_pair

        # Replace NaN with the corresponding value in the adjacent component.
        # If both metrics are NaN, return None to ignore this battery.
        replaceable_pairs = [
            ("power_inclusion_lower_bound", "active_power_inclusion_lower_bound"),
            ("power_inclusion_upper_bound", "active_power_inclusion_upper_bound"),
        ]

        battery_new_metrics = {}
        inverter_new_metrics = {}
        for bat_attr, inv_attr in replaceable_pairs:
            bat_bound = getattr(battery_data, bat_attr)
            inv_bound = getattr(inverter_data, inv_attr)
            if isnan(bat_bound) and isnan(inv_bound):
                _logger.debug("Some metrics for battery %d are NaN", battery_id)
                return None
            if isnan(bat_bound):
                battery_new_metrics[bat_attr] = inv_bound
            elif isnan(inv_bound):
                inverter_new_metrics[inv_attr] = bat_bound

        inv_bat_pair = InvBatPair(
            replace(battery_data, **battery_new_metrics),
            replace(inverter_data, **inverter_new_metrics),
        )
        self._cached_metrics[battery_id] = _CacheEntry.from_ttl(inv_bat_pair)
        return inv_bat_pair

    async def _create_channels(self) -> None:
        """Create channels to get data of components in microgrid."""
        api = connection_manager.get().api_client
        for battery_id, inverter_id in self._bat_inv_map.items():
            bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id)
            self._battery_receivers[battery_id] = bat_recv.into_peekable()

            inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id)
            self._inverter_receivers[inverter_id] = inv_recv.into_peekable()

    def _parse_result(
        self,
        tasks: Dict[int, asyncio.Task[None]],
        distribution: Dict[int, float],
        request_timeout: timedelta,
    ) -> Tuple[float, Set[int]]:
        """Parse the results of `set_power` requests.

        Check if any task has failed and determine the reason for failure.
        If any task did not succeed, then the corresponding battery is marked as broken.

        Args:
            tasks: A dictionary where the key is the inverter ID and the value is the task that
                set the power for this inverter. Each task should be finished or cancelled.
            distribution: A dictionary where the key is the inverter ID and the value is how much
                power was set to the corresponding inverter.
            request_timeout: The timeout that was used for the request.

        Returns:
            A tuple where the first element is the total failed power, and the second element is
            the set of batteries that failed.
        """
        failed_power: float = 0.0
        failed_batteries: Set[int] = set()

        for inverter_id, aws in tasks.items():
            battery_id = self._inv_bat_map[inverter_id]
            try:
                aws.result()
            except grpc.aio.AioRpcError as err:
                failed_power += distribution[inverter_id]
                failed_batteries.add(battery_id)
                if err.code() == grpc.StatusCode.OUT_OF_RANGE:
                    _logger.debug(
                        "Set power for battery %d failed, error %s",
                        battery_id,
                        str(err),
                    )
                else:
                    _logger.warning(
                        "Set power for battery %d failed, error %s. Mark it as broken.",
                        battery_id,
                        str(err),
                    )
            except asyncio.exceptions.CancelledError:
                failed_power += distribution[inverter_id]
                failed_batteries.add(battery_id)
                _logger.warning(
                    "Battery %d didn't respond in %f sec. Mark it as broken.",
                    battery_id,
                    request_timeout.total_seconds(),
                )

        return failed_power, failed_batteries

    async def _cancel_tasks(self, tasks: Iterable[asyncio.Task[Any]]) -> None:
        """Cancel given asyncio tasks and wait for them.

        Args:
            tasks: tasks to cancel.
        """
        for aws in tasks:
            aws.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

    async def stop(self, msg: str | None = None) -> None:
        """Stop this actor.

        Args:
            msg: The message to be passed to the tasks being cancelled.
        """
        await self._all_battery_status.stop()
        await super().stop(msg)
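
The cancel-then-inspect flow used by `_cancel_tasks` and the result parsing above can be sketched with plain `asyncio`, outside the actor. This is a minimal illustration, not the actor's code: `cancel_and_collect`, the integer IDs, and the use of `asyncio.sleep` in place of a gRPC call are all assumptions made for the example.

```python
from __future__ import annotations

import asyncio


async def cancel_and_collect(delays: dict[int, float], timeout: float) -> set[int]:
    """Return the (hypothetical) inverter IDs whose task did not finish in time."""
    # One task per inverter; asyncio.sleep stands in for the real request.
    tasks = {
        inverter_id: asyncio.create_task(asyncio.sleep(delay))
        for inverter_id, delay in delays.items()
    }

    # Wait up to `timeout` seconds; tasks still running end up in `pending`.
    _, pending = await asyncio.wait(list(tasks.values()), timeout=timeout)

    # Same pattern as `_cancel_tasks`: cancel, then gather, so the
    # cancellations are fully processed before results are inspected.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    failed: set[int] = set()
    for inverter_id, task in tasks.items():
        try:
            task.result()
        except asyncio.CancelledError:
            # A cancelled task means the request timed out.
            failed.add(inverter_id)
    return failed


failed_ids = asyncio.run(cancel_and_collect({1: 0.01, 2: 5.0}, timeout=0.2))
print(failed_ids)
```

Gathering with `return_exceptions=True` after cancelling means the cancellations do not raise at the call site, so every task is either finished or cancelled by the time `result()` is called on it.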
Functions¤
__init__(requests_receiver, channel_registry, battery_status_sender, wait_for_data_sec=2, *, name=None) ¤

Create class instance.

PARAMETER DESCRIPTION
requests_receiver

Receiver for receiving power requests from other actors.

TYPE: Receiver[Request]

channel_registry

Channel registry for creating result channels dynamically for each request namespace.

TYPE: ChannelRegistry

battery_status_sender

Sender for reporting which batteries are working.

TYPE: Sender[BatteryStatus]

wait_for_data_sec

How long, in seconds, the actor should wait before processing the first request. This is the time needed to collect the first data from the components.

TYPE: float DEFAULT: 2

name

The name of the actor. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
def __init__(
    self,
    requests_receiver: Receiver[Request],
    channel_registry: ChannelRegistry,
    battery_status_sender: Sender[BatteryStatus],
    wait_for_data_sec: float = 2,
    *,
    name: str | None = None,
) -> None:
    """Create class instance.

    Args:
        requests_receiver: Receiver for receiving power requests from other actors.
        channel_registry: Channel registry for creating result channels dynamically
            for each request namespace.
        battery_status_sender: Sender for reporting which batteries are
            working.
        wait_for_data_sec: How long, in seconds, the actor should wait before
            processing the first request. This is the time needed to collect
            the first data from the components.
        name: The name of the actor. If `None`, `str(id(self))` will be used. This
            is used mostly for debugging purposes.
    """
    super().__init__(name=name)
    self._requests_receiver = requests_receiver
    self._channel_registry = channel_registry
    self._wait_for_data_sec = wait_for_data_sec
    self._result_senders: Dict[str, Sender[Result]] = {}
    """Dictionary of result senders for each request namespace.

    They are for channels owned by the channel registry, we just hold a reference
    to their senders, for fast access.
    """

    # NOTE: power_distributor_exponent should be received from ConfigManager
    self.power_distributor_exponent: float = 1.0
    self.distribution_algorithm = DistributionAlgorithm(
        self.power_distributor_exponent
    )

    self._bat_inv_map, self._inv_bat_map = self._get_components_pairs(
        connection_manager.get().component_graph
    )
    self._battery_receivers: Dict[int, Peekable[BatteryData]] = {}
    self._inverter_receivers: Dict[int, Peekable[InverterData]] = {}

    self._all_battery_status = BatteryPoolStatus(
        battery_ids=set(self._bat_inv_map.keys()),
        battery_status_sender=battery_status_sender,
        max_blocking_duration_sec=30.0,
        max_data_age_sec=10.0,
    )

    self._cached_metrics: dict[int, _CacheEntry | None] = {
        bat_id: None for bat_id, _ in self._bat_inv_map.items()
    }
stop(msg=None) async ¤

Stop this actor.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/power_distributing/power_distributing.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this actor.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    await self._all_battery_status.stop()
    await super().stop(msg)

frequenz.sdk.actor.power_distributing.Request dataclass ¤

Request to set power to the PowerDistributingActor.

Source code in frequenz/sdk/actor/power_distributing/request.py
@dataclasses.dataclass
class Request:
    """Request to set power to the `PowerDistributingActor`."""

    namespace: str
    """The namespace of the request.

    This will be used to identify the channel for sending the response into, in the
    channel registry.
    """

    power: Power
    """The requested power."""

    batteries: abc.Set[int]
    """The component ids of the batteries to be used for this request."""

    request_timeout: timedelta = timedelta(seconds=5.0)
    """The maximum amount of time to wait for the request to be fulfilled."""

    adjust_power: bool = True
    """Whether to adjust the power to match the bounds.

    If `True`, the power will be adjusted (lowered) to match the bounds, so
    only the reduced power will be set.

    If `False` and the power is outside the batteries' bounds, the request will
    fail and be replied to with an `OutOfBound` result.
    """

    include_broken_batteries: bool = False
    """Whether to use all batteries in the batteries set regardless of their status.

    If set to `True`, the power distribution algorithm will consider all batteries,
    including the broken ones, when distributing power.  In such cases, any remaining
    power after distributing among the available batteries will be distributed equally
    among the unavailable (broken) batteries.  If all batteries in the set are
    unavailable, the power will be equally distributed among all the unavailable
    batteries in the request.

    If set to `False`, the power distribution will only take into account the available
    batteries, excluding any broken ones.
    """
Attributes¤
adjust_power: bool = True class-attribute instance-attribute ¤

Whether to adjust the power to match the bounds.

If True, the power will be adjusted (lowered) to match the bounds, so only the reduced power will be set.

If False and the power is outside the batteries' bounds, the request will fail and be replied to with an OutOfBound result.
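
The effect of `adjust_power` can be illustrated with a small stand-alone function. This is only a sketch under stated assumptions: `resolve_power` is a hypothetical helper, powers are plain floats rather than `Power` objects, the bounds are passed in directly, and `None` stands in for the `OutOfBound` result mentioned above.

```python
from __future__ import annotations


def resolve_power(
    requested: float, lower: float, upper: float, adjust_power: bool = True
) -> float | None:
    """Return the power to set, or None when the request would be rejected.

    Hypothetical helper: None stands in for an out-of-bounds result.
    """
    if lower <= requested <= upper:
        # Within bounds: the flag makes no difference.
        return requested
    if adjust_power:
        # Clamp to the nearest bound, so only the reduced power is set.
        return max(lower, min(requested, upper))
    return None


print(resolve_power(1500.0, -1000.0, 1000.0))
print(resolve_power(1500.0, -1000.0, 1000.0, adjust_power=False))
```

With the default `adjust_power=True` the first call is clamped to the upper bound, while the second call is rejected instead of being adjusted.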

batteries: abc.Set[int] instance-attribute ¤

The component ids of the batteries to be used for this request.

include_broken_batteries: bool = False class-attribute instance-attribute ¤

Whether to use all batteries included in the batteries set regardless of their status.

If set to True, the power distribution algorithm will consider all batteries, including the broken ones, when distributing power. In such cases, any remaining power after distributing among the available batteries will be distributed equally among the unavailable (broken) batteries. If all batteries in the set are unavailable, the power will be equally distributed among all the unavailable batteries in the request.

If set to False, the power distribution will only take into account the available batteries, excluding any broken ones.
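
The fallback described for `include_broken_batteries=True` can be sketched as follows. The sketch is not the SDK's distribution algorithm: `split_with_broken` is a hypothetical function, the per-battery limits and the greedy fill of working batteries are placeholders for the real distribution step, and only non-negative powers given as plain floats are handled.

```python
from __future__ import annotations


def split_with_broken(
    power: float, working_limits: dict[int, float], broken: set[int]
) -> dict[int, float]:
    """Split a non-negative power among working batteries first, then broken ones.

    `working_limits` maps each working battery ID to the most power it can
    take (an assumption of this sketch; the real algorithm is not shown here).
    """
    distribution: dict[int, float] = {}
    remaining = power
    for battery_id, limit in working_limits.items():
        # Placeholder for the real distribution among working batteries.
        share = min(limit, remaining)
        distribution[battery_id] = share
        remaining -= share
    if broken and remaining > 0:
        # Whatever the working batteries could not take is split equally.
        for battery_id in broken:
            distribution[battery_id] = remaining / len(broken)
    return distribution


print(split_with_broken(1000.0, {1: 400.0, 2: 400.0}, {3, 4}))
print(split_with_broken(900.0, {}, {3, 4, 5}))
```

In the first call the two working batteries take 400 each and the remaining 200 is shared equally by the broken ones. In the second call no battery is working, so the whole request is split equally among the broken batteries.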

namespace: str instance-attribute ¤

The namespace of the request.

This will be used to identify the channel for sending the response into, in the channel registry.

power: Power instance-attribute ¤

The requested power.

request_timeout: timedelta = timedelta(seconds=5.0) class-attribute instance-attribute ¤

The maximum amount of time to wait for the request to be fulfilled.

frequenz.sdk.actor.power_distributing.Result dataclass ¤

Bases: _BaseResultMixin

Power distribution result.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class Result(_BaseResultMixin):
    """Power distribution result."""

frequenz.sdk.actor.power_distributing.Success dataclass ¤

Bases: _BaseSuccessMixin, Result

Result returned when setting the power succeeded for all batteries.

Source code in frequenz/sdk/actor/power_distributing/result.py
@dataclasses.dataclass
class Success(_BaseSuccessMixin, Result):  # Order matters here. See above.
    """Result returned when setting the power succeeded for all batteries."""