Skip to content

experimental

frequenz.channels.experimental ¤

Experimental channel primitives.

Warning

This package contains experimental channel primitives that are not yet considered stable. They are subject to change without notice, including removal, even in minor updates.

Classes¤

frequenz.channels.experimental.Pipe ¤

Bases: Generic[ChannelMessageT]

A pipe between two channels.

The Pipe class takes a receiver and a sender and creates a pipe between them by forwarding all the messages received by the receiver to the sender.

Example
from frequenz.channels import Broadcast
from frequenz.channels.experimental import Pipe

channel1: Broadcast[int] = Broadcast(name="channel1")
channel2: Broadcast[int] = Broadcast(name="channel2")

receiver_chan1 = channel1.new_receiver()
sender_chan2 = channel2.new_sender()

async with Pipe(channel2.new_receiver(), channel1.new_sender()):
    await sender_chan2.send(10)
    assert await receiver_chan1.receive() == 10
Source code in frequenz/channels/experimental/_pipe.py
class Pipe(typing.Generic[ChannelMessageT]):
    """A pipe between two channels.

    The `Pipe` class takes a receiver and a sender and creates a pipe between them
    by forwarding all the messages received by the receiver to the sender.

    Forwarding is done by a background task, which is created by `start()` (or by
    entering the pipe as an async context manager) and cancelled by `stop()` (or
    by leaving the context).

    Example:
        ```python
        from frequenz.channels import Broadcast
        from frequenz.channels.experimental import Pipe

        channel1: Broadcast[int] = Broadcast(name="channel1")
        channel2: Broadcast[int] = Broadcast(name="channel2")

        receiver_chan1 = channel1.new_receiver()
        sender_chan2 = channel2.new_sender()

        async with Pipe(channel2.new_receiver(), channel1.new_sender()):
            await sender_chan2.send(10)
            assert await receiver_chan1.receive() == 10
        ```
    """

    def __init__(
        self, receiver: Receiver[ChannelMessageT], sender: Sender[ChannelMessageT]
    ) -> None:
        """Create a new pipe between two channels.

        The pipe is created stopped: nothing is forwarded until it is started.

        Args:
            receiver: The receiver to read messages from.
            sender: The sender to forward the received messages to.
        """
        self._sender = sender
        self._receiver = receiver
        # Background forwarding task; `None` until the pipe is first started.
        self._task: asyncio.Task[None] | None = None

    async def __aenter__(self) -> Pipe[ChannelMessageT]:
        """Enter the runtime context.

        Returns:
            This pipe, after starting it.
        """
        await self.start()
        return self

    async def __aexit__(
        self,
        _exc_type: typing.Type[BaseException] | None,
        _exc: BaseException | None,
        _tb: typing.Any,
    ) -> None:
        """Exit the runtime context, stopping the pipe.

        The exception information is ignored, so an exception raised inside the
        `async with` body is not suppressed.

        Args:
            _exc_type: The type of the exception raised in the context, or `None`
                if the context exited without an exception.
            _exc: The exception raised in the context, or `None`.
            _tb: The traceback of the exception, or `None`.
        """
        await self.stop()

    async def start(self) -> None:
        """Start this pipe if it is not already running.

        A new forwarding task is created if the pipe was never started or if the
        previous task already finished.
        """
        if not self._task or self._task.done():
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Stop this pipe.

        Cancels the forwarding task, if it is still running, and waits for it to
        finish. Does nothing if the pipe is not running.
        """
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # Expected: this is the cancellation requested just above.
                pass

    async def _run(self) -> None:
        """Forward each message from the receiver to the sender, in order."""
        async for value in self._receiver:
            await self._sender.send(value)
Functions¤
__aenter__ async ¤
__aenter__() -> Pipe[ChannelMessageT]

Enter the runtime context.

Source code in frequenz/channels/experimental/_pipe.py
async def __aenter__(self) -> Pipe[ChannelMessageT]:
    """Enter the runtime context.

    Starts the pipe and returns it, so it can be bound with `async with ... as`.
    """
    await self.start()
    return self
__aexit__ async ¤
__aexit__(
    _exc_type: Type[BaseException],
    _exc: BaseException,
    _tb: Any,
) -> None

Exit the runtime context.

Source code in frequenz/channels/experimental/_pipe.py
async def __aexit__(
    self,
    _exc_type: typing.Type[BaseException],
    _exc: BaseException,
    _tb: typing.Any,
) -> None:
    """Exit the runtime context."""
    await self.stop()
__init__ ¤
__init__(
    receiver: Receiver[ChannelMessageT],
    sender: Sender[ChannelMessageT],
) -> None

Create a new pipe between two channels.

PARAMETER DESCRIPTION
receiver

The receiver channel.

TYPE: Receiver[ChannelMessageT]

sender

The sender channel.

TYPE: Sender[ChannelMessageT]

Source code in frequenz/channels/experimental/_pipe.py
def __init__(
    self, receiver: Receiver[ChannelMessageT], sender: Sender[ChannelMessageT]
) -> None:
    """Set up a pipe that connects `receiver` to `sender`.

    Args:
        receiver: The receiver to read messages from.
        sender: The sender to forward the received messages to.
    """
    # No forwarding task exists until the pipe is started.
    self._task: asyncio.Task[None] | None = None
    self._receiver = receiver
    self._sender = sender
start async ¤
start() -> None

Start this pipe if it is not already running.

Source code in frequenz/channels/experimental/_pipe.py
async def start(self) -> None:
    """Launch the forwarding task unless one is still running."""
    current = self._task
    if current and not current.done():
        # Already running: keep the existing task.
        return
    self._task = asyncio.create_task(self._run())
stop async ¤
stop() -> None

Stop this pipe.

Source code in frequenz/channels/experimental/_pipe.py
async def stop(self) -> None:
    """Cancel the forwarding task, if running, and wait until it has finished."""
    task = self._task
    if not task or task.done():
        # Never started or already finished: nothing to stop.
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected: this is the cancellation requested just above.
        pass

frequenz.channels.experimental.RelaySender ¤

Bases: Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]

A Sender for sending messages to multiple senders.

The RelaySender class takes multiple senders and forwards all the messages sent to it, to the senders it was created with.

Example
from frequenz.channels import Broadcast
from frequenz.channels.experimental import RelaySender

channel1: Broadcast[int] = Broadcast(name="channel1")
channel2: Broadcast[int] = Broadcast(name="channel2")

receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender())

await tee_sender.send(5)
assert await receiver1.receive() == 5
assert await receiver2.receive() == 5
Source code in frequenz/channels/experimental/_relay_sender.py
class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]):
    """A sender that relays every message to a group of other senders.

    A `RelaySender` is created with any number of senders. Each message sent
    through it is sent, in turn, to every one of those senders, in the order
    they were given.

    Example:
        ```python
        from frequenz.channels import Broadcast
        from frequenz.channels.experimental import RelaySender

        channel1: Broadcast[int] = Broadcast(name="channel1")
        channel2: Broadcast[int] = Broadcast(name="channel2")

        receiver1 = channel1.new_receiver()
        receiver2 = channel2.new_receiver()

        tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender())

        await tee_sender.send(5)
        assert await receiver1.receive() == 5
        assert await receiver2.receive() == 5
        ```
    """

    def __init__(self, *senders: Sender[SenderMessageT_contra]) -> None:
        """Create a new RelaySender.

        Args:
            *senders: The senders that messages are relayed to.
        """
        self._senders = senders

    @override
    async def send(self, message: SenderMessageT_contra, /) -> None:
        """Relay a message to all the underlying senders.

        The senders are awaited one after the other, in construction order.

        Args:
            message: The message to be relayed.
        """
        for target in self._senders:
            await target.send(message)
Functions¤
__init__ ¤
__init__(*senders: Sender[SenderMessageT_contra]) -> None

Create a new RelaySender.

PARAMETER DESCRIPTION
*senders

The senders to send messages to.

TYPE: Sender[SenderMessageT_contra] DEFAULT: ()

Source code in frequenz/channels/experimental/_relay_sender.py
def __init__(self, *senders: Sender[SenderMessageT_contra]) -> None:
    """Create a new RelaySender.

    Args:
        *senders: The senders to send messages to. They are stored in the order
            given, which is the order `send()` uses.
    """
    self._senders = senders
send async ¤
send(message: SenderMessageT_contra) -> None

Send a message.

PARAMETER DESCRIPTION
message

The message to be sent.

TYPE: SenderMessageT_contra

Source code in frequenz/channels/experimental/_relay_sender.py
@override
async def send(self, message: SenderMessageT_contra, /) -> None:
    """Relay a message to all the underlying senders.

    The senders are awaited one after the other, in construction order.

    Args:
        message: The message to be relayed.
    """
    for target in self._senders:
        await target.send(message)

frequenz.channels.experimental.WithPrevious ¤

Bases: Generic[ChannelMessageT]

A composable predicate to build predicates that can also use the previous message.

This predicate can be used to filter messages based on a custom condition on the previous and current messages. This can be useful in cases where you want to process messages only if they satisfy a particular condition with respect to the previous message.

Receiving only messages that are different from the previous one.
from frequenz.channels import Broadcast
from frequenz.channels.experimental import WithPrevious

channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(WithPrevious(lambda old, new: old != new))
sender = channel.new_sender()

# This message will be received as it is the first message.
await sender.send(1)
assert await receiver.receive() == 1

# This message will be skipped as it is equal to the previous one.
await sender.send(1)

# This message will be received as it is different from the previous one.
await sender.send(0)
assert await receiver.receive() == 0
Receiving only messages if they are bigger than the previous one.
from frequenz.channels import Broadcast
from frequenz.channels.experimental import WithPrevious

channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(
    WithPrevious(lambda old, new: new > old, first_is_true=False)
)
sender = channel.new_sender()

# This message will be skipped as first_is_true is False.
await sender.send(1)

# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2

# This message will be skipped as it is smaller than the previous one (2).
await sender.send(0)

# This message will be skipped as it is not bigger than the previous one (0).
await sender.send(0)

# This message will be received as it is bigger than the previous one (0).
await sender.send(1)
assert await receiver.receive() == 1

# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2

Source code in frequenz/channels/experimental/_with_previous.py
class WithPrevious(Generic[ChannelMessageT]):
    """A composable predicate to build predicates that can also use the previous message.

    This predicate can be used to filter messages based on a custom condition on the
    previous and current messages. This can be useful in cases where you want to
    process messages only if they satisfy a particular condition with respect to the
    previous message.

    Every message passed to this predicate becomes the "previous" message for the
    next call, whether or not it satisfied the condition.

    Example: Receiving only messages that are different from the previous one.
        ```python
        from frequenz.channels import Broadcast
        from frequenz.channels.experimental import WithPrevious

        channel = Broadcast[int](name="example")
        receiver = channel.new_receiver().filter(WithPrevious(lambda old, new: old != new))
        sender = channel.new_sender()

        # This message will be received as it is the first message.
        await sender.send(1)
        assert await receiver.receive() == 1

        # This message will be skipped as it is equal to the previous one.
        await sender.send(1)

        # This message will be received as it is different from the previous one.
        await sender.send(0)
        assert await receiver.receive() == 0
        ```

    Example: Receiving only messages if they are bigger than the previous one.
        ```python
        from frequenz.channels import Broadcast
        from frequenz.channels.experimental import WithPrevious

        channel = Broadcast[int](name="example")
        receiver = channel.new_receiver().filter(
            WithPrevious(lambda old, new: new > old, first_is_true=False)
        )
        sender = channel.new_sender()

        # This message will be skipped as first_is_true is False.
        await sender.send(1)

        # This message will be received as it is bigger than the previous one (1).
        await sender.send(2)
        assert await receiver.receive() == 2

        # This message will be skipped as it is smaller than the previous one (2).
        await sender.send(0)

        # This message will be skipped as it is not bigger than the previous one (0).
        await sender.send(0)

        # This message will be received as it is bigger than the previous one (0).
        await sender.send(1)
        assert await receiver.receive() == 1

        # This message will be received as it is bigger than the previous one (1).
        await sender.send(2)
        assert await receiver.receive() == 2
        ```
    """

    def __init__(
        self,
        predicate: Callable[[ChannelMessageT, ChannelMessageT], bool],
        /,
        *,
        first_is_true: bool = True,
    ) -> None:
        """Initialize this instance.

        Args:
            predicate: A callable that takes two arguments, the previous message and the
                current message, and returns a boolean indicating whether the current
                message should be received.
            first_is_true: Whether the first message should be considered as satisfying
                the predicate. Defaults to `True`.
        """
        self._predicate = predicate
        # The sentinel marks "no message seen yet", so that any value (including
        # `None`) can be a legitimate previous message.
        self._last_message: ChannelMessageT | _Sentinel = _SENTINEL
        self._first_is_true = first_is_true

    def __call__(self, message: ChannelMessageT) -> bool:
        """Return whether `message` satisfies the predicate given the previous message.

        The message is always stored as the new previous message, even when the
        result is `False`.

        Args:
            message: The current message.

        Returns:
            The result of calling the predicate with the previous message and
            `message`, or `first_is_true` if this is the first message seen.
        """

        def is_message(
            value: ChannelMessageT | _Sentinel,
        ) -> TypeGuard[ChannelMessageT]:
            return value is not _SENTINEL

        old_message = self._last_message
        self._last_message = message
        if is_message(old_message):
            return self._predicate(old_message, message)
        return self._first_is_true

    def __str__(self) -> str:
        """Return a string representation of this instance.

        The predicate is shown by its `__name__`; callables without one (for
        example `functools.partial` objects) are shown by their `repr()` instead.
        """
        predicate_name = getattr(self._predicate, "__name__", None)
        if predicate_name is None:
            predicate_name = repr(self._predicate)
        return f"{type(self).__name__}:{predicate_name}"

    def __repr__(self) -> str:
        """Return a string representation of this instance."""
        return f"<{type(self).__name__}: {self._predicate!r} first_is_true={self._first_is_true!r}>"
Functions¤
__call__ ¤
__call__(message: ChannelMessageT) -> bool

Return whether message satisfies the predicate given the previous message (or first_is_true if it is the first message received).

Source code in frequenz/channels/experimental/_with_previous.py
def __call__(self, message: ChannelMessageT) -> bool:
    """Return whether `message` satisfies the predicate given the previous message.

    The message is always stored as the new previous message, even when the
    result is `False`.

    Args:
        message: The current message.

    Returns:
        The result of calling the predicate with the previous message and
        `message`, or `first_is_true` if this is the first message seen.
    """

    # Type guard so the type checker knows the stored value is a real message
    # and not the "nothing seen yet" sentinel.
    def is_message(
        value: ChannelMessageT | _Sentinel,
    ) -> TypeGuard[ChannelMessageT]:
        return value is not _SENTINEL

    # Update the stored message before evaluating, so it is recorded even if
    # the predicate returns False.
    old_message = self._last_message
    self._last_message = message
    if is_message(old_message):
        return self._predicate(old_message, message)
    return self._first_is_true
__init__ ¤
__init__(
    predicate: Callable[
        [ChannelMessageT, ChannelMessageT], bool
    ],
    /,
    *,
    first_is_true: bool = True,
) -> None

Initialize this instance.

PARAMETER DESCRIPTION
predicate

A callable that takes two arguments, the previous message and the current message, and returns a boolean indicating whether the current message should be received.

TYPE: Callable[[ChannelMessageT, ChannelMessageT], bool]

first_is_true

Whether the first message should be considered as satisfying the predicate. Defaults to True.

TYPE: bool DEFAULT: True

Source code in frequenz/channels/experimental/_with_previous.py
def __init__(
    self,
    predicate: Callable[[ChannelMessageT, ChannelMessageT], bool],
    /,
    *,
    first_is_true: bool = True,
) -> None:
    """Set up the predicate wrapper.

    Args:
        predicate: A callable taking the previous and the current message, in that
            order, and returning whether the current message should be received.
        first_is_true: The result to use for the first message, for which there
            is no previous message to compare with. Defaults to `True`.
    """
    self._predicate = predicate
    self._first_is_true = first_is_true
    # The sentinel marks "no message seen yet".
    self._last_message: ChannelMessageT | _Sentinel = _SENTINEL
__repr__ ¤
__repr__() -> str

Return a string representation of this instance.

Source code in frequenz/channels/experimental/_with_previous.py
def __repr__(self) -> str:
    """Return a string representation of this instance.

    It includes the class name, the `repr()` of the predicate and the value of
    `first_is_true`.
    """
    return f"<{type(self).__name__}: {self._predicate!r} first_is_true={self._first_is_true!r}>"
__str__ ¤
__str__() -> str

Return a string representation of this instance.

Source code in frequenz/channels/experimental/_with_previous.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    The predicate is shown by its `__name__`; callables without one (for
    example `functools.partial` objects) are shown by their `repr()` instead.
    """
    predicate_name = getattr(self._predicate, "__name__", None)
    if predicate_name is None:
        predicate_name = repr(self._predicate)
    return f"{type(self).__name__}:{predicate_name}"