Anycast

A channel that delivers each message to exactly one receiver.

Description

Tip

Anycast channels behave like Go channels.

Anycast channels support multiple senders and multiple receivers. Each message sent through any of the senders will be received by exactly one receiver (but any receiver).

[Diagram: two senders, a channel, and two receivers; messages msg1 and msg2 each flow from a sender through the channel to one receiver.]

Characteristics

  • Buffered: Yes, with a global channel buffer
  • Buffer full policy: Block senders
  • Multiple receivers: Yes
  • Multiple senders: Yes
  • Thread-safe: No

This channel is buffered, and if the senders are faster than the receivers, then the channel's buffer will fill up. In that case, the senders will block at the send() method until the receivers consume the messages in the channel's buffer. The channel's buffer size can be configured at creation time via the limit argument.
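
The "block senders" policy described above can be illustrated with a small standard-library sketch. It deliberately uses `asyncio.Queue` as a stand-in rather than the Anycast channel itself, with `maxsize` playing the role of the `limit` argument; the names `demo`, `producer` and `events` are hypothetical and exist only for this illustration.

```python
import asyncio


async def demo() -> list[str]:
    events: list[str] = []
    # Stand-in for a channel buffer holding at most 2 messages.
    buffer: asyncio.Queue[int] = asyncio.Queue(maxsize=2)

    async def producer() -> None:
        for message in range(3):
            await buffer.put(message)  # blocks while the buffer is full
            events.append(f"sent {message}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)  # let the producer run until it blocks
    events.append(f"buffered {buffer.qsize()}")

    first = await buffer.get()  # consuming one message frees one slot
    events.append(f"received {first}")

    await task  # the blocked put() can now complete
    return events


events = asyncio.run(demo())
print(events)
```

In this sketch the third `put()` only completes after one message has been consumed, which is the same back-pressure effect the text describes for `send()` on a full channel.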

The first receiver that is awaited will get the next message. When multiple receivers are waiting, the asyncio event loop scheduler picks a receiver for each new message.

This means that, in practice, a single receiver might end up receiving all the messages, depending on how tasks are scheduled.

If you need to ensure some delivery policy (like round-robin or uniformly random), then you will have to implement it yourself.
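
One possible way to get a round-robin policy is to give each worker its own destination and cycle through them from a single dispatcher. The sketch below is only an illustration under that assumption: `round_robin_dispatch`, `SendFn` and `make_send` are hypothetical names, not part of the library, and plain lists stand in for the per-worker channels so the example needs only the standard library.

```python
import asyncio
import itertools
from collections.abc import Awaitable, Callable, Iterable

# Hypothetical alias: any async callable that accepts one message,
# for example the bound send() method of a channel sender.
SendFn = Callable[[int], Awaitable[None]]


async def round_robin_dispatch(messages: Iterable[int], sends: list[SendFn]) -> None:
    """Forward each message to the next send function, cycling in order."""
    targets = itertools.cycle(sends)
    for message in messages:
        await next(targets)(message)


async def demo() -> dict[str, list[int]]:
    # Plain lists stand in for per-worker channels in this sketch.
    received: dict[str, list[int]] = {"worker_1": [], "worker_2": []}

    def make_send(name: str) -> SendFn:
        async def send(message: int) -> None:
            received[name].append(message)

        return send

    await round_robin_dispatch(range(5), [make_send(name) for name in received])
    return received


result = asyncio.run(demo())
print(result)
```

With real channels, the same dispatcher could presumably be fed the `send` method of one sender per worker, each created from its own channel, so that the dispatcher rather than the scheduler decides who gets the next message.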

To create new senders and receivers, use the new_sender() and new_receiver() methods, respectively.

When the channel is not needed anymore, it should be closed with the close() method. This will prevent further attempts to send() data. Receivers will still be able to drain the pending messages on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError exception.

This channel is useful, for example, to distribute work across multiple workers.

In cases where each message needs to be received by every receiver, a broadcast channel may be used.

Examples

Send a few numbers to a receiver

This is a very simple example that sends a few numbers from a single sender to a single receiver.

import asyncio

from frequenz.channels import Anycast, Sender


async def send(sender: Sender[int]) -> None:
    for message in range(3):
        print(f"sending {message}")
        await sender.send(message)


async def main() -> None:
    channel = Anycast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            message = await receiver.receive()
            print(f"received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())

The output should look something like this (although the sending and receiving lines might appear more interleaved):

sending 0
sending 1
sending 2
received 0
received 1
received 2

Send a few numbers from multiple senders to multiple receivers

This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.

import asyncio

from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for message in range(start, stop):
        print(f"{name} sending {message}")
        await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for message in receiver:
            print(f"{name} received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Anycast[int](name="numbers", limit=2)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())

The output should look something like this (although the sending and receiving lines might appear interleaved in a different way):

sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21