Bases: Receiver[T]
Merge messages coming from multiple channels into a single stream.
Example
For example, if there are two channel receivers with the same type,
they can be awaited together, and their results merged into a single
stream, by using Merge like this:
merge = Merge(receiver1, receiver2)
while msg := await merge.receive():
# do something with msg
pass
Source code in frequenz/channels/merge.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class Merge(Receiver[T]):
    """Merge messages coming from multiple channels into a single stream.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream, by using `Merge` like this:

        ```python
        merge = Merge(receiver1, receiver2)
        # Compare against `None` explicitly: a bare `while msg := ...` would
        # also stop on falsy messages such as `0` or `""`.
        while (msg := await merge.receive()) is not None:
            # do something with msg
            pass
        ```
    """

    def __init__(self, *args: Receiver[T]) -> None:
        """Create a `Merge` instance.

        One task per receiver is created with `asyncio.create_task()`, so this
        must be called while an event loop is running.

        Args:
            *args: sequence of channel receivers.
        """
        # Assign `_pending` before creating any task, so `__del__` can still
        # cancel the tasks already started if construction fails part-way.
        self._pending: Set[asyncio.Task[Any]] = set()
        # Receivers are keyed by their positional index (as a string) so the
        # key can double as the task name and be looked up again in `receive`.
        self._receivers = {str(idx): recv for idx, recv in enumerate(args)}
        for name, recv in self._receivers.items():
            self._pending.add(asyncio.create_task(recv.receive(), name=name))
        # There is at most one in-flight task per receiver and results are
        # drained before waiting again, so this bound is never exceeded.
        self._results: Deque[T] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        # `_pending` is missing when `__init__` raised before assigning it.
        for task in getattr(self, "_pending", ()):
            task.cancel()

    async def receive(self) -> Optional[T]:
        """Wait until there's a message in any of the channels.

        Returns:
            The next message that was received, or `None`, if all channels have
                closed.

        Raises:
            BaseException: whatever a receiver's `receive()` task raised (or
                `asyncio.CancelledError` if that task was cancelled).  Messages
                from other receivers that completed at the same time are kept
                and returned by subsequent calls.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            if self._results:
                return self._results.popleft()

            if not self._pending:
                return None

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )

            # `self._pending` no longer contains the `done` tasks, so every one
            # of them must be handled here; bailing out on the first failure
            # would silently drop the others' messages and stop polling them.
            first_error: Optional[BaseException] = None
            for item in done:
                name = item.get_name()
                try:
                    result = item.result()
                except BaseException as err:  # pylint: disable=broad-except
                    # Deferred, not swallowed: re-raised after the loop.
                    if first_error is None:
                        first_error = err
                    continue
                # if channel is closed, don't add a task for it again.
                if result is None:
                    continue
                self._results.append(result)
                self._pending.add(
                    asyncio.create_task(self._receivers[name].receive(), name=name)
                )

            if first_error is not None:
                raise first_error
|
Functions
__del__()
Cleanup any pending tasks.
Source code in frequenz/channels/merge.py
def __del__(self) -> None:
    """Cleanup any pending tasks.

    Cancels every task still stored in `self._pending`.
    """
    # `__del__` also runs for instances whose `__init__` raised before
    # `_pending` was assigned; fall back to an empty tuple in that case
    # instead of raising `AttributeError` during finalization.
    for task in getattr(self, "_pending", ()):
        task.cancel()
|
__init__(*args)
Create a Merge instance.
PARAMETER |
DESCRIPTION |
*args |
sequence of channel receivers.
TYPE:
Receiver[T]
DEFAULT:
()
|
Source code in frequenz/channels/merge.py
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, *args: Receiver[T]) -> None:
    """Create a `Merge` instance.

    One task per receiver is created with `asyncio.create_task()`, so this
    must be called while an event loop is running.

    Args:
        *args: sequence of channel receivers.
    """
    # Assign `_pending` before creating any task, so the tasks already
    # started remain reachable for cancellation if construction fails
    # part-way.
    self._pending: Set[asyncio.Task[Any]] = set()
    # Receivers are keyed by their positional index (as a string) so the key
    # can double as the task name.
    self._receivers = {str(idx): recv for idx, recv in enumerate(args)}
    for name, recv in self._receivers.items():
        self._pending.add(asyncio.create_task(recv.receive(), name=name))
    # One slot per receiver: each receiver has at most one in-flight task.
    self._results: Deque[T] = deque(maxlen=len(self._receivers))
|
receive()
async
Wait until there's a message in any of the channels.
RETURNS |
DESCRIPTION |
Optional[T]
|
The next message that was received, or None, if all channels have
closed.
|
Source code in frequenz/channels/merge.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def receive(self) -> Optional[T]:
    """Wait until there's a message in any of the channels.

    Returns:
        The next message that was received, or `None`, if all channels have
            closed.

    Raises:
        BaseException: whatever a receiver's `receive()` task raised (or
            `asyncio.CancelledError` if that task was cancelled).  Messages
            from other receivers that completed at the same time are kept
            and returned by subsequent calls.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        if self._results:
            return self._results.popleft()

        if not self._pending:
            return None

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )

        # `self._pending` no longer contains the `done` tasks, so every one of
        # them must be handled here; bailing out on the first failure would
        # silently drop the others' messages and stop polling them.
        first_error: Optional[BaseException] = None
        for item in done:
            name = item.get_name()
            try:
                result = item.result()
            except BaseException as err:  # pylint: disable=broad-except
                # Deferred, not swallowed: re-raised after the loop.
                if first_error is None:
                    first_error = err
                continue
            # if channel is closed, don't add a task for it again.
            if result is None:
                continue
            self._results.append(result)
            self._pending.add(
                asyncio.create_task(self._receivers[name].receive(), name=name)
            )

        if first_error is not None:
            raise first_error
|