Skip to content

merge_named

frequenz.channels.merge_named ¤

Merge messages coming from channels into a single stream containing name of message.

Classes¤

frequenz.channels.merge_named.MergeNamed ¤

Bases: Receiver[Tuple[str, T]]

Merge messages coming from multiple named channels into a single stream.

Source code in frequenz/channels/merge_named.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class MergeNamed(Receiver[Tuple[str, T]]):
    """Merge messages coming from multiple named channels into a single stream."""

    def __init__(self, **kwargs: Receiver[T]) -> None:
        """Create a `MergeNamed` instance.

        Args:
            **kwargs: sequence of channel receivers.
        """
        self._receivers = kwargs
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.receive(), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()

    async def receive(self) -> Optional[Tuple[str, T]]:
        """Wait until there's a message in any of the channels.

        Returns:
            The next message that was received, or `None`, if all channels have
                closed.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            if len(self._results) > 0:
                return self._results.popleft()

            if len(self._pending) == 0:
                return None
            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                result = item.result()
                # if channel is closed, don't add a task for it again.
                if result is None:
                    continue
                self._results.append((name, result))
                self._pending.add(
                    asyncio.create_task(self._receivers[name].receive(), name=name)
                )
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/merge_named.py
29
30
31
32
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
__init__(**kwargs) ¤

Create a MergeNamed instance.

PARAMETER DESCRIPTION
**kwargs

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: {}

Source code in frequenz/channels/merge_named.py
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, **kwargs: Receiver[T]) -> None:
    """Create a `MergeNamed` instance.

    Args:
        **kwargs: sequence of channel receivers.
    """
    self._receivers = kwargs
    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(recv.receive(), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))
receive() async ¤

Wait until there's a message in any of the channels.

RETURNS DESCRIPTION
Optional[Tuple[str, T]]

The next message that was received, or None, if all channels have closed.

Source code in frequenz/channels/merge_named.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def receive(self) -> Optional[Tuple[str, T]]:
    """Wait until there's a message in any of the channels.

    Returns:
        The next message that was received, or `None`, if all channels have
            closed.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        if len(self._results) > 0:
            return self._results.popleft()

        if len(self._pending) == 0:
            return None
        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            result = item.result()
            # if channel is closed, don't add a task for it again.
            if result is None:
                continue
            self._results.append((name, result))
            self._pending.add(
                asyncio.create_task(self._receivers[name].receive(), name=name)
            )