Skip to content

merge

frequenz.channels.merge ¤

Merge messages coming from channels into a single stream.

Classes¤

frequenz.channels.merge.Merge ¤

Bases: Receiver[T]

Merge messages coming from multiple channels into a single stream.

Example

For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using Merge like this:

merge = Merge(receiver1, receiver2)
while (msg := await merge.receive()) is not None:
    # do something with msg
    pass
Source code in frequenz/channels/merge.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class Merge(Receiver[T]):
    """Merge messages coming from multiple channels into a single stream.

    One `asyncio` task awaiting `receive()` is kept pending for every
    receiver that has not closed yet.  When a task completes with a message,
    the message is queued and a new task is created for the same receiver.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream, by using `Merge` like this:

        ```python
        merge = Merge(receiver1, receiver2)
        while (msg := await merge.receive()) is not None:
            # do something with msg
            pass
        ```

        The explicit comparison with `None` matters: `None` is the only
        value that signals that all channels have closed, so falsy messages
        such as `0` or `""` must not end the loop.
    """

    def __init__(self, *args: Receiver[T]) -> None:
        """Create a `Merge` instance.

        A task awaiting `receive()` is created right away for each receiver,
        so an event loop must be running when this is called.

        Args:
            *args: sequence of channel receivers.
        """
        # Receivers are keyed by their position (as a string) so that the
        # same key can be used as the name of the task awaiting them.
        self._receivers = {str(idx): recv for idx, recv in enumerate(args)}
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.receive(), name=name)
            for name, recv in self._receivers.items()
        }
        # At most one message per receiver can be queued between two waits.
        self._results: Deque[T] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks.

        If `_pending` was never set (for example because `__init__` raised
        before assigning it), there is nothing to cancel and the finalizer
        returns quietly instead of raising `AttributeError`.
        """
        # `__del__` also runs for partially initialized instances, so the
        # attribute cannot be assumed to exist.
        for task in getattr(self, "_pending", ()):
            task.cancel()

    async def receive(self) -> Optional[T]:
        """Wait until there's a message in any of the channels.

        Returns:
            The next message that was received, or `None`, if all channels have
                closed.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            if self._results:
                return self._results.popleft()

            if not self._pending:
                return None
            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                result = item.result()
                # if channel is closed, don't add a task for it again.
                if result is None:
                    continue
                self._results.append(result)
                # the task name is the receiver's key, so it identifies
                # which receiver has to be awaited again.
                self._pending.add(
                    asyncio.create_task(self._receivers[name].receive(), name=name)
                )
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/merge.py
42
43
44
45
def __del__(self) -> None:
    """Cleanup any pending tasks.

    Cancels every task still stored in `_pending`.  If the attribute was
    never set (for example because `__init__` raised before assigning it),
    there is nothing to cancel and the finalizer returns quietly instead of
    raising `AttributeError`.
    """
    # `__del__` also runs for partially initialized instances, so the
    # attribute cannot be assumed to exist.
    for task in getattr(self, "_pending", ()):
        task.cancel()
__init__(*args) ¤

Create a Merge instance.

PARAMETER DESCRIPTION
*args

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: ()

Source code in frequenz/channels/merge.py
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, *args: Receiver[T]) -> None:
    """Set up a `Merge` over the given receivers.

    Each receiver is stored under its position (as a string), and a task
    awaiting its `receive()` is created with that same string as its name.

    Args:
        *args: the channel receivers whose messages should be merged.
    """
    named_receivers = {}
    for position, receiver in enumerate(args):
        named_receivers[str(position)] = receiver
    self._receivers = named_receivers

    # One pending task per receiver, named after the receiver's key.
    tasks: Set[asyncio.Task[Any]] = set()
    for task_name, receiver in named_receivers.items():
        tasks.add(asyncio.create_task(receiver.receive(), name=task_name))
    self._pending: Set[asyncio.Task[Any]] = tasks

    # The queue is bounded by the number of receivers.
    capacity = len(named_receivers)
    self._results: Deque[T] = deque(maxlen=capacity)
receive() async ¤

Wait until there's a message in any of the channels.

RETURNS DESCRIPTION
Optional[T]

The next message that was received, or None, if all channels have closed.

Source code in frequenz/channels/merge.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async def receive(self) -> Optional[T]:
    """Return the next message available from any of the merged channels.

    Returns:
        The oldest queued message, or `None` once no receiver task is left
            pending, i.e. all channels have closed.
    """
    # Keep waiting while nothing is queued: a `wait` may complete only
    # because a channel was closed, which produces no message.
    while not self._results:
        if not self._pending:
            return None

        finished, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in finished:
            receiver_name = task.get_name()
            message = task.result()
            if message is not None:
                self._results.append(message)
                # Await the same receiver again; its key doubles as the
                # task name.
                next_task = asyncio.create_task(
                    self._receivers[receiver_name].receive(), name=receiver_name
                )
                self._pending.add(next_task)
            # A `None` result means the channel is closed, so no new task
            # is created for it.

    return self._results.popleft()