Broadcast
A channel that delivers all messages to all receivers.
Description
Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders will be received by all receivers.
Characteristics
- Buffered: Yes, with one buffer per receiver
- Buffer full policy: Drop oldest message
- Multiple receivers: Yes
- Multiple senders: Yes
- Thread-safe: No
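Because the channel is not thread-safe, work running in another thread needs to hand its messages over to the event loop's thread rather than touch the channel directly. The following is only a sketch of one common `asyncio` approach, which the source does not prescribe: a plain `asyncio.Queue` stands in for the channel, and the `worker` function is a hypothetical name.

```python
import asyncio
import threading  # noqa: F401  (the worker below runs in a separate thread)


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    # Stand-in for a sender/receiver pair; this is not the library's API.
    queue: asyncio.Queue[int] = asyncio.Queue()

    def worker() -> None:
        for number in range(3):
            # Schedule the coroutine on the event loop's thread and block this
            # worker thread until it has completed there.
            future = asyncio.run_coroutine_threadsafe(queue.put(number), loop)
            future.result()

    # Run the worker in a thread without blocking the event loop.
    worker_task = asyncio.create_task(asyncio.to_thread(worker))
    received = [await queue.get() for _ in range(3)]
    await worker_task
    return received


result = asyncio.run(main())
print(result)  # [0, 1, 2]
```

The key point is that every operation on the loop-bound object happens in the loop's thread; the worker thread only schedules work and waits for the result.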
This channel is buffered: when messages are not consumed fast enough and a buffer fills up, the oldest messages are dropped.
Each receiver has its own buffer, so messages will only be dropped for receivers that can't keep up with the senders, and not for the whole channel.
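The per-receiver drop-oldest behaviour can be pictured with a small stand-alone model. This is only an illustrative sketch using the standard library, not the library's implementation: the `ToyBroadcast` class and its `limit` parameter are hypothetical names, and `collections.deque(maxlen=...)` stands in for each receiver's buffer.

```python
from collections import deque


class ToyBroadcast:
    """Hypothetical model: one bounded buffer per receiver, oldest dropped first."""

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._buffers: list[deque[int]] = []

    def new_receiver(self) -> deque[int]:
        # A deque with maxlen silently discards its oldest item when full.
        buffer: deque[int] = deque(maxlen=self._limit)
        self._buffers.append(buffer)
        return buffer

    def send(self, message: int) -> None:
        # Every receiver gets its own copy of the message.
        for buffer in self._buffers:
            buffer.append(message)


channel = ToyBroadcast(limit=3)
fast = channel.new_receiver()
slow = channel.new_receiver()

for number in range(5):
    channel.send(number)
    fast.popleft()  # the fast receiver consumes each message right away

print(list(fast))  # []
print(list(slow))  # [2, 3, 4]
```

Only the slow receiver loses messages (0 and 1 here); the fast receiver is unaffected, which mirrors the statement that drops happen per receiver and not for the whole channel.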
To create new senders and receivers, use the new_sender() and new_receiver() methods, respectively.
When a channel is not needed anymore, it should be closed with close(). Closing prevents further attempts to send() data. Receivers can still drain the items pending in their queues, but after that, subsequent receive() calls raise a ReceiverStoppedError.
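The shutdown sequence (the channel is closed, receivers drain what is pending, then receiving fails) can be illustrated with a stand-alone toy model. The `ToyReceiver` and `ToyReceiverStopped` names below are hypothetical stand-ins built on the standard library; they only mimic the described behaviour and are not the library's classes.

```python
from collections import deque


class ToyReceiverStopped(Exception):
    """Hypothetical stand-in for the error raised once a closed channel is drained."""


class ToyReceiver:
    """Hypothetical receiver holding the messages still pending for it."""

    def __init__(self, pending: list[int]) -> None:
        self._queue = deque(pending)
        self._closed = False

    def close(self) -> None:
        # Mark the channel side as closed; pending messages stay available.
        self._closed = True

    def receive(self) -> int:
        if self._queue:
            return self._queue.popleft()
        if self._closed:
            raise ToyReceiverStopped("channel closed and queue drained")
        raise RuntimeError("toy model: nothing pending (a real receiver would wait)")


receiver = ToyReceiver(pending=[1, 2])
receiver.close()

# Pending items are still delivered after closing.
drained = [receiver.receive(), receiver.receive()]
print(drained)  # [1, 2]

# Once the queue is empty, receiving fails.
try:
    receiver.receive()
    stopped = False
except ToyReceiverStopped:
    stopped = True
print(stopped)  # True
```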
This channel is useful, for example, to implement a pub/sub pattern, where multiple receivers can subscribe to a channel to receive all messages.
In cases where each message needs to be delivered only to one receiver, an anycast channel may be used.
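The difference between the two delivery modes can be sketched with plain `asyncio` queues. This is only an analogy, not how either channel is implemented: one shared `asyncio.Queue` stands in for anycast-style delivery (each message is taken by exactly one consumer), while one queue per consumer stands in for broadcast-style delivery. All function names are hypothetical.

```python
import asyncio


async def deliver_to_one(messages: list[int], consumers: int) -> list[list[int]]:
    """Anycast-like: consumers share one queue, so each message is taken once."""
    shared: asyncio.Queue[int] = asyncio.Queue()
    for message in messages:
        shared.put_nowait(message)
    received: list[list[int]] = [[] for _ in range(consumers)]

    async def consume(index: int) -> None:
        while not shared.empty():
            received[index].append(shared.get_nowait())
            await asyncio.sleep(0)  # let the other consumers take a turn

    await asyncio.gather(*(consume(i) for i in range(consumers)))
    return received


async def deliver_to_all(messages: list[int], consumers: int) -> list[list[int]]:
    """Broadcast-like: every consumer has its own queue and sees every message."""
    queues: list[asyncio.Queue[int]] = [asyncio.Queue() for _ in range(consumers)]
    for message in messages:
        for queue in queues:
            queue.put_nowait(message)
    return [[queue.get_nowait() for _ in messages] for queue in queues]


one = asyncio.run(deliver_to_one([1, 2, 3, 4], consumers=2))
every = asyncio.run(deliver_to_all([1, 2, 3, 4], consumers=2))

print(sorted(one[0] + one[1]))  # [1, 2, 3, 4]: each message was taken exactly once
print(every)  # [[1, 2, 3, 4], [1, 2, 3, 4]]: each consumer saw every message
```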
Examples
Send a few numbers to a receiver
This is a very simple example that sends a few numbers from a single sender to a single receiver.
import asyncio

from frequenz.channels import Broadcast, Sender


async def send(sender: Sender[int]) -> None:
    for message in range(3):
        print(f"sending {message}")
        await sender.send(message)


async def main() -> None:
    channel = Broadcast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            message = await receiver.receive()
            print(f"received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())
The output should look something like this (although the sending and receiving might appear more interleaved):
sending 0
sending 1
sending 2
received 0
received 1
received 2
Send a few numbers from multiple senders to multiple receivers
This is a more complex example that sends a few numbers from multiple senders to multiple receivers. Every receiver gets every message, regardless of which sender sent it.
import asyncio

from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for message in range(start, stop):
        print(f"{name} sending {message}")
        await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for message in receiver:
            print(f"{name} received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    channel = Broadcast[int](name="numbers")

    async with asyncio.TaskGroup() as task_group:
        senders = [
            task_group.create_task(send("sender_1", channel.new_sender(), 10, 13)),
            task_group.create_task(send("sender_2", channel.new_sender(), 20, 22)),
        ]
        task_group.create_task(recv("receiver_1", channel.new_receiver()))
        task_group.create_task(recv("receiver_2", channel.new_receiver()))

        # Once all senders are done, close the channel so the receivers stop
        # after draining their queues instead of waiting forever.
        await asyncio.gather(*senders)
        await channel.close()


asyncio.run(main())
The output should look something like this (although the sending and receiving might appear interleaved in a different way):
sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
sender_2 sending 20
sender_2 sending 21
receiver_1 received 10
receiver_1 received 11
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
receiver_2 received 10
receiver_2 received 11
receiver_2 received 12
receiver_2 received 20
receiver_2 received 21