frequenz.channels.experimental ¤
Experimental channel primitives.
Warning
This package contains experimental channel primitives that are not yet considered stable. For more information on what to expect and how to use the experimental package, please read the experimental package guidelines.
Classes¤
frequenz.channels.experimental.OptionalReceiver ¤
Bases: Receiver[ReceiverMessageT_co]
A receiver that will wait indefinitely if there is no underlying receiver.
This receiver is useful when the underlying receiver is not set initially. Instead of writing if-else branches to check whether the receiver is set, you can use this receiver, which waits indefinitely while it is not set.
Source code in frequenz/channels/experimental/_optional_receiver.py
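The "wait indefinitely when nothing is set" idea can be sketched with the standard library alone. The block below is only an illustration under stated assumptions: `OptionalQueueReader` and `first_message` are hypothetical names, an `asyncio.Queue` stands in for the underlying receiver, and none of this is the library's implementation.

```python
from __future__ import annotations

import asyncio
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class OptionalQueueReader(Generic[T]):
    """Hypothetical stand-in: read from a queue, or wait forever if there is none."""

    def __init__(self, queue: Optional[asyncio.Queue[T]]) -> None:
        self._queue = queue

    async def receive(self) -> T:
        if self._queue is None:
            # No underlying source: block until the caller cancels this call.
            await asyncio.Event().wait()
            raise AssertionError("unreachable: the event is never set")
        return await self._queue.get()


async def first_message(
    reader: OptionalQueueReader[int], timeout: float
) -> Optional[int]:
    """Return the next message, or None if nothing arrives within `timeout` seconds."""
    try:
        return await asyncio.wait_for(reader.receive(), timeout)
    except asyncio.TimeoutError:
        return None


async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue()
    queue.put_nowait(42)
    # A reader with a source yields the queued message.
    print(await first_message(OptionalQueueReader(queue), 0.05))
    # A reader without a source never yields; only the timeout ends the wait.
    print(await first_message(OptionalQueueReader(None), 0.05))


if __name__ == "__main__":
    asyncio.run(main())
```

The caller uses the same code path in both cases, which is the point of avoiding the if-else branches mentioned above.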
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__ async ¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |

RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
__init__ ¤
__init__(receiver: Receiver[ReceiverMessageT_co] | None)
Initialize this instance.
PARAMETER | DESCRIPTION |
---|---|
receiver | The underlying receiver, or None if there is no underlying receiver. TYPE: Receiver[ReceiverMessageT_co] \| None |
Source code in frequenz/channels/experimental/_optional_receiver.py
close ¤
consume ¤
consume() -> ReceiverMessageT_co
Return the latest message from the underlying receiver once ready() is complete.
ready() must be called before each call to consume().
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next message received. |

RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the underlying receiver. |
Source code in frequenz/channels/experimental/_optional_receiver.py
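The two-step ready()/consume() contract described above can be illustrated with a small stand-in. This is a sketch under assumptions, not the library's code: `TwoStepReader` and `drain` are hypothetical names, an `asyncio.Queue` replaces the underlying receiver, and raising `RuntimeError` when consume() is called without a prior ready() is an assumption made only for this example.

```python
from __future__ import annotations

import asyncio
from typing import Generic, TypeVar, cast

T = TypeVar("T")
_EMPTY = object()  # Sentinel meaning "no message is cached yet".


class TwoStepReader(Generic[T]):
    """Hypothetical stand-in for a receiver with a ready()/consume() protocol."""

    def __init__(self, queue: asyncio.Queue[T]) -> None:
        self._queue = queue
        self._next: object = _EMPTY

    async def ready(self) -> bool:
        # Stay ready (return immediately) until the cached message is consumed.
        if self._next is _EMPTY:
            self._next = await self._queue.get()
        return True

    def consume(self) -> T:
        if self._next is _EMPTY:
            # Assumption for this sketch: misuse is reported with RuntimeError.
            raise RuntimeError("ready() must be awaited before consume()")
        message, self._next = self._next, _EMPTY
        return cast(T, message)


async def drain(reader: TwoStepReader[T], count: int) -> list[T]:
    """Read `count` messages, pairing every consume() with a ready()."""
    messages: list[T] = []
    for _ in range(count):
        await reader.ready()
        messages.append(reader.consume())
    return messages
```

Calling ready() twice in a row does not drop a message in this sketch, because the first message stays cached until consume() takes it.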
filter ¤
filter(
filter_function: Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
],
) -> Receiver[FilteredMessageT_co]
filter(
filter_function: Callable[[ReceiverMessageT_co], bool],
) -> Receiver[ReceiverMessageT_co]
filter(
filter_function: (
Callable[[ReceiverMessageT_co], bool]
| Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
]
),
) -> (
Receiver[ReceiverMessageT_co]
| Receiver[FilteredMessageT_co]
)
Apply a filter function to the messages of a receiver.
Note
You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function | The function to be applied on incoming messages to determine if they should be received. TYPE: Callable[[ReceiverMessageT_co], bool] \| Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]] |

RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] \| Receiver[FilteredMessageT_co] | A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
],
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function | The function to be applied on incoming messages. TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co] |

RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
ready async ¤
ready() -> bool
Wait until the receiver is ready with a message or an error.
Once a call to ready() has finished, the message should be read with a call to consume() (or by calling receive() or iterating over the receiver). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/experimental/_optional_receiver.py
receive async ¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |

RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
triggered ¤
triggered(
selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether this receiver was selected by select().
This method is used in conjunction with the Selected class to determine which receiver was selected in a select() iteration.
It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.
Please see select() for an example.
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[ReceiverMessageT_co]] | Whether this receiver was selected. |
Source code in frequenz/channels/_receiver.py
frequenz.channels.experimental.Pipe ¤
Bases: Generic[ChannelMessageT]
A pipe between two channels.
The Pipe class takes a receiver and a sender and creates a pipe between them by forwarding all the messages received by the receiver to the sender.
Example
import asyncio
from contextlib import closing, aclosing, AsyncExitStack

from frequenz.channels import Broadcast
from frequenz.channels.experimental import Pipe


async def main() -> None:
    # Channels, receivers and Pipe are in AsyncExitStack
    # to close and stop them at the end.
    async with AsyncExitStack() as stack:
        source_channel = await stack.enter_async_context(
            aclosing(Broadcast[int](name="source channel"))
        )
        source_receiver = stack.enter_context(closing(source_channel.new_receiver()))

        forwarding_channel = await stack.enter_async_context(
            aclosing(Broadcast[int](name="forwarding channel"))
        )
        # The pipe forwards every message from the source to the forwarding channel.
        await stack.enter_async_context(
            Pipe(source_receiver, forwarding_channel.new_sender())
        )

        receiver = stack.enter_context(closing(forwarding_channel.new_receiver()))

        source_sender = source_channel.new_sender()
        await source_sender.send(10)
        # Messages are forwarded unchanged, so the same value arrives.
        assert await receiver.receive() == 10


asyncio.run(main())
Source code in frequenz/channels/experimental/_pipe.py
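The forwarding behaviour can also be pictured with plain asyncio queues. The following is a minimal sketch, assuming a background task that copies items from one queue to another; `QueuePipe` and `forward_once` are hypothetical names and this is not how the library implements Pipe.

```python
from __future__ import annotations

import asyncio
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class QueuePipe(Generic[T]):
    """Hypothetical stand-in: forward every item from `source` to `target`."""

    def __init__(self, source: asyncio.Queue[T], target: asyncio.Queue[T]) -> None:
        self._source = source
        self._target = target
        self._task: Optional[asyncio.Task[None]] = None

    async def _forward(self) -> None:
        # Runs until cancelled, copying items unchanged.
        while True:
            await self._target.put(await self._source.get())

    async def __aenter__(self) -> QueuePipe[T]:
        self._task = asyncio.create_task(self._forward())
        return self

    async def __aexit__(self, *_exc_info: object) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass  # Expected: the forwarding task was cancelled on exit.
            self._task = None


async def forward_once(value: int) -> int:
    """Send one value through the pipe and return what arrives at the far end."""
    source: asyncio.Queue[int] = asyncio.Queue()
    target: asyncio.Queue[int] = asyncio.Queue()
    async with QueuePipe(source, target):
        await source.put(value)
        return await target.get()
```

As in the example above, the async context manager starts the forwarding on entry and stops it on exit.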
Functions¤
__aenter__ async ¤
__aenter__() -> Pipe[ChannelMessageT]
__aexit__ async ¤
__aexit__(
_exc_type: Type[BaseException] | None,
_exc: BaseException | None,
_tb: Any,
) -> None
__init__ ¤
__init__(
receiver: Receiver[ChannelMessageT],
sender: Sender[ChannelMessageT],
) -> None
Create a new pipe between two channels.
PARAMETER | DESCRIPTION |
---|---|
receiver | The receiver to read messages from. TYPE: Receiver[ChannelMessageT] |
sender | The sender to forward the received messages to. TYPE: Sender[ChannelMessageT] |
Source code in frequenz/channels/experimental/_pipe.py
start async ¤
frequenz.channels.experimental.RelaySender ¤
Bases: Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]
A Sender for sending messages to multiple senders.
The RelaySender class takes multiple senders and forwards every message sent to it to all the senders it was created with.
Example
from frequenz.channels import Broadcast
from frequenz.channels.experimental import RelaySender
channel1: Broadcast[int] = Broadcast(name="channel1")
channel2: Broadcast[int] = Broadcast(name="channel2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()
tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender())
await tee_sender.send(5)
assert await receiver1.receive() == 5
assert await receiver2.receive() == 5
Source code in frequenz/channels/experimental/_relay_sender.py
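The fan-out idea behind a relay can be sketched with asyncio queues standing in for the wrapped senders. `QueueRelay` and `relay_once` are hypothetical names, and delivering to the targets one after another is an assumption of this sketch; the text above does not say how the library orders or schedules the individual sends.

```python
from __future__ import annotations

import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class QueueRelay(Generic[T]):
    """Hypothetical stand-in: send each message to every wrapped queue."""

    def __init__(self, *queues: asyncio.Queue[T]) -> None:
        self._queues = queues

    async def send(self, message: T) -> None:
        # Assumption for this sketch: targets are served sequentially.
        for queue in self._queues:
            await queue.put(message)


async def relay_once(message: int, fan_out: int) -> list[int]:
    """Relay one message to `fan_out` queues and collect what each one got."""
    queues: list[asyncio.Queue[int]] = [asyncio.Queue() for _ in range(fan_out)]
    await QueueRelay(*queues).send(message)
    return [queue.get_nowait() for queue in queues]
```

Because the relay exposes the same `send()` shape as a single target, code that sends messages does not need to know how many destinations exist.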
Functions¤
__init__ ¤
__init__(*senders: Sender[SenderMessageT_contra]) -> None
Create a new RelaySender.
PARAMETER | DESCRIPTION |
---|---|
*senders | The senders to send messages to. TYPE: Sender[SenderMessageT_contra] |
send async ¤
send(message: SenderMessageT_contra) -> None
Send a message.
PARAMETER | DESCRIPTION |
---|---|
message | The message to be sent. TYPE: SenderMessageT_contra |
frequenz.channels.experimental.WithPrevious ¤
Bases: Generic[ChannelMessageT]
A composable predicate for building predicates that can also use the previous message.
This predicate can be used to filter messages based on a custom condition on the previous and current messages. This can be useful in cases where you want to process messages only if they satisfy a particular condition with respect to the previous message.
Receiving only messages that are different from the previous one.
from frequenz.channels import Broadcast
from frequenz.channels.experimental import WithPrevious
channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(WithPrevious(lambda old, new: old != new))
sender = channel.new_sender()
# This message will be received as it is the first message.
await sender.send(1)
assert await receiver.receive() == 1
# This message will be skipped as it is equal to the previous one.
await sender.send(1)
# This message will be received as it is different from the previous one.
await sender.send(0)
assert await receiver.receive() == 0
Receiving only messages if they are bigger than the previous one.
from frequenz.channels import Broadcast
from frequenz.channels.experimental import WithPrevious
channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(
WithPrevious(lambda old, new: new > old, first_is_true=False)
)
sender = channel.new_sender()
# This message will be skipped as first_is_true is False.
await sender.send(1)
# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2
# This message will be skipped as it is smaller than the previous one (2).
await sender.send(0)
# This message will be skipped as it is not bigger than the previous one (0).
await sender.send(0)
# This message will be received as it is bigger than the previous one (0).
await sender.send(1)
assert await receiver.receive() == 1
# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2
Source code in frequenz/channels/experimental/_with_previous.py
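The behaviour shown in the two examples above can be reproduced with a small stand-in applied to plain lists. This is a sketch under assumptions: `PreviousAwarePredicate` is a hypothetical name, not the library class, and it assumes the previous message is updated on every call, including calls that reject the message, which is what the comments in the second example imply.

```python
from __future__ import annotations

from typing import Callable, Generic, TypeVar, cast

T = TypeVar("T")
_NO_PREVIOUS = object()  # Sentinel meaning "no message has been seen yet".


class PreviousAwarePredicate(Generic[T]):
    """Hypothetical stand-in for a predicate over (previous, current) messages."""

    def __init__(
        self, predicate: Callable[[T, T], bool], /, *, first_is_true: bool = True
    ) -> None:
        self._predicate = predicate
        self._first_is_true = first_is_true
        self._previous: object = _NO_PREVIOUS

    def __call__(self, message: T) -> bool:
        # Remember the current message even if it ends up being rejected.
        previous, self._previous = self._previous, message
        if previous is _NO_PREVIOUS:
            return self._first_is_true
        return self._predicate(cast(T, previous), message)


# Keep only messages that differ from the previous one.
changed = PreviousAwarePredicate[int](lambda old, new: old != new)
deduplicated = [m for m in [1, 1, 0, 0, 2] if changed(m)]

# Keep only messages bigger than the previous one, rejecting the first message.
rising = PreviousAwarePredicate[int](lambda old, new: new > old, first_is_true=False)
increases = [m for m in [1, 2, 0, 0, 1, 2] if rising(m)]
```

The second list uses the same message sequence as the example above, so the accepted values match the asserted ones there.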
Functions¤
__call__ ¤
__call__(message: ChannelMessageT) -> bool
Return whether message satisfies the predicate with respect to the previous message. For the first message received, return the value of first_is_true.
Source code in frequenz/channels/experimental/_with_previous.py
__init__ ¤
__init__(
predicate: Callable[
[ChannelMessageT, ChannelMessageT], bool
],
/,
*,
first_is_true: bool = True,
) -> None
Initialize this instance.
PARAMETER | DESCRIPTION |
---|---|
predicate | A callable that takes two arguments, the previous message and the current message, and returns a boolean indicating whether the current message should be received. TYPE: Callable[[ChannelMessageT, ChannelMessageT], bool] |
first_is_true | Whether the first message should be considered as satisfying the predicate. Defaults to True. TYPE: bool |