Anycast
A channel that delivers each message to exactly one receiver.
Description
Anycast channels support multiple senders and multiple receivers. Each message sent through any of the senders will be received by exactly one receiver (but any receiver).
Characteristics
- Buffered: Yes, with a global channel buffer
- Buffer full policy: Block senders
- Multiple receivers: Yes
- Multiple senders: Yes
- Thread-safe: No
This channel is buffered. If the senders are faster than the receivers, the channel's buffer will fill up, and the senders will then block at the send() method until the receivers consume messages from the buffer. The buffer size can be configured at creation time via the limit argument.
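The blocking behavior described above can be illustrated with a minimal sketch. To keep it runnable with only the standard library, it uses `asyncio.Queue(maxsize=2)` as a stand-in for a channel created with `limit=2`. That substitution is an assumption for illustration only, not how the channel is implemented, and the names `demo`, `producer` and `buffer` are hypothetical.

```python
import asyncio


async def demo() -> tuple[int, list[int]]:
    sent: list[int] = []
    # Assumption: a bounded asyncio.Queue stands in for a channel with limit=2.
    buffer: asyncio.Queue = asyncio.Queue(maxsize=2)

    async def producer() -> None:
        for n in range(4):
            await buffer.put(n)  # suspends here while the buffer is full
            sent.append(n)

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)  # give the producer time to fill the buffer
    sent_before_drain = len(sent)  # only two messages fit in the buffer

    # Consuming messages frees buffer slots and lets the producer continue.
    received = [await buffer.get() for _ in range(4)]
    await task
    return sent_before_drain, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The producer only makes progress past the second message once the consumer starts draining the buffer, which is the same back-pressure a full channel applies to its senders.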
The first receiver that is awaited will get the next message. When multiple receivers are waiting, the asyncio loop scheduler picks a receiver for each message.
This means that, in practice, a single receiver might end up receiving all the messages, depending on how tasks are scheduled.
If you need to ensure some delivery policy (like round-robin or uniformly random), then you will have to implement it yourself.
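As a rough sketch of what implementing such a policy yourself could look like, the helper below forwards each incoming message to the next target in rotation. It is written against plain async iterables and awaitable "send" callables, so it does not depend on any particular channel API. The names `dispatch_round_robin`, `_numbers` and `demo` are hypothetical, and the demo uses `asyncio.Queue` objects as stand-ins for per-worker channels.

```python
import asyncio
import itertools
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Sequence
from typing import TypeVar

T = TypeVar("T")


async def dispatch_round_robin(
    messages: AsyncIterable[T],
    sends: Sequence[Callable[[T], Awaitable[None]]],
) -> None:
    """Forward each incoming message to the next send callable, in rotation."""
    targets = itertools.cycle(sends)
    async for message in messages:
        await next(targets)(message)


async def _numbers(count: int) -> AsyncIterator[int]:
    """Yield 0..count-1 as a simple message source for the demo."""
    for n in range(count):
        yield n


async def demo() -> list[list[int]]:
    # Assumption: one unbounded asyncio.Queue per worker stands in for one
    # channel per worker.
    queues: list[asyncio.Queue] = [asyncio.Queue() for _ in range(3)]
    await dispatch_round_robin(_numbers(7), [q.put for q in queues])
    return [[q.get_nowait() for _ in range(q.qsize())] for q in queues]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [[0, 3, 6], [1, 4], [2, 5]]
```

One way to apply this would be to give each worker its own channel, pass the `send` methods of the corresponding senders as `sends`, and feed the dispatcher from a single receiver. This adds an extra hop per message in exchange for a predictable assignment.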
To create new senders and receivers, use the new_sender() and new_receiver() methods, respectively.
When the channel is not needed anymore, it should be closed with the close() method. This will prevent further attempts to send() data. Receivers will still be able to drain the pending messages on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError exception.
This channel is useful, for example, to distribute work across multiple workers.
In cases where each message needs to be received by every receiver, a broadcast channel may be used.
Examples
Send a few numbers to a receiver
This is a very simple example that sends a few numbers from a single sender to a single receiver.
import asyncio

from frequenz.channels import Anycast, Sender


async def send(sender: Sender[int]) -> None:
    for message in range(3):
        print(f"sending {message}")
        await sender.send(message)


async def main() -> None:
    channel = Anycast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            message = await receiver.receive()
            print(f"received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())
The output should look something like this (although the sending and received lines might appear more interleaved):
sending 0
sending 1
sending 2
received 0
received 1
received 2
Send a few numbers from multiple senders to multiple receivers
This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.
import asyncio

from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for message in range(start, stop):
        print(f"{name} sending {message}")
        await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for message in receiver:
            print(f"{name} received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Anycast[int](name="numbers", limit=2)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())
The output should look something like this (although the sending and received lines might be interleaved in a different way):
sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver consumes a message
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver consumes a message
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver consumes a message
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21