anycast
frequenz.channels.anycast
A channel for sending data across async tasks.
Classes
frequenz.channels.anycast.Anycast
Bases: Generic[T]
A channel for sending data across async tasks.
Anycast channels support multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.
In cases where each message needs to be received by every receiver, a Broadcast channel may be used.
Uses a deque internally, so Anycast channels are not thread-safe.
When there are multiple channel receivers, they can be awaited simultaneously using Select, Merge or MergeNamed.
Example
    import asyncio
    import random

    # Assumed import: the original example does not show how `channel` is bound.
    import frequenz.channels as channel

    async def send(sender: channel.Sender) -> None:
        while True:
            value = random.randint(3, 17)
            print(f"sending: {value}")
            await sender.send(value)

    async def recv(receiver_id: int, receiver: channel.Receiver) -> None:
        while True:
            value = await receiver.receive()
            print(f"receiver_{receiver_id} received {value}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data

    # The statements below must run inside a coroutine (they use `await`).
    acast = channel.Anycast()

    sender = acast.get_sender()
    receiver_1 = acast.get_receiver()

    asyncio.create_task(send(sender))

    await recv(1, receiver_1)
Check the tests and benchmarks directories for more examples.
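The "exactly one receiver" delivery described above can be illustrated without the library. The following is only a sketch: it uses the standard library's asyncio.Queue as a stand-in for an Anycast channel, and the names worker, demo and seen are hypothetical helpers that are not part of frequenz.channels.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, seen: dict) -> None:
    """Consume messages until cancelled, recording who handled each one."""
    while True:
        msg = await queue.get()
        seen.setdefault(msg, []).append(name)
        queue.task_done()
        await asyncio.sleep(0)  # yield so the other worker gets a turn


async def demo() -> dict:
    # asyncio.Queue stands in for the Anycast channel (an assumption).
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    seen: dict = {}
    workers = [
        asyncio.create_task(worker(f"receiver_{i}", queue, seen))
        for i in (1, 2)
    ]
    for msg in range(6):
        await queue.put(msg)
    await queue.join()  # wait until every message has been handled
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return seen


seen = asyncio.run(demo())
print(seen)
```

Every message appears under exactly one receiver name, which is the behaviour that distinguishes an anycast channel from a broadcast one.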
Source code in frequenz/channels/anycast.py
Functions
__init__(maxsize=10)
Create an Anycast channel.
PARAMETER | DESCRIPTION |
---|---|
maxsize | Size of the channel's buffer. |
close() async
Close the channel.
Any further attempts to send() data will return False.
Receivers will still be able to drain the pending items on the channel, but after that, subsequent receive() calls will return None immediately.
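The close semantics described above can be sketched with a small stand-in class. This is not the library's implementation: ToyChannel is a hypothetical class built on collections.deque and asyncio.Condition, written only to show the described order of events (send() rejected after close, pending items drained, then None).

```python
import asyncio
from collections import deque
from typing import Deque, Generic, Optional, TypeVar

T = TypeVar("T")


class ToyChannel(Generic[T]):
    """Hypothetical stand-in that mimics the close() behaviour described above."""

    def __init__(self) -> None:
        self._items: Deque[T] = deque()
        self._closed = False
        self._cond = asyncio.Condition()

    async def send(self, msg: T) -> bool:
        if self._closed:
            return False  # closed channels reject new messages
        async with self._cond:
            self._items.append(msg)
            self._cond.notify(1)
        return True

    async def receive(self) -> Optional[T]:
        async with self._cond:
            # Wait for data unless the channel is already closed.
            while not self._items and not self._closed:
                await self._cond.wait()
            if self._items:
                return self._items.popleft()  # drain pending items first
            return None  # closed and empty

    async def close(self) -> None:
        async with self._cond:
            self._closed = True
            self._cond.notify_all()  # wake receivers blocked in receive()


async def demo() -> list:
    chan: ToyChannel[int] = ToyChannel()
    results = [await chan.send(1), await chan.send(2)]
    await chan.close()
    results.append(await chan.send(3))    # rejected: channel is closed
    results.append(await chan.receive())  # pending item
    results.append(await chan.receive())  # pending item
    results.append(await chan.receive())  # nothing left
    return results


results = asyncio.run(demo())
print(results)  # [True, True, False, 1, 2, None]
```

Callers that loop on receive() can therefore treat None as the signal to stop, assuming None is never sent as a regular message.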
get_receiver()
Create a new receiver.
RETURNS | DESCRIPTION |
---|---|
Receiver[T] | A Receiver instance attached to the Anycast channel. |
get_sender()
Create a new sender.
RETURNS | DESCRIPTION |
---|---|
Sender[T] | A Sender instance attached to the Anycast channel. |
frequenz.channels.anycast.Receiver
Bases: BaseReceiver[T]
A receiver to receive messages from an Anycast channel.
Should not be created directly, but through the Anycast.get_receiver() method.
Functions
__init__(chan)
Create a channel receiver.
PARAMETER | DESCRIPTION |
---|---|
chan | A reference to the channel that this receiver belongs to. |
receive() async
Receive a message from the channel.
Waits for a message to become available, and returns that message. When there are multiple receivers for the channel, only one receiver will receive each message.
RETURNS | DESCRIPTION |
---|---|
Optional[T] | The received message, or None once the channel is closed and its pending items have been drained. |
frequenz.channels.anycast.Sender
Bases: BaseSender[T]
A sender to send messages to an Anycast channel.
Should not be created directly, but through the Anycast.get_sender() method.
Functions
__init__(chan)
Create a channel sender.
PARAMETER | DESCRIPTION |
---|---|
chan | A reference to the channel that this sender belongs to. |
send(msg) async
Send a message across the channel.
To send, this method inserts the message into the Anycast channel's buffer. If the buffer is full, it waits until receivers have consumed enough messages to free some space. Each message will be received by exactly one receiver.
PARAMETER | DESCRIPTION |
---|---|
msg | The message to be sent. |
RETURNS | DESCRIPTION |
---|---|
bool | Whether the message was sent, based on whether the channel is open or not. |
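The waiting behaviour of send() on a full buffer can be shown with a rough analogue. The sketch below assumes that a bounded asyncio.Queue behaves like the channel's buffer for this purpose; it does not use the library itself, and demo, producer and events are hypothetical names.

```python
import asyncio


async def demo() -> list:
    # asyncio.Queue(maxsize=2) stands in for a channel buffer of size 2
    # (an assumption, not the library's implementation).
    buffer: asyncio.Queue = asyncio.Queue(maxsize=2)
    events = []

    async def producer() -> None:
        for msg in range(3):
            await buffer.put(msg)  # the third put waits until space frees up
            events.append(f"sent {msg}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.05)  # give the producer time to fill the buffer
    events.append("buffer full" if buffer.full() else "buffer not full")
    msg = await buffer.get()  # consuming one message frees one slot
    events.append(f"received {msg}")
    await task  # the pending put can now complete
    return events


events = asyncio.run(demo())
print(events)
```

The third message is only recorded as sent after one message has been received, so a slow receiver naturally throttles a fast sender.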