experimental
frequenz.channels.experimental ¤
Experimental channel primitives.
Warning
This package contains experimental channel primitives that are not yet considered stable. For more information on what to expect and how to use the experimental package, please read the experimental package guidelines.
Attributes¤
frequenz.channels.experimental.DefaultT
module-attribute
¤
DefaultT = TypeVar('DefaultT')
Type variable for the default value returned by GroupingLatestValueCache.get.
frequenz.channels.experimental.HashableT
module-attribute
¤
Type variable for the keys used to group values in the GroupingLatestValueCache.
frequenz.channels.experimental.ValueT_co
module-attribute
¤
ValueT_co = TypeVar('ValueT_co', covariant=True)
Covariant type variable for the values cached by the GroupingLatestValueCache.
Classes¤
frequenz.channels.experimental.GroupingLatestValueCache ¤
Bases: Mapping[HashableT, ValueT_co]
A cache that stores the latest value in a receiver, grouped by key.
It provides a way to look up, on demand, the latest value in a stream for any key, as long as at least one value has been received for that key.
GroupingLatestValueCache takes a Receiver and a key function as arguments and stores the latest value received by that receiver for each key separately.
The GroupingLatestValueCache implements the Mapping interface, so it can be used like a dictionary. Additionally, some methods from MutableMapping are implemented, but only those that remove items from the cache, such as pop(), popitem(), clear(), and __delitem__().
Other update methods are not provided because the user should not update the cache values directly.
Example
from frequenz.channels import Broadcast
from frequenz.channels.experimental import GroupingLatestValueCache
channel = Broadcast[tuple[int, str]](name="lvc_test")
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
sender = channel.new_sender()
assert cache.get(6) is None
assert 6 not in cache
await sender.send((6, "twenty-six"))
assert 6 in cache
assert cache.get(6) == (6, "twenty-six")
del cache[6]
assert cache.get(6) is None
assert 6 not in cache
await cache.stop()
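The grouping behaviour shown in the example can also be modelled without the library. The following is a minimal, standard-library-only sketch and not the library's implementation: `ToyGroupingCache`, `demo`, and the use of an `asyncio.Queue` in place of a Receiver are assumptions made purely for illustration.

```python
import asyncio
from collections.abc import Callable, Hashable
from typing import Any


class ToyGroupingCache:
    """Hypothetical stand-in that keeps only the latest value seen per key."""

    def __init__(
        self, queue: asyncio.Queue, *, key: Callable[[Any], Hashable]
    ) -> None:
        self._queue = queue  # plays the role of the receiver
        self._key = key
        self._latest: dict[Hashable, Any] = {}
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        # Each new value overwrites the one stored under the same key.
        while True:
            value = await self._queue.get()
            self._latest[self._key(value)] = value
            self._queue.task_done()

    def get(self, key: Hashable, default: Any = None) -> Any:
        return self._latest.get(key, default)

    def __contains__(self, key: Hashable) -> bool:
        return key in self._latest

    def __delitem__(self, key: Hashable) -> None:
        del self._latest[key]

    async def stop(self) -> None:
        self._task.cancel()
        await asyncio.gather(self._task, return_exceptions=True)


async def demo() -> tuple[Any, Any, bool]:
    queue: asyncio.Queue = asyncio.Queue()
    cache = ToyGroupingCache(queue, key=lambda pair: pair[0])
    for pair in [(6, "six"), (7, "seven"), (6, "twenty-six")]:
        queue.put_nowait(pair)
    await queue.join()  # wait until the background task has stored everything
    latest_six, latest_seven = cache.get(6), cache.get(7)
    del cache[6]  # removing an entry is supported; writing one is not offered
    still_there = 6 in cache
    await cache.stop()
    return latest_six, latest_seven, still_there


grouping_result = asyncio.run(demo())
```

In this sketch the second value sent for key 6 replaces the first, while key 7 keeps its own entry, which is the per-key "latest value" behaviour described for the cache.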
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
Attributes¤
Functions¤
__contains__ ¤
__delitem__ ¤
__delitem__(key: HashableT) -> None
Clear the latest value for a specific key.
PARAMETER | DESCRIPTION |
---|---|
key | The key for which to clear the latest value. |
__eq__ ¤
Check if this cache is equal to another object.
Two caches are considered equal if they have the same keys and values.
PARAMETER | DESCRIPTION |
---|---|
other | The object to compare with. |
RETURNS | DESCRIPTION |
---|---|
bool | Whether this cache is equal to the other object. |
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
__getitem__ ¤
Return the latest value that has been received for a specific key.
PARAMETER | DESCRIPTION |
---|---|
key | The key to retrieve the latest value for. |
RETURNS | DESCRIPTION |
---|---|
ValueT_co | The latest value that has been received for that key. |
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
__init__ ¤
__init__(
receiver: Receiver[ValueT_co],
*,
key: Callable[[ValueT_co], HashableT],
unique_id: str | None = None
) -> None
Create a new cache.
PARAMETER | DESCRIPTION |
---|---|
receiver | The receiver to cache values from. |
key | A function that takes a value and returns a key to group the values by. |
unique_id | A string to help uniquely identify this instance. If not provided, a unique identifier will be generated from the object's |
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
__iter__ ¤
Return an iterator over the keys for which values have been received.
__ne__ ¤
__repr__ ¤
__repr__() -> str
Return a string representation of this cache.
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
clear ¤
get ¤
Return the latest value that has been received.
PARAMETER | DESCRIPTION |
---|---|
key | An optional key to retrieve the latest value for that key. If not provided, it retrieves the latest value received overall. |
default | The default value to return if no value has been received yet for the specified key. If not provided, it defaults to |
RETURNS | DESCRIPTION |
---|---|
ValueT_co | DefaultT | None | The latest value that has been received. |
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
items ¤
Return an iterator over the key-value pairs of the latest values received.
keys ¤
Return the set of keys for which values have been received.
If no key function is provided, this will return an empty set.
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
pop ¤
pop(
key: HashableT,
/,
default: DefaultT | _NotSpecified = _NotSpecified(),
) -> ValueT_co | DefaultT | None
Remove the latest value for a specific key and return it.
If no value has been received yet for that key, it returns the default value or raises a KeyError if no default value is provided.
PARAMETER | DESCRIPTION |
---|---|
key | The key for which to remove the latest value. |
default | The default value to return if no value has been received yet for the specified key. |
RETURNS | DESCRIPTION |
---|---|
ValueT_co | DefaultT | None | The latest value that has been received for that key, or the default value if no value has been received yet and a default value is provided. |
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
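The "return the default, or raise KeyError when no default was given" rule for pop() is commonly implemented with a sentinel default, which the `_NotSpecified` in the signature suggests. A minimal sketch on a plain dictionary follows; `_Missing` and `pop_latest` are hypothetical names used only for illustration and are not part of the library.

```python
from typing import Any


class _Missing:
    """Hypothetical sentinel type meaning "the caller passed no default"."""


def pop_latest(store: dict, key: Any, default: Any = _Missing()) -> Any:
    """Remove and return store[key], following the rule described for pop()."""
    if key in store:
        return store.pop(key)
    if isinstance(default, _Missing):
        # Nothing stored and no default supplied: behave like dict.pop().
        raise KeyError(key)
    return default


latest = {6: (6, "twenty-six")}
popped = pop_latest(latest, 6)  # the stored value is removed and returned
fallback = pop_latest(latest, 6, None)  # an explicit default, even None, is returned
try:
    pop_latest(latest, 6)
    raised = False
except KeyError:
    raised = True
```

Using a dedicated sentinel instead of `None` lets callers pass `None` as a legitimate default without it being mistaken for "no default".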
popitem ¤
Remove and return a (key, value) pair from the cache.
Pairs are returned in LIFO (last-in, first-out) order.
RETURNS | DESCRIPTION |
---|---|
tuple[HashableT, ValueT_co] | A tuple containing the key and the latest value that has been received for that key. |
Source code in frequenz/channels/experimental/_grouping_latest_value_cache.py
stop
async
¤
values ¤
values() -> ValuesView[ValueT_co]
frequenz.channels.experimental.NopReceiver ¤
Bases: Receiver[ReceiverMessageT_co]
A place-holder receiver that will never receive a message.
Source code in frequenz/channels/experimental/_nop_receiver.py
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__
async
¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
__init__ ¤
close ¤
consume ¤
consume() -> ReceiverMessageT_co
Raise ReceiverError unless the NopReceiver is closed.
If the receiver is closed, then raise ReceiverStoppedError.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next message received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the underlying receiver. |
Source code in frequenz/channels/experimental/_nop_receiver.py
filter ¤
filter(
filter_function: Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
],
) -> Receiver[FilteredMessageT_co]
filter(
filter_function: Callable[[ReceiverMessageT_co], bool],
) -> Receiver[ReceiverMessageT_co]
filter(
filter_function: (
Callable[[ReceiverMessageT_co], bool]
| Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
]
),
) -> (
Receiver[ReceiverMessageT_co]
| Receiver[FilteredMessageT_co]
)
Apply a filter function on the messages on a receiver.
Note
You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function | The function to be applied on incoming messages to determine if they should be received. |
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co] | A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
],
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function | The function to be applied on incoming messages. |
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
ready
async
¤
ready() -> bool
Wait forever unless the receiver is closed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/experimental/_nop_receiver.py
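The "wait forever unless closed" behaviour of ready() can be pictured with a standard-library stand-in. This is only a sketch under assumptions: `ToyNopSource` and `demo` are hypothetical names, and an `asyncio.Event` is used here to model closing, which may differ from how the library does it.

```python
import asyncio


class ToyNopSource:
    """Hypothetical stand-in: never ready until it is closed."""

    def __init__(self) -> None:
        self._closed = asyncio.Event()

    async def ready(self) -> bool:
        await self._closed.wait()  # blocks until close() is called
        return False  # closed, so no longer active

    def close(self) -> None:
        self._closed.set()


async def demo() -> tuple[bool, bool]:
    source = ToyNopSource()
    try:
        # While open, ready() never completes, so the timeout fires.
        await asyncio.wait_for(source.ready(), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    source.close()
    active = await source.ready()  # returns immediately once closed
    return timed_out, active


nop_result = asyncio.run(demo())
```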
receive
async
¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
triggered ¤
triggered(
selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether this receiver was selected by select().
This method is used in conjunction with the Selected class to determine which receiver was selected in a select() iteration.
It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.
Please see select() for an example.
PARAMETER | DESCRIPTION |
---|---|
selected | The result of a |
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[ReceiverMessageT_co]] | Whether this receiver was selected. |
Source code in frequenz/channels/_receiver.py
frequenz.channels.experimental.OptionalReceiver ¤
Bases: Receiver[ReceiverMessageT_co]
A receiver that will wait indefinitely if there is no underlying receiver.
This receiver is useful when the underlying receiver is not set initially. Instead of making if-else branches to check if the receiver is set, you can use this receiver to wait indefinitely if it is not set.
Source code in frequenz/channels/experimental/_optional_receiver.py
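The idea of replacing if-else branches with a source that simply never delivers can be sketched with the standard library alone. `ToyOptionalSource`, `first_message`, and `demo` are hypothetical names, and an `asyncio.Queue` stands in for the underlying receiver; this is an illustration of the pattern, not the library's implementation.

```python
import asyncio
from typing import Any, Optional


class ToyOptionalSource:
    """Hypothetical stand-in wrapping a queue that may be absent."""

    def __init__(self, queue: Optional[asyncio.Queue]) -> None:
        self._queue = queue

    async def receive(self) -> Any:
        if self._queue is not None:
            return await self._queue.get()
        # Nothing to wait on: park on an event that is never set,
        # so this call only ends when it is cancelled.
        await asyncio.Event().wait()
        raise AssertionError("unreachable")


async def first_message(
    *sources: ToyOptionalSource, timeout: float = 0.05
) -> list[Any]:
    # The caller treats every source the same way, set or not.
    tasks = [asyncio.create_task(source.receive()) for source in sources]
    done, pending = await asyncio.wait(
        tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return [task.result() for task in done]


async def demo() -> tuple[list[Any], list[Any]]:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("hello")
    configured, missing = ToyOptionalSource(queue), ToyOptionalSource(None)
    with_data = await first_message(configured, missing)
    without_data = await first_message(missing)
    return with_data, without_data


optional_result = asyncio.run(demo())
```

The waiting code never checks whether a source is configured: the unset one just never wins the race.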
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__
async
¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
__init__ ¤
__init__(receiver: Receiver[ReceiverMessageT_co] | None)
Initialize this instance.
PARAMETER | DESCRIPTION |
---|---|
receiver | The underlying receiver, or |
Source code in frequenz/channels/experimental/_optional_receiver.py
close ¤
consume ¤
consume() -> ReceiverMessageT_co
Return the latest message from the underlying receiver once ready() is complete.
ready() must be called before each call to consume().
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next message received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the underlying receiver. |
Source code in frequenz/channels/experimental/_optional_receiver.py
filter ¤
filter(
filter_function: Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
],
) -> Receiver[FilteredMessageT_co]
filter(
filter_function: Callable[[ReceiverMessageT_co], bool],
) -> Receiver[ReceiverMessageT_co]
filter(
filter_function: (
Callable[[ReceiverMessageT_co], bool]
| Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
]
),
) -> (
Receiver[ReceiverMessageT_co]
| Receiver[FilteredMessageT_co]
)
Apply a filter function on the messages on a receiver.
Note
You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function | The function to be applied on incoming messages to determine if they should be received. |
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co] | A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
],
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface, you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function | The function to be applied on incoming messages. |
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
ready
async
¤
ready() -> bool
Wait until the receiver is ready with a message or an error.
Once a call to ready() has finished, the message should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/experimental/_optional_receiver.py
receive
async
¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
triggered ¤
triggered(
selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether this receiver was selected by select().
This method is used in conjunction with the Selected class to determine which receiver was selected in a select() iteration.
It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.
Please see select() for an example.
PARAMETER | DESCRIPTION |
---|---|
selected | The result of a |
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[ReceiverMessageT_co]] | Whether this receiver was selected. |
Source code in frequenz/channels/_receiver.py
frequenz.channels.experimental.Pipe ¤
Bases: Generic[ChannelMessageT]
A pipe between two channels.
The Pipe class takes a receiver and a sender and creates a pipe between them by forwarding all the messages received by the receiver to the sender.
Example
import asyncio
from contextlib import closing, aclosing, AsyncExitStack
from frequenz.channels import Broadcast
from frequenz.channels.experimental import Pipe
async def main() -> None:
    # Channels, receivers and Pipe are in AsyncExitStack
    # to close and stop them at the end.
    async with AsyncExitStack() as stack:
        source_channel = await stack.enter_async_context(
            aclosing(Broadcast[int](name="source channel"))
        )
        source_receiver = stack.enter_context(closing(source_channel.new_receiver()))
        forwarding_channel = await stack.enter_async_context(
            aclosing(Broadcast[int](name="forwarding channel"))
        )
        # The pipe forwards every message from source_receiver unchanged.
        await stack.enter_async_context(
            Pipe(source_receiver, forwarding_channel.new_sender())
        )
        receiver = stack.enter_context(closing(forwarding_channel.new_receiver()))
        source_sender = source_channel.new_sender()
        await source_sender.send(10)
        assert await receiver.receive() == 10
asyncio.run(main())
Source code in frequenz/channels/experimental/_pipe.py
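Conceptually, a pipe is a background loop that reads from one side and writes to the other. The sketch below models that with two `asyncio.Queue` objects and an async context manager; `ToyPipe` and `demo` are hypothetical names and this is an assumption-laden illustration rather than how Pipe is implemented.

```python
import asyncio
from typing import Optional


class ToyPipe:
    """Hypothetical stand-in: forwards items from one queue to another."""

    def __init__(self, source: asyncio.Queue, sink: asyncio.Queue) -> None:
        self._source = source
        self._sink = sink
        self._task: Optional[asyncio.Task] = None

    async def _forward(self) -> None:
        # Pass every item through unchanged, in arrival order.
        while True:
            await self._sink.put(await self._source.get())

    async def __aenter__(self) -> "ToyPipe":
        self._task = asyncio.create_task(self._forward())
        return self

    async def __aexit__(self, *_exc_info: object) -> None:
        # Stop forwarding when the context is left.
        if self._task is not None:
            self._task.cancel()
            await asyncio.gather(self._task, return_exceptions=True)


async def demo() -> list[int]:
    source: asyncio.Queue = asyncio.Queue()
    sink: asyncio.Queue = asyncio.Queue()
    async with ToyPipe(source, sink):
        source.put_nowait(10)
        source.put_nowait(20)
        return [await sink.get(), await sink.get()]


pipe_result = asyncio.run(demo())
```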
Functions¤
__aenter__
async
¤
__aenter__() -> Pipe[ChannelMessageT]
__aexit__
async
¤
__aexit__(
_exc_type: Type[BaseException] | None,
_exc: BaseException | None,
_tb: Any,
) -> None
__init__ ¤
__init__(
receiver: Receiver[ChannelMessageT],
sender: Sender[ChannelMessageT],
) -> None
Create a new pipe between two channels.
PARAMETER | DESCRIPTION |
---|---|
receiver | The receiver channel. |
sender | The sender channel. |
Source code in frequenz/channels/experimental/_pipe.py
start
async
¤
frequenz.channels.experimental.RelaySender ¤
Bases: Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]
A Sender for sending messages to multiple senders.
The RelaySender class takes multiple senders and forwards all the messages sent to it to the senders it was created with.
Example
from frequenz.channels import Broadcast
from frequenz.channels.experimental import RelaySender
channel1: Broadcast[int] = Broadcast(name="channel1")
channel2: Broadcast[int] = Broadcast(name="channel2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()
tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender())
await tee_sender.send(5)
assert await receiver1.receive() == 5
assert await receiver2.receive() == 5
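The fan-out idea behind the example can be sketched with plain queues. `ToyRelay` and `demo` are hypothetical names; the sketch delivers to each target one after another, which is an assumption and not necessarily how RelaySender schedules its sends.

```python
import asyncio


class ToyRelay:
    """Hypothetical stand-in: copies each message to every sink queue."""

    def __init__(self, *sinks: asyncio.Queue) -> None:
        self._sinks = sinks

    async def send(self, message: object) -> None:
        # Deliver the same message to each sink, in the order given.
        for sink in self._sinks:
            await sink.put(message)


async def demo() -> tuple[object, object]:
    first: asyncio.Queue = asyncio.Queue()
    second: asyncio.Queue = asyncio.Queue()
    relay = ToyRelay(first, second)
    await relay.send(5)
    return await first.get(), await second.get()


relay_result = asyncio.run(demo())
```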
Source code in frequenz/channels/experimental/_relay_sender.py
Functions¤
__init__ ¤
__init__(*senders: Sender[SenderMessageT_contra]) -> None
Create a new RelaySender.
PARAMETER | DESCRIPTION |
---|---|
*senders | The senders to send messages to. |
send
async
¤
send(message: SenderMessageT_contra) -> None
Send a message.
PARAMETER | DESCRIPTION |
---|---|
message | The message to be sent. |
frequenz.channels.experimental.WithPrevious ¤
Bases: Generic[ChannelMessageT]
A composable predicate to build predicates that can also use the previous message.
This predicate can be used to filter messages based on a custom condition on the previous and current messages. This can be useful in cases where you want to process messages only if they satisfy a particular condition with respect to the previous message.
Receiving only messages that are different from the previous one.
from frequenz.channels import Broadcast
from frequenz.channels.experimental import WithPrevious
channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(WithPrevious(lambda old, new: old != new))
sender = channel.new_sender()
# This message will be received as it is the first message.
await sender.send(1)
assert await receiver.receive() == 1
# This message will be skipped as it is equal to the previous one.
await sender.send(1)
# This message will be received as it is different from the previous one.
await sender.send(0)
assert await receiver.receive() == 0
Receiving only messages if they are bigger than the previous one.
from frequenz.channels import Broadcast
from frequenz.channels.experimental import WithPrevious
channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(
WithPrevious(lambda old, new: new > old, first_is_true=False)
)
sender = channel.new_sender()
# This message will be skipped as first_is_true is False.
await sender.send(1)
# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2
# This message will be skipped as it is smaller than the previous one (2).
await sender.send(0)
# This message will be skipped as it is not bigger than the previous one (0).
await sender.send(0)
# This message will be received as it is bigger than the previous one (0).
await sender.send(1)
assert await receiver.receive() == 1
# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2
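The behaviour in both examples can be reproduced with a small stateful callable and the built-in `filter()`. This is a sketch under assumptions, not the library's code: `ToyWithPrevious` is a hypothetical name, and it assumes that the remembered "previous" message is updated on every call, including for messages that end up being skipped, which is what the comments in the second example imply.

```python
from collections.abc import Callable
from typing import Any

_UNSET = object()  # marks "no previous message yet"


class ToyWithPrevious:
    """Hypothetical stand-in: a predicate that also sees the previous message."""

    def __init__(
        self,
        predicate: Callable[[Any, Any], bool],
        /,
        *,
        first_is_true: bool = True,
    ) -> None:
        self._predicate = predicate
        self._first_is_true = first_is_true
        self._previous: Any = _UNSET

    def __call__(self, message: Any) -> bool:
        # Remember the current message before deciding, so skipped
        # messages still become the next "previous" one.
        previous, self._previous = self._previous, message
        if previous is _UNSET:
            return self._first_is_true
        return self._predicate(previous, message)


changed = list(filter(ToyWithPrevious(lambda old, new: old != new), [1, 1, 0]))
increasing = list(
    filter(
        ToyWithPrevious(lambda old, new: new > old, first_is_true=False),
        [1, 2, 0, 0, 1, 2],
    )
)
```

Feeding the same message sequences as in the examples above, `changed` keeps the messages that differ from their predecessor and `increasing` keeps those larger than their predecessor.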
Source code in frequenz/channels/experimental/_with_previous.py
Functions¤
__call__ ¤
__call__(message: ChannelMessageT) -> bool
Return whether message satisfies the predicate with respect to the previous message, or the value of first_is_true if it is the first message received.
Source code in frequenz/channels/experimental/_with_previous.py
__init__ ¤
__init__(
predicate: Callable[
[ChannelMessageT, ChannelMessageT], bool
],
/,
*,
first_is_true: bool = True,
) -> None
Initialize this instance.
PARAMETER | DESCRIPTION |
---|---|
predicate | A callable that takes two arguments, the previous message and the current message, and returns a boolean indicating whether the current message should be received. |
first_is_true | Whether the first message should be considered as satisfying the predicate. Defaults to |