Skip to content

timer

frequenz.channels.utils.timer ¤

A timer receiver that returns the current timestamp at a fixed interval.

Classes¤

frequenz.channels.utils.timer.Timer ¤

Bases: Receiver[datetime]

A timer receiver that returns the timestamp every interval seconds.

Primarily for use with Select.

Example

When you want something to happen with a fixed period:

timer = channel.Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
        else:
            logging.warn("battery channel closed")
    if ts := select.timer:
        # something to do once every 30 seconds
        pass

When you want something to happen when nothing else has happened in a certain interval:

timer = channel.Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
    timer.reset()
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
        else:
            logging.warn("battery channel closed")
    if ts := select.timer:
        # something to do if there's no battery data for 30 seconds
        pass
Source code in frequenz/channels/utils/timer.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class Timer(Receiver[datetime]):
    """A timer receiver that returns the timestamp every `interval` seconds.

    Primarily for use with [Select][frequenz.channels.Select].

    The interval is measured from the moment the previous tick was delivered
    (or from the last call to `reset()`), not against a fixed schedule.

    Example:
        When you want something to happen with a fixed period:

        ```python
        timer = channel.Timer(30.0)
        select = Select(bat_1 = receiver1, timer = timer)
        while await select.ready():
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                else:
                    logging.warning("battery channel closed")
            if ts := select.timer:
                # something to do once every 30 seconds
                pass
        ```

        When you want something to happen when nothing else has happened in a
        certain interval:

        ```python
        timer = channel.Timer(30.0)
        select = Select(bat_1 = receiver1, timer = timer)
        while await select.ready():
            timer.reset()
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                else:
                    logging.warning("battery channel closed")
            if ts := select.timer:
                # something to do if there's no battery data for 30 seconds
                pass
        ```
    """

    def __init__(self, interval: float) -> None:
        """Create a `Timer` instance.

        The first tick becomes due `interval` seconds after construction.

        Args:
            interval: number of seconds between messages.
        """
        self._stopped = False
        self._interval = timedelta(seconds=interval)
        self._next_msg_time = datetime.now() + self._interval

    def reset(self) -> None:
        """Reset the timer to start timing from `now`.

        The next tick becomes due one full interval after this call. A
        [receive()][frequenz.channels.Timer.receive] call that is already
        waiting picks up the new deadline when it wakes.
        """
        self._next_msg_time = datetime.now() + self._interval

    def stop(self) -> None:
        """Stop the timer.

        Once `stop` has been called, all subsequent calls to
        [receive()][frequenz.channels.Timer.receive] will immediately return
        `None`. A call that is already waiting for the next tick is not woken
        early, but returns `None` instead of a timestamp once its current
        wait ends.
        """
        self._stopped = True

    async def receive(self) -> Optional[datetime]:
        """Return the current time once the next tick is due.

        Returns:
            The time at which the tick fired (the current time when the wait
                completed), or `None` if
                [stop()][frequenz.channels.Timer.stop] has been called on the
                timer.
        """
        # The stop flag is re-checked after every wake-up, not only on entry,
        # so a timer stopped while this call is waiting does not emit a tick.
        while not self._stopped:
            now = datetime.now()
            remaining = (self._next_msg_time - now).total_seconds()
            if remaining <= 0:
                # Schedule the following tick relative to this delivery.
                self._next_msg_time = now + self._interval
                return now
            # The deadline may have been moved by `reset()` while sleeping,
            # so it is re-read on the next pass instead of assumed reached.
            await asyncio.sleep(remaining)
        return None
Functions¤
__init__(interval) ¤

Create a Timer instance.

PARAMETER DESCRIPTION
interval

number of seconds between messages.

TYPE: float

Source code in frequenz/channels/utils/timer.py
54
55
56
57
58
59
60
61
62
def __init__(self, interval: float) -> None:
    """Set up a running timer whose first tick is one interval away.

    Args:
        interval: number of seconds between messages.
    """
    self._stopped = False
    period = timedelta(seconds=interval)
    self._interval = period
    # The first deadline is measured from the moment of construction.
    self._next_msg_time = datetime.now() + period
receive() async ¤

Return the current time once the next tick is due.

RETURNS DESCRIPTION
Optional[datetime]

The time at which the tick fired (the current time when the wait completed), or None if stop() has been called on the timer.

Source code in frequenz/channels/utils/timer.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
async def receive(self) -> Optional[datetime]:
    """Return the current time once the next tick is due.

    Returns:
        The time at which the tick fired (the current time when the wait
            completed), or `None` if
            [stop()][frequenz.channels.Timer.stop] has been called on the
            timer.
    """
    # The stop flag is re-checked after every wake-up, not only on entry,
    # so a timer stopped while this call is waiting does not emit a tick.
    while not self._stopped:
        now = datetime.now()
        remaining = (self._next_msg_time - now).total_seconds()
        if remaining <= 0:
            # Schedule the following tick relative to this delivery.
            self._next_msg_time = now + self._interval
            return now
        # The deadline may have been moved while sleeping, so it is re-read
        # on the next pass instead of assumed reached.
        await asyncio.sleep(remaining)
    return None
reset() ¤

Reset the timer to start timing from now.

Source code in frequenz/channels/utils/timer.py
64
65
66
def reset(self) -> None:
    """Restart the countdown so the next tick is one full interval from now."""
    restarted_at = datetime.now()
    self._next_msg_time = restarted_at + self._interval
stop() ¤

Stop the timer.

Once stop has been called, all subsequent calls to receive() will immediately return None.

Source code in frequenz/channels/utils/timer.py
68
69
70
71
72
73
74
75
def stop(self) -> None:
    """Mark the timer as stopped.

    After this call, every later call to
    [receive()][frequenz.channels.Timer.receive] returns `None` right away.
    """
    self._stopped = True