util
frequenz.channels.util
Channel utilities.
A module with several utilities to work with channels:
- FileWatcher: A receiver that watches for file events.
- Merge: A receiver that merges messages coming from multiple receivers into a single stream.
- MergeNamed: A receiver that merges messages coming from multiple receivers into a single named stream, making it possible to identify the origin of each message.
- Select: A helper to select the next available message for each receiver in a group of receivers.
Classes
frequenz.channels.util.FileWatcher
Bases: Receiver['FileWatcher.Event']
A channel receiver that watches for file events.
Classes
Event dataclass
A file change event.
EventType
Bases: Enum
Available types of changes to watch for.
Functions
__del__()
Cleanup registered watches.
awatch passes the stop_event to a separate task/thread. This way awatch gets destroyed properly. The background task will continue until the signal is received.
__init__(paths, event_types=frozenset(EventType))
Create a FileWatcher instance.
PARAMETER | DESCRIPTION |
---|---|
paths | Paths to watch for changes. |
event_types | Types of events to watch for. Defaults to watching for all event types. |
consume()
Return the latest event once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
Event | The next event that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | if there is some problem with the receiver. |
ready() async
Wait until the receiver is ready with a value or an error.
Once a call to ready() has finished, the value should be read with a call to consume() (or by calling receive() or iterating over the receiver). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
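The ready()/consume() contract described above can be illustrated with a toy receiver. This is a minimal standard-library sketch and not the FileWatcher implementation: ToyReceiver and ToyStoppedError are hypothetical names, and a pre-filled list stands in for real file events.

```python
import asyncio
from collections import deque


class ToyStoppedError(Exception):
    """Hypothetical stand-in for ReceiverStoppedError."""


class ToyReceiver:
    """Hypothetical receiver that follows the ready()/consume() contract."""

    def __init__(self, items):
        self._pending = deque(items)
        self._value = None
        self._is_ready = False

    async def ready(self) -> bool:
        if self._is_ready:
            return True  # remains ready until consume() is called
        if not self._pending:
            return False  # no longer active
        await asyncio.sleep(0)  # stands in for waiting on a real event
        self._value = self._pending.popleft()
        self._is_ready = True
        return True

    def consume(self):
        if not self._is_ready:
            raise ToyStoppedError("consume() called without a ready value")
        self._is_ready = False
        return self._value


async def demo():
    receiver = ToyReceiver(["created", "modified"])
    seen = []
    while await receiver.ready():
        # A second ready() call returns immediately and changes nothing.
        assert await receiver.ready()
        seen.append(receiver.consume())
    return seen
```

Calling asyncio.run(demo()) drains the toy receiver in order; the loop ends when ready() reports that the receiver is no longer active.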
frequenz.channels.util.Merge
Bases: Receiver[T]
Merge messages coming from multiple channels into a single stream.
Example
For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using Merge like this:
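    from frequenz.channels import Broadcast
    from frequenz.channels.util import Merge
    channel1 = Broadcast[int]("input-chan-1")
    channel2 = Broadcast[int]("input-chan-2")
    receiver1 = channel1.new_receiver()
    receiver2 = channel2.new_receiver()
    merge = Merge(receiver1, receiver2)
    # Iterate over the receiver rather than testing each message for
    # truthiness, which would also end the loop on a message such as 0.
    async for msg in merge:
        # do something with msg
        pass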
When merge is no longer needed, it should be stopped by calling its stop() method. This will clean up any internal pending async tasks.
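To make the idea of merging several sources into one stream concrete, here is a minimal sketch that uses only asyncio. It is an illustration of the concept, not how Merge is implemented: merge_streams and numbers are hypothetical helpers, and plain async generators stand in for channel receivers.

```python
import asyncio


async def merge_streams(*sources):
    """Yield items from several async iterators as they arrive."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel: one source has been exhausted

    async def pump(source):
        async for item in source:
            await queue.put(item)
        await queue.put(done)

    tasks = [asyncio.create_task(pump(source)) for source in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        # Analogous to stopping a merge: cancel any background tasks.
        for task in tasks:
            task.cancel()


async def numbers(start, count):
    for n in range(start, start + count):
        await asyncio.sleep(0)
        yield n


async def demo():
    merged = merge_streams(numbers(0, 3), numbers(10, 2))
    return sorted([msg async for msg in merged])
```

The arrival order across sources is not fixed, which is why the demo sorts the collected messages before returning them.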
Functions
__del__()
Cleanup any pending tasks.
__init__(*args)
Create a Merge instance.
PARAMETER | DESCRIPTION |
---|---|
*args | Sequence of channel receivers. |
consume()
Return the latest value once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
T | The next value that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | if the receiver stopped producing messages. |
ReceiverError | if there is some problem with the receiver. |
ready() async
Wait until the receiver is ready with a value or an error.
Once a call to ready() has finished, the value should be read with a call to consume() (or by calling receive() or iterating over the receiver). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
stop() async
Stop the Merge instance and clean up any pending tasks.
frequenz.channels.util.MergeNamed
Bases: Receiver[Tuple[str, T]]
Merge messages coming from multiple named channels into a single stream.
When MergeNamed is no longer needed, it should be stopped by calling its stop() method. This will clean up any internal pending async tasks.
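The effect of a named merge, where each message is delivered together with the name of its source, can be sketched with asyncio.wait. This is an illustration under stated assumptions rather than the MergeNamed implementation: merge_named and numbers are hypothetical helpers, and async generators stand in for receivers.

```python
import asyncio


async def merge_named(**sources):
    """Yield (name, item) pairs from named async iterators as items arrive."""
    iterators = {name: source.__aiter__() for name, source in sources.items()}
    # One pending "next item" task per source that is still producing.
    pending = {
        asyncio.ensure_future(iterator.__anext__()): name
        for name, iterator in iterators.items()
    }
    try:
        while pending:
            done, _ = await asyncio.wait(
                list(pending), return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                name = pending.pop(task)
                try:
                    item = task.result()
                except StopAsyncIteration:
                    continue  # this source is exhausted, so drop it
                yield name, item
                # Ask the same source for its next item.
                next_task = asyncio.ensure_future(iterators[name].__anext__())
                pending[next_task] = name
    finally:
        for task in pending:
            task.cancel()


async def numbers(start, count):
    for n in range(start, start + count):
        await asyncio.sleep(0)
        yield n


async def demo():
    merged = merge_named(a=numbers(0, 2), b=numbers(10, 1))
    return sorted([pair async for pair in merged])
```

Each yielded pair has the same shape as the Tuple[str, T] messages described above, so the consumer can tell which source produced each value.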
Functions
__del__()
Cleanup any pending tasks.
__init__(**kwargs)
Create a MergeNamed instance.
PARAMETER | DESCRIPTION |
---|---|
**kwargs | Channel receivers, passed as keyword arguments; each keyword is the name reported with that receiver's messages. |
consume()
Return the latest value once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
Tuple[str, T] | The next (key, value) pair that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | if the receiver stopped producing messages. |
ReceiverError | if there is some problem with the receiver. |
ready() async
Wait until the receiver is ready with a value or an error.
Once a call to ready() has finished, the value should be read with a call to consume() (or by calling receive() or iterating over the receiver). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
stop() async
Stop the MergeNamed instance and clean up any pending tasks.
frequenz.channels.util.MissedTickPolicy
A policy to handle missed timer ticks.
This is only relevant if the timer is not ready to trigger when it should (an interval passed), which can happen if the event loop is busy processing other tasks.
Functions
calculate_next_tick_time(*, interval, scheduled_tick_time, now) abstractmethod
Calculate the next tick time according to missed_tick_policy.
This method is called by ready() after it has determined that the timer has triggered. It will check if the timer has missed any ticks and handle them according to missed_tick_policy.
PARAMETER | DESCRIPTION |
---|---|
interval | The interval between ticks (in microseconds). |
scheduled_tick_time | The time the current tick was scheduled to trigger (in microseconds). |
now | The current loop time (in microseconds). |
RETURNS | DESCRIPTION |
---|---|
int | The next tick time (in microseconds) according to |
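The shape of this interface (keyword-only integer arguments in microseconds, returning the next tick time) can be sketched with a small abstract base class. TickPolicy and NeverEarlierThanNow are hypothetical names used only for illustration; they are not part of the library, and the example policy is an invented one.

```python
from abc import ABC, abstractmethod


class TickPolicy(ABC):
    """Hypothetical stand-in for the MissedTickPolicy interface."""

    @abstractmethod
    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        """Return the next tick time; all values are in microseconds."""


class NeverEarlierThanNow(TickPolicy):
    """Hypothetical policy: keep the schedule unless it is already in the past."""

    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        return max(scheduled_tick_time + interval, now)


policy = NeverEarlierThanNow()
# Tick scheduled at 1.0 s but handled at 1.2 s: the next tick stays at 2.0 s.
slightly_late = policy.calculate_next_tick_time(
    interval=1_000_000, scheduled_tick_time=1_000_000, now=1_200_000
)
# Tick scheduled at 1.0 s but handled at 2.5 s: the next tick is due right away.
very_late = policy.calculate_next_tick_time(
    interval=1_000_000, scheduled_tick_time=1_000_000, now=2_500_000
)
```

Because the method is abstract, the base class itself cannot be instantiated; only concrete policies that implement the calculation can be used.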
frequenz.channels.util.Select
Select the next available message from a group of Receivers.
If Select was created with more receivers than are read in the if-chain after each call to ready(), messages arriving on the additional receivers are dropped, and a warning message is logged.
Receivers also function as Receiver.
When Select is no longer needed, it should be stopped by calling its stop() method. This will clean up any internal pending async tasks.
Example
For example, if there are two receivers that you want to simultaneously wait on, this can be done with:
    from frequenz.channels import Broadcast
    from frequenz.channels.util import Select
    channel1 = Broadcast[int]("input-chan-1")
    channel2 = Broadcast[int]("input-chan-2")
    receiver1 = channel1.new_receiver()
    receiver2 = channel2.new_receiver()
    select = Select(name1=receiver1, name2=receiver2)
    while await select.ready():
        if msg := select.name1:
            if val := msg.inner:
                # do something with `val`
                pass
            else:
                # handle closure of receiver.
                pass
        elif msg := select.name2:
            # do something with `msg.inner`
            pass
Functions
__del__()
Cleanup any pending tasks.
__getattr__(name)
Return the latest unread message from a Receiver, if available.
PARAMETER | DESCRIPTION |
---|---|
name | Name of the channel. |
RETURNS | DESCRIPTION |
---|---|
Optional[Any] | Latest unread message for the specified |
RAISES | DESCRIPTION |
---|---|
KeyError | when the name was not specified when creating the |
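Attribute-style access to per-name messages, as described above, relies on Python's __getattr__ hook. The sketch below shows the general pattern only; LatestMessages and its store() method are hypothetical, and the choice to clear a message once it has been read is an assumption made for this illustration.

```python
class LatestMessages:
    """Hypothetical holder exposing one unread message per name as an attribute."""

    def __init__(self, *names: str) -> None:
        self._unread = {name: None for name in names}

    def store(self, name, message):
        """Record a new unread message for a known name."""
        self._unread[name] = message

    def __getattr__(self, name):
        # Only called when normal lookup fails, so `_unread` itself
        # (set in __init__) is found without recursion.
        unread = self.__dict__["_unread"]
        if name not in unread:
            raise KeyError(name)
        # Reading a message marks it as consumed (assumption of this sketch).
        message, unread[name] = unread[name], None
        return message


slots = LatestMessages("name1", "name2")
slots.store("name1", 42)
first_read = slots.name1   # the stored message
second_read = slots.name1  # nothing unread is left
other = slots.name2        # never received anything
```

Names that were not given at construction time raise KeyError, mirroring the RAISES entry above.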
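__init__(**kwargs)
Create a Select instance.
PARAMETER | DESCRIPTION |
---|---|
**kwargs | Receivers to select from, passed as keyword arguments; each keyword becomes the attribute name used to read that receiver's messages. |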
ready() async
Wait until there is a message in any of the receivers.
Returns True if there is a message available, and False if all receivers have closed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether there are further messages or not. |
stop() async
Stop the Select instance and clean up any pending tasks.
frequenz.channels.util.SkipMissedAndDrift
Bases: MissedTickPolicy
A policy that drops all the missed ticks, triggers immediately and resets.
This will behave effectively as if the timer was reset() at the time it had triggered last, so the start time will change (and the drift will be accumulated each time a tick is delayed, but only the relative drift will be returned on each tick).
The reset happens only if the delay is larger than delay_tolerance, so it is possible to ignore small delays and not drift in those cases.
Example
Assume a timer with interval 1 second and delay_tolerance=0.1. The first tick, T0, happens exactly at time 0. The second tick, T1, happens at time 1.2 (0.2 seconds late), so the timer triggers immediately but drifts a bit. The next tick, T2.2, happens at 2.3 seconds (0.1 seconds late), so it also triggers immediately but it doesn't drift because the delay does not exceed the delay_tolerance. The next tick, T3.2, triggers at 4.3 seconds (1.1 seconds late), so it also triggers immediately but the timer drifts by 1.1 seconds and the tick T4.2 is skipped (not triggered). The next tick, T5.3, triggers at 5.3 seconds, so it is right on time (no drift), and the same happens for tick T6.3, which triggers at 6.3 seconds.
    0         1         2         3         4         5         6
    o---------|-o-------|--o------|---------|--o------|--o------|--o--> time
    T0          T1         T2.2                T3.2      T5.3      T6.3
Attributes
delay_tolerance: timedelta property
Return the maximum delay that is tolerated before starting to drift.
RETURNS | DESCRIPTION |
---|---|
timedelta | The maximum delay that is tolerated before starting to drift. |
Functions
__init__(*, delay_tolerance=timedelta(0))
Create an instance.
See the class documentation for more details.
PARAMETER | DESCRIPTION |
---|---|
delay_tolerance | The maximum delay that is tolerated before starting to drift. If a tick is delayed less than this, then it is not considered a missed tick and the timer doesn't accumulate this drift. |
RAISES | DESCRIPTION |
---|---|
ValueError | If |
calculate_next_tick_time(*, now, scheduled_tick_time, interval)
Calculate the next tick time.
If the drift is larger than delay_tolerance, then it returns now + interval (so the timer drifts), otherwise it returns scheduled_tick_time + interval (we consider the delay too small and avoid small drifts).
PARAMETER | DESCRIPTION |
---|---|
now | The current loop time (in microseconds). |
scheduled_tick_time | The time the current tick was scheduled to trigger (in microseconds). |
interval | The interval between ticks (in microseconds). |
RETURNS | DESCRIPTION |
---|---|
int | The next tick time (in microseconds). |
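The rule stated above can be written as a small standalone function and checked against the worked example from the class description. This is a sketch of the described rule, not the library's source: next_tick_with_drift is a hypothetical name, and delay_tolerance is passed as an explicit argument in microseconds instead of being stored on a policy object.

```python
def next_tick_with_drift(*, now, scheduled_tick_time, interval, delay_tolerance):
    """Return the next tick time in microseconds (sketch of the rule above)."""
    drift = now - scheduled_tick_time
    if drift > delay_tolerance:
        return now + interval  # too late: restart the schedule from now
    return scheduled_tick_time + interval  # small delay: keep the schedule


SECOND = 1_000_000     # interval of 1 second, in microseconds
TOLERANCE = 100_000    # delay_tolerance of 0.1 seconds, in microseconds

# T1 was scheduled at 1.0 s and triggered at 1.2 s, so the timer drifts.
t2 = next_tick_with_drift(
    now=1_200_000, scheduled_tick_time=1_000_000,
    interval=SECOND, delay_tolerance=TOLERANCE,
)
# T2.2 triggered at 2.3 s: the 0.1 s delay does not exceed the tolerance.
t3 = next_tick_with_drift(
    now=2_300_000, scheduled_tick_time=t2,
    interval=SECOND, delay_tolerance=TOLERANCE,
)
# T3.2 triggered at 4.3 s: 1.1 s late, so the timer drifts again.
t5 = next_tick_with_drift(
    now=4_300_000, scheduled_tick_time=t3,
    interval=SECOND, delay_tolerance=TOLERANCE,
)
```

The three results correspond to the ticks labelled T2.2, T3.2 and T5.3 in the example timeline.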
frequenz.channels.util.SkipMissedAndResync
Bases: MissedTickPolicy
A policy that drops all the missed ticks, triggers immediately and resyncs.
If ticks are missed, the timer will trigger immediately, returning the drift, and it will schedule to trigger again on the next multiple of interval, effectively skipping any missed ticks, but resyncing with the original start time.
Example
Assume a timer with interval 1 second. The tick T0 happens exactly at time 0. The second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately, but the fifth tick, T4, which was also already delayed (by 0.3 seconds), is skipped. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again. The seventh tick, T6, happens at 6.0, right on time.
    0         1         2         3         4  o      5         6
    o---------|-o-------|--o------|---------|--o------|o--------o-----> time
    T0          T1         T2                  T3      T5        T6
Functions
calculate_next_tick_time(*, now, scheduled_tick_time, interval)
Calculate the next tick time.
Calculate the next multiple of interval after scheduled_tick_time.
PARAMETER | DESCRIPTION |
---|---|
now | The current loop time (in microseconds). |
scheduled_tick_time | The time the current tick was scheduled to trigger (in microseconds). |
interval | The interval between ticks (in microseconds). |
RETURNS | DESCRIPTION |
---|---|
int | The next tick time (in microseconds). |
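One way to compute a resynced tick time that is consistent with the worked example in the class description is sketched below. It assumes that the next tick should be the first point on the original schedule grid that lies after now; next_tick_resynced is a hypothetical name and this is not necessarily how the library computes it.

```python
def next_tick_resynced(*, now, scheduled_tick_time, interval):
    """Return the first schedule-aligned tick time after `now` (microseconds)."""
    drift = now - scheduled_tick_time
    # Time left until the next point of the original grid after `now`.
    return now + interval - (drift % interval)


SECOND = 1_000_000

# T1: scheduled at 1.0 s, triggered at 1.2 s, so the next tick is at 2.0 s.
after_t1 = next_tick_resynced(
    now=1_200_000, scheduled_tick_time=1 * SECOND, interval=SECOND
)
# T3: scheduled at 3.0 s, triggered at 4.3 s, so T4 is skipped and T5 is at 5.0 s.
after_t3 = next_tick_resynced(
    now=4_300_000, scheduled_tick_time=3 * SECOND, interval=SECOND
)
# T6: triggered right on time at 6.0 s, so the next tick is one interval later.
after_t6 = next_tick_resynced(
    now=6 * SECOND, scheduled_tick_time=6 * SECOND, interval=SECOND
)
```

In every case the result stays on whole multiples of the interval, which is what keeps the timer aligned with its original start time.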
frequenz.channels.util.Timer
A timer receiver that triggers once every interval.
The timer has microsecond resolution, so the interval must be at least 1 microsecond.
The message it produces is a timedelta containing the drift of the timer, i.e. the difference between when the timer should have triggered and the time when it actually triggered.
This drift will likely never be 0, because if there is a task that is running when it should trigger, the timer will be delayed. In this case the drift will be positive. A negative drift should be technically impossible, as the timer uses asyncio's loop monotonic clock.
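The notion of drift can be illustrated with plain asyncio, without the Timer class. In this sketch, measure_drift is a hypothetical helper that records when a wake-up was scheduled on the loop's monotonic clock and reports how far the actual wake-up was from that time.

```python
import asyncio
from datetime import timedelta


async def measure_drift(interval: timedelta) -> timedelta:
    """Sleep for one interval and return actual minus scheduled wake-up time."""
    loop = asyncio.get_running_loop()
    scheduled = loop.time() + interval.total_seconds()
    await asyncio.sleep(interval.total_seconds())
    return timedelta(seconds=loop.time() - scheduled)


drift = asyncio.run(measure_drift(timedelta(milliseconds=10)))
print(f"The wake-up was off by {drift}")
```

The exact value depends on how busy the event loop is, so it differs from run to run.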
If the timer is delayed too much, then the timer will behave according to the missed_tick_policy. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy.
The timer accepts an optional loop, which will be used to track the time. If loop is None, then the running loop will be used (if there is no running loop most calls will raise a RuntimeError).
Starting the timer can be delayed if necessary by using auto_start=False (for example until we have a running loop). A call to reset(), ready(), receive() or the async iterator interface to await for a new message will start the timer.
For the most common cases, a specialized constructor is provided:
Periodic timer example
    from datetime import timedelta
    from frequenz.channels.util import Timer
    async for drift in Timer.periodic(timedelta(seconds=1.0)):
        print(f"The timer has triggered {drift=}")
But you can also use Select to combine it with other receivers, and even start it (semi) manually:
    import logging
    from datetime import timedelta
    from frequenz.channels import Broadcast
    from frequenz.channels.util import Select, Timer
    chan = Broadcast[int]("input-chan")
    receiver1 = chan.new_receiver()
    timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
    # Do some other initialization, the timer will start automatically if
    # a message is awaited (or manually via `reset()`).
    battery_soc = None  # no reading received yet
    select = Select(bat_1=receiver1, timer=timer)
    while await select.ready():
        if msg := select.bat_1:
            if val := msg.inner:
                battery_soc = val
            else:
                logging.warning("battery channel closed")
        elif drift := select.timer:
            # Print some regular battery data
            print(f"Battery is charged at {battery_soc}%")
Timeout example
    import logging
    from datetime import timedelta
    from frequenz.channels import Broadcast
    from frequenz.channels.util import Select, Timer
    def process_data(data: int):
        logging.info("Processing data: %d", data)
    def do_heavy_processing(data: int):
        logging.info("Heavy processing data: %d", data)
    timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
    chan1 = Broadcast[int]("input-chan-1")
    chan2 = Broadcast[int]("input-chan-2")
    receiver1 = chan1.new_receiver()
    receiver2 = chan2.new_receiver()
    select = Select(bat_1=receiver1, heavy_process=receiver2, timeout=timer)
    while await select.ready():
        if msg := select.bat_1:
            if val := msg.inner:
                process_data(val)
                timer.reset()
            else:
                logging.warning("battery channel closed")
        if msg := select.heavy_process:
            if val := msg.inner:
                do_heavy_processing(val)
            else:
                logging.warning("processing channel closed")
        elif drift := select.timeout:
            logging.warning("No data received in time")
In this case do_heavy_processing might take 2 seconds, and we don't want our timeout timer to trigger for the missed ticks; we want the next tick to be relative to the time the timer was last triggered.
Attributes
interval: timedelta property
The interval between timer ticks.
RETURNS | DESCRIPTION |
---|---|
timedelta | The interval between timer ticks. |
is_running: bool property
Whether the timer is running.
This will be False if the timer was stopped, or not started yet.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the timer is running. |
loop: asyncio.AbstractEventLoop property
The event loop used by the timer to track time.
RETURNS | DESCRIPTION |
---|---|
asyncio.AbstractEventLoop | The event loop used by the timer to track time. |
missed_tick_policy: MissedTickPolicy property
The policy of the timer when it misses a tick.
RETURNS | DESCRIPTION |
---|---|
MissedTickPolicy | The policy of the timer when it misses a tick. |
Functions
__init__(interval, missed_tick_policy, /, *, auto_start=True, loop=None)
Create an instance.
See the class documentation for details.
PARAMETER | DESCRIPTION |
---|---|
interval | The time between timer ticks. Must be at least 1 microsecond. |
missed_tick_policy | The policy of the timer when it misses a tick. See the documentation of |
auto_start | Whether the timer should be started when the instance is created. This can only be |
loop | The event loop to use to track time. If |
RAISES | DESCRIPTION |
---|---|
RuntimeError | if it was called without a loop and there is no running loop. |
ValueError | if |
consume()
Return the latest drift once ready() is complete.
Once the timer has triggered (ready() is done), this method returns the difference between when the timer should have triggered and the time when it actually triggered. See the class documentation for more details.
RETURNS | DESCRIPTION |
---|---|
timedelta | The difference between when the timer should have triggered and the time when it actually did. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | if the timer was stopped via |
periodic(period, /, *, skip_missed_ticks=False, auto_start=True, loop=None) classmethod
Create a periodic timer.
This is basically a shortcut to create a timer with either TriggerAllMissed() or SkipMissedAndResync() as the missed tick policy (depending on skip_missed_ticks).
See the class documentation for details.
PARAMETER | DESCRIPTION |
---|---|
period | The time between timer ticks. Must be at least 1 microsecond. |
skip_missed_ticks | Whether to skip missed ticks or trigger them all until it catches up. |
auto_start | Whether the timer should be started when the instance is created. This can only be |
loop | The event loop to use to track time. If |
RETURNS | DESCRIPTION |
---|---|
Timer | The timer instance. |
RAISES | DESCRIPTION |
---|---|
RuntimeError | if it was called without a loop and there is no running loop. |
ValueError | if |
ready() async
Wait until the timer interval has passed.
Once a call to ready() has finished, the resulting tick information must be read with a call to consume() (or by calling receive() or iterating over the timer) to tell the timer it should wait for the next interval.
The timer will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the timer was started and it is still running. |
RAISES | DESCRIPTION |
---|---|
RuntimeError | if it was called without a running loop. |
reset()
Reset the timer to start timing from now.
If the timer was stopped, or not started yet, it will be started.
This can only be called with a running loop, see the class documentation for more details.
RAISES | DESCRIPTION |
---|---|
RuntimeError | if it was called without a running loop. |
stop()
Stop the timer.
Once stop has been called, all subsequent calls to ready() will immediately return False and calls to consume() / receive() or any use of the async iterator interface will raise a ReceiverStoppedError.
You can restart the timer with reset().
timeout(delay, /, *, auto_start=True, loop=None) classmethod
Create a timer useful for tracking timeouts.
This is basically a shortcut to create a timer with SkipMissedAndDrift(delay_tolerance=timedelta(0)) as the missed tick policy.
See the class documentation for details.
PARAMETER | DESCRIPTION |
---|---|
delay | The time until the timer ticks. Must be at least 1 microsecond. |
auto_start | Whether the timer should be started when the instance is created. This can only be |
loop | The event loop to use to track time. If |
RETURNS | DESCRIPTION |
---|---|
Timer | The timer instance. |
RAISES | DESCRIPTION |
---|---|
RuntimeError | if it was called without a loop and there is no running loop. |
ValueError | if |
frequenz.channels.util.TriggerAllMissed
Bases: MissedTickPolicy
A policy that triggers all the missed ticks immediately until it catches up.
Example
Assume a timer with interval 1 second. The tick T0 happens exactly at time 0. The second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately, as does the fifth tick, T4, which was also already delayed (by 0.3 seconds), so the timer catches up. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again. The seventh tick, T6, happens at 6.0, right on time.
    0         1         2         3         4  o      5         6
    o---------|-o-------|--o------|---------|--o------|o--------o-----> time
    T0          T1         T2                  T3      T5        T6
                                               T4
Functions
calculate_next_tick_time(*, now, scheduled_tick_time, interval)
Calculate the next tick time.
This method always returns scheduled_tick_time + interval, as all ticks need to produce a trigger event.
PARAMETER | DESCRIPTION |
---|---|
now | The current loop time (in microseconds). |
scheduled_tick_time | The time the current tick was scheduled to trigger (in microseconds). |
interval | The interval between ticks (in microseconds). |
RETURNS | DESCRIPTION |
---|---|
int | The next tick time (in microseconds). |
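The catch-up behaviour of this policy can be reproduced with a small simulation of the worked example. This is a simplified model, not the Timer implementation: next_tick_trigger_all and ticks_fired are hypothetical helpers, and the list of wake-up times stands in for the moments when a busy event loop gets around to checking the timer.

```python
def next_tick_trigger_all(*, now, scheduled_tick_time, interval):
    """Always keep the original schedule, so no tick is ever dropped."""
    return scheduled_tick_time + interval


def ticks_fired(wake_up_times, *, interval):
    """Return (scheduled_time, fired_at) pairs, all in microseconds."""
    fired = []
    scheduled = 0
    for now in wake_up_times:
        # Fire every tick whose scheduled time has already passed.
        while scheduled <= now:
            fired.append((scheduled, now))
            scheduled = next_tick_trigger_all(
                now=now, scheduled_tick_time=scheduled, interval=interval
            )
    return fired


SECOND = 1_000_000
# The loop checks the timer at 0, 1.2, 2.3, 4.3, 5.1 and 6.0 seconds.
wake_ups = [0, 1_200_000, 2_300_000, 4_300_000, 5_100_000, 6_000_000]
result = ticks_fired(wake_ups, interval=SECOND)
```

In the result, the ticks scheduled at 3.0 and 4.0 seconds (T3 and T4) both fire at 4.3 seconds, which is the catch-up shown in the timeline above.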