Skip to content

util

frequenz.channels.util ¤

Channel utilities.

A module with several utilities to work with channels:

  • FileWatcher: A receiver that watches for file events.

  • Merge: A receiver that merges messages coming from multiple receivers into a single stream.

  • MergeNamed: A receiver that merges messages coming from multiple receivers into a single named stream, making it possible to identify the origin of each message.

  • Select: A helper to select the next available message for each receiver in a group of receivers.

  • Timer: A receiver that emits a now timestamp every interval seconds.

Classes¤

frequenz.channels.util.FileWatcher ¤

Bases: Receiver[pathlib.Path]

A channel receiver that watches for file events.

Source code in frequenz/channels/util/_file_watcher.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class FileWatcher(Receiver[pathlib.Path]):
    """A channel receiver that watches for file events.

    Every message is the path of a file for which one of the selected event
    types was reported.
    """

    class EventType(Enum):
        """Available types of changes to watch for."""

        CREATE = Change.added
        MODIFY = Change.modified
        DELETE = Change.deleted

    def __init__(
        self,
        paths: List[Union[pathlib.Path, str]],
        event_types: Optional[Set[EventType]] = None,
    ) -> None:
        """Create a `FileWatcher` instance.

        Args:
            paths: Paths to watch for changes.
            event_types: Types of events to watch for or `None` to watch for
                all event types.
        """
        if event_types is None:
            event_types = set(FileWatcher.EventType)  # all types

        # Bound to a fresh name so the filter closure sees a non-optional set;
        # it is the very same object as `self.event_types`.
        watched_types = event_types

        def _watch_filter(change: Change, path_str: str) -> bool:
            """Accept only the selected event types, skipping directories."""
            if not any(change == kind.value for kind in watched_types):
                return False
            # `is_file()` must not be used here: a deleted path no longer
            # exists, so that check would reject every DELETE event.
            return not pathlib.Path(path_str).is_dir()

        self.event_types = event_types
        self._stop_event = asyncio.Event()
        self._paths = [
            path if isinstance(path, pathlib.Path) else pathlib.Path(path)
            for path in paths
        ]
        self._awatch = awatch(
            *self._paths,
            stop_event=self._stop_event,
            watch_filter=_watch_filter,
        )
        # Set once the `awatch` iterator is exhausted; chained as the cause of
        # the `ReceiverStoppedError` raised by `consume()`.
        self._awatch_stopped_exc: Optional[Exception] = None
        # Changes already fetched from `awatch` but not yet consumed.
        self._changes: Set[FileChange] = set()

    def __del__(self) -> None:
        """Cleanup registered watches.

        The `stop_event` given to `awatch` is handed over to a separate
        task/thread, which keeps running until the event is set. Setting it
        here lets `awatch` shut down properly when the watcher is destroyed.
        """
        self._stop_event.set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # if there are messages waiting to be consumed, return immediately.
        if self._changes:
            return True

        # if it was already stopped, return immediately.
        if self._awatch_stopped_exc is not None:
            return False

        try:
            self._changes = await self._awatch.__anext__()
        except StopAsyncIteration as err:
            # The watcher just stopped: remember why and report the receiver
            # as inactive right away instead of on the following call.
            self._awatch_stopped_exc = err
            return False

        return True

    def consume(self) -> pathlib.Path:
        """Return the latest change once `ready` is complete.

        Returns:
            The next change that was received.

        Raises:
            ReceiverStoppedError: if there is some problem with the receiver.
        """
        if not self._changes and self._awatch_stopped_exc is not None:
            raise ReceiverStoppedError(self) from self._awatch_stopped_exc

        assert self._changes, "`consume()` must be preceeded by a call to `ready()`"
        change = self._changes.pop()
        # Tuple of (Change, path) returned by watchfiles
        _, path_str = change
        return pathlib.Path(path_str)
Classes¤
EventType ¤

Bases: Enum

Available types of changes to watch for.

Source code in frequenz/channels/util/_file_watcher.py
20
21
22
23
24
25
class EventType(Enum):
    """Available types of changes to watch for.

    Each member's value is the matching member of the `Change` enumeration,
    so a reported change can be compared directly against `member.value`.
    """

    CREATE = Change.added  # a path was added
    MODIFY = Change.modified  # a path was modified
    DELETE = Change.deleted  # a path was deleted
Functions¤
__del__() ¤

Cleanup registered watches.

awatch passes the stop_event to a separate task/thread, which keeps running until the event is set. Setting the event here lets awatch shut down properly when the watcher is destroyed.

Source code in frequenz/channels/util/_file_watcher.py
59
60
61
62
63
64
65
66
def __del__(self) -> None:
    """Cleanup registered watches.

    `awatch` passes the `stop_event` to a separate task/thread, which keeps
    running until the event is set. Setting the event here lets `awatch`
    shut down properly when the watcher is destroyed.
    """
    self._stop_event.set()
__init__(paths, event_types=None) ¤

Create a FileWatcher instance.

PARAMETER DESCRIPTION
paths

Paths to watch for changes.

TYPE: List[Union[pathlib.Path, str]]

event_types

Types of events to watch for or None to watch for all event types.

TYPE: Optional[Set[EventType]] DEFAULT: None

Source code in frequenz/channels/util/_file_watcher.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def __init__(
    self,
    paths: List[Union[pathlib.Path, str]],
    event_types: Optional[Set[EventType]] = None,
) -> None:
    """Create a `FileWatcher` instance.

    Args:
        paths: Paths to watch for changes.
        event_types: Types of events to watch for or `None` to watch for
            all event types.
    """
    if event_types is None:
        event_types = set(FileWatcher.EventType)  # all types

    # Bound to a fresh name so the filter closure sees a non-optional set;
    # it is the very same object as `self.event_types`.
    watched_types = event_types

    def _watch_filter(change: Change, path_str: str) -> bool:
        """Accept only the selected event types, skipping directories."""
        if not any(change == kind.value for kind in watched_types):
            return False
        # `is_file()` must not be used here: a deleted path no longer exists,
        # so that check would reject every DELETE event.
        return not pathlib.Path(path_str).is_dir()

    self.event_types = event_types
    self._stop_event = asyncio.Event()
    self._paths = [
        path if isinstance(path, pathlib.Path) else pathlib.Path(path)
        for path in paths
    ]
    self._awatch = awatch(
        *self._paths,
        stop_event=self._stop_event,
        watch_filter=_watch_filter,
    )
    # Set once the `awatch` iterator is exhausted.
    self._awatch_stopped_exc: Optional[Exception] = None
    # Changes already fetched from `awatch` but not yet consumed.
    self._changes: Set[FileChange] = set()
consume() ¤

Return the latest change once ready is complete.

RETURNS DESCRIPTION
pathlib.Path

The next change that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if there is some problem with the receiver.

Source code in frequenz/channels/util/_file_watcher.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def consume(self) -> pathlib.Path:
    """Hand out one of the changes fetched by the last `ready()` call.

    Returns:
        The path of the next change that was received.

    Raises:
        ReceiverStoppedError: if nothing is left to consume and the watcher
            has stopped.
    """
    buffered = self._changes
    if not buffered:
        stop_cause = self._awatch_stopped_exc
        if stop_cause is not None:
            raise ReceiverStoppedError(self) from stop_cause

    assert buffered, "`consume()` must be preceeded by a call to `ready()`"

    # watchfiles reports every change as a `(Change, path)` tuple; only the
    # path is passed on.
    _kind, raw_path = buffered.pop()
    return pathlib.Path(raw_path)
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/util/_file_watcher.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # if there are messages waiting to be consumed, return immediately.
    if self._changes:
        return True

    # if it was already stopped, return immediately.
    if self._awatch_stopped_exc is not None:
        return False

    try:
        self._changes = await self._awatch.__anext__()
    except StopAsyncIteration as err:
        # The watcher just stopped: remember why and report the receiver as
        # inactive right away instead of on the following call.
        self._awatch_stopped_exc = err
        return False

    return True

frequenz.channels.util.Merge ¤

Bases: Receiver[T]

Merge messages coming from multiple channels into a single stream.

Example

For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using Merge like this:

merge = Merge(receiver1, receiver2)
while msg := await merge.receive():
    # do something with msg
    pass

When merge is no longer needed, it should be stopped using its stop() method. This will clean up any internal pending async tasks.

Source code in frequenz/channels/util/_merge.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class Merge(Receiver[T]):
    """Merge messages coming from multiple channels into a single stream.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream, by using `Merge` like this:

        ```python
        merge = Merge(receiver1, receiver2)
        async for msg in merge:
            # do something with msg
            pass
        ```

        When `merge` is no longer needed, it should be stopped using its
        `stop()` method. This will clean up any internal pending async tasks.
    """

    def __init__(self, *args: Receiver[T]) -> None:
        """Create a `Merge` instance.

        One task per receiver is created right away, so this must be called
        while an event loop is running.

        Args:
            *args: sequence of channel receivers.
        """
        # Receivers are keyed by their position; the key doubles as the name
        # of the task fetching from that receiver.
        self._receivers = {str(index): recv for index, recv in enumerate(args)}
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.__anext__(), name=name)
            for name, recv in self._receivers.items()
        }
        # `ready()` only waits when this is empty and each receiver has at
        # most one task in flight, so it never holds more than one message
        # per receiver.
        self._results: Deque[T] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        # `_pending` is missing when `__init__` failed before setting it
        # (e.g. there was no running event loop to create the tasks).
        for task in getattr(self, "_pending", ()):
            task.cancel()

    async def stop(self) -> None:
        """Stop the `Merge` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if self._results:
                return True

            # if there are no more pending receivers, we return immediately.
            if not self._pending:
                return False

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                # `result()` re-raises any other exception the task ended with.
                self._results.append(item.result())
                self._pending.add(
                    # pylint: disable=unnecessary-dunder-call
                    asyncio.create_task(self._receivers[name].__anext__(), name=name)
                )

    def consume(self) -> T:
        """Return the latest value once `ready` is complete.

        Returns:
            The next value that was received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        if not self._results and not self._pending:
            raise ReceiverStoppedError(self)

        assert self._results, "`consume()` must be preceeded by a call to `ready()`"

        return self._results.popleft()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/util/_merge.py
46
47
48
49
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    # `_pending` is missing when `__init__` failed before setting it (e.g.
    # there was no running event loop to create the tasks), and a finalizer
    # should not raise in that case.
    for task in getattr(self, "_pending", ()):
        task.cancel()
__init__(*args) ¤

Create a Merge instance.

PARAMETER DESCRIPTION
*args

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: ()

Source code in frequenz/channels/util/_merge.py
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(self, *args: Receiver[T]) -> None:
    """Create a `Merge` instance.

    Args:
        *args: sequence of channel receivers.
    """
    # Receivers are keyed by their position; the key doubles as the name of
    # the task fetching from that receiver.
    by_position = {}
    for position, receiver in enumerate(args):
        by_position[str(position)] = receiver
    self._receivers = by_position

    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(receiver.__anext__(), name=key)
        for key, receiver in by_position.items()
    }
    # Bounded to one buffered message per receiver.
    self._results: Deque[T] = deque(maxlen=len(by_position))
consume() ¤

Return the latest value once ready is complete.

RETURNS DESCRIPTION
T

The next value that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in frequenz/channels/util/_merge.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def consume(self) -> T:
    """Pop the oldest buffered value once `ready` is complete.

    Returns:
        The next value that was received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    buffered = self._results
    if not buffered:
        # Nothing buffered and nothing in flight: every receiver is done.
        if not self._pending:
            raise ReceiverStoppedError(self)

    assert buffered, "`consume()` must be preceeded by a call to `ready()`"

    return buffered.popleft()
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/util/_merge.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    After `ready()` returns, the value should be read with `consume()`
    (`receive()` or by iterating). Until it is consumed the receiver stays
    ready, i.e. further calls return immediately.

    Returns:
        Whether the receiver is still active.
    """
    # Looping is required: a `wait` may complete only because a channel was
    # closed, in which case there is still nothing to hand out.
    while not self._results:
        # no buffered messages and no receivers left to wait for.
        if not self._pending:
            return False

        finished, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in finished:
            # a closed channel gets no new task.
            if isinstance(task.exception(), StopAsyncIteration):
                continue
            source = task.get_name()
            self._results.append(task.result())
            # pylint: disable=unnecessary-dunder-call
            refill = asyncio.create_task(
                self._receivers[source].__anext__(), name=source
            )
            self._pending.add(refill)

    # there are messages waiting to be consumed.
    return True
stop() async ¤

Stop the Merge instance and cleanup any pending tasks.

Source code in frequenz/channels/util/_merge.py
51
52
53
54
55
56
async def stop(self) -> None:
    """Cancel every pending task of this `Merge` and wait for them to end."""
    outstanding = self._pending
    for pending_task in outstanding:
        pending_task.cancel()
    # Exceptions (including the cancellations) are collected, not raised.
    await asyncio.gather(*outstanding, return_exceptions=True)
    self._pending = set()

frequenz.channels.util.MergeNamed ¤

Bases: Receiver[Tuple[str, T]]

Merge messages coming from multiple named channels into a single stream.

When MergeNamed is no longer needed, it should be stopped using its stop() method. This will clean up any internal pending async tasks.

Source code in frequenz/channels/util/_merge_named.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class MergeNamed(Receiver[Tuple[str, T]]):
    """Merge messages coming from multiple named channels into a single stream.

    Every message is delivered as a `(name, value)` tuple, where `name` is the
    keyword under which the originating receiver was passed to the constructor.

    When `MergeNamed` is no longer needed, it should be stopped using its
    `stop()` method. This will clean up any internal pending async tasks.
    """

    def __init__(self, **kwargs: Receiver[T]) -> None:
        """Create a `MergeNamed` instance.

        One task per receiver is created right away, so this must be called
        while an event loop is running.

        Args:
            **kwargs: channel receivers, keyed by the name to report their
                messages under.
        """
        self._receivers = kwargs
        # The receiver's name doubles as the name of the task fetching from it.
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.__anext__(), name=name)
            for name, recv in self._receivers.items()
        }
        # `ready()` only waits when this is empty and each receiver has at
        # most one task in flight, so it never holds more than one message
        # per receiver.
        self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        # `_pending` is missing when `__init__` failed before setting it
        # (e.g. there was no running event loop to create the tasks).
        for task in getattr(self, "_pending", ()):
            task.cancel()

    async def stop(self) -> None:
        """Stop the `MergeNamed` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if self._results:
                return True

            # if there are no more pending receivers, we return immediately.
            if not self._pending:
                return False

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                # `result()` re-raises any other exception the task ended with.
                self._results.append((name, item.result()))
                self._pending.add(
                    # pylint: disable=unnecessary-dunder-call
                    asyncio.create_task(self._receivers[name].__anext__(), name=name)
                )

    def consume(self) -> Tuple[str, T]:
        """Return the latest value once `ready` is complete.

        Returns:
            The next key, value that was received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        if not self._results and not self._pending:
            raise ReceiverStoppedError(self)

        assert self._results, "`consume()` must be preceeded by a call to `ready()`"

        return self._results.popleft()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/util/_merge_named.py
34
35
36
37
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    # `_pending` is missing when `__init__` failed before setting it (e.g.
    # there was no running event loop to create the tasks), and a finalizer
    # should not raise in that case.
    for task in getattr(self, "_pending", ()):
        task.cancel()
__init__(**kwargs) ¤

Create a MergeNamed instance.

PARAMETER DESCRIPTION
**kwargs

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: {}

Source code in frequenz/channels/util/_merge_named.py
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(self, **kwargs: Receiver[T]) -> None:
    """Create a `MergeNamed` instance.

    Args:
        **kwargs: channel receivers, keyed by the name to report their
            messages under.
    """
    named_receivers = kwargs
    self._receivers = named_receivers
    # One fetch task per receiver, named after the receiver it reads from.
    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(named_receivers[key].__anext__(), name=key)
        for key in named_receivers
    }
    # Bounded to one buffered message per receiver.
    capacity = len(named_receivers)
    self._results: Deque[Tuple[str, T]] = deque(maxlen=capacity)
consume() ¤

Return the latest value once ready is complete.

RETURNS DESCRIPTION
Tuple[str, T]

The next key, value that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in frequenz/channels/util/_merge_named.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def consume(self) -> Tuple[str, T]:
    """Pop the oldest buffered `(name, value)` pair once `ready` is complete.

    Returns:
        The next key, value that was received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    buffered = self._results
    if not buffered:
        # Nothing buffered and nothing in flight: every receiver is done.
        if not self._pending:
            raise ReceiverStoppedError(self)

    assert buffered, "`consume()` must be preceeded by a call to `ready()`"

    return buffered.popleft()
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/util/_merge_named.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    After `ready()` returns, the value should be read with `consume()`
    (`receive()` or by iterating). Until it is consumed the receiver stays
    ready, i.e. further calls return immediately.

    Returns:
        Whether the receiver is still active.
    """
    # Looping is required: a `wait` may complete only because a channel was
    # closed, in which case there is still nothing to hand out.
    while not self._results:
        # no buffered messages and no receivers left to wait for.
        if not self._pending:
            return False

        finished, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in finished:
            # a closed channel gets no new task.
            if isinstance(task.exception(), StopAsyncIteration):
                continue
            source = task.get_name()
            # messages are tagged with the name of their originating receiver.
            self._results.append((source, task.result()))
            # pylint: disable=unnecessary-dunder-call
            refill = asyncio.create_task(
                self._receivers[source].__anext__(), name=source
            )
            self._pending.add(refill)

    # there are messages waiting to be consumed.
    return True
stop() async ¤

Stop the MergeNamed instance and cleanup any pending tasks.

Source code in frequenz/channels/util/_merge_named.py
39
40
41
42
43
44
async def stop(self) -> None:
    """Cancel every pending task of this `MergeNamed` and wait for them to end."""
    outstanding = self._pending
    for pending_task in outstanding:
        pending_task.cancel()
    # Exceptions (including the cancellations) are collected, not raised.
    await asyncio.gather(*outstanding, return_exceptions=True)
    self._pending = set()

frequenz.channels.util.Select ¤

Select the next available message from a group of Receivers.

If Select was created with more receivers than are read in the if-chain after each call to ready(), messages arriving on the additional receivers are dropped, and a warning message is logged.

Receivers also function as Receiver.

When Select is no longer needed, it should be stopped using its stop() method. This will clean up any internal pending async tasks.

Example

For example, if there are two receivers that you want to simultaneously wait on, this can be done with:

select = Select(name1 = receiver1, name2 = receiver2)
while await select.ready():
    if msg := select.name1:
        if val := msg.inner:
            # do something with `val`
            pass
        else:
            # handle closure of receiver.
            pass
    elif msg := select.name2:
        # do something with `msg.inner`
        pass
Source code in frequenz/channels/util/_select.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
class Select:
    """Select the next available message from a group of Receivers.

    If `Select` was created with more receivers than are read in
    the if-chain after each call to
    [ready()][frequenz.channels.util.Select.ready], messages arriving on the
    additional receivers are dropped, and a warning message is logged.

    Any [Receiver][frequenz.channels.Receiver] can be used with `Select`.

    When `Select` is no longer needed, it should be stopped using its
    `stop()` method. This will clean up any internal pending async tasks.

    Example:
        For example, if there are two receivers that you want to
        simultaneously wait on, this can be done with:

        ```python
        select = Select(name1 = receiver1, name2 = receiver2)
        while await select.ready():
            if msg := select.name1:
                if val := msg.inner:
                    # do something with `val`
                    pass
                else:
                    # handle closure of receiver.
                    pass
            elif msg := select.name2:
                # do something with `msg.inner`
                pass
        ```
    """

    def __init__(self, **kwargs: Receiver[Any]) -> None:
        """Create a `Select` instance.

        One task per receiver is created right away, so this must be called
        while an event loop is running.

        Args:
            **kwargs: receivers to select from, keyed by the attribute name
                under which their messages are read.
        """
        self._receivers = kwargs
        self._pending: Set[asyncio.Task[bool]] = set()

        # All bookkeeping is set up before the first task is created, so the
        # instance is fully formed even if task creation fails.
        # `_ready_count` is the number of results not yet read, and
        # `_prev_ready_count` its value at the previous `ready()` call.
        self._ready_count = 0
        self._prev_ready_count = 0
        self._result: Dict[str, Optional[_ReadyReceiver]] = {
            name: None for name in self._receivers
        }

        for name, recv in self._receivers.items():
            self._pending.add(asyncio.create_task(recv.ready(), name=name))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        # Read through `__dict__` so a partially constructed instance does not
        # end up in `__getattr__`.
        for task in self.__dict__.get("_pending", ()):
            task.cancel()

    async def stop(self) -> None:
        """Stop the `Select` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until there is a message in any of the receivers.

        Returns `True` if there is a message available, and `False` if all
        receivers have closed.

        Returns:
            Whether there are further messages or not.
        """
        # This function will change radically soon
        # pylint: disable=too-many-nested-blocks
        if self._ready_count > 0:
            if self._ready_count == self._prev_ready_count:
                # Nothing was read since the previous call: drop (and consume)
                # everything still unread, so waiting can resume.
                dropped_names: List[str] = []
                for name, value in self._result.items():
                    if value is not None:
                        dropped_names.append(name)
                        if value.recv is not None:
                            try:
                                value.recv.consume()
                            except ReceiverStoppedError:
                                pass
                        self._result[name] = None
                self._ready_count = 0
                self._prev_ready_count = 0
                logger.warning(
                    "Select.ready() dropped data from receiver(s): %s, "
                    "because no messages have been fetched since the last call to ready().",
                    dropped_names,
                )
            else:
                # Some results were read but others remain: let the caller
                # keep draining them before waiting again.
                self._prev_ready_count = self._ready_count
                return True
        if len(self._pending) == 0:
            return False

        # once all the pending messages have been consumed, reset the
        # `_prev_ready_count` as well, and wait for new messages.
        self._prev_ready_count = 0

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            name = task.get_name()
            recv = self._receivers[name]
            receiver_active = task.result()
            self._ready_count += 1
            # A stopped receiver is recorded with `None` in place of the
            # receiver itself.
            self._result[name] = _ReadyReceiver(recv if receiver_active else None)
            # if channel or Receiver is closed
            # don't add a task for it again.
            if receiver_active:
                self._pending.add(asyncio.create_task(recv.ready(), name=name))
        return True

    def __getattr__(self, name: str) -> Optional[Any]:
        """Return the latest unread message from a `Receiver`, if available.

        Args:
            name: Name of the channel.

        Returns:
            Latest unread message for the specified `Receiver`, or `None`.

        Raises:
            KeyError: when the name was not specified when creating the
                `Select` instance.
            AttributeError: when the instance has not been initialized.
        """
        # `__getattr__` is only called for attributes that were not found the
        # normal way. Reading `self._result` here would call it again when
        # `_result` itself is missing and recurse without end, so the table is
        # fetched from the instance `__dict__` instead.
        try:
            results = self.__dict__["_result"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None

        result = results[name]
        if result is None:
            return result
        # Each result can be read only once.
        results[name] = None
        self._ready_count -= 1
        return result.get()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/util/_select.py
108
109
110
111
def __del__(self) -> None:
    """Cancel every task that is still pending when the instance goes away."""
    for pending_task in self._pending:
        pending_task.cancel()
__getattr__(name) ¤

Return the latest unread message from a Receiver, if available.

PARAMETER DESCRIPTION
name

Name of the channel.

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

Latest unread message for the specified Receiver, or None.

RAISES DESCRIPTION
KeyError

when the name was not specified when creating the Select instance.

Source code in frequenz/channels/util/_select.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def __getattr__(self, name: str) -> Optional[Any]:
    """Fetch the unread message stored for the receiver called `name`.

    Fetching marks the stored result as read, so a second lookup of the
    same name yields `None` until `ready()` records a new result for it.

    Args:
        name: Name of the channel.

    Returns:
        Latest unread message for the specified `Receiver`, or `None`.

    Raises:
        KeyError: when the name was not specified when creating the
            `Select` instance.
    """
    unread = self._result[name]
    if unread is not None:
        # Clear the slot and update the bookkeeping *before* extracting the
        # message, so the state is already consistent if `get()` raises.
        self._result[name] = None
        self._ready_count -= 1
        return unread.get()
    return None
__init__(**kwargs) ¤

Create a Select instance.

PARAMETER DESCRIPTION
**kwargs

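Receivers to select from, passed as keyword arguments; each keyword is the name under which that receiver's messages are later fetched.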

TYPE: Receiver[Any] DEFAULT: {}

Source code in frequenz/channels/util/_select.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def __init__(self, **kwargs: Receiver[Any]) -> None:
    """Create a `Select` instance.

    A `ready()` task is started for every receiver right away, so this must
    be called while an event loop is running.

    Args:
        **kwargs: receivers to select from; each keyword is the name used
            to fetch that receiver's messages.
    """
    self._receivers = kwargs
    # Assigned before any task is created so that `__del__` always finds
    # the attribute, even if task creation fails part-way through.
    self._pending: Set[asyncio.Task[bool]] = set()
    self._pending.update(
        asyncio.create_task(receiver.ready(), name=key)
        for key, receiver in kwargs.items()
    )

    self._ready_count = 0
    self._prev_ready_count = 0
    # One slot per receiver name; `None` means "nothing unread".
    self._result: Dict[str, Optional[_ReadyReceiver]] = dict.fromkeys(kwargs)
ready() async ¤

Wait until there is a message in any of the receivers.

Returns True if there is a message available, and False if all receivers have closed.

RETURNS DESCRIPTION
bool

Whether there are further messages or not.

Source code in frequenz/channels/util/_select.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
async def ready(self) -> bool:
    """Wait until at least one receiver has a result to report.

    A result is recorded both when a receiver's `ready()` task reports that
    it is active and when it reports that it is not, so `True` can also
    mean that a receiver has closed.

    If results recorded by earlier calls are still unfetched:

    * when their count equals the count remembered from the previous call
      (nothing was fetched in between), they are dropped, a warning is
      logged, and the call goes on to wait for new results;
    * otherwise the count is remembered and the call returns `True`
      immediately so the remaining results can be fetched.

    Returns:
        `True` if there is at least one result to fetch, `False` if nothing
        is left to fetch and there are no pending receiver tasks to wait on.
    """
    # This function will change radically soon
    # pylint: disable=too-many-nested-blocks
    if self._ready_count > 0:
        # Some results from earlier calls have not been fetched yet.
        if self._ready_count == self._prev_ready_count:
            # The unfetched count did not change since the previous call:
            # drop everything that is still stored.
            dropped_names: List[str] = []
            for name, value in self._result.items():
                if value is not None:
                    dropped_names.append(name)
                    if value.recv is not None:
                        # Consume the message so the underlying receiver
                        # does not stay ready with the same message.
                        try:
                            value.recv.consume()
                        except ReceiverStoppedError:
                            # The receiver stopped in the meantime; there is
                            # nothing left to drop for it.
                            pass
                    # Only the value of an existing key is replaced, which
                    # is safe while iterating over the dict.
                    self._result[name] = None
            self._ready_count = 0
            self._prev_ready_count = 0
            logger.warning(
                "Select.ready() dropped data from receiver(s): %s, "
                "because no messages have been fetched since the last call to ready().",
                dropped_names,
            )
            # No return here: fall through and wait for new results.
        else:
            # Remember how many results are outstanding so the next call
            # can tell whether any of them were fetched.
            self._prev_ready_count = self._ready_count
            return True
    if len(self._pending) == 0:
        # No receiver task is left to wait on.
        return False

    # once all the pending messages have been consumed, reset the
    # `_prev_ready_count` as well, and wait for new messages.
    self._prev_ready_count = 0

    # Block until at least one receiver task finishes; the unfinished ones
    # stay in `self._pending`.
    done, self._pending = await asyncio.wait(
        self._pending, return_when=asyncio.FIRST_COMPLETED
    )
    for task in done:
        # Tasks are named after the receiver they belong to.
        name = task.get_name()
        recv = self._receivers[name]
        # `task.result()` re-raises any exception raised by `recv.ready()`.
        receiver_active = task.result()
        if receiver_active:
            ready_recv = recv
        else:
            ready_recv = None
        # A result is stored for inactive receivers too (wrapping `None`).
        self._ready_count += 1
        self._result[name] = _ReadyReceiver(ready_recv)
        # if channel or Receiver is closed
        # don't add a task for it again.
        if not receiver_active:
            continue
        # Re-arm the receiver so its next message is waited for as well.
        self._pending.add(asyncio.create_task(recv.ready(), name=name))
    return True
stop() async ¤

Stop the Select instance and cleanup any pending tasks.

Source code in frequenz/channels/util/_select.py
113
114
115
116
117
118
async def stop(self) -> None:
    """Cancel all pending receiver tasks and wait for them to finish."""
    outstanding = self._pending
    for pending_task in outstanding:
        pending_task.cancel()
    # Await the cancelled tasks so the cancellations are actually processed;
    # `return_exceptions=True` keeps their `CancelledError`s from propagating.
    await asyncio.gather(*outstanding, return_exceptions=True)
    self._pending = set()

frequenz.channels.util.Timer ¤

Bases: Receiver[datetime]

A timer receiver that returns the timestamp every interval seconds.

Primarily for use with Select.

The timestamp generated is a timezone-aware datetime using UTC as timezone.

Example

When you want something to happen with a fixed period:

timer = Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
        else:
            logging.warn("battery channel closed")
    if ts := select.timer:
        # something to do once every 30 seconds
        pass

When you want something to happen when nothing else has happened in a certain interval:

timer = Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
    timer.reset()
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
        else:
            logging.warn("battery channel closed")
    if ts := select.timer:
        # something to do if there's no battery data for 30 seconds
        pass
Source code in frequenz/channels/util/_timer.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class Timer(Receiver[datetime]):
    """A timer receiver that returns the timestamp every `interval` seconds.

    Primarily for use with [Select][frequenz.channels.util.Select].

    The timestamp generated is a timezone-aware datetime using UTC as timezone.

    Example:
        When you want something to happen with a fixed period:

        ```python
        timer = Timer(30.0)
        select = Select(bat_1 = receiver1, timer = timer)
        while await select.ready():
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                else:
                    logging.warning("battery channel closed")
            if ts := select.timer:
                # something to do once every 30 seconds
                pass
        ```

        When you want something to happen when nothing else has happened in a
        certain interval:

        ```python
        timer = Timer(30.0)
        select = Select(bat_1 = receiver1, timer = timer)
        while await select.ready():
            timer.reset()
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                else:
                    logging.warning("battery channel closed")
            if ts := select.timer:
                # something to do if there's no battery data for 30 seconds
                pass
        ```
    """

    def __init__(self, interval: float) -> None:
        """Create a `Timer` instance.

        The first tick is due `interval` seconds after construction.

        Args:
            interval: number of seconds between messages.
        """
        self._stopped = False
        self._interval = timedelta(seconds=interval)
        self._next_msg_time = datetime.now(timezone.utc) + self._interval
        # Timestamp of a tick that fired but has not been consumed yet.
        self._now: Optional[datetime] = None

    def reset(self) -> None:
        """Reset the timer to start timing from `now`.

        The next tick becomes due one full interval after this call. A
        `ready()` call that is currently sleeping picks up the new deadline
        when it wakes up.
        """
        self._next_msg_time = datetime.now(timezone.utc) + self._interval

    def stop(self) -> None:
        """Stop the timer.

        Once `stop` has been called, new calls to
        [ready()][frequenz.channels.util.Timer.ready] return `False` when no
        tick is waiting to be consumed, and calls to
        [consume()][frequenz.channels.util.Timer.consume] raise
        `ReceiverStoppedError`.

        This only sets a flag: a `ready()` call that is already sleeping is
        not interrupted.
        """
        self._stopped = True

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # if there are messages waiting to be consumed, return immediately.
        if self._now is not None:
            return True

        if self._stopped:
            return False

        now = datetime.now(timezone.utc)
        diff = self._next_msg_time - now
        # Re-read the deadline after every sleep: `reset()` may have moved it
        # while this coroutine was suspended.
        while diff.total_seconds() > 0:
            await asyncio.sleep(diff.total_seconds())
            now = datetime.now(timezone.utc)
            diff = self._next_msg_time - now
        self._now = now

        # The next deadline is measured from the moment this tick actually
        # fired, not from the previous deadline.
        self._next_msg_time = self._now + self._interval

        return True

    def consume(self) -> datetime:
        """Return the latest value once `ready` is complete.

        Returns:
            The UTC timestamp recorded by `ready()` when the tick fired.

        Raises:
            ReceiverStoppedError: if
                [stop()][frequenz.channels.util.Timer.stop] has been called
                on the timer. A tick that was still pending is discarded.
            ReceiverError: if there is some problem with the receiver.

        Changelog:
            * **v0.11.0:** Returns a timezone-aware datetime with UTC timezone
              instead of a native datetime object.
        """
        if self._stopped:
            # Discard a tick that was pending when the timer was stopped.
            # Otherwise `ready()` would keep returning `True` for it forever
            # and never report that the timer is no longer active.
            self._now = None
            raise ReceiverStoppedError(self)

        assert (
            self._now is not None
        ), "`consume()` must be preceeded by a call to `ready()`"
        now = self._now
        self._now = None
        return now
Functions¤
__init__(interval) ¤

Create a Timer instance.

PARAMETER DESCRIPTION
interval

number of seconds between messages.

TYPE: float

Source code in frequenz/channels/util/_timer.py
57
58
59
60
61
62
63
64
65
66
def __init__(self, interval: float) -> None:
    """Set up the timer so that its first tick is due one interval from now.

    Args:
        interval: number of seconds between messages.
    """
    self._stopped = False
    period = timedelta(seconds=interval)
    self._interval = period
    # Deadlines are timezone-aware UTC datetimes.
    self._next_msg_time = datetime.now(timezone.utc) + period
    # No tick is waiting to be consumed yet.
    self._now: Optional[datetime] = None
consume() ¤

Return the latest value once ready is complete.

RETURNS DESCRIPTION
datetime

The UTC timestamp recorded by ready() when the tick fired. If stop() has been called on the timer, ReceiverStoppedError is raised instead of a value being returned.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Changelog
  • v0.11.0: Returns a timezone-aware datetime with UTC timezone instead of a native datetime object.
Source code in frequenz/channels/util/_timer.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def consume(self) -> datetime:
    """Return the latest value once `ready` is complete.

    Returns:
        The UTC timestamp recorded by `ready()` when the tick fired.

    Raises:
        ReceiverStoppedError: if
            [stop()][frequenz.channels.util.Timer.stop] has been called on
            the timer. A tick that was still pending is discarded.
        ReceiverError: if there is some problem with the receiver.

    Changelog:
        * **v0.11.0:** Returns a timezone-aware datetime with UTC timezone
          instead of a native datetime object.
    """
    if self._stopped:
        # Discard a tick that was pending when the timer was stopped.
        # Otherwise `ready()` would keep returning `True` for it forever and
        # never report that the timer is no longer active.
        self._now = None
        raise ReceiverStoppedError(self)

    assert (
        self._now is not None
    ), "`consume()` must be preceeded by a call to `ready()`"
    now = self._now
    self._now = None
    return now
ready() async ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/util/_timer.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
async def ready(self) -> bool:
    """Sleep until the next tick is due, then record it for `consume()`.

    A tick that has fired stays pending until it is read with `consume()`
    (`receive()` or iterated over); while one is pending this method
    returns immediately.

    Returns:
        Whether the receiver is still active.
    """
    # A tick is already waiting to be consumed.
    if self._now is not None:
        return True

    if self._stopped:
        return False

    while True:
        current = datetime.now(timezone.utc)
        # The deadline is re-read on every pass because `reset()` may move
        # it while this coroutine is asleep.
        remaining = (self._next_msg_time - current).total_seconds()
        if remaining <= 0:
            break
        await asyncio.sleep(remaining)

    self._now = current
    # Schedule the following tick relative to when this one actually fired.
    self._next_msg_time = current + self._interval
    return True
reset() ¤

Reset the timer to start timing from now.

Source code in frequenz/channels/util/_timer.py
68
69
70
def reset(self) -> None:
    """Restart the countdown: the next tick is due one interval from now."""
    restart_time = datetime.now(timezone.utc)
    self._next_msg_time = restart_time + self._interval
stop() ¤

Stop the timer.

Once stop has been called, new calls to ready() return False when no tick is waiting to be consumed, and calls to consume() raise ReceiverStoppedError.

Source code in frequenz/channels/util/_timer.py
72
73
74
75
76
77
78
79
def stop(self) -> None:
    """Stop the timer.

    Once `stop` has been called, new calls to
    [ready()][frequenz.channels.util.Timer.ready] return `False` when no
    tick is waiting to be consumed, and calls to
    [consume()][frequenz.channels.util.Timer.consume] raise
    `ReceiverStoppedError`.

    This only sets a flag: a `ready()` call that is already sleeping is not
    interrupted.
    """
    self._stopped = True