Select the next available message from a group of AsyncIterators.
If Select was created with more AsyncIterators than are read in the if-chain after each call to ready(), messages arriving on the additional async iterators are dropped, and a warning message is logged.
Receivers also function as AsyncIterators.
Example
For example, if there are two async iterators that you want to
simultaneously wait on, this can be done with:
    select = Select(name1 = receiver1, name2 = receiver2)
    while await select.ready():
        if msg := select.name1:
            if val := msg.inner:
                # do something with `val`
                pass
            else:
                # handle closure of receiver.
                pass
        elif msg := select.name2:
            # do something with `msg.inner`
            pass
Source code in frequenz/channels/select.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class Select:
    """Select the next available message from a group of AsyncIterators.

    If `Select` was created with more `AsyncIterator`s than are read in
    the if-chain after each call to [ready()][frequenz.channels.Select.ready],
    messages coming in the additional async iterators are dropped, and
    a warning message is logged.

    [Receiver][frequenz.channels.Receiver]s also function as `AsyncIterator`.

    Example:
        For example, if there are two async iterators that you want to
        simultaneously wait on, this can be done with:

        ```python
        select = Select(name1 = receiver1, name2 = receiver2)
        while await select.ready():
            if msg := select.name1:
                # Compare against `None` explicitly so that falsy messages
                # (`0`, `""`, ...) are not mistaken for a closed receiver.
                if (val := msg.inner) is not None:
                    # do something with `val`
                    pass
                else:
                    # handle closure of receiver.
                    pass
            elif msg := select.name2:
                # do something with `msg.inner`
                pass
        ```
    """

    def __init__(self, **kwargs: AsyncIterator[Any]) -> None:
        """Create a `Select` instance.

        One task per async iterator is created immediately to fetch its next
        message.

        Args:
            **kwargs: async iterators to select from, keyed by the name under
                which their messages are later read as attributes.
        """
        self._receivers = kwargs
        # Number of stored results that have not been read yet, and the value
        # that counter had when `ready()` last returned early.
        self._ready_count = 0
        self._prev_ready_count = 0
        # Latest unread result per name; `None` means "nothing to read".
        self._result: Dict[str, Optional[_Selected]] = {
            name: None for name in self._receivers
        }
        self._pending: Set[asyncio.Task[Any]] = set()
        for name, recv in self._receivers.items():
            # can replace __anext__() to anext() (Only Python 3.10>=)
            msg = recv.__anext__()  # pylint: disable=unnecessary-dunder-call
            self._pending.add(asyncio.create_task(msg, name=name))  # type: ignore

    def __del__(self) -> None:
        """Clean up any pending tasks."""
        # Go through the instance dict: if `__init__` never ran, a plain
        # `self._pending` lookup would be routed to `__getattr__`.
        for task in self.__dict__.get("_pending", ()):
            task.cancel()

    async def ready(self) -> bool:
        """Wait until there is a message in any of the async iterators.

        Returns `True` if there is a message available, and `False` if all
        async iterators have closed.

        When results stored by an earlier call are still unread, the call
        returns `True` immediately. If a following call finds the number of
        unread results unchanged, those results are dropped, a warning is
        logged, and the call goes on to wait for new messages.

        Returns:
            Whether there are further messages or not.
        """
        if self._ready_count > 0:
            if self._ready_count == self._prev_ready_count:
                # Nothing was read since the previous call: discard the
                # leftovers so that they cannot block progress forever.
                dropped_names: List[str] = [
                    name for name, value in self._result.items() if value is not None
                ]
                for name in dropped_names:
                    self._result[name] = None
                self._ready_count = 0
                self._prev_ready_count = 0
                logger.warning(
                    "Select.ready() dropped data from async iterator(s): %s, "
                    "because no messages have been fetched since the last call to ready().",
                    dropped_names,
                )
            else:
                self._prev_ready_count = self._ready_count
                return True
        if not self._pending:
            return False

        # once all the pending messages have been consumed, reset the
        # `_prev_ready_count` as well, and wait for new messages.
        self._prev_ready_count = 0

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            if isinstance(item.exception(), StopAsyncIteration):
                result = None
            else:
                # Re-raises any other exception the iterator raised.
                result = item.result()
            self._ready_count += 1
            self._result[name] = _Selected(result)
            # if channel or AsyncIterator is closed
            # don't add a task for it again.
            if result is None:
                continue
            msg = self._receivers[  # pylint: disable=unnecessary-dunder-call
                name
            ].__anext__()
            self._pending.add(asyncio.create_task(msg, name=name))  # type: ignore
        return True

    def __getattr__(self, name: str) -> Optional[Any]:
        """Return the latest unread message from an `AsyncIterator`, if available.

        Reading a message marks it as consumed, so a second read of the same
        name returns `None` until a new message has been stored.

        Args:
            name: Name of the channel.

        Returns:
            Latest unread message for the specified `AsyncIterator`, or `None`.

        Raises:
            KeyError: when the name was not specified when creating the
                `Select` instance.
            AttributeError: when the instance has no result table yet, for
                example because `__init__` did not run.
        """
        # `__getattr__` only runs after normal lookup failed, so reading
        # `self._result` here would recurse without end whenever `_result`
        # itself is missing. Look it up in the instance dict instead.
        results = self.__dict__.get("_result")
        if results is None:
            raise AttributeError(name)
        result = results[name]
        if result is None:
            return result
        results[name] = None
        self._ready_count -= 1
        return result
|
Functions
__del__()
Clean up any pending tasks.
Source code in frequenz/channels/select.py
| def __del__(self) -> None:
"""Cleanup any pending tasks."""
for task in self._pending:
task.cancel()
|
__getattr__(name)
Return the latest unread message from an AsyncIterator, if available.
PARAMETER |
DESCRIPTION |
name |
Name of the channel.
TYPE:
str
|
RETURNS |
DESCRIPTION |
Optional[Any]
|
Latest unread message for the specified AsyncIterator, or None.
|
RAISES |
DESCRIPTION |
KeyError
|
when the name was not specified when creating the
Select instance.
|
Source code in frequenz/channels/select.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 | def __getattr__(self, name: str) -> Optional[Any]:
"""Return the latest unread message from a `AsyncIterator`, if available.
Args:
name: Name of the channel.
Returns:
Latest unread message for the specified `AsyncIterator`, or `None`.
Raises:
KeyError: when the name was not specified when creating the
`Select` instance.
"""
result = self._result[name]
if result is None:
return result
self._result[name] = None
self._ready_count -= 1
return result
|
__init__(**kwargs)
Create a Select instance.
PARAMETER |
DESCRIPTION |
**kwargs |
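async iterators to select from, passed as keyword arguments; each keyword is the name under which that iterator's messages are read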
TYPE:
AsyncIterator[Any]
DEFAULT:
{}
|
Source code in frequenz/channels/select.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 | def __init__(self, **kwargs: AsyncIterator[Any]) -> None:
"""Create a `Select` instance.
Args:
**kwargs: sequence of async iterators
"""
self._receivers = kwargs
self._pending: Set[asyncio.Task[Any]] = set()
for name, recv in self._receivers.items():
# can replace __anext__() to anext() (Only Python 3.10>=)
msg = recv.__anext__() # pylint: disable=unnecessary-dunder-call
self._pending.add(asyncio.create_task(msg, name=name)) # type: ignore
self._ready_count = 0
self._prev_ready_count = 0
self._result: Dict[str, Optional[_Selected]] = {
name: None for name in self._receivers
}
|
ready()
async
Wait until there is a message in any of the async iterators.
Returns True if there is a message available, and False if all async iterators have closed.
RETURNS |
DESCRIPTION |
bool
|
Whether there are further messages or not.
|
Source code in frequenz/channels/select.py
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
async def ready(self) -> bool:
    """Wait until there is a message in any of the async iterators.

    Returns `True` if there is a message available, and `False` if all
    async iterators have closed.

    Unread results from an earlier call make this return `True` at once.
    When the unread count has not changed since that early return, the
    leftovers are dropped with a warning and the call waits for new
    messages instead.

    Returns:
        Whether there are further messages or not.
    """
    unread = self._ready_count
    if unread > 0:
        if unread != self._prev_ready_count:
            # Remember how many results were left, then let the caller
            # keep reading them.
            self._prev_ready_count = unread
            return True

        # Same count as last time: nothing was fetched in between.
        stale = [key for key, held in self._result.items() if held is not None]
        for key in stale:
            self._result[key] = None
        self._ready_count = self._prev_ready_count = 0
        logger.warning(
            "Select.ready() dropped data from async iterator(s): %s, "
            "because no messages have been fetched since the last call to ready().",
            stale,
        )

    if not self._pending:
        return False

    # Everything stored so far has been consumed (or dropped), so start
    # the bookkeeping afresh before waiting for new messages.
    self._prev_ready_count = 0

    finished, self._pending = await asyncio.wait(
        self._pending, return_when=asyncio.FIRST_COMPLETED
    )
    for task in finished:
        key = task.get_name()
        exhausted = isinstance(task.exception(), StopAsyncIteration)
        # `task.result()` re-raises any other exception from the iterator.
        payload = None if exhausted else task.result()
        self._ready_count += 1
        self._result[key] = _Selected(payload)
        # A `None` payload marks a closed channel or AsyncIterator, which
        # must not be polled again.
        if payload is not None:
            upcoming = self._receivers[key].__anext__()  # pylint: disable=unnecessary-dunder-call
            self._pending.add(asyncio.create_task(upcoming, name=key))  # type: ignore
    return True
|