frequenz.sdk.timeseries

Handling of timeseries streams.

A timeseries is a stream (normally an async iterator) of Samples.

Periodicity and alignment

All the data produced by this package is always periodic and aligned to the UNIX_EPOCH (by default).

Classes normally take a (re)sampling period as an argument and, optionally, an align_to argument.

This means timestamps are always separated by exactly one period, and each timestamp always falls on a multiple of the period, counted from align_to.

This ensures that the data is predictable and consistent among restarts.

Example

Suppose we have a period of 10 seconds and are aligning to the UNIX epoch. If the timeline starts at 1970-01-01 00:00:00 UTC and the current time (now) is 1970-01-01 00:00:32 UTC, then the next timestamp will be at 1970-01-01 00:00:40 UTC:

align_to = 1970-01-01 00:00:00         next event = 1970-01-01 00:00:40
|                                       |
|---------|---------|---------|-|-------|---------|---------|---------|
0        10        20        30 |      40        50        60        70
                               now = 1970-01-01 00:00:32
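
The timeline above can be reproduced with plain `datetime` arithmetic. The following is a minimal sketch, not the SDK's implementation: `next_aligned_timestamp` is a hypothetical helper, and treating a `now` that already lies on a boundary as aligned is an assumption.

```python
from datetime import datetime, timedelta, timezone

# Same value as the UNIX_EPOCH module attribute of this package.
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)


def next_aligned_timestamp(
    now: datetime, period: timedelta, align_to: datetime = UNIX_EPOCH
) -> datetime:
    """Return the first timestamp `align_to + k * period` that is >= `now`."""
    # timedelta % timedelta uses floor semantics, so the remainder is never
    # negative for a positive period, even when `now` is before `align_to`.
    remainder = (now - align_to) % period
    if remainder == timedelta(0):
        return now  # Assumption: a timestamp on a boundary counts as aligned.
    return now + (period - remainder)


now = datetime(1970, 1, 1, 0, 0, 32, tzinfo=timezone.utc)
next_event = next_aligned_timestamp(now, timedelta(seconds=10))
print(next_event)  # 1970-01-01 00:00:40+00:00
```

Because the result depends only on `now`, `period` and `align_to`, a restarted process computes the same grid of timestamps as before.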

Attributes

frequenz.sdk.timeseries.UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc) module-attribute

The UNIX epoch (in UTC).

Classes

frequenz.sdk.timeseries.Current

Bases: Quantity

A current quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and each of the arithmetic operators supported by this type is implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.
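
One practical consequence is that unit round trips are not guaranteed to be bit-exact. The sketch below mirrors the arithmetic of `from_milliamperes()` and `as_milliamperes()` from the source listing with plain floats. `milliamperes_round_trip` is a hypothetical helper and the tolerance is an arbitrary choice for illustration.

```python
import math


def milliamperes_round_trip(milliamperes: float) -> float:
    """Convert to the base unit (amperes) and back, using plain floats."""
    base_value = milliamperes * 10**-3  # the value from_milliamperes() stores
    return base_value * 1e3  # the value as_milliamperes() returns


value = 123.456
round_tripped = milliamperes_round_trip(value)

# Compare with a tolerance: the round trip may differ in the last bits,
# so an exact `==` comparison is not reliable in general.
values_match = math.isclose(round_tripped, value, rel_tol=1e-12)
```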

Source code in frequenz/sdk/timeseries/_quantities.py
class Current(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={
        -3: "mA",
        0: "A",
    },
):
    """A current quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type are
    actually implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_amperes(cls, amperes: float) -> Self:
        """Initialize a new current quantity.

        Args:
            amperes: The current in amperes.

        Returns:
            A new current quantity.
        """
        current = cls.__new__(cls)
        current._base_value = amperes
        return current

    @classmethod
    def from_milliamperes(cls, milliamperes: float) -> Self:
        """Initialize a new current quantity.

        Args:
            milliamperes: The current in milliamperes.

        Returns:
            A new current quantity.
        """
        current = cls.__new__(cls)
        current._base_value = milliamperes * 10**-3
        return current

    def as_amperes(self) -> float:
        """Return the current in amperes.

        Returns:
            The current in amperes.
        """
        return self._base_value

    def as_milliamperes(self) -> float:
        """Return the current in milliamperes.

        Returns:
            The current in milliamperes.
        """
        return self._base_value * 1e3

    @overload  # type: ignore
    def __mul__(self, other: Percentage) -> Self:
        """Return a current from multiplying this current by the given percentage.

        Args:
            other: The percentage to multiply by.
        """

    @overload
    def __mul__(self, other: Voltage) -> Power:
        """Multiply the current by a voltage to get a power.

        Args:
            other: The voltage.
        """

    def __mul__(self, other: Percentage | Voltage) -> Self | Power:
        """Return a current or power from multiplying this current by the given value.

        Args:
            other: The percentage or voltage to multiply by.

        Returns:
            A current or power.

        Raises:
            TypeError: If the given value is not a percentage or voltage.
        """
        if isinstance(other, Percentage):
            return super().__mul__(other)
        if isinstance(other, Voltage):
            return Power.from_watts(self._base_value * other._base_value)

        return NotImplemented
Functions
__mul__(other)

Return a current or power from multiplying this current by the given value.

PARAMETER DESCRIPTION
other

The percentage or voltage to multiply by.

TYPE: Percentage | Voltage

RETURNS DESCRIPTION
Self | Power

A current or power.

RAISES DESCRIPTION
TypeError

If the given value is not a percentage or voltage.
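
The source listing returns `NotImplemented` for unsupported operands, while the documentation lists `TypeError`. The two are consistent: when every candidate method returns `NotImplemented`, Python itself raises `TypeError`. The sketch below shows this mechanism with hypothetical stand-in classes (`Amps`, `Volts`, `Watts`) that are not part of the SDK.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Watts:
    value: float


@dataclass(frozen=True)
class Volts:
    value: float


@dataclass(frozen=True)
class Amps:
    value: float

    def __mul__(self, other: object) -> Watts:
        if isinstance(other, Volts):
            return Watts(self.value * other.value)  # P = I * V
        # Let Python try the reflected operation on `other`; if that also
        # fails, the interpreter raises TypeError.
        return NotImplemented


power = Amps(2.0) * Volts(230.0)

try:
    Amps(2.0) * 3.0  # a bare float is not a supported operand here
    unsupported_raised = False
except TypeError:
    unsupported_raised = True
```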

Source code in frequenz/sdk/timeseries/_quantities.py
def __mul__(self, other: Percentage | Voltage) -> Self | Power:
    """Return a current or power from multiplying this current by the given value.

    Args:
        other: The percentage or voltage to multiply by.

    Returns:
        A current or power.

    Raises:
        TypeError: If the given value is not a percentage or voltage.
    """
    if isinstance(other, Percentage):
        return super().__mul__(other)
    if isinstance(other, Voltage):
        return Power.from_watts(self._base_value * other._base_value)

    return NotImplemented
as_amperes()

Return the current in amperes.

RETURNS DESCRIPTION
float

The current in amperes.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_amperes(self) -> float:
    """Return the current in amperes.

    Returns:
        The current in amperes.
    """
    return self._base_value
as_milliamperes()

Return the current in milliamperes.

RETURNS DESCRIPTION
float

The current in milliamperes.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_milliamperes(self) -> float:
    """Return the current in milliamperes.

    Returns:
        The current in milliamperes.
    """
    return self._base_value * 1e3
from_amperes(amperes) classmethod

Initialize a new current quantity.

PARAMETER DESCRIPTION
amperes

The current in amperes.

TYPE: float

RETURNS DESCRIPTION
Self

A new current quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_amperes(cls, amperes: float) -> Self:
    """Initialize a new current quantity.

    Args:
        amperes: The current in amperes.

    Returns:
        A new current quantity.
    """
    current = cls.__new__(cls)
    current._base_value = amperes
    return current
from_milliamperes(milliamperes) classmethod

Initialize a new current quantity.

PARAMETER DESCRIPTION
milliamperes

The current in milliamperes.

TYPE: float

RETURNS DESCRIPTION
Self

A new current quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_milliamperes(cls, milliamperes: float) -> Self:
    """Initialize a new current quantity.

    Args:
        milliamperes: The current in milliamperes.

    Returns:
        A new current quantity.
    """
    current = cls.__new__(cls)
    current._base_value = milliamperes * 10**-3
    return current

frequenz.sdk.timeseries.Energy

Bases: Quantity

An energy quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and each of the arithmetic operators supported by this type is implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.

Source code in frequenz/sdk/timeseries/_quantities.py
class Energy(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={
        0: "Wh",
        3: "kWh",
        6: "MWh",
    },
):
    """An energy quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type are
    actually implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_watt_hours(cls, watt_hours: float) -> Self:
        """Initialize a new energy quantity.

        Args:
            watt_hours: The energy in watt hours.

        Returns:
            A new energy quantity.
        """
        energy = cls.__new__(cls)
        energy._base_value = watt_hours
        return energy

    @classmethod
    def from_kilowatt_hours(cls, kilowatt_hours: float) -> Self:
        """Initialize a new energy quantity.

        Args:
            kilowatt_hours: The energy in kilowatt hours.

        Returns:
            A new energy quantity.
        """
        energy = cls.__new__(cls)
        energy._base_value = kilowatt_hours * 10**3
        return energy

    @classmethod
    def from_megawatt_hours(cls, megawatt_hours: float) -> Self:
        """Initialize a new energy quantity.

        Args:
            megawatt_hours: The energy in megawatt hours.

        Returns:
            A new energy quantity.
        """
        energy = cls.__new__(cls)
        energy._base_value = megawatt_hours * 10**6
        return energy

    def as_watt_hours(self) -> float:
        """Return the energy in watt hours.

        Returns:
            The energy in watt hours.
        """
        return self._base_value

    def as_kilowatt_hours(self) -> float:
        """Return the energy in kilowatt hours.

        Returns:
            The energy in kilowatt hours.
        """
        return self._base_value / 1e3

    def as_megawatt_hours(self) -> float:
        """Return the energy in megawatt hours.

        Returns:
            The energy in megawatt hours.
        """
        return self._base_value / 1e6

    @overload
    def __truediv__(self, other: timedelta) -> Power:
        """Return a power from dividing this energy by the given duration.

        Args:
            other: The duration to divide by.
        """

    @overload
    def __truediv__(self, other: Power) -> timedelta:
        """Return a duration from dividing this energy by the given power.

        Args:
            other: The power to divide by.
        """

    def __truediv__(self, other: timedelta | Power) -> Power | timedelta:
        """Return a power or duration from dividing this energy by the given value.

        Args:
            other: The power or duration to divide by.

        Returns:
            A power or duration from dividing this energy by the given value.

        Raises:
            TypeError: If the given value is not a power or duration.
        """
        if isinstance(other, timedelta):
            return Power.from_watts(self._base_value / (other.total_seconds() / 3600.0))
        if isinstance(other, Power):
            return timedelta(seconds=(self._base_value / other._base_value) * 3600.0)
        raise TypeError(
            f"unsupported operand type(s) for /: '{type(self)}' and '{type(other)}'"
        )
Functions
__truediv__(other)

Return a power or duration from dividing this energy by the given value.

PARAMETER DESCRIPTION
other

The power or duration to divide by.

TYPE: timedelta | Power

RETURNS DESCRIPTION
Power | timedelta

A power or duration from dividing this energy by the given value.

RAISES DESCRIPTION
TypeError

If the given value is not a power or duration.
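
The unit handling can be checked with plain numbers. The base unit of energy is the watt hour, so durations are converted to hours before dividing. This sketch repeats the arithmetic of the source listing without the SDK types. The function names are hypothetical.

```python
from datetime import timedelta


def average_power_watts(energy_wh: float, duration: timedelta) -> float:
    """Energy in Wh divided by a duration gives power in W."""
    hours = duration.total_seconds() / 3600.0
    return energy_wh / hours


def duration_at_power(energy_wh: float, power_w: float) -> timedelta:
    """Energy in Wh divided by power in W gives a duration."""
    return timedelta(seconds=(energy_wh / power_w) * 3600.0)


# 1 kWh delivered in half an hour corresponds to an average of 2 kW.
power = average_power_watts(1000.0, timedelta(minutes=30))

# 1 kWh consumed at a constant 500 W lasts two hours.
duration = duration_at_power(1000.0, 500.0)
```

Note that a zero duration or a zero power leads to a division by zero in this arithmetic.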

Source code in frequenz/sdk/timeseries/_quantities.py
def __truediv__(self, other: timedelta | Power) -> Power | timedelta:
    """Return a power or duration from dividing this energy by the given value.

    Args:
        other: The power or duration to divide by.

    Returns:
        A power or duration from dividing this energy by the given value.

    Raises:
        TypeError: If the given value is not a power or duration.
    """
    if isinstance(other, timedelta):
        return Power.from_watts(self._base_value / (other.total_seconds() / 3600.0))
    if isinstance(other, Power):
        return timedelta(seconds=(self._base_value / other._base_value) * 3600.0)
    raise TypeError(
        f"unsupported operand type(s) for /: '{type(self)}' and '{type(other)}'"
    )
as_kilowatt_hours()

Return the energy in kilowatt hours.

RETURNS DESCRIPTION
float

The energy in kilowatt hours.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_kilowatt_hours(self) -> float:
    """Return the energy in kilowatt hours.

    Returns:
        The energy in kilowatt hours.
    """
    return self._base_value / 1e3
as_megawatt_hours()

Return the energy in megawatt hours.

RETURNS DESCRIPTION
float

The energy in megawatt hours.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_megawatt_hours(self) -> float:
    """Return the energy in megawatt hours.

    Returns:
        The energy in megawatt hours.
    """
    return self._base_value / 1e6
as_watt_hours()

Return the energy in watt hours.

RETURNS DESCRIPTION
float

The energy in watt hours.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_watt_hours(self) -> float:
    """Return the energy in watt hours.

    Returns:
        The energy in watt hours.
    """
    return self._base_value
from_kilowatt_hours(kilowatt_hours) classmethod

Initialize a new energy quantity.

PARAMETER DESCRIPTION
kilowatt_hours

The energy in kilowatt hours.

TYPE: float

RETURNS DESCRIPTION
Self

A new energy quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_kilowatt_hours(cls, kilowatt_hours: float) -> Self:
    """Initialize a new energy quantity.

    Args:
        kilowatt_hours: The energy in kilowatt hours.

    Returns:
        A new energy quantity.
    """
    energy = cls.__new__(cls)
    energy._base_value = kilowatt_hours * 10**3
    return energy
from_megawatt_hours(megawatt_hours) classmethod

Initialize a new energy quantity.

PARAMETER DESCRIPTION
megawatt_hours

The energy in megawatt hours.

TYPE: float

RETURNS DESCRIPTION
Self

A new energy quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_megawatt_hours(cls, megawatt_hours: float) -> Self:
    """Initialize a new energy quantity.

    Args:
        megawatt_hours: The energy in megawatt hours.

    Returns:
        A new energy quantity.
    """
    energy = cls.__new__(cls)
    energy._base_value = megawatt_hours * 10**6
    return energy
from_watt_hours(watt_hours) classmethod

Initialize a new energy quantity.

PARAMETER DESCRIPTION
watt_hours

The energy in watt hours.

TYPE: float

RETURNS DESCRIPTION
Self

A new energy quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_watt_hours(cls, watt_hours: float) -> Self:
    """Initialize a new energy quantity.

    Args:
        watt_hours: The energy in watt hours.

    Returns:
        A new energy quantity.
    """
    energy = cls.__new__(cls)
    energy._base_value = watt_hours
    return energy

frequenz.sdk.timeseries.Frequency

Bases: Quantity

A frequency quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and each of the arithmetic operators supported by this type is implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.

Source code in frequenz/sdk/timeseries/_quantities.py
class Frequency(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={0: "Hz", 3: "kHz", 6: "MHz", 9: "GHz"},
):
    """A frequency quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type are
    actually implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_hertz(cls, hertz: float) -> Self:
        """Initialize a new frequency quantity.

        Args:
            hertz: The frequency in hertz.

        Returns:
            A new frequency quantity.
        """
        frequency = cls.__new__(cls)
        frequency._base_value = hertz
        return frequency

    @classmethod
    def from_kilohertz(cls, kilohertz: float) -> Self:
        """Initialize a new frequency quantity.

        Args:
            kilohertz: The frequency in kilohertz.

        Returns:
            A new frequency quantity.
        """
        frequency = cls.__new__(cls)
        frequency._base_value = kilohertz * 10**3
        return frequency

    @classmethod
    def from_megahertz(cls, megahertz: float) -> Self:
        """Initialize a new frequency quantity.

        Args:
            megahertz: The frequency in megahertz.

        Returns:
            A new frequency quantity.
        """
        frequency = cls.__new__(cls)
        frequency._base_value = megahertz * 10**6
        return frequency

    @classmethod
    def from_gigahertz(cls, gigahertz: float) -> Self:
        """Initialize a new frequency quantity.

        Args:
            gigahertz: The frequency in gigahertz.

        Returns:
            A new frequency quantity.
        """
        frequency = cls.__new__(cls)
        frequency._base_value = gigahertz * 10**9
        return frequency

    def as_hertz(self) -> float:
        """Return the frequency in hertz.

        Returns:
            The frequency in hertz.
        """
        return self._base_value

    def as_kilohertz(self) -> float:
        """Return the frequency in kilohertz.

        Returns:
            The frequency in kilohertz.
        """
        return self._base_value / 1e3

    def as_megahertz(self) -> float:
        """Return the frequency in megahertz.

        Returns:
            The frequency in megahertz.
        """
        return self._base_value / 1e6

    def as_gigahertz(self) -> float:
        """Return the frequency in gigahertz.

        Returns:
            The frequency in gigahertz.
        """
        return self._base_value / 1e9

    def period(self) -> timedelta:
        """Return the period of the frequency.

        Returns:
            The period of the frequency.
        """
        return timedelta(seconds=1.0 / self._base_value)
Functions
as_gigahertz()

Return the frequency in gigahertz.

RETURNS DESCRIPTION
float

The frequency in gigahertz.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_gigahertz(self) -> float:
    """Return the frequency in gigahertz.

    Returns:
        The frequency in gigahertz.
    """
    return self._base_value / 1e9
as_hertz()

Return the frequency in hertz.

RETURNS DESCRIPTION
float

The frequency in hertz.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_hertz(self) -> float:
    """Return the frequency in hertz.

    Returns:
        The frequency in hertz.
    """
    return self._base_value
as_kilohertz()

Return the frequency in kilohertz.

RETURNS DESCRIPTION
float

The frequency in kilohertz.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_kilohertz(self) -> float:
    """Return the frequency in kilohertz.

    Returns:
        The frequency in kilohertz.
    """
    return self._base_value / 1e3
as_megahertz()

Return the frequency in megahertz.

RETURNS DESCRIPTION
float

The frequency in megahertz.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_megahertz(self) -> float:
    """Return the frequency in megahertz.

    Returns:
        The frequency in megahertz.
    """
    return self._base_value / 1e6
from_gigahertz(gigahertz) classmethod

Initialize a new frequency quantity.

PARAMETER DESCRIPTION
gigahertz

The frequency in gigahertz.

TYPE: float

RETURNS DESCRIPTION
Self

A new frequency quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_gigahertz(cls, gigahertz: float) -> Self:
    """Initialize a new frequency quantity.

    Args:
        gigahertz: The frequency in gigahertz.

    Returns:
        A new frequency quantity.
    """
    frequency = cls.__new__(cls)
    frequency._base_value = gigahertz * 10**9
    return frequency
from_hertz(hertz) classmethod

Initialize a new frequency quantity.

PARAMETER DESCRIPTION
hertz

The frequency in hertz.

TYPE: float

RETURNS DESCRIPTION
Self

A new frequency quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_hertz(cls, hertz: float) -> Self:
    """Initialize a new frequency quantity.

    Args:
        hertz: The frequency in hertz.

    Returns:
        A new frequency quantity.
    """
    frequency = cls.__new__(cls)
    frequency._base_value = hertz
    return frequency
from_kilohertz(kilohertz) classmethod

Initialize a new frequency quantity.

PARAMETER DESCRIPTION
kilohertz

The frequency in kilohertz.

TYPE: float

RETURNS DESCRIPTION
Self

A new frequency quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_kilohertz(cls, kilohertz: float) -> Self:
    """Initialize a new frequency quantity.

    Args:
        kilohertz: The frequency in kilohertz.

    Returns:
        A new frequency quantity.
    """
    frequency = cls.__new__(cls)
    frequency._base_value = kilohertz * 10**3
    return frequency
from_megahertz(megahertz) classmethod

Initialize a new frequency quantity.

PARAMETER DESCRIPTION
megahertz

The frequency in megahertz.

TYPE: float

RETURNS DESCRIPTION
Self

A new frequency quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_megahertz(cls, megahertz: float) -> Self:
    """Initialize a new frequency quantity.

    Args:
        megahertz: The frequency in megahertz.

    Returns:
        A new frequency quantity.
    """
    frequency = cls.__new__(cls)
    frequency._base_value = megahertz * 10**6
    return frequency
period()

Return the period of the frequency.

RETURNS DESCRIPTION
timedelta

The period of the frequency.
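
The period is the reciprocal of the frequency, returned as a `timedelta`. Since `timedelta` has a resolution of one microsecond, very high frequencies cannot be represented faithfully. The sketch below repeats the arithmetic of the source listing with a plain float. `period_of` is a hypothetical helper, not an SDK function.

```python
from datetime import timedelta


def period_of(hertz: float) -> timedelta:
    """Return the period T = 1 / f for a frequency given in hertz."""
    return timedelta(seconds=1.0 / hertz)


# A 50 Hz grid frequency has a period of 20 milliseconds.
mains_period = period_of(50.0)

# 1 GHz has a period of 1 ns, which is below timedelta's resolution
# and therefore rounds to a zero-length duration.
gigahertz_period = period_of(1e9)
```

With this arithmetic, a frequency of zero raises `ZeroDivisionError`.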

Source code in frequenz/sdk/timeseries/_quantities.py
def period(self) -> timedelta:
    """Return the period of the frequency.

    Returns:
        The period of the frequency.
    """
    return timedelta(seconds=1.0 / self._base_value)

frequenz.sdk.timeseries.MovingWindow

Bases: BackgroundService

A data window that moves with the latest datapoints of a data stream.

After initialization the MovingWindow can be accessed by an integer index or a timestamp. A sub window can be accessed by using a slice of integers or timestamps.

Note that a NumPy ndarray is returned, so users can apply NumPy operations directly to a window.

The window uses a ring buffer for storage, and the first element is aligned to a fixed point in time. Because the window moves, the timestamps of the first and last elements change constantly, so the point in time that defines the alignment can lie outside the time window. Modulo arithmetic is used to move the align_to timestamp into the latest window.

If, for example, the align_to parameter is set to datetime(1, 1, 1, tzinfo=timezone.utc) and the window size is bigger than one day, then the first element will always be aligned to midnight.
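
To illustrate the idea of aligning a ring buffer with modulo arithmetic, here is a minimal sketch. It is not the SDK's buffer implementation: `slot_index` is a hypothetical helper, and using the UNIX epoch as the alignment point is an assumption made for the example.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)


def slot_index(
    timestamp: datetime,
    sampling_period: timedelta,
    capacity: int,
    align_to: datetime = UNIX_EPOCH,
) -> int:
    """Map a timestamp to a slot of a ring buffer with `capacity` slots."""
    # Number of whole sampling periods since the alignment point.
    periods_since_alignment = (timestamp - align_to) // sampling_period
    # Wrap around so that old slots are reused as the window moves.
    return periods_since_alignment % capacity


# A one-day window with hourly samples has 24 slots.
midnight = datetime(2023, 1, 1, tzinfo=timezone.utc)
three_am_next_day = datetime(2023, 1, 2, 3, tzinfo=timezone.utc)

midnight_slot = slot_index(midnight, timedelta(hours=1), capacity=24)
three_am_slot = slot_index(three_am_next_day, timedelta(hours=1), capacity=24)
```

In this sketch every midnight maps to slot 0, even though the alignment point itself lies decades before the window.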

Resampling might be required to reduce the number of samples to store, and it can be set by specifying the resampler config parameter so that the user can control the granularity of the samples to be stored in the underlying buffer.

If resampling is not required, the resampler config parameter can be set to None in which case the MovingWindow will not perform any resampling.
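
The effect of resampling on storage can be estimated from the constructor in the source listing, which sizes the buffer as the window size divided by the effective sampling period, rounded up. The sketch below repeats that calculation. `buffer_capacity` is a hypothetical helper, and the 15-minute resampling period is only an example value.

```python
import math
from datetime import timedelta


def buffer_capacity(size: timedelta, sampling_period: timedelta) -> int:
    """Return the number of samples needed to cover `size`."""
    # Round up: the sampling period might not fit evenly into the window.
    return math.ceil(size.total_seconds() / sampling_period.total_seconds())


# Two days of data at one sample per second.
raw_capacity = buffer_capacity(timedelta(days=2), timedelta(seconds=1))

# The same window when resampled to one sample every 15 minutes.
resampled_capacity = buffer_capacity(timedelta(days=2), timedelta(minutes=15))

# A period that does not divide the window evenly is rounded up.
uneven_capacity = buffer_capacity(timedelta(seconds=5), timedelta(seconds=2))
```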

Example: Calculate the mean of a time interval

```python
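import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender
from frequenz.sdk.timeseries import MovingWindow, Sample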

async def send_mock_data(sender: Sender[Sample]) -> None:
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)

async def run() -> None:
    resampled_data_channel = Broadcast[Sample]("sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    async with MovingWindow(
        size=timedelta(seconds=5),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        time_start = datetime.now(tz=timezone.utc)
        time_end = time_start + timedelta(seconds=5)

        # ... wait for 5 seconds until the buffer is filled
        await asyncio.sleep(5)

        # get a numpy array from the window
        array = window[time_start:time_end]
        # and use it, for example, to calculate the mean
        mean = array.mean()

asyncio.run(run())
```

Example: Create a polars series from a MovingWindow

```python
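import asyncio
from datetime import datetime, timedelta, timezone

import polars as pl
from frequenz.channels import Broadcast, Sender
from frequenz.sdk.timeseries import MovingWindow, Sample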

async def send_mock_data(sender: Sender[Sample]) -> None:
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)

async def run() -> None:
    resampled_data_channel = Broadcast[Sample]("sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    # create a window that stores two days of data
    # starting on 2023-01-01, with a sampling period of 1 second
    async with MovingWindow(
        size=timedelta(days=2),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        # wait for one full day until the buffer is filled
        await asyncio.sleep(60*60*24)

        # create a polars series with one full day of data
        time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
        time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)
        series = pl.Series("Jan_1", window[time_start:time_end])

asyncio.run(run())
```
Source code in frequenz/sdk/timeseries/_moving_window.py
class MovingWindow(BackgroundService):
    """
    A data window that moves with the latest datapoints of a data stream.

    After initialization the `MovingWindow` can be accessed by an integer
    index or a timestamp. A sub window can be accessed by using a slice of
    integers or timestamps.

    Note that a NumPy ndarray is returned, so users can apply NumPy
    operations directly to a window.

    The window uses a ring buffer for storage, and the first element is aligned
    to a fixed point in time. Because the window moves, the timestamps of the
    first and last elements change constantly, so the point in time that
    defines the alignment can lie outside the time window. Modulo arithmetic is
    used to move the `align_to` timestamp into the latest window.

    If for example the `align_to` parameter is set to
    `datetime(1, 1, 1, tzinfo=timezone.utc)` and the window size is bigger than
    one day then the first element will always be aligned to midnight.

    Resampling might be required to reduce the number of samples to store, and
    it can be set by specifying the resampler config parameter so that the user
    can control the granularity of the samples to be stored in the underlying
    buffer.

    If resampling is not required, the resampler config parameter can be
    set to None in which case the MovingWindow will not perform any resampling.

    Example: Calculate the mean of a time interval

        ```python
        from datetime import datetime, timedelta, timezone

        async def send_mock_data(sender: Sender[Sample]) -> None:
            while True:
                await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
                await asyncio.sleep(1.0)

        async def run() -> None:
            resampled_data_channel = Broadcast[Sample]("sample-data")
            resampled_data_receiver = resampled_data_channel.new_receiver()
            resampled_data_sender = resampled_data_channel.new_sender()

            send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

            async with MovingWindow(
                size=timedelta(seconds=5),
                resampled_data_recv=resampled_data_receiver,
                input_sampling_period=timedelta(seconds=1),
            ) as window:
                time_start = datetime.now(tz=timezone.utc)
                time_end = time_start + timedelta(seconds=5)

                # ... wait for 5 seconds until the buffer is filled
                await asyncio.sleep(5)

                # return an numpy array from the window
                array = window[time_start:time_end]
                # and use it to for example calculate the mean
                mean = array.mean()

        asyncio.run(run())
        ```

    Example: Create a polars data frame from a `MovingWindow`

        ```python
        import polars as pl
        from datetime import datetime, timedelta, timezone

        async def send_mock_data(sender: Sender[Sample]) -> None:
            while True:
                await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
                await asyncio.sleep(1.0)

        async def run() -> None:
            resampled_data_channel = Broadcast[Sample]("sample-data")
            resampled_data_receiver = resampled_data_channel.new_receiver()
            resampled_data_sender = resampled_data_channel.new_sender()

            send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

            # create a window that stores two days of data
            # starting at 1.1.23 with samplerate=1
            async with MovingWindow(
                size=timedelta(days=2),
                resampled_data_recv=resampled_data_receiver,
                input_sampling_period=timedelta(seconds=1),
            ) as window:
                # wait for one full day until the buffer is filled
                await asyncio.sleep(60*60*24)

                # create a polars series with one full day of data
                time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
                time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)
                series = pl.Series("Jan_1", window[time_start:time_end])

        asyncio.run(run())
        ```
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        size: timedelta,
        resampled_data_recv: Receiver[Sample[Quantity]],
        input_sampling_period: timedelta,
        resampler_config: ResamplerConfig | None = None,
        align_to: datetime = UNIX_EPOCH,
        *,
        name: str | None = None,
    ) -> None:
        """
        Initialize the MovingWindow.

        This method creates the underlying ring buffer and starts a
        new task that updates the ring buffer with new incoming samples.
        The task stops running only if the channel receiver is closed.

        Args:
            size: The time span of the moving window over which samples will be stored.
            resampled_data_recv: A receiver that delivers samples with a
                given sampling period.
            input_sampling_period: The time interval between consecutive input samples.
            resampler_config: The resampler configuration in case resampling is required.
            align_to: A datetime object that defines a point in time to which
                the window is aligned to modulo window size. For further
                information, consult the class level documentation.
            name: The name of this moving window. If `None`, `str(id(self))` will be
                used. This is used mostly for debugging purposes.
        """
        assert (
            input_sampling_period.total_seconds() > 0
        ), "The input sampling period should be greater than zero."
        assert (
            input_sampling_period <= size
        ), "The input sampling period should be equal to or lower than the window size."
        super().__init__(name=name)

        self._sampling_period = input_sampling_period

        self._resampler: Resampler | None = None
        self._resampler_sender: Sender[Sample[Quantity]] | None = None

        if resampler_config:
            assert (
                resampler_config.resampling_period <= size
            ), "The resampling period should be equal to or lower than the window size."

            self._resampler = Resampler(resampler_config)
            self._sampling_period = resampler_config.resampling_period

        # Sampling period might not fit perfectly into the window size.
        num_samples = math.ceil(
            size.total_seconds() / self._sampling_period.total_seconds()
        )

        self._resampled_data_recv = resampled_data_recv
        self._buffer = OrderedRingBuffer(
            np.empty(shape=num_samples, dtype=float),
            sampling_period=self._sampling_period,
            align_to=align_to,
        )

    def start(self) -> None:
        """Start the MovingWindow.

        This method starts the MovingWindow tasks.
        """
        if self._resampler:
            self._configure_resampler()
        self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window"))

    @property
    def sampling_period(self) -> timedelta:
        """
        Return the sampling period of the MovingWindow.

        Returns:
            The sampling period of the MovingWindow.
        """
        return self._sampling_period

    async def _run_impl(self) -> None:
        """Awaits samples from the receiver and updates the underlying ring buffer.

        Raises:
            asyncio.CancelledError: if the MovingWindow task is cancelled.
        """
        try:
            async for sample in self._resampled_data_recv:
                _logger.debug("Received new sample: %s", sample)
                if self._resampler and self._resampler_sender:
                    await self._resampler_sender.send(sample)
                else:
                    self._buffer.update(sample)

        except asyncio.CancelledError:
            _logger.info("MovingWindow task has been cancelled.")
            raise

        _logger.error("Channel has been closed")

    def _configure_resampler(self) -> None:
        """Configure the components needed to run the resampler."""
        assert self._resampler is not None

        async def sink_buffer(sample: Sample[Quantity]) -> None:
            if sample.value is not None:
                self._buffer.update(sample)

        resampler_channel = Broadcast[Sample[Quantity]]("average")
        self._resampler_sender = resampler_channel.new_sender()
        self._resampler.add_timeseries(
            "avg", resampler_channel.new_receiver(), sink_buffer
        )
        self._tasks.add(
            asyncio.create_task(self._resampler.resample(), name="resample")
        )

    def __len__(self) -> int:
        """
        Return the size of the `MovingWindow`'s underlying buffer.

        Returns:
            The size of the `MovingWindow`.
        """
        return len(self._buffer)

    @overload
    def __getitem__(self, key: SupportsIndex) -> float:
        """See the main __getitem__ method.

        [//]: # (# noqa: DAR101 key)
        """

    @overload
    def __getitem__(self, key: datetime) -> float:
        """See the main __getitem__ method.

        [//]: # (# noqa: DAR101 key)
        """

    @overload
    def __getitem__(self, key: slice) -> ArrayLike:
        """See the main __getitem__ method.

        [//]: # (# noqa: DAR101 key)
        """

    def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLike:
        """
        Return a sub window of the `MovingWindow`.

        The `MovingWindow` is accessed either by timestamp or by index
        or by a slice of timestamps or integers.

        * If the key is an integer, the float value at that position
          is returned.
        * If the key is a datetime object, the float value that
          corresponds to that timestamp is returned.
        * If the key is a slice of timestamps or integers, an ndarray is returned,
          where the bounds correspond to the slice bounds.
          Note that a half open interval, which is open at the end, is returned.

        Args:
            key: Either an integer or a timestamp or a slice of timestamps or integers.

        Raises:
            IndexError: when requesting an out of range timestamp or index.
            TypeError: when the key is not an integer, a datetime or a slice.

        Returns:
            A float if the key is an integer or a timestamp,
            a numpy array if the key is a slice.
        """
        if len(self._buffer) == 0:
            raise IndexError("The buffer is empty.")
        if isinstance(key, slice):
            if isinstance(key.start, int) or isinstance(key.stop, int):
                if key.start is None or key.stop is None:
                    # unpack (start, stop, step); slice(tuple) would set only `stop`
                    key = slice(*slice(key.start, key.stop).indices(self.__len__()))
            elif isinstance(key.start, datetime) or isinstance(key.stop, datetime):
                if key.start is None:
                    key = slice(self._buffer.time_bound_oldest, key.stop)
                if key.stop is None:
                    key = slice(key.start, self._buffer.time_bound_newest)

            _logger.debug("Returning slice for [%s:%s].", key.start, key.stop)

            # we are doing runtime typechecks since there is no abstract slice type yet
            # see also (https://peps.python.org/pep-0696)
            if isinstance(key.start, datetime) and isinstance(key.stop, datetime):
                return self._buffer.window(key.start, key.stop)
            if isinstance(key.start, int) and isinstance(key.stop, int):
                return self._buffer[key]
        elif isinstance(key, datetime):
            _logger.debug("Returning value at time %s ", key)
            return self._buffer[self._buffer.datetime_to_index(key)]
        elif isinstance(key, SupportsIndex):
            _logger.debug("Returning value at index %s ", key)
            return self._buffer[key]

        raise TypeError(
            "Key has to be either a timestamp or an integer "
            "or a slice of timestamps or integers"
        )
Attributes¤
sampling_period: timedelta property ¤

Return the sampling period of the MovingWindow.

RETURNS DESCRIPTION
timedelta

The sampling period of the MovingWindow.

Functions¤
__getitem__(key) ¤

Return a sub window of the MovingWindow.

The MovingWindow is accessed either by timestamp or by index or by a slice of timestamps or integers.

  • If the key is an integer, the float value at that position is returned.
  • If the key is a datetime object, the float value that corresponds to that timestamp is returned.
  • If the key is a slice of timestamps or integers, an ndarray is returned, where the bounds correspond to the slice bounds. Note that a half open interval, which is open at the end, is returned.
PARAMETER DESCRIPTION
key

Either an integer or a timestamp or a slice of timestamps or integers.

TYPE: SupportsIndex | datetime | slice

RAISES DESCRIPTION
IndexError

when requesting an out of range timestamp or index

TypeError

when the key is not an integer, a datetime or a slice.

RETURNS DESCRIPTION
float | ArrayLike

A float if the key is an integer or a timestamp, a numpy array if the key is a slice.
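
The half-open slicing by timestamp can be illustrated with a minimal sketch that does not use the SDK. It assumes a plain list of samples spaced exactly one sampling period apart; the helper name `time_slice` and its parameters are hypothetical and only mimic the behaviour described above.

```python
from datetime import datetime, timedelta, timezone


def time_slice(values, oldest, period, start, stop):
    """Return the values whose timestamps lie in the half-open interval [start, stop).

    Hypothetical helper: `values[0]` is assumed to carry the timestamp `oldest`
    and consecutive values are assumed to be exactly `period` apart.
    """
    first = int((start - oldest) / period)
    last = int((stop - oldest) / period)
    if first < 0 or last > len(values):
        raise IndexError("Requested time range is outside the stored data.")
    return values[first:last]


oldest = datetime(2023, 1, 1, tzinfo=timezone.utc)
period = timedelta(seconds=1)
values = [float(i) for i in range(10)]

# The sample at `stop` itself is not part of the result.
sub_window = time_slice(
    values,
    oldest,
    period,
    start=oldest + timedelta(seconds=2),
    stop=oldest + timedelta(seconds=5),
)
```

With these inputs the slice covers the samples at seconds 2, 3 and 4, and the sample at second 5 is excluded, matching the half-open interval convention.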

Source code in frequenz/sdk/timeseries/_moving_window.py
def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLike:
    """
    Return a sub window of the `MovingWindow`.

    The `MovingWindow` is accessed either by timestamp or by index
    or by a slice of timestamps or integers.

    * If the key is an integer, the float value at that position
      is returned.
    * If the key is a datetime object, the float value that
      corresponds to that timestamp is returned.
    * If the key is a slice of timestamps or integers, an ndarray is returned,
      where the bounds correspond to the slice bounds.
      Note that a half open interval, which is open at the end, is returned.

    Args:
        key: Either an integer or a timestamp or a slice of timestamps or integers.

    Raises:
        IndexError: when requesting an out of range timestamp or index.
        TypeError: when the key is not an integer, a datetime or a slice.

    Returns:
        A float if the key is an integer or a timestamp,
        a numpy array if the key is a slice.
    """
    if len(self._buffer) == 0:
        raise IndexError("The buffer is empty.")
    if isinstance(key, slice):
        if isinstance(key.start, int) or isinstance(key.stop, int):
            if key.start is None or key.stop is None:
                # unpack (start, stop, step); slice(tuple) would set only `stop`
                key = slice(*slice(key.start, key.stop).indices(self.__len__()))
        elif isinstance(key.start, datetime) or isinstance(key.stop, datetime):
            if key.start is None:
                key = slice(self._buffer.time_bound_oldest, key.stop)
            if key.stop is None:
                key = slice(key.start, self._buffer.time_bound_newest)

        _logger.debug("Returning slice for [%s:%s].", key.start, key.stop)

        # we are doing runtime typechecks since there is no abstract slice type yet
        # see also (https://peps.python.org/pep-0696)
        if isinstance(key.start, datetime) and isinstance(key.stop, datetime):
            return self._buffer.window(key.start, key.stop)
        if isinstance(key.start, int) and isinstance(key.stop, int):
            return self._buffer[key]
    elif isinstance(key, datetime):
        _logger.debug("Returning value at time %s ", key)
        return self._buffer[self._buffer.datetime_to_index(key)]
    elif isinstance(key, SupportsIndex):
        _logger.debug("Returning value at index %s ", key)
        return self._buffer[key]

    raise TypeError(
        "Key has to be either a timestamp or an integer "
        "or a slice of timestamps or integers"
    )
__init__(size, resampled_data_recv, input_sampling_period, resampler_config=None, align_to=UNIX_EPOCH, *, name=None) ¤

Initialize the MovingWindow.

This method creates the underlying ring buffer and starts a new task that updates the ring buffer with new incoming samples. The task stops running only if the channel receiver is closed.

PARAMETER DESCRIPTION
size

The time span of the moving window over which samples will be stored.

TYPE: timedelta

resampled_data_recv

A receiver that delivers samples with a given sampling period.

TYPE: Receiver[Sample[Quantity]]

input_sampling_period

The time interval between consecutive input samples.

TYPE: timedelta

resampler_config

The resampler configuration in case resampling is required.

TYPE: ResamplerConfig | None DEFAULT: None

align_to

A datetime object that defines the point in time to which the window is aligned, modulo the window size. For further information, consult the class level documentation.

TYPE: datetime DEFAULT: UNIX_EPOCH

name

The name of this moving window. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None
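
How `size` and the sampling period determine the number of buffer slots can be shown with a small standalone sketch. The function name `buffer_capacity` is hypothetical; it mirrors the two checks and the rounding-up step of the constructor, but raises `ValueError` instead of using assertions.

```python
import math
from datetime import timedelta


def buffer_capacity(size: timedelta, sampling_period: timedelta) -> int:
    """Return how many samples are needed to cover `size` (hypothetical helper)."""
    if sampling_period.total_seconds() <= 0:
        raise ValueError("The sampling period should be greater than zero.")
    if sampling_period > size:
        raise ValueError("The sampling period should not exceed the window size.")
    # The sampling period might not fit perfectly into the window size,
    # so the result is rounded up to the next whole sample.
    return math.ceil(size.total_seconds() / sampling_period.total_seconds())


two_days_at_1s = buffer_capacity(timedelta(days=2), timedelta(seconds=1))
uneven_fit = buffer_capacity(timedelta(seconds=10), timedelta(seconds=3))
```

A two-day window sampled every second needs 172800 slots, while a 10 second window with a 3 second period is rounded up to 4 slots.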

Source code in frequenz/sdk/timeseries/_moving_window.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    size: timedelta,
    resampled_data_recv: Receiver[Sample[Quantity]],
    input_sampling_period: timedelta,
    resampler_config: ResamplerConfig | None = None,
    align_to: datetime = UNIX_EPOCH,
    *,
    name: str | None = None,
) -> None:
    """
    Initialize the MovingWindow.

    This method creates the underlying ring buffer and starts a
    new task that updates the ring buffer with new incoming samples.
    The task stops running only if the channel receiver is closed.

    Args:
        size: The time span of the moving window over which samples will be stored.
        resampled_data_recv: A receiver that delivers samples with a
            given sampling period.
        input_sampling_period: The time interval between consecutive input samples.
        resampler_config: The resampler configuration in case resampling is required.
        align_to: A datetime object that defines the point in time to which
            the window is aligned, modulo the window size. For further
            information, consult the class level documentation.
        name: The name of this moving window. If `None`, `str(id(self))` will be
            used. This is used mostly for debugging purposes.
    """
    assert (
        input_sampling_period.total_seconds() > 0
    ), "The input sampling period should be greater than zero."
    assert (
        input_sampling_period <= size
    ), "The input sampling period should be equal to or lower than the window size."
    super().__init__(name=name)

    self._sampling_period = input_sampling_period

    self._resampler: Resampler | None = None
    self._resampler_sender: Sender[Sample[Quantity]] | None = None

    if resampler_config:
        assert (
            resampler_config.resampling_period <= size
        ), "The resampling period should be equal to or lower than the window size."

        self._resampler = Resampler(resampler_config)
        self._sampling_period = resampler_config.resampling_period

    # Sampling period might not fit perfectly into the window size.
    num_samples = math.ceil(
        size.total_seconds() / self._sampling_period.total_seconds()
    )

    self._resampled_data_recv = resampled_data_recv
    self._buffer = OrderedRingBuffer(
        np.empty(shape=num_samples, dtype=float),
        sampling_period=self._sampling_period,
        align_to=align_to,
    )
__len__() ¤

Return the size of the MovingWindow's underlying buffer.

RETURNS DESCRIPTION
int

The size of the MovingWindow.

Source code in frequenz/sdk/timeseries/_moving_window.py
def __len__(self) -> int:
    """
    Return the size of the `MovingWindow`'s underlying buffer.

    Returns:
        The size of the `MovingWindow`.
    """
    return len(self._buffer)
start() ¤

Start the MovingWindow.

This method starts the MovingWindow tasks.
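
The idea of a background task that keeps a bounded window up to date can be sketched with the standard library alone. Here an `asyncio.Queue` stands in for the channel receiver and a `collections.deque` for the ring buffer; both substitutions, the `None` end-of-stream sentinel and the function names are assumptions made for illustration, not the SDK's implementation.

```python
import asyncio
from collections import deque


async def update_window(queue: asyncio.Queue, window: deque) -> None:
    """Consume samples until the (hypothetical) `None` sentinel arrives."""
    while True:
        sample = await queue.get()
        if sample is None:
            break
        # A deque with `maxlen` drops the oldest sample once it is full.
        window.append(sample)


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    window: deque = deque(maxlen=3)

    # Start the updater as a named background task.
    task = asyncio.create_task(update_window(queue, window), name="update-window")

    for value in [1.0, 2.0, 3.0, 4.0]:
        await queue.put(value)
    await queue.put(None)  # signal the end of the stream

    await task
    return list(window)


latest_samples = asyncio.run(main())
```

After four samples have been pushed through a window of three slots, only the three newest samples remain.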

Source code in frequenz/sdk/timeseries/_moving_window.py
def start(self) -> None:
    """Start the MovingWindow.

    This method starts the MovingWindow tasks.
    """
    if self._resampler:
        self._configure_resampler()
    self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window"))

frequenz.sdk.timeseries.Percentage ¤

Bases: Quantity

A percentage quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and each of the arithmetic operators supported by this type are actually implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.
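
The percent and fraction conversions, and the floating-point caveat, can be illustrated with a minimal standalone sketch. The class `PercentSketch` is a hypothetical stand-in that stores its value in percent as a plain `float`; it is not the SDK's `Percentage` type.

```python
import math


class PercentSketch:
    """Hypothetical stand-in that stores a percent value as a plain float."""

    def __init__(self, percent: float) -> None:
        self._percent = percent

    @classmethod
    def from_percent(cls, percent: float) -> "PercentSketch":
        return cls(percent)

    @classmethod
    def from_fraction(cls, fraction: float) -> "PercentSketch":
        # The base value is kept in percent, so a fraction is scaled by 100.
        return cls(fraction * 100)

    def as_percent(self) -> float:
        return self._percent

    def as_fraction(self) -> float:
        return self._percent / 100


half = PercentSketch.from_fraction(0.5)
quarter = PercentSketch.from_percent(25.0)

# Round trips go through floating-point multiplication and division, so
# compare with a tolerance instead of `==`.
round_trip = PercentSketch.from_fraction(0.07).as_fraction()
is_close = math.isclose(round_trip, 0.07)
```

Comparing converted values with `math.isclose` avoids surprises when a fraction such as 0.07 has no exact binary representation.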

Source code in frequenz/sdk/timeseries/_quantities.py
class Percentage(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={0: "%"},
):
    """A percentage quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type are
    actually implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_percent(cls, percent: float) -> Self:
        """Initialize a new percentage quantity from a percent value.

        Args:
            percent: The percent value, normally in the 0.0-100.0 range.

        Returns:
            A new percentage quantity.
        """
        percentage = cls.__new__(cls)
        percentage._base_value = percent
        return percentage

    @classmethod
    def from_fraction(cls, fraction: float) -> Self:
        """Initialize a new percentage quantity from a fraction.

        Args:
            fraction: The fraction, normally in the 0.0-1.0 range.

        Returns:
            A new percentage quantity.
        """
        percentage = cls.__new__(cls)
        percentage._base_value = fraction * 100
        return percentage

    def as_percent(self) -> float:
        """Return this quantity as a percentage.

        Returns:
            This quantity as a percentage.
        """
        return self._base_value

    def as_fraction(self) -> float:
        """Return this quantity as a fraction.

        Returns:
            This quantity as a fraction.
        """
        return self._base_value / 100
Functions¤
as_fraction() ¤

Return this quantity as a fraction.

RETURNS DESCRIPTION
float

This quantity as a fraction.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_fraction(self) -> float:
    """Return this quantity as a fraction.

    Returns:
        This quantity as a fraction.
    """
    return self._base_value / 100
as_percent() ¤

Return this quantity as a percentage.

RETURNS DESCRIPTION
float

This quantity as a percentage.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_percent(self) -> float:
    """Return this quantity as a percentage.

    Returns:
        This quantity as a percentage.
    """
    return self._base_value
from_fraction(fraction) classmethod ¤

Initialize a new percentage quantity from a fraction.

PARAMETER DESCRIPTION
fraction

The fraction, normally in the 0.0-1.0 range.

TYPE: float

RETURNS DESCRIPTION
Self

A new percentage quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_fraction(cls, fraction: float) -> Self:
    """Initialize a new percentage quantity from a fraction.

    Args:
        fraction: The fraction, normally in the 0.0-1.0 range.

    Returns:
        A new percentage quantity.
    """
    percentage = cls.__new__(cls)
    percentage._base_value = fraction * 100
    return percentage
from_percent(percent) classmethod ¤

Initialize a new percentage quantity from a percent value.

PARAMETER DESCRIPTION
percent

The percent value, normally in the 0.0-100.0 range.

TYPE: float

RETURNS DESCRIPTION
Self

A new percentage quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_percent(cls, percent: float) -> Self:
    """Initialize a new percentage quantity from a percent value.

    Args:
        percent: The percent value, normally in the 0.0-100.0 range.

    Returns:
        A new percentage quantity.
    """
    percentage = cls.__new__(cls)
    percentage._base_value = percent
    return percentage

frequenz.sdk.timeseries.PeriodicFeatureExtractor ¤

A feature extractor for historical timeseries data.

This class is creating a profile from periodically occurring windows in a buffer of historical data.

The profile is created out of all windows that are fully contained in the underlying buffer with the same start and end time modulo a fixed period.

Consider for example a timeseries $T$ of historical data and sub-series $S_i$ of $T$ all having the same size $l$ and the same fixed distance $p$ called period, where period of two sub-windows is defined as the distance of two points at the same position within the sub-windows.

This class calculates a statistical profile $S$ over all $S_i$, i.e. the value of $S$ at position $i$ is calculated by performing a certain calculation, e.g. an average, over all values of $S_i$ at position $i$.

Note

The oldest window or the window that is currently overwritten in the MovingWindow is not considered in the profile.

Note

When constructing a PeriodicFeatureExtractor object the MovingWindow size has to be an integer multiple of the period.
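
The profile calculation described above can be sketched in plain Python. This is a simplified illustration under stated assumptions: the function `periodic_profile` and its parameters are hypothetical, positions are given in samples rather than timestamps, all windows are used (the oldest or currently overwritten window is not excluded), and no weights are applied.

```python
def periodic_profile(series, period, start, length):
    """Average the sub-series found at the same offset in every period.

    Hypothetical helper: `period`, `start` and `length` are sample counts.
    """
    if len(series) % period != 0:
        raise ValueError("The series length is not an integer multiple of the period.")
    if length <= 0 or start < 0 or start + length > period:
        raise ValueError("The sub-window has to fit inside one period.")

    # One sub-series S_i per period, all with the same size and offset.
    windows = [
        series[offset + start : offset + start + length]
        for offset in range(0, len(series), period)
    ]
    # The profile at position j is the mean over all S_i at position j.
    return [sum(column) / len(column) for column in zip(*windows)]


# Three periods of four samples each.
series = [0.0, 1.0, 2.0, 3.0, 10.0, 11.0, 12.0, 13.0, 20.0, 21.0, 22.0, 23.0]
profile = periodic_profile(series, period=4, start=1, length=2)
```

Here the sub-series are `[1, 2]`, `[11, 12]` and `[21, 22]`, so the profile is their element-wise mean.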

Example
from frequenz.sdk import microgrid
from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor
from datetime import datetime, timedelta, timezone

async with MovingWindow(
    size=timedelta(days=35),
    resampled_data_recv=microgrid.logical_meter().grid_power.new_receiver(),
    input_sampling_period=timedelta(seconds=1),
) as moving_window:
    feature_extractor = PeriodicFeatureExtractor(
        moving_window=moving_window,
        period=timedelta(days=7),
    )

    now = datetime.now(timezone.utc)

    # create a daily weighted average for the next 24h
    avg_24h = feature_extractor.avg(
        now,
        now + timedelta(hours=24),
        weights=[0.1, 0.2, 0.3, 0.4]
    )

    # create a daily average for Thursday March 23 2023 (timezone-aware, UTC)
    th_avg_24h = feature_extractor.avg(
        datetime(2023, 3, 23, tzinfo=timezone.utc),
        datetime(2023, 3, 24, tzinfo=timezone.utc),
    )
Source code in frequenz/sdk/timeseries/_periodic_feature_extractor.py
class PeriodicFeatureExtractor:
    """
    A feature extractor for historical timeseries data.

    This class is creating a profile from periodically occurring windows in a
    buffer of historical data.

    The profile is created out of all windows that are fully contained in the
    underlying buffer with the same start and end time modulo a fixed period.

    Consider for example a timeseries $T$ of historical data and sub-series
    $S_i$ of $T$ all having the same size $l$ and the same fixed distance $p$
    called period, where period of two sub-windows is defined as the distance
    of two points at the same position within the sub-windows.

    This class calculates a statistical profile $S$ over all $S_i$, i.e. the
    value of $S$ at position $i$ is calculated by performing a certain
    calculation, e.g. an average, over all values of $S_i$ at position $i$.

    Note:
        The oldest window or the window that is currently overwritten in the
        `MovingWindow` is not considered in the profile.

    Note:
        When constructing a `PeriodicFeatureExtractor` object the
        `MovingWindow` size has to be an integer multiple of the period.

    Example:
        ```python
        from frequenz.sdk import microgrid
        from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor
        from datetime import datetime, timedelta, timezone

        async with MovingWindow(
            size=timedelta(days=35),
            resampled_data_recv=microgrid.logical_meter().grid_power.new_receiver(),
            input_sampling_period=timedelta(seconds=1),
        ) as moving_window:
            feature_extractor = PeriodicFeatureExtractor(
                moving_window=moving_window,
                period=timedelta(days=7),
            )

            now = datetime.now(timezone.utc)

            # create a daily weighted average for the next 24h
            avg_24h = feature_extractor.avg(
                now,
                now + timedelta(hours=24),
                weights=[0.1, 0.2, 0.3, 0.4]
            )

            # create a daily average for Thursday March 23 2023 (timezone-aware, UTC)
            th_avg_24h = feature_extractor.avg(
                datetime(2023, 3, 23, tzinfo=timezone.utc),
                datetime(2023, 3, 24, tzinfo=timezone.utc),
            )
        ```
    """

    def __init__(
        self,
        moving_window: MovingWindow,
        period: timedelta,
    ) -> None:
        """
        Initialize a PeriodicFeatureExtractor object.

        Args:
            moving_window: The MovingWindow that is used for the average calculation.
            period: The distance between two succeeding intervals.

        Raises:
            ValueError: If the MovingWindow size is not an integer multiple of the period.
        """
        self._moving_window = moving_window

        self._sampling_period = self._moving_window.sampling_period
        """The sampling period of the MovingWindow, used to convert times to sample indices."""

        self._period = int(period / self._sampling_period)
        """Distance between two succeeding intervals in samples."""

        _logger.debug("Initializing PeriodicFeatureExtractor!")
        _logger.debug("MovingWindow size: %i", len(self._moving_window))
        _logger.debug(
            "Period between two succeeding intervals (in samples): %i",
            self._period,
        )

        if not len(self._moving_window) % self._period == 0:
            raise ValueError(
                "The MovingWindow size is not a integer multiple of the period."
            )

        if not is_close_to_zero(self._period - period / self._sampling_period):
            raise ValueError(
                "The period is not a multiple of the sampling period. "
                "This might result in unexpected behaviour."
            )

    @property
    def _buffer(self) -> OrderedRingBuffer[NDArray[np.float64]]:
        return self._moving_window._buffer  # pylint: disable=protected-access

    def _timestamp_to_rel_index(self, timestamp: datetime) -> int:
        """
        Get the index of a timestamp relative to the oldest sample in the MovingWindow.

        In other words, consider an integer axis where the zero is defined as the
        oldest element in the MovingWindow. This function returns the index of
        the given timestamp on this axis.

        This method can return negative values.

        Args:
            timestamp: A timestamp that we want to shift into the window.

        Returns:
            The index of the timestamp shifted into the MovingWindow.
        """
        # align timestamp to the sampling period
        timestamp = self._buffer.normalize_timestamp(timestamp)

        # distance between the input ts and the ts of oldest known samples (in samples)
        dist_to_oldest = int(
            (timestamp - self._buffer.time_bound_oldest) / self._sampling_period
        )

        _logger.debug("Shifting ts: %s", timestamp)
        _logger.debug("Oldest timestamp in buffer: %s", self._buffer.time_bound_oldest)
        _logger.debug("Distance to the oldest sample: %i", dist_to_oldest)

        return dist_to_oldest

    def _reshape_np_array(
        self, array: NDArray[np.float_], window_size: int
    ) -> NDArray[np.float_]:
        """
        Reshape a numpy array to a 2D array where each row represents a window.

        There are three cases to consider

        1. The array size is a multiple of window_size + period,
           i.e. num_windows is integer and we can simply reshape.
        2. The array size is NOT a multiple of window_size + period and
           it has length n * period + m, where m < window_size.
        3. The array size is NOT a multiple of window_size + period and
           it has length n * period + m, where m >= window_size.

        Note that in the current implementation of this class we have the restriction
        that the size of the MovingWindow is an integer multiple of the period and
        hence only case 1 can occur.

        Args:
            array: The numpy array to reshape.
            window_size: The size of the window in samples.

        Returns:
            The reshaped 2D array.

        Raises:
            ValueError: If the array is smaller than the given period.
        """
        # Not using the num_windows function here because we want to
        # differentiate between the three cases.
        if len(array) < self._period:
            raise ValueError(
                f"The array (length:{len(array)}) is too small to be reshaped."
            )

        num_windows = len(array) // self._period

        # Case 1:
        if len(array) - num_windows * self._period == 0:
            resized_array = array
        # Case 2
        elif len(array) - num_windows * self._period < window_size:
            resized_array = array[: num_windows * self._period]
        # Case 3
        else:
            num_windows += 1
            resized_array = np.resize(array, num_windows * self._period)

        return resized_array.reshape(num_windows, self._period)

    def _get_relative_positions(
        self, start: datetime, window_size: int
    ) -> RelativePositions:
        """
        Return relative positions of the MovingWindow.

        This method calculates the shifted relative positions of the start
        timestamp, the end timestamp as well as the next position that is
        overwritten in the ringbuffer.
        Shifted in that context means that the positions are moved as close
        as possible to the oldest sample in the MovingWindow.

        Args:
            start: The start timestamp of the window.
            window_size: The size of the window in samples.

        Returns:
            The relative positions of the start, end and next samples.

        Raises:
            ValueError: If the start timestamp is after the end timestamp.
        """
        # The number of usable windows can change, when the current position of
        # the ringbuffer is inside one of the windows inside the MovingWindow.
        # Since this is possible, we assume that one window is always not used
        # for the average calculation.
        #
        # We are ignoring either the window that is currently overwritten if
        # the current position is inside that window or the window that would
        # be overwritten next.
        #
        # Move the window to its first appearance in the MovingWindow relative
        # to the oldest sample stored in the MovingWindow.
        #
        # In other words the oldest stored sample is considered to have index 0.
        #
        # Note that the returned value is an index, not a timestamp.
        rel_start_sample = self._timestamp_to_rel_index(start) % self._period
        rel_end_sample = rel_start_sample + window_size

        # check if the newest time bound, i.e. the sample that is currently written,
        # is inside the interval
        rb_current_position = self._buffer.time_bound_newest
        rel_next_position = (
            self._timestamp_to_rel_index(rb_current_position) + 1
        ) % self._period
        # fix the rel_next_position if, modulo the period, the next position
        # is smaller than the start sample position
        if rel_next_position < rel_start_sample:
            rel_next_position += self._period

        rel_next_position += self._period * (window_size // self._period)

        _logger.debug("current position of the ringbuffer: %s", rb_current_position)
        _logger.debug("relative start_sample: %s", rel_start_sample)
        _logger.debug("relative end_sample: %s", rel_end_sample)
        _logger.debug("relative next_position: %s", rel_next_position)

        return RelativePositions(rel_start_sample, rel_end_sample, rel_next_position)

    def _get_buffer_bounds(
        self, start: datetime, end: datetime
    ) -> Tuple[int, int, int]:
        """
        Get the bounds of the ringbuffer used for further operations.

        This method uses the given start and end timestamps to calculate the
        part of the ringbuffer that can be used for further operations, like
        average or min/max.

        Here we cut out the oldest window or the window that is currently
        overwritten in the MovingWindow such that it is not considered in any
        further operation.

        Args:
            start: The start timestamp of the window.
            end: The end timestamp of the window.

        Returns:
            The bounds of the to be used buffer and the window size.

        Raises:
            ValueError: If the start timestamp is after the end timestamp.
        """
        window_size = self._timestamp_to_rel_index(end) - self._timestamp_to_rel_index(
            start
        )
        if window_size <= 0:
            raise ValueError("Start timestamp must be before end timestamp")
        if window_size > self._period:
            raise ValueError(
                "The window size must be smaller or equal than the period."
            )

        rel_pos = self._get_relative_positions(start, window_size)

        if window_size > len(self._moving_window):
            raise ValueError(
                "The window size must not be larger than the size of the `MovingWindow`"
            )

        # shifted distance between the next incoming sample and the start of the window
        dist_to_start = rel_pos.next - rel_pos.start

        # get the start and end position inside the ringbuffer
        end_pos = (
            self._timestamp_to_rel_index(self._buffer.time_bound_newest) + 1
        ) - dist_to_start

        # Note that this check works because we are using the positions
        # relative to the oldest sample stored in the MovingWindow.
        if rel_pos.start <= rel_pos.next < rel_pos.end:
            # end position is start_position of the window that is currently written
            # that's how end_pos is currently set
            _logger.debug("Next sample will be inside the window time interval!")
        else:
            _logger.debug("Next sample will be outside the window time interval!")
            # end position is start_position of the window that
            # is overwritten next, hence we add the period.
            end_pos += self._period

        # add the offset to the oldest sample in the ringbuffer and wrap around
        # to get the start and end positions in the ringbuffer
        rb_offset = self._buffer.datetime_to_index(self._buffer.time_bound_oldest)
        start_pos = self._buffer.wrap(end_pos + self._period + rb_offset)
        end_pos = self._buffer.wrap(end_pos + rb_offset)

        _logger.debug("start_pos in ringbuffer: %s", start_pos)
        _logger.debug("end_pos in ringbuffer: %s", end_pos)

        return (start_pos, end_pos, window_size)

    def _get_reshaped_np_array(
        self, start: datetime, end: datetime
    ) -> Tuple[NDArray[np.float_], int]:
        """
        Create a reshaped numpy array from the MovingWindow.

        The reshaped array is a two-dimensional array, where one dimension is
        the window_size and the other the number of windows returned by the
        `_get_buffer_bounds` method.

        Args:
            start: The start timestamp of the window.
            end: The end timestamp of the window.

        Returns:
            A tuple containing the reshaped numpy array and the window size.
        """
        (start_pos, end_pos, window_size) = self._get_buffer_bounds(start, end)

        if start_pos >= end_pos:
            window_start = self._buffer[start_pos : len(self._moving_window)]
            window_end = self._buffer[0:end_pos]
            # make the linter happy
            assert isinstance(window_start, np.ndarray)
            assert isinstance(window_end, np.ndarray)
            window_array = np.concatenate((window_start, window_end))
        else:
            window_array = self._buffer[start_pos:end_pos]

        return (self._reshape_np_array(window_array, window_size), window_size)

    def avg(
        self, start: datetime, end: datetime, weights: List[float] | None = None
    ) -> NDArray[np.float_]:
        """
        Create the average window out of the window defined by `start` and `end`.

        This method calculates the average of a window by averaging over all
        windows fully inside the MovingWindow having the period
        `self.period`.

        Args:
            start: The start of the window to average over.
            end: The end of the window to average over.
            weights: The weights to use for the average calculation (oldest first).

        Returns:
            The averaged timeseries window.
        """
        (reshaped, window_size) = self._get_reshaped_np_array(start, end)
        return np.average(  # type: ignore[no-any-return]
            reshaped[:, :window_size], axis=0, weights=weights
        )
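
The wrap-around branch in `_get_reshaped_np_array` can be illustrated in isolation. The following is a minimal sketch, assuming a plain Python list stands in for the NumPy-backed ring buffer; `read_wrapped` is a hypothetical helper and not part of the SDK.

```python
def read_wrapped(buffer: list, start_pos: int, end_pos: int) -> list:
    """Read the half-open range [start_pos, end_pos) from circular storage.

    Hypothetical helper: when the start is not before the end, the range is
    assumed to cross the end of the storage and continue at index 0.
    """
    if start_pos >= end_pos:
        # Take the tail of the storage first, then the head.
        return buffer[start_pos:] + buffer[:end_pos]
    return buffer[start_pos:end_pos]


storage = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]  # stand-in for the ring buffer
wrapped = read_wrapped(storage, 6, 2)  # crosses the end of the storage
contiguous = read_wrapped(storage, 2, 5)  # plain slice, no wrap-around
```

Note that equal start and end positions are treated as a full turn around the buffer, which mirrors the `start_pos >= end_pos` condition in the listing.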
Functions¤
__init__(moving_window, period) ¤

Initialize a PeriodicFeatureExtractor object.

PARAMETER DESCRIPTION
moving_window

The MovingWindow that is used for the average calculation.

TYPE: MovingWindow

period

The distance between two succeeding intervals.

TYPE: timedelta

RAISES DESCRIPTION
ValueError

If the MovingWindow size is not an integer multiple of the period.

Source code in frequenz/sdk/timeseries/_periodic_feature_extractor.py
def __init__(
    self,
    moving_window: MovingWindow,
    period: timedelta,
) -> None:
    """
    Initialize a PeriodicFeatureExtractor object.

    Args:
        moving_window: The MovingWindow that is used for the average calculation.
        period: The distance between two succeeding intervals.

    Raises:
        ValueError: If the MovingWindow size is not an integer multiple of the period.
    """
    self._moving_window = moving_window

    self._sampling_period = self._moving_window.sampling_period
    """The sampling period of the MovingWindow, used to convert durations to sample counts."""

    self._period = int(period / self._sampling_period)
    """Distance between two succeeding intervals in samples."""

    _logger.debug("Initializing PeriodicFeatureExtractor!")
    _logger.debug("MovingWindow size: %i", len(self._moving_window))
    _logger.debug(
        "Period between two succeeding intervals (in samples): %i",
        self._period,
    )

    if len(self._moving_window) % self._period != 0:
        raise ValueError(
            "The MovingWindow size is not an integer multiple of the period."
        )

    if not is_close_to_zero(self._period - period / self._sampling_period):
        raise ValueError(
            "The period is not a multiple of the sampling period. "
            "This might result in unexpected behaviour."
        )
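
The conversion from a period to a number of samples, and the check that the ratio is a whole number, can be sketched on its own. This is an illustration under assumptions: `period_in_samples` is a hypothetical function, and `math.isclose` stands in for the `is_close_to_zero` helper used in the listing, whose exact tolerance is not shown here.

```python
import math
from datetime import timedelta


def period_in_samples(period: timedelta, sampling_period: timedelta) -> int:
    """Convert a period to a sample count, rejecting non-integer ratios."""
    ratio = period / sampling_period  # timedelta / timedelta yields a float
    samples = int(ratio)  # truncates towards zero
    if not math.isclose(samples, ratio, abs_tol=1e-9):
        raise ValueError("The period is not a multiple of the sampling period.")
    return samples


# One day of data sampled every 15 minutes.
samples_per_day = period_in_samples(timedelta(days=1), timedelta(minutes=15))
```

A period of 10 seconds with a sampling period of 4 seconds gives a ratio of 2.5 and is rejected by this check.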
avg(start, end, weights=None) ¤

Create the average window out of the window defined by start and end.

This method calculates the average of a window by averaging over all windows fully inside the MovingWindow having the period self.period.

PARAMETER DESCRIPTION
start

The start of the window to average over.

TYPE: datetime

end

The end of the window to average over.

TYPE: datetime

weights

The weights to use for the average calculation (oldest first).

TYPE: List[float] | None DEFAULT: None

RETURNS DESCRIPTION
NDArray[float_]

The averaged timeseries window.

Source code in frequenz/sdk/timeseries/_periodic_feature_extractor.py
def avg(
    self, start: datetime, end: datetime, weights: List[float] | None = None
) -> NDArray[np.float_]:
    """
    Create the average window out of the window defined by `start` and `end`.

    This method calculates the average of a window by averaging over all
    windows fully inside the MovingWindow having the period
    `self.period`.

    Args:
        start: The start of the window to average over.
        end: The end of the window to average over.
        weights: The weights to use for the average calculation (oldest first).

    Returns:
        The averaged timeseries window.
    """
    (reshaped, window_size) = self._get_reshaped_np_array(start, end)
    return np.average(  # type: ignore[no-any-return]
        reshaped[:, :window_size], axis=0, weights=weights
    )
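
The averaging step can be pictured with a small NumPy example. This is a sketch under assumptions: the private `_reshape_np_array` helper is not shown in this section, so the example assumes it produces one row per period (oldest first) and one column per position inside the period. The sample values and the period of 4 are made up for illustration.

```python
import numpy as np

period = 4  # samples per period (assumed for this example)

# Three consecutive periods of samples, oldest first.
flat = np.array([1.0, 2.0, 3.0, 4.0, 3.0, 4.0, 5.0, 6.0, 5.0, 6.0, 7.0, 8.0])

# One row per period, one column per position inside the period.
reshaped = flat.reshape(-1, period)

window_size = 2  # only the first two positions of each period are averaged

# Unweighted average across periods, computed column by column.
plain = np.average(reshaped[:, :window_size], axis=0)

# One weight per period (oldest first); the newest period counts twice.
weighted = np.average(reshaped[:, :window_size], axis=0, weights=[1.0, 1.0, 2.0])
```

With `axis=0`, `weights` needs one entry per row, which corresponds to one entry per period that lies fully inside the window.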

frequenz.sdk.timeseries.Power ¤

Bases: Quantity

A power quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and each of the arithmetic operators supported by this type is actually implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.
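
As a reminder of what those considerations look like in practice, here is a short sketch using plain floats as stand-ins for two power readings. It relies only on the standard library; `Quantity.isclose`, shown further down this page, forwards to the same `math.isclose` function.

```python
import math

# Plain floats standing in for two power readings in watts.
total = 0.1 + 0.2

# Exact comparison fails because 0.1 and 0.2 have no exact binary representation.
exactly_equal = total == 0.3

# Comparing with a tolerance gives the result one would expect.
close_enough = math.isclose(total, 0.3, rel_tol=1e-9)
```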

Source code in frequenz/sdk/timeseries/_quantities.py
class Power(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={
        -3: "mW",
        0: "W",
        3: "kW",
        6: "MW",
    },
):
    """A power quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type is
    actually implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_watts(cls, watts: float) -> Self:
        """Initialize a new power quantity.

        Args:
            watts: The power in watts.

        Returns:
            A new power quantity.
        """
        power = cls.__new__(cls)
        power._base_value = watts
        return power

    @classmethod
    def from_milliwatts(cls, milliwatts: float) -> Self:
        """Initialize a new power quantity.

        Args:
            milliwatts: The power in milliwatts.

        Returns:
            A new power quantity.
        """
        power = cls.__new__(cls)
        power._base_value = milliwatts * 10**-3
        return power

    @classmethod
    def from_kilowatts(cls, kilowatts: float) -> Self:
        """Initialize a new power quantity.

        Args:
            kilowatts: The power in kilowatts.

        Returns:
            A new power quantity.
        """
        power = cls.__new__(cls)
        power._base_value = kilowatts * 10**3
        return power

    @classmethod
    def from_megawatts(cls, megawatts: float) -> Self:
        """Initialize a new power quantity.

        Args:
            megawatts: The power in megawatts.

        Returns:
            A new power quantity.
        """
        power = cls.__new__(cls)
        power._base_value = megawatts * 10**6
        return power

    def as_watts(self) -> float:
        """Return the power in watts.

        Returns:
            The power in watts.
        """
        return self._base_value

    def as_kilowatts(self) -> float:
        """Return the power in kilowatts.

        Returns:
            The power in kilowatts.
        """
        return self._base_value / 1e3

    def as_megawatts(self) -> float:
        """Return the power in megawatts.

        Returns:
            The power in megawatts.
        """
        return self._base_value / 1e6

    @overload  # type: ignore
    def __mul__(self, other: Percentage) -> Self:
        """Return a power from multiplying this power by the given percentage.

        Args:
            other: The percentage to multiply by.
        """

    @overload
    def __mul__(self, other: timedelta) -> Energy:
        """Return an energy from multiplying this power by the given duration.

        Args:
            other: The duration to multiply by.
        """

    def __mul__(self, other: Percentage | timedelta) -> Self | Energy:
        """Return a power or energy from multiplying this power by the given value.

        Args:
            other: The percentage or duration to multiply by.

        Returns:
            A power or energy.

        Raises:
            TypeError: If the given value is not a percentage or duration.
        """
        if isinstance(other, Percentage):
            return super().__mul__(other)
        if isinstance(other, timedelta):
            return Energy.from_watt_hours(
                self._base_value * other.total_seconds() / 3600.0
            )

        return NotImplemented

    @overload
    def __truediv__(self, other: Current) -> Voltage:
        """Return a voltage from dividing this power by the given current.

        Args:
            other: The current to divide by.
        """

    @overload
    def __truediv__(self, other: Voltage) -> Current:
        """Return a current from dividing this power by the given voltage.

        Args:
            other: The voltage to divide by.
        """

    def __truediv__(self, other: Current | Voltage) -> Voltage | Current:
        """Return a current or voltage from dividing this power by the given value.

        Args:
            other: The current or voltage to divide by.

        Returns:
            A current or voltage from dividing this power by the given value.

        Raises:
            TypeError: If the given value is not a current or voltage.
        """
        if isinstance(other, Current):
            return Voltage.from_volts(self._base_value / other._base_value)
        if isinstance(other, Voltage):
            return Current.from_amperes(self._base_value / other._base_value)
        raise TypeError(
            f"unsupported operand type(s) for /: '{type(self)}' and '{type(other)}'"
        )
Functions¤
__mul__(other) ¤

Return a power or energy from multiplying this power by the given value.

PARAMETER DESCRIPTION
other

The percentage or duration to multiply by.

TYPE: Percentage | timedelta

RETURNS DESCRIPTION
Self | Energy

A power or energy.

RAISES DESCRIPTION
TypeError

If the given value is not a percentage or duration.

Source code in frequenz/sdk/timeseries/_quantities.py
def __mul__(self, other: Percentage | timedelta) -> Self | Energy:
    """Return a power or energy from multiplying this power by the given value.

    Args:
        other: The percentage or duration to multiply by.

    Returns:
        A power or energy.

    Raises:
        TypeError: If the given value is not a percentage or duration.
    """
    if isinstance(other, Percentage):
        return super().__mul__(other)
    if isinstance(other, timedelta):
        return Energy.from_watt_hours(
            self._base_value * other.total_seconds() / 3600.0
        )

    return NotImplemented
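
The duration branch boils down to a single conversion from watts and seconds to watt-hours. A minimal sketch of that arithmetic on plain floats, where `watt_hours` is a hypothetical function and not part of the SDK:

```python
from datetime import timedelta


def watt_hours(watts: float, duration: timedelta) -> float:
    """Energy in watt-hours delivered by a constant power over a duration."""
    return watts * duration.total_seconds() / 3600.0


# 1.5 kW sustained for half an hour.
energy_wh = watt_hours(1500.0, timedelta(minutes=30))
```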
__truediv__(other) ¤

Return a current or voltage from dividing this power by the given value.

PARAMETER DESCRIPTION
other

The current or voltage to divide by.

TYPE: Current | Voltage

RETURNS DESCRIPTION
Voltage | Current

A current or voltage from dividing this power by the given value.

RAISES DESCRIPTION
TypeError

If the given value is not a current or voltage.

Source code in frequenz/sdk/timeseries/_quantities.py
def __truediv__(self, other: Current | Voltage) -> Voltage | Current:
    """Return a current or voltage from dividing this power by the given value.

    Args:
        other: The current or voltage to divide by.

    Returns:
        A current or voltage from dividing this power by the given value.

    Raises:
        TypeError: If the given value is not a current or voltage.
    """
    if isinstance(other, Current):
        return Voltage.from_volts(self._base_value / other._base_value)
    if isinstance(other, Voltage):
        return Current.from_amperes(self._base_value / other._base_value)
    raise TypeError(
        f"unsupported operand type(s) for /: '{type(self)}' and '{type(other)}'"
    )
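
The type-based dispatch in `__truediv__` follows P = V × I: dividing a power by a current gives a voltage, and dividing it by a voltage gives a current. The sketch below reproduces that pattern with hypothetical stand-in types (`Volts`, `Amperes`) and a free function, so it runs without the SDK.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Volts:
    """Hypothetical stand-in for a voltage quantity."""

    value: float


@dataclass(frozen=True)
class Amperes:
    """Hypothetical stand-in for a current quantity."""

    value: float


def divide_power(watts: float, other: Union[Volts, Amperes]) -> Union[Volts, Amperes]:
    """Divide a power in watts by a current or a voltage."""
    if isinstance(other, Amperes):
        return Volts(watts / other.value)
    if isinstance(other, Volts):
        return Amperes(watts / other.value)
    raise TypeError(f"unsupported divisor type: {type(other).__name__}")


current = divide_power(2300.0, Volts(230.0))
voltage = divide_power(2300.0, Amperes(10.0))
```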
as_kilowatts() ¤

Return the power in kilowatts.

RETURNS DESCRIPTION
float

The power in kilowatts.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_kilowatts(self) -> float:
    """Return the power in kilowatts.

    Returns:
        The power in kilowatts.
    """
    return self._base_value / 1e3
as_megawatts() ¤

Return the power in megawatts.

RETURNS DESCRIPTION
float

The power in megawatts.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_megawatts(self) -> float:
    """Return the power in megawatts.

    Returns:
        The power in megawatts.
    """
    return self._base_value / 1e6
as_watts() ¤

Return the power in watts.

RETURNS DESCRIPTION
float

The power in watts.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_watts(self) -> float:
    """Return the power in watts.

    Returns:
        The power in watts.
    """
    return self._base_value
from_kilowatts(kilowatts) classmethod ¤

Initialize a new power quantity.

PARAMETER DESCRIPTION
kilowatts

The power in kilowatts.

TYPE: float

RETURNS DESCRIPTION
Self

A new power quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_kilowatts(cls, kilowatts: float) -> Self:
    """Initialize a new power quantity.

    Args:
        kilowatts: The power in kilowatts.

    Returns:
        A new power quantity.
    """
    power = cls.__new__(cls)
    power._base_value = kilowatts * 10**3
    return power
from_megawatts(megawatts) classmethod ¤

Initialize a new power quantity.

PARAMETER DESCRIPTION
megawatts

The power in megawatts.

TYPE: float

RETURNS DESCRIPTION
Self

A new power quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_megawatts(cls, megawatts: float) -> Self:
    """Initialize a new power quantity.

    Args:
        megawatts: The power in megawatts.

    Returns:
        A new power quantity.
    """
    power = cls.__new__(cls)
    power._base_value = megawatts * 10**6
    return power
from_milliwatts(milliwatts) classmethod ¤

Initialize a new power quantity.

PARAMETER DESCRIPTION
milliwatts

The power in milliwatts.

TYPE: float

RETURNS DESCRIPTION
Self

A new power quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_milliwatts(cls, milliwatts: float) -> Self:
    """Initialize a new power quantity.

    Args:
        milliwatts: The power in milliwatts.

    Returns:
        A new power quantity.
    """
    power = cls.__new__(cls)
    power._base_value = milliwatts * 10**-3
    return power
from_watts(watts) classmethod ¤

Initialize a new power quantity.

PARAMETER DESCRIPTION
watts

The power in watts.

TYPE: float

RETURNS DESCRIPTION
Self

A new power quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_watts(cls, watts: float) -> Self:
    """Initialize a new power quantity.

    Args:
        watts: The power in watts.

    Returns:
        A new power quantity.
    """
    power = cls.__new__(cls)
    power._base_value = watts
    return power

frequenz.sdk.timeseries.Quantity ¤

A quantity with a unit.

Quantities try to behave like float and are also immutable.

Source code in frequenz/sdk/timeseries/_quantities.py
class Quantity:
    """A quantity with a unit.

    Quantities try to behave like float and are also immutable.
    """

    _base_value: float
    """The value of this quantity in the base unit."""

    _exponent_unit_map: dict[int, str] | None = None
    """A mapping from the exponent of the base unit to the unit symbol.

    If None, this quantity has no unit.  None is possible only when using the base
    class.  Sub-classes must define this.
    """

    def __init__(self, value: float, exponent: int = 0) -> None:
        """Initialize a new quantity.

        Args:
            value: The value of this quantity in a given exponent of the base unit.
            exponent: The exponent of the base unit the given value is in.
        """
        self._base_value = value * 10**exponent

    def __init_subclass__(cls, exponent_unit_map: dict[int, str]) -> None:
        """Initialize a new subclass of Quantity.

        Args:
            exponent_unit_map: A mapping from the exponent of the base unit to the unit
                symbol.

        Raises:
            TypeError: If the given exponent_unit_map is not a dict.
            ValueError: If the given exponent_unit_map does not contain a base unit
                (exponent 0).
        """
        if 0 not in exponent_unit_map:
            raise ValueError("Expected a base unit for the type (for exponent 0)")
        cls._exponent_unit_map = exponent_unit_map
        super().__init_subclass__()

    _zero_cache: dict[type, Quantity] = {}
    """Cache for zero singletons.

    This is a workaround for mypy getting confused when using @functools.cache and
    @classmethod combined with returning Self. It believes the resulting type of this
    method is Self and complains that members of the actual class don't exist in Self,
    so we need to implement the cache ourselves.
    """

    @classmethod
    def zero(cls) -> Self:
        """Return a quantity with value 0.

        Returns:
            A quantity with value 0.
        """
        _zero = cls._zero_cache.get(cls, None)
        if _zero is None:
            _zero = cls.__new__(cls)
            _zero._base_value = 0
            cls._zero_cache[cls] = _zero
        assert isinstance(_zero, cls)
        return _zero

    @property
    def base_value(self) -> float:
        """Return the value of this quantity in the base unit.

        Returns:
            The value of this quantity in the base unit.
        """
        return self._base_value

    @property
    def base_unit(self) -> str | None:
        """Return the base unit of this quantity.

        None if this quantity has no unit.

        Returns:
            The base unit of this quantity.
        """
        if not self._exponent_unit_map:
            return None
        return self._exponent_unit_map[0]

    def isnan(self) -> bool:
        """Return whether this quantity is NaN.

        Returns:
            Whether this quantity is NaN.
        """
        return math.isnan(self._base_value)

    def isinf(self) -> bool:
        """Return whether this quantity is infinite.

        Returns:
            Whether this quantity is infinite.
        """
        return math.isinf(self._base_value)

    def isclose(self, other: Self, rel_tol: float = 1e-9, abs_tol: float = 0.0) -> bool:
        """Return whether this quantity is close to another.

        Args:
            other: The quantity to compare to.
            rel_tol: The relative tolerance.
            abs_tol: The absolute tolerance.

        Returns:
            Whether this quantity is close to another.
        """
        return math.isclose(
            self._base_value,
            other._base_value,  # pylint: disable=protected-access
            rel_tol=rel_tol,
            abs_tol=abs_tol,
        )

    def __repr__(self) -> str:
        """Return a representation of this quantity.

        Returns:
            A representation of this quantity.
        """
        return f"{type(self).__name__}(value={self._base_value}, exponent=0)"

    def __str__(self) -> str:
        """Return a string representation of this quantity.

        Returns:
            A string representation of this quantity.
        """
        return self.__format__("")

    def __format__(self, __format_spec: str) -> str:
        """Return a formatted string representation of this quantity.

        If specified, the format specifier must be of this form: `[0].{precision}`.
        If a 0 is not given, the trailing zeros will be omitted.  If the format
        specifier is empty, the default precision is 3.

        The returned string will use the unit that will result in the maximum precision,
        based on the magnitude of the value.

        Example:
            ```python
            from frequenz.sdk.timeseries import Current
            c = Current.from_amperes(0.2345)
            assert f"{c:.2}" == "234.5 mA"
            c = Current.from_amperes(1.2345)
            assert f"{c:.2}" == "1.23 A"
            c = Current.from_milliamperes(1.2345)
            assert f"{c:.6}" == "1.2345 mA"
            ```

        Args:
            __format_spec: The format specifier.

        Returns:
            A string representation of this quantity.

        Raises:
            ValueError: If the given format specifier is invalid.
        """
        keep_trailing_zeros = False
        if __format_spec != "":
            fspec_parts = __format_spec.split(".")
            if (
                len(fspec_parts) != 2
                or fspec_parts[0] not in ("", "0")
                or not fspec_parts[1].isdigit()
            ):
                raise ValueError(
                    "Invalid format specifier. Must be empty or `[0].{precision}`"
                )
            if fspec_parts[0] == "0":
                keep_trailing_zeros = True
            precision = int(fspec_parts[1])
        else:
            precision = 3
        if not self._exponent_unit_map:
            return f"{self._base_value:.{precision}f}"

        if math.isinf(self._base_value) or math.isnan(self._base_value):
            return f"{self._base_value} {self._exponent_unit_map[0]}"

        abs_value = abs(self._base_value)
        exponent = math.floor(math.log10(abs_value)) if abs_value else 0
        unit_place = exponent - exponent % 3
        if unit_place < min(self._exponent_unit_map):
            unit = self._exponent_unit_map[min(self._exponent_unit_map.keys())]
            unit_place = min(self._exponent_unit_map)
        elif unit_place > max(self._exponent_unit_map):
            unit = self._exponent_unit_map[max(self._exponent_unit_map.keys())]
            unit_place = max(self._exponent_unit_map)
        else:
            unit = self._exponent_unit_map[unit_place]
        value_str = f"{self._base_value / 10 ** unit_place:.{precision}f}"
        stripped = value_str.rstrip("0").rstrip(".")
        if not keep_trailing_zeros:
            value_str = stripped
        unit_str = unit if stripped != "0" else self._exponent_unit_map[0]
        return f"{value_str} {unit_str}"

    def __add__(self, other: Self) -> Self:
        """Return the sum of this quantity and another.

        Args:
            other: The other quantity.

        Returns:
            The sum of this quantity and another.
        """
        if not type(other) is type(self):
            return NotImplemented
        summe = type(self).__new__(type(self))
        summe._base_value = self._base_value + other._base_value
        return summe

    def __sub__(self, other: Self) -> Self:
        """Return the difference of this quantity and another.

        Args:
            other: The other quantity.

        Returns:
            The difference of this quantity and another.
        """
        if not type(other) is type(self):
            return NotImplemented
        difference = type(self).__new__(type(self))
        difference._base_value = self._base_value - other._base_value
        return difference

    def __mul__(self, percent: Percentage) -> Self:
        """Return the product of this quantity and a percentage.

        Args:
            percent: The percentage.

        Returns:
            The product of this quantity and a percentage.
        """
        if not isinstance(percent, Percentage):
            return NotImplemented

        product = type(self).__new__(type(self))
        product._base_value = self._base_value * percent.as_fraction()
        return product

    def __gt__(self, other: Self) -> bool:
        """Return whether this quantity is greater than another.

        Args:
            other: The other quantity.

        Returns:
            Whether this quantity is greater than another.
        """
        if not type(other) is type(self):
            return NotImplemented
        return self._base_value > other._base_value

    def __ge__(self, other: Self) -> bool:
        """Return whether this quantity is greater than or equal to another.

        Args:
            other: The other quantity.

        Returns:
            Whether this quantity is greater than or equal to another.
        """
        if not type(other) is type(self):
            return NotImplemented
        return self._base_value >= other._base_value

    def __lt__(self, other: Self) -> bool:
        """Return whether this quantity is less than another.

        Args:
            other: The other quantity.

        Returns:
            Whether this quantity is less than another.
        """
        if not type(other) is type(self):
            return NotImplemented
        return self._base_value < other._base_value

    def __le__(self, other: Self) -> bool:
        """Return whether this quantity is less than or equal to another.

        Args:
            other: The other quantity.

        Returns:
            Whether this quantity is less than or equal to another.
        """
        if not type(other) is type(self):
            return NotImplemented
        return self._base_value <= other._base_value

    def __eq__(self, other: object) -> bool:
        """Return whether this quantity is equal to another.

        Args:
            other: The other quantity.

        Returns:
            Whether this quantity is equal to another.
        """
        if not type(other) is type(self):
            return NotImplemented
        # The above check ensures that both quantities are the exact same type, because
        # `isinstance` returns true for subclasses and superclasses.  But the above check
        # doesn't help mypy identify the type of other,  so the below line is necessary.
        assert isinstance(other, self.__class__)
        return self._base_value == other._base_value

    def __neg__(self) -> Self:
        """Return the negation of this quantity.

        Returns:
            The negation of this quantity.
        """
        negation = type(self).__new__(type(self))
        negation._base_value = -self._base_value
        return negation

    def __abs__(self) -> Self:
        """Return the absolute value of this quantity.

        Returns:
            The absolute value of this quantity.
        """
        absolute = type(self).__new__(type(self))
        absolute._base_value = abs(self._base_value)
        return absolute
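
The listing above receives `exponent_unit_map` as a keyword in the class statement and processes it in `__init_subclass__`. The sketch below shows that mechanism in isolation. `Unitful` and `Length` are hypothetical names, and the metaclass used by the real quantity types is left out.

```python
from typing import Optional


class Unitful:
    """Hypothetical base class that collects a unit map from its subclasses."""

    _unit_map: Optional[dict] = None

    def __init_subclass__(cls, unit_map: dict) -> None:
        # Runs once for every subclass, at class-definition time.
        if 0 not in unit_map:
            raise ValueError("Expected a base unit for the type (for exponent 0)")
        cls._unit_map = unit_map
        super().__init_subclass__()


class Length(Unitful, unit_map={-3: "mm", 0: "m", 3: "km"}):
    """Hypothetical subclass; the keyword is consumed by __init_subclass__."""


base_unit = Length._unit_map[0]
```

Because the check runs when the class is defined, a subclass without a base unit fails at import time rather than on first use.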
Attributes¤
base_unit: str | None property ¤

Return the base unit of this quantity.

None if this quantity has no unit.

RETURNS DESCRIPTION
str | None

The base unit of this quantity.

base_value: float property ¤

Return the value of this quantity in the base unit.

RETURNS DESCRIPTION
float

The value of this quantity in the base unit.

Functions¤
__abs__() ¤

Return the absolute value of this quantity.

RETURNS DESCRIPTION
Self

The absolute value of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __abs__(self) -> Self:
    """Return the absolute value of this quantity.

    Returns:
        The absolute value of this quantity.
    """
    absolute = type(self).__new__(type(self))
    absolute._base_value = abs(self._base_value)
    return absolute
__add__(other) ¤

Return the sum of this quantity and another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
Self

The sum of this quantity and another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __add__(self, other: Self) -> Self:
    """Return the sum of this quantity and another.

    Args:
        other: The other quantity.

    Returns:
        The sum of this quantity and another.
    """
    if not type(other) is type(self):
        return NotImplemented
    summe = type(self).__new__(type(self))
    summe._base_value = self._base_value + other._base_value
    return summe
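
Returning `NotImplemented` instead of raising lets Python look for a reflected method on the other operand, and raise `TypeError` itself if there is none. A minimal sketch with hypothetical classes (`Amount`, `Metres`, `Seconds`) that use the same exact-type check:

```python
class Amount:
    """Hypothetical float wrapper that only adds values of the exact same type."""

    def __init__(self, value: float) -> None:
        self.value = value

    def __add__(self, other: object) -> "Amount":
        if type(other) is not type(self):
            # Python tries other.__radd__ next and raises TypeError if that fails.
            return NotImplemented
        return type(self)(self.value + other.value)


class Metres(Amount):
    """Hypothetical unit type."""


class Seconds(Amount):
    """Hypothetical unit type."""


total = Metres(1.5) + Metres(2.0)

try:
    Metres(1.0) + Seconds(1.0)
    mixed_add_failed = False
except TypeError:
    mixed_add_failed = True
```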
__eq__(other) ¤

Return whether this quantity is equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: object

RETURNS DESCRIPTION
bool

Whether this quantity is equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __eq__(self, other: object) -> bool:
    """Return whether this quantity is equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is equal to another.
    """
    if not type(other) is type(self):
        return NotImplemented
    # The above check ensures that both quantities are the exact same type, because
    # `isinstance` returns true for subclasses and superclasses.  But the above check
    # doesn't help mypy identify the type of other,  so the below line is necessary.
    assert isinstance(other, self.__class__)
    return self._base_value == other._base_value
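
For `==`, returning `NotImplemented` does not end in an exception. Python tries the reflected comparison and then falls back to identity, so values of different types simply compare unequal. A short sketch with hypothetical classes that use the same exact-type check:

```python
class Amount:
    """Hypothetical float wrapper compared only against its exact type."""

    def __init__(self, value: float) -> None:
        self.value = value

    def __eq__(self, other: object) -> bool:
        if type(other) is not type(self):
            return NotImplemented
        return self.value == other.value


class Metres(Amount):
    """Hypothetical unit type."""


class Seconds(Amount):
    """Hypothetical unit type."""


same_type = Metres(1.0) == Metres(1.0)
different_type = Metres(1.0) == Seconds(1.0)
base_vs_subclass = Amount(1.0) == Metres(1.0)
```

The last comparison is also unequal: the exact-type check rejects subclasses and superclasses, which `isinstance` would accept.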
__format__(__format_spec) ¤

Return a formatted string representation of this quantity.

If specified, the format specifier must be of this form: [0].{precision}. If a 0 is not given, the trailing zeros will be omitted. If the format specifier is empty, the default precision is 3.

The returned string will use the unit that will result in the maximum precision, based on the magnitude of the value.

Example
from frequenz.sdk.timeseries import Current
c = Current.from_amperes(0.2345)
assert f"{c:.2}" == "234.5 mA"
c = Current.from_amperes(1.2345)
assert f"{c:.2}" == "1.23 A"
c = Current.from_milliamperes(1.2345)
assert f"{c:.6}" == "1.2345 mA"
PARAMETER DESCRIPTION
__format_spec

The format specifier.

TYPE: str

RETURNS DESCRIPTION
str

A string representation of this quantity.

RAISES DESCRIPTION
ValueError

If the given format specifier is invalid.

Source code in frequenz/sdk/timeseries/_quantities.py
def __format__(self, __format_spec: str) -> str:
    """Return a formatted string representation of this quantity.

    If specified, the format specifier must be of this form: `[0].{precision}`.  If
    a 0 is not given, the trailing zeros will be omitted.  If no precision is given,
    the default is 3.

    The returned string will use the unit that will result in the maximum precision,
    based on the magnitude of the value.

    Example:
        ```python
        from frequenz.sdk.timeseries import Current
        c = Current.from_amperes(0.2345)
        assert f"{c:.2}" == "234.5 mA"
        c = Current.from_amperes(1.2345)
        assert f"{c:.2}" == "1.23 A"
        c = Current.from_milliamperes(1.2345)
        assert f"{c:.6}" == "1.2345 mA"
        ```

    Args:
        __format_spec: The format specifier.

    Returns:
        A string representation of this quantity.

    Raises:
        ValueError: If the given format specifier is invalid.
    """
    keep_trailing_zeros = False
    if __format_spec != "":
        fspec_parts = __format_spec.split(".")
        if (
            len(fspec_parts) != 2
            or fspec_parts[0] not in ("", "0")
            or not fspec_parts[1].isdigit()
        ):
            raise ValueError(
                "Invalid format specifier. Must be empty or `[0].{precision}`"
            )
        if fspec_parts[0] == "0":
            keep_trailing_zeros = True
        precision = int(fspec_parts[1])
    else:
        precision = 3
    if not self._exponent_unit_map:
        return f"{self._base_value:.{precision}f}"

    if math.isinf(self._base_value) or math.isnan(self._base_value):
        return f"{self._base_value} {self._exponent_unit_map[0]}"

    abs_value = abs(self._base_value)
    exponent = math.floor(math.log10(abs_value)) if abs_value else 0
    unit_place = exponent - exponent % 3
    if unit_place < min(self._exponent_unit_map):
        unit = self._exponent_unit_map[min(self._exponent_unit_map.keys())]
        unit_place = min(self._exponent_unit_map)
    elif unit_place > max(self._exponent_unit_map):
        unit = self._exponent_unit_map[max(self._exponent_unit_map.keys())]
        unit_place = max(self._exponent_unit_map)
    else:
        unit = self._exponent_unit_map[unit_place]
    value_str = f"{self._base_value / 10 ** unit_place:.{precision}f}"
    stripped = value_str.rstrip("0").rstrip(".")
    if not keep_trailing_zeros:
        value_str = stripped
    unit_str = unit if stripped != "0" else self._exponent_unit_map[0]
    return f"{value_str} {unit_str}"
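The unit selection can be tried in isolation with a small standalone function. This is a sketch, not the SDK implementation: `format_quantity` and `AMPERE_UNITS` are hypothetical names. It follows the same steps as the listing above, assumes the unit map has an entry for every multiple of 3 between its smallest and largest exponent, and adds one guard of its own so that integer strings produced with precision 0 keep their zeros.

```python
import math

# Hypothetical unit map, copied from the `Current` class definition.
AMPERE_UNITS = {-3: "mA", 0: "A"}


def format_quantity(base_value: float, unit_map: dict[int, str], spec: str = "") -> str:
    """Format a base-unit value using a `[0].{precision}` specifier."""
    keep_trailing_zeros = False
    precision = 3  # Default when no specifier is given.
    if spec != "":
        parts = spec.split(".")
        if len(parts) != 2 or parts[0] not in ("", "0") or not parts[1].isdigit():
            raise ValueError("Invalid format specifier. Must be empty or `[0].{precision}`")
        keep_trailing_zeros = parts[0] == "0"
        precision = int(parts[1])

    if math.isinf(base_value) or math.isnan(base_value):
        return f"{base_value} {unit_map[0]}"

    # Round the decimal exponent down to a multiple of 3, then clamp it to the
    # range of exponents that have a unit symbol.
    abs_value = abs(base_value)
    exponent = math.floor(math.log10(abs_value)) if abs_value else 0
    unit_place = exponent - exponent % 3
    unit_place = max(min(unit_map), min(unit_place, max(unit_map)))

    value_str = f"{base_value / 10 ** unit_place:.{precision}f}"
    # Guard added in this sketch: only strip zeros after a decimal point.
    stripped = value_str.rstrip("0").rstrip(".") if "." in value_str else value_str
    if not keep_trailing_zeros:
        value_str = stripped
    # A value that rounds to zero is shown in the base unit.
    unit = unit_map[unit_place] if stripped != "0" else unit_map[0]
    return f"{value_str} {unit}"
```

For instance, `format_quantity(0.2345, AMPERE_UNITS, ".2")` returns `"234.5 mA"`, matching the documented example, and `format_quantity(1.5, AMPERE_UNITS, "0.2")` keeps the trailing zero and returns `"1.50 A"`.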
__ge__(other) ¤

Return whether this quantity is greater than or equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is greater than or equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __ge__(self, other: Self) -> bool:
    """Return whether this quantity is greater than or equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is greater than or equal to another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value >= other._base_value
__gt__(other) ¤

Return whether this quantity is greater than another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is greater than another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __gt__(self, other: Self) -> bool:
    """Return whether this quantity is greater than another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is greater than another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value > other._base_value
__init__(value, exponent=0) ¤

Initialize a new quantity.

PARAMETER DESCRIPTION
value

The value of this quantity in a given exponent of the base unit.

TYPE: float

exponent

The exponent of the base unit the given value is in.

TYPE: int DEFAULT: 0

Source code in frequenz/sdk/timeseries/_quantities.py
def __init__(self, value: float, exponent: int = 0) -> None:
    """Initialize a new quantity.

    Args:
        value: The value of this quantity in a given exponent of the base unit.
        exponent: The exponent of the base unit the given value is in.
    """
    self._base_value = value * 10**exponent
__init_subclass__(exponent_unit_map) ¤

Initialize a new subclass of Quantity.

PARAMETER DESCRIPTION
exponent_unit_map

A mapping from the exponent of the base unit to the unit symbol.

TYPE: dict[int, str]

RAISES DESCRIPTION
TypeError

If the given exponent_unit_map is not a dict.

ValueError

If the given exponent_unit_map does not contain a base unit (exponent 0).

Source code in frequenz/sdk/timeseries/_quantities.py
def __init_subclass__(cls, exponent_unit_map: dict[int, str]) -> None:
    """Initialize a new subclass of Quantity.

    Args:
        exponent_unit_map: A mapping from the exponent of the base unit to the unit
            symbol.

    Raises:
        TypeError: If the given exponent_unit_map is not a dict.
        ValueError: If the given exponent_unit_map does not contain a base unit
            (exponent 0).
    """
    if not 0 in exponent_unit_map:
        raise ValueError("Expected a base unit for the type (for exponent 0)")
    cls._exponent_unit_map = exponent_unit_map
    super().__init_subclass__()
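Keyword arguments given in a class statement are forwarded to `__init_subclass__`, which is how each quantity type registers its units. The following sketch shows the mechanism with hypothetical stand-in classes (`DemoBase`, `DemoCurrent`, `Broken`); it is not the SDK's `Quantity` class.

```python
class DemoBase:
    """Hypothetical base class that requires a unit map from every subclass."""

    _exponent_unit_map: dict[int, str] = {}

    def __init_subclass__(cls, exponent_unit_map: dict[int, str]) -> None:
        # Every subclass must declare a symbol for the base unit (exponent 0).
        if 0 not in exponent_unit_map:
            raise ValueError("Expected a base unit for the type (for exponent 0)")
        cls._exponent_unit_map = exponent_unit_map
        super().__init_subclass__()


# The keyword in the class statement is passed to `__init_subclass__`.
class DemoCurrent(DemoBase, exponent_unit_map={-3: "mA", 0: "A"}):
    """Hypothetical subclass with a valid unit map."""


try:
    # No entry for exponent 0, so the class statement itself fails.
    class Broken(DemoBase, exponent_unit_map={3: "kX"}):
        """Hypothetical subclass with an invalid unit map."""

    definition_failed = False
except ValueError:
    definition_failed = True
```

The check runs when the subclass is defined, so an invalid unit map is reported at import time rather than when the first value is formatted.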
__le__(other) ¤

Return whether this quantity is less than or equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is less than or equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __le__(self, other: Self) -> bool:
    """Return whether this quantity is less than or equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is less than or equal to another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value <= other._base_value
__lt__(other) ¤

Return whether this quantity is less than another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is less than another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __lt__(self, other: Self) -> bool:
    """Return whether this quantity is less than another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is less than another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value < other._base_value
__mul__(percent) ¤

Return the product of this quantity and a percentage.

PARAMETER DESCRIPTION
percent

The percentage.

TYPE: Percentage

RETURNS DESCRIPTION
Self

The product of this quantity and a percentage.

Source code in frequenz/sdk/timeseries/_quantities.py
def __mul__(self, percent: Percentage) -> Self:
    """Return the product of this quantity and a percentage.

    Args:
        percent: The percentage.

    Returns:
        The product of this quantity and a percentage.
    """
    if not isinstance(percent, Percentage):
        return NotImplemented

    product = type(self).__new__(type(self))
    product._base_value = self._base_value * percent.as_fraction()
    return product
__neg__() ¤

Return the negation of this quantity.

RETURNS DESCRIPTION
Self

The negation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __neg__(self) -> Self:
    """Return the negation of this quantity.

    Returns:
        The negation of this quantity.
    """
    negation = type(self).__new__(type(self))
    negation._base_value = -self._base_value
    return negation
__repr__() ¤

Return a representation of this quantity.

RETURNS DESCRIPTION
str

A representation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __repr__(self) -> str:
    """Return a representation of this quantity.

    Returns:
        A representation of this quantity.
    """
    return f"{type(self).__name__}(value={self._base_value}, exponent=0)"
__str__() ¤

Return a string representation of this quantity.

RETURNS DESCRIPTION
str

A string representation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __str__(self) -> str:
    """Return a string representation of this quantity.

    Returns:
        A string representation of this quantity.
    """
    return self.__format__("")
__sub__(other) ¤

Return the difference of this quantity and another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
Self

The difference of this quantity and another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __sub__(self, other: Self) -> Self:
    """Return the difference of this quantity and another.

    Args:
        other: The other quantity.

    Returns:
        The difference of this quantity and another.
    """
    if not type(other) is type(self):
        return NotImplemented
    difference = type(self).__new__(type(self))
    difference._base_value = self._base_value - other._base_value
    return difference
isclose(other, rel_tol=1e-09, abs_tol=0.0) ¤

Return whether this quantity is close to another.

PARAMETER DESCRIPTION
other

The quantity to compare to.

TYPE: Self

rel_tol

The relative tolerance.

TYPE: float DEFAULT: 1e-09

abs_tol

The absolute tolerance.

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
bool

Whether this quantity is close to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def isclose(self, other: Self, rel_tol: float = 1e-9, abs_tol: float = 0.0) -> bool:
    """Return whether this quantity is close to another.

    Args:
        other: The quantity to compare to.
        rel_tol: The relative tolerance.
        abs_tol: The absolute tolerance.

    Returns:
        Whether this quantity is close to another.
    """
    return math.isclose(
        self._base_value,
        other._base_value,  # pylint: disable=protected-access
        rel_tol=rel_tol,
        abs_tol=abs_tol,
    )
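Because quantities wrap `float` values, exact equality can fail after arithmetic, which is the case `isclose` is meant for. The sketch below uses plain floats and `math.isclose` directly, with the same default tolerances as the signature above.

```python
import math

total = 0.1 + 0.2

# Exact comparison fails because of binary floating-point rounding.
exactly_equal = total == 0.3

# A relative tolerance absorbs the rounding error.
close_enough = math.isclose(total, 0.3, rel_tol=1e-9, abs_tol=0.0)

# A relative tolerance alone never matches zero, so comparisons against
# zero need an absolute tolerance.
near_zero_default = math.isclose(1e-12, 0.0)
near_zero_with_abs_tol = math.isclose(1e-12, 0.0, abs_tol=1e-9)
```

Here `exactly_equal` is `False` and `close_enough` is `True`. The last two lines show why `abs_tol` exists: with the default `abs_tol=0.0`, a tiny value is not considered close to zero.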
isinf() ¤

Return whether this quantity is infinite.

RETURNS DESCRIPTION
bool

Whether this quantity is infinite.

Source code in frequenz/sdk/timeseries/_quantities.py
def isinf(self) -> bool:
    """Return whether this quantity is infinite.

    Returns:
        Whether this quantity is infinite.
    """
    return math.isinf(self._base_value)
isnan() ¤

Return whether this quantity is NaN.

RETURNS DESCRIPTION
bool

Whether this quantity is NaN.

Source code in frequenz/sdk/timeseries/_quantities.py
def isnan(self) -> bool:
    """Return whether this quantity is NaN.

    Returns:
        Whether this quantity is NaN.
    """
    return math.isnan(self._base_value)
zero() classmethod ¤

Return a quantity with value 0.

RETURNS DESCRIPTION
Self

A quantity with value 0.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def zero(cls) -> Self:
    """Return a quantity with value 0.

    Returns:
        A quantity with value 0.
    """
    _zero = cls._zero_cache.get(cls, None)
    if _zero is None:
        _zero = cls.__new__(cls)
        _zero._base_value = 0
        cls._zero_cache[cls] = _zero
    assert isinstance(_zero, cls)
    return _zero
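The per-class cache behind `zero()` can be sketched with hypothetical stand-in classes (`DemoQuantity`, `DemoAmps`, `DemoVolts`). Sharing one instance is only safe because the objects are treated as immutable, as stated in the class description.

```python
class DemoQuantity:
    """Hypothetical stand-in with a zero cache shared by all subclasses."""

    # One dictionary on the base class, keyed by the concrete subclass.
    _zero_cache: dict[type, "DemoQuantity"] = {}

    def __init__(self, value: float) -> None:
        self._base_value = value

    @classmethod
    def zero(cls) -> "DemoQuantity":
        cached = cls._zero_cache.get(cls)
        if cached is None:
            cached = cls(0.0)
            cls._zero_cache[cls] = cached
        return cached


class DemoAmps(DemoQuantity):
    """Hypothetical current-like quantity."""


class DemoVolts(DemoQuantity):
    """Hypothetical voltage-like quantity."""


first = DemoAmps.zero()
second = DemoAmps.zero()
other = DemoVolts.zero()
```

Repeated calls on the same class return the same object (`first is second`), while each subclass gets its own zero of the matching type.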

frequenz.sdk.timeseries.ResamplerConfig dataclass ¤

Resampler configuration.

Source code in frequenz/sdk/timeseries/_resampling.py
@dataclass(frozen=True)
class ResamplerConfig:
    """Resampler configuration."""

    resampling_period: timedelta
    """The resampling period.

    This is the time that passes between two consecutive resampled data points.

    It must be a positive time span.
    """

    max_data_age_in_periods: float = 3.0
    """The maximum age a sample can have to be considered *relevant* for resampling.

    Expressed in number of periods, where period is the `resampling_period`
    if we are downsampling (resampling period bigger than the input period) or
    the input sampling period if we are upsampling (input period bigger than
    the resampling period).

    It must be at least 1.0.

    Example:
        If `resampling_period` is 3 seconds, the input sampling period is
        1 second and `max_data_age_in_periods` is 2, then data older than
        3*2 = 6 seconds will be discarded when creating a new sample and never
        passed to the resampling function.

        If `resampling_period` is 3 seconds, the input sampling period is
        5 seconds and `max_data_age_in_periods` is 2, then data older than
        5*2 = 10 seconds will be discarded when creating a new sample and never
        passed to the resampling function.
    """

    resampling_function: ResamplingFunction = average
    """The resampling function.

    This function will be applied to the sequence of relevant samples at
    a given time. The result of the function is what is sent as the resampled
    value.
    """

    initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
    """The initial length of the resampling buffer.

    The buffer could grow or shrink depending on the source properties,
    like sampling rate, to make sure all the requested past sampling periods
    can be stored.

    It must be at least 1 and at most `max_buffer_len`.
    """

    warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
    """The minimum length of the resampling buffer that will emit a warning.

    If a buffer grows bigger than this value, it will emit a warning in the
    logs, so buffers don't grow too big inadvertently.

    It must be at least 1 and less than `max_buffer_len`.
    """

    max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
    """The maximum length of the resampling buffer.

    Buffers won't be allowed to grow beyond this point even if it would be
    needed to keep all the requested past sampling periods. An error will be
    emitted in the logs if the buffer length needs to be truncated to this
    value.

    It must be bigger than `warn_buffer_len`.
    """

    align_to: datetime | None = UNIX_EPOCH
    """The time to align the resampling period to.

    The resampling period will be aligned to this time, so the first resampled
    sample will be at the first multiple of `resampling_period` starting from
    `align_to`. It must be an aware datetime and can be in the future too.

    If `align_to` is `None`, the resampling period will be aligned to the
    time the resampler is created.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        Raises:
            ValueError: If any value is out of range.
        """
        if self.resampling_period.total_seconds() < 0.0:
            raise ValueError(
                f"resampling_period ({self.resampling_period}) must be positive"
            )
        if self.max_data_age_in_periods < 1.0:
            raise ValueError(
                f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
            )
        if self.warn_buffer_len < 1:
            raise ValueError(
                f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
            )
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                f"max_buffer_len ({self.max_buffer_len}) should "
                f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
            )

        if self.initial_buffer_len < 1:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
            )
        if self.initial_buffer_len > self.max_buffer_len:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
                f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
                "initial_buffer_len or a bigger max_buffer_len"
            )
        if self.initial_buffer_len > self.warn_buffer_len:
            _logger.warning(
                "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
                self.initial_buffer_len,
                self.warn_buffer_len,
            )
        if self.align_to is not None and self.align_to.tzinfo is None:
            raise ValueError(
                f"align_to ({self.align_to}) should be a timezone aware datetime"
            )
Attributes¤
align_to: datetime | None = UNIX_EPOCH class-attribute instance-attribute ¤

The time to align the resampling period to.

The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of resampling_period starting from align_to. It must be an aware datetime and can be in the future too.

If align_to is None, the resampling period will be aligned to the time the resampler is created.
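The alignment rule can be worked through with a short calculation. This is a sketch under stated assumptions: `next_aligned_timestamp` is a hypothetical helper, not an SDK function, and it assumes that a `now` falling exactly on a multiple of the period counts as aligned.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)


def next_aligned_timestamp(
    now: datetime, period: timedelta, align_to: datetime = UNIX_EPOCH
) -> datetime:
    """Return the first multiple of `period` from `align_to` that is not before `now`."""
    # Time elapsed since the last aligned instant. The modulo of two timedeltas
    # is never negative for a positive period, so a future `align_to` works too.
    elapsed = (now - align_to) % period
    if not elapsed:
        return now  # Assumption: an exactly aligned `now` is returned as is.
    return now + (period - elapsed)


now = UNIX_EPOCH + timedelta(seconds=32)
next_event = next_aligned_timestamp(now, timedelta(seconds=10))
```

With a period of 10 seconds and `now` at 1970-01-01 00:00:32 UTC, `next_event` is 1970-01-01 00:00:40 UTC, the same result as the timeline example in the package introduction.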

initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT class-attribute instance-attribute ¤

The initial length of the resampling buffer.

The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.

It must be at least 1 and at most max_buffer_len.

max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX class-attribute instance-attribute ¤

The maximum length of the resampling buffer.

Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.

It must be bigger than warn_buffer_len.

max_data_age_in_periods: float = 3.0 class-attribute instance-attribute ¤

The maximum age a sample can have to be considered relevant for resampling.

Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).

It must be at least 1.0.

Example

If resampling_period is 3 seconds, the input sampling period is 1 second and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.

If resampling_period is 3 seconds, the input sampling period is 5 seconds and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.
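Both examples reduce to multiplying the larger of the two periods by `max_data_age_in_periods`. The helper below, `max_data_age`, is a hypothetical function written only to reproduce that arithmetic; how the resampler computes this internally is not shown in this section.

```python
from datetime import timedelta


def max_data_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Return the age beyond which samples are no longer considered relevant."""
    # Downsampling: the resampling period is the larger one.
    # Upsampling: the input sampling period is the larger one.
    reference_period = max(resampling_period, input_period)
    return reference_period * max_data_age_in_periods


downsampling_age = max_data_age(timedelta(seconds=3), timedelta(seconds=1), 2.0)
upsampling_age = max_data_age(timedelta(seconds=3), timedelta(seconds=5), 2.0)
```

`downsampling_age` is 6 seconds and `upsampling_age` is 10 seconds, matching the two cases described above.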

resampling_function: ResamplingFunction = average class-attribute instance-attribute ¤

The resampling function.

This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.

resampling_period: timedelta instance-attribute ¤

The resampling period.

This is the time that passes between two consecutive resampled data points.

It must be a positive time span.

warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN class-attribute instance-attribute ¤

The minimum length of the resampling buffer that will emit a warning.

If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.

It must be at least 1 and less than max_buffer_len.

Functions¤
__post_init__() ¤

Check that config values are valid.

RAISES DESCRIPTION
ValueError

If any value is out of range.

Source code in frequenz/sdk/timeseries/_resampling.py
def __post_init__(self) -> None:
    """Check that config values are valid.

    Raises:
        ValueError: If any value is out of range.
    """
    if self.resampling_period.total_seconds() < 0.0:
        raise ValueError(
            f"resampling_period ({self.resampling_period}) must be positive"
        )
    if self.max_data_age_in_periods < 1.0:
        raise ValueError(
            f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
        )
    if self.warn_buffer_len < 1:
        raise ValueError(
            f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
        )
    if self.max_buffer_len <= self.warn_buffer_len:
        raise ValueError(
            f"max_buffer_len ({self.max_buffer_len}) should "
            f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
        )

    if self.initial_buffer_len < 1:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
        )
    if self.initial_buffer_len > self.max_buffer_len:
        raise ValueError(
            f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
            f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
            "initial_buffer_len or a bigger max_buffer_len"
        )
    if self.initial_buffer_len > self.warn_buffer_len:
        _logger.warning(
            "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
            self.initial_buffer_len,
            self.warn_buffer_len,
        )
    if self.align_to is not None and self.align_to.tzinfo is None:
        raise ValueError(
            f"align_to ({self.align_to}) should be a timezone aware datetime"
        )

frequenz.sdk.timeseries.Sample dataclass ¤

Bases: Generic[QuantityT]

A measurement taken at a particular point in time.

The value could be None if a component is malfunctioning or data is lacking for another reason, but a sample still needs to be sent to have a coherent view on a group of component metrics for a particular timestamp.

Source code in frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True, order=True)
class Sample(Generic[QuantityT]):
    """A measurement taken at a particular point in time.

    The `value` could be `None` if a component is malfunctioning or data is
    lacking for another reason, but a sample still needs to be sent to have a
    coherent view on a group of component metrics for a particular timestamp.
    """

    timestamp: datetime
    """The time when this sample was generated."""

    value: QuantityT | None = None
    """The value of this sample."""
Attributes¤
timestamp: datetime instance-attribute ¤

The time when this sample was generated.

value: QuantityT | None = None class-attribute instance-attribute ¤

The value of this sample.
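The effect of `frozen=True, order=True` can be seen with a small stand-in. `DemoSample` below is a hypothetical dataclass with the same two fields and plain `float` values; it is not the SDK's `Sample`.

```python
import dataclasses
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True, order=True)
class DemoSample(Generic[T]):
    """Hypothetical stand-in with the same fields as `Sample`."""

    timestamp: datetime
    value: Optional[T] = None


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
late = DemoSample(start + timedelta(seconds=10), 2.0)
early = DemoSample(start, 1.0)
missing = DemoSample(start + timedelta(seconds=5))  # value defaults to None

# Ordering compares the fields in declaration order, so timestamps come first.
ordered = sorted([late, early, missing])

try:
    early.value = 5.0  # type: ignore[misc]
    mutated = True
except dataclasses.FrozenInstanceError:
    # Frozen dataclasses reject attribute assignment.
    mutated = False
```

Since fields are compared in declaration order, values are only compared when two timestamps are equal. Samples with distinct timestamps therefore sort cleanly even when some values are `None`.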

frequenz.sdk.timeseries.Sample3Phase dataclass ¤

Bases: Generic[QuantityT]

A 3-phase measurement made at a particular point in time.

Each of the value fields could be None if a component is malfunctioning or data is lacking for another reason, but a sample still needs to be sent to have a coherent view on a group of component metrics for a particular timestamp.

Source code in frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True)
class Sample3Phase(Generic[QuantityT]):
    """A 3-phase measurement made at a particular point in time.

    Each of the `value` fields could be `None` if a component is malfunctioning
    or data is lacking for another reason, but a sample still needs to be sent
    to have a coherent view on a group of component metrics for a particular
    timestamp.
    """

    timestamp: datetime
    """The time when this sample was generated."""
    value_p1: QuantityT | None
    """The value of the 1st phase in this sample."""

    value_p2: QuantityT | None
    """The value of the 2nd phase in this sample."""

    value_p3: QuantityT | None
    """The value of the 3rd phase in this sample."""

    def __iter__(self) -> Iterator[QuantityT | None]:
        """Return an iterator that yields values from each of the phases.

        Yields:
            Per-phase measurements one-by-one.
        """
        yield self.value_p1
        yield self.value_p2
        yield self.value_p3

    @overload
    def max(self, default: QuantityT) -> QuantityT:
        ...

    @overload
    def max(self, default: None = None) -> QuantityT | None:
        ...

    def max(self, default: QuantityT | None = None) -> QuantityT | None:
        """Return the max value among all phases, or default if they are all `None`.

        Args:
            default: value to return if all phases are `None`.

        Returns:
            Max value among all phases, if available, default value otherwise.
        """
        if not any(self):
            return default
        value: QuantityT = functools.reduce(
            lambda x, y: x if x > y else y,
            filter(None, self),
        )
        return value

    @overload
    def min(self, default: QuantityT) -> QuantityT:
        ...

    @overload
    def min(self, default: None = None) -> QuantityT | None:
        ...

    def min(self, default: QuantityT | None = None) -> QuantityT | None:
        """Return the min value among all phases, or default if they are all `None`.

        Args:
            default: value to return if all phases are `None`.

        Returns:
            Min value among all phases, if available, default value otherwise.
        """
        if not any(self):
            return default
        value: QuantityT = functools.reduce(
            lambda x, y: x if x < y else y,
            filter(None, self),
        )
        return value

    def map(
        self,
        function: Callable[[QuantityT], QuantityT],
        default: QuantityT | None = None,
    ) -> Self:
        """Apply the given function on each of the phase values and return the result.

        If a phase value is `None`, replace it with `default` instead.

        Args:
            function: The function to apply on each of the phase values.
            default: The value to apply if a phase value is `None`.

        Returns:
            A new instance, with the given function applied on values for each of the
                phases.
        """
        return self.__class__(
            timestamp=self.timestamp,
            value_p1=default if self.value_p1 is None else function(self.value_p1),
            value_p2=default if self.value_p2 is None else function(self.value_p2),
            value_p3=default if self.value_p3 is None else function(self.value_p3),
        )
Attributes¤
timestamp: datetime instance-attribute ¤

The time when this sample was generated.

value_p1: QuantityT | None instance-attribute ¤

The value of the 1st phase in this sample.

value_p2: QuantityT | None instance-attribute ¤

The value of the 2nd phase in this sample.

value_p3: QuantityT | None instance-attribute ¤

The value of the 3rd phase in this sample.

Functions¤
__iter__() ¤

Return an iterator that yields values from each of the phases.

YIELDS DESCRIPTION
QuantityT | None

Per-phase measurements one-by-one.

Source code in frequenz/sdk/timeseries/_base_types.py
def __iter__(self) -> Iterator[QuantityT | None]:
    """Return an iterator that yields values from each of the phases.

    Yields:
        Per-phase measurements one-by-one.
    """
    yield self.value_p1
    yield self.value_p2
    yield self.value_p3
map(function, default=None) ¤

Apply the given function on each of the phase values and return the result.

If a phase value is None, replace it with default instead.

PARAMETER DESCRIPTION
function

The function to apply on each of the phase values.

TYPE: Callable[[QuantityT], QuantityT]

default

The value to apply if a phase value is None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
Self

A new instance, with the given function applied on values for each of the phases.

Source code in frequenz/sdk/timeseries/_base_types.py
def map(
    self,
    function: Callable[[QuantityT], QuantityT],
    default: QuantityT | None = None,
) -> Self:
    """Apply the given function on each of the phase values and return the result.

    If a phase value is `None`, replace it with `default` instead.

    Args:
        function: The function to apply on each of the phase values.
        default: The value to apply if a phase value is `None`.

    Returns:
        A new instance, with the given function applied on values for each of the
            phases.
    """
    return self.__class__(
        timestamp=self.timestamp,
        value_p1=default if self.value_p1 is None else function(self.value_p1),
        value_p2=default if self.value_p2 is None else function(self.value_p2),
        value_p3=default if self.value_p3 is None else function(self.value_p3),
    )
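The behaviour of `map` with missing phases can be illustrated with a reduced stand-in. `DemoSample3Phase` is hypothetical, omits the timestamp and uses plain `float` values, so it only sketches the rule that `default` replaces `None` without being passed through `function`.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class DemoSample3Phase:
    """Hypothetical stand-in holding only the three phase values."""

    value_p1: Optional[float]
    value_p2: Optional[float]
    value_p3: Optional[float]

    def map(
        self,
        function: Callable[[float], float],
        default: Optional[float] = None,
    ) -> "DemoSample3Phase":
        """Apply `function` to each present value and use `default` for `None`."""
        values = (self.value_p1, self.value_p2, self.value_p3)
        return DemoSample3Phase(
            *(default if value is None else function(value) for value in values)
        )


sample = DemoSample3Phase(1.0, None, 3.0)
doubled = sample.map(lambda value: value * 2, default=0.0)
```

`doubled` holds `2.0`, `0.0` and `6.0`: the missing second phase takes the default unchanged, and the original `sample` is left untouched because a new instance is returned.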
max(default=None) ¤

Return the max value among all phases, or default if they are all None.

PARAMETER DESCRIPTION
default

value to return if all phases are None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
QuantityT | None

Max value among all phases, if available, default value otherwise.

Source code in frequenz/sdk/timeseries/_base_types.py
def max(self, default: QuantityT | None = None) -> QuantityT | None:
    """Return the max value among all phases, or default if they are all `None`.

    Args:
        default: value to return if all phases are `None`.

    Returns:
        Max value among all phases, if available, default value otherwise.
    """
    if not any(self):
        return default
    value: QuantityT = functools.reduce(
        lambda x, y: x if x > y else y,
        filter(None, self),
    )
    return value
min(default=None) ¤

Return the min value among all phases, or default if they are all None.

PARAMETER DESCRIPTION
default

value to return if all phases are None.

TYPE: QuantityT | None DEFAULT: None

RETURNS DESCRIPTION
QuantityT | None

Min value among all phases, if available, default value otherwise.

Source code in frequenz/sdk/timeseries/_base_types.py
def min(self, default: QuantityT | None = None) -> QuantityT | None:
    """Return the min value among all phases, or default if they are all `None`.

    Args:
        default: value to return if all phases are `None`.

    Returns:
        Min value among all phases, if available, default value otherwise.
    """
    if not any(self):
        return default
    value: QuantityT = functools.reduce(
        lambda x, y: x if x < y else y,
        filter(None, self),
    )
    return value
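The "ignore missing phases, fall back to a default" rule of `max` and `min` can be sketched with plain floats. `phase_max` and `phase_min` are hypothetical helpers; unlike the listings above they filter with an explicit `is not None` test, a deliberate variation so that a legitimate `0.0` reading is not mistaken for a missing value when working with bare floats.

```python
from typing import Iterable, Optional


def phase_max(
    values: Iterable[Optional[float]], default: Optional[float] = None
) -> Optional[float]:
    """Return the largest present value, or `default` if all are `None`."""
    present = [value for value in values if value is not None]
    return max(present) if present else default


def phase_min(
    values: Iterable[Optional[float]], default: Optional[float] = None
) -> Optional[float]:
    """Return the smallest present value, or `default` if all are `None`."""
    present = [value for value in values if value is not None]
    return min(present) if present else default


highest = phase_max((1.0, None, 3.0))
lowest = phase_min((0.0, None, 3.0))
fallback = phase_max((None, None, None), default=-1.0)
```

Here `highest` is `3.0`, `lowest` is `0.0`, and `fallback` is `-1.0` because no phase carries a value.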

frequenz.sdk.timeseries.Temperature ¤

Bases: Quantity

A temperature quantity (in degrees Celsius).

Source code in frequenz/sdk/timeseries/_quantities.py
class Temperature(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={
        0: "°C",
    },
):
    """A temperature quantity (in degrees Celsius)."""

    @classmethod
    def from_celsius(cls, value: float) -> Self:
        """Initialize a new temperature quantity.

        Args:
            value: The temperature in degrees Celsius.

        Returns:
            A new temperature quantity.
        """
        power = cls.__new__(cls)
        power._base_value = value
        return power

    def as_celsius(self) -> float:
        """Return the temperature in degrees Celsius.

        Returns:
            The temperature in degrees Celsius.
        """
        return self._base_value
Functions¤
as_celsius() ¤

Return the temperature in degrees Celsius.

RETURNS DESCRIPTION
float

The temperature in degrees Celsius.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_celsius(self) -> float:
    """Return the temperature in degrees Celsius.

    Returns:
        The temperature in degrees Celsius.
    """
    return self._base_value
from_celsius(value) classmethod ¤

Initialize a new temperature quantity.

PARAMETER DESCRIPTION
value

The temperature in degrees Celsius.

TYPE: float

RETURNS DESCRIPTION
Self

A new temperature quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_celsius(cls, value: float) -> Self:
    """Initialize a new temperature quantity.

    Args:
        value: The temperature in degrees Celsius.

    Returns:
        A new temperature quantity.
    """
    temperature = cls.__new__(cls)
    temperature._base_value = value
    return temperature

frequenz.sdk.timeseries.Voltage ¤

Bases: Quantity

A voltage quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and each of the arithmetic operators supported by this type is implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.
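
As a minimal sketch of what this means in practice, the example below uses a hypothetical stand-in class (`VoltageSketch`, not the SDK's `Voltage`) that stores volts as a plain `float`. It shows why exact equality checks on results are fragile and why a tolerance-based comparison such as `math.isclose()` is usually the safer choice.

```python
import math


class VoltageSketch:
    """Hypothetical stand-in that stores volts as a plain float."""

    def __init__(self, volts: float) -> None:
        self._base_value = volts

    def as_volts(self) -> float:
        return self._base_value

    def as_millivolts(self) -> float:
        return self._base_value * 1e3


a = VoltageSketch(0.1)
b = VoltageSketch(0.2)
total_volts = a.as_volts() + b.as_volts()

# 0.1 + 0.2 is not exactly 0.3 in binary floating point.
exactly_equal = total_volts == 0.3

# Comparing with a tolerance gives the result most callers expect.
close_enough = math.isclose(total_volts, 0.3)
```

Here `exactly_equal` is `False` and `close_enough` is `True`.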

Source code in frequenz/sdk/timeseries/_quantities.py
class Voltage(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={0: "V", -3: "mV", 3: "kV"},
):
    """A voltage quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type is
    implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_volts(cls, volts: float) -> Self:
        """Initialize a new voltage quantity.

        Args:
            volts: The voltage in volts.

        Returns:
            A new voltage quantity.
        """
        voltage = cls.__new__(cls)
        voltage._base_value = volts
        return voltage

    @classmethod
    def from_millivolts(cls, millivolts: float) -> Self:
        """Initialize a new voltage quantity.

        Args:
            millivolts: The voltage in millivolts.

        Returns:
            A new voltage quantity.
        """
        voltage = cls.__new__(cls)
        voltage._base_value = millivolts * 10**-3
        return voltage

    @classmethod
    def from_kilovolts(cls, kilovolts: float) -> Self:
        """Initialize a new voltage quantity.

        Args:
            kilovolts: The voltage in kilovolts.

        Returns:
            A new voltage quantity.
        """
        voltage = cls.__new__(cls)
        voltage._base_value = kilovolts * 10**3
        return voltage

    def as_volts(self) -> float:
        """Return the voltage in volts.

        Returns:
            The voltage in volts.
        """
        return self._base_value

    def as_millivolts(self) -> float:
        """Return the voltage in millivolts.

        Returns:
            The voltage in millivolts.
        """
        return self._base_value * 1e3

    def as_kilovolts(self) -> float:
        """Return the voltage in kilovolts.

        Returns:
            The voltage in kilovolts.
        """
        return self._base_value / 1e3

    @overload  # type: ignore
    def __mul__(self, other: Percentage) -> Self:
        """Return a voltage from multiplying this voltage by the given percentage.

        Args:
            other: The percentage to multiply by.
        """

    @overload
    def __mul__(self, other: Current) -> Power:
        """Multiply the voltage by the current to get the power.

        Args:
            other: The current to multiply the voltage with.
        """

    def __mul__(self, other: Percentage | Current) -> Self | Power:
        """Return a voltage or power from multiplying this voltage by the given value.

        Args:
            other: The percentage or current to multiply by.

        Returns:
            The calculated voltage or power.

        Raises:
            TypeError: If the given value is not a percentage or current.
        """
        if isinstance(other, Percentage):
            return super().__mul__(other)
        if isinstance(other, Current):
            return Power.from_watts(self._base_value * other._base_value)

        return NotImplemented

Functions¤

__mul__(other) ¤

Return a voltage or power from multiplying this voltage by the given value.

PARAMETER DESCRIPTION
other

The percentage or current to multiply by.

TYPE: Percentage | Current

RETURNS DESCRIPTION
Self | Power

The calculated voltage or power.

RAISES DESCRIPTION
TypeError

If the given value is not a percentage or current.

Source code in frequenz/sdk/timeseries/_quantities.py
def __mul__(self, other: Percentage | Current) -> Self | Power:
    """Return a voltage or power from multiplying this voltage by the given value.

    Args:
        other: The percentage or current to multiply by.

    Returns:
        The calculated voltage or power.

    Raises:
        TypeError: If the given value is not a percentage or current.
    """
    if isinstance(other, Percentage):
        return super().__mul__(other)
    if isinstance(other, Current):
        return Power.from_watts(self._base_value * other._base_value)

    return NotImplemented
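
The dispatch in `__mul__` can be illustrated with simplified stand-ins. This is a sketch under stated assumptions: `VoltageSketch`, `CurrentSketch`, and `PowerSketch` are hypothetical dataclasses, not the SDK types, and the percentage branch is omitted. It shows how returning `NotImplemented` for an unsupported operand leads Python to raise the documented `TypeError`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PowerSketch:
    watts: float


@dataclass(frozen=True)
class CurrentSketch:
    amperes: float


@dataclass(frozen=True)
class VoltageSketch:
    volts: float

    def __mul__(self, other: object) -> PowerSketch:
        # Voltage times current yields power (P = V * I).
        if isinstance(other, CurrentSketch):
            return PowerSketch(self.volts * other.amperes)
        # Let Python try the reflected operation on the other operand;
        # if that also fails, the interpreter raises TypeError.
        return NotImplemented


power = VoltageSketch(230.0) * CurrentSketch(2.0)

try:
    VoltageSketch(230.0) * 2.0  # float is not a supported operand here
    unsupported_raised = False
except TypeError:
    unsupported_raised = True
```

Returning `NotImplemented` instead of raising directly gives the right-hand operand a chance to handle the multiplication through its own `__rmul__`.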

as_kilovolts() ¤

Return the voltage in kilovolts.

RETURNS DESCRIPTION
float

The voltage in kilovolts.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_kilovolts(self) -> float:
    """Return the voltage in kilovolts.

    Returns:
        The voltage in kilovolts.
    """
    return self._base_value / 1e3

as_millivolts() ¤

Return the voltage in millivolts.

RETURNS DESCRIPTION
float

The voltage in millivolts.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_millivolts(self) -> float:
    """Return the voltage in millivolts.

    Returns:
        The voltage in millivolts.
    """
    return self._base_value * 1e3

as_volts() ¤

Return the voltage in volts.

RETURNS DESCRIPTION
float

The voltage in volts.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_volts(self) -> float:
    """Return the voltage in volts.

    Returns:
        The voltage in volts.
    """
    return self._base_value

from_kilovolts(kilovolts) classmethod ¤

Initialize a new voltage quantity.

PARAMETER DESCRIPTION
kilovolts

The voltage in kilovolts.

TYPE: float

RETURNS DESCRIPTION
Self

A new voltage quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_kilovolts(cls, kilovolts: float) -> Self:
    """Initialize a new voltage quantity.

    Args:
        kilovolts: The voltage in kilovolts.

    Returns:
        A new voltage quantity.
    """
    voltage = cls.__new__(cls)
    voltage._base_value = kilovolts *