Skip to content

anycast

frequenz.channels.anycast ¤

A channel for sending data across async tasks.

Classes¤

frequenz.channels.anycast.Anycast ¤

Bases: Generic[T]

A channel for sending data across async tasks.

Anycast channels support multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.

In cases where each message needs to be received by every receiver, a Broadcast channel may be used.

Uses a deque internally, so Anycast channels are not thread-safe.

When there are multiple channel receivers, they can be awaited simultaneously using Select, Merge or MergeNamed.

Example
async def send(sender: channel.Sender) -> None:
    while True:
        next = random.randint(3, 17)
        print(f"sending: {next}")
        await sender.send(next)


async def recv(id: int, receiver: channel.Receiver) -> None:
    while True:
        next = await receiver.receive()
        print(f"receiver_{id} received {next}")
        await asyncio.sleep(0.1) # sleep (or work) with the data


acast = channel.Anycast()

sender = acast.get_sender()
receiver_1 = acast.get_receiver()

asyncio.create_task(send(sender))

await recv(1, receiver_1)

Check the tests and benchmarks directories for more examples.

Source code in frequenz/channels/anycast.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class Anycast(Generic[T]):
    """A channel that delivers each message to exactly one receiver.

    Any number of senders and receivers can be attached to an Anycast
    channel.  Every message pushed through a sender is handed to a single
    receiver only.

    If every receiver needs to see every message, use a
    [Broadcast][frequenz.channels.Broadcast] channel instead.

    Messages are buffered in a [deque][collections.deque], so Anycast
    channels are not thread-safe.

    Several channel receivers can be awaited at the same time with
    [Select][frequenz.channels.Select], [Merge][frequenz.channels.Merge] or
    [MergeNamed][frequenz.channels.MergeNamed].

    Example:
        ``` python
        async def send(sender: channel.Sender) -> None:
            while True:
                value = random.randint(3, 17)
                print(f"sending: {value}")
                await sender.send(value)


        async def recv(id: int, receiver: channel.Receiver) -> None:
            while True:
                value = await receiver.receive()
                print(f"receiver_{id} received {value}")
                await asyncio.sleep(0.1) # sleep (or work) with the data


        acast = channel.Anycast()

        sender = acast.get_sender()
        receiver_1 = acast.get_receiver()

        send_task = asyncio.create_task(send(sender))

        await recv(1, receiver_1)
        ```

        Check the `tests` and `benchmarks` directories for more examples.
    """

    def __init__(self, maxsize: int = 10) -> None:
        """Set up an open channel with an empty, bounded buffer.

        Args:
            maxsize: Maximum number of messages the buffer can hold.
        """
        self.closed: bool = False
        self.limit: int = maxsize
        # One condition variable per direction: senders wait on `send_cv`,
        # receivers wait on `recv_cv`.
        self.send_cv: Condition = Condition()
        self.recv_cv: Condition = Condition()
        self.deque: Deque[T] = deque(maxlen=maxsize)

    async def close(self) -> None:
        """Mark the channel as closed and wake up all waiting tasks.

        Once closed, calls to [send()][frequenz.channels.Sender.send] return
        `False`.

        Messages still in the buffer can be drained by receivers; when the
        buffer is empty, [receive()][frequenz.channels.Receiver.receive]
        returns `None` immediately.
        """
        self.closed = True
        # Wake every task currently waiting on either condition variable,
        # senders first and then receivers.
        for condition in (self.send_cv, self.recv_cv):
            async with condition:
                condition.notify_all()

    def get_sender(self) -> Sender[T]:
        """Attach a new sender to this channel.

        Returns:
            A Sender bound to this Anycast channel.
        """
        sender: Sender[T] = Sender(self)
        return sender

    def get_receiver(self) -> Receiver[T]:
        """Attach a new receiver to this channel.

        Returns:
            A Receiver bound to this Anycast channel.
        """
        receiver: Receiver[T] = Receiver(self)
        return receiver
Functions¤
__init__(maxsize=10) ¤

Create an Anycast channel.

PARAMETER DESCRIPTION
maxsize

Size of the channel's buffer.

TYPE: int DEFAULT: 10

Source code in frequenz/channels/anycast.py
63
64
65
66
67
68
69
70
71
72
73
def __init__(self, maxsize: int = 10) -> None:
    """Set up an open channel with an empty, bounded buffer.

    Args:
        maxsize: Maximum number of messages the buffer can hold.
    """
    self.closed: bool = False
    self.limit: int = maxsize
    # One condition variable per direction: senders wait on `send_cv`,
    # receivers wait on `recv_cv`.
    self.send_cv: Condition = Condition()
    self.recv_cv: Condition = Condition()
    self.deque: Deque[T] = deque(maxlen=maxsize)
close() async ¤

Close the channel.

Any further attempts to send() data will return False.

Receivers will still be able to drain the pending items on the channel, but after that, subsequent receive() calls will return None immediately.

Source code in frequenz/channels/anycast.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async def close(self) -> None:
    """Mark the channel as closed and wake up all waiting tasks.

    Once closed, calls to [send()][frequenz.channels.Sender.send] return
    `False`.

    Messages still in the buffer can be drained by receivers; when the
    buffer is empty, [receive()][frequenz.channels.Receiver.receive]
    returns `None` immediately.
    """
    self.closed = True
    # Wake every task currently waiting on either condition variable,
    # senders first and then receivers.
    for condition in (self.send_cv, self.recv_cv):
        async with condition:
            condition.notify_all()
get_receiver() ¤

Create a new receiver.

RETURNS DESCRIPTION
Receiver[T]

A Receiver instance attached to the Anycast channel.

Source code in frequenz/channels/anycast.py
101
102
103
104
105
106
107
def get_receiver(self) -> Receiver[T]:
    """Attach a new receiver to this channel.

    Returns:
        A Receiver bound to this Anycast channel.
    """
    receiver: Receiver[T] = Receiver(self)
    return receiver
get_sender() ¤

Create a new sender.

RETURNS DESCRIPTION
Sender[T]

A Sender instance attached to the Anycast channel.

Source code in frequenz/channels/anycast.py
93
94
95
96
97
98
99
def get_sender(self) -> Sender[T]:
    """Attach a new sender to this channel.

    Returns:
        A Sender bound to this Anycast channel.
    """
    sender: Sender[T] = Sender(self)
    return sender

frequenz.channels.anycast.Receiver ¤

Bases: BaseReceiver[T]

A receiver to receive messages from an Anycast channel.

Should not be created directly, but through the Anycast.get_receiver() method.

Source code in frequenz/channels/anycast.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class Receiver(BaseReceiver[T]):
    """The receiving end of an Anycast channel.

    Obtain instances from `Anycast.get_receiver()` rather than constructing
    them directly.
    """

    def __init__(self, chan: Anycast[T]) -> None:
        """Bind the receiver to its channel.

        Args:
            chan: The Anycast channel this receiver reads from.
        """
        self._chan: Anycast[T] = chan

    async def receive(self) -> Optional[T]:
        """Take the next message out of the channel.

        Blocks until a message is available and then returns it.  If several
        receivers share the channel, each message goes to only one of them.

        Returns:
            The next message, or `None` if the channel is closed and its
                buffer is empty.
        """
        chan = self._chan
        while not chan.deque:
            # Nothing buffered: give up if the channel is closed, otherwise
            # sleep until the receive condition is notified.
            if chan.closed:
                return None
            async with chan.recv_cv:
                await chan.recv_cv.wait()
        msg = chan.deque.popleft()
        # A buffer slot was just freed, so notify one task waiting to send.
        async with chan.send_cv:
            chan.send_cv.notify(1)
        return msg
Functions¤
__init__(chan) ¤

Create a channel receiver.

PARAMETER DESCRIPTION
chan

A reference to the channel that this receiver belongs to.

TYPE: Anycast[T]

Source code in frequenz/channels/anycast.py
158
159
160
161
162
163
164
def __init__(self, chan: Anycast[T]) -> None:
    """Bind the receiver to its channel.

    Args:
        chan: The Anycast channel this receiver reads from.
    """
    self._chan: Anycast[T] = chan
receive() async ¤

Receive a message from the channel.

Waits for a message to become available, and returns that message. When there are multiple receivers for the channel, only one receiver will receive each message.

RETURNS DESCRIPTION
Optional[T]

None, if the channel is closed, a message otherwise.

Source code in frequenz/channels/anycast.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
async def receive(self) -> Optional[T]:
    """Take the next message out of the channel.

    Blocks until a message is available and then returns it.  If several
    receivers share the channel, each message goes to only one of them.

    Returns:
        The next message, or `None` if the channel is closed and its
            buffer is empty.
    """
    chan = self._chan
    while not chan.deque:
        # Nothing buffered: give up if the channel is closed, otherwise
        # sleep until the receive condition is notified.
        if chan.closed:
            return None
        async with chan.recv_cv:
            await chan.recv_cv.wait()
    msg = chan.deque.popleft()
    # A buffer slot was just freed, so notify one task waiting to send.
    async with chan.send_cv:
        chan.send_cv.notify(1)
    return msg

frequenz.channels.anycast.Sender ¤

Bases: BaseSender[T]

A sender to send messages to an Anycast channel.

Should not be created directly, but through the Anycast.get_sender() method.

Source code in frequenz/channels/anycast.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class Sender(BaseSender[T]):
    """A sender to send messages to an Anycast channel.

    Should not be created directly, but through the `Anycast.get_sender()`
    method.
    """

    def __init__(self, chan: Anycast[T]) -> None:
        """Create a channel sender.

        Args:
            chan: A reference to the channel that this sender belongs to.
        """
        self._chan = chan

    async def send(self, msg: T) -> bool:
        """Send a message across the channel.

        To send, this method inserts the message into the Anycast channel's
        buffer.  If the channel's buffer is full, waits for messages to get
        consumed, until there's some free space available in the buffer.  Each
        message will be received by exactly one receiver.

        If the channel gets closed while this method is waiting for free
        space, the message is dropped and `False` is returned.

        Args:
            msg: The message to be sent.

        Returns:
            `True` if the message was placed in the channel's buffer, `False`
                if the channel was closed before the message could be
                buffered.
        """
        if self._chan.closed:
            return False
        while len(self._chan.deque) == self._chan.deque.maxlen:
            async with self._chan.send_cv:
                await self._chan.send_cv.wait()
            # `Anycast.close()` notifies `send_cv` to wake blocked senders.
            # Re-check the flag here, otherwise a sender woken by `close()`
            # would either go back to waiting on a full buffer (possibly
            # forever) or later append to an already closed channel.
            if self._chan.closed:
                return False
        self._chan.deque.append(msg)
        # A message is now available, so notify one task waiting to receive.
        async with self._chan.recv_cv:
            self._chan.recv_cv.notify(1)
        return True
Functions¤
__init__(chan) ¤

Create a channel sender.

PARAMETER DESCRIPTION
chan

A reference to the channel that this sender belongs to.

TYPE: Anycast[T]

Source code in frequenz/channels/anycast.py
117
118
119
120
121
122
123
def __init__(self, chan: Anycast[T]) -> None:
    """Bind the sender to its channel.

    Args:
        chan: The Anycast channel this sender writes to.
    """
    self._chan: Anycast[T] = chan
send(msg) async ¤

Send a message across the channel.

To send, this method inserts the message into the Anycast channel's buffer. If the channel's buffer is full, waits for messages to get consumed, until there's some free space available in the buffer. Each message will be received by exactly one receiver.

PARAMETER DESCRIPTION
msg

The message to be sent.

TYPE: T

RETURNS DESCRIPTION
bool

Whether the message was sent, based on whether the channel is open or not.

Source code in frequenz/channels/anycast.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
async def send(self, msg: T) -> bool:
    """Send a message across the channel.

    To send, this method inserts the message into the Anycast channel's
    buffer.  If the channel's buffer is full, waits for messages to get
    consumed, until there's some free space available in the buffer.  Each
    message will be received by exactly one receiver.

    If the channel gets closed while this method is waiting for free
    space, the message is dropped and `False` is returned.

    Args:
        msg: The message to be sent.

    Returns:
        `True` if the message was placed in the channel's buffer, `False`
            if the channel was closed before the message could be
            buffered.
    """
    if self._chan.closed:
        return False
    while len(self._chan.deque) == self._chan.deque.maxlen:
        async with self._chan.send_cv:
            await self._chan.send_cv.wait()
        # `Anycast.close()` notifies `send_cv` to wake blocked senders.
        # Re-check the flag here, otherwise a sender woken by `close()`
        # would either go back to waiting on a full buffer (possibly
        # forever) or later append to an already closed channel.
        if self._chan.closed:
            return False
    self._chan.deque.append(msg)
    # A message is now available, so notify one task waiting to receive.
    async with self._chan.recv_cv:
        self._chan.recv_cv.notify(1)
    return True