
formula_engine

frequenz.sdk.timeseries.formula_engine ¤

The formula engine module.

This module exposes the FormulaEngine and FormulaEngine3Phase classes.

Classes¤

frequenz.sdk.timeseries.formula_engine.FormulaEngine ¤

Bases: Generic[QuantityT], _ComposableFormulaEngine['FormulaEngine', 'HigherOrderFormulaBuilder', QuantityT]

FormulaEngines are a part of the SDK's data pipeline, and provide a way for the SDK to apply formulas on resampled data streams.

They are used in the SDK to calculate and stream metrics like grid_power, consumer_power, etc., which are building blocks of the Frequenz SDK Microgrid Model.

The SDK creates the formulas by analysing the configuration of components in the Component Graph.

Streaming Interface¤

The FormulaEngine.new_receiver() method can be used to create a Receiver that streams the Samples calculated by the formula engine.

from frequenz.sdk import microgrid

battery_pool = microgrid.battery_pool()

async for power in battery_pool.power.new_receiver():
    print(f"{power=}")
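The fan-out behaviour behind `new_receiver()` can be illustrated with a small stand-in. This is a sketch only, not the SDK's channel implementation: `ToyBroadcast` is a hypothetical class built on `asyncio.Queue`, and dropping the oldest value when a receiver's buffer is full is an assumption made for the illustration. It shows that each receiver has its own bounded buffer, which is the role of the `max_size` parameter.

```python
from __future__ import annotations

import asyncio


class ToyBroadcast:
    """Hypothetical stand-in for a broadcast channel (not the SDK class)."""

    def __init__(self) -> None:
        self._queues: list[asyncio.Queue[float]] = []

    def new_receiver(self, max_size: int = 50) -> asyncio.Queue[float]:
        # Each receiver owns a separate bounded buffer.
        queue: asyncio.Queue[float] = asyncio.Queue(maxsize=max_size)
        self._queues.append(queue)
        return queue

    def send(self, value: float) -> None:
        # Deliver the value to every receiver created so far.
        for queue in self._queues:
            if queue.full():
                queue.get_nowait()  # assumption: drop the oldest value
            queue.put_nowait(value)


async def demo() -> tuple[list[float], list[float]]:
    channel = ToyBroadcast()
    large = channel.new_receiver()
    small = channel.new_receiver(max_size=2)
    for watts in (100.0, 200.0, 300.0):
        channel.send(watts)
    large_values = [large.get_nowait() for _ in range(large.qsize())]
    small_values = [small.get_nowait() for _ in range(small.qsize())]
    return large_values, small_values


large_values, small_values = asyncio.run(demo())
```

A receiver with a small buffer that is not read quickly enough only sees the most recent values, while the other receiver is unaffected.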
Composition¤

Composite FormulaEngines can be built using arithmetic operations on FormulaEngines streaming the same type of data.

For example, a composite metric calculated by subtracting battery_pool().power and ev_charger_pool().power from grid().power can be streamed by building a FormulaEngine as follows:

from frequenz.sdk import microgrid

battery_pool = microgrid.battery_pool()
ev_charger_pool = microgrid.ev_charger_pool()
grid = microgrid.grid()

# apply operations on formula engines to create a formula engine that would
# apply these operations on the corresponding data streams.
net_power = (
    grid.power - (battery_pool.power + ev_charger_pool.power)
).build("net_power")

async for power in net_power.new_receiver():
    print(f"{power=}")
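The composition idiom relies on operator overloading: arithmetic on formula objects produces a new object describing the combined expression, which is evaluated later on the data. The following is a minimal sketch of that idea under simplifying assumptions. `ToyFormula` and `metric` are hypothetical names, and the sketch evaluates a single set of input values instead of streams; it is not how the SDK builds or evaluates formulas.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ToyFormula:
    """Hypothetical formula node: a name plus a function over named inputs."""

    name: str
    evaluate: Callable[[dict[str, float]], float]

    def __add__(self, other: ToyFormula) -> ToyFormula:
        # Combining two formulas yields a new formula, nothing is computed yet.
        return ToyFormula(
            f"({self.name} + {other.name})",
            lambda inputs: self.evaluate(inputs) + other.evaluate(inputs),
        )

    def __sub__(self, other: ToyFormula) -> ToyFormula:
        return ToyFormula(
            f"({self.name} - {other.name})",
            lambda inputs: self.evaluate(inputs) - other.evaluate(inputs),
        )


def metric(name: str) -> ToyFormula:
    """Create a leaf formula that reads one named input."""
    return ToyFormula(name, lambda inputs: inputs[name])


net_power = metric("grid") - (metric("battery") + metric("ev_charger"))
result = net_power.evaluate(
    {"grid": 5000.0, "battery": 1500.0, "ev_charger": 2000.0}
)
```

Here `net_power.name` is `(grid - (battery + ev_charger))` and `result` is `1500.0`.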
Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
class FormulaEngine(
    Generic[QuantityT],
    _ComposableFormulaEngine[
        "FormulaEngine",  # type: ignore[type-arg]
        "HigherOrderFormulaBuilder",  # type: ignore[type-arg]
        QuantityT,
    ],
):
    """[`FormulaEngine`][frequenz.sdk.timeseries.formula_engine.FormulaEngine]s are a
    part of the SDK's data pipeline, and provide a way for the SDK to apply formulas on
    resampled data streams.

    They are used in the SDK to calculate and stream metrics like
    [`grid_power`][frequenz.sdk.timeseries.grid.Grid.power],
    [`consumer_power`][frequenz.sdk.timeseries.consumer.Consumer.power],
    etc., which are building blocks of the
    [Frequenz SDK Microgrid Model][frequenz.sdk.microgrid--frequenz-sdk-microgrid-model].

    The SDK creates the formulas by analysing the configuration of components in the
    {{glossary("Component Graph")}}.

    ### Streaming Interface

    The
    [`FormulaEngine.new_receiver()`][frequenz.sdk.timeseries.formula_engine.FormulaEngine.new_receiver]
    method can be used to create a
    [Receiver](https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/#frequenz.channels.Receiver)
    that streams the [Sample][frequenz.sdk.timeseries.Sample]s calculated by the formula
    engine.

    ```python
    from frequenz.sdk import microgrid

    battery_pool = microgrid.battery_pool()

    async for power in battery_pool.power.new_receiver():
        print(f"{power=}")
    ```

    ### Composition

    Composite `FormulaEngine`s can be built using arithmetic operations on
    `FormulaEngine`s streaming the same type of data.

    For example, a composite metric calculated by subtracting
    [`battery_pool().power`][frequenz.sdk.timeseries.battery_pool.BatteryPool.power] and
    [`ev_charger_pool().power`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.power]
    from
    [`grid().power`][frequenz.sdk.timeseries.grid.Grid.power]
    can be streamed by building a `FormulaEngine` as follows:

    ```python
    from frequenz.sdk import microgrid

    battery_pool = microgrid.battery_pool()
    ev_charger_pool = microgrid.ev_charger_pool()
    grid = microgrid.grid()

    # apply operations on formula engines to create a formula engine that would
    # apply these operations on the corresponding data streams.
    net_power = (
        grid.power - (battery_pool.power + ev_charger_pool.power)
    ).build("net_power")

    async for power in net_power.new_receiver():
        print(f"{power=}")
    ```
    """  # noqa: D400, D205

    def __init__(
        self,
        builder: FormulaBuilder[QuantityT],
        create_method: Callable[[float], QuantityT],
    ) -> None:
        """Create a `FormulaEngine` instance.

        Args:
            builder: A `FormulaBuilder` instance to get the formula steps and metric
                fetchers from.
            create_method: A method to generate the output `Sample` value with.  If the
                formula is for generating power values, this would be
                `Power.from_watts`, for example.
        """
        self._higher_order_builder = HigherOrderFormulaBuilder
        self._name: str = builder.name
        self._builder: FormulaBuilder[QuantityT] = builder
        self._create_method = create_method
        self._channel: Broadcast[Sample[QuantityT]] = Broadcast(name=self._name)

    @classmethod
    def from_receiver(
        cls,
        name: str,
        receiver: Receiver[Sample[QuantityT]],
        create_method: Callable[[float], QuantityT],
        *,
        nones_are_zeros: bool = False,
    ) -> FormulaEngine[QuantityT]:
        """
        Create a formula engine from a receiver.

        Can be used to compose a formula engine with a receiver. When composing
        the new engine with other engines, make sure that the receiver gets data
        from the same resampler and that the `create_method`s match.

        Example:
            ```python
            import asyncio

            from frequenz.sdk import microgrid
            from frequenz.sdk.timeseries import Power
            from frequenz.sdk.timeseries.formula_engine import FormulaEngine

            async def run() -> None:
                producer_power_engine = microgrid.producer().power
                consumer_power_recv = microgrid.consumer().power.new_receiver()

                excess_power_recv = (
                    (
                        producer_power_engine
                        + FormulaEngine.from_receiver(
                            "consumer power",
                            consumer_power_recv,
                            Power.from_watts,
                        )
                    )
                    .build("excess power")
                    .new_receiver()
                )

            asyncio.run(run())
            ```

        Args:
            name: A name for the formula engine.
            receiver: A receiver that streams `Sample`s.
            create_method: A method to generate the output `Sample` value with,
                e.g. `Power.from_watts`.
            nones_are_zeros: If `True`, `None` values in the receiver are treated as 0.

        Returns:
            A formula engine that streams the `Sample`s from the receiver.
        """
        builder = FormulaBuilder(name, create_method)
        builder.push_metric(name, receiver, nones_are_zeros=nones_are_zeros)
        return cls(builder, create_method)

    async def _run(self) -> None:
        await self._builder.subscribe()
        steps, metric_fetchers = self._builder.finalize()
        evaluator = FormulaEvaluator[QuantityT](
            self._name, steps, metric_fetchers, self._create_method
        )
        sender = self._channel.new_sender()
        while True:
            try:
                msg = await evaluator.apply()
            except asyncio.CancelledError:
                _logger.exception("FormulaEngine task cancelled: %s", self._name)
                raise
            except Exception as err:  # pylint: disable=broad-except
                _logger.warning(
                    "Formula application failed: %s. Error: %s", self._name, err
                )
            else:
                await sender.send(msg)

    def __str__(self) -> str:
        """Return a string representation of the formula.

        Returns:
            A string representation of the formula.
        """
        steps = (
            self._builder._build_stack
            if len(self._builder._build_stack) > 0
            else self._builder._steps
        )
        return format_formula(steps)

    def new_receiver(
        self, name: str | None = None, max_size: int = 50
    ) -> Receiver[Sample[QuantityT]]:
        """Create a new receiver that streams the output of the formula engine.

        Args:
            name: An optional name for the receiver.
            max_size: The size of the receiver's buffer.

        Returns:
            A receiver that streams output `Sample`s from the formula engine.
        """
        if self._task is None:
            self._task = asyncio.create_task(self._run())

        recv = self._channel.new_receiver(name=name, limit=max_size)

        # This is a hack to ensure that the lifetime of the engine is tied to the
        # lifetime of the receiver.  This is necessary because the engine is a task that
        # runs forever, and when higher-order formulas are built, for example with the
        # idiom below, the user would hold no references to the engine and it could get
        # garbage collected before the receiver.  This behaviour is explained in the
        # `asyncio.create_task` docs here:
        # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
        #
        #     formula = (grid_power_engine + bat_power_engine).build().new_receiver()
        recv._engine_reference = self  # type: ignore # pylint: disable=protected-access
        return recv
Functions¤
__add__ ¤
__add__(
    other: (
        _GenericEngine
        | _GenericHigherOrderBuilder
        | QuantityT
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that adds (data in) other to self.

PARAMETER DESCRIPTION
other

A formula receiver, or a formula builder instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | QuantityT

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __add__(
    self,
    other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT,
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that adds (data in) `other` to `self`.

    Args:
        other: A formula receiver, or a formula builder instance corresponding to a
            sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method) + other  # type: ignore
__init__ ¤
__init__(
    builder: FormulaBuilder[QuantityT],
    create_method: Callable[[float], QuantityT],
) -> None

Create a FormulaEngine instance.

PARAMETER DESCRIPTION
builder

A FormulaBuilder instance to get the formula steps and metric fetchers from.

TYPE: FormulaBuilder[QuantityT]

create_method

A method to generate the output Sample value with. If the formula is for generating power values, this would be Power.from_watts, for example.

TYPE: Callable[[float], QuantityT]

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __init__(
    self,
    builder: FormulaBuilder[QuantityT],
    create_method: Callable[[float], QuantityT],
) -> None:
    """Create a `FormulaEngine` instance.

    Args:
        builder: A `FormulaBuilder` instance to get the formula steps and metric
            fetchers from.
        create_method: A method to generate the output `Sample` value with.  If the
            formula is for generating power values, this would be
            `Power.from_watts`, for example.
    """
    self._higher_order_builder = HigherOrderFormulaBuilder
    self._name: str = builder.name
    self._builder: FormulaBuilder[QuantityT] = builder
    self._create_method = create_method
    self._channel: Broadcast[Sample[QuantityT]] = Broadcast(name=self._name)
__mul__ ¤
__mul__(
    other: (
        _GenericEngine | _GenericHigherOrderBuilder | float
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that multiplies (data in) self with other.

PARAMETER DESCRIPTION
other

A formula receiver, or a formula builder instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | float

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __mul__(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | float
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that multiplies (data in) `self` with `other`.

    Args:
        other: A formula receiver, or a formula builder instance corresponding to a
            sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method) * other  # type: ignore
__str__ ¤
__str__() -> str

Return a string representation of the formula.

RETURNS DESCRIPTION
str

A string representation of the formula.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __str__(self) -> str:
    """Return a string representation of the formula.

    Returns:
        A string representation of the formula.
    """
    steps = (
        self._builder._build_stack
        if len(self._builder._build_stack) > 0
        else self._builder._steps
    )
    return format_formula(steps)
__sub__ ¤
__sub__(
    other: (
        _GenericEngine
        | _GenericHigherOrderBuilder
        | QuantityT
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that subtracts (data in) other from self.

PARAMETER DESCRIPTION
other

A formula receiver, or a formula builder instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | QuantityT

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __sub__(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that subtracts (data in) `other` from `self`.

    Args:
        other: A formula receiver, or a formula builder instance corresponding to a
            sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method) - other  # type: ignore
__truediv__ ¤
__truediv__(
    other: (
        _GenericEngine | _GenericHigherOrderBuilder | float
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that divides (data in) self by other.

PARAMETER DESCRIPTION
other

A formula receiver, or a formula builder instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | float

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __truediv__(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | float
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that divides (data in) `self` by `other`.

    Args:
        other: A formula receiver, or a formula builder instance corresponding to a
            sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method) / other  # type: ignore
consumption ¤
consumption() -> _GenericHigherOrderBuilder

Return a formula builder that applies the consumption operator on self.

The consumption operator returns the power value unchanged if it is positive, and 0 otherwise.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def consumption(self) -> _GenericHigherOrderBuilder:
    """
    Return a formula builder that applies the consumption operator on `self`.

    The consumption operator returns the power value unchanged if it is positive,
    and 0 otherwise.
    """
    return self._higher_order_builder(self, self._create_method).consumption()  # type: ignore
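The rule described above can be written down for a single value. This is a sketch on plain floats in watts, with a hypothetical helper name; the SDK applies the operator to streamed samples, which this example does not model.

```python
def consumption(power_watts: float) -> float:
    """Keep positive power values, replace everything else with 0."""
    return max(power_watts, 0.0)


# One producing, one idle and one consuming reading.
values = [consumption(p) for p in (-500.0, 0.0, 750.0)]
```

Only the positive reading survives, so `values` is `[0.0, 0.0, 750.0]`.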
from_receiver classmethod ¤
from_receiver(
    name: str,
    receiver: Receiver[Sample[QuantityT]],
    create_method: Callable[[float], QuantityT],
    *,
    nones_are_zeros: bool = False
) -> FormulaEngine[QuantityT]

Create a formula engine from a receiver.

Can be used to compose a formula engine with a receiver. When composing the new engine with other engines, make sure that the receiver gets data from the same resampler and that the create_methods match.

Example
import asyncio

from frequenz.sdk import microgrid
from frequenz.sdk.timeseries import Power
from frequenz.sdk.timeseries.formula_engine import FormulaEngine

async def run() -> None:
    producer_power_engine = microgrid.producer().power
    consumer_power_recv = microgrid.consumer().power.new_receiver()

    excess_power_recv = (
        (
            producer_power_engine
            + FormulaEngine.from_receiver(
                "consumer power",
                consumer_power_recv,
                Power.from_watts,
            )
        )
        .build("excess power")
        .new_receiver()
    )

asyncio.run(run())
PARAMETER DESCRIPTION
name

A name for the formula engine.

TYPE: str

receiver

A receiver that streams Samples.

TYPE: Receiver[Sample[QuantityT]]

create_method

A method to generate the output Sample value with, e.g. Power.from_watts.

TYPE: Callable[[float], QuantityT]

nones_are_zeros

If True, None values in the receiver are treated as 0.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
FormulaEngine[QuantityT]

A formula engine that streams the Samples from the receiver.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
@classmethod
def from_receiver(
    cls,
    name: str,
    receiver: Receiver[Sample[QuantityT]],
    create_method: Callable[[float], QuantityT],
    *,
    nones_are_zeros: bool = False,
) -> FormulaEngine[QuantityT]:
    """
    Create a formula engine from a receiver.

    Can be used to compose a formula engine with a receiver. When composing
    the new engine with other engines, make sure that the receiver gets data
    from the same resampler and that the `create_method`s match.

    Example:
        ```python
        import asyncio

        from frequenz.sdk import microgrid
        from frequenz.sdk.timeseries import Power
        from frequenz.sdk.timeseries.formula_engine import FormulaEngine

        async def run() -> None:
            producer_power_engine = microgrid.producer().power
            consumer_power_recv = microgrid.consumer().power.new_receiver()

            excess_power_recv = (
                (
                    producer_power_engine
                    + FormulaEngine.from_receiver(
                        "consumer power",
                        consumer_power_recv,
                        Power.from_watts,
                    )
                )
                .build("excess power")
                .new_receiver()
            )

        asyncio.run(run())
        ```

    Args:
        name: A name for the formula engine.
        receiver: A receiver that streams `Sample`s.
        create_method: A method to generate the output `Sample` value with,
            e.g. `Power.from_watts`.
        nones_are_zeros: If `True`, `None` values in the receiver are treated as 0.

    Returns:
        A formula engine that streams the `Sample`s from the receiver.
    """
    builder = FormulaBuilder(name, create_method)
    builder.push_metric(name, receiver, nones_are_zeros=nones_are_zeros)
    return cls(builder, create_method)
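The effect of `nones_are_zeros` can be sketched on a single evaluation step. The helper below is hypothetical and works on plain floats. In particular, returning `None` when a value is missing and the flag is `False` is an assumption made for this illustration, since the text above only states what happens when the flag is `True`.

```python
from __future__ import annotations


def add_values(
    values: list[float | None], *, nones_are_zeros: bool = False
) -> float | None:
    """Sum one value per input, optionally treating missing values as 0."""
    total = 0.0
    for value in values:
        if value is None:
            if not nones_are_zeros:
                # Assumption: a missing input makes the result missing too.
                return None
            value = 0.0
        total += value
    return total


strict = add_values([100.0, None])
lenient = add_values([100.0, None], nones_are_zeros=True)
```

With these inputs `strict` is `None` and `lenient` is `100.0`.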
max ¤
max(
    other: (
        _GenericEngine
        | _GenericHigherOrderBuilder
        | QuantityT
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that outputs the maximum of self and other.

PARAMETER DESCRIPTION
other

A formula receiver, a formula builder or a QuantityT instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | QuantityT

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def max(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that outputs the maximum of `self` and `other`.

    Args:
        other: A formula receiver, a formula builder or a QuantityT instance
            corresponding to a sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method).max(other)  # type: ignore
min ¤
min(
    other: (
        _GenericEngine
        | _GenericHigherOrderBuilder
        | QuantityT
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that outputs the minimum of self and other.

PARAMETER DESCRIPTION
other

A formula receiver, a formula builder or a QuantityT instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | QuantityT

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def min(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that outputs the minimum of `self` and `other`.

    Args:
        other: A formula receiver, a formula builder or a QuantityT instance
            corresponding to a sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method).min(other)  # type: ignore
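The `min` and `max` operations compare the two operands sample by sample. A sketch of that behaviour on already aligned lists of floats follows; the `elementwise` helper and the numbers are made up for the illustration, and real formulas operate on streams, not lists.

```python
from typing import Callable


def elementwise(
    op: Callable[[float, float], float],
    left: list[float],
    right: list[float],
) -> list[float]:
    """Apply a binary operation to values that share the same position."""
    return [op(a, b) for a, b in zip(left, right)]


grid_power = [-200.0, 300.0, 1200.0]

# min against a constant acts as an upper cap.
capped = elementwise(min, grid_power, [1000.0] * 3)
# max against zero acts as a lower bound.
floored = elementwise(max, grid_power, [0.0] * 3)
```

Here `capped` is `[-200.0, 300.0, 1000.0]` and `floored` is `[0.0, 300.0, 1200.0]`.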
new_receiver ¤
new_receiver(
    name: str | None = None, max_size: int = 50
) -> Receiver[Sample[QuantityT]]

Create a new receiver that streams the output of the formula engine.

PARAMETER DESCRIPTION
name

An optional name for the receiver.

TYPE: str | None DEFAULT: None

max_size

The size of the receiver's buffer.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[Sample[QuantityT]]

A receiver that streams output Samples from the formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def new_receiver(
    self, name: str | None = None, max_size: int = 50
) -> Receiver[Sample[QuantityT]]:
    """Create a new receiver that streams the output of the formula engine.

    Args:
        name: An optional name for the receiver.
        max_size: The size of the receiver's buffer.

    Returns:
        A receiver that streams output `Sample`s from the formula engine.
    """
    if self._task is None:
        self._task = asyncio.create_task(self._run())

    recv = self._channel.new_receiver(name=name, limit=max_size)

    # This is a hack to ensure that the lifetime of the engine is tied to the
    # lifetime of the receiver.  This is necessary because the engine is a task that
    # runs forever, and when higher-order formulas are built, for example with the
    # idiom below, the user would hold no references to the engine and it could get
    # garbage collected before the receiver.  This behaviour is explained in the
    # `asyncio.create_task` docs here:
    # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
    #
    #     formula = (grid_power_engine + bat_power_engine).build().new_receiver()
    recv._engine_reference = self  # type: ignore # pylint: disable=protected-access
    return recv
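The lifetime hack described in the comment above can be demonstrated without the SDK. The sketch below uses hypothetical `ToyEngine` and `ToyReceiver` classes and a weak reference to observe whether the engine object is still alive once only the receiver is held. It illustrates the reference-keeping idea only and does not involve asyncio tasks.

```python
import gc
import weakref


class ToyEngine:
    """Hypothetical stand-in for a formula engine."""


class ToyReceiver:
    """Hypothetical stand-in for a channel receiver."""


def make_receiver(*, keep_engine_alive: bool):
    engine = ToyEngine()
    receiver = ToyReceiver()
    if keep_engine_alive:
        # Same idea as `recv._engine_reference = self` in the listing above.
        receiver._engine_reference = engine
    # The weak reference lets us observe the engine without keeping it alive.
    return receiver, weakref.ref(engine)


dropped_receiver, dropped_ref = make_receiver(keep_engine_alive=False)
kept_receiver, kept_ref = make_receiver(keep_engine_alive=True)
gc.collect()

engine_was_dropped = dropped_ref() is None
engine_was_kept = kept_ref() is not None
```

Without the extra attribute the engine disappears as soon as the caller keeps only the receiver; with it, the engine lives as long as the receiver does.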
production ¤
production() -> _GenericHigherOrderBuilder

Return a formula builder that applies the production operator on self.

The production operator returns the absolute value of the power value if it is negative, and 0 otherwise.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def production(self) -> _GenericHigherOrderBuilder:
    """
    Return a formula builder that applies the production operator on `self`.

    The production operator returns the absolute value of the power value if it is
    negative, and 0 otherwise.
    """
    return self._higher_order_builder(self, self._create_method).production()  # type: ignore
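For a single value the production rule mirrors the consumption rule. The sketch below uses plain floats in watts and hypothetical helper names, and it also checks that a signed value can be recovered as consumption minus production. It does not model streamed samples.

```python
def production(power_watts: float) -> float:
    """Return the magnitude of negative power values, and 0 otherwise."""
    return abs(power_watts) if power_watts < 0 else 0.0


def consumption(power_watts: float) -> float:
    """Counterpart shown for comparison: keep positive values, else 0."""
    return power_watts if power_watts > 0 else 0.0


readings = (-500.0, 0.0, 750.0)
produced = [production(p) for p in readings]

# The two operators split a signed value into its two non-negative parts.
recombined = [consumption(p) - production(p) for p in readings]
```

Here `produced` is `[500.0, 0.0, 0.0]`, and `recombined` equals the original readings.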

frequenz.sdk.timeseries.formula_engine.FormulaEngine3Phase ¤

Bases: _ComposableFormulaEngine['FormulaEngine3Phase', 'HigherOrderFormulaBuilder3Phase', QuantityT]

A FormulaEngine3Phase is similar to a FormulaEngine, except that it streams 3-phase samples. All the current formulas (like Grid.current, EVChargerPool.current, etc.) are implemented as 3-phase formulas.

Streaming Interface¤

The FormulaEngine3Phase.new_receiver() method can be used to create a Receiver that streams the Sample3Phase values calculated by the formula engine.

from frequenz.sdk import microgrid

ev_charger_pool = microgrid.ev_charger_pool()

async for sample in ev_charger_pool.current.new_receiver():
    print(f"Current: {sample}")
Composition¤

FormulaEngine3Phase instances can be composed together, just like FormulaEngine instances.

from frequenz.sdk import microgrid

ev_charger_pool = microgrid.ev_charger_pool()
grid = microgrid.grid()

# Calculate grid consumption current that's not used by the EV chargers
other_current = (grid.current - ev_charger_pool.current).build("other_current")

async for sample in other_current.new_receiver():
    print(f"Other current: {sample}")
Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
class FormulaEngine3Phase(
    _ComposableFormulaEngine[
        "FormulaEngine3Phase",  # type: ignore[type-arg]
        "HigherOrderFormulaBuilder3Phase",  # type: ignore[type-arg]
        QuantityT,
    ]
):
    """A
    [`FormulaEngine3Phase`][frequenz.sdk.timeseries.formula_engine.FormulaEngine3Phase]
    is similar to a
    [`FormulaEngine`][frequenz.sdk.timeseries.formula_engine.FormulaEngine], except that
    it streams [3-phase samples][frequenz.sdk.timeseries.Sample3Phase].  All the
    current formulas (like
    [`Grid.current`][frequenz.sdk.timeseries.grid.Grid.current],
    [`EVChargerPool.current`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.current],
    etc.) are implemented as 3-phase formulas.

    ### Streaming Interface

    The
    [`FormulaEngine3Phase.new_receiver()`][frequenz.sdk.timeseries.formula_engine.FormulaEngine3Phase.new_receiver]
    method can be used to create a
    [Receiver](https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/#frequenz.channels.Receiver)
    that streams the [Sample3Phase][frequenz.sdk.timeseries.Sample3Phase] values
    calculated by the formula engine.

    ```python
    from frequenz.sdk import microgrid

    ev_charger_pool = microgrid.ev_charger_pool()

    async for sample in ev_charger_pool.current.new_receiver():
        print(f"Current: {sample}")
    ```

    ### Composition

    `FormulaEngine3Phase` instances can be composed together, just like `FormulaEngine`
    instances.

    ```python
    from frequenz.sdk import microgrid

    ev_charger_pool = microgrid.ev_charger_pool()
    grid = microgrid.grid()

    # Calculate grid consumption current that's not used by the EV chargers
    other_current = (grid.current - ev_charger_pool.current).build("other_current")

    async for sample in other_current.new_receiver():
        print(f"Other current: {sample}")
    ```
    """  # noqa: D205, D400

    def __init__(
        self,
        name: str,
        create_method: Callable[[float], QuantityT],
        phase_streams: tuple[
            FormulaEngine[QuantityT],
            FormulaEngine[QuantityT],
            FormulaEngine[QuantityT],
        ],
    ) -> None:
        """Create a `FormulaEngine3Phase` instance.

        Args:
            name: A name for the formula.
            create_method: A method to generate the output `Sample` value with.  If the
                formula is for generating power values, this would be
                `Power.from_watts`, for example.
            phase_streams: The three formula engines that run the per-phase
                formulas, one engine per phase.
        """
        self._higher_order_builder = HigherOrderFormulaBuilder3Phase
        self._name: str = name
        self._create_method = create_method
        self._channel: Broadcast[Sample3Phase[QuantityT]] = Broadcast(name=self._name)
        self._task: asyncio.Task[None] | None = None
        self._streams: tuple[
            FormulaEngine[QuantityT],
            FormulaEngine[QuantityT],
            FormulaEngine[QuantityT],
        ] = phase_streams

    async def _run(self) -> None:
        sender = self._channel.new_sender()
        phase_1_rx = self._streams[0].new_receiver()
        phase_2_rx = self._streams[1].new_receiver()
        phase_3_rx = self._streams[2].new_receiver()

        while True:
            try:
                phase_1 = await phase_1_rx.receive()
                phase_2 = await phase_2_rx.receive()
                phase_3 = await phase_3_rx.receive()
                msg = Sample3Phase(
                    phase_1.timestamp,
                    phase_1.value,
                    phase_2.value,
                    phase_3.value,
                )
            except asyncio.CancelledError:
                _logger.exception("FormulaEngine task cancelled: %s", self._name)
                break
            else:
                await sender.send(msg)

    def new_receiver(
        self, name: str | None = None, max_size: int = 50
    ) -> Receiver[Sample3Phase[QuantityT]]:
        """Create a new receiver that streams the output of the formula engine.

        Args:
            name: An optional name for the receiver.
            max_size: The size of the receiver's buffer.

        Returns:
            A receiver that streams output `Sample3Phase`s from the formula engine.
        """
        if self._task is None:
            self._task = asyncio.create_task(self._run())

        return self._channel.new_receiver(name=name, limit=max_size)
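The `_run` loop above reads one sample from each phase stream and merges them into a single 3-phase sample that carries the timestamp of phase 1. The same merging step can be sketched with `asyncio.Queue` objects standing in for the per-phase receivers. `ToySample` and `ToySample3Phase` are hypothetical simplified types, not the SDK classes, and the loop runs for a fixed number of samples instead of forever.

```python
import asyncio
from typing import NamedTuple


class ToySample(NamedTuple):
    timestamp: int
    value: float


class ToySample3Phase(NamedTuple):
    timestamp: int
    value_p1: float
    value_p2: float
    value_p3: float


async def combine(phases, count: int) -> list:
    """Merge `count` samples from three per-phase queues."""
    merged = []
    for _ in range(count):
        p1 = await phases[0].get()
        p2 = await phases[1].get()
        p3 = await phases[2].get()
        # As in the listing above, the timestamp is taken from phase 1.
        merged.append(ToySample3Phase(p1.timestamp, p1.value, p2.value, p3.value))
    return merged


async def demo() -> list:
    queues = (asyncio.Queue(), asyncio.Queue(), asyncio.Queue())
    for timestamp in (0, 1):
        for phase, queue in enumerate(queues, start=1):
            queue.put_nowait(ToySample(timestamp, 10.0 * phase + timestamp))
    return await combine(queues, 2)


samples = asyncio.run(demo())
```

The merge assumes the three streams stay aligned, with one sample per phase for every timestamp, which is why the inputs need to come from the same resampler.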
Functions¤
__add__ ¤
__add__(
    other: (
        _GenericEngine
        | _GenericHigherOrderBuilder
        | QuantityT
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that adds (data in) other to self.

PARAMETER DESCRIPTION
other

A formula receiver, or a formula builder instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | QuantityT

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __add__(
    self,
    other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT,
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that adds (data in) `other` to `self`.

    Args:
        other: A formula receiver, or a formula builder instance corresponding to a
            sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method) + other  # type: ignore
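
The operator methods all follow the same delegation pattern: wrap `self` in a higher-order builder, then let the builder's own operator handle `other`. A toy sketch of that pattern, assuming a builder that simply records tokens; `ToyEngine`, `ToyBuilder` and `_describe` are hypothetical and do not reflect how the SDK builder is implemented:

```python
class ToyBuilder:
    """Hypothetical builder that records an expression as a list of tokens."""

    def __init__(self, first: str) -> None:
        self.tokens = [first]

    def __add__(self, other) -> "ToyBuilder":
        # Append the operator and the operand, then return the builder so
        # that further operations can be chained.
        self.tokens += ["+", _describe(other)]
        return self

    def build(self, name: str) -> str:
        """Finish the expression and return it as a readable string."""
        return f"{name} = {' '.join(self.tokens)}"


class ToyEngine:
    """Hypothetical engine that only knows its own name."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __add__(self, other) -> ToyBuilder:
        # Same shape as the method above: wrap self, then delegate.
        return ToyBuilder(self.name) + other


def _describe(operand) -> str:
    """Render an engine, a nested builder, or a constant as text."""
    if isinstance(operand, ToyEngine):
        return operand.name
    if isinstance(operand, ToyBuilder):
        return "(" + " ".join(operand.tokens) + ")"
    return str(operand)


expression = (ToyEngine("a") + ToyEngine("b") + 5.0).build("total")
print(expression)
```

Returning a builder rather than an engine is what lets expressions be chained before a final `build()` call.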
__init__ ¤
__init__(
    name: str,
    create_method: Callable[[float], QuantityT],
    phase_streams: tuple[
        FormulaEngine[QuantityT],
        FormulaEngine[QuantityT],
        FormulaEngine[QuantityT],
    ],
) -> None

Create a FormulaEngine3Phase instance.

PARAMETER DESCRIPTION
name

A name for the formula.

TYPE: str

create_method

A method to generate the output Sample value with. If the formula is for generating power values, this would be Power.from_watts, for example.

TYPE: Callable[[float], QuantityT]

phase_streams

Output streams of the formula engines running the per-phase formulas.

TYPE: tuple[FormulaEngine[QuantityT], FormulaEngine[QuantityT], FormulaEngine[QuantityT]]

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __init__(
    self,
    name: str,
    create_method: Callable[[float], QuantityT],
    phase_streams: tuple[
        FormulaEngine[QuantityT],
        FormulaEngine[QuantityT],
        FormulaEngine[QuantityT],
    ],
) -> None:
    """Create a `FormulaEngine3Phase` instance.

    Args:
        name: A name for the formula.
        create_method: A method to generate the output `Sample` value with.  If the
            formula is for generating power values, this would be
            `Power.from_watts`, for example.
        phase_streams: Output streams of the formula engines running the per-phase
            formulas.
    """
    self._higher_order_builder = HigherOrderFormulaBuilder3Phase
    self._name: str = name
    self._create_method = create_method
    self._channel: Broadcast[Sample3Phase[QuantityT]] = Broadcast(name=self._name)
    self._task: asyncio.Task[None] | None = None
    self._streams: tuple[
        FormulaEngine[QuantityT],
        FormulaEngine[QuantityT],
        FormulaEngine[QuantityT],
    ] = phase_streams
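
The `create_method` argument is a callable that turns a raw `float` into the quantity type carried by the output samples. A small sketch of that idea, assuming a toy quantity class; `ToyPower` and `wrap_result` are hypothetical and only imitate the shape of `Power.from_watts`:

```python
from dataclasses import dataclass
from typing import Callable, TypeVar

QuantityT = TypeVar("QuantityT")


@dataclass(frozen=True)
class ToyPower:
    """Hypothetical quantity type holding a power value in watts."""

    watts: float

    @classmethod
    def from_watts(cls, watts: float) -> "ToyPower":
        return cls(watts)


def wrap_result(raw: float, create_method: Callable[[float], QuantityT]) -> QuantityT:
    """Convert a raw formula result into a typed quantity."""
    return create_method(raw)


power = wrap_result(1500.0, ToyPower.from_watts)
print(power)
```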
__mul__ ¤
__mul__(
    other: (
        _GenericEngine | _GenericHigherOrderBuilder | float
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that multiplies (data in) self with other.

PARAMETER DESCRIPTION
other

A formula receiver, a formula builder instance corresponding to a sub-expression, or a constant float factor.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | float

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __mul__(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | float
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that multiplies (data in) `self` with `other`.

    Args:
        other: A formula receiver, a formula builder instance corresponding to a
            sub-expression, or a constant float factor.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method) * other  # type: ignore
__sub__ ¤
__sub__(
    other: (
        _GenericEngine
        | _GenericHigherOrderBuilder
        | QuantityT
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that subtracts (data in) other from self.

PARAMETER DESCRIPTION
other

A formula receiver, a formula builder instance corresponding to a sub-expression, or a constant QuantityT value.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | QuantityT

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __sub__(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that subtracts (data in) `other` from `self`.

    Args:
        other: A formula receiver, a formula builder instance corresponding to a
            sub-expression, or a constant `QuantityT` value.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method) - other  # type: ignore
__truediv__ ¤
__truediv__(
    other: (
        _GenericEngine | _GenericHigherOrderBuilder | float
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that divides (data in) self by other.

PARAMETER DESCRIPTION
other

A formula receiver, a formula builder instance corresponding to a sub-expression, or a constant float divisor.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | float

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def __truediv__(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | float
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that divides (data in) `self` by `other`.

    Args:
        other: A formula receiver, a formula builder instance corresponding to a
            sub-expression, or a constant float divisor.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method) / other  # type: ignore
consumption ¤
consumption() -> _GenericHigherOrderBuilder

Return a formula builder that applies the consumption operator on self.

The consumption operator returns the power value unchanged if it is positive, and 0 otherwise.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def consumption(self) -> _GenericHigherOrderBuilder:
    """
    Return a formula builder that applies the consumption operator on `self`.

    The consumption operator returns the power value unchanged if it is
    positive, and 0 otherwise.
    """
    return self._higher_order_builder(self, self._create_method).consumption()  # type: ignore
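
As a plain-number illustration of the consumption operator described above, here is a minimal sketch assuming power values in watts as `float`s; the free function `consumption` is a hypothetical helper, not the SDK implementation:

```python
def consumption(power_watts: float) -> float:
    """Keep positive power values unchanged and map everything else to 0."""
    return power_watts if power_watts > 0 else 0.0


for value in (150.0, 0.0, -40.0):
    print(value, "->", consumption(value))
```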
max ¤
max(
    other: (
        _GenericEngine
        | _GenericHigherOrderBuilder
        | QuantityT
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that outputs the maximum of self and other.

PARAMETER DESCRIPTION
other

A formula receiver, a formula builder or a QuantityT instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | QuantityT

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def max(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that outputs the maximum of `self` and `other`.

    Args:
        other: A formula receiver, a formula builder or a QuantityT instance
            corresponding to a sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method).max(other)  # type: ignore
min ¤
min(
    other: (
        _GenericEngine
        | _GenericHigherOrderBuilder
        | QuantityT
    ),
) -> _GenericHigherOrderBuilder

Return a formula builder that outputs the minimum of self and other.

PARAMETER DESCRIPTION
other

A formula receiver, a formula builder or a QuantityT instance corresponding to a sub-expression.

TYPE: _GenericEngine | _GenericHigherOrderBuilder | QuantityT

RETURNS DESCRIPTION
_GenericHigherOrderBuilder

A formula builder that can take further expressions, or can be built into a formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def min(
    self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT
) -> _GenericHigherOrderBuilder:
    """Return a formula builder that outputs the minimum of `self` and `other`.

    Args:
        other: A formula receiver, a formula builder or a QuantityT instance
            corresponding to a sub-expression.

    Returns:
        A formula builder that can take further expressions, or can be built
            into a formula engine.
    """
    return self._higher_order_builder(self, self._create_method).min(other)  # type: ignore
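
Chaining `max` and `min` with constant operands gives a clamp. The sketch below shows the arithmetic on plain lists of `float`s, assuming both operators are applied sample by sample; `clamp_stream` is a hypothetical helper and not part of the SDK:

```python
def clamp_stream(values: list[float], lower: float, upper: float) -> list[float]:
    """Apply max(value, lower) and then min(..., upper) to every sample."""
    return [min(max(value, lower), upper) for value in values]


clamped = clamp_stream([-5.0, 2.0, 12.0], lower=0.0, upper=10.0)
print(clamped)
```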
new_receiver ¤
new_receiver(
    name: str | None = None, max_size: int = 50
) -> Receiver[Sample3Phase[QuantityT]]

Create a new receiver that streams the output of the formula engine.

PARAMETER DESCRIPTION
name

An optional name for the receiver.

TYPE: str | None DEFAULT: None

max_size

The size of the receiver's buffer.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[Sample3Phase[QuantityT]]

A receiver that streams output Sample3Phases from the formula engine.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def new_receiver(
    self, name: str | None = None, max_size: int = 50
) -> Receiver[Sample3Phase[QuantityT]]:
    """Create a new receiver that streams the output of the formula engine.

    Args:
        name: An optional name for the receiver.
        max_size: The size of the receiver's buffer.

    Returns:
        A receiver that streams output `Sample3Phase`s from the formula engine.
    """
    if self._task is None:
        self._task = asyncio.create_task(self._run())

    return self._channel.new_receiver(name=name, limit=max_size)
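
`new_receiver` starts the background task lazily: nothing runs until the first receiver is requested, and later calls reuse the same task. A minimal sketch of that pattern, assuming `asyncio.Queue` objects in place of the SDK's broadcast channel; `LazyStreamer` and `demo` are hypothetical names:

```python
import asyncio


class LazyStreamer:
    """Hypothetical streamer that starts its task on the first receiver."""

    def __init__(self) -> None:
        self._task = None
        self._queues = []

    async def _run(self) -> None:
        # Send three values to every registered queue.
        for value in range(3):
            for queue in self._queues:
                queue.put_nowait(value)
            await asyncio.sleep(0)

    def new_receiver(self, max_size: int = 50) -> asyncio.Queue:
        # Create the background task only once, on first use.
        if self._task is None:
            self._task = asyncio.create_task(self._run())
        queue = asyncio.Queue(maxsize=max_size)
        self._queues.append(queue)
        return queue


async def demo():
    streamer = LazyStreamer()
    started_before = streamer._task is not None
    rx1 = streamer.new_receiver()
    first_task = streamer._task
    rx2 = streamer.new_receiver()
    same_task = streamer._task is first_task
    values_1 = [await rx1.get() for _ in range(3)]
    values_2 = [await rx2.get() for _ in range(3)]
    return started_before, same_task, values_1, values_2


outcome = asyncio.run(demo())
print(outcome)
```

Because the task is created with `asyncio.create_task`, the method has to be called while an event loop is running.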
production ¤
production() -> _GenericHigherOrderBuilder

Return a formula builder that applies the production operator on self.

The production operator returns the absolute value of the power value if it is negative, and 0 otherwise.

Source code in frequenz/sdk/timeseries/formula_engine/_formula_engine.py
def production(self) -> _GenericHigherOrderBuilder:
    """
    Return a formula builder that applies the production operator on `self`.

    The production operator returns the absolute value of the power value if it
    is negative, and 0 otherwise.
    """
    return self._higher_order_builder(self, self._create_method).production()  # type: ignore
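
As a plain-number illustration of the production operator described above, here is a minimal sketch assuming power values in watts as `float`s; the free function `production` is a hypothetical helper, not the SDK implementation:

```python
def production(power_watts: float) -> float:
    """Return the magnitude of negative power values and 0 for everything else."""
    return -power_watts if power_watts < 0 else 0.0


for value in (-40.0, 0.0, 150.0):
    print(value, "->", production(value))
```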