Skip to content

broadcast

frequenz.channels.broadcast ¤

A channel to broadcast messages to all receivers.

Classes¤

frequenz.channels.broadcast.Broadcast ¤

Bases: Generic[T]

A channel to broadcast messages to multiple receivers.

Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders is received by all of the receivers.

Internally, a broadcast receiver's buffer is implemented with just append/pop operations on either side of a deque, which are thread-safe. Because of this, Broadcast channels are thread-safe.

When there are multiple channel receivers, they can be awaited simultaneously using Select, Merge or MergeNamed.

Example
async def send(sender: channel.Sender) -> None:
    while True:
        next = random.randint(3, 17)
        print(f"sending: {next}")
        await sender.send(next)


async def recv(id: int, receiver: channel.Receiver) -> None:
    while True:
        next = await receiver.receive()
        print(f"receiver_{id} received {next}")
        await asyncio.sleep(0.1) # sleep (or work) with the data


bcast = channel.Broadcast()

sender = bcast.get_sender()
receiver_1 = bcast.get_receiver()

asyncio.create_task(send(sender))

await recv(1, receiver_1)

Check the tests and benchmarks directories for more examples.

Source code in frequenz/channels/broadcast.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class Broadcast(Generic[T]):
    """A channel to broadcast messages to multiple receivers.

    `Broadcast` channels can have multiple senders and multiple receivers. Each
    message sent through any of the senders is received by all of the
    receivers.

    Internally, a broadcast receiver's buffer is implemented with just
    append/pop operations on either side of a [deque][collections.deque], which
    are thread-safe.  Because of this, `Broadcast` channels are thread-safe.

    When there are multiple channel receivers, they can be awaited
    simultaneously using [Select][frequenz.channels.Select],
    [Merge][frequenz.channels.Merge] or
    [MergeNamed][frequenz.channels.MergeNamed].

    Example:
        ``` python
        async def send(sender: channel.Sender) -> None:
            while True:
                next = random.randint(3, 17)
                print(f"sending: {next}")
                await sender.send(next)


        async def recv(id: int, receiver: channel.Receiver) -> None:
            while True:
                next = await receiver.receive()
                print(f"receiver_{id} received {next}")
                await asyncio.sleep(0.1) # sleep (or work) with the data


        bcast = channel.Broadcast()

        sender = bcast.get_sender()
        receiver_1 = bcast.get_receiver()

        asyncio.create_task(send(sender))

        await recv(1, receiver_1)
        ```

        Check the `tests` and `benchmarks` directories for more examples.
    """

    def __init__(self, name: str, resend_latest: bool = False) -> None:
        """Create a Broadcast channel.

        Args:
            name: A name for the broadcast channel, typically based on the type
                of data sent through it.  Used to identify the channel in the
                logs.
            resend_latest: When True, every time a new receiver is created with
                `get_receiver`, it will automatically get sent the latest value
                on the channel.  This allows new receivers on slow streams to
                get the latest value as soon as they are created, without having
                to wait for the next message on the channel to arrive.
        """
        self.name: str = name
        self._resend_latest = resend_latest

        self.recv_cv: Condition = Condition()
        self.receivers: Dict[UUID, Receiver[T]] = {}
        self.closed: bool = False
        self._latest: Optional[T] = None

    async def close(self) -> None:
        """Close the Broadcast channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will return `False`.

        Receivers will still be able to drain the pending items on their queues,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will return `None`
        immediately.
        """
        self._latest = None
        self.closed = True
        async with self.recv_cv:
            self.recv_cv.notify_all()

    def _drop_receiver(self, uuid: UUID) -> None:
        """Drop a specific receiver from the list of broadcast receivers.

        Called from the destructors of receivers.

        Args:
            uuid: a uuid identifying the receiver to be dropped.
        """
        if uuid in self.receivers:
            del self.receivers[uuid]

    def get_sender(self) -> Sender[T]:
        """Create a new broadcast sender.

        Returns:
            A Sender instance attached to the broadcast channel.
        """
        return Sender(self)

    def get_receiver(
        self, name: Optional[str] = None, maxsize: int = 50
    ) -> Receiver[T]:
        """Create a new broadcast receiver.

        Broadcast receivers have their own buffer, and when messages are not
        being consumed fast enough and the buffer fills up, old messages will
        get dropped just in this receiver.

        Args:
            name: A name to identify the receiver in the logs.
            maxsize: Size of the receiver's buffer.

        Returns:
            A Receiver instance attached to the broadcast channel.
        """
        uuid = uuid4()
        if name is None:
            name = str(uuid)
        recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
        self.receivers[uuid] = recv
        if self._resend_latest and self._latest is not None:
            recv.enqueue(self._latest)
        return recv

    def get_peekable(self) -> Peekable[T]:
        """Create a new Peekable for the broadcast channel.

        A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
        that allows the user to get a peek at the latest value in the channel,
        without consuming anything.

        Returns:
            A Peekable to peek into the broadcast channel with.
        """
        return Peekable(self)
Functions¤
__init__(name, resend_latest=False) ¤

Create a Broadcast channel.

PARAMETER DESCRIPTION
name

A name for the broadcast channel, typically based on the type of data sent through it. Used to identify the channel in the logs.

TYPE: str

resend_latest

When True, every time a new receiver is created with get_receiver, it will automatically get sent the latest value on the channel. This allows new receivers on slow streams to get the latest value as soon as they are created, without having to wait for the next message on the channel to arrive.

TYPE: bool DEFAULT: False

Source code in frequenz/channels/broadcast.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(self, name: str, resend_latest: bool = False) -> None:
    """Create a Broadcast channel.

    Args:
        name: A name for the broadcast channel, typically based on the type
            of data sent through it.  Used to identify the channel in the
            logs.
        resend_latest: When True, every time a new receiver is created with
            `get_receiver`, it will automatically get sent the latest value
            on the channel.  This allows new receivers on slow streams to
            get the latest value as soon as they are created, without having
            to wait for the next message on the channel to arrive.
    """
    self.name: str = name
    self._resend_latest = resend_latest

    self.recv_cv: Condition = Condition()
    self.receivers: Dict[UUID, Receiver[T]] = {}
    self.closed: bool = False
    self._latest: Optional[T] = None
close() async ¤

Close the Broadcast channel.

Any further attempts to send() data will return False.

Receivers will still be able to drain the pending items on their queues, but after that, subsequent receive() calls will return None immediately.

Source code in frequenz/channels/broadcast.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def close(self) -> None:
    """Close the Broadcast channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will return `False`.

    Receivers will still be able to drain the pending items on their queues,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will return `None`
    immediately.
    """
    self._latest = None
    self.closed = True
    async with self.recv_cv:
        self.recv_cv.notify_all()
get_peekable() ¤

Create a new Peekable for the broadcast channel.

A Peekable provides a peek() method that allows the user to get a peek at the latest value in the channel, without consuming anything.

RETURNS DESCRIPTION
Peekable[T]

A Peekable to peek into the broadcast channel with.

Source code in frequenz/channels/broadcast.py
148
149
150
151
152
153
154
155
156
157
158
def get_peekable(self) -> Peekable[T]:
    """Create a new Peekable for the broadcast channel.

    A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
    that allows the user to get a peek at the latest value in the channel,
    without consuming anything.

    Returns:
        A Peekable to peek into the broadcast channel with.
    """
    return Peekable(self)
get_receiver(name=None, maxsize=50) ¤

Create a new broadcast receiver.

Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver.

PARAMETER DESCRIPTION
name

A name to identify the receiver in the logs.

TYPE: Optional[str] DEFAULT: None

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[T]

A Receiver instance attached to the broadcast channel.

Source code in frequenz/channels/broadcast.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def get_receiver(
    self, name: Optional[str] = None, maxsize: int = 50
) -> Receiver[T]:
    """Create a new broadcast receiver.

    Broadcast receivers have their own buffer, and when messages are not
    being consumed fast enough and the buffer fills up, old messages will
    get dropped just in this receiver.

    Args:
        name: A name to identify the receiver in the logs.
        maxsize: Size of the receiver's buffer.

    Returns:
        A Receiver instance attached to the broadcast channel.
    """
    uuid = uuid4()
    if name is None:
        name = str(uuid)
    recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
    self.receivers[uuid] = recv
    if self._resend_latest and self._latest is not None:
        recv.enqueue(self._latest)
    return recv
get_sender() ¤

Create a new broadcast sender.

RETURNS DESCRIPTION
Sender[T]

A Sender instance attached to the broadcast channel.

Source code in frequenz/channels/broadcast.py
115
116
117
118
119
120
121
def get_sender(self) -> Sender[T]:
    """Create a new broadcast sender.

    Returns:
        A Sender instance attached to the broadcast channel.
    """
    return Sender(self)

frequenz.channels.broadcast.Peekable ¤

Bases: BasePeekable[T]

A Peekable to peek into broadcast channels.

A Peekable provides a peek() method that allows the user to get a peek at the latest value in the channel, without consuming anything.

Source code in frequenz/channels/broadcast.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
class Peekable(BasePeekable[T]):
    """A Peekable to peek into broadcast channels.

    A Peekable provides a [peek()][frequenz.channels.Peekable] method that
    allows the user to get a peek at the latest value in the channel, without
    consuming anything.
    """

    def __init__(self, chan: Broadcast[T]) -> None:
        """Create a `Peekable` instance.

        Args:
            chan: The broadcast channel this Peekable will try to peek into.
        """
        self._chan = chan

    def peek(self) -> Optional[T]:
        """Return the latest value that was sent to the channel.

        Returns:
            The latest value received by the channel, and `None`, if nothing
                has been sent to the channel yet, or if the channel is closed.
        """
        return self._chan._latest  # pylint: disable=protected-access
Functions¤
__init__(chan) ¤

Create a Peekable instance.

PARAMETER DESCRIPTION
chan

The broadcast channel this Peekable will try to peek into.

TYPE: Broadcast[T]

Source code in frequenz/channels/broadcast.py
311
312
313
314
315
316
317
def __init__(self, chan: Broadcast[T]) -> None:
    """Create a `Peekable` instance.

    Args:
        chan: The broadcast channel this Peekable will try to peek into.
    """
    self._chan = chan
peek() ¤

Return the latest value that was sent to the channel.

RETURNS DESCRIPTION
Optional[T]

The latest value received by the channel, and None, if nothing has been sent to the channel yet, or if the channel is closed.

Source code in frequenz/channels/broadcast.py
319
320
321
322
323
324
325
326
def peek(self) -> Optional[T]:
    """Return the latest value that was sent to the channel.

    Returns:
        The latest value received by the channel, and `None`, if nothing
            has been sent to the channel yet, or if the channel is closed.
    """
    return self._chan._latest  # pylint: disable=protected-access

frequenz.channels.broadcast.Receiver ¤

Bases: BufferedReceiver[T]

A receiver to receive messages from the broadcast channel.

Should not be created directly, but through the Broadcast.get_receiver() method.

Source code in frequenz/channels/broadcast.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class Receiver(BufferedReceiver[T]):
    """A receiver to receive messages from the broadcast channel.

    Should not be created directly, but through the
    [Broadcast.get_receiver()][frequenz.channels.Broadcast.get_receiver]
    method.
    """

    def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> None:
        """Create a broadcast receiver.

        Broadcast receivers have their own buffer, and when messages are not
        being consumed fast enough and the buffer fills up, old messages will
        get dropped just in this receiver.

        Args:
            uuid: A uuid to identify the receiver in the broadcast channel's
                list of receivers.
            name: A name to identify the receiver in the logs.
            maxsize: Size of the receiver's buffer.
            chan: a reference to the Broadcast channel that this receiver
                belongs to.
        """
        self._uuid = uuid
        self._name = name
        self._chan = chan
        self._q: Deque[T] = deque(maxlen=maxsize)

        self._active = True

    def __del__(self) -> None:
        """Drop this receiver from the list of Broadcast receivers."""
        if self._active:
            self._chan._drop_receiver(self._uuid)

    def enqueue(self, msg: T) -> None:
        """Put a message into this receiver's queue.

        To be called by broadcast senders.  If the receiver's queue is already
        full, drop the oldest message to make room for the incoming message, and
        log a warning.

        Args:
            msg: The message to be sent.
        """
        if len(self._q) == self._q.maxlen:
            self._q.popleft()
            logger.warning(
                "Broadcast receiver [%s:%s] is full. Oldest message was dropped.",
                self._chan.name,
                self._name,
            )
        self._q.append(msg)

    def __len__(self) -> int:
        """Return the number of unconsumed messages in the broadcast receiver.

        Returns:
            Number of items in the receiver's internal queue.
        """
        return len(self._q)

    async def receive(self) -> Optional[T]:
        """Receive a message from the Broadcast channel.

        Waits until there are messages available in the channel and returns
        them.  If there are no remaining messages in the buffer and the channel
        is closed, returns `None` immediately.

        If [into_peekable()][frequenz.channels.Receiver.into_peekable] is called
        on a broadcast `Receiver`, further calls to `receive`, will raise an
        `EOFError`.

        Raises:
            EOFError: when the receiver has been converted into a `Peekable`.

        Returns:
            `None`, if the channel is closed, a message otherwise.
        """
        if not self._active:
            raise EOFError("This receiver is no longer active.")

        while len(self._q) == 0:
            if self._chan.closed:
                return None
            async with self._chan.recv_cv:
                await self._chan.recv_cv.wait()
        ret = self._q.popleft()
        return ret

    def into_peekable(self) -> Peekable[T]:
        """Convert the `Receiver` implementation into a `Peekable`.

        Once this function has been called, the receiver will no longer be
        usable, and calling [receive()][frequenz.channels.Receiver.receive] on
        the receiver will raise an exception.

        Returns:
            A `Peekable` instance.
        """
        self._chan._drop_receiver(self._uuid)  # pylint: disable=protected-access
        self._active = False
        return Peekable(self._chan)
Functions¤
__del__() ¤

Drop this receiver from the list of Broadcast receivers.

Source code in frequenz/channels/broadcast.py
228
229
230
231
def __del__(self) -> None:
    """Drop this receiver from the list of Broadcast receivers."""
    if self._active:
        self._chan._drop_receiver(self._uuid)
__init__(uuid, name, maxsize, chan) ¤

Create a broadcast receiver.

Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver.

PARAMETER DESCRIPTION
uuid

A uuid to identify the receiver in the broadcast channel's list of receivers.

TYPE: UUID

name

A name to identify the receiver in the logs.

TYPE: str

maxsize

Size of the receiver's buffer.

TYPE: int

chan

a reference to the Broadcast channel that this receiver belongs to.

TYPE: Broadcast[T]

Source code in frequenz/channels/broadcast.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> None:
    """Create a broadcast receiver.

    Broadcast receivers have their own buffer, and when messages are not
    being consumed fast enough and the buffer fills up, old messages will
    get dropped just in this receiver.

    Args:
        uuid: A uuid to identify the receiver in the broadcast channel's
            list of receivers.
        name: A name to identify the receiver in the logs.
        maxsize: Size of the receiver's buffer.
        chan: a reference to the Broadcast channel that this receiver
            belongs to.
    """
    self._uuid = uuid
    self._name = name
    self._chan = chan
    self._q: Deque[T] = deque(maxlen=maxsize)

    self._active = True
__len__() ¤

Return the number of unconsumed messages in the broadcast receiver.

RETURNS DESCRIPTION
int

Number of items in the receiver's internal queue.

Source code in frequenz/channels/broadcast.py
252
253
254
255
256
257
258
def __len__(self) -> int:
    """Return the number of unconsumed messages in the broadcast receiver.

    Returns:
        Number of items in the receiver's internal queue.
    """
    return len(self._q)
enqueue(msg) ¤

Put a message into this receiver's queue.

To be called by broadcast senders. If the receiver's queue is already full, drop the oldest message to make room for the incoming message, and log a warning.

PARAMETER DESCRIPTION
msg

The message to be sent.

TYPE: T

Source code in frequenz/channels/broadcast.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def enqueue(self, msg: T) -> None:
    """Put a message into this receiver's queue.

    To be called by broadcast senders.  If the receiver's queue is already
    full, drop the oldest message to make room for the incoming message, and
    log a warning.

    Args:
        msg: The message to be sent.
    """
    if len(self._q) == self._q.maxlen:
        self._q.popleft()
        logger.warning(
            "Broadcast receiver [%s:%s] is full. Oldest message was dropped.",
            self._chan.name,
            self._name,
        )
    self._q.append(msg)
into_peekable() ¤

Convert the Receiver implementation into a Peekable.

Once this function has been called, the receiver will no longer be usable, and calling receive() on the receiver will raise an exception.

RETURNS DESCRIPTION
Peekable[T]

A Peekable instance.

Source code in frequenz/channels/broadcast.py
288
289
290
291
292
293
294
295
296
297
298
299
300
def into_peekable(self) -> Peekable[T]:
    """Convert the `Receiver` implementation into a `Peekable`.

    Once this function has been called, the receiver will no longer be
    usable, and calling [receive()][frequenz.channels.Receiver.receive] on
    the receiver will raise an exception.

    Returns:
        A `Peekable` instance.
    """
    self._chan._drop_receiver(self._uuid)  # pylint: disable=protected-access
    self._active = False
    return Peekable(self._chan)
receive() async ¤

Receive a message from the Broadcast channel.

Waits until there are messages available in the channel and returns them. If there are no remaining messages in the buffer and the channel is closed, returns None immediately.

If into_peekable() is called on a broadcast Receiver, further calls to receive, will raise an EOFError.

RAISES DESCRIPTION
EOFError

when the receiver has been converted into a Peekable.

RETURNS DESCRIPTION
Optional[T]

None, if the channel is closed, a message otherwise.

Source code in frequenz/channels/broadcast.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
async def receive(self) -> Optional[T]:
    """Receive a message from the Broadcast channel.

    Waits until there are messages available in the channel and returns
    them.  If there are no remaining messages in the buffer and the channel
    is closed, returns `None` immediately.

    If [into_peekable()][frequenz.channels.Receiver.into_peekable] is called
    on a broadcast `Receiver`, further calls to `receive`, will raise an
    `EOFError`.

    Raises:
        EOFError: when the receiver has been converted into a `Peekable`.

    Returns:
        `None`, if the channel is closed, a message otherwise.
    """
    if not self._active:
        raise EOFError("This receiver is no longer active.")

    while len(self._q) == 0:
        if self._chan.closed:
            return None
        async with self._chan.recv_cv:
            await self._chan.recv_cv.wait()
    ret = self._q.popleft()
    return ret

frequenz.channels.broadcast.Sender ¤

Bases: BaseSender[T]

A sender to send messages to the broadcast channel.

Should not be created directly, but through the Broadcast.get_sender() method.

Source code in frequenz/channels/broadcast.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class Sender(BaseSender[T]):
    """A sender to send messages to the broadcast channel.

    Should not be created directly, but through the
    [Broadcast.get_sender()][frequenz.channels.Broadcast.get_sender]
    method.
    """

    def __init__(self, chan: Broadcast[T]) -> None:
        """Create a Broadcast sender.

        Args:
            chan: A reference to the broadcast channel this sender belongs to.
        """
        self._chan = chan

    async def send(self, msg: T) -> bool:
        """Send a message to all broadcast receivers.

        Args:
            msg: The message to be broadcast.

        Returns:
            Whether the message was sent, based on whether the broadcast
                channel is open or not.
        """
        if self._chan.closed:
            return False
        # pylint: disable=protected-access
        self._chan._latest = msg
        for recv in self._chan.receivers.values():
            recv.enqueue(msg)
        async with self._chan.recv_cv:
            self._chan.recv_cv.notify_all()
        return True
Functions¤
__init__(chan) ¤

Create a Broadcast sender.

PARAMETER DESCRIPTION
chan

A reference to the broadcast channel this sender belongs to.

TYPE: Broadcast[T]

Source code in frequenz/channels/broadcast.py
169
170
171
172
173
174
175
def __init__(self, chan: Broadcast[T]) -> None:
    """Create a Broadcast sender.

    Args:
        chan: A reference to the broadcast channel this sender belongs to.
    """
    self._chan = chan
send(msg) async ¤

Send a message to all broadcast receivers.

PARAMETER DESCRIPTION
msg

The message to be broadcast.

TYPE: T

RETURNS DESCRIPTION
bool

Whether the message was sent, based on whether the broadcast channel is open or not.

Source code in frequenz/channels/broadcast.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
async def send(self, msg: T) -> bool:
    """Send a message to all broadcast receivers.

    Args:
        msg: The message to be broadcast.

    Returns:
        Whether the message was sent, based on whether the broadcast
            channel is open or not.
    """
    if self._chan.closed:
        return False
    # pylint: disable=protected-access
    self._chan._latest = msg
    for recv in self._chan.receivers.values():
        recv.enqueue(msg)
    async with self._chan.recv_cv:
        self._chan.recv_cv.notify_all()
    return True