event
frequenz.channels.event ¤
A receiver that can be made ready directly.
Tip
Read the Event documentation for more information.
This module contains the following:
Event: A receiver that can be made ready directly.
Classes¤
frequenz.channels.event.Event ¤
Bases: Receiver[None]
A receiver that can be made ready directly.
Usage¤
There are cases where it is useful to be able to send a signal to a select() loop, for example, to stop a loop from outside the loop itself.
To do that, you can use an Event receiver and call set() on it when you want to make it ready.
Stopping¤
After the current set event is received, the receiver is re-armed and blocks again until the next call to set(). To stop the receiver completely, call stop().
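The re-arming behaviour described above can be modelled with a few lines of plain asyncio. The sketch below is not the library's implementation: ToyEvent and its private attributes are hypothetical names, chosen only to mirror the documented set(), ready(), consume() and stop() contract.

```python
import asyncio


class ToyEvent:
    """Hypothetical stand-in that mimics the documented Event contract."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self._stopped = False

    def set(self) -> None:
        # Make the receiver ready; a pending ready() call wakes up.
        self._event.set()

    def stop(self) -> None:
        # Stop for good and wake up anything that is still waiting.
        self._stopped = True
        self._event.set()

    async def ready(self) -> bool:
        # Block until set() or stop(); report whether we are still running.
        if self._stopped:
            return False
        await self._event.wait()
        return not self._stopped

    def consume(self) -> None:
        # Re-arm: the next ready() blocks until set() is called again.
        self._event.clear()


async def demo() -> list[object]:
    event = ToyEvent()
    observed: list[object] = []

    event.set()
    observed.append(await event.ready())  # ready because set() was called
    event.consume()

    # After consume() the event blocks again, so a short wait times out.
    try:
        await asyncio.wait_for(event.ready(), timeout=0.05)
    except asyncio.TimeoutError:
        observed.append("blocked")

    event.stop()
    observed.append(await event.ready())  # stopped: no longer running
    return observed


result = asyncio.run(demo())
```

In this model, result holds the three observations in order: ready after set(), blocked after consume(), and not running after stop().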
Example¤
Exit after printing the first 5 numbers
    import asyncio

    from frequenz.channels import Anycast, select, selected_from
    from frequenz.channels.event import Event

    channel: Anycast[int] = Anycast(name="channel")
    receiver = channel.new_receiver()
    sender = channel.new_sender()
    stop_event = Event(name="stop")


    async def do_work() -> None:
        # Wait on both receivers; whichever becomes ready is selected.
        async for selected in select(receiver, stop_event):
            if selected_from(selected, receiver):
                print(selected.message)
            elif selected_from(selected, stop_event):
                print("Stop event triggered")
                stop_event.stop()
                break


    async def send_stuff() -> None:
        # Send one number per second until the event is stopped.
        for i in range(10):
            if stop_event.is_stopped:
                break
            await asyncio.sleep(1)
            await sender.send(i)


    async def main() -> None:
        async with asyncio.TaskGroup() as task_group:
            task_group.create_task(do_work(), name="do_work")
            task_group.create_task(send_stuff(), name="send_stuff")
            # Let the first five numbers through, then signal the loop to stop.
            await asyncio.sleep(5.5)
            stop_event.set()


    asyncio.run(main())
Source code in frequenz/channels/event.py
Attributes¤
name property ¤
name: str
The name of this receiver.
This is for debugging purposes; it is shown in the string representation of this receiver.
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__ async ¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
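One way to picture how __aiter__() and __anext__() relate to the ready()/consume() pair is the stdlib-only sketch below. ToyReceiver is a hypothetical class backed by a plain list, and the way it ends the iteration is an assumption made for illustration, not a copy of the code in frequenz/channels/_receiver.py.

```python
import asyncio


class ToyReceiver:
    """Hypothetical receiver that yields queued messages, then stops."""

    def __init__(self, messages: list[int]) -> None:
        self._pending = list(messages)

    async def ready(self) -> bool:
        # True while a message is available, False once exhausted.
        return bool(self._pending)

    def consume(self) -> int:
        # Hand out the message that ready() announced.
        return self._pending.pop(0)

    def __aiter__(self) -> "ToyReceiver":
        # The receiver is its own async iterator.
        return self

    async def __anext__(self) -> int:
        # Translate "no longer ready" into the end of the iteration.
        if not await self.ready():
            raise StopAsyncIteration
        return self.consume()


async def collect() -> list[int]:
    # "async for" calls __aiter__() once and then __anext__() repeatedly.
    return [message async for message in ToyReceiver([1, 2, 3])]


collected = asyncio.run(collect())
```

Because the object returns itself from __aiter__(), it can be used directly in an async for loop or an async comprehension, as collect() does here.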
__init__ ¤
__init__(*, name: str | None = None) -> None
Initialize this event.
PARAMETER | DESCRIPTION |
---|---|
name | The name of the receiver. If … TYPE: `str \| None` DEFAULT: `None` |
Source code in frequenz/channels/event.py
consume ¤
Consume the event.
This makes this receiver wait again until the event is set again.
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If this receiver is stopped. |
Source code in frequenz/channels/event.py
map ¤
map(
    mapping_function: Callable[
        [ReceiverMessageT_co], MappedMessageT_co
    ]
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function | The function to be applied on incoming messages. TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co] |
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
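The Tip above can be illustrated with a small wrapper. This is a sketch under assumptions: ToySource, ToyMapped and the extra() method are hypothetical names, and the real map() may be implemented differently. It only shows why a mapped receiver exposes the common interface but not the original receiver's additional methods.

```python
import asyncio
from typing import Callable


class ToySource:
    """Hypothetical receiver with one method outside the common interface."""

    def __init__(self, messages: list[int]) -> None:
        self._pending = list(messages)

    async def receive(self) -> int:
        return self._pending.pop(0)

    def extra(self) -> str:
        # Stands in for a method such as set() or stop() on a concrete receiver.
        return "only on the original"

    def map(self, mapping_function: Callable[[int], str]) -> "ToyMapped":
        return ToyMapped(self, mapping_function)


class ToyMapped:
    """Hypothetical wrapper that applies a function to every message."""

    def __init__(self, source: ToySource, fn: Callable[[int], str]) -> None:
        self._source = source
        self._fn = fn

    async def receive(self) -> str:
        return self._fn(await self._source.receive())


async def demo() -> tuple[str, str, bool, str]:
    source = ToySource([1, 2])  # keep a reference to the original
    mapped = source.map(lambda n: f"message #{n}")
    first = await mapped.receive()
    second = await mapped.receive()
    # The wrapper lacks extra(); the saved reference still provides it.
    return first, second, hasattr(mapped, "extra"), source.extra()


mapped_result = asyncio.run(demo())
```

Keeping the source variable around is the pattern the Tip recommends: messages are read through the mapped receiver, while receiver-specific methods are called on the original.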
ready async ¤
ready() -> bool
Wait until this receiver is ready.
RETURNS | DESCRIPTION |
---|---|
bool | Whether this receiver is still running. |
receive async ¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |