frequenz.sdk.timeseries ¤

Handling of timeseries streams.

A timeseries is a stream (normally an async iterator) of Samples.
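
As a rough illustration of what "a stream of Samples" can look like, the sketch below models a timeseries as an async generator. `ToySample` and `toy_timeseries` are hypothetical stand-ins written for this example; they are not the SDK's `Sample` type or any SDK function.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import AsyncIterator, Optional


@dataclass(frozen=True)
class ToySample:
    """Hypothetical stand-in for a sample: a timestamp plus an optional value."""

    timestamp: datetime
    value: Optional[float]


async def toy_timeseries(
    start: datetime, period: timedelta, count: int
) -> AsyncIterator[ToySample]:
    """Yield `count` samples whose timestamps are exactly `period` apart."""
    for i in range(count):
        await asyncio.sleep(0)  # Give other tasks a chance to run.
        yield ToySample(start + i * period, float(i))


async def collect() -> list:
    """Consume the stream with `async for`, as a consumer of a timeseries would."""
    start = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return [s async for s in toy_timeseries(start, timedelta(seconds=10), 3)]


samples = asyncio.run(collect())
```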

Periodicity and alignment¤

All data produced by this package is periodic and, by default, aligned to the UNIX_EPOCH.

Classes normally take a (re)sampling period as an argument and, optionally, an align_to argument.

This means timestamps are always separated by exactly one period, and each timestamp always falls at a multiple of the period, counted from align_to.

This ensures that the data is predictable and consistent across restarts.

Example

Suppose we have a period of 10 seconds and align to the UNIX epoch. Assuming the following timeline starts at 1970-01-01 00:00:00 UTC and the current time (now) is 1970-01-01 00:00:32 UTC, the next timestamp will be at 1970-01-01 00:00:40 UTC:

align_to = 1970-01-01 00:00:00         next event = 1970-01-01 00:00:40
|                                       |
|---------|---------|---------|-|-------|---------|---------|---------|
0        10        20        30 |      40        50        60        70
                               now = 1970-01-01 00:00:32
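
The arithmetic behind the timeline can be sketched in a few lines of plain Python. This is only an illustration of the alignment rule described above, not the SDK's implementation; `next_aligned_timestamp` is a hypothetical helper, and `UNIX_EPOCH` is redefined locally so the snippet is self-contained.

```python
from datetime import datetime, timedelta, timezone

# Local copy of the module attribute documented below.
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)


def next_aligned_timestamp(
    now: datetime, period: timedelta, align_to: datetime = UNIX_EPOCH
) -> datetime:
    """Return the first multiple of `period`, counted from `align_to`, after `now`."""
    # Floor division of two timedeltas yields the number of whole periods elapsed.
    elapsed_periods = (now - align_to) // period
    return align_to + (elapsed_periods + 1) * period


now = datetime(1970, 1, 1, 0, 0, 32, tzinfo=timezone.utc)
next_event = next_aligned_timestamp(now, timedelta(seconds=10))
```

With the values from the example, `next_event` is 1970-01-01 00:00:40 UTC. Note that this sketch returns the following tick when `now` already sits exactly on a multiple of the period.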

Attributes¤

frequenz.sdk.timeseries.UNIX_EPOCH module-attribute ¤

UNIX_EPOCH = fromtimestamp(0.0, tz=utc)

The UNIX epoch (in UTC).

Classes¤

frequenz.sdk.timeseries.Bounds dataclass ¤

Bases: Generic[_T]

Lower and upper bound values.

Source code in frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True)
class Bounds(Generic[_T]):
    """Lower and upper bound values."""

    lower: _T
    """Lower bound."""

    upper: _T
    """Upper bound."""

    def __contains__(self, item: _T) -> bool:
        """
        Check if the value is within the range of the container.

        Args:
            item: The value to check.

        Returns:
            bool: True if value is within the range, otherwise False.
        """
        if self.lower is None and self.upper is None:
            return True
        if self.lower is None:
            return item <= self.upper
        if self.upper is None:
            return self.lower <= item

        return cast(Comparable, self.lower) <= item <= cast(Comparable, self.upper)
Attributes¤
lower instance-attribute ¤
lower: _T

Lower bound.

upper instance-attribute ¤
upper: _T

Upper bound.

Functions¤
__contains__ ¤
__contains__(item: _T) -> bool

Check if the value is within the range of the container.

PARAMETER DESCRIPTION
item

The value to check.

TYPE: _T

RETURNS DESCRIPTION
bool

True if value is within the range, otherwise False.

TYPE: bool

Source code in frequenz/sdk/timeseries/_base_types.py
def __contains__(self, item: _T) -> bool:
    """
    Check if the value is within the range of the container.

    Args:
        item: The value to check.

    Returns:
        bool: True if value is within the range, otherwise False.
    """
    if self.lower is None and self.upper is None:
        return True
    if self.lower is None:
        return item <= self.upper
    if self.upper is None:
        return self.lower <= item

    return cast(Comparable, self.lower) <= item <= cast(Comparable, self.upper)
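
To show how the `in` operator behaves with such bounds, here is a small self-contained sketch. `ToyBounds` is a local re-creation of the listing above for demonstration purposes only; unlike the original it annotates the fields as `Optional` to make the `None` handling explicit, which is an assumption of this example.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

_T = TypeVar("_T")


@dataclass(frozen=True)
class ToyBounds(Generic[_T]):
    """Demonstration-only copy of the bounds logic shown above."""

    lower: Optional[_T]
    upper: Optional[_T]

    def __contains__(self, item: _T) -> bool:
        # A missing bound (None) means "unbounded" on that side.
        if self.lower is None and self.upper is None:
            return True
        if self.lower is None:
            return item <= self.upper
        if self.upper is None:
            return self.lower <= item
        return self.lower <= item <= self.upper


closed = ToyBounds(lower=0.0, upper=10.0)
open_below = ToyBounds(lower=None, upper=10.0)

inside = 5.0 in closed  # within both bounds
on_edge = 10.0 in closed  # both bounds are inclusive
outside = 11.0 in closed  # above the upper bound
far_below = -100.0 in open_below  # no lower bound to violate
```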

frequenz.sdk.timeseries.Fuse dataclass ¤

Fuse data class.

Source code in frequenz/sdk/timeseries/_fuse.py
@dataclass(frozen=True)
class Fuse:
    """Fuse data class."""

    max_current: Current
    """Rated current of the fuse."""
Attributes¤
max_current instance-attribute ¤
max_current: Current

Rated current of the fuse.

frequenz.sdk.timeseries.MovingWindow ¤

Bases: BackgroundService

A data window that moves with the latest datapoints of a data stream.

After initialization, the MovingWindow can be accessed by an integer index or a timestamp. A sub window can be accessed by using a slice of integers or timestamps.

Note that a numpy ndarray is returned, so users can apply numpy operations directly to a window.

The window uses a ring buffer for storage, and the first element is aligned to a fixed, defined point in time. Because the window moves, the timestamps of the first and last elements change constantly, so the point in time that defines the alignment can lie outside the time window. Modulo arithmetic is used to move the align_to timestamp into the latest window.

If, for example, the align_to parameter is set to datetime(1, 1, 1, tzinfo=timezone.utc) and the window size is larger than one day, then the first element will always be aligned to midnight.
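
The modulo idea can be illustrated with a simplified sketch. It assumes a ring buffer of exactly one day of hourly samples and maps a timestamp to a slot by counting sampling periods since `align_to`; the names `ring_index`, `ALIGN_TO`, `SAMPLING_PERIOD` and `CAPACITY` are hypothetical, and the SDK's internal buffer may compute its indices differently.

```python
from datetime import datetime, timedelta, timezone

ALIGN_TO = datetime(1, 1, 1, tzinfo=timezone.utc)  # far outside any recent window
SAMPLING_PERIOD = timedelta(hours=1)
CAPACITY = 24  # assumption: one day of hourly samples


def ring_index(timestamp: datetime) -> int:
    """Map a timestamp to a ring buffer slot using modulo arithmetic."""
    periods_since_alignment = (timestamp - ALIGN_TO) // SAMPLING_PERIOD
    return periods_since_alignment % CAPACITY


midnight = ring_index(datetime(2023, 1, 1, tzinfo=timezone.utc))
five_am = ring_index(datetime(2023, 1, 1, 5, tzinfo=timezone.utc))
next_midnight = ring_index(datetime(2023, 1, 2, tzinfo=timezone.utc))
```

In this sketch every midnight lands in slot 0, even though `ALIGN_TO` is centuries before the data, which is the effect the paragraph above describes.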

Resampling may be required to reduce the number of samples to store. It is enabled by passing the resampler_config parameter, which lets the user control the granularity of the samples stored in the underlying buffer.

If resampling is not required, resampler_config can be set to None (the default), in which case the MovingWindow will not perform any resampling.

Example: Calculate the mean of a time interval

```python
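import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender
from frequenz.sdk.timeseries import MovingWindow, Sample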

async def send_mock_data(sender: Sender[Sample]) -> None:
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)

async def run() -> None:
    resampled_data_channel = Broadcast[Sample](name="sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    async with MovingWindow(
        size=timedelta(seconds=5),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        time_start = datetime.now(tz=timezone.utc)
        time_end = time_start + timedelta(seconds=5)

        # ... wait for 5 seconds until the buffer is filled
        await asyncio.sleep(5)

        # Get a numpy array from the window
        array = window[time_start:time_end]
        # and use it, for example, to calculate the mean
        mean = array.mean()

asyncio.run(run())
```

Example: Create a polars data frame from a MovingWindow

```python
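import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender
from frequenz.sdk.timeseries import MovingWindow, Sample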

async def send_mock_data(sender: Sender[Sample]) -> None:
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)

async def run() -> None:
    resampled_data_channel = Broadcast[Sample](name="sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    # Create a window that stores two days of data
    # starting at 2023-01-01, with a sampling period of 1 second
    async with MovingWindow(
        size=timedelta(days=2),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        # wait for one full day until the buffer is filled
        await asyncio.sleep(60*60*24)

        time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
        time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)

        # You can now create a polars series with one full day of data by
        # passing the window slice, like:
        # series = pl.Series("Jan_1", window[time_start:time_end])

asyncio.run(run())
```
Source code in frequenz/sdk/timeseries/_moving_window.py
class MovingWindow(BackgroundService):
    """
    A data window that moves with the latest datapoints of a data stream.

    After initialization the `MovingWindow` can be accessed by an integer
    index or a timestamp. A sub window can be accessed by using a slice of
    integers or timestamps.

    Note that a numpy ndarray is returned and thus users can use
    numpys operations directly on a window.

    The window uses a ring buffer for storage and the first element is aligned to
    a fixed defined point in time. Since the moving nature of the window, the
    date of the first and the last element are constantly changing and therefore
    the point in time that defines the alignment can be outside of the time window.
    Modulo arithmetic is used to move the `align_to` timestamp into the latest
    window.

    If for example the `align_to` parameter is set to
    `datetime(1, 1, 1, tzinfo=timezone.utc)` and the window size is bigger than
    one day then the first element will always be aligned to midnight.

    Resampling might be required to reduce the number of samples to store, and
    it can be set by specifying the resampler config parameter so that the user
    can control the granularity of the samples to be stored in the underlying
    buffer.

    If resampling is not required, the resampler config parameter can be
    set to None in which case the MovingWindow will not perform any resampling.

    Example: Calculate the mean of a time interval

        ```python
        from datetime import datetime, timedelta, timezone

        async def send_mock_data(sender: Sender[Sample]) -> None:
            while True:
                await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
                await asyncio.sleep(1.0)

        async def run() -> None:
            resampled_data_channel = Broadcast[Sample](name="sample-data")
            resampled_data_receiver = resampled_data_channel.new_receiver()
            resampled_data_sender = resampled_data_channel.new_sender()

            send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

            async with MovingWindow(
                size=timedelta(seconds=5),
                resampled_data_recv=resampled_data_receiver,
                input_sampling_period=timedelta(seconds=1),
            ) as window:
                time_start = datetime.now(tz=timezone.utc)
                time_end = time_start + timedelta(seconds=5)

                # ... wait for 5 seconds until the buffer is filled
                await asyncio.sleep(5)

                # return an numpy array from the window
                array = window[time_start:time_end]
                # and use it to for example calculate the mean
                mean = array.mean()

        asyncio.run(run())
        ```

    Example: Create a polars data frame from a `MovingWindow`

        ```python
        from datetime import datetime, timedelta, timezone

        async def send_mock_data(sender: Sender[Sample]) -> None:
            while True:
                await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
                await asyncio.sleep(1.0)

        async def run() -> None:
            resampled_data_channel = Broadcast[Sample](name="sample-data")
            resampled_data_receiver = resampled_data_channel.new_receiver()
            resampled_data_sender = resampled_data_channel.new_sender()

            send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

            # create a window that stores two days of data
            # starting at 1.1.23 with samplerate=1
            async with MovingWindow(
                size=timedelta(days=2),
                resampled_data_recv=resampled_data_receiver,
                input_sampling_period=timedelta(seconds=1),
            ) as window:
                # wait for one full day until the buffer is filled
                await asyncio.sleep(60*60*24)

                time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
                time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)

                # You can now create a polars series with one full day of data by
                # passing the window slice, like:
                # series = pl.Series("Jan_1", window[time_start:time_end])

        asyncio.run(run())
        ```
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        size: timedelta,
        resampled_data_recv: Receiver[Sample[Quantity]],
        input_sampling_period: timedelta,
        resampler_config: ResamplerConfig | None = None,
        align_to: datetime = UNIX_EPOCH,
        *,
        name: str | None = None,
    ) -> None:
        """
        Initialize the MovingWindow.

        This method creates the underlying ring buffer and starts a
        new task that updates the ring buffer with new incoming samples.
        The task stops running only if the channel receiver is closed.

        Args:
            size: The time span of the moving window over which samples will be stored.
            resampled_data_recv: A receiver that delivers samples with a
                given sampling period.
            input_sampling_period: The time interval between consecutive input samples.
            resampler_config: The resampler configuration in case resampling is required.
            align_to: A timestamp that defines a point in time to which
                the window is aligned to modulo window size. For further
                information, consult the class level documentation.
            name: The name of this moving window. If `None`, `str(id(self))` will be
                used. This is used mostly for debugging purposes.
        """
        assert (
            input_sampling_period.total_seconds() > 0
        ), "The input sampling period should be greater than zero."
        assert (
            input_sampling_period <= size
        ), "The input sampling period should be equal to or lower than the window size."
        super().__init__(name=name)

        self._sampling_period = input_sampling_period

        self._resampler: Resampler | None = None
        self._resampler_sender: Sender[Sample[Quantity]] | None = None

        if resampler_config:
            assert (
                resampler_config.resampling_period <= size
            ), "The resampling period should be equal to or lower than the window size."

            self._resampler = Resampler(resampler_config)
            self._sampling_period = resampler_config.resampling_period

        # Sampling period might not fit perfectly into the window size.
        num_samples = math.ceil(
            size.total_seconds() / self._sampling_period.total_seconds()
        )

        self._resampled_data_recv = resampled_data_recv
        self._buffer = OrderedRingBuffer(
            np.empty(shape=num_samples, dtype=float),
            sampling_period=self._sampling_period,
            align_to=align_to,
        )

    def start(self) -> None:
        """Start the MovingWindow.

        This method starts the MovingWindow tasks.
        """
        if self._resampler:
            self._configure_resampler()
        self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window"))

    @property
    def sampling_period(self) -> timedelta:
        """
        Return the sampling period of the MovingWindow.

        Returns:
            The sampling period of the MovingWindow.
        """
        return self._sampling_period

    @property
    def oldest_timestamp(self) -> datetime | None:
        """
        Return the oldest timestamp of the MovingWindow.

        Returns:
            The oldest timestamp of the MovingWindow or None if the buffer is empty.
        """
        return self._buffer.oldest_timestamp

    @property
    def newest_timestamp(self) -> datetime | None:
        """
        Return the newest timestamp of the MovingWindow.

        Returns:
            The newest timestamp of the MovingWindow or None if the buffer is empty.
        """
        return self._buffer.newest_timestamp

    @property
    def capacity(self) -> int:
        """
        Return the capacity of the MovingWindow.

        Capacity is the maximum number of samples that can be stored in the
        MovingWindow.

        Returns:
            The capacity of the MovingWindow.
        """
        return self._buffer.maxlen

    # pylint before 3.0 only accepts names with 3 or more chars
    def at(self, key: int | datetime) -> float:  # pylint: disable=invalid-name
        """
        Return the sample at the given index or timestamp.

        In contrast to the [`window`][frequenz.sdk.timeseries.MovingWindow.window] method,
        which expects a slice as argument, this method expects a single index as argument
        and returns a single value.

        Args:
            key: The index or timestamp of the sample to return.

        Returns:
            The sample at the given index or timestamp.

        Raises:
            IndexError: If the buffer is empty or the index is out of bounds.
        """
        if self._buffer.count_valid() == 0:
            raise IndexError("The buffer is empty.")

        if isinstance(key, datetime):
            assert self._buffer.oldest_timestamp is not None
            assert self._buffer.newest_timestamp is not None
            if (
                key < self._buffer.oldest_timestamp
                or key > self._buffer.newest_timestamp
            ):
                raise IndexError(
                    f"Timestamp {key} is out of range [{self._buffer.oldest_timestamp}, "
                    f"{self._buffer.newest_timestamp}]"
                )
            return self._buffer[self._buffer.to_internal_index(key)]

        if isinstance(key, int):
            _logger.debug("Returning value at index %s ", key)
            timestamp = self._buffer.get_timestamp(key)
            assert timestamp is not None
            return self._buffer[self._buffer.to_internal_index(timestamp)]

        raise TypeError("Key has to be either a timestamp or an integer.")

    def window(
        self,
        start: datetime | int | None,
        end: datetime | int | None,
        *,
        force_copy: bool = True,
        fill_value: float | None = np.nan,
    ) -> ArrayLike:
        """
        Return an array containing the samples in the given time interval.

        In contrast to the [`at`][frequenz.sdk.timeseries.MovingWindow.at] method,
        which expects a single index as argument, this method expects a slice as argument
        and returns an array.

        Args:
            start: The start timestamp of the time interval. If `None`, the
                start of the window is used.
            end: The end timestamp of the time interval. If `None`, the end of
                the window is used.
            force_copy: If `True`, the returned array is a copy of the underlying
                data. Otherwise, if possible, a view of the underlying data is
                returned.
            fill_value: If not None, will use this value to fill missing values.
                If missing values should be set, force_copy must be True.
                Defaults to NaN to avoid returning outdated data unexpectedly.

        Returns:
            An array containing the samples in the given time interval.
        """
        return self._buffer.window(
            start, end, force_copy=force_copy, fill_value=fill_value
        )

    async def _run_impl(self) -> None:
        """Awaits samples from the receiver and updates the underlying ring buffer.

        Raises:
            asyncio.CancelledError: if the MovingWindow task is cancelled.
        """
        try:
            async for sample in self._resampled_data_recv:
                _logger.debug("Received new sample: %s", sample)
                if self._resampler and self._resampler_sender:
                    await self._resampler_sender.send(sample)
                else:
                    self._buffer.update(sample)

        except asyncio.CancelledError:
            _logger.info("MovingWindow task has been cancelled.")
            raise

        _logger.error("Channel has been closed")

    def _configure_resampler(self) -> None:
        """Configure the components needed to run the resampler."""
        assert self._resampler is not None

        async def sink_buffer(sample: Sample[Quantity]) -> None:
            if sample.value is not None:
                self._buffer.update(sample)

        resampler_channel = Broadcast[Sample[Quantity]](name="average")
        self._resampler_sender = resampler_channel.new_sender()
        self._resampler.add_timeseries(
            "avg", resampler_channel.new_receiver(), sink_buffer
        )
        self._tasks.add(
            asyncio.create_task(self._resampler.resample(), name="resample")
        )

    def count_valid(self) -> int:
        """
        Count the number of valid samples in this `MovingWindow`.

        Returns:
            The number of valid samples in this `MovingWindow`.
        """
        return self._buffer.count_valid()

    def count_covered(self) -> int:
        """Count the number of samples that are covered by the oldest and newest valid samples.

        Returns:
            The count of samples between the oldest and newest (inclusive) valid samples
                or 0 if there are is no time range covered.
        """
        return self._buffer.count_covered()

    @overload
    def __getitem__(self, key: SupportsIndex) -> float:
        """See the main __getitem__ method."""

    @overload
    def __getitem__(self, key: datetime) -> float:
        """See the main __getitem__ method."""

    @overload
    def __getitem__(self, key: slice) -> ArrayLike:
        """See the main __getitem__ method."""

    def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLike:
        """
        Return a sub window of the `MovingWindow`.

        The `MovingWindow` is accessed either by timestamp or by index
        or by a slice of timestamps or integers.

        * If the key is an integer, the float value of that key
          at the given position is returned.
        * If the key is a timestamp, the float value of that key
          that corresponds to the timestamp is returned.
        * If the key is a slice of timestamps or integers, an ndarray is returned,
          where the bounds correspond to the slice bounds.
          Note that a half open interval, which is open at the end, is returned.

        Args:
            key: Either an integer or a timestamp or a slice of timestamps or integers.

        Raises:
            IndexError: when requesting an out of range timestamp or index
            TypeError: when the key is not a datetime or slice object.

        Returns:
            A float if the key is a number or a timestamp.
            an numpy array if the key is a slice.
        """
        if isinstance(key, slice):
            if not (key.step is None or key.step == 1):
                raise ValueError("Slicing with a step other than 1 is not supported.")
            return self.window(key.start, key.stop)

        if isinstance(key, datetime):
            return self.at(key)

        if isinstance(key, SupportsIndex):
            return self.at(key.__index__())

        raise TypeError(
            "Key has to be either a timestamp or an integer "
            "or a slice of timestamps or integers"
        )
Attributes¤
capacity property ¤
capacity: int

Return the capacity of the MovingWindow.

Capacity is the maximum number of samples that can be stored in the MovingWindow.

RETURNS DESCRIPTION
int

The capacity of the MovingWindow.
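
As a rough guide to the value to expect, the `__init__` listing above sizes the buffer by dividing the window size by the sampling period and rounding up. The sketch below reproduces that calculation; `toy_capacity` is a hypothetical helper, and it assumes the reported capacity equals the number of slots allocated at construction time.

```python
import math
from datetime import timedelta


def toy_capacity(size: timedelta, sampling_period: timedelta) -> int:
    """Number of samples needed to cover `size`, rounded up to a whole sample."""
    return math.ceil(size.total_seconds() / sampling_period.total_seconds())


# The period fits exactly into the window: 5 s / 1 s.
exact = toy_capacity(timedelta(seconds=5), timedelta(seconds=1))
# The period does not fit exactly: 10 s / 3 s is rounded up.
rounded_up = toy_capacity(timedelta(seconds=10), timedelta(seconds=3))
```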

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

newest_timestamp property ¤
newest_timestamp: datetime | None

Return the newest timestamp of the MovingWindow.

RETURNS DESCRIPTION
datetime | None

The newest timestamp of the MovingWindow or None if the buffer is empty.

oldest_timestamp property ¤
oldest_timestamp: datetime | None

Return the oldest timestamp of the MovingWindow.

RETURNS DESCRIPTION
datetime | None

The oldest timestamp of the MovingWindow or None if the buffer is empty.

sampling_period property ¤
sampling_period: timedelta

Return the sampling period of the MovingWindow.

RETURNS DESCRIPTION
timedelta

The sampling period of the MovingWindow.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior. Don't do it unless the class explicitly documents that it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
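
The start-on-enter, stop-on-exit pattern shown in `__aenter__` and `__aexit__` can be tried out with a minimal stand-in. `ToyService` below is a hypothetical class written for this example; it only mimics the shape of the listings above and is not the SDK's `BackgroundService`.

```python
import asyncio


class ToyService:
    """Hypothetical minimal service that starts on enter and stops on exit."""

    def __init__(self) -> None:
        self._tasks: set = set()

    def start(self) -> None:
        """Spawn the background task (requires a running event loop)."""
        self._tasks.add(asyncio.create_task(self._run(), name="toy-task"))

    async def stop(self) -> None:
        """Cancel all tasks and wait until they have finished."""
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)

    @property
    def is_running(self) -> bool:
        """Whether at least one task has not finished yet."""
        return any(not task.done() for task in self._tasks)

    async def _run(self) -> None:
        while True:
            await asyncio.sleep(0.01)

    async def __aenter__(self) -> "ToyService":
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.stop()


async def demo() -> tuple:
    async with ToyService() as service:
        running_inside = service.is_running
    # Leaving the block has awaited stop(), so no task is left running.
    return running_inside, service.is_running


running_inside, running_after = asyncio.run(demo())
```
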
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__getitem__ ¤
__getitem__(
    key: SupportsIndex | datetime | slice,
) -> float | ArrayLike

Return a sub window of the MovingWindow.

The MovingWindow is accessed by timestamp, by index, or by a slice of timestamps or integers.

  • If the key is an integer, the float value at that position is returned.
  • If the key is a timestamp, the float value that corresponds to that timestamp is returned.
  • If the key is a slice of timestamps or integers, an ndarray is returned, where the bounds correspond to the slice bounds. Note that the returned interval is half-open: the end bound is excluded.
PARAMETER DESCRIPTION
key

Either an integer or a timestamp or a slice of timestamps or integers.

TYPE: SupportsIndex | datetime | slice

RAISES DESCRIPTION
IndexError

When requesting an out-of-range timestamp or index.

TypeError

When the key is not an integer, a timestamp, or a slice.

ValueError

When slicing with a step other than 1.

RETURNS DESCRIPTION
float | ArrayLike

A float if the key is a number or a timestamp.

float | ArrayLike

A numpy array if the key is a slice.

Source code in frequenz/sdk/timeseries/_moving_window.py
def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLike:
    """
    Return a sub window of the `MovingWindow`.

    The `MovingWindow` is accessed either by timestamp or by index
    or by a slice of timestamps or integers.

    * If the key is an integer, the float value of that key
      at the given position is returned.
    * If the key is a timestamp, the float value of that key
      that corresponds to the timestamp is returned.
    * If the key is a slice of timestamps or integers, an ndarray is returned,
      where the bounds correspond to the slice bounds.
      Note that a half open interval, which is open at the end, is returned.

    Args:
        key: Either an integer or a timestamp or a slice of timestamps or integers.

    Raises:
        IndexError: when requesting an out of range timestamp or index
        TypeError: when the key is not a datetime or slice object.

    Returns:
        A float if the key is a number or a timestamp.
        an numpy array if the key is a slice.
    """
    if isinstance(key, slice):
        if not (key.step is None or key.step == 1):
            raise ValueError("Slicing with a step other than 1 is not supported.")
        return self.window(key.start, key.stop)

    if isinstance(key, datetime):
        return self.at(key)

    if isinstance(key, SupportsIndex):
        return self.at(key.__index__())

    raise TypeError(
        "Key has to be either a timestamp or an integer "
        "or a slice of timestamps or integers"
    )
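
The key dispatch above can be exercised with a small list-backed stand-in. `ToyWindow` is a hypothetical class for illustration only: it stores values in a plain list starting at a fixed timestamp, whereas the real `MovingWindow` delegates to a ring buffer and returns numpy arrays for slices.

```python
from datetime import datetime, timedelta, timezone
from typing import SupportsIndex, Union


class ToyWindow:
    """Hypothetical list-backed window that mimics the key dispatch above."""

    def __init__(self, start: datetime, period: timedelta, values: list) -> None:
        self._start = start
        self._period = period
        self._values = values

    def _position(self, key: Union[datetime, int, None], default: int) -> int:
        """Translate a timestamp, integer or None into a list index."""
        if key is None:
            return default
        if isinstance(key, datetime):
            return (key - self._start) // self._period
        return key

    def __getitem__(self, key: Union[SupportsIndex, datetime, slice]):
        if isinstance(key, slice):
            if not (key.step is None or key.step == 1):
                raise ValueError("Slicing with a step other than 1 is not supported.")
            first = self._position(key.start, 0)
            last = self._position(key.stop, len(self._values))
            return self._values[first:last]  # half-open: `last` is excluded
        if isinstance(key, datetime):
            index = self._position(key, 0)
            if not 0 <= index < len(self._values):
                raise IndexError(f"Timestamp {key} is out of range")
            return self._values[index]
        if isinstance(key, SupportsIndex):
            return self._values[key.__index__()]
        raise TypeError("Key has to be a timestamp, an integer or a slice")


start = datetime(2023, 1, 1, tzinfo=timezone.utc)
second = timedelta(seconds=1)
window = ToyWindow(start, second, [1.0, 2.0, 3.0, 4.0, 5.0])

by_index = window[0]
by_timestamp = window[start + 2 * second]
by_slice = window[start : start + 2 * second]  # end bound excluded
```
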
__init__ ¤
__init__(
    size: timedelta,
    resampled_data_recv: Receiver[Sample[Quantity]],
    input_sampling_period: timedelta,
    resampler_config: ResamplerConfig | None = None,
    align_to: datetime = UNIX_EPOCH,
    *,
    name: str | None = None
) -> None

Initialize the MovingWindow.

This method creates the underlying ring buffer. The task that updates the ring buffer with new incoming samples is created by start(), which is called automatically when entering the async context. The task stops running only if the channel receiver is closed.

PARAMETER DESCRIPTION
size

The time span of the moving window over which samples will be stored.

TYPE: timedelta

resampled_data_recv

A receiver that delivers samples with a given sampling period.

TYPE: Receiver[Sample[Quantity]]

input_sampling_period

The time interval between consecutive input samples.

TYPE: timedelta

resampler_config

The resampler configuration in case resampling is required.

TYPE: ResamplerConfig | None DEFAULT: None

align_to

A timestamp that defines a point in time to which the window is aligned, modulo the window size. For further information, consult the class level documentation.

TYPE: datetime DEFAULT: UNIX_EPOCH

name

The name of this moving window. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/timeseries/_moving_window.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    size: timedelta,
    resampled_data_recv: Receiver[Sample[Quantity]],
    input_sampling_period: timedelta,
    resampler_config: ResamplerConfig | None = None,
    align_to: datetime = UNIX_EPOCH,
    *,
    name: str | None = None,
) -> None:
    """
    Initialize the MovingWindow.

    This method creates the underlying ring buffer and starts a
    new task that updates the ring buffer with new incoming samples.
    The task stops running only if the channel receiver is closed.

    Args:
        size: The time span of the moving window over which samples will be stored.
        resampled_data_recv: A receiver that delivers samples with a
            given sampling period.
        input_sampling_period: The time interval between consecutive input samples.
        resampler_config: The resampler configuration in case resampling is required.
        align_to: A timestamp that defines a point in time to which
            the window is aligned, modulo the window size. For further
            information, consult the class level documentation.
        name: The name of this moving window. If `None`, `str(id(self))` will be
            used. This is used mostly for debugging purposes.
    """
    assert (
        input_sampling_period.total_seconds() > 0
    ), "The input sampling period should be greater than zero."
    assert (
        input_sampling_period <= size
    ), "The input sampling period should be equal to or lower than the window size."
    super().__init__(name=name)

    self._sampling_period = input_sampling_period

    self._resampler: Resampler | None = None
    self._resampler_sender: Sender[Sample[Quantity]] | None = None

    if resampler_config:
        assert (
            resampler_config.resampling_period <= size
        ), "The resampling period should be equal to or lower than the window size."

        self._resampler = Resampler(resampler_config)
        self._sampling_period = resampler_config.resampling_period

    # Sampling period might not fit perfectly into the window size.
    num_samples = math.ceil(
        size.total_seconds() / self._sampling_period.total_seconds()
    )

    self._resampled_data_recv = resampled_data_recv
    self._buffer = OrderedRingBuffer(
        np.empty(shape=num_samples, dtype=float),
        sampling_period=self._sampling_period,
        align_to=align_to,
    )
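The buffer length is the window size divided by the sampling period, rounded up so that a period that does not divide the size evenly still gets enough slots. A minimal standalone sketch of that calculation (the function name `num_samples` is chosen here for illustration):

```python
import math
from datetime import timedelta


def num_samples(size: timedelta, sampling_period: timedelta) -> int:
    """Number of buffer slots needed to cover `size`, rounded up."""
    return math.ceil(size.total_seconds() / sampling_period.total_seconds())


# A 10 s window sampled every 3 s needs 4 slots, not 3.
print(num_samples(timedelta(seconds=10), timedelta(seconds=3)))
# A 1 min window sampled every second needs exactly 60 slots.
print(num_samples(timedelta(minutes=1), timedelta(seconds=1)))
```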
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
at ¤
at(key: int | datetime) -> float

Return the sample at the given index or timestamp.

In contrast to the window method, which expects a slice as argument, this method expects a single index as argument and returns a single value.

PARAMETER DESCRIPTION
key

The index or timestamp of the sample to return.

TYPE: int | datetime

RETURNS DESCRIPTION
float

The sample at the given index or timestamp.

RAISES DESCRIPTION
IndexError

If the buffer is empty or the index is out of bounds.

Source code in frequenz/sdk/timeseries/_moving_window.py
def at(self, key: int | datetime) -> float:  # pylint: disable=invalid-name
    """
    Return the sample at the given index or timestamp.

    In contrast to the [`window`][frequenz.sdk.timeseries.MovingWindow.window] method,
    which expects a slice as argument, this method expects a single index as argument
    and returns a single value.

    Args:
        key: The index or timestamp of the sample to return.

    Returns:
        The sample at the given index or timestamp.

    Raises:
        IndexError: If the buffer is empty or the index is out of bounds.
    """
    if self._buffer.count_valid() == 0:
        raise IndexError("The buffer is empty.")

    if isinstance(key, datetime):
        assert self._buffer.oldest_timestamp is not None
        assert self._buffer.newest_timestamp is not None
        if (
            key < self._buffer.oldest_timestamp
            or key > self._buffer.newest_timestamp
        ):
            raise IndexError(
                f"Timestamp {key} is out of range [{self._buffer.oldest_timestamp}, "
                f"{self._buffer.newest_timestamp}]"
            )
        return self._buffer[self._buffer.to_internal_index(key)]

    if isinstance(key, int):
        _logger.debug("Returning value at index %s ", key)
        timestamp = self._buffer.get_timestamp(key)
        assert timestamp is not None
        return self._buffer[self._buffer.to_internal_index(timestamp)]

    raise TypeError("Key has to be either a timestamp or an integer.")
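The range check and the timestamp-to-index conversion can be sketched on a plain list. This is only a simplified model under stated assumptions: the samples are gap-free, equally spaced, and stored oldest first; `value_at` and its parameters are hypothetical and do not exist in the SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Sequence, Union


def value_at(
    values: Sequence[float],
    oldest: datetime,
    period: timedelta,
    key: Union[int, datetime],
) -> float:
    """Hypothetical stand-in for `at` on a gap-free list of samples."""
    if not values:
        raise IndexError("The buffer is empty.")
    if isinstance(key, datetime):
        newest = oldest + period * (len(values) - 1)
        if key < oldest or key > newest:
            raise IndexError(f"Timestamp {key} is out of range [{oldest}, {newest}]")
        # timedelta // timedelta yields the integer number of whole periods.
        return values[(key - oldest) // period]
    if isinstance(key, int):
        return values[key]
    raise TypeError("Key has to be either a timestamp or an integer.")


start = datetime(2023, 1, 1, tzinfo=timezone.utc)
samples = [1.0, 2.0, 3.0]
print(value_at(samples, start, timedelta(seconds=1), start + timedelta(seconds=2)))
print(value_at(samples, start, timedelta(seconds=1), -1))
```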
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
count_covered ¤
count_covered() -> int

Count the number of samples that are covered by the oldest and newest valid samples.

RETURNS DESCRIPTION
int

The count of samples between the oldest and newest (inclusive) valid samples or 0 if there is no time range covered.

Source code in frequenz/sdk/timeseries/_moving_window.py
def count_covered(self) -> int:
    """Count the number of samples that are covered by the oldest and newest valid samples.

    Returns:
        The count of samples between the oldest and newest (inclusive) valid samples
            or 0 if there is no time range covered.
    """
    return self._buffer.count_covered()
count_valid ¤
count_valid() -> int

Count the number of valid samples in this MovingWindow.

RETURNS DESCRIPTION
int

The number of valid samples in this MovingWindow.

Source code in frequenz/sdk/timeseries/_moving_window.py
def count_valid(self) -> int:
    """
    Count the number of valid samples in this `MovingWindow`.

    Returns:
        The number of valid samples in this `MovingWindow`.
    """
    return self._buffer.count_valid()
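The difference between `count_valid` and `count_covered` shows up as soon as the buffer has gaps. The sketch below models the buffer as a plain list in which `None` marks a missing sample; that representation is an assumption made for illustration and is not how the ring buffer stores data.

```python
from typing import Optional, Sequence


def count_valid(samples: Sequence[Optional[float]]) -> int:
    """Count only the slots that actually hold a sample."""
    return sum(value is not None for value in samples)


def count_covered(samples: Sequence[Optional[float]]) -> int:
    """Count all slots from the oldest to the newest valid sample, inclusive."""
    valid_positions = [i for i, value in enumerate(samples) if value is not None]
    if not valid_positions:
        return 0
    return valid_positions[-1] - valid_positions[0] + 1


gappy = [None, 1.0, None, 2.0, None]
print(count_valid(gappy), count_covered(gappy))
```

Here two samples are valid, while the covered range spans three slots because the gap between them is included.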
start ¤
start() -> None

Start the MovingWindow.

This method starts the MovingWindow tasks.

Source code in frequenz/sdk/timeseries/_moving_window.py
def start(self) -> None:
    """Start the MovingWindow.

    This method starts the MovingWindow tasks.
    """
    if self._resampler:
        self._configure_resampler()
    self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window"))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
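The cancel-then-wait pattern can be tried out with plain `asyncio` tasks. The sketch below is a simplified variant and an assumption of this example: instead of raising a `BaseExceptionGroup`, the hypothetical `stop_tasks` helper returns the remaining errors as a list, which keeps it runnable on Python versions without exception groups.

```python
import asyncio
from typing import List, Set


async def stop_tasks(tasks: Set["asyncio.Task[None]"]) -> List[Exception]:
    """Cancel all tasks, wait for them, and return non-cancellation errors."""
    if not tasks:
        return []
    for task in tasks:
        task.cancel()
    done, _pending = await asyncio.wait(tasks)
    errors: List[Exception] = []
    for task in done:
        try:
            task.result()
        except asyncio.CancelledError:
            # Expected: we cancelled the task ourselves.
            pass
        except Exception as error:  # pylint: disable=broad-except
            errors.append(error)
    return errors


async def demo() -> List[Exception]:
    async def sleeper() -> None:
        await asyncio.sleep(3600)

    async def failer() -> None:
        raise RuntimeError("boom")

    tasks = {asyncio.create_task(sleeper()), asyncio.create_task(failer())}
    await asyncio.sleep(0)  # Let both tasks run their first step.
    return await stop_tasks(tasks)


print(asyncio.run(demo()))
```

The cancelled `sleeper` task is silently dropped, while the `RuntimeError` from `failer` is reported, mirroring how `stop` filters out `CancelledError` and re-raises the rest.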
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )
window ¤
window(
    start: datetime | int | None,
    end: datetime | int | None,
    *,
    force_copy: bool = True,
    fill_value: float | None = np.nan
) -> ArrayLike

Return an array containing the samples in the given time interval.

In contrast to the at method, which expects a single index as argument, this method expects a slice as argument and returns an array.

PARAMETER DESCRIPTION
start

The start timestamp of the time interval. If None, the start of the window is used.

TYPE: datetime | int | None

end

The end timestamp of the time interval. If None, the end of the window is used.

TYPE: datetime | int | None

force_copy

If True, the returned array is a copy of the underlying data. Otherwise, if possible, a view of the underlying data is returned.

TYPE: bool DEFAULT: True

fill_value

If not None, will use this value to fill missing values. If missing values should be set, force_copy must be True. Defaults to NaN to avoid returning outdated data unexpectedly.

TYPE: float | None DEFAULT: nan

RETURNS DESCRIPTION
ArrayLike

An array containing the samples in the given time interval.

Source code in frequenz/sdk/timeseries/_moving_window.py
def window(
    self,
    start: datetime | int | None,
    end: datetime | int | None,
    *,
    force_copy: bool = True,
    fill_value: float | None = np.nan,
) -> ArrayLike:
    """
    Return an array containing the samples in the given time interval.

    In contrast to the [`at`][frequenz.sdk.timeseries.MovingWindow.at] method,
    which expects a single index as argument, this method expects a slice as argument
    and returns an array.

    Args:
        start: The start timestamp of the time interval. If `None`, the
            start of the window is used.
        end: The end timestamp of the time interval. If `None`, the end of
            the window is used.
        force_copy: If `True`, the returned array is a copy of the underlying
            data. Otherwise, if possible, a view of the underlying data is
            returned.
        fill_value: If not None, will use this value to fill missing values.
            If missing values should be set, force_copy must be True.
            Defaults to NaN to avoid returning outdated data unexpectedly.

    Returns:
        An array containing the samples in the given time interval.
    """
    return self._buffer.window(
        start, end, force_copy=force_copy, fill_value=fill_value
    )
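The interplay of the half-open interval, the copy, and `fill_value` can be sketched on a plain list. This is an illustrative model only: `window_copy` is a hypothetical function, and representing missing samples as `None` is an assumption of this sketch (the real buffer is a numpy array).

```python
import math
from typing import List, Optional, Sequence


def window_copy(
    samples: Sequence[Optional[float]],
    start: Optional[int],
    end: Optional[int],
    fill_value: Optional[float] = math.nan,
) -> List[Optional[float]]:
    """Return samples[start:end] as a new list, replacing gaps with `fill_value`."""
    selected = list(samples[start:end])  # Half-open: `end` itself is excluded.
    if fill_value is None:
        return selected
    return [fill_value if value is None else value for value in selected]


data = [1.0, None, 3.0, 4.0]
print(window_copy(data, 0, 3))
print(window_copy(data, None, None, fill_value=0.0))
```

Filling always happens on the copy, so the original data is left untouched, which is why filling missing values requires `force_copy` to be `True`.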

frequenz.sdk.timeseries.PeriodicFeatureExtractor ¤

A feature extractor for historical timeseries data.

This class creates a profile from periodically occurring windows in a buffer of historical data.

The profile is created out of all windows that are fully contained in the underlying buffer with the same start and end time modulo a fixed period.

Consider for example a timeseries $T$ of historical data and sub-series $S_i$ of $T$ all having the same size $l$ and the same fixed distance $p$ called period, where the period of two sub-windows is defined as the distance between two points at the same position within the sub-windows.

This class calculates a statistical profile $S$ over all $S_i$, i.e. the value of $S$ at position $j$ is calculated by performing a certain calculation, e.g. an average, over the values of all $S_i$ at position $j$.

Note

The oldest window or the window that is currently overwritten in the MovingWindow is not considered in the profile.

Note

When constructing a PeriodicFeatureExtractor object the MovingWindow size has to be an integer multiple of the period.
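The idea of a profile over periodically repeating windows can be sketched with plain lists. This is a simplified illustration, not the class's implementation: `periodic_profile` is a hypothetical function, it works on sample indices rather than timestamps, and it does not exclude the oldest or currently overwritten window.

```python
from typing import List, Sequence


def periodic_profile(
    series: Sequence[float], period: int, start: int, length: int
) -> List[float]:
    """Average all windows series[k : k + length] for k = start, start + period, ..."""
    windows = [
        series[offset : offset + length]
        for offset in range(start, len(series) - length + 1, period)
    ]
    # zip(*windows) groups the values that share the same position in each window.
    return [sum(column) / len(windows) for column in zip(*windows)]


history = [1.0, 2.0, 3.0, 3.0, 4.0, 5.0, 5.0, 6.0, 7.0]
print(periodic_profile(history, period=3, start=0, length=2))
```

With a period of 3 and a window length of 2, the windows are `[1, 2]`, `[3, 4]` and `[5, 6]`, and the profile is their position-wise mean.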

Example
from datetime import datetime, timedelta, timezone

from frequenz.sdk import microgrid
from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor

async with MovingWindow(
    size=timedelta(days=35),
    resampled_data_recv=microgrid.grid().power.new_receiver(),
    input_sampling_period=timedelta(seconds=1),
) as moving_window:
    feature_extractor = PeriodicFeatureExtractor(
        moving_window=moving_window,
        period=timedelta(days=7),
    )

    now = datetime.now(timezone.utc)

    # create a daily weighted average for the next 24h
    avg_24h = feature_extractor.avg(
        now,
        now + timedelta(hours=24),
        weights=[0.1, 0.2, 0.3, 0.4]
    )

    # create a daily average for Thursday March 23 2023
    th_avg_24h = feature_extractor.avg(datetime(2023, 3, 23), datetime(2023, 3, 24))
Source code in frequenz/sdk/timeseries/_periodic_feature_extractor.py
class PeriodicFeatureExtractor:
    """
    A feature extractor for historical timeseries data.

    This class creates a profile from periodically occurring windows in a
    buffer of historical data.

    The profile is created out of all windows that are fully contained in the
    underlying buffer with the same start and end time modulo a fixed period.

    Consider for example a timeseries $T$ of historical data and sub-series
    $S_i$ of $T$ all having the same size $l$ and the same fixed distance $p$
    called period, where the period of two sub-windows is defined as the
    distance between two points at the same position within the sub-windows.

    This class calculates a statistical profile $S$ over all $S_i$, i.e. the
    value of $S$ at position $j$ is calculated by performing a certain
    calculation, e.g. an average, over the values of all $S_i$ at position $j$.

    Note:
        The oldest window or the window that is currently overwritten in the
        `MovingWindow` is not considered in the profile.

    Note:
        When constructing a `PeriodicFeatureExtractor` object the
        `MovingWindow` size has to be an integer multiple of the period.

    Example:
        ```python
        from datetime import datetime, timedelta, timezone

        from frequenz.sdk import microgrid
        from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor

        async with MovingWindow(
            size=timedelta(days=35),
            resampled_data_recv=microgrid.grid().power.new_receiver(),
            input_sampling_period=timedelta(seconds=1),
        ) as moving_window:
            feature_extractor = PeriodicFeatureExtractor(
                moving_window=moving_window,
                period=timedelta(days=7),
            )

            now = datetime.now(timezone.utc)

            # create a daily weighted average for the next 24h
            avg_24h = feature_extractor.avg(
                now,
                now + timedelta(hours=24),
                weights=[0.1, 0.2, 0.3, 0.4]
            )

            # create a daily average for Thursday March 23 2023
            th_avg_24h = feature_extractor.avg(datetime(2023, 3, 23), datetime(2023, 3, 24))
        ```
    """

    def __init__(
        self,
        moving_window: MovingWindow,
        period: timedelta,
    ) -> None:
        """
        Initialize a PeriodicFeatureExtractor object.

        Args:
            moving_window: The MovingWindow that is used for the average calculation.
            period: The distance between two succeeding intervals.

        Raises:
            ValueError: If the MovingWindow size is not an integer multiple of the period.
        """
        self._moving_window = moving_window

        self._sampling_period = self._moving_window.sampling_period
        """The sampling_period as float to use it for indexing of samples."""

        self._period = int(period / self._sampling_period)
        """Distance between two succeeding intervals in samples."""

        _logger.debug("Initializing PeriodicFeatureExtractor!")
        _logger.debug("MovingWindow size: %i", self._moving_window.count_valid())
        _logger.debug(
            "Period between two succeeding intervals (in samples): %i",
            self._period,
        )

        if not self._moving_window.count_valid() % self._period == 0:
            raise ValueError(
                "The MovingWindow size is not a integer multiple of the period."
            )

        if not is_close_to_zero(self._period - period / self._sampling_period):
            raise ValueError(
                "The period is not a multiple of the sampling period. "
                "This might result in unexpected behaviour."
            )

    @property
    def _buffer(self) -> OrderedRingBuffer[NDArray[np.float64]]:
        return self._moving_window._buffer  # pylint: disable=protected-access

    def _timestamp_to_rel_index(self, timestamp: datetime) -> int:
        """
        Get the index of a timestamp relative to the oldest sample in the MovingWindow.

        In other words, consider an integer axis where the zero is defined as the
        oldest element in the MovingWindow. This function returns the index of
        the given timestamp on this axis.

        This method can return negative values.

        Args:
            timestamp: A timestamp that we want to shift into the window.

        Returns:
            The index of the timestamp shifted into the MovingWindow.
        """
        # align timestamp to the sampling period
        timestamp = self._buffer.normalize_timestamp(timestamp)

        # distance between the input ts and the ts of oldest known samples (in samples)
        dist_to_oldest = int(
            (timestamp - self._buffer.time_bound_oldest) / self._sampling_period
        )

        _logger.debug("Shifting ts: %s", timestamp)
        _logger.debug("Oldest timestamp in buffer: %s", self._buffer.time_bound_oldest)
        _logger.debug("Distance to the oldest sample: %i", dist_to_oldest)

        return dist_to_oldest

    def _reshape_np_array(
        self, array: NDArray[np.float_], window_size: int
    ) -> NDArray[np.float_]:
        """
        Reshape a numpy array to a 2D array where each row represents a window.

        There are three cases to consider

        1. The array size is a multiple of window_size + period,
           i.e. num_windows is integer and we can simply reshape.
        2. The array size is NOT a multiple of window_size + period and
           it has length n * period + m, where m < window_size.
        3. The array size is NOT a multiple of window_size + period and
           it has length n * period + m, where m >= window_size.

        Note that in the current implementation of this class we have the restriction
        that the size of the MovingWindow is an integer multiple of the period and
        hence only case 1 can occur.

        Args:
            array: The numpy array to reshape.
            window_size: The size of the window in samples.

        Returns:
            The reshaped 2D array.

        Raises:
            ValueError: If the array is smaller than the given period.
        """
        # Not using the num_windows function here because we want to
        # differentiate between the three cases.
        if len(array) < self._period:
            raise ValueError(
                f"The array (length:{len(array)}) is too small to be reshaped."
            )

        num_windows = len(array) // self._period

        # Case 1:
        if len(array) - num_windows * self._period == 0:
            resized_array = array
        # Case 2
        elif len(array) - num_windows * self._period < window_size:
            resized_array = array[: num_windows * self._period]
        # Case 3
        else:
            num_windows += 1
            resized_array = np.resize(array, num_windows * self._period)

        return resized_array.reshape(num_windows, self._period)

    def _get_relative_positions(
        self, start: datetime, window_size: int
    ) -> RelativePositions:
        """
        Return relative positions of the MovingWindow.

        This method calculates the shifted relative positions of the start
        timestamp, the end timestamps as well as the next position that is
        overwritten in the ringbuffer.
        Shifted in that context means that the positions are moved as close
        as possible to the oldest sample in the MovingWindow.

        Args:
            start: The start timestamp of the window.
            window_size: The size of the window in samples.

        Returns:
            The relative positions of the start, end and next samples.
        """
        # The number of usable windows can change, when the current position of
        # the ringbuffer is inside one of the windows inside the MovingWindow.
        # Since this is possible, we assume that one window is always not used
        # for the average calculation.
        #
        # We are ignoring either the window that is currently overwritten if
        # the current position is inside that window or the window that would
        # be overwritten next.
        #
        # Move the window to its first appearance in the MovingWindow relative
        # to the oldest sample stored in the MovingWindow.
        #
        # In other words the oldest stored sample is considered to have index 0.
        #
        # Note that the returned value is an index, not a timestamp.
        rel_start_sample = self._timestamp_to_rel_index(start) % self._period
        rel_end_sample = rel_start_sample + window_size

        # check if the newest time bound, i.e. the sample that is currently written,
        # is inside the interval
        rb_current_position = self._buffer.time_bound_newest
        rel_next_position = (
            self._timestamp_to_rel_index(rb_current_position) + 1
        ) % self._period
        # fix the rel_next_position if modulo period the next position
        # is smaller than the start sample position
        if rel_next_position < rel_start_sample:
            rel_next_position += self._period

        rel_next_position += self._period * (window_size // self._period)

        _logger.debug("current position of the ringbuffer: %s", rb_current_position)
        _logger.debug("relative start_sample: %s", rel_start_sample)
        _logger.debug("relative end_sample: %s", rel_end_sample)
        _logger.debug("relative next_position: %s", rel_next_position)

        return RelativePositions(rel_start_sample, rel_end_sample, rel_next_position)

    def _get_buffer_bounds(
        self, start: datetime, end: datetime
    ) -> tuple[int, int, int]:
        """
        Get the bounds of the ringbuffer used for further operations.

        This method uses the given start and end timestamps to calculate the
        part of the ringbuffer that can be used for further operations, like
        average or min/max.

        Here we cut out the oldest window or the window that is currently
        overwritten in the MovingWindow such that it is not considered in any
        further operation.

        Args:
            start: The start timestamp of the window.
            end: The end timestamp of the window.

        Returns:
            The bounds of the to be used buffer and the window size.

        Raises:
            ValueError: If the start timestamp is after the end timestamp.
        """
        window_size = self._timestamp_to_rel_index(end) - self._timestamp_to_rel_index(
            start
        )
        if window_size <= 0:
            raise ValueError("Start timestamp must be before end timestamp")
        if window_size > self._period:
            raise ValueError(
                "The window size must be smaller or equal than the period."
            )

        rel_pos = self._get_relative_positions(start, window_size)

        if window_size > self._moving_window.count_valid():
            raise ValueError(
                "The window size must be smaller than the size of the `MovingWindow`"
            )

        # shifted distance between the next incoming sample and the start of the window
        dist_to_start = rel_pos.next - rel_pos.start

        # get the start and end position inside the ringbuffer
        end_pos = (
            self._timestamp_to_rel_index(self._buffer.time_bound_newest) + 1
        ) - dist_to_start

        # Note that this check works since we are using the positions
        # relative to the oldest sample stored in the MovingWindow.
        if rel_pos.start <= rel_pos.next < rel_pos.end:
            # end position is start_position of the window that is currently written
            # that's how end_pos is currently set
            _logger.debug("Next sample will be inside the window time interval!")
        else:
            _logger.debug("Next sample will be outside the window time interval!")
            # end position is start_position of the window that
            # is overwritten next, hence we add the period.
            end_pos += self._period

        # add the offset to the oldest sample in the ringbuffer and wrap around
        # to get the start and end positions in the ringbuffer
        rb_offset = self._buffer.to_internal_index(self._buffer.time_bound_oldest)
        start_pos = self._buffer.wrap(end_pos + self._period + rb_offset)
        end_pos = self._buffer.wrap(end_pos + rb_offset)

        _logger.debug("start_pos in ringbuffer: %s", start_pos)
        _logger.debug("end_pos in ringbuffer: %s", end_pos)

        return (start_pos, end_pos, window_size)

    def _get_reshaped_np_array(
        self, start: datetime, end: datetime
    ) -> tuple[NDArray[np.float_], int]:
        """
        Create a reshaped numpy array from the MovingWindow.

        The reshaped array is a two dimensional array, where one dimension is
        the window_size and the other the number of windows returned by the
        `_get_buffer_bounds` method.

        Args:
            start: The start timestamp of the window.
            end: The end timestamp of the window.

        Returns:
            A tuple containing the reshaped numpy array and the window size.
        """
        (start_pos, end_pos, window_size) = self._get_buffer_bounds(start, end)

        if start_pos >= end_pos:
            window_start = self._buffer[start_pos : self._moving_window.count_valid()]
            window_end = self._buffer[0:end_pos]
            # make the linter happy
            assert isinstance(window_start, np.ndarray)
            assert isinstance(window_end, np.ndarray)
            window_array = np.concatenate((window_start, window_end))
        else:
            window_array = self._buffer[start_pos:end_pos]

        return (self._reshape_np_array(window_array, window_size), window_size)

    def avg(
        self, start: datetime, end: datetime, weights: list[float] | None = None
    ) -> NDArray[np.float_]:
        """
        Create the average window out of the window defined by `start` and `end`.

        This method calculates the average of a window by averaging over all
        windows fully inside the MovingWindow having the period
        `self.period`.

        Args:
            start: The start of the window to average over.
            end: The end of the window to average over.
            weights: The weights to use for the average calculation (oldest first).

        Returns:
            The averaged timeseries window.
        """
        (reshaped, window_size) = self._get_reshaped_np_array(start, end)
        return np.average(  # type: ignore[no-any-return]
            reshaped[:, :window_size], axis=0, weights=weights
        )
Functions¤
__init__ ¤
__init__(
    moving_window: MovingWindow, period: timedelta
) -> None

Initialize a PeriodicFeatureExtractor object.

PARAMETER DESCRIPTION
moving_window

The MovingWindow that is used for the average calculation.

TYPE: MovingWindow

period

The distance between two succeeding intervals.

TYPE: timedelta

RAISES DESCRIPTION
ValueError

If the MovingWindow size is not an integer multiple of the period.

Source code in frequenz/sdk/timeseries/_periodic_feature_extractor.py
def __init__(
    self,
    moving_window: MovingWindow,
    period: timedelta,
) -> None:
    """
    Initialize a PeriodicFeatureExtractor object.

    Args:
        moving_window: The MovingWindow that is used for the average calculation.
        period: The distance between two succeeding intervals.

    Raises:
        ValueError: If the MovingWindow size is not an integer multiple of the period.
    """
    self._moving_window = moving_window

    self._sampling_period = self._moving_window.sampling_period
    """The sampling_period as float to use it for indexing of samples."""

    self._period = int(period / self._sampling_period)
    """Distance between two succeeding intervals in samples."""

    _logger.debug("Initializing PeriodicFeatureExtractor!")
    _logger.debug("MovingWindow size: %i", self._moving_window.count_valid())
    _logger.debug(
        "Period between two succeeding intervals (in samples): %i",
        self._period,
    )

    if not self._moving_window.count_valid() % self._period == 0:
        raise ValueError(
            "The MovingWindow size is not a integer multiple of the period."
        )

    if not is_close_to_zero(self._period - period / self._sampling_period):
        raise ValueError(
            "The period is not a multiple of the sampling period. "
            "This might result in unexpected behaviour."
        )
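The conversion of the period into a number of samples, together with the check that it is a whole multiple of the sampling period, can be sketched standalone. In this sketch `math.isclose` stands in for the SDK's `is_close_to_zero` helper, whose exact tolerance is not shown here, and `period_in_samples` is a hypothetical name.

```python
import math
from datetime import timedelta


def period_in_samples(period: timedelta, sampling_period: timedelta) -> int:
    """Convert `period` to a sample count, rejecting non-integer ratios."""
    ratio = period / sampling_period  # timedelta / timedelta yields a float.
    samples = int(ratio)
    if not math.isclose(samples, ratio, abs_tol=1e-9):
        raise ValueError("The period is not a multiple of the sampling period.")
    return samples


# One week of 1 s samples.
print(period_in_samples(timedelta(days=7), timedelta(seconds=1)))
```

A period of 10 seconds with a 3 second sampling period would raise `ValueError`, since the ratio is not an integer.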
avg ¤
avg(
    start: datetime,
    end: datetime,
    weights: list[float] | None = None,
) -> NDArray[float_]

Create the average window out of the window defined by start and end.

This method calculates the average of a window by averaging over all windows fully inside the MovingWindow having the period self.period.

PARAMETER DESCRIPTION
start

The start of the window to average over.

TYPE: datetime

end

The end of the window to average over.

TYPE: datetime

weights

The weights to use for the average calculation (oldest first).

TYPE: list[float] | None DEFAULT: None

RETURNS DESCRIPTION
NDArray[float_]

The averaged timeseries window.

Source code in frequenz/sdk/timeseries/_periodic_feature_extractor.py
def avg(
    self, start: datetime, end: datetime, weights: list[float] | None = None
) -> NDArray[np.float_]:
    """
    Create the average window out of the window defined by `start` and `end`.

    This method calculates the average of a window by averaging over all
    windows fully inside the MovingWindow having the period
    `self.period`.

    Args:
        start: The start of the window to average over.
        end: The end of the window to average over.
        weights: The weights to use for the average calculation (oldest first).

    Returns:
        The averaged timeseries window.
    """
    (reshaped, window_size) = self._get_reshaped_np_array(start, end)
    return np.average(  # type: ignore[no-any-return]
        reshaped[:, :window_size], axis=0, weights=weights
    )
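
To illustrate what averaging over periods means, here is a simplified pure-Python sketch. It assumes the window of interest starts at the beginning of each period, which the real method does not require, and `periodic_average` is a hypothetical name. The weighted mean follows the usual definition, sum(w * x) / sum(w), with the oldest period first.

```python
from typing import List, Optional, Sequence


def periodic_average(
    samples: Sequence[float],
    period: int,
    window_size: int,
    weights: Optional[Sequence[float]] = None,
) -> List[float]:
    """Average the first `window_size` samples of each period, column by column."""
    if len(samples) % period != 0:
        raise ValueError("The number of samples must be a multiple of the period.")
    # One row per period, oldest period first.
    rows = [samples[i : i + period] for i in range(0, len(samples), period)]
    if weights is None:
        weights = [1.0] * len(rows)
    if len(weights) != len(rows):
        raise ValueError("One weight per period is required.")
    total = sum(weights)
    return [
        sum(w * row[col] for w, row in zip(weights, rows)) / total
        for col in range(window_size)
    ]


# Three periods of four samples each; average the first two samples of each.
data = [1.0, 2.0, 3.0, 4.0, 3.0, 4.0, 5.0, 6.0, 5.0, 6.0, 7.0, 8.0]
plain = periodic_average(data, period=4, window_size=2)
newest_only = periodic_average(data, period=4, window_size=2, weights=[0, 0, 1])
```

With equal weights the result is the per-position mean across the three periods. Putting all the weight on the last entry returns the newest period's values unchanged.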

frequenz.sdk.timeseries.ReceiverFetcher ¤

Bases: Generic[T_co], Protocol

An interface that just exposes a new_receiver method.

Source code in frequenz/sdk/_internal/_channels.py
class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
    """An interface that just exposes a `new_receiver` method."""

    @abc.abstractmethod
    def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
        """Get a receiver from the channel.

        Args:
            limit: The maximum size of the receiver.

        Returns:
            A receiver instance.
        """
Functions¤
new_receiver abstractmethod ¤
new_receiver(*, limit: int = 50) -> Receiver[T_co]

Get a receiver from the channel.

PARAMETER DESCRIPTION
limit

The maximum size of the receiver.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[T_co]

A receiver instance.

Source code in frequenz/sdk/_internal/_channels.py
@abc.abstractmethod
def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
    """Get a receiver from the channel.

    Args:
        limit: The maximum size of the receiver.

    Returns:
        A receiver instance.
    """
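
Because this is a `Protocol`, any object with a compatible `new_receiver` method satisfies it without inheriting from it. The sketch below shows that structural-typing idea with stand-ins only: `ToyFetcher` and `HistoryFetcher` are hypothetical names, and a plain `Sequence` replaces the real channel `Receiver`, which this example does not model.

```python
from typing import List, Protocol, Sequence, TypeVar

T_co = TypeVar("T_co", covariant=True)


class ToyFetcher(Protocol[T_co]):
    """Stand-in protocol: anything that can hand out a new 'receiver'."""

    def new_receiver(self, *, limit: int = 50) -> Sequence[T_co]:
        """Return a receiver holding at most `limit` items."""
        ...


class HistoryFetcher:
    """Satisfies ToyFetcher[float] without inheriting from it."""

    def __init__(self, history: List[float]) -> None:
        self._history = history

    def new_receiver(self, *, limit: int = 50) -> Sequence[float]:
        # Keep only the newest `limit` values (limit is assumed to be >= 1).
        return self._history[-limit:]


def latest(fetcher: ToyFetcher[float]) -> float:
    """Work against the protocol, not a concrete class."""
    return fetcher.new_receiver(limit=1)[0]


fetcher = HistoryFetcher([1.0, 2.0, 3.0])
newest = latest(fetcher)
```

Code written against the protocol, like `latest`, accepts any source of receivers, which is the point of exposing only `new_receiver`.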

frequenz.sdk.timeseries.ResamplerConfig dataclass ¤

Resampler configuration.

Source code in frequenz/sdk/timeseries/_resampling.py
@dataclass(frozen=True)
class ResamplerConfig:
    """Resampler configuration."""

    resampling_period: timedelta
    """The resampling period.

    This is the time that passes between the calculation of two consecutive
    resampled samples.

    It must be a positive time span.
    """

    max_data_age_in_periods: float = 3.0
    """The maximum age a sample can have to be considered *relevant* for resampling.

    Expressed in number of periods, where period is the `resampling_period`
    if we are downsampling (resampling period bigger than the input period) or
    the *input sampling period* if we are upsampling (input period bigger than
    the resampling period).

    It must be at least 1.0.

    Example:
        If `resampling_period` is 3 seconds, the input sampling period is
        1 second and `max_data_age_in_periods` is 2, then data older than
        3*2 = 6 seconds will be discarded when creating a new sample and never
        passed to the resampling function.

        If `resampling_period` is 3 seconds, the input sampling period is
        5 seconds and `max_data_age_in_periods` is 2, then data older than
        5*2 = 10 seconds will be discarded when creating a new sample and never
        passed to the resampling function.
    """

    resampling_function: ResamplingFunction = average
    """The resampling function.

    This function will be applied to the sequence of relevant samples at
    a given time. The result of the function is what is sent as the resampled
    value.
    """

    initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
    """The initial length of the resampling buffer.

    The buffer could grow or shrink depending on the source properties,
    like sampling rate, to make sure all the requested past sampling periods
    can be stored.

    It must be at least 1 and at most `max_buffer_len`.
    """

    warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
    """The minimum length of the resampling buffer that will emit a warning.

    If a buffer grows bigger than this value, it will emit a warning in the
    logs, so buffers don't grow too big inadvertently.

    It must be at least 1 and less than `max_buffer_len`.
    """

    max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
    """The maximum length of the resampling buffer.

    Buffers won't be allowed to grow beyond this point even if it would be
    needed to keep all the requested past sampling periods. An error will be
    emitted in the logs if the buffer length needs to be truncated to this
    value.

    It must be bigger than `warn_buffer_len`.
    """

    align_to: datetime | None = UNIX_EPOCH
    """The time to align the resampling period to.

    The resampling period will be aligned to this time, so the first resampled
    sample will be at the first multiple of `resampling_period` starting from
    `align_to`. It must be an aware datetime and can be in the future too.

    If `align_to` is `None`, the resampling period will be aligned to the
    time the resampler is created.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.resampling_period.total_seconds() < 0.0:
            raise ValueError(
                f"resampling_period ({self.resampling_period}) must be positive"
            )
        if self.max_data_age_in_periods < 1.0:
            raise ValueError(
                f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
            )
        if self.warn_buffer_len < 1:
            raise ValueError(
                f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
            )
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                f"max_buffer_len ({self.max_buffer_len}) should "
                f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
            )

        if self.initial_buffer_len < 1:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) should at least 1"
            )
        if self.initial_buffer_len > self.max_buffer_len:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
                f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
                "initial_buffer_len or a bigger max_buffer_len"
            )
        if self.initial_buffer_len > self.warn_buffer_len:
            _logger.warning(
                "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
                self.initial_buffer_len,
                self.warn_buffer_len,
            )
        if self.align_to is not None and self.align_to.tzinfo is None:
            raise ValueError(
                f"align_to ({self.align_to}) should be a timezone aware datetime"
            )
Attributes¤
align_to class-attribute instance-attribute ¤
align_to: datetime | None = UNIX_EPOCH

The time to align the resampling period to.

The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of resampling_period starting from align_to. It must be an aware datetime and can be in the future too.

If align_to is None, the resampling period will be aligned to the time the resampler is created.
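
The alignment rule can be sketched with plain `datetime` arithmetic. This is an illustration under assumptions, not the resampler's code: `next_aligned` is a hypothetical helper, and returning `now` itself when it already falls on a multiple of the period is a choice made for this sketch.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)


def next_aligned(
    now: datetime, period: timedelta, align_to: datetime = UNIX_EPOCH
) -> datetime:
    """Return the first multiple of `period` from `align_to` that is >= `now`."""
    # timedelta % timedelta is never negative for a positive period, so this
    # also works when `align_to` lies in the future.
    elapsed = (now - align_to) % period
    if not elapsed:
        return now  # already on a multiple of the period
    return now + (period - elapsed)


# The timeline from the package overview: period of 10 s, now at 00:00:32.
now = UNIX_EPOCH + timedelta(seconds=32)
tick = next_aligned(now, timedelta(seconds=10))
```

Here `tick` is 1970-01-01 00:00:40 UTC, matching the timeline in the package overview.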

initial_buffer_len class-attribute instance-attribute ¤
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT

The initial length of the resampling buffer.

The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.

It must be at least 1 and at most max_buffer_len.

max_buffer_len class-attribute instance-attribute ¤
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX

The maximum length of the resampling buffer.

Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.

It must be bigger than warn_buffer_len.

max_data_age_in_periods class-attribute instance-attribute ¤
max_data_age_in_periods: float = 3.0

The maximum age a sample can have to be considered relevant for resampling.

Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).

It must be at least 1.0.

Example

If resampling_period is 3 seconds, the input sampling period is 1 second and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.

If resampling_period is 3 seconds, the input sampling period is 5 seconds and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.
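
The arithmetic in both examples comes down to taking the larger of the two periods and multiplying it by max_data_age_in_periods. A small sketch of that calculation follows; `max_data_age` is a hypothetical helper name, not part of the SDK.

```python
from datetime import timedelta


def max_data_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Return the age beyond which samples are no longer considered relevant."""
    # Downsampling: the resampling period is the larger one.
    # Upsampling: the input sampling period is the larger one.
    return max(resampling_period, input_period) * max_data_age_in_periods


downsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=1), 2)
upsampling = max_data_age(timedelta(seconds=3), timedelta(seconds=5), 2)
```

`downsampling` is 6 seconds and `upsampling` is 10 seconds, as in the two examples above.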

resampling_function class-attribute instance-attribute ¤
resampling_function: ResamplingFunction = average

The resampling function.

This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.

resampling_period instance-attribute ¤
resampling_period: timedelta

The resampling period.

This is the time that passes between the calculation of two consecutive resampled samples.

It must be a positive time span.

warn_buffer_len class-attribute instance-attribute ¤
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN

The minimum length of the resampling buffer that will emit a warning.

If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.

It must be at least 1 and less than max_buffer_len.

Functions¤
__post_init__ ¤
__post_init__() -> None

Check that config values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in frequenz/sdk/timeseries/_resampling.py
def __post_init__(self) -> None:
    """Check that config values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.resampling_period.total_seconds() < 0.0:
        raise ValueError(
            f"resampling_period ({self.resampling_period}) must be positive"
        )
    if self.max_data_age_in_periods < 1.0:
        raise ValueError(
            f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
        )
    if self.warn_buffer_len < 1:
        raise ValueError(
            f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
        )
    if self.max_buffer_len <= self.warn_buffer_len:
        raise ValueError(
            f"max_buffer_len ({self.max_buffer_len}) should "
            f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
        )

    if self.initial_buffer_len < 1:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) should at least 1"
        )
    if self.initial_buffer_len > self.max_buffer_len:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
            f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
            "initial_buffer_len or a bigger max_buffer_len"
        )
    if self.initial_buffer_len > self.warn_buffer_len:
        _logger.warning(
            "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
            self.initial_buffer_len,
            self.warn_buffer_len,
        )
    if self.align_to is not None and self.align_to.tzinfo is None:
        raise ValueError(
            f"align_to ({self.align_to}) should be a timezone aware datetime"
        )

frequenz.sdk.timeseries.Sample dataclass ¤

Bases: Generic[QuantityT]

A measurement taken at a particular point in time.

The value could be None if a component is malfunctioning or data is lacking for another reason, but a sample still needs to be sent to have a coherent view on a group of component metrics for a particular timestamp.

Source code in frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True, order=True)
class Sample(Generic[QuantityT]):
    """A measurement taken at a particular point in time.

    The `value` could be `None` if a component is malfunctioning or data is
    lacking for another reason, but a sample still needs to be sent to have a
    coherent view on a group of component metrics for a particular timestamp.
    """

    timestamp: datetime
    """The time when this sample was generated."""

    value: QuantityT | None = None
    """The value of this sample."""
Attributes¤
timestamp instance-attribute ¤
timestamp: datetime

The time when this sample was generated.

value class-attribute instance-attribute ¤
value: QuantityT | None = None

The value of this sample.
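
Since the dataclass is declared with `order=True`, samples compare field by field, starting with the timestamp. The sketch below uses a stand-in class (`ToySample`, with plain floats instead of quantities) to show sorting by time; it is an illustration, not the SDK class.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True, order=True)
class ToySample:
    """Stand-in for Sample, using a plain float as the value type."""

    timestamp: datetime
    value: Optional[float] = None


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
samples = [
    ToySample(t0 + timedelta(seconds=10), 2.0),
    ToySample(t0),  # value defaults to None, e.g. data was missing
    ToySample(t0 + timedelta(seconds=5), 1.0),
]

# With distinct timestamps only the timestamps are compared, so a None value
# is harmless here. Two samples with equal timestamps would fall back to
# comparing values, and comparing None with a float raises TypeError.
ordered = sorted(samples)
values_in_time_order = [s.value for s in ordered]
```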

frequenz.sdk.timeseries.Sample3Phase dataclass ¤

Bases: Generic[QuantityT]

A 3-phase measurement made at a particular point in time.

Each of the value fields could be None if a component is malfunctioning or data is lacking for another reason, but a sample still needs to be sent to have a coherent view on a group of component metrics for a particular timestamp.

Source code in frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True)
class Sample3Phase(Generic[QuantityT]):
    """A 3-phase measurement made at a particular point in time.

    Each of the `value` fields could be `None` if a component is malfunctioning
    or data is lacking for another reason, but a sample still needs to be sent
    to have a coherent view on a group of component metrics for a particular
    timestamp.
    """

    timestamp: datetime
    """The time when this sample was generated."""
    value_p1: QuantityT | None
    """The value of the 1st phase in this sample."""

    value_p2: QuantityT | None
    """The value of the 2nd phase in this sample."""

    value_p3: QuantityT | None
    """The value of the 3rd phase in this sample."""

    def __iter__(self) -> Iterator[QuantityT | None]:
        """Return an iterator that yields values from each of the phases.

        Yields:
            Per-phase measurements one-by-one.
        """
        yield self.value_p1
        yield self.value_p2
        yield self.value_p3

    @overload
    def max(self, default: QuantityT) -> QuantityT: ...

    @overload
    def max(self, default: None = None) -> QuantityT | None: ...

    def max(self, default: QuantityT | None = None) -> QuantityT | None:
        """Return the max value among all phases, or default if they are all `None`.

        Args:
            default: value to return if all phases are `None`.

        Returns:
            Max value among all phases, if available, default value otherwise.
        """
        if not any(self):
            return default
        value: QuantityT = functools.reduce(
            lambda x, y: x if x > y else y,
            filter(None, self),
        )
        return value

    @overload
    def min(self, default: QuantityT) -> QuantityT: ...

    @overload
    def min(self, default: None = None) -> QuantityT | None: ...

    def min(self, default: QuantityT | None = None) -> QuantityT | None:
        """Return the min value among all phases, or default if they are all `None`.

        Args:
            default: value to return if all phases are `None`.

        Returns:
            Min value among all phases, if available, default value otherwise.
        """
        if not any(self):
            return default
        value: QuantityT = functools.reduce(
            lambda x, y: x if x < y else y,
            filter(None, self),
        )
        return value

    def map(
        self,
        function: Callable[[QuantityT], QuantityT],
        default: QuantityT | None = None,
    ) -> Self:
        """Apply the given function on each of the phase values and return the result.

        If a phase value is `None`, replace it with `default` instead.

        Args:
            function: The function to apply on each of the phase values.
            default: The value to apply if a phase value is `None`.

        Returns:
            A new instance, with the given function applied on values for each of the
                phases.
        """
        return self.__class__(
            timestamp=self.timestamp,
            value_p1=default if self.value_p1 is None else function(self.value_p1),
            value_p2=default if self.value_p2 is None else function(self.value_p2),
            value_p3=default if self.value_p3 is None else function(self.value_p3),
        )
Attributes¤
timestamp instance-attribute ¤
timestamp: datetime

The time when this sample was generated.

value_p1 instance-attribute ¤
value_p1: QuantityT | None

The value of the 1st phase in this sample.

value_p2 instance-attribute ¤
value_p2: QuantityT | None

The value of the 2nd phase in this sample.

value_p3 instance-attribute ¤
value_p3: QuantityT | None

The value of the 3rd phase in this sample.

Functions¤
__iter__ ¤
__iter__() -> Iterator[QuantityT | None]

Return an iterator that yields values from each of the phases.

YIELDS DESCRIPTION
QuantityT | None

Per-phase measurements one-by-one.

Source code in frequenz/sdk/timeseries/_base_types.py
def __iter__(self) -> Iterator[QuantityT | None]:
    """Return an iterator that yields values from each of the phases.

    Yields:
        Per-phase measurements one-by-one.
    """
    yield self.value_p1
    yield self.value_p2
    yield self.value_p3
map ¤
map(
    function: Callable[[QuantityT], QuantityT],
    default: QuantityT | None = None,
) -> Self

Apply the given function on each of the phase values and return the result.

If a phase value is None, replace it with default instead.

PARAMETER DESCRIPTION
function

The function to apply on each of the phase values.

TYPE: Callable[[QuantityT], QuantityT]

default

The value to use if a phase value is None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
Self

A new instance, with the given function applied on values for each of the phases.

Source code in frequenz/sdk/timeseries/_base_types.py
def map(
    self,
    function: Callable[[QuantityT], QuantityT],
    default: QuantityT | None = None,
) -> Self:
    """Apply the given function on each of the phase values and return the result.

    If a phase value is `None`, replace it with `default` instead.

    Args:
        function: The function to apply on each of the phase values.
        default: The value to apply if a phase value is `None`.

    Returns:
        A new instance, with the given function applied on values for each of the
            phases.
    """
    return self.__class__(
        timestamp=self.timestamp,
        value_p1=default if self.value_p1 is None else function(self.value_p1),
        value_p2=default if self.value_p2 is None else function(self.value_p2),
        value_p3=default if self.value_p3 is None else function(self.value_p3),
    )
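
A short usage sketch of `map` follows. It mirrors the method on a stand-in class with plain floats (`ToySample3Phase` is a hypothetical name) so that it runs without the SDK; the real class works on quantity types.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional


@dataclass(frozen=True)
class ToySample3Phase:
    """Stand-in for Sample3Phase, using plain floats as values."""

    timestamp: datetime
    value_p1: Optional[float]
    value_p2: Optional[float]
    value_p3: Optional[float]

    def map(
        self,
        function: Callable[[float], float],
        default: Optional[float] = None,
    ) -> "ToySample3Phase":
        """Apply `function` to each phase, using `default` where a phase is None."""
        return self.__class__(
            timestamp=self.timestamp,
            value_p1=default if self.value_p1 is None else function(self.value_p1),
            value_p2=default if self.value_p2 is None else function(self.value_p2),
            value_p3=default if self.value_p3 is None else function(self.value_p3),
        )


sample = ToySample3Phase(
    datetime(2024, 1, 1, tzinfo=timezone.utc), 230.0, None, -5.0
)
# Absolute value per phase; the missing phase becomes 0.0.
magnitudes = sample.map(abs, default=0.0)
```

The original `sample` is left untouched: the class is frozen, and `map` builds a new instance with the same timestamp.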
max ¤
max(default: QuantityT | None = None) -> QuantityT | None

Return the max value among all phases, or default if they are all None.

PARAMETER DESCRIPTION
default

Value to return if all phases are None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
QuantityT | None

Max value among all phases, if available, default value otherwise.

Source code in frequenz/sdk/timeseries/_base_types.py
def max(self, default: QuantityT | None = None) -> QuantityT | None:
    """Return the max value among all phases, or default if they are all `None`.

    Args:
        default: value to return if all phases are `None`.

    Returns:
        Max value among all phases, if available, default value otherwise.
    """
    if not any(self):
        return default
    value: QuantityT = functools.reduce(
        lambda x, y: x if x > y else y,
        filter(None, self),
    )
    return value
min ¤
min(default: QuantityT | None = None) -> QuantityT | None

Return the min value among all phases, or default if they are all None.

PARAMETER DESCRIPTION
default

Value to return if all phases are None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
QuantityT | None

Min value among all phases, if available, default value otherwise.

Source code in frequenz/sdk/timeseries/_base_types.py
def min(self, default: QuantityT | None = None) -> QuantityT | None:
    """Return the min value among all phases, or default if they are all `None`.

    Args:
        default: value to return if all phases are `None`.

    Returns:
        Min value among all phases, if available, default value otherwise.
    """
    if not any(self):
        return default
    value: QuantityT = functools.reduce(
        lambda x, y: x if x < y else y,
        filter(None, self),
    )
    return value
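
The `any(...)` and `filter(None, ...)` pattern used by `max` and `min` can be tried on plain tuples. In this sketch `phase_max` and `phase_min` are hypothetical free functions and plain floats are stand-ins for quantities. One thing it makes visible: `filter(None, ...)` drops every falsy item, so with plain floats a 0.0 is skipped along with `None`. Whether that matters for the SDK's quantity types depends on their truthiness, which this sketch does not assert.

```python
import functools
from typing import Optional, Sequence


def phase_max(
    values: Sequence[Optional[float]], default: Optional[float] = None
) -> Optional[float]:
    """Return the largest truthy value, or `default` if there is none."""
    if not any(values):
        return default
    return functools.reduce(lambda x, y: x if x > y else y, filter(None, values))


def phase_min(
    values: Sequence[Optional[float]], default: Optional[float] = None
) -> Optional[float]:
    """Return the smallest truthy value, or `default` if there is none."""
    if not any(values):
        return default
    return functools.reduce(lambda x, y: x if x < y else y, filter(None, values))


highest = phase_max((1.0, None, 3.0))
lowest = phase_min((1.0, None, 3.0))
fallback = phase_max((None, None, None), default=0.0)

# Caveat with plain floats: 0.0 is falsy, so it is filtered out like None.
zero_skipped = phase_max((0.0, None, -2.0))
```

In the last call the result is -2.0 rather than 0.0, because the zero never reaches the comparison.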