frequenz.sdk.timeseries ¤

Handling of timeseries streams.

A timeseries is a stream (normally an async iterator) of Samples.

Periodicity and alignment¤

All data produced by this package is periodic and, by default, aligned to the UNIX_EPOCH.

Classes normally take a (re)sampling period as an argument and, optionally, an align_to argument.

This means consecutive timestamps are always separated by exactly one period, and every timestamp falls on a multiple of the period, counted from align_to.

This ensures that the data is predictable and consistent across restarts.

Example

Suppose the period is 10 seconds and we are aligning to the UNIX epoch. If the timeline below starts at 1970-01-01 00:00:00 UTC and the current time (now) is 1970-01-01 00:00:32 UTC, then the next timestamp will be at 1970-01-01 00:00:40 UTC:

align_to = 1970-01-01 00:00:00         next event = 1970-01-01 00:00:40
|                                       |
|---------|---------|---------|-|-------|---------|---------|---------|
0        10        20        30 |      40        50        60        70
                               now = 1970-01-01 00:00:32
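
The arithmetic behind the timeline above can be sketched with the standard library alone. This is an illustration, not the SDK's implementation: `next_aligned` is a hypothetical helper, and treating a `now` that falls exactly on a boundary as already past is an assumption.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)


def next_aligned(
    now: datetime, period: timedelta, align_to: datetime = UNIX_EPOCH
) -> datetime:
    """Return the first aligned timestamp strictly after `now` (hypothetical helper)."""
    # Floor division of two timedeltas gives the number of whole periods elapsed.
    periods_elapsed = (now - align_to) // period
    return align_to + (periods_elapsed + 1) * period


now = datetime(1970, 1, 1, 0, 0, 32, tzinfo=timezone.utc)
print(next_aligned(now, timedelta(seconds=10)))  # 1970-01-01 00:00:40+00:00
```

Because the result depends only on `align_to` and `period`, restarting the process at a different `now` still yields timestamps on the same grid.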

Attributes¤

frequenz.sdk.timeseries.UNIX_EPOCH module-attribute ¤

UNIX_EPOCH = fromtimestamp(0.0, tz=utc)

The UNIX epoch (in UTC).

Classes¤

frequenz.sdk.timeseries.Bounds dataclass ¤

Bases: Generic[_T]

Lower and upper bound values.

Source code in frequenz/sdk/timeseries/_base_types.py
@dataclass(frozen=True)
class Bounds(Generic[_T]):
    """Lower and upper bound values."""

    lower: _T
    """Lower bound."""

    upper: _T
    """Upper bound."""
Attributes¤
lower instance-attribute ¤
lower: _T

Lower bound.

upper instance-attribute ¤
upper: _T

Upper bound.
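
A minimal usage sketch follows. To stay self-contained it re-declares a local stand-in with the same two fields rather than importing the SDK class, and `clamp` is a hypothetical helper that is not part of the package.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

_T = TypeVar("_T")


@dataclass(frozen=True)
class Bounds(Generic[_T]):
    """Local stand-in mirroring the dataclass documented above."""

    lower: _T
    upper: _T


def clamp(value: float, bounds: Bounds[float]) -> float:
    """Restrict `value` to the closed interval described by `bounds` (hypothetical)."""
    return max(bounds.lower, min(value, bounds.upper))


limits = Bounds(lower=-5.0, upper=5.0)
clamped = clamp(7.5, limits)  # 5.0
```

Since the dataclass is frozen, assigning to `limits.lower` or `limits.upper` after construction raises `dataclasses.FrozenInstanceError`.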

frequenz.sdk.timeseries.Current ¤

Bases: Quantity

A current quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and all arithmetic operators supported by this type are implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.
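
As a reminder of what those considerations look like in practice, the sketch below uses plain float values (no SDK types) to show that sums of decimal fractions are not exact. The variable names are illustrative only.

```python
import math

# 100 mA + 200 mA expressed in amperes, using plain floats.
total = 0.1 + 0.2

exactly_equal = total == 0.3             # False
close_enough = math.isclose(total, 0.3)  # True
```

For this reason a tolerance-based comparison is usually a safer choice than `==` when comparing computed quantities.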

Source code in frequenz/sdk/timeseries/_quantities.py
class Current(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={
        -3: "mA",
        0: "A",
    },
):
    """A current quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type are
    actually implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_amperes(cls, amperes: float) -> Self:
        """Initialize a new current quantity.

        Args:
            amperes: The current in amperes.

        Returns:
            A new current quantity.
        """
        return cls._new(amperes)

    @classmethod
    def from_milliamperes(cls, milliamperes: float) -> Self:
        """Initialize a new current quantity.

        Args:
            milliamperes: The current in milliamperes.

        Returns:
            A new current quantity.
        """
        return cls._new(milliamperes, exponent=-3)

    def as_amperes(self) -> float:
        """Return the current in amperes.

        Returns:
            The current in amperes.
        """
        return self._base_value

    def as_milliamperes(self) -> float:
        """Return the current in milliamperes.

        Returns:
            The current in milliamperes.
        """
        return self._base_value * 1e3

    # See comment for Power.__mul__ for why we need the ignore here.
    @overload  # type: ignore[override]
    def __mul__(self, scalar: float, /) -> Self:
        """Scale this current by a scalar.

        Args:
            scalar: The scalar by which to scale this current.

        Returns:
            The scaled current.
        """

    @overload
    def __mul__(self, percent: Percentage, /) -> Self:
        """Scale this current by a percentage.

        Args:
            percent: The percentage by which to scale this current.

        Returns:
            The scaled current.
        """

    @overload
    def __mul__(self, other: Voltage, /) -> Power:
        """Multiply the current by a voltage to get a power.

        Args:
            other: The voltage.

        Returns:
            The calculated power.
        """

    def __mul__(self, other: float | Percentage | Voltage, /) -> Self | Power:
        """Return a current or power from multiplying this current by the given value.

        Args:
            other: The scalar, percentage or voltage to multiply by.

        Returns:
            A current or power.
        """
        match other:
            case float() | Percentage():
                return super().__mul__(other)
            case Voltage():
                return Power._new(self._base_value * other._base_value)
            case _:
                return NotImplemented
Attributes¤
base_unit property ¤
base_unit: str | None

Return the base unit of this quantity.

None if this quantity has no unit.

RETURNS DESCRIPTION
str | None

The base unit of this quantity.

base_value property ¤
base_value: float

Return the value of this quantity in the base unit.

RETURNS DESCRIPTION
float

The value of this quantity in the base unit.

Functions¤
__abs__ ¤
__abs__() -> Self

Return the absolute value of this quantity.

RETURNS DESCRIPTION
Self

The absolute value of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __abs__(self) -> Self:
    """Return the absolute value of this quantity.

    Returns:
        The absolute value of this quantity.
    """
    absolute = type(self).__new__(type(self))
    absolute._base_value = abs(self._base_value)
    return absolute
__add__ ¤
__add__(other: Self) -> Self

Return the sum of this quantity and another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
Self

The sum of this quantity and another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __add__(self, other: Self) -> Self:
    """Return the sum of this quantity and another.

    Args:
        other: The other quantity.

    Returns:
        The sum of this quantity and another.
    """
    if not type(other) is type(self):
        return NotImplemented
    summe = type(self).__new__(type(self))
    summe._base_value = self._base_value + other._base_value
    return summe
__eq__ ¤
__eq__(other: object) -> bool

Return whether this quantity is equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: object

RETURNS DESCRIPTION
bool

Whether this quantity is equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __eq__(self, other: object) -> bool:
    """Return whether this quantity is equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is equal to another.
    """
    if not type(other) is type(self):
        return NotImplemented
    # The above check ensures that both quantities are the exact same type, because
    # `isinstance` returns true for subclasses and superclasses.  But the above check
    # doesn't help mypy identify the type of other,  so the below line is necessary.
    assert isinstance(other, self.__class__)
    return self._base_value == other._base_value
__format__ ¤
__format__(__format_spec: str) -> str

Return a formatted string representation of this quantity.

If specified, the format specifier must have the form [0].{precision}. If the leading 0 is not given, trailing zeros are omitted. If no format specifier is given, the precision defaults to 3.

The returned string will use the unit that will result in the maximum precision, based on the magnitude of the value.

Example
from frequenz.sdk.timeseries import Current
c = Current.from_amperes(0.2345)
assert f"{c:.2}" == "234.5 mA"
c = Current.from_amperes(1.2345)
assert f"{c:.2}" == "1.23 A"
c = Current.from_milliamperes(1.2345)
assert f"{c:.6}" == "1.2345 mA"
PARAMETER DESCRIPTION
__format_spec

The format specifier.

TYPE: str

RETURNS DESCRIPTION
str

A string representation of this quantity.

RAISES DESCRIPTION
ValueError

If the given format specifier is invalid.
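
The unit selection described above can be approximated by the standalone sketch below. It is a simplification under stated assumptions, not the SDK's code: `pick_unit` is a hypothetical function, the value is assumed to be finite, the unit map is assumed to have no gaps between its smallest and largest exponent, and the rounding guard for values just below a unit boundary is left out.

```python
import math


def pick_unit(value: float, exponent_unit_map: dict[int, str]) -> tuple[int, str]:
    """Pick the exponent and unit symbol used to display `value` (hypothetical)."""
    if value == 0.0:
        return 0, exponent_unit_map[0]
    exponent = math.floor(math.log10(abs(value)))
    # Round down to a multiple of 3, e.g. -1 -> -3 and 4 -> 3.
    unit_place = exponent - exponent % 3
    # Clamp to the smallest and largest units the quantity defines.
    unit_place = max(min(exponent_unit_map), min(unit_place, max(exponent_unit_map)))
    return unit_place, exponent_unit_map[unit_place]


current_units = {-3: "mA", 0: "A"}
print(pick_unit(0.2345, current_units))  # (-3, 'mA')
print(pick_unit(1.2345, current_units))  # (0, 'A')
```

The displayed number is then the base value divided by `10 ** unit_place`, formatted with the requested precision.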

Source code in frequenz/sdk/timeseries/_quantities.py
def __format__(self, __format_spec: str) -> str:
    """Return a formatted string representation of this quantity.

    If specified, must be of this form: `[0].{precision}`.  If a 0 is not given, the
    trailing zeros will be omitted.  If no precision is given, the default is 3.

    The returned string will use the unit that will result in the maximum precision,
    based on the magnitude of the value.

    Example:
        ```python
        from frequenz.sdk.timeseries import Current
        c = Current.from_amperes(0.2345)
        assert f"{c:.2}" == "234.5 mA"
        c = Current.from_amperes(1.2345)
        assert f"{c:.2}" == "1.23 A"
        c = Current.from_milliamperes(1.2345)
        assert f"{c:.6}" == "1.2345 mA"
        ```

    Args:
        __format_spec: The format specifier.

    Returns:
        A string representation of this quantity.

    Raises:
        ValueError: If the given format specifier is invalid.
    """
    keep_trailing_zeros = False
    if __format_spec != "":
        fspec_parts = __format_spec.split(".")
        if (
            len(fspec_parts) != 2
            or fspec_parts[0] not in ("", "0")
            or not fspec_parts[1].isdigit()
        ):
            raise ValueError(
                "Invalid format specifier. Must be empty or `[0].{precision}`"
            )
        if fspec_parts[0] == "0":
            keep_trailing_zeros = True
        precision = int(fspec_parts[1])
    else:
        precision = 3
    if not self._exponent_unit_map:
        return f"{self._base_value:.{precision}f}"

    if math.isinf(self._base_value) or math.isnan(self._base_value):
        return f"{self._base_value} {self._exponent_unit_map[0]}"

    if abs_value := abs(self._base_value):
        precision_pow = 10 ** (precision)
        # Prevent numbers like 999.999999 being rendered as 1000 V
        # instead of 1 kV.
        # This could happen because the str formatting function does
        # rounding as well.
        # This is an imperfect solution that works for _most_ cases.
        # isclose parameters were chosen according to the observed cases
        if math.isclose(abs_value, precision_pow, abs_tol=1e-4, rel_tol=0.01):
            # If the value is close to the precision, round it
            exponent = math.ceil(math.log10(precision_pow))
        else:
            exponent = math.floor(math.log10(abs_value))
    else:
        exponent = 0

    unit_place = exponent - exponent % 3
    if unit_place < min(self._exponent_unit_map):
        unit = self._exponent_unit_map[min(self._exponent_unit_map.keys())]
        unit_place = min(self._exponent_unit_map)
    elif unit_place > max(self._exponent_unit_map):
        unit = self._exponent_unit_map[max(self._exponent_unit_map.keys())]
        unit_place = max(self._exponent_unit_map)
    else:
        unit = self._exponent_unit_map[unit_place]

    value_str = f"{self._base_value / 10 ** unit_place:.{precision}f}"

    if value_str in ("-0", "0"):
        stripped = value_str
    else:
        stripped = value_str.rstrip("0").rstrip(".")

    if not keep_trailing_zeros:
        value_str = stripped
    unit_str = unit if stripped not in ("-0", "0") else self._exponent_unit_map[0]
    return f"{value_str} {unit_str}"
__ge__ ¤
__ge__(other: Self) -> bool

Return whether this quantity is greater than or equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is greater than or equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __ge__(self, other: Self) -> bool:
    """Return whether this quantity is greater than or equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is greater than or equal to another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value >= other._base_value
__gt__ ¤
__gt__(other: Self) -> bool

Return whether this quantity is greater than another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is greater than another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __gt__(self, other: Self) -> bool:
    """Return whether this quantity is greater than another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is greater than another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value > other._base_value
__init__ ¤
__init__(value: float, exponent: int = 0) -> None

Initialize a new quantity.

PARAMETER DESCRIPTION
value

The value of this quantity in a given exponent of the base unit.

TYPE: float

exponent

The exponent of the base unit the given value is in.

TYPE: int DEFAULT: 0

Source code in frequenz/sdk/timeseries/_quantities.py
def __init__(self, value: float, exponent: int = 0) -> None:
    """Initialize a new quantity.

    Args:
        value: The value of this quantity in a given exponent of the base unit.
        exponent: The exponent of the base unit the given value is in.
    """
    self._base_value = value * 10.0**exponent
__init_subclass__ ¤
__init_subclass__(
    exponent_unit_map: dict[int, str]
) -> None

Initialize a new subclass of Quantity.

PARAMETER DESCRIPTION
exponent_unit_map

A mapping from the exponent of the base unit to the unit symbol.

TYPE: dict[int, str]

RAISES DESCRIPTION
TypeError

If the given exponent_unit_map is not a dict.

ValueError

If the given exponent_unit_map does not contain a base unit (exponent 0).

Source code in frequenz/sdk/timeseries/_quantities.py
def __init_subclass__(cls, exponent_unit_map: dict[int, str]) -> None:
    """Initialize a new subclass of Quantity.

    Args:
        exponent_unit_map: A mapping from the exponent of the base unit to the unit
            symbol.

    Raises:
        TypeError: If the given exponent_unit_map is not a dict.
        ValueError: If the given exponent_unit_map does not contain a base unit
            (exponent 0).
    """
    if 0 not in exponent_unit_map:
        raise ValueError("Expected a base unit for the type (for exponent 0)")
    cls._exponent_unit_map = exponent_unit_map
    super().__init_subclass__()
__le__ ¤
__le__(other: Self) -> bool

Return whether this quantity is less than or equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is less than or equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __le__(self, other: Self) -> bool:
    """Return whether this quantity is less than or equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is less than or equal to another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value <= other._base_value
__lt__ ¤
__lt__(other: Self) -> bool

Return whether this quantity is less than another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is less than another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __lt__(self, other: Self) -> bool:
    """Return whether this quantity is less than another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is less than another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value < other._base_value
__mul__ ¤
__mul__(
    other: float | Percentage | Voltage,
) -> Self | Power

Return a current or power from multiplying this current by the given value.

PARAMETER DESCRIPTION
other

The scalar, percentage or voltage to multiply by.

TYPE: float | Percentage | Voltage

RETURNS DESCRIPTION
Self | Power

A current or power.

Source code in frequenz/sdk/timeseries/_quantities.py
def __mul__(self, other: float | Percentage | Voltage, /) -> Self | Power:
    """Return a current or power from multiplying this current by the given value.

    Args:
        other: The scalar, percentage or voltage to multiply by.

    Returns:
        A current or power.
    """
    match other:
        case float() | Percentage():
            return super().__mul__(other)
        case Voltage():
            return Power._new(self._base_value * other._base_value)
        case _:
            return NotImplemented
__neg__ ¤
__neg__() -> Self

Return the negation of this quantity.

RETURNS DESCRIPTION
Self

The negation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __neg__(self) -> Self:
    """Return the negation of this quantity.

    Returns:
        The negation of this quantity.
    """
    negation = type(self).__new__(type(self))
    negation._base_value = -self._base_value
    return negation
__repr__ ¤
__repr__() -> str

Return a representation of this quantity.

RETURNS DESCRIPTION
str

A representation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __repr__(self) -> str:
    """Return a representation of this quantity.

    Returns:
        A representation of this quantity.
    """
    return f"{type(self).__name__}(value={self._base_value}, exponent=0)"
__str__ ¤
__str__() -> str

Return a string representation of this quantity.

RETURNS DESCRIPTION
str

A string representation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __str__(self) -> str:
    """Return a string representation of this quantity.

    Returns:
        A string representation of this quantity.
    """
    return self.__format__("")
__sub__ ¤
__sub__(other: Self) -> Self

Return the difference of this quantity and another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
Self

The difference of this quantity and another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __sub__(self, other: Self) -> Self:
    """Return the difference of this quantity and another.

    Args:
        other: The other quantity.

    Returns:
        The difference of this quantity and another.
    """
    if not type(other) is type(self):
        return NotImplemented
    difference = type(self).__new__(type(self))
    difference._base_value = self._base_value - other._base_value
    return difference
__truediv__ ¤
__truediv__(value: float | Self) -> Self | float

Divide this quantity by a scalar or another quantity.

PARAMETER DESCRIPTION
value

The scalar or quantity to divide this quantity by.

TYPE: float | Self

RETURNS DESCRIPTION
Self | float

The divided quantity or the ratio of this quantity to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __truediv__(self, value: float | Self, /) -> Self | float:
    """Divide this quantity by a scalar or another quantity.

    Args:
        value: The scalar or quantity to divide this quantity by.

    Returns:
        The divided quantity or the ratio of this quantity to another.
    """
    match value:
        case float():
            return type(self)._new(self._base_value / value)
        case Quantity() if type(value) is type(self):
            return self._base_value / value._base_value
        case _:
            return NotImplemented
as_amperes ¤
as_amperes() -> float

Return the current in amperes.

RETURNS DESCRIPTION
float

The current in amperes.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_amperes(self) -> float:
    """Return the current in amperes.

    Returns:
        The current in amperes.
    """
    return self._base_value
as_milliamperes ¤
as_milliamperes() -> float

Return the current in milliamperes.

RETURNS DESCRIPTION
float

The current in milliamperes.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_milliamperes(self) -> float:
    """Return the current in milliamperes.

    Returns:
        The current in milliamperes.
    """
    return self._base_value * 1e3
from_amperes classmethod ¤
from_amperes(amperes: float) -> Self

Initialize a new current quantity.

PARAMETER DESCRIPTION
amperes

The current in amperes.

TYPE: float

RETURNS DESCRIPTION
Self

A new current quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_amperes(cls, amperes: float) -> Self:
    """Initialize a new current quantity.

    Args:
        amperes: The current in amperes.

    Returns:
        A new current quantity.
    """
    return cls._new(amperes)
from_milliamperes classmethod ¤
from_milliamperes(milliamperes: float) -> Self

Initialize a new current quantity.

PARAMETER DESCRIPTION
milliamperes

The current in milliamperes.

TYPE: float

RETURNS DESCRIPTION
Self

A new current quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_milliamperes(cls, milliamperes: float) -> Self:
    """Initialize a new current quantity.

    Args:
        milliamperes: The current in milliamperes.

    Returns:
        A new current quantity.
    """
    return cls._new(milliamperes, exponent=-3)
from_string classmethod ¤
from_string(string: str) -> Self

Return a quantity from a string representation.

PARAMETER DESCRIPTION
string

The string representation of the quantity.

TYPE: str

RETURNS DESCRIPTION
Self

A quantity object with the value given in the string.

RAISES DESCRIPTION
ValueError

If the string does not match the expected format.
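
The expected format is a number, a single space, and a unit symbol known to the quantity. The standalone sketch below shows the parsing idea on plain values; `parse_base_value` is a hypothetical function that returns the value in the base unit instead of a quantity object.

```python
def parse_base_value(string: str, exponent_unit_map: dict[int, str]) -> float:
    """Parse a 'value unit' string and return the value in the base unit (hypothetical)."""
    parts = string.split(" ")
    if len(parts) != 2:
        raise ValueError(f"Expected a string of the form 'value unit', got {string}")
    number, unit = parts
    for exponent, symbol in exponent_unit_map.items():
        if symbol == unit:
            # float() raises ValueError itself if `number` is not numeric.
            return float(number) * 10**exponent
    raise ValueError(f"Unknown unit {unit}")


energy_units = {0: "Wh", 3: "kWh", 6: "MWh"}
print(parse_base_value("2.5 kWh", energy_units))  # 2500.0
```

Because the string is split on a single space, inputs such as `"2.5kWh"` or `"2.5  kWh"` (two spaces) are rejected in this sketch.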

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_string(cls, string: str) -> Self:
    """Return a quantity from a string representation.

    Args:
        string: The string representation of the quantity.

    Returns:
        A quantity object with the value given in the string.

    Raises:
        ValueError: If the string does not match the expected format.

    """
    split_string = string.split(" ")

    if len(split_string) != 2:
        raise ValueError(
            f"Expected a string of the form 'value unit', got {string}"
        )

    assert cls._exponent_unit_map is not None
    exp_map = cls._exponent_unit_map

    for exponent, unit in exp_map.items():
        if unit == split_string[1]:
            instance = cls.__new__(cls)
            try:
                instance._base_value = float(split_string[0]) * 10**exponent
            except ValueError as error:
                raise ValueError(f"Failed to parse string '{string}'.") from error

            return instance

    raise ValueError(f"Unknown unit {split_string[1]}")
isclose ¤
isclose(
    other: Self,
    rel_tol: float = 1e-09,
    abs_tol: float = 0.0,
) -> bool

Return whether this quantity is close to another.

PARAMETER DESCRIPTION
other

The quantity to compare to.

TYPE: Self

rel_tol

The relative tolerance.

TYPE: float DEFAULT: 1e-09

abs_tol

The absolute tolerance.

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
bool

Whether this quantity is close to another.
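
The defaults match those of `math.isclose`, so the same caveat applies when comparing against zero: with `abs_tol=0.0`, only an exact `0.0` is considered close to `0.0`. The sketch below shows this with plain floats; the value of `tiny` is an arbitrary illustration.

```python
import math

tiny = 1e-12  # e.g. residual noise around zero, in the base unit

# With the defaults (rel_tol=1e-09, abs_tol=0.0) the relative tolerance
# scales with the operands, so it vanishes near zero.
default_result = math.isclose(tiny, 0.0)              # False
with_abs_tol = math.isclose(tiny, 0.0, abs_tol=1e-9)  # True
```

When a comparison target may be zero, passing an explicit `abs_tol` is usually what is wanted.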

Source code in frequenz/sdk/timeseries/_quantities.py
def isclose(self, other: Self, rel_tol: float = 1e-9, abs_tol: float = 0.0) -> bool:
    """Return whether this quantity is close to another.

    Args:
        other: The quantity to compare to.
        rel_tol: The relative tolerance.
        abs_tol: The absolute tolerance.

    Returns:
        Whether this quantity is close to another.
    """
    return math.isclose(
        self._base_value,
        other._base_value,  # pylint: disable=protected-access
        rel_tol=rel_tol,
        abs_tol=abs_tol,
    )
isinf ¤
isinf() -> bool

Return whether this quantity is infinite.

RETURNS DESCRIPTION
bool

Whether this quantity is infinite.

Source code in frequenz/sdk/timeseries/_quantities.py
def isinf(self) -> bool:
    """Return whether this quantity is infinite.

    Returns:
        Whether this quantity is infinite.
    """
    return math.isinf(self._base_value)
isnan ¤
isnan() -> bool

Return whether this quantity is NaN.

RETURNS DESCRIPTION
bool

Whether this quantity is NaN.
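
A dedicated check is needed because NaN never compares equal to anything, including itself. The sketch below shows this with a plain float; `missing` is an illustrative name.

```python
import math

missing = float("nan")

equal_to_itself = missing == missing  # False
detected = math.isnan(missing)        # True
```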

Source code in frequenz/sdk/timeseries/_quantities.py
def isnan(self) -> bool:
    """Return whether this quantity is NaN.

    Returns:
        Whether this quantity is NaN.
    """
    return math.isnan(self._base_value)
zero classmethod ¤
zero() -> Self

Return a quantity with value 0.0.

RETURNS DESCRIPTION
Self

A quantity with value 0.0.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def zero(cls) -> Self:
    """Return a quantity with value 0.0.

    Returns:
        A quantity with value 0.0.
    """
    _zero = cls._zero_cache.get(cls, None)
    if _zero is None:
        _zero = cls.__new__(cls)
        _zero._base_value = 0.0
        cls._zero_cache[cls] = _zero
    assert isinstance(_zero, cls)
    return _zero

frequenz.sdk.timeseries.Energy ¤

Bases: Quantity

An energy quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and all arithmetic operators supported by this type are implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.
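
The unit conversions behind dividing an energy by a duration or by a power (see the `__truediv__` overloads in the class source) can be checked with plain floats. The sketch below does not use the SDK types; the variable names are illustrative, and it assumes energies in watt-hours and powers in watts.

```python
from datetime import timedelta

energy_wh = 1500.0
duration = timedelta(minutes=30)

# Energy divided by a duration gives power: Wh / h = W.
power_w = energy_wh / (duration.total_seconds() / 3600.0)  # 3000.0

# Energy divided by power gives a duration: Wh / W = h, converted to seconds.
time_needed = timedelta(seconds=(energy_wh / 500.0) * 3600.0)  # 3:00:00
```

The factor of 3600 appears because `timedelta` works in seconds while the energy unit is based on hours.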

Source code in frequenz/sdk/timeseries/_quantities.py
class Energy(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={
        0: "Wh",
        3: "kWh",
        6: "MWh",
    },
):
    """An energy quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type are
    actually implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_watt_hours(cls, watt_hours: float) -> Self:
        """Initialize a new energy quantity.

        Args:
            watt_hours: The energy in watt hours.

        Returns:
            A new energy quantity.
        """
        return cls._new(watt_hours)

    @classmethod
    def from_kilowatt_hours(cls, kilowatt_hours: float) -> Self:
        """Initialize a new energy quantity.

        Args:
            kilowatt_hours: The energy in kilowatt hours.

        Returns:
            A new energy quantity.
        """
        return cls._new(kilowatt_hours, exponent=3)

    @classmethod
    def from_megawatt_hours(cls, megawatt_hours: float) -> Self:
        """Initialize a new energy quantity.

        Args:
            megawatt_hours: The energy in megawatt hours.

        Returns:
            A new energy quantity.
        """
        return cls._new(megawatt_hours, exponent=6)

    def as_watt_hours(self) -> float:
        """Return the energy in watt hours.

        Returns:
            The energy in watt hours.
        """
        return self._base_value

    def as_kilowatt_hours(self) -> float:
        """Return the energy in kilowatt hours.

        Returns:
            The energy in kilowatt hours.
        """
        return self._base_value / 1e3

    def as_megawatt_hours(self) -> float:
        """Return the energy in megawatt hours.

        Returns:
            The energy in megawatt hours.
        """
        return self._base_value / 1e6

    def __mul__(self, other: float | Percentage) -> Self:
        """Scale this energy by a scalar or a percentage.

        Args:
            other: The scalar or percentage by which to scale this energy.

        Returns:
            The scaled energy.
        """
        match other:
            case float():
                return self._new(self._base_value * other)
            case Percentage():
                return self._new(self._base_value * other.as_fraction())
            case _:
                return NotImplemented

    # See the comment for Power.__mul__ for why we need the ignore here.
    @overload  # type: ignore[override]
    def __truediv__(self, other: float, /) -> Self:
        """Divide this energy by a scalar.

        Args:
            other: The scalar to divide this energy by.

        Returns:
            The divided energy.
        """

    @overload
    def __truediv__(self, other: Self, /) -> float:
        """Return the ratio of this energy to another.

        Args:
            other: The other energy.

        Returns:
            The ratio of this energy to another.
        """

    @overload
    def __truediv__(self, duration: timedelta, /) -> Power:
        """Return a power from dividing this energy by the given duration.

        Args:
            duration: The duration to divide by.

        Returns:
            A power from dividing this energy by the given duration.
        """

    @overload
    def __truediv__(self, power: Power, /) -> timedelta:
        """Return a duration from dividing this energy by the given power.

        Args:
            power: The power to divide by.

        Returns:
            A duration from dividing this energy by the given power.
        """

    def __truediv__(
        self, other: float | Self | timedelta | Power, /
    ) -> Self | float | Power | timedelta:
        """Return a power or duration from dividing this energy by the given value.

        Args:
            other: The scalar, energy, power or duration to divide by.

        Returns:
            A power or duration from dividing this energy by the given value.
        """
        match other:
            case float():
                return super().__truediv__(other)
            case Energy():
                return self._base_value / other._base_value
            case timedelta():
                return Power._new(self._base_value / (other.total_seconds() / 3600.0))
            case Power():
                return timedelta(
                    seconds=(self._base_value / other._base_value) * 3600.0
                )
            case _:
                return NotImplemented
Attributes¤
base_unit property ¤
base_unit: str | None

Return the base unit of this quantity.

None if this quantity has no unit.

RETURNS DESCRIPTION
str | None

The base unit of this quantity.

base_value property ¤
base_value: float

Return the value of this quantity in the base unit.

RETURNS DESCRIPTION
float

The value of this quantity in the base unit.

Functions¤
__abs__ ¤
__abs__() -> Self

Return the absolute value of this quantity.

RETURNS DESCRIPTION
Self

The absolute value of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __abs__(self) -> Self:
    """Return the absolute value of this quantity.

    Returns:
        The absolute value of this quantity.
    """
    absolute = type(self).__new__(type(self))
    absolute._base_value = abs(self._base_value)
    return absolute
__add__ ¤
__add__(other: Self) -> Self

Return the sum of this quantity and another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
Self

The sum of this quantity and another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __add__(self, other: Self) -> Self:
    """Return the sum of this quantity and another.

    Args:
        other: The other quantity.

    Returns:
        The sum of this quantity and another.
    """
    if not type(other) is type(self):
        return NotImplemented
    summe = type(self).__new__(type(self))
    summe._base_value = self._base_value + other._base_value
    return summe
__eq__ ¤
__eq__(other: object) -> bool

Return whether this quantity is equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: object

RETURNS DESCRIPTION
bool

Whether this quantity is equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __eq__(self, other: object) -> bool:
    """Return whether this quantity is equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is equal to another.
    """
    if type(other) is not type(self):
        return NotImplemented
    # The above check ensures that both quantities are the exact same type, which
    # `isinstance` alone would not, because it also accepts subclasses.  But the above
    # check doesn't help mypy identify the type of other, so the below line is necessary.
    assert isinstance(other, self.__class__)
    return self._base_value == other._base_value
__format__ ¤
__format__(__format_spec: str) -> str

Return a formatted string representation of this quantity.

If specified, the format specifier must be of the form [0].{precision}. If the leading 0 is not given, trailing zeros are omitted. If the format specifier is empty, a precision of 3 is used.

The returned string will use the unit that will result in the maximum precision, based on the magnitude of the value.

Example
from frequenz.sdk.timeseries import Current
c = Current.from_amperes(0.2345)
assert f"{c:.2}" == "234.5 mA"
c = Current.from_amperes(1.2345)
assert f"{c:.2}" == "1.23 A"
c = Current.from_milliamperes(1.2345)
assert f"{c:.6}" == "1.2345 mA"
PARAMETER DESCRIPTION
__format_spec

The format specifier.

TYPE: str

RETURNS DESCRIPTION
str

A string representation of this quantity.

RAISES DESCRIPTION
ValueError

If the given format specifier is invalid.

Source code in frequenz/sdk/timeseries/_quantities.py
def __format__(self, __format_spec: str) -> str:
    """Return a formatted string representation of this quantity.

    If specified, the format specifier must be of the form `[0].{precision}`.  If
    the leading 0 is not given, trailing zeros are omitted.  If the format specifier
    is empty, a precision of 3 is used.

    The returned string will use the unit that will result in the maximum precision,
    based on the magnitude of the value.

    Example:
        ```python
        from frequenz.sdk.timeseries import Current
        c = Current.from_amperes(0.2345)
        assert f"{c:.2}" == "234.5 mA"
        c = Current.from_amperes(1.2345)
        assert f"{c:.2}" == "1.23 A"
        c = Current.from_milliamperes(1.2345)
        assert f"{c:.6}" == "1.2345 mA"
        ```

    Args:
        __format_spec: The format specifier.

    Returns:
        A string representation of this quantity.

    Raises:
        ValueError: If the given format specifier is invalid.
    """
    keep_trailing_zeros = False
    if __format_spec != "":
        fspec_parts = __format_spec.split(".")
        if (
            len(fspec_parts) != 2
            or fspec_parts[0] not in ("", "0")
            or not fspec_parts[1].isdigit()
        ):
            raise ValueError(
                "Invalid format specifier. Must be empty or `[0].{precision}`"
            )
        if fspec_parts[0] == "0":
            keep_trailing_zeros = True
        precision = int(fspec_parts[1])
    else:
        precision = 3
    if not self._exponent_unit_map:
        return f"{self._base_value:.{precision}f}"

    if math.isinf(self._base_value) or math.isnan(self._base_value):
        return f"{self._base_value} {self._exponent_unit_map[0]}"

    if abs_value := abs(self._base_value):
        precision_pow = 10 ** (precision)
        # Prevent numbers like 999.999999 being rendered as 1000 V
        # instead of 1 kV.
        # This could happen because the str formatting function does
        # rounding as well.
        # This is an imperfect solution that works for _most_ cases.
        # isclose parameters were chosen according to the observed cases
        if math.isclose(abs_value, precision_pow, abs_tol=1e-4, rel_tol=0.01):
            # If the value is close to the precision, round it
            exponent = math.ceil(math.log10(precision_pow))
        else:
            exponent = math.floor(math.log10(abs_value))
    else:
        exponent = 0

    unit_place = exponent - exponent % 3
    if unit_place < min(self._exponent_unit_map):
        unit = self._exponent_unit_map[min(self._exponent_unit_map.keys())]
        unit_place = min(self._exponent_unit_map)
    elif unit_place > max(self._exponent_unit_map):
        unit = self._exponent_unit_map[max(self._exponent_unit_map.keys())]
        unit_place = max(self._exponent_unit_map)
    else:
        unit = self._exponent_unit_map[unit_place]

    value_str = f"{self._base_value / 10 ** unit_place:.{precision}f}"

    if value_str in ("-0", "0"):
        stripped = value_str
    else:
        stripped = value_str.rstrip("0").rstrip(".")

    if not keep_trailing_zeros:
        value_str = stripped
    unit_str = unit if stripped not in ("-0", "0") else self._exponent_unit_map[0]
    return f"{value_str} {unit_str}"
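
The unit selection in the source above rounds the decimal exponent of the value down to a multiple of 3 and clamps it to the exponents available for the type. The following is a simplified sketch of just that step; `pick_unit` and the unit maps are hypothetical and chosen for illustration, and the `math.isclose` rounding guard and the trailing-zero handling are deliberately left out.

```python
import math


def pick_unit(base_value: float, unit_map: dict[int, str]) -> tuple[float, str]:
    """Return the value scaled to the chosen unit, and that unit's symbol."""
    # Decimal exponent of the value; zero has no magnitude, so use the base unit.
    exponent = math.floor(math.log10(abs(base_value))) if base_value else 0
    # Round down to a multiple of 3 (Python's % is non-negative for a positive divisor).
    unit_place = exponent - exponent % 3
    # Clamp to the smallest and largest exponents the type defines.
    unit_place = min(max(unit_place, min(unit_map)), max(unit_map))
    return base_value / 10**unit_place, unit_map[unit_place]


# Assumed unit maps, for illustration only.
current_units = {-3: "mA", 0: "A", 3: "kA"}
frequency_units = {0: "Hz", 3: "kHz", 6: "MHz", 9: "GHz"}

milli_value, milli_unit = pick_unit(0.2345, current_units)
giga_value, giga_unit = pick_unit(2.5e10, frequency_units)
clamped_value, clamped_unit = pick_unit(5e12, frequency_units)
```

In the first call the exponent is -1, which rounds down to -3, so the value is expressed in mA. In the last call the exponent 12 exceeds the largest known exponent, so the value stays in GHz.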
__ge__ ¤
__ge__(other: Self) -> bool

Return whether this quantity is greater than or equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is greater than or equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __ge__(self, other: Self) -> bool:
    """Return whether this quantity is greater than or equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is greater than or equal to another.
    """
    if type(other) is not type(self):
        return NotImplemented
    return self._base_value >= other._base_value
__gt__ ¤
__gt__(other: Self) -> bool

Return whether this quantity is greater than another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is greater than another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __gt__(self, other: Self) -> bool:
    """Return whether this quantity is greater than another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is greater than another.
    """
    if type(other) is not type(self):
        return NotImplemented
    return self._base_value > other._base_value
__init__ ¤
__init__(value: float, exponent: int = 0) -> None

Initialize a new quantity.

PARAMETER DESCRIPTION
value

The value of this quantity in a given exponent of the base unit.

TYPE: float

exponent

The exponent of the base unit the given value is in.

TYPE: int DEFAULT: 0

Source code in frequenz/sdk/timeseries/_quantities.py
def __init__(self, value: float, exponent: int = 0) -> None:
    """Initialize a new quantity.

    Args:
        value: The value of this quantity in a given exponent of the base unit.
        exponent: The exponent of the base unit the given value is in.
    """
    self._base_value = value * 10.0**exponent
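
As a worked example of the conversion to the base unit, the sketch below repeats the same arithmetic outside the class. `to_base_value` is a hypothetical helper name used only here.

```python
def to_base_value(value: float, exponent: int = 0) -> float:
    """Convert a value given at a power-of-ten exponent into the base unit."""
    return value * 10.0**exponent


# 2.5 at exponent 3 (kilo) expressed in the base unit.
kilo = to_base_value(2.5, exponent=3)
# 250 at exponent -3 (milli) expressed in the base unit.
milli = to_base_value(250.0, exponent=-3)
```

Because the scaling is plain floating-point multiplication, results for negative exponents may carry the usual rounding error.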
__init_subclass__ ¤
__init_subclass__(
    exponent_unit_map: dict[int, str]
) -> None

Initialize a new subclass of Quantity.

PARAMETER DESCRIPTION
exponent_unit_map

A mapping from the exponent of the base unit to the unit symbol.

TYPE: dict[int, str]

RAISES DESCRIPTION
TypeError

If the given exponent_unit_map is not a dict.

ValueError

If the given exponent_unit_map does not contain a base unit (exponent 0).

Source code in frequenz/sdk/timeseries/_quantities.py
def __init_subclass__(cls, exponent_unit_map: dict[int, str]) -> None:
    """Initialize a new subclass of Quantity.

    Args:
        exponent_unit_map: A mapping from the exponent of the base unit to the unit
            symbol.

    Raises:
        TypeError: If the given exponent_unit_map is not a dict.
        ValueError: If the given exponent_unit_map does not contain a base unit
            (exponent 0).
    """
    if 0 not in exponent_unit_map:
        raise ValueError("Expected a base unit for the type (for exponent 0)")
    cls._exponent_unit_map = exponent_unit_map
    super().__init_subclass__()
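
The `exponent_unit_map` argument is supplied as a keyword in the class statement, which Python forwards to `__init_subclass__`. The following is a minimal sketch of that mechanism; `SketchQuantity`, `SketchFrequency` and `SketchBroken` are hypothetical classes, not part of the SDK.

```python
class SketchQuantity:
    """Hypothetical base class that validates a per-subclass unit map."""

    _exponent_unit_map: dict[int, str] = {}

    def __init_subclass__(cls, exponent_unit_map: dict[int, str]) -> None:
        # Every subclass must name its base unit (exponent 0).
        if 0 not in exponent_unit_map:
            raise ValueError("Expected a base unit for the type (for exponent 0)")
        cls._exponent_unit_map = exponent_unit_map
        super().__init_subclass__()


class SketchFrequency(SketchQuantity, exponent_unit_map={0: "Hz", 3: "kHz"}):
    """Subclass created with a valid unit map."""


try:
    class SketchBroken(SketchQuantity, exponent_unit_map={3: "kHz"}):
        """Subclass without a base unit; its creation fails."""

    missing_base_unit_rejected = False
except ValueError:
    missing_base_unit_rejected = True
```

The validation runs when the class is defined, so a missing base unit is reported at import time rather than on first use.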
__le__ ¤
__le__(other: Self) -> bool

Return whether this quantity is less than or equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is less than or equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __le__(self, other: Self) -> bool:
    """Return whether this quantity is less than or equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is less than or equal to another.
    """
    if type(other) is not type(self):
        return NotImplemented
    return self._base_value <= other._base_value
__lt__ ¤
__lt__(other: Self) -> bool

Return whether this quantity is less than another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is less than another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __lt__(self, other: Self) -> bool:
    """Return whether this quantity is less than another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is less than another.
    """
    if type(other) is not type(self):
        return NotImplemented
    return self._base_value < other._base_value
__mul__ ¤
__mul__(other: float | Percentage) -> Self

Scale this energy by a scalar or percentage.

PARAMETER DESCRIPTION
other

The scalar or percentage by which to scale this energy.

TYPE: float | Percentage

RETURNS DESCRIPTION
Self

The scaled energy.

Source code in frequenz/sdk/timeseries/_quantities.py
def __mul__(self, other: float | Percentage) -> Self:
    """Scale this energy by a scalar or percentage.

    Args:
        other: The scalar or percentage by which to scale this energy.

    Returns:
        The scaled energy.
    """
    match other:
        case float():
            return self._new(self._base_value * other)
        case Percentage():
            return self._new(self._base_value * other.as_fraction())
        case _:
            return NotImplemented
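
The two supported operand types can be illustrated with a small standalone sketch. `SketchPercentage` and `scale` are hypothetical names; only the `as_fraction()` call is taken from the source above, and `isinstance` checks stand in for the `match` statement.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchPercentage:
    """Hypothetical stand-in exposing only `as_fraction()`."""

    percent: float

    def as_fraction(self) -> float:
        return self.percent / 100.0


def scale(base_value: float, other: object) -> float:
    """Scale a base value by a float or by a percentage."""
    if isinstance(other, float):
        return base_value * other
    if isinstance(other, SketchPercentage):
        return base_value * other.as_fraction()
    # Any other operand type is unsupported in this sketch.
    raise TypeError(f"unsupported operand: {other!r}")


half = scale(200.0, 0.5)
quarter = scale(200.0, SketchPercentage(25.0))

try:
    scale(200.0, 2)
    int_accepted = True
except TypeError:
    int_accepted = False
```

An `int` is not an instance of `float`, so in this sketch the operand `2` is rejected while `2.0` would be accepted.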
__neg__ ¤
__neg__() -> Self

Return the negation of this quantity.

RETURNS DESCRIPTION
Self

The negation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __neg__(self) -> Self:
    """Return the negation of this quantity.

    Returns:
        The negation of this quantity.
    """
    negation = type(self).__new__(type(self))
    negation._base_value = -self._base_value
    return negation
__repr__ ¤
__repr__() -> str

Return a representation of this quantity.

RETURNS DESCRIPTION
str

A representation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __repr__(self) -> str:
    """Return a representation of this quantity.

    Returns:
        A representation of this quantity.
    """
    return f"{type(self).__name__}(value={self._base_value}, exponent=0)"
__str__ ¤
__str__() -> str

Return a string representation of this quantity.

RETURNS DESCRIPTION
str

A string representation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __str__(self) -> str:
    """Return a string representation of this quantity.

    Returns:
        A string representation of this quantity.
    """
    return self.__format__("")
__sub__ ¤
__sub__(other: Self) -> Self

Return the difference of this quantity and another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
Self

The difference of this quantity and another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __sub__(self, other: Self) -> Self:
    """Return the difference of this quantity and another.

    Args:
        other: The other quantity.

    Returns:
        The difference of this quantity and another.
    """
    if type(other) is not type(self):
        return NotImplemented
    difference = type(self).__new__(type(self))
    difference._base_value = self._base_value - other._base_value
    return difference
__truediv__ ¤
__truediv__(
    other: float | Self | timedelta | Power,
) -> Self | float | Power | timedelta

Return the result of dividing this energy by the given value.

PARAMETER DESCRIPTION
other

The scalar, energy, power or duration to divide by.

TYPE: float | Self | timedelta | Power

RETURNS DESCRIPTION
Self | float | Power | timedelta

A scaled energy, a unitless ratio, a power or a duration when dividing by a scalar, an energy, a duration or a power, respectively.

Source code in frequenz/sdk/timeseries/_quantities.py
def __truediv__(
    self, other: float | Self | timedelta | Power, /
) -> Self | float | Power | timedelta:
    """Return the result of dividing this energy by the given value.

    Args:
        other: The scalar, energy, power or duration to divide by.

    Returns:
        A scaled energy, a unitless ratio, a power or a duration when dividing by a
        scalar, an energy, a duration or a power, respectively.
    """
    match other:
        case float():
            return super().__truediv__(other)
        case Energy():
            return self._base_value / other._base_value
        case timedelta():
            return Power._new(self._base_value / (other.total_seconds() / 3600.0))
        case Power():
            return timedelta(
                seconds=(self._base_value / other._base_value) * 3600.0
            )
        case _:
            return NotImplemented
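
The duration and power branches convert between hours and seconds because the base unit of energy is the watt hour. The following sketch works through the arithmetic with plain floats; `energy_over_duration` and `energy_over_power` are hypothetical helper names, not SDK functions.

```python
from datetime import timedelta


def energy_over_duration(watt_hours: float, duration: timedelta) -> float:
    """Average power in watts when the energy is spread over the duration."""
    return watt_hours / (duration.total_seconds() / 3600.0)


def energy_over_power(watt_hours: float, watts: float) -> timedelta:
    """Time needed to transfer the energy at a constant power."""
    return timedelta(seconds=(watt_hours / watts) * 3600.0)


# 500 Wh delivered in half an hour corresponds to an average of 1000 W.
average_watts = energy_over_duration(500.0, timedelta(minutes=30))
# 1000 Wh at a constant 250 W takes 4 hours.
time_needed = energy_over_power(1000.0, 250.0)
```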
as_kilowatt_hours ¤
as_kilowatt_hours() -> float

Return the energy in kilowatt hours.

RETURNS DESCRIPTION
float

The energy in kilowatt hours.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_kilowatt_hours(self) -> float:
    """Return the energy in kilowatt hours.

    Returns:
        The energy in kilowatt hours.
    """
    return self._base_value / 1e3
as_megawatt_hours ¤
as_megawatt_hours() -> float

Return the energy in megawatt hours.

RETURNS DESCRIPTION
float

The energy in megawatt hours.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_megawatt_hours(self) -> float:
    """Return the energy in megawatt hours.

    Returns:
        The energy in megawatt hours.
    """
    return self._base_value / 1e6
as_watt_hours ¤
as_watt_hours() -> float

Return the energy in watt hours.

RETURNS DESCRIPTION
float

The energy in watt hours.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_watt_hours(self) -> float:
    """Return the energy in watt hours.

    Returns:
        The energy in watt hours.
    """
    return self._base_value
from_kilowatt_hours classmethod ¤
from_kilowatt_hours(kilowatt_hours: float) -> Self

Initialize a new energy quantity.

PARAMETER DESCRIPTION
kilowatt_hours

The energy in kilowatt hours.

TYPE: float

RETURNS DESCRIPTION
Self

A new energy quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_kilowatt_hours(cls, kilowatt_hours: float) -> Self:
    """Initialize a new energy quantity.

    Args:
        kilowatt_hours: The energy in kilowatt hours.

    Returns:
        A new energy quantity.
    """
    return cls._new(kilowatt_hours, exponent=3)
from_megawatt_hours classmethod ¤
from_megawatt_hours(megawatt_hours: float) -> Self

Initialize a new energy quantity.

PARAMETER DESCRIPTION
megawatt_hours

The energy in megawatt hours.

TYPE: float

RETURNS DESCRIPTION
Self

A new energy quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_megawatt_hours(cls, megawatt_hours: float) -> Self:
    """Initialize a new energy quantity.

    Args:
        megawatt_hours: The energy in megawatt hours.

    Returns:
        A new energy quantity.
    """
    return cls._new(megawatt_hours, exponent=6)
from_string classmethod ¤
from_string(string: str) -> Self

Return a quantity from a string representation.

PARAMETER DESCRIPTION
string

The string representation of the quantity.

TYPE: str

RETURNS DESCRIPTION
Self

A quantity object with the value given in the string.

RAISES DESCRIPTION
ValueError

If the string does not match the expected format.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_string(cls, string: str) -> Self:
    """Return a quantity from a string representation.

    Args:
        string: The string representation of the quantity.

    Returns:
        A quantity object with the value given in the string.

    Raises:
        ValueError: If the string does not match the expected format.

    """
    split_string = string.split(" ")

    if len(split_string) != 2:
        raise ValueError(
            f"Expected a string of the form 'value unit', got {string}"
        )

    assert cls._exponent_unit_map is not None
    exp_map = cls._exponent_unit_map

    for exponent, unit in exp_map.items():
        if unit == split_string[1]:
            instance = cls.__new__(cls)
            try:
                instance._base_value = float(split_string[0]) * 10**exponent
            except ValueError as error:
                raise ValueError(f"Failed to parse string '{string}'.") from error

            return instance

    raise ValueError(f"Unknown unit {split_string[1]}")
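
The expected input is a number and a unit symbol separated by exactly one space. The following standalone sketch follows the same parsing steps; `parse_quantity` is a hypothetical function and the `ENERGY_UNITS` map is an assumption made for the example.

```python
def parse_quantity(string: str, unit_map: dict[int, str]) -> float:
    """Parse 'value unit' and return the value in the base unit."""
    parts = string.split(" ")
    if len(parts) != 2:
        raise ValueError(f"Expected a string of the form 'value unit', got {string}")
    value_str, unit_str = parts
    for exponent, unit in unit_map.items():
        if unit == unit_str:
            try:
                return float(value_str) * 10**exponent
            except ValueError as error:
                raise ValueError(f"Failed to parse string '{string}'.") from error
    raise ValueError(f"Unknown unit {unit_str}")


def is_rejected(string: str, unit_map: dict[int, str]) -> bool:
    """Return True if parsing the string raises ValueError."""
    try:
        parse_quantity(string, unit_map)
    except ValueError:
        return True
    return False


# Assumed unit map, for illustration only.
ENERGY_UNITS = {0: "Wh", 3: "kWh", 6: "MWh"}

parsed = parse_quantity("1.5 kWh", ENERGY_UNITS)
no_space_rejected = is_rejected("1.5kWh", ENERGY_UNITS)
unknown_unit_rejected = is_rejected("1.5 J", ENERGY_UNITS)
bad_number_rejected = is_rejected("abc kWh", ENERGY_UNITS)
```

Since the string is split on a single space, inputs with no space or with repeated spaces do not produce exactly two parts and are rejected.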
from_watt_hours classmethod ¤
from_watt_hours(watt_hours: float) -> Self

Initialize a new energy quantity.

PARAMETER DESCRIPTION
watt_hours

The energy in watt hours.

TYPE: float

RETURNS DESCRIPTION
Self

A new energy quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_watt_hours(cls, watt_hours: float) -> Self:
    """Initialize a new energy quantity.

    Args:
        watt_hours: The energy in watt hours.

    Returns:
        A new energy quantity.
    """
    return cls._new(watt_hours)
isclose ¤
isclose(
    other: Self,
    rel_tol: float = 1e-09,
    abs_tol: float = 0.0,
) -> bool

Return whether this quantity is close to another.

PARAMETER DESCRIPTION
other

The quantity to compare to.

TYPE: Self

rel_tol

The relative tolerance.

TYPE: float DEFAULT: 1e-09

abs_tol

The absolute tolerance.

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
bool

Whether this quantity is close to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def isclose(self, other: Self, rel_tol: float = 1e-9, abs_tol: float = 0.0) -> bool:
    """Return whether this quantity is close to another.

    Args:
        other: The quantity to compare to.
        rel_tol: The relative tolerance.
        abs_tol: The absolute tolerance.

    Returns:
        Whether this quantity is close to another.
    """
    return math.isclose(
        self._base_value,
        other._base_value,  # pylint: disable=protected-access
        rel_tol=rel_tol,
        abs_tol=abs_tol,
    )
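
The comparison is delegated to `math.isclose` on the base values, so its tolerance rules apply unchanged. The short sketch below uses plain floats to show why a tolerance-based comparison is useful and why the default `abs_tol` of 0.0 matters near zero.

```python
import math

accumulated = 0.1 + 0.2
expected = 0.3

# Exact equality fails because of floating-point rounding.
exactly_equal = accumulated == expected
# The default relative tolerance absorbs the rounding error.
close_by_default = math.isclose(accumulated, expected, rel_tol=1e-9, abs_tol=0.0)

# With abs_tol=0.0, nothing but 0.0 itself is close to 0.0.
tiny_close_to_zero = math.isclose(1e-12, 0.0, rel_tol=1e-9, abs_tol=0.0)
# An absolute tolerance is needed for comparisons against zero.
tiny_close_with_abs_tol = math.isclose(1e-12, 0.0, abs_tol=1e-9)
```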
isinf ¤
isinf() -> bool

Return whether this quantity is infinite.

RETURNS DESCRIPTION
bool

Whether this quantity is infinite.

Source code in frequenz/sdk/timeseries/_quantities.py
def isinf(self) -> bool:
    """Return whether this quantity is infinite.

    Returns:
        Whether this quantity is infinite.
    """
    return math.isinf(self._base_value)
isnan ¤
isnan() -> bool

Return whether this quantity is NaN.

RETURNS DESCRIPTION
bool

Whether this quantity is NaN.

Source code in frequenz/sdk/timeseries/_quantities.py
def isnan(self) -> bool:
    """Return whether this quantity is NaN.

    Returns:
        Whether this quantity is NaN.
    """
    return math.isnan(self._base_value)
zero classmethod ¤
zero() -> Self

Return a quantity with value 0.0.

RETURNS DESCRIPTION
Self

A quantity with value 0.0.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def zero(cls) -> Self:
    """Return a quantity with value 0.0.

    Returns:
        A quantity with value 0.0.
    """
    _zero = cls._zero_cache.get(cls, None)
    if _zero is None:
        _zero = cls.__new__(cls)
        _zero._base_value = 0.0
        cls._zero_cache[cls] = _zero
    assert isinstance(_zero, cls)
    return _zero
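
The cache is keyed by class, so each quantity type gets its own shared zero instance. The following is a minimal sketch of the pattern; `SketchBase`, `SketchEnergy` and `SketchFrequency` are hypothetical classes, not part of the SDK.

```python
class SketchBase:
    """Hypothetical base class with a per-class cache of zero instances."""

    _zero_cache: dict = {}

    @classmethod
    def zero(cls):
        cached = cls._zero_cache.get(cls)
        if cached is None:
            # Create the instance once and store it under the concrete class.
            cached = cls.__new__(cls)
            cached._base_value = 0.0
            cls._zero_cache[cls] = cached
        return cached


class SketchEnergy(SketchBase):
    """Hypothetical energy-like quantity."""


class SketchFrequency(SketchBase):
    """Hypothetical frequency-like quantity."""


same_instance = SketchEnergy.zero() is SketchEnergy.zero()
distinct_per_class = SketchEnergy.zero() is not SketchFrequency.zero()
```

Sharing one instance is safe only because the quantities are treated as immutable; a caller that modified the returned object would affect every other user of `zero()`.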

frequenz.sdk.timeseries.Frequency ¤

Bases: Quantity

A frequency quantity.

Objects of this type are wrappers around float values and are immutable.

The constructors accept a single float value, the as_*() methods return a float value, and each of the arithmetic operators supported by this type is actually implemented using floating-point arithmetic.

So all considerations about floating-point arithmetic apply to this type as well.

Source code in frequenz/sdk/timeseries/_quantities.py
class Frequency(
    Quantity,
    metaclass=_NoDefaultConstructible,
    exponent_unit_map={0: "Hz", 3: "kHz", 6: "MHz", 9: "GHz"},
):
    """A frequency quantity.

    Objects of this type are wrappers around `float` values and are immutable.

    The constructors accept a single `float` value, the `as_*()` methods return a
    `float` value, and each of the arithmetic operators supported by this type is
    actually implemented using floating-point arithmetic.

    So all considerations about floating-point arithmetic apply to this type as well.
    """

    @classmethod
    def from_hertz(cls, hertz: float) -> Self:
        """Initialize a new frequency quantity.

        Args:
            hertz: The frequency in hertz.

        Returns:
            A new frequency quantity.
        """
        return cls._new(hertz)

    @classmethod
    def from_kilohertz(cls, kilohertz: float) -> Self:
        """Initialize a new frequency quantity.

        Args:
            kilohertz: The frequency in kilohertz.

        Returns:
            A new frequency quantity.
        """
        return cls._new(kilohertz, exponent=3)

    @classmethod
    def from_megahertz(cls, megahertz: float) -> Self:
        """Initialize a new frequency quantity.

        Args:
            megahertz: The frequency in megahertz.

        Returns:
            A new frequency quantity.
        """
        return cls._new(megahertz, exponent=6)

    @classmethod
    def from_gigahertz(cls, gigahertz: float) -> Self:
        """Initialize a new frequency quantity.

        Args:
            gigahertz: The frequency in gigahertz.

        Returns:
            A new frequency quantity.
        """
        return cls._new(gigahertz, exponent=9)

    def as_hertz(self) -> float:
        """Return the frequency in hertz.

        Returns:
            The frequency in hertz.
        """
        return self._base_value

    def as_kilohertz(self) -> float:
        """Return the frequency in kilohertz.

        Returns:
            The frequency in kilohertz.
        """
        return self._base_value / 1e3

    def as_megahertz(self) -> float:
        """Return the frequency in megahertz.

        Returns:
            The frequency in megahertz.
        """
        return self._base_value / 1e6

    def as_gigahertz(self) -> float:
        """Return the frequency in gigahertz.

        Returns:
            The frequency in gigahertz.
        """
        return self._base_value / 1e9

    def period(self) -> timedelta:
        """Return the period of the frequency.

        Returns:
            The period of the frequency.
        """
        return timedelta(seconds=1.0 / self._base_value)
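
The period is the reciprocal of the frequency in hertz, expressed as a `timedelta`. The sketch below repeats that calculation with a plain float; `period_of` is a hypothetical helper name used only for this example.

```python
from datetime import timedelta


def period_of(hertz: float) -> timedelta:
    """Return the duration of one cycle at the given frequency."""
    return timedelta(seconds=1.0 / hertz)


# One cycle of a 50 Hz signal lasts 20 milliseconds.
mains_period = period_of(50.0)

try:
    period_of(0.0)
    zero_rejected = False
except ZeroDivisionError:
    # A frequency of zero has no finite period.
    zero_rejected = True
```

A `timedelta` has microsecond resolution, so periods that are not a whole number of microseconds are rounded.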
Attributes¤
base_unit property ¤
base_unit: str | None

Return the base unit of this quantity.

None if this quantity has no unit.

RETURNS DESCRIPTION
str | None

The base unit of this quantity.

base_value property ¤
base_value: float

Return the value of this quantity in the base unit.

RETURNS DESCRIPTION
float

The value of this quantity in the base unit.

Functions¤
__abs__ ¤
__abs__() -> Self

Return the absolute value of this quantity.

RETURNS DESCRIPTION
Self

The absolute value of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __abs__(self) -> Self:
    """Return the absolute value of this quantity.

    Returns:
        The absolute value of this quantity.
    """
    absolute = type(self).__new__(type(self))
    absolute._base_value = abs(self._base_value)
    return absolute
__add__ ¤
__add__(other: Self) -> Self

Return the sum of this quantity and another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
Self

The sum of this quantity and another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __add__(self, other: Self) -> Self:
    """Return the sum of this quantity and another.

    Args:
        other: The other quantity.

    Returns:
        The sum of this quantity and another.
    """
    if type(other) is not type(self):
        return NotImplemented
    summe = type(self).__new__(type(self))
    summe._base_value = self._base_value + other._base_value
    return summe
__eq__ ¤
__eq__(other: object) -> bool

Return whether this quantity is equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: object

RETURNS DESCRIPTION
bool

Whether this quantity is equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __eq__(self, other: object) -> bool:
    """Return whether this quantity is equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is equal to another.
    """
    if type(other) is not type(self):
        return NotImplemented
    # The above check ensures that both quantities are the exact same type, which
    # `isinstance` alone would not, because it also accepts subclasses.  But the above
    # check doesn't help mypy identify the type of other, so the below line is necessary.
    assert isinstance(other, self.__class__)
    return self._base_value == other._base_value
__format__ ¤
__format__(__format_spec: str) -> str

Return a formatted string representation of this quantity.

If specified, the format specifier must be of the form [0].{precision}. If the leading 0 is not given, trailing zeros are omitted. If the format specifier is empty, a precision of 3 is used.

The returned string will use the unit that will result in the maximum precision, based on the magnitude of the value.

Example
from frequenz.sdk.timeseries import Current
c = Current.from_amperes(0.2345)
assert f"{c:.2}" == "234.5 mA"
c = Current.from_amperes(1.2345)
assert f"{c:.2}" == "1.23 A"
c = Current.from_milliamperes(1.2345)
assert f"{c:.6}" == "1.2345 mA"
PARAMETER DESCRIPTION
__format_spec

The format specifier.

TYPE: str

RETURNS DESCRIPTION
str

A string representation of this quantity.

RAISES DESCRIPTION
ValueError

If the given format specifier is invalid.

Source code in frequenz/sdk/timeseries/_quantities.py
def __format__(self, __format_spec: str) -> str:
    """Return a formatted string representation of this quantity.

    If specified, the format specifier must be of the form `[0].{precision}`.  If
    the leading 0 is not given, trailing zeros are omitted.  If the format specifier
    is empty, a precision of 3 is used.

    The returned string will use the unit that will result in the maximum precision,
    based on the magnitude of the value.

    Example:
        ```python
        from frequenz.sdk.timeseries import Current
        c = Current.from_amperes(0.2345)
        assert f"{c:.2}" == "234.5 mA"
        c = Current.from_amperes(1.2345)
        assert f"{c:.2}" == "1.23 A"
        c = Current.from_milliamperes(1.2345)
        assert f"{c:.6}" == "1.2345 mA"
        ```

    Args:
        __format_spec: The format specifier.

    Returns:
        A string representation of this quantity.

    Raises:
        ValueError: If the given format specifier is invalid.
    """
    keep_trailing_zeros = False
    if __format_spec != "":
        fspec_parts = __format_spec.split(".")
        if (
            len(fspec_parts) != 2
            or fspec_parts[0] not in ("", "0")
            or not fspec_parts[1].isdigit()
        ):
            raise ValueError(
                "Invalid format specifier. Must be empty or `[0].{precision}`"
            )
        if fspec_parts[0] == "0":
            keep_trailing_zeros = True
        precision = int(fspec_parts[1])
    else:
        precision = 3
    if not self._exponent_unit_map:
        return f"{self._base_value:.{precision}f}"

    if math.isinf(self._base_value) or math.isnan(self._base_value):
        return f"{self._base_value} {self._exponent_unit_map[0]}"

    if abs_value := abs(self._base_value):
        precision_pow = 10 ** (precision)
        # Prevent numbers like 999.999999 being rendered as 1000 V
        # instead of 1 kV.
        # This could happen because the str formatting function does
        # rounding as well.
        # This is an imperfect solution that works for _most_ cases.
        # isclose parameters were chosen according to the observed cases
        if math.isclose(abs_value, precision_pow, abs_tol=1e-4, rel_tol=0.01):
            # If the value is close to the precision, round it
            exponent = math.ceil(math.log10(precision_pow))
        else:
            exponent = math.floor(math.log10(abs_value))
    else:
        exponent = 0

    unit_place = exponent - exponent % 3
    if unit_place < min(self._exponent_unit_map):
        unit = self._exponent_unit_map[min(self._exponent_unit_map.keys())]
        unit_place = min(self._exponent_unit_map)
    elif unit_place > max(self._exponent_unit_map):
        unit = self._exponent_unit_map[max(self._exponent_unit_map.keys())]
        unit_place = max(self._exponent_unit_map)
    else:
        unit = self._exponent_unit_map[unit_place]

    value_str = f"{self._base_value / 10 ** unit_place:.{precision}f}"

    if value_str in ("-0", "0"):
        stripped = value_str
    else:
        stripped = value_str.rstrip("0").rstrip(".")

    if not keep_trailing_zeros:
        value_str = stripped
    unit_str = unit if stripped not in ("-0", "0") else self._exponent_unit_map[0]
    return f"{value_str} {unit_str}"
__ge__ ¤
__ge__(other: Self) -> bool

Return whether this quantity is greater than or equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is greater than or equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __ge__(self, other: Self) -> bool:
    """Return whether this quantity is greater than or equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is greater than or equal to another.
    """
    if type(other) is not type(self):
        return NotImplemented
    return self._base_value >= other._base_value
__gt__ ¤
__gt__(other: Self) -> bool

Return whether this quantity is greater than another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is greater than another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __gt__(self, other: Self) -> bool:
    """Return whether this quantity is greater than another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is greater than another.
    """
    if type(other) is not type(self):
        return NotImplemented
    return self._base_value > other._base_value
__init__ ¤
__init__(value: float, exponent: int = 0) -> None

Initialize a new quantity.

PARAMETER DESCRIPTION
value

The value of this quantity in a given exponent of the base unit.

TYPE: float

exponent

The exponent of the base unit the given value is in.

TYPE: int DEFAULT: 0

Source code in frequenz/sdk/timeseries/_quantities.py
def __init__(self, value: float, exponent: int = 0) -> None:
    """Initialize a new quantity.

    Args:
        value: The value of this quantity in a given exponent of the base unit.
        exponent: The exponent of the base unit the given value is in.
    """
    self._base_value = value * 10.0**exponent
__init_subclass__ ¤
__init_subclass__(
    exponent_unit_map: dict[int, str]
) -> None

Initialize a new subclass of Quantity.

PARAMETER DESCRIPTION
exponent_unit_map

A mapping from the exponent of the base unit to the unit symbol.

TYPE: dict[int, str]

RAISES DESCRIPTION
TypeError

If the given exponent_unit_map is not a dict.

ValueError

If the given exponent_unit_map does not contain a base unit (exponent 0).

Source code in frequenz/sdk/timeseries/_quantities.py
def __init_subclass__(cls, exponent_unit_map: dict[int, str]) -> None:
    """Initialize a new subclass of Quantity.

    Args:
        exponent_unit_map: A mapping from the exponent of the base unit to the unit
            symbol.

    Raises:
        TypeError: If the given exponent_unit_map is not a dict.
        ValueError: If the given exponent_unit_map does not contain a base unit
            (exponent 0).
    """
    if 0 not in exponent_unit_map:
        raise ValueError("Expected a base unit for the type (for exponent 0)")
    cls._exponent_unit_map = exponent_unit_map
    super().__init_subclass__()
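The class keyword argument is passed in the `class` statement itself. The sketch below shows the mechanism with a hypothetical base class, `UnitBase`, that copies the check from the listing above; it is not the SDK's `Quantity`, and the unit maps are assumptions made for illustration.

```python
from __future__ import annotations


class UnitBase:
    """Hypothetical base class that validates a unit map at subclass creation."""

    _exponent_unit_map: dict[int, str] | None = None

    def __init_subclass__(cls, exponent_unit_map: dict[int, str]) -> None:
        # A base unit (exponent 0) is mandatory.
        if 0 not in exponent_unit_map:
            raise ValueError("Expected a base unit for the type (for exponent 0)")
        cls._exponent_unit_map = exponent_unit_map
        super().__init_subclass__()


class Hertz(UnitBase, exponent_unit_map={0: "Hz", 3: "kHz"}):
    """Illustrative subclass that provides a base unit."""


# A map without exponent 0 is rejected while the class is being created.
try:

    class Broken(UnitBase, exponent_unit_map={3: "kHz"}):
        """Illustrative subclass without a base unit."""

    missing_base_unit_rejected = False
except ValueError:
    missing_base_unit_rejected = True
```

Validating in `__init_subclass__` means a misconfigured subclass fails at import time rather than when the first instance is formatted or parsed.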
__le__ ¤
__le__(other: Self) -> bool

Return whether this quantity is less than or equal to another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is less than or equal to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __le__(self, other: Self) -> bool:
    """Return whether this quantity is less than or equal to another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is less than or equal to another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value <= other._base_value
__lt__ ¤
__lt__(other: Self) -> bool

Return whether this quantity is less than another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
bool

Whether this quantity is less than another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __lt__(self, other: Self) -> bool:
    """Return whether this quantity is less than another.

    Args:
        other: The other quantity.

    Returns:
        Whether this quantity is less than another.
    """
    if not type(other) is type(self):
        return NotImplemented
    return self._base_value < other._base_value
__mul__ ¤
__mul__(value: float | Percentage) -> Self

Scale this quantity by a scalar or percentage.

PARAMETER DESCRIPTION
value

The scalar or percentage by which to scale this quantity.

TYPE: float | Percentage

RETURNS DESCRIPTION
Self

The scaled quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __mul__(self, value: float | Percentage, /) -> Self:
    """Scale this quantity by a scalar or percentage.

    Args:
        value: The scalar or percentage by which to scale this quantity.

    Returns:
        The scaled quantity.
    """
    match value:
        case float():
            return type(self)._new(self._base_value * value)
        case Percentage():
            return type(self)._new(self._base_value * value.as_fraction())
        case _:
            return NotImplemented
__neg__ ¤
__neg__() -> Self

Return the negation of this quantity.

RETURNS DESCRIPTION
Self

The negation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __neg__(self) -> Self:
    """Return the negation of this quantity.

    Returns:
        The negation of this quantity.
    """
    negation = type(self).__new__(type(self))
    negation._base_value = -self._base_value
    return negation
__repr__ ¤
__repr__() -> str

Return a representation of this quantity.

RETURNS DESCRIPTION
str

A representation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __repr__(self) -> str:
    """Return a representation of this quantity.

    Returns:
        A representation of this quantity.
    """
    return f"{type(self).__name__}(value={self._base_value}, exponent=0)"
__str__ ¤
__str__() -> str

Return a string representation of this quantity.

RETURNS DESCRIPTION
str

A string representation of this quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
def __str__(self) -> str:
    """Return a string representation of this quantity.

    Returns:
        A string representation of this quantity.
    """
    return self.__format__("")
__sub__ ¤
__sub__(other: Self) -> Self

Return the difference of this quantity and another.

PARAMETER DESCRIPTION
other

The other quantity.

TYPE: Self

RETURNS DESCRIPTION
Self

The difference of this quantity and another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __sub__(self, other: Self) -> Self:
    """Return the difference of this quantity and another.

    Args:
        other: The other quantity.

    Returns:
        The difference of this quantity and another.
    """
    if not type(other) is type(self):
        return NotImplemented
    difference = type(self).__new__(type(self))
    difference._base_value = self._base_value - other._base_value
    return difference
__truediv__ ¤
__truediv__(value: float | Self) -> Self | float

Divide this quantity by a scalar or another quantity.

PARAMETER DESCRIPTION
value

The scalar or quantity to divide this quantity by.

TYPE: float | Self

RETURNS DESCRIPTION
Self | float

The divided quantity or the ratio of this quantity to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def __truediv__(self, value: float | Self, /) -> Self | float:
    """Divide this quantity by a scalar or another quantity.

    Args:
        value: The scalar or quantity to divide this quantity by.

    Returns:
        The divided quantity or the ratio of this quantity to another.
    """
    match value:
        case float():
            return type(self)._new(self._base_value / value)
        case Quantity() if type(value) is type(self):
            return self._base_value / value._base_value
        case _:
            return NotImplemented
as_gigahertz ¤
as_gigahertz() -> float

Return the frequency in gigahertz.

RETURNS DESCRIPTION
float

The frequency in gigahertz.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_gigahertz(self) -> float:
    """Return the frequency in gigahertz.

    Returns:
        The frequency in gigahertz.
    """
    return self._base_value / 1e9
as_hertz ¤
as_hertz() -> float

Return the frequency in hertz.

RETURNS DESCRIPTION
float

The frequency in hertz.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_hertz(self) -> float:
    """Return the frequency in hertz.

    Returns:
        The frequency in hertz.
    """
    return self._base_value
as_kilohertz ¤
as_kilohertz() -> float

Return the frequency in kilohertz.

RETURNS DESCRIPTION
float

The frequency in kilohertz.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_kilohertz(self) -> float:
    """Return the frequency in kilohertz.

    Returns:
        The frequency in kilohertz.
    """
    return self._base_value / 1e3
as_megahertz ¤
as_megahertz() -> float

Return the frequency in megahertz.

RETURNS DESCRIPTION
float

The frequency in megahertz.

Source code in frequenz/sdk/timeseries/_quantities.py
def as_megahertz(self) -> float:
    """Return the frequency in megahertz.

    Returns:
        The frequency in megahertz.
    """
    return self._base_value / 1e6
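The `as_*()` accessors all divide the stored base value (in hertz) by a fixed power of ten. The sketch below collects those divisors in one table. The names `HERTZ_PER_UNIT` and `convert` are hypothetical helpers for illustration only.

```python
# Divisors taken from the as_*() listings above.
HERTZ_PER_UNIT = {"Hz": 1.0, "kHz": 1e3, "MHz": 1e6, "GHz": 1e9}


def convert(base_value_hz: float, unit: str) -> float:
    """Express a value stored in hertz in the requested unit."""
    return base_value_hz / HERTZ_PER_UNIT[unit]


mains_khz = convert(50.0, "kHz")  # 50 Hz expressed in kilohertz
clock_ghz = convert(2.4e9, "GHz")  # 2.4e9 Hz expressed in gigahertz
```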
from_gigahertz classmethod ¤
from_gigahertz(gigahertz: float) -> Self

Initialize a new frequency quantity.

PARAMETER DESCRIPTION
gigahertz

The frequency in gigahertz.

TYPE: float

RETURNS DESCRIPTION
Self

A new frequency quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_gigahertz(cls, gigahertz: float) -> Self:
    """Initialize a new frequency quantity.

    Args:
        gigahertz: The frequency in gigahertz.

    Returns:
        A new frequency quantity.
    """
    return cls._new(gigahertz, exponent=9)
from_hertz classmethod ¤
from_hertz(hertz: float) -> Self

Initialize a new frequency quantity.

PARAMETER DESCRIPTION
hertz

The frequency in hertz.

TYPE: float

RETURNS DESCRIPTION
Self

A new frequency quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_hertz(cls, hertz: float) -> Self:
    """Initialize a new frequency quantity.

    Args:
        hertz: The frequency in hertz.

    Returns:
        A new frequency quantity.
    """
    return cls._new(hertz)
from_kilohertz classmethod ¤
from_kilohertz(kilohertz: float) -> Self

Initialize a new frequency quantity.

PARAMETER DESCRIPTION
kilohertz

The frequency in kilohertz.

TYPE: float

RETURNS DESCRIPTION
Self

A new frequency quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_kilohertz(cls, kilohertz: float) -> Self:
    """Initialize a new frequency quantity.

    Args:
        kilohertz: The frequency in kilohertz.

    Returns:
        A new frequency quantity.
    """
    return cls._new(kilohertz, exponent=3)
from_megahertz classmethod ¤
from_megahertz(megahertz: float) -> Self

Initialize a new frequency quantity.

PARAMETER DESCRIPTION
megahertz

The frequency in megahertz.

TYPE: float

RETURNS DESCRIPTION
Self

A new frequency quantity.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_megahertz(cls, megahertz: float) -> Self:
    """Initialize a new frequency quantity.

    Args:
        megahertz: The frequency in megahertz.

    Returns:
        A new frequency quantity.
    """
    return cls._new(megahertz, exponent=6)
from_string classmethod ¤
from_string(string: str) -> Self

Return a quantity from a string representation.

PARAMETER DESCRIPTION
string

The string representation of the quantity.

TYPE: str

RETURNS DESCRIPTION
Self

A quantity object with the value given in the string.

RAISES DESCRIPTION
ValueError

If the string does not match the expected format.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def from_string(cls, string: str) -> Self:
    """Return a quantity from a string representation.

    Args:
        string: The string representation of the quantity.

    Returns:
        A quantity object with the value given in the string.

    Raises:
        ValueError: If the string does not match the expected format.

    """
    split_string = string.split(" ")

    if len(split_string) != 2:
        raise ValueError(
            f"Expected a string of the form 'value unit', got {string}"
        )

    assert cls._exponent_unit_map is not None
    exp_map = cls._exponent_unit_map

    for exponent, unit in exp_map.items():
        if unit == split_string[1]:
            instance = cls.__new__(cls)
            try:
                instance._base_value = float(split_string[0]) * 10**exponent
            except ValueError as error:
                raise ValueError(f"Failed to parse string '{string}'.") from error

            return instance

    raise ValueError(f"Unknown unit {split_string[1]}")
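The parsing rules can be summarized as: exactly one space between value and unit, a unit that appears in the unit map, and a value that `float()` accepts. The sketch below applies the same rules in a free function and returns the base value as a float. `UNIT_EXPONENTS` is an assumed unit map for a frequency-like type, and `parse_quantity` is a hypothetical name.

```python
# Assumed unit map for illustration; the real map belongs to each subclass.
UNIT_EXPONENTS = {"Hz": 0, "kHz": 3, "MHz": 6, "GHz": 9}


def parse_quantity(string: str) -> float:
    """Parse a 'value unit' string and return the value in the base unit."""
    parts = string.split(" ")
    if len(parts) != 2:
        raise ValueError(f"Expected a string of the form 'value unit', got {string}")
    number, unit = parts
    if unit not in UNIT_EXPONENTS:
        raise ValueError(f"Unknown unit {unit}")
    # float() raises ValueError itself if the number is malformed.
    return float(number) * 10 ** UNIT_EXPONENTS[unit]


mains = parse_quantity("50 Hz")
carrier = parse_quantity("2.5 kHz")

# A missing space means the string does not split into two parts.
try:
    parse_quantity("50Hz")
    no_space_rejected = False
except ValueError:
    no_space_rejected = True
```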
isclose ¤
isclose(
    other: Self,
    rel_tol: float = 1e-09,
    abs_tol: float = 0.0,
) -> bool

Return whether this quantity is close to another.

PARAMETER DESCRIPTION
other

The quantity to compare to.

TYPE: Self

rel_tol

The relative tolerance.

TYPE: float DEFAULT: 1e-09

abs_tol

The absolute tolerance.

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
bool

Whether this quantity is close to another.

Source code in frequenz/sdk/timeseries/_quantities.py
def isclose(self, other: Self, rel_tol: float = 1e-9, abs_tol: float = 0.0) -> bool:
    """Return whether this quantity is close to another.

    Args:
        other: The quantity to compare to.
        rel_tol: The relative tolerance.
        abs_tol: The absolute tolerance.

    Returns:
        Whether this quantity is close to another.
    """
    return math.isclose(
        self._base_value,
        other._base_value,  # pylint: disable=protected-access
        rel_tol=rel_tol,
        abs_tol=abs_tol,
    )
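Since the method delegates to `math.isclose`, the defaults have a well-known consequence: with `abs_tol=0.0`, no non-zero value is ever considered close to zero. The sketch below shows this on plain floats, which stand in for the base values of two quantities.

```python
import math

# With the default tolerances, a tiny value is not "close" to zero,
# because a relative tolerance scales with the magnitude of the inputs.
near_zero_default = math.isclose(0.0, 1e-12)

# An absolute tolerance is needed for comparisons against zero.
near_zero_abs = math.isclose(0.0, 1e-12, abs_tol=1e-9)

# Away from zero, the default relative tolerance of 1e-09 is enough.
relative_ok = math.isclose(50.0, 50.0 + 1e-9)
```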
isinf ¤
isinf() -> bool

Return whether this quantity is infinite.

RETURNS DESCRIPTION
bool

Whether this quantity is infinite.

Source code in frequenz/sdk/timeseries/_quantities.py
def isinf(self) -> bool:
    """Return whether this quantity is infinite.

    Returns:
        Whether this quantity is infinite.
    """
    return math.isinf(self._base_value)
isnan ¤
isnan() -> bool

Return whether this quantity is NaN.

RETURNS DESCRIPTION
bool

Whether this quantity is NaN.

Source code in frequenz/sdk/timeseries/_quantities.py
def isnan(self) -> bool:
    """Return whether this quantity is NaN.

    Returns:
        Whether this quantity is NaN.
    """
    return math.isnan(self._base_value)
period ¤
period() -> timedelta

Return the period of the frequency.

RETURNS DESCRIPTION
timedelta

The period of the frequency.

Source code in frequenz/sdk/timeseries/_quantities.py
def period(self) -> timedelta:
    """Return the period of the frequency.

    Returns:
        The period of the frequency.
    """
    return timedelta(seconds=1.0 / self._base_value)
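As a worked example, a 50 Hz signal has a period of 20 ms. The sketch below uses the same expression as the listing in a hypothetical free function, `period_of`. It also shows that this expression raises `ZeroDivisionError` for a frequency of zero, so callers may want to guard against that case.

```python
from datetime import timedelta


def period_of(frequency_hz: float) -> timedelta:
    """Return the period of a frequency given in hertz."""
    return timedelta(seconds=1.0 / frequency_hz)


mains_period = period_of(50.0)

# Dividing by a zero frequency fails before a timedelta is created.
try:
    period_of(0.0)
    zero_rejected = False
except ZeroDivisionError:
    zero_rejected = True
```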
zero classmethod ¤
zero() -> Self

Return a quantity with value 0.0.

RETURNS DESCRIPTION
Self

A quantity with value 0.0.

Source code in frequenz/sdk/timeseries/_quantities.py
@classmethod
def zero(cls) -> Self:
    """Return a quantity with value 0.0.

    Returns:
        A quantity with value 0.0.
    """
    _zero = cls._zero_cache.get(cls, None)
    if _zero is None:
        _zero = cls.__new__(cls)
        _zero._base_value = 0.0
        cls._zero_cache[cls] = _zero
    assert isinstance(_zero, cls)
    return _zero

frequenz.sdk.timeseries.Fuse dataclass ¤

Fuse data class.

Source code in frequenz/sdk/timeseries/_fuse.py
@dataclass(frozen=True)
class Fuse:
    """Fuse data class."""

    max_current: Current
    """Rated current of the fuse."""
Attributes¤
max_current instance-attribute ¤
max_current: Current

Rated current of the fuse.

frequenz.sdk.timeseries.MovingWindow ¤

Bases: BackgroundService

A data window that moves with the latest datapoints of a data stream.

After initialization the MovingWindow can be accessed by an integer index or a timestamp. A sub window can be accessed by using a slice of integers or timestamps.

Note that a NumPy ndarray is returned, so users can apply NumPy operations directly to a window.

The window uses a ring buffer for storage, and the first element is aligned to a fixed point in time. Because the window moves, the timestamps of the first and last elements change constantly, so the point in time that defines the alignment can lie outside the time window. Modulo arithmetic is used to move the align_to timestamp into the latest window.

If, for example, the align_to parameter is set to datetime(1, 1, 1, tzinfo=timezone.utc) and the window size is bigger than one day, then the first element will always be aligned to midnight.
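The modulo arithmetic mentioned above can be illustrated with plain `datetime` values. This is a minimal sketch of the general idea, not the ring buffer's internal logic: `next_aligned` is a hypothetical helper, and returning `now` unchanged when it is already aligned is an assumption made for this example.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)


def next_aligned(
    now: datetime, period: timedelta, align_to: datetime = UNIX_EPOCH
) -> datetime:
    """Return the first timestamp at or after `now` on the aligned grid."""
    # How far `now` is past the previous multiple of `period`.
    remainder = (now - align_to) % period
    if remainder == timedelta(0):
        return now
    return now + (period - remainder)


# A period of 10 seconds aligned to the UNIX epoch, with now at 00:00:32.
now = UNIX_EPOCH + timedelta(seconds=32)
next_event = next_aligned(now, timedelta(seconds=10))

# A period of one day aligned to datetime(1, 1, 1) lands on midnight.
afternoon = datetime(2023, 1, 1, 13, 30, tzinfo=timezone.utc)
next_midnight = next_aligned(
    afternoon, timedelta(days=1), datetime(1, 1, 1, tzinfo=timezone.utc)
)
```

The alignment point can be arbitrarily far in the past because only the remainder of the elapsed time modulo the period matters.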

Resampling might be required to reduce the number of samples to store. It is enabled by passing the resampler_config parameter, which lets the user control the granularity of the samples stored in the underlying buffer.

If resampling is not required, the resampler_config parameter can be set to None, in which case the MovingWindow will not perform any resampling.
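The effect of the sampling period on storage can be estimated from the expression used in the constructor's source listing, which rounds up the ratio of window size to sampling period. The sketch below applies that expression in a hypothetical helper, `buffer_capacity`.

```python
import math
from datetime import timedelta


def buffer_capacity(size: timedelta, sampling_period: timedelta) -> int:
    """Return the number of samples needed to cover `size`."""
    # The sampling period might not fit perfectly into the window size,
    # so the result is rounded up.
    return math.ceil(size.total_seconds() / sampling_period.total_seconds())


five_second_window = buffer_capacity(timedelta(seconds=5), timedelta(seconds=1))
uneven_window = buffer_capacity(timedelta(seconds=10), timedelta(seconds=3))
two_day_window = buffer_capacity(timedelta(days=2), timedelta(seconds=1))
```

A two-day window at one sample per second needs 172800 slots, which is the kind of case where a coarser resampling period noticeably reduces memory use.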

Example: Calculate the mean of a time interval

```python
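import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender
from frequenz.sdk.timeseries import MovingWindow, Sample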

async def send_mock_data(sender: Sender[Sample]) -> None:
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)

async def run() -> None:
    resampled_data_channel = Broadcast[Sample](name="sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    async with MovingWindow(
        size=timedelta(seconds=5),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        time_start = datetime.now(tz=timezone.utc)
        time_end = time_start + timedelta(seconds=5)

        # ... wait for 5 seconds until the buffer is filled
        await asyncio.sleep(5)

        # get a NumPy array from the window
        array = window[time_start:time_end]
        # and use it, for example, to calculate the mean
        mean = array.mean()

asyncio.run(run())
```

Example: Create a polars series from a MovingWindow

```python
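import asyncio
from datetime import datetime, timedelta, timezone

import polars as pl
from frequenz.channels import Broadcast, Sender
from frequenz.sdk.timeseries import MovingWindow, Sample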

async def send_mock_data(sender: Sender[Sample]) -> None:
    while True:
        await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
        await asyncio.sleep(1.0)

async def run() -> None:
    resampled_data_channel = Broadcast[Sample](name="sample-data")
    resampled_data_receiver = resampled_data_channel.new_receiver()
    resampled_data_sender = resampled_data_channel.new_sender()

    send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

    # create a window that stores two days of data,
    # starting on 2023-01-01, with one sample per second
    async with MovingWindow(
        size=timedelta(days=2),
        resampled_data_recv=resampled_data_receiver,
        input_sampling_period=timedelta(seconds=1),
    ) as window:
        # wait for one full day until the buffer is filled
        await asyncio.sleep(60*60*24)

        # create a polars series with one full day of data
        time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
        time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)
        series = pl.Series("Jan_1", window[time_start:time_end])

asyncio.run(run())
```
Source code in frequenz/sdk/timeseries/_moving_window.py
class MovingWindow(BackgroundService):
    """
    A data window that moves with the latest datapoints of a data stream.

    After initialization the `MovingWindow` can be accessed by an integer
    index or a timestamp. A sub window can be accessed by using a slice of
    integers or timestamps.

    Note that a NumPy ndarray is returned, so users can apply
    NumPy operations directly to a window.

    The window uses a ring buffer for storage, and the first element is aligned
    to a fixed point in time. Because the window moves, the timestamps of the
    first and last elements change constantly, so the point in time that defines
    the alignment can lie outside the time window. Modulo arithmetic is used to
    move the `align_to` timestamp into the latest window.

    If for example the `align_to` parameter is set to
    `datetime(1, 1, 1, tzinfo=timezone.utc)` and the window size is bigger than
    one day then the first element will always be aligned to midnight.

    Resampling might be required to reduce the number of samples to store, and
    it can be set by specifying the resampler config parameter so that the user
    can control the granularity of the samples to be stored in the underlying
    buffer.

    If resampling is not required, the resampler config parameter can be
    set to None in which case the MovingWindow will not perform any resampling.

    Example: Calculate the mean of a time interval

        ```python
        from datetime import datetime, timedelta, timezone

        async def send_mock_data(sender: Sender[Sample]) -> None:
            while True:
                await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
                await asyncio.sleep(1.0)

        async def run() -> None:
            resampled_data_channel = Broadcast[Sample](name="sample-data")
            resampled_data_receiver = resampled_data_channel.new_receiver()
            resampled_data_sender = resampled_data_channel.new_sender()

            send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

            async with MovingWindow(
                size=timedelta(seconds=5),
                resampled_data_recv=resampled_data_receiver,
                input_sampling_period=timedelta(seconds=1),
            ) as window:
                time_start = datetime.now(tz=timezone.utc)
                time_end = time_start + timedelta(seconds=5)

                # ... wait for 5 seconds until the buffer is filled
                await asyncio.sleep(5)

                # get a NumPy array from the window
                array = window[time_start:time_end]
                # and use it, for example, to calculate the mean
                mean = array.mean()

        asyncio.run(run())
        ```

    Example: Create a polars data frame from a `MovingWindow`

        ```python
        import polars as pl
        from datetime import datetime, timedelta, timezone

        async def send_mock_data(sender: Sender[Sample]) -> None:
            while True:
                await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0))
                await asyncio.sleep(1.0)

        async def run() -> None:
            resampled_data_channel = Broadcast[Sample](name="sample-data")
            resampled_data_receiver = resampled_data_channel.new_receiver()
            resampled_data_sender = resampled_data_channel.new_sender()

            send_task = asyncio.create_task(send_mock_data(resampled_data_sender))

            # create a window that stores two days of data
            # starting at 1.1.23 with samplerate=1
            async with MovingWindow(
                size=timedelta(days=2),
                resampled_data_recv=resampled_data_receiver,
                input_sampling_period=timedelta(seconds=1),
            ) as window:
                # wait for one full day until the buffer is filled
                await asyncio.sleep(60*60*24)

                # create a polars series with one full day of data
                time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
                time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)
                series = pl.Series("Jan_1", window[time_start:time_end])

        asyncio.run(run())
        ```
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        size: timedelta,
        resampled_data_recv: Receiver[Sample[Quantity]],
        input_sampling_period: timedelta,
        resampler_config: ResamplerConfig | None = None,
        align_to: datetime = UNIX_EPOCH,
        *,
        name: str | None = None,
    ) -> None:
        """
        Initialize the MovingWindow.

        This method creates the underlying ring buffer and starts a
        new task that updates the ring buffer with new incoming samples.
        The task stops running only if the channel receiver is closed.

        Args:
            size: The time span of the moving window over which samples will be stored.
            resampled_data_recv: A receiver that delivers samples with a
                given sampling period.
            input_sampling_period: The time interval between consecutive input samples.
            resampler_config: The resampler configuration in case resampling is required.
            align_to: A timestamp that defines a point in time to which
                the window is aligned to modulo window size. For further
                information, consult the class level documentation.
            name: The name of this moving window. If `None`, `str(id(self))` will be
                used. This is used mostly for debugging purposes.
        """
        assert (
            input_sampling_period.total_seconds() > 0
        ), "The input sampling period should be greater than zero."
        assert (
            input_sampling_period <= size
        ), "The input sampling period should be equal to or lower than the window size."
        super().__init__(name=name)

        self._sampling_period = input_sampling_period

        self._resampler: Resampler | None = None
        self._resampler_sender: Sender[Sample[Quantity]] | None = None

        if resampler_config:
            assert (
                resampler_config.resampling_period <= size
            ), "The resampling period should be equal to or lower than the window size."

            self._resampler = Resampler(resampler_config)
            self._sampling_period = resampler_config.resampling_period

        # Sampling period might not fit perfectly into the window size.
        num_samples = math.ceil(
            size.total_seconds() / self._sampling_period.total_seconds()
        )

        self._resampled_data_recv = resampled_data_recv
        self._buffer = OrderedRingBuffer(
            np.empty(shape=num_samples, dtype=float),
            sampling_period=self._sampling_period,
            align_to=align_to,
        )

    def start(self) -> None:
        """Start the MovingWindow.

        This method starts the MovingWindow tasks.
        """
        if self._resampler:
            self._configure_resampler()
        self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window"))

    @property
    def sampling_period(self) -> timedelta:
        """
        Return the sampling period of the MovingWindow.

        Returns:
            The sampling period of the MovingWindow.
        """
        return self._sampling_period

    @property
    def oldest_timestamp(self) -> datetime | None:
        """
        Return the oldest timestamp of the MovingWindow.

        Returns:
            The oldest timestamp of the MovingWindow or None if the buffer is empty.
        """
        return self._buffer.oldest_timestamp

    @property
    def newest_timestamp(self) -> datetime | None:
        """
        Return the newest timestamp of the MovingWindow.

        Returns:
            The newest timestamp of the MovingWindow or None if the buffer is empty.
        """
        return self._buffer.newest_timestamp

    @property
    def capacity(self) -> int:
        """
        Return the capacity of the MovingWindow.

        Capacity is the maximum number of samples that can be stored in the
        MovingWindow.

        Returns:
            The capacity of the MovingWindow.
        """
        return self._buffer.maxlen

    # pylint before 3.0 only accepts names with 3 or more chars
    def at(self, key: int | datetime) -> float:  # pylint: disable=invalid-name
        """
        Return the sample at the given index or timestamp.

        In contrast to the [`window`][frequenz.sdk.timeseries.MovingWindow.window] method,
        which expects a slice as argument, this method expects a single index as argument
        and returns a single value.

        Args:
            key: The index or timestamp of the sample to return.

        Returns:
            The sample at the given index or timestamp.

        Raises:
            IndexError: If the buffer is empty or the index is out of bounds.
        """
        if self._buffer.count_valid() == 0:
            raise IndexError("The buffer is empty.")

        if isinstance(key, datetime):
            assert self._buffer.oldest_timestamp is not None
            assert self._buffer.newest_timestamp is not None
            if (
                key < self._buffer.oldest_timestamp
                or key > self._buffer.newest_timestamp
            ):
                raise IndexError(
                    f"Timestamp {key} is out of range [{self._buffer.oldest_timestamp}, "
                    f"{self._buffer.newest_timestamp}]"
                )
            return self._buffer[self._buffer.to_internal_index(key)]

        if isinstance(key, int):
            _logger.debug("Returning value at index %s ", key)
            timestamp = self._buffer.get_timestamp(key)
            assert timestamp is not None
            return self._buffer[self._buffer.to_internal_index(timestamp)]

        raise TypeError("Key has to be either a timestamp or an integer.")

    def window(
        self,
        start: datetime | int | None,
        end: datetime | int | None,
        *,
        force_copy: bool = True,
        fill_value: float | None = np.nan,
    ) -> ArrayLike:
        """
        Return an array containing the samples in the given time interval.

        In contrast to the [`at`][frequenz.sdk.timeseries.MovingWindow.at] method,
        which expects a single index as argument, this method expects a slice as argument
        and returns an array.

        Args:
            start: The start timestamp of the time interval. If `None`, the
                start of the window is used.
            end: The end timestamp of the time interval. If `None`, the end of
                the window is used.
            force_copy: If `True`, the returned array is a copy of the underlying
                data. Otherwise, if possible, a view of the underlying data is
                returned.
            fill_value: If not None, will use this value to fill missing values.
                If missing values should be set, force_copy must be True.
                Defaults to NaN to avoid returning outdated data unexpectedly.

        Returns:
            An array containing the samples in the given time interval.
        """
        return self._buffer.window(
            start, end, force_copy=force_copy, fill_value=fill_value
        )

    async def _run_impl(self) -> None:
        """Awaits samples from the receiver and updates the underlying ring buffer.

        Raises:
            asyncio.CancelledError: if the MovingWindow task is cancelled.
        """
        try:
            async for sample in self._resampled_data_recv:
                _logger.debug("Received new sample: %s", sample)
                if self._resampler and self._resampler_sender:
                    await self._resampler_sender.send(sample)
                else:
                    self._buffer.update(sample)

        except asyncio.CancelledError:
            _logger.info("MovingWindow task has been cancelled.")
            raise

        _logger.error("Channel has been closed")

    def _configure_resampler(self) -> None:
        """Configure the components needed to run the resampler."""
        assert self._resampler is not None

        async def sink_buffer(sample: Sample[Quantity]) -> None:
            if sample.value is not None:
                self._buffer.update(sample)

        resampler_channel = Broadcast[Sample[Quantity]](name="average")
        self._resampler_sender = resampler_channel.new_sender()
        self._resampler.add_timeseries(
            "avg", resampler_channel.new_receiver(), sink_buffer
        )
        self._tasks.add(
            asyncio.create_task(self._resampler.resample(), name="resample")
        )

    def count_valid(self) -> int:
        """
        Count the number of valid samples in this `MovingWindow`.

        Returns:
            The number of valid samples in this `MovingWindow`.
        """
        return self._buffer.count_valid()

    def count_covered(self) -> int:
        """Count the number of samples that are covered by the oldest and newest valid samples.

        Returns:
            The count of samples between the oldest and newest (inclusive) valid samples
                or 0 if there is no time range covered.
        """
        return self._buffer.count_covered()

    @overload
    def __getitem__(self, key: SupportsIndex) -> float:
        """See the main __getitem__ method."""

    @overload
    def __getitem__(self, key: datetime) -> float:
        """See the main __getitem__ method."""

    @overload
    def __getitem__(self, key: slice) -> ArrayLike:
        """See the main __getitem__ method."""

    def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLike:
        """
        Return a sub window of the `MovingWindow`.

        The `MovingWindow` is accessed either by timestamp or by index
        or by a slice of timestamps or integers.

        * If the key is an integer, the float value at the given
          position is returned.
        * If the key is a timestamp, the float value that corresponds
          to the timestamp is returned.
        * If the key is a slice of timestamps or integers, an ndarray is returned,
          where the bounds correspond to the slice bounds.
          Note that a half open interval, which is open at the end, is returned.

        Args:
            key: Either an integer or a timestamp or a slice of timestamps or integers.

        Raises:
            IndexError: when requesting an out of range timestamp or index.
            TypeError: when the key is not an integer, a datetime or a slice.
            ValueError: when the key is a slice with a step other than 1.

        Returns:
            A float if the key is an integer or a timestamp.
            A numpy array if the key is a slice.
        """
        if isinstance(key, slice):
            if not (key.step is None or key.step == 1):
                raise ValueError("Slicing with a step other than 1 is not supported.")
            return self.window(key.start, key.stop)

        if isinstance(key, datetime):
            return self.at(key)

        if isinstance(key, SupportsIndex):
            return self.at(key.__index__())

        raise TypeError(
            "Key has to be either a timestamp or an integer "
            "or a slice of timestamps or integers"
        )
Attributes¤
capacity property ¤
capacity: int

Return the capacity of the MovingWindow.

Capacity is the maximum number of samples that can be stored in the MovingWindow.

RETURNS DESCRIPTION
int

The capacity of the MovingWindow.

is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

newest_timestamp property ¤
newest_timestamp: datetime | None

Return the newest timestamp of the MovingWindow.

RETURNS DESCRIPTION
datetime | None

The newest timestamp of the MovingWindow or None if the buffer is empty.

oldest_timestamp property ¤
oldest_timestamp: datetime | None

Return the oldest timestamp of the MovingWindow.

RETURNS DESCRIPTION
datetime | None

The oldest timestamp of the MovingWindow or None if the buffer is empty.

sampling_period property ¤
sampling_period: timedelta

Return the sampling period of the MovingWindow.

RETURNS DESCRIPTION
timedelta

The sampling period of the MovingWindow.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
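
The `__aenter__`/`__aexit__` pair above lets a service be started and stopped with `async with`. The following is a minimal sketch of that pattern using only `asyncio`; `TinyService` and `demo` are hypothetical stand-ins and not part of the SDK.

```python
import asyncio


class TinyService:
    """Hypothetical stand-in for a background service (not the SDK class)."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task[None]] = set()

    @property
    def is_running(self) -> bool:
        # Running means at least one task has not finished yet.
        return any(not task.done() for task in self._tasks)

    def start(self) -> None:
        # A long sleep plays the role of the real background work.
        self._tasks.add(asyncio.create_task(asyncio.sleep(3600)))

    async def stop(self) -> None:
        for task in self._tasks:
            task.cancel()
        # return_exceptions=True keeps the expected CancelledError from propagating.
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()

    async def __aenter__(self) -> "TinyService":
        self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.stop()


async def demo() -> tuple[bool, bool]:
    async with TinyService() as service:
        inside = service.is_running
    return inside, service.is_running


states = asyncio.run(demo())
```

In this sketch the service reports running inside the `async with` block and stopped once the block exits.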
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
Generator[None, None, None]

An implementation-specific generator for the awaitable.

Source code in frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
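
Delegating `__await__` to the awaitable returned by a coroutine method is what makes `await service` equivalent to `await service.wait()`. A small sketch of that delegation, where `TinyAwaitable` is a hypothetical class used only for illustration:

```python
import asyncio
from collections.abc import Generator


class TinyAwaitable:
    """Hypothetical object that can be awaited directly."""

    def __init__(self) -> None:
        self.finished = False

    async def wait(self) -> None:
        # Yield control once to the event loop, then mark completion.
        await asyncio.sleep(0)
        self.finished = True

    def __await__(self) -> Generator[None, None, None]:
        # Reuse the coroutine's own awaitable machinery.
        return self.wait().__await__()


async def demo() -> bool:
    service = TinyAwaitable()
    await service  # same effect as `await service.wait()`
    return service.finished


finished = asyncio.run(demo())
```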
__del__ ¤
__del__() -> None

Destroy this instance.

Cancel all running tasks spawned by this background service.

Source code in frequenz/sdk/actor/_background_service.py
def __del__(self) -> None:
    """Destroy this instance.

    Cancel all running tasks spawned by this background service.
    """
    self.cancel(f"{self!r} was deleted")
__getitem__ ¤
__getitem__(
    key: SupportsIndex | datetime | slice,
) -> float | ArrayLike

Return a sub window of the MovingWindow.

The MovingWindow is accessed either by timestamp or by index or by a slice of timestamps or integers.

  • If the key is an integer, the float value at the given position is returned.
  • If the key is a timestamp, the float value that corresponds to the timestamp is returned.
  • If the key is a slice of timestamps or integers, an ndarray is returned, where the bounds correspond to the slice bounds. Note that a half open interval, which is open at the end, is returned.
PARAMETER DESCRIPTION
key

Either an integer or a timestamp or a slice of timestamps or integers.

TYPE: SupportsIndex | datetime | slice

RAISES DESCRIPTION
IndexError

when requesting an out of range timestamp or index.

TypeError

when the key is not an integer, a datetime or a slice.

ValueError

when the key is a slice with a step other than 1.

RETURNS DESCRIPTION
float | ArrayLike

A float if the key is an integer or a timestamp.

float | ArrayLike

A numpy array if the key is a slice.

Source code in frequenz/sdk/timeseries/_moving_window.py
def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLike:
    """
    Return a sub window of the `MovingWindow`.

    The `MovingWindow` is accessed either by timestamp or by index
    or by a slice of timestamps or integers.

    * If the key is an integer, the float value at the given
      position is returned.
    * If the key is a timestamp, the float value that corresponds
      to the timestamp is returned.
    * If the key is a slice of timestamps or integers, an ndarray is returned,
      where the bounds correspond to the slice bounds.
      Note that a half open interval, which is open at the end, is returned.

    Args:
        key: Either an integer or a timestamp or a slice of timestamps or integers.

    Raises:
        IndexError: when requesting an out of range timestamp or index.
        TypeError: when the key is not an integer, a datetime or a slice.
        ValueError: when the key is a slice with a step other than 1.

    Returns:
        A float if the key is an integer or a timestamp.
        A numpy array if the key is a slice.
    """
    if isinstance(key, slice):
        if not (key.step is None or key.step == 1):
            raise ValueError("Slicing with a step other than 1 is not supported.")
        return self.window(key.start, key.stop)

    if isinstance(key, datetime):
        return self.at(key)

    if isinstance(key, SupportsIndex):
        return self.at(key.__index__())

    raise TypeError(
        "Key has to be either a timestamp or an integer "
        "or a slice of timestamps or integers"
    )
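
The dispatch on key type shown above can be tried out with a toy container. This is only a sketch: `TinyWindow` is a hypothetical list-backed class, it returns a plain list for slices rather than a numpy array, and it does no range checking on timestamps.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import SupportsIndex


class TinyWindow:
    """Hypothetical list-backed window, used only to illustrate key dispatch."""

    def __init__(self, start: datetime, period: timedelta, values: list[float]) -> None:
        self._start = start
        self._period = period
        self._values = values

    def _position(self, key: int | datetime) -> int:
        # Timestamps are mapped to positions relative to the first sample.
        if isinstance(key, datetime):
            return (key - self._start) // self._period
        return key

    def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | list[float]:
        if isinstance(key, slice):
            if not (key.step is None or key.step == 1):
                raise ValueError("Slicing with a step other than 1 is not supported.")
            start = None if key.start is None else self._position(key.start)
            stop = None if key.stop is None else self._position(key.stop)
            return self._values[start:stop]  # half-open: stop is excluded
        if isinstance(key, datetime):
            return self._values[self._position(key)]
        if isinstance(key, SupportsIndex):
            return self._values[key.__index__()]
        raise TypeError("Key has to be a timestamp, an integer or a slice of those.")


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
window = TinyWindow(t0, timedelta(seconds=1), [10.0, 11.0, 12.0, 13.0])

by_index = window[-1]
by_time = window[t0 + timedelta(seconds=2)]
by_slice = window[t0 : t0 + timedelta(seconds=2)]
```

Here `by_index` is the newest value, `by_time` is the value two periods after the start, and `by_slice` holds the first two values because the end of the slice is excluded.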
__init__ ¤
__init__(
    size: timedelta,
    resampled_data_recv: Receiver[Sample[Quantity]],
    input_sampling_period: timedelta,
    resampler_config: ResamplerConfig | None = None,
    align_to: datetime = UNIX_EPOCH,
    *,
    name: str | None = None
) -> None

Initialize the MovingWindow.

This method creates the underlying ring buffer. The task that updates the ring buffer with new incoming samples is created by start() and keeps running until the channel receiver is closed or the task is cancelled.

PARAMETER DESCRIPTION
size

The time span of the moving window over which samples will be stored.

TYPE: timedelta

resampled_data_recv

A receiver that delivers samples with a given sampling period.

TYPE: Receiver[Sample[Quantity]]

input_sampling_period

The time interval between consecutive input samples.

TYPE: timedelta

resampler_config

The resampler configuration in case resampling is required.

TYPE: ResamplerConfig | None DEFAULT: None

align_to

A timestamp that defines the point in time to which the window is aligned, modulo the window size. For further information, consult the class level documentation.

TYPE: datetime DEFAULT: UNIX_EPOCH

name

The name of this moving window. If None, str(id(self)) will be used. This is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/timeseries/_moving_window.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    size: timedelta,
    resampled_data_recv: Receiver[Sample[Quantity]],
    input_sampling_period: timedelta,
    resampler_config: ResamplerConfig | None = None,
    align_to: datetime = UNIX_EPOCH,
    *,
    name: str | None = None,
) -> None:
    """
    Initialize the MovingWindow.

    This method creates the underlying ring buffer. The task that updates
    the ring buffer with new incoming samples is created by `start()` and
    keeps running until the channel receiver is closed or the task is cancelled.

    Args:
        size: The time span of the moving window over which samples will be stored.
        resampled_data_recv: A receiver that delivers samples with a
            given sampling period.
        input_sampling_period: The time interval between consecutive input samples.
        resampler_config: The resampler configuration in case resampling is required.
        align_to: A timestamp that defines the point in time to which
            the window is aligned, modulo the window size. For further
            information, consult the class level documentation.
        name: The name of this moving window. If `None`, `str(id(self))` will be
            used. This is used mostly for debugging purposes.
    """
    assert (
        input_sampling_period.total_seconds() > 0
    ), "The input sampling period should be greater than zero."
    assert (
        input_sampling_period <= size
    ), "The input sampling period should be equal to or lower than the window size."
    super().__init__(name=name)

    self._sampling_period = input_sampling_period

    self._resampler: Resampler | None = None
    self._resampler_sender: Sender[Sample[Quantity]] | None = None

    if resampler_config:
        assert (
            resampler_config.resampling_period <= size
        ), "The resampling period should be equal to or lower than the window size."

        self._resampler = Resampler(resampler_config)
        self._sampling_period = resampler_config.resampling_period

    # Sampling period might not fit perfectly into the window size.
    num_samples = math.ceil(
        size.total_seconds() / self._sampling_period.total_seconds()
    )

    self._resampled_data_recv = resampled_data_recv
    self._buffer = OrderedRingBuffer(
        np.empty(shape=num_samples, dtype=float),
        sampling_period=self._sampling_period,
        align_to=align_to,
    )
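
The buffer length computed in the constructor rounds up, so a sampling period that does not divide the window size evenly still gets enough slots. A short sketch of the same arithmetic, where `buffer_length` is a hypothetical helper name:

```python
import math
from datetime import timedelta


def buffer_length(size: timedelta, sampling_period: timedelta) -> int:
    """Number of slots needed to cover `size` at the given sampling period."""
    return math.ceil(size.total_seconds() / sampling_period.total_seconds())


# The period divides the window size evenly: 60 s / 10 s = 6 slots.
exact = buffer_length(timedelta(minutes=1), timedelta(seconds=10))

# The period does not fit evenly: 25 s / 10 s = 2.5, rounded up to 3 slots.
rounded_up = buffer_length(timedelta(seconds=25), timedelta(seconds=10))
```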
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
at ¤
at(key: int | datetime) -> float

Return the sample at the given index or timestamp.

In contrast to the window method, which expects a slice as argument, this method expects a single index or timestamp as argument and returns a single value.

PARAMETER DESCRIPTION
key

The index or timestamp of the sample to return.

TYPE: int | datetime

RETURNS DESCRIPTION
float

The sample at the given index or timestamp.

RAISES DESCRIPTION
IndexError

If the buffer is empty or the index is out of bounds.

Source code in frequenz/sdk/timeseries/_moving_window.py
def at(self, key: int | datetime) -> float:  # pylint: disable=invalid-name
    """
    Return the sample at the given index or timestamp.

    In contrast to the [`window`][frequenz.sdk.timeseries.MovingWindow.window] method,
    which expects a slice as argument, this method expects a single index or timestamp
    as argument and returns a single value.

    Args:
        key: The index or timestamp of the sample to return.

    Returns:
        The sample at the given index or timestamp.

    Raises:
        IndexError: If the buffer is empty or the index is out of bounds.
    """
    if self._buffer.count_valid() == 0:
        raise IndexError("The buffer is empty.")

    if isinstance(key, datetime):
        assert self._buffer.oldest_timestamp is not None
        assert self._buffer.newest_timestamp is not None
        if (
            key < self._buffer.oldest_timestamp
            or key > self._buffer.newest_timestamp
        ):
            raise IndexError(
                f"Timestamp {key} is out of range [{self._buffer.oldest_timestamp}, "
                f"{self._buffer.newest_timestamp}]"
            )
        return self._buffer[self._buffer.to_internal_index(key)]

    if isinstance(key, int):
        _logger.debug("Returning value at index %s ", key)
        timestamp = self._buffer.get_timestamp(key)
        assert timestamp is not None
        return self._buffer[self._buffer.to_internal_index(timestamp)]

    raise TypeError("Key has to be either a timestamp or an integer.")
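
The timestamp branch of `at` first rejects an empty buffer, then rejects timestamps outside the range spanned by the oldest and newest samples. The sketch below mimics that check on a plain list; `value_at` and its arguments are hypothetical, and it assumes the list holds consecutive samples starting at `oldest`.

```python
from datetime import datetime, timedelta, timezone


def value_at(
    values: list[float], oldest: datetime, period: timedelta, key: datetime
) -> float:
    """Look up a value by timestamp in a list of evenly spaced samples."""
    if not values:
        raise IndexError("The buffer is empty.")
    newest = oldest + period * (len(values) - 1)
    if key < oldest or key > newest:
        raise IndexError(f"Timestamp {key} is out of range [{oldest}, {newest}]")
    return values[(key - oldest) // period]


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
period = timedelta(seconds=1)
samples = [1.0, 2.0, 3.0]

newest_value = value_at(samples, t0, period, t0 + 2 * period)

try:
    value_at(samples, t0, period, t0 + 3 * period)
    message = ""
except IndexError as error:
    message = str(error)
```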
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
count_covered ¤
count_covered() -> int

Count the number of samples that are covered by the oldest and newest valid samples.

RETURNS DESCRIPTION
int

The count of samples between the oldest and newest (inclusive) valid samples or 0 if there is no time range covered.

Source code in frequenz/sdk/timeseries/_moving_window.py
def count_covered(self) -> int:
    """Count the number of samples that are covered by the oldest and newest valid samples.

    Returns:
        The count of samples between the oldest and newest (inclusive) valid samples
            or 0 if there is no time range covered.
    """
    return self._buffer.count_covered()
count_valid ¤
count_valid() -> int

Count the number of valid samples in this MovingWindow.

RETURNS DESCRIPTION
int

The number of valid samples in this MovingWindow.

Source code in frequenz/sdk/timeseries/_moving_window.py
def count_valid(self) -> int:
    """
    Count the number of valid samples in this `MovingWindow`.

    Returns:
        The number of valid samples in this `MovingWindow`.
    """
    return self._buffer.count_valid()
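
The difference between `count_valid` and `count_covered` shows up when there are gaps. The sketch below assumes, purely for illustration, that samples are stored in chronological order and that missing samples are represented as NaN; the functions are hypothetical and say nothing about how the ring buffer tracks gaps internally.

```python
import math


def count_valid(values: list[float]) -> int:
    """Count the samples that hold an actual value."""
    return sum(1 for value in values if not math.isnan(value))


def count_covered(values: list[float]) -> int:
    """Count the slots from the oldest to the newest valid sample, inclusive."""
    valid_positions = [i for i, value in enumerate(values) if not math.isnan(value)]
    if not valid_positions:
        return 0
    return valid_positions[-1] - valid_positions[0] + 1


# Valid samples at positions 1, 3 and 4, with a gap at position 2.
samples = [math.nan, 1.0, math.nan, 3.0, 4.0, math.nan]

valid = count_valid(samples)
covered = count_covered(samples)
```

With these inputs there are 3 valid samples, while 4 slots are covered because the gap between the oldest and newest valid samples is included.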
start ¤
start() -> None

Start the MovingWindow.

This method starts the MovingWindow tasks.

Source code in frequenz/sdk/timeseries/_moving_window.py
def start(self) -> None:
    """Start the MovingWindow.

    This method starts the MovingWindow tasks.
    """
    if self._resampler:
        self._configure_resampler()
    self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window"))
stop async ¤
stop(msg: str | None = None) -> None

Stop this background service.

This method cancels all running tasks spawned by this service and waits for them to finish.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception.

Source code in frequenz/sdk/actor/_background_service.py
async def stop(self, msg: str | None = None) -> None:
    """Stop this background service.

    This method cancels all running tasks spawned by this service and waits for them
    to finish.

    Args:
        msg: The message to be passed to the tasks being cancelled.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception.
    """
    if not self._tasks:
        return
    self.cancel(msg)
    try:
        await self.wait()
    except BaseExceptionGroup as exc_group:
        # We want to ignore CancelledError here as we explicitly cancelled all the
        # tasks.
        _, rest = exc_group.split(asyncio.CancelledError)
        if rest is not None:
            # We are filtering out from an exception group, we really don't want to
            # add the exceptions we just filtered by adding a from clause here.
            raise rest  # pylint: disable=raise-missing-from
wait async ¤
wait() -> None

Wait for this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelledError is not considered an error and not returned in the exception group).

Source code in frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait for this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelledError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}