Skip to content

util

frequenz.channels.util ¤

Channel utilities.

A module with several utilities to work with channels:

  • FileWatcher: A receiver that watches for file events.

  • Merge: A receiver that merges messages coming from multiple receivers into a single stream.

  • MergeNamed: A receiver that merges messages coming from multiple receivers into a single named stream, making it possible to identify the origin of each message.

  • Timer: A receiver that ticks at certain intervals.

  • Select: A helper to select the next available message for each receiver in a group of receivers.

Classes¤

frequenz.channels.util.FileWatcher ¤

Bases: Receiver['FileWatcher.Event']

A channel receiver that watches for file events.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_file_watcher.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class FileWatcher(Receiver["FileWatcher.Event"]):
    """A channel receiver that watches for file events.

    Every batch of changes produced by `awatch` is buffered in a set and
    handed out one event at a time through `consume()`.
    """

    class EventType(Enum):
        """Available types of changes to watch for."""

        CREATE = Change.added
        MODIFY = Change.modified
        DELETE = Change.deleted

    @dataclass(frozen=True)
    class Event:
        """A file change event."""

        type: FileWatcher.EventType
        """The type of change that was observed."""
        path: pathlib.Path
        """The path where the change was observed."""

    def __init__(
        self,
        paths: list[pathlib.Path | str],
        event_types: abc.Iterable[EventType] = frozenset(EventType),
    ) -> None:
        """Create a `FileWatcher` instance.

        Args:
            paths: Paths to watch for changes.
            event_types: Types of events to watch for. Defaults to watch for
                all event types.
        """
        self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types)
        self._stop_event = asyncio.Event()
        self._paths = [
            path if isinstance(path, pathlib.Path) else pathlib.Path(path)
            for path in paths
        ]
        self._awatch = awatch(
            *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
        )
        # Set once `awatch` is exhausted; also used as the cause of the
        # `ReceiverStoppedError` raised by `consume()`.
        self._awatch_stopped_exc: Exception | None = None
        # Changes received from `awatch` that were not consumed yet.
        self._changes: set[FileChange] = set()

    def _filter_events(
        self,
        change: Change,
        path: str,  # pylint: disable=unused-argument
    ) -> bool:
        """Filter events based on the event type and path.

        Args:
            change: The type of change to be notified.
            path: The path of the file that changed.

        Returns:
            Whether the event should be notified.
        """
        # Short-circuits on the first match instead of building a temporary
        # list for every single event.
        return any(change == event_type.value for event_type in self.event_types)

    def __del__(self) -> None:
        """Cleanup registered watches.

        `awatch` passes the `stop_event` to a separate task/thread, which
        keeps running until the event is set. Setting it here makes sure that
        background work ends when this receiver is destroyed.
        """
        self._stop_event.set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active. `False` is returned as soon
                as the underlying watcher is found to be stopped, and on every
                later call.
        """
        # if there are messages waiting to be consumed, return immediately.
        if self._changes:
            return True

        # if it was already stopped, return immediately.
        if self._awatch_stopped_exc is not None:
            return False

        try:
            self._changes = await self._awatch.__anext__()
        except StopAsyncIteration as err:
            # Report the stop right away, so this call agrees with every
            # following one; `consume()` still raises `ReceiverStoppedError`.
            self._awatch_stopped_exc = err
            return False

        return True

    def consume(self) -> Event:
        """Return the latest event once `ready` is complete.

        Returns:
            The next event that was received.

        Raises:
            ReceiverStoppedError: if there is some problem with the receiver.
        """
        if not self._changes and self._awatch_stopped_exc is not None:
            raise ReceiverStoppedError(self) from self._awatch_stopped_exc

        assert self._changes, "`consume()` must be preceeded by a call to `ready()`"
        # Tuple of (Change, path) returned by watchfiles. The changes are kept
        # in a set, so the order in which they are popped is arbitrary.
        change, path_str = self._changes.pop()
        return FileWatcher.Event(
            type=FileWatcher.EventType(change), path=pathlib.Path(path_str)
        )
Classes¤
Event dataclass ¤

A file change event.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_file_watcher.py
31
32
33
34
35
36
37
38
@dataclass(frozen=True)
class Event:
    """A file change event.

    The dataclass is declared with `frozen=True`, so instances are immutable.
    """

    type: FileWatcher.EventType
    """The type of change that was observed."""
    path: pathlib.Path
    """The path where the change was observed."""
Attributes¤
path: pathlib.Path instance-attribute ¤

The path where the change was observed.

type: FileWatcher.EventType instance-attribute ¤

The type of change that was observed.

EventType ¤

Bases: Enum

Available types of changes to watch for.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_file_watcher.py
24
25
26
27
28
29
class EventType(Enum):
    """Available types of changes to watch for.

    The value of every member is the matching `Change` member.
    """

    # Backed by `Change.added`.
    CREATE = Change.added
    # Backed by `Change.modified`.
    MODIFY = Change.modified
    # Backed by `Change.deleted`.
    DELETE = Change.deleted
Functions¤
__del__() ¤

Cleanup registered watches.

awatch passes the stop_event to a separate task/thread, which keeps running until the event is set. Setting the event here makes sure that background work ends when the watcher is destroyed.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_file_watcher.py
80
81
82
83
84
85
86
87
def __del__(self) -> None:
    """Cleanup registered watches.

    `awatch` passes the `stop_event` to a separate task/thread, which keeps
    running until the event is set. Setting it here makes sure that
    background work ends when this receiver is destroyed.
    """
    self._stop_event.set()
__init__(paths, event_types=frozenset(EventType)) ¤

Create a FileWatcher instance.

PARAMETER DESCRIPTION
paths

Paths to watch for changes.

TYPE: list[pathlib.Path | str]

event_types

Types of events to watch for. Defaults to watch for all event types.

TYPE: abc.Iterable[EventType] DEFAULT: frozenset(EventType)

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_file_watcher.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    paths: list[pathlib.Path | str],
    event_types: abc.Iterable[EventType] = frozenset(EventType),
) -> None:
    """Set up a watcher for the given paths.

    Args:
        paths: The paths that should be watched for changes. Strings are
            converted to `pathlib.Path`.
        event_types: The kinds of events to report. When omitted, every
            event type is reported.
    """
    self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types)
    self._stop_event = asyncio.Event()

    # Normalize every entry to a `pathlib.Path`, keeping objects that already
    # are paths as they are.
    watched = []
    for entry in paths:
        if not isinstance(entry, pathlib.Path):
            entry = pathlib.Path(entry)
        watched.append(entry)
    self._paths = watched

    self._awatch = awatch(
        *watched,
        stop_event=self._stop_event,
        watch_filter=self._filter_events,
    )
    # Nothing was received and nothing was stopped yet.
    self._awatch_stopped_exc: Exception | None = None
    self._changes: set[FileChange] = set()
consume() ¤

Return the latest event once ready is complete.

RETURNS DESCRIPTION
Event

The next event that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if there is some problem with the receiver.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_file_watcher.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def consume(self) -> Event:
    """Hand out one buffered event after `ready` has completed.

    Returns:
        The next event that was received.

    Raises:
        ReceiverStoppedError: if nothing is buffered and the watcher was
            already found to be stopped.
    """
    buffered = self._changes
    stop_cause = self._awatch_stopped_exc
    if stop_cause is not None and not buffered:
        raise ReceiverStoppedError(self) from stop_cause

    assert buffered, "`consume()` must be preceeded by a call to `ready()`"
    # Every buffered entry is a (Change, path) tuple returned by watchfiles
    raw_change, raw_path = buffered.pop()
    event_type = FileWatcher.EventType(raw_change)
    return FileWatcher.Event(type=event_type, path=pathlib.Path(raw_path))
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_file_watcher.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active. `False` is returned as soon as
            the underlying watcher is found to be stopped, and on every later
            call.
    """
    # if there are messages waiting to be consumed, return immediately.
    if self._changes:
        return True

    # if it was already stopped, return immediately.
    if self._awatch_stopped_exc is not None:
        return False

    try:
        self._changes = await self._awatch.__anext__()
    except StopAsyncIteration as err:
        # Report the stop right away, so this call agrees with every
        # following one; the stored exception is kept for `consume()`.
        self._awatch_stopped_exc = err
        return False

    return True

frequenz.channels.util.Merge ¤

Bases: Receiver[T]

Merge messages coming from multiple channels into a single stream.

Example

For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using Merge like this:

from frequenz.channels import Broadcast

channel1 = Broadcast[int]("input-chan-1")
channel2 = Broadcast[int]("input-chan-2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

merge = Merge(receiver1, receiver2)
while msg := await merge.receive():
    # do something with msg
    pass

When merge is no longer needed, it should be stopped by calling its stop() method. This cleans up any internal pending async tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class Merge(Receiver[T]):
    """Merge messages coming from multiple channels into a single stream.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream, by using `Merge` like this:

        ```python
        from frequenz.channels import Broadcast

        channel1 = Broadcast[int]("input-chan-1")
        channel2 = Broadcast[int]("input-chan-2")
        receiver1 = channel1.new_receiver()
        receiver2 = channel2.new_receiver()

        merge = Merge(receiver1, receiver2)
        async for msg in merge:
            # do something with msg
            pass
        ```

        When `merge` is no longer needed, it should be stopped by calling its
        `stop()` method. This cleans up any internal pending async tasks.
    """

    def __init__(self, *args: Receiver[T]) -> None:
        """Create a `Merge` instance.

        Args:
            *args: sequence of channel receivers.
        """
        # Receivers are keyed by their position, which is also used as the
        # name of the task fetching from them.
        self._receivers = {str(index): recv for index, recv in enumerate(args)}
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.__anext__(), name=name)
            for name, recv in self._receivers.items()
        }
        # `ready()` only waits while this buffer is empty and each receiver
        # has a single task, so one slot per receiver is enough.
        self._results: Deque[T] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            if not task.done() and task.get_loop().is_running():
                task.cancel()

    async def stop(self) -> None:
        """Stop the `Merge` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if self._results:
                return True

            # if there are no more pending receivers, we return immediately.
            if not self._pending:
                return False

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                # a task cancelled from the outside (for example by `stop()`)
                # has no result, and asking for its exception would raise
                # `CancelledError` here, so it is just dropped.
                if item.cancelled():
                    continue
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                # any other exception is re-raised by `result()`.
                result = item.result()
                self._results.append(result)
                self._pending.add(
                    # pylint: disable=unnecessary-dunder-call
                    asyncio.create_task(self._receivers[name].__anext__(), name=name)
                )

    def consume(self) -> T:
        """Return the latest value once `ready` is complete.

        Returns:
            The next value that was received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        if not self._results and not self._pending:
            raise ReceiverStoppedError(self)

        assert self._results, "`consume()` must be preceeded by a call to `ready()`"

        return self._results.popleft()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge.py
53
54
55
56
57
def __del__(self) -> None:
    """Cancel every task that is still pending and whose loop is running."""
    for pending_task in self._pending:
        if pending_task.done():
            # Already finished, there is nothing left to cancel.
            continue
        if not pending_task.get_loop().is_running():
            continue
        pending_task.cancel()
__init__(*args) ¤

Create a Merge instance.

PARAMETER DESCRIPTION
*args

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: ()

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge.py
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self, *args: Receiver[T]) -> None:
    """Set up a merged stream over the given receivers.

    Args:
        *args: the channel receivers to merge.
    """
    # Receivers are keyed by their position, which doubles as the name of the
    # task that fetches from them.
    by_name = {}
    for position, receiver in enumerate(args):
        by_name[str(position)] = receiver
    self._receivers = by_name

    fetchers = set()
    for task_name, receiver in by_name.items():
        fetchers.add(asyncio.create_task(receiver.__anext__(), name=task_name))
    self._pending: Set[asyncio.Task[Any]] = fetchers

    self._results: Deque[T] = deque(maxlen=len(by_name))
consume() ¤

Return the latest value once ready is complete.

RETURNS DESCRIPTION
T

The next value that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def consume(self) -> T:
    """Hand out the oldest buffered value after `ready` has completed.

    Returns:
        The next value that was received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    buffered = self._results
    if not buffered:
        # Nothing buffered and nothing left to wait for: the stream ended.
        if not self._pending:
            raise ReceiverStoppedError(self)

    assert buffered, "`consume()` must be preceeded by a call to `ready()`"

    return buffered.popleft()
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        # if there are messages waiting to be consumed, return immediately.
        if self._results:
            return True

        # if there are no more pending receivers, we return immediately.
        if not self._pending:
            return False

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            # a task cancelled from the outside (for example by `stop()`) has
            # no result, and asking for its exception would raise
            # `CancelledError` here, so it is just dropped.
            if item.cancelled():
                continue
            name = item.get_name()
            # if channel is closed, don't add a task for it again.
            if isinstance(item.exception(), StopAsyncIteration):
                continue
            # any other exception is re-raised by `result()`.
            result = item.result()
            self._results.append(result)
            self._pending.add(
                # pylint: disable=unnecessary-dunder-call
                asyncio.create_task(self._receivers[name].__anext__(), name=name)
            )
stop() async ¤

Stop the Merge instance and cleanup any pending tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge.py
59
60
61
62
63
64
async def stop(self) -> None:
    """Cancel all pending tasks of this `Merge` and wait for them to end."""
    outstanding = self._pending
    for pending_task in outstanding:
        pending_task.cancel()
    # Exceptions (including the cancellations) are collected, not raised.
    await asyncio.gather(*outstanding, return_exceptions=True)
    self._pending = set()

frequenz.channels.util.MergeNamed ¤

Bases: Receiver[Tuple[str, T]]

Merge messages coming from multiple named channels into a single stream.

When MergeNamed is no longer needed, it should be stopped by calling its stop() method. This cleans up any internal pending async tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge_named.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class MergeNamed(Receiver[Tuple[str, T]]):
    """Merge messages coming from multiple named channels into a single stream.

    Every message is delivered as a `(name, message)` tuple, where `name` is
    the keyword the originating receiver was registered with.

    When `MergeNamed` is no longer needed, it should be stopped by calling its
    `stop()` method. This cleans up any internal pending async tasks.
    """

    def __init__(self, **kwargs: Receiver[T]) -> None:
        """Create a `MergeNamed` instance.

        Args:
            **kwargs: channel receivers, keyed by the name used to tag their
                messages.
        """
        self._receivers = kwargs
        # One fetching task per receiver, named after the receiver.
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.__anext__(), name=name)
            for name, recv in self._receivers.items()
        }
        # `ready()` only waits while this buffer is empty and each receiver
        # has a single task, so one slot per receiver is enough.
        self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            if not task.done() and task.get_loop().is_running():
                task.cancel()

    async def stop(self) -> None:
        """Stop the `MergeNamed` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if self._results:
                return True

            # if there are no more pending receivers, we return immediately.
            if not self._pending:
                return False

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                # a task cancelled from the outside (for example by `stop()`)
                # has no result, and asking for its exception would raise
                # `CancelledError` here, so it is just dropped.
                if item.cancelled():
                    continue
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                # any other exception is re-raised by `result()`.
                result = item.result()
                self._results.append((name, result))
                self._pending.add(
                    # pylint: disable=unnecessary-dunder-call
                    asyncio.create_task(self._receivers[name].__anext__(), name=name)
                )

    def consume(self) -> Tuple[str, T]:
        """Return the latest value once `ready` is complete.

        Returns:
            The next key, value that was received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        if not self._results and not self._pending:
            raise ReceiverStoppedError(self)

        assert self._results, "`consume()` must be preceeded by a call to `ready()`"

        return self._results.popleft()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge_named.py
34
35
36
37
38
def __del__(self) -> None:
    """Cancel every task that is still pending and whose loop is running."""
    for pending_task in self._pending:
        if pending_task.done():
            # Already finished, there is nothing left to cancel.
            continue
        if not pending_task.get_loop().is_running():
            continue
        pending_task.cancel()
__init__(**kwargs) ¤

Create a MergeNamed instance.

PARAMETER DESCRIPTION
**kwargs

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: {}

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge_named.py
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(self, **kwargs: Receiver[T]) -> None:
    """Set up a merged stream over the given named receivers.

    Args:
        **kwargs: the channel receivers to merge, passed as keyword
            arguments.
    """
    self._receivers = kwargs

    # One fetching task per receiver, named after the receiver's keyword.
    fetchers = set()
    for key, receiver in kwargs.items():
        fetchers.add(asyncio.create_task(receiver.__anext__(), name=key))
    self._pending: Set[asyncio.Task[Any]] = fetchers

    self._results: Deque[Tuple[str, T]] = deque(maxlen=len(kwargs))
consume() ¤

Return the latest value once ready is complete.

RETURNS DESCRIPTION
Tuple[str, T]

The next key, value that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge_named.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def consume(self) -> Tuple[str, T]:
    """Hand out the oldest buffered entry after `ready` has completed.

    Returns:
        The next key, value that was received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    buffered = self._results
    if not buffered:
        # Nothing buffered and nothing left to wait for: the stream ended.
        if not self._pending:
            raise ReceiverStoppedError(self)

    assert buffered, "`consume()` must be preceeded by a call to `ready()`"

    return buffered.popleft()
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge_named.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        # if there are messages waiting to be consumed, return immediately.
        if self._results:
            return True

        # if there are no more pending receivers, we return immediately.
        if not self._pending:
            return False

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            # a task cancelled from the outside (for example by `stop()`) has
            # no result, and asking for its exception would raise
            # `CancelledError` here, so it is just dropped.
            if item.cancelled():
                continue
            name = item.get_name()
            # if channel is closed, don't add a task for it again.
            if isinstance(item.exception(), StopAsyncIteration):
                continue
            # any other exception is re-raised by `result()`.
            result = item.result()
            self._results.append((name, result))
            self._pending.add(
                # pylint: disable=unnecessary-dunder-call
                asyncio.create_task(self._receivers[name].__anext__(), name=name)
            )
stop() async ¤

Stop the MergeNamed instance and cleanup any pending tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_merge_named.py
40
41
42
43
44
45
async def stop(self) -> None:
    """Cancel all pending tasks of this `MergeNamed` and wait for them to end."""
    outstanding = self._pending
    for pending_task in outstanding:
        pending_task.cancel()
    # Exceptions (including the cancellations) are collected, not raised.
    await asyncio.gather(*outstanding, return_exceptions=True)
    self._pending = set()

frequenz.channels.util.MissedTickPolicy ¤

Bases: abc.ABC

A policy to handle timer missed ticks.

This is only relevant if the timer is not ready to trigger when it should (an interval passed) which can happen if the event loop is busy processing other tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class MissedTickPolicy(abc.ABC):
    """A policy to handle timer missed ticks.

    This is only relevant if the timer is not ready to trigger when it should
    (an interval passed) which can happen if the event loop is busy processing
    other tasks.

    This is an abstract base class: concrete policies must implement
    `calculate_next_tick_time()`.
    """

    @abc.abstractmethod
    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        """Calculate the next tick time according to `missed_tick_policy`.

        This method is called by `ready()` after it has determined that the
        timer has triggered.  It will check if the timer has missed any ticks
        and handle them according to `missed_tick_policy`.

        All arguments are keyword-only. The implementation in this base class
        is only a placeholder, the real calculation is done by subclasses.

        Args:
            interval: The interval between ticks (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            now: The current loop time (in microseconds).

        Returns:
            The next tick time (in microseconds) according to
                `missed_tick_policy`.
        """
        return 0  # dummy value to avoid darglint warnings
Functions¤
calculate_next_tick_time(*, interval, scheduled_tick_time, now) abstractmethod ¤

Calculate the next tick time according to missed_tick_policy.

This method is called by ready() after it has determined that the timer has triggered. It will check if the timer has missed any ticks and handle them according to missed_tick_policy.

PARAMETER DESCRIPTION
interval

The interval between ticks (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

now

The current loop time (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds) according to missed_tick_policy.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@abc.abstractmethod
def calculate_next_tick_time(
    self, *, interval: int, scheduled_tick_time: int, now: int
) -> int:
    """Calculate the next tick time according to `missed_tick_policy`.

    This method is called by `ready()` after it has determined that the
    timer has triggered.  It will check if the timer has missed any ticks
    and handle them according to `missed_tick_policy`.

    All arguments are keyword-only. The implementation shown here is only a
    placeholder, as the method is declared abstract.

    Args:
        interval: The interval between ticks (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        now: The current loop time (in microseconds).

    Returns:
        The next tick time (in microseconds) according to
            `missed_tick_policy`.
    """
    return 0  # dummy value to avoid darglint warnings

frequenz.channels.util.Select ¤

Select the next available message from a group of Receivers.

If Select was created with more receivers than are read in the if-chain after each call to ready(), messages arriving on the additional receivers are dropped, and a warning message is logged.

Receivers also function as Receiver.

When Select is no longer needed, it should be stopped by calling its stop() method. This cleans up any internal pending async tasks.

Example

For example, if there are two receivers that you want to simultaneously wait on, this can be done with:

from frequenz.channels import Broadcast

channel1 = Broadcast[int]("input-chan-1")
channel2 = Broadcast[int]("input-chan-2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

select = Select(name1 = receiver1, name2 = receiver2)
while await select.ready():
    if msg := select.name1:
        if val := msg.inner:
            # do something with `val`
            pass
        else:
            # handle closure of receiver.
            pass
    elif msg := select.name2:
        # do something with `msg.inner`
        pass
Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_select.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class Select:
    """Select the next available message from a group of Receivers.

    If `Select` was created with more receivers than are read in the
    if-chain after each call to
    [ready()][frequenz.channels.util.Select.ready], messages coming in on
    the additional receivers are dropped, and a warning message is logged.

    Each [Receiver][frequenz.channels.Receiver] is passed as a keyword
    argument; the keyword is the attribute name used afterwards to fetch
    the messages of that receiver.

    When `Select` is no longer needed, it should be stopped using the
    [stop()][frequenz.channels.util.Select.stop] method, which cleans up
    any internal pending async tasks.

    Example:
        For example, if there are two receivers that you want to
        simultaneously wait on, this can be done with:

        ```python
        from frequenz.channels import Broadcast

        channel1 = Broadcast[int]("input-chan-1")
        channel2 = Broadcast[int]("input-chan-2")
        receiver1 = channel1.new_receiver()
        receiver2 = channel2.new_receiver()

        select = Select(name1 = receiver1, name2 = receiver2)
        while await select.ready():
            if msg := select.name1:
                if val := msg.inner:
                    # do something with `val`
                    pass
                else:
                    # handle closure of receiver.
                    pass
            elif msg := select.name2:
                # do something with `msg.inner`
                pass
        ```
    """

    def __init__(self, **kwargs: Receiver[Any]) -> None:
        """Create a `Select` instance.

        A task awaiting `ready()` of every given receiver is created
        immediately.

        Args:
            **kwargs: The receivers to select from, keyed by the name under
                which their messages are accessed later.
        """
        self._receivers = kwargs
        self._pending: Set[asyncio.Task[bool]] = set()

        for name, recv in self._receivers.items():
            self._pending.add(asyncio.create_task(recv.ready(), name=name))

        self._ready_count = 0
        self._prev_ready_count = 0
        self._result: Dict[str, Optional[_ReadyReceiver]] = {
            name: None for name in self._receivers
        }

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        # Read the instance dict directly: the object may be only partially
        # initialized, and a regular attribute lookup of a missing name
        # would fall through to `__getattr__`.
        for task in self.__dict__.get("_pending", ()):
            if not task.done() and task.get_loop().is_running():
                task.cancel()

    async def stop(self) -> None:
        """Stop the `Select` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        # Wait for the cancellations to be processed; `return_exceptions`
        # keeps the resulting `CancelledError`s from propagating.
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until there is a message in any of the receivers.

        Returns `True` if there is a message available, and `False` if all
        receivers have closed.

        Returns:
            Whether there are further messages or not.
        """
        # This function will change radically soon
        # pylint: disable=too-many-nested-blocks
        if self._ready_count > 0:
            if self._ready_count != self._prev_ready_count:
                # The number of unread results differs from the one seen on
                # the previous call: remember it and let the caller read
                # what is still available before waiting again.
                self._prev_ready_count = self._ready_count
                return True

            # Nothing was fetched since the previous call: discard every
            # unread result so the receivers can make progress.
            dropped_names: List[str] = []
            for name, value in self._result.items():
                if value is None:
                    continue
                dropped_names.append(name)
                if value.recv is not None:
                    try:
                        value.recv.consume()
                    except ReceiverStoppedError:
                        pass
                self._result[name] = None
            self._ready_count = 0
            self._prev_ready_count = 0
            logger.warning(
                "Select.ready() dropped data from receiver(s): %s, "
                "because no messages have been fetched since the last call to ready().",
                dropped_names,
            )

        if not self._pending:
            return False

        # once all the pending messages have been consumed, reset the
        # `_prev_ready_count` as well, and wait for new messages.
        self._prev_ready_count = 0

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            name = task.get_name()
            recv = self._receivers[name]
            receiver_active = task.result()
            self._ready_count += 1
            self._result[name] = _ReadyReceiver(recv if receiver_active else None)
            # if channel or Receiver is closed
            # don't add a task for it again.
            if receiver_active:
                self._pending.add(asyncio.create_task(recv.ready(), name=name))
        return True

    def __getattr__(self, name: str) -> Optional[Any]:
        """Return the latest unread message from a `Receiver`, if available.

        Fetching a message marks it as read, so a second access to the same
        name returns `None` until a new message is ready.

        Args:
            name: Name of the channel.

        Returns:
            Latest unread message for the specified `Receiver`, or `None`.

        Raises:
            KeyError: when the name was not specified when creating the
                `Select` instance.
            AttributeError: when the instance has not been initialized yet
                (there are no results to look the name up in).
        """
        # `__getattr__` only runs when the normal lookup failed.  Going
        # through `self._result` here would re-enter this method forever if
        # `_result` itself is missing, so fetch it from the instance dict.
        try:
            results = self.__dict__["_result"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None
        result = results[name]
        if result is None:
            return None
        results[name] = None
        self._ready_count -= 1
        return result.get()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_select.py
115
116
117
118
119
def __del__(self) -> None:
    """Cleanup any pending tasks.

    Cancels every pending task that is not done yet, as long as the loop
    the task belongs to is still running.
    """
    # Read the instance dict directly: the object may be only partially
    # initialized, and a regular attribute lookup of a missing name would
    # fall through to `__getattr__`.
    for task in self.__dict__.get("_pending", ()):
        if not task.done() and task.get_loop().is_running():
            task.cancel()
__getattr__(name) ¤

Return the latest unread message from a Receiver, if available.

PARAMETER DESCRIPTION
name

Name of the channel.

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

Latest unread message for the specified Receiver, or None.

RAISES DESCRIPTION
KeyError

when the name was not specified when creating the Select instance.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_select.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def __getattr__(self, name: str) -> Optional[Any]:
    """Return the latest unread message from a `Receiver`, if available.

    Fetching a message marks it as read, so a second access to the same
    name returns `None` until a new message is ready.

    Args:
        name: Name of the channel.

    Returns:
        Latest unread message for the specified `Receiver`, or `None`.

    Raises:
        KeyError: when the name was not specified when creating the
            `Select` instance.
        AttributeError: when the instance has not been initialized yet
            (there are no results to look the name up in).
    """
    # `__getattr__` only runs when the normal lookup failed.  Going through
    # `self._result` here would re-enter this method forever if `_result`
    # itself is missing, so fetch it from the instance dict instead.
    try:
        results = self.__dict__["_result"]
    except KeyError:
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        ) from None
    result = results[name]
    if result is None:
        return None
    results[name] = None
    self._ready_count -= 1
    return result.get()
__init__(**kwargs) ¤

Create a Select instance.

PARAMETER DESCRIPTION
**kwargs

The receivers to select from, keyed by the name under which their messages are accessed later.

TYPE: Receiver[Any] DEFAULT: {}

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_select.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def __init__(self, **kwargs: Receiver[Any]) -> None:
    """Create a `Select` instance.

    A task awaiting `ready()` of every given receiver is created
    immediately.

    Args:
        **kwargs: The receivers to select from, keyed by the name under
            which their messages are accessed later.
    """
    self._receivers = kwargs

    # The (still empty) set is stored before any task is created, so the
    # attribute exists even if task creation fails part-way.
    self._pending: Set[asyncio.Task[bool]] = set()
    self._pending.update(
        asyncio.create_task(receiver.ready(), name=key)
        for key, receiver in self._receivers.items()
    )

    self._ready_count = 0
    self._prev_ready_count = 0
    # One slot per receiver name; `None` means "nothing unread".
    self._result: Dict[str, Optional[_ReadyReceiver]] = dict.fromkeys(
        self._receivers
    )
ready() async ¤

Wait until there is a message in any of the receivers.

Returns True if there is a message available, and False if all receivers have closed.

RETURNS DESCRIPTION
bool

Whether there are further messages or not.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_select.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def ready(self) -> bool:
    """Wait until at least one of the receivers has a message.

    Results that were reported before but not fetched get one more chance
    to be read; if they are still untouched on the following call they are
    dropped and a warning is logged.

    Returns:
        `True` if there is a message available, and `False` if all
            receivers have closed.
    """
    # This function will change radically soon
    # pylint: disable=too-many-nested-blocks
    if self._ready_count > 0:
        if self._ready_count != self._prev_ready_count:
            # The number of unread results differs from the one seen on the
            # previous call: remember it and let the caller read what is
            # still available before waiting again.
            self._prev_ready_count = self._ready_count
            return True

        # Nothing was fetched since the previous call: discard every unread
        # result so the receivers can make progress.
        dropped_names: List[str] = []
        for key, unread in self._result.items():
            if unread is None:
                continue
            dropped_names.append(key)
            if unread.recv is not None:
                try:
                    unread.recv.consume()
                except ReceiverStoppedError:
                    pass
            self._result[key] = None
        self._ready_count = 0
        self._prev_ready_count = 0
        logger.warning(
            "Select.ready() dropped data from receiver(s): %s, "
            "because no messages have been fetched since the last call to ready().",
            dropped_names,
        )

    if not self._pending:
        return False

    # once all the pending messages have been consumed, reset the
    # `_prev_ready_count` as well, and wait for new messages.
    self._prev_ready_count = 0

    finished, self._pending = await asyncio.wait(
        self._pending, return_when=asyncio.FIRST_COMPLETED
    )
    for task in finished:
        key = task.get_name()
        receiver = self._receivers[key]
        still_open = task.result()
        self._ready_count += 1
        self._result[key] = _ReadyReceiver(receiver if still_open else None)
        # A closed channel or receiver gets no new task: only re-arm the
        # ones that are still active.
        if still_open:
            self._pending.add(asyncio.create_task(receiver.ready(), name=key))
    return True
stop() async ¤

Stop the Select instance and cleanup any pending tasks.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_select.py
121
122
123
124
125
126
async def stop(self) -> None:
    """Stop the `Select` instance and cleanup any pending tasks.

    Every pending task is cancelled and awaited, and the set of pending
    tasks is emptied afterwards.
    """
    outstanding = self._pending
    for task in outstanding:
        task.cancel()
    # `return_exceptions=True` keeps the cancellations from propagating
    # while still waiting for every task to finish.
    await asyncio.gather(*outstanding, return_exceptions=True)
    self._pending = set()

frequenz.channels.util.SkipMissedAndDrift ¤

Bases: MissedTickPolicy

A policy that drops all the missed ticks, triggers immediately and resets.

This will behave effectively as if the timer was reset() at the time it had triggered last, so the start time will change (and the drift will be accumulated each time a tick is delayed, but only the relative drift will be returned on each tick).

The reset happens only if the delay is larger than delay_tolerance, so it is possible to ignore small delays and not drift in those cases.

Example

Assume a timer with interval 1 second and delay_tolerance=0.1, the first tick, T0, happens exactly at time 0, the second tick, T1, happens at time 1.2 (0.2 seconds late), so the timer triggers immediately but drifts a bit. The next tick, T2.2, happens at 2.3 seconds (0.1 seconds late), so it also triggers immediately but it doesn't drift because the delay does not exceed the delay_tolerance. The next tick, T3.2, triggers at 4.3 seconds (1.1 seconds late), so it also triggers immediately but the timer drifts by 1.1 seconds and the tick T4.2 is skipped (not triggered). The next tick, T5.3, triggers at 5.3 seconds so is right on time (no drift) and the same happens for tick T6.3, which triggers at 6.3 seconds.

0         1         2         3         4         5         6
o---------|-o-------|--o------|---------|--o------|--o------|--o--> time
T0          T1         T2.2                T3.2      T5.3      T6.3
Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
class SkipMissedAndDrift(MissedTickPolicy):
    """A policy that drops all the missed ticks, triggers immediately and resets.

    The effect is the same as if the timer had been `reset()` at the moment
    it last triggered: the start time moves, and the drift accumulates each
    time a tick is delayed, although only the relative drift is returned on
    each tick.

    The reset only takes place when the delay is larger than
    `delay_tolerance`, which makes it possible to ignore small delays
    without drifting.

    Example:
        Assume a timer with interval 1 second and `delay_tolerance=0.1`, the
        first tick, `T0`, happens exactly at time 0, the second tick, `T1`,
        happens at time 1.2 (0.2 seconds late), so the timer triggers
        immediately but drifts a bit. The next tick, `T2.2`, happens at 2.3
        seconds (0.1 seconds late), so it also triggers immediately but it
        doesn't drift because the delay does not exceed the
        `delay_tolerance`. The next tick, `T3.2`, triggers at 4.3 seconds
        (1.1 seconds late), so it also triggers immediately but the timer
        drifts by 1.1 seconds and the tick `T4.2` is skipped (not
        triggered). The next tick, `T5.3`, triggers at 5.3 seconds so is
        right on time (no drift) and the same happens for tick `T6.3`,
        which triggers at 6.3 seconds.

        ```
        0         1         2         3         4         5         6
        o---------|-o-------|--o------|---------|--o------|--o------|--o--> time
        T0          T1         T2.2                T3.2      T5.3      T6.3
        ```
    """

    def __init__(self, *, delay_tolerance: timedelta = timedelta(0)):
        """Create an instance.

        See the class documentation for more details.

        Args:
            delay_tolerance: The maximum delay that is tolerated before
                starting to drift.  A tick delayed by no more than this is
                not treated as a missed tick, and the timer does not
                accumulate that delay as drift.

        Raises:
            ValueError: If `delay_tolerance` is negative.
        """
        tolerance_us = _to_microseconds(delay_tolerance)

        self._tolerance: int = tolerance_us
        """The maximum allowed delay before starting to drift."""

        if tolerance_us < 0:
            raise ValueError("delay_tolerance must be positive")

    @property
    def delay_tolerance(self) -> timedelta:
        """Return the maximum delay that is tolerated before starting to drift.

        Returns:
            The tolerated delay, converted back from the internally stored
                microseconds.
        """
        return timedelta(microseconds=self._tolerance)

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Calculate the next tick time.

        When the delay exceeds `delay_tolerance` the next tick is placed at
        `now + interval` (the timer drifts); otherwise it is placed at
        `scheduled_tick_time + interval` (the delay is considered too small
        to drift for).

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        delay = now - scheduled_tick_time
        # Restart counting from "now" only when the delay is beyond tolerance.
        base = now if delay > self._tolerance else scheduled_tick_time
        return base + interval
Attributes¤
delay_tolerance: timedelta property ¤

Return the maximum delay that is tolerated before starting to drift.

RETURNS DESCRIPTION
timedelta

The maximum delay that is tolerated before starting to drift.

Functions¤
__init__(*, delay_tolerance=timedelta(0)) ¤

Create an instance.

See the class documentation for more details.

PARAMETER DESCRIPTION
delay_tolerance

The maximum delay that is tolerated before starting to drift. If a tick is delayed less than this, then it is not considered a missed tick and the timer doesn't accumulate this drift.

TYPE: timedelta DEFAULT: timedelta(0)

RAISES DESCRIPTION
ValueError

If delay_tolerance is negative.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def __init__(self, *, delay_tolerance: timedelta = timedelta(0)):
    """Create an instance.

    See the class documentation for more details.

    Args:
        delay_tolerance: The maximum delay that is tolerated before
            starting to drift.  A tick delayed by no more than this is not
            treated as a missed tick, and the timer does not accumulate
            that delay as drift.

    Raises:
        ValueError: If `delay_tolerance` is negative.
    """
    tolerance_us = _to_microseconds(delay_tolerance)

    self._tolerance: int = tolerance_us
    """The maximum allowed delay before starting to drift."""

    if tolerance_us < 0:
        raise ValueError("delay_tolerance must be positive")
calculate_next_tick_time(*, now, scheduled_tick_time, interval) ¤

Calculate the next tick time.

If the drift is larger than delay_tolerance, then it returns now + interval (so the timer drifts), otherwise it returns scheduled_tick_time + interval (we consider the delay too small and avoid small drifts).

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Calculate the next tick time.

    When the delay exceeds `delay_tolerance` the next tick is placed at
    `now + interval` (the timer drifts); otherwise it is placed at
    `scheduled_tick_time + interval` (the delay is considered too small to
    drift for).

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    delay = now - scheduled_tick_time
    # Restart counting from "now" only when the delay is beyond tolerance.
    base = now if delay > self._tolerance else scheduled_tick_time
    return base + interval

frequenz.channels.util.SkipMissedAndResync ¤

Bases: MissedTickPolicy

A policy that drops all the missed ticks, triggers immediately and resyncs.

If ticks are missed, the timer will trigger immediately returning the drift and it will schedule to trigger again on the next multiple of interval, effectively skipping any missed ticks, but resyncing with the original start time.

Example

Assume a timer with interval 1 second, the tick T0 happens exactly at time 0, the second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately but the fifth tick, T4, which was also already delayed (by 0.3 seconds) is skipped. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again. The seventh tick, T6, happens at 6.0, right on time.

0         1         2         3         4  o      5         6
o---------|-o-------|--o------|---------|--o------|o--------o-----> time
T0          T1         T2                  T3      T5       T6
Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class SkipMissedAndResync(MissedTickPolicy):
    """A policy that drops all the missed ticks, triggers immediately and resyncs.

    When ticks are missed, the timer triggers immediately, returning the
    drift, and schedules the following trigger on the next multiple of
    `interval`.  Missed ticks are thereby skipped while the timer stays
    aligned with the original start time.

    Example:
        Assume a timer with interval 1 second, the tick `T0` happens exactly
        at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds
        late), so it triggers immediately.  The third tick, `T2`, happens at
        time 2.3 (0.3 seconds late), so it also triggers immediately.  The
        fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
        triggers immediately but the fifth tick, `T4`, which was also
        already delayed (by 0.3 seconds) is skipped.  The sixth tick,
        `T5`, happens at 5.1 (0.1 seconds late), so it triggers immediately
        again. The seventh tick, `T6`, happens at 6.0, right on time.

        ```
        0         1         2         3         4  o      5         6
        o---------|-o-------|--o------|---------|--o------|o--------o-----> time
        T0          T1         T2                  T3      T5       T6
        ```
    """

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Calculate the next tick time.

        The result is the first point of the grid
        `scheduled_tick_time + k * interval` that lies strictly after `now`.

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        # How far "now" is into the current period of the original schedule.
        into_period = (now - scheduled_tick_time) % interval
        # Jump to the end of that period to realign with the schedule.
        return now + (interval - into_period)
Functions¤
calculate_next_tick_time(*, now, scheduled_tick_time, interval) ¤

Calculate the next tick time.

Calculate the next multiple of interval after scheduled_tick_time.

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Calculate the next tick time.

    The result is the first point of the grid
    `scheduled_tick_time + k * interval` that lies strictly after `now`.

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    # How far "now" is into the current period of the original schedule.
    into_period = (now - scheduled_tick_time) % interval
    # Jump to the end of that period to realign with the schedule.
    return now + (interval - into_period)

frequenz.channels.util.Timer ¤

Bases: Receiver[timedelta]

A timer receiver that triggers every interval time.

The timer has microsecond resolution, so the interval must be at least 1 microsecond.

The message it produces is a timedelta containing the drift of the timer, i.e. the difference between when the timer should have triggered and the time when it actually triggered.

This drift will likely never be 0, because if there is a task that is running when it should trigger, the timer will be delayed. In this case the drift will be positive. A negative drift should be technically impossible, as the timer uses the asyncio loop's monotonic clock.

If the timer is delayed too much, then the timer will behave according to the missed_tick_policy. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy.

The timer accepts an optional loop, which will be used to track the time. If loop is None, then the running loop will be used (if there is no running loop most calls will raise a RuntimeError).

Starting the timer can be delayed if necessary by using auto_start=False (for example until we have a running loop). A call to reset(), ready(), receive() or the async iterator interface to await for a new message will start the timer.

For the most common cases, a specialized constructor is provided:

Periodic timer example
async for drift in Timer.periodic(timedelta(seconds=1.0)):
    print(f"The timer has triggered {drift=}")

But you can also use Select to combine it with other receivers, and even start it (semi) manually:

import logging
from frequenz.channels.util import Select
from frequenz.channels import Broadcast

timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
chan = Broadcast[int]("input-chan")
receiver1 = chan.new_receiver()

timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
# Do some other initialization, the timer will start automatically if
# a message is awaited (or manually via `reset()`).
select = Select(bat_1=receiver1, timer=timer)
while await select.ready():
    if msg := select.bat_1:
        if val := msg.inner:
            battery_soc = val
        else:
            logging.warning("battery channel closed")
    elif drift := select.timer:
        # Print some regular battery data
        print(f"Battery is charged at {battery_soc}%")
Timeout example
import logging
from frequenz.channels.util import Select
from frequenz.channels import Broadcast

def process_data(data: int):
    logging.info("Processing data: %d", data)

def do_heavy_processing(data: int):
    logging.info("Heavy processing data: %d", data)

timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
chan1 = Broadcast[int]("input-chan-1")
chan2 = Broadcast[int]("input-chan-2")
receiver1 = chan1.new_receiver()
receiver2 = chan2.new_receiver()
select = Select(bat_1=receiver1, heavy_process=receiver2, timeout=timer)
while await select.ready():
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
            timer.reset()
        else:
            logging.warning("battery channel closed")
    if msg := select.heavy_process:
        if val := msg.inner:
            do_heavy_processing(val)
        else:
            logging.warning("processing channel closed")
    elif drift := select.timeout:
        logging.warning("No data received in time")

In this case do_heavy_processing might take 2 seconds, and we don't want our timeout timer to trigger for the missed ticks, and want the next tick to be relative to the time the timer was last triggered.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
class Timer(Receiver[timedelta]):
    """A timer receiver that triggers every `interval` time.

    The timer has microseconds resolution, so the `interval` must be at least
    1 microsecond.

    The message it produces is a `timedelta` containing the drift of the timer,
    i.e. the difference between when the timer should have triggered and the time
    when it actually triggered.

    This drift will likely never be `0`, because if there is a task that is
    running when it should trigger, the timer will be delayed. In this case the
    drift will be positive. A negative drift should be technically impossible,
    as the timer uses `asyncio`'s loop monotonic clock.

    If the timer is delayed too much, then the timer will behave according to
    the `missed_tick_policy`. Missing ticks might or might not trigger
    a message and the drift could be accumulated or not depending on the
    chosen policy.

    The timer accepts an optional `loop`, which will be used to track the time.
    If `loop` is `None`, then the running loop will be used (if there is no
    running loop most calls will raise a `RuntimeError`).

    Starting the timer can be delayed if necessary by using `auto_start=False`
    (for example until we have a running loop). A call to `reset()`, `ready()`,
    `receive()` or the async iterator interface to await for a new message will
    start the timer.

    For the most common cases, a specialized constructor is provided:

    * [`periodic()`][frequenz.channels.util.Timer.periodic]
    * [`timeout()`][frequenz.channels.util.Timer.timeout]

    Example: Periodic timer example
        ```python
        async for drift in Timer.periodic(timedelta(seconds=1.0)):
            print(f"The timer has triggered {drift=}")
        ```

        But you can also use [`Select`][frequenz.channels.util.Select] to combine it
        with other receivers, and even start it (semi) manually:

        ```python
        import logging
        from frequenz.channels.util import Select
        from frequenz.channels import Broadcast

        chan = Broadcast[int]("input-chan")
        receiver1 = chan.new_receiver()

        timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
        # Do some other initialization, the timer will start automatically if
        # a message is awaited (or manually via `reset()`).
        select = Select(bat_1=receiver1, timer=timer)
        while await select.ready():
            if msg := select.bat_1:
                if val := msg.inner:
                    battery_soc = val
                else:
                    logging.warning("battery channel closed")
            elif drift := select.timer:
                # Print some regular battery data
                print(f"Battery is charged at {battery_soc}%")
        ```

    Example: Timeout example
        ```python
        import logging
        from frequenz.channels.util import Select
        from frequenz.channels import Broadcast

        def process_data(data: int):
            logging.info("Processing data: %d", data)

        def do_heavy_processing(data: int):
            logging.info("Heavy processing data: %d", data)

        timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
        chan1 = Broadcast[int]("input-chan-1")
        chan2 = Broadcast[int]("input-chan-2")
        receiver1 = chan1.new_receiver()
        receiver2 = chan2.new_receiver()
        select = Select(bat_1=receiver1, heavy_process=receiver2, timeout=timer)
        while await select.ready():
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                    timer.reset()
                else:
                    logging.warning("battery channel closed")
            if msg := select.heavy_process:
                if val := msg.inner:
                    do_heavy_processing(val)
                else:
                    logging.warning("processing channel closed")
            elif drift := select.timeout:
                logging.warning("No data received in time")
        ```

        In this case `do_heavy_processing` might take 2 seconds, and we don't
        want our timeout timer to trigger for the missed ticks; instead we want
        the next tick to be relative to the time the timer was last triggered.
    """

    def __init__(
        self,
        interval: timedelta,
        missed_tick_policy: MissedTickPolicy,
        /,
        *,
        auto_start: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Create an instance.

        See the class documentation for details.

        Args:
            interval: The time between timer ticks. Must be at least
                1 microsecond.
            missed_tick_policy: The policy of the timer when it misses
                a tick. See the documentation of `MissedTickPolicy` for
                details.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `interval` is not positive or is smaller than 1
                microsecond.
        """
        self._interval: int = _to_microseconds(interval)
        """The time between timer ticks, in microseconds."""

        self._missed_tick_policy: MissedTickPolicy = missed_tick_policy
        """The policy of the timer when it misses a tick.

        See the documentation of `MissedTickPolicy` for details.
        """

        self._loop: asyncio.AbstractEventLoop = (
            loop if loop is not None else asyncio.get_running_loop()
        )
        """The event loop to use to track time."""

        self._stopped: bool = True
        """Whether the timer was requested to stop.

        If this is `False`, then the timer is running.

        If this is `True`, then it is stopped or there is a request to stop it
        or it was not started yet:

        * If `_next_tick_time` is `None`, it means it wasn't started yet (it was
          created with `auto_start=False`).  Any receiving method will start
          it by calling `reset()` in this case.

        * If `_next_tick_time` is not `None`, it means there was a request to
          stop it.  In this case receiving methods will raise
          a `ReceiverStoppedError`.
        """

        self._next_tick_time: int | None = None
        """The absolute (monotonic) time when the timer should trigger.

        If this is `None`, it means the timer didn't start yet, but it should
        be started as soon as it is used.
        """

        self._current_drift: timedelta | None = None
        """The difference between `_next_tick_time` and the triggered time.

        This is calculated by `ready()` but is returned by `consume()`. If
        `None` it means `ready()` wasn't called and `consume()` will assert.
        `consume()` will set it back to `None` to tell `ready()` that it needs
        to wait again.
        """

        if self._interval <= 0:
            raise ValueError(
                "The `interval` must be positive and at least 1 microsecond, "
                f"not {interval} ({self._interval} microseconds)"
            )

        if auto_start:
            self.reset()

    @classmethod
    def timeout(
        cls,
        delay: timedelta,
        /,
        *,
        auto_start: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> Timer:
        """Create a timer useful for tracking timeouts.

        This is basically a shortcut to create a timer with
        `SkipMissedAndDrift(delay_tolerance=timedelta(0))` as the missed tick policy.

        See the class documentation for details.

        Args:
            delay: The time until the timer ticks. Must be at least
                1 microsecond.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Returns:
            The timer instance.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `delay` is not positive or is smaller than 1
                microsecond.
        """
        # Instantiate through `cls` so subclasses get an instance of their
        # own type from this alternate constructor.
        return cls(
            delay,
            SkipMissedAndDrift(delay_tolerance=timedelta(0)),
            auto_start=auto_start,
            loop=loop,
        )

    @classmethod
    def periodic(
        cls,
        period: timedelta,
        /,
        *,
        skip_missed_ticks: bool = False,
        auto_start: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> Timer:
        """Create a periodic timer.

        This is basically a shortcut to create a timer with either
        `TriggerAllMissed()` or `SkipMissedAndResync()` as the missed tick policy
        (depending on `skip_missed_ticks`).

        See the class documentation for details.

        Args:
            period: The time between timer ticks. Must be at least
                1 microsecond.
            skip_missed_ticks: Whether to skip missed ticks or trigger them
                all until it catches up.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Returns:
            The timer instance.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `period` is not positive or is smaller than 1
                microsecond.
        """
        missed_tick_policy = (
            SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed()
        )
        # Instantiate through `cls` so subclasses get an instance of their
        # own type from this alternate constructor.
        return cls(
            period,
            missed_tick_policy,
            auto_start=auto_start,
            loop=loop,
        )

    @property
    def interval(self) -> timedelta:
        """The interval between timer ticks.

        Returns:
            The interval between timer ticks.
        """
        return timedelta(microseconds=self._interval)

    @property
    def missed_tick_policy(self) -> MissedTickPolicy:
        """The policy of the timer when it misses a tick.

        Returns:
            The policy of the timer when it misses a tick.
        """
        return self._missed_tick_policy

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The event loop used by the timer to track time.

        Returns:
            The event loop used by the timer to track time.
        """
        return self._loop

    @property
    def is_running(self) -> bool:
        """Whether the timer is running.

        This will be `False` if the timer was stopped, or not started yet.

        Returns:
            Whether the timer is running.
        """
        return not self._stopped

    def reset(self) -> None:
        """Reset the timer to start timing from now.

        If the timer was stopped, or not started yet, it will be started.
        Any tick that was ready but not yet consumed is discarded.

        This can only be called with a running loop, see the class
        documentation for more details.

        Raises:
            RuntimeError: if it was called without a running loop.
        """
        self._stopped = False
        self._next_tick_time = self._now() + self._interval
        self._current_drift = None

    def stop(self) -> None:
        """Stop the timer.

        Once `stop` has been called, all subsequent calls to `ready()` will
        immediately return False and calls to `consume()` / `receive()` or any
        use of the async iterator interface will raise
        a `ReceiverStoppedError` (a tick that was already ready but not yet
        consumed is still delivered first).

        You can restart the timer with `reset()`.
        """
        self._stopped = True
        # We need to make sure it's not None, otherwise `ready()` will start it
        self._next_tick_time = self._now()

    async def ready(self) -> bool:
        """Wait until the timer `interval` passed.

        Once a call to `ready()` has finished, the resulting tick information
        must be read with a call to `consume()` (`receive()` or iterated over)
        to tell the timer it should wait for the next interval.

        The timer will remain ready (this method will return immediately)
        until it is consumed.

        If `reset()` is called while this method is waiting, the wait is
        extended until the new tick time is reached.

        Returns:
            Whether the timer was started and it is still running.

        Raises:
            RuntimeError: if it was called without a running loop.
        """
        # If there are messages waiting to be consumed, return immediately.
        if self._current_drift is not None:
            return True

        # If `_next_tick_time` is `None`, it means it was created with
        # `auto_start=False` and should be started.
        if self._next_tick_time is None:
            self.reset()
            assert (
                self._next_tick_time is not None
            ), "This should be assigned by reset()"

        # If a stop was explicitly requested, we bail out.
        if self._stopped:
            return False

        now = self._now()
        time_to_next_tick = self._next_tick_time - now
        # Sleep until the tick is reached. This has to be a loop because
        # `reset()` can be called while we are sleeping, which moves
        # `_next_tick_time` into the future; in that case we must keep waiting
        # instead of triggering early with a negative drift.
        while time_to_next_tick > 0:
            await asyncio.sleep(time_to_next_tick / 1_000_000)
            # If a stop was explicitly requested during the sleep, we bail out.
            if self._stopped:
                return False
            now = self._now()
            time_to_next_tick = self._next_tick_time - now

        self._current_drift = timedelta(microseconds=now - self._next_tick_time)
        self._next_tick_time = self._missed_tick_policy.calculate_next_tick_time(
            now=now,
            scheduled_tick_time=self._next_tick_time,
            interval=self._interval,
        )

        return True

    def consume(self) -> timedelta:
        """Return the latest drift once `ready()` is complete.

        Once the timer has triggered (`ready()` is done), this method returns the
        difference between when the timer should have triggered and the time when
        it actually triggered. See the class documentation for more details.

        Returns:
            The difference between when the timer should have triggered and the
                time when it actually did.

        Raises:
            ReceiverStoppedError: if the timer was stopped via `stop()`.
        """
        # If it was stopped and there is no pending result, we raise
        # (if there is a pending result, then we still want to return it first)
        if self._stopped and self._current_drift is None:
            raise ReceiverStoppedError(self)

        assert (
            self._current_drift is not None
        ), "calls to `consume()` must be follow a call to `ready()`"
        drift = self._current_drift
        # Clear the pending drift so the next `ready()` waits for a new tick.
        self._current_drift = None
        return drift

    def _now(self) -> int:
        """Return the current monotonic clock time in microseconds.

        Returns:
            The current monotonic clock time in microseconds.
        """
        return _to_microseconds(self._loop.time())
Attributes¤
interval: timedelta property ¤

The interval between timer ticks.

RETURNS DESCRIPTION
timedelta

The interval between timer ticks.

is_running: bool property ¤

Whether the timer is running.

This will be False if the timer was stopped, or not started yet.

RETURNS DESCRIPTION
bool

Whether the timer is running.

loop: asyncio.AbstractEventLoop property ¤

The event loop used by the timer to track time.

RETURNS DESCRIPTION
asyncio.AbstractEventLoop

The event loop used by the timer to track time.

missed_tick_policy: MissedTickPolicy property ¤

The policy of the timer when it misses a tick.

RETURNS DESCRIPTION
MissedTickPolicy

The policy of the timer when it misses a tick.

Functions¤
__init__(interval, missed_tick_policy, /, *, auto_start=True, loop=None) ¤

Create an instance.

See the class documentation for details.

PARAMETER DESCRIPTION
interval

The time between timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

missed_tick_policy

The policy of the timer when it misses a tick. See the documentation of MissedTickPolicy for details.

TYPE: MissedTickPolicy

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: asyncio.AbstractEventLoop | None DEFAULT: None

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if interval is not positive or is smaller than 1 microsecond.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def __init__(
    self,
    interval: timedelta,
    missed_tick_policy: MissedTickPolicy,
    /,
    *,
    auto_start: bool = True,
    loop: asyncio.AbstractEventLoop | None = None,
) -> None:
    """Initialize the timer.

    See the class documentation for details.

    Args:
        interval: How much time passes between two ticks. It can't be
            shorter than 1 microsecond.
        missed_tick_policy: What the timer does when a tick is missed. See
            the documentation of `MissedTickPolicy` for details.
        auto_start: If `True`, the timer is started right away. This
            requires either a running loop or an explicit, running `loop`.
        loop: The event loop used to track time. When `None`,
            `asyncio.get_running_loop()` is used.

    Raises:
        RuntimeError: if no `loop` was given and there is no running loop.
        ValueError: if `interval` is not positive or is smaller than 1
            microsecond.
    """
    self._interval: int = _to_microseconds(interval)
    """The time between timer ticks, in microseconds."""

    self._missed_tick_policy: MissedTickPolicy = missed_tick_policy
    """What the timer does when it misses a tick.

    See the documentation of `MissedTickPolicy` for details.
    """

    # Fall back to the running loop only when no explicit loop was given.
    if loop is None:
        loop = asyncio.get_running_loop()
    self._loop: asyncio.AbstractEventLoop = loop
    """The event loop used to track time."""

    self._stopped: bool = True
    """Whether the timer is not running.

    `False` means the timer is running.  `True` means one of two things,
    told apart by `_next_tick_time`:

    * `_next_tick_time` is `None`: the timer was never started (it was
      created with `auto_start=False`).  `ready()` will start it by
      calling `reset()`.

    * `_next_tick_time` is not `None`: a stop was requested.  `ready()`
      returns `False` and `consume()` raises a `ReceiverStoppedError`
      once there is no pending tick.
    """

    self._next_tick_time: int | None = None
    """The absolute (monotonic) time at which the timer should trigger.

    `None` means the timer has not been started yet and should be started
    as soon as it is used.
    """

    self._current_drift: timedelta | None = None
    """How late the last tick triggered relative to `_next_tick_time`.

    `ready()` computes it and `consume()` hands it out, setting it back to
    `None` so the next `ready()` waits again.  `None` means there is no
    tick pending to be consumed.
    """

    # Reject non-positive intervals.
    if self._interval <= 0:
        message = (
            "The `interval` must be positive and at least 1 microsecond, "
            f"not {interval} ({self._interval} microseconds)"
        )
        raise ValueError(message)

    if not auto_start:
        return
    self.reset()
consume() ¤

Return the latest drift once ready() is complete.

Once the timer has triggered (ready() is done), this method returns the difference between when the timer should have triggered and the time when it actually triggered. See the class documentation for more details.

RETURNS DESCRIPTION
timedelta

The difference between when the timer should have triggered and the time when it actually did.

RAISES DESCRIPTION
ReceiverStoppedError

if the timer was stopped via stop().

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
def consume(self) -> timedelta:
    """Hand out the drift of the tick that `ready()` last waited for.

    The drift is the difference between the time the timer was scheduled to
    trigger and the time it actually triggered. See the class documentation
    for more details.

    Returns:
        The difference between when the timer should have triggered and the
            time when it actually did.

    Raises:
        ReceiverStoppedError: if the timer was stopped via `stop()` and
            there is no pending tick left to deliver.
    """
    pending = self._current_drift

    # A stopped timer still delivers a tick that was already pending; only
    # when nothing is pending do we report that it was stopped.
    if pending is None and self._stopped:
        raise ReceiverStoppedError(self)

    assert (
        pending is not None
    ), "calls to `consume()` must be follow a call to `ready()`"

    # Mark the tick as consumed so the next `ready()` waits again.
    self._current_drift = None
    return pending
periodic(period, /, *, skip_missed_ticks=False, auto_start=True, loop=None) classmethod ¤

Create a periodic timer.

This is basically a shortcut to create a timer with either TriggerAllMissed() or SkipMissedAndResync() as the missed tick policy (depending on skip_missed_ticks).

See the class documentation for details.

PARAMETER DESCRIPTION
period

The time between timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

skip_missed_ticks

Whether to skip missed ticks or trigger them all until it catches up.

TYPE: bool DEFAULT: False

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: asyncio.AbstractEventLoop | None DEFAULT: None

RETURNS DESCRIPTION
Timer

The timer instance.

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if period is not positive or is smaller than 1 microsecond.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
@classmethod
def periodic(
    cls,
    period: timedelta,
    /,
    *,
    skip_missed_ticks: bool = False,
    auto_start: bool = True,
    loop: asyncio.AbstractEventLoop | None = None,
) -> Timer:
    """Create a periodic timer.

    This is basically a shortcut to create a timer with either
    `TriggerAllMissed()` or `SkipMissedAndResync()` as the missed tick policy
    (depending on `skip_missed_ticks`).

    See the class documentation for details.

    Args:
        period: The time between timer ticks. Must be at least
            1 microsecond.
        skip_missed_ticks: Whether to skip missed ticks or trigger them
            all until it catches up.
        auto_start: Whether the timer should be started when the
            instance is created. This can only be `True` if there is
            already a running loop or an explicit `loop` that is running
            was passed.
        loop: The event loop to use to track time. If `None`,
            `asyncio.get_running_loop()` will be used.

    Returns:
        The timer instance.

    Raises:
        RuntimeError: if it was called without a loop and there is no
            running loop.
        ValueError: if `period` is not positive or is smaller than 1
            microsecond.
    """
    missed_tick_policy = (
        SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed()
    )
    # Instantiate through `cls` (not a hard-coded class name) so subclasses
    # get an instance of their own type from this alternate constructor.
    return cls(
        period,
        missed_tick_policy,
        auto_start=auto_start,
        loop=loop,
    )
ready() async ¤

Wait until the timer interval passed.

Once a call to ready() has finished, the resulting tick information must be read with a call to consume() (receive() or iterated over) to tell the timer it should wait for the next interval.

The timer will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the timer was started and it is still running.

RAISES DESCRIPTION
RuntimeError

if it was called without a running loop.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
async def ready(self) -> bool:
    """Wait until the timer `interval` passed.

    Once a call to `ready()` has finished, the resulting tick information
    must be read with a call to `consume()` (`receive()` or iterated over)
    to tell the timer it should wait for the next interval.

    The timer will remain ready (this method will return immediately)
    until it is consumed.

    If `reset()` is called while this method is waiting, the wait is
    extended until the new tick time is reached.

    Returns:
        Whether the timer was started and it is still running.

    Raises:
        RuntimeError: if it was called without a running loop.
    """
    # If there are messages waiting to be consumed, return immediately.
    if self._current_drift is not None:
        return True

    # If `_next_tick_time` is `None`, it means it was created with
    # `auto_start=False` and should be started.
    if self._next_tick_time is None:
        self.reset()
        assert (
            self._next_tick_time is not None
        ), "This should be assigned by reset()"

    # If a stop was explicitly requested, we bail out.
    if self._stopped:
        return False

    now = self._now()
    time_to_next_tick = self._next_tick_time - now
    # Sleep until the tick is reached. This has to be a loop because
    # `reset()` can be called while we are sleeping, which moves
    # `_next_tick_time` into the future; in that case we must keep waiting
    # instead of triggering early with a negative drift.
    while time_to_next_tick > 0:
        await asyncio.sleep(time_to_next_tick / 1_000_000)
        # If a stop was explicitly requested during the sleep, we bail out.
        if self._stopped:
            return False
        now = self._now()
        time_to_next_tick = self._next_tick_time - now

    self._current_drift = timedelta(microseconds=now - self._next_tick_time)
    self._next_tick_time = self._missed_tick_policy.calculate_next_tick_time(
        now=now,
        scheduled_tick_time=self._next_tick_time,
        interval=self._interval,
    )

    return True
reset() ¤

Reset the timer to start timing from now.

If the timer was stopped, or not started yet, it will be started.

This can only be called with a running loop, see the class documentation for more details.

RAISES DESCRIPTION
RuntimeError

if it was called without a running loop.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
569
570
571
572
573
574
575
576
577
578
579
580
581
582
def reset(self) -> None:
    """Restart the timer so that the next tick is one interval from now.

    A timer that was stopped, or never started, is started by this call.
    Any tick that was ready but not yet consumed is discarded.

    This can only be called with a running loop, see the class
    documentation for more details.

    Raises:
        RuntimeError: if it was called without a running loop.
    """
    # Mark the timer as running before reading the clock.
    self._stopped = False
    started_at = self._now()
    self._next_tick_time = started_at + self._interval
    # Drop any pending tick; `ready()` has to wait for the new schedule.
    self._current_drift = None
stop() ¤

Stop the timer.

Once stop has been called, all subsequent calls to ready() will immediately return False and calls to consume() / receive() or any use of the async iterator interface will raise a ReceiverStoppedError.

You can restart the timer with reset().

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
584
585
586
587
588
589
590
591
592
593
594
595
596
def stop(self) -> None:
    """Stop the timer.

    After `stop` has been called, calls to `ready()` return `False`
    immediately and calls to `consume()` / `receive()`, or any use of the
    async iterator interface, raise a `ReceiverStoppedError` (a tick that
    was already ready but not yet consumed is still delivered first).

    The timer can be restarted with `reset()`.
    """
    self._stopped = True
    # `_next_tick_time` must not stay `None`: `ready()` treats `None` as
    # "never started" and would start the timer again.
    stopped_at = self._now()
    self._next_tick_time = stopped_at
timeout(delay, /, *, auto_start=True, loop=None) classmethod ¤

Create a timer useful for tracking timeouts.

This is basically a shortcut to create a timer with SkipMissedAndDrift(delay_tolerance=timedelta(0)) as the missed tick policy.

See the class documentation for details.

PARAMETER DESCRIPTION
delay

The time until the timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: asyncio.AbstractEventLoop | None DEFAULT: None

RETURNS DESCRIPTION
Timer

The timer instance.

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if delay is not positive or is smaller than 1 microsecond.

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
@classmethod
def timeout(
    cls,
    delay: timedelta,
    /,
    *,
    auto_start: bool = True,
    loop: asyncio.AbstractEventLoop | None = None,
) -> Timer:
    """Create a timer useful for tracking timeouts.

    This is basically a shortcut to create a timer with
    `SkipMissedAndDrift(delay_tolerance=timedelta(0))` as the missed tick policy.

    See the class documentation for details.

    Args:
        delay: The time until the timer ticks. Must be at least
            1 microsecond.
        auto_start: Whether the timer should be started when the
            instance is created. This can only be `True` if there is
            already a running loop or an explicit `loop` that is running
            was passed.
        loop: The event loop to use to track time. If `None`,
            `asyncio.get_running_loop()` will be used.

    Returns:
        The timer instance.

    Raises:
        RuntimeError: if it was called without a loop and there is no
            running loop.
        ValueError: if `delay` is not positive or is smaller than 1
            microsecond.
    """
    # Instantiate through `cls` (not a hard-coded class name) so subclasses
    # get an instance of their own type from this alternate constructor.
    return cls(
        delay,
        SkipMissedAndDrift(delay_tolerance=timedelta(0)),
        auto_start=auto_start,
        loop=loop,
    )

frequenz.channels.util.TriggerAllMissed ¤

Bases: MissedTickPolicy

A policy that triggers all the missed ticks immediately until it catches up.

Example

Assume a timer with interval 1 second, the tick T0 happens exactly at time 0, the second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately as well as the fifth tick, T4, which was also already delayed (by 0.3 seconds), so it catches up. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again. The seventh tick, T6, happens at 6.0, right on time.

0         1         2         3         4  o      5         6
o---------|-o-------|--o------|---------|--o------|o--------o-----> time
T0          T1         T2                  T3      T5       T6
                                           T4
Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class TriggerAllMissed(MissedTickPolicy):
    """A policy that fires every missed tick immediately until it catches up.

    Example:
        Take a timer with an interval of 1 second. Tick `T0` happens exactly
        at time 0. The second tick, `T1`, happens at time 1.2 (0.2 seconds
        late), so it triggers immediately.  The third tick, `T2`, happens at
        time 2.3 (0.3 seconds late), so it triggers immediately too.  The
        fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it
        triggers immediately, and so does the fifth tick, `T4`, which was
        already delayed as well (by 0.3 seconds), so the timer catches up.
        The sixth tick, `T5`, happens at 5.1 (0.1 seconds late), so it
        triggers immediately again. The seventh tick, `T6`, happens at 6.0,
        right on time.

        ```
        0         1         2         3         4  o      5         6
        o---------|-o-------|--o------|---------|--o------|o--------o-----> time
        T0          T1         T2                  T3      T5       T6
                                                   T4
        ```
    """

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Compute when the next tick should happen.

        The result is always `scheduled_tick_time + interval`, because every
        tick has to produce a trigger event.

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time at which the current tick was
                scheduled to trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        # `now` is not used: the schedule advances by one interval from the
        # scheduled time of the current tick.
        next_tick_time = scheduled_tick_time + interval
        return next_tick_time
Functions¤
calculate_next_tick_time(*, now, scheduled_tick_time, interval) ¤

Calculate the next tick time.

This method always returns scheduled_tick_time + interval, as all ticks need to produce a trigger event.

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in /opt/hostedtoolcache/Python/3.11.3/x64/lib/python3.11/site-packages/frequenz/channels/util/_timer.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Compute when the next tick should happen.

    The result is always `scheduled_tick_time + interval`, because every
    tick has to produce a trigger event.

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time at which the current tick was
            scheduled to trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    # `now` is not used: the schedule advances by one interval from the
    # scheduled time of the current tick.
    next_tick_time = scheduled_tick_time + interval
    return next_tick_time