timer
frequenz.channels.timer ¤
A receiver that sends a message regularly.
Quick Start¤
Important
This quick start is provided to have a quick feeling of how to use this module, but it is extremely important to understand how timers behave when they are delayed.
We recommend emphatically to read about missed ticks and drifting before using timers in production.
If you need to do something as periodically as possible (avoiding
drifts), you can use
a Timer
like this:
Periodic Timer Example
import asyncio
from datetime import datetime, timedelta
from frequenz.channels.timer import Timer
async def main() -> None:
async for drift in Timer(timedelta(seconds=1.0), TriggerAllMissed()):
print(f"The timer has triggered at {datetime.now()} with a drift of {drift}")
asyncio.run(main())
This timer will tick as close as every second as possible, even if the loop is busy doing something else for a good amount of time. In extreme cases, if the loop was busy for a few seconds, the timer will trigger a few times in a row to catch up, one for every missed tick.
If, instead, you need a timeout, for example to abort waiting for other receivers after
a certain amount of time, you can use a Timer
like
this:
Timeout Example
import asyncio
from datetime import timedelta
from frequenz.channels import Anycast, select, selected_from
from frequenz.channels.timer import Timer
async def main() -> None:
channel = Anycast[int](name="data-channel")
data_receiver = channel.new_receiver()
timer = Timer(timedelta(seconds=1.0), SkipMissedAndDrift())
async for selected in select(data_receiver, timer):
if selected_from(selected, data_receiver):
print(f"Received data: {selected.message}")
timer.reset()
elif selected_from(selected, timer):
drift = selected.message
print(f"No data received for {timer.interval + drift} seconds, giving up")
break
asyncio.run(main())
This timer will rearm itself automatically after it was triggered, so it will trigger again after the selected interval, no matter what the current drift was. So if the loop was busy for a few seconds, the timer will trigger immediately and then wait for another second before triggering again. The missed ticks are skipped.
Missed Ticks And Drifting¤
A Timer
can be used to send a messages at regular
time intervals, but there is one fundamental issue with timers in the asyncio world:
the event loop could give control to another task at any time, and that task can take
a long time to finish, making the time it takes the next timer message to be received
longer than the desired interval.
Because of this, it is very important for users to be able to understand and control how timers behave when they are delayed. Timers will handle missed ticks according to a missing tick policy.
The following built-in policies are available:
SkipMissedAndDrift
: A policy that drops all the missed ticks, triggers immediately and resets.SkipMissedAndResync
: A policy that drops all the missed ticks, triggers immediately and resyncs.TriggerAllMissed
: A policy that triggers all the missed ticks immediately until it catches up.
Classes¤
frequenz.channels.timer.MissedTickPolicy ¤
Bases: ABC
A policy to handle timer missed ticks.
To implement a custom policy you need to subclass
MissedTickPolicy
and implement the
calculate_next_tick_time
method.
Example
This policy will just wait one more second than the original interval if a tick is missed:
class WaitOneMoreSecond(MissedTickPolicy):
def calculate_next_tick_time(
self, *, interval: int, scheduled_tick_time: int, now: int
) -> int:
return scheduled_tick_time + interval + 1_000_000
async def main() -> None:
timer = Timer(
interval=timedelta(seconds=1),
missed_tick_policy=WaitOneMoreSecond(),
)
async for drift in timer:
print(f"The timer has triggered with a drift of {drift}")
asyncio.run(main())
Source code in frequenz/channels/timer.py
Functions¤
calculate_next_tick_time
abstractmethod
¤
Calculate the next tick time according to missed_tick_policy
.
This method is called by ready()
after it has determined that the
timer has triggered. It will check if the timer has missed any ticks
and handle them according to missed_tick_policy
.
PARAMETER | DESCRIPTION |
---|---|
interval |
The interval between ticks (in microseconds).
TYPE:
|
scheduled_tick_time |
The time the current tick was scheduled to trigger (in microseconds).
TYPE:
|
now |
The current loop time (in microseconds).
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The next tick time (in microseconds) according to
|
Source code in frequenz/channels/timer.py
frequenz.channels.timer.SkipMissedAndDrift ¤
Bases: MissedTickPolicy
A policy that drops all the missed ticks, triggers immediately and resets.
The SkipMissedAndDrift
policy will
behave effectively as if the timer was
reset every time it is triggered. This means
the start time will change and the drift will be accumulated each time a tick is
delayed. Only the relative drift will be returned on each tick.
The reset happens only if the delay is larger than the delay tolerance, so it is possible to ignore small delays and not drift in those cases.
Example
This example represents a timer with interval 1 second and delay tolerance of 0.1 seconds.
-
The first tick,
T0
, happens exactly at time 0. -
The second tick,
T1.2
, happens at time 1.2 (0.2 seconds late), so the timer triggers immediately but drifts a bit (0.2 seconds), so the next tick is expected at 2.2 seconds. -
The third tick,
T2.2
, happens at 2.3 seconds (0.1 seconds late), so it also triggers immediately but it doesn't drift because the delay is under thedelay_tolerance
. The next tick is expected at 3.2 seconds. -
The fourth tick,
T4.2
, triggers at 4.3 seconds (1.1 seconds late), so it also triggers immediately but the timer has drifted by 1.1 seconds, so a potential tickT3.2
is skipped (not triggered). -
The fifth tick,
T5.3
, triggers at 5.3 seconds so it is right on time (no drift) and the same happens for tickT6.3
, which triggers at 6.3 seconds.
Source code in frequenz/channels/timer.py
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 |
|
Attributes¤
delay_tolerance
property
¤
delay_tolerance: timedelta
The maximum delay that is tolerated before starting to drift.
Functions¤
__init__ ¤
__init__(*, delay_tolerance: timedelta = timedelta(0))
Initialize this policy.
See the class documentation for more details.
PARAMETER | DESCRIPTION |
---|---|
delay_tolerance |
The maximum delay that is tolerated before starting to drift. If a tick is delayed less than this, then it is not considered a missed tick and the timer doesn't accumulate this drift. |
RAISES | DESCRIPTION |
---|---|
ValueError
|
If |
Source code in frequenz/channels/timer.py
calculate_next_tick_time ¤
Calculate the next tick time.
If the drift is larger than delay_tolerance
, then it returns now +
interval
(so the timer drifts), otherwise it returns
scheduled_tick_time + interval
(we consider the delay too small and
avoid small drifts).
PARAMETER | DESCRIPTION |
---|---|
now |
The current loop time (in microseconds).
TYPE:
|
scheduled_tick_time |
The time the current tick was scheduled to trigger (in microseconds).
TYPE:
|
interval |
The interval between ticks (in microseconds).
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The next tick time (in microseconds). |
Source code in frequenz/channels/timer.py
frequenz.channels.timer.SkipMissedAndResync ¤
Bases: MissedTickPolicy
A policy that drops all the missed ticks, triggers immediately and resyncs.
If ticks are missed, the
SkipMissedAndResync
policy will
make the Timer
trigger immediately and it will
schedule to trigger again on the next multiple of the
interval, effectively skipping any missed
ticks, but re-syncing with the original start time.
Example
This example represents a timer with interval 1 second.
-
The first tick
T0
happens exactly at time 0. -
The second tick,
T1
, happens at time 1.2 (0.2 seconds late), so it triggers immediately. But it re-syncs, so the next tick is still expected at 2 seconds. This re-sync happens on every tick, so all ticks are expected at multiples of 1 second, not matter how delayed they were. -
The third tick,
T2
, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. -
The fourth tick,
T3
, happens at time 4.3 (1.3 seconds late), so it also triggers immediately, but there was also a tick expected at 4 seconds,T4
, which is skipped according to this policy to avoid bursts of ticks. -
The sixth tick,
T5
, happens at 5.1 (0.1 seconds late), so it triggers immediately again. -
The seventh tick,
T6
, happens at 6.0, right on time.
Source code in frequenz/channels/timer.py
Functions¤
calculate_next_tick_time ¤
Calculate the next tick time.
Calculate the next multiple of interval
after scheduled_tick_time
.
PARAMETER | DESCRIPTION |
---|---|
now |
The current loop time (in microseconds).
TYPE:
|
scheduled_tick_time |
The time the current tick was scheduled to trigger (in microseconds).
TYPE:
|
interval |
The interval between ticks (in microseconds).
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The next tick time (in microseconds). |
Source code in frequenz/channels/timer.py
frequenz.channels.timer.Timer ¤
A receiver that sends a message regularly.
Timer
s are started by default after they are
created. This can be disabled by using auto_start=False
option when creating
them. In this case, the timer will not be started until
reset()
is called. Receiving from the timer
(either using receive()
or using the
async iterator interface) will also start the timer at that point.
Timers need to be created in a context where
a asyncio
loop is already running. If no
loop
is specified, the current running loop
is used.
Timers can be stopped by calling stop()
.
A stopped timer will raise
a ReceiverStoppedError
or stop the async
iteration as usual.
Once a timer is explicitly stopped, it can only be started again by explicitly
calling reset()
(trying to receive from it
or using the async iterator interface will keep failing).
Timer messages are timedelta
s containing the drift of the
timer, i.e. the difference between when the timer should have triggered and the time
when it actually triggered.
This drift will likely never be 0
, because if there is a task that is
running when it should trigger, the timer will be delayed. In this case the
drift will be positive. A negative drift should be technically impossible,
as the timer uses asyncio
s loop monotonic clock.
Warning
Even when the asyncio
loop's monotonic clock is a float
, timers use
int
s to represent time internally. The internal time is tracked in
microseconds, so the timer resolution is 1 microsecond
(interval
must be at least
1 microsecond).
This is to avoid floating point errors when performing calculations with time, which can lead to issues that are very hard to reproduce and debug.
If the timer is delayed too much, then it will behave according to the
missed_tick_policy
. Missing
ticks might or might not trigger a message and the drift could be accumulated or not
depending on the chosen policy.
Source code in frequenz/channels/timer.py
|
|
Attributes¤
missed_tick_policy
property
¤
missed_tick_policy: MissedTickPolicy
The policy of the timer when it misses a tick.
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self
|
This receiver, as it is already an async iterator. |
__anext__
async
¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co
|
The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration
|
If the receiver stopped producing messages. |
ReceiverError
|
If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
__init__ ¤
__init__(
interval: timedelta,
missed_tick_policy: MissedTickPolicy,
/,
*,
auto_start: bool = True,
start_delay: timedelta = timedelta(0),
loop: AbstractEventLoop | None = None,
) -> None
Initialize this timer.
See the class documentation for details.
PARAMETER | DESCRIPTION |
---|---|
interval |
The time between timer ticks. Must be at least 1 microsecond.
TYPE:
|
missed_tick_policy |
The policy of the timer when it misses a tick.
Commonly one of
TYPE:
|
auto_start |
Whether the timer should be started when the
instance is created. This can only be
TYPE:
|
start_delay |
The delay before the timer should start. If |
loop |
The event loop to use to track time. If
TYPE:
|
RAISES | DESCRIPTION |
---|---|
RuntimeError
|
If it was called without a loop and there is no running loop. |
ValueError
|
If |
Source code in frequenz/channels/timer.py
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 |
|
consume ¤
consume() -> timedelta
Return the latest drift once ready()
is complete.
Once the timer has triggered (ready()
is done), this method returns the
difference between when the timer should have triggered and the time when
it actually triggered. See the class documentation for more details.
RETURNS | DESCRIPTION |
---|---|
timedelta
|
The difference between when the timer should have triggered and the time when it actually did. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
If the timer was stopped via |
Source code in frequenz/channels/timer.py
filter ¤
filter(
filter_function: (
Callable[[ReceiverMessageT_co], bool]
| Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
]
)
) -> (
Receiver[ReceiverMessageT_co]
| Receiver[FilteredMessageT_co]
)
Apply a filter function on the messages on a receiver.
Note
You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.
Tip
The returned receiver type won't have all the methods of the original
receiver. If you need to access methods of the original receiver that are
not part of the Receiver
interface you should save a reference to the
original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function |
The function to be applied on incoming messages to determine if they should be received.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]
|
A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
]
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original
receiver. If you need to access methods of the original receiver that are
not part of the Receiver
interface you should save a reference to the
original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function |
The function to be applied on incoming messages.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co]
|
A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
ready
async
¤
ready() -> bool
Wait until the timer interval
passed.
Once a call to ready()
has finished, the resulting tick information
must be read with a call to consume()
(receive()
or iterated over)
to tell the timer it should wait for the next interval.
The timer will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the timer was started and it is still running. |
RAISES | DESCRIPTION |
---|---|
RuntimeError
|
If it was called without a running loop. |
Source code in frequenz/channels/timer.py
receive
async
¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co
|
The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
If there is some problem with the receiver. |
ReceiverError
|
If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
reset ¤
Reset the timer to start timing from now (plus an optional delay).
If the timer was stopped, or not started yet, it will be started.
This can only be called with a running loop, see the class documentation for more details.
PARAMETER | DESCRIPTION |
---|---|
interval |
The new interval between ticks. If
TYPE:
|
start_delay |
The delay before the timer should start. This has microseconds resolution, anything smaller than a microsecond means no delay. |
RAISES | DESCRIPTION |
---|---|
RuntimeError
|
If it was called without a running loop. |
ValueError
|
If |
Source code in frequenz/channels/timer.py
stop ¤
Stop the timer.
Once stop
has been called, all subsequent calls to ready()
will
immediately return False and calls to consume()
/ receive()
or any
use of the async iterator interface will raise
a ReceiverStoppedError
.
You can restart the timer with reset()
.
Source code in frequenz/channels/timer.py
triggered ¤
triggered(
selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether this receiver was selected by select()
.
This method is used in conjunction with the
Selected
class to determine which receiver was
selected in select()
iteration.
It also works as a type guard to narrow the type of the
Selected
instance to the type of the receiver.
Please see select()
for an example.
PARAMETER | DESCRIPTION |
---|---|
selected |
The result of a |
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[ReceiverMessageT_co]]
|
Whether this receiver was selected. |
Source code in frequenz/channels/_receiver.py
frequenz.channels.timer.TriggerAllMissed ¤
Bases: MissedTickPolicy
A policy that triggers all the missed ticks immediately until it catches up.
The TriggerAllMissed
policy will
trigger all missed ticks immediately until it catches up with the current time.
This means that if the timer is delayed for any reason, when it finally gets some
time to run, it will trigger all the missed ticks in a burst. The drift is not
accumulated and the next tick will be scheduled according to the original start
time.
Example
This example represents a timer with interval 1 second.
-
The first tick,
T0
happens exactly at time 0. -
The second tick,
T1
, happens at time 1.2 (0.2 seconds late), so it triggers immediately. But it re-syncs, so the next tick is still expected at 2 seconds. This re-sync happens on every tick, so all ticks are expected at multiples of 1 second, not matter how delayed they were. -
The third tick,
T2
, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. -
The fourth tick,
T3
, happens at time 4.3 (1.3 seconds late), so it also triggers immediately. -
The fifth tick,
T4
, which was also already delayed (by 0.3 seconds), triggers immediately too, resulting in a small catch-up burst. -
The sixth tick,
T5
, happens at 5.1 (0.1 seconds late), so it triggers immediately again. -
The seventh tick,
T6
, happens at 6.0, right on time.
Source code in frequenz/channels/timer.py
Functions¤
calculate_next_tick_time ¤
Calculate the next tick time.
This method always returns scheduled_tick_time + interval
, as all
ticks need to produce a trigger event.
PARAMETER | DESCRIPTION |
---|---|
now |
The current loop time (in microseconds).
TYPE:
|
scheduled_tick_time |
The time the current tick was scheduled to trigger (in microseconds).
TYPE:
|
interval |
The interval between ticks (in microseconds).
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The next tick time (in microseconds). |