Index
frequenz.channels ¤
Frequenz Channels.
This package contains channel implementations.
Base classes:
- Receiver: An object that can wait for and consume messages from a channel.
- Sender: An object that can send messages to a channel.
Channels:
- Anycast: A channel that supports multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.
- Broadcast: A channel to broadcast messages from multiple senders to multiple receivers. Each message sent through any of the senders is received by all of the receivers.
Utilities to work with channels:
- merge: Merge messages coming from multiple receivers into a single stream.
- select: Iterate over the messages of all receivers as new messages become available.
- LatestValueCache: A cache that stores the latest value in a receiver, providing a way to look up the latest value in a stream, without having to wait, as long as there has been one value received.
Exception classes:
- Error: Base class for all errors in this library.
- ChannelError: Base class for all errors related to channels.
- ChannelClosedError: Error raised when trying to operate (send, receive, etc.) through a closed channel.
- SenderError: Base class for all errors related to senders.
- ReceiverError: Base class for all errors related to receivers.
- ReceiverStoppedError: A receiver stopped producing messages.
- SelectError: Base class for all errors related to select.
- UnhandledSelectedError: An error raised by select that was not handled by the user.
Extra utility receivers:
- Event: A receiver that generates a message when an event is set.
- FileWatcher: A receiver that generates a message when a file is added, modified or deleted.
- Timer: A receiver that generates a message after a given amount of time.
Attributes¤
frequenz.channels.ChannelMessageT
module-attribute
¤
ChannelMessageT = TypeVar('ChannelMessageT')
The type of the message that can be sent across a channel.
frequenz.channels.ErroredChannelT_co
module-attribute
¤
ErroredChannelT_co = TypeVar(
"ErroredChannelT_co", covariant=True
)
The type of channel having an error.
frequenz.channels.MappedMessageT_co
module-attribute
¤
MappedMessageT_co = TypeVar(
"MappedMessageT_co", covariant=True
)
The type of the message received by the receiver after being mapped.
frequenz.channels.ReceiverMessageT_co
module-attribute
¤
ReceiverMessageT_co = TypeVar(
"ReceiverMessageT_co", covariant=True
)
The type of the message received by a receiver.
frequenz.channels.SenderMessageT_co
module-attribute
¤
SenderMessageT_co = TypeVar(
"SenderMessageT_co", covariant=True
)
The type of the message sent by a sender.
frequenz.channels.SenderMessageT_contra
module-attribute
¤
SenderMessageT_contra = TypeVar(
"SenderMessageT_contra", contravariant=True
)
The type of the message sent by a sender.
Classes¤
frequenz.channels.Anycast ¤
Bases: Generic[ChannelMessageT]
A channel that delivers each message to exactly one receiver.
Description¤
Anycast channels support multiple senders and multiple receivers. Each message sent through any of the senders will be received by exactly one receiver (but any receiver).
Characteristics
- Buffered: Yes, with a global channel buffer
- Buffer full policy: Block senders
- Multiple receivers: Yes
- Multiple senders: Yes
- Thread-safe: No
This channel is buffered, and if the senders are faster than the receivers, then the channel's buffer will fill up. In that case, the senders will block at the send() method until the receivers consume the messages in the channel's buffer. The channel's buffer size can be configured at creation time via the limit argument.
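The blocking behaviour of a bounded buffer can be modelled with the standard library alone. The following is a minimal sketch, assuming a plain asyncio.Queue with maxsize=2 as a stand-in for a channel created with limit=2; it is not the library's implementation, and the names producer and demo are only illustrative.

```python
import asyncio


async def producer(queue: asyncio.Queue[int], log: list[str]) -> None:
    """Put three messages; put() suspends while the queue is full."""
    for message in range(3):
        await queue.put(message)
        log.append(f"sent {message}")


async def demo() -> list[str]:
    log: list[str] = []
    # Stand-in for a channel buffer with limit=2 (assumption of this sketch).
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    task = asyncio.create_task(producer(queue, log))
    await asyncio.sleep(0)  # let the producer run until it blocks on a full queue
    log.append(f"buffered {queue.qsize()}")
    log.append(f"received {await queue.get()}")  # consuming frees one slot
    await task  # the blocked producer can now finish
    return log


log = asyncio.run(demo())
print(log)
```

The third message is only sent after one message has been consumed, which is the back-pressure effect described above.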
The first receiver that is awaited will get the next message. When multiple receivers are waiting, the asyncio loop scheduler picks a receiver for each next message.
This means that, in practice, there might be only one receiver receiving all the messages, depending on how tasks are scheduled.
If you need to ensure some delivery policy (like round-robin or uniformly random), then you will have to implement it yourself.
To create new senders and receivers, you can use the new_sender() and new_receiver() methods respectively.
When the channel is not needed anymore, it should be closed with the close() method. This will prevent further attempts to send() data. Receivers will still be able to drain the pending messages on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError exception.
This channel is useful, for example, to distribute work across multiple workers.
In cases where each message needs to be received by every receiver, a broadcast channel may be used.
Examples¤
Send a few numbers to a receiver
This is a very simple example that sends a few numbers from a single sender to a single receiver.
import asyncio

from frequenz.channels import Anycast, Sender


async def send(sender: Sender[int]) -> None:
    for message in range(3):
        print(f"sending {message}")
        await sender.send(message)


async def main() -> None:
    channel = Anycast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            message = await receiver.receive()
            print(f"received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())
The output should look something like (although the sending and received might appear more interleaved):
Send a few numbers from multiple senders to multiple receivers
This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.
import asyncio

from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for message in range(start, stop):
        print(f"{name} sending {message}")
        await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for message in receiver:
            print(f"{name} received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Anycast[int](name="numbers", limit=2)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())
The output should look something like this (although the sending and receiving might appear interleaved in a different way):
sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a message
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
Source code in frequenz/channels/_anycast.py
Attributes¤
is_closed
property
¤
is_closed: bool
Whether this channel is closed.
Any further attempts to use this channel after it is closed will result in an exception.
limit
property
¤
limit: int
The maximum number of messages that can be stored in the channel's buffer.
If the length of the channel's buffer reaches the limit, then the sender blocks at the send() method until a message is consumed.
name
property
¤
name: str
The name of this channel.
This is for debugging purposes, and it will be shown in the string representation of this channel.
Functions¤
__init__ ¤
Initialize this channel.
PARAMETER | DESCRIPTION |
---|---|
name |
The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel.
TYPE:
|
limit |
The size of the internal buffer in number of messages. If the buffer is full, then the senders will block until the receivers consume the messages in the buffer.
TYPE:
|
Source code in frequenz/channels/_anycast.py
close
async
¤
Close the channel.
Any further attempts to send() data will raise a SenderError.
Receivers will still be able to drain the pending messages on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError.
Source code in frequenz/channels/_anycast.py
new_receiver ¤
new_receiver() -> Receiver[ChannelMessageT]
frequenz.channels.Broadcast ¤
Bases: Generic[ChannelMessageT]
A channel that delivers all messages to all receivers.
Description¤
Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders will be received by all receivers.
Characteristics
- Buffered: Yes, with one buffer per receiver
- Buffer full policy: Drop oldest message
- Multiple receivers: Yes
- Multiple senders: Yes
- Thread-safe: No
This channel is buffered, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped.
Each receiver has its own buffer, so messages will only be dropped for receivers that can't keep up with the senders, and not for the whole channel.
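The drop-oldest policy with one buffer per receiver can be illustrated with the standard library. The following is a minimal sketch, assuming a collections.deque with maxlen as a stand-in for each receiver's buffer and a hypothetical buffer size of 3; it models the policy only and is not the library's implementation.

```python
from collections import deque

LIMIT = 3  # hypothetical per-receiver buffer size

fast: deque[int] = deque(maxlen=LIMIT)  # buffer of a receiver that keeps up
slow: deque[int] = deque(maxlen=LIMIT)  # buffer of a receiver that never reads
fast_seen: list[int] = []

for message in range(5):
    # Every message is appended to every receiver's own buffer.
    fast.append(message)
    slow.append(message)  # once full, the oldest entry is silently dropped
    fast_seen.append(fast.popleft())  # the fast receiver consumes right away

slow_seen = list(slow)
print(fast_seen, slow_seen)
```

Only the slow receiver loses the two oldest messages; the fast receiver sees all five.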
To create new senders and receivers, you can use the new_sender() and new_receiver() methods respectively.
When a channel is not needed anymore, it should be closed with close(). This will prevent further attempts to send() data, and will allow receivers to drain the pending items on their queues, but after that, subsequent receive() calls will raise a ReceiverStoppedError.
This channel is useful, for example, to implement a pub/sub pattern, where multiple receivers can subscribe to a channel to receive all messages.
In cases where each message needs to be delivered only to one receiver, an anycast channel may be used.
Examples¤
Send a few numbers to a receiver
This is a very simple example that sends a few numbers from a single sender to a single receiver.
import asyncio

from frequenz.channels import Broadcast, Sender


async def send(sender: Sender[int]) -> None:
    for message in range(3):
        print(f"sending {message}")
        await sender.send(message)


async def main() -> None:
    channel = Broadcast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            message = await receiver.receive()
            print(f"received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())
The output should look something like (although the sending and received might appear more interleaved):
Send a few numbers from multiple senders to multiple receivers
This is a more complex example that sends a few numbers from multiple senders to multiple receivers. Unlike with an anycast channel, every receiver gets every message.
import asyncio

from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for message in range(start, stop):
        print(f"{name} sending {message}")
        await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for message in receiver:
            print(f"{name} received {message}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    bcast = Broadcast[int](name="numbers")

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", bcast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", bcast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", bcast.new_receiver()))
        task_group.create_task(recv("receiver_2", bcast.new_receiver()))


asyncio.run(main())
The output should look something like this (although the sending and receiving might appear interleaved in a different way):
sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
sender_2 sending 20
sender_2 sending 21
receiver_1 received 10
receiver_1 received 11
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
receiver_2 received 10
receiver_2 received 11
receiver_2 received 12
receiver_2 received 20
receiver_2 received 21
Source code in frequenz/channels/_broadcast.py
Attributes¤
is_closed
property
¤
is_closed: bool
Whether this channel is closed.
Any further attempts to use this channel after it is closed will result in an exception.
name
property
¤
name: str
The name of this channel.
This is for logging purposes, and it will be shown in the string representation of this channel.
resend_latest
instance-attribute
¤
resend_latest: bool = resend_latest
Whether to resend the latest message to new receivers.
When True, every time a new receiver is created with new_receiver, it will automatically get sent the latest message on the channel. This allows new receivers on slow streams to get the latest message as soon as they are created, without having to wait for the next message on the channel to arrive.
It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions.
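The effect of this flag on receivers created late can be shown with a toy model. The following is a minimal sketch; ToyBroadcast and late_receiver_view are hypothetical names, the class is synchronous and unbounded for brevity, and it only mimics the replay behaviour described above rather than the library's implementation.

```python
from collections import deque


class ToyBroadcast:
    """Toy model of a broadcast channel; not the library's implementation."""

    def __init__(self, *, resend_latest: bool = False) -> None:
        self.resend_latest = resend_latest
        self._latest: int | None = None
        self._buffers: list[deque[int]] = []

    def new_receiver(self) -> deque[int]:
        """Create a receiver buffer, optionally pre-filled with the latest message."""
        buffer: deque[int] = deque()
        if self.resend_latest and self._latest is not None:
            buffer.append(self._latest)  # replay the latest message
        self._buffers.append(buffer)
        return buffer

    def send(self, message: int) -> None:
        """Remember the message and copy it into every receiver buffer."""
        self._latest = message
        for buffer in self._buffers:
            buffer.append(message)


def late_receiver_view(*, resend_latest: bool) -> list[int]:
    channel = ToyBroadcast(resend_latest=resend_latest)
    channel.send(1)
    channel.send(2)
    late = channel.new_receiver()  # created after two messages were sent
    channel.send(3)
    return list(late)


with_resend = late_receiver_view(resend_latest=True)
without_resend = late_receiver_view(resend_latest=False)
print(with_resend, without_resend)
```

The replayed message may be stale by the time the new receiver reads it, which is harmless for reporting data but can be wrong for a control instruction.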
Functions¤
__init__ ¤
Initialize this channel.
PARAMETER | DESCRIPTION |
---|---|
name |
The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel.
TYPE:
|
resend_latest |
When True, every time a new receiver is created with new_receiver, it will automatically get sent the latest message on the channel.
TYPE:
|
Source code in frequenz/channels/_broadcast.py
__repr__ ¤
__repr__() -> str
Return a string representation of this channel.
Source code in frequenz/channels/_broadcast.py
close
async
¤
Close this channel.
Any further attempts to send() data will raise a SenderError.
Receivers will still be able to drain the pending items on their queues, but after that, subsequent receive() calls will raise a ReceiverStoppedError.
Source code in frequenz/channels/_broadcast.py
new_receiver ¤
new_receiver(
*, name: str | None = None, limit: int = 50
) -> Receiver[ChannelMessageT]
Return a new receiver attached to this channel.
Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver.
PARAMETER | DESCRIPTION |
---|---|
name |
A name to identify the receiver in the logs.
TYPE:
|
limit |
Number of messages the receiver can hold in its buffer.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[ChannelMessageT] | A new receiver attached to this channel. |
Source code in frequenz/channels/_broadcast.py
frequenz.channels.ChannelClosedError ¤
Bases: ChannelError[ErroredChannelT_co]
A closed channel was used.
Source code in frequenz/channels/_exceptions.py
Attributes¤
channel
instance-attribute
¤
channel: ErroredChannelT_co = channel
The channel where the error happened.
Functions¤
__init__ ¤
__init__(channel: ErroredChannelT_co)
Initialize this error.
PARAMETER | DESCRIPTION |
---|---|
channel |
The channel that was closed.
TYPE:
|
frequenz.channels.ChannelError ¤
Bases: Error, Generic[ErroredChannelT_co]
An error that originated in a channel.
All exceptions generated by channels inherit from this exception.
Source code in frequenz/channels/_exceptions.py
Attributes¤
channel
instance-attribute
¤
channel: ErroredChannelT_co = channel
The channel where the error happened.
Functions¤
__init__ ¤
__init__(message: str, channel: ErroredChannelT_co)
Initialize this error.
PARAMETER | DESCRIPTION |
---|---|
message |
The error message.
TYPE:
|
channel |
The channel where the error happened.
TYPE:
|
Source code in frequenz/channels/_exceptions.py
frequenz.channels.Error ¤
Bases: RuntimeError
An error that originated in this library.
This is useful if you want to catch all exceptions generated by this library.
Source code in frequenz/channels/_exceptions.py
frequenz.channels.LatestValueCache ¤
Bases: Generic[T_co]
A cache that stores the latest value in a receiver.
It provides a way to look up the latest value in a stream without any delay, as long as there has been one value received.
Source code in frequenz/channels/_latest_value_cache.py
Attributes¤
Functions¤
__init__ ¤
Create a new cache.
PARAMETER | DESCRIPTION |
---|---|
receiver |
The receiver to cache.
TYPE:
|
unique_id |
A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
TYPE:
|
Source code in frequenz/channels/_latest_value_cache.py
get ¤
Return the latest value that has been received.
This raises a ValueError if no value has been received yet. Use has_value to check whether a value has been received yet, before trying to access the value, to avoid the exception.
RETURNS | DESCRIPTION |
---|---|
T_co | The latest value that has been received. |
RAISES | DESCRIPTION |
---|---|
ValueError | If no value has been received yet. |
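The check-before-get pattern can be modelled with the standard library. The following is a minimal sketch, assuming an asyncio.Queue as a stand-in for the cached receiver; ToyLatestValueCache and its stop() method are hypothetical and only mirror the has_value and get behaviour described above.

```python
import asyncio
import contextlib


class ToyLatestValueCache:
    """Toy model: keeps the newest item seen on an asyncio.Queue."""

    _UNSET = object()  # sentinel meaning "nothing received yet"

    def __init__(self, queue: asyncio.Queue[int]) -> None:
        self._latest: object = self._UNSET
        # A background task keeps draining the queue into the cache.
        self._task = asyncio.create_task(self._run(queue))

    async def _run(self, queue: asyncio.Queue[int]) -> None:
        while True:
            self._latest = await queue.get()

    def has_value(self) -> bool:
        return self._latest is not self._UNSET

    def get(self) -> object:
        if not self.has_value():
            raise ValueError("no value has been received yet")
        return self._latest

    async def stop(self) -> None:
        """Cancel the background task (hypothetical helper for this sketch)."""
        self._task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._task


async def demo() -> list[object]:
    queue: asyncio.Queue[int] = asyncio.Queue()
    cache = ToyLatestValueCache(queue)
    observed: list[object] = [cache.has_value()]  # nothing received yet
    queue.put_nowait(1)
    queue.put_nowait(2)
    await asyncio.sleep(0.01)  # give the background task time to drain the queue
    if cache.has_value():  # guard the lookup to avoid the ValueError
        observed.append(cache.get())
    await cache.stop()
    return observed


observed = asyncio.run(demo())
print(observed)
```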
Source code in frequenz/channels/_latest_value_cache.py
frequenz.channels.Merger ¤
Bases: Receiver[ReceiverMessageT_co]
A receiver that merges messages coming from multiple receivers into a single stream.
Tip
Please consider using the more idiomatic merge() function instead of creating a Merger instance directly.
Source code in frequenz/channels/_merge.py
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__
async
¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
__del__ ¤
__init__ ¤
__init__(
*receivers: Receiver[ReceiverMessageT_co],
name: str | None
) -> None
Initialize this merger.
PARAMETER | DESCRIPTION |
---|---|
*receivers |
The receivers to merge.
TYPE:
|
name |
The name of the receiver. Used to create the string representation of the receiver.
TYPE:
|
Source code in frequenz/channels/_merge.py
__str__ ¤
__str__() -> str
Return a string representation of this receiver.
Source code in frequenz/channels/_merge.py
consume ¤
consume() -> ReceiverMessageT_co
Return the latest message once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next message that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_merge.py
filter ¤
filter(
filter_function: Callable[[ReceiverMessageT_co], bool]
) -> Receiver[ReceiverMessageT_co]
Apply a filter function on the messages on a receiver.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function |
The function to be applied on incoming messages to determine if they should be received.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] | A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
]
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function |
The function to be applied on incoming messages.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
ready
async
¤
ready() -> bool
Wait until the receiver is ready with a message or an error.
Once a call to ready() has finished, the message should be read with a call to consume() (or with receive(), or by iterating over the receiver). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/_merge.py
receive
async
¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
frequenz.channels.Receiver ¤
Bases: ABC, Generic[ReceiverMessageT_co]
An endpoint to receive messages.
Source code in frequenz/channels/_receiver.py
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__
async
¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
consume
abstractmethod
¤
consume() -> ReceiverMessageT_co
Return the latest message once ready() is complete.
ready() must be called before each call to consume().
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next message received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
filter ¤
filter(
filter_function: Callable[[ReceiverMessageT_co], bool]
) -> Receiver[ReceiverMessageT_co]
Apply a filter function on the messages on a receiver.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function |
The function to be applied on incoming messages to determine if they should be received.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] | A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
]
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function |
The function to be applied on incoming messages.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
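The filter() and map() methods each wrap a stream in a new one, so they can be chained. The following is a minimal sketch of that idea using async generators from the standard library as stand-ins for receivers; the helper names numbers, filtered and mapped are hypothetical and this is not how the library implements these methods.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def numbers() -> AsyncIterator[int]:
    """Stand-in for a receiver producing 0..5."""
    for n in range(6):
        yield n


async def filtered(
    source: AsyncIterator[T], keep: Callable[[T], bool]
) -> AsyncIterator[T]:
    """Yield only the messages for which keep() returns True."""
    async for message in source:
        if keep(message):
            yield message


async def mapped(
    source: AsyncIterator[T], function: Callable[[T], U]
) -> AsyncIterator[U]:
    """Yield function(message) for every incoming message."""
    async for message in source:
        yield function(message)


async def demo() -> list[str]:
    evens = filtered(numbers(), lambda n: n % 2 == 0)
    labels = mapped(evens, lambda n: f"value={n}")
    return [label async for label in labels]


labels = asyncio.run(demo())
print(labels)
```

As the tip above notes for real receivers, the wrapped stream only exposes the generic interface, so keep a reference to the original object if its extra methods are needed.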
Source code in frequenz/channels/_receiver.py
ready
abstractmethod
async
¤
ready() -> bool
Wait until the receiver is ready with a message or an error.
Once a call to ready() has finished, the message should be read with a call to consume() (or with receive(), or by iterating over the receiver). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/_receiver.py
receive
async
¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
frequenz.channels.ReceiverError ¤
Bases: Error, Generic[ReceiverMessageT_co]
An error that originated in a Receiver.
All exceptions generated by receivers inherit from this exception.
Source code in frequenz/channels/_receiver.py
Attributes¤
receiver
instance-attribute
¤
receiver: Receiver[ReceiverMessageT_co] = receiver
The receiver where the error happened.
Functions¤
__init__ ¤
__init__(
message: str, receiver: Receiver[ReceiverMessageT_co]
)
Initialize this error.
PARAMETER | DESCRIPTION |
---|---|
message |
The error message.
TYPE:
|
receiver |
The Receiver where the error happened.
TYPE:
|
Source code in frequenz/channels/_receiver.py
frequenz.channels.ReceiverStoppedError ¤
Bases: ReceiverError[ReceiverMessageT_co]
A stopped Receiver was used.
Source code in frequenz/channels/_receiver.py
Attributes¤
receiver
instance-attribute
¤
receiver: Receiver[ReceiverMessageT_co] = receiver
The receiver where the error happened.
Functions¤
__init__ ¤
__init__(receiver: Receiver[ReceiverMessageT_co])
Initialize this error.
PARAMETER | DESCRIPTION |
---|---|
receiver |
The Receiver where the error happened.
TYPE:
|
Source code in frequenz/channels/_receiver.py
frequenz.channels.SelectError ¤
Bases: Error
An error that happened during a select() operation.
This exception is raised when a select() iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling ready() for example). It is raised as a group exception (BaseExceptionGroup) when a select loop is cleaning up after it's done.
Source code in frequenz/channels/_select.py
frequenz.channels.Selected ¤
Bases: Generic[ReceiverMessageT_co]
A result of a select() iteration.
The selected receiver is consumed immediately and the received message is stored in the instance, unless there was an exception while receiving the message, in which case the exception is stored instead.
Selected instances should be used in conjunction with the selected_from() function to determine which receiver was selected.
Please see select() for an example.
Source code in frequenz/channels/_select.py
Attributes¤
exception
property
¤
exception: Exception | None
The exception that was raised while receiving the message (if any).
RETURNS | DESCRIPTION |
---|---|
Exception | None
|
The exception that was raised while receiving the message (if any). |
message
property
¤
message: ReceiverMessageT_co
The message that was received, if any.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The message that was received. |
RAISES | DESCRIPTION |
---|---|
Exception
|
If there was an exception while receiving the message. Normally
this should be an |
was_stopped
property
¤
was_stopped: bool
Whether the selected receiver was stopped while receiving a message.
Functions¤
__init__ ¤
__init__(receiver: Receiver[ReceiverMessageT_co]) -> None
Initialize this selected result.
The receiver is consumed immediately when creating the instance and the received message is stored in the instance for later use as message. If there was an exception while receiving the message, then the exception is stored in the instance instead (as exception).
PARAMETER | DESCRIPTION |
---|---|
receiver |
The receiver that was selected.
TYPE:
|
Source code in frequenz/channels/_select.py
frequenz.channels.Sender ¤
Bases: ABC, Generic[SenderMessageT_contra]
An endpoint to send messages.
Source code in frequenz/channels/_sender.py
Functions¤
send
abstractmethod
async
¤
send(message: SenderMessageT_contra) -> None
Send a message.
PARAMETER | DESCRIPTION |
---|---|
message |
The message to be sent.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
SenderError | If there was an error sending the message. |
frequenz.channels.SenderError ¤
Bases: Error, Generic[SenderMessageT_co]
An error that originated in a Sender.
All exceptions generated by senders inherit from this exception.
Source code in frequenz/channels/_sender.py
Attributes¤
sender
instance-attribute
¤
sender: Sender[SenderMessageT_co] = sender
The sender where the error happened.
Functions¤
__init__ ¤
__init__(message: str, sender: Sender[SenderMessageT_co])
Initialize this error.
PARAMETER | DESCRIPTION |
---|---|
`message` | The error message. TYPE: `str` |
`sender` | The Sender where the error happened. TYPE: `Sender[SenderMessageT_co]` |
Source code in frequenz/channels/_sender.py
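Keeping a reference to the failing sender on the exception lets a handler tell which endpoint failed. The sketch below models that pattern with hypothetical classes (`ErrorSketch`, `SenderErrorSketch`, `ClosedSender`) that only imitate the documented constructor arguments; it is not the library's code.

```python
import asyncio


class ErrorSketch(RuntimeError):
    """Hypothetical stand-in for a library-wide base error."""


class SenderErrorSketch(ErrorSketch):
    """Hypothetical error that remembers the sender it came from."""

    def __init__(self, message: str, sender: object) -> None:
        super().__init__(message)
        self.sender = sender


class ClosedSender:
    """Hypothetical sender whose send() always fails."""

    async def send(self, message: int) -> None:
        raise SenderErrorSketch("channel is closed", self)


async def main() -> tuple[bool, str]:
    sender = ClosedSender()
    try:
        await sender.send(1)
    except SenderErrorSketch as err:
        # The handler can identify the failing endpoint from the exception.
        return err.sender is sender, str(err)
    return False, ""


same_sender, text = asyncio.run(main())
```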
frequenz.channels.UnhandledSelectedError ¤
Bases: SelectError, Generic[ReceiverMessageT_co]
A receiver was not handled in a `select()` iteration.
This exception is raised when a `select()` iteration finishes without a call to `selected_from()` for the selected receiver.
Source code in frequenz/channels/_select.py
Attributes¤
selected
instance-attribute
¤
selected: Selected[ReceiverMessageT_co] = selected
The selected receiver that was not handled.
Functions¤
__init__ ¤
__init__(selected: Selected[ReceiverMessageT_co]) -> None
Initialize this error.
PARAMETER | DESCRIPTION |
---|---|
`selected` | The selected receiver that was not handled. TYPE: `Selected[ReceiverMessageT_co]` |
Source code in frequenz/channels/_select.py
Functions¤
frequenz.channels.merge ¤
merge(
*receivers: Receiver[ReceiverMessageT_co],
) -> Merger[ReceiverMessageT_co]
Merge messages coming from multiple receivers into a single stream.
Example
For example, two channel receivers with the same message type can be awaited together and their messages merged into a single stream.
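As a library-independent sketch of these semantics, the following models receivers as plain async generators and merges them through an `asyncio.Queue`. The names `numbers` and `merge_sketch` are hypothetical, and the queue-based approach is an assumption chosen for illustration, not a description of how `merge()` is implemented.

```python
import asyncio
from collections.abc import AsyncIterator


async def numbers(start: int, count: int) -> AsyncIterator[int]:
    """Hypothetical receiver stand-in producing `count` integers."""
    for value in range(start, start + count):
        await asyncio.sleep(0)  # yield control so the sources can interleave
        yield value


async def merge_sketch(*sources: AsyncIterator[int]) -> AsyncIterator[int]:
    """Forward items from all sources into one stream until every source ends."""
    if not sources:
        raise ValueError("At least one source is required")
    queue: asyncio.Queue[int | None] = asyncio.Queue()

    async def pump(source: AsyncIterator[int]) -> None:
        async for item in source:
            await queue.put(item)
        await queue.put(None)  # sentinel: this source is exhausted

    tasks = [asyncio.create_task(pump(source)) for source in sources]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is None:
            remaining -= 1
        else:
            yield item


async def main() -> list[int]:
    return [item async for item in merge_sketch(numbers(0, 3), numbers(100, 3))]


merged = asyncio.run(main())
```

The merged stream contains every item from both sources; the relative order between sources depends on scheduling, so consumers should not rely on it.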
PARAMETER | DESCRIPTION |
---|---|
`*receivers` | The receivers to merge. TYPE: `Receiver[ReceiverMessageT_co]` |
RETURNS | DESCRIPTION |
---|---|
`Merger[ReceiverMessageT_co]` | A receiver that merges the messages coming from multiple receivers into a single stream. |
RAISES | DESCRIPTION |
---|---|
`ValueError` | If no receivers are provided. |
Source code in frequenz/channels/_merge.py
frequenz.channels.select
async
¤
select(
*receivers: Receiver[Any],
) -> AsyncIterator[Selected[Any]]
Iterate over the messages of all receivers as they receive new messages.
This function is used in conjunction with the `Selected` class and the `selected_from()` function to determine which receiver was selected in a select operation.
An exhaustiveness check is performed at runtime to make sure all selected receivers are handled in the if-chain, so you should call `selected_from()` with all the receivers passed to `select()` inside the select loop, even if you plan to ignore a message, to signal `select()` that you are purposefully ignoring the message.
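One way to picture such a runtime check is a generator that marks each yielded item as handled when it is tested against a source, and raises on the next step if nothing claimed it. The sketch below uses a synchronous generator and hypothetical names (`Pick`, `picked_from`, `select_sketch`) purely for illustration; it is an assumption about the general technique, not the code in `frequenz/channels/_select.py`.

```python
class Pick:
    """Hypothetical selected result that remembers whether it was checked."""

    def __init__(self, source: str, value: int) -> None:
        self.source = source
        self.value = value
        self.handled = False


def picked_from(pick: Pick, source: str) -> bool:
    """Return whether `pick` came from `source`, marking it handled if so."""
    if pick.source == source:
        pick.handled = True
        return True
    return False


def select_sketch(events):
    for source, value in events:
        pick = Pick(source, value)
        yield pick
        # Control returns here once the loop body has finished this iteration.
        if not pick.handled:
            raise LookupError(f"unhandled source: {source}")


events = [("a", 1), ("b", 2)]

# Every source is checked, so the loop completes normally.
seen = []
for pick in select_sketch(events):
    if picked_from(pick, "a"):
        seen.append(pick.value)
    elif picked_from(pick, "b"):
        pass  # deliberately ignored, but still marked as handled

# Source "b" is never checked, so the generator raises on its next step.
error = None
try:
    for pick in select_sketch(events):
        if picked_from(pick, "a"):
            pass
except LookupError as exc:
    error = str(exc)
```

The second loop shows why an ignored receiver still needs its own branch: the check only knows a message was intentionally skipped if the matching test was performed.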
Note
The `select()` function is intended to be used in cases where the set of receivers is static and known beforehand. If you need to dynamically add/remove receivers from a select loop, there are a few alternatives. Depending on your use case, one or the other could work better for you:
- Use `merge()`: this is useful when you have an unknown number of receivers of the same type that can be handled as a group.
- Use tasks to manage each receiver individually: this is better if there are no relationships between the receivers.
- Break the `select()` loop and start a new one with the new set of receivers (this should be the last resort, as it has some performance implications because the loop needs to be restarted).
Example
    import asyncio
    import datetime
    from typing import assert_never

    from frequenz.channels import ReceiverStoppedError, select, selected_from
    from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed


    async def main() -> None:
        # `async for` must run inside a coroutine.
        timer1 = Timer(datetime.timedelta(seconds=1), TriggerAllMissed())
        timer2 = Timer(datetime.timedelta(seconds=0.5), SkipMissedAndDrift())

        async for selected in select(timer1, timer2):
            if selected_from(selected, timer1):
                # Beware: `selected.message` might raise an exception, you can always
                # check for exceptions with `selected.exception` first or use
                # a try-except block. You can also quickly check if the receiver was
                # stopped and let any other unexpected exceptions bubble up.
                if selected.was_stopped:
                    print("timer1 was stopped")
                    continue
                print(f"timer1: now={datetime.datetime.now()} drift={selected.message}")
                timer2.stop()
            elif selected_from(selected, timer2):
                # Explicit handling of exceptions
                match selected.exception:
                    case ReceiverStoppedError():
                        print("timer2 was stopped")
                    case Exception() as exception:
                        print(f"timer2: exception={exception}")
                    case None:
                        # All good, no exception, we can use `selected.message` safely
                        print(f"timer2: now={datetime.datetime.now()} drift={selected.message}")
                    case _ as unhandled:
                        assert_never(unhandled)
            else:
                # This is not necessary, as select() will check for exhaustiveness, but
                # it is good practice to have it in case you forgot to handle a new
                # receiver added to `select()` at a later point in time.
                assert False


    asyncio.run(main())
PARAMETER | DESCRIPTION |
---|---|
`*receivers` | The receivers to select from. |
YIELDS | DESCRIPTION |
---|---|
`AsyncIterator[Selected[Any]]` | The currently selected item. |
RAISES | DESCRIPTION |
---|---|
`UnhandledSelectedError` | If a selected receiver was not handled in the if-chain. |
`BaseExceptionGroup` | If there is an error while finishing the select operation and receivers fail while cleaning up. |
`SelectError` | If there is an error while selecting receivers during normal operation. For example if a receiver raises an exception in the |
Source code in frequenz/channels/_select.py
frequenz.channels.selected_from ¤
selected_from(
selected: Selected[Any],
receiver: Receiver[ReceiverMessageT_co],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether the given receiver was selected by `select()`.
This function is used in conjunction with the `Selected` class to determine which receiver was selected in a `select()` iteration.
It also works as a type guard to narrow the type of the `Selected` instance to the type of the receiver.
Please see `select()` for an example.
PARAMETER | DESCRIPTION |
---|---|
`selected` | The result of a |
`receiver` | The receiver to check if it was the source of a select operation. TYPE: `Receiver[ReceiverMessageT_co]` |
RETURNS | DESCRIPTION |
---|---|
`TypeGuard[Selected[ReceiverMessageT_co]]` | Whether the given receiver was selected. |