Skip to content

Broadcast¤

A channel that deliver all messages to all receivers.

Description¤

Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders will be received by all receivers.

Receiver Receiver msg1 msg1,msg2 Sender Channel Sender msg2 msg1,msg2

Characteristics

  • Buffered: Yes, with one buffer per receiver
  • Buffer full policy: Drop oldest message
  • Multiple receivers: Yes
  • Multiple senders: Yes
  • Thread-safe: No

This channel is buffered, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped.

Each receiver has its own buffer, so messages will only be dropped for receivers that can't keep up with the senders, and not for the whole channel.

To create a new senders and receivers you can use the new_sender() and new_receiver() methods respectively.

When a channel is not needed anymore, it should be closed with close(). This will prevent further attempts to send() data, and will allow receivers to drain the pending items on their queues, but after that, subsequent receive() calls will raise a ReceiverStoppedError.

This channel is useful, for example, to implement a pub/sub pattern, where multiple receivers can subscribe to a channel to receive all messages.

In cases where each message needs to be delivered only to one receiver, an anycast channel may be used.

Examples¤

Send a few numbers to a receiver

This is a very simple example that sends a few numbers from a single sender to a single receiver.

import asyncio

from frequenz.channels import Broadcast, Sender


async def send(sender: Sender[int]) -> None:
    for message in range(3):
        print(f"sending {message}")
        await sender.send(message)


async def main() -> None:
    channel = Broadcast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            message = await receiver.receive()
            print(f"received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())

The output should look something like (although the sending and received might appear more interleaved):

sending 0
sending 1
sending 2
received 0
received 1
received 2
Send a few number from multiple senders to multiple receivers

This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.

import asyncio

from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for message in range(start, stop):
        print(f"{name} sending {message}")
        await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for message in receiver:
            print(f"{name} received {message}")
        await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Broadcast[int](name="numbers")

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())

The output should look something like this(although the sending and received might appear interleaved in a different way):

sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
sender_2 sending 20
sender_2 sending 21
receiver_1 received 10
receiver_1 received 11
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
receiver_2 received 10
receiver_2 received 11
receiver_2 received 12
receiver_2 received 20
receiver_2 received 21