Skip to content

util

frequenz.channels.util ¤

Channel utilities.

A module with several utilities to work with channels:

  • FileWatcher: A receiver that watches for file events.

  • Merge: A receiver that merge messages coming from multiple receivers into a single stream.

  • MergeNamed: A receiver that merge messages coming from multiple receivers into a single named stream, allowing to identify the origin of each message.

  • Select: A helper to select the next available message for each receiver in a group of receivers.

  • Timer: A receiver that emits a now timestamp every interval seconds.

Classes¤

frequenz.channels.util.FileWatcher ¤

Bases: Receiver[pathlib.Path]

A channel receiver that watches for file events.

Source code in frequenz/channels/util/_file_watcher.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class FileWatcher(Receiver[pathlib.Path]):
    """A channel receiver that watches for file events."""

    class EventType(Enum):
        """Available types of changes to watch for."""

        CREATE = Change.added
        MODIFY = Change.modified
        DELETE = Change.deleted

    def __init__(
        self,
        paths: List[Union[pathlib.Path, str]],
        event_types: Optional[Set[EventType]] = None,
    ) -> None:
        """Create a `FileWatcher` instance.

        Args:
            paths: Paths to watch for changes.
            event_types: Types of events to watch for or `None` to watch for
                all event types.
        """
        if event_types is None:
            event_types = set(FileWatcher.EventType)  # all types

        self.event_types = event_types
        self._stop_event = asyncio.Event()
        self._paths = [
            path if isinstance(path, pathlib.Path) else pathlib.Path(path)
            for path in paths
        ]
        self._awatch = awatch(
            *self._paths,
            stop_event=self._stop_event,
            watch_filter=lambda change, path_str: (
                change in [event_type.value for event_type in event_types]  # type: ignore
                and pathlib.Path(path_str).is_file()
            ),
        )
        self._changes: Set[FileChange] = set()

    def __del__(self) -> None:
        """Cleanup registered watches.

        `awatch` passes the `stop_event` to a separate task/thread. This way
        `awatch` getting destroyed properly. The background task will continue
        until the signal is received.
        """
        self._stop_event.set()

    async def ready(self) -> None:
        """Wait for the next file event and return its path.

        Raises:
            StopAsyncIteration: When the channel is closed.

        Returns:
            Path of next file.
        """
        # if there are messages waiting to be consumed, return immediately.
        if self._changes:
            return

        self._changes = await self._awatch.__anext__()

    def consume(self) -> pathlib.Path:
        """Return the latest change once `ready` is complete.

        Raises:
            ChannelClosedError: When the channel is closed.

        Returns:
            The next change that was received.
        """
        assert self._changes, "calls to `consume()` must be follow a call to `ready()`"
        change = self._changes.pop()
        # Tuple of (Change, path) returned by watchfiles
        _, path_str = change
        path = pathlib.Path(path_str)
        return path
Classes¤
EventType ¤

Bases: Enum

Available types of changes to watch for.

Source code in frequenz/channels/util/_file_watcher.py
19
20
21
22
23
24
class EventType(Enum):
    """Available types of changes to watch for."""

    CREATE = Change.added
    MODIFY = Change.modified
    DELETE = Change.deleted
Functions¤
__del__() ¤

Cleanup registered watches.

awatch passes the stop_event to a separate task/thread. This way awatch getting destroyed properly. The background task will continue until the signal is received.

Source code in frequenz/channels/util/_file_watcher.py
57
58
59
60
61
62
63
64
def __del__(self) -> None:
    """Cleanup registered watches.

    `awatch` passes the `stop_event` to a separate task/thread. This way
    `awatch` getting destroyed properly. The background task will continue
    until the signal is received.
    """
    self._stop_event.set()
__init__(paths, event_types=None) ¤

Create a FileWatcher instance.

PARAMETER DESCRIPTION
paths

Paths to watch for changes.

TYPE: List[Union[pathlib.Path, str]]

event_types

Types of events to watch for or None to watch for all event types.

TYPE: Optional[Set[EventType]] DEFAULT: None

Source code in frequenz/channels/util/_file_watcher.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(
    self,
    paths: List[Union[pathlib.Path, str]],
    event_types: Optional[Set[EventType]] = None,
) -> None:
    """Create a `FileWatcher` instance.

    Args:
        paths: Paths to watch for changes.
        event_types: Types of events to watch for or `None` to watch for
            all event types.
    """
    if event_types is None:
        event_types = set(FileWatcher.EventType)  # all types

    self.event_types = event_types
    self._stop_event = asyncio.Event()
    self._paths = [
        path if isinstance(path, pathlib.Path) else pathlib.Path(path)
        for path in paths
    ]
    self._awatch = awatch(
        *self._paths,
        stop_event=self._stop_event,
        watch_filter=lambda change, path_str: (
            change in [event_type.value for event_type in event_types]  # type: ignore
            and pathlib.Path(path_str).is_file()
        ),
    )
    self._changes: Set[FileChange] = set()
consume() ¤

Return the latest change once ready is complete.

RAISES DESCRIPTION
ChannelClosedError

When the channel is closed.

RETURNS DESCRIPTION
pathlib.Path

The next change that was received.

Source code in frequenz/channels/util/_file_watcher.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def consume(self) -> pathlib.Path:
    """Return the latest change once `ready` is complete.

    Raises:
        ChannelClosedError: When the channel is closed.

    Returns:
        The next change that was received.
    """
    assert self._changes, "calls to `consume()` must be follow a call to `ready()`"
    change = self._changes.pop()
    # Tuple of (Change, path) returned by watchfiles
    _, path_str = change
    path = pathlib.Path(path_str)
    return path
ready() async ¤

Wait for the next file event and return its path.

RAISES DESCRIPTION
StopAsyncIteration

When the channel is closed.

RETURNS DESCRIPTION
None

Path of next file.

Source code in frequenz/channels/util/_file_watcher.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def ready(self) -> None:
    """Wait for the next file event and return its path.

    Raises:
        StopAsyncIteration: When the channel is closed.

    Returns:
        Path of next file.
    """
    # if there are messages waiting to be consumed, return immediately.
    if self._changes:
        return

    self._changes = await self._awatch.__anext__()

frequenz.channels.util.Merge ¤

Bases: Receiver[T]

Merge messages coming from multiple channels into a single stream.

Example

For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using Merge like this:

merge = Merge(receiver1, receiver2)
while msg := await merge.receive():
    # do something with msg
    pass
Source code in frequenz/channels/util/_merge.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class Merge(Receiver[T]):
    """Merge messages coming from multiple channels into a single stream.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream, by using `Merge` like this:

        ```python
        merge = Merge(receiver1, receiver2)
        while msg := await merge.receive():
            # do something with msg
            pass
        ```
    """

    def __init__(self, *args: Receiver[T]) -> None:
        """Create a `Merge` instance.

        Args:
            *args: sequence of channel receivers.
        """
        self._receivers = {str(id): recv for id, recv in enumerate(args)}
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.__anext__(), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: Deque[T] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()

    async def ready(self) -> None:
        """Wait until the receiver is ready with a value.

        Raises:
            ChannelClosedError: if the underlying channel is closed.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if len(self._results) > 0:
                return

            if len(self._pending) == 0:
                raise ChannelClosedError()
            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                result = item.result()
                self._results.append(result)
                self._pending.add(
                    # pylint: disable=unnecessary-dunder-call
                    asyncio.create_task(self._receivers[name].__anext__(), name=name)
                )

    def consume(self) -> T:
        """Return the latest value once `ready` is complete.

        Raises:
            EOFError: When called before a call to `ready()` finishes.

        Returns:
            The next value that was received.
        """
        assert self._results, "calls to `consume()` must be follow a call to `ready()`"

        return self._results.popleft()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/util/_merge.py
42
43
44
45
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
__init__(*args) ¤

Create a Merge instance.

PARAMETER DESCRIPTION
*args

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: ()

Source code in frequenz/channels/util/_merge.py
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, *args: Receiver[T]) -> None:
    """Create a `Merge` instance.

    Args:
        *args: sequence of channel receivers.
    """
    self._receivers = {str(id): recv for id, recv in enumerate(args)}
    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(recv.__anext__(), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: Deque[T] = deque(maxlen=len(self._receivers))
consume() ¤

Return the latest value once ready is complete.

RAISES DESCRIPTION
EOFError

When called before a call to ready() finishes.

RETURNS DESCRIPTION
T

The next value that was received.

Source code in frequenz/channels/util/_merge.py
77
78
79
80
81
82
83
84
85
86
87
88
def consume(self) -> T:
    """Return the latest value once `ready` is complete.

    Raises:
        EOFError: When called before a call to `ready()` finishes.

    Returns:
        The next value that was received.
    """
    assert self._results, "calls to `consume()` must be follow a call to `ready()`"

    return self._results.popleft()
ready() async ¤

Wait until the receiver is ready with a value.

RAISES DESCRIPTION
ChannelClosedError

if the underlying channel is closed.

Source code in frequenz/channels/util/_merge.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
async def ready(self) -> None:
    """Wait until the receiver is ready with a value.

    Raises:
        ChannelClosedError: if the underlying channel is closed.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        # if there are messages waiting to be consumed, return immediately.
        if len(self._results) > 0:
            return

        if len(self._pending) == 0:
            raise ChannelClosedError()
        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            # if channel is closed, don't add a task for it again.
            if isinstance(item.exception(), StopAsyncIteration):
                continue
            result = item.result()
            self._results.append(result)
            self._pending.add(
                # pylint: disable=unnecessary-dunder-call
                asyncio.create_task(self._receivers[name].__anext__(), name=name)
            )

frequenz.channels.util.MergeNamed ¤

Bases: Receiver[Tuple[str, T]]

Merge messages coming from multiple named channels into a single stream.

Source code in frequenz/channels/util/_merge_named.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class MergeNamed(Receiver[Tuple[str, T]]):
    """Merge messages coming from multiple named channels into a single stream."""

    def __init__(self, **kwargs: Receiver[T]) -> None:
        """Create a `MergeNamed` instance.

        Args:
            **kwargs: sequence of channel receivers.
        """
        self._receivers = kwargs
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.__anext__(), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()

    async def ready(self) -> None:
        """Wait until there's a message in any of the channels.

        Raises:
            ChannelClosedError: when all the channels are closed.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if len(self._results) > 0:
                return

            if len(self._pending) == 0:
                raise ChannelClosedError()
            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                result = item.result()
                self._results.append((name, result))
                self._pending.add(
                    # pylint: disable=unnecessary-dunder-call
                    asyncio.create_task(self._receivers[name].__anext__(), name=name)
                )

    def consume(self) -> Tuple[str, T]:
        """Return the latest value once `ready` is complete.

        Raises:
            EOFError: When called before a call to `ready()` finishes.

        Returns:
            The next value that was received, along with its name.
        """
        assert self._results, "calls to `consume()` must be follow a call to `ready()`"

        return self._results.popleft()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/util/_merge_named.py
29
30
31
32
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
__init__(**kwargs) ¤

Create a MergeNamed instance.

PARAMETER DESCRIPTION
**kwargs

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: {}

Source code in frequenz/channels/util/_merge_named.py
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, **kwargs: Receiver[T]) -> None:
    """Create a `MergeNamed` instance.

    Args:
        **kwargs: sequence of channel receivers.
    """
    self._receivers = kwargs
    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(recv.__anext__(), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))
consume() ¤

Return the latest value once ready is complete.

RAISES DESCRIPTION
EOFError

When called before a call to ready() finishes.

RETURNS DESCRIPTION
Tuple[str, T]

The next value that was received, along with its name.

Source code in frequenz/channels/util/_merge_named.py
64
65
66
67
68
69
70
71
72
73
74
75
def consume(self) -> Tuple[str, T]:
    """Return the latest value once `ready` is complete.

    Raises:
        EOFError: When called before a call to `ready()` finishes.

    Returns:
        The next value that was received, along with its name.
    """
    assert self._results, "calls to `consume()` must be follow a call to `ready()`"

    return self._results.popleft()
ready() async ¤

Wait until there's a message in any of the channels.

RAISES DESCRIPTION
ChannelClosedError

when all the channels are closed.

Source code in frequenz/channels/util/_merge_named.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
async def ready(self) -> None:
    """Wait until there's a message in any of the channels.

    Raises:
        ChannelClosedError: when all the channels are closed.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        # if there are messages waiting to be consumed, return immediately.
        if len(self._results) > 0:
            return

        if len(self._pending) == 0:
            raise ChannelClosedError()
        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            # if channel is closed, don't add a task for it again.
            if isinstance(item.exception(), StopAsyncIteration):
                continue
            result = item.result()
            self._results.append((name, result))
            self._pending.add(
                # pylint: disable=unnecessary-dunder-call
                asyncio.create_task(self._receivers[name].__anext__(), name=name)
            )

frequenz.channels.util.Select ¤

Select the next available message from a group of Receivers.

If Select was created with more Receiver than what are read in the if-chain after each call to ready(), messages coming in the additional receivers are dropped, and a warning message is logged.

Receivers also function as Receiver.

Example

For example, if there are two receivers that you want to simultaneously wait on, this can be done with:

select = Select(name1 = receiver1, name2 = receiver2)
while await select.ready():
    if msg := select.name1:
        if val := msg.inner:
            # do something with `val`
            pass
        else:
            # handle closure of receiver.
            pass
    elif msg := select.name2:
        # do something with `msg.inner`
        pass
Source code in frequenz/channels/util/_select.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class Select:
    """Select the next available message from a group of Receivers.

    If `Select` was created with more `Receiver` than what are read in
    the if-chain after each call to
    [ready()][frequenz.channels.util.Select.ready], messages coming in the
    additional receivers are dropped, and a warning message is logged.

    [Receiver][frequenz.channels.Receiver]s also function as `Receiver`.

    Example:
        For example, if there are two receivers that you want to
        simultaneously wait on, this can be done with:

        ```python
        select = Select(name1 = receiver1, name2 = receiver2)
        while await select.ready():
            if msg := select.name1:
                if val := msg.inner:
                    # do something with `val`
                    pass
                else:
                    # handle closure of receiver.
                    pass
            elif msg := select.name2:
                # do something with `msg.inner`
                pass
        ```
    """

    def __init__(self, **kwargs: Receiver[Any]) -> None:
        """Create a `Select` instance.

        Args:
            **kwargs: sequence of receivers
        """
        self._receivers = kwargs
        self._pending: Set[asyncio.Task[None]] = set()

        for name, recv in self._receivers.items():
            self._pending.add(asyncio.create_task(recv.ready(), name=name))

        self._ready_count = 0
        self._prev_ready_count = 0
        self._result: Dict[str, Optional[_ReadyReceiver]] = {
            name: None for name in self._receivers
        }

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()

    async def ready(self) -> bool:
        """Wait until there is a message in any of the receivers.

        Returns `True` if there is a message available, and `False` if all
        receivers have closed.

        Returns:
            Whether there are further messages or not.
        """
        if self._ready_count > 0:
            if self._ready_count == self._prev_ready_count:
                dropped_names: List[str] = []
                for name, value in self._result.items():
                    if value is not None:
                        dropped_names.append(name)
                        if value.recv is not None:
                            value.recv.consume()
                        self._result[name] = None
                self._ready_count = 0
                self._prev_ready_count = 0
                logger.warning(
                    "Select.ready() dropped data from receiver(s): %s, "
                    "because no messages have been fetched since the last call to ready().",
                    dropped_names,
                )
            else:
                self._prev_ready_count = self._ready_count
                return True
        if len(self._pending) == 0:
            return False

        # once all the pending messages have been consumed, reset the
        # `_prev_ready_count` as well, and wait for new messages.
        self._prev_ready_count = 0

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            recv = self._receivers[name]
            if isinstance(item.exception(), ChannelClosedError):
                result = None
            else:
                result = recv
            self._ready_count += 1
            self._result[name] = _ReadyReceiver(result)
            # if channel or Receiver is closed
            # don't add a task for it again.
            if result is None:
                continue
            self._pending.add(asyncio.create_task(recv.ready(), name=name))
        return True

    def __getattr__(self, name: str) -> Optional[Any]:
        """Return the latest unread message from a `Receiver`, if available.

        Args:
            name: Name of the channel.

        Returns:
            Latest unread message for the specified `Receiver`, or `None`.

        Raises:
            KeyError: when the name was not specified when creating the
                `Select` instance.
        """
        result = self._result[name]
        if result is None:
            return result
        self._result[name] = None
        self._ready_count -= 1
        return result.get()
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/util/_select.py
104
105
106
107
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
__getattr__(name) ¤

Return the latest unread message from a Receiver, if available.

PARAMETER DESCRIPTION
name

Name of the channel.

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

Latest unread message for the specified Receiver, or None.

RAISES DESCRIPTION
KeyError

when the name was not specified when creating the Select instance.

Source code in frequenz/channels/util/_select.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def __getattr__(self, name: str) -> Optional[Any]:
    """Return the latest unread message from a `Receiver`, if available.

    Args:
        name: Name of the channel.

    Returns:
        Latest unread message for the specified `Receiver`, or `None`.

    Raises:
        KeyError: when the name was not specified when creating the
            `Select` instance.
    """
    result = self._result[name]
    if result is None:
        return result
    self._result[name] = None
    self._ready_count -= 1
    return result.get()
__init__(**kwargs) ¤

Create a Select instance.

PARAMETER DESCRIPTION
**kwargs

sequence of receivers

TYPE: Receiver[Any] DEFAULT: {}

Source code in frequenz/channels/util/_select.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(self, **kwargs: Receiver[Any]) -> None:
    """Create a `Select` instance.

    Args:
        **kwargs: sequence of receivers
    """
    self._receivers = kwargs
    self._pending: Set[asyncio.Task[None]] = set()

    for name, recv in self._receivers.items():
        self._pending.add(asyncio.create_task(recv.ready(), name=name))

    self._ready_count = 0
    self._prev_ready_count = 0
    self._result: Dict[str, Optional[_ReadyReceiver]] = {
        name: None for name in self._receivers
    }
ready() async ¤

Wait until there is a message in any of the receivers.

Returns True if there is a message available, and False if all receivers have closed.

RETURNS DESCRIPTION
bool

Whether there are further messages or not.

Source code in frequenz/channels/util/_select.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def ready(self) -> bool:
    """Wait until there is a message in any of the receivers.

    Returns `True` if there is a message available, and `False` if all
    receivers have closed.

    Returns:
        Whether there are further messages or not.
    """
    if self._ready_count > 0:
        if self._ready_count == self._prev_ready_count:
            dropped_names: List[str] = []
            for name, value in self._result.items():
                if value is not None:
                    dropped_names.append(name)
                    if value.recv is not None:
                        value.recv.consume()
                    self._result[name] = None
            self._ready_count = 0
            self._prev_ready_count = 0
            logger.warning(
                "Select.ready() dropped data from receiver(s): %s, "
                "because no messages have been fetched since the last call to ready().",
                dropped_names,
            )
        else:
            self._prev_ready_count = self._ready_count
            return True
    if len(self._pending) == 0:
        return False

    # once all the pending messages have been consumed, reset the
    # `_prev_ready_count` as well, and wait for new messages.
    self._prev_ready_count = 0

    done, self._pending = await asyncio.wait(
        self._pending, return_when=asyncio.FIRST_COMPLETED
    )
    for item in done:
        name = item.get_name()
        recv = self._receivers[name]
        if isinstance(item.exception(), ChannelClosedError):
            result = None
        else:
            result = recv
        self._ready_count += 1
        self._result[name] = _ReadyReceiver(result)
        # if channel or Receiver is closed
        # don't add a task for it again.
        if result is None:
            continue
        self._pending.add(asyncio.create_task(recv.ready(), name=name))
    return True

frequenz.channels.util.Timer ¤

Bases: Receiver[datetime]

A timer receiver that returns the timestamp every interval seconds.

Primarily for use with Select.

The timestamp generated is a timezone-aware datetime using UTC as timezone.

Example

When you want something to happen with a fixed period:

timer = Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
        else:
            logging.warn("battery channel closed")
    if ts := select.timer:
        # something to do once every 30 seconds
        pass

When you want something to happen when nothing else has happened in a certain interval:

timer = Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
    timer.reset()
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
        else:
            logging.warn("battery channel closed")
    if ts := select.timer:
        # something to do if there's no battery data for 30 seconds
        pass
Source code in frequenz/channels/util/_timer.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class Timer(Receiver[datetime]):
    """A timer receiver that returns the timestamp every `interval` seconds.

    Primarily for use with [Select][frequenz.channels.util.Select].

    The timestamp generated is a timezone-aware datetime using UTC as timezone.

    Example:
        When you want something to happen with a fixed period:

        ```python
        timer = Timer(30.0)
        select = Select(bat_1 = receiver1, timer = timer)
        while await select.ready():
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                else:
                    logging.warn("battery channel closed")
            if ts := select.timer:
                # something to do once every 30 seconds
                pass
        ```

        When you want something to happen when nothing else has happened in a
        certain interval:

        ```python
        timer = Timer(30.0)
        select = Select(bat_1 = receiver1, timer = timer)
        while await select.ready():
            timer.reset()
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                else:
                    logging.warn("battery channel closed")
            if ts := select.timer:
                # something to do if there's no battery data for 30 seconds
                pass
        ```
    """

    def __init__(self, interval: float) -> None:
        """Create a `Timer` instance.

        Args:
            interval: number of seconds between messages.
        """
        self._stopped = False
        self._interval = timedelta(seconds=interval)
        self._next_msg_time = datetime.now(timezone.utc) + self._interval
        self._now: Optional[datetime] = None

    def reset(self) -> None:
        """Reset the timer to start timing from `now`."""
        self._next_msg_time = datetime.now(timezone.utc) + self._interval

    def stop(self) -> None:
        """Stop the timer.

        Once `stop` has been called, all subsequent calls to
        [receive()][frequenz.channels.Receiver.receive] will immediately
        return `None`.
        """
        self._stopped = True

    async def ready(self) -> None:
        """Return the current time (in UTC) once the next tick is due.

        Raises:
            ChannelClosedError: if [stop()][frequenz.channels.util.Timer.stop]
                has been called on the timer.

        Returns:
            The time of the next tick in UTC or `None` if
                [stop()][frequenz.channels.util.Timer.stop] has been called on
                the timer.
        """
        # if there are messages waiting to be consumed, return immediately.
        if self._now is not None:
            return

        if self._stopped:
            raise ChannelClosedError()
        now = datetime.now(timezone.utc)
        diff = self._next_msg_time - now
        while diff.total_seconds() > 0:
            await asyncio.sleep(diff.total_seconds())
            now = datetime.now(timezone.utc)
            diff = self._next_msg_time - now
        self._now = now

        self._next_msg_time = self._now + self._interval

    def consume(self) -> datetime:
        """Return the latest value once `ready` is complete.

        Raises:
            EOFError: When called before a call to `ready()` finishes.

        Returns:
            The timestamp for the next tick.

        Changelog:
            * **v0.11.0:** Returns a timezone-aware datetime with UTC timezone
              instead of a native datetime object.
        """
        assert (
            self._now is not None
        ), "calls to `consume()` must be follow a call to `ready()`"
        now = self._now
        self._now = None
        return now
Functions¤
__init__(interval) ¤

Create a Timer instance.

PARAMETER DESCRIPTION
interval

number of seconds between messages.

TYPE: float

Source code in frequenz/channels/util/_timer.py
56
57
58
59
60
61
62
63
64
65
def __init__(self, interval: float) -> None:
    """Create a `Timer` instance.

    Args:
        interval: number of seconds between messages.
    """
    self._stopped = False
    self._interval = timedelta(seconds=interval)
    self._next_msg_time = datetime.now(timezone.utc) + self._interval
    self._now: Optional[datetime] = None
consume() ¤

Return the latest value once ready is complete.

RAISES DESCRIPTION
EOFError

When called before a call to ready() finishes.

RETURNS DESCRIPTION
datetime

The timestamp for the next tick.

Changelog
  • v0.11.0: Returns a timezone-aware datetime with UTC timezone instead of a native datetime object.
Source code in frequenz/channels/util/_timer.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def consume(self) -> datetime:
    """Return the latest value once `ready` is complete.

    Raises:
        EOFError: When called before a call to `ready()` finishes.

    Returns:
        The timestamp for the next tick.

    Changelog:
        * **v0.11.0:** Returns a timezone-aware datetime with UTC timezone
          instead of a native datetime object.
    """
    assert (
        self._now is not None
    ), "calls to `consume()` must be follow a call to `ready()`"
    now = self._now
    self._now = None
    return now
ready() async ¤

Return the current time (in UTC) once the next tick is due.

RAISES DESCRIPTION
ChannelClosedError

if stop() has been called on the timer.

RETURNS DESCRIPTION
None

The time of the next tick in UTC or None if stop() has been called on the timer.

Source code in frequenz/channels/util/_timer.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
async def ready(self) -> None:
    """Return the current time (in UTC) once the next tick is due.

    Raises:
        ChannelClosedError: if [stop()][frequenz.channels.util.Timer.stop]
            has been called on the timer.

    Returns:
        The time of the next tick in UTC or `None` if
            [stop()][frequenz.channels.util.Timer.stop] has been called on
            the timer.
    """
    # if there are messages waiting to be consumed, return immediately.
    if self._now is not None:
        return

    if self._stopped:
        raise ChannelClosedError()
    now = datetime.now(timezone.utc)
    diff = self._next_msg_time - now
    while diff.total_seconds() > 0:
        await asyncio.sleep(diff.total_seconds())
        now = datetime.now(timezone.utc)
        diff = self._next_msg_time - now
    self._now = now

    self._next_msg_time = self._now + self._interval
reset() ¤

Reset the timer to start timing from now.

Source code in frequenz/channels/util/_timer.py
67
68
69
def reset(self) -> None:
    """Reset the timer to start timing from `now`."""
    self._next_msg_time = datetime.now(timezone.utc) + self._interval
stop() ¤

Stop the timer.

Once stop has been called, all subsequent calls to receive() will immediately return None.

Source code in frequenz/channels/util/_timer.py
71
72
73
74
75
76
77
78
def stop(self) -> None:
    """Stop the timer.

    Once `stop` has been called, all subsequent calls to
    [receive()][frequenz.channels.Receiver.receive] will immediately
    return `None`.
    """
    self._stopped = True