frequenz.sdk.timeseries ¤

Handling of timeseries streams.

A timeseries is a stream (normally an async iterator) of Samples.

Periodicity and alignment¤

All the data produced by this package is always periodic, in UTC, and aligned to the Epoch (by default).

Classes normally take a (re)sampling period as an argument and, optionally, an align_to argument.

This means timestamps are always separated by exactly one period, and that each timestamp falls at a multiple of the period, counting from align_to.

This ensures that the data is predictable and consistent across restarts.

Example

If we have a period of 10 seconds and are aligning to the UNIX epoch, then assuming the following timeline starts at 1970-01-01 00:00:00 UTC and our current now is 1970-01-01 00:00:32 UTC, the next timestamp will be at 1970-01-01 00:00:40 UTC:

align_to = 1970-01-01 00:00:00         next event = 1970-01-01 00:00:40
|                                       |
|---------|---------|---------|-|-------|---------|---------|---------|
0        10        20        30 |      40        50        60        70
                               now = 1970-01-01 00:00:32
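The alignment rule above can be sketched with plain datetime arithmetic. Note that `next_aligned` is a hypothetical helper for illustration, not part of the SDK:

```python
from datetime import datetime, timedelta, timezone

def next_aligned(now: datetime, period: timedelta,
                 align_to: datetime) -> datetime:
    """Return the first timestamp >= `now` that is `align_to`
    plus an integer multiple of `period`."""
    # Time elapsed since the last aligned tick (timedelta supports %).
    elapsed = (now - align_to) % period
    if elapsed == timedelta(0):
        return now
    return now + (period - elapsed)

now = datetime(1970, 1, 1, 0, 0, 32, tzinfo=timezone.utc)
aligned = next_aligned(now, timedelta(seconds=10),
                       datetime(1970, 1, 1, tzinfo=timezone.utc))
# aligned == 1970-01-01 00:00:40 UTC, matching the timeline above
```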

Attributes¤

frequenz.sdk.timeseries.DEFAULT_BUFFER_LEN_INIT module-attribute ¤

DEFAULT_BUFFER_LEN_INIT = 16

Default initial buffer length.

Buffers are created with this length initially, but they can grow or shrink depending on source properties, such as the sampling rate, to make sure all the requested past sampling periods can be stored.

frequenz.sdk.timeseries.DEFAULT_BUFFER_LEN_MAX module-attribute ¤

DEFAULT_BUFFER_LEN_MAX = 1024

Default maximum allowed buffer length.

If a buffer would grow beyond this length, it will be truncated to this length.

frequenz.sdk.timeseries.DEFAULT_BUFFER_LEN_WARN module-attribute ¤

DEFAULT_BUFFER_LEN_WARN = 128

Default buffer length threshold that produces a warning.

If a buffer would grow beyond this length, a warning will be logged.

frequenz.sdk.timeseries.Sink module-attribute ¤

Sink = Callable[
    [Sample[Quantity]], Coroutine[None, None, None]
]

A sink for a timeseries.

A new timeseries can be generated by sending samples to a sink.

This should be an async callable, for example:

async def some_sink(sample: Sample) -> None:
    ...
PARAMETER DESCRIPTION
sample

A sample to be sent out.

TYPE: Sample
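A minimal sketch of a sink in use, with a stand-in `Sample` dataclass rather than the SDK class:

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class Sample:
    """Stand-in for the SDK's Sample class (illustration only)."""
    timestamp: datetime
    value: float

received: list[Sample] = []

async def list_sink(sample: Sample) -> None:
    """A sink that collects incoming samples into a list."""
    received.append(sample)

# A new timeseries is generated by sending samples to the sink:
asyncio.run(list_sink(Sample(datetime.now(tz=timezone.utc), 10.0)))
```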

frequenz.sdk.timeseries.Source module-attribute ¤

A source for a timeseries.

A timeseries can be received sample by sample in a streaming way using a source.
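A source can be consumed sample by sample with `async for`. The sketch below uses a stand-in async generator yielding plain floats instead of SDK samples:

```python
import asyncio
from collections.abc import AsyncIterator

async def fake_source(n: int) -> AsyncIterator[float]:
    """A stand-in source yielding `n` sample values."""
    for i in range(n):
        yield float(i)

async def consume() -> list[float]:
    values = []
    # A timeseries source is received in a streaming way:
    async for value in fake_source(3):
        values.append(value)
    return values

values = asyncio.run(consume())
# values == [0.0, 1.0, 2.0]
```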

Classes¤

frequenz.sdk.timeseries.Bounds dataclass ¤

Bases: Generic[_T]

Lower and upper bound values.

Depending on the generic type _T, the lower and upper bounds can be None, meaning that there is no lower or upper bound, respectively.

When checking whether an item is within the bounds, the item itself must not be None.

Source code in src/frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True)
class Bounds(Generic[_T]):
    """Lower and upper bound values.

    Depending on the generic type `_T`, the lower and upper bounds can be `None`, in
    which case it means that there is no lower or upper bound, respectively.

    When checking if an item is within the bounds, the item must always be not `None`.
    """

    lower: _T
    """Lower bound."""

    upper: _T
    """Upper bound."""

    def __contains__(self, item: _T) -> bool:
        """
        Check if the value is within the range of the container.

        Args:
            item: The value to check. Can't be `None` even if `_T` can be `None`.

        Returns:
            bool: True if value is within the range, otherwise False.
        """
        assert item is not None, "Can't check if `None` is within the bounds."
        if self.lower is None and self.upper is None:
            return True
        if self.lower is None:
            return item <= self.upper
        if self.upper is None:
            return self.lower <= item

        return cast(Comparable, self.lower) <= item <= cast(Comparable, self.upper)
Attributes¤
lower instance-attribute ¤
lower: _T

Lower bound.

upper instance-attribute ¤
upper: _T

Upper bound.

Functions¤
__contains__ ¤
__contains__(item: _T) -> bool

Check if the value is within the range of the container.

PARAMETER DESCRIPTION
item

The value to check. Can't be None even if _T can be None.

TYPE: _T

RETURNS DESCRIPTION
bool

True if value is within the range, otherwise False.

TYPE: bool

Source code in src/frequenz/sdk/timeseries/_base_types.py
def __contains__(self, item: _T) -> bool:
    """
    Check if the value is within the range of the container.

    Args:
        item: The value to check. Can't be `None` even if `_T` can be `None`.

    Returns:
        bool: True if value is within the range, otherwise False.
    """
    assert item is not None, "Can't check if `None` is within the bounds."
    if self.lower is None and self.upper is None:
        return True
    if self.lower is None:
        return item <= self.upper
    if self.upper is None:
        return self.lower <= item

    return cast(Comparable, self.lower) <= item <= cast(Comparable, self.upper)
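The containment logic above can be exercised with a minimal standalone copy of the class (mirroring the listing; the `Comparable` casts are dropped for brevity):

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

_T = TypeVar("_T")

@dataclass(frozen=True)
class Bounds(Generic[_T]):
    """Lower and upper bound values (minimal copy of the listing above)."""

    lower: _T
    upper: _T

    def __contains__(self, item: _T) -> bool:
        assert item is not None, "Can't check if `None` is within the bounds."
        if self.lower is None and self.upper is None:
            return True  # unbounded on both sides
        if self.lower is None:
            return item <= self.upper  # only an upper bound
        if self.upper is None:
            return self.lower <= item  # only a lower bound
        return self.lower <= item <= self.upper

# `None` on one side means "no bound on that side":
bounds: Bounds[Optional[int]] = Bounds(lower=0, upper=None)
# -1 in bounds -> False; 100 in bounds -> True
```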

frequenz.sdk.timeseries.ClocksInfo dataclass ¤

Information about the wall clock and monotonic clock and their drift.

The monotonic_requested_sleep and monotonic_elapsed values must be strictly positive, while the wall_clock_elapsed can be negative if the wall clock jumped back in time.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
@dataclass(frozen=True, kw_only=True)
class ClocksInfo:
    """Information about the wall clock and monotonic clock and their drift.

    The `monotonic_requested_sleep` and `monotonic_elapsed` values must be strictly
    positive, while the `wall_clock_elapsed` can be negative if the wall clock jumped
    back in time.
    """

    monotonic_requested_sleep: timedelta
    """The requested monotonic sleep time used to gather the information (must be positive)."""

    monotonic_time: float
    """The monotonic time right after the sleep was done."""

    wall_clock_time: datetime
    """The wall clock time right after the sleep was done."""

    monotonic_elapsed: timedelta
    """The elapsed time in monotonic time (must be non-negative)."""

    wall_clock_elapsed: timedelta
    """The elapsed time in wall clock time."""

    wall_clock_factor: float = float("nan")
    """The factor to convert wall clock time to monotonic time.

    Typically, if the wall clock time expanded compared to the monotonic time (i.e.
    is more in the future), the returned value will be smaller than 1. If the wall
    clock time compressed compared to the monotonic time (i.e. is more in the past),
    the returned value will be bigger than 1.

    In cases where there are big time jumps this might be overridden by the previous
    wall clock factor to avoid adjusting by excessive amounts, when the time will
    resync anyway to catch up.
    """

    def __post_init__(self) -> None:
        """Check that the values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.monotonic_requested_sleep <= _TD_ZERO:
            raise ValueError(
                f"monotonic_requested_sleep must be strictly positive, not "
                f"{self.monotonic_requested_sleep!r}"
            )
        if not math.isfinite(self.monotonic_time):
            raise ValueError(
                f"monotonic_time must be a number, not {self.monotonic_time!r}"
            )
        if self.monotonic_elapsed <= _TD_ZERO:
            raise ValueError(
                f"monotonic_elapsed must be strictly positive, not {self.monotonic_elapsed!r}"
            )

        # This is a hack to cache the calculated value, once set it will be "immutable"
        # too, so it shouldn't change the logical "frozenness" of the class.
        if math.isnan(self.wall_clock_factor):
            wall_clock_elapsed = self.wall_clock_elapsed
            if wall_clock_elapsed <= _TD_ZERO:
                _logger.warning(
                    "The monotonic clock advanced %s, but the wall clock stayed still or "
                    "jumped back (elapsed: %s)! Hopefully this was just a singular jump in "
                    "time and not a permanent issue with the wall clock not moving at all. "
                    "For purposes of calculating the wall clock factor, a fake elapsed time "
                    "of one tenth of the elapsed monotonic time will be used.",
                    self.monotonic_elapsed,
                    wall_clock_elapsed,
                )
                wall_clock_elapsed = self.monotonic_elapsed * 0.1
            # We need to use __setattr__ here to bypass the frozen nature of the
            # dataclass. Since we are constructing the class, this is fine and the only
            # way to set calculated defaults in frozen dataclasses at the moment.
            super().__setattr__(
                "wall_clock_factor", self.monotonic_elapsed / wall_clock_elapsed
            )

    @property
    def monotonic_drift(self) -> timedelta:
        """The difference between the monotonic elapsed and requested sleep time.

        This number should be always positive, as the monotonic time should never
        jump back in time.
        """
        return self.monotonic_elapsed - self.monotonic_requested_sleep

    @property
    def wall_clock_jump(self) -> timedelta:
        """The amount of time the wall clock jumped compared to the monotonic time.

        If the wall clock is faster than the monotonic time (or jumped forward in time),
        the returned value will be positive. If the wall clock is slower than the
        monotonic time (or jumped backwards in time), the returned value will be
        negative.

        Note:
            Strictly speaking, both could be in sync and the result would be 0.0, but
            this is extremely unlikely due to floating point precision and the fact
            that both clocks are obtained at slightly different times.
        """
        return self.wall_clock_elapsed - self.monotonic_elapsed

    def wall_clock_to_monotonic(self, wall_clock_timedelta: timedelta, /) -> timedelta:
        """Convert a wall clock timedelta to a monotonic timedelta.

        This is useful to calculate how much one should sleep on the monotonic clock
        to reach a particular wall clock time, adjusting to the difference in speed
        or jumps between both.

        Args:
            wall_clock_timedelta: The wall clock timedelta to convert.

        Returns:
            The monotonic timedelta corresponding to `wall_clock_time` using the
                `wall_clock_factor`.
        """
        return wall_clock_timedelta * self.wall_clock_factor
Attributes¤
monotonic_drift property ¤
monotonic_drift: timedelta

The difference between the monotonic elapsed and requested sleep time.

This number should always be positive, as the monotonic clock should never jump back in time.

monotonic_elapsed instance-attribute ¤
monotonic_elapsed: timedelta

The elapsed time in monotonic time (must be strictly positive).

monotonic_requested_sleep instance-attribute ¤
monotonic_requested_sleep: timedelta

The requested monotonic sleep time used to gather the information (must be positive).

monotonic_time instance-attribute ¤
monotonic_time: float

The monotonic time right after the sleep was done.

wall_clock_elapsed instance-attribute ¤
wall_clock_elapsed: timedelta

The elapsed time in wall clock time.

wall_clock_factor class-attribute instance-attribute ¤
wall_clock_factor: float = float('nan')

The factor to convert wall clock time to monotonic time.

Typically, if the wall clock time expanded compared to the monotonic time (i.e. is more in the future), the returned value will be smaller than 1. If the wall clock time compressed compared to the monotonic time (i.e. is more in the past), the returned value will be bigger than 1.

In cases where there are big time jumps this might be overridden by the previous wall clock factor to avoid adjusting by excessive amounts, when the time will resync anyway to catch up.

wall_clock_jump property ¤
wall_clock_jump: timedelta

The amount of time the wall clock jumped compared to the monotonic time.

If the wall clock is faster than the monotonic time (or jumped forward in time), the returned value will be positive. If the wall clock is slower than the monotonic time (or jumped backwards in time), the returned value will be negative.

Note

Strictly speaking, both could be in sync and the result would be 0.0, but this is extremely unlikely due to floating point precision and the fact that both clocks are obtained at slightly different times.

wall_clock_time instance-attribute ¤
wall_clock_time: datetime

The wall clock time right after the sleep was done.

Functions¤
__post_init__ ¤
__post_init__() -> None

Check that the values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
def __post_init__(self) -> None:
    """Check that the values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.monotonic_requested_sleep <= _TD_ZERO:
        raise ValueError(
            f"monotonic_requested_sleep must be strictly positive, not "
            f"{self.monotonic_requested_sleep!r}"
        )
    if not math.isfinite(self.monotonic_time):
        raise ValueError(
            f"monotonic_time must be a number, not {self.monotonic_time!r}"
        )
    if self.monotonic_elapsed <= _TD_ZERO:
        raise ValueError(
            f"monotonic_elapsed must be strictly positive, not {self.monotonic_elapsed!r}"
        )

    # This is a hack to cache the calculated value, once set it will be "immutable"
    # too, so it shouldn't change the logical "frozenness" of the class.
    if math.isnan(self.wall_clock_factor):
        wall_clock_elapsed = self.wall_clock_elapsed
        if wall_clock_elapsed <= _TD_ZERO:
            _logger.warning(
                "The monotonic clock advanced %s, but the wall clock stayed still or "
                "jumped back (elapsed: %s)! Hopefully this was just a singular jump in "
                "time and not a permanent issue with the wall clock not moving at all. "
                "For purposes of calculating the wall clock factor, a fake elapsed time "
                "of one tenth of the elapsed monotonic time will be used.",
                self.monotonic_elapsed,
                wall_clock_elapsed,
            )
            wall_clock_elapsed = self.monotonic_elapsed * 0.1
        # We need to use __setattr__ here to bypass the frozen nature of the
        # dataclass. Since we are constructing the class, this is fine and the only
        # way to set calculated defaults in frozen dataclasses at the moment.
        super().__setattr__(
            "wall_clock_factor", self.monotonic_elapsed / wall_clock_elapsed
        )
wall_clock_to_monotonic ¤
wall_clock_to_monotonic(
    wall_clock_timedelta: timedelta,
) -> timedelta

Convert a wall clock timedelta to a monotonic timedelta.

This is useful to calculate how much one should sleep on the monotonic clock to reach a particular wall clock time, adjusting for differences in speed or jumps between the two clocks.

PARAMETER DESCRIPTION
wall_clock_timedelta

The wall clock timedelta to convert.

TYPE: timedelta

RETURNS DESCRIPTION
timedelta

The monotonic timedelta corresponding to wall_clock_time using the wall_clock_factor.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
def wall_clock_to_monotonic(self, wall_clock_timedelta: timedelta, /) -> timedelta:
    """Convert a wall clock timedelta to a monotonic timedelta.

    This is useful to calculate how much one should sleep on the monotonic clock
    to reach a particular wall clock time, adjusting to the difference in speed
    or jumps between both.

    Args:
        wall_clock_timedelta: The wall clock timedelta to convert.

    Returns:
        The monotonic timedelta corresponding to `wall_clock_time` using the
            `wall_clock_factor`.
    """
    return wall_clock_timedelta * self.wall_clock_factor
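The factor-based conversion can be sketched numerically (standalone arithmetic mirroring the listing above, not SDK code):

```python
from datetime import timedelta

# Suppose a requested 1 s monotonic sleep actually took 1.0 s of
# monotonic time, while the wall clock only advanced 0.8 s (the wall
# clock runs slow, i.e. is "compressed", relative to the monotonic clock).
monotonic_elapsed = timedelta(seconds=1.0)
wall_clock_elapsed = timedelta(seconds=0.8)

# A factor > 1 means wall clock time is compressed vs monotonic time.
wall_clock_factor = monotonic_elapsed / wall_clock_elapsed  # 1.25

# To cover 2 s of wall clock time, one must sleep longer on the
# monotonic clock:
monotonic_sleep = timedelta(seconds=2.0) * wall_clock_factor
# monotonic_sleep == timedelta(seconds=2.5)
```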

frequenz.sdk.timeseries.Fuse dataclass ¤

Fuse data class.

Source code in src/frequenz/sdk/timeseries/_fuse.py
@dataclass(frozen=True)
class Fuse:
    """Fuse data class."""

    max_current: Current
    """Rated current of the fuse."""
Attributes¤
max_current instance-attribute ¤
max_current: Current

Rated current of the fuse.

frequenz.sdk.timeseries.MovingWindow ¤

Bases: BackgroundService

A data window that moves with the latest datapoints of a data stream.

After initialization the MovingWindow can be accessed by an integer index or a timestamp. A sub-window can be accessed by using a slice of integers or timestamps.

Note that a numpy ndarray is returned, so users can apply numpy's operations directly to a window.

The window uses a ring buffer for storage, and the first element is aligned to a fixed, defined point in time. Due to the moving nature of the window, the dates of the first and the last element are constantly changing, so the point in time that defines the alignment can be outside of the time window. Modulo arithmetic is used to move the align_to timestamp into the latest window.

If for example the align_to parameter is set to datetime(1, 1, 1, tzinfo=timezone.utc) and the window size is bigger than one day then the first element will always be aligned to midnight.

Resampling might be required to reduce the number of samples to store; it can be enabled by specifying the resampler_config parameter, which lets the user control the granularity of the samples stored in the underlying buffer.

If resampling is not required, the resampler_config parameter can be set to None, in which case the MovingWindow will not perform any resampling.
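The buffer capacity follows from the window size and the (re)sampling period. This sketch mirrors the arithmetic in the `__init__` listing below; the period may not divide the size evenly, hence the ceiling:

```python
import math
from datetime import timedelta

def buffer_capacity(size: timedelta, sampling_period: timedelta) -> int:
    """Number of samples needed to cover `size` at `sampling_period`."""
    # The sampling period might not fit perfectly into the window size,
    # so round up to avoid losing the partial last sample slot.
    return math.ceil(size.total_seconds() / sampling_period.total_seconds())

# A 2-day window sampled once per second needs 172800 slots:
capacity = buffer_capacity(timedelta(days=2), timedelta(seconds=1))
```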

Example: Calculate the mean of a time interval

```python
import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender

from frequenz.sdk.timeseries import MovingWindow, Sample

async def send_mock_data(sender: Sender[Sample]) -> None:
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)

async def run() -> None:
    resampled_data_channel = Broadcast[Sample](name="sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    async with MovingWindow(
        size=timedelta(seconds=5),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        time_start = datetime.now(tz=timezone.utc)
        time_end = time_start + timedelta(seconds=5)

        # ... wait for 5 seconds until the buffer is filled
        await asyncio.sleep(5)

        # get a numpy array from the window
        array = window[time_start:time_end]
        # and use it, for example, to calculate the mean
        mean = array.mean()

asyncio.run(run())
```

Example: Create a polars data frame from a MovingWindow

```python
import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender

from frequenz.sdk.timeseries import MovingWindow, Sample

async def send_mock_data(sender: Sender[Sample]) -> None:
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)

async def run() -> None:
    resampled_data_channel = Broadcast[Sample](name="sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    # create a window that stores two days of data
    # starting at 2023-01-01 with a sample rate of 1 Hz
    async with MovingWindow(
        size=timedelta(days=2),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        # wait for one full day until the buffer is filled
        await asyncio.sleep(60*60*24)

        time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
        time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)

        # You can now create a polars series with one full day of data by
        # passing the window slice, like:
        # series = pl.Series("Jan_1", window[time_start:time_end])

asyncio.run(run())
```
Source code in src/frequenz/sdk/timeseries/_moving_window.py
class MovingWindow(BackgroundService):
    """
    A data window that moves with the latest datapoints of a data stream.

    After initialization the `MovingWindow` can be accessed by an integer
    index or a timestamp. A sub window can be accessed by using a slice of
    integers or timestamps.

    Note that a numpy ndarray is returned and thus users can use
    numpys operations directly on a window.

    The window uses a ring buffer for storage and the first element is aligned to
    a fixed defined point in time. Since the moving nature of the window, the
    date of the first and the last element are constantly changing and therefore
    the point in time that defines the alignment can be outside of the time window.
    Modulo arithmetic is used to move the `align_to` timestamp into the latest
    window.

    If for example the `align_to` parameter is set to
    `datetime(1, 1, 1, tzinfo=timezone.utc)` and the window size is bigger than
    one day then the first element will always be aligned to midnight.

    Resampling might be required to reduce the number of samples to store, and
    it can be set by specifying the resampler config parameter so that the user
    can control the granularity of the samples to be stored in the underlying
    buffer.

    If resampling is not required, the resampler config parameter can be
    set to None in which case the MovingWindow will not perform any resampling.

    Example: Calculate the mean of a time interval

        ```python
        from datetime import datetime, timedelta, timezone

        async def send_mock_data(sender: Sender[Sample]) -> None:
            while True:
                await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
                await asyncio.sleep(1.0)

        async def run() -> None:
            resampled_data_channel = Broadcast[Sample](name="sample-data")
            resampled_data_receiver = resampled_data_channel.new_receiver()
            resampled_data_sender = resampled_data_channel.new_sender()

            send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

            async with MovingWindow(
                size=timedelta(seconds=5),
                resampled_data_recv=resampled_data_receiver,
                input_sampling_period=timedelta(seconds=1),
            ) as window:
                time_start = datetime.now(tz=timezone.utc)
                time_end = time_start + timedelta(seconds=5)

                # ... wait for 5 seconds until the buffer is filled
                await asyncio.sleep(5)

                # return an numpy array from the window
                array = window[time_start:time_end]
                # and use it to for example calculate the mean
                mean = array.mean()

        asyncio.run(run())
        ```

    Example: Create a polars data frame from a `MovingWindow`

        ```python
        from datetime import datetime, timedelta, timezone

        async def send_mock_data(sender: Sender[Sample]) -> None:
            while True:
                await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
                await asyncio.sleep(1.0)

        async def run() -> None:
            resampled_data_channel = Broadcast[Sample](name="sample-data")
            resampled_data_receiver = resampled_data_channel.new_receiver()
            resampled_data_sender = resampled_data_channel.new_sender()

            send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

            # create a window that stores two days of data
            # starting at 1.1.23 with samplerate=1
            async with MovingWindow(
                size=timedelta(days=2),
                resampled_data_recv=resampled_data_receiver,
                input_sampling_period=timedelta(seconds=1),
            ) as window:
                # wait for one full day until the buffer is filled
                await asyncio.sleep(60*60*24)

                time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
                time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)

                # You can now create a polars series with one full day of data by
                # passing the window slice, like:
                # series = pl.Series("Jan_1", window[time_start:time_end])

        asyncio.run(run())
        ```
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        size: timedelta,
        resampled_data_recv: Receiver[Sample[Quantity]],
        input_sampling_period: timedelta,
        resampler_config: ResamplerConfig | None = None,
        align_to: datetime = UNIX_EPOCH,
        name: str | None = None,
    ) -> None:
        """
        Initialize the MovingWindow.

        This method creates the underlying ring buffer and starts a
        new task that updates the ring buffer with new incoming samples.
        The task stops running only if the channel receiver is closed.

        Args:
            size: The time span of the moving window over which samples will be stored.
            resampled_data_recv: A receiver that delivers samples with a
                given sampling period.
            input_sampling_period: The time interval between consecutive input samples.
            resampler_config: The resampler configuration in case resampling is required.
            align_to: A timestamp that defines a point in time to which
                the window is aligned to modulo window size. For further
                information, consult the class level documentation.
            name: The name of this moving window. If `None`, `str(id(self))` will be
                used. This is used mostly for debugging purposes.
        """
        assert (
            input_sampling_period.total_seconds() > 0
        ), "The input sampling period should be greater than zero."
        assert (
            input_sampling_period <= size
        ), "The input sampling period should be equal to or lower than the window size."
        super().__init__(name=name)

        self._sampling_period = input_sampling_period

        self._resampler: Resampler | None = None
        self._resampler_sender: Sender[Sample[Quantity]] | None = None

        if resampler_config:
            assert (
                resampler_config.resampling_period <= size
            ), "The resampling period should be equal to or lower than the window size."

            self._resampler = Resampler(resampler_config)
            self._sampling_period = resampler_config.resampling_period

        # Sampling period might not fit perfectly into the window size.
        num_samples = math.ceil(
            size.total_seconds() / self._sampling_period.total_seconds()
        )

        self._resampled_data_recv = resampled_data_recv
        self._buffer = OrderedRingBuffer(
            np.empty(shape=num_samples, dtype=float),
            sampling_period=self._sampling_period,
            align_to=align_to,
        )

        self._condition_new_sample = asyncio.Condition()

    def start(self) -> None:
        """Start the MovingWindow.

        This method starts the MovingWindow tasks.
        """
        if self._resampler:
            self._configure_resampler()
        self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window"))

    @property
    def sampling_period(self) -> timedelta:
        """
        Return the sampling period of the MovingWindow.

        Returns:
            The sampling period of the MovingWindow.
        """
        return self._sampling_period

    @property
    def oldest_timestamp(self) -> datetime | None:
        """
        Return the oldest timestamp of the MovingWindow.

        Returns:
            The oldest timestamp of the MovingWindow or None if the buffer is empty.
        """
        return self._buffer.oldest_timestamp

    @property
    def newest_timestamp(self) -> datetime | None:
        """
        Return the newest timestamp of the MovingWindow.

        Returns:
            The newest timestamp of the MovingWindow or None if the buffer is empty.
        """
        return self._buffer.newest_timestamp

    @property
    def capacity(self) -> int:
        """
        Return the capacity of the MovingWindow.

        Capacity is the maximum number of samples that can be stored in the
        MovingWindow.

        Returns:
            The capacity of the MovingWindow.
        """
        return self._buffer.maxlen

    # pylint before 3.0 only accepts names with 3 or more chars
    def at(self, key: int | datetime) -> float:  # pylint: disable=invalid-name
        """
        Return the sample at the given index or timestamp.

        In contrast to the [`window`][frequenz.sdk.timeseries.MovingWindow.window] method,
        which expects a slice as argument, this method expects a single index as argument
        and returns a single value.

        Args:
            key: The index or timestamp of the sample to return.

        Returns:
            The sample at the given index or timestamp.

        Raises:
            IndexError: If the buffer is empty or the index is out of bounds.
        """
        if self._buffer.count_valid() == 0:
            raise IndexError("The buffer is empty.")

        if isinstance(key, datetime):
            assert self._buffer.oldest_timestamp is not None
            assert self._buffer.newest_timestamp is not None
            if (
                key < self._buffer.oldest_timestamp
                or key > self._buffer.newest_timestamp
            ):
                raise IndexError(
                    f"Timestamp {key} is out of range [{self._buffer.oldest_timestamp}, "
                    f"{self._buffer.newest_timestamp}]"
                )
            return self._buffer[self._buffer.to_internal_index(key)]

        if isinstance(key, int):
            _logger.debug("Returning value at index %s ", key)
            timestamp = self._buffer.get_timestamp(key)
            assert timestamp is not None
            return self._buffer[self._buffer.to_internal_index(timestamp)]

        assert_never(key)

    def window(
        self,
        start: datetime | int | None,
        end: datetime | int | None,
        *,
        force_copy: bool = True,
        fill_value: float | None = np.nan,
    ) -> ArrayLike:
        """
        Return an array containing the samples in the given time interval.

        In contrast to the [`at`][frequenz.sdk.timeseries.MovingWindow.at] method,
        which expects a single index as argument, this method expects a slice as argument
        and returns an array.

        Args:
            start: The start timestamp of the time interval. If `None`, the
                start of the window is used.
            end: The end timestamp of the time interval. If `None`, the end of
                the window is used.
            force_copy: If `True`, the returned array is a copy of the underlying
                data. Otherwise, if possible, a view of the underlying data is
                returned.
            fill_value: If not None, will use this value to fill missing values.
                If missing values should be set, force_copy must be True.
                Defaults to NaN to avoid returning outdated data unexpectedly.

        Returns:
            An array containing the samples in the given time interval.
        """
        return self._buffer.window(
            start, end, force_copy=force_copy, fill_value=fill_value
        )

    async def wait_for_samples(self, n: int) -> None:
        """Wait until the next `n` samples are available in the MovingWindow.

        This function returns after `n` new samples are available in the MovingWindow,
        without considering whether the new samples are valid.  The validity of the
        samples can be verified by calling the
        [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.

        Args:
            n: The number of samples to wait for.

        Raises:
            ValueError: If `n` is negative or greater than the capacity
                of the MovingWindow.
        """
        if n == 0:
            return
        if n < 0:
            raise ValueError("The number of samples to wait for must be 0 or greater.")
        if n > self.capacity:
            raise ValueError(
                "The number of samples to wait for must be less than or equal to the "
                + f"capacity of the MovingWindow ({self.capacity})."
            )
        start_timestamp = (
            # Start from the next expected timestamp.
            self.newest_timestamp + self.sampling_period
            if self.newest_timestamp is not None
            else None
        )
        while True:
            async with self._condition_new_sample:
                # Every time a new sample is received, this condition gets notified and
                # will wake up.
                _ = await self._condition_new_sample.wait()
            if self.count_covered(since=start_timestamp) >= n:
                return

    async def _run_impl(self) -> None:
        """Awaits samples from the receiver and updates the underlying ring buffer.

        Raises:
            asyncio.CancelledError: if the MovingWindow task is cancelled.
        """
        try:
            async for sample in self._resampled_data_recv:
                _logger.debug("Received new sample: %s", sample)
                if self._resampler and self._resampler_sender:
                    await self._resampler_sender.send(sample)
                else:
                    self._buffer.update(sample)
                    async with self._condition_new_sample:
                        # Wake up all coroutines waiting for new samples.
                        self._condition_new_sample.notify_all()

        except asyncio.CancelledError:
            _logger.info("MovingWindow task has been cancelled.")
            raise

        _logger.error("Channel has been closed")

    def _configure_resampler(self) -> None:
        """Configure the components needed to run the resampler."""
        assert self._resampler is not None

        async def sink_buffer(sample: Sample[Quantity]) -> None:
            self._buffer.update(sample)
            async with self._condition_new_sample:
                # Wake up all coroutines waiting for new samples.
                self._condition_new_sample.notify_all()

        resampler_channel = Broadcast[Sample[Quantity]](name="average")
        self._resampler_sender = resampler_channel.new_sender()
        self._resampler.add_timeseries(
            "avg", resampler_channel.new_receiver(), sink_buffer
        )
        self._tasks.add(
            asyncio.create_task(self._resampler.resample(), name="resample")
        )

    def count_valid(
        self, *, since: datetime | None = None, until: datetime | None = None
    ) -> int:
        """Count the number of valid samples in this `MovingWindow`.

        If `since` and `until` are provided, the count is limited to the samples between
        (and including) the given timestamps.

        Args:
            since: The timestamp from which to start counting.  If `None`, the oldest
                timestamp of the buffer is used.
            until: The timestamp until (and including) which to count.  If `None`, the
                newest timestamp of the buffer is used.

        Returns:
            The number of valid samples in this `MovingWindow`.
        """
        return self._buffer.count_valid(since=since, until=until)

    def count_covered(
        self, *, since: datetime | None = None, until: datetime | None = None
    ) -> int:
        """Count the number of samples that are covered by the oldest and newest valid samples.

        If `since` and `until` are provided, the count is limited to the samples between
        (and including) the given timestamps.

        Args:
            since: The timestamp from which to start counting.  If `None`, the oldest
                timestamp of the buffer is used.
            until: The timestamp until (and including) which to count.  If `None`, the
                newest timestamp of the buffer is used.

        Returns:
            The count of samples between the oldest and newest (inclusive) valid samples
                or 0 if there is no time range covered.
        """
        return self._buffer.count_covered(since=since, until=until)

    @overload
    def __getitem__(self, key: SupportsIndex) -> float:
        """See the main __getitem__ method."""

    @overload
    def __getitem__(self, key: datetime) -> float:
        """See the main __getitem__ method."""

    @overload
    def __getitem__(self, key: slice) -> ArrayLike:
        """See the main __getitem__ method."""

    # We need the noqa because `IndexError` is raised indirectly by `at()` and `window()`
    def __getitem__(  # noqa: DOC503
        self, key: SupportsIndex | datetime | slice
    ) -> float | ArrayLike:
        """
        Return a sub window of the `MovingWindow`.

        The `MovingWindow` is accessed either by timestamp or by index
        or by a slice of timestamps or integers.

        * If the key is an integer, the float value of that key
          at the given position is returned.
        * If the key is a timestamp, the float value of that key
          that corresponds to the timestamp is returned.
        * If the key is a slice of timestamps or integers, an ndarray is returned,
          where the bounds correspond to the slice bounds.
          Note that the interval is half-open: the end bound is excluded.

        Note:
            Slicing with a step other than 1 is not supported.

        Args:
            key: Either an integer or a timestamp or a slice of timestamps or integers.

        Raises:
            IndexError: when requesting an out of range timestamp or index
            ValueError: when requesting a slice with a step other than 1

        Returns:
            A float if the key is a number or a timestamp.
            a NumPy array if the key is a slice.
        """
        if isinstance(key, slice):
            if not (key.step is None or key.step == 1):
                raise ValueError("Slicing with a step other than 1 is not supported.")
            return self.window(key.start, key.stop)

        if isinstance(key, datetime):
            return self.at(key)

        if isinstance(key, SupportsIndex):
            return self.at(key.__index__())

        assert_never(key)
Attributes¤
capacity property ¤
capacity: int

Return the capacity of the MovingWindow.

Capacity is the maximum number of samples that can be stored in the MovingWindow.

RETURNS DESCRIPTION
int

The capacity of the MovingWindow.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

newest_timestamp property ¤
newest_timestamp: datetime | None

Return the newest timestamp of the MovingWindow.

RETURNS DESCRIPTION
datetime | None

The newest timestamp of the MovingWindow or None if the buffer is empty.

oldest_timestamp property ¤
oldest_timestamp: datetime | None

Return the oldest timestamp of the MovingWindow.

RETURNS DESCRIPTION
datetime | None

The oldest timestamp of the MovingWindow or None if the buffer is empty.

sampling_period property ¤
sampling_period: timedelta

Return the sampling period of the MovingWindow.

RETURNS DESCRIPTION
timedelta

The sampling period of the MovingWindow.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in src/frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in src/frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in src/frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
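The delegation in `__await__` above is a general asyncio pattern: an object becomes awaitable by returning the generator of another coroutine. A minimal sketch, using a toy `Service` class that stands in for the background service (it is not an SDK class):

```python
# Minimal sketch of the __await__ delegation pattern: `await service`
# is forwarded to `service.wait()`.  `Service` is a toy stand-in.
import asyncio


class Service:
    async def wait(self) -> str:
        await asyncio.sleep(0)  # Pretend to wait for background tasks.
        return "done"

    def __await__(self):
        # Awaiting the service awaits its wait() coroutine.
        return self.wait().__await__()


async def main() -> str:
    return await Service()


print(asyncio.run(main()))  # done
```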
__getitem__ ¤
__getitem__(key: SupportsIndex) -> float
__getitem__(key: datetime) -> float
__getitem__(key: slice) -> ArrayLike
__getitem__(
    key: SupportsIndex | datetime | slice,
) -> float | ArrayLike

Return a sub window of the MovingWindow.

The MovingWindow is accessed either by timestamp or by index or by a slice of timestamps or integers.

  • If the key is an integer, the float value of that key at the given position is returned.
  • If the key is a timestamp, the float value of that key that corresponds to the timestamp is returned.
  • If the key is a slice of timestamps or integers, an ndarray is returned, where the bounds correspond to the slice bounds. Note that the interval is half-open: the end bound is excluded.
Note

Slicing with a step other than 1 is not supported.

PARAMETER DESCRIPTION
key

Either an integer or a timestamp or a slice of timestamps or integers.

TYPE: SupportsIndex | datetime | slice

RAISES DESCRIPTION
IndexError

when requesting an out of range timestamp or index

ValueError

when requesting a slice with a step other than 1

RETURNS DESCRIPTION
float | ArrayLike

A float if the key is a number or a timestamp.

float | ArrayLike

a NumPy array if the key is a slice.

Source code in src/frequenz/sdk/timeseries/_moving_window.py
def __getitem__(  # noqa: DOC503
    self, key: SupportsIndex | datetime | slice
) -> float | ArrayLike:
    """
    Return a sub window of the `MovingWindow`.

    The `MovingWindow` is accessed either by timestamp or by index
    or by a slice of timestamps or integers.

    * If the key is an integer, the float value of that key
      at the given position is returned.
    * If the key is a timestamp, the float value of that key
      that corresponds to the timestamp is returned.
    * If the key is a slice of timestamps or integers, an ndarray is returned,
      where the bounds correspond to the slice bounds.
      Note that the interval is half-open: the end bound is excluded.

    Note:
        Slicing with a step other than 1 is not supported.

    Args:
        key: Either an integer or a timestamp or a slice of timestamps or integers.

    Raises:
        IndexError: when requesting an out of range timestamp or index
        ValueError: when requesting a slice with a step other than 1

    Returns:
        A float if the key is a number or a timestamp.
        a NumPy array if the key is a slice.
    """
    if isinstance(key, slice):
        if not (key.step is None or key.step == 1):
            raise ValueError("Slicing with a step other than 1 is not supported.")
        return self.window(key.start, key.stop)

    if isinstance(key, datetime):
        return self.at(key)

    if isinstance(key, SupportsIndex):
        return self.at(key.__index__())

    assert_never(key)
__init__ ¤
__init__(
    *,
    size: timedelta,
    resampled_data_recv: Receiver[Sample[Quantity]],
    input_sampling_period: timedelta,
    resampler_config: ResamplerConfig | None = None,
    align_to: datetime = UNIX_EPOCH,
    name: str | None = None
) -> None

Initialize the MovingWindow.

This method creates the underlying ring buffer and starts a new task that updates the ring buffer with new incoming samples. The task stops running only if the channel receiver is closed.

PARAMETER DESCRIPTION
size

The time span of the moving window over which samples will be stored.

TYPE: timedelta

resampled_data_recv

A receiver that delivers samples with a given sampling period.

TYPE: Receiver[Sample[Quantity]]

input_sampling_period

The time interval between consecutive input samples.

TYPE: timedelta

resampler_config

The resampler configuration in case resampling is required.

TYPE: ResamplerConfig | None DEFAULT: None

align_to

A timestamp that defines a point in time to which the window is aligned to modulo window size. For further information, consult the class level documentation.

TYPE: datetime DEFAULT: UNIX_EPOCH

name

The name of this moving window. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in src/frequenz/sdk/timeseries/_moving_window.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    *,
    size: timedelta,
    resampled_data_recv: Receiver[Sample[Quantity]],
    input_sampling_period: timedelta,
    resampler_config: ResamplerConfig | None = None,
    align_to: datetime = UNIX_EPOCH,
    name: str | None = None,
) -> None:
    """
    Initialize the MovingWindow.

    This method creates the underlying ring buffer and starts a
    new task that updates the ring buffer with new incoming samples.
    The task stops running only if the channel receiver is closed.

    Args:
        size: The time span of the moving window over which samples will be stored.
        resampled_data_recv: A receiver that delivers samples with a
            given sampling period.
        input_sampling_period: The time interval between consecutive input samples.
        resampler_config: The resampler configuration in case resampling is required.
        align_to: A timestamp that defines a point in time to which
            the window is aligned to modulo window size. For further
            information, consult the class level documentation.
        name: The name of this moving window. If `None`, `str(id(self))` will be
            used. This is used mostly for debugging purposes.
    """
    assert (
        input_sampling_period.total_seconds() > 0
    ), "The input sampling period should be greater than zero."
    assert (
        input_sampling_period <= size
    ), "The input sampling period should be equal to or lower than the window size."
    super().__init__(name=name)

    self._sampling_period = input_sampling_period

    self._resampler: Resampler | None = None
    self._resampler_sender: Sender[Sample[Quantity]] | None = None

    if resampler_config:
        assert (
            resampler_config.resampling_period <= size
        ), "The resampling period should be equal to or lower than the window size."

        self._resampler = Resampler(resampler_config)
        self._sampling_period = resampler_config.resampling_period

    # Sampling period might not fit perfectly into the window size.
    num_samples = math.ceil(
        size.total_seconds() / self._sampling_period.total_seconds()
    )

    self._resampled_data_recv = resampled_data_recv
    self._buffer = OrderedRingBuffer(
        np.empty(shape=num_samples, dtype=float),
        sampling_period=self._sampling_period,
        align_to=align_to,
    )

    self._condition_new_sample = asyncio.Condition()
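The `num_samples` computation in `__init__` above can be checked with plain `datetime` arithmetic: the window size is divided by the effective sampling period, rounding up because the period may not divide the size evenly. A sketch (the helper name is ours, not part of the SDK API):

```python
# Sketch of the ring-buffer capacity computation used in __init__:
# window size over sampling period, rounded up.
import math
from datetime import timedelta


def buffer_capacity(size: timedelta, sampling_period: timedelta) -> int:
    """Number of slots needed to cover `size` at `sampling_period` resolution."""
    return math.ceil(size.total_seconds() / sampling_period.total_seconds())


# A 5-minute window sampled every 3 seconds divides evenly...
print(buffer_capacity(timedelta(minutes=5), timedelta(seconds=3)))  # 100
# ...while a 7-second period does not, so the count rounds up.
print(buffer_capacity(timedelta(minutes=5), timedelta(seconds=7)))  # 43
```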
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in src/frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in src/frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
at ¤
at(key: int | datetime) -> float

Return the sample at the given index or timestamp.

In contrast to the window method, which expects a slice as argument, this method expects a single index as argument and returns a single value.

PARAMETER DESCRIPTION
key

The index or timestamp of the sample to return.

TYPE: int | datetime

RETURNS DESCRIPTION
float

The sample at the given index or timestamp.

RAISES DESCRIPTION
IndexError

If the buffer is empty or the index is out of bounds.

Source code in src/frequenz/sdk/timeseries/_moving_window.py
def at(self, key: int | datetime) -> float:  # pylint: disable=invalid-name
    """
    Return the sample at the given index or timestamp.

    In contrast to the [`window`][frequenz.sdk.timeseries.MovingWindow.window] method,
    which expects a slice as argument, this method expects a single index as argument
    and returns a single value.

    Args:
        key: The index or timestamp of the sample to return.

    Returns:
        The sample at the given index or timestamp.

    Raises:
        IndexError: If the buffer is empty or the index is out of bounds.
    """
    if self._buffer.count_valid() == 0:
        raise IndexError("The buffer is empty.")

    if isinstance(key, datetime):
        assert self._buffer.oldest_timestamp is not None
        assert self._buffer.newest_timestamp is not None
        if (
            key < self._buffer.oldest_timestamp
            or key > self._buffer.newest_timestamp
        ):
            raise IndexError(
                f"Timestamp {key} is out of range [{self._buffer.oldest_timestamp}, "
                f"{self._buffer.newest_timestamp}]"
            )
        return self._buffer[self._buffer.to_internal_index(key)]

    if isinstance(key, int):
        _logger.debug("Returning value at index %s ", key)
        timestamp = self._buffer.get_timestamp(key)
        assert timestamp is not None
        return self._buffer[self._buffer.to_internal_index(timestamp)]

    assert_never(key)
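For timestamp keys, `at` maps the timestamp to an internal buffer position via `to_internal_index`. A rough sketch of how such a mapping can work, following the alignment rule described at the top of this page (timestamps fall on multiples of the sampling period, counted from `align_to`). The `slot_index` helper is illustrative only, not the SDK's actual `to_internal_index`:

```python
# Illustrative mapping from an aligned timestamp to a ring-buffer slot:
# elapsed periods since align_to, wrapped by the buffer capacity.
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def slot_index(
    ts: datetime,
    sampling_period: timedelta,
    align_to: datetime = UNIX_EPOCH,
    capacity: int = 60,
) -> int:
    """Ring-buffer slot for an aligned timestamp (sketch, not SDK code)."""
    elapsed = (ts - align_to).total_seconds()
    return int(elapsed // sampling_period.total_seconds()) % capacity

# 40 s after the epoch at a 10 s period lands in slot 4...
print(slot_index(datetime(1970, 1, 1, 0, 0, 40, tzinfo=timezone.utc),
                 timedelta(seconds=10)))  # 4
# ...and a 60-slot buffer wraps around: 700 s -> period 70 -> slot 10.
print(slot_index(datetime(1970, 1, 1, 0, 11, 40, tzinfo=timezone.utc),
                 timedelta(seconds=10)))  # 10
```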
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in src/frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
count_covered ¤
count_covered(
    *,
    since: datetime | None = None,
    until: datetime | None = None
) -> int

Count the number of samples that are covered by the oldest and newest valid samples.

If since and until are provided, the count is limited to the samples between (and including) the given timestamps.

PARAMETER DESCRIPTION
since

The timestamp from which to start counting. If None, the oldest timestamp of the buffer is used.

TYPE: datetime | None DEFAULT: None

until

The timestamp until (and including) which to count. If None, the newest timestamp of the buffer is used.

TYPE: datetime | None DEFAULT: None

RETURNS DESCRIPTION
int

The count of samples between the oldest and newest (inclusive) valid samples or 0 if there is no time range covered.

Source code in src/frequenz/sdk/timeseries/_moving_window.py
def count_covered(
    self, *, since: datetime | None = None, until: datetime | None = None
) -> int:
    """Count the number of samples that are covered by the oldest and newest valid samples.

    If `since` and `until` are provided, the count is limited to the samples between
    (and including) the given timestamps.

    Args:
        since: The timestamp from which to start counting.  If `None`, the oldest
            timestamp of the buffer is used.
        until: The timestamp until (and including) which to count.  If `None`, the
            newest timestamp of the buffer is used.

    Returns:
        The count of samples between the oldest and newest (inclusive) valid samples
            or 0 if there is no time range covered.
    """
    return self._buffer.count_covered(since=since, until=until)
count_valid ¤
count_valid(
    *,
    since: datetime | None = None,
    until: datetime | None = None
) -> int

Count the number of valid samples in this MovingWindow.

If since and until are provided, the count is limited to the samples between (and including) the given timestamps.

PARAMETER DESCRIPTION
since

The timestamp from which to start counting. If None, the oldest timestamp of the buffer is used.

TYPE: datetime | None DEFAULT: None

until

The timestamp until (and including) which to count. If None, the newest timestamp of the buffer is used.

TYPE: datetime | None DEFAULT: None

RETURNS DESCRIPTION
int

The number of valid samples in this MovingWindow.

Source code in src/frequenz/sdk/timeseries/_moving_window.py
def count_valid(
    self, *, since: datetime | None = None, until: datetime | None = None
) -> int:
    """Count the number of valid samples in this `MovingWindow`.

    If `since` and `until` are provided, the count is limited to the samples between
    (and including) the given timestamps.

    Args:
        since: The timestamp from which to start counting.  If `None`, the oldest
            timestamp of the buffer is used.
        until: The timestamp until (and including) which to count.  If `None`, the
            newest timestamp of the buffer is used.

    Returns:
        The number of valid samples in this `MovingWindow`.
    """
    return self._buffer.count_valid(since=since, until=until)
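The difference between `count_covered` and `count_valid` can be shown on a toy buffer, assuming missing slots are marked with NaN (the actual gap representation is an internal detail of `OrderedRingBuffer`): valid counts only the non-NaN entries, while covered counts every slot from the oldest to the newest valid entry, gaps included.

```python
# "Valid" vs. "covered" on a toy buffer where NaN marks a missing slot.
import math

buffer = [float("nan"), 2.0, float("nan"), 4.0, float("nan")]

# Valid: only the slots that hold an actual sample.
valid = sum(1 for v in buffer if not math.isnan(v))

# Covered: everything between the oldest and newest valid slot, inclusive.
idx = [i for i, v in enumerate(buffer) if not math.isnan(v)]
covered = idx[-1] - idx[0] + 1 if idx else 0

print(valid, covered)  # 2 3
```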
start ¤
start() -> None

Start the MovingWindow.

This method starts the MovingWindow tasks.

Source code in src/frequenz/sdk/timeseries/_moving_window.py
def start(self) -> None:
    """Start the MovingWindow.

    This method starts the MovingWindow tasks.
    """
    if self._resampler:
        self._configure_resampler()
    self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window"))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in src/frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:  # noqa: DOC503
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and is not returned in the exception group).

Source code in src/frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and is not
            returned in the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )
wait_for_samples async ¤
wait_for_samples(n: int) -> None

Wait until the next n samples are available in the MovingWindow.

This function returns after n new samples are available in the MovingWindow, without considering whether the new samples are valid. The validity of the samples can be verified by calling the count_valid method.

PARAMETER DESCRIPTION
n

The number of samples to wait for.

TYPE: int

RAISES DESCRIPTION
ValueError

If n is negative or greater than the capacity of the MovingWindow.

Source code in src/frequenz/sdk/timeseries/_moving_window.py
async def wait_for_samples(self, n: int) -> None:
    """Wait until the next `n` samples are available in the MovingWindow.

    This function returns after `n` new samples are available in the MovingWindow,
    without considering whether the new samples are valid.  The validity of the
    samples can be verified by calling the
    [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.

    Args:
        n: The number of samples to wait for.

    Raises:
        ValueError: If `n` is negative or greater than the capacity
            of the MovingWindow.
    """
    if n == 0:
        return
    if n < 0:
        raise ValueError("The number of samples to wait for must be 0 or greater.")
    if n > self.capacity:
        raise ValueError(
            "The number of samples to wait for must be less than or equal to the "
            + f"capacity of the MovingWindow ({self.capacity})."
        )
    start_timestamp = (
        # Start from the next expected timestamp.
        self.newest_timestamp + self.sampling_period
        if self.newest_timestamp is not None
        else None
    )
    while True:
        async with self._condition_new_sample:
            # Every time a new sample is received, this condition gets notified and
            # will wake up.
            _ = await self._condition_new_sample.wait()
        if self.count_covered(since=start_timestamp) >= n:
            return
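The `start_timestamp` computation in `wait_for_samples` above, shown standalone: counting starts at the next expected sample slot, i.e. the newest timestamp plus one sampling period (the concrete values here are illustrative):

```python
# Standalone version of the start_timestamp computation: waiting begins
# at the next expected sample slot after the newest stored sample.
from datetime import datetime, timedelta, timezone

newest = datetime(2024, 1, 1, 0, 0, 30, tzinfo=timezone.utc)
period = timedelta(seconds=10)

start_timestamp = newest + period
print(start_timestamp.isoformat())  # 2024-01-01T00:00:40+00:00
```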
window ¤
window(
    start: datetime | int | None,
    end: datetime | int | None,
    *,
    force_copy: bool = True,
    fill_value: float | None = nan
) -> ArrayLike

Return an array containing the samples in the given time interval.

In contrast to the at method, which expects a single index as argument, this method expects a slice as argument and returns an array.

PARAMETER DESCRIPTION
start

The start timestamp of the time interval. If None, the start of the window is used.

TYPE: datetime | int | None

end

The end timestamp of the time interval. If None, the end of the window is used.

TYPE: datetime | int | None

force_copy

If True, the returned array is a copy of the underlying data. Otherwise, if possible, a view of the underlying data is returned.

TYPE: bool DEFAULT: True

fill_value

If not None, this value is used to fill missing values; filling missing values requires force_copy to be True. Defaults to NaN to avoid returning outdated data unexpectedly.

TYPE: float | None DEFAULT: nan

RETURNS DESCRIPTION
ArrayLike

An array containing the samples in the given time interval.

Source code in src/frequenz/sdk/timeseries/_moving_window.py
def window(
    self,
    start: datetime | int | None,
    end: datetime | int | None,
    *,
    force_copy: bool = True,
    fill_value: float | None = np.nan,
) -> ArrayLike:
    """
    Return an array containing the samples in the given time interval.

    In contrast to the [`at`][frequenz.sdk.timeseries.MovingWindow.at] method,
    which expects a single index as argument, this method expects a slice as argument
    and returns an array.

    Args:
        start: The start timestamp of the time interval. If `None`, the
            start of the window is used.
        end: The end timestamp of the time interval. If `None`, the end of
            the window is used.
        force_copy: If `True`, the returned array is a copy of the underlying
            data. Otherwise, if possible, a view of the underlying data is
            returned.
        fill_value: If not `None`, this value is used to fill missing values.
            Filling missing values requires `force_copy` to be `True`.
            Defaults to NaN to avoid returning outdated data unexpectedly.

    Returns:
        An array containing the samples in the given time interval.
    """
    return self._buffer.window(
        start, end, force_copy=force_copy, fill_value=fill_value
    )
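
Conceptually, `window` slices the underlying buffer and optionally fills gaps. A hedged sketch using a plain NumPy array with integer indices (`window_sketch` is a hypothetical helper; the real method operates on a ring buffer and also accepts `datetime` bounds):

```python
import numpy as np


def window_sketch(data, start, end, *, force_copy=True, fill_value=np.nan):
    """Return the samples in [start, end), filling gaps with fill_value.

    `data` is a 1-D array indexed by sample position, with NaN marking
    missing samples; `start`/`end` are integer indices.
    """
    view = data[start:end]
    if fill_value is not None:
        # Filling requires a copy: the underlying buffer must not be mutated.
        out = view.copy()
        out[np.isnan(out)] = fill_value
        return out
    return view.copy() if force_copy else view


buffer = np.array([1.0, np.nan, 3.0, 4.0])
print(window_sketch(buffer, 0, 3, fill_value=0.0))  # [1. 0. 3.]
```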

frequenz.sdk.timeseries.PeriodicFeatureExtractor ¤

A feature extractor for historical timeseries data.

This class creates a profile from periodically occurring windows in a buffer of historical data.

The profile is created out of all windows that are fully contained in the underlying buffer with the same start and end time modulo a fixed period.

Consider for example a timeseries $T$ of historical data and sub-series $S_i$ of $T$, all having the same size $l$ and the same fixed distance $p$, called the period, where the period between two sub-windows is defined as the distance between two points at the same position within the sub-windows.

This class calculates a statistical profile $S$ over all $S_i$, i.e. the value of $S$ at position $i$ is calculated by performing a certain calculation, e.g. an average, over all values of $S_i$ at position $i$.

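The profile construction can be sketched with NumPy: stack the periodic sub-series $S_i$ as the rows of a matrix and reduce column-wise (a simplified illustration with toy data; the real class reads the windows out of a `MovingWindow`):

```python
import numpy as np

# A toy timeseries T with period p = 4 samples and window length l = 4:
# three full periods, so three sub-series S_0, S_1, S_2.
T = np.array([1.0, 2.0, 3.0, 4.0,
              3.0, 4.0, 5.0, 6.0,
              5.0, 6.0, 7.0, 8.0])
period = 4

# Each row is one sub-series S_i; column j holds the values at position j.
windows = T.reshape(-1, period)

# The profile S: the value at position j is the average over all S_i at j.
profile = windows.mean(axis=0)
print(profile)  # [3. 4. 5. 6.]
```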
Note

The oldest window or the window that is currently overwritten in the MovingWindow is not considered in the profile.

Note

When constructing a PeriodicFeatureExtractor object the MovingWindow size has to be an integer multiple of the period.

Example
from datetime import datetime, timedelta, timezone

from frequenz.sdk import microgrid
from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor

async with MovingWindow(
    size=timedelta(days=35),
    resampled_data_recv=microgrid.grid().power.new_receiver(),
    input_sampling_period=timedelta(seconds=1),
) as moving_window:
    feature_extractor = PeriodicFeatureExtractor(
        moving_window=moving_window,
        period=timedelta(days=7),
    )

    now = datetime.now(timezone.utc)

    # create a daily weighted average for the next 24h
    avg_24h = feature_extractor.avg(
        now,
        now + timedelta(hours=24),
        weights=[0.1, 0.2, 0.3, 0.4]
    )

    # create a daily average for Thursday March 23 2023
    th_avg_24h = feature_extractor.avg(
        datetime(2023, 3, 23, tzinfo=timezone.utc),
        datetime(2023, 3, 24, tzinfo=timezone.utc),
    )
Source code in src/frequenz/sdk/timeseries/_periodic_feature_extractor.py
class PeriodicFeatureExtractor:
    """
    A feature extractor for historical timeseries data.

    This class creates a profile from periodically occurring windows in a
    buffer of historical data.

    The profile is created out of all windows that are fully contained in the
    underlying buffer with the same start and end time modulo a fixed period.

    Consider for example a timeseries $T$ of historical data and sub-series
    $S_i$ of $T$, all having the same size $l$ and the same fixed distance $p$,
    called the period, where the period between two sub-windows is defined as
    the distance between two points at the same position within the sub-windows.

    This class calculates a statistical profile $S$ over all $S_i$, i.e. the
    value of $S$ at position $i$ is calculated by performing a certain
    calculation, e.g. an average, over all values of $S_i$ at position $i$.

    Note:
        The oldest window or the window that is currently overwritten in the
        `MovingWindow` is not considered in the profile.

    Note:
        When constructing a `PeriodicFeatureExtractor` object the
        `MovingWindow` size has to be an integer multiple of the period.

    Example:
        ```python
        from datetime import datetime, timedelta, timezone

        from frequenz.sdk import microgrid
        from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor

        async with MovingWindow(
            size=timedelta(days=35),
            resampled_data_recv=microgrid.grid().power.new_receiver(),
            input_sampling_period=timedelta(seconds=1),
        ) as moving_window:
            feature_extractor = PeriodicFeatureExtractor(
                moving_window=moving_window,
                period=timedelta(days=7),
            )

            now = datetime.now(timezone.utc)

            # create a daily weighted average for the next 24h
            avg_24h = feature_extractor.avg(
                now,
                now + timedelta(hours=24),
                weights=[0.1, 0.2, 0.3, 0.4]
            )

            # create a daily average for Thursday March 23 2023
            th_avg_24h = feature_extractor.avg(
                datetime(2023, 3, 23, tzinfo=timezone.utc),
                datetime(2023, 3, 24, tzinfo=timezone.utc),
            )
        ```
    """

    def __init__(
        self,
        moving_window: MovingWindow,
        period: timedelta,
    ) -> None:
        """
        Initialize a PeriodicFeatureExtractor object.

        Args:
            moving_window: The MovingWindow that is used for the average calculation.
            period: The distance between two succeeding intervals.

        Raises:
            ValueError: If the MovingWindow size is not an integer multiple of the period.
        """
        self._moving_window = moving_window

        self._sampling_period = self._moving_window.sampling_period
        """The sampling_period as float to use it for indexing of samples."""

        self._period = int(period / self._sampling_period)
        """Distance between two succeeding intervals in samples."""

        _logger.debug("Initializing PeriodicFeatureExtractor!")
        _logger.debug("MovingWindow size: %i", self._moving_window.count_valid())
        _logger.debug(
            "Period between two succeeding intervals (in samples): %i",
            self._period,
        )

        if not self._moving_window.count_valid() % self._period == 0:
            raise ValueError(
                "The MovingWindow size is not a integer multiple of the period."
            )

        if not is_close_to_zero(self._period - period / self._sampling_period):
            raise ValueError(
                "The period is not a multiple of the sampling period. "
                "This might result in unexpected behaviour."
            )

    @property
    def _buffer(self) -> OrderedRingBuffer[NDArray[np.float64]]:
        return self._moving_window._buffer  # pylint: disable=protected-access

    def _timestamp_to_rel_index(self, timestamp: datetime) -> int:
        """
        Get the index of a timestamp relative to the oldest sample in the MovingWindow.

        In other words, consider an integer axis where zero is defined as the
        oldest element in the MovingWindow. This function returns the index of
        the given timestamp on this axis.

        This method can return negative values.

        Args:
            timestamp: A timestamp that we want to shift into the window.

        Returns:
            The index of the timestamp shifted into the MovingWindow.
        """
        # align timestamp to the sampling period
        timestamp = self._buffer.normalize_timestamp(timestamp)

        # distance between the input ts and the ts of oldest known samples (in samples)
        dist_to_oldest = int(
            (timestamp - self._buffer.time_bound_oldest) / self._sampling_period
        )

        _logger.debug("Shifting ts: %s", timestamp)
        _logger.debug("Oldest timestamp in buffer: %s", self._buffer.time_bound_oldest)
        _logger.debug("Distance to the oldest sample: %i", dist_to_oldest)

        return dist_to_oldest

    def _reshape_np_array(
        self, array: NDArray[np.float64], window_size: int
    ) -> NDArray[np.float64]:
        """
        Reshape a numpy array to a 2D array where each row represents a window.

        There are three cases to consider

        1. The array size is a multiple of window_size + period,
           i.e. num_windows is integer and we can simply reshape.
        2. The array size is NOT a multiple of window_size + period and
           it has length n * period + m, where m < window_size.
        3. The array size is NOT a multiple of window_size + period and
           it has length n * period + m, where m >= window_size.

        Note that in the current implementation of this class we have the
        restriction that the MovingWindow size is an integer multiple of the
        period, and hence only case 1 can occur.

        Args:
            array: The numpy array to reshape.
            window_size: The size of the window in samples.

        Returns:
            The reshaped 2D array.

        Raises:
            ValueError: If the array is smaller than the period.
        """
        # Not using the num_windows function here because we want to
        # differentiate between the three cases.
        if len(array) < self._period:
            raise ValueError(
                f"The array (length:{len(array)}) is too small to be reshaped."
            )

        num_windows = len(array) // self._period

        # Case 1:
        if len(array) - num_windows * self._period == 0:
            resized_array = array
        # Case 2
        elif len(array) - num_windows * self._period < window_size:
            resized_array = array[: num_windows * self._period]
        # Case 3
        else:
            num_windows += 1
            resized_array = np.resize(array, num_windows * self._period)

        return resized_array.reshape(num_windows, self._period)

    def _get_relative_positions(
        self, start: datetime, window_size: int
    ) -> RelativePositions:
        """
        Return relative positions of the MovingWindow.

        This method calculates the shifted relative positions of the start
        timestamp, the end timestamps as well as the next position that is
        overwritten in the ringbuffer.
        Shifted in this context means that the positions are moved as close
        as possible to the oldest sample in the MovingWindow.

        Args:
            start: The start timestamp of the window.
            window_size: The size of the window in samples.

        Returns:
            The relative positions of the start, end and next samples.
        """
        # The number of usable windows can change, when the current position of
        # the ringbuffer is inside one of the windows inside the MovingWindow.
        # Since this is possible, we assume that one window is always not used
        # for the average calculation.
        #
        # We are ignoring either the window that is currently overwritten if
        # the current position is inside that window or the window that would
        # be overwritten next.
        #
        # Move the window to its first appearance in the MovingWindow relative
        # to the oldest sample stored in the MovingWindow.
        #
        # In other words the oldest stored sample is considered to have index 0.
        #
        # Note that the returned value is an index, not a timestamp.
        rel_start_sample = self._timestamp_to_rel_index(start) % self._period
        rel_end_sample = rel_start_sample + window_size

        # check if the newest time bound, i.e. the sample that is currently written,
        # is inside the interval
        rb_current_position = self._buffer.time_bound_newest
        rel_next_position = (
            self._timestamp_to_rel_index(rb_current_position) + 1
        ) % self._period
        # fix rel_next_position if, modulo the period, the next position
        # is smaller than the start sample position
        if rel_next_position < rel_start_sample:
            rel_next_position += self._period

        rel_next_position += self._period * (window_size // self._period)

        _logger.debug("current position of the ringbuffer: %s", rb_current_position)
        _logger.debug("relative start_sample: %s", rel_start_sample)
        _logger.debug("relative end_sample: %s", rel_end_sample)
        _logger.debug("relative next_position: %s", rel_next_position)

        return RelativePositions(rel_start_sample, rel_end_sample, rel_next_position)

    def _get_buffer_bounds(
        self, start: datetime, end: datetime
    ) -> tuple[int, int, int]:
        """
        Get the bounds of the ringbuffer used for further operations.

        This method uses the given start and end timestamps to calculate the
        part of the ringbuffer that can be used for further operations, like
        average or min/max.

        Here we cut out the oldest window or the window that is currently
        overwritten in the MovingWindow such that it is not considered in any
        further operation.

        Args:
            start: The start timestamp of the window.
            end: The end timestamp of the window.

        Returns:
            The bounds of the to be used buffer and the window size.

        Raises:
            ValueError: If the start timestamp is after the end timestamp.
        """
        window_size = self._timestamp_to_rel_index(end) - self._timestamp_to_rel_index(
            start
        )
        if window_size <= 0:
            raise ValueError("Start timestamp must be before end timestamp")
        if window_size > self._period:
            raise ValueError(
                "The window size must be smaller or equal than the period."
            )

        rel_pos = self._get_relative_positions(start, window_size)

        if window_size > self._moving_window.count_valid():
            raise ValueError(
                "The window size must be smaller than the size of the `MovingWindow`"
            )

        # shifted distance between the next incoming sample and the start of the window
        dist_to_start = rel_pos.next - rel_pos.start

        # get the start and end position inside the ringbuffer
        end_pos = (
            self._timestamp_to_rel_index(self._buffer.time_bound_newest) + 1
        ) - dist_to_start

        # Note that this check works since we are using the positions
        # relative to the oldest sample stored in the MovingWindow.
        if rel_pos.start <= rel_pos.next < rel_pos.end:
            # end position is start_position of the window that is currently written
            # that's how end_pos is currently set
            _logger.debug("Next sample will be inside the window time interval!")
        else:
            _logger.debug("Next sample will be outside the window time interval!")
            # end position is start_position of the window that
            # is overwritten next, hence we add the period.
            end_pos += self._period

        # add the offset to the oldest sample in the ringbuffer and wrap around
        # to get the start and end positions in the ringbuffer
        rb_offset = self._buffer.to_internal_index(self._buffer.time_bound_oldest)
        start_pos = self._buffer.wrap(end_pos + self._period + rb_offset)
        end_pos = self._buffer.wrap(end_pos + rb_offset)

        _logger.debug("start_pos in ringbuffer: %s", start_pos)
        _logger.debug("end_pos in ringbuffer: %s", end_pos)

        return (start_pos, end_pos, window_size)

    def _get_reshaped_np_array(
        self, start: datetime, end: datetime
    ) -> tuple[NDArray[np.float64], int]:
        """
        Create a reshaped numpy array from the MovingWindow.

        The reshaped array is a two dimensional array, where one dimension is
        the window_size and the other the number of windows returned by the
        `_get_buffer_bounds` method.

        Args:
            start: The start timestamp of the window.
            end: The end timestamp of the window.

        Returns:
            A tuple containing the reshaped numpy array and the window size.
        """
        (start_pos, end_pos, window_size) = self._get_buffer_bounds(start, end)

        if start_pos >= end_pos:
            window_start = self._buffer[start_pos : self._moving_window.count_valid()]
            window_end = self._buffer[0:end_pos]
            # make the linter happy
            assert isinstance(window_start, np.ndarray)
            assert isinstance(window_end, np.ndarray)
            window_array = np.concatenate((window_start, window_end))
        else:
            window_array = self._buffer[start_pos:end_pos]

        return (self._reshape_np_array(window_array, window_size), window_size)

    def avg(
        self, start: datetime, end: datetime, weights: list[float] | None = None
    ) -> NDArray[np.float64]:
        """
        Create the average window out of the window defined by `start` and `end`.

        This method calculates the average of a window by averaging over all
        windows fully inside the MovingWindow having the period
        `self.period`.

        Args:
            start: The start of the window to average over.
            end: The end of the window to average over.
            weights: The weights to use for the average calculation (oldest first).

        Returns:
            The averaged timeseries window.
        """
        (reshaped, window_size) = self._get_reshaped_np_array(start, end)
        return np.average(  # type: ignore[no-any-return]
            reshaped[:, :window_size], axis=0, weights=weights
        )
Functions¤
__init__ ¤
__init__(
    moving_window: MovingWindow, period: timedelta
) -> None

Initialize a PeriodicFeatureExtractor object.

PARAMETER DESCRIPTION
moving_window

The MovingWindow that is used for the average calculation.

TYPE: MovingWindow

period

The distance between two succeeding intervals.

TYPE: timedelta

RAISES DESCRIPTION
ValueError

If the MovingWindow size is not an integer multiple of the period.

Source code in src/frequenz/sdk/timeseries/_periodic_feature_extractor.py
def __init__(
    self,
    moving_window: MovingWindow,
    period: timedelta,
) -> None:
    """
    Initialize a PeriodicFeatureExtractor object.

    Args:
        moving_window: The MovingWindow that is used for the average calculation.
        period: The distance between two succeeding intervals.

    Raises:
        ValueError: If the MovingWindow size is not an integer multiple of the period.
    """
    self._moving_window = moving_window

    self._sampling_period = self._moving_window.sampling_period
    """The sampling_period as float to use it for indexing of samples."""

    self._period = int(period / self._sampling_period)
    """Distance between two succeeding intervals in samples."""

    _logger.debug("Initializing PeriodicFeatureExtractor!")
    _logger.debug("MovingWindow size: %i", self._moving_window.count_valid())
    _logger.debug(
        "Period between two succeeding intervals (in samples): %i",
        self._period,
    )

    if not self._moving_window.count_valid() % self._period == 0:
        raise ValueError(
            "The MovingWindow size is not a integer multiple of the period."
        )

    if not is_close_to_zero(self._period - period / self._sampling_period):
        raise ValueError(
            "The period is not a multiple of the sampling period. "
            "This might result in unexpected behaviour."
        )
avg ¤
avg(
    start: datetime,
    end: datetime,
    weights: list[float] | None = None,
) -> NDArray[float64]

Create the average window out of the window defined by start and end.

This method calculates the average of a window by averaging over all windows fully inside the MovingWindow having the period self.period.

PARAMETER DESCRIPTION
start

The start of the window to average over.

TYPE: datetime

end

The end of the window to average over.

TYPE: datetime

weights

The weights to use for the average calculation (oldest first).

TYPE: list[float] | None DEFAULT: None

RETURNS DESCRIPTION
NDArray[float64]

The averaged timeseries window.

Source code in src/frequenz/sdk/timeseries/_periodic_feature_extractor.py
def avg(
    self, start: datetime, end: datetime, weights: list[float] | None = None
) -> NDArray[np.float64]:
    """
    Create the average window out of the window defined by `start` and `end`.

    This method calculates the average of a window by averaging over all
    windows fully inside the MovingWindow having the period
    `self.period`.

    Args:
        start: The start of the window to average over.
        end: The end of the window to average over.
        weights: The weights to use for the average calculation (oldest first).

    Returns:
        The averaged timeseries window.
    """
    (reshaped, window_size) = self._get_reshaped_np_array(start, end)
    return np.average(  # type: ignore[no-any-return]
        reshaped[:, :window_size], axis=0, weights=weights
    )
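
The `weights` argument weighs whole windows, one weight per window, oldest first. A hedged sketch of the underlying `np.average` call with illustrative data (not the SDK API itself):

```python
import numpy as np

# Three stacked periodic windows, oldest first (rows), each 4 samples wide.
windows = np.array([
    [1.0, 1.0, 1.0, 1.0],   # oldest window
    [2.0, 2.0, 2.0, 2.0],
    [4.0, 4.0, 4.0, 4.0],   # newest window
])

# Weight recent windows more heavily; one weight per window, oldest first.
weights = [0.2, 0.3, 0.5]

# Average across windows (axis=0): 1*0.2 + 2*0.3 + 4*0.5 = 2.8 per position.
avg = np.average(windows, axis=0, weights=weights)
print(avg)  # [2.8 2.8 2.8 2.8]
```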

frequenz.sdk.timeseries.ReceiverFetcher ¤

Bases: Generic[T_co], Protocol

An interface that just exposes a new_receiver method.

Source code in src/frequenz/sdk/_internal/_channels.py
class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
    """An interface that just exposes a `new_receiver` method."""

    @abc.abstractmethod
    def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
        """Get a receiver from the channel.

        Args:
            limit: The maximum size of the receiver.

        Returns:
            A receiver instance.
        """
Functions¤
new_receiver abstractmethod ¤
new_receiver(*, limit: int = 50) -> Receiver[T_co]

Get a receiver from the channel.

PARAMETER DESCRIPTION
limit

The maximum size of the receiver.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[T_co]

A receiver instance.

Source code in src/frequenz/sdk/_internal/_channels.py
@abc.abstractmethod
def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
    """Get a receiver from the channel.

    Args:
        limit: The maximum size of the receiver.

    Returns:
        A receiver instance.
    """

frequenz.sdk.timeseries.ResamplerConfig dataclass ¤

Resampler configuration.

Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
@dataclass(frozen=True)
class ResamplerConfig:
    """Resampler configuration."""

    resampling_period: timedelta
    """The resampling period.

    This is the time that passes between consecutive resampled samples.

    It must be a positive time span.
    """

    max_data_age_in_periods: float = 3.0
    """The maximum age a sample can have to be considered *relevant* for resampling.

    Expressed in number of periods, where period is the `resampling_period`
    if we are downsampling (resampling period bigger than the input period) or
    the *input sampling period* if we are upsampling (input period bigger than
    the resampling period).

    It must be at least 1.0.

    Example:
        If `resampling_period` is 3 seconds, the input sampling period is
        1 and `max_data_age_in_periods` is 2, then data older than 3*2
        = 6 seconds will be discarded when creating a new sample and never
        passed to the resampling function.

        If `resampling_period` is 3 seconds, the input sampling period is
        5 and `max_data_age_in_periods` is 2, then data older than 5*2
        = 10 seconds will be discarded when creating a new sample and never
        passed to the resampling function.
    """

    resampling_function: ResamplingFunction = lambda samples, _, __: statistics.fmean(
        s[1] for s in samples
    )
    """The resampling function.

    This function will be applied to the sequence of relevant samples at
    a given time. The result of the function is what is sent as the resampled
    value.
    """

    initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
    """The initial length of the resampling buffer.

    The buffer could grow or shrink depending on the source properties,
    like sampling rate, to make sure all the requested past sampling periods
    can be stored.

    It must be at least 1 and at most `max_buffer_len`.
    """

    warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
    """The minimum length of the resampling buffer that will emit a warning.

    If a buffer grows bigger than this value, it will emit a warning in the
    logs, so buffers don't grow too big inadvertently.

    It must be at least 1 and at most `max_buffer_len`.
    """

    max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
    """The maximum length of the resampling buffer.

    Buffers won't be allowed to grow beyond this point even if it would be
    needed to keep all the requested past sampling periods. An error will be
    emitted in the logs if the buffer length needs to be truncated to this
    value.

    It must be bigger than `warn_buffer_len`.
    """

    align_to: datetime | None = UNIX_EPOCH
    """The time to align the resampling period to.

    The resampling period will be aligned to this time, so the first resampled
    sample will be at the first multiple of `resampling_period` starting from
    `align_to`. It must be an aware datetime and can be in the future too.

    If `align_to` is `None`, the resampling period will be aligned to the
    time the resampler is created.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.resampling_period.total_seconds() <= 0.0:
            raise ValueError(
                f"resampling_period ({self.resampling_period}) must be positive"
            )
        if self.max_data_age_in_periods < 1.0:
            raise ValueError(
                f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
            )
        if self.warn_buffer_len < 1:
            raise ValueError(
                f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
            )
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                f"max_buffer_len ({self.max_buffer_len}) should "
                f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
            )

        if self.initial_buffer_len < 1:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) should at least 1"
            )
        if self.initial_buffer_len > self.max_buffer_len:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
                f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
                "initial_buffer_len or a bigger max_buffer_len"
            )
        if self.initial_buffer_len > self.warn_buffer_len:
            _logger.warning(
                "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
                self.initial_buffer_len,
                self.warn_buffer_len,
            )
        if self.align_to is not None and self.align_to.tzinfo is None:
            raise ValueError(
                f"align_to ({self.align_to}) should be a timezone aware datetime"
            )
Attributes¤
align_to class-attribute instance-attribute ¤
align_to: datetime | None = UNIX_EPOCH

The time to align the resampling period to.

The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of resampling_period starting from align_to. It must be an aware datetime and can be in the future too.

If align_to is None, the resampling period will be aligned to the time the resampler is created.
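
The alignment rule can be sketched as a small helper that computes the first aligned timestamp at or after a given time (`next_aligned` is a hypothetical illustration, not SDK code; it reproduces the timeline example from the introduction above):

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def next_aligned(
    now: datetime, period: timedelta, align_to: datetime = UNIX_EPOCH
) -> datetime:
    """Return the first multiple of `period`, counted from `align_to`, >= now."""
    # Time elapsed since the most recent aligned point (timedelta supports %).
    elapsed = (now - align_to) % period
    if elapsed == timedelta(0):
        return now
    return now + period - elapsed


now = datetime(1970, 1, 1, 0, 0, 32, tzinfo=timezone.utc)
print(next_aligned(now, timedelta(seconds=10)))  # 1970-01-01 00:00:40+00:00
```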

initial_buffer_len class-attribute instance-attribute ¤
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT

The initial length of the resampling buffer.

The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.

It must be at least 1 and at most max_buffer_len.

max_buffer_len class-attribute instance-attribute ¤
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX

The maximum length of the resampling buffer.

Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.

It must be bigger than warn_buffer_len.

max_data_age_in_periods class-attribute instance-attribute ¤
max_data_age_in_periods: float = 3.0

The maximum age a sample can have to be considered relevant for resampling.

Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).

It must be at least 1.0.

Example

If resampling_period is 3 seconds, the input sampling period is 1 and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.

If resampling_period is 3 seconds, the input sampling period is 5 and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.

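The cutoff arithmetic from the example above can be sketched as a small helper (`max_sample_age` is an illustrative name, not the SDK's internal code):

```python
from datetime import timedelta


def max_sample_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Samples older than this are discarded before resampling.

    The relevant period is the larger of the two: the resampling period
    when downsampling, the input sampling period when upsampling.
    """
    period = max(resampling_period, input_period)
    return period * max_data_age_in_periods


# Downsampling: 1 s input resampled to 3 s, keep 2 periods -> 6 s cutoff.
print(max_sample_age(timedelta(seconds=3), timedelta(seconds=1), 2.0))  # 0:00:06
# Upsampling: 5 s input resampled to 3 s, keep 2 periods -> 10 s cutoff.
print(max_sample_age(timedelta(seconds=3), timedelta(seconds=5), 2.0))  # 0:00:10
```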
resampling_function class-attribute instance-attribute ¤
resampling_function: ResamplingFunction = (
    lambda samples, _, __: fmean((s[1]) for s in samples)
)

The resampling function.

This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.

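A resampling function receives the sequence of relevant samples plus two positional arguments (resampler context that the default ignores) and returns a single float. A hedged sketch matching the default's `(samples, _, __)` shape, where each sample is indexable with the value at position 1; the float timestamps and the `max` variant are illustrative, not SDK-provided:

```python
import statistics


def mean_resampler(samples, _resampler, _source):
    """The default behaviour: arithmetic mean of the sample values (s[1])."""
    return statistics.fmean(s[1] for s in samples)


def max_resampler(samples, _resampler, _source):
    """An alternative that keeps the peak value instead of the mean."""
    return max(s[1] for s in samples)


samples = [(0.0, 1.0), (1.0, 5.0), (2.0, 3.0)]  # (timestamp, value) pairs
print(mean_resampler(samples, None, None))  # 3.0
print(max_resampler(samples, None, None))   # 5.0
```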
resampling_period instance-attribute ¤
resampling_period: timedelta

The resampling period.

This is the time that passes between consecutive resampled samples.

It must be a positive time span.

warn_buffer_len class-attribute instance-attribute ¤
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN

The minimum length of the resampling buffer that will emit a warning.

If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.

It must be at least 1 and at most max_buffer_len.

Functions¤
__post_init__ ¤
__post_init__() -> None

Check that config values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
def __post_init__(self) -> None:
    """Check that config values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.resampling_period.total_seconds() <= 0.0:
        raise ValueError(
            f"resampling_period ({self.resampling_period}) must be positive"
        )
    if self.max_data_age_in_periods < 1.0:
        raise ValueError(
            f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
        )
    if self.warn_buffer_len < 1:
        raise ValueError(
            f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
        )
    if self.max_buffer_len <= self.warn_buffer_len:
        raise ValueError(
            f"max_buffer_len ({self.max_buffer_len}) should "
            f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
        )

    if self.initial_buffer_len < 1:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
        )
    if self.initial_buffer_len > self.max_buffer_len:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
            f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
            "initial_buffer_len or a bigger max_buffer_len"
        )
    if self.initial_buffer_len > self.warn_buffer_len:
        _logger.warning(
            "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
            self.initial_buffer_len,
            self.warn_buffer_len,
        )
    if self.align_to is not None and self.align_to.tzinfo is None:
        raise ValueError(
            f"align_to ({self.align_to}) should be a timezone aware datetime"
        )

frequenz.sdk.timeseries.ResamplerConfig2 dataclass ¤

Bases: ResamplerConfig

Resampler configuration.

Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
@dataclass(frozen=True)
class ResamplerConfig2(ResamplerConfig):
    """Resampler configuration."""

    resampling_period: timedelta
    """The resampling period.

    This is the time that passes between consecutive calculations of resampled data.

    It must be a positive time span.
    """

    max_data_age_in_periods: float = 3.0
    """The maximum age a sample can have to be considered *relevant* for resampling.

    Expressed in number of periods, where period is the `resampling_period`
    if we are downsampling (resampling period bigger than the input period) or
    the *input sampling period* if we are upsampling (input period bigger than
    the resampling period).

    It must be at least 1.0.

    Example:
        If `resampling_period` is 3 seconds, the input sampling period is
        1 and `max_data_age_in_periods` is 2, then data older than 3*2
        = 6 seconds will be discarded when creating a new sample and never
        passed to the resampling function.

        If `resampling_period` is 3 seconds, the input sampling period is
        5 and `max_data_age_in_periods` is 2, then data older than 5*2
        = 10 seconds will be discarded when creating a new sample and never
        passed to the resampling function.
    """

    resampling_function: ResamplingFunction2 = lambda samples, _, __: statistics.fmean(
        s[1] for s in samples
    )
    """The resampling function.

    This function will be applied to the sequence of relevant samples at
    a given time. The result of the function is what is sent as the resampled
    value.
    """

    initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
    """The initial length of the resampling buffer.

    The buffer could grow or shrink depending on the source properties,
    like sampling rate, to make sure all the requested past sampling periods
    can be stored.

    It must be at least 1 and at most `max_buffer_len`.
    """

    warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
    """The minimum length of the resampling buffer that will emit a warning.

    If a buffer grows bigger than this value, it will emit a warning in the
    logs, so buffers don't grow too big inadvertently.

    It must be at least 1 and at most `max_buffer_len`.
    """

    max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
    """The maximum length of the resampling buffer.

    Buffers won't be allowed to grow beyond this point even if it would be
    needed to keep all the requested past sampling periods. An error will be
    emitted in the logs if the buffer length needs to be truncated to this
    value.

    It must be bigger than `warn_buffer_len`.
    """

    align_to: datetime | None = field(default=None, init=False)
    """Deprecated: Use timer_config.align_to instead."""

    timer_config: WallClockTimerConfig | None = None
    """The custom configuration of the wall clock timer used to keep track of time.

    If not provided or `None`, a configuration will be created by passing the
    [`resampling_period`][frequenz.sdk.timeseries.ResamplerConfig2.resampling_period] to
    the [`from_interval()`][frequenz.sdk.timeseries.WallClockTimerConfig.from_interval]
    method.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.resampling_period.total_seconds() <= 0.0:
            raise ValueError(
                f"resampling_period ({self.resampling_period}) must be positive"
            )
        if self.max_data_age_in_periods < 1.0:
            raise ValueError(
                f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
            )
        if self.warn_buffer_len < 1:
            raise ValueError(
                f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
            )
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                f"max_buffer_len ({self.max_buffer_len}) should "
                f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
            )

        if self.initial_buffer_len < 1:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
            )
        if self.initial_buffer_len > self.max_buffer_len:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
                f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
                "initial_buffer_len or a bigger max_buffer_len"
            )
        if self.initial_buffer_len > self.warn_buffer_len:
            _logger.warning(
                "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
                self.initial_buffer_len,
                self.warn_buffer_len,
            )
        if self.align_to is not None:
            raise ValueError(
                f"align_to ({self.align_to}) must be specified via timer_config"
            )
Attributes¤
align_to class-attribute instance-attribute ¤
align_to: datetime | None = field(default=None, init=False)

Deprecated: Use timer_config.align_to instead.

initial_buffer_len class-attribute instance-attribute ¤
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT

The initial length of the resampling buffer.

The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.

It must be at least 1 and at most max_buffer_len.

max_buffer_len class-attribute instance-attribute ¤
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX

The maximum length of the resampling buffer.

Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.

It must be bigger than warn_buffer_len.

max_data_age_in_periods class-attribute instance-attribute ¤
max_data_age_in_periods: float = 3.0

The maximum age a sample can have to be considered relevant for resampling.

Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).

It must be at least 1.0.

Example

If resampling_period is 3 seconds, the input sampling period is 1 and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.

If resampling_period is 3 seconds, the input sampling period is 5 and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.

resampling_function class-attribute instance-attribute ¤
resampling_function: ResamplingFunction2 = (
    lambda samples, _, __: fmean((s[1]) for s in samples)
)

The resampling function.

This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.

resampling_period instance-attribute ¤
resampling_period: timedelta

The resampling period.

This is the time that passes between consecutive calculations of resampled data.

It must be a positive time span.

timer_config class-attribute instance-attribute ¤
timer_config: WallClockTimerConfig | None = None

The custom configuration of the wall clock timer used to keep track of time.

If not provided or None, a configuration will be created by passing the resampling_period to the from_interval() method.

warn_buffer_len class-attribute instance-attribute ¤
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN

The minimum length of the resampling buffer that will emit a warning.

If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.

It must be at least 1 and at most max_buffer_len.

Functions¤
__post_init__ ¤
__post_init__() -> None

Check that config values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
def __post_init__(self) -> None:
    """Check that config values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.resampling_period.total_seconds() <= 0.0:
        raise ValueError(
            f"resampling_period ({self.resampling_period}) must be positive"
        )
    if self.max_data_age_in_periods < 1.0:
        raise ValueError(
            f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
        )
    if self.warn_buffer_len < 1:
        raise ValueError(
            f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
        )
    if self.max_buffer_len <= self.warn_buffer_len:
        raise ValueError(
            f"max_buffer_len ({self.max_buffer_len}) should "
            f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
        )

    if self.initial_buffer_len < 1:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
        )
    if self.initial_buffer_len > self.max_buffer_len:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
            f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
            "initial_buffer_len or a bigger max_buffer_len"
        )
    if self.initial_buffer_len > self.warn_buffer_len:
        _logger.warning(
            "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
            self.initial_buffer_len,
            self.warn_buffer_len,
        )
    if self.align_to is not None:
        raise ValueError(
            f"align_to ({self.align_to}) must be specified via timer_config"
        )

frequenz.sdk.timeseries.ResamplingError ¤

Bases: RuntimeError

An error occurred while resampling.

This error is a container for errors raised by the underlying sources and/or sinks.
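The container pattern can be exercised standalone (this `MiniResamplingError` mimic uses plain string keys in place of the SDK's `Source` receivers):

```python
class MiniResamplingError(RuntimeError):
    """Standalone mimic of the container-error pattern described above."""

    def __init__(self, exceptions: dict[str, Exception]) -> None:
        super().__init__(f"Some errors were found while resampling: {exceptions}")
        self.exceptions = exceptions


try:
    raise MiniResamplingError({"meter-1": ValueError("bad sample")})
except MiniResamplingError as err:
    # Each entry identifies the timeseries and carries the underlying error.
    for source, exc in err.exceptions.items():
        print(f"{source}: {type(exc).__name__}: {exc}")
# meter-1: ValueError: bad sample
```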

Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
class ResamplingError(RuntimeError):
    """An error occurred while resampling.

    This error is a container for errors raised by the underlying sources
    and/or sinks.
    """

    def __init__(
        self,
        exceptions: dict[Source, Exception | asyncio.CancelledError],
    ) -> None:
        """Create an instance.

        Args:
            exceptions: A mapping of each timeseries source to the exception
                encountered while resampling that timeseries. Note that the
                error could have been raised by the sink while trying to send
                resampled data for this timeseries; the source key is only
                used to identify the timeseries with the issue and doesn't
                necessarily mean that the error was raised by the source. The
                underlying exception should provide information about the
                actual origin of the error.
        """
        super().__init__(f"Some errors were found while resampling: {exceptions}")
        self.exceptions = exceptions
        """A mapping of each timeseries source to the exception encountered.

        Note that the error could have been raised by the sink while trying to
        send resampled data for this timeseries; the source key is only used
        to identify the timeseries with the issue and doesn't necessarily mean
        that the error was raised by the source. The underlying exception
        should provide information about the actual origin of the error.
        """

    def __repr__(self) -> str:
        """Return the representation of the instance.

        Returns:
            The representation of the instance.
        """
        return f"{self.__class__.__name__}({self.exceptions=})"
Attributes¤
exceptions instance-attribute ¤
exceptions = exceptions

A mapping of each timeseries source to the exception encountered.

Note that the error could have been raised by the sink while trying to send resampled data for this timeseries; the source key is only used to identify the timeseries with the issue and doesn't necessarily mean that the error was raised by the source. The underlying exception should provide information about the actual origin of the error.

Functions¤
__init__ ¤
__init__(
    exceptions: dict[Source, Exception | CancelledError],
) -> None

Create an instance.

PARAMETER DESCRIPTION
exceptions

A mapping of each timeseries source to the exception encountered while resampling that timeseries. Note that the error could have been raised by the sink while trying to send resampled data for this timeseries; the source key is only used to identify the timeseries with the issue and doesn't necessarily mean that the error was raised by the source. The underlying exception should provide information about the actual origin of the error.

TYPE: dict[Source, Exception | CancelledError]

Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
def __init__(
    self,
    exceptions: dict[Source, Exception | asyncio.CancelledError],
) -> None:
    """Create an instance.

    Args:
        exceptions: A mapping of each timeseries source to the exception
            encountered while resampling that timeseries. Note that the
            error could have been raised by the sink while trying to send
            resampled data for this timeseries; the source key is only
            used to identify the timeseries with the issue and doesn't
            necessarily mean that the error was raised by the source. The
            underlying exception should provide information about the
            actual origin of the error.
    """
    super().__init__(f"Some errors were found while resampling: {exceptions}")
    self.exceptions = exceptions
    """A mapping of each timeseries source to the exception encountered.

    Note that the error could have been raised by the sink while trying to
    send resampled data for this timeseries; the source key is only used to
    identify the timeseries with the issue and doesn't necessarily mean that
    the error was raised by the source. The underlying exception should
    provide information about the actual origin of the error.
    """
__repr__ ¤
__repr__() -> str

Return the representation of the instance.

RETURNS DESCRIPTION
str

The representation of the instance.

Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
def __repr__(self) -> str:
    """Return the representation of the instance.

    Returns:
        The representation of the instance.
    """
    return f"{self.__class__.__name__}({self.exceptions=})"

frequenz.sdk.timeseries.ResamplingFunction ¤

Bases: Protocol

Combine multiple samples into a new one.

A resampling function produces a new sample based on a list of pre-existing samples. It performs "upsampling" when the input sampling period is bigger than the resampling_period (producing more output samples than there are input samples), or "downsampling" when it is smaller.

In general, a resampling window is the same as the resampling_period, and this function might receive input samples from multiple windows in the past to enable extrapolation, but no samples from the future (so the timestamp of the new sample that is going to be produced will always be later than the latest timestamp in the input data).
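Any plain function whose positional signature matches `__call__` satisfies this protocol; a minimal sketch (the `resample_min` name is invented here, and the config and properties parameters are accepted but unused):

```python
from collections.abc import Sequence
from datetime import datetime, timezone


def resample_min(
    input_samples: Sequence[tuple[datetime, float]],
    resampler_config: object,
    source_properties: object,
    /,
) -> float:
    """A ResamplingFunction-shaped callable returning the smallest value."""
    return min(value for _timestamp, value in input_samples)


ts = datetime(2022, 1, 1, tzinfo=timezone.utc)
print(resample_min([(ts, 4.0), (ts, 2.5)], None, None))  # 2.5
```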

Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
class ResamplingFunction(Protocol):
    """Combine multiple samples into a new one.

    A resampling function produces a new sample based on a list of pre-existing
    samples. It can do "upsampling" when the data rate of the `input_samples`
    period is smaller than the `resampling_period`, or "downsampling" if it is
    bigger.

    In general, a resampling window is the same as the `resampling_period`, and
    this function might receive input samples from multiple windows in the past to
    enable extrapolation, but no samples from the future (so the timestamp of the
    new sample that is going to be produced will always be bigger than the biggest
    timestamp in the input data).
    """

    def __call__(
        self,
        input_samples: Sequence[tuple[datetime, float]],
        resampler_config: ResamplerConfig,
        source_properties: SourceProperties,
        /,
    ) -> float:
        """Call the resampling function.

        Args:
            input_samples: The sequence of pre-existing samples, where the first item is
                the timestamp of the sample, and the second is the value of the sample.
                The sequence must be non-empty.
            resampler_config: The configuration of the resampler calling this
                function.
            source_properties: The properties of the source being resampled.

        Returns:
            The value of the new sample produced after resampling.
        """
        ...  # pylint: disable=unnecessary-ellipsis
Functions¤
__call__ ¤
__call__(
    input_samples: Sequence[tuple[datetime, float]],
    resampler_config: ResamplerConfig,
    source_properties: SourceProperties,
) -> float

Call the resampling function.

PARAMETER DESCRIPTION
input_samples

The sequence of pre-existing samples, where the first item is the timestamp of the sample, and the second is the value of the sample. The sequence must be non-empty.

TYPE: Sequence[tuple[datetime, float]]

resampler_config

The configuration of the resampler calling this function.

TYPE: ResamplerConfig

source_properties

The properties of the source being resampled.

TYPE: SourceProperties

RETURNS DESCRIPTION
float

The value of the new sample produced after resampling.

Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
def __call__(
    self,
    input_samples: Sequence[tuple[datetime, float]],
    resampler_config: ResamplerConfig,
    source_properties: SourceProperties,
    /,
) -> float:
    """Call the resampling function.

    Args:
        input_samples: The sequence of pre-existing samples, where the first item is
            the timestamp of the sample, and the second is the value of the sample.
            The sequence must be non-empty.
        resampler_config: The configuration of the resampler calling this
            function.
        source_properties: The properties of the source being resampled.

    Returns:
        The value of the new sample produced after resampling.
    """
    ...  # pylint: disable=unnecessary-ellipsis

frequenz.sdk.timeseries.ResamplingFunction2 ¤

Bases: Protocol

Combine multiple samples into a new one.

A resampling function produces a new sample based on a list of pre-existing samples. It performs "upsampling" when the input sampling period is bigger than the resampling_period (producing more output samples than there are input samples), or "downsampling" when it is smaller.

In general, a resampling window is the same as the resampling_period, and this function might receive input samples from multiple windows in the past to enable extrapolation, but no samples from the future (so the timestamp of the new sample that is going to be produced will always be later than the latest timestamp in the input data).

Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
class ResamplingFunction2(Protocol):
    """Combine multiple samples into a new one.

    A resampling function produces a new sample based on a list of pre-existing
    samples. It can do "upsampling" when the data rate of the `input_samples`
    period is smaller than the `resampling_period`, or "downsampling" if it is
    bigger.

    In general, a resampling window is the same as the `resampling_period`, and
    this function might receive input samples from multiple windows in the past to
    enable extrapolation, but no samples from the future (so the timestamp of the
    new sample that is going to be produced will always be bigger than the biggest
    timestamp in the input data).
    """

    def __call__(
        self,
        input_samples: Sequence[tuple[datetime, float]],
        resampler_config: ResamplerConfig | ResamplerConfig2,
        source_properties: SourceProperties,
        /,
    ) -> float:
        """Call the resampling function.

        Args:
            input_samples: The sequence of pre-existing samples, where the first item is
                the timestamp of the sample, and the second is the value of the sample.
                The sequence must be non-empty.
            resampler_config: The configuration of the resampler calling this
                function.
            source_properties: The properties of the source being resampled.

        Returns:
            The value of the new sample produced after resampling.
        """
        ...  # pylint: disable=unnecessary-ellipsis
Functions¤
__call__ ¤
__call__(
    input_samples: Sequence[tuple[datetime, float]],
    resampler_config: ResamplerConfig | ResamplerConfig2,
    source_properties: SourceProperties,
) -> float

Call the resampling function.

PARAMETER DESCRIPTION
input_samples

The sequence of pre-existing samples, where the first item is the timestamp of the sample, and the second is the value of the sample. The sequence must be non-empty.

TYPE: Sequence[tuple[datetime, float]]

resampler_config

The configuration of the resampler calling this function.

TYPE: ResamplerConfig | ResamplerConfig2

source_properties

The properties of the source being resampled.

TYPE: SourceProperties

RETURNS DESCRIPTION
float

The value of the new sample produced after resampling.

Source code in src/frequenz/sdk/timeseries/_resampling/_config.py
def __call__(
    self,
    input_samples: Sequence[tuple[datetime, float]],
    resampler_config: ResamplerConfig | ResamplerConfig2,
    source_properties: SourceProperties,
    /,
) -> float:
    """Call the resampling function.

    Args:
        input_samples: The sequence of pre-existing samples, where the first item is
            the timestamp of the sample, and the second is the value of the sample.
            The sequence must be non-empty.
        resampler_config: The configuration of the resampler calling this
            function.
        source_properties: The properties of the source being resampled.

    Returns:
        The value of the new sample produced after resampling.
    """
    ...  # pylint: disable=unnecessary-ellipsis

frequenz.sdk.timeseries.Sample dataclass ¤

Bases: Generic[QuantityT]

A measurement taken at a particular point in time.

The value could be None if a component is malfunctioning or data is lacking for another reason, but a sample still needs to be sent to have a coherent view on a group of component metrics for a particular timestamp.

Source code in src/frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True, order=True)
class Sample(Generic[QuantityT]):
    """A measurement taken at a particular point in time.

    The `value` could be `None` if a component is malfunctioning or data is
    lacking for another reason, but a sample still needs to be sent to have a
    coherent view on a group of component metrics for a particular timestamp.
    """

    timestamp: datetime
    """The time when this sample was generated."""

    value: QuantityT | None = None
    """The value of this sample."""

    def __str__(self) -> str:
        """Return a string representation of the sample."""
        return f"{type(self).__name__}({self.timestamp}, {self.value})"

    def __repr__(self) -> str:
        """Return a string representation of the sample."""
        return f"{type(self).__name__}({self.timestamp=}, {self.value=})"
Attributes¤
timestamp instance-attribute ¤
timestamp: datetime

The time when this sample was generated.

value class-attribute instance-attribute ¤
value: QuantityT | None = None

The value of this sample.

Functions¤
__repr__ ¤
__repr__() -> str

Return a string representation of the sample.

Source code in src/frequenz/sdk/timeseries/_base_types.py
def __repr__(self) -> str:
    """Return a string representation of the sample."""
    return f"{type(self).__name__}({self.timestamp=}, {self.value=})"
__str__ ¤
__str__() -> str

Return a string representation of the sample.

Source code in src/frequenz/sdk/timeseries/_base_types.py
def __str__(self) -> str:
    """Return a string representation of the sample."""
    return f"{type(self).__name__}({self.timestamp}, {self.value})"

frequenz.sdk.timeseries.Sample3Phase dataclass ¤

Bases: Generic[QuantityT]

A 3-phase measurement made at a particular point in time.

Each of the value fields could be None if a component is malfunctioning or data is lacking for another reason, but a sample still needs to be sent to have a coherent view on a group of component metrics for a particular timestamp.

Source code in src/frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True)
class Sample3Phase(Generic[QuantityT]):
    """A 3-phase measurement made at a particular point in time.

    Each of the `value` fields could be `None` if a component is malfunctioning
    or data is lacking for another reason, but a sample still needs to be sent
    to have a coherent view on a group of component metrics for a particular
    timestamp.
    """

    timestamp: datetime
    """The time when this sample was generated."""
    value_p1: QuantityT | None
    """The value of the 1st phase in this sample."""

    value_p2: QuantityT | None
    """The value of the 2nd phase in this sample."""

    value_p3: QuantityT | None
    """The value of the 3rd phase in this sample."""

    def __iter__(self) -> Iterator[QuantityT | None]:
        """Return an iterator that yields values from each of the phases.

        Yields:
            Per-phase measurements one-by-one.
        """
        yield self.value_p1
        yield self.value_p2
        yield self.value_p3

    @overload
    def max(self, default: QuantityT) -> QuantityT: ...

    @overload
    def max(self, default: None = None) -> QuantityT | None: ...

    def max(self, default: QuantityT | None = None) -> QuantityT | None:
        """Return the max value among all phases, or default if they are all `None`.

        Args:
            default: value to return if all phases are `None`.

        Returns:
            Max value among all phases, if available, default value otherwise.
        """
        if not any(self):
            return default
        value: QuantityT = functools.reduce(
            lambda x, y: x if x > y else y,
            filter(None, self),
        )
        return value

    @overload
    def min(self, default: QuantityT) -> QuantityT: ...

    @overload
    def min(self, default: None = None) -> QuantityT | None: ...

    def min(self, default: QuantityT | None = None) -> QuantityT | None:
        """Return the min value among all phases, or default if they are all `None`.

        Args:
            default: value to return if all phases are `None`.

        Returns:
            Min value among all phases, if available, default value otherwise.
        """
        if not any(self):
            return default
        value: QuantityT = functools.reduce(
            lambda x, y: x if x < y else y,
            filter(None, self),
        )
        return value

    def map(
        self,
        function: Callable[[QuantityT], QuantityT],
        default: QuantityT | None = None,
    ) -> Self:
        """Apply the given function on each of the phase values and return the result.

        If a phase value is `None`, replace it with `default` instead.

        Args:
            function: The function to apply on each of the phase values.
            default: The value to apply if a phase value is `None`.

        Returns:
            A new instance, with the given function applied on values for each of the
                phases.
        """
        return self.__class__(
            timestamp=self.timestamp,
            value_p1=default if self.value_p1 is None else function(self.value_p1),
            value_p2=default if self.value_p2 is None else function(self.value_p2),
            value_p3=default if self.value_p3 is None else function(self.value_p3),
        )
Attributes¤
timestamp instance-attribute ¤
timestamp: datetime

The time when this sample was generated.

value_p1 instance-attribute ¤
value_p1: QuantityT | None

The value of the 1st phase in this sample.

value_p2 instance-attribute ¤
value_p2: QuantityT | None

The value of the 2nd phase in this sample.

value_p3 instance-attribute ¤
value_p3: QuantityT | None

The value of the 3rd phase in this sample.

Functions¤
__iter__ ¤
__iter__() -> Iterator[QuantityT | None]

Return an iterator that yields values from each of the phases.

YIELDS DESCRIPTION
QuantityT | None

Per-phase measurements one-by-one.

Source code in src/frequenz/sdk/timeseries/_base_types.py
def __iter__(self) -> Iterator[QuantityT | None]:
    """Return an iterator that yields values from each of the phases.

    Yields:
        Per-phase measurements one-by-one.
    """
    yield self.value_p1
    yield self.value_p2
    yield self.value_p3
map ¤
map(
    function: Callable[[QuantityT], QuantityT],
    default: QuantityT | None = None,
) -> Self

Apply the given function on each of the phase values and return the result.

If a phase value is None, replace it with default instead.

PARAMETER DESCRIPTION
function

The function to apply on each of the phase values.

TYPE: Callable[[QuantityT], QuantityT]

default

The value to apply if a phase value is None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
Self

A new instance, with the given function applied on values for each of the phases.

Source code in src/frequenz/sdk/timeseries/_base_types.py
def map(
    self,
    function: Callable[[QuantityT], QuantityT],
    default: QuantityT | None = None,
) -> Self:
    """Apply the given function on each of the phase values and return the result.

    If a phase value is `None`, replace it with `default` instead.

    Args:
        function: The function to apply on each of the phase values.
        default: The value to apply if a phase value is `None`.

    Returns:
        A new instance, with the given function applied on values for each of the
            phases.
    """
    return self.__class__(
        timestamp=self.timestamp,
        value_p1=default if self.value_p1 is None else function(self.value_p1),
        value_p2=default if self.value_p2 is None else function(self.value_p2),
        value_p3=default if self.value_p3 is None else function(self.value_p3),
    )
max ¤
max(default: QuantityT) -> QuantityT
max(default: None = None) -> QuantityT | None
max(default: QuantityT | None = None) -> QuantityT | None

Return the max value among all phases, or default if they are all None.

PARAMETER DESCRIPTION
default

value to return if all phases are None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
QuantityT | None

Max value among all phases, if available, default value otherwise.

Source code in src/frequenz/sdk/timeseries/_base_types.py
def max(self, default: QuantityT | None = None) -> QuantityT | None:
    """Return the max value among all phases, or default if they are all `None`.

    Args:
        default: value to return if all phases are `None`.

    Returns:
        Max value among all phases, if available, default value otherwise.
    """
    if not any(self):
        return default
    value: QuantityT = functools.reduce(
        lambda x, y: x if x > y else y,
        filter(None, self),
    )
    return value
min ¤
min(default: QuantityT) -> QuantityT
min(default: None = None) -> QuantityT | None
min(default: QuantityT | None = None) -> QuantityT | None

Return the min value among all phases, or default if they are all None.

PARAMETER DESCRIPTION
default

value to return if all phases are None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
QuantityT | None

Min value among all phases, if available, default value otherwise.

Source code in src/frequenz/sdk/timeseries/_base_types.py
def min(self, default: QuantityT | None = None) -> QuantityT | None:
    """Return the min value among all phases, or default if they are all `None`.

    Args:
        default: value to return if all phases are `None`.

    Returns:
        Min value among all phases, if available, default value otherwise.
    """
    if not any(self):
        return default
    value: QuantityT = functools.reduce(
        lambda x, y: x if x < y else y,
        filter(None, self),
    )
    return value

frequenz.sdk.timeseries.SourceProperties dataclass ¤

Properties of a resampling source.

Source code in src/frequenz/sdk/timeseries/_resampling/_base_types.py
@dataclass
class SourceProperties:
    """Properties of a resampling source."""

    sampling_start: datetime | None = None
    """The time when resampling started for this source.

    `None` means it hasn't started yet.
    """

    received_samples: int = 0
    """Total samples received by this source so far."""

    sampling_period: timedelta | None = None
    """The sampling period of this source.

    This means we receive (on average) one sample for this source every
    `sampling_period` time.

    `None` means it is unknown.
    """
Attributes¤
received_samples class-attribute instance-attribute ¤
received_samples: int = 0

Total samples received by this source so far.

sampling_period class-attribute instance-attribute ¤
sampling_period: timedelta | None = None

The sampling period of this source.

This means we receive (on average) one sample for this source every sampling_period time.

None means it is unknown.

sampling_start class-attribute instance-attribute ¤
sampling_start: datetime | None = None

The time when resampling started for this source.

None means it hasn't started yet.

frequenz.sdk.timeseries.SourceStoppedError ¤

Bases: RuntimeError

A timeseries stopped producing samples.

Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
class SourceStoppedError(RuntimeError):
    """A timeseries stopped producing samples."""

    def __init__(self, source: Source) -> None:
        """Create an instance.

        Args:
            source: The source of the timeseries that stopped producing samples.
        """
        super().__init__(f"Timeseries stopped producing samples, source: {source}")
        self.source = source
        """The source of the timeseries that stopped producing samples."""

    def __repr__(self) -> str:
        """Return the representation of the instance.

        Returns:
            The representation of the instance.
        """
        return f"{self.__class__.__name__}({self.source!r})"
Attributes¤
source instance-attribute ¤
source = source

The source of the timeseries that stopped producing samples.

Functions¤
__init__ ¤
__init__(source: Source) -> None

Create an instance.

PARAMETER DESCRIPTION
source

The source of the timeseries that stopped producing samples.

TYPE: Source

Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
def __init__(self, source: Source) -> None:
    """Create an instance.

    Args:
        source: The source of the timeseries that stopped producing samples.
    """
    super().__init__(f"Timeseries stopped producing samples, source: {source}")
    self.source = source
    """The source of the timeseries that stopped producing samples."""
__repr__ ¤
__repr__() -> str

Return the representation of the instance.

RETURNS DESCRIPTION
str

The representation of the instance.

Source code in src/frequenz/sdk/timeseries/_resampling/_exceptions.py
def __repr__(self) -> str:
    """Return the representation of the instance.

    Returns:
        The representation of the instance.
    """
    return f"{self.__class__.__name__}({self.source!r})"
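A minimal usage sketch of this exception pattern, using a standalone copy of the class (the `"meter-1"` source is a made-up placeholder; the real `Source` type comes from the resampling module):

```python
class SourceStoppedError(RuntimeError):
    """Standalone copy of the exception pattern shown above."""

    def __init__(self, source: object) -> None:
        super().__init__(f"Timeseries stopped producing samples, source: {source}")
        # The stopped source is kept as an attribute for the handler to inspect.
        self.source = source

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.source!r})"


try:
    raise SourceStoppedError("meter-1")
except SourceStoppedError as error:
    print(repr(error))   # SourceStoppedError('meter-1')
    print(error.source)  # meter-1
```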

frequenz.sdk.timeseries.TickInfo dataclass ¤

Information about a WallClockTimer tick.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
@dataclass(frozen=True, kw_only=True)
class TickInfo:
    """Information about a `WallClockTimer` tick."""

    expected_tick_time: datetime
    """The expected time when the timer should have triggered."""

    sleep_infos: Sequence[ClocksInfo]
    """The information about every sleep performed to trigger this tick.

    If the timer didn't have to do a [`sleep()`][asyncio.sleep] to trigger the tick
    (i.e. the timer is catching up because there were big drifts in previous ticks),
    this will be empty.
    """

    @property
    def latest_sleep_info(self) -> ClocksInfo | None:
        """The clocks information from the last sleep done to trigger this tick.

        If no sleeps were done, this will be `None`.
        """
        return self.sleep_infos[-1] if self.sleep_infos else None
Attributes¤
expected_tick_time instance-attribute ¤
expected_tick_time: datetime

The expected time when the timer should have triggered.

latest_sleep_info property ¤
latest_sleep_info: ClocksInfo | None

The clocks information from the last sleep done to trigger this tick.

If no sleeps were done, this will be None.

sleep_infos instance-attribute ¤
sleep_infos: Sequence[ClocksInfo]

The information about every sleep performed to trigger this tick.

If the timer didn't have to do a sleep() to trigger the tick (i.e. the timer is catching up because there were big drifts in previous ticks), this will be empty.

frequenz.sdk.timeseries.WallClockTimer ¤

Bases: Receiver[TickInfo]

A timer synchronized with the wall clock.

This timer uses the wall clock to trigger ticks and handles discrepancies between the wall clock and monotonic time. Since sleeping is performed using monotonic time, differences between the two clocks can occur.

When the wall clock progresses slower than monotonic time, it is referred to as compression (wall clock time appears in the past relative to monotonic time). Conversely, when the wall clock progresses faster, it is called expansion (wall clock time appears in the future relative to monotonic time). If these differences exceed a configured threshold, a warning is emitted. The threshold is defined by the wall_clock_drift_tolerance_factor.

If the difference becomes excessively large, it is treated as a time jump. Time jumps can occur, for example, when the wall clock is adjusted by NTP after being out of sync for an extended period. In such cases, the timer resynchronizes with the wall clock and triggers an immediate tick. The threshold for detecting time jumps is controlled by the wall_clock_jump_threshold.

The timer ensures ticks are aligned to the align_to configuration, even after time jumps.

Additionally, the timer emits warnings if the actual sleep duration deviates significantly from the requested duration. This can happen due to event loop blocking or system overload. The tolerance for such deviations is defined by the async_drift_tolerance.

To account for these complexities, each tick provides a TickInfo object, which includes detailed information about the clocks and their drift.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
class WallClockTimer(Receiver[TickInfo]):
    """A timer synchronized with the wall clock.

    This timer uses the wall clock to trigger ticks and handles discrepancies between
    the wall clock and monotonic time. Since sleeping is performed using monotonic time,
    differences between the two clocks can occur.

    When the wall clock progresses slower than monotonic time, it is referred to as
    *compression* (wall clock time appears in the past relative to monotonic time).
    Conversely, when the wall clock progresses faster, it is called *expansion*
    (wall clock time appears in the future relative to monotonic time). If these
    differences exceed a configured threshold, a warning is emitted. The threshold
    is defined by the
    [`wall_clock_drift_tolerance_factor`][frequenz.sdk.timeseries.WallClockTimerConfig.wall_clock_drift_tolerance_factor].

    If the difference becomes excessively large, it is treated as a *time jump*.
    Time jumps can occur, for example, when the wall clock is adjusted by NTP after
    being out of sync for an extended period. In such cases, the timer resynchronizes
    with the wall clock and triggers an immediate tick. The threshold for detecting
    time jumps is controlled by the
    [`wall_clock_jump_threshold`][frequenz.sdk.timeseries.WallClockTimerConfig.wall_clock_jump_threshold].

    The timer ensures ticks are aligned to the
    [`align_to`][frequenz.sdk.timeseries.WallClockTimerConfig.align_to] configuration,
    even after time jumps.

    Additionally, the timer emits warnings if the actual sleep duration deviates
    significantly from the requested duration. This can happen due to event loop
    blocking or system overload. The tolerance for such deviations is defined by the
    [`async_drift_tolerance`][frequenz.sdk.timeseries.WallClockTimerConfig.async_drift_tolerance].

    To account for these complexities, each tick provides a
    [`TickInfo`][frequenz.sdk.timeseries.TickInfo] object, which includes detailed
    information about the clocks and their drift.
    """

    def __init__(
        self,
        interval: timedelta,
        config: WallClockTimerConfig | None = None,
        *,
        auto_start: bool = True,
    ) -> None:
        """Initialize this timer.

        See the class documentation for details.

        Args:
            interval: The time between timer ticks. Must be positive.
            config: The configuration for the timer. If `None`, a default configuration
                will be created using `from_interval()`.
            auto_start: Whether the timer should start automatically. If `False`,
                `reset()` must be called before the timer can be used.

        Raises:
            ValueError: If any value is out of range.
        """
        if interval <= _TD_ZERO:
            raise ValueError(f"interval must be positive, not {interval}")

        self._interval: timedelta = interval
        """The time between timer ticks.

        The wall clock is used, so this will be added to the current time to calculate
        the next tick time.
        """

        self._config = config or WallClockTimerConfig.from_interval(interval)
        """The configuration for this timer."""

        self._closed: bool = True
        """Whether the timer was requested to close.

        If this is `False`, then the timer is running.

        If this is `True`, then it is closed or there is a request to close it
        or it was not started yet:

        * If `_next_tick_time` is `None`, it means it wasn't started yet (it was
          created with `auto_start=False`).  Any receiving method will start
          it by calling `reset()` in this case.

        * If `_next_tick_time` is not `None`, it means there was a request to
          close it.  In this case receiving methods will raise
          a `ReceiverStoppedError`.
        """

        self._next_tick_time: datetime | None = None
        """The wall clock time when the next tick should happen.

        If this is `None`, it means the timer didn't start yet, but it should
        be started as soon as it is used.
        """

        self._current_tick_info: TickInfo | None = None
        """The current tick information.

        This is calculated by `ready()` but is returned by `consume()`. If
        `None` it means `ready()` wasn't called and `consume()` will assert.
        `consume()` will set it back to `None` to tell `ready()` that it needs
        to wait again.
        """

        self._clocks_info: ClocksInfo | None = None
        """The latest information about the clocks and their drift.

        Or `None` if no sleeps were done yet.
        """

        if auto_start:
            self.reset()

    @property
    def interval(self) -> timedelta:
        """The interval between timer ticks.

        Since the wall clock is used, this will be added to the current time to
        calculate the next tick time.

        Danger:
            In real (monotonic) time, the actual time that passes between ticks could
            be shorter, longer, or even **negative** if the wall clock jumped back in time!
        """
        return self._interval

    @property
    def config(self) -> WallClockTimerConfig:
        """The configuration for this timer."""
        return self._config

    @property
    def is_running(self) -> bool:
        """Whether the timer is running."""
        return not self._closed

    @property
    def next_tick_time(self) -> datetime | None:
        """The wall clock time when the next tick should happen, or `None` if it is not running."""
        return None if self._closed else self._next_tick_time

    def reset(self) -> None:
        """Reset the timer to start timing from now (plus an optional alignment).

        If the timer was closed, or not started yet, it will be started.
        """
        self._closed = False
        self._update_next_tick_time()
        self._current_tick_info = None
        # We assume the clocks will behave similarly after the timer was reset, so we
        # purposefully don't reset the clocks info.
        _logger.debug("reset(): _next_tick_time=%s", self._next_tick_time)

    @override
    def close(self) -> None:
        """Close and stop the timer.

        Once `close` has been called, all subsequent calls to `ready()` will immediately
        return False and calls to `consume()` / `receive()` or any use of the async
        iterator interface will raise a
        [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].

        You can restart the timer with `reset()`.
        """
        self._closed = True
        # We need to make sure it's not None, otherwise `ready()` will start it
        self._next_tick_time = datetime.now(timezone.utc)

    def _should_resync(self, info: ClocksInfo | timedelta | None) -> bool:
        """Check if the timer needs to resynchronize with the wall clock.

        This checks if the wall clock jumped beyond the configured threshold, which
        is defined in the timer configuration.

        Args:
            info: The information about the clocks and their drift. If `None`, it will
                not check for a resync, and will return `False`. If it is a
                `ClocksInfo`, it will check the `wall_clock_jump` property. If it is a
                `timedelta`, it will check if the absolute value is greater than the
                configured threshold.

        Returns:
            Whether the timer should resync to the wall clock.
        """
        threshold = self._config.wall_clock_jump_threshold
        if threshold is None or info is None:
            return False
        if isinstance(info, ClocksInfo):
            info = info.wall_clock_jump
        return abs(info) > threshold

    # We need to disable too many branches here, because the method is too complex but
    # it is not trivial to split into smaller parts.
    @override
    async def ready(self) -> bool:  # pylint: disable=too-many-branches
        """Wait until the timer `interval` passed.

        Once a call to `ready()` has finished, the resulting tick information
        must be read with a call to `consume()` (`receive()` or iterated over)
        to tell the timer it should wait for the next interval.

        The timer will remain ready (this method will return immediately)
        until it is consumed.

        Returns:
            Whether the timer was started and is still running.
        """
        # If there are messages waiting to be consumed, return immediately.
        if self._current_tick_info is not None:
            return True

        # If `_next_tick_time` is `None`, it means the timer was created with
        # `auto_start=False` and should be started now.
        if self._next_tick_time is None:
            self.reset()
            assert (
                self._next_tick_time is not None
            ), "This should be assigned by reset()"

        # If a close was explicitly requested, we bail out.
        if self._closed:
            return False

        wall_clock_now = datetime.now(timezone.utc)
        wall_clock_time_to_next_tick = self._next_tick_time - wall_clock_now

        # If we didn't reach the tick yet, sleep until we do.
        # We need to do this in a loop to react to resets, time jumps and wall clock
        # time compression, in which cases we need to recalculate the time to the next
        # tick and try again.
        sleeps: list[ClocksInfo] = []
        should_resync: bool = self._should_resync(self._clocks_info)
        while wall_clock_time_to_next_tick > _TD_ZERO:
            prev_clocks_info = self._clocks_info
            # We don't assign directly to self._clocks_info because its type is
            # ClocksInfo | None, and sleep() returns ClocksInfo, so we can avoid some
            # None checks further in the code with `clocks_info` (and we make the code
            # more succinct).
            clocks_info = await self._sleep(
                wall_clock_time_to_next_tick, prev_clocks_info=prev_clocks_info
            )
            should_resync = self._should_resync(clocks_info)
            wall_clock_now = datetime.now(timezone.utc)
            self._clocks_info = clocks_info

            sleeps.append(clocks_info)

            if previous_factor := self._has_drifted_beyond_tolerance(
                new_clocks_info=clocks_info, prev_clocks_info=prev_clocks_info
            ):
                # If we are resyncing we have a different issue, and we are not going to
                # use the factor to adjust the clock, but will just resync
                if not should_resync:
                    _logger.warning(
                        "The wall clock time drifted too much from the monotonic time. The "
                        "monotonic time will be adjusted to compensate for this difference. "
                        "We expected the wall clock time to have advanced (%s), but the "
                        "monotonic time advanced (%s) [previous_factor=%s "
                        "current_factor=%s, factor_change_absolute_tolerance=%s].",
                        clocks_info.wall_clock_elapsed,
                        clocks_info.monotonic_elapsed,
                        previous_factor,
                        clocks_info.wall_clock_factor,
                        self._config.wall_clock_drift_tolerance_factor,
                    )

            wall_clock_time_to_next_tick = self._next_tick_time - wall_clock_now

            # Technically the monotonic drift should always be positive, but we handle
            # negative values just in case; we've seen a lot of weird things happen.
            monotonic_drift = abs(clocks_info.monotonic_drift)
            drift_tolerance = self._config.async_drift_tolerance
            if drift_tolerance is not None and monotonic_drift > drift_tolerance:
                _logger.warning(
                    "The timer was supposed to sleep for %s, but it slept for %s "
                    "instead [difference=%s, tolerance=%s]. This is likely due to a "
                    "task taking too much time to complete and blocking the event "
                    "loop for too long. You probably should profile your code to "
                    "find out what's taking too long.",
                    clocks_info.monotonic_requested_sleep,
                    clocks_info.monotonic_elapsed,
                    monotonic_drift,
                    drift_tolerance,
                )

            # If we detect a time jump, we exit the loop and handle it outside of it, to
            # also account for time jumps in the past that could happen without even
            # having entered into the sleep loop.
            if should_resync:
                _logger.debug(
                    "ready(): Exiting the wait loop because we detected a time jump "
                    "and need to re-sync."
                )
                break

            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    "ready(): In sleep loop:\n"
                    "    next_tick_time=%s (%s)\n"
                    "    now=%s (%s)\n"
                    "    mono_now=%s\n"
                    "    wall_clock_time_to_next_tick=%s (%s)",
                    self._next_tick_time,
                    self._next_tick_time.timestamp(),
                    wall_clock_now,
                    wall_clock_now.timestamp(),
                    asyncio.get_running_loop().time(),
                    wall_clock_time_to_next_tick,
                    wall_clock_time_to_next_tick.total_seconds(),
                )

        # If there was a time jump, we need to resync the timer to the wall clock,
        # otherwise we can be sleeping for a long time until the timer catches up,
        # which is not suitable for many use cases.
        #
        # Resyncing the timer ensures that we keep ticking more or less at `interval`
        # even in the event of time jumps, with the downside that the timer will
        # trigger more than once for the same timestamp if it jumps back in time,
        # and will skip ticks if it jumps forward in time.
        #
        # When no threshold is set there is no resync, and the ticks will be
        # contiguous in time from the wall clock perspective: waiting until we reach
        # the expected next tick time when jumping back in time, and bursting all
        # missed ticks when jumping forward in time.
        if should_resync:
            assert self._clocks_info is not None
            _logger.warning(
                "The wall clock jumped %s (%s seconds) in time (threshold=%s). "
                "A tick will be triggered immediately with the `expected_tick_time` "
                "as it was before the time jump and the timer will be resynced to "
                "the wall clock.",
                self._clocks_info.wall_clock_jump,
                self._clocks_info.wall_clock_jump.total_seconds(),
                self._config.wall_clock_jump_threshold,
            )

        # If a close was explicitly requested during the sleep, we bail out.
        if self._closed:
            return False

        self._current_tick_info = TickInfo(
            expected_tick_time=self._next_tick_time, sleep_infos=sleeps
        )

        if should_resync:
            _logger.debug(
                "ready(): Before resync:\n"
                "    next_tick_time=%s\n"
                "    now=%s\n"
                "    wall_clock_time_to_next_tick=%s",
                self._next_tick_time,
                wall_clock_now,
                wall_clock_time_to_next_tick,
            )
            self._update_next_tick_time(now=wall_clock_now)
            _logger.debug(
                "ready(): After resync: next_tick_time=%s", self._next_tick_time
            )
        else:
            self._next_tick_time += self._interval
            _logger.debug(
                "ready(): No resync needed: next_tick_time=%s",
                self._next_tick_time,
            )

        return True

    @override
    def consume(self) -> TickInfo:
        """Return the latest tick information once `ready()` is complete.

        Once the timer has triggered
        ([`ready()`][frequenz.sdk.timeseries.WallClockTimer.ready] is done), this method
        returns the information about the tick that just happened.

        Returns:
            The information about the tick that just happened.

        Raises:
            ReceiverStoppedError: If the timer was closed via `close()`.
        """
        # If it was closed and there is no pending result, we raise
        # (if there is a pending result, then we still want to return it first)
        if self._closed and self._current_tick_info is None:
            raise ReceiverStoppedError(self)

        assert (
            self._current_tick_info is not None
        ), "calls to `consume()` must follow a call to `ready()`"
        info = self._current_tick_info
        self._current_tick_info = None
        return info

    def _update_next_tick_time(self, *, now: datetime | None = None) -> None:
        """Update the next tick time, aligning it to `self._config.align_to` or now."""
        if now is None:
            now = datetime.now(timezone.utc)

        elapsed = _TD_ZERO

        if self._config.align_to is not None:
            elapsed = (now - self._config.align_to) % self._interval

        self._next_tick_time = now + self._interval - elapsed

    def _has_drifted_beyond_tolerance(
        self, *, new_clocks_info: ClocksInfo, prev_clocks_info: ClocksInfo | None
    ) -> float | Literal[False]:
        """Check if the wall clock drifted beyond the configured tolerance.

        This checks the relative difference between the wall clock and monotonic time
        based on the `wall_clock_drift_tolerance_factor` configuration.

        Args:
            new_clocks_info: The information about the clocks and their drift from the
                current sleep.
            prev_clocks_info: The information about the clocks and their drift from the
                previous sleep. If `None`, the previous factor will be considered 1.0.

        Returns:
            The previous wall clock factor if the drift is beyond the tolerance, or
                `False` if it is not.
        """
        tolerance = self._config.wall_clock_drift_tolerance_factor
        if tolerance is None:
            return False

        previous_factor = (
            prev_clocks_info.wall_clock_factor if prev_clocks_info else 1.0
        )
        current_factor = new_clocks_info.wall_clock_factor
        if abs(current_factor - previous_factor) > tolerance:
            return previous_factor
        return False

    async def _sleep(
        self, delay: timedelta, /, *, prev_clocks_info: ClocksInfo | None
    ) -> ClocksInfo:
        """Sleep for a given time and return information about the clocks and their drift.

        The time to sleep is adjusted based on the previously observed drift between the
        wall clock and monotonic time, if any.

        Also saves the information about the clocks and their drift for the next sleep.

        Args:
            delay: The time to sleep. Will be adjusted based on `prev_clocks_info` if
                available.
            prev_clocks_info: The information about the clocks and their drift from the
                previous sleep. If `None`, the sleep will be done as requested, without
                adjusting the time to sleep.

        Returns:
            The information about the clocks and their drift for this sleep.
        """
        if prev_clocks_info is not None:
            _logger.debug(
                "_sleep(): Adjusted original requested delay (%s) with factor %s",
                delay.total_seconds(),
                prev_clocks_info.wall_clock_factor,
            )
            delay = prev_clocks_info.wall_clock_to_monotonic(delay)

        delay_s = delay.total_seconds()

        _logger.debug("_sleep(): Will sleep for %s seconds", delay_s)
        start_monotonic_time = asyncio.get_running_loop().time()
        start_wall_clock_time = datetime.now(timezone.utc)
        await asyncio.sleep(delay_s)

        end_monotonic_time = asyncio.get_running_loop().time()
        end_wall_clock_time = datetime.now(timezone.utc)

        elapsed_monotonic = timedelta(seconds=end_monotonic_time - start_monotonic_time)
        elapsed_wall_clock = end_wall_clock_time - start_wall_clock_time

        wall_clock_jump = elapsed_wall_clock - elapsed_monotonic
        should_resync = self._should_resync(wall_clock_jump)
        _logger.debug("_sleep(): SHOULD RESYNC? %s", should_resync)
        clocks_info = ClocksInfo(
            monotonic_requested_sleep=delay,
            monotonic_time=end_monotonic_time,
            wall_clock_time=end_wall_clock_time,
            monotonic_elapsed=elapsed_monotonic,
            wall_clock_elapsed=elapsed_wall_clock,
            # If we should resync it means there was a big time jump, which should be
            # exceptional (NTP adjusting the clock or something like that), in this case
            # we want to use the previous factor as the current one will be way off.
            wall_clock_factor=(
                prev_clocks_info.wall_clock_factor
                if prev_clocks_info and should_resync
                else float("nan")  # nan means let ClocksInfo calculate it
            ),
        )
        _logger.debug(
            "_sleep(): After sleeping:\n"
            "    monotonic_requested_sleep=%s\n"
            "    monotonic_time=%s\n"
            "    wall_clock_time=%s\n"
            "    monotonic_elapsed=%s\n"
            "    wall_clock_elapsed=%s\n"
            "    factor=%s\n",
            clocks_info.monotonic_requested_sleep,
            clocks_info.monotonic_time,
            clocks_info.wall_clock_time,
            clocks_info.monotonic_elapsed,
            clocks_info.wall_clock_elapsed,
            clocks_info.wall_clock_factor,
        )

        return clocks_info

    def __str__(self) -> str:
        """Return a string representation of this timer."""
        return f"{type(self).__name__}({self.interval})"

    def __repr__(self) -> str:
        """Return a string representation of this timer."""
        next_tick = (
            ""
            if self._next_tick_time is None
            else f", next_tick_time={self._next_tick_time!r}"
        )
        return (
            f"{type(self).__name__}<interval={self.interval!r}, "
            f"is_running={self.is_running!r}{next_tick}>"
        )
Attributes¤
config property ¤
config: WallClockTimerConfig

The configuration for this timer.

interval property ¤
interval: timedelta

The interval between timer ticks.

Since the wall clock is used, this will be added to the current time to calculate the next tick time.

Danger

In real (monotonic) time, the actual time it passes between ticks could be smaller, bigger, or even negative if the wall clock jumped back in time!

is_running property ¤
is_running: bool

Whether the timer is running.

next_tick_time property ¤
next_tick_time: datetime | None

The wall clock time when the next tick should happen, or None if it is not running.

Functions¤
__aiter__ ¤
__aiter__() -> Self

Get an async iterator over the received messages.

RETURNS DESCRIPTION
Self

This receiver, as it is already an async iterator.

Source code in frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Get an async iterator over the received messages.

    Returns:
        This receiver, as it is already an async iterator.
    """
    return self
__anext__ async ¤
__anext__() -> ReceiverMessageT_co

Await the next message in the async iteration over received messages.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next received message.

RAISES DESCRIPTION
StopAsyncIteration

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def __anext__(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Await the next message in the async iteration over received messages.

    Returns:
        The next received message.

    Raises:
        StopAsyncIteration: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
__init__ ¤
__init__(
    interval: timedelta,
    config: WallClockTimerConfig | None = None,
    *,
    auto_start: bool = True
) -> None

Initialize this timer.

See the class documentation for details.

PARAMETER DESCRIPTION
interval

The time between timer ticks. Must be positive.

TYPE: timedelta

config

The configuration for the timer. If None, a default configuration will be created using from_interval().

TYPE: WallClockTimerConfig | None DEFAULT: None

auto_start

Whether the timer should start automatically. If False, reset() must be called before the timer can be used.

TYPE: bool DEFAULT: True

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
def __init__(
    self,
    interval: timedelta,
    config: WallClockTimerConfig | None = None,
    *,
    auto_start: bool = True,
) -> None:
    """Initialize this timer.

    See the class documentation for details.

    Args:
        interval: The time between timer ticks. Must be positive.
        config: The configuration for the timer. If `None`, a default configuration
            will be created using `from_interval()`.
        auto_start: Whether the timer should start automatically. If `False`,
            `reset()` must be called before the timer can be used.

    Raises:
        ValueError: If any value is out of range.
    """
    if interval <= _TD_ZERO:
        raise ValueError(f"interval must be positive, not {interval}")

    self._interval: timedelta = interval
    """The time to between timer ticks.

    The wall clock is used, so this will be added to the current time to calculate
    the next tick time.
    """

    self._config = config or WallClockTimerConfig.from_interval(interval)
    """The configuration for this timer."""

    self._closed: bool = True
    """Whether the timer was requested to close.

    If this is `False`, then the timer is running.

    If this is `True`, then it is closed or there is a request to close it
    or it was not started yet:

    * If `_next_tick_time` is `None`, it means it wasn't started yet (it was
      created with `auto_start=False`).  Any receiving method will start
      it by calling `reset()` in this case.

    * If `_next_tick_time` is not `None`, it means there was a request to
      close it.  In this case receiving methods will raise
      a `ReceiverStoppedError`.
    """

    self._next_tick_time: datetime | None = None
    """The wall clock time when the next tick should happen.

    If this is `None`, it means the timer didn't start yet, but it should
    be started as soon as it is used.
    """

    self._current_tick_info: TickInfo | None = None
    """The current tick information.

    This is calculated by `ready()` but is returned by `consume()`. If
    `None` it means `ready()` wasn't called and `consume()` will assert.
    `consume()` will set it back to `None` to tell `ready()` that it needs
    to wait again.
    """

    self._clocks_info: ClocksInfo | None = None
    """The latest information about the clocks and their drift.

    Or `None` if no sleeps were done yet.
    """

    if auto_start:
        self.reset()
__repr__ ¤
__repr__() -> str

Return a string representation of this timer.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
def __repr__(self) -> str:
    """Return a string representation of this timer."""
    next_tick = (
        ""
        if self._next_tick_time is None
        else f", next_tick_time={self._next_tick_time!r}"
    )
    return (
        f"{type(self).__name__}<interval={self.interval!r}, "
        f"is_running={self.is_running!r}{next_tick}>"
    )
__str__ ¤
__str__() -> str

Return a string representation of this timer.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
def __str__(self) -> str:
    """Return a string representation of this timer."""
    return f"{type(self).__name__}({self.interval})"
close ¤
close() -> None

Close and stop the timer.

Once close has been called, all subsequent calls to ready() will immediately return False and calls to consume() / receive() or any use of the async iterator interface will raise a ReceiverStoppedError.

You can restart the timer with reset().

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
@override
def close(self) -> None:
    """Close and stop the timer.

    Once `close` has been called, all subsequent calls to `ready()` will immediately
    return False and calls to `consume()` / `receive()` or any use of the async
    iterator interface will raise a
    [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].

    You can restart the timer with `reset()`.
    """
    self._closed = True
    # We need to make sure it's not None, otherwise `ready()` will start it
    self._next_tick_time = datetime.now(timezone.utc)
consume ¤
consume() -> TickInfo

Return the latest tick information once ready() is complete.

Once the timer has triggered (ready() is done), this method returns the information about the tick that just happened.

RETURNS DESCRIPTION
TickInfo

The information about the tick that just happened.

RAISES DESCRIPTION
ReceiverStoppedError

If the timer was closed via close().

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
@override
def consume(self) -> TickInfo:
    """Return the latest tick information once `ready()` is complete.

    Once the timer has triggered
    ([`ready()`][frequenz.sdk.timeseries.WallClockTimer.ready] is done), this method
    returns the information about the tick that just happened.

    Returns:
        The information about the tick that just happened.

    Raises:
        ReceiverStoppedError: If the timer was closed via `close()`.
    """
    # If it was closed and there is no pending result, we raise
    # (if there is a pending result, then we still want to return it first)
    if self._closed and self._current_tick_info is None:
        raise ReceiverStoppedError(self)

    assert (
        self._current_tick_info is not None
    ), "calls to `consume()` must be follow a call to `ready()`"
    info = self._current_tick_info
    self._current_tick_info = None
    return info
filter ¤
filter(
    filter_function: Callable[
        [ReceiverMessageT_co],
        TypeGuard[FilteredMessageT_co],
    ],
) -> Receiver[FilteredMessageT_co]
filter(
    filter_function: Callable[[ReceiverMessageT_co], bool],
) -> Receiver[ReceiverMessageT_co]
filter(
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[
            [ReceiverMessageT_co],
            TypeGuard[FilteredMessageT_co],
        ]
    ),
) -> (
    Receiver[ReceiverMessageT_co]
    | Receiver[FilteredMessageT_co]
)

Apply a filter function on the messages on a receiver.

Note

You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
filter_function

The function to be applied on incoming messages to determine if they should be received.

TYPE: Callable[[ReceiverMessageT_co], bool] | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]

RETURNS DESCRIPTION
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]

A new receiver that only receives messages that pass the filter.

Source code in frequenz/channels/_receiver.py
def filter(
    self,
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]
    ),
    /,
) -> Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]:
    """Apply a filter function on the messages on a receiver.

    Note:
        You can pass a [type guard][typing.TypeGuard] as the filter function to
        narrow the type of the messages that pass the filter.

    Tip:
        The returned receiver type won't have all the methods of the original
        receiver. If you need to access methods of the original receiver that are
        not part of the `Receiver` interface you should save a reference to the
        original receiver and use that instead.

    Args:
        filter_function: The function to be applied on incoming messages to
            determine if they should be received.

    Returns:
        A new receiver that only receives messages that pass the filter.
    """
    return _Filter(receiver=self, filter_function=filter_function)
map ¤
map(
    mapping_function: Callable[
        [ReceiverMessageT_co], MappedMessageT_co
    ],
) -> Receiver[MappedMessageT_co]

Apply a mapping function on the received message.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
mapping_function

The function to be applied on incoming messages.

TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co]

RETURNS DESCRIPTION
Receiver[MappedMessageT_co]

A new receiver that applies the function on the received messages.

Source code in frequenz/channels/_receiver.py
def map(
    self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
) -> Receiver[MappedMessageT_co]:
    """Apply a mapping function on the received message.

    Tip:
        The returned receiver type won't have all the methods of the original
        receiver. If you need to access methods of the original receiver that are
        not part of the `Receiver` interface you should save a reference to the
        original receiver and use that instead.

    Args:
        mapping_function: The function to be applied on incoming messages.

    Returns:
        A new receiver that applies the function on the received messages.
    """
    return _Mapper(receiver=self, mapping_function=mapping_function)
ready async ¤
ready() -> bool

Wait until the timer interval passed.

Once a call to ready() has finished, the resulting tick information must be read with a call to consume() (or receive(), or by iterating over the timer) to tell the timer it should wait for the next interval.

The timer will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the timer was started and it is still running.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
@override
async def ready(self) -> bool:  # pylint: disable=too-many-branches
    """Wait until the timer `interval` passed.

    Once a call to `ready()` has finished, the resulting tick information
    must be read with a call to `consume()` (or `receive()`, or by iterating
    over the timer) to tell the timer it should wait for the next interval.

    The timer will remain ready (this method will return immediately)
    until it is consumed.

    Returns:
        Whether the timer was started and it is still running.
    """
    # If there are messages waiting to be consumed, return immediately.
    if self._current_tick_info is not None:
        return True

    # If `_next_tick_time` is `None`, it means it was created with
    # `auto_start=False` and should be started on first use.
    if self._next_tick_time is None:
        self.reset()
        assert (
            self._next_tick_time is not None
        ), "This should be assigned by reset()"

    # If a close was explicitly requested, we bail out.
    if self._closed:
        return False

    wall_clock_now = datetime.now(timezone.utc)
    wall_clock_time_to_next_tick = self._next_tick_time - wall_clock_now

    # If we didn't reach the tick yet, sleep until we do.
    # We need to do this in a loop to react to resets, time jumps and wall clock
    # time compression, in which cases we need to recalculate the time to the next
    # tick and try again.
    sleeps: list[ClocksInfo] = []
    should_resync: bool = self._should_resync(self._clocks_info)
    while wall_clock_time_to_next_tick > _TD_ZERO:
        prev_clocks_info = self._clocks_info
        # We don't assign directly to self._clocks_info because its type is
        # ClocksInfo | None, and _sleep() returns ClocksInfo, so we can avoid
        # some None checks further in the code by using `clocks_info` (and we
        # make the code more succinct).
        clocks_info = await self._sleep(
            wall_clock_time_to_next_tick, prev_clocks_info=prev_clocks_info
        )
        should_resync = self._should_resync(clocks_info)
        wall_clock_now = datetime.now(timezone.utc)
        self._clocks_info = clocks_info

        sleeps.append(clocks_info)

        if previous_factor := self._has_drifted_beyond_tolerance(
            new_clocks_info=clocks_info, prev_clocks_info=prev_clocks_info
        ):
            # If we are resyncing we have a different issue, and we are not going to
            # use the factor to adjust the clock, but will just resync
            if not should_resync:
                _logger.warning(
                    "The wall clock time drifted too much from the monotonic time. The "
                    "monotonic time will be adjusted to compensate for this difference. "
                    "We expected the wall clock time to have advanced (%s), but the "
                    "monotonic time advanced (%s) [previous_factor=%s "
                    "current_factor=%s, factor_change_absolute_tolerance=%s].",
                    clocks_info.wall_clock_elapsed,
                    clocks_info.monotonic_elapsed,
                    previous_factor,
                    clocks_info.wall_clock_factor,
                    self._config.wall_clock_drift_tolerance_factor,
                )

        wall_clock_time_to_next_tick = self._next_tick_time - wall_clock_now

        # Technically the monotonic drift should always be positive, but we handle
        # negative values just in case; we've seen a lot of weird things happen.
        monotonic_drift = abs(clocks_info.monotonic_drift)
        drift_tolerance = self._config.async_drift_tolerance
        if drift_tolerance is not None and monotonic_drift > drift_tolerance:
            _logger.warning(
                "The timer was supposed to sleep for %s, but it slept for %s "
                "instead [difference=%s, tolerance=%s]. This is likely due to a "
                "task taking too much time to complete and blocking the event "
                "loop for too long. You probably should profile your code to "
                "find out what's taking too long.",
                clocks_info.monotonic_requested_sleep,
                clocks_info.monotonic_elapsed,
                monotonic_drift,
                drift_tolerance,
            )

        # If we detect a time jump, we exit the loop and handle it outside of it, to
        # also account for time jumps in the past that could happen without even
        # having entered into the sleep loop.
        if should_resync:
            _logger.debug(
                "ready(): Exiting the wait loop because we detected a time jump "
                "and need to re-sync."
            )
            break

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "ready(): In sleep loop:\n"
                "    next_tick_time=%s (%s)\n"
                "    now=%s (%s)\n"
                "    mono_now=%s\n"
                "    wall_clock_time_to_next_tick=%s (%s)",
                self._next_tick_time,
                self._next_tick_time.timestamp(),
                wall_clock_now,
                wall_clock_now.timestamp(),
                asyncio.get_running_loop().time(),
                wall_clock_time_to_next_tick,
                wall_clock_time_to_next_tick.total_seconds(),
            )

    # If there was a time jump, we need to resync the timer to the wall clock,
    # otherwise we can be sleeping for a long time until the timer catches up,
    # which is not suitable for many use cases.
    #
    # Resyncing the timer ensures that we keep ticking more or less at `interval`
    # even in the event of time jumps, with the downside that the timer will
    # trigger more than once for the same timestamp if it jumps back in time,
    # and will skip ticks if it jumps forward in time.
    #
    # When there is no threshold there is no resync, and the ticks will be
    # contiguous in time from the wall clock perspective: waiting until we
    # reach the expected next tick time when jumping back in time, and
    # bursting all missed ticks when jumping forward in time.
    if should_resync:
        assert self._clocks_info is not None
        _logger.warning(
            "The wall clock jumped %s (%s seconds) in time (threshold=%s). "
            "A tick will be triggered immediately with the `expected_tick_time` "
            "as it was before the time jump and the timer will be resynced to "
            "the wall clock.",
            self._clocks_info.wall_clock_jump,
            self._clocks_info.wall_clock_jump.total_seconds(),
            self._config.wall_clock_jump_threshold,
        )

    # If a close was explicitly requested during the sleep, we bail out.
    if self._closed:
        return False

    self._current_tick_info = TickInfo(
        expected_tick_time=self._next_tick_time, sleep_infos=sleeps
    )

    if should_resync:
        _logger.debug(
            "ready(): Before resync:\n"
            "    next_tick_time=%s\n"
            "    now=%s\n"
            "    wall_clock_time_to_next_tick=%s",
            self._next_tick_time,
            wall_clock_now,
            wall_clock_time_to_next_tick,
        )
        self._update_next_tick_time(now=wall_clock_now)
        _logger.debug(
            "ready(): After resync: next_tick_time=%s", self._next_tick_time
        )
    else:
        self._next_tick_time += self._interval
        _logger.debug(
            "ready(): No resync needed: next_tick_time=%s",
            self._next_tick_time,
        )

    return True
receive async ¤
receive() -> ReceiverMessageT_co

Receive a message.

RETURNS DESCRIPTION
ReceiverMessageT_co

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

If there is some problem with the receiver.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def receive(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Receive a message.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: If there is some problem with the receiver.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        received = await anext(self)
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            and exc.__cause__.receiver is self
        ):
            # This is a false positive, we are actually checking __cause__ is a
            # ReceiverStoppedError which is an exception.
            raise exc.__cause__  # pylint: disable=raising-non-exception
        raise ReceiverStoppedError(self) from exc
    return received
reset ¤
reset() -> None

Reset the timer to start timing from now (plus an optional alignment).

If the timer was closed, or not started yet, it will be started.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
def reset(self) -> None:
    """Reset the timer to start timing from now (plus an optional alignment).

    If the timer was closed, or not started yet, it will be started.
    """
    self._closed = False
    self._update_next_tick_time()
    self._current_tick_info = None
    # We assume the clocks will behave similarly after the timer was reset, so we
    # purposefully don't reset the clocks info.
    _logger.debug("reset(): _next_tick_time=%s", self._next_tick_time)
triggered ¤
triggered(
    selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]

Check whether this receiver was selected by select().

This method is used in conjunction with the Selected class to determine which receiver was selected in select() iteration.

It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.

Please see select() for an example.

PARAMETER DESCRIPTION
selected

The result of a select() iteration.

TYPE: Selected[Any]

RETURNS DESCRIPTION
TypeGuard[Selected[ReceiverMessageT_co]]

Whether this receiver was selected.

Source code in frequenz/channels/_receiver.py
def triggered(
    self, selected: Selected[Any]
) -> TypeGuard[Selected[ReceiverMessageT_co]]:
    """Check whether this receiver was selected by [`select()`][frequenz.channels.select].

    This method is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class to determine which receiver was
    selected in `select()` iteration.

    It also works as a [type guard][typing.TypeGuard] to narrow the type of the
    `Selected` instance to the type of the receiver.

    Please see [`select()`][frequenz.channels.select] for an example.

    Args:
        selected: The result of a `select()` iteration.

    Returns:
        Whether this receiver was selected.
    """
    if handled := selected._recv is self:  # pylint: disable=protected-access
        selected._handled = True  # pylint: disable=protected-access
    return handled

frequenz.sdk.timeseries.WallClockTimerConfig dataclass ¤

Configuration for a wall clock timer.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
@dataclass(frozen=True, kw_only=True)
class WallClockTimerConfig:
    """Configuration for a [wall clock timer][frequenz.sdk.timeseries.WallClockTimer]."""

    align_to: datetime | None = UNIX_EPOCH
    """The time to align the timer to.

    The first timer tick will occur at the first multiple of the timer's interval after
    this value.

    It must be a timezone aware `datetime` or `None`. If `None`, the timer aligns to the
    time it is started.
    """

    async_drift_tolerance: timedelta | None = None
    """The maximum allowed difference between the requested and the real sleep time.

    The timer will emit a warning if the difference is bigger than this value.

    It must be bigger than 0 or `None`. If `None`, no warnings will ever be emitted.
    """

    wall_clock_drift_tolerance_factor: float | None = None
    """The maximum allowed relative difference between the wall clock and monotonic time.

    The timer will emit a warning if the relative difference is bigger than this value.
    If the difference remains constant, the warning will be emitted only once, as the
    previous drift is taken into account. If there is information on the previous drift,
    the previous and current factor will be used to determine if a warning should be
    emitted.

    It must be bigger than 0 or `None`. If `None`, no warnings will ever be emitted.

    Info:
        The calculation is as follows:

        ```py
        tolerance = wall_clock_drift_tolerance_factor
        factor = monotonic_elapsed / wall_clock_elapsed
        previous_factor = previous_monotonic_elapsed / previous_wall_clock_elapsed
        if abs(factor - previous_factor) > tolerance:
            emit warning
        ```

        If there is no previous information, a `previous_factor` of 1.0 will be used.
    """

    wall_clock_jump_threshold: timedelta | None = None
    """The amount of time that's considered a wall clock jump.

    When the drift between the wall clock and monotonic time is too big, it is
    considered a time jump and the timer will be resynced to the wall clock.

    This value determines how big the difference needs to be to be considered a
    jump.

    Smaller values are considered wall clock *expansions* or *compressions* and are
    always gradually adjusted, instead of triggering a resync.

    Must be bigger than 0 or `None`. If `None`, a resync will never be triggered due to
    time jumps.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.align_to is not None and self.align_to.tzinfo is None:
            raise ValueError(
                f"align_to ({self.align_to}) should be a timezone aware datetime"
            )

        def _is_strictly_positive_or_none(value: float | timedelta | None) -> bool:
            match value:
                case None:
                    return True
                case timedelta() as delta:
                    return delta > _TD_ZERO
                case float() as num:
                    return math.isfinite(num) and num > 0.0
                case int() as num:
                    return num > 0
                case _ as unknown:
                    assert_never(unknown)

        if not _is_strictly_positive_or_none(self.async_drift_tolerance):
            raise ValueError(
                "async_drift_tolerance should be positive or None, not "
                f"{self.async_drift_tolerance!r}"
            )
        if not _is_strictly_positive_or_none(self.wall_clock_drift_tolerance_factor):
            raise ValueError(
                "wall_clock_drift_tolerance_factor should be positive or None, not "
                f"{self.wall_clock_drift_tolerance_factor!r}"
            )
        if not _is_strictly_positive_or_none(self.wall_clock_jump_threshold):
            raise ValueError(
                "wall_clock_jump_threshold should be positive or None, not "
                f"{self.wall_clock_jump_threshold!r}"
            )

    @classmethod
    def from_interval(  # pylint: disable=too-many-arguments
        cls,
        interval: timedelta,
        *,
        align_to: datetime | None = UNIX_EPOCH,
        async_drift_tolerance_factor: float = 0.1,
        wall_clock_drift_tolerance_factor: float = 0.1,
        wall_clock_jump_threshold_factor: float = 1.0,
    ) -> Self:
        """Create a timer configuration based on an interval.

        This will set the tolerance and threshold values proportionally to the interval.

        Args:
            interval: The interval between timer ticks. Must be bigger than 0.
            align_to: The time to align the timer to. See the
                [`WallClockTimer`][frequenz.sdk.timeseries.WallClockTimer] documentation
                for details.
            async_drift_tolerance_factor: The maximum allowed difference between the
                requested and the real sleep time, relative to the interval.
                `async_drift_tolerance` will be set to `interval * this_factor`.  See
                the [`WallClockTimer`][frequenz.sdk.timeseries.WallClockTimer]
                documentation for details.
            wall_clock_drift_tolerance_factor: The maximum allowed relative difference
                between the wall clock and monotonic time. See the
                [`WallClockTimer`][frequenz.sdk.timeseries.WallClockTimer] documentation
                for details.
            wall_clock_jump_threshold_factor: The amount of time that's considered a
                wall clock jump, relative to the interval. This will be set to
                `interval * this_factor`. See the
                [`WallClockTimer`][frequenz.sdk.timeseries.WallClockTimer] documentation
                for details.

        Returns:
            The created timer configuration.

        Raises:
            ValueError: If any value is out of range.
        """
        if interval <= _TD_ZERO:
            raise ValueError(f"interval must be bigger than 0, not {interval!r}")

        return cls(
            align_to=align_to,
            wall_clock_drift_tolerance_factor=wall_clock_drift_tolerance_factor,
            async_drift_tolerance=interval * async_drift_tolerance_factor,
            wall_clock_jump_threshold=interval * wall_clock_jump_threshold_factor,
        )
Attributes¤
align_to class-attribute instance-attribute ¤
align_to: datetime | None = UNIX_EPOCH

The time to align the timer to.

The first timer tick will occur at the first multiple of the timer's interval after this value.

It must be a timezone aware datetime or None. If None, the timer aligns to the time it is started.
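The alignment rule — the first tick falls on the first multiple of the interval after `align_to` that is not in the past — can be reproduced with stdlib arithmetic; this sketch is independent of the SDK:

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def first_tick(now: datetime, interval: timedelta, align_to: datetime) -> datetime:
    """First multiple of `interval` after `align_to` that is not before `now`."""
    elapsed = now - align_to
    periods = -(-elapsed // interval)  # ceiling division for timedeltas
    return align_to + periods * interval
```

This matches the timeline example in the module overview: with a 10 second interval and now at 1970-01-01 00:00:32 UTC, the next tick lands at 00:00:40 UTC.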

async_drift_tolerance class-attribute instance-attribute ¤
async_drift_tolerance: timedelta | None = None

The maximum allowed difference between the requested and the real sleep time.

The timer will emit a warning if the difference is bigger than this value.

It must be bigger than 0 or None. If None, no warnings will ever be emitted.

wall_clock_drift_tolerance_factor class-attribute instance-attribute ¤
wall_clock_drift_tolerance_factor: float | None = None

The maximum allowed relative difference between the wall clock and monotonic time.

The timer will emit a warning if the relative difference is bigger than this value. Because the previous drift is taken into account, a constant drift produces the warning only once: when information about the previous drift is available, the previous and current factors are compared to decide whether a new warning should be emitted.

It must be bigger than 0 or None. If None, no warnings will ever be emitted.

Info

The calculation is as follows:

tolerance = wall_clock_drift_tolerance_factor
factor = monotonic_elapsed / wall_clock_elapsed
previous_factor = previous_monotonic_elapsed / previous_wall_clock_elapsed
if abs(factor - previous_factor) > tolerance:
    emit warning

If there is no previous information, a previous_factor of 1.0 will be used.

wall_clock_jump_threshold class-attribute instance-attribute ¤
wall_clock_jump_threshold: timedelta | None = None

The amount of time that's considered a wall clock jump.

When the drift between the wall clock and monotonic time is too big, it is considered a time jump and the timer will be resynced to the wall clock.

This value determines how big the drift needs to be to be considered a jump.

Drifts smaller than this threshold are treated as wall clock expansions or compressions and are always adjusted gradually, instead of triggering a resync.

Must be bigger than 0 or None. If None, a resync will never be triggered due to time jumps.

Functions¤
__post_init__ ¤
__post_init__() -> None

Check that config values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
def __post_init__(self) -> None:
    """Check that config values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.align_to is not None and self.align_to.tzinfo is None:
        raise ValueError(
            f"align_to ({self.align_to}) should be a timezone aware datetime"
        )

    def _is_strictly_positive_or_none(value: float | timedelta | None) -> bool:
        match value:
            case None:
                return True
            case timedelta() as delta:
                return delta > _TD_ZERO
            case float() as num:
                return math.isfinite(num) and num > 0.0
            case int() as num:
                return num > 0
            case _ as unknown:
                assert_never(unknown)

    if not _is_strictly_positive_or_none(self.async_drift_tolerance):
        raise ValueError(
            "async_drift_tolerance should be positive or None, not "
            f"{self.async_drift_tolerance!r}"
        )
    if not _is_strictly_positive_or_none(self.wall_clock_drift_tolerance_factor):
        raise ValueError(
            "wall_clock_drift_tolerance_factor should be positive or None, not "
            f"{self.wall_clock_drift_tolerance_factor!r}"
        )
    if not _is_strictly_positive_or_none(self.wall_clock_jump_threshold):
        raise ValueError(
            "wall_clock_jump_threshold should be positive or None, not "
            f"{self.wall_clock_jump_threshold!r}"
        )
from_interval classmethod ¤
from_interval(
    interval: timedelta,
    *,
    align_to: datetime | None = UNIX_EPOCH,
    async_drift_tolerance_factor: float = 0.1,
    wall_clock_drift_tolerance_factor: float = 0.1,
    wall_clock_jump_threshold_factor: float = 1.0
) -> Self

Create a timer configuration based on an interval.

This will set the tolerance and threshold values proportionally to the interval.

PARAMETER DESCRIPTION
interval

The interval between timer ticks. Must be bigger than 0.

TYPE: timedelta

align_to

The time to align the timer to. See the WallClockTimer documentation for details.

TYPE: datetime | None DEFAULT: UNIX_EPOCH

async_drift_tolerance_factor

The maximum allowed difference between the requested and the real sleep time, relative to the interval. async_drift_tolerance will be set to interval * this_factor. See the WallClockTimer documentation for details.

TYPE: float DEFAULT: 0.1

wall_clock_drift_tolerance_factor

The maximum allowed relative difference between the wall clock and monotonic time. See the WallClockTimer documentation for details.

TYPE: float DEFAULT: 0.1

wall_clock_jump_threshold_factor

The amount of time that's considered a wall clock jump, relative to the interval. This will be set to interval * this_factor. See the WallClockTimer documentation for details.

TYPE: float DEFAULT: 1.0

RETURNS DESCRIPTION
Self

The created timer configuration.

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in src/frequenz/sdk/timeseries/_resampling/_wall_clock_timer.py
@classmethod
def from_interval(  # pylint: disable=too-many-arguments
    cls,
    interval: timedelta,
    *,
    align_to: datetime | None = UNIX_EPOCH,
    async_drift_tolerance_factor: float = 0.1,
    wall_clock_drift_tolerance_factor: float = 0.1,
    wall_clock_jump_threshold_factor: float = 1.0,
) -> Self:
    """Create a timer configuration based on an interval.

    This will set the tolerance and threshold values proportionally to the interval.

    Args:
        interval: The interval between timer ticks. Must be bigger than 0.
        align_to: The time to align the timer to. See the
            [`WallClockTimer`][frequenz.sdk.timeseries.WallClockTimer] documentation
            for details.
        async_drift_tolerance_factor: The maximum allowed difference between the
            requested and the real sleep time, relative to the interval.
            `async_drift_tolerance` will be set to `interval * this_factor`.  See
            the [`WallClockTimer`][frequenz.sdk.timeseries.WallClockTimer]
            documentation for details.
        wall_clock_drift_tolerance_factor: The maximum allowed relative difference
            between the wall clock and monotonic time. See the
            [`WallClockTimer`][frequenz.sdk.timeseries.WallClockTimer] documentation
            for details.
        wall_clock_jump_threshold_factor: The amount of time that's considered a
            wall clock jump, relative to the interval. This will be set to
            `interval * this_factor`. See the
            [`WallClockTimer`][frequenz.sdk.timeseries.WallClockTimer] documentation
            for details.

    Returns:
        The created timer configuration.

    Raises:
        ValueError: If any value is out of range.
    """
    if interval <= _TD_ZERO:
        raise ValueError(f"interval must be bigger than 0, not {interval!r}")

    return cls(
        align_to=align_to,
        wall_clock_drift_tolerance_factor=wall_clock_drift_tolerance_factor,
        async_drift_tolerance=interval * async_drift_tolerance_factor,
        wall_clock_jump_threshold=interval * wall_clock_jump_threshold_factor,
    )
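With the default factors, `from_interval` derives the tolerances proportionally to the interval. The arithmetic below mirrors that derivation with plain `timedelta` values (it does not import the SDK):

```python
from datetime import timedelta

interval = timedelta(seconds=10)

# Derived values using the documented defaults of from_interval():
async_drift_tolerance = interval * 0.1      # async_drift_tolerance_factor=0.1
wall_clock_jump_threshold = interval * 1.0  # wall_clock_jump_threshold_factor=1.0

print(async_drift_tolerance)       # 0:00:01
print(wall_clock_jump_threshold)   # 0:00:10
```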