frequenz.channels

Frequenz Channels.

This package contains channel implementations.

Base classes:

  • Receiver: An object that can wait for and consume messages from a channel.

  • Sender: An object that can send messages to a channel.

Channels:

  • Anycast: A channel that supports multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.

  • Broadcast: A channel to broadcast messages from multiple senders to multiple receivers. Each message sent through any of the senders is received by all of the receivers.

Utilities to work with channels:

  • merge: Merge messages coming from multiple receivers into a single stream.

  • select: Iterate over the messages of all receivers as new messages become available.

Exception classes:

Extra utility receivers:

  • Event: A receiver that generates a message when an event is set.

  • FileWatcher: A receiver that generates a message when a file is added, modified or deleted.

  • Timer: A receiver that generates a message after a given amount of time.

Attributes

frequenz.channels.ChannelMessageT module-attribute

ChannelMessageT = TypeVar('ChannelMessageT')

The type of the message that can be sent across a channel.

frequenz.channels.ErroredChannelT_co module-attribute

ErroredChannelT_co = TypeVar(
    "ErroredChannelT_co", covariant=True
)

The type of channel having an error.

frequenz.channels.MappedMessageT_co module-attribute

MappedMessageT_co = TypeVar(
    "MappedMessageT_co", covariant=True
)

The type of the message received by the receiver after being mapped.

frequenz.channels.ReceiverMessageT_co module-attribute

ReceiverMessageT_co = TypeVar(
    "ReceiverMessageT_co", covariant=True
)

The type of the message received by a receiver.

frequenz.channels.SenderMessageT_co module-attribute

SenderMessageT_co = TypeVar(
    "SenderMessageT_co", covariant=True
)

The type of the message sent by a sender.

frequenz.channels.SenderMessageT_contra module-attribute

SenderMessageT_contra = TypeVar(
    "SenderMessageT_contra", contravariant=True
)

The type of the message sent by a sender.
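The `_co` and `_contra` suffixes above mark covariant and contravariant type variables. The following sketch illustrates the general typing rule behind that distinction. It assumes two hypothetical protocols, `Producer` and `Consumer`, which are not part of this package: an endpoint that only yields messages can be covariant, while an endpoint that only accepts messages can be contravariant.

```python
from typing import Protocol, TypeVar

ItemT_co = TypeVar("ItemT_co", covariant=True)
ItemT_contra = TypeVar("ItemT_contra", contravariant=True)


class Producer(Protocol[ItemT_co]):
    """Hypothetical read-only endpoint: it only returns items."""

    def get(self) -> ItemT_co: ...


class Consumer(Protocol[ItemT_contra]):
    """Hypothetical write-only endpoint: it only accepts items."""

    def put(self, item: ItemT_contra) -> None: ...


class IntProducer:
    def get(self) -> int:
        return 42


class ObjectConsumer:
    def __init__(self) -> None:
        self.items: list[object] = []

    def put(self, item: object) -> None:
        self.items.append(item)


def read_any(producer: Producer[object]) -> object:
    return producer.get()


def write_int(consumer: Consumer[int]) -> None:
    consumer.put(1)


# Covariance: a producer of int is accepted where a producer of object is expected.
value = read_any(IntProducer())

# Contravariance: a consumer of object is accepted where a consumer of int is expected.
sink = ObjectConsumer()
write_int(sink)
```

A static type checker accepts both calls because of the declared variance; at runtime the annotations have no effect.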

Classes

frequenz.channels.Anycast

Bases: Generic[ChannelMessageT]

A channel that delivers each message to exactly one receiver.

Description

Tip

Anycast channels behave like Go channels.

Anycast channels support multiple senders and multiple receivers. Each message sent through any of the senders will be received by exactly one receiver (but any receiver).

[Diagram: two senders send msg1 and msg2 into the channel; one receiver gets msg1 and the other gets msg2.]

Characteristics

  • Buffered: Yes, with a global channel buffer
  • Buffer full policy: Block senders
  • Multiple receivers: Yes
  • Multiple senders: Yes
  • Thread-safe: No

This channel is buffered, and if the senders are faster than the receivers, then the channel's buffer will fill up. In that case, the senders will block at the send() method until the receivers consume the messages in the channel's buffer. The channel's buffer size can be configured at creation time via the limit argument.
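The blocking behavior described above can be sketched with the standard library alone. This is an analogy, not the channel's implementation: a bounded `asyncio.Queue` with `maxsize=2` stands in for a channel created with `limit=2`, and the names `produce`, `consume`, and `events` are made up for the illustration.

```python
import asyncio


async def main() -> list[str]:
    events: list[str] = []
    # Stand-in for a channel with limit=2 (assumption: not the real channel class).
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)

    async def produce() -> None:
        for number in range(4):
            await queue.put(number)  # suspends here while the buffer is full
            events.append(f"sent {number}")

    async def consume() -> None:
        for _ in range(4):
            await asyncio.sleep(0.01)  # a deliberately slow consumer
            number = await queue.get()
            events.append(f"received {number}")

    await asyncio.gather(produce(), consume())
    return events


events = asyncio.run(main())
print(events)
```

The producer records two sends immediately, then makes progress only after the consumer has taken a message out of the full buffer.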

The first receiver that is awaited will get the next message. When multiple receivers are waiting, the asyncio loop scheduler picks a receiver for each next message.

This means that, in practice, there might be only one receiver receiving all the messages, depending on how tasks are scheduled.

If you need to ensure some delivery policy (like round-robin or uniformly random), then you will have to implement it yourself.
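One way to implement such a policy is a small dispatcher task that reads from a single source and hands each message to the next worker in turn. The sketch below is a minimal round-robin example under stated assumptions: plain `asyncio.Queue` objects stand in for the source receiver and for per-worker channels, and `dispatch_round_robin` is a hypothetical helper, not a function of this package.

```python
import asyncio
import itertools


async def dispatch_round_robin(
    source: asyncio.Queue, workers: list[asyncio.Queue], count: int
) -> None:
    """Forward `count` messages from `source` to `workers`, one worker at a time."""
    targets = itertools.cycle(workers)
    for _ in range(count):
        message = await source.get()
        await next(targets).put(message)


async def main() -> list[list[int]]:
    source: asyncio.Queue = asyncio.Queue()
    workers: list[asyncio.Queue] = [asyncio.Queue(), asyncio.Queue()]
    for number in range(6):
        source.put_nowait(number)

    await dispatch_round_robin(source, workers, count=6)

    # Collect what each worker queue ended up with.
    return [[queue.get_nowait() for _ in range(queue.qsize())] for queue in workers]


assignments = asyncio.run(main())
print(assignments)  # [[0, 2, 4], [1, 3, 5]]
```

With real channels, the dispatcher would presumably hold one receiver for the shared input and one sender per worker; that mapping is an assumption and is not shown here.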

To create new senders and receivers, use the new_sender() and new_receiver() methods, respectively.

When the channel is not needed anymore, it should be closed with the close() method. This will prevent further attempts to send() data. Receivers will still be able to drain the pending messages on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError exception.
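The close-then-drain rule can be illustrated with a toy, synchronous model. Everything in this sketch is hypothetical: `ToyChannel` and `StoppedError` are stand-ins written for this illustration, the real channel is asynchronous, and raising `RuntimeError` on a send after close is an assumption made only to keep the example short.

```python
from collections import deque


class StoppedError(Exception):
    """Hypothetical stand-in for ReceiverStoppedError."""


class ToyChannel:
    """Hypothetical, synchronous model of the close-then-drain rule."""

    def __init__(self) -> None:
        self._buffer: deque[int] = deque()
        self._closed = False

    def close(self) -> None:
        self._closed = True

    def send(self, message: int) -> None:
        if self._closed:
            raise RuntimeError("channel is closed")  # assumption: error type
        self._buffer.append(message)

    def receive(self) -> int:
        if self._buffer:
            return self._buffer.popleft()  # pending messages stay readable
        if self._closed:
            raise StoppedError("channel is closed and drained")
        raise LookupError("no message yet; a real receiver would wait here")


channel = ToyChannel()
for number in range(3):
    channel.send(number)
channel.close()

drained: list[int] = []
stopped = False
try:
    while True:
        drained.append(channel.receive())
except StoppedError:
    stopped = True

print(drained, stopped)
```

The three messages sent before `close()` are still delivered; only the receive attempt after the buffer is empty raises.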

This channel is useful, for example, to distribute work across multiple workers.

In cases where each message needs to be received by every receiver, a broadcast channel may be used.

Examples
Send a few numbers to a receiver

This is a very simple example that sends a few numbers from a single sender to a single receiver.

import asyncio

from frequenz.channels import Anycast, Sender


async def send(sender: Sender[int]) -> None:
    for message in range(3):
        print(f"sending {message}")
        await sender.send(message)


async def main() -> None:
    channel = Anycast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            message = await receiver.receive()
            print(f"received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())

The output should look something like this (although the sent and received messages might appear more interleaved):

sending 0
sending 1
sending 2
received 0
received 1
received 2
Send a few numbers from multiple senders to multiple receivers

This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.

import asyncio

from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for message in range(start, stop):
        print(f"{name} sending {message}")
        await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for message in receiver:
            print(f"{name} received {message}")
        await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Anycast[int](name="numbers", limit=2)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())

The output should look something like this (although the sent and received messages might appear interleaved in a different way):

sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
Source code in frequenz/channels/_anycast.py
class Anycast(Generic[ChannelMessageT]):
    """A channel that delivers each message to exactly one receiver.

    # Description

    !!! Tip inline end

        [Anycast][frequenz.channels.Anycast] channels behave like the
        [Golang](https://golang.org/) [channels](https://go.dev/ref/spec#Channel_types).

    [Anycast][frequenz.channels.Anycast] channels support multiple
    [senders][frequenz.channels.Sender] and multiple
    [receivers][frequenz.channels.Receiver]. Each message sent through any of the
    senders will be received by exactly one receiver (but **any** receiver).

    <center>
    ```bob
    .---------. msg1                           msg1  .-----------.
    | Sender  +------.                       .------>| Receiver  |
    '---------'      |      .----------.     |       '-----------'
                     +----->| Channel  +-----+
    .---------.      |      '----------'     |       .-----------.
    | Sender  +------'                       '------>| Receiver  |
    '---------' msg2                           msg2  '-----------'
    ```
    </center>

    !!! Note inline end "Characteristics"

        * **Buffered:** Yes, with a global channel buffer
        * **Buffer full policy:** Block senders
        * **Multiple receivers:** Yes
        * **Multiple senders:** Yes
        * **Thread-safe:** No

    This channel is buffered, and if the senders are faster than the receivers, then the
    channel's buffer will fill up. In that case, the senders will block at the
    [`send()`][frequenz.channels.Sender.send] method until the receivers consume the
    messages in the channel's buffer. The channel's buffer size can be configured at
    creation time via the `limit` argument.

    The first receiver that is awaited will get the next message. When multiple
    receivers are waiting, the [asyncio][] loop scheduler picks a receiver for each next
    message.

    This means that, in practice, there might be only one receiver receiving all the
    messages, depending on how tasks are scheduled.

    If you need to ensure some delivery policy (like round-robin or uniformly random),
    then you will have to implement it yourself.

    To create new [senders][frequenz.channels.Sender] and
    [receivers][frequenz.channels.Receiver] you can use the
    [`new_sender()`][frequenz.channels.Anycast.new_sender] and
    [`new_receiver()`][frequenz.channels.Anycast.new_receiver] methods
    respectively.

    When the channel is not needed anymore, it should be closed with the
    [`close()`][frequenz.channels.Anycast.close] method. This will prevent further
    attempts to [`send()`][frequenz.channels.Sender.send] data. Receivers will still be
    able to drain the pending messages on the channel, but after that, subsequent
    [`receive()`][frequenz.channels.Receiver.receive] calls will raise a
    [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception.

    This channel is useful, for example, to distribute work across multiple workers.

    In cases where each message needs to be received by every receiver, a
    [broadcast][frequenz.channels.Broadcast] channel may be used.

    # Examples

    Example: Send a few numbers to a receiver
        This is a very simple example that sends a few numbers from a single sender to
        a single receiver.

        ```python
        import asyncio

        from frequenz.channels import Anycast, Sender


        async def send(sender: Sender[int]) -> None:
            for message in range(3):
                print(f"sending {message}")
                await sender.send(message)


        async def main() -> None:
            channel = Anycast[int](name="numbers")

            sender = channel.new_sender()
            receiver = channel.new_receiver()

            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(send(sender))
                for _ in range(3):
                    message = await receiver.receive()
                    print(f"received {message}")
                    await asyncio.sleep(0.1)  # sleep (or work) with the data


        asyncio.run(main())
        ```

        The output should look something like this (although the sent and received
        messages might appear more interleaved):

        ```
        sending 0
        sending 1
        sending 2
        received 0
        received 1
        received 2
        ```

    Example: Send a few numbers from multiple senders to multiple receivers
        This is a more complex example that sends a few numbers from multiple senders to
        multiple receivers, using a small buffer to force the senders to block.

        ```python
        import asyncio

        from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


        async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
            for message in range(start, stop):
                print(f"{name} sending {message}")
                await sender.send(message)


        async def recv(name: str, receiver: Receiver[int]) -> None:
            try:
                async for message in receiver:
                    print(f"{name} received {message}")
                await asyncio.sleep(0.1)  # sleep (or work) with the data
            except ReceiverStoppedError:
                pass


        async def main() -> None:
            acast = Anycast[int](name="numbers", limit=2)

            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
                task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
                task_group.create_task(recv("receiver_1", acast.new_receiver()))
                task_group.create_task(recv("receiver_2", acast.new_receiver()))


        asyncio.run(main())
        ```

        The output should look something like this (although the sent and received
        messages might appear interleaved in a different way):

        ```
        sender_1 sending 10
        sender_1 sending 11
        sender_1 sending 12
        Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
        consumes a message
        sender_2 sending 20
        Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
        consumes a message
        receiver_1 received 10
        receiver_1 received 11
        sender_2 sending 21
        Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
        consumes a message
        receiver_1 received 12
        receiver_1 received 20
        receiver_1 received 21
        ```
    """

    def __init__(self, *, name: str, limit: int = 10) -> None:
        """Initialize this channel.

        Args:
            name: The name of the channel. This is for logging purposes, and it will be
                shown in the string representation of the channel.
            limit: The size of the internal buffer in number of messages.  If the buffer
                is full, then the senders will block until the receivers consume the
                messages in the buffer.
        """
        self._name: str = name
        """The name of the channel.

        This is for logging purposes, and it will be shown in the string representation
        of the channel.
        """

        self._deque: deque[ChannelMessageT] = deque(maxlen=limit)
        """The channel's buffer."""

        self._send_cv: Condition = Condition()
        """The condition to wait for free space in the channel's buffer.

        If the channel's buffer is full, then the sender waits for messages to
        get consumed using this condition until there's some free space
        available in the channel's buffer.
        """

        self._recv_cv: Condition = Condition()
        """The condition to wait for messages in the channel's buffer.

        If the channel's buffer is empty, then the receiver waits for messages
        using this condition until there's a message available in the channel's
        buffer.
        """

        self._closed: bool = False
        """Whether the channel is closed."""

    @property
    def name(self) -> str:
        """The name of this channel.

        This is for debugging purposes; it will be shown in the string representation
        of this channel.
        """
        return self._name

    @property
    def is_closed(self) -> bool:
        """Whether this channel is closed.

        Any further attempts to use this channel after it is closed will result in an
        exception.
        """
        return self._closed

    @property
    def limit(self) -> int:
        """The maximum number of messages that can be stored in the channel's buffer.

        If the length of the channel's buffer reaches the limit, then the sender
        blocks at the [send()][frequenz.channels.Sender.send] method until
        a message is consumed.
        """
        maxlen = self._deque.maxlen
        assert maxlen is not None
        return maxlen

    async def close(self) -> None:
        """Close the channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will fail.

        Receivers will still be able to drain the pending messages on the channel,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will raise a
        [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception.
        """
        self._closed = True
        async with self._send_cv:
            self._send_cv.notify_all()
        async with self._recv_cv:
            self._recv_cv.notify_all()

    def new_sender(self) -> Sender[ChannelMessageT]:
        """Return a new sender attached to this channel."""
        return _Sender(self)

    def new_receiver(self) -> Receiver[ChannelMessageT]:
        """Return a new receiver attached to this channel."""
        return _Receiver(self)

    def __str__(self) -> str:
        """Return a string representation of this channel."""
        return f"{type(self).__name__}:{self._name}"

    def __repr__(self) -> str:
        """Return a string representation of this channel."""
        return (
            f"{type(self).__name__}(name={self._name!r}, limit={self.limit!r}):<"
            f"current={len(self._deque)!r}, closed={self._closed!r}>"
        )
Attributes
is_closed property
is_closed: bool

Whether this channel is closed.

Any further attempts to use this channel after it is closed will result in an exception.

limit property
limit: int

The maximum number of messages that can be stored in the channel's buffer.

If the length of the channel's buffer reaches the limit, then the sender blocks at the send() method until a message is consumed.

name property
name: str

The name of this channel.

This is for debugging purposes; it will be shown in the string representation of this channel.

Functions
__init__
__init__(*, name: str, limit: int = 10) -> None

Initialize this channel.

PARAMETER DESCRIPTION
name

The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel.

TYPE: str

limit

The size of the internal buffer in number of messages. If the buffer is full, then the senders will block until the receivers consume the messages in the buffer.

TYPE: int DEFAULT: 10

Source code in frequenz/channels/_anycast.py
def __init__(self, *, name: str, limit: int = 10) -> None:
    """Initialize this channel.

    Args:
        name: The name of the channel. This is for logging purposes, and it will be
            shown in the string representation of the channel.
        limit: The size of the internal buffer in number of messages.  If the buffer
            is full, then the senders will block until the receivers consume the
            messages in the buffer.
    """
    self._name: str = name
    """The name of the channel.

    This is for logging purposes, and it will be shown in the string representation
    of the channel.
    """

    self._deque: deque[ChannelMessageT] = deque(maxlen=limit)
    """The channel's buffer."""

    self._send_cv: Condition = Condition()
    """The condition to wait for free space in the channel's buffer.

    If the channel's buffer is full, then the sender waits for messages to
    get consumed using this condition until there's some free space
    available in the channel's buffer.
    """

    self._recv_cv: Condition = Condition()
    """The condition to wait for messages in the channel's buffer.

    If the channel's buffer is empty, then the receiver waits for messages
    using this condition until there's a message available in the channel's
    buffer.
    """

    self._closed: bool = False
    """Whether the channel is closed."""
__repr__
__repr__() -> str

Return a string representation of this channel.

Source code in frequenz/channels/_anycast.py
def __repr__(self) -> str:
    """Return a string representation of this channel."""
    return (
        f"{type(self).__name__}(name={self._name!r}, limit={self.limit!r}):<"
        f"current={len(self._deque)!r}, closed={self._closed!r}>"
    )
__str__
__str__() -> str

Return a string representation of this channel.

Source code in frequenz/channels/_anycast.py
def __str__(self) -> str:
    """Return a string representation of this channel."""
    return f"{type(self).__name__}:{self._name}"
close async
close() -> None

Close the channel.

Any further attempts to send() data will fail.

Receivers will still be able to drain the pending messages on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError exception.

Source code in frequenz/channels/_anycast.py
async def close(self) -> None:
    """Close the channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will fail.

    Receivers will still be able to drain the pending messages on the channel,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will raise a
    [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception.
    """
    self._closed = True
    async with self._send_cv:
        self._send_cv.notify_all()
    async with self._recv_cv:
        self._recv_cv.notify_all()
new_receiver
new_receiver() -> Receiver[ChannelMessageT]

Return a new receiver attached to this channel.

Source code in frequenz/channels/_anycast.py
def new_receiver(self) -> Receiver[ChannelMessageT]:
    """Return a new receiver attached to this channel."""
    return _Receiver(self)
new_sender
new_sender() -> Sender[ChannelMessageT]

Return a new sender attached to this channel.

Source code in frequenz/channels/_anycast.py
def new_sender(self) -> Sender[ChannelMessageT]:
    """Return a new sender attached to this channel."""
    return _Sender(self)

frequenz.channels.Broadcast

Bases: Generic[ChannelMessageT]

A channel that delivers all messages to all receivers.

Description

Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders will be received by all receivers.

[Diagram: two senders send msg1 and msg2 into the channel; both receivers receive msg1 and msg2.]

Characteristics

  • Buffered: Yes, with one buffer per receiver
  • Buffer full policy: Drop oldest message
  • Multiple receivers: Yes
  • Multiple senders: Yes
  • Thread-safe: No

This channel is buffered, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped.

Each receiver has its own buffer, so messages will only be dropped for receivers that can't keep up with the senders, and not for the whole channel.
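The drop-oldest policy with one buffer per receiver can be sketched with `collections.deque`, whose `maxlen` argument discards the oldest item when a full deque is appended to. This is only an illustration of the policy described above, not the channel's internals; the buffer size of 3 and the names `fast` and `slow` are assumptions made for the example.

```python
from collections import deque

# One bounded buffer per receiver (assumption: size 3 for the illustration).
fast: deque[int] = deque(maxlen=3)
slow: deque[int] = deque(maxlen=3)

received_by_fast: list[int] = []
for message in range(5):
    # The "channel" copies every message into every receiver's buffer.
    for buffer in (fast, slow):
        buffer.append(message)  # a full deque silently drops its oldest item
    # The fast receiver consumes right away, so it never loses anything.
    received_by_fast.append(fast.popleft())

# The slow receiver never consumed, so it only holds the 3 newest messages.
print(received_by_fast)  # [0, 1, 2, 3, 4]
print(list(slow))  # [2, 3, 4]
```

Only the receiver that fell behind lost messages 0 and 1; the other receiver saw all five.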

To create new senders and receivers, use the new_sender() and new_receiver() methods, respectively.

When a channel is not needed anymore, it should be closed with close(). This will prevent further attempts to send() data, and will allow receivers to drain the pending items on their queues, but after that, subsequent receive() calls will raise a ReceiverStoppedError.

This channel is useful, for example, to implement a pub/sub pattern, where multiple receivers can subscribe to a channel to receive all messages.

In cases where each message needs to be delivered only to one receiver, an anycast channel may be used.

Examples
Send a few numbers to a receiver

This is a very simple example that sends a few numbers from a single sender to a single receiver.

import asyncio

from frequenz.channels import Broadcast, Sender


async def send(sender: Sender[int]) -> None:
    for message in range(3):
        print(f"sending {message}")
        await sender.send(message)


async def main() -> None:
    channel = Broadcast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            message = await receiver.receive()
            print(f"received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())

The output should look something like this (although the sent and received messages might appear more interleaved):

sending 0
sending 1
sending 2
received 0
received 1
received 2
Send a few numbers from multiple senders to multiple receivers

This is a more complex example that sends a few numbers from multiple senders to multiple receivers.

import asyncio

from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for message in range(start, stop):
        print(f"{name} sending {message}")
        await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for message in receiver:
            print(f"{name} received {message}")
        await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    bcast = Broadcast[int](name="numbers")

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", bcast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", bcast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", bcast.new_receiver()))
        task_group.create_task(recv("receiver_2", bcast.new_receiver()))


asyncio.run(main())

The output should look something like this (although the sent and received messages might appear interleaved in a different way):

sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
sender_2 sending 20
sender_2 sending 21
receiver_1 received 10
receiver_1 received 11
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
receiver_2 received 10
receiver_2 received 11
receiver_2 received 12
receiver_2 received 20
receiver_2 received 21
Source code in frequenz/channels/_broadcast.py
class Broadcast(Generic[ChannelMessageT]):
    """A channel that delivers all messages to all receivers.

    # Description

    [Broadcast][frequenz.channels.Broadcast] channels can have multiple
    [senders][frequenz.channels.Sender] and multiple
    [receivers][frequenz.channels.Receiver]. Each message sent through any of the
    senders will be received by all receivers.

    <center>
    ```bob
    .---------. msg1                           msg1,msg2  .-----------.
    | Sender  +------.                        .---------->| Receiver  |
    '---------'      |      .----------.     |            '-----------'
                     +----->| Channel  +-----+
    .---------.      |      '----------'     |            .-----------.
    | Sender  +------'                       '----------->| Receiver  |
    '---------' msg2                           msg1,msg2  '-----------'
    ```
    </center>

    !!! Note inline end "Characteristics"

        * **Buffered:** Yes, with one buffer per receiver
        * **Buffer full policy:** Drop oldest message
        * **Multiple receivers:** Yes
        * **Multiple senders:** Yes
        * **Thread-safe:** No

    This channel is buffered, and when messages are not being consumed fast
    enough and the buffer fills up, old messages will get dropped.

    Each receiver has its own buffer, so messages will only be dropped for
    receivers that can't keep up with the senders, and not for the whole
    channel.

    To create new [senders][frequenz.channels.Sender] and
    [receivers][frequenz.channels.Receiver] you can use the
    [`new_sender()`][frequenz.channels.Broadcast.new_sender] and
    [`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods
    respectively.

    When a channel is not needed anymore, it should be closed with
    [`close()`][frequenz.channels.Broadcast.close]. This will prevent further
    attempts to [`send()`][frequenz.channels.Sender.send] data, and will allow
    receivers to drain the pending items on their queues, but after that,
    subsequent [receive()][frequenz.channels.Receiver.receive] calls will
    raise a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].

    This channel is useful, for example, to implement a pub/sub pattern, where
    multiple receivers can subscribe to a channel to receive all messages.

    In cases where each message needs to be delivered only to one receiver, an
    [anycast][frequenz.channels.Anycast] channel may be used.

    # Examples

    Example: Send a few numbers to a receiver
        This is a very simple example that sends a few numbers from a single sender to
        a single receiver.

        ```python
        import asyncio

        from frequenz.channels import Broadcast, Sender


        async def send(sender: Sender[int]) -> None:
            for message in range(3):
                print(f"sending {message}")
                await sender.send(message)


        async def main() -> None:
            channel = Broadcast[int](name="numbers")

            sender = channel.new_sender()
            receiver = channel.new_receiver()

            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(send(sender))
                for _ in range(3):
                    message = await receiver.receive()
                    print(f"received {message}")
                    await asyncio.sleep(0.1)  # sleep (or work) with the data


        asyncio.run(main())
        ```

        The output should look something like this (although the sent and received
        messages might appear more interleaved):

        ```
        sending 0
        sending 1
        sending 2
        received 0
        received 1
        received 2
        ```

    Example: Send a few numbers from multiple senders to multiple receivers
        This is a more complex example that sends a few numbers from multiple senders to
        multiple receivers.

        ```python
        import asyncio

        from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender


        async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
            for message in range(start, stop):
                print(f"{name} sending {message}")
                await sender.send(message)


        async def recv(name: str, receiver: Receiver[int]) -> None:
            try:
                async for message in receiver:
                    print(f"{name} received {message}")
                await asyncio.sleep(0.1)  # sleep (or work) with the data
            except ReceiverStoppedError:
                pass


        async def main() -> None:
            bcast = Broadcast[int](name="numbers")

            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(send("sender_1", bcast.new_sender(), 10, 13))
                task_group.create_task(send("sender_2", bcast.new_sender(), 20, 22))
                task_group.create_task(recv("receiver_1", bcast.new_receiver()))
                task_group.create_task(recv("receiver_2", bcast.new_receiver()))


        asyncio.run(main())
        ```

        The output should look something like this (although the sent and received
        messages might appear interleaved in a different way):

        ```
        sender_1 sending 10
        sender_1 sending 11
        sender_1 sending 12
        sender_2 sending 20
        sender_2 sending 21
        receiver_1 received 10
        receiver_1 received 11
        receiver_1 received 12
        receiver_1 received 20
        receiver_1 received 21
        receiver_2 received 10
        receiver_2 received 11
        receiver_2 received 12
        receiver_2 received 20
        receiver_2 received 21
        ```
    """

    def __init__(self, *, name: str, resend_latest: bool = False) -> None:
        """Initialize this channel.

        Args:
            name: The name of the channel. This is for logging purposes, and it will be
                shown in the string representation of the channel.
            resend_latest: When True, every time a new receiver is created with
                `new_receiver`, the last message seen by the channel will be sent to the
                new receiver automatically. This allows new receivers on slow streams to
                get the latest message as soon as they are created, without having to
                wait for the next message on the channel to arrive.  It is safe to be
                set in data/reporting channels, but is not recommended for use in
                channels that stream control instructions.
        """
        self._name: str = name
        """The name of the broadcast channel.

        Only used for debugging purposes.
        """

        self._recv_cv: Condition = Condition()
        """The condition to wait for data in the channel's buffer."""

        self._receivers: dict[
            int, weakref.ReferenceType[_Receiver[ChannelMessageT]]
        ] = {}
        """The receivers attached to the channel, indexed by their hash()."""

        self._closed: bool = False
        """Whether the channel is closed."""

        self._latest: ChannelMessageT | None = None
        """The latest message sent to the channel."""

        self.resend_latest: bool = resend_latest
        """Whether to resend the latest message to new receivers.

        When `True`, every time a new receiver is created with `new_receiver`, it will
        automatically get sent the latest message on the channel.  This allows new
        receivers on slow streams to get the latest message as soon as they are created,
        without having to wait for the next message on the channel to arrive.

        It is safe to be set in data/reporting channels, but is not recommended for use
        in channels that stream control instructions.
        """

    @property
    def name(self) -> str:
        """The name of this channel.

        This is for logging purposes, and it will be shown in the string representation
        of this channel.
        """
        return self._name

    @property
    def is_closed(self) -> bool:
        """Whether this channel is closed.

        Any further attempts to use this channel after it is closed will result in an
        exception.
        """
        return self._closed

    async def close(self) -> None:
        """Close this channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will raise a `SenderError`.

        Receivers will still be able to drain the pending items on their queues,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will raise a
        `ReceiverStoppedError` immediately.
        """
        self._latest = None
        self._closed = True
        async with self._recv_cv:
            self._recv_cv.notify_all()

    def new_sender(self) -> Sender[ChannelMessageT]:
        """Return a new sender attached to this channel."""
        return _Sender(self)

    def new_receiver(
        self, *, name: str | None = None, limit: int = 50
    ) -> Receiver[ChannelMessageT]:
        """Return a new receiver attached to this channel.

        Broadcast receivers have their own buffer, and when messages are not
        being consumed fast enough and the buffer fills up, old messages will
        get dropped just in this receiver.

        Args:
            name: A name to identify the receiver in the logs.
            limit: Number of messages the receiver can hold in its buffer.

        Returns:
            A new receiver attached to this channel.
        """
        recv: _Receiver[ChannelMessageT] = _Receiver(self, name=name, limit=limit)
        self._receivers[hash(recv)] = weakref.ref(recv)
        if self.resend_latest and self._latest is not None:
            recv.enqueue(self._latest)
        return recv

    def __str__(self) -> str:
        """Return a string representation of this channel."""
        return f"{type(self).__name__}:{self._name}"

    def __repr__(self) -> str:
        """Return a string representation of this channel."""
        return (
            f"{type(self).__name__}(name={self._name!r}, "
            f"resend_latest={self.resend_latest!r}):<"
            f"latest={self._latest!r}, "
            f"receivers={len(self._receivers)!r}, "
            f"closed={self._closed!r}>"
        )
Attributes¤
is_closed property ¤
is_closed: bool

Whether this channel is closed.

Any further attempts to use this channel after it is closed will result in an exception.

name property ¤
name: str

The name of this channel.

This is for logging purposes, and it will be shown in the string representation of this channel.

resend_latest instance-attribute ¤
resend_latest: bool = resend_latest

Whether to resend the latest message to new receivers.

When True, every time a new receiver is created with new_receiver, it will automatically get sent the latest message on the channel. This allows new receivers on slow streams to get the latest message as soon as they are created, without having to wait for the next message on the channel to arrive.

It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions.
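
The effect of resend_latest can be sketched with a toy model that uses only the standard library. `ToyBroadcast` and `late_receiver_contents` are hypothetical names introduced for illustration; this is not the library's implementation, it only mirrors the rule described above: the channel remembers the last message and enqueues it into every receiver created afterwards.

```python
from collections import deque


class ToyBroadcast:
    """Hypothetical, simplified stand-in for a broadcast channel."""

    def __init__(self, *, resend_latest: bool = False) -> None:
        self.resend_latest = resend_latest
        self._latest = None  # last message sent, if any
        self._queues: list[deque[int]] = []

    def send(self, message: int) -> None:
        # Remember the message and copy it into every receiver's queue.
        self._latest = message
        for queue in self._queues:
            queue.append(message)

    def new_receiver(self) -> deque[int]:
        queue: deque[int] = deque()
        self._queues.append(queue)
        # A late receiver starts with the latest message only when enabled.
        if self.resend_latest and self._latest is not None:
            queue.append(self._latest)
        return queue


def late_receiver_contents(resend_latest: bool) -> list[int]:
    channel = ToyBroadcast(resend_latest=resend_latest)
    channel.send(1)
    channel.send(2)
    # The receiver is created only after both messages were sent.
    return list(channel.new_receiver())


print(late_receiver_contents(True))   # [2]
print(late_receiver_contents(False))  # []
```

With the flag enabled, the late receiver sees the last message once. That is convenient for slowly changing readings, and it is also why replaying a stale control instruction to a new receiver could be undesirable.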

Functions¤
__init__ ¤
__init__(*, name: str, resend_latest: bool = False) -> None

Initialize this channel.

PARAMETER DESCRIPTION
name

The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel.

TYPE: str

resend_latest

When True, every time a new receiver is created with new_receiver, the last message seen by the channel will be sent to the new receiver automatically. This allows new receivers on slow streams to get the latest message as soon as they are created, without having to wait for the next message on the channel to arrive. It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions.

TYPE: bool DEFAULT: False

Source code in frequenz/channels/_broadcast.py
def __init__(self, *, name: str, resend_latest: bool = False) -> None:
    """Initialize this channel.

    Args:
        name: The name of the channel. This is for logging purposes, and it will be
            shown in the string representation of the channel.
        resend_latest: When True, every time a new receiver is created with
            `new_receiver`, the last message seen by the channel will be sent to the
            new receiver automatically. This allows new receivers on slow streams to
            get the latest message as soon as they are created, without having to
            wait for the next message on the channel to arrive.  It is safe to be
            set in data/reporting channels, but is not recommended for use in
            channels that stream control instructions.
    """
    self._name: str = name
    """The name of the broadcast channel.

    Only used for debugging purposes.
    """

    self._recv_cv: Condition = Condition()
    """The condition to wait for data in the channel's buffer."""

    self._receivers: dict[
        int, weakref.ReferenceType[_Receiver[ChannelMessageT]]
    ] = {}
    """The receivers attached to the channel, indexed by their hash()."""

    self._closed: bool = False
    """Whether the channel is closed."""

    self._latest: ChannelMessageT | None = None
    """The latest message sent to the channel."""

    self.resend_latest: bool = resend_latest
    """Whether to resend the latest message to new receivers.

    When `True`, every time a new receiver is created with `new_receiver`, it will
    automatically get sent the latest message on the channel.  This allows new
    receivers on slow streams to get the latest message as soon as they are created,
    without having to wait for the next message on the channel to arrive.

    It is safe to be set in data/reporting channels, but is not recommended for use
    in channels that stream control instructions.
    """
__repr__ ¤
__repr__() -> str

Return a string representation of this channel.

Source code in frequenz/channels/_broadcast.py
def __repr__(self) -> str:
    """Return a string representation of this channel."""
    return (
        f"{type(self).__name__}(name={self._name!r}, "
        f"resend_latest={self.resend_latest!r}):<"
        f"latest={self._latest!r}, "
        f"receivers={len(self._receivers)!r}, "
        f"closed={self._closed!r}>"
    )
__str__ ¤
__str__() -> str

Return a string representation of this channel.

Source code in frequenz/channels/_broadcast.py
def __str__(self) -> str:
    """Return a string representation of this channel."""
    return f"{type(self).__name__}:{self._name}"
close async ¤
close() -> None

Close this channel.

Any further attempts to send() data will raise a SenderError.

Receivers will still be able to drain the pending items on their queues, but after that, subsequent receive() calls will raise a ReceiverStoppedError immediately.

Source code in frequenz/channels/_broadcast.py
async def close(self) -> None:
    """Close this channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will raise a `SenderError`.

    Receivers will still be able to drain the pending items on their queues,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will raise a
    `ReceiverStoppedError` immediately.
    """
    self._latest = None
    self._closed = True
    async with self._recv_cv:
        self._recv_cv.notify_all()
new_receiver ¤
new_receiver(
    *, name: str | None = None, limit: int = 50
) -> Receiver[ChannelMessageT]

Return a new receiver attached to this channel.

Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver.

PARAMETER DESCRIPTION
name

A name to identify the receiver in the logs.

TYPE: str | None DEFAULT: None

limit

Number of messages the receiver can hold in its buffer.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[ChannelMessageT]

A new receiver attached to this channel.
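
The per-receiver buffer limit can be illustrated with a bounded `collections.deque`. This is a sketch under the assumption that a full buffer discards its oldest entry, as the description above states; `buffered` is a hypothetical helper, not part of the library.

```python
from collections import deque


def buffered(messages: list[int], limit: int) -> list[int]:
    """Return what a receiver that never consumes would still hold."""
    # deque(maxlen=...) discards the oldest entry when a new one is appended
    # to a full buffer, which models "old messages will get dropped".
    buffer: deque[int] = deque(maxlen=limit)
    for message in messages:
        buffer.append(message)
    return list(buffer)


sent = list(range(10))
print(buffered(sent, limit=3))   # [7, 8, 9]
print(buffered(sent, limit=50))  # all ten messages are kept
```

Because every receiver has its own buffer, a slow receiver with a small limit loses old messages without affecting receivers that keep up.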

Source code in frequenz/channels/_broadcast.py
def new_receiver(
    self, *, name: str | None = None, limit: int = 50
) -> Receiver[ChannelMessageT]:
    """Return a new receiver attached to this channel.

    Broadcast receivers have their own buffer, and when messages are not
    being consumed fast enough and the buffer fills up, old messages will
    get dropped just in this receiver.

    Args:
        name: A name to identify the receiver in the logs.
        limit: Number of messages the receiver can hold in its buffer.

    Returns:
        A new receiver attached to this channel.
    """
    recv: _Receiver[ChannelMessageT] = _Receiver(self, name=name, limit=limit)
    self._receivers[hash(recv)] = weakref.ref(recv)
    if self.resend_latest and self._latest is not None:
        recv.enqueue(self._latest)
    return recv
new_sender ¤
new_sender() -> Sender[ChannelMessageT]

Return a new sender attached to this channel.

Source code in frequenz/channels/_broadcast.py
def new_sender(self) -> Sender[ChannelMessageT]:
    """Return a new sender attached to this channel."""
    return _Sender(self)

frequenz.channels.ChannelClosedError ¤

Bases: ChannelError[ErroredChannelT_co]

A closed channel was used.

Source code in frequenz/channels/_exceptions.py
class ChannelClosedError(ChannelError[ErroredChannelT_co]):
    """A closed channel was used."""

    def __init__(self, channel: ErroredChannelT_co):
        """Initialize this error.

        Args:
            channel: The channel that was closed.
        """
        super().__init__(f"Channel {channel} was closed", channel)
Attributes¤
channel instance-attribute ¤

The channel where the error happened.

Functions¤
__init__ ¤
__init__(channel: ErroredChannelT_co)

Initialize this error.

PARAMETER DESCRIPTION
channel

The channel that was closed.

TYPE: ErroredChannelT_co

Source code in frequenz/channels/_exceptions.py
def __init__(self, channel: ErroredChannelT_co):
    """Initialize this error.

    Args:
        channel: The channel that was closed.
    """
    super().__init__(f"Channel {channel} was closed", channel)

frequenz.channels.ChannelError ¤

Bases: Error, Generic[ErroredChannelT_co]

An error that originated in a channel.

All exceptions generated by channels inherit from this exception.

Source code in frequenz/channels/_exceptions.py
class ChannelError(Error, Generic[ErroredChannelT_co]):
    """An error that originated in a channel.

    All exceptions generated by channels inherit from this exception.
    """

    def __init__(self, message: str, channel: ErroredChannelT_co):
        """Initialize this error.

        Args:
            message: The error message.
            channel: The channel where the error happened.
        """
        super().__init__(message)
        self.channel: ErroredChannelT_co = channel
        """The channel where the error happened."""
Attributes¤
channel instance-attribute ¤

The channel where the error happened.

Functions¤
__init__ ¤
__init__(message: str, channel: ErroredChannelT_co)

Initialize this error.

PARAMETER DESCRIPTION
message

The error message.

TYPE: str

channel

The channel where the error happened.

TYPE: ErroredChannelT_co

Source code in frequenz/channels/_exceptions.py
def __init__(self, message: str, channel: ErroredChannelT_co):
    """Initialize this error.

    Args:
        message: The error message.
        channel: The channel where the error happened.
    """
    super().__init__(message)
    self.channel: ErroredChannelT_co = channel
    """The channel where the error happened."""

frequenz.channels.Error ¤

Bases: RuntimeError

An error that originated in this library.

This is useful if you want to catch all exceptions generated by this library.
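
A minimal sketch of how the hierarchy can be used when handling errors. The three classes are re-declared locally, mirroring the definitions documented on this page, only so that the snippet runs without the library installed; in real code they would be imported from frequenz.channels. The `describe` helper is hypothetical.

```python
from typing import Generic, TypeVar

ErroredChannelT_co = TypeVar("ErroredChannelT_co", covariant=True)


# Local re-declarations mirroring the documented classes.
class Error(RuntimeError):
    def __init__(self, message: str):
        super().__init__(message)


class ChannelError(Error, Generic[ErroredChannelT_co]):
    def __init__(self, message: str, channel: ErroredChannelT_co):
        super().__init__(message)
        self.channel: ErroredChannelT_co = channel


class ChannelClosedError(ChannelError[ErroredChannelT_co]):
    def __init__(self, channel: ErroredChannelT_co):
        super().__init__(f"Channel {channel} was closed", channel)


def describe(exc: Exception) -> str:
    try:
        raise exc
    except ChannelError as err:  # also catches ChannelClosedError
        return f"channel error on {err.channel!r}: {err}"
    except Error as err:  # any other error from the library
        return f"library error: {err}"


print(describe(ChannelClosedError("numbers")))
print(describe(Error("something failed")))
```

Ordering the `except` clauses from most to least specific lets channel-related failures be handled separately while `Error` still acts as a catch-all.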

Source code in frequenz/channels/_exceptions.py
class Error(RuntimeError):
    """An error that originated in this library.

    This is useful if you want to catch all exceptions generated by this library.
    """

    def __init__(self, message: str):
        """Initialize this error.

        Args:
            message: The error message.
        """
        super().__init__(message)
Functions¤
__init__ ¤
__init__(message: str)

Initialize this error.

PARAMETER DESCRIPTION
message

The error message.

TYPE: str

Source code in frequenz/channels/_exceptions.py
def __init__(self, message: str):
    """Initialize this error.

    Args:
        message: The error message.
    """
    super().__init__(message)

frequenz.channels.Merger ¤

Bases: Receiver[ReceiverMessageT_co]

A receiver that merges messages coming from multiple receivers into a single stream.

Tip

Please consider using the more idiomatic merge() function instead of creating a Merger instance directly.
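
The merging technique itself (one pending "next message" task per source, waiting for whichever completes first) can be sketched with the standard library alone. The async generators below are hypothetical stand-ins for receivers, and `merged` is an illustration of the idea, not the library's `Merger`.

```python
import asyncio
from collections.abc import AsyncIterator


async def numbers(start: int, stop: int, delay: float) -> AsyncIterator[int]:
    """Hypothetical message source standing in for a receiver."""
    for value in range(start, stop):
        await asyncio.sleep(delay)
        yield value


async def next_message(source: AsyncIterator[int]) -> int:
    # Wrapped in a coroutine so it can be scheduled with create_task().
    return await source.__anext__()


async def merged(*sources: AsyncIterator[int]) -> list[int]:
    # Keep exactly one pending "next message" task per source.
    pending = {asyncio.create_task(next_message(src)): src for src in sources}
    results: list[int] = []
    while pending:
        done, _ = await asyncio.wait(
            set(pending), return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            source = pending.pop(task)
            if isinstance(task.exception(), StopAsyncIteration):
                continue  # exhausted source: do not schedule it again
            results.append(task.result())
            pending[asyncio.create_task(next_message(source))] = source
    return results


async def demo() -> list[int]:
    return await merged(numbers(0, 3, 0.01), numbers(10, 12, 0.015))


# Arrival order depends on timing, so the result is sorted before printing.
print(sorted(asyncio.run(demo())))  # [0, 1, 2, 10, 11]
```

The merged stream ends only when every source is exhausted, since a finished source is simply not rescheduled.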

Source code in frequenz/channels/_merge.py
class Merger(Receiver[ReceiverMessageT_co]):
    """A receiver that merges messages coming from multiple receivers into a single stream.

    Tip:
        Please consider using the more idiomatic [`merge()`][frequenz.channels.merge]
        function instead of creating a `Merger` instance directly.
    """

    def __init__(
        self, *receivers: Receiver[ReceiverMessageT_co], name: str | None
    ) -> None:
        """Initialize this merger.

        Args:
            *receivers: The receivers to merge.
            name: The name of the receiver. Used to create the string representation
                of the receiver.
        """
        self._receivers: dict[str, Receiver[ReceiverMessageT_co]] = {
            str(id): recv for id, recv in enumerate(receivers)
        }
        self._name: str = name if name is not None else type(self).__name__
        self._pending: set[asyncio.Task[Any]] = {
            asyncio.create_task(anext(recv), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: deque[ReceiverMessageT_co] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Finalize this merger."""
        for task in self._pending:
            if not task.done() and task.get_loop().is_running():
                task.cancel()

    async def stop(self) -> None:
        """Stop this merger."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a message or an error.

        Once a call to `ready()` has finished, the message should be read with
        a call to `consume()` (or by calling `receive()` or iterating over the
        receiver). The receiver will remain ready (this method will return
        immediately) until the message is consumed.

        Returns:
            Whether the receiver is still active.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if len(self._results) > 0:
                return True

            # if there are no more pending receivers, we return immediately.
            if len(self._pending) == 0:
                return False

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                result = item.result()
                self._results.append(result)
                self._pending.add(
                    asyncio.create_task(anext(self._receivers[name]), name=name)
                )

    def consume(self) -> ReceiverMessageT_co:
        """Return the latest message once `ready` is complete.

        Returns:
            The next message that was received.

        Raises:
            ReceiverStoppedError: If the receiver stopped producing messages.
            ReceiverError: If there is some problem with the receiver.
        """
        if not self._results and not self._pending:
            raise ReceiverStoppedError(self)

        assert self._results, "`consume()` must be preceded by a call to `ready()`"

        return self._results.popleft()

    def __str__(self) -> str:
        """Return a string representation of this receiver."""
        if len(self._receivers) > 3:
            receivers = [str(p) for p in itertools.islice(self._receivers.values(), 3)]
            receivers.append("…")
        else:
            receivers = [str(p) for p in self._receivers.values()]
        return f"{self._name}:{','.join(receivers)}"

    def __repr__(self) -> str:
        """Return a string representation of this receiver."""
        return (
            f"{self._name}("
            f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})"
        )
Functions¤
__aiter__ ¤
__aiter__() -> Self

Get an async iterator over the received messages.

RETURNS DESCRIPTION
Self

This receiver, as it is already an async iterator.

Source code in frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Get an async iterator over the received messages.

    Returns:
        This receiver, as it is already an async iterator.
    """
    return self
__anext__ async ¤
__anext__() -> ReceiverMessageT_co

Await the next message in the async iteration over received messages.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next received message.

RAISES DESCRIPTION
StopAsyncIteration

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def __anext__(self) -> ReceiverMessageT_co:
    """Await the next message in the async iteration over received messages.

    Returns:
        The next received message.

    Raises:
        StopAsyncIteration: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
__del__ ¤
__del__() -> None

Finalize this merger.

Source code in frequenz/channels/_merge.py
def __del__(self) -> None:
    """Finalize this merger."""
    for task in self._pending:
        if not task.done() and task.get_loop().is_running():
            task.cancel()
__init__ ¤
__init__(
    *receivers: Receiver[ReceiverMessageT_co],
    name: str | None
) -> None

Initialize this merger.

PARAMETER DESCRIPTION
*receivers

The receivers to merge.

TYPE: Receiver[ReceiverMessageT_co] DEFAULT: ()

name

The name of the receiver. Used to create the string representation of the receiver.

TYPE: str | None

Source code in frequenz/channels/_merge.py
def __init__(
    self, *receivers: Receiver[ReceiverMessageT_co], name: str | None
) -> None:
    """Initialize this merger.

    Args:
        *receivers: The receivers to merge.
        name: The name of the receiver. Used to create the string representation
            of the receiver.
    """
    self._receivers: dict[str, Receiver[ReceiverMessageT_co]] = {
        str(id): recv for id, recv in enumerate(receivers)
    }
    self._name: str = name if name is not None else type(self).__name__
    self._pending: set[asyncio.Task[Any]] = {
        asyncio.create_task(anext(recv), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: deque[ReceiverMessageT_co] = deque(maxlen=len(self._receivers))
__repr__ ¤
__repr__() -> str

Return a string representation of this receiver.

Source code in frequenz/channels/_merge.py
def __repr__(self) -> str:
    """Return a string representation of this receiver."""
    return (
        f"{self._name}("
        f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})"
    )
__str__ ¤
__str__() -> str

Return a string representation of this receiver.

Source code in frequenz/channels/_merge.py
def __str__(self) -> str:
    """Return a string representation of this receiver."""
    if len(self._receivers) > 3:
        receivers = [str(p) for p in itertools.islice(self._receivers.values(), 3)]
        receivers.append("…")
    else:
        receivers = [str(p) for p in self._receivers.values()]
    return f"{self._name}:{','.join(receivers)}"
consume ¤
consume() -> ReceiverMessageT_co

Return the latest message once ready is complete.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next message that was received.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_merge.py
def consume(self) -> ReceiverMessageT_co:
    """Return the latest message once `ready` is complete.

    Returns:
        The next message that was received.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    if not self._results and not self._pending:
        raise ReceiverStoppedError(self)

    assert self._results, "`consume()` must be preceded by a call to `ready()`"

    return self._results.popleft()
map ¤

Apply a mapping function on the received message.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
mapping_function

The function to be applied on incoming messages.

TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co]

RETURNS DESCRIPTION
Receiver[MappedMessageT_co]

A new receiver that applies the function on the received messages.
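
The idea behind map() can be sketched with plain async generators: a wrapper consumes the original stream and yields the transformed messages. The names `numbers`, `mapped` and `demo` are hypothetical and the snippet does not use the library's `Receiver` type.

```python
import asyncio
from collections.abc import AsyncIterator, Callable


async def numbers() -> AsyncIterator[int]:
    """Hypothetical message source standing in for a receiver."""
    for value in (1, 2, 3):
        yield value


async def mapped(
    source: AsyncIterator[int], mapping_function: Callable[[int], str]
) -> AsyncIterator[str]:
    # Apply the function to each message as it is received.
    async for message in source:
        yield mapping_function(message)


async def demo() -> list[str]:
    return [text async for text in mapped(numbers(), lambda n: f"value={n}")]


print(asyncio.run(demo()))  # ['value=1', 'value=2', 'value=3']
```

As in the tip above, the wrapper only exposes the generic streaming interface, so anything specific to the original source has to be reached through a reference to that source.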

Source code in frequenz/channels/_receiver.py
def map(
    self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
) -> Receiver[MappedMessageT_co]:
    """Apply a mapping function on the received message.

    Tip:
        The returned receiver type won't have all the methods of the original
        receiver. If you need to access methods of the original receiver that are
        not part of the `Receiver` interface you should save a reference to the
        original receiver and use that instead.

    Args:
        mapping_function: The function to be applied on incoming messages.

    Returns:
        A new receiver that applies the function on the received messages.
    """
    return _Mapper(receiver=self, mapping_function=mapping_function)
ready async ¤
ready() -> bool

Wait until the receiver is ready with a message or an error.

Once a call to ready() has finished, the message should be read with a call to consume() (or by calling receive() or iterating over the receiver). The receiver will remain ready (this method will return immediately) until the message is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/_merge.py
async def ready(self) -> bool:
    """Wait until the receiver is ready with a message or an error.

    Once a call to `ready()` has finished, the message should be read with
    a call to `consume()` (or by calling `receive()` or iterating over the
    receiver). The receiver will remain ready (this method will return
    immediately) until the message is consumed.

    Returns:
        Whether the receiver is still active.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        # if there are messages waiting to be consumed, return immediately.
        if len(self._results) > 0:
            return True

        # if there are no more pending receivers, we return immediately.
        if len(self._pending) == 0:
            return False

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            # if channel is closed, don't add a task for it again.
            if isinstance(item.exception(), StopAsyncIteration):
                continue
            result = item.result()
            self._results.append(result)
            self._pending.add(
                asyncio.create_task(anext(self._receivers[name]), name=name)
            )
receive async ¤
receive() -> ReceiverMessageT_co

Receive a message.

RETURNS DESCRIPTION
ReceiverMessageT_co

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.
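
The difference between iterating over a receiver and calling receive() once the receiver has stopped can be sketched with a toy class. `ListReceiver` and the local `ReceiverStoppedError` are hypothetical stand-ins, declared only so that the snippet runs with the standard library; they are not the library's classes.

```python
import asyncio


class ReceiverStoppedError(Exception):
    """Local stand-in for frequenz.channels.ReceiverStoppedError."""


class ListReceiver:
    """Hypothetical receiver that yields the items of a list, then stops."""

    def __init__(self, items: list[int]) -> None:
        self._items = list(items)

    def __aiter__(self) -> "ListReceiver":
        return self

    async def __anext__(self) -> int:
        if not self._items:
            raise StopAsyncIteration
        return self._items.pop(0)

    async def receive(self) -> int:
        # Translate the iteration artifact into an explicit error, in the
        # same spirit as the receive() source shown on this page.
        try:
            return await self.__anext__()
        except StopAsyncIteration as exc:
            raise ReceiverStoppedError(self) from exc


async def demo() -> tuple[list[int], bool]:
    # An `async for` loop (or comprehension) simply ends when the receiver stops.
    seen = [message async for message in ListReceiver([1, 2])]
    try:
        await ListReceiver([]).receive()
        stopped = False
    except ReceiverStoppedError:
        stopped = True  # receive() reports the stop as an exception
    return seen, stopped


print(asyncio.run(demo()))  # ([1, 2], True)
```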

Source code in frequenz/channels/_receiver.py
async def receive(self) -> ReceiverMessageT_co:
    """Receive a message.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
    except StopAsyncIteration as exc:
        # If we already had a cause and it was that the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            # pylint is not smart enough to figure out we checked above
            # this is a ReceiverStoppedError and thus it does have
            # a receiver member
            and exc.__cause__.receiver is self  # pylint: disable=no-member
        ):
            raise exc.__cause__
        raise ReceiverStoppedError(self) from exc
    return received
stop async ¤
stop() -> None

Stop this merger.

Source code in frequenz/channels/_merge.py
async def stop(self) -> None:
    """Stop this merger."""
    for task in self._pending:
        task.cancel()
    await asyncio.gather(*self._pending, return_exceptions=True)
    self._pending = set()

frequenz.channels.Receiver ¤

Bases: ABC, Generic[ReceiverMessageT_co]

An endpoint to receive messages.

Source code in frequenz/channels/_receiver.py
class Receiver(ABC, Generic[ReceiverMessageT_co]):
    """An endpoint to receive messages."""

    async def __anext__(self) -> ReceiverMessageT_co:
        """Await the next message in the async iteration over received messages.

        Returns:
            The next received message.

        Raises:
            StopAsyncIteration: If the receiver stopped producing messages.
            ReceiverError: If there is some problem with the receiver.
        """
        try:
            await self.ready()
            return self.consume()
        except ReceiverStoppedError as exc:
            raise StopAsyncIteration() from exc

    @abstractmethod
    async def ready(self) -> bool:
        """Wait until the receiver is ready with a message or an error.

        Once a call to `ready()` has finished, the message should be read with
        a call to `consume()` (or by calling `receive()` or iterating over the
        receiver). The receiver will remain ready (this method will return
        immediately) until the message is consumed.

        Returns:
            Whether the receiver is still active.
        """

    @abstractmethod
    def consume(self) -> ReceiverMessageT_co:
        """Return the latest message once `ready()` is complete.

        `ready()` must be called before each call to `consume()`.

        Returns:
            The next message received.

        Raises:
            ReceiverStoppedError: If the receiver stopped producing messages.
            ReceiverError: If there is some problem with the receiver.
        """

    def __aiter__(self) -> Self:
        """Get an async iterator over the received messages.

        Returns:
            This receiver, as it is already an async iterator.
        """
        return self

    async def receive(self) -> ReceiverMessageT_co:
        """Receive a message.

        Returns:
            The received message.

        Raises:
            ReceiverStoppedError: If the receiver stopped producing messages.
            ReceiverError: If there is some problem with the receiver.
        """
        try:
            received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
        except StopAsyncIteration as exc:
            # If we already had a cause and it was that the receiver was stopped,
            # then reuse that error, as StopAsyncIteration is just an artifact
            # introduced by __anext__.
            if (
                isinstance(exc.__cause__, ReceiverStoppedError)
                # pylint is not smart enough to figure out we checked above
                # this is a ReceiverStoppedError and thus it does have
                # a receiver member
                and exc.__cause__.receiver is self  # pylint: disable=no-member
            ):
                raise exc.__cause__
            raise ReceiverStoppedError(self) from exc
        return received

    def map(
        self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
    ) -> Receiver[MappedMessageT_co]:
        """Apply a mapping function on the received message.

        Tip:
            The returned receiver type won't have all the methods of the original
            receiver. If you need to access methods of the original receiver that are
            not part of the `Receiver` interface you should save a reference to the
            original receiver and use that instead.

        Args:
            mapping_function: The function to be applied on incoming messages.

        Returns:
            A new receiver that applies the function on the received messages.
        """
        return _Mapper(receiver=self, mapping_function=mapping_function)
Functions¤
__aiter__ ¤
__aiter__() -> Self

Get an async iterator over the received messages.

RETURNS DESCRIPTION
Self

This receiver, as it is already an async iterator.

Source code in frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Get an async iterator over the received messages.

    Returns:
        This receiver, as it is already an async iterator.
    """
    return self
__anext__ async ¤
__anext__() -> ReceiverMessageT_co

Await the next message in the async iteration over received messages.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next received message.

RAISES DESCRIPTION
StopAsyncIteration

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def __anext__(self) -> ReceiverMessageT_co:
    """Await the next message in the async iteration over received messages.

    Returns:
        The next received message.

    Raises:
        StopAsyncIteration: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
consume abstractmethod ¤
consume() -> ReceiverMessageT_co

Return the latest message once ready() is complete.

ready() must be called before each call to consume().

RETURNS DESCRIPTION
ReceiverMessageT_co

The next message received.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
@abstractmethod
def consume(self) -> ReceiverMessageT_co:
    """Return the latest message once `ready()` is complete.

    `ready()` must be called before each call to `consume()`.

    Returns:
        The next message received.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
map ¤

Apply a mapping function on the received message.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
mapping_function

The function to be applied on incoming messages.

TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co]

RETURNS DESCRIPTION
Receiver[MappedMessageT_co]

A new receiver that applies the function on the received messages.

Source code in frequenz/channels/_receiver.py
def map(
    self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
) -> Receiver[MappedMessageT_co]:
    """Apply a mapping function on the received message.

    Tip:
        The returned receiver type won't have all the methods of the original
        receiver. If you need to access methods of the original receiver that are
        not part of the `Receiver` interface you should save a reference to the
        original receiver and use that instead.

    Args:
        mapping_function: The function to be applied on incoming messages.

    Returns:
        A new receiver that applies the function on the received messages.
    """
    return _Mapper(receiver=self, mapping_function=mapping_function)
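
To illustrate the tip above, here is a minimal, stdlib-only sketch of a mapping wrapper around an async iterator. `MappedIterator` and `numbers` are hypothetical names, and this is not how `_Mapper` is implemented. It only shows why the wrapper exposes the generic iteration interface and none of the extra methods of the wrapped object.

```python
import asyncio
from typing import AsyncIterator, Callable, Generic, TypeVar

InT = TypeVar("InT")
OutT = TypeVar("OutT")


class MappedIterator(Generic[InT, OutT]):
    """Hypothetical wrapper: applies `function` to every item of `source`."""

    def __init__(
        self, source: AsyncIterator[InT], function: Callable[[InT], OutT]
    ) -> None:
        self._source = source
        self._function = function

    def __aiter__(self) -> "MappedIterator[InT, OutT]":
        return self

    async def __anext__(self) -> OutT:
        # StopAsyncIteration raised by the source propagates unchanged.
        return self._function(await self._source.__anext__())


async def numbers() -> AsyncIterator[int]:
    """Example source yielding 1, 2, 3."""
    for number in (1, 2, 3):
        yield number


async def main() -> list:
    # Keep a reference to the original if you need its specific methods.
    original = numbers()
    mapped = MappedIterator(original, lambda n: f"#{n * 10}")
    return [item async for item in mapped]
```

Running `asyncio.run(main())` collects the mapped items; the wrapper itself only offers `__aiter__` and `__anext__`.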
ready abstractmethod async ¤
ready() -> bool

Wait until the receiver is ready with a message or an error.

Once a call to ready() has finished, the message should be read by calling consume() or receive(), or by iterating over the receiver. The receiver will remain ready (this method will return immediately) until the message is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.
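
The `ready()`/`consume()` contract can be sketched with a toy receiver built only on the standard library. `ToyReceiver` and `ToyStoppedError` are hypothetical stand-ins, not part of `frequenz.channels`, and the staging logic is an assumption made for illustration.

```python
import asyncio
from collections import deque
from typing import Deque, Optional


class ToyStoppedError(Exception):
    """Hypothetical stand-in for ReceiverStoppedError."""


class ToyReceiver:
    """Illustrative receiver: ready() stages a message, consume() returns it."""

    def __init__(self) -> None:
        self._queue: Deque[str] = deque()
        self._staged: Optional[str] = None
        self._closed = False
        self._wakeup = asyncio.Event()

    def push(self, message: str) -> None:
        self._queue.append(message)
        self._wakeup.set()

    def close(self) -> None:
        self._closed = True
        self._wakeup.set()

    async def ready(self) -> bool:
        if self._staged is not None:
            return True  # still ready: the staged message was not consumed yet
        while not self._queue:
            if self._closed:
                return False  # no more messages will arrive
            self._wakeup.clear()
            await self._wakeup.wait()
        self._staged = self._queue.popleft()
        return True

    def consume(self) -> str:
        if self._staged is None:
            if self._closed:
                raise ToyStoppedError("receiver was stopped")
            raise RuntimeError("ready() must be called before consume()")
        message, self._staged = self._staged, None
        return message


async def demo() -> list:
    receiver = ToyReceiver()
    receiver.push("a")
    receiver.push("b")
    receiver.close()
    received = []
    while await receiver.ready():
        await receiver.ready()  # returns immediately, nothing is skipped
        received.append(receiver.consume())
    return received
```

Calling `ready()` twice in a row does not drop a message, because the staged message is only released by `consume()`.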

Source code in frequenz/channels/_receiver.py
@abstractmethod
async def ready(self) -> bool:
    """Wait until the receiver is ready with a message or an error.

    Once a call to `ready()` has finished, the message should be read by
    calling `consume()` or `receive()`, or by iterating over the receiver.
    The receiver will remain ready (this method will return immediately)
    until the message is consumed.

    Returns:
        Whether the receiver is still active.
    """
receive async ¤
receive() -> ReceiverMessageT_co

Receive a message.

RETURNS DESCRIPTION
ReceiverMessageT_co

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def receive(self) -> ReceiverMessageT_co:
    """Receive a message.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            # pylint is not smart enough to figure out we checked above
            # this is a ReceiverStoppedError and thus it does have
            # a receiver member
            and exc.__cause__.receiver is self  # pylint: disable=no-member
        ):
            raise exc.__cause__
        raise ReceiverStoppedError(self) from exc
    return received
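
The round trip between `StopAsyncIteration` and the stopped error shown above can be reproduced in a small, self-contained sketch. `ListReceiver` and `ToyStoppedError` are hypothetical names used only for illustration. The point is that `async for` ends silently while `receive()` surfaces the original error through `__cause__`.

```python
import asyncio
from typing import Iterable, List


class ToyStoppedError(Exception):
    """Hypothetical stand-in for ReceiverStoppedError."""

    def __init__(self, receiver: "ListReceiver") -> None:
        super().__init__(f"Receiver {receiver!r} was stopped")
        self.receiver = receiver


class ListReceiver:
    """Illustrative receiver that serves messages from a list, then stops."""

    def __init__(self, messages: Iterable[int]) -> None:
        self._messages: List[int] = list(messages)

    def __aiter__(self) -> "ListReceiver":
        return self

    async def __anext__(self) -> int:
        if not self._messages:
            # Async iteration must end with StopAsyncIteration; keep the cause.
            raise StopAsyncIteration() from ToyStoppedError(self)
        return self._messages.pop(0)

    async def receive(self) -> int:
        try:
            return await self.__anext__()
        except StopAsyncIteration as exc:
            cause = exc.__cause__
            if isinstance(cause, ToyStoppedError) and cause.receiver is self:
                raise cause  # reuse the original error
            raise ToyStoppedError(self) from exc


async def demo() -> tuple:
    receiver = ListReceiver([1, 2])
    iterated = [message async for message in receiver]  # ends silently
    try:
        await receiver.receive()
    except ToyStoppedError as error:
        return iterated, error.receiver is receiver
    return iterated, False
```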

frequenz.channels.ReceiverError ¤

Bases: Error, Generic[ReceiverMessageT_co]

An error that originated in a Receiver.

All exceptions generated by receivers inherit from this exception.

Source code in frequenz/channels/_receiver.py
class ReceiverError(Error, Generic[ReceiverMessageT_co]):
    """An error that originated in a [Receiver][frequenz.channels.Receiver].

    All exceptions generated by receivers inherit from this exception.
    """

    def __init__(self, message: str, receiver: Receiver[ReceiverMessageT_co]):
        """Initialize this error.

        Args:
            message: The error message.
            receiver: The [Receiver][frequenz.channels.Receiver] where the
                error happened.
        """
        super().__init__(message)
        self.receiver: Receiver[ReceiverMessageT_co] = receiver
        """The receiver where the error happened."""
Attributes¤
receiver instance-attribute ¤

The receiver where the error happened.

Functions¤
__init__ ¤
__init__(
    message: str, receiver: Receiver[ReceiverMessageT_co]
)

Initialize this error.

PARAMETER DESCRIPTION
message

The error message.

TYPE: str

receiver

The Receiver where the error happened.

TYPE: Receiver[ReceiverMessageT_co]

Source code in frequenz/channels/_receiver.py
def __init__(self, message: str, receiver: Receiver[ReceiverMessageT_co]):
    """Initialize this error.

    Args:
        message: The error message.
        receiver: The [Receiver][frequenz.channels.Receiver] where the
            error happened.
    """
    super().__init__(message)
    self.receiver: Receiver[ReceiverMessageT_co] = receiver
    """The receiver where the error happened."""

frequenz.channels.ReceiverStoppedError ¤

Bases: ReceiverError[ReceiverMessageT_co]

A stopped Receiver was used.

Source code in frequenz/channels/_receiver.py
class ReceiverStoppedError(ReceiverError[ReceiverMessageT_co]):
    """A stopped [`Receiver`][frequenz.channels.Receiver] was used."""

    def __init__(self, receiver: Receiver[ReceiverMessageT_co]):
        """Initialize this error.

        Args:
            receiver: The [Receiver][frequenz.channels.Receiver] where the
                error happened.
        """
        super().__init__(f"Receiver {receiver} was stopped", receiver)
Attributes¤
receiver instance-attribute ¤

The receiver where the error happened.

Functions¤
__init__ ¤
__init__(receiver: Receiver[ReceiverMessageT_co])

Initialize this error.

PARAMETER DESCRIPTION
receiver

The Receiver where the error happened.

TYPE: Receiver[ReceiverMessageT_co]

Source code in frequenz/channels/_receiver.py
def __init__(self, receiver: Receiver[ReceiverMessageT_co]):
    """Initialize this error.

    Args:
        receiver: The [Receiver][frequenz.channels.Receiver] where the
            error happened.
    """
    super().__init__(f"Receiver {receiver} was stopped", receiver)

frequenz.channels.SelectError ¤

Bases: Error

An error that happened during a select() operation.

This exception is raised when a select() iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling ready() for example). It is raised as a group exception (BaseExceptionGroup) when a select loop is cleaning up after it's done.

Source code in frequenz/channels/_select.py
class SelectError(Error):
    """An error that happened during a [`select()`][frequenz.channels.select] operation.

    This exception is raised when a `select()` iteration fails.  It is raised as
    a single exception when one receiver fails during normal operation (while calling
    `ready()` for example).  It is raised as a group exception
    ([`BaseExceptionGroup`][]) when a `select` loop is cleaning up after it's done.
    """

frequenz.channels.Selected ¤

Bases: Generic[ReceiverMessageT_co]

A result of a select() iteration.

The selected receiver is consumed immediately and the received message is stored in the instance, unless there was an exception while receiving the message, in which case the exception is stored instead.

Selected instances should be used in conjunction with the selected_from() function to determine which receiver was selected.

Please see select() for an example.
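
The "consume eagerly, raise lazily" behavior described above can be sketched without the library. `Outcome` and `_Empty` are hypothetical names; the sketch only mirrors the idea of storing either a message or an exception and is not the actual `Selected` class.

```python
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class _Empty:
    """Sentinel meaning "no message stored" (None may be a valid message)."""


class Outcome(Generic[T]):
    """Hypothetical container mirroring the idea behind `Selected`."""

    def __init__(self, consume: Callable[[], T]) -> None:
        self._message: object = _Empty()
        self._exception: Optional[Exception] = None
        try:
            self._message = consume()  # consumed eagerly, exactly once
        except Exception as exc:  # stored instead of raised
            self._exception = exc

    @property
    def message(self) -> T:
        if self._exception is not None:
            raise self._exception  # deferred until the message is requested
        assert not isinstance(self._message, _Empty)
        return self._message  # type: ignore[return-value]

    @property
    def exception(self) -> Optional[Exception]:
        return self._exception


def failing() -> int:
    """Example consume function that always fails."""
    raise ValueError("boom")


ok = Outcome(lambda: 42)
bad = Outcome(failing)
```

Creating `bad` does not raise; the stored `ValueError` is only raised when `bad.message` is accessed, which is why checking `exception` first is the safe pattern.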

Source code in frequenz/channels/_select.py
class Selected(Generic[ReceiverMessageT_co]):
    """A result of a [`select()`][frequenz.channels.select] iteration.

    The selected receiver is consumed immediately and the received message is stored in
    the instance, unless there was an exception while receiving the message, in which
    case the exception is stored instead.

    `Selected` instances should be used in conjunction with the
    [`selected_from()`][frequenz.channels.selected_from] function to determine
    which receiver was selected.

    Please see [`select()`][frequenz.channels.select] for an example.
    """

    def __init__(self, receiver: Receiver[ReceiverMessageT_co], /) -> None:
        """Initialize this selected result.

        The receiver is consumed immediately when creating the instance and the received
        message is stored in the instance for later use as
        [`message`][frequenz.channels.Selected.message].  If there was an exception
        while receiving the message, then the exception is stored in the instance
        instead (as [`exception`][frequenz.channels.Selected.exception]).

        Args:
            receiver: The receiver that was selected.
        """
        self._recv: Receiver[ReceiverMessageT_co] = receiver
        """The receiver that was selected."""

        self._message: ReceiverMessageT_co | _EmptyResult = _EmptyResult()
        """The message that was received.

        If there was an exception while receiving the message, then this will
        remain an `_EmptyResult` instance.
        """
        self._exception: Exception | None = None
        """The exception that was raised while receiving the message (if any)."""

        try:
            self._message = receiver.consume()
        except Exception as exc:  # pylint: disable=broad-except
            self._exception = exc

        self._handled: bool = False
        """Flag to indicate if this selected has been handled in the if-chain."""

    @property
    def message(self) -> ReceiverMessageT_co:
        """The message that was received, if any.

        Returns:
            The message that was received.

        Raises:
            Exception: If there was an exception while receiving the message. Normally
                this should be a [`frequenz.channels.Error`][frequenz.channels.Error]
                instance, but all exceptions are caught in case some receivers raise
                anything else.
        """
        if self._exception is not None:
            raise self._exception
        assert not isinstance(self._message, _EmptyResult)
        return self._message

    @property
    def exception(self) -> Exception | None:
        """The exception that was raised while receiving the message (if any).

        Returns:
            The exception that was raised while receiving the message (if any).
        """
        return self._exception

    @property
    def was_stopped(self) -> bool:
        """Whether the selected receiver was stopped while receiving a message."""
        return isinstance(self._exception, ReceiverStoppedError)

    def __str__(self) -> str:
        """Return a string representation of this selected receiver."""
        return (
            f"{type(self).__name__}({self._recv}) -> "
            f"{self._exception or self._message})"
        )

    def __repr__(self) -> str:
        """Return a string with the internal representation of this instance."""
        return (
            f"{type(self).__name__}({self._recv=}, {self._message=}, "
            f"{self._exception=}, {self._handled=})"
        )
Attributes¤
exception property ¤
exception: Exception | None

The exception that was raised while receiving the message (if any).

RETURNS DESCRIPTION
Exception | None

The exception that was raised while receiving the message (if any).

message property ¤

The message that was received, if any.

RETURNS DESCRIPTION
ReceiverMessageT_co

The message that was received.

RAISES DESCRIPTION
Exception

If there was an exception while receiving the message. Normally this should be a frequenz.channels.Error instance, but all exceptions are caught in case some receivers raise anything else.

was_stopped property ¤
was_stopped: bool

Whether the selected receiver was stopped while receiving a message.

Functions¤
__init__ ¤
__init__(receiver: Receiver[ReceiverMessageT_co]) -> None

Initialize this selected result.

The receiver is consumed immediately when creating the instance and the received message is stored in the instance for later use as message. If there was an exception while receiving the message, then the exception is stored in the instance instead (as exception).

PARAMETER DESCRIPTION
receiver

The receiver that was selected.

TYPE: Receiver[ReceiverMessageT_co]

Source code in frequenz/channels/_select.py
def __init__(self, receiver: Receiver[ReceiverMessageT_co], /) -> None:
    """Initialize this selected result.

    The receiver is consumed immediately when creating the instance and the received
    message is stored in the instance for later use as
    [`message`][frequenz.channels.Selected.message].  If there was an exception
    while receiving the message, then the exception is stored in the instance
    instead (as [`exception`][frequenz.channels.Selected.exception]).

    Args:
        receiver: The receiver that was selected.
    """
    self._recv: Receiver[ReceiverMessageT_co] = receiver
    """The receiver that was selected."""

    self._message: ReceiverMessageT_co | _EmptyResult = _EmptyResult()
    """The message that was received.

    If there was an exception while receiving the message, then this will
    remain an `_EmptyResult` instance.
    """
    self._exception: Exception | None = None
    """The exception that was raised while receiving the message (if any)."""

    try:
        self._message = receiver.consume()
    except Exception as exc:  # pylint: disable=broad-except
        self._exception = exc

    self._handled: bool = False
    """Flag to indicate if this selected has been handled in the if-chain."""
__repr__ ¤
__repr__() -> str

Return a string with the internal representation of this instance.

Source code in frequenz/channels/_select.py
def __repr__(self) -> str:
    """Return a string with the internal representation of this instance."""
    return (
        f"{type(self).__name__}({self._recv=}, {self._message=}, "
        f"{self._exception=}, {self._handled=})"
    )
__str__ ¤
__str__() -> str

Return a string representation of this selected receiver.

Source code in frequenz/channels/_select.py
def __str__(self) -> str:
    """Return a string representation of this selected receiver."""
    return (
        f"{type(self).__name__}({self._recv}) -> "
        f"{self._exception or self._message})"
    )

frequenz.channels.Sender ¤

Bases: ABC, Generic[SenderMessageT_contra]

An endpoint to send messages.

Source code in frequenz/channels/_sender.py
class Sender(ABC, Generic[SenderMessageT_contra]):
    """An endpoint to send messages."""

    @abstractmethod
    async def send(self, message: SenderMessageT_contra, /) -> None:
        """Send a message.

        Args:
            message: The message to be sent.

        Raises:
            SenderError: If there was an error sending the message.
        """
Functions¤
send abstractmethod async ¤
send(message: SenderMessageT_contra) -> None

Send a message.

PARAMETER DESCRIPTION
message

The message to be sent.

TYPE: SenderMessageT_contra

RAISES DESCRIPTION
SenderError

If there was an error sending the message.

Source code in frequenz/channels/_sender.py
@abstractmethod
async def send(self, message: SenderMessageT_contra, /) -> None:
    """Send a message.

    Args:
        message: The message to be sent.

    Raises:
        SenderError: If there was an error sending the message.
    """

frequenz.channels.SenderError ¤

Bases: Error, Generic[SenderMessageT_co]

An error that originated in a Sender.

All exceptions generated by senders inherit from this exception.
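
As a rough illustration of how a concrete sender and its error type fit together, the sketch below defines stand-ins for the abstract interface and the error. `ToySender`, `ToySenderError` and `ListSender` are hypothetical names and are not part of `frequenz.channels`.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Generic, List, TypeVar

T = TypeVar("T")


class ToySender(ABC, Generic[T]):
    """Hypothetical stand-in for the abstract `Sender` interface."""

    @abstractmethod
    async def send(self, message: T, /) -> None:
        """Send a message."""


class ToySenderError(Exception):
    """Hypothetical stand-in for `SenderError`: remembers the failing sender."""

    def __init__(self, message: str, sender: ToySender) -> None:
        super().__init__(message)
        self.sender = sender


class ListSender(ToySender[T]):
    """Illustrative sender that appends messages to a list until closed."""

    def __init__(self) -> None:
        self.sent: List[T] = []
        self.closed = False

    async def send(self, message: T, /) -> None:
        if self.closed:
            raise ToySenderError("sender is closed", self)
        self.sent.append(message)


async def demo() -> tuple:
    sender: ListSender[int] = ListSender()
    await sender.send(1)
    sender.closed = True
    try:
        await sender.send(2)
    except ToySenderError as error:
        # The error carries the sender, so callers can tell which one failed.
        return sender.sent, error.sender is sender
    return sender.sent, False
```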

Source code in frequenz/channels/_sender.py
class SenderError(Error, Generic[SenderMessageT_co]):
    """An error that originated in a [Sender][frequenz.channels.Sender].

    All exceptions generated by senders inherit from this exception.
    """

    def __init__(self, message: str, sender: Sender[SenderMessageT_co]):
        """Initialize this error.

        Args:
            message: The error message.
            sender: The [Sender][frequenz.channels.Sender] where the error
                happened.
        """
        super().__init__(message)
        self.sender: Sender[SenderMessageT_co] = sender
        """The sender where the error happened."""
Attributes¤
sender instance-attribute ¤

The sender where the error happened.

Functions¤
__init__ ¤
__init__(message: str, sender: Sender[SenderMessageT_co])

Initialize this error.

PARAMETER DESCRIPTION
message

The error message.

TYPE: str

sender

The Sender where the error happened.

TYPE: Sender[SenderMessageT_co]

Source code in frequenz/channels/_sender.py
def __init__(self, message: str, sender: Sender[SenderMessageT_co]):
    """Initialize this error.

    Args:
        message: The error message.
        sender: The [Sender][frequenz.channels.Sender] where the error
            happened.
    """
    super().__init__(message)
    self.sender: Sender[SenderMessageT_co] = sender
    """The sender where the error happened."""

frequenz.channels.UnhandledSelectedError ¤

Bases: SelectError, Generic[ReceiverMessageT_co]

A receiver was not handled in a select() iteration.

This exception is raised when a select() iteration finishes without a call to selected_from() for the selected receiver.
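
The runtime exhaustiveness check can be pictured with a handled flag that is set whenever the if-chain asks about the picked source. This is a sketch under assumptions: `Picked`, `picked_from` and `finish_iteration` are hypothetical names, and where exactly the real library sets its flag is not shown here.

```python
from typing import Any


class Picked:
    """Hypothetical stand-in for `Selected`: a source plus a handled flag."""

    def __init__(self, source: Any) -> None:
        self.source = source
        self.handled = False


def picked_from(picked: Picked, source: Any) -> bool:
    """Return whether `source` was picked, marking the result as handled.

    Assumption for this sketch: the flag is set on a successful match.
    """
    if picked.source is source:
        picked.handled = True
        return True
    return False


def finish_iteration(picked: Picked) -> None:
    """Runtime exhaustiveness check performed after each loop body."""
    if not picked.handled:
        raise RuntimeError(f"{picked.source!r} was not handled in the if-chain")


timer, events = object(), object()

# Handled: the if-chain has a branch for the picked source.
picked = Picked(timer)
if picked_from(picked, events):
    pass
elif picked_from(picked, timer):
    pass
finish_iteration(picked)  # no error

# Unhandled: no branch asks about `events`.
forgotten = Picked(events)
if picked_from(forgotten, timer):
    pass
try:
    finish_iteration(forgotten)
    unhandled_detected = False
except RuntimeError:
    unhandled_detected = True
```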

Source code in frequenz/channels/_select.py
class UnhandledSelectedError(SelectError, Generic[ReceiverMessageT_co]):
    """A receiver was not handled in a [`select()`][frequenz.channels.select] iteration.

    This exception is raised when a [`select()`][frequenz.channels.select] iteration
    finishes without a call to [`selected_from()`][frequenz.channels.selected_from] for
    the selected receiver.
    """

    def __init__(self, selected: Selected[ReceiverMessageT_co]) -> None:
        """Initialize this error.

        Args:
            selected: The selected receiver that was not handled.
        """
        recv = selected._recv  # pylint: disable=protected-access
        super().__init__(f"Selected receiver {recv} was not handled in the if-chain")
        self.selected: Selected[ReceiverMessageT_co] = selected
        """The selected receiver that was not handled."""
Attributes¤
selected instance-attribute ¤

The selected receiver that was not handled.

Functions¤
__init__ ¤
__init__(selected: Selected[ReceiverMessageT_co]) -> None

Initialize this error.

PARAMETER DESCRIPTION
selected

The selected receiver that was not handled.

TYPE: Selected[ReceiverMessageT_co]

Source code in frequenz/channels/_select.py
def __init__(self, selected: Selected[ReceiverMessageT_co]) -> None:
    """Initialize this error.

    Args:
        selected: The selected receiver that was not handled.
    """
    recv = selected._recv  # pylint: disable=protected-access
    super().__init__(f"Selected receiver {recv} was not handled in the if-chain")
    self.selected: Selected[ReceiverMessageT_co] = selected
    """The selected receiver that was not handled."""

Functions¤

frequenz.channels.merge ¤

Merge messages coming from multiple receivers into a single stream.

Example

For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream like this:

from frequenz.channels import Broadcast, merge

channel1 = Broadcast[int](name="input-channel-1")
channel2 = Broadcast[int](name="input-channel-2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

async for message in merge(receiver1, receiver2):
    print(f"received {message}")
PARAMETER DESCRIPTION
*receivers

The receivers to merge.

TYPE: Receiver[ReceiverMessageT_co] DEFAULT: ()

RETURNS DESCRIPTION
Merger[ReceiverMessageT_co]

A receiver that merges the messages coming from multiple receivers into a single stream.

RAISES DESCRIPTION
ValueError

If no receivers are provided.

Source code in frequenz/channels/_merge.py
def merge(*receivers: Receiver[ReceiverMessageT_co]) -> Merger[ReceiverMessageT_co]:
    """Merge messages coming from multiple receivers into a single stream.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream like this:

        ```python
        from frequenz.channels import Broadcast, merge

        channel1 = Broadcast[int](name="input-channel-1")
        channel2 = Broadcast[int](name="input-channel-2")
        receiver1 = channel1.new_receiver()
        receiver2 = channel2.new_receiver()

        async for message in merge(receiver1, receiver2):
            print(f"received {message}")
        ```

    Args:
        *receivers: The receivers to merge.

    Returns:
        A receiver that merges the messages coming from multiple receivers into a
            single stream.

    Raises:
        ValueError: If no receivers are provided.
    """
    if not receivers:
        raise ValueError("At least one receiver must be provided")

    return Merger(*receivers, name="merge")
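
To make the idea of merging concrete, here is a stdlib-only sketch that merges plain async iterators through a shared queue. `toy_merge`, `_merged` and `countdown` are hypothetical names and the approach is an assumption chosen for brevity; it is not how `Merger` is implemented. Errors raised by a source are swallowed in this sketch.

```python
import asyncio
from typing import AsyncIterator, Tuple, TypeVar

T = TypeVar("T")
_DONE = object()  # marks the end of one source


async def _merged(sources: Tuple[AsyncIterator[T], ...]) -> AsyncIterator[T]:
    queue: asyncio.Queue = asyncio.Queue()

    async def pump(source: AsyncIterator[T]) -> None:
        # Forward every item of one source into the shared queue.
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_DONE)

    tasks = [asyncio.create_task(pump(source)) for source in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    finally:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


def toy_merge(*sources: AsyncIterator[T]) -> AsyncIterator[T]:
    """Validate eagerly, then return the merged stream."""
    if not sources:
        raise ValueError("At least one source must be provided")
    return _merged(sources)


async def countdown(start: int) -> AsyncIterator[int]:
    """Example source yielding start, start - 1, ..., 1."""
    for value in range(start, 0, -1):
        await asyncio.sleep(0)
        yield value


async def demo() -> list:
    return [item async for item in toy_merge(countdown(2), countdown(3))]
```

The relative order of items from different sources depends on scheduling, so callers should only rely on the order within each source.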

frequenz.channels.select async ¤

select(
    *receivers: Receiver[Any],
) -> AsyncIterator[Selected[Any]]

Iterate over the messages of all receivers as they receive new messages.

This function is used to iterate over the messages of all receivers as they receive new messages. It is used in conjunction with the Selected class and the selected_from() function to determine which receiver was selected in a select operation.

An exhaustiveness check is performed at runtime to make sure all selected receivers are handled in the if-chain, so you should call selected_from() with all the receivers passed to select() inside the select loop, even if you plan to ignore a message, to signal select() that you are purposefully ignoring the message.

Note

The select() function is intended to be used in cases where the set of receivers is static and known beforehand. If you need to dynamically add/remove receivers from a select loop, there are a few alternatives. Depending on your use case, one or the other could work better for you:

  • Use merge(): this is useful when you have an unknown number of receivers of the same type that can be handled as a group.
  • Use tasks to manage each receiver individually: this is better if there are no relationships between the receivers.
  • Break the select() loop and start a new one with the new set of receivers (this should be the last resort, as it has some performance implications because the loop needs to be restarted).
Example
import datetime
from typing import assert_never

from frequenz.channels import ReceiverStoppedError, select, selected_from
from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed

timer1 = Timer(datetime.timedelta(seconds=1), TriggerAllMissed())
timer2 = Timer(datetime.timedelta(seconds=0.5), SkipMissedAndDrift())

async for selected in select(timer1, timer2):
    if selected_from(selected, timer1):
        # Beware: `selected.message` might raise an exception, you can always
        # check for exceptions with `selected.exception` first or use
        # a try-except block. You can also quickly check if the receiver was
        # stopped and let any other unexpected exceptions bubble up.
        if selected.was_stopped:
            print("timer1 was stopped")
            continue
        print(f"timer1: now={datetime.datetime.now()} drift={selected.message}")
        timer2.stop()
    elif selected_from(selected, timer2):
        # Explicit handling of exceptions
        match selected.exception:
            case ReceiverStoppedError():
                print("timer2 was stopped")
            case Exception() as exception:
                print(f"timer2: exception={exception}")
            case None:
                # All good, no exception, we can use `selected.message` safely
                print(f"timer2: now={datetime.datetime.now()} drift={selected.message}")
            case _ as unhandled:
                assert_never(unhandled)
    else:
        # This is not necessary, as select() will check for exhaustiveness, but
        # it is good practice to have it in case you forgot to handle a new
        # receiver added to `select()` at a later point in time.
        assert False
PARAMETER DESCRIPTION
*receivers

The receivers to select from.

TYPE: Receiver[Any] DEFAULT: ()

YIELDS DESCRIPTION
AsyncIterator[Selected[Any]]

The currently selected item.

RAISES DESCRIPTION
UnhandledSelectedError

If a selected receiver was not handled in the if-chain.

BaseExceptionGroup

If there is an error while finishing the select operation and receivers fail while cleaning up.

SelectError

If there is an error while selecting receivers during normal operation. For example if a receiver raises an exception in the ready() method. Normal errors while receiving messages are not raised, but reported via the Selected instance.

Source code in frequenz/channels/_select.py
async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]:
    """Iterate over the messages of all receivers as they receive new messages.

    This function is used to iterate over the messages of all receivers as they receive
    new messages.  It is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class and the
    [`selected_from()`][frequenz.channels.selected_from] function to determine
    which receiver was selected in a select operation.

    An exhaustiveness check is performed at runtime to make sure all selected receivers
    are handled in the if-chain, so you should call `selected_from()` with all the
    receivers passed to `select()` inside the select loop, even if you plan to ignore
    a message, to signal `select()` that you are purposefully ignoring the message.

    Note:
        The `select()` function is intended to be used in cases where the set of
        receivers is static and known beforehand.  If you need to dynamically add/remove
        receivers from a select loop, there are a few alternatives.  Depending on your
        use case, one or the other could work better for you:

        * Use [`merge()`][frequenz.channels.merge]: this is useful when you have an
          unknown number of receivers of the same type that can be handled as a group.
        * Use tasks to manage each receiver individually: this is better if there are no
          relationships between the receivers.
        * Break the `select()` loop and start a new one with the new set of receivers
          (this should be the last resort, as it has some performance implications
           because the loop needs to be restarted).

    Example:
        ```python
        import datetime
        from typing import assert_never

        from frequenz.channels import ReceiverStoppedError, select, selected_from
        from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed

        timer1 = Timer(datetime.timedelta(seconds=1), TriggerAllMissed())
        timer2 = Timer(datetime.timedelta(seconds=0.5), SkipMissedAndDrift())

        async for selected in select(timer1, timer2):
            if selected_from(selected, timer1):
                # Beware: `selected.message` might raise an exception, you can always
                # check for exceptions with `selected.exception` first or use
                # a try-except block. You can also quickly check if the receiver was
                # stopped and let any other unexpected exceptions bubble up.
                if selected.was_stopped:
                    print("timer1 was stopped")
                    continue
                print(f"timer1: now={datetime.datetime.now()} drift={selected.message}")
                timer2.stop()
            elif selected_from(selected, timer2):
                # Explicit handling of exceptions
                match selected.exception:
                    case ReceiverStoppedError():
                        print("timer2 was stopped")
                    case Exception() as exception:
                        print(f"timer2: exception={exception}")
                    case None:
                        # All good, no exception, we can use `selected.message` safely
                        print(f"timer2: now={datetime.datetime.now()} drift={selected.message}")
                    case _ as unhandled:
                        assert_never(unhandled)
            else:
                # This is not necessary, as select() will check for exhaustiveness, but
                # it is good practice to have it in case you forgot to handle a new
                # receiver added to `select()` at a later point in time.
                assert False
        ```

    Args:
        *receivers: The receivers to select from.

    Yields:
        The currently selected item.

    Raises:
        UnhandledSelectedError: If a selected receiver was not handled in the if-chain.
        BaseExceptionGroup: If there is an error while finishing the select operation
            and receivers fail while cleaning up.
        SelectError: If there is an error while selecting receivers during normal
            operation.  For example if a receiver raises an exception in the `ready()`
            method.  Normal errors while receiving messages are not raised, but reported
            via the `Selected` instance.
    """
    receivers_map: dict[str, Receiver[Any]] = {str(hash(r)): r for r in receivers}
    pending: set[asyncio.Task[bool]] = set()

    try:
        for name, recv in receivers_map.items():
            pending.add(asyncio.create_task(recv.ready(), name=name))

        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )

            for task in done:
                receiver_active: bool = True
                name = task.get_name()
                recv = receivers_map[name]
                if exception := task.exception():
                    match exception:
                        case asyncio.CancelledError():
                            # If the receiver was cancelled, then it means we want to
                            # exit the select loop, so we handle the receiver but we
                            # don't add it back to the pending list.
                            receiver_active = False
                        case _ as exc:
                            raise SelectError(f"Error while selecting {recv}") from exc

                selected = Selected(recv)
                yield selected
                if not selected._handled:  # pylint: disable=protected-access
                    raise UnhandledSelectedError(selected)

                receiver_active = task.result()
                if not receiver_active:
                    continue

                # Add back the receiver to the pending list
                name = task.get_name()
                recv = receivers_map[name]
                pending.add(asyncio.create_task(recv.ready(), name=name))
    finally:
        await _stop_pending_tasks(pending)
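
The core of the loop above is a "wait for the first completed task, then re-arm it" pattern. The following is a minimal, self-contained sketch of that pattern using only `asyncio`. It is not the library's implementation: `ticker()` and `wait_and_rearm()` are hypothetical names, and `ticker()` merely stands in for `Receiver.ready()`.

```python
import asyncio


async def ticker(period: float) -> bool:
    """Hypothetical stand-in for `Receiver.ready()`: resolve after `period` seconds."""
    await asyncio.sleep(period)
    return True  # True means "still active, wait on me again"


async def wait_and_rearm(periods: dict[str, float], rounds: int) -> list[str]:
    """Return the names of the sources in the order they became ready."""
    # One task per source; the task name maps a finished task back to its source.
    pending: set[asyncio.Task[bool]] = {
        asyncio.create_task(ticker(period), name=name)
        for name, period in periods.items()
    }
    order: list[str] = []
    try:
        while pending and len(order) < rounds:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                name = task.get_name()
                order.append(name)
                # Re-arm only the sources that report they are still active.
                if task.result():
                    pending.add(
                        asyncio.create_task(ticker(periods[name]), name=name)
                    )
    finally:
        # Cancel whatever is still waiting, as the `finally` block above does.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
    return order


if __name__ == "__main__":
    print(asyncio.run(wait_and_rearm({"fast": 0.01, "slow": 0.5}, rounds=3)))
```

Naming each task after its source is what lets the loop map a finished task back to the object that must be waited on again.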

frequenz.channels.selected_from ¤

selected_from(
    selected: Selected[Any],
    receiver: Receiver[ReceiverMessageT_co],
) -> TypeGuard[Selected[ReceiverMessageT_co]]

Check whether the given receiver was selected by select().

This function is used in conjunction with the Selected class to determine which receiver was selected in a select() iteration.

It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.

Please see select() for an example.

PARAMETER DESCRIPTION
selected

The result of a select() iteration.

TYPE: Selected[Any]

receiver

The receiver to check as the possible source of the select operation.

TYPE: Receiver[ReceiverMessageT_co]

RETURNS DESCRIPTION
TypeGuard[Selected[ReceiverMessageT_co]]

Whether the given receiver was selected.

Source code in frequenz/channels/_select.py
def selected_from(
    selected: Selected[Any], receiver: Receiver[ReceiverMessageT_co]
) -> TypeGuard[Selected[ReceiverMessageT_co]]:
    """Check whether the given receiver was selected by [`select()`][frequenz.channels.select].

    This function is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class to determine which receiver was
    selected in a `select()` iteration.

    It also works as a [type guard][typing.TypeGuard] to narrow the type of the
    `Selected` instance to the type of the receiver.

    Please see [`select()`][frequenz.channels.select] for an example.

    Args:
        selected: The result of a `select()` iteration.
        receiver: The receiver to check as the possible source of the select operation.

    Returns:
        Whether the given receiver was selected.
    """
    if handled := selected._recv is receiver:  # pylint: disable=protected-access
        selected._handled = True  # pylint: disable=protected-access
    return handled