Bases: Receiver[Tuple[str, T]]
Merge messages coming from multiple named channels into a single stream.
Source code in frequenz/channels/merge_named.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class MergeNamed(Receiver[Tuple[str, T]]):
    """Merge messages coming from multiple named channels into a single stream.

    Every receiver is passed as a keyword argument; the keyword is the name
    that accompanies each of that receiver's messages in the merged stream.
    """

    def __init__(self, **kwargs: Receiver[T]) -> None:
        """Create a `MergeNamed` instance.

        Args:
            **kwargs: channel receivers to merge, keyed by the name that is
                returned together with each of their messages.
        """
        # Bind the task set before spawning anything, so that `__del__` finds
        # it even when task creation fails (e.g. no running event loop).
        self._pending: Set[asyncio.Task[Any]] = set()
        self._receivers = kwargs
        self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))
        for name, recv in self._receivers.items():
            # The task name carries the channel name, so a finished task can
            # be mapped back to its receiver in `receive()`.
            self._pending.add(asyncio.create_task(recv.receive(), name=name))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        # `_pending` is missing if `__init__` raised before assigning it.
        for task in getattr(self, "_pending", ()):
            task.cancel()

    async def receive(self) -> Optional[Tuple[str, T]]:
        """Wait until there's a message in any of the channels.

        Returns:
            A `(name, message)` tuple for the next message that was received,
            or `None`, if all channels have closed.

        Raises:
            Exception: whatever an underlying receiver's `receive()` raised.
                That receiver is dropped from the merge; messages from the
                other receivers are still delivered by later calls.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            if self._results:
                return self._results.popleft()
            if not self._pending:
                return None
            done, _ = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                # Retire finished tasks one at a time. If `result()` raises,
                # the tasks not yet handled are still in `_pending` and are
                # reported as done by the next `wait`, so nothing is lost.
                self._pending.discard(item)
                name = item.get_name()
                result = item.result()
                # if channel is closed, don't add a task for it again.
                if result is None:
                    continue
                self._results.append((name, result))
                self._pending.add(
                    asyncio.create_task(self._receivers[name].receive(), name=name)
                )
|
Functions
__del__()
Cleanup any pending tasks.
Source code in frequenz/channels/merge_named.py
def __del__(self) -> None:
    """Cleanup any pending tasks.

    Cancels every task still held in `self._pending`.
    """
    # The finalizer also runs for instances whose `__init__` raised before
    # `_pending` was assigned; fall back to an empty tuple in that case
    # instead of failing with `AttributeError`.
    for task in getattr(self, "_pending", ()):
        task.cancel()
|
__init__(**kwargs)
Create a MergeNamed
instance.
PARAMETER |
DESCRIPTION |
**kwargs |
Channel receivers to merge, passed as keyword arguments; each keyword is the name reported alongside that receiver's messages.
TYPE:
Receiver[T]
DEFAULT:
{}
|
Source code in frequenz/channels/merge_named.py
16
17
18
19
20
21
22
23
24
25
26
def __init__(self, **kwargs: Receiver[T]) -> None:
    """Create a `MergeNamed` instance.

    One `receive()` task is started per receiver, so this must be called
    while an event loop is running.

    Args:
        **kwargs: channel receivers to merge, keyed by the name that is
            returned together with each of their messages.
    """
    # Bind the task set before spawning anything, so that `__del__` finds it
    # (and can cancel what was already started) even when task creation
    # fails part-way, e.g. because there is no running event loop.
    self._pending: Set[asyncio.Task[Any]] = set()
    self._receivers = kwargs
    # At most one message per receiver is buffered at any time.
    self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))
    for name, recv in self._receivers.items():
        # The task name carries the channel name, so a finished task can be
        # mapped back to its receiver.
        self._pending.add(asyncio.create_task(recv.receive(), name=name))
|
receive()
async
Wait until there's a message in any of the channels.
RETURNS |
DESCRIPTION |
Optional[Tuple[str, T]]
|
The next message that was received, or None , if all channels have
closed.
|
Source code in frequenz/channels/merge_named.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def receive(self) -> Optional[Tuple[str, T]]:
    """Wait until there's a message in any of the channels.

    Returns:
        A `(name, message)` tuple for the next message that was received,
        or `None`, if all channels have closed.

    Raises:
        Exception: whatever an underlying receiver's `receive()` raised.
            That receiver is dropped from the merge; messages from the
            other receivers are still delivered by later calls.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        if self._results:
            return self._results.popleft()
        if not self._pending:
            return None
        done, _ = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            # Retire finished tasks one at a time. If `result()` raises, the
            # tasks not yet handled are still in `_pending` and are reported
            # as done by the next `wait`, so their messages are not lost.
            self._pending.discard(item)
            name = item.get_name()
            result = item.result()
            # if channel is closed, don't add a task for it again.
            if result is None:
                continue
            self._results.append((name, result))
            self._pending.add(
                asyncio.create_task(self._receivers[name].receive(), name=name)
            )
|