Skip to content

formulas

frequenz.sdk.timeseries.formulas ¤

Provides a way for the SDK to apply formulas on resampled data streams.

Formulas¤

Formulas are used in the SDK to calculate and stream metrics like grid_power, consumer_power, etc., which are building blocks of the Frequenz SDK Microgrid Model.

The SDK creates the formulas by analysing the configuration of components in the Component Graph.

Streaming Interface¤

The Formula.new_receiver() method can be used to create a Receiver that streams the Samples calculated by the evaluation of the formula.

from frequenz.sdk import microgrid

battery_pool = microgrid.new_battery_pool(priority=5)

async for power in battery_pool.power.new_receiver():
    print(f"{power=}")

Composition¤

Composite Formulas can be built using arithmetic operations on Formulas streaming the same type of data.

For example, if you're interested in a particular composite metric that can be calculated by subtracting new_battery_pool().power and new_ev_charger_pool().power from the grid().power, we can build a Formula that provides a stream of this calculated metric as follows:

from frequenz.sdk import microgrid

battery_pool = microgrid.new_battery_pool(priority=5)
ev_charger_pool = microgrid.new_ev_charger_pool(priority=5)
grid = microgrid.grid()

# apply operations on formulas to create a new formula that would
# apply these operations on the corresponding data streams.
net_power = (
    grid.power - (battery_pool.power + ev_charger_pool.power)
).build("net_power")

async for power in net_power.new_receiver():
    print(f"{power=}")

3-Phase Formulas¤

A Formula3Phase is similar to a Formula, except that it streams 3-phase samples. All the current formulas (like Grid.current_per_phase, EVChargerPool.current_per_phase, etc.) are implemented as 3-phase formulas.

Streaming Interface¤

The Formula3Phase.new_receiver() method can be used to create a Receiver that streams the Sample3Phase values calculated by 3-phase formulas.

from frequenz.sdk import microgrid

ev_charger_pool = microgrid.new_ev_charger_pool(priority=5)

async for sample in ev_charger_pool.current_per_phase.new_receiver():
    print(f"Current: {sample}")

Composition¤

Formula3Phase instances can be composed together, just like Formula instances.

from frequenz.sdk import microgrid

ev_charger_pool = microgrid.new_ev_charger_pool(priority=5)
grid = microgrid.grid()

# Calculate grid consumption current that's not used by the EV chargers
other_current = (grid.current_per_phase - ev_charger_pool.current_per_phase).build(
    "other_current"
)

async for sample in other_current.new_receiver():
    print(f"Other current: {sample}")

Classes¤

frequenz.sdk.timeseries.formulas.Formula ¤

Bases: BackgroundService, Generic[QuantityT]

A formula represented as an AST.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
class Formula(BackgroundService, Generic[QuantityT]):
    """A formula represented as an AST."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        name: str,
        root: AstNode[QuantityT],
        create_method: Callable[[float], QuantityT],
        sub_formulas: list[Formula[QuantityT]] | None = None,
        metric_fetcher: ResampledStreamFetcher | None = None,
    ) -> None:
        """Create a `Formula` instance.

        Args:
            name: The name of the formula.
            root: The root node of the formula AST.
            create_method: A method to generate the output values with.  If the
                formula is for generating power values, this would be
                `Power.from_watts`, for example.
            sub_formulas: Any sub-formulas that this formula depends on.
            metric_fetcher: An optional metric fetcher that needs to be started
                before the formula can be evaluated.
        """
        BackgroundService.__init__(self)
        self._name: str = name
        self._root: AstNode[QuantityT] = root
        self._create_method: Callable[[float], QuantityT] = create_method
        self._sub_formulas: list[Formula[QuantityT]] = sub_formulas or []

        self._channel: Broadcast[Sample[QuantityT]] = Broadcast(
            name=f"{self}",
            resend_latest=True,
        )
        self._evaluator: FormulaEvaluatingActor[QuantityT] = FormulaEvaluatingActor(
            root=self._root,
            output_channel=self._channel,
            metric_fetcher=metric_fetcher,
        )

    @override
    def __str__(self) -> str:
        """Return a string representation of the formula."""
        return f"[{self._name}]({self._root})"

    def new_receiver(self, *, max_size: int = 50) -> Receiver[Sample[QuantityT]]:
        """Subscribe to the formula evaluator to get evaluated samples."""
        if not self._evaluator.is_running:
            self.start()
        return self._channel.new_receiver(limit=max_size)

    @override
    def start(self) -> None:
        """Start the formula evaluator."""
        for sub_formula in self._sub_formulas:
            sub_formula.start()
        self._evaluator.start()

    @override
    async def stop(self, msg: str | None = None) -> None:
        """Stop the formula evaluator."""
        await BackgroundService.stop(self, msg)
        for sub_formula in self._sub_formulas:
            await sub_formula.stop(msg)
        await self._evaluator.stop(msg)

    def __add__(
        self, other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]
    ) -> FormulaBuilder[QuantityT]:
        """Create an addition operation node."""
        return FormulaBuilder(self, self._create_method) + other

    def __sub__(
        self, other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]
    ) -> FormulaBuilder[QuantityT]:
        """Create a subtraction operation node."""
        return FormulaBuilder(self, self._create_method) - other

    def __mul__(self, other: float) -> FormulaBuilder[QuantityT]:
        """Create a multiplication operation node."""
        return FormulaBuilder(self, self._create_method) * other

    def __truediv__(self, other: float) -> FormulaBuilder[QuantityT]:
        """Create a division operation node."""
        return FormulaBuilder(self, self._create_method) / other

    def coalesce(
        self,
        *other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
    ) -> FormulaBuilder[QuantityT]:
        """Create a coalesce operation node."""
        return FormulaBuilder(self, self._create_method).coalesce(*other)

    def min(
        self,
        *other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
    ) -> FormulaBuilder[QuantityT]:
        """Create a min operation node."""
        return FormulaBuilder(self, self._create_method).min(*other)

    def max(
        self,
        *other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
    ) -> FormulaBuilder[QuantityT]:
        """Create a max operation node."""
        return FormulaBuilder(self, self._create_method).max(*other)
Attributes¤
is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__add__ ¤
__add__(
    other: (
        FormulaBuilder[QuantityT]
        | QuantityT
        | Formula[QuantityT]
    ),
) -> FormulaBuilder[QuantityT]

Create an addition operation node.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def __add__(
    self, other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]
) -> FormulaBuilder[QuantityT]:
    """Create an addition operation node."""
    return FormulaBuilder(self, self._create_method) + other
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in src/frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in src/frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in src/frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__init__ ¤
__init__(
    *,
    name: str,
    root: AstNode[QuantityT],
    create_method: Callable[[float], QuantityT],
    sub_formulas: list[Formula[QuantityT]] | None = None,
    metric_fetcher: ResampledStreamFetcher | None = None
) -> None

Create a Formula instance.

PARAMETER DESCRIPTION
name

The name of the formula.

TYPE: str

root

The root node of the formula AST.

TYPE: AstNode[QuantityT]

create_method

A method to generate the output values with. If the formula is for generating power values, this would be Power.from_watts, for example.

TYPE: Callable[[float], QuantityT]

sub_formulas

Any sub-formulas that this formula depends on.

TYPE: list[Formula[QuantityT]] | None DEFAULT: None

metric_fetcher

An optional metric fetcher that needs to be started before the formula can be evaluated.

TYPE: ResampledStreamFetcher | None DEFAULT: None

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    *,
    name: str,
    root: AstNode[QuantityT],
    create_method: Callable[[float], QuantityT],
    sub_formulas: list[Formula[QuantityT]] | None = None,
    metric_fetcher: ResampledStreamFetcher | None = None,
) -> None:
    """Create a `Formula` instance.

    Args:
        name: The name of the formula.
        root: The root node of the formula AST.
        create_method: A method to generate the output values with.  If the
            formula is for generating power values, this would be
            `Power.from_watts`, for example.
        sub_formulas: Any sub-formulas that this formula depends on.
        metric_fetcher: An optional metric fetcher that needs to be started
            before the formula can be evaluated.
    """
    BackgroundService.__init__(self)
    self._name: str = name
    self._root: AstNode[QuantityT] = root
    self._create_method: Callable[[float], QuantityT] = create_method
    self._sub_formulas: list[Formula[QuantityT]] = sub_formulas or []

    self._channel: Broadcast[Sample[QuantityT]] = Broadcast(
        name=f"{self}",
        resend_latest=True,
    )
    self._evaluator: FormulaEvaluatingActor[QuantityT] = FormulaEvaluatingActor(
        root=self._root,
        output_channel=self._channel,
        metric_fetcher=metric_fetcher,
    )
__mul__ ¤
__mul__(other: float) -> FormulaBuilder[QuantityT]

Create a multiplication operation node.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def __mul__(self, other: float) -> FormulaBuilder[QuantityT]:
    """Create a multiplication operation node."""
    return FormulaBuilder(self, self._create_method) * other
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in src/frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of the formula.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
@override
def __str__(self) -> str:
    """Return a string representation of the formula."""
    return f"[{self._name}]({self._root})"
__sub__ ¤
__sub__(
    other: (
        FormulaBuilder[QuantityT]
        | QuantityT
        | Formula[QuantityT]
    ),
) -> FormulaBuilder[QuantityT]

Create a subtraction operation node.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def __sub__(
    self, other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]
) -> FormulaBuilder[QuantityT]:
    """Create a subtraction operation node."""
    return FormulaBuilder(self, self._create_method) - other
__truediv__ ¤
__truediv__(other: float) -> FormulaBuilder[QuantityT]

Create a division operation node.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def __truediv__(self, other: float) -> FormulaBuilder[QuantityT]:
    """Create a division operation node."""
    return FormulaBuilder(self, self._create_method) / other
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in src/frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
coalesce ¤
coalesce(
    *other: FormulaBuilder[QuantityT]
    | QuantityT
    | Formula[QuantityT],
) -> FormulaBuilder[QuantityT]

Create a coalesce operation node.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def coalesce(
    self,
    *other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
) -> FormulaBuilder[QuantityT]:
    """Create a coalesce operation node."""
    return FormulaBuilder(self, self._create_method).coalesce(*other)
max ¤
max(
    *other: FormulaBuilder[QuantityT]
    | QuantityT
    | Formula[QuantityT],
) -> FormulaBuilder[QuantityT]

Create a max operation node.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def max(
    self,
    *other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
) -> FormulaBuilder[QuantityT]:
    """Create a max operation node."""
    return FormulaBuilder(self, self._create_method).max(*other)
min ¤
min(
    *other: FormulaBuilder[QuantityT]
    | QuantityT
    | Formula[QuantityT],
) -> FormulaBuilder[QuantityT]

Create a min operation node.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def min(
    self,
    *other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
) -> FormulaBuilder[QuantityT]:
    """Create a min operation node."""
    return FormulaBuilder(self, self._create_method).min(*other)
new_receiver ¤
new_receiver(
    *, max_size: int = 50
) -> Receiver[Sample[QuantityT]]

Subscribe to the formula evaluator to get evaluated samples.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
def new_receiver(self, *, max_size: int = 50) -> Receiver[Sample[QuantityT]]:
    """Subscribe to the formula evaluator to get evaluated samples."""
    if not self._evaluator.is_running:
        self.start()
    return self._channel.new_receiver(limit=max_size)
start ¤
start() -> None

Start the formula evaluator.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
@override
def start(self) -> None:
    """Start the formula evaluator."""
    for sub_formula in self._sub_formulas:
        sub_formula.start()
    self._evaluator.start()
stop async ¤
stop(msg: str | None = None) -> None

Stop the formula evaluator.

Source code in src/frequenz/sdk/timeseries/formulas/_formula.py
@override
async def stop(self, msg: str | None = None) -> None:
    """Stop the formula evaluator."""
    await BackgroundService.stop(self, msg)
    for sub_formula in self._sub_formulas:
        await sub_formula.stop(msg)
    await self._evaluator.stop(msg)
wait async ¤
wait() -> None

Wait this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelError is not considered an error and not returned in the exception group).

Source code in src/frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )

frequenz.sdk.timeseries.formulas.Formula3Phase ¤

Bases: BackgroundService, Generic[QuantityT]

A composite formula for three-phase metrics.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
class Formula3Phase(BackgroundService, Generic[QuantityT]):
    """A composite formula for three-phase metrics."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        name: str,
        phase_1: Formula[QuantityT],
        phase_2: Formula[QuantityT],
        phase_3: Formula[QuantityT],
        sub_formulas: list[Formula3Phase[QuantityT]] | None = None,
    ) -> None:
        """Initialize this instance.

        Args:
            name: The name of the formula.
            phase_1: The formula for phase 1.
            phase_2: The formula for phase 2.
            phase_3: The formula for phase 3.
            sub_formulas: Sub-formulas that need to be started before this formula.
        """
        BackgroundService.__init__(self)
        self._formula_p1: Formula[QuantityT] = phase_1
        self._formula_p2: Formula[QuantityT] = phase_2
        self._formula_p3: Formula[QuantityT] = phase_3
        self._create_method: Callable[[float], QuantityT] = phase_1._create_method

        self._channel: Broadcast[Sample3Phase[QuantityT]] = Broadcast(
            name=f"[Formula3Phase:{name}]({phase_1.name})"
        )
        self._sub_formulas: list[Formula3Phase[QuantityT]] = sub_formulas or []
        self._evaluator: Formula3PhaseEvaluatingActor[QuantityT] = (
            Formula3PhaseEvaluatingActor(phase_1, phase_2, phase_3, self._channel)
        )

    def new_receiver(self, *, max_size: int = 50) -> Receiver[Sample3Phase[QuantityT]]:
        """Subscribe to the output of this formula."""
        if not self._evaluator.is_running:
            self.start()
        return self._channel.new_receiver(limit=max_size)

    @override
    def start(self) -> None:
        """Start the per-phase and sub formulas."""
        for sub_formula in self._sub_formulas:
            sub_formula.start()
        self._formula_p1.start()
        self._formula_p2.start()
        self._formula_p3.start()
        self._evaluator.start()

    @override
    async def stop(self, msg: str | None = None) -> None:
        """Stop the formula."""
        await BackgroundService.stop(self, msg)
        for sub_formula in self._sub_formulas:
            await sub_formula.stop(msg)
        await self._formula_p1.stop(msg)
        await self._formula_p2.stop(msg)
        await self._formula_p3.stop(msg)
        await self._evaluator.stop(msg)

    def __add__(
        self,
        other: Formula3PhaseBuilder[QuantityT] | Formula3Phase[QuantityT],
    ) -> Formula3PhaseBuilder[QuantityT]:
        """Add two three-phase formulas."""
        return Formula3PhaseBuilder(self, create_method=self._create_method) + other

    def __sub__(
        self,
        other: Formula3PhaseBuilder[QuantityT] | Formula3Phase[QuantityT],
    ) -> Formula3PhaseBuilder[QuantityT]:
        """Subtract two three-phase formulas."""
        return Formula3PhaseBuilder(self, create_method=self._create_method) - other

    def __mul__(
        self,
        scalar: float,
    ) -> Formula3PhaseBuilder[QuantityT]:
        """Multiply the three-phase formula by a scalar."""
        return Formula3PhaseBuilder(self, create_method=self._create_method) * scalar

    def __truediv__(
        self,
        scalar: float,
    ) -> Formula3PhaseBuilder[QuantityT]:
        """Divide the three-phase formula by a scalar."""
        return Formula3PhaseBuilder(self, create_method=self._create_method) / scalar

    def coalesce(
        self,
        *other: Formula3PhaseBuilder[QuantityT]
        | Formula3Phase[QuantityT]
        | tuple[QuantityT, QuantityT, QuantityT],
    ) -> Formula3PhaseBuilder[QuantityT]:
        """Coalesce the three-phase formula with a default value."""
        return Formula3PhaseBuilder(self, create_method=self._create_method).coalesce(
            *other
        )

    def min(
        self,
        *other: Formula3PhaseBuilder[QuantityT]
        | Formula3Phase[QuantityT]
        | tuple[QuantityT, QuantityT, QuantityT],
    ) -> Formula3PhaseBuilder[QuantityT]:
        """Get the minimum of the three-phase formula with other formulas."""
        return Formula3PhaseBuilder(self, create_method=self._create_method).min(*other)

    def max(
        self,
        *other: Formula3PhaseBuilder[QuantityT]
        | Formula3Phase[QuantityT]
        | tuple[QuantityT, QuantityT, QuantityT],
    ) -> Formula3PhaseBuilder[QuantityT]:
        """Get the maximum of the three-phase formula with other formulas."""
        return Formula3PhaseBuilder(self, create_method=self._create_method).max(*other)
Attributes¤
is_running property ¤
is_running: bool

Return whether this background service is running.

A service is considered running when at least one task is running.

RETURNS DESCRIPTION
bool

Whether this background service is running.

name property ¤
name: str

The name of this background service.

RETURNS DESCRIPTION
str

The name of this background service.

tasks property ¤
tasks: Set[Task[Any]]

Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use them for informational purposes.

Danger

Changing the returned tasks may lead to unexpected behavior, don't do it unless the class explicitly documents it is safe to do so.

RETURNS DESCRIPTION
Set[Task[Any]]

The set of running tasks spawned by this background service.

Functions¤
__add__ ¤
__add__(
    other: (
        Formula3PhaseBuilder[QuantityT]
        | Formula3Phase[QuantityT]
    ),
) -> Formula3PhaseBuilder[QuantityT]

Add two three-phase formulas.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def __add__(
    self,
    other: Formula3PhaseBuilder[QuantityT] | Formula3Phase[QuantityT],
) -> Formula3PhaseBuilder[QuantityT]:
    """Add two three-phase formulas."""
    return Formula3PhaseBuilder(self, create_method=self._create_method) + other
__aenter__ async ¤
__aenter__() -> Self

Enter an async context.

Start this background service.

RETURNS DESCRIPTION
Self

This background service.

Source code in src/frequenz/sdk/actor/_background_service.py
async def __aenter__(self) -> Self:
    """Enter an async context.

    Start this background service.

    Returns:
        This background service.
    """
    self.start()
    return self
__aexit__ async ¤
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit an async context.

Stop this background service.

PARAMETER DESCRIPTION
exc_type

The type of the exception raised, if any.

TYPE: type[BaseException] | None

exc_val

The exception raised, if any.

TYPE: BaseException | None

exc_tb

The traceback of the exception raised, if any.

TYPE: TracebackType | None

Source code in src/frequenz/sdk/actor/_background_service.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit an async context.

    Stop this background service.

    Args:
        exc_type: The type of the exception raised, if any.
        exc_val: The exception raised, if any.
        exc_tb: The traceback of the exception raised, if any.
    """
    await self.stop()
__await__ ¤
__await__() -> Generator[None, None, None]

Await this background service.

An awaited background service will wait for all its tasks to finish.

RETURNS DESCRIPTION
None

An implementation-specific generator for the awaitable.

Source code in src/frequenz/sdk/actor/_background_service.py
def __await__(self) -> collections.abc.Generator[None, None, None]:
    """Await this background service.

    An awaited background service will wait for all its tasks to finish.

    Returns:
        An implementation-specific generator for the awaitable.
    """
    return self.wait().__await__()
__init__ ¤
__init__(
    *,
    name: str,
    phase_1: Formula[QuantityT],
    phase_2: Formula[QuantityT],
    phase_3: Formula[QuantityT],
    sub_formulas: (
        list[Formula3Phase[QuantityT]] | None
    ) = None
) -> None

Initialize this instance.

PARAMETER DESCRIPTION
name

The name of the formula.

TYPE: str

phase_1

The formula for phase 1.

TYPE: Formula[QuantityT]

phase_2

The formula for phase 2.

TYPE: Formula[QuantityT]

phase_3

The formula for phase 3.

TYPE: Formula[QuantityT]

sub_formulas

Sub-formulas that need to be started before this formula.

TYPE: list[Formula3Phase[QuantityT]] | None DEFAULT: None

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    *,
    name: str,
    phase_1: Formula[QuantityT],
    phase_2: Formula[QuantityT],
    phase_3: Formula[QuantityT],
    sub_formulas: list[Formula3Phase[QuantityT]] | None = None,
) -> None:
    """Initialize this instance.

    Args:
        name: The name of the formula.
        phase_1: The formula for phase 1.
        phase_2: The formula for phase 2.
        phase_3: The formula for phase 3.
        sub_formulas: Sub-formulas that need to be started before this formula.
    """
    BackgroundService.__init__(self)
    self._formula_p1: Formula[QuantityT] = phase_1
    self._formula_p2: Formula[QuantityT] = phase_2
    self._formula_p3: Formula[QuantityT] = phase_3
    self._create_method: Callable[[float], QuantityT] = phase_1._create_method

    self._channel: Broadcast[Sample3Phase[QuantityT]] = Broadcast(
        name=f"[Formula3Phase:{name}]({phase_1.name})"
    )
    self._sub_formulas: list[Formula3Phase[QuantityT]] = sub_formulas or []
    self._evaluator: Formula3PhaseEvaluatingActor[QuantityT] = (
        Formula3PhaseEvaluatingActor(phase_1, phase_2, phase_3, self._channel)
    )
__mul__ ¤
__mul__(scalar: float) -> Formula3PhaseBuilder[QuantityT]

Multiply the three-phase formula by a scalar.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def __mul__(
    self,
    scalar: float,
) -> Formula3PhaseBuilder[QuantityT]:
    """Multiply the three-phase formula by a scalar."""
    return Formula3PhaseBuilder(self, create_method=self._create_method) * scalar
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in src/frequenz/sdk/actor/_background_service.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in src/frequenz/sdk/actor/_background_service.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return f"{type(self).__name__}[{self._name}]"
__sub__ ¤
__sub__(
    other: (
        Formula3PhaseBuilder[QuantityT]
        | Formula3Phase[QuantityT]
    ),
) -> Formula3PhaseBuilder[QuantityT]

Subtract two three-phase formulas.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def __sub__(
    self,
    other: Formula3PhaseBuilder[QuantityT] | Formula3Phase[QuantityT],
) -> Formula3PhaseBuilder[QuantityT]:
    """Subtract two three-phase formulas."""
    return Formula3PhaseBuilder(self, create_method=self._create_method) - other
__truediv__ ¤
__truediv__(
    scalar: float,
) -> Formula3PhaseBuilder[QuantityT]

Divide the three-phase formula by a scalar.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def __truediv__(
    self,
    scalar: float,
) -> Formula3PhaseBuilder[QuantityT]:
    """Divide the three-phase formula by a scalar."""
    return Formula3PhaseBuilder(self, create_method=self._create_method) / scalar
cancel ¤
cancel(msg: str | None = None) -> None

Cancel all running tasks spawned by this background service.

PARAMETER DESCRIPTION
msg

The message to be passed to the tasks being cancelled.

TYPE: str | None DEFAULT: None

Source code in src/frequenz/sdk/actor/_background_service.py
def cancel(self, msg: str | None = None) -> None:
    """Cancel all running tasks spawned by this background service.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    for task in self._tasks:
        task.cancel(msg)
coalesce ¤
coalesce(
    *other: Formula3PhaseBuilder[QuantityT]
    | Formula3Phase[QuantityT]
    | tuple[QuantityT, QuantityT, QuantityT]
) -> Formula3PhaseBuilder[QuantityT]

Coalesce the three-phase formula with a default value.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def coalesce(
    self,
    *other: Formula3PhaseBuilder[QuantityT]
    | Formula3Phase[QuantityT]
    | tuple[QuantityT, QuantityT, QuantityT],
) -> Formula3PhaseBuilder[QuantityT]:
    """Coalesce the three-phase formula with a default value."""
    return Formula3PhaseBuilder(self, create_method=self._create_method).coalesce(
        *other
    )
max ¤
max(
    *other: Formula3PhaseBuilder[QuantityT]
    | Formula3Phase[QuantityT]
    | tuple[QuantityT, QuantityT, QuantityT]
) -> Formula3PhaseBuilder[QuantityT]

Get the maximum of the three-phase formula with other formulas.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def max(
    self,
    *other: Formula3PhaseBuilder[QuantityT]
    | Formula3Phase[QuantityT]
    | tuple[QuantityT, QuantityT, QuantityT],
) -> Formula3PhaseBuilder[QuantityT]:
    """Get the maximum of the three-phase formula with other formulas."""
    return Formula3PhaseBuilder(self, create_method=self._create_method).max(*other)
min ¤
min(
    *other: Formula3PhaseBuilder[QuantityT]
    | Formula3Phase[QuantityT]
    | tuple[QuantityT, QuantityT, QuantityT]
) -> Formula3PhaseBuilder[QuantityT]

Get the minimum of the three-phase formula with other formulas.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def min(
    self,
    *other: Formula3PhaseBuilder[QuantityT]
    | Formula3Phase[QuantityT]
    | tuple[QuantityT, QuantityT, QuantityT],
) -> Formula3PhaseBuilder[QuantityT]:
    """Get the minimum of the three-phase formula with other formulas."""
    return Formula3PhaseBuilder(self, create_method=self._create_method).min(*other)
new_receiver ¤
new_receiver(
    *, max_size: int = 50
) -> Receiver[Sample3Phase[QuantityT]]

Subscribe to the output of this formula.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
def new_receiver(self, *, max_size: int = 50) -> Receiver[Sample3Phase[QuantityT]]:
    """Subscribe to the output of this formula."""
    if not self._evaluator.is_running:
        self.start()
    return self._channel.new_receiver(limit=max_size)
start ¤
start() -> None

Start the per-phase and sub formulas.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
@override
def start(self) -> None:
    """Start the per-phase and sub formulas."""
    for sub_formula in self._sub_formulas:
        sub_formula.start()
    self._formula_p1.start()
    self._formula_p2.start()
    self._formula_p3.start()
    self._evaluator.start()
stop async ¤
stop(msg: str | None = None) -> None

Stop the formula.

Source code in src/frequenz/sdk/timeseries/formulas/_formula_3_phase.py
@override
async def stop(self, msg: str | None = None) -> None:
    """Stop the formula."""
    await BackgroundService.stop(self, msg)
    for sub_formula in self._sub_formulas:
        await sub_formula.stop(msg)
    await self._formula_p1.stop(msg)
    await self._formula_p2.stop(msg)
    await self._formula_p3.stop(msg)
    await self._evaluator.stop(msg)
wait async ¤
wait() -> None

Wait this background service to finish.

Wait until all background service tasks are finished.

RAISES DESCRIPTION
BaseExceptionGroup

If any of the tasks spawned by this service raised an exception (CancelError is not considered an error and not returned in the exception group).

Source code in src/frequenz/sdk/actor/_background_service.py
async def wait(self) -> None:
    """Wait this background service to finish.

    Wait until all background service tasks are finished.

    Raises:
        BaseExceptionGroup: If any of the tasks spawned by this service raised an
            exception (`CancelError` is not considered an error and not returned in
            the exception group).
    """
    # We need to account for tasks that were created between when we started
    # awaiting and we finished awaiting.
    while self._tasks:
        done, pending = await asyncio.wait(self._tasks)
        assert not pending

        # We remove the done tasks, but there might be new ones created after we
        # started waiting.
        self._tasks = self._tasks - done

        exceptions: list[BaseException] = []
        for task in done:
            try:
                # This will raise a CancelledError if the task was cancelled or any
                # other exception if the task raised one.
                _ = task.result()
            except BaseException as error:  # pylint: disable=broad-except
                exceptions.append(error)
        if exceptions:
            raise BaseExceptionGroup(
                f"Error while stopping background service {self}", exceptions
            )