event
frequenz.channels.event ¤
A receiver that can be made ready directly.
Tip
Read the Event documentation for more information.
This module contains the following:
Event: A receiver that can be made ready directly.
Classes¤
frequenz.channels.event.Event ¤
Bases: Receiver[None]
A receiver that can be made ready directly.
Usage¤
There are cases where it is useful to be able to send a signal to a select() loop, for example, to stop a loop from outside the loop itself.
To do that, you can use an Event receiver and call set() on it when you want to make it ready.
Stopping¤
After the current set event is received, the receiver is re-armed and blocks again until the event is set again. To stop the receiver completely, call stop().
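The set, consume and stop life cycle described above can be modeled with a plain asyncio.Event. The following is only a toy stand-in written for illustration: the ToyEvent class, its internals and the use of RuntimeError are assumptions, not the library's implementation.

```python
import asyncio


class ToyEvent:
    """Illustrative stand-in for an event receiver (hypothetical, not frequenz code)."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self._is_stopped = False

    def set(self) -> None:
        """Make the receiver ready, unless it was already stopped."""
        if self._is_stopped:
            return
        self._event.set()

    def stop(self) -> None:
        """Stop for good: wake up any waiter and never block again."""
        self._is_stopped = True
        self._event.set()

    async def ready(self) -> bool:
        """Wait until set or stopped; return False once stopped."""
        if self._is_stopped:
            return False
        await self._event.wait()
        return not self._is_stopped

    def consume(self) -> None:
        """Consume the current set event so the next ready() blocks again."""
        if self._is_stopped:
            # The library documents ReceiverStoppedError here;
            # RuntimeError is a stand-in to keep the sketch stdlib-only.
            raise RuntimeError("receiver is stopped")
        self._event.clear()


async def demo() -> list[str]:
    log: list[str] = []
    event = ToyEvent()

    event.set()
    log.append(f"ready={await event.ready()}")  # already set: returns at once
    event.consume()  # re-arm: the next ready() blocks

    try:
        await asyncio.wait_for(event.ready(), timeout=0.01)
    except asyncio.TimeoutError:
        log.append("blocked again")

    event.stop()
    log.append(f"ready={await event.ready()}")  # stopped: never blocks again
    return log
```

Running `asyncio.run(demo())` in this sketch shows the three phases in order: ready after set(), blocking again after consume(), and a final non-blocking `False` after stop().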
Example¤
Exit after printing the first 5 numbers
import asyncio

from frequenz.channels import Anycast, select, selected_from
from frequenz.channels.event import Event

channel: Anycast[int] = Anycast(name="channel")
receiver = channel.new_receiver()
sender = channel.new_sender()
stop_event = Event(name="stop")


async def do_work() -> None:
    async for selected in select(receiver, stop_event):
        if selected_from(selected, receiver):
            print(selected.message)
        elif selected_from(selected, stop_event):
            print("Stop event triggered")
            stop_event.stop()
            break


async def send_stuff() -> None:
    for i in range(10):
        if stop_event.is_stopped:
            break
        await asyncio.sleep(1)
        await sender.send(i)


async def main() -> None:
    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(do_work(), name="do_work")
        task_group.create_task(send_stuff(), name="send_stuff")
        await asyncio.sleep(5.5)
        stop_event.set()


asyncio.run(main())
Source code in frequenz/channels/event.py
Attributes¤
name property ¤
name: str
The name of this receiver.
This is for debugging purposes; it is shown in the string representation of this receiver.
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__ async ¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
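Because `__aiter__` returns the receiver itself and `__anext__` raises StopAsyncIteration once no more messages will arrive, a receiver can be consumed with a plain `async for`. The sketch below shows that protocol on a hypothetical Countdown class, which is not part of the library and only stands in for a receiver.

```python
import asyncio


class Countdown:
    """Hypothetical async iterator, used only to illustrate the protocol."""

    def __init__(self, start: int) -> None:
        self._next = start

    def __aiter__(self) -> "Countdown":
        # The object is its own async iterator, so it returns itself.
        return self

    async def __anext__(self) -> int:
        if self._next <= 0:
            # Tells `async for` that the iteration is over.
            raise StopAsyncIteration
        await asyncio.sleep(0)  # yield control to the event loop
        value = self._next
        self._next -= 1
        return value


async def collect() -> list[int]:
    # `async for` calls __aiter__ once and then awaits __anext__ repeatedly.
    return [value async for value in Countdown(3)]
```

With a real receiver the loop body is the same; the iteration simply ends when the receiver stops producing messages.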
__init__ ¤
__init__(*, name: str | None = None) -> None
Initialize this event.
PARAMETER | DESCRIPTION |
---|---|
name | The name of the receiver. If [...] TYPE: str | None DEFAULT: None |
Source code in frequenz/channels/event.py
consume ¤
Consume the event.
This makes this receiver wait again until the event is set again.
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If this receiver is stopped. |
Source code in frequenz/channels/event.py
filter ¤
filter(
    filter_function: Callable[
        [ReceiverMessageT_co],
        TypeGuard[FilteredMessageT_co],
    ]
) -> Receiver[FilteredMessageT_co]
filter(
    filter_function: Callable[[ReceiverMessageT_co], bool]
) -> Receiver[ReceiverMessageT_co]
filter(
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[
            [ReceiverMessageT_co],
            TypeGuard[FilteredMessageT_co],
        ]
    )
) -> (
    Receiver[ReceiverMessageT_co]
    | Receiver[FilteredMessageT_co]
)
Apply a filter function on the messages on a receiver.
Note
You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function | The function to be applied on incoming messages to determine if they should be received. TYPE: Callable[[ReceiverMessageT_co], bool] | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]] |
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co] | A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
    mapping_function: Callable[
        [ReceiverMessageT_co], MappedMessageT_co
    ]
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function | The function to be applied on incoming messages. TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co] |
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
ready async ¤
ready() -> bool
Wait until this receiver is ready.
RETURNS | DESCRIPTION |
---|---|
bool | Whether this receiver is still running. |
receive async ¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
set ¤
stop ¤
triggered ¤
triggered(
    selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether this receiver was selected by select().
This method is used in conjunction with the Selected class to determine which receiver was selected in a select() iteration.
It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.
Please see select() for an example.
PARAMETER | DESCRIPTION |
---|---|
selected | The result of a [...] |
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[ReceiverMessageT_co]] | Whether this receiver was selected. |