Skip to content

experimental

frequenz.channels.experimental ¤

Experimental channel primitives.

Warning

This package contains experimental channel primitives that are not yet considered stable. For more information on what to expect and how to use the experimental package please read the experimental package guidelines.

Attributes¤

frequenz.channels.experimental.DefaultT module-attribute ¤

DefaultT = TypeVar('DefaultT')

Type variable for the default value returned by GroupingLatestValueCache.get.

frequenz.channels.experimental.HashableT module-attribute ¤

HashableT = TypeVar('HashableT', bound=Hashable)

Type variable for the keys used to group values in the GroupingLatestValueCache.

frequenz.channels.experimental.ValueT_co module-attribute ¤

ValueT_co = TypeVar('ValueT_co', covariant=True)

Covariant type variable for the values cached by the GroupingLatestValueCache.

Classes¤

frequenz.channels.experimental.GroupingLatestValueCache ¤

Bases: Mapping[HashableT, ValueT_co]

A cache that stores the latest value in a receiver, grouped by key.

It provides a way to look up, on demand, the latest value in a stream for any key, as long as at least one value has been received for that key.

GroupingLatestValueCache takes a Receiver and a key function as arguments and stores the latest value received by that receiver for each key separately.

The GroupingLatestValueCache implements the Mapping interface, so it can be used like a dictionary. Additionally, other methods from MutableMapping are implemented, but only methods removing items from the cache are allowed, such as pop(), popitem(), clear(), and __delitem__(). Other update methods are not provided because the user should not update the cache values directly.

Example
from frequenz.channels import Broadcast
from frequenz.channels.experimental import GroupingLatestValueCache

channel = Broadcast[tuple[int, str]](name="lvc_test")

cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
sender = channel.new_sender()

assert cache.get(6) is None
assert 6 not in cache

await sender.send((6, "twenty-six"))

assert 6 in cache
assert cache.get(6) == (6, "twenty-six")

del cache[6]

assert cache.get(6) is None
assert 6 not in cache

await cache.stop()
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
    """A cache that stores the latest value in a receiver, grouped by key.

    It provides a way to look up on demand, the latest value in a stream for any key, as
    long as there has been at least one value received for that key.

    [GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
    takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
    stores the latest value received by that receiver for each key separately.

    The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
    interface, so it can be used like a dictionary.  Additionally other methods from
    [`MutableMapping`][collections.abc.MutableMapping] are implemented, but only
    methods removing items from the cache are allowed, such as
    [`pop()`][frequenz.channels.experimental.GroupingLatestValueCache.pop],
    [`popitem()`][frequenz.channels.experimental.GroupingLatestValueCache.popitem],
    [`clear()`][frequenz.channels.experimental.GroupingLatestValueCache.clear], and
    [`__delitem__()`][frequenz.channels.experimental.GroupingLatestValueCache.__delitem__].
    Other update methods are not provided because the user should not update the
    cache values directly.

    Example:
        ```python
        from frequenz.channels import Broadcast
        from frequenz.channels.experimental import GroupingLatestValueCache

        channel = Broadcast[tuple[int, str]](name="lvc_test")

        cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
        sender = channel.new_sender()

        assert cache.get(6) is None
        assert 6 not in cache

        await sender.send((6, "twenty-six"))

        assert 6 in cache
        assert cache.get(6) == (6, "twenty-six")

        del cache[6]

        assert cache.get(6) is None
        assert 6 not in cache

        await cache.stop()
        ```
    """

    def __init__(
        self,
        receiver: Receiver[ValueT_co],
        *,
        key: Callable[[ValueT_co], HashableT],
        unique_id: str | None = None,
    ) -> None:
        """Create a new cache.

        Args:
            receiver: The receiver to cache values from.
            key: An function that takes a value and returns a key to group the values
                by.
            unique_id: A string to help uniquely identify this instance. If not
                provided, a unique identifier will be generated from the object's
                [`id()`][id]. It is used mostly for debugging purposes.
        """
        self._receiver: Receiver[ValueT_co] = receiver
        self._key: Callable[[ValueT_co], HashableT] = key
        self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
        self._latest_value_by_key: dict[HashableT, ValueT_co] = {}
        self._task: asyncio.Task[None] = asyncio.create_task(
            self._run(), name=f"LatestValueCache«{self._unique_id}»"
        )

    @property
    def unique_id(self) -> str:
        """The unique identifier of this instance."""
        return self._unique_id

    @override
    def keys(self) -> KeysView[HashableT]:
        """Return the set of keys for which values have been received.

        If no key function is provided, this will return an empty set.
        """
        return self._latest_value_by_key.keys()

    @override
    def items(self) -> ItemsView[HashableT, ValueT_co]:
        """Return an iterator over the key-value pairs of the latest values received."""
        return self._latest_value_by_key.items()

    @override
    def values(self) -> ValuesView[ValueT_co]:
        """Return an iterator over the latest values received."""
        return self._latest_value_by_key.values()

    @overload
    def get(self, key: HashableT, default: None = None) -> ValueT_co | None:
        """Return the latest value that has been received for a specific key."""

    # MyPy passes this overload as a valid signature, but pylint does not like it.
    @overload
    def get(  # pylint: disable=signature-differs
        self, key: HashableT, default: DefaultT
    ) -> ValueT_co | DefaultT:
        """Return the latest value that has been received for a specific key."""

    @override
    def get(
        self, key: HashableT, default: DefaultT | None = None
    ) -> ValueT_co | DefaultT | None:
        """Return the latest value that has been received.

        Args:
            key: An optional key to retrieve the latest value for that key. If not
                provided, it retrieves the latest value received overall.
            default: The default value to return if no value has been received yet for
                the specified key. If not provided, it defaults to `None`.

        Returns:
            The latest value that has been received.
        """
        return self._latest_value_by_key.get(key, default)

    @override
    def __iter__(self) -> Iterator[HashableT]:
        """Return an iterator over the keys for which values have been received."""
        return iter(self._latest_value_by_key)

    @override
    def __len__(self) -> int:
        """Return the number of keys for which values have been received."""
        return len(self._latest_value_by_key)

    @override
    def __getitem__(self, key: HashableT) -> ValueT_co:
        """Return the latest value that has been received for a specific key.

        Args:
            key: The key to retrieve the latest value for.

        Returns:
            The latest value that has been received for that key.
        """
        return self._latest_value_by_key[key]

    @override
    def __contains__(self, key: object, /) -> bool:
        """Check if a value has been received for a specific key.

        Args:
            key: The key to check for.

        Returns:
            `True` if a value has been received for that key, `False` otherwise.
        """
        return key in self._latest_value_by_key

    @override
    def __eq__(self, other: object, /) -> bool:
        """Check if this cache is equal to another object.

        Two caches are considered equal if they have the same keys and values.

        Args:
            other: The object to compare with.

        Returns:
            `True` if the caches are equal, `False` otherwise.
        """
        match other:
            case GroupingLatestValueCache():
                return self._latest_value_by_key == other._latest_value_by_key
            case Mapping():
                return self._latest_value_by_key == other
            case _:
                return NotImplemented

    @override
    def __ne__(self, value: object, /) -> bool:
        """Check if this cache is not equal to another object.

        Args:
            value: The object to compare with.

        Returns:
            `True` if the caches are not equal, `False` otherwise.
        """
        return not self.__eq__(value)

    def __delitem__(self, key: HashableT) -> None:
        """Clear the latest value for a specific key.

        Args:
            key: The key for which to clear the latest value.
        """
        del self._latest_value_by_key[key]

    @overload
    def pop(self, key: HashableT, /) -> ValueT_co | None:
        """Remove the latest value for a specific key and return it."""

    @overload
    def pop(self, key: HashableT, /, default: DefaultT) -> ValueT_co | DefaultT:
        """Remove the latest value for a specific key and return it."""

    def pop(
        self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
    ) -> ValueT_co | DefaultT | None:
        """Remove the latest value for a specific key and return it.

        If no value has been received yet for that key, it returns the default value or
        raises a `KeyError` if no default value is provided.

        Args:
            key: The key for which to remove the latest value.
            default: The default value to return if no value has been received yet for
                the specified key.

        Returns:
            The latest value that has been received for that key, or the default value if
                no value has been received yet and a default value is provided.
        """
        if isinstance(default, _NotSpecified):
            return self._latest_value_by_key.pop(key)
        return self._latest_value_by_key.pop(key, default)

    def popitem(self) -> tuple[HashableT, ValueT_co]:
        """Remove and return a (key, value) pair from the cache.

        Pairs are returned in LIFO (last-in, first-out) order.

        Returns:
            A tuple containing the key and the latest value that has been received for
                that key.
        """
        return self._latest_value_by_key.popitem()

    def clear(self) -> None:
        """Clear all entries from the cache."""
        self._latest_value_by_key.clear()

    async def stop(self) -> None:
        """Stop the cache."""
        if not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    def __repr__(self) -> str:
        """Return a string representation of this cache."""
        return (
            f"<GroupingLatestValueCache num_keys={len(self._latest_value_by_key.keys())}, "
            f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
        )

    async def _run(self) -> None:
        async for value in self._receiver:
            key = self._key(value)
            self._latest_value_by_key[key] = value
Attributes¤
unique_id property ¤
unique_id: str

The unique identifier of this instance.

Functions¤
__contains__ ¤
__contains__(key: object) -> bool

Check if a value has been received for a specific key.

PARAMETER DESCRIPTION
key

The key to check for.

TYPE: object

RETURNS DESCRIPTION
bool

True if a value has been received for that key, False otherwise.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def __contains__(self, key: object, /) -> bool:
    """Check if a value has been received for a specific key.

    Args:
        key: The key to check for.

    Returns:
        `True` if a value has been received for that key, `False` otherwise.
    """
    return key in self._latest_value_by_key
__delitem__ ¤
__delitem__(key: HashableT) -> None

Clear the latest value for a specific key.

PARAMETER DESCRIPTION
key

The key for which to clear the latest value.

TYPE: HashableT

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
def __delitem__(self, key: HashableT) -> None:
    """Drop the cached value for a specific key.

    Args:
        key: The key whose latest value should be removed.
    """
    cached = self._latest_value_by_key
    del cached[key]
__eq__ ¤
__eq__(other: object) -> bool

Check if this cache is equal to another object.

Two caches are considered equal if they have the same keys and values.

PARAMETER DESCRIPTION
other

The object to compare with.

TYPE: object

RETURNS DESCRIPTION
bool

True if the caches are equal, False otherwise.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def __eq__(self, other: object, /) -> bool:
    """Compare the cached content against another object.

    Two caches are equal when they hold the same keys and values. Any other
    `Mapping` is compared against the cached content directly.

    Args:
        other: The object to compare with.

    Returns:
        `True` if the contents are equal, `False` otherwise.
    """
    mine = self._latest_value_by_key
    # A cache is itself a `Mapping`, so it has to be checked first.
    if isinstance(other, GroupingLatestValueCache):
        return mine == other._latest_value_by_key
    if isinstance(other, Mapping):
        return mine == other
    return NotImplemented
__getitem__ ¤
__getitem__(key: HashableT) -> ValueT_co

Return the latest value that has been received for a specific key.

PARAMETER DESCRIPTION
key

The key to retrieve the latest value for.

TYPE: HashableT

RETURNS DESCRIPTION
ValueT_co

The latest value that has been received for that key.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def __getitem__(self, key: HashableT) -> ValueT_co:
    """Look up the most recent value received for a key.

    Args:
        key: The key whose latest value is wanted.

    Returns:
        The most recent value received for that key.
    """
    cached = self._latest_value_by_key
    return cached[key]
__init__ ¤
__init__(
    receiver: Receiver[ValueT_co],
    *,
    key: Callable[[ValueT_co], HashableT],
    unique_id: str | None = None
) -> None

Create a new cache.

PARAMETER DESCRIPTION
receiver

The receiver to cache values from.

TYPE: Receiver[ValueT_co]

key

A function that takes a value and returns a key to group the values by.

TYPE: Callable[[ValueT_co], HashableT]

unique_id

A string to help uniquely identify this instance. If not provided, a unique identifier will be generated from the object's id(). It is used mostly for debugging purposes.

TYPE: str | None DEFAULT: None

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
def __init__(
    self,
    receiver: Receiver[ValueT_co],
    *,
    key: Callable[[ValueT_co], HashableT],
    unique_id: str | None = None,
) -> None:
    """Set up the cache and start consuming the receiver in the background.

    Args:
        receiver: The receiver to cache values from.
        key: A function that maps a value to the key it is grouped under.
        unique_id: A string to help uniquely identify this instance. When
            omitted, one is derived from the object's [`id()`][id]. It is
            used mostly for debugging purposes.
    """
    if unique_id is None:
        unique_id = hex(id(self))
    self._unique_id: str = unique_id
    self._receiver: Receiver[ValueT_co] = receiver
    self._key: Callable[[ValueT_co], HashableT] = key
    self._latest_value_by_key: dict[HashableT, ValueT_co] = {}
    task_name = f"LatestValueCache«{self._unique_id}»"
    self._task: asyncio.Task[None] = asyncio.create_task(
        self._run(), name=task_name
    )
__iter__ ¤
__iter__() -> Iterator[HashableT]

Return an iterator over the keys for which values have been received.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def __iter__(self) -> Iterator[HashableT]:
    """Iterate over the keys that currently have a cached value."""
    cached = self._latest_value_by_key
    return iter(cached)
__len__ ¤
__len__() -> int

Return the number of keys for which values have been received.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def __len__(self) -> int:
    """Return the number of keys for which values have been received."""
    return len(self._latest_value_by_key)
__ne__ ¤
__ne__(value: object) -> bool

Check if this cache is not equal to another object.

PARAMETER DESCRIPTION
value

The object to compare with.

TYPE: object

RETURNS DESCRIPTION
bool

True if the caches are not equal, False otherwise.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def __ne__(self, value: object, /) -> bool:
    """Check if this cache is not equal to another object.

    Args:
        value: The object to compare with.

    Returns:
        `True` if the caches are not equal, `False` otherwise.
    """
    return not self.__eq__(value)
__repr__ ¤
__repr__() -> str

Return a string representation of this cache.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
def __repr__(self) -> str:
    """Build a debugging representation with key count, receiver and id."""
    num_keys = len(self._latest_value_by_key)
    head = f"<GroupingLatestValueCache num_keys={num_keys}, "
    tail = f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
    return head + tail
clear ¤
clear() -> None

Clear all entries from the cache.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
def clear(self) -> None:
    """Remove every cached entry, leaving the cache empty."""
    cached = self._latest_value_by_key
    cached.clear()
get ¤
get(
    key: HashableT, default: None = None
) -> ValueT_co | None
get(
    key: HashableT, default: DefaultT
) -> ValueT_co | DefaultT
get(
    key: HashableT, default: DefaultT | None = None
) -> ValueT_co | DefaultT | None

Return the latest value that has been received.

PARAMETER DESCRIPTION
key

The key to retrieve the latest value for.

TYPE: HashableT

default

The default value to return if no value has been received yet for the specified key. If not provided, it defaults to None.

TYPE: DefaultT | None DEFAULT: None

RETURNS DESCRIPTION
ValueT_co | DefaultT | None

The latest value that has been received.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def get(
    self, key: HashableT, default: DefaultT | None = None
) -> ValueT_co | DefaultT | None:
    """Return the latest value that has been received for a specific key.

    Args:
        key: The key to retrieve the latest value for.
        default: The default value to return if no value has been received yet for
            the specified key. If not provided, it defaults to `None`.

    Returns:
        The latest value that has been received for that key, or `default` if no
            value has been received yet for it.
    """
    return self._latest_value_by_key.get(key, default)
items ¤

Return a view of the key-value pairs of the latest values received.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def items(self) -> ItemsView[HashableT, ValueT_co]:
    """Return a view of the (key, latest value) pairs held by the cache."""
    cached = self._latest_value_by_key
    return cached.items()
keys ¤
keys() -> KeysView[HashableT]

Return the set of keys for which values have been received.

The set is empty until at least one value has been received.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def keys(self) -> KeysView[HashableT]:
    """Return a view of the keys for which values have been received.

    The view is empty until at least one value has been received.
    """
    return self._latest_value_by_key.keys()
pop ¤
pop(key: HashableT) -> ValueT_co | None
pop(
    key: HashableT, /, default: DefaultT
) -> ValueT_co | DefaultT
pop(
    key: HashableT,
    /,
    default: DefaultT | _NotSpecified = _NotSpecified(),
) -> ValueT_co | DefaultT | None

Remove the latest value for a specific key and return it.

If no value has been received yet for that key, it returns the default value or raises a KeyError if no default value is provided.

PARAMETER DESCRIPTION
key

The key for which to remove the latest value.

TYPE: HashableT

default

The default value to return if no value has been received yet for the specified key.

TYPE: DefaultT | _NotSpecified DEFAULT: _NotSpecified()

RETURNS DESCRIPTION
ValueT_co | DefaultT | None

The latest value that has been received for that key, or the default value if no value has been received yet and a default value is provided.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
def pop(
    self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
) -> ValueT_co | DefaultT | None:
    """Remove the cached value for a key and hand it back.

    When nothing is cached for the key, the default is returned if one was
    given; otherwise a `KeyError` is raised.

    Args:
        key: The key whose latest value should be removed.
        default: The value to return when nothing is cached for the key.

    Returns:
        The value that was cached for the key, or the default when nothing was
            cached and a default was given.
    """
    cached = self._latest_value_by_key
    if not isinstance(default, _NotSpecified):
        return cached.pop(key, default)
    # No default given: let the underlying dict raise `KeyError`.
    return cached.pop(key)
popitem ¤
popitem() -> tuple[HashableT, ValueT_co]

Remove and return a (key, value) pair from the cache.

Pairs are returned in LIFO (last-in, first-out) order.

RETURNS DESCRIPTION
tuple[HashableT, ValueT_co]

A tuple containing the key and the latest value that has been received for that key.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
def popitem(self) -> tuple[HashableT, ValueT_co]:
    """Remove one (key, value) pair from the cache and return it.

    Pairs come out in LIFO (last-in, first-out) order.

    Returns:
        The removed key together with the latest value that was cached for it.
    """
    cached = self._latest_value_by_key
    pair = cached.popitem()
    return pair
stop async ¤
stop() -> None

Stop the cache.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
async def stop(self) -> None:
    """Cancel the background task and wait until it has finished."""
    if self._task.done():
        return
    self._task.cancel()
    try:
        await self._task
    except asyncio.CancelledError:
        # Expected result of the cancellation requested just above.
        pass
values ¤
values() -> ValuesView[ValueT_co]

Return a view of the latest values received.

Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
@override
def values(self) -> ValuesView[ValueT_co]:
    """Return a view of the latest value cached for each key."""
    cached = self._latest_value_by_key
    return cached.values()

frequenz.channels.experimental.NopReceiver ¤

Bases: Receiver[ReceiverMessageT_co]

A place-holder receiver that will never receive a message.

Source code in frequenz/channels/experimental/_nop_receiver.py
class NopReceiver(Receiver[ReceiverMessageT_co]):
    """A stand-in receiver that never yields a message."""

    def __init__(self) -> None:
        """Create the receiver in the open (not yet closed) state."""
        # Set by `close()`; `ready()` blocks until then.
        self._close_event: asyncio.Event = asyncio.Event()

    @override
    async def ready(self) -> bool:
        """Block until the receiver is closed.

        Returns:
            Always `False`: this only returns once the receiver has been closed.
        """
        closed = self._close_event
        if not closed.is_set():
            await closed.wait()
        return False

    @override
    def consume(self) -> ReceiverMessageT_co:  # noqa: DOC503 (raised indirectly)
        """Always raise, as there is never a message to consume.

        Returns:
            Never returns normally.

        Raises:
            ReceiverStoppedError: If the receiver has been closed.
            ReceiverError: If the receiver has not been closed.
        """
        if not self._close_event.is_set():
            raise ReceiverError(
                "`consume()` must be preceded by a call to `ready()`", self
            )
        raise ReceiverStoppedError(self)

    @override
    def close(self) -> None:
        """Mark the receiver as closed, releasing anything waiting in `ready()`."""
        self._close_event.set()
Functions¤
__aiter__ ¤
__aiter__() -> Self

Get an async iterator over the received messages.

RETURNS DESCRIPTION
Self

This receiver, as it is already an async iterator.

Source code in frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Get an async iterator over the received messages.

    Returns:
        This receiver, as it is already an async iterator.
    """
    return self
__anext__ async ¤
__anext__() -> ReceiverMessageT_co

Await the next message in the async iteration over received messages.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next received message.

RAISES DESCRIPTION
StopAsyncIteration

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def __anext__(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Wait for and return the next message of the async iteration.

    Returns:
        The next received message.

    Raises:
        StopAsyncIteration: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        await self.ready()
        message = self.consume()
    except ReceiverStoppedError as stopped:
        # Translate the stop signal into the async iteration protocol.
        raise StopAsyncIteration() from stopped
    return message
__init__ ¤
__init__() -> None

Initialize this instance.

Source code in frequenz/channels/experimental/_nop_receiver.py
def __init__(self) -> None:
    """Create the receiver in the open (not yet closed) state."""
    close_event = asyncio.Event()
    self._close_event: asyncio.Event = close_event
close ¤
close() -> None

Stop the receiver.

Source code in frequenz/channels/experimental/_nop_receiver.py
@override
def close(self) -> None:
    """Stop the receiver."""
    self._close_event.set()
consume ¤
consume() -> ReceiverMessageT_co

Raise ReceiverError unless the NopReceiver is closed.

If the receiver is closed, then raise ReceiverStoppedError.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next message received.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the underlying receiver.

Source code in frequenz/channels/experimental/_nop_receiver.py
@override
def consume(self) -> ReceiverMessageT_co:  # noqa: DOC503 (raised indirectly)
    """Always raise, as there is never a message to consume.

    Returns:
        Never returns normally.

    Raises:
        ReceiverStoppedError: If the receiver has been closed.
        ReceiverError: If the receiver has not been closed.
    """
    if not self._close_event.is_set():
        raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self)
    raise ReceiverStoppedError(self)
filter ¤
filter(
    filter_function: Callable[
        [ReceiverMessageT_co],
        TypeGuard[FilteredMessageT_co],
    ],
) -> Receiver[FilteredMessageT_co]
filter(
    filter_function: Callable[[ReceiverMessageT_co], bool],
) -> Receiver[ReceiverMessageT_co]
filter(
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[
            [ReceiverMessageT_co],
            TypeGuard[FilteredMessageT_co],
        ]
    ),
) -> (
    Receiver[ReceiverMessageT_co]
    | Receiver[FilteredMessageT_co]
)

Apply a filter function on the messages on a receiver.

Note

You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
filter_function

The function to be applied on incoming messages to determine if they should be received.

TYPE: Callable[[ReceiverMessageT_co], bool] | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]

RETURNS DESCRIPTION
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]

A new receiver that only receives messages that pass the filter.

Source code in frequenz/channels/_receiver.py
def filter(
    self,
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]
    ),
    /,
) -> Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]:
    """Wrap this receiver so only messages accepted by a filter are received.

    Note:
        Passing a [type guard][typing.TypeGuard] as the filter function narrows
        the type of the messages that pass the filter.

    Tip:
        The returned receiver type won't have all the methods of the original
        receiver. To use methods of the original receiver that are not part of
        the `Receiver` interface, keep a reference to the original receiver and
        use that instead.

    Args:
        filter_function: The function applied to each incoming message to
            decide whether it should be received.

    Returns:
        A new receiver that only receives messages that pass the filter.
    """
    filtered = _Filter(receiver=self, filter_function=filter_function)
    return filtered
map ¤

Apply a mapping function on the received message.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
mapping_function

The function to be applied on incoming messages.

TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co]

RETURNS DESCRIPTION
Receiver[MappedMessageT_co]

A new receiver that applies the function on the received messages.

Source code in frequenz/channels/_receiver.py
def map(
    self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
) -> Receiver[MappedMessageT_co]:
    """Wrap this receiver so each received message is transformed by a function.

    Tip:
        The returned receiver type won't have all the methods of the original
        receiver. To use methods of the original receiver that are not part of
        the `Receiver` interface, keep a reference to the original receiver and
        use that instead.

    Args:
        mapping_function: The function applied to each incoming message.

    Returns:
        A new receiver that applies the function on the received messages.
    """
    mapped = _Mapper(receiver=self, mapping_function=mapping_function)
    return mapped
ready async ¤
ready() -> bool

Wait forever unless the receiver is closed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/experimental/_nop_receiver.py
@override
async def ready(self) -> bool:
    """Wait for ever unless the receiver is closed.

    Returns:
        Whether the receiver is still active.
    """
    if self._close_event.is_set():
        return False
    await self._close_event.wait()
    return False
receive async ¤
receive() -> ReceiverMessageT_co

Receive a message.

RETURNS DESCRIPTION
ReceiverMessageT_co

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def receive(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Receive a message.

    Awaits the next message through the async-iterator protocol and converts
    the resulting `StopAsyncIteration` back into a `ReceiverStoppedError`.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        received = await anext(self)
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            and exc.__cause__.receiver is self
        ):
            # This is a false positive, we are actually checking __cause__ is a
            # ReceiverStoppedError which is an exception.
            raise exc.__cause__  # pylint: disable=raising-non-exception
        # Otherwise report the stop as coming from this receiver, keeping the
        # StopAsyncIteration as the cause.
        raise ReceiverStoppedError(self) from exc
    return received
triggered ¤
triggered(
    selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]

Check whether this receiver was selected by select().

This method is used in conjunction with the Selected class to determine which receiver was selected in select() iteration.

It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.

Please see select() for an example.

PARAMETER DESCRIPTION
selected

The result of a select() iteration.

TYPE: Selected[Any]

RETURNS DESCRIPTION
TypeGuard[Selected[ReceiverMessageT_co]]

Whether this receiver was selected.

Source code in frequenz/channels/_receiver.py
def triggered(
    self, selected: Selected[Any]
) -> TypeGuard[Selected[ReceiverMessageT_co]]:
    """Check whether this receiver was selected by [`select()`][frequenz.channels.select].

    This method is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class to determine which receiver was
    selected in `select()` iteration.

    It also works as a [type guard][typing.TypeGuard] to narrow the type of the
    `Selected` instance to the type of the receiver.

    Please see [`select()`][frequenz.channels.select] for an example.

    Args:
        selected: The result of a `select()` iteration.

    Returns:
        Whether this receiver was selected.
    """
    if handled := selected._recv is self:  # pylint: disable=protected-access
        selected._handled = True  # pylint: disable=protected-access
    return handled

frequenz.channels.experimental.OptionalReceiver ¤

Bases: Receiver[ReceiverMessageT_co]

A receiver that will wait indefinitely if there is no underlying receiver.

This receiver is useful when the underlying receiver is not set initially. Instead of making if-else branches to check if the receiver is set, you can use this receiver to wait indefinitely if it is not set.

Source code in frequenz/channels/experimental/_optional_receiver.py
@deprecated("Use `frequenz.channels.experimental.NopReceiver` instead.")
class OptionalReceiver(Receiver[ReceiverMessageT_co]):
    """A receiver wrapper that waits forever when it wraps no receiver.

    Use it when the underlying receiver may not be available. Rather than
    branching on whether a receiver exists, wrap the (possibly missing)
    receiver in this class: with a receiver it delegates to it, without one it
    simply never becomes ready.
    """

    def __init__(self, receiver: Receiver[ReceiverMessageT_co] | None):
        """Initialize this instance.

        Args:
            receiver: The underlying receiver, or `None` if there is no receiver.
        """
        self._receiver: Receiver[ReceiverMessageT_co] | None = receiver

    @override
    async def ready(self) -> bool:
        """Wait until the underlying receiver is ready with a message or an error.

        When there is no underlying receiver, this method waits forever.

        After a call to `ready()` has finished, the message should be read
        with a call to `consume()` (`receive()` or iterated over).

        Returns:
            Whether the receiver is still active.
        """
        underlying = self._receiver
        if underlying is None:
            # A fresh event that nothing else references is never set, so
            # waiting on it blocks forever.
            await asyncio.Event().wait()
            return False
        return await underlying.ready()

    @override
    def consume(self) -> ReceiverMessageT_co:  # noqa: DOC503 (raised indirectly)
        """Return the next message from the underlying receiver.

        `ready()` must be called before each call to `consume()`.

        Returns:
            The next message received.

        Raises:
            ReceiverStoppedError: If the receiver stopped producing messages.
            ReceiverError: If there is no underlying receiver, or if there is
                some problem with the underlying receiver.
        """
        underlying = self._receiver
        if underlying is not None:
            return underlying.consume()
        raise ReceiverError(
            "`consume()` must be preceded by a call to `ready()`", self
        )

    def is_set(self) -> bool:
        """Tell whether an underlying receiver was provided.

        Returns:
            `True` if there is an underlying receiver, `False` otherwise.
        """
        missing = self._receiver is None
        return not missing

    def close(self) -> None:
        """Close the underlying receiver, if there is one."""
        underlying = self._receiver
        if underlying is None:
            return
        underlying.close()
Functions¤
__aiter__ ¤
__aiter__() -> Self

Get an async iterator over the received messages.

RETURNS DESCRIPTION
Self

This receiver, as it is already an async iterator.

Source code in frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Get an async iterator over the received messages.

    Returns:
        This receiver, as it is already an async iterator.
    """
    # No separate iterator object is created; the receiver is returned as is.
    return self
__anext__ async ¤
__anext__() -> ReceiverMessageT_co

Await the next message in the async iteration over received messages.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next received message.

RAISES DESCRIPTION
StopAsyncIteration

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def __anext__(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Wait for the next message and return it, as part of async iteration.

    Returns:
        The next received message.

    Raises:
        StopAsyncIteration: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        await self.ready()
        message = self.consume()
    except ReceiverStoppedError as stopped:
        # Translate the stop into the signal the async-iterator protocol
        # expects, keeping the original error as the cause.
        raise StopAsyncIteration() from stopped
    return message
__init__ ¤
__init__(receiver: Receiver[ReceiverMessageT_co] | None)

Initialize this instance.

PARAMETER DESCRIPTION
receiver

The underlying receiver, or None if there is no receiver.

TYPE: Receiver[ReceiverMessageT_co] | None

Source code in frequenz/channels/experimental/_optional_receiver.py
def __init__(self, receiver: Receiver[ReceiverMessageT_co] | None):
    """Initialize this instance.

    Args:
        receiver: The underlying receiver, or `None` if there is no receiver.
    """
    # Stored as given; `None` means there is no underlying receiver.
    self._receiver: Receiver[ReceiverMessageT_co] | None = receiver
close ¤
close() -> None

Stop the receiver.

Source code in frequenz/channels/experimental/_optional_receiver.py
def close(self) -> None:
    """Close the underlying receiver, if there is one."""
    underlying = self._receiver
    if underlying is None:
        return
    underlying.close()
consume ¤
consume() -> ReceiverMessageT_co

Return the latest message from the underlying receiver once ready() is complete.

ready() must be called before each call to consume().

RETURNS DESCRIPTION
ReceiverMessageT_co

The next message received.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the underlying receiver.

Source code in frequenz/channels/experimental/_optional_receiver.py
@override
def consume(self) -> ReceiverMessageT_co:  # noqa: DOC503 (raised indirectly)
    """Return the latest from the underlying receiver message once `ready()` is complete.

    `ready()` must be called before each call to `consume()`.

    Returns:
        The next message received.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the underlying receiver.
    """
    if self._receiver is None:
        raise ReceiverError(
            "`consume()` must be preceded by a call to `ready()`", self
        )
    return self._receiver.consume()
filter ¤
filter(
    filter_function: Callable[
        [ReceiverMessageT_co],
        TypeGuard[FilteredMessageT_co],
    ],
) -> Receiver[FilteredMessageT_co]
filter(
    filter_function: Callable[[ReceiverMessageT_co], bool],
) -> Receiver[ReceiverMessageT_co]
filter(
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[
            [ReceiverMessageT_co],
            TypeGuard[FilteredMessageT_co],
        ]
    ),
) -> (
    Receiver[ReceiverMessageT_co]
    | Receiver[FilteredMessageT_co]
)

Apply a filter function on the messages on a receiver.

Note

You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
filter_function

The function to be applied on incoming messages to determine if they should be received.

TYPE: Callable[[ReceiverMessageT_co], bool] | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]

RETURNS DESCRIPTION
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]

A new receiver that only receives messages that pass the filter.

Source code in frequenz/channels/_receiver.py
def filter(
    self,
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]
    ),
    /,
) -> Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]:
    """Create a receiver that only yields the messages accepted by a filter.

    Note:
        If the filter function is a [type guard][typing.TypeGuard], the type
        of the messages that pass the filter is narrowed accordingly.

    Tip:
        The returned object is only guaranteed to offer the `Receiver`
        interface. Keep your own reference to the original receiver if you
        need any of its methods that go beyond that interface.

    Args:
        filter_function: The function to be applied on incoming messages to
            determine if they should be received.

    Returns:
        A new receiver that only receives messages that pass the filter.
    """
    filtered_receiver = _Filter(receiver=self, filter_function=filter_function)
    return filtered_receiver
is_set ¤
is_set() -> bool

Check if the receiver is set.

Source code in frequenz/channels/experimental/_optional_receiver.py
def is_set(self) -> bool:
    """Tell whether an underlying receiver was provided.

    Returns:
        `True` if there is an underlying receiver, `False` otherwise.
    """
    missing = self._receiver is None
    return not missing
map ¤

Apply a mapping function on the received message.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
mapping_function

The function to be applied on incoming messages.

TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co]

RETURNS DESCRIPTION
Receiver[MappedMessageT_co]

A new receiver that applies the function on the received messages.

Source code in frequenz/channels/_receiver.py
def map(
    self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
) -> Receiver[MappedMessageT_co]:
    """Create a receiver that yields this receiver's messages after mapping them.

    Tip:
        The returned object is only guaranteed to offer the `Receiver`
        interface. Keep your own reference to the original receiver if you
        need any of its methods that go beyond that interface.

    Args:
        mapping_function: The function to be applied on incoming messages.

    Returns:
        A new receiver that applies the function on the received messages.
    """
    mapped_receiver = _Mapper(receiver=self, mapping_function=mapping_function)
    return mapped_receiver
ready async ¤
ready() -> bool

Wait until the receiver is ready with a message or an error.

Once a call to ready() has finished, the message should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/experimental/_optional_receiver.py
@override
async def ready(self) -> bool:
    """Wait until the receiver is ready with a message or an error.

    Once a call to `ready()` has finished, the message should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    if self._receiver is not None:
        return await self._receiver.ready()

    # If there's no receiver, wait forever
    await asyncio.Event().wait()
    return False
receive async ¤
receive() -> ReceiverMessageT_co

Receive a message.

RETURNS DESCRIPTION
ReceiverMessageT_co

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def receive(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Receive a message.

    Awaits the next message through the async-iterator protocol and converts
    the resulting `StopAsyncIteration` back into a `ReceiverStoppedError`.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        received = await anext(self)
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            and exc.__cause__.receiver is self
        ):
            # This is a false positive, we are actually checking __cause__ is a
            # ReceiverStoppedError which is an exception.
            raise exc.__cause__  # pylint: disable=raising-non-exception
        # Otherwise report the stop as coming from this receiver, keeping the
        # StopAsyncIteration as the cause.
        raise ReceiverStoppedError(self) from exc
    return received
triggered ¤
triggered(
    selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]

Check whether this receiver was selected by select().

This method is used in conjunction with the Selected class to determine which receiver was selected in select() iteration.

It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.

Please see select() for an example.

PARAMETER DESCRIPTION
selected

The result of a select() iteration.

TYPE: Selected[Any]

RETURNS DESCRIPTION
TypeGuard[Selected[ReceiverMessageT_co]]

Whether this receiver was selected.

Source code in frequenz/channels/_receiver.py
def triggered(
    self, selected: Selected[Any]
) -> TypeGuard[Selected[ReceiverMessageT_co]]:
    """Check whether this receiver was selected by [`select()`][frequenz.channels.select].

    This method is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class to determine which receiver was
    selected in `select()` iteration.

    It also works as a [type guard][typing.TypeGuard] to narrow the type of the
    `Selected` instance to the type of the receiver.

    Please see [`select()`][frequenz.channels.select] for an example.

    Args:
        selected: The result of a `select()` iteration.

    Returns:
        Whether this receiver was selected.
    """
    if handled := selected._recv is self:  # pylint: disable=protected-access
        selected._handled = True  # pylint: disable=protected-access
    return handled

frequenz.channels.experimental.Pipe ¤

Bases: Generic[ChannelMessageT]

A pipe between two channels.

The Pipe class takes a receiver and a sender and creates a pipe between them by forwarding all the messages received by the receiver to the sender.

Example
import asyncio
from contextlib import closing, aclosing, AsyncExitStack

from frequenz.channels import Broadcast, Receiver
from frequenz.channels.experimental import Pipe

async def main() -> None:
    # Channels, receivers and Pipe are in AsyncExitStack
    # to close and stop them at the end.
    async with AsyncExitStack() as stack:
        source_channel = await stack.enter_async_context(
            aclosing(Broadcast[int](name="source channel"))
        )
        source_receiver = stack.enter_context(closing(source_channel.new_receiver()))

        forwarding_channel = await stack.enter_async_context(
            aclosing(Broadcast[int](name="forwarding channel"))
        )
        await stack.enter_async_context(
            Pipe(source_receiver, forwarding_channel.new_sender())
        )

        receiver = stack.enter_context(closing(forwarding_channel.new_receiver()))

        source_sender = source_channel.new_sender()
        await source_sender.send(10)
        # The pipe forwards messages unchanged, so the same value arrives.
        assert await receiver.receive() == 10

asyncio.run(main())
Source code in frequenz/channels/experimental/_pipe.py
class Pipe(typing.Generic[ChannelMessageT]):
    """A pipe between two channels.

    The `Pipe` class takes a receiver and a sender and creates a pipe between them
    by forwarding all the messages received by the receiver to the sender.

    Example:
        ```python
        import asyncio
        from contextlib import closing, aclosing, AsyncExitStack

        from frequenz.channels import Broadcast, Receiver
        from frequenz.channels.experimental import Pipe

        async def main() -> None:
            # Channels, receivers and Pipe are in AsyncExitStack
            # to close and stop them at the end.
            async with AsyncExitStack() as stack:
                source_channel = await stack.enter_async_context(
                    aclosing(Broadcast[int](name="source channel"))
                )
                source_receiver = stack.enter_context(closing(source_channel.new_receiver()))

                forwarding_channel = await stack.enter_async_context(
                    aclosing(Broadcast[int](name="forwarding channel"))
                )
                await stack.enter_async_context(
                    Pipe(source_receiver, forwarding_channel.new_sender())
                )

                receiver = stack.enter_context(closing(forwarding_channel.new_receiver()))

                source_sender = source_channel.new_sender()
                await source_sender.send(10)
                # The pipe forwards messages unchanged, so the same value arrives.
                assert await receiver.receive() == 10

        asyncio.run(main())
        ```
    """

    def __init__(
        self, receiver: Receiver[ChannelMessageT], sender: Sender[ChannelMessageT]
    ) -> None:
        """Create a new pipe between two channels.

        The pipe does not forward anything until it is started, either with
        `start()` or by entering it as an async context manager.

        Args:
            receiver: The receiver channel.
            sender: The sender channel.
        """
        self._sender = sender
        self._receiver = receiver
        # The forwarding task; `None` until the pipe is started.
        self._task: asyncio.Task[None] | None = None

    async def __aenter__(self) -> Pipe[ChannelMessageT]:
        """Enter the runtime context, starting the pipe.

        Returns:
            This pipe.
        """
        await self.start()
        return self

    async def __aexit__(
        self,
        _exc_type: typing.Type[BaseException] | None,
        _exc: BaseException | None,
        _tb: typing.Any,
    ) -> None:
        """Exit the runtime context, stopping the pipe.

        The exception arguments are ignored.
        """
        await self.stop()

    async def start(self) -> None:
        """Start this pipe if it is not already running."""
        # A finished task counts as "not running", so the pipe can be restarted.
        if not self._task or self._task.done():
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Stop this pipe.

        Cancels the forwarding task if it is still running and waits for it to
        finish. Does nothing if the pipe was never started or already finished.
        """
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # Expected: this is the cancellation requested just above.
                pass

    async def _run(self) -> None:
        """Forward every message from the receiver to the sender, unchanged."""
        async for value in self._receiver:
            await self._sender.send(value)
Functions¤
__aenter__ async ¤
__aenter__() -> Pipe[ChannelMessageT]

Enter the runtime context.

Source code in frequenz/channels/experimental/_pipe.py
async def __aenter__(self) -> Pipe[ChannelMessageT]:
    """Enter the runtime context, starting the pipe.

    Returns:
        This pipe.
    """
    await self.start()
    return self
__aexit__ async ¤
__aexit__(
    _exc_type: Type[BaseException] | None,
    _exc: BaseException | None,
    _tb: Any,
) -> None

Exit the runtime context.

Source code in frequenz/channels/experimental/_pipe.py
async def __aexit__(
    self,
    _exc_type: typing.Type[BaseException] | None,
    _exc: BaseException | None,
    _tb: typing.Any,
) -> None:
    """Exit the runtime context, stopping the pipe.

    The exception arguments are ignored.
    """
    await self.stop()
__init__ ¤
__init__(
    receiver: Receiver[ChannelMessageT],
    sender: Sender[ChannelMessageT],
) -> None

Create a new pipe between two channels.

PARAMETER DESCRIPTION
receiver

The receiver channel.

TYPE: Receiver[ChannelMessageT]

sender

The sender channel.

TYPE: Sender[ChannelMessageT]

Source code in frequenz/channels/experimental/_pipe.py
def __init__(
    self, receiver: Receiver[ChannelMessageT], sender: Sender[ChannelMessageT]
) -> None:
    """Set up a pipe that connects `receiver` to `sender`.

    Args:
        receiver: The receiver channel.
        sender: The sender channel.
    """
    self._receiver = receiver
    self._sender = sender
    # No forwarding task is created here, so this starts out as `None`.
    self._task: asyncio.Task[None] | None = None
start async ¤
start() -> None

Start this pipe if it is not already running.

Source code in frequenz/channels/experimental/_pipe.py
async def start(self) -> None:
    """Start this pipe if it is not already running."""
    still_running = self._task and not self._task.done()
    if still_running:
        return
    # Either the pipe was never started or its previous task has finished.
    self._task = asyncio.create_task(self._run())
stop async ¤
stop() -> None

Stop this pipe.

Source code in frequenz/channels/experimental/_pipe.py
async def stop(self) -> None:
    """Stop this pipe.

    Cancels the forwarding task if it is still running and waits for it to
    finish. Does nothing if there is no task or it has already finished.
    """
    task = self._task
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected: this is the cancellation requested just above.
        pass

frequenz.channels.experimental.RelaySender ¤

Bases: Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]

A Sender for sending messages to multiple senders.

The RelaySender class takes multiple senders and forwards all the messages sent to it, to the senders it was created with.

Example
from frequenz.channels import Broadcast
from frequenz.channels.experimental import RelaySender

channel1: Broadcast[int] = Broadcast(name="channel1")
channel2: Broadcast[int] = Broadcast(name="channel2")

receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender())

await tee_sender.send(5)
assert await receiver1.receive() == 5
assert await receiver2.receive() == 5
Source code in frequenz/channels/experimental/_relay_sender.py
class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]):
    """A sender that relays every message to a group of other senders.

    A `RelaySender` is created with any number of senders. Each message sent
    through it is sent to all of those senders, one after the other, in the
    order in which they were given.

    Example:
        ```python
        from frequenz.channels import Broadcast
        from frequenz.channels.experimental import RelaySender

        channel1: Broadcast[int] = Broadcast(name="channel1")
        channel2: Broadcast[int] = Broadcast(name="channel2")

        receiver1 = channel1.new_receiver()
        receiver2 = channel2.new_receiver()

        tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender())

        await tee_sender.send(5)
        assert await receiver1.receive() == 5
        assert await receiver2.receive() == 5
        ```
    """

    def __init__(self, *senders: Sender[SenderMessageT_contra]) -> None:
        """Create a new RelaySender.

        Args:
            *senders: The senders to send messages to.
        """
        self._senders = senders

    @override
    async def send(self, message: SenderMessageT_contra, /) -> None:
        """Send a message to every sender this instance was created with.

        Args:
            message: The message to be sent.
        """
        targets = self._senders
        # Each send is awaited before moving on to the next target.
        for target in targets:
            await target.send(message)
Functions¤
__init__ ¤
__init__(*senders: Sender[SenderMessageT_contra]) -> None

Create a new RelaySender.

PARAMETER DESCRIPTION
*senders

The senders to send messages to.

TYPE: Sender[SenderMessageT_contra] DEFAULT: ()

Source code in frequenz/channels/experimental/_relay_sender.py
def __init__(self, *senders: Sender[SenderMessageT_contra]) -> None:
    """Create a new RelaySender.

    Args:
        *senders: The senders to send messages to.
    """
    # Kept as the tuple of positional arguments, in the order given.
    self._senders = senders
send async ¤
send(message: SenderMessageT_contra) -> None

Send a message.

PARAMETER DESCRIPTION
message

The message to be sent.

TYPE: SenderMessageT_contra

Source code in frequenz/channels/experimental/_relay_sender.py
@override
async def send(self, message: SenderMessageT_contra, /) -> None:
    """Send a message.

    Args:
        message: The message to be sent.
    """
    for sender in self._senders:
        await sender.send(message)

frequenz.channels.experimental.WithPrevious ¤

Bases: Generic[ChannelMessageT]

A composable predicate to build predicates that can also use the previous message.

This predicate can be used to filter messages based on a custom condition on the previous and current messages. This can be useful in cases where you want to process messages only if they satisfy a particular condition with respect to the previous message.

Receiving only messages that are different from the previous one.
from frequenz.channels import Broadcast
from frequenz.channels.experimental import WithPrevious

channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(WithPrevious(lambda old, new: old != new))
sender = channel.new_sender()

# This message will be received as it is the first message.
await sender.send(1)
assert await receiver.receive() == 1

# This message will be skipped as it is equal to the previous one.
await sender.send(1)

# This message will be received as it is different from the previous one.
await sender.send(0)
assert await receiver.receive() == 0
Receiving only messages if they are bigger than the previous one.
from frequenz.channels import Broadcast
from frequenz.channels.experimental import WithPrevious

channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(
    WithPrevious(lambda old, new: new > old, first_is_true=False)
)
sender = channel.new_sender()

# This message will be skipped as first_is_true is False.
await sender.send(1)

# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2

# This message will be skipped as it is smaller than the previous one (2).
await sender.send(0)

# This message will be skipped as it is not bigger than the previous one (0).
await sender.send(0)

# This message will be received as it is bigger than the previous one (0).
await sender.send(1)
assert await receiver.receive() == 1

# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2
Source code in frequenz/channels/experimental/_with_previous.py
class WithPrevious(Generic[ChannelMessageT]):
    """A composable predicate to build predicates that can also use the previous message.

    This predicate can be used to filter messages based on a custom condition on the
    previous and current messages. This can be useful in cases where you want to
    process messages only if they satisfy a particular condition with respect to the
    previous message.

    Note that the previous message is always the last message this predicate was
    called with, whether or not that message satisfied the condition.

    Example: Receiving only messages that are different from the previous one.
        ```python
        from frequenz.channels import Broadcast
        from frequenz.channels.experimental import WithPrevious

        channel = Broadcast[int](name="example")
        receiver = channel.new_receiver().filter(WithPrevious(lambda old, new: old != new))
        sender = channel.new_sender()

        # This message will be received as it is the first message.
        await sender.send(1)
        assert await receiver.receive() == 1

        # This message will be skipped as it is equal to the previous one.
        await sender.send(1)

        # This message will be received as it is different from the previous one.
        await sender.send(0)
        assert await receiver.receive() == 0
        ```

    Example: Receiving only messages if they are bigger than the previous one.
        ```python
        from frequenz.channels import Broadcast
        from frequenz.channels.experimental import WithPrevious

        channel = Broadcast[int](name="example")
        receiver = channel.new_receiver().filter(
            WithPrevious(lambda old, new: new > old, first_is_true=False)
        )
        sender = channel.new_sender()

        # This message will be skipped as first_is_true is False.
        await sender.send(1)

        # This message will be received as it is bigger than the previous one (1).
        await sender.send(2)
        assert await receiver.receive() == 2

        # This message will be skipped as it is smaller than the previous one (2).
        await sender.send(0)

        # This message will be skipped as it is not bigger than the previous one (0).
        await sender.send(0)

        # This message will be received as it is bigger than the previous one (0).
        await sender.send(1)
        assert await receiver.receive() == 1

        # This message will be received as it is bigger than the previous one (1).
        await sender.send(2)
        assert await receiver.receive() == 2
        ```
    """

    def __init__(
        self,
        predicate: Callable[[ChannelMessageT, ChannelMessageT], bool],
        /,
        *,
        first_is_true: bool = True,
    ) -> None:
        """Initialize this instance.

        Args:
            predicate: A callable that takes two arguments, the previous message and the
                current message, and returns a boolean indicating whether the current
                message should be received.
            first_is_true: Whether the first message should be considered as satisfying
                the predicate. Defaults to `True`.
        """
        self._predicate = predicate
        # The sentinel marks that no message has been seen yet.
        self._last_message: ChannelMessageT | _Sentinel = _SENTINEL
        self._first_is_true = first_is_true

    def __call__(self, message: ChannelMessageT) -> bool:
        """Evaluate the predicate for `message` against the previous message.

        The given message is remembered as the previous message for the next
        call, regardless of the result.

        Args:
            message: The current message.

        Returns:
            `first_is_true` if this is the first message, otherwise the result of
            calling the predicate with the previous message and `message`.
        """

        def is_message(
            value: ChannelMessageT | _Sentinel,
        ) -> TypeGuard[ChannelMessageT]:
            return value is not _SENTINEL

        old_message = self._last_message
        self._last_message = message
        if is_message(old_message):
            return self._predicate(old_message, message)
        return self._first_is_true

    def __str__(self) -> str:
        """Return a short string representation of this instance.

        Returns:
            The class name followed by the predicate's `__name__`, or by the
            predicate's `repr()` when it has no `__name__` (for example
            `functools.partial` objects).
        """
        # Not every callable has a `__name__`, so don't assume it is there.
        predicate_name = getattr(self._predicate, "__name__", None)
        if predicate_name is None:
            predicate_name = repr(self._predicate)
        return f"{type(self).__name__}:{predicate_name}"

    def __repr__(self) -> str:
        """Return a string representation of this instance."""
        return f"<{type(self).__name__}: {self._predicate!r} first_is_true={self._first_is_true!r}>"
Functions¤
__call__ ¤
__call__(message: ChannelMessageT) -> bool

Return first_is_true if message is the first one received, otherwise the result of the predicate for the previous message and message.

Source code in frequenz/channels/experimental/_with_previous.py
def __call__(self, message: ChannelMessageT) -> bool:
    """Return whether `message` should be received, given the previous message.

    For the first message seen there is no previous message, so the configured
    `first_is_true` value is returned. For every later message the result is
    whatever the predicate returns when called with the previous message and
    `message`. In both cases `message` is stored as the new previous message,
    and this happens before the predicate is invoked.

    Args:
        message: The message to evaluate.

    Returns:
        The predicate's verdict for `(previous, message)`, or `first_is_true`
        when no previous message exists.
    """

    def _has_previous(
        candidate: ChannelMessageT | _Sentinel,
    ) -> TypeGuard[ChannelMessageT]:
        # Identity check against the sentinel; the TypeGuard lets type checkers
        # treat `candidate` as a real message in the positive branch.
        return candidate is not _SENTINEL

    previous = self._last_message
    # Remember the current message first, so it becomes the "previous" one for
    # the next call regardless of what the predicate does.
    self._last_message = message
    return (
        self._predicate(previous, message)
        if _has_previous(previous)
        else self._first_is_true
    )
__init__ ¤
__init__(
    predicate: Callable[
        [ChannelMessageT, ChannelMessageT], bool
    ],
    /,
    *,
    first_is_true: bool = True,
) -> None

Initialize this instance.

PARAMETER DESCRIPTION
predicate

A callable that takes two arguments, the previous message and the current message, and returns a boolean indicating whether the current message should be received.

TYPE: Callable[[ChannelMessageT, ChannelMessageT], bool]

first_is_true

Whether the first message should be considered as satisfying the predicate. Defaults to True.

TYPE: bool DEFAULT: True

Source code in frequenz/channels/experimental/_with_previous.py
def __init__(
    self,
    predicate: Callable[[ChannelMessageT, ChannelMessageT], bool],
    /,
    *,
    first_is_true: bool = True,
) -> None:
    """Initialize this instance.

    Args:
        predicate: A callable invoked with the previous message and the current
            message, in that order. Its boolean result tells whether the
            current message should be received.
        first_is_true: The result to use for the very first message, for which
            there is no previous message to hand to the predicate. Defaults to
            `True`.
    """
    # Nothing has been seen yet: the sentinel marks the "no previous message"
    # state until the first call replaces it with a real message.
    self._last_message: ChannelMessageT | _Sentinel = _SENTINEL
    self._first_is_true = first_is_true
    self._predicate = predicate
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

Source code in frequenz/channels/experimental/_with_previous.py
def __repr__(self) -> str:
    """Return a string representation of this instance."""
    return f"<{type(self).__name__}: {self._predicate!r} first_is_true={self._first_is_true!r}>"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

Source code in frequenz/channels/experimental/_with_previous.py
def __str__(self) -> str:
    """Return a string representation of this instance."""
    return f"{type(self).__name__}:{self._predicate.__name__}"