util
frequenz.channels.util
¤
Channel utilities.
A module with several utilities to work with channels:
- FileWatcher: A receiver that watches for file events.
- Merge: A receiver that merges messages coming from multiple receivers into a single stream.
- MergeNamed: A receiver that merges messages coming from multiple receivers into a single named stream, making it possible to identify the origin of each message.
- select: Iterate over the values of all receivers as new values become available.
Classes¤
frequenz.channels.util.Event
¤
Bases: Receiver[None]
A receiver that can be made ready through an event.
The receiver (the ready() method) will wait until set() is called. Once the event has been consumed with consume(), the receiver will wait again.
The receiver can be completely stopped by calling stop().
Example
import asyncio
from frequenz.channels import Receiver
from frequenz.channels.util import Event, select, selected_from

other_receiver: Receiver[int] = ...
exit_event = Event()

async def exit_after_10_seconds() -> None:
    # Sleep without blocking the event loop, then trigger the event.
    await asyncio.sleep(10)
    exit_event.set()

asyncio.ensure_future(exit_after_10_seconds())

async for selected in select(exit_event, other_receiver):
    if selected_from(selected, exit_event):
        break
    if selected_from(selected, other_receiver):
        print(selected.value)
    else:
        assert False, "Unknown receiver selected"
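The wait, set, consume cycle described for this class can be illustrated without the library. The following is a minimal sketch built on `asyncio.Event`; the class `EventSketch`, its private fields and the `demo()` helper are hypothetical stand-ins, not the code in `_event.py`, and it raises a plain `RuntimeError` where the library documents `ReceiverStoppedError`.

```python
import asyncio


class EventSketch:
    """Hypothetical stand-in mimicking the documented ready/set/consume cycle."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self._is_stopped = False

    def set(self) -> None:
        # Make the receiver ready and wake up any pending ready() call.
        self._event.set()

    def stop(self) -> None:
        # Stopping also wakes up waiters so they can observe the stop.
        self._is_stopped = True
        self._event.set()

    async def ready(self) -> bool:
        # Block until set() or stop() is called; False means "stopped".
        if self._is_stopped:
            return False
        await self._event.wait()
        return not self._is_stopped

    def consume(self) -> None:
        # Consuming re-arms the event so the next ready() waits again.
        if self._is_stopped:
            raise RuntimeError("receiver stopped")  # assumption: stand-in error type
        self._event.clear()


async def demo() -> list[bool]:
    event = EventSketch()
    loop = asyncio.get_running_loop()
    results = []
    loop.call_later(0.01, event.set)
    results.append(await event.ready())  # resolves once set() has fired
    event.consume()                      # re-arm the event
    loop.call_later(0.01, event.stop)
    results.append(await event.ready())  # resolves once stop() has fired
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is the re-arming step: without `consume()`, a second `ready()` call would return immediately because the underlying event is still set.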
Source code in frequenz/channels/util/_event.py
Attributes¤
is_set: bool
property
¤
Whether this receiver is set (ready).
RETURNS | DESCRIPTION |
---|---|
bool | Whether this receiver is set (ready). |
is_stopped: bool
property
¤
Whether this receiver is stopped.
RETURNS | DESCRIPTION |
---|---|
bool | Whether this receiver is stopped. |
name: str
property
¤
The name of this receiver.
This is for debugging purposes; it is shown in the string representation of this receiver.
RETURNS | DESCRIPTION |
---|---|
str | The name of this receiver. |
Functions¤
__init__(name=None)
¤
Create a new instance.
PARAMETER | DESCRIPTION |
---|---|
name |
The name of the receiver. If
TYPE:
|
Source code in frequenz/channels/util/_event.py
__repr__()
¤
Return a string representation of this receiver.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this receiver. |
Source code in frequenz/channels/util/_event.py
__str__()
¤
Return a string representation of this receiver.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this receiver. |
Source code in frequenz/channels/util/_event.py
consume()
¤
Consume the event.
This makes this receiver wait again until the event is set again.
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If this receiver is stopped. |
Source code in frequenz/channels/util/_event.py
ready()
async
¤
Wait until this receiver is ready.
RETURNS | DESCRIPTION |
---|---|
bool | Whether this receiver is still running. |
Source code in frequenz/channels/util/_event.py
set()
¤
Trigger the event (make the receiver ready).
Source code in frequenz/channels/util/_event.py
stop()
¤
Stop this receiver.
Source code in frequenz/channels/util/_event.py
frequenz.channels.util.FileWatcher
¤
Bases: Receiver['FileWatcher.Event']
A channel receiver that watches for file events.
Source code in frequenz/channels/util/_file_watcher.py
Classes¤
Event
dataclass
¤
A file change event.
Source code in frequenz/channels/util/_file_watcher.py
EventType
¤
Bases: Enum
Available types of changes to watch for.
Source code in frequenz/channels/util/_file_watcher.py
Functions¤
__del__()
¤
Cleanup registered watches.
awatch passes the stop_event to a separate task/thread, which allows awatch to be destroyed properly. The background task will continue until the signal is received.
Source code in frequenz/channels/util/_file_watcher.py
__init__(paths, event_types=frozenset(EventType))
¤
Create a FileWatcher instance.
PARAMETER | DESCRIPTION |
---|---|
paths | Paths to watch for changes. |
event_types | Types of events to watch for. Defaults to watch for all event types. |
Source code in frequenz/channels/util/_file_watcher.py
consume()
¤
Return the latest event once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
Event | The next event that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If there is some problem with the receiver. |
Source code in frequenz/channels/util/_file_watcher.py
ready()
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/util/_file_watcher.py
frequenz.channels.util.Merge
¤
Bases: Receiver[T]
Merge messages coming from multiple channels into a single stream.
Example
For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using Merge like this:
from frequenz.channels import Broadcast
from frequenz.channels.util import Merge

channel1 = Broadcast[int]("input-chan-1")
channel2 = Broadcast[int]("input-chan-2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

merge = Merge(receiver1, receiver2)
# Process messages from both channels as they arrive.
async for msg in merge:
    # do something with msg
    pass
When merge is no longer needed, it should be stopped using its stop() method. This will clean up any internal pending async tasks.
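To make the merging idea concrete, here is a minimal stdlib-only sketch that funnels several async iterators into one stream through an `asyncio.Queue`. The names `merge_sketch`, `numbers` and `demo` are hypothetical, and this is an illustration of the general technique under those assumptions, not the code in `_merge.py`.

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


async def merge_sketch(*sources: AsyncIterator[T]) -> AsyncIterator[T]:
    """Yield items from all sources as they arrive (hypothetical helper)."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking that one source is exhausted

    async def pump(source: AsyncIterator[T]) -> None:
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(done)

    tasks = [asyncio.create_task(pump(source)) for source in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        # Same idea as stopping a merge: cancel and await pending tasks.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def numbers(start: int, count: int, delay: float) -> AsyncIterator[int]:
    for value in range(start, start + count):
        await asyncio.sleep(delay)
        yield value


async def demo() -> list[int]:
    merged = merge_sketch(numbers(0, 3, 0.01), numbers(100, 3, 0.01))
    return [msg async for msg in merged]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The interleaving of the two sources depends on scheduling, so only the set of received values is predictable, not their exact order.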
Source code in frequenz/channels/util/_merge.py
Functions¤
__del__()
¤
Cleanup any pending tasks.
Source code in frequenz/channels/util/_merge.py
__init__(*args)
¤
Create a Merge instance.
PARAMETER | DESCRIPTION |
---|---|
*args | Sequence of channel receivers. |
Source code in frequenz/channels/util/_merge.py
consume()
¤
Return the latest value once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
T | The next value that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/util/_merge.py
ready()
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/util/_merge.py
stop()
async
¤
Stop the Merge instance and clean up any pending tasks.
Source code in frequenz/channels/util/_merge.py
frequenz.channels.util.MergeNamed
¤
Bases: Receiver[Tuple[str, T]]
Merge messages coming from multiple named channels into a single stream.
When MergeNamed is no longer needed, it should be stopped using its stop() method. This will clean up any internal pending async tasks.
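The difference from a plain merge is that every message is tagged with the name of the source it came from. The stdlib-only sketch below shows that idea with keyword arguments as names; `merge_named_sketch`, `emit` and `demo` are hypothetical helpers, not the code in `_merge_named.py`.

```python
import asyncio
from typing import Any, AsyncIterator, Tuple


async def merge_named_sketch(
    **sources: AsyncIterator[Any],
) -> AsyncIterator[Tuple[str, Any]]:
    """Yield (name, item) pairs from all sources (hypothetical helper)."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking that one source is exhausted

    async def pump(name: str, source: AsyncIterator[Any]) -> None:
        try:
            async for item in source:
                # Tag each item with the keyword it was registered under.
                queue.put_nowait((name, item))
        finally:
            queue.put_nowait(done)

    tasks = [asyncio.create_task(pump(n, s)) for n, s in sources.items()]
    remaining = len(tasks)
    try:
        while remaining:
            entry = await queue.get()
            if entry is done:
                remaining -= 1
            else:
                yield entry
    finally:
        # Cancel and await pending tasks before finishing.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def emit(*values: int) -> AsyncIterator[int]:
    for value in values:
        await asyncio.sleep(0)
        yield value


async def demo() -> list:
    merged = merge_named_sketch(a=emit(1, 2), b=emit(10))
    return [pair async for pair in merged]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A consumer can then dispatch on the first element of each tuple to find out which source produced the message.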
Source code in frequenz/channels/util/_merge_named.py
Functions¤
__del__()
¤
Cleanup any pending tasks.
Source code in frequenz/channels/util/_merge_named.py
__init__(**kwargs)
¤
Create a MergeNamed instance.
PARAMETER | DESCRIPTION |
---|---|
**kwargs | Sequence of channel receivers. |
Source code in frequenz/channels/util/_merge_named.py
consume()
¤
Return the latest value once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
Tuple[str, T] | The next key, value that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/util/_merge_named.py
ready()
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/util/_merge_named.py
stop()
async
¤
Stop the MergeNamed instance and clean up any pending tasks.
Source code in frequenz/channels/util/_merge_named.py
frequenz.channels.util.MissedTickPolicy
¤
Bases: ABC
A policy to handle timer missed ticks.
This is only relevant if the timer is not ready to trigger when it should (after an interval has passed), which can happen if the event loop is busy processing other tasks.
Source code in frequenz/channels/util/_timer.py
Functions¤
__repr__()
¤
Return a string representation of the instance.
RETURNS | DESCRIPTION |
---|---|
str | The string representation of the instance. |
Source code in frequenz/channels/util/_timer.py
calculate_next_tick_time(*, interval, scheduled_tick_time, now)
abstractmethod
¤
Calculate the next tick time according to missed_tick_policy.
This method is called by ready() after it has determined that the timer has triggered. It will check if the timer has missed any ticks and handle them according to missed_tick_policy.
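The shape of such a policy can be sketched with a plain abstract base class. Everything below is a hypothetical stand-in: `PolicySketch` only mirrors the documented keyword-only signature, and `NeverSkip` is an illustrative policy invented for this example, not one of the library's classes.

```python
from abc import ABC, abstractmethod


class PolicySketch(ABC):
    """Hypothetical stand-in with the documented keyword-only signature."""

    @abstractmethod
    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        """Return the next tick time; all values are in microseconds."""


class NeverSkip(PolicySketch):
    """Illustrative policy: schedule exactly one interval after the last slot."""

    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        # `now` is ignored, so a late tick is followed by a tick whose
        # scheduled time may already be in the past.
        return scheduled_tick_time + interval


if __name__ == "__main__":
    policy = NeverSkip()
    print(
        policy.calculate_next_tick_time(
            interval=1_000_000, scheduled_tick_time=3_000_000, now=4_300_000
        )
    )
```

Because all three arguments are keyword-only, subclasses may list them in any order, which is why the concrete policies on this page can use a different parameter order than the base class.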
PARAMETER | DESCRIPTION |
---|---|
interval | The interval between ticks (in microseconds). |
scheduled_tick_time | The time the current tick was scheduled to trigger (in microseconds). |
now | The current loop time (in microseconds). |
RETURNS | DESCRIPTION |
---|---|
int
|
The next tick time (in microseconds) according to
|
Source code in frequenz/channels/util/_timer.py
frequenz.channels.util.SelectError
¤
Bases: BaseException
A base exception for select.
This exception is raised when a select() iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling ready() for example). It is raised as a group exception (SelectErrorGroup) when a select loop is cleaning up after it's done.
Source code in frequenz/channels/util/_select.py
frequenz.channels.util.SelectErrorGroup
¤
Bases: BaseExceptionGroup[BaseException], SelectError
An exception group for the select() operation.
This exception group is raised when a select() loop fails while cleaning up the running tasks that check for ready receivers.
Source code in frequenz/channels/util/_select.py
frequenz.channels.util.Selected
¤
Bases: Generic[_T]
A result of a select iteration.
The selected receiver is consumed immediately and the received value is stored in the instance, unless there was an exception while receiving the value, in which case the exception is stored instead.
Selected instances should be used in conjunction with the selected_from() function to determine which receiver was selected.
Please see select for an example.
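The "store the value or store the exception" behaviour can be sketched in a few lines. `SelectedSketch` and `failing` below are hypothetical stand-ins that take a plain callable instead of a receiver; they are an illustration of the pattern, not the class in `_select.py`.

```python
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class SelectedSketch(Generic[T]):
    """Hypothetical stand-in: consume once, keep the value or the error."""

    def __init__(self, consume: Callable[[], T]) -> None:
        self._value: Optional[T] = None
        self.exception: Optional[Exception] = None
        try:
            # Consume immediately, as described for Selected.
            self._value = consume()
        except Exception as exc:  # keep the error for later inspection
            self.exception = exc

    @property
    def value(self) -> T:
        # Re-raise the stored error instead of returning a bogus value.
        if self.exception is not None:
            raise self.exception
        return self._value  # type: ignore[return-value]


def failing() -> int:
    raise ValueError("boom")


if __name__ == "__main__":
    ok = SelectedSketch(lambda: 42)
    bad = SelectedSketch(failing)
    print(ok.value, repr(bad.exception))
```

Deferring the error until `value` is accessed lets a loop inspect which receiver was selected before deciding how to handle a failure.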
Source code in frequenz/channels/util/_select.py
Attributes¤
exception: Exception | None
property
¤
The exception that was raised while receiving the value (if any).
RETURNS | DESCRIPTION |
---|---|
Exception | None
|
The exception that was raised while receiving the value (if any). |
value: _T
property
¤
The value that was received, if any.
RETURNS | DESCRIPTION |
---|---|
_T | The value that was received. |
RAISES | DESCRIPTION |
---|---|
Exception
|
If there was an exception while receiving the value. Normally
this should be an |
Functions¤
__init__(receiver)
¤
Create a new instance.
The receiver is consumed immediately when creating the instance and the received value is stored in the instance for later use as value. If there was an exception while receiving the value, then the exception is stored in the instance instead (as exception).
PARAMETER | DESCRIPTION |
---|---|
receiver | The receiver that was selected. |
Source code in frequenz/channels/util/_select.py
__repr__()
¤
Return the internal representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this instance. |
Source code in frequenz/channels/util/_select.py
__str__()
¤
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this instance. |
Source code in frequenz/channels/util/_select.py
was_stopped()
¤
Check if the selected receiver was stopped.
Check if the selected receiver raised a ReceiverStoppedError while consuming a value.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver was stopped. |
Source code in frequenz/channels/util/_select.py
frequenz.channels.util.SkipMissedAndDrift
¤
Bases: MissedTickPolicy
A policy that drops all the missed ticks, triggers immediately and resets.
This will behave effectively as if the timer was reset() at the time it had triggered last, so the start time will change (and the drift will be accumulated each time a tick is delayed, but only the relative drift will be returned on each tick).
The reset happens only if the delay is larger than delay_tolerance, so it is possible to ignore small delays and not drift in those cases.
Example
Assume a timer with an interval of 1 second and delay_tolerance=0.1. The first tick, T0, happens exactly at time 0. The second tick, T1, happens at time 1.2 (0.2 seconds late), so the timer triggers immediately but drifts a bit. The next tick, T2.2, happens at 2.3 seconds (0.1 seconds late), so it also triggers immediately but it doesn't drift because the delay does not exceed the delay_tolerance. The next tick, T3.2, triggers at 4.3 seconds (1.1 seconds late), so it also triggers immediately but the timer drifts by 1.1 seconds and the tick T4.2 is skipped (not triggered). The next tick, T5.3, triggers at 5.3 seconds, so it is right on time (no drift), and the same happens for tick T6.3, which triggers at 6.3 seconds.
0         1         2         3         4         5         6
o---------|-o-------|--o------|---------|--o------|--o------|--o--> time
T0          T1         T2.2                T3.2      T5.3      T6.3
Source code in frequenz/channels/util/_timer.py
Attributes¤
delay_tolerance: timedelta
property
¤
Return the maximum delay that is tolerated before starting to drift.
RETURNS | DESCRIPTION |
---|---|
timedelta | The maximum delay that is tolerated before starting to drift. |
Functions¤
__init__(*, delay_tolerance=timedelta(0))
¤
Create an instance.
See the class documentation for more details.
PARAMETER | DESCRIPTION |
---|---|
delay_tolerance | The maximum delay that is tolerated before starting to drift. If a tick is delayed less than this, then it is not considered a missed tick and the timer doesn't accumulate this drift. |
RAISES | DESCRIPTION |
---|---|
ValueError
|
If |
Source code in frequenz/channels/util/_timer.py
__repr__()
¤
Return a string representation of the instance.
RETURNS | DESCRIPTION |
---|---|
str | The string representation of the instance. |
Source code in frequenz/channels/util/_timer.py
__str__()
¤
Return a string representation of the instance.
RETURNS | DESCRIPTION |
---|---|
str | The string representation of the instance. |
Source code in frequenz/channels/util/_timer.py
calculate_next_tick_time(*, now, scheduled_tick_time, interval)
¤
Calculate the next tick time.
If the drift is larger than delay_tolerance, then it returns now + interval (so the timer drifts), otherwise it returns scheduled_tick_time + interval (we consider the delay too small and avoid small drifts).
PARAMETER | DESCRIPTION |
---|---|
now | The current loop time (in microseconds). |
scheduled_tick_time | The time the current tick was scheduled to trigger (in microseconds). |
interval | The interval between ticks (in microseconds). |
RETURNS | DESCRIPTION |
---|---|
int | The next tick time (in microseconds). |
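The rule stated for this method can be checked against the timeline in the class example. The sketch below is a standalone function written for illustration; `next_tick_with_drift` and the explicit `delay_tolerance` argument are assumptions (in the library the tolerance is held by the policy instance), and all times are integer microseconds.

```python
def next_tick_with_drift(
    *, now: int, scheduled_tick_time: int, interval: int, delay_tolerance: int
) -> int:
    """Apply the documented rule; all arguments are in microseconds."""
    drift = now - scheduled_tick_time
    if drift > delay_tolerance:
        # Too late: restart the schedule from the current time.
        return now + interval
    # Small delay: keep the original schedule, so no drift accumulates.
    return scheduled_tick_time + interval


SECOND = 1_000_000
scheduled = 1 * SECOND  # T1 is scheduled for time 1.0
schedule = []
# Times at which the timer actually triggered in the class example.
for now in (1_200_000, 2_300_000, 4_300_000, 5_300_000):
    scheduled = next_tick_with_drift(
        now=now,
        scheduled_tick_time=scheduled,
        interval=SECOND,
        delay_tolerance=100_000,
    )
    schedule.append(scheduled)

print(schedule)  # [2200000, 3200000, 5300000, 6300000]
```

The resulting schedule (2.2, 3.2, 5.3 and 6.3 seconds) matches the tick names T2.2, T3.2, T5.3 and T6.3 used in the example, and the jump from 3.2 to 5.3 is where T4.2 is skipped.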
Source code in frequenz/channels/util/_timer.py
frequenz.channels.util.SkipMissedAndResync
¤
Bases: MissedTickPolicy
A policy that drops all the missed ticks, triggers immediately and resyncs.
If ticks are missed, the timer will trigger immediately, returning the drift, and it will schedule to trigger again on the next multiple of interval, effectively skipping any missed ticks, but resyncing with the original start time.
Example
Assume a timer with an interval of 1 second. The tick T0 happens exactly at time 0. The second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately, but the fifth tick, T4, which was also already delayed (by 0.3 seconds), is skipped. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again. The seventh tick, T6, happens at 6.0, right on time.
0         1         2         3         4   o     5         6
o---------|-o-------|--o------|---------|--o------|o--------o-----> time
T0          T1         T2                  T3      T5       T6
Source code in frequenz/channels/util/_timer.py
Functions¤
calculate_next_tick_time(*, now, scheduled_tick_time, interval)
¤
Calculate the next tick time.
Calculate the next multiple of interval after scheduled_tick_time.
PARAMETER | DESCRIPTION |
---|---|
now | The current loop time (in microseconds). |
scheduled_tick_time | The time the current tick was scheduled to trigger (in microseconds). |
interval | The interval between ticks (in microseconds). |
RETURNS | DESCRIPTION |
---|---|
int | The next tick time (in microseconds). |
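One way to compute such a resynced tick time is with modular arithmetic. The sketch below assumes that "next multiple" means the first time strictly after `now` that lies a whole number of intervals after `scheduled_tick_time`; the function name `next_tick_resynced` is hypothetical and the code is not taken from `_timer.py`. All times are integer microseconds.

```python
def next_tick_resynced(*, now: int, scheduled_tick_time: int, interval: int) -> int:
    """Return the first aligned tick time strictly after `now` (assumption)."""
    drift = now - scheduled_tick_time
    # `drift % interval` is how far we are into the current slot, so the
    # remainder of the slot is what is left until the next aligned tick.
    return now + (interval - drift % interval)


SECOND = 1_000_000
scheduled = 1 * SECOND  # T1 is scheduled for time 1.0
schedule = []
# Times at which the timer actually triggered in the class example.
for now in (1_200_000, 2_300_000, 4_300_000, 5_100_000, 6_000_000):
    scheduled = next_tick_resynced(
        now=now, scheduled_tick_time=scheduled, interval=SECOND
    )
    schedule.append(scheduled)

print(schedule)  # [2000000, 3000000, 5000000, 6000000, 7000000]
```

The schedule never leaves whole seconds, which is the resync with the original start time, and the step from 3.0 straight to 5.0 corresponds to the skipped tick T4 in the class example.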
Source code in frequenz/channels/util/_timer.py
frequenz.channels.util.Timer
¤
A timer receiver that triggers every interval time.
The timer has microsecond resolution, so the interval must be at least 1 microsecond.
The message it produces is a timedelta containing the drift of the timer, i.e. the difference between when the timer should have triggered and the time when it actually triggered.
This drift will likely never be 0, because if there is a task that is running when it should trigger, the timer will be delayed. In this case the drift will be positive. A negative drift should be technically impossible, as the timer uses asyncio's loop monotonic clock.
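The notion of drift can be reproduced with nothing more than the event loop's monotonic clock. The sketch below is a hypothetical stdlib-only stand-in (`ticks_sketch` is not part of the library and ignores missed-tick policies entirely); it records, for each tick, how much later than scheduled the code actually resumed.

```python
import asyncio
from datetime import timedelta


async def ticks_sketch(interval: timedelta, count: int) -> list[timedelta]:
    """Collect the drift of `count` ticks (hypothetical helper)."""
    loop = asyncio.get_running_loop()
    step = interval.total_seconds()
    next_tick = loop.time() + step
    drifts = []
    for _ in range(count):
        # Sleep until the scheduled time; loop in case the sleep returns
        # slightly early, so the measured drift is never negative.
        while (remaining := next_tick - loop.time()) > 0:
            await asyncio.sleep(remaining)
        # Drift: when we actually woke up minus when we should have.
        drifts.append(timedelta(seconds=loop.time() - next_tick))
        next_tick += step
    return drifts


if __name__ == "__main__":
    for drift in asyncio.run(ticks_sketch(timedelta(milliseconds=10), 3)):
        print(f"The timer has triggered {drift=}")
```

The printed values vary from run to run, since they depend on how busy the event loop and the machine are.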
If the timer is delayed too much, then the timer will behave according to the missed_tick_policy. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy.
The timer accepts an optional loop, which will be used to track the time. If loop is None, then the running loop will be used (if there is no running loop most calls will raise a RuntimeError).
Starting the timer can be delayed if necessary by using auto_start=False (for example until we have a running loop). A call to reset(), ready(), receive() or the async iterator interface to await for a new message will start the timer.
For the most common cases, a specialized constructor is provided:
Periodic timer example
from datetime import timedelta
from frequenz.channels.util import Timer
async for drift in Timer.periodic(timedelta(seconds=1.0)):
    print(f"The timer has triggered {drift=}")
But you can also use a select to combine it with other receivers, and even start it (semi) manually:
import logging
from datetime import timedelta
from frequenz.channels.util import Timer, select, selected_from
from frequenz.channels import Broadcast

chan = Broadcast[int]("input-chan")
battery_data = chan.new_receiver()
battery_soc = None  # no reading received yet

timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
# Do some other initialization, the timer will start automatically if
# a message is awaited (or manually via `reset()`).
async for selected in select(battery_data, timer):
    if selected_from(selected, battery_data):
        if selected.was_stopped():
            logging.warning("battery channel closed")
            continue
        battery_soc = selected.value
    elif selected_from(selected, timer):
        # Print some regular battery data
        print(f"Battery is charged at {battery_soc}%")
Timeout example
import logging
from datetime import timedelta
from frequenz.channels.util import Timer, select, selected_from
from frequenz.channels import Broadcast

def process_data(data: int):
    logging.info("Processing data: %d", data)

def do_heavy_processing(data: int):
    logging.info("Heavy processing data: %d", data)

timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
chan1 = Broadcast[int]("input-chan-1")
chan2 = Broadcast[int]("input-chan-2")
battery_data = chan1.new_receiver()
heavy_process = chan2.new_receiver()
async for selected in select(battery_data, heavy_process, timer):
    if selected_from(selected, battery_data):
        if selected.was_stopped():
            logging.warning("battery channel closed")
            continue
        process_data(selected.value)
        # New data arrived in time, so restart the timeout.
        timer.reset()
    elif selected_from(selected, heavy_process):
        if selected.was_stopped():
            logging.warning("processing channel closed")
            continue
        do_heavy_processing(selected.value)
    elif selected_from(selected, timer):
        logging.warning("No data received in time")
In this case do_heavy_processing might take 2 seconds; we don't want our timeout timer to trigger for the missed ticks, and we want the next tick to be relative to the time the timer was last triggered.
Source code in frequenz/channels/util/_timer.py
Attributes¤
interval: timedelta
property
¤
The interval between timer ticks.
RETURNS | DESCRIPTION |
---|---|
timedelta | The interval between timer ticks. |
is_running: bool
property
¤
Whether the timer is running.
This will be False if the timer was stopped, or not started yet.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the timer is running. |
loop: asyncio.AbstractEventLoop
property
¤
The event loop used by the timer to track time.
RETURNS | DESCRIPTION |
---|---|
AbstractEventLoop | The event loop used by the timer to track time. |
missed_tick_policy: MissedTickPolicy
property
¤
The policy of the timer when it misses a tick.
RETURNS | DESCRIPTION |
---|---|
MissedTickPolicy | The policy of the timer when it misses a tick. |
Functions¤
__init__(interval, missed_tick_policy, /, *, auto_start=True, loop=None)
¤
Create an instance.
See the class documentation for details.
PARAMETER | DESCRIPTION |
---|---|
interval |
The time between timer ticks. Must be at least 1 microsecond.
TYPE:
|
missed_tick_policy |
The policy of the timer when it misses a tick.
Commonly one of
TYPE:
|
auto_start |
Whether the timer should be started when the
instance is created. This can only be
TYPE:
|
loop |
The event loop to use to track time. If
TYPE:
|
RAISES | DESCRIPTION |
---|---|
RuntimeError
|
if it was called without a loop and there is no running loop. |
ValueError
|
if |
Source code in frequenz/channels/util/_timer.py
__repr__()
¤
Return a string representation of the timer.
RETURNS | DESCRIPTION |
---|---|
str | The string representation of the timer. |
Source code in frequenz/channels/util/_timer.py
__str__()
¤
Return a string representation of the timer.
RETURNS | DESCRIPTION |
---|---|
str | The string representation of the timer. |
Source code in frequenz/channels/util/_timer.py
consume()
¤
Return the latest drift once ready() is complete.
Once the timer has triggered (ready() is done), this method returns the difference between when the timer should have triggered and the time when it actually triggered. See the class documentation for more details.
RETURNS | DESCRIPTION |
---|---|
timedelta | The difference between when the timer should have triggered and the time when it actually did. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if the timer was stopped via |
Source code in frequenz/channels/util/_timer.py
periodic(period, /, *, skip_missed_ticks=False, auto_start=True, loop=None)
classmethod
¤
Create a periodic timer.
This is basically a shortcut to create a timer with either TriggerAllMissed() or SkipMissedAndResync() as the missed tick policy (depending on skip_missed_ticks).
See the class documentation for details.
PARAMETER | DESCRIPTION |
---|---|
period |
The time between timer ticks. Must be at least 1 microsecond.
TYPE:
|
skip_missed_ticks |
Whether to skip missed ticks or trigger them all until it catches up.
TYPE:
|
auto_start |
Whether the timer should be started when the
instance is created. This can only be
TYPE:
|
loop |
The event loop to use to track time. If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Timer | The timer instance. |
RAISES | DESCRIPTION |
---|---|
RuntimeError
|
if it was called without a loop and there is no running loop. |
ValueError
|
if |
Source code in frequenz/channels/util/_timer.py
ready()
async
¤
Wait until the timer interval has passed.
Once a call to ready() has finished, the resulting tick information must be read with a call to consume() (receive() or iterated over) to tell the timer it should wait for the next interval.
The timer will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the timer was started and it is still running. |
RAISES | DESCRIPTION |
---|---|
RuntimeError | If it was called without a running loop. |
Source code in frequenz/channels/util/_timer.py
reset()
¤
Reset the timer to start timing from now.
If the timer was stopped, or not started yet, it will be started.
This can only be called with a running loop, see the class documentation for more details.
RAISES | DESCRIPTION |
---|---|
RuntimeError
|
if it was called without a running loop. |
Source code in frequenz/channels/util/_timer.py
stop()
¤
Stop the timer.
Once stop
has been called, all subsequent calls to ready()
will
immediately return False and calls to consume()
/ receive()
or any
use of the async iterator interface will raise
a ReceiverStoppedError
.
You can restart the timer with reset()
.
Source code in frequenz/channels/util/_timer.py
timeout(delay, /, *, auto_start=True, loop=None)
classmethod
¤
Create a timer useful for tracking timeouts.
This is basically a shortcut to create a timer with
SkipMissedAndDrift(delay_tolerance=timedelta(0))
as the missed tick policy.
See the class documentation for details.
PARAMETER | DESCRIPTION |
---|---|
delay |
The time until the timer ticks. Must be at least 1 microsecond.
TYPE:
|
auto_start |
Whether the timer should be started when the
instance is created. This can only be
TYPE:
|
loop |
The event loop to use to track time. If
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Timer
|
The timer instance. |
RAISES | DESCRIPTION |
---|---|
RuntimeError
|
if it was called without a loop and there is no running loop. |
ValueError
|
if |
Source code in frequenz/channels/util/_timer.py
frequenz.channels.util.TriggerAllMissed
¤
Bases: MissedTickPolicy
A policy that triggers all the missed ticks immediately until it catches up.
Example
Assume a timer with interval 1 second, the tick T0
happens exactly
at time 0, the second tick, T1
, happens at time 1.2 (0.2 seconds
late), so it triggers immediately. The third tick, T2
, happens at
time 2.3 (0.3 seconds late), so it also triggers immediately. The
fourth tick, T3
, happens at time 4.3 (1.3 seconds late), so it also
triggers immediately as well as the fifth tick, T4
, which was also
already delayed (by 0.3 seconds), so it catches up. The sixth tick,
T5
, happens at 5.1 (0.1 seconds late), so it triggers immediately
again. The seventh tick, T6
, happens at 6.0, right on time.
0         1         2         3         4  o      5         6
o---------|-o-------|--o------|---------|--o------|o--------o-----> time
T0          T1         T2                  T3      T5       T6
                                           T4
Source code in frequenz/channels/util/_timer.py
Functions¤
calculate_next_tick_time(*, now, scheduled_tick_time, interval)
¤
Calculate the next tick time.
This method always returns scheduled_tick_time + interval
, as all
ticks need to produce a trigger event.
PARAMETER | DESCRIPTION |
---|---|
now |
The current loop time (in microseconds).
TYPE:
|
scheduled_tick_time |
The time the current tick was scheduled to trigger (in microseconds).
TYPE:
|
interval |
The interval between ticks (in microseconds).
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The next tick time (in microseconds). |
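The rule above is small enough to sketch on its own. The function below mirrors the documented signature, while `simulate()` is a hypothetical helper (not the library's timer loop) that replays the wake-up times from the `TriggerAllMissed` example, with all times in microseconds.

```python
def calculate_next_tick_time(
    *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Return the next tick time; `now` is ignored so no tick is skipped."""
    return scheduled_tick_time + interval


def simulate(wake_times: list[int], interval: int) -> list[tuple[int, int]]:
    """Hypothetical replay: return (scheduled time, drift) for each tick."""
    fired: list[tuple[int, int]] = []
    scheduled = 0
    for now in wake_times:
        # Fire every tick that is already due, including the missed ones.
        while scheduled <= now:
            fired.append((scheduled, now - scheduled))
            scheduled = calculate_next_tick_time(
                now=now, scheduled_tick_time=scheduled, interval=interval
            )
    return fired


# Wake-up times taken from the example: 0, 1.2, 2.3, 4.3, 5.1 and 6.0 seconds.
ticks = simulate(
    [0, 1_200_000, 2_300_000, 4_300_000, 5_100_000, 6_000_000],
    interval=1_000_000,
)
drifts = [drift for _, drift in ticks]
```

Seven ticks come out of six wake-ups because the wake-up at 4.3 seconds fires both T3 (1.3 seconds late) and T4 (0.3 seconds late), as in the diagram.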
Source code in frequenz/channels/util/_timer.py
frequenz.channels.util.UnhandledSelectedError
¤
Bases: SelectError
, Generic[_T]
A receiver was not handled in a select()
loop.
This exception is raised when a select()
iteration finishes without a call to
selected_from()
for the selected receiver.
Source code in frequenz/channels/util/_select.py
Functions¤
__init__(selected)
¤
Create a new instance.
PARAMETER | DESCRIPTION |
---|---|
selected |
The selected receiver that was not handled.
TYPE:
|
Source code in frequenz/channels/util/_select.py
Functions¤
frequenz.channels.util.select(*receivers)
async
¤
Iterate over the values of all receivers as they receive new values.
This function is used to iterate over the values of all receivers as they receive
new values. It is used in conjunction with the
Selected
class and the
selected_from()
function to determine
which receiver was selected in a select operation.
An exhaustiveness check is performed at runtime to make sure all selected receivers
are handled in the if-chain, so you should call selected_from()
with all the
receivers passed to select()
inside the select loop, even if you plan to ignore
a value, to signal select()
that you are purposefully ignoring the value.
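One way to picture the runtime exhaustiveness check is a simplified synchronous model. This is a sketch under assumptions, not how `select()` is implemented: `Pick`, `picked_from()`, `pick_loop()` and `UnhandledPickError` are hypothetical names, and the check is assumed to be a flag set by the membership test and inspected when the next iteration starts.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator


class UnhandledPickError(Exception):
    """Raised when an iteration ends without the item being checked."""


@dataclass
class Pick:
    source: object
    value: Any
    handled: bool = False


def picked_from(pick: Pick, source: object) -> bool:
    """Return whether `pick` came from `source`, marking it handled if so."""
    if pick.source is source:
        pick.handled = True
        return True
    return False


def pick_loop(items: Iterable[tuple[object, Any]]) -> Iterator[Pick]:
    for source, value in items:
        pick = Pick(source, value)
        yield pick
        # Runs when the caller asks for the next item, that is, after the
        # loop body for this item has finished.
        if not pick.handled:
            raise UnhandledPickError(f"unhandled source: {source!r}")


first, second = object(), object()
values = []
for pick in pick_loop([(first, 1), (second, 2)]):
    if picked_from(pick, first):
        values.append(pick.value)
    elif picked_from(pick, second):
        pass  # Purposefully ignored, but still marked as handled.
```

In this model, dropping the `elif` branch would make the loop raise `UnhandledPickError` as soon as an item from `second` is yielded, which is the kind of failure the exhaustiveness check is meant to surface early.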
Note
The select()
function is intended to be used in cases where the set of
receivers is static and known beforehand. If you need to dynamically add/remove
receivers from a select loop, there are a few alternatives. Depending on your
use case, one or the other could work better for you:
- Use Merge or MergeNamed: this is useful when you have an unknown number of receivers of the same type that can be handled as a group.
- Use tasks to manage each receiver individually: this is better if there are no relationships between the receivers.
- Break the select() loop and start a new one with the new set of receivers (this should be the last resort, as it has some performance implications because the loop needs to be restarted).
Example
import datetime
from typing import assert_never

from frequenz.channels import ReceiverStoppedError
from frequenz.channels.util import select, selected_from, Timer

timer1 = Timer.periodic(datetime.timedelta(seconds=1))
timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))

async for selected in select(timer1, timer2):
    if selected_from(selected, timer1):
        # Beware: `selected.value` might raise an exception, you can always
        # check for exceptions with `selected.exception` first or use
        # a try-except block. You can also quickly check if the receiver was
        # stopped and let any other unexpected exceptions bubble up.
        if selected.was_stopped:
            print("timer1 was stopped")
            continue
        print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
        timer2.stop()
    elif selected_from(selected, timer2):
        # Explicit handling of exceptions
        match selected.exception:
            case ReceiverStoppedError():
                print("timer2 was stopped")
            case Exception() as exception:
                print(f"timer2: exception={exception}")
            case None:
                # All good, no exception, we can use `selected.value` safely
                print(
                    f"timer2: now={datetime.datetime.now()} drift={selected.value}"
                )
            case _ as unhandled:
                assert_never(unhandled)
    else:
        # This is not necessary, as select() will check for exhaustiveness, but
        # it is good practice to have it in case you forgot to handle a new
        # receiver added to `select()` at a later point in time.
        assert False
PARAMETER | DESCRIPTION |
---|---|
*receivers |
The receivers to select from. |
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Selected[Any]]
|
The currently selected item. |
RAISES | DESCRIPTION |
---|---|
UnhandledSelectedError
|
If a selected receiver was not handled in the if-chain. |
SelectErrorGroup
|
If there is an error while finishing the select operation and receivers fail while cleaning up. |
SelectError
|
If there is an error while selecting receivers during normal
operation. For example if a receiver raises an exception in the |
Source code in frequenz/channels/util/_select.py
frequenz.channels.util.selected_from(selected, receiver)
¤
Check if the given receiver was selected by select
.
This function is used in conjunction with the
Selected
class to determine which receiver was
selected in a select()
iteration.
It also works as a type guard to narrow the type of the
Selected
instance to the type of the receiver.
Please see select
for an example.
PARAMETER | DESCRIPTION |
---|---|
selected |
The result of a |
receiver |
The receiver to check if it was the source of a select operation.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[_T]]
|
Whether the given receiver was selected. |
Source code in frequenz/channels/util/_select.py