frequenz.channels.util ¤

Channel utilities.

A module with several utilities to work with channels:

  • Event: A receiver that can be made ready through an event.

  • FileWatcher: A receiver that watches for file events.

  • Merge: A receiver that merges messages coming from multiple receivers into a single stream.

  • MergeNamed: A receiver that merges messages coming from multiple receivers into a single named stream, making it possible to identify the origin of each message.

  • Timer: A receiver that ticks at certain intervals.

  • select: Iterate over the values of all receivers as new values become available.

Classes¤

frequenz.channels.util.Event ¤

Bases: Receiver[None]

A receiver that can be made ready through an event.

The receiver (its ready() method) waits until set() is called. Once the event is consume()d, the receiver waits again until the next call to set().

The receiver can be completely stopped by calling stop().

Example
import asyncio
from frequenz.channels import Receiver
from frequenz.channels.util import Event, select, selected_from

other_receiver: Receiver[int] = ...
exit_event = Event()

async def exit_after_10_seconds() -> None:
    await asyncio.sleep(10)
    exit_event.set()

asyncio.ensure_future(exit_after_10_seconds())

async for selected in select(exit_event, other_receiver):
    if selected_from(selected, exit_event):
        break
    if selected_from(selected, other_receiver):
        print(selected.value)
    else:
        assert False, "Unknown receiver selected"
Source code in frequenz/channels/util/_event.py
class Event(_base_classes.Receiver[None]):
    """A receiver that can be made ready through an event.

    The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait
    until [`set()`][frequenz.channels.util.Event.set] is called.  At that point the
    receiver will wait again after the event is
    [`consume()`][frequenz.channels.Receiver.consume]d.

    The receiver can be completely stopped by calling
    [`stop()`][frequenz.channels.util.Event.stop].

    Example:
        ```python
        import asyncio
        from frequenz.channels import Receiver
        from frequenz.channels.util import Event, select, selected_from

        other_receiver: Receiver[int] = ...
        exit_event = Event()

        async def exit_after_10_seconds() -> None:
            await asyncio.sleep(10)
            exit_event.set()

        asyncio.ensure_future(exit_after_10_seconds())

        async for selected in select(exit_event, other_receiver):
            if selected_from(selected, exit_event):
                break
            if selected_from(selected, other_receiver):
                print(selected.value)
            else:
                assert False, "Unknown receiver selected"
        ```
    """

    def __init__(self, name: str | None = None) -> None:
        """Create a new instance.

        Args:
            name: The name of the receiver.  If `None` the `id(self)` will be used as
                the name.  This is only for debugging purposes, it will be shown in the
                string representation of the receiver.
        """
        self._event: _asyncio.Event = _asyncio.Event()
        """The event that is set when the receiver is ready."""

        self._name: str = name or str(id(self))
        """The name of the receiver.

        This is for debugging purposes, it will be shown in the string representation
        of the receiver.
        """

        self._is_set: bool = False
        """Whether the receiver is ready to be consumed.

        This is used to differentiate between when the receiver was stopped (the event
        is triggered too) but still there is an event to be consumed and when it was
        stopped but was not explicitly set().
        """

        self._is_stopped: bool = False
        """Whether the receiver is stopped."""

    @property
    def name(self) -> str:
        """The name of this receiver.

        This is for debugging purposes, it will be shown in the string representation
        of this receiver.

        Returns:
            The name of this receiver.
        """
        return self._name

    @property
    def is_set(self) -> bool:
        """Whether this receiver is set (ready).

        Returns:
            Whether this receiver is set (ready).
        """
        return self._is_set

    @property
    def is_stopped(self) -> bool:
        """Whether this receiver is stopped.

        Returns:
            Whether this receiver is stopped.
        """
        return self._is_stopped

    def stop(self) -> None:
        """Stop this receiver."""
        self._is_stopped = True
        self._event.set()

    def set(self) -> None:
        """Trigger the event (make the receiver ready)."""
        self._is_set = True
        self._event.set()

    async def ready(self) -> bool:
        """Wait until this receiver is ready.

        Returns:
            Whether this receiver is still running.
        """
        if self._is_stopped:
            return False
        await self._event.wait()
        return not self._is_stopped

    def consume(self) -> None:
        """Consume the event.

        This makes this receiver wait again until the event is set again.

        Raises:
            ReceiverStoppedError: If this receiver is stopped.
        """
        if not self._is_set and self._is_stopped:
            raise _exceptions.ReceiverStoppedError(self)

        assert self._is_set, "calls to `consume()` must follow a call to `ready()`"

        self._is_set = False
        self._event.clear()

    def __str__(self) -> str:
        """Return a string representation of this receiver.

        Returns:
            A string representation of this receiver.
        """
        return f"{type(self).__name__}({self._name!r})"

    def __repr__(self) -> str:
        """Return a string representation of this receiver.

        Returns:
            A string representation of this receiver.
        """
        return (
            f"<{type(self).__name__} name={self._name!r} is_set={self.is_set!r} "
            f"is_stopped={self.is_stopped!r}>"
        )
Attributes¤
is_set: bool property ¤

Whether this receiver is set (ready).

RETURNS DESCRIPTION
bool

Whether this receiver is set (ready).

is_stopped: bool property ¤

Whether this receiver is stopped.

RETURNS DESCRIPTION
bool

Whether this receiver is stopped.

name: str property ¤

The name of this receiver.

This is for debugging purposes; it is shown in the string representation of this receiver.

RETURNS DESCRIPTION
str

The name of this receiver.

Functions¤
__init__(name=None) ¤

Create a new instance.

PARAMETER DESCRIPTION
name

The name of the receiver. If None, id(self) is used as the name. This is only for debugging purposes; it is shown in the string representation of the receiver.

TYPE: str | None DEFAULT: None

Source code in frequenz/channels/util/_event.py
def __init__(self, name: str | None = None) -> None:
    """Create a new instance.

    Args:
        name: The name of the receiver.  If `None` the `id(self)` will be used as
            the name.  This is only for debugging purposes, it will be shown in the
            string representation of the receiver.
    """
    self._event: _asyncio.Event = _asyncio.Event()
    """The event that is set when the receiver is ready."""

    self._name: str = name or str(id(self))
    """The name of the receiver.

    This is for debugging purposes, it will be shown in the string representation
    of the receiver.
    """

    self._is_set: bool = False
    """Whether the receiver is ready to be consumed.

    This is used to differentiate between when the receiver was stopped (the event
    is triggered too) but still there is an event to be consumed and when it was
    stopped but was not explicitly set().
    """

    self._is_stopped: bool = False
    """Whether the receiver is stopped."""
__repr__() ¤

Return a string representation of this receiver.

RETURNS DESCRIPTION
str

A string representation of this receiver.

Source code in frequenz/channels/util/_event.py
def __repr__(self) -> str:
    """Return a string representation of this receiver.

    Returns:
        A string representation of this receiver.
    """
    return (
        f"<{type(self).__name__} name={self._name!r} is_set={self.is_set!r} "
        f"is_stopped={self.is_stopped!r}>"
    )
__str__() ¤

Return a string representation of this receiver.

RETURNS DESCRIPTION
str

A string representation of this receiver.

Source code in frequenz/channels/util/_event.py
def __str__(self) -> str:
    """Return a string representation of this receiver.

    Returns:
        A string representation of this receiver.
    """
    return f"{type(self).__name__}({self._name!r})"
consume() ¤

Consume the event.

This makes this receiver wait again until the event is set again.

RAISES DESCRIPTION
ReceiverStoppedError

If this receiver is stopped.
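
Judging from the source listing, the stopped check only fires when no event is pending, so an event that was set() before stop() can still be consumed once. The sketch below reproduces just that flag logic so it can run without the library. `SketchEvent` is a hypothetical stand-in, not the library class, and it raises `RuntimeError` where the real receiver raises `ReceiverStoppedError`.

```python
import asyncio


class SketchEvent:
    """Stand-in mirroring the set/stop/consume flags of `Event` (not the library class)."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self.is_set = False
        self.is_stopped = False

    def set(self) -> None:
        self.is_set = True
        self._event.set()

    def stop(self) -> None:
        self.is_stopped = True
        self._event.set()

    async def ready(self) -> bool:
        if self.is_stopped:
            return False
        await self._event.wait()
        return not self.is_stopped

    def consume(self) -> None:
        # Only a stopped receiver with nothing pending refuses to be consumed.
        if not self.is_set and self.is_stopped:
            raise RuntimeError("receiver stopped")  # stand-in for ReceiverStoppedError
        assert self.is_set, "consume() must follow set()"
        self.is_set = False
        self._event.clear()


async def demo() -> list[object]:
    event = SketchEvent()
    log: list[object] = []
    event.set()
    log.append(await event.ready())  # the event is set and not stopped
    event.stop()
    event.consume()  # allowed: the earlier set() is still pending
    log.append(event.is_set)  # the pending event is gone now
    try:
        event.consume()  # nothing pending and stopped
    except RuntimeError as err:
        log.append(str(err))
    log.append(await event.ready())  # stopped receivers are never ready
    return log


log = asyncio.run(demo())
```

The order matters: calling stop() first and set() afterwards would also leave one consumable event, because the check looks at both flags together rather than at the order in which they were raised.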

Source code in frequenz/channels/util/_event.py
def consume(self) -> None:
    """Consume the event.

    This makes this receiver wait again until the event is set again.

    Raises:
        ReceiverStoppedError: If this receiver is stopped.
    """
    if not self._is_set and self._is_stopped:
        raise _exceptions.ReceiverStoppedError(self)

    assert self._is_set, "calls to `consume()` must follow a call to `ready()`"

    self._is_set = False
    self._event.clear()
ready() async ¤

Wait until this receiver is ready.

RETURNS DESCRIPTION
bool

Whether this receiver is still running.

Source code in frequenz/channels/util/_event.py
async def ready(self) -> bool:
    """Wait until this receiver is ready.

    Returns:
        Whether this receiver is still running.
    """
    if self._is_stopped:
        return False
    await self._event.wait()
    return not self._is_stopped
set() ¤

Trigger the event (make the receiver ready).

Source code in frequenz/channels/util/_event.py
def set(self) -> None:
    """Trigger the event (make the receiver ready)."""
    self._is_set = True
    self._event.set()
stop() ¤

Stop this receiver.

Source code in frequenz/channels/util/_event.py
def stop(self) -> None:
    """Stop this receiver."""
    self._is_stopped = True
    self._event.set()

frequenz.channels.util.FileWatcher ¤

Bases: Receiver['FileWatcher.Event']

A channel receiver that watches for file events.

Source code in frequenz/channels/util/_file_watcher.py
class FileWatcher(Receiver["FileWatcher.Event"]):
    """A channel receiver that watches for file events."""

    class EventType(Enum):
        """Available types of changes to watch for."""

        CREATE = Change.added
        MODIFY = Change.modified
        DELETE = Change.deleted

    @dataclass(frozen=True)
    class Event:
        """A file change event."""

        type: FileWatcher.EventType
        """The type of change that was observed."""
        path: pathlib.Path
        """The path where the change was observed."""

    def __init__(
        self,
        paths: list[pathlib.Path | str],
        event_types: abc.Iterable[EventType] = frozenset(EventType),
    ) -> None:
        """Create a `FileWatcher` instance.

        Args:
            paths: Paths to watch for changes.
            event_types: Types of events to watch for. Defaults to watch for
                all event types.
        """
        self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types)
        self._stop_event = asyncio.Event()
        self._paths = [
            path if isinstance(path, pathlib.Path) else pathlib.Path(path)
            for path in paths
        ]
        self._awatch = awatch(
            *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
        )
        self._awatch_stopped_exc: Exception | None = None
        self._changes: set[FileChange] = set()

    def _filter_events(
        self,
        change: Change,
        path: str,  # pylint: disable=unused-argument
    ) -> bool:
        """Filter events based on the event type and path.

        Args:
            change: The type of change to be notified.
            path: The path of the file that changed.

        Returns:
            Whether the event should be notified.
        """
        return change in [event_type.value for event_type in self.event_types]

    def __del__(self) -> None:
        """Cleanup registered watches.

        `awatch` passes the `stop_event` to a separate task/thread. This way
        `awatch` gets destroyed properly. The background task will continue
        until the signal is received.
        """
        self._stop_event.set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # if there are messages waiting to be consumed, return immediately.
        if self._changes:
            return True

        # if it was already stopped, return immediately.
        if self._awatch_stopped_exc is not None:
            return False

        try:
            self._changes = await self._awatch.__anext__()
        except StopAsyncIteration as err:
            self._awatch_stopped_exc = err

        return True

    def consume(self) -> Event:
        """Return the latest event once `ready` is complete.

        Returns:
            The next event that was received.

        Raises:
            ReceiverStoppedError: if there is some problem with the receiver.
        """
        if not self._changes and self._awatch_stopped_exc is not None:
            raise ReceiverStoppedError(self) from self._awatch_stopped_exc

        assert self._changes, "`consume()` must be preceded by a call to `ready()`"
        # Tuple of (Change, path) returned by watchfiles
        change, path_str = self._changes.pop()
        return FileWatcher.Event(
            type=FileWatcher.EventType(change), path=pathlib.Path(path_str)
        )
Classes¤
Event dataclass ¤

A file change event.

Source code in frequenz/channels/util/_file_watcher.py
@dataclass(frozen=True)
class Event:
    """A file change event."""

    type: FileWatcher.EventType
    """The type of change that was observed."""
    path: pathlib.Path
    """The path where the change was observed."""
Attributes¤
path: pathlib.Path instance-attribute ¤

The path where the change was observed.

type: FileWatcher.EventType instance-attribute ¤

The type of change that was observed.

EventType ¤

Bases: Enum

Available types of changes to watch for.

Source code in frequenz/channels/util/_file_watcher.py
class EventType(Enum):
    """Available types of changes to watch for."""

    CREATE = Change.added
    MODIFY = Change.modified
    DELETE = Change.deleted
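
To make the relationship between the two nested classes concrete, the following sketch rebuilds them with the standard library only and converts one raw change into an event, the way `consume()` does in the class listing. The `Change` enum here is a hypothetical stand-in for `watchfiles.Change` with illustrative values, and the path is made up.

```python
import enum
import pathlib
from dataclasses import FrozenInstanceError, dataclass


class Change(enum.IntEnum):
    """Hypothetical stand-in for `watchfiles.Change` (values chosen for illustration)."""

    added = 1
    modified = 2
    deleted = 3


class EventType(enum.Enum):
    """Same shape as `FileWatcher.EventType`: each member wraps a `Change`."""

    CREATE = Change.added
    MODIFY = Change.modified
    DELETE = Change.deleted


@dataclass(frozen=True)
class Event:
    """Same fields as `FileWatcher.Event`."""

    type: EventType
    path: pathlib.Path


# A raw (change, path) pair, shaped like the tuples `consume()` unpacks.
raw_change = (Change.modified, "/tmp/config.toml")

# Looking an enum up by value maps the raw change to its event type.
event = Event(type=EventType(raw_change[0]), path=pathlib.Path(raw_change[1]))

# Frozen dataclasses reject attribute assignment.
try:
    event.path = pathlib.Path("/elsewhere")  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Because the dataclass is frozen, events are also hashable, which makes them safe to keep in sets or use as dictionary keys.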
Functions¤
__del__() ¤

Cleanup registered watches.

awatch passes the stop_event to a separate task/thread. This way awatch gets destroyed properly. The background task will continue until the signal is received.

Source code in frequenz/channels/util/_file_watcher.py
def __del__(self) -> None:
    """Cleanup registered watches.

    `awatch` passes the `stop_event` to a separate task/thread. This way
    `awatch` gets destroyed properly. The background task will continue
    until the signal is received.
    """
    self._stop_event.set()
__init__(paths, event_types=frozenset(EventType)) ¤

Create a FileWatcher instance.

PARAMETER DESCRIPTION
paths

Paths to watch for changes.

TYPE: list[Path | str]

event_types

Types of events to watch for. Defaults to watch for all event types.

TYPE: Iterable[EventType] DEFAULT: frozenset(EventType)
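
The two arguments are handled quite differently: paths are normalized to `pathlib.Path`, while the event types become a filter predicate handed to the watcher. A minimal stand-alone sketch of both steps follows. `Change` is again a hypothetical stand-in for `watchfiles.Change`, and `normalize` and `make_filter` are illustrative helper names that do not exist in the library.

```python
from __future__ import annotations

import enum
import pathlib
from collections.abc import Iterable


class Change(enum.IntEnum):
    """Hypothetical stand-in for `watchfiles.Change`."""

    added = 1
    modified = 2
    deleted = 3


class EventType(enum.Enum):
    CREATE = Change.added
    MODIFY = Change.modified
    DELETE = Change.deleted


def normalize(paths: list[pathlib.Path | str]) -> list[pathlib.Path]:
    """Convert every entry to a `pathlib.Path`, accepting strings and paths alike."""
    return [pathlib.Path(path) for path in paths]


def make_filter(event_types: Iterable[EventType] = frozenset(EventType)):
    """Build a predicate that accepts only the requested change types."""
    wanted = frozenset(event_type.value for event_type in event_types)

    def accept(change: Change, path: str) -> bool:
        # The path is ignored; only the kind of change decides.
        return change in wanted

    return accept


paths = normalize(["/tmp/a.txt", pathlib.Path("/tmp/b.txt")])
only_deletes = make_filter({EventType.DELETE})
accept_all = make_filter()  # default: every event type
```

Precomputing the set of wanted values once is a small deviation from the listing, which rebuilds a list on every call; the accepted changes are the same.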

Source code in frequenz/channels/util/_file_watcher.py
def __init__(
    self,
    paths: list[pathlib.Path | str],
    event_types: abc.Iterable[EventType] = frozenset(EventType),
) -> None:
    """Create a `FileWatcher` instance.

    Args:
        paths: Paths to watch for changes.
        event_types: Types of events to watch for. Defaults to watch for
            all event types.
    """
    self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types)
    self._stop_event = asyncio.Event()
    self._paths = [
        path if isinstance(path, pathlib.Path) else pathlib.Path(path)
        for path in paths
    ]
    self._awatch = awatch(
        *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
    )
    self._awatch_stopped_exc: Exception | None = None
    self._changes: set[FileChange] = set()
consume() ¤

Return the latest event once ready is complete.

RETURNS DESCRIPTION
Event

The next event that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if there is some problem with the receiver.

Source code in frequenz/channels/util/_file_watcher.py
def consume(self) -> Event:
    """Return the latest event once `ready` is complete.

    Returns:
        The next event that was received.

    Raises:
        ReceiverStoppedError: if there is some problem with the receiver.
    """
    if not self._changes and self._awatch_stopped_exc is not None:
        raise ReceiverStoppedError(self) from self._awatch_stopped_exc

    assert self._changes, "`consume()` must be preceded by a call to `ready()`"
    # Tuple of (Change, path) returned by watchfiles
    change, path_str = self._changes.pop()
    return FileWatcher.Event(
        type=FileWatcher.EventType(change), path=pathlib.Path(path_str)
    )
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (or with receive(), or by iterating over the receiver). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/util/_file_watcher.py
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # if there are messages waiting to be consumed, return immediately.
    if self._changes:
        return True

    # if it was already stopped, return immediately.
    if self._awatch_stopped_exc is not None:
        return False

    try:
        self._changes = await self._awatch.__anext__()
    except StopAsyncIteration as err:
        self._awatch_stopped_exc = err

    return True

frequenz.channels.util.Merge ¤

Bases: Receiver[T]

Merge messages coming from multiple channels into a single stream.

Example

For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using Merge like this:

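from frequenz.channels import Broadcast
from frequenz.channels.util import Merge

channel1 = Broadcast[int]("input-chan-1")
channel2 = Broadcast[int]("input-chan-2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

merge = Merge(receiver1, receiver2)
# Iterate instead of testing the message for truthiness, so that falsy
# messages such as 0 do not end the loop early.
async for msg in merge:
    # do something with msg
    pass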

When merge is no longer needed, it should be stopped by calling its stop() method. This cleans up any internal pending async tasks.
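
The merging technique itself can be sketched with plain asyncio: keep one pending task per source, wait for whichever finishes first, and re-arm only the sources that produced a value. This is a simplified illustration, not the library implementation. It merges async generators instead of receivers, signals exhaustion with a sentinel rather than by inspecting the task exception, and all names (`count`, `merge_all`, `_next_or_done`) are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterator

_DONE = object()  # sentinel marking an exhausted source


async def _next_or_done(source: AsyncIterator[int]) -> object:
    """Fetch the next item, or the sentinel when the source is exhausted."""
    try:
        return await source.__anext__()
    except StopAsyncIteration:
        return _DONE


async def count(start: int, stop: int) -> AsyncIterator[int]:
    """Hypothetical source: yield the integers in [start, stop)."""
    for value in range(start, stop):
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield value


async def merge_all(*sources: AsyncIterator[int]) -> list[object]:
    """Collect items from all sources as they become available."""
    by_name = {str(index): source for index, source in enumerate(sources)}
    pending = {
        asyncio.create_task(_next_or_done(source), name=name)
        for name, source in by_name.items()
    }
    merged: list[object] = []
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            result = task.result()
            if result is _DONE:
                continue  # exhausted: do not re-arm this source
            merged.append(result)
            name = task.get_name()
            pending.add(asyncio.create_task(_next_or_done(by_name[name]), name=name))
    return merged


merged = asyncio.run(merge_all(count(0, 3), count(10, 12)))
```

The task name doubles as the key back to the source, which is how a finished task can be matched to the source that needs a new task. The interleaving of the two sources depends on scheduling, so only the set of merged values is predictable, not their order.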

Source code in frequenz/channels/util/_merge.py
class Merge(Receiver[T]):
    """Merge messages coming from multiple channels into a single stream.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream, by using `Merge` like this:

        ```python
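        from frequenz.channels import Broadcast
        from frequenz.channels.util import Merge

        channel1 = Broadcast[int]("input-chan-1")
        channel2 = Broadcast[int]("input-chan-2")
        receiver1 = channel1.new_receiver()
        receiver2 = channel2.new_receiver()

        merge = Merge(receiver1, receiver2)
        # Iterate instead of testing the message for truthiness, so that falsy
        # messages such as 0 do not end the loop early.
        async for msg in merge:
            # do something with msg
            pass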
        ```

        When `merge` is no longer needed, it should be stopped by calling its
        `stop()` method. This cleans up any internal pending async tasks.
    """

    def __init__(self, *args: Receiver[T]) -> None:
        """Create a `Merge` instance.

        Args:
            *args: sequence of channel receivers.
        """
        self._receivers = {str(id): recv for id, recv in enumerate(args)}
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.__anext__(), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: Deque[T] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            if not task.done() and task.get_loop().is_running():
                task.cancel()

    async def stop(self) -> None:
        """Stop the `Merge` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if len(self._results) > 0:
                return True

            # if there are no more pending receivers, we return immediately.
            if len(self._pending) == 0:
                return False

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                result = item.result()
                self._results.append(result)
                self._pending.add(
                    # pylint: disable=unnecessary-dunder-call
                    asyncio.create_task(self._receivers[name].__anext__(), name=name)
                )

    def consume(self) -> T:
        """Return the latest value once `ready` is complete.

        Returns:
            The next value that was received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        if not self._results and not self._pending:
            raise ReceiverStoppedError(self)

        assert self._results, "`consume()` must be preceded by a call to `ready()`"

        return self._results.popleft()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/util/_merge.py
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        if not task.done() and task.get_loop().is_running():
            task.cancel()
__init__(*args) ¤

Create a Merge instance.

PARAMETER DESCRIPTION
*args

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: ()

Source code in frequenz/channels/util/_merge.py
def __init__(self, *args: Receiver[T]) -> None:
    """Create a `Merge` instance.

    Args:
        *args: sequence of channel receivers.
    """
    self._receivers = {str(id): recv for id, recv in enumerate(args)}
    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(recv.__anext__(), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: Deque[T] = deque(maxlen=len(self._receivers))
consume() ¤

Return the latest value once ready is complete.

RETURNS DESCRIPTION
T

The next value that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.
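
Although the summary says "latest", the class listing buffers results in a deque bounded by the number of receivers and pops from the left, so buffered values come out in arrival order. A tiny stand-alone illustration of that buffer discipline, with made-up values and an illustrative `receiver_count`:

```python
from collections import deque

receiver_count = 2  # illustrative: one buffer slot per merged receiver
results = deque(maxlen=receiver_count)

results.append(10)  # arrived first
results.append(20)  # arrived second

first = results.popleft()  # oldest buffered value comes out first
second = results.popleft()
is_empty = not results  # an empty buffer means ready() has to wait again
```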

Source code in frequenz/channels/util/_merge.py
def consume(self) -> T:
    """Return the latest value once `ready` is complete.

    Returns:
        The next value that was received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    if not self._results and not self._pending:
        raise ReceiverStoppedError(self)

    assert self._results, "`consume()` must be preceded by a call to `ready()`"

    return self._results.popleft()
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (or with receive(), or by iterating over the receiver). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/util/_merge.py
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        # if there are messages waiting to be consumed, return immediately.
        if len(self._results) > 0:
            return True

        # if there are no more pending receivers, we return immediately.
        if len(self._pending) == 0:
            return False

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            # if channel is closed, don't add a task for it again.
            if isinstance(item.exception(), StopAsyncIteration):
                continue
            result = item.result()
            self._results.append(result)
            self._pending.add(
                # pylint: disable=unnecessary-dunder-call
                asyncio.create_task(self._receivers[name].__anext__(), name=name)
            )
stop() async ¤

Stop the Merge instance and cleanup any pending tasks.
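
The cleanup pattern is a common asyncio idiom: cancel every pending task, then await them all so the cancellations are actually processed before returning. A minimal sketch with stand-in tasks follows; `stop_all` and `demo` are hypothetical names, and the sleeping tasks merely play the role of pending receive operations.

```python
import asyncio


async def stop_all(pending) -> None:
    """Cancel every task and wait until each one has actually finished."""
    for task in pending:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it.
    await asyncio.gather(*pending, return_exceptions=True)


async def demo() -> list[bool]:
    # Three long-running tasks standing in for pending receive operations.
    pending = {asyncio.create_task(asyncio.sleep(60)) for _ in range(3)}
    await stop_all(pending)
    return [task.cancelled() for task in pending]


cancelled = asyncio.run(demo())
```

Awaiting the cancelled tasks is the part that is easy to forget: `cancel()` only requests cancellation, and a task left un-awaited may still be pending when the event loop shuts down.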

Source code in frequenz/channels/util/_merge.py
async def stop(self) -> None:
    """Stop the `Merge` instance and cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
    await asyncio.gather(*self._pending, return_exceptions=True)
    self._pending = set()

frequenz.channels.util.MergeNamed ¤

Bases: Receiver[Tuple[str, T]]

Merge messages coming from multiple named channels into a single stream.

When MergeNamed is no longer needed, it should be stopped by calling its stop() method. This cleans up any internal pending async tasks.
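
Judging from the source listing, each receiver is passed as a keyword argument, and that keyword serves both as the asyncio task name and as the first element of every (name, value) tuple. The sketch below isolates this tagging step with plain asyncio tasks. `produce` and the keys "temperature" and "humidity" are hypothetical, and a single round of waiting stands in for the full receive loop.

```python
import asyncio


async def produce(value: float) -> float:
    """Hypothetical source producing a single value."""
    await asyncio.sleep(0)
    return value


async def demo() -> list[tuple[str, float]]:
    sources = {"temperature": produce(21.5), "humidity": produce(40.0)}
    # Name each task after its keyword so the origin survives completion.
    tasks = {
        asyncio.create_task(source, name=name) for name, source in sources.items()
    }
    done, _ = await asyncio.wait(tasks)
    # Tag every result with the name of the task that produced it.
    return sorted((task.get_name(), task.result()) for task in done)


tagged = asyncio.run(demo())
```

Carrying the name on the task avoids a separate lookup table from task to origin, at the cost of restricting names to strings.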

Source code in frequenz/channels/util/_merge_named.py
class MergeNamed(Receiver[Tuple[str, T]]):
    """Merge messages coming from multiple named channels into a single stream.

    When `MergeNamed` is no longer needed, it should be stopped by calling its
    `stop()` method. This cleans up any internal pending async tasks.
    """

    def __init__(self, **kwargs: Receiver[T]) -> None:
        """Create a `MergeNamed` instance.

        Args:
            **kwargs: channel receivers, each passed under the name that tags its messages.
        """
        self._receivers = kwargs
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.__anext__(), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            if not task.done() and task.get_loop().is_running():
                task.cancel()

    async def stop(self) -> None:
        """Stop the `MergeNamed` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if len(self._results) > 0:
                return True

            # if there are no more pending receivers, we return immediately.
            if len(self._pending) == 0:
                return False

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                result = item.result()
                self._results.append((name, result))
                self._pending.add(
                    # pylint: disable=unnecessary-dunder-call
                    asyncio.create_task(self._receivers[name].__anext__(), name=name)
                )

    def consume(self) -> Tuple[str, T]:
        """Return the latest value once `ready` is complete.

        Returns:
            The next key, value that was received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        if not self._results and not self._pending:
            raise ReceiverStoppedError(self)

        assert self._results, "`consume()` must be preceded by a call to `ready()`"

        return self._results.popleft()
Functions¤
__del__() ¤

Clean up any pending tasks.

Source code in frequenz/channels/util/_merge_named.py
def __del__(self) -> None:
    """Clean up any pending tasks."""
    for task in self._pending:
        if not task.done() and task.get_loop().is_running():
            task.cancel()
__init__(**kwargs) ¤

Create a MergeNamed instance.

PARAMETER DESCRIPTION
**kwargs

The receivers to merge, passed as keyword arguments. Each keyword is used as the name of its receiver.

TYPE: Receiver[T] DEFAULT: {}

Source code in frequenz/channels/util/_merge_named.py
def __init__(self, **kwargs: Receiver[T]) -> None:
    """Create a `MergeNamed` instance.

    Args:
        **kwargs: The receivers to merge, passed as keyword arguments. Each
            keyword is used as the name of its receiver.
    """
    self._receivers = kwargs
    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(recv.__anext__(), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))
consume() ¤

Return the latest value once ready is complete.

RETURNS DESCRIPTION
Tuple[str, T]

The next key, value that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in frequenz/channels/util/_merge_named.py
def consume(self) -> Tuple[str, T]:
    """Return the latest value once `ready` is complete.

    Returns:
        The next key, value that was received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    if not self._results and not self._pending:
        raise ReceiverStoppedError(self)

    assert self._results, "`consume()` must be preceded by a call to `ready()`"

    return self._results.popleft()
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/util/_merge_named.py
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        # if there are messages waiting to be consumed, return immediately.
        if len(self._results) > 0:
            return True

        # if there are no more pending receivers, we return immediately.
        if len(self._pending) == 0:
            return False

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            # if channel is closed, don't add a task for it again.
            if isinstance(item.exception(), StopAsyncIteration):
                continue
            result = item.result()
            self._results.append((name, result))
            self._pending.add(
                # pylint: disable=unnecessary-dunder-call
                asyncio.create_task(self._receivers[name].__anext__(), name=name)
            )
stop() async ¤

Stop the MergeNamed instance and clean up any pending tasks.

Source code in frequenz/channels/util/_merge_named.py
async def stop(self) -> None:
    """Stop the `MergeNamed` instance and clean up any pending tasks."""
    for task in self._pending:
        task.cancel()
    await asyncio.gather(*self._pending, return_exceptions=True)
    self._pending = set()

frequenz.channels.util.MissedTickPolicy ¤

Bases: ABC

A policy to handle timer missed ticks.

This is only relevant if the timer is not ready to trigger when it should (an interval passed) which can happen if the event loop is busy processing other tasks.

Source code in frequenz/channels/util/_timer.py
class MissedTickPolicy(abc.ABC):
    """A policy to handle timer missed ticks.

    This is only relevant if the timer is not ready to trigger when it should
    (an interval passed) which can happen if the event loop is busy processing
    other tasks.
    """

    @abc.abstractmethod
    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        """Calculate the next tick time according to `missed_tick_policy`.

        This method is called by `ready()` after it has determined that the
        timer has triggered.  It will check if the timer has missed any ticks
        and handle them according to `missed_tick_policy`.

        Args:
            interval: The interval between ticks (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            now: The current loop time (in microseconds).

        Returns:
            The next tick time (in microseconds) according to
                `missed_tick_policy`.
        """
        return 0  # dummy value to avoid darglint warnings

    def __repr__(self) -> str:
        """Return a string representation of the instance.

        Returns:
            The string representation of the instance.
        """
        return f"{type(self).__name__}()"
Functions¤
__repr__() ¤

Return a string representation of the instance.

RETURNS DESCRIPTION
str

The string representation of the instance.

Source code in frequenz/channels/util/_timer.py
def __repr__(self) -> str:
    """Return a string representation of the instance.

    Returns:
        The string representation of the instance.
    """
    return f"{type(self).__name__}()"
calculate_next_tick_time(*, interval, scheduled_tick_time, now) abstractmethod ¤

Calculate the next tick time according to missed_tick_policy.

This method is called by ready() after it has determined that the timer has triggered. It will check if the timer has missed any ticks and handle them according to missed_tick_policy.

PARAMETER DESCRIPTION
interval

The interval between ticks (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

now

The current loop time (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds) according to missed_tick_policy.
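
As an illustration of this contract, the sketch below defines a custom policy against a local stand-in for the abstract base class, so that it runs without the library installed. `KeepOriginalSchedule` is a hypothetical name introduced here, and the stand-in only mirrors the keyword-only signature documented above. All times are integers in microseconds.

```python
import abc


class MissedTickPolicy(abc.ABC):
    """Local stand-in mirroring the abstract interface documented above."""

    @abc.abstractmethod
    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        ...


class KeepOriginalSchedule(MissedTickPolicy):
    """Hypothetical policy: always schedule one interval after the scheduled time.

    If the timer is late by more than one interval, the returned time is
    already in the past, so the next tick would be due immediately.
    """

    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        return scheduled_tick_time + interval


policy = KeepOriginalSchedule()
next_tick = policy.calculate_next_tick_time(
    interval=1_000_000,             # 1 second
    scheduled_tick_time=3_000_000,  # the tick was due at 3.0 s
    now=4_300_000,                  # but it was handled at 4.3 s
)
```

Because the method is abstract, the base class itself cannot be instantiated; only subclasses that implement `calculate_next_tick_time()` can be used as a policy.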

Source code in frequenz/channels/util/_timer.py
@abc.abstractmethod
def calculate_next_tick_time(
    self, *, interval: int, scheduled_tick_time: int, now: int
) -> int:
    """Calculate the next tick time according to `missed_tick_policy`.

    This method is called by `ready()` after it has determined that the
    timer has triggered.  It will check if the timer has missed any ticks
    and handle them according to `missed_tick_policy`.

    Args:
        interval: The interval between ticks (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        now: The current loop time (in microseconds).

    Returns:
        The next tick time (in microseconds) according to
            `missed_tick_policy`.
    """
    return 0  # dummy value to avoid darglint warnings

frequenz.channels.util.SelectError ¤

Bases: BaseException

A base exception for select.

This exception is raised when a select() iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling ready() for example). It is raised as a group exception (SelectErrorGroup) when a select loop is cleaning up after it's done.
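
One practical consequence of the `BaseException` base class is that a plain `except Exception:` handler does not catch this error. The sketch below demonstrates that with a local stand-in class that only copies the base class shown above; `run_loop` is a hypothetical function, and nothing here exercises the real `select()`.

```python
class SelectError(BaseException):
    """Local stand-in with the same base class as the documented exception."""


def run_loop() -> str:
    """Raise the stand-in error and report which handler caught it."""
    try:
        try:
            raise SelectError("receiver failed")
        except Exception:
            # Not reached: SelectError does not derive from Exception.
            return "caught by except Exception"
    except SelectError:
        return "caught by except SelectError"


outcome = run_loop()
```

Code that wants to handle a failing select loop therefore needs to name `SelectError` (or `BaseException`) explicitly.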

Source code in frequenz/channels/util/_select.py
class SelectError(BaseException):
    """A base exception for [`select`][frequenz.channels.util.select].

    This exception is raised when a `select()` iteration fails.  It is raised as
    a single exception when one receiver fails during normal operation (while calling
    `ready()` for example).  It is raised as a group exception
    ([`SelectErrorGroup`][frequenz.channels.util.SelectErrorGroup]) when a `select` loop
    is cleaning up after it's done.
    """

frequenz.channels.util.SelectErrorGroup ¤

Bases: BaseExceptionGroup[BaseException], SelectError

An exception group for the select() operation.

This exception group is raised when a select() loop fails while cleaning up the running tasks that check for ready receivers.

Source code in frequenz/channels/util/_select.py
class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError):
    """An exception group for the [`select()`][frequenz.channels.util.select] operation.

    This exception group is raised when a `select()` loop fails while cleaning up
    the running tasks that check for ready receivers.
    """

frequenz.channels.util.Selected ¤

Bases: Generic[_T]

A result of a select iteration.

The selected receiver is consumed immediately and the received value is stored in the instance, unless there was an exception while receiving the value, in which case the exception is stored instead.

Selected instances should be used in conjunction with the selected_from() function to determine which receiver was selected.

Please see select for an example.
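
The "consume immediately, store the value or the exception" behaviour described above can be sketched in isolation. Everything in this block is a local stand-in written for illustration (`FakeReceiver`, `SelectedSketch`, and a local `ReceiverStoppedError`); it follows the source listing for this class in spirit but is not the library code.

```python
from typing import Any, List, Optional


class ReceiverStoppedError(Exception):
    """Local stand-in for the library exception of the same name."""


class FakeReceiver:
    """Hypothetical receiver that yields preloaded values, then stops."""

    def __init__(self, values: List[Any]) -> None:
        self._values = list(values)

    def consume(self) -> Any:
        if not self._values:
            raise ReceiverStoppedError("receiver stopped")
        return self._values.pop(0)


class SelectedSketch:
    """Minimal sketch of a selection result."""

    _EMPTY = object()  # sentinel, because a received value may itself be None

    def __init__(self, receiver: FakeReceiver) -> None:
        self._value: Any = self._EMPTY
        self._exception: Optional[Exception] = None
        try:
            # The receiver is consumed right away, at construction time.
            self._value = receiver.consume()
        except Exception as exc:
            self._exception = exc

    @property
    def value(self) -> Any:
        # Re-raise the stored exception instead of returning a value.
        if self._exception is not None:
            raise self._exception
        return self._value

    def was_stopped(self) -> bool:
        return isinstance(self._exception, ReceiverStoppedError)


receiver = FakeReceiver([42])
first = SelectedSketch(receiver)   # consumes 42
second = SelectedSketch(receiver)  # nothing left: stores the error
```

With this design the error surfaces only when `value` is read, which lets calling code check `was_stopped()` first and decide how to react.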

Source code in frequenz/channels/util/_select.py
class Selected(Generic[_T]):
    """A result of a [`select`][frequenz.channels.util.select] iteration.

    The selected receiver is consumed immediately and the received value is stored in
    the instance, unless there was an exception while receiving the value, in which case
    the exception is stored instead.

    `Selected` instances should be used in conjunction with the
    [`selected_from()`][frequenz.channels.util.selected_from] function to determine
    which receiver was selected.

    Please see [`select`][frequenz.channels.util.select] for an example.
    """

    class _EmptyResult:
        """A sentinel value to distinguish between None and empty result.

        We need a sentinel because a result can also be `None`.
        """

        def __repr__(self) -> str:
            return "<empty>"

    def __init__(self, receiver: Receiver[_T]) -> None:
        """Create a new instance.

        The receiver is consumed immediately when creating the instance and the received
        value is stored in the instance for later use as
        [`value`][frequenz.channels.util.Selected.value].  If there was an exception
        while receiving the value, then the exception is stored in the instance instead
        (as [`exception`][frequenz.channels.util.Selected.exception]).

        Args:
            receiver: The receiver that was selected.
        """
        self._recv: Receiver[_T] = receiver
        """The receiver that was selected."""

        self._value: _T | Selected._EmptyResult = Selected._EmptyResult()
        """The value that was received.

        If there was an exception while receiving the value, then this will remain
        an `_EmptyResult` sentinel.
        """
        self._exception: Exception | None = None
        """The exception that was raised while receiving the value (if any)."""

        try:
            self._value = receiver.consume()
        except Exception as exc:  # pylint: disable=broad-except
            self._exception = exc

        self._handled: bool = False
        """Flag to indicate if this selected has been handled in the if-chain."""

    @property
    def value(self) -> _T:
        """The value that was received, if any.

        Returns:
            The value that was received.

        Raises:
            Exception: If there was an exception while receiving the value. Normally
                this should be a [`frequenz.channels.Error`][frequenz.channels.Error]
                instance, but all exceptions are caught in case some receivers raise
                anything else.

        # noqa: DAR401 _exception
        """
        if self._exception is not None:
            raise self._exception
        assert not isinstance(self._value, Selected._EmptyResult)
        return self._value

    @property
    def exception(self) -> Exception | None:
        """The exception that was raised while receiving the value (if any).

        Returns:
            The exception that was raised while receiving the value (if any).
        """
        return self._exception

    def was_stopped(self) -> bool:
        """Check if the selected receiver was stopped.

        Check if the selected receiver raised
        a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] while
        consuming a value.

        Returns:
            Whether the receiver was stopped.
        """
        return isinstance(self._exception, ReceiverStoppedError)

    def __str__(self) -> str:
        """Return a string representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return (
            f"{type(self).__name__}({self._recv}) -> "
            f"{self._exception or self._value})"
        )

    def __repr__(self) -> str:
        """Return the internal representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return (
            f"{type(self).__name__}({self._recv=}, {self._value=}, "
            f"{self._exception=}, {self._handled=})"
        )
Attributes¤
exception: Exception | None property ¤

The exception that was raised while receiving the value (if any).

RETURNS DESCRIPTION
Exception | None

The exception that was raised while receiving the value (if any).

value: _T property ¤

The value that was received, if any.

RETURNS DESCRIPTION
_T

The value that was received.

RAISES DESCRIPTION
Exception

If there was an exception while receiving the value. Normally this should be a frequenz.channels.Error instance, but all exceptions are caught in case some receivers raise anything else.

Functions¤
__init__(receiver) ¤

Create a new instance.

The receiver is consumed immediately when creating the instance and the received value is stored in the instance for later use as value. If there was an exception while receiving the value, then the exception is stored in the instance instead (as exception).

PARAMETER DESCRIPTION
receiver

The receiver that was selected.

TYPE: Receiver[_T]

Source code in frequenz/channels/util/_select.py
def __init__(self, receiver: Receiver[_T]) -> None:
    """Create a new instance.

    The receiver is consumed immediately when creating the instance and the received
    value is stored in the instance for later use as
    [`value`][frequenz.channels.util.Selected.value].  If there was an exception
    while receiving the value, then the exception is stored in the instance instead
    (as [`exception`][frequenz.channels.util.Selected.exception]).

    Args:
        receiver: The receiver that was selected.
    """
    self._recv: Receiver[_T] = receiver
    """The receiver that was selected."""

    self._value: _T | Selected._EmptyResult = Selected._EmptyResult()
    """The value that was received.

    If there was an exception while receiving the value, then this will remain
    an `_EmptyResult` sentinel.
    """
    self._exception: Exception | None = None
    """The exception that was raised while receiving the value (if any)."""

    try:
        self._value = receiver.consume()
    except Exception as exc:  # pylint: disable=broad-except
        self._exception = exc

    self._handled: bool = False
    """Flag to indicate if this selected has been handled in the if-chain."""
__repr__() ¤

Return the internal representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/channels/util/_select.py
def __repr__(self) -> str:
    """Return the internal representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return (
        f"{type(self).__name__}({self._recv=}, {self._value=}, "
        f"{self._exception=}, {self._handled=})"
    )
__str__() ¤

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in frequenz/channels/util/_select.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return (
        f"{type(self).__name__}({self._recv}) -> "
        f"{self._exception or self._value})"
    )
was_stopped() ¤

Check if the selected receiver was stopped.

Check if the selected receiver raised a ReceiverStoppedError while consuming a value.

RETURNS DESCRIPTION
bool

Whether the receiver was stopped.

Source code in frequenz/channels/util/_select.py
def was_stopped(self) -> bool:
    """Check if the selected receiver was stopped.

    Check if the selected receiver raised
    a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] while
    consuming a value.

    Returns:
        Whether the receiver was stopped.
    """
    return isinstance(self._exception, ReceiverStoppedError)

frequenz.channels.util.SkipMissedAndDrift ¤

Bases: MissedTickPolicy

A policy that drops all the missed ticks, triggers immediately and resets.

This will behave effectively as if the timer was reset() at the time it had triggered last, so the start time will change (and the drift will be accumulated each time a tick is delayed, but only the relative drift will be returned on each tick).

The reset happens only if the delay is larger than delay_tolerance, so it is possible to ignore small delays and not drift in those cases.

Example

Assume a timer with an interval of 1 second and delay_tolerance=0.1. The first tick, T0, happens exactly at time 0. The second tick, T1, happens at time 1.2 (0.2 seconds late), so the timer triggers immediately but drifts a bit. The next tick, T2.2, happens at 2.3 seconds (0.1 seconds late), so it also triggers immediately, but it doesn't drift because the delay does not exceed the delay_tolerance. The next tick, T3.2, triggers at 4.3 seconds (1.1 seconds late), so it also triggers immediately, but the timer drifts by 1.1 seconds and the tick T4.2 is skipped (not triggered). The next tick, T5.3, triggers at 5.3 seconds, so it is right on time (no drift), and the same happens for tick T6.3, which triggers at 6.3 seconds.

0         1         2         3         4         5         6
o---------|-o-------|--o------|---------|--o------|--o------|--o--> time
T0          T1         T2.2                T3.2      T5.3      T6.3
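
The timeline above can be reproduced numerically. The sketch below re-implements the scheduling rule from the `calculate_next_tick_time()` listing for this class as a free function and replays the trigger times of the example. `next_tick_drift`, `INTERVAL`, `TOLERANCE`, and `trigger_times` are names introduced for this illustration only, and all times are integers in microseconds.

```python
def next_tick_drift(
    *, now: int, scheduled_tick_time: int, interval: int, tolerance: int
) -> int:
    """Return the next tick time, drifting only if the delay exceeds the tolerance."""
    drift = now - scheduled_tick_time
    if drift > tolerance:
        return now + interval  # restart counting from the actual trigger time
    return scheduled_tick_time + interval  # small delay: keep the old schedule


INTERVAL = 1_000_000   # 1 second
TOLERANCE = 100_000    # delay_tolerance of 0.1 seconds

# Times at which the timer actually triggered in the example: T0, T1, T2.2,
# T3.2 and T5.3.
trigger_times = [0, 1_200_000, 2_300_000, 4_300_000, 5_300_000]

scheduled = 0  # T0 is scheduled at time 0
schedule = []
for now in trigger_times:
    scheduled = next_tick_drift(
        now=now,
        scheduled_tick_time=scheduled,
        interval=INTERVAL,
        tolerance=TOLERANCE,
    )
    schedule.append(scheduled)
```

After the loop, `schedule` holds the time computed for each following tick: 1.0 s, 2.2 s, 3.2 s, 5.3 s, and 6.3 s. The tick at 4.2 s never appears, which matches the skipped `T4.2` in the diagram.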
Source code in frequenz/channels/util/_timer.py
class SkipMissedAndDrift(MissedTickPolicy):
    """A policy that drops all the missed ticks, triggers immediately and resets.

    This will behave effectively as if the timer was `reset()` at the time it
    had triggered last, so the start time will change (and the drift will be
    accumulated each time a tick is delayed, but only the relative drift will
    be returned on each tick).

    The reset happens only if the delay is larger than `delay_tolerance`, so
    it is possible to ignore small delays and not drift in those cases.

    Example:
        Assume a timer with interval 1 second and `delay_tolerance=0.1`, the
        first tick, `T0`, happens exactly at time 0, the second tick, `T1`,
        happens at time 1.2 (0.2 seconds late), so the timer triggers
        immediately but drifts a bit. The next tick, `T2.2`, happens at 2.3 seconds
        (0.1 seconds late), so it also triggers immediately but it doesn't
        drift because the delay is under the `delay_tolerance`. The next tick,
        `T3.2`, triggers at 4.3 seconds (1.1 seconds late), so it also triggers
        immediately but the timer drifts by 1.1 seconds and the tick `T4.2` is
        skipped (not triggered). The next tick, `T5.3`, triggers at 5.3 seconds
        so is right on time (no drift) and the same happens for tick `T6.3`,
        which triggers at 6.3 seconds.

        ```
        0         1         2         3         4         5         6
        o---------|-o-------|--o------|---------|--o------|--o------|--o--> time
        T0          T1         T2.2                T3.2      T5.3      T6.3
        ```
    """

    def __init__(self, *, delay_tolerance: timedelta = timedelta(0)):
        """
        Create an instance.

        See the class documentation for more details.

        Args:
            delay_tolerance: The maximum delay that is tolerated before
                starting to drift.  If a tick is delayed less than this, then
                it is not considered a missed tick and the timer doesn't
                accumulate this drift.

        Raises:
            ValueError: If `delay_tolerance` is negative.
        """
        self._tolerance: int = _to_microseconds(delay_tolerance)
        """The maximum allowed delay before starting to drift."""

        if self._tolerance < 0:
            raise ValueError("delay_tolerance must be non-negative")

    @property
    def delay_tolerance(self) -> timedelta:
        """Return the maximum delay that is tolerated before starting to drift.

        Returns:
            The maximum delay that is tolerated before starting to drift.
        """
        return timedelta(microseconds=self._tolerance)

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Calculate the next tick time.

        If the drift is larger than `delay_tolerance`, then it returns `now +
        interval` (so the timer drifts), otherwise it returns
        `scheduled_tick_time + interval` (we consider the delay too small and
        avoid small drifts).

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        drift = now - scheduled_tick_time
        if drift > self._tolerance:
            return now + interval
        return scheduled_tick_time + interval

    def __str__(self) -> str:
        """Return a string representation of the instance.

        Returns:
            The string representation of the instance.
        """
        return f"{type(self).__name__}({self.delay_tolerance})"

    def __repr__(self) -> str:
        """Return a string representation of the instance.

        Returns:
            The string representation of the instance.
        """
        return f"{type(self).__name__}({self.delay_tolerance=})"
Attributes¤
delay_tolerance: timedelta property ¤

Return the maximum delay that is tolerated before starting to drift.

RETURNS DESCRIPTION
timedelta

The maximum delay that is tolerated before starting to drift.

Functions¤
__init__(*, delay_tolerance=timedelta(0)) ¤

Create an instance.

See the class documentation for more details.

PARAMETER DESCRIPTION
delay_tolerance

The maximum delay that is tolerated before starting to drift. If a tick is delayed less than this, then it is not considered a missed tick and the timer doesn't accumulate this drift.

TYPE: timedelta DEFAULT: timedelta(0)

RAISES DESCRIPTION
ValueError

If delay_tolerance is negative.

Source code in frequenz/channels/util/_timer.py
def __init__(self, *, delay_tolerance: timedelta = timedelta(0)):
    """
    Create an instance.

    See the class documentation for more details.

    Args:
        delay_tolerance: The maximum delay that is tolerated before
            starting to drift.  If a tick is delayed less than this, then
            it is not considered a missed tick and the timer doesn't
            accumulate this drift.

    Raises:
        ValueError: If `delay_tolerance` is negative.
    """
    self._tolerance: int = _to_microseconds(delay_tolerance)
    """The maximum allowed delay before starting to drift."""

    if self._tolerance < 0:
        raise ValueError("delay_tolerance must be non-negative")
__repr__() ¤

Return a string representation of the instance.

RETURNS DESCRIPTION
str

The string representation of the instance.

Source code in frequenz/channels/util/_timer.py
def __repr__(self) -> str:
    """Return a string representation of the instance.

    Returns:
        The string representation of the instance.
    """
    return f"{type(self).__name__}({self.delay_tolerance=})"
__str__() ¤

Return a string representation of the instance.

RETURNS DESCRIPTION
str

The string representation of the instance.

Source code in frequenz/channels/util/_timer.py
def __str__(self) -> str:
    """Return a string representation of the instance.

    Returns:
        The string representation of the instance.
    """
    return f"{type(self).__name__}({self.delay_tolerance})"
calculate_next_tick_time(*, now, scheduled_tick_time, interval) ¤

Calculate the next tick time.

If the drift is larger than delay_tolerance, then it returns now + interval (so the timer drifts), otherwise it returns scheduled_tick_time + interval (we consider the delay too small and avoid small drifts).

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in frequenz/channels/util/_timer.py
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Calculate the next tick time.

    If the drift is larger than `delay_tolerance`, then it returns `now +
    interval` (so the timer drifts), otherwise it returns
    `scheduled_tick_time + interval` (we consider the delay too small and
    avoid small drifts).

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    drift = now - scheduled_tick_time
    if drift > self._tolerance:
        return now + interval
    return scheduled_tick_time + interval

frequenz.channels.util.SkipMissedAndResync ¤

Bases: MissedTickPolicy

A policy that drops all the missed ticks, triggers immediately and resyncs.

If ticks are missed, the timer will trigger immediately, returning the drift, and it will schedule to trigger again on the next multiple of interval, effectively skipping any missed ticks, but resyncing with the original start time.

Example

Assume a timer with an interval of 1 second. The tick T0 happens exactly at time 0. The second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately, but the fifth tick, T4, which was also already delayed (by 0.3 seconds), is skipped. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again. The seventh tick, T6, happens at 6.0, right on time.

0         1         2         3         4  o      5         6
o---------|-o-------|--o------|---------|--o------|o--------o-----> time
T0          T1         T2                  T3      T5       T6
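
The same kind of replay works for this policy. The sketch below copies the arithmetic from the `calculate_next_tick_time()` listing for this class into a free function and feeds it the trigger times of the example. `next_tick_resync`, `INTERVAL`, and `trigger_times` are names introduced for this illustration only, and all times are integers in microseconds.

```python
def next_tick_resync(*, now: int, scheduled_tick_time: int, interval: int) -> int:
    """Return the next multiple of `interval` (relative to the schedule) after `now`."""
    drift = now - scheduled_tick_time
    delta_to_next_tick = interval - (drift % interval)
    return now + delta_to_next_tick


INTERVAL = 1_000_000  # 1 second

# Times at which the timer actually triggered in the example: T0, T1, T2,
# T3, T5 and T6.
trigger_times = [0, 1_200_000, 2_300_000, 4_300_000, 5_100_000, 6_000_000]

scheduled = 0  # T0 is scheduled at time 0
schedule = []
for now in trigger_times:
    scheduled = next_tick_resync(
        now=now, scheduled_tick_time=scheduled, interval=INTERVAL
    )
    schedule.append(scheduled)
```

Every computed tick time stays on a whole-second boundary, so the timer keeps its alignment with the original start time. The tick at 4.0 s is never scheduled, which corresponds to the skipped `T4`.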
Source code in frequenz/channels/util/_timer.py
class SkipMissedAndResync(MissedTickPolicy):
    """A policy that drops all the missed ticks, triggers immediately and resyncs.

    If ticks are missed, the timer will trigger immediately, returning the drift,
    and it will schedule to trigger again on the next multiple of `interval`,
    effectively skipping any missed ticks, but resyncing with the original start
    time.

    Example:
        Assume a timer with interval 1 second, the tick `T0` happens exactly
        at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds
        late), so it triggers immediately.  The third tick, `T2`, happens at
        time 2.3 (0.3 seconds late), so it also triggers immediately.  The
        fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
        triggers immediately but the fifth tick, `T4`, which was also
        already delayed (by 0.3 seconds) is skipped.  The sixth tick,
        `T5`, happens at 5.1 (0.1 seconds late), so it triggers immediately
        again. The seventh tick, `T6`, happens at 6.0, right on time.

        ```
        0         1         2         3         4  o      5         6
        o---------|-o-------|--o------|---------|--o------|o--------o-----> time
        T0          T1         T2                  T3      T5       T6
        ```
    """

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Calculate the next tick time.

        Calculate the next multiple of `interval` after `scheduled_tick_time`.

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        # We need to resync (align) the next tick time to the current time
        drift = now - scheduled_tick_time
        delta_to_next_tick = interval - (drift % interval)
        return now + delta_to_next_tick
Functions¤
calculate_next_tick_time(*, now, scheduled_tick_time, interval) ¤

Calculate the next tick time.

Calculate the next multiple of interval after scheduled_tick_time.

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in frequenz/channels/util/_timer.py
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Calculate the next tick time.

    Calculate the next multiple of `interval` after `scheduled_tick_time`.

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    # We need to resync (align) the next tick time to the current time
    drift = now - scheduled_tick_time
    delta_to_next_tick = interval - (drift % interval)
    return now + delta_to_next_tick

frequenz.channels.util.Timer ¤

Bases: Receiver[timedelta]

A timer receiver that triggers every interval time.

The timer has microsecond resolution, so the interval must be at least 1 microsecond.

The message it produces is a timedelta containing the drift of the timer, i.e. the difference between when the timer should have triggered and the time when it actually triggered.

This drift will likely never be 0, because if there is a task that is running when it should trigger, the timer will be delayed. In this case the drift will be positive. A negative drift should be technically impossible, as the timer uses asyncio's monotonic loop clock.
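
To make the notion of drift concrete, here is a minimal sketch that uses only `asyncio` and `datetime`, not the `Timer` class itself. The helper `wait_one_tick` is a hypothetical name. It waits for one interval on the loop's monotonic clock and returns how late the wake-up was.

```python
import asyncio
from datetime import timedelta


async def wait_one_tick(interval: timedelta) -> timedelta:
    """Sleep until one `interval` has passed and return the measured drift."""
    loop = asyncio.get_running_loop()
    scheduled = loop.time() + interval.total_seconds()
    # `asyncio.sleep()` can wake up marginally early, so keep sleeping until
    # the monotonic loop clock has actually reached the scheduled time.
    while (remaining := scheduled - loop.time()) > 0:
        await asyncio.sleep(remaining)
    # The clock is monotonic and already past `scheduled`, so this is >= 0.
    return timedelta(seconds=loop.time() - scheduled)


drift = asyncio.run(wait_one_tick(timedelta(milliseconds=10)))
print(f"{drift=}")  # Small, but typically not zero
```

Because the loop only exits once the clock has reached the scheduled time, the measured drift in this sketch cannot be negative.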

If the timer is delayed too much, then the timer will behave according to the missed_tick_policy. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy.
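
The effect of the policy can be illustrated with a small simulation that uses plain functions instead of the policy classes. Everything here is a sketch under assumptions: the function names are hypothetical, `trigger_all_missed` assumes that `TriggerAllMissed` simply keeps the original tick grid, and `skip_missed_and_drift` assumes a tolerance of zero (as used by `Timer.timeout()`). All times are in microseconds.

```python
from collections.abc import Callable


def trigger_all_missed(*, now: int, scheduled_tick_time: int, interval: int) -> int:
    # Assumption: keep the original grid, so missed ticks fire one by one.
    return scheduled_tick_time + interval


def skip_missed_and_drift(*, now: int, scheduled_tick_time: int, interval: int) -> int:
    # Zero tolerance: any late tick restarts the cadence from `now`.
    if now > scheduled_tick_time:
        return now + interval
    return scheduled_tick_time + interval


def skip_missed_and_resync(*, now: int, scheduled_tick_time: int, interval: int) -> int:
    # Jump to the next point of the original grid that lies after `now`.
    drift = now - scheduled_tick_time
    return now + interval - (drift % interval)


def ticks_fired(
    policy: Callable[..., int], *, now: int, first_tick: int, interval: int
) -> tuple[int, int]:
    """Return how many ticks fire at `now` and when the next one is due."""
    fired = 0
    scheduled = first_tick
    while scheduled <= now:
        fired += 1
        scheduled = policy(now=now, scheduled_tick_time=scheduled, interval=interval)
    return fired, scheduled


# A 1 second timer whose first tick (due at 1.0 s) is only handled at 3.3 s.
scenario = {"now": 3_300_000, "first_tick": 1_000_000, "interval": 1_000_000}
results = {
    policy.__name__: ticks_fired(policy, **scenario)
    for policy in (trigger_all_missed, skip_missed_and_drift, skip_missed_and_resync)
}
for name, (fired, next_tick) in results.items():
    print(f"{name}: {fired} tick(s) fired, next tick due at {next_tick}")
```

In this scenario the first policy delivers three ticks back to back, while the other two deliver a single tick and differ only in whether the next tick is aligned to the original grid (4.0 s) or to the late trigger time (4.3 s).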

The timer accepts an optional loop, which will be used to track the time. If loop is None, then the running loop will be used (if there is no running loop most calls will raise a RuntimeError).

Starting the timer can be delayed if necessary by using auto_start=False (for example until we have a running loop). A call to reset(), ready(), receive() or the async iterator interface to await a new message will start the timer.

For the most common cases, specialized constructors are provided: periodic() and timeout().

Periodic timer example
async for drift in Timer.periodic(timedelta(seconds=1.0)):
    print(f"The timer has triggered {drift=}")

But you can also use a select to combine it with other receivers, and even start it (semi) manually:

import logging
from datetime import timedelta

from frequenz.channels import Broadcast
from frequenz.channels.util import Timer, select, selected_from

chan = Broadcast[int]("input-chan")
battery_data = chan.new_receiver()

timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
# Do some other initialization, the timer will start automatically if
# a message is awaited (or manually via `reset()`).
async for selected in select(battery_data, timer):
    if selected_from(selected, battery_data):
        if selected.was_closed():
            logging.warning("battery channel closed")
            continue
        battery_soc = selected.value
    elif selected_from(selected, timer):
        # Print some regular battery data
        print(f"Battery is charged at {battery_soc}%")
Timeout example
import logging
from datetime import timedelta

from frequenz.channels import Broadcast
from frequenz.channels.util import Timer, select, selected_from

def process_data(data: int):
    logging.info("Processing data: %d", data)

def do_heavy_processing(data: int):
    logging.info("Heavy processing data: %d", data)

timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
chan1 = Broadcast[int]("input-chan-1")
chan2 = Broadcast[int]("input-chan-2")
battery_data = chan1.new_receiver()
heavy_process = chan2.new_receiver()
async for selected in select(battery_data, heavy_process, timer):
    if selected_from(selected, battery_data):
        if selected.was_closed():
            logging.warning("battery channel closed")
            continue
        process_data(selected.value)
        timer.reset()
    elif selected_from(selected, heavy_process):
        if selected.was_closed():
            logging.warning("processing channel closed")
            continue
        do_heavy_processing(selected.value)
    elif selected_from(selected, timer):
        logging.warning("No data received in time")

In this case do_heavy_processing might take 2 seconds. We don't want the timeout timer to trigger for the missed ticks; instead, the next tick should be relative to the time the timer last triggered.

Source code in frequenz/channels/util/_timer.py
class Timer(Receiver[timedelta]):
    """A timer receiver that triggers every `interval` time.

    The timer has microsecond resolution, so the `interval` must be at least
    1 microsecond.

    The message it produces is a `timedelta` containing the drift of the timer,
    i.e. the difference between when the timer should have triggered and the time
    when it actually triggered.

    This drift will likely never be `0`, because if there is a task that is
    running when it should trigger, the timer will be delayed. In this case the
    drift will be positive. A negative drift should be technically impossible,
    as the timer uses `asyncio`'s monotonic loop clock.

    If the timer is delayed too much, then the timer will behave according to
    the `missed_tick_policy`. Missing ticks might or might not trigger
    a message and the drift could be accumulated or not depending on the
    chosen policy.

    The timer accepts an optional `loop`, which will be used to track the time.
    If `loop` is `None`, then the running loop will be used (if there is no
    running loop most calls will raise a `RuntimeError`).

    Starting the timer can be delayed if necessary by using `auto_start=False`
    (for example until we have a running loop). A call to `reset()`, `ready()`,
    `receive()` or the async iterator interface to await a new message will
    start the timer.

    For the most common cases, a specialized constructor is provided:

    * [`periodic()`][frequenz.channels.util.Timer.periodic]
    * [`timeout()`][frequenz.channels.util.Timer.timeout]

    Example: Periodic timer example
        ```python
        async for drift in Timer.periodic(timedelta(seconds=1.0)):
            print(f"The timer has triggered {drift=}")
        ```

        But you can also use a [`select`][frequenz.channels.util.select] to combine
        it with other receivers, and even start it (semi) manually:

        ```python
        import logging
        from datetime import timedelta

        from frequenz.channels import Broadcast
        from frequenz.channels.util import Timer, select, selected_from

        chan = Broadcast[int]("input-chan")
        battery_data = chan.new_receiver()

        timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
        # Do some other initialization, the timer will start automatically if
        # a message is awaited (or manually via `reset()`).
        async for selected in select(battery_data, timer):
            if selected_from(selected, battery_data):
                if selected.was_closed():
                    logging.warning("battery channel closed")
                    continue
                battery_soc = selected.value
            elif selected_from(selected, timer):
                # Print some regular battery data
                print(f"Battery is charged at {battery_soc}%")
        ```

    Example: Timeout example
        ```python
        import logging
        from datetime import timedelta

        from frequenz.channels import Broadcast
        from frequenz.channels.util import Timer, select, selected_from

        def process_data(data: int):
            logging.info("Processing data: %d", data)

        def do_heavy_processing(data: int):
            logging.info("Heavy processing data: %d", data)

        timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
        chan1 = Broadcast[int]("input-chan-1")
        chan2 = Broadcast[int]("input-chan-2")
        battery_data = chan1.new_receiver()
        heavy_process = chan2.new_receiver()
        async for selected in select(battery_data, heavy_process, timer):
            if selected_from(selected, battery_data):
                if selected.was_closed():
                    logging.warning("battery channel closed")
                    continue
                process_data(selected.value)
                timer.reset()
            elif selected_from(selected, heavy_process):
                if selected.was_closed():
                    logging.warning("processing channel closed")
                    continue
                do_heavy_processing(selected.value)
            elif selected_from(selected, timer):
                logging.warning("No data received in time")
        ```

        In this case `do_heavy_processing` might take 2 seconds. We don't want
        the timeout timer to trigger for the missed ticks; instead, the next
        tick should be relative to the time the timer last triggered.
    """

    def __init__(
        self,
        interval: timedelta,
        missed_tick_policy: MissedTickPolicy,
        /,
        *,
        auto_start: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Create an instance.

        See the class documentation for details.

        Args:
            interval: The time between timer ticks. Must be at least
                1 microsecond.
            missed_tick_policy: The policy of the timer when it misses a tick.
                Commonly one of `TriggerAllMissed`, `SkipMissedAndResync`, `SkipMissedAndDrift`
                or a custom class deriving from `MissedTickPolicy`. See the
                documentation of each class for more details.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `interval` is not positive or is smaller than 1
                microsecond.
        """
        self._interval: int = _to_microseconds(interval)
        """The time between timer ticks."""

        self._missed_tick_policy: MissedTickPolicy = missed_tick_policy
        """The policy of the timer when it misses a tick.

        See the documentation of `MissedTickPolicy` for details.
        """

        self._loop: asyncio.AbstractEventLoop = (
            loop if loop is not None else asyncio.get_running_loop()
        )
        """The event loop to use to track time."""

        self._stopped: bool = True
        """Whether the timer was requested to stop.

        If this is `False`, then the timer is running.

        If this is `True`, then it is stopped or there is a request to stop it
        or it was not started yet:

        * If `_next_tick_time` is `None`, it means it wasn't started yet (it was
          created with `auto_start=False`).  Any receiving method will start
          it by calling `reset()` in this case.

        * If `_next_tick_time` is not `None`, it means there was a request to
          stop it.  In this case receiving methods will raise
          a `ReceiverStoppedError`.
        """

        self._next_tick_time: int | None = None
        """The absolute (monotonic) time when the timer should trigger.

        If this is `None`, it means the timer didn't start yet, but it should
        be started as soon as it is used.
        """

        self._current_drift: timedelta | None = None
        """The difference between `_next_tick_time` and the triggered time.

        This is calculated by `ready()` but is returned by `consume()`. If
        `None` it means `ready()` wasn't called and `consume()` will assert.
        `consume()` will set it back to `None` to tell `ready()` that it needs
        to wait again.
        """

        if self._interval <= 0:
            raise ValueError(
                "The `interval` must be positive and at least 1 microsecond, "
                f"not {interval} ({self._interval} microseconds)"
            )

        if auto_start:
            self.reset()

    @classmethod
    def timeout(
        cls,
        delay: timedelta,
        /,
        *,
        auto_start: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> Timer:
        """Create a timer useful for tracking timeouts.

        This is basically a shortcut to create a timer with
        `SkipMissedAndDrift(delay_tolerance=timedelta(0))` as the missed tick policy.

        See the class documentation for details.

        Args:
            delay: The time until the timer ticks. Must be at least
                1 microsecond.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Returns:
            The timer instance.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `delay` is not positive or is smaller than 1
                microsecond.
        """
        return Timer(
            delay,
            SkipMissedAndDrift(delay_tolerance=timedelta(0)),
            auto_start=auto_start,
            loop=loop,
        )

    @classmethod
    def periodic(
        cls,
        period: timedelta,
        /,
        *,
        skip_missed_ticks: bool = False,
        auto_start: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> Timer:
        """Create a periodic timer.

        This is basically a shortcut to create a timer with either
        `TriggerAllMissed()` or `SkipMissedAndResync()` as the missed tick policy
        (depending on `skip_missed_ticks`).

        See the class documentation for details.

        Args:
            period: The time between timer ticks. Must be at least
                1 microsecond.
            skip_missed_ticks: Whether to skip missed ticks or trigger them
                all until it catches up.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Returns:
            The timer instance.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `period` is not positive or is smaller than 1
                microsecond.
        """
        missed_tick_policy = (
            SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed()
        )
        return Timer(
            period,
            missed_tick_policy,
            auto_start=auto_start,
            loop=loop,
        )

    @property
    def interval(self) -> timedelta:
        """The interval between timer ticks.

        Returns:
            The interval between timer ticks.
        """
        return timedelta(microseconds=self._interval)

    @property
    def missed_tick_policy(self) -> MissedTickPolicy:
        """The policy of the timer when it misses a tick.

        Returns:
            The policy of the timer when it misses a tick.
        """
        return self._missed_tick_policy

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The event loop used by the timer to track time.

        Returns:
            The event loop used by the timer to track time.
        """
        return self._loop

    @property
    def is_running(self) -> bool:
        """Whether the timer is running.

        This will be `False` if the timer was stopped, or not started yet.

        Returns:
            Whether the timer is running.
        """
        return not self._stopped

    def reset(self) -> None:
        """Reset the timer to start timing from now.

        If the timer was stopped, or not started yet, it will be started.

        This can only be called with a running loop, see the class
        documentation for more details.

        Raises:
            RuntimeError: if it was called without a running loop.
        """
        self._stopped = False
        self._next_tick_time = self._now() + self._interval
        self._current_drift = None

    def stop(self) -> None:
        """Stop the timer.

        Once `stop` has been called, all subsequent calls to `ready()` will
        immediately return False and calls to `consume()` / `receive()` or any
        use of the async iterator interface will raise
        a `ReceiverStoppedError`.

        You can restart the timer with `reset()`.
        """
        self._stopped = True
        # We need to make sure it's not None, otherwise `ready()` will start it
        self._next_tick_time = self._now()

    async def ready(self) -> bool:
        """Wait until the timer `interval` passed.

        Once a call to `ready()` has finished, the resulting tick information
        must be read with a call to `consume()` (`receive()` or iterated over)
        to tell the timer it should wait for the next interval.

        The timer will remain ready (this method will return immediately)
        until it is consumed.

        Returns:
            Whether the timer was started and it is still running.

        Raises:
            RuntimeError: if it was called without a running loop.
        """
        # If there are messages waiting to be consumed, return immediately.
        if self._current_drift is not None:
            return True

        # If `_next_tick_time` is `None`, it means it was created with
        # `auto_start=False` and should be started.
        if self._next_tick_time is None:
            self.reset()
            assert (
                self._next_tick_time is not None
            ), "This should be assigned by reset()"

        # If a stop was explicitly requested, we bail out.
        if self._stopped:
            return False

        now = self._now()
        time_to_next_tick = self._next_tick_time - now

        # If we didn't reach the tick yet, sleep until we do.
        # We need to do this in a loop also reacting to the reset event, as the timer
        # could be reset while we are sleeping, in which case we need to recalculate
        # the time to the next tick and try again.
        while time_to_next_tick > 0:
            await asyncio.sleep(time_to_next_tick / 1_000_000)
            now = self._now()
            time_to_next_tick = self._next_tick_time - now

        # If a stop was explicitly requested during the sleep, we bail out.
        if self._stopped:
            return False

        self._current_drift = timedelta(microseconds=now - self._next_tick_time)
        self._next_tick_time = self._missed_tick_policy.calculate_next_tick_time(
            now=now,
            scheduled_tick_time=self._next_tick_time,
            interval=self._interval,
        )

        return True

    def consume(self) -> timedelta:
        """Return the latest drift once `ready()` is complete.

        Once the timer has triggered (`ready()` is done), this method returns the
        difference between when the timer should have triggered and the time when
        it actually triggered. See the class documentation for more details.

        Returns:
            The difference between when the timer should have triggered and the
                time when it actually did.

        Raises:
            ReceiverStoppedError: if the timer was stopped via `stop()`.
        """
        # If it was stopped and there is no pending result, we raise
        # (if there is a pending result, then we still want to return it first)
        if self._stopped and self._current_drift is None:
            raise ReceiverStoppedError(self)

        assert (
            self._current_drift is not None
        ), "calls to `consume()` must follow a call to `ready()`"
        drift = self._current_drift
        self._current_drift = None
        return drift

    def _now(self) -> int:
        """Return the current monotonic clock time in microseconds.

        Returns:
            The current monotonic clock time in microseconds.
        """
        return _to_microseconds(self._loop.time())

    def __str__(self) -> str:
        """Return a string representation of the timer.

        Returns:
            The string representation of the timer.
        """
        return f"{type(self).__name__}({self.interval})"

    def __repr__(self) -> str:
        """Return a string representation of the timer.

        Returns:
            The string representation of the timer.
        """
        return (
            f"{type(self).__name__}<{self.interval=}, {self.missed_tick_policy=}, "
            f"{self.loop=}, {self.is_running=}>"
        )
Attributes¤
interval: timedelta property ¤

The interval between timer ticks.

RETURNS DESCRIPTION
timedelta

The interval between timer ticks.

is_running: bool property ¤

Whether the timer is running.

This will be False if the timer was stopped, or not started yet.

RETURNS DESCRIPTION
bool

Whether the timer is running.
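
The difference between "not started yet" and "stopped" can be illustrated with a stripped-down state sketch. The class `TimerLifecycle` and its `state` property are hypothetical and only mimic the bookkeeping of the two internal fields described in the class source. Unlike the real timer, it takes `now` as an explicit argument and does no waiting.

```python
class TimerLifecycle:
    """Track only the start/stop state of a timer (no actual waiting)."""

    def __init__(self, *, auto_start: bool = True) -> None:
        self._stopped = True
        self._next_tick_time: int | None = None
        if auto_start:
            self.reset(now=0)

    @property
    def is_running(self) -> bool:
        return not self._stopped

    @property
    def state(self) -> str:
        if not self._stopped:
            return "running"
        # A missing next tick time means the timer was never started.
        return "not started" if self._next_tick_time is None else "stopped"

    def reset(self, *, now: int, interval: int = 1_000_000) -> None:
        self._stopped = False
        self._next_tick_time = now + interval

    def stop(self, *, now: int) -> None:
        self._stopped = True
        # Deliberately not None, to tell "stopped" apart from "not started".
        self._next_tick_time = now


timer = TimerLifecycle(auto_start=False)
states = [timer.state]
timer.reset(now=0)
states.append(timer.state)
timer.stop(now=500_000)
states.append(timer.state)
print(states)  # ['not started', 'running', 'stopped']
```

In both the first and the last state `is_running` is `False`, which matches the description above.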

loop: asyncio.AbstractEventLoop property ¤

The event loop used by the timer to track time.

RETURNS DESCRIPTION
AbstractEventLoop

The event loop used by the timer to track time.

missed_tick_policy: MissedTickPolicy property ¤

The policy of the timer when it misses a tick.

RETURNS DESCRIPTION
MissedTickPolicy

The policy of the timer when it misses a tick.

Functions¤
__init__(interval, missed_tick_policy, /, *, auto_start=True, loop=None) ¤

Create an instance.

See the class documentation for details.

PARAMETER DESCRIPTION
interval

The time between timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

missed_tick_policy

The policy of the timer when it misses a tick. Commonly one of TriggerAllMissed, SkipMissedAndResync, SkipMissedAndDrift or a custom class deriving from MissedTickPolicy. See the documentation of each class for more details.

TYPE: MissedTickPolicy

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: AbstractEventLoop | None DEFAULT: None

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if interval is not positive or is smaller than 1 microsecond.
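
The interval check can be sketched with the standard library alone. The helper name `interval_to_microseconds` is hypothetical (the library uses a private `_to_microseconds` helper whose implementation is not shown here), and the integer division by a one-microsecond `timedelta` is just one possible way to do the conversion.

```python
from datetime import timedelta


def interval_to_microseconds(interval: timedelta) -> int:
    """Convert `interval` to whole microseconds, rejecting non-positive values."""
    microseconds = interval // timedelta(microseconds=1)
    if microseconds <= 0:
        raise ValueError(
            "The `interval` must be positive and at least 1 microsecond, "
            f"not {interval} ({microseconds} microseconds)"
        )
    return microseconds


print(interval_to_microseconds(timedelta(seconds=1.5)))  # 1500000

try:
    interval_to_microseconds(timedelta(0))
except ValueError as error:
    print(f"Rejected: {error}")
```

Since `timedelta` itself has microsecond resolution, the only values this check can reject are zero and negative intervals.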

Source code in frequenz/channels/util/_timer.py
def __init__(
    self,
    interval: timedelta,
    missed_tick_policy: MissedTickPolicy,
    /,
    *,
    auto_start: bool = True,
    loop: asyncio.AbstractEventLoop | None = None,
) -> None:
    """Create an instance.

    See the class documentation for details.

    Args:
        interval: The time between timer ticks. Must be at least
            1 microsecond.
        missed_tick_policy: The policy of the timer when it misses a tick.
            Commonly one of `TriggerAllMissed`, `SkipMissedAndResync`, `SkipMissedAndDrift`
            or a custom class deriving from `MissedTickPolicy`. See the
            documentation of each class for more details.
        auto_start: Whether the timer should be started when the
            instance is created. This can only be `True` if there is
            already a running loop or an explicit `loop` that is running
            was passed.
        loop: The event loop to use to track time. If `None`,
            `asyncio.get_running_loop()` will be used.

    Raises:
        RuntimeError: if it was called without a loop and there is no
            running loop.
        ValueError: if `interval` is not positive or is smaller than 1
            microsecond.
    """
    self._interval: int = _to_microseconds(interval)
    """The time between timer ticks."""

    self._missed_tick_policy: MissedTickPolicy = missed_tick_policy
    """The policy of the timer when it misses a tick.

    See the documentation of `MissedTickPolicy` for details.
    """

    self._loop: asyncio.AbstractEventLoop = (
        loop if loop is not None else asyncio.get_running_loop()
    )
    """The event loop to use to track time."""

    self._stopped: bool = True
    """Whether the timer was requested to stop.

    If this is `False`, then the timer is running.

    If this is `True`, then it is stopped or there is a request to stop it
    or it was not started yet:

    * If `_next_tick_time` is `None`, it means it wasn't started yet (it was
      created with `auto_start=False`).  Any receiving method will start
      it by calling `reset()` in this case.

    * If `_next_tick_time` is not `None`, it means there was a request to
      stop it.  In this case receiving methods will raise
      a `ReceiverStoppedError`.
    """

    self._next_tick_time: int | None = None
    """The absolute (monotonic) time when the timer should trigger.

    If this is `None`, it means the timer didn't start yet, but it should
    be started as soon as it is used.
    """

    self._current_drift: timedelta | None = None
    """The difference between `_next_tick_time` and the triggered time.

    This is calculated by `ready()` but is returned by `consume()`. If
    `None` it means `ready()` wasn't called and `consume()` will assert.
    `consume()` will set it back to `None` to tell `ready()` that it needs
    to wait again.
    """

    if self._interval <= 0:
        raise ValueError(
            "The `interval` must be positive and at least 1 microsecond, "
            f"not {interval} ({self._interval} microseconds)"
        )

    if auto_start:
        self.reset()
__repr__() ¤

Return a string representation of the timer.

RETURNS DESCRIPTION
str

The string representation of the timer.

Source code in frequenz/channels/util/_timer.py
def __repr__(self) -> str:
    """Return a string representation of the timer.

    Returns:
        The string representation of the timer.
    """
    return (
        f"{type(self).__name__}<{self.interval=}, {self.missed_tick_policy=}, "
        f"{self.loop=}, {self.is_running=}>"
    )
__str__() ¤

Return a string representation of the timer.

RETURNS DESCRIPTION
str

The string representation of the timer.

Source code in frequenz/channels/util/_timer.py
def __str__(self) -> str:
    """Return a string representation of the timer.

    Returns:
        The string representation of the timer.
    """
    return f"{type(self).__name__}({self.interval})"
consume() ¤

Return the latest drift once ready() is complete.

Once the timer has triggered (ready() is done), this method returns the difference between when the timer should have triggered and the time when it actually triggered. See the class documentation for more details.

RETURNS DESCRIPTION
timedelta

The difference between when the timer should have triggered and the time when it actually did.

RAISES DESCRIPTION
ReceiverStoppedError

if the timer was stopped via stop().

Source code in frequenz/channels/util/_timer.py
def consume(self) -> timedelta:
    """Return the latest drift once `ready()` is complete.

    Once the timer has triggered (`ready()` is done), this method returns the
    difference between when the timer should have triggered and the time when
    it actually triggered. See the class documentation for more details.

    Returns:
        The difference between when the timer should have triggered and the
            time when it actually did.

    Raises:
        ReceiverStoppedError: if the timer was stopped via `stop()`.
    """
    # If it was stopped and there is no pending result, we raise
    # (if there is a pending result, then we still want to return it first)
    if self._stopped and self._current_drift is None:
        raise ReceiverStoppedError(self)

    assert (
        self._current_drift is not None
    ), "calls to `consume()` must follow a call to `ready()`"
    drift = self._current_drift
    self._current_drift = None
    return drift
periodic(period, /, *, skip_missed_ticks=False, auto_start=True, loop=None) classmethod ¤

Create a periodic timer.

This is basically a shortcut to create a timer with either TriggerAllMissed() or SkipMissedAndResync() as the missed tick policy (depending on skip_missed_ticks).

See the class documentation for details.

PARAMETER DESCRIPTION
period

The time between timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

skip_missed_ticks

Whether to skip missed ticks or trigger them all until it catches up.

TYPE: bool DEFAULT: False

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: AbstractEventLoop | None DEFAULT: None

RETURNS DESCRIPTION
Timer

The timer instance.

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if period is not positive or is smaller than 1 microsecond.
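
The effect of `skip_missed_ticks` can be illustrated with plain integer arithmetic on microsecond timestamps. The sketch below is not the library code: `trigger_all_missed` follows the documented `scheduled_tick_time + interval` rule, while `skip_missed_and_resync` is an assumed, hypothetical way of skipping missed ticks and landing back on the original tick grid.

```python
def trigger_all_missed(*, now: int, scheduled_tick_time: int, interval: int) -> int:
    # Documented rule: every tick must fire, so advance by exactly one interval.
    return scheduled_tick_time + interval


def skip_missed_and_resync(*, now: int, scheduled_tick_time: int, interval: int) -> int:
    # ASSUMPTION (hypothetical helper): skip ticks that are already in the past
    # and schedule the next one on the original grid.
    drift = now - scheduled_tick_time
    return now + (interval - drift % interval)


SECOND = 1_000_000  # all times are in microseconds

# The tick scheduled for t=3s is only processed at t=4.3s.
late = dict(now=4_300_000, scheduled_tick_time=3_000_000, interval=SECOND)

catch_up_next = trigger_all_missed(**late)   # 4_000_000: already due, fires at once
resync_next = skip_missed_and_resync(**late)  # 5_000_000: the tick at 4s is skipped
```

With the first rule the timer fires again immediately until it has caught up; with the second it waits for the next grid point after `now`.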

Source code in frequenz/channels/util/_timer.py
@classmethod
def periodic(
    cls,
    period: timedelta,
    /,
    *,
    skip_missed_ticks: bool = False,
    auto_start: bool = True,
    loop: asyncio.AbstractEventLoop | None = None,
) -> Timer:
    """Create a periodic timer.

    This is basically a shortcut to create a timer with either
    `TriggerAllMissed()` or `SkipMissedAndResync()` as the missed tick policy
    (depending on `skip_missed_ticks`).

    See the class documentation for details.

    Args:
        period: The time between timer ticks. Must be at least
            1 microsecond.
        skip_missed_ticks: Whether to skip missed ticks or trigger them
            all until it catches up.
        auto_start: Whether the timer should be started when the
            instance is created. This can only be `True` if there is
            already a running loop or an explicit `loop` that is running
            was passed.
        loop: The event loop to use to track time. If `None`,
            `asyncio.get_running_loop()` will be used.

    Returns:
        The timer instance.

    Raises:
        RuntimeError: if it was called without a loop and there is no
            running loop.
        ValueError: if `period` is not positive or is smaller than 1
            microsecond.
    """
    missed_tick_policy = (
        SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed()
    )
    return Timer(
        period,
        missed_tick_policy,
        auto_start=auto_start,
        loop=loop,
    )
ready() async ¤

Wait until the timer interval has passed.

Once a call to ready() has finished, the resulting tick information must be read with a call to consume() (or receive(), or by iterating over the timer) to tell the timer it should wait for the next interval.

The timer will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the timer was started and it is still running.

RAISES DESCRIPTION
RuntimeError

if it was called without a running loop.
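
The `ready()` / `consume()` handshake described above can be sketched with a small stand-in class. `MiniTimer` is hypothetical and deliberately simplified (no stop, no reset, no missed tick policy); it only shows that `ready()` stays ready until the drift is consumed.

```python
import asyncio
from datetime import timedelta


class MiniTimer:
    """Toy stand-in for the handshake; not the library's Timer."""

    def __init__(self, interval: timedelta) -> None:
        self._interval = interval // timedelta(microseconds=1)  # microseconds
        self._next_tick_time: int | None = None
        self._current_drift: timedelta | None = None

    def _now(self) -> int:
        # Monotonic loop time, converted to integer microseconds.
        return int(asyncio.get_running_loop().time() * 1_000_000)

    async def ready(self) -> bool:
        if self._current_drift is not None:
            return True  # a tick is pending, return immediately
        if self._next_tick_time is None:
            self._next_tick_time = self._now() + self._interval
        now = self._now()
        while (remaining := self._next_tick_time - now) > 0:
            await asyncio.sleep(remaining / 1_000_000)
            now = self._now()
        self._current_drift = timedelta(microseconds=now - self._next_tick_time)
        self._next_tick_time += self._interval
        return True

    def consume(self) -> timedelta:
        assert self._current_drift is not None, "call ready() first"
        drift, self._current_drift = self._current_drift, None
        return drift


async def main() -> list[timedelta]:
    timer = MiniTimer(timedelta(milliseconds=10))
    drifts = []
    for _ in range(3):
        await timer.ready()
        await timer.ready()  # still ready: no extra wait until consumed
        drifts.append(timer.consume())
    return drifts


drifts = asyncio.run(main())
```

Each consumed drift is non-negative, because a tick is only reported once the loop time has reached the scheduled tick time.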

Source code in frequenz/channels/util/_timer.py
async def ready(self) -> bool:
    """Wait until the timer `interval` has passed.

    Once a call to `ready()` has finished, the resulting tick information
    must be read with a call to `consume()` (or `receive()`, or by iterating
    over the timer) to tell the timer it should wait for the next interval.

    The timer will remain ready (this method will return immediately)
    until it is consumed.

    Returns:
        Whether the timer was started and it is still running.

    Raises:
        RuntimeError: if it was called without a running loop.
    """
    # If there are messages waiting to be consumed, return immediately.
    if self._current_drift is not None:
        return True

    # If `_next_tick_time` is `None`, it means it was created with
    # `auto_start=False` and should be started.
    if self._next_tick_time is None:
        self.reset()
        assert (
            self._next_tick_time is not None
        ), "This should be assigned by reset()"

    # If a stop was explicitly requested, we bail out.
    if self._stopped:
        return False

    now = self._now()
    time_to_next_tick = self._next_tick_time - now

    # If we didn't reach the tick yet, sleep until we do.
    # We need to do this in a loop also reacting to the reset event, as the timer
    # could be reset while we are sleeping, in which case we need to recalculate
    # the time to the next tick and try again.
    while time_to_next_tick > 0:
        await asyncio.sleep(time_to_next_tick / 1_000_000)
        now = self._now()
        time_to_next_tick = self._next_tick_time - now

    # If a stop was explicitly requested during the sleep, we bail out.
    if self._stopped:
        return False

    self._current_drift = timedelta(microseconds=now - self._next_tick_time)
    self._next_tick_time = self._missed_tick_policy.calculate_next_tick_time(
        now=now,
        scheduled_tick_time=self._next_tick_time,
        interval=self._interval,
    )

    return True
reset() ¤

Reset the timer to start timing from now.

If the timer was stopped, or not started yet, it will be started.

This can only be called with a running loop, see the class documentation for more details.

RAISES DESCRIPTION
RuntimeError

if it was called without a running loop.

Source code in frequenz/channels/util/_timer.py
def reset(self) -> None:
    """Reset the timer to start timing from now.

    If the timer was stopped, or not started yet, it will be started.

    This can only be called with a running loop, see the class
    documentation for more details.

    Raises:
        RuntimeError: if it was called without a running loop.
    """
    self._stopped = False
    self._next_tick_time = self._now() + self._interval
    self._current_drift = None
stop() ¤

Stop the timer.

Once stop has been called, all subsequent calls to ready() will immediately return False and calls to consume() / receive() or any use of the async iterator interface will raise a ReceiverStoppedError.

You can restart the timer with reset().

Source code in frequenz/channels/util/_timer.py
def stop(self) -> None:
    """Stop the timer.

    Once `stop` has been called, all subsequent calls to `ready()` will
    immediately return False and calls to `consume()` / `receive()` or any
    use of the async iterator interface will raise
    a `ReceiverStoppedError`.

    You can restart the timer with `reset()`.
    """
    self._stopped = True
    # We need to make sure it's not None, otherwise `ready()` will start it
    self._next_tick_time = self._now()
timeout(delay, /, *, auto_start=True, loop=None) classmethod ¤

Create a timer useful for tracking timeouts.

This is basically a shortcut to create a timer with SkipMissedAndDrift(delay_tolerance=timedelta(0)) as the missed tick policy.

See the class documentation for details.

PARAMETER DESCRIPTION
delay

The time until the timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: AbstractEventLoop | None DEFAULT: None

RETURNS DESCRIPTION
Timer

The timer instance.

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if delay is not positive or is smaller than 1 microsecond.
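
Why a drifting policy suits timeouts can be sketched as follows. The function below is a hypothetical model of a "skip missed and drift" rule with a tolerance, written as an assumption for illustration and not copied from the library: once a tick is later than the tolerance, the next tick is measured from `now` rather than from the original schedule.

```python
def skip_missed_and_drift(
    *, now: int, scheduled_tick_time: int, interval: int, delay_tolerance: int = 0
) -> int:
    # ASSUMPTION (hypothetical helper): all values are in microseconds.
    drift = now - scheduled_tick_time
    if drift > delay_tolerance:
        # Too late: restart the countdown from the current time.
        return now + interval
    # Within tolerance: keep the original schedule.
    return scheduled_tick_time + interval


SECOND = 1_000_000

# Tick scheduled for t=1s, processed 0.2s late with zero tolerance.
late_next = skip_missed_and_drift(
    now=1_200_000, scheduled_tick_time=1_000_000, interval=SECOND
)
# Tick processed exactly on time.
on_time_next = skip_missed_and_drift(
    now=1_000_000, scheduled_tick_time=1_000_000, interval=SECOND
)
```

With a zero tolerance, the full `delay` always elapses between one trigger and the next, which is the property a timeout usually needs.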

Source code in frequenz/channels/util/_timer.py
@classmethod
def timeout(
    cls,
    delay: timedelta,
    /,
    *,
    auto_start: bool = True,
    loop: asyncio.AbstractEventLoop | None = None,
) -> Timer:
    """Create a timer useful for tracking timeouts.

    This is basically a shortcut to create a timer with
    `SkipMissedAndDrift(delay_tolerance=timedelta(0))` as the missed tick policy.

    See the class documentation for details.

    Args:
        delay: The time until the timer ticks. Must be at least
            1 microsecond.
        auto_start: Whether the timer should be started when the
            instance is created. This can only be `True` if there is
            already a running loop or an explicit `loop` that is running
            was passed.
        loop: The event loop to use to track time. If `None`,
            `asyncio.get_running_loop()` will be used.

    Returns:
        The timer instance.

    Raises:
        RuntimeError: if it was called without a loop and there is no
            running loop.
        ValueError: if `delay` is not positive or is smaller than 1
            microsecond.
    """
    return Timer(
        delay,
        SkipMissedAndDrift(delay_tolerance=timedelta(0)),
        auto_start=auto_start,
        loop=loop,
    )

frequenz.channels.util.TriggerAllMissed ¤

Bases: MissedTickPolicy

A policy that triggers all the missed ticks immediately until it catches up.

Example

Assume a timer with an interval of 1 second. The first tick, T0, happens exactly at time 0. The second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately, as does the fifth tick, T4, which was already delayed (by 0.3 seconds), so the timer catches up. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again. The seventh tick, T6, happens at 6.0, right on time.

0         1         2         3         4  o      5         6
o---------|-o-------|--o------|---------|--o------|o--------o-----> time
T0          T1         T2                  T3      T5       T6
                                           T4
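
The timeline above can be reproduced with a small simulation. This is a sketch under stated assumptions: `simulate` and the list of wake-up times are hypothetical, and only the `scheduled_tick_time + interval` rule comes from the policy itself. Times are integer microseconds to avoid floating-point noise.

```python
def trigger_all_missed(*, now: int, scheduled_tick_time: int, interval: int) -> int:
    # The policy's rule: the next tick is always one interval after the
    # previously scheduled one, no matter how late we are.
    return scheduled_tick_time + interval


def simulate(wake_times: list[int], interval: int, first_tick_time: int) -> list[int]:
    """Return the drift (in microseconds) observed at each processed tick."""
    next_tick_time = first_tick_time
    drifts = []
    for now in wake_times:
        drifts.append(now - next_tick_time)
        next_tick_time = trigger_all_missed(
            now=now, scheduled_tick_time=next_tick_time, interval=interval
        )
    return drifts


SECOND = 1_000_000

# Times at which T1..T6 are actually processed; T3 and T4 both fire at 4.3s.
wake_times = [1_200_000, 2_300_000, 4_300_000, 4_300_000, 5_100_000, 6_000_000]
drifts = simulate(wake_times, interval=SECOND, first_tick_time=SECOND)
# drifts == [200_000, 300_000, 1_300_000, 300_000, 100_000, 0]
```

The fourth and fifth entries show the catch-up: T3 is 1.3 seconds late and T4, fired right after it, is 0.3 seconds late.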
Source code in frequenz/channels/util/_timer.py
class TriggerAllMissed(MissedTickPolicy):
    """A policy that triggers all the missed ticks immediately until it catches up.

    Example:
        Assume a timer with an interval of 1 second. The first tick, `T0`,
        happens exactly at time 0. The second tick, `T1`, happens at time 1.2
        (0.2 seconds late), so it triggers immediately. The third tick, `T2`,
        happens at time 2.3 (0.3 seconds late), so it also triggers
        immediately. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds
        late), so it also triggers immediately, as does the fifth tick, `T4`,
        which was already delayed (by 0.3 seconds), so the timer catches up.
        The sixth tick, `T5`, happens at 5.1 (0.1 seconds late), so it
        triggers immediately again. The seventh tick, `T6`, happens at 6.0,
        right on time.

        ```
        0         1         2         3         4  o      5         6
        o---------|-o-------|--o------|---------|--o------|o--------o-----> time
        T0          T1         T2                  T3      T5       T6
                                                   T4
        ```
    """

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Calculate the next tick time.

        This method always returns `scheduled_tick_time + interval`, as all
        ticks need to produce a trigger event.

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        return scheduled_tick_time + interval
Functions¤
calculate_next_tick_time(*, now, scheduled_tick_time, interval) ¤

Calculate the next tick time.

This method always returns scheduled_tick_time + interval, as all ticks need to produce a trigger event.

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in frequenz/channels/util/_timer.py
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Calculate the next tick time.

    This method always returns `scheduled_tick_time + interval`, as all
    ticks need to produce a trigger event.

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    return scheduled_tick_time + interval

frequenz.channels.util.UnhandledSelectedError ¤

Bases: SelectError, Generic[_T]

A receiver was not handled in a select() loop.

This exception is raised when a select() iteration finishes without a call to selected_from() for the selected receiver.

Source code in frequenz/channels/util/_select.py
class UnhandledSelectedError(SelectError, Generic[_T]):
    """A receiver was not handled in a [`select()`][frequenz.channels.util.select] loop.

    This exception is raised when a `select()` iteration finishes without a call to
    [`selected_from()`][frequenz.channels.util.selected_from] for the selected receiver.
    """

    def __init__(self, selected: Selected[_T]) -> None:
        """Create a new instance.

        Args:
            selected: The selected receiver that was not handled.
        """
        recv = selected._recv  # pylint: disable=protected-access
        super().__init__(f"Selected receiver {recv} was not handled in the if-chain")
        self.selected = selected
Functions¤
__init__(selected) ¤

Create a new instance.

PARAMETER DESCRIPTION
selected

The selected receiver that was not handled.

TYPE: Selected[_T]

Source code in frequenz/channels/util/_select.py
def __init__(self, selected: Selected[_T]) -> None:
    """Create a new instance.

    Args:
        selected: The selected receiver that was not handled.
    """
    recv = selected._recv  # pylint: disable=protected-access
    super().__init__(f"Selected receiver {recv} was not handled in the if-chain")
    self.selected = selected

Functions¤

frequenz.channels.util.select(*receivers) async ¤

Iterate over the values of all receivers as they receive new values.

This function is used to iterate over the values of all receivers as they receive new values. It is used in conjunction with the Selected class and the selected_from() function to determine which receiver was selected in a select operation.

An exhaustiveness check is performed at runtime to make sure all selected receivers are handled in the if-chain, so you should call selected_from() with all the receivers passed to select() inside the select loop, even if you plan to ignore a value, to signal select() that you are purposefully ignoring the value.

Note

The select() function is intended to be used in cases where the set of receivers is static and known beforehand. If you need to dynamically add/remove receivers from a select loop, there are a few alternatives. Depending on your use case, one or the other could work better for you:

  • Use Merge or MergeNamed: this is useful when you have an unknown number of receivers of the same type that can be handled as a group.
  • Use tasks to manage each receiver individually: this is better if there are no relationships between the receivers.
  • Break the select() loop and start a new one with the new set of receivers (this should be the last resort, as it has some performance implications because the loop needs to be restarted).
Example
import datetime
from typing import assert_never

from frequenz.channels import ReceiverStoppedError
from frequenz.channels.util import select, selected_from, Timer

timer1 = Timer.periodic(datetime.timedelta(seconds=1))
timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))

async for selected in select(timer1, timer2):
    if selected_from(selected, timer1):
        # Beware: `selected.value` might raise an exception, you can always
        # check for exceptions with `selected.exception` first or use
        # a try-except block. You can also quickly check if the receiver was
        # stopped and let any other unexpected exceptions bubble up.
        if selected.was_stopped:
            print("timer1 was stopped")
            continue
        print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
        timer2.stop()
    elif selected_from(selected, timer2):
        # Explicit handling of exceptions
        match selected.exception:
            case ReceiverStoppedError():
                print("timer2 was stopped")
            case Exception() as exception:
                print(f"timer2: exception={exception}")
            case None:
                # All good, no exception, we can use `selected.value` safely
                print(
                    f"timer2: now={datetime.datetime.now()} drift={selected.value}"
                )
            case _ as unhandled:
                assert_never(unhandled)
    else:
        # This is not necessary, as select() will check for exhaustiveness, but
        # it is good practice to have it in case you forgot to handle a new
        # receiver added to `select()` at a later point in time.
        assert False
PARAMETER DESCRIPTION
*receivers

The receivers to select from.

TYPE: Receiver[Any] DEFAULT: ()

YIELDS DESCRIPTION
AsyncIterator[Selected[Any]]

The currently selected item.

RAISES DESCRIPTION
UnhandledSelectedError

If a selected receiver was not handled in the if-chain.

SelectErrorGroup

If there is an error while finishing the select operation and receivers fail while cleaning up.

SelectError

If there is an error while selecting receivers during normal operation. For example if a receiver raises an exception in the ready() method. Normal errors while receiving values are not raised, but reported via the Selected instance.
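
The core idea of waiting on several sources at once and re-arming each one after it delivers a value can be sketched with the standard library alone. `mini_select` below is a hypothetical, simplified stand-in that works on `asyncio.Queue` objects; it omits the `Selected` wrapper, the exhaustiveness check, and the error handling described above.

```python
import asyncio
from collections.abc import AsyncIterator


async def mini_select(*queues: asyncio.Queue[int]) -> AsyncIterator[tuple[int, int]]:
    """Yield (queue_index, item) pairs as items become available."""
    # One pending task per source, named after the source it belongs to.
    pending = {
        asyncio.create_task(q.get(), name=str(i)) for i, q in enumerate(queues)
    }
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                index = int(task.get_name())
                yield index, task.result()
                # Re-arm the source so it can be selected again.
                pending.add(asyncio.create_task(queues[index].get(), name=str(index)))
    finally:
        # Cancel whatever is still waiting when the loop is abandoned.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


async def main() -> list[tuple[int, int]]:
    a: asyncio.Queue[int] = asyncio.Queue()
    b: asyncio.Queue[int] = asyncio.Queue()
    a.put_nowait(1)
    b.put_nowait(2)
    a.put_nowait(3)
    seen: list[tuple[int, int]] = []
    stream = mini_select(a, b)
    async for index, item in stream:
        seen.append((index, item))
        if len(seen) == 3:
            break
    await stream.aclose()  # run the cleanup in `finally` deterministically
    return sorted(seen)  # arrival order between sources is not guaranteed


received = asyncio.run(main())
```

Sorting the result is only there because the order in which two simultaneously ready sources are reported is not deterministic.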

Source code in frequenz/channels/util/_select.py
async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]:
    """Iterate over the values of all receivers as they receive new values.

    This function is used to iterate over the values of all receivers as they receive
    new values.  It is used in conjunction with the
    [`Selected`][frequenz.channels.util.Selected] class and the
    [`selected_from()`][frequenz.channels.util.selected_from] function to determine
    which receiver was selected in a select operation.

    An exhaustiveness check is performed at runtime to make sure all selected receivers
    are handled in the if-chain, so you should call `selected_from()` with all the
    receivers passed to `select()` inside the select loop, even if you plan to ignore
    a value, to signal `select()` that you are purposefully ignoring the value.

    Note:
        The `select()` function is intended to be used in cases where the set of
        receivers is static and known beforehand.  If you need to dynamically add/remove
        receivers from a select loop, there are a few alternatives.  Depending on your
        use case, one or the other could work better for you:

        * Use [`Merge`][frequenz.channels.util.Merge] or
          [`MergeNamed`][frequenz.channels.util.MergeNamed]: this is useful when you
          have an unknown number of receivers of the same type that can be handled as
          a group.
        * Use tasks to manage each receiver individually: this is better if there are no
          relationships between the receivers.
        * Break the `select()` loop and start a new one with the new set of receivers
          (this should be the last resort, as it has some performance implications
           because the loop needs to be restarted).

    Example:
        ```python
        import datetime
        from typing import assert_never

        from frequenz.channels import ReceiverStoppedError
        from frequenz.channels.util import select, selected_from, Timer

        timer1 = Timer.periodic(datetime.timedelta(seconds=1))
        timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))

        async for selected in select(timer1, timer2):
            if selected_from(selected, timer1):
                # Beware: `selected.value` might raise an exception, you can always
                # check for exceptions with `selected.exception` first or use
                # a try-except block. You can also quickly check if the receiver was
                # stopped and let any other unexpected exceptions bubble up.
                if selected.was_stopped:
                    print("timer1 was stopped")
                    continue
                print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
                timer2.stop()
            elif selected_from(selected, timer2):
                # Explicit handling of exceptions
                match selected.exception:
                    case ReceiverStoppedError():
                        print("timer2 was stopped")
                    case Exception() as exception:
                        print(f"timer2: exception={exception}")
                    case None:
                        # All good, no exception, we can use `selected.value` safely
                        print(
                            f"timer2: now={datetime.datetime.now()} drift={selected.value}"
                        )
                    case _ as unhandled:
                        assert_never(unhandled)
            else:
                # This is not necessary, as select() will check for exhaustiveness, but
                # it is good practice to have it in case you forgot to handle a new
                # receiver added to `select()` at a later point in time.
                assert False
        ```

    Args:
        *receivers: The receivers to select from.

    Yields:
        The currently selected item.

    Raises:
        UnhandledSelectedError: If a selected receiver was not handled in the if-chain.
        SelectErrorGroup: If there is an error while finishing the select operation and
            receivers fail while cleaning up.
        SelectError: If there is an error while selecting receivers during normal
            operation.  For example if a receiver raises an exception in the `ready()`
            method.  Normal errors while receiving values are not raised, but reported
            via the `Selected` instance.
    """
    receivers_map: dict[str, Receiver[Any]] = {str(hash(r)): r for r in receivers}
    pending: set[asyncio.Task[bool]] = set()

    try:
        for name, recv in receivers_map.items():
            pending.add(asyncio.create_task(recv.ready(), name=name))

        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )

            for task in done:
                receiver_active: bool = True
                name = task.get_name()
                recv = receivers_map[name]
                if exception := task.exception():
                    match exception:
                        case asyncio.CancelledError():
                            # If the receiver was cancelled, then it means we want to
                            # exit the select loop, so we handle the receiver but we
                            # don't add it back to the pending list.
                            receiver_active = False
                        case _ as exc:
                            raise SelectError(f"Error while selecting {recv}") from exc

                selected = Selected(recv)
                yield selected
                if not selected._handled:  # pylint: disable=protected-access
                    raise UnhandledSelectedError(selected)

                receiver_active = task.result()
                if not receiver_active:
                    continue

                # Add back the receiver to the pending list
                name = task.get_name()
                recv = receivers_map[name]
                pending.add(asyncio.create_task(recv.ready(), name=name))
    finally:
        await _stop_pending_tasks(pending)

frequenz.channels.util.selected_from(selected, receiver) ¤

Check if the given receiver was selected by select.

This function is used in conjunction with the Selected class to determine which receiver was selected in a select() iteration.

It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.

Please see select for an example.

PARAMETER DESCRIPTION
selected

The result of a select() iteration.

TYPE: Selected[Any]

receiver

The receiver to check if it was the source of a select operation.

TYPE: Receiver[_T]

RETURNS DESCRIPTION
TypeGuard[Selected[_T]]

Whether the given receiver was selected.
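
Two properties of this check are easy to miss: it compares receivers by identity, not equality, and a successful match is what marks the selection as handled for the exhaustiveness check. The toy classes below are hypothetical stand-ins used to illustrate both points; they are not the library's `Selected` or `selected_from()`.

```python
from typing import Any


class ToySelected:
    """Toy stand-in holding the source receiver and a 'handled' flag."""

    def __init__(self, recv: Any) -> None:
        self._recv = recv
        self._handled = False


def toy_selected_from(selected: ToySelected, receiver: Any) -> bool:
    # Identity check: an equal but distinct object does not match.
    if handled := selected._recv is receiver:
        selected._handled = True
    return handled


first, second = [1], [1]  # equal values, different objects
selected = ToySelected(first)

matched_other = toy_selected_from(selected, second)   # False: not the same object
handled_after_miss = selected._handled                # still False
matched_source = toy_selected_from(selected, first)   # True: same object
handled_after_hit = selected._handled                 # now True
```

A loop body that never reaches a matching check leaves the flag unset, which is the condition reported as an unhandled selection.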

Source code in frequenz/channels/util/_select.py
def selected_from(
    selected: Selected[Any], receiver: Receiver[_T]
) -> TypeGuard[Selected[_T]]:
    """Check if the given receiver was selected by [`select`][frequenz.channels.util.select].

    This function is used in conjunction with the
    [`Selected`][frequenz.channels.util.Selected] class to determine which receiver was
    selected in a `select()` iteration.

    It also works as a [type guard][typing.TypeGuard] to narrow the type of the
    `Selected` instance to the type of the receiver.

    Please see [`select`][frequenz.channels.util.select] for an example.

    Args:
        selected: The result of a `select()` iteration.
        receiver: The receiver to check if it was the source of a select operation.

    Returns:
        Whether the given receiver was selected.
    """
    if handled := selected._recv is receiver:  # pylint: disable=protected-access
        selected._handled = True  # pylint: disable=protected-access
    return handled