frequenz.channels

Frequenz Channels.

This package contains channel implementations.

Channels:

  • Anycast: A channel that supports multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.

  • Bidirectional: A channel providing a client and a service handle to send and receive bidirectionally.

  • Broadcast: A channel to broadcast messages from multiple senders to multiple receivers. Each message sent through any of the senders is received by all of the receivers.

Other base classes:

  • Peekable: An object to allow users to get a peek at the latest value in the channel, without consuming anything.

  • Receiver: An object that can wait for and consume messages from a channel.

  • Sender: An object that can send messages to a channel.

Utilities:

  • util: A module with utilities, like special receivers that implement timers, file watchers, merge receivers, or wait for messages in multiple channels.

Exception classes:

Classes

frequenz.channels.Anycast

Bases: Generic[T]

A channel for sending data across async tasks.

Anycast channels support multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.

In cases where each message needs to be received by every receiver, use a Broadcast channel instead.

Uses a deque internally, so Anycast channels are not thread-safe.

When there are multiple channel receivers, they can be awaited simultaneously using Select, Merge or MergeNamed.

Example
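import asyncio
import random

import frequenz.channels as channel


async def send(sender: channel.Sender) -> None:
    # Send random numbers forever.
    while True:
        value = random.randint(3, 17)
        print(f"sending: {value}")
        await sender.send(value)


async def recv(receiver_id: int, receiver: channel.Receiver) -> None:
    # Print every message this receiver gets.
    while True:
        value = await receiver.receive()
        print(f"receiver_{receiver_id} received {value}")
        await asyncio.sleep(0.1)  # sleep (or work) with the data


async def main() -> None:
    acast = channel.Anycast()

    sender = acast.new_sender()
    receiver_1 = acast.new_receiver()

    # Keep a reference so the background task is not garbage-collected.
    send_task = asyncio.create_task(send(sender))

    await recv(1, receiver_1)


asyncio.run(main())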

Check the tests and benchmarks directories for more examples.
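
The "exactly one receiver" rule can also be pictured without the library. The following is only a sketch under stated assumptions: it models anycast delivery with a plain asyncio.Queue shared by several consumers, and the name anycast_demo is made up for this illustration. It is not how Anycast is implemented.

```python
import asyncio
from typing import Dict, List


async def anycast_demo(num_receivers: int = 3, num_messages: int = 6) -> Dict[int, List[int]]:
    """Model anycast delivery: one shared queue, several competing consumers."""
    queue: "asyncio.Queue" = asyncio.Queue(maxsize=10)
    received: Dict[int, List[int]] = {i: [] for i in range(num_receivers)}

    async def recv(receiver_id: int) -> None:
        while True:
            msg = await queue.get()
            if msg is None:  # sentinel: this receiver stops consuming
                return
            received[receiver_id].append(msg)
            await asyncio.sleep(0)  # yield so other receivers get a turn

    tasks = [asyncio.create_task(recv(i)) for i in range(num_receivers)]
    for msg in range(num_messages):
        await queue.put(msg)
    for _ in range(num_receivers):
        await queue.put(None)  # one sentinel per receiver
    await asyncio.gather(*tasks)
    return received


if __name__ == "__main__":
    print(asyncio.run(anycast_demo()))
```

Which receiver ends up with which message depends on scheduling, but each message appears in exactly one of the lists.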

Source code in frequenz/channels/_anycast.py
class Anycast(Generic[T]):
    """A channel for sending data across async tasks.

    Anycast channels support multiple senders and multiple receivers.  A message sent
    through a sender will be received by exactly one receiver.

    In cases where each message needs to be received by every receiver, a
    [Broadcast][frequenz.channels.Broadcast] channel may be used.

    Uses a [deque][collections.deque] internally, so Anycast channels are not
    thread-safe.

    When there are multiple channel receivers, they can be awaited
    simultaneously using [Select][frequenz.channels.util.Select],
    [Merge][frequenz.channels.util.Merge] or
    [MergeNamed][frequenz.channels.util.MergeNamed].

    Example:
        ``` python
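        import asyncio
        import random

        import frequenz.channels as channel


        async def send(sender: channel.Sender) -> None:
            # Send random numbers forever.
            while True:
                value = random.randint(3, 17)
                print(f"sending: {value}")
                await sender.send(value)


        async def recv(receiver_id: int, receiver: channel.Receiver) -> None:
            # Print every message this receiver gets.
            while True:
                value = await receiver.receive()
                print(f"receiver_{receiver_id} received {value}")
                await asyncio.sleep(0.1)  # sleep (or work) with the data


        async def main() -> None:
            acast = channel.Anycast()

            sender = acast.new_sender()
            receiver_1 = acast.new_receiver()

            # Keep a reference so the background task is not garbage-collected.
            send_task = asyncio.create_task(send(sender))

            await recv(1, receiver_1)


        asyncio.run(main())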
        ```

        Check the `tests` and `benchmarks` directories for more examples.
    """

    def __init__(self, maxsize: int = 10) -> None:
        """Create an Anycast channel.

        Args:
            maxsize: Size of the channel's buffer.
        """
        self.limit: int = maxsize
        self.deque: Deque[T] = deque(maxlen=maxsize)
        self.send_cv: Condition = Condition()
        self.recv_cv: Condition = Condition()
        self.closed: bool = False

    async def close(self) -> None:
        """Close the channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will return `False`.

        Receivers will still be able to drain the pending items on the channel,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will return `None`
        immediately.

        """
        self.closed = True
        async with self.send_cv:
            self.send_cv.notify_all()
        async with self.recv_cv:
            self.recv_cv.notify_all()

    def new_sender(self) -> Sender[T]:
        """Create a new sender.

        Returns:
            A Sender instance attached to the Anycast channel.
        """
        return Sender(self)

    def new_receiver(self) -> Receiver[T]:
        """Create a new receiver.

        Returns:
            A Receiver instance attached to the Anycast channel.
        """
        return Receiver(self)

Functions
__init__(maxsize=10)

Create an Anycast channel.

PARAMETER DESCRIPTION
maxsize

Size of the channel's buffer.

TYPE: int DEFAULT: 10

Source code in frequenz/channels/_anycast.py
def __init__(self, maxsize: int = 10) -> None:
    """Create an Anycast channel.

    Args:
        maxsize: Size of the channel's buffer.
    """
    self.limit: int = maxsize
    self.deque: Deque[T] = deque(maxlen=maxsize)
    self.send_cv: Condition = Condition()
    self.recv_cv: Condition = Condition()
    self.closed: bool = False

close() async

Close the channel.

Any further attempts to send() data will return False.

Receivers will still be able to drain the pending items on the channel, but after that, subsequent receive() calls will return None immediately.
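
The close semantics described above (sends rejected, pending items still drained, then None) can be sketched with a small stand-alone model. TinyChannel and demo are hypothetical names for this illustration. The model only mirrors the documented behaviour using a deque and an asyncio.Condition; it is not the library's code.

```python
import asyncio
from collections import deque
from typing import Deque, List, Optional


class TinyChannel:
    """Hypothetical, simplified model of a closable buffered channel."""

    def __init__(self) -> None:
        self.buffer: Deque[int] = deque()
        self.recv_cv = asyncio.Condition()
        self.closed = False

    async def send(self, msg: int) -> bool:
        if self.closed:
            return False  # sending after close is rejected
        async with self.recv_cv:
            self.buffer.append(msg)
            self.recv_cv.notify(1)
        return True

    async def receive(self) -> Optional[int]:
        async with self.recv_cv:
            # Sleep until there is something to read or the channel is closed.
            await self.recv_cv.wait_for(lambda: self.buffer or self.closed)
            if self.buffer:
                return self.buffer.popleft()  # pending items can still be drained
            return None  # closed and empty

    async def close(self) -> None:
        self.closed = True
        async with self.recv_cv:
            self.recv_cv.notify_all()  # wake up any waiting receivers


async def demo() -> List[object]:
    chan = TinyChannel()
    await chan.send(1)
    await chan.send(2)
    await chan.close()
    accepted = await chan.send(3)  # rejected: the channel is closed
    return [accepted, await chan.receive(), await chan.receive(), await chan.receive()]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model, demo() yields [False, 1, 2, None]: the late send is refused, the two buffered values are drained, and the next receive reports the closed channel.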

Source code in frequenz/channels/_anycast.py
async def close(self) -> None:
    """Close the channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will return `False`.

    Receivers will still be able to drain the pending items on the channel,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will return `None`
    immediately.

    """
    self.closed = True
    async with self.send_cv:
        self.send_cv.notify_all()
    async with self.recv_cv:
        self.recv_cv.notify_all()

new_receiver()

Create a new receiver.

RETURNS DESCRIPTION
Receiver[T]

A Receiver instance attached to the Anycast channel.

Source code in frequenz/channels/_anycast.py
def new_receiver(self) -> Receiver[T]:
    """Create a new receiver.

    Returns:
        A Receiver instance attached to the Anycast channel.
    """
    return Receiver(self)

new_sender()

Create a new sender.

RETURNS DESCRIPTION
Sender[T]

A Sender instance attached to the Anycast channel.

Source code in frequenz/channels/_anycast.py
def new_sender(self) -> Sender[T]:
    """Create a new sender.

    Returns:
        A Sender instance attached to the Anycast channel.
    """
    return Sender(self)

frequenz.channels.Bidirectional

Bases: Generic[T, U]

A wrapper class for simulating bidirectional channels.

Source code in frequenz/channels/_bidirectional.py
class Bidirectional(Generic[T, U]):
    """A wrapper class for simulating bidirectional channels."""

    class Handle(Sender[V], Receiver[W]):
        """A handle to a [Bidirectional][frequenz.channels.Bidirectional] instance.

        It can be used to send/receive values between the client and service.
        """

        def __init__(
            self,
            channel: Bidirectional[V, W] | Bidirectional[W, V],
            sender: Sender[V],
            receiver: Receiver[W],
        ) -> None:
            """Create a `Bidirectional.Handle` instance.

            Args:
                channel: The underlying channel.
                sender: A sender to send values with.
                receiver: A receiver to receive values from.
            """
            self._chan = channel
            self._sender = sender
            self._receiver = receiver

        async def send(self, msg: V) -> None:
            """Send a value to the other side.

            Args:
                msg: The value to send.

            Raises:
                SenderError: if the underlying channel was closed.
                    A [ChannelClosedError][frequenz.channels.ChannelClosedError]
                    is set as the cause.
            """
            try:
                await self._sender.send(msg)
            except SenderError as err:
                # If this comes from a channel error, then we inject another
                # ChannelError having the information about the Bidirectional
                # channel to hide (at least partially) the underlying
                # Broadcast channels we use.
                if isinstance(err.__cause__, ChannelError):
                    this_chan_error = ChannelError(
                        f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}",
                        self._chan,  # pylint: disable=protected-access
                    )
                    this_chan_error.__cause__ = err.__cause__
                    err.__cause__ = this_chan_error
                raise err

        async def ready(self) -> bool:
            """Wait until the receiver is ready with a value or an error.

            Once a call to `ready()` has finished, the value should be read with
            a call to `consume()` (`receive()` or iterated over). The receiver will
            remain ready (this method will return immediately) until it is
            consumed.

            Returns:
                Whether the receiver is still active.
            """
            return await self._receiver.ready()  # pylint: disable=protected-access

        def consume(self) -> W:
            """Return the latest value once `ready()` is complete.

            Returns:
                The next value that was received.

            Raises:
                ReceiverStoppedError: if the receiver stopped producing messages.
                ReceiverError: if there is some problem with the receiver.

            # noqa: DAR401 err (https://github.com/terrencepreilly/darglint/issues/181)
            """
            try:
                return self._receiver.consume()  # pylint: disable=protected-access
            except ReceiverError as err:
                # If this comes from a channel error, then we inject another
                # ChannelError having the information about the Bidirectional
                # channel to hide (at least partially) the underlying
                # Broadcast channels we use.
                if isinstance(err.__cause__, ChannelError):
                    this_chan_error = ChannelError(
                        f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}",
                        self._chan,  # pylint: disable=protected-access
                    )
                    this_chan_error.__cause__ = err.__cause__
                    err.__cause__ = this_chan_error
                raise err

    def __init__(self, client_id: str, service_id: str) -> None:
        """Create a `Bidirectional` instance.

        Args:
            client_id: A name for the client, used to name the channels.
            service_id: A name for the service end of the channels.
        """
        self._client_id = client_id
        self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}")
        self._response_channel: Broadcast[U] = Broadcast(
            f"resp_{service_id}_{client_id}"
        )

        self._client_handle = Bidirectional.Handle(
            self,
            self._request_channel.new_sender(),
            self._response_channel.new_receiver(),
        )
        self._service_handle = Bidirectional.Handle(
            self,
            self._response_channel.new_sender(),
            self._request_channel.new_receiver(),
        )

    @property
    def client_handle(self) -> Bidirectional.Handle[T, U]:
        """Get a `Handle` for the client side to use.

        Returns:
            Object to send/receive messages with.
        """
        return self._client_handle

    @property
    def service_handle(self) -> Bidirectional.Handle[U, T]:
        """Get a `Handle` for the service side to use.

        Returns:
            Object to send/receive messages with.
        """
        return self._service_handle

Attributes
client_handle: Bidirectional.Handle[T, U] property

Get a Handle for the client side to use.

RETURNS DESCRIPTION
Bidirectional.Handle[T, U]

Object to send/receive messages with.

service_handle: Bidirectional.Handle[U, T] property

Get a Handle for the service side to use.

RETURNS DESCRIPTION
Bidirectional.Handle[U, T]

Object to send/receive messages with.
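
The pairing of a client handle and a service handle can be sketched as two one-way queues wired in opposite directions. This is an illustration only: QueueHandle and demo are hypothetical names, and plain asyncio.Queue objects stand in for the two Broadcast channels that Bidirectional uses internally.

```python
import asyncio
from typing import Generic, TypeVar

V = TypeVar("V")
W = TypeVar("W")


class QueueHandle(Generic[V, W]):
    """Hypothetical handle: sends on one queue, receives from the other."""

    def __init__(self, outbox: "asyncio.Queue[V]", inbox: "asyncio.Queue[W]") -> None:
        self._outbox = outbox
        self._inbox = inbox

    async def send(self, msg: V) -> None:
        await self._outbox.put(msg)

    async def receive(self) -> W:
        return await self._inbox.get()


async def demo() -> str:
    requests: "asyncio.Queue[int]" = asyncio.Queue()
    responses: "asyncio.Queue[str]" = asyncio.Queue()
    # Both handles share the same pair of queues, wired in opposite directions.
    client: QueueHandle[int, str] = QueueHandle(requests, responses)
    service: QueueHandle[str, int] = QueueHandle(responses, requests)

    async def serve_one() -> None:
        number = await service.receive()
        await service.send(f"{number} squared is {number ** 2}")

    task = asyncio.create_task(serve_one())
    await client.send(7)
    reply = await client.receive()
    await task
    return reply


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

What the client sends is what the service receives, and the other way round, which is why the two handle types have their type parameters swapped (Handle[T, U] versus Handle[U, T]).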

Classes
Handle

Bases: Sender[V], Receiver[W]

A handle to a Bidirectional instance.

It can be used to send/receive values between the client and service.

Source code in frequenz/channels/_bidirectional.py
class Handle(Sender[V], Receiver[W]):
    """A handle to a [Bidirectional][frequenz.channels.Bidirectional] instance.

    It can be used to send/receive values between the client and service.
    """

    def __init__(
        self,
        channel: Bidirectional[V, W] | Bidirectional[W, V],
        sender: Sender[V],
        receiver: Receiver[W],
    ) -> None:
        """Create a `Bidirectional.Handle` instance.

        Args:
            channel: The underlying channel.
            sender: A sender to send values with.
            receiver: A receiver to receive values from.
        """
        self._chan = channel
        self._sender = sender
        self._receiver = receiver

    async def send(self, msg: V) -> None:
        """Send a value to the other side.

        Args:
            msg: The value to send.

        Raises:
            SenderError: if the underlying channel was closed.
                A [ChannelClosedError][frequenz.channels.ChannelClosedError]
                is set as the cause.
        """
        try:
            await self._sender.send(msg)
        except SenderError as err:
            # If this comes from a channel error, then we inject another
            # ChannelError having the information about the Bidirectional
            # channel to hide (at least partially) the underlying
            # Broadcast channels we use.
            if isinstance(err.__cause__, ChannelError):
                this_chan_error = ChannelError(
                    f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}",
                    self._chan,  # pylint: disable=protected-access
                )
                this_chan_error.__cause__ = err.__cause__
                err.__cause__ = this_chan_error
            raise err

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        return await self._receiver.ready()  # pylint: disable=protected-access

    def consume(self) -> W:
        """Return the latest value once `ready()` is complete.

        Returns:
            The next value that was received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.

        # noqa: DAR401 err (https://github.com/terrencepreilly/darglint/issues/181)
        """
        try:
            return self._receiver.consume()  # pylint: disable=protected-access
        except ReceiverError as err:
            # If this comes from a channel error, then we inject another
            # ChannelError having the information about the Bidirectional
            # channel to hide (at least partially) the underlying
            # Broadcast channels we use.
            if isinstance(err.__cause__, ChannelError):
                this_chan_error = ChannelError(
                    f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}",
                    self._chan,  # pylint: disable=protected-access
                )
                this_chan_error.__cause__ = err.__cause__
                err.__cause__ = this_chan_error
            raise err

Functions
__init__(channel, sender, receiver)

Create a Bidirectional.Handle instance.

PARAMETER DESCRIPTION
channel

The underlying channel.

TYPE: Bidirectional[V, W] | Bidirectional[W, V]

sender

A sender to send values with.

TYPE: Sender[V]

receiver

A receiver to receive values from.

TYPE: Receiver[W]

Source code in frequenz/channels/_bidirectional.py
def __init__(
    self,
    channel: Bidirectional[V, W] | Bidirectional[W, V],
    sender: Sender[V],
    receiver: Receiver[W],
) -> None:
    """Create a `Bidirectional.Handle` instance.

    Args:
        channel: The underlying channel.
        sender: A sender to send values with.
        receiver: A receiver to receive values from.
    """
    self._chan = channel
    self._sender = sender
    self._receiver = receiver

consume()

Return the latest value once ready() is complete.

RETURNS DESCRIPTION
W

The next value that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in frequenz/channels/_bidirectional.py
def consume(self) -> W:
    """Return the latest value once `ready()` is complete.

    Returns:
        The next value that was received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.

    # noqa: DAR401 err (https://github.com/terrencepreilly/darglint/issues/181)
    """
    try:
        return self._receiver.consume()  # pylint: disable=protected-access
    except ReceiverError as err:
        # If this comes from a channel error, then we inject another
        # ChannelError having the information about the Bidirectional
        # channel to hide (at least partially) the underlying
        # Broadcast channels we use.
        if isinstance(err.__cause__, ChannelError):
            this_chan_error = ChannelError(
                f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}",
                self._chan,  # pylint: disable=protected-access
            )
            this_chan_error.__cause__ = err.__cause__
            err.__cause__ = this_chan_error
        raise err

ready() async

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (or by calling receive() or iterating over the receiver). The receiver will remain ready (this method will return immediately) until the value is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.
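
The two-step ready()/consume() protocol can be sketched with a small stand-alone receiver. QueueReceiver and demo are hypothetical names, and using None as an end-of-stream sentinel is an assumption of this sketch, not something the library does.

```python
import asyncio
from typing import List, Optional


class QueueReceiver:
    """Hypothetical receiver that splits receiving into ready() + consume()."""

    def __init__(self, queue: "asyncio.Queue[Optional[int]]") -> None:
        self._queue = queue
        self._next: Optional[int] = None
        self._has_next = False
        self._stopped = False

    async def ready(self) -> bool:
        if self._has_next:
            return True  # stays ready until consume() is called
        if self._stopped:
            return False
        msg = await self._queue.get()
        if msg is None:  # sentinel meaning "no more messages"
            self._stopped = True
            return False
        self._next, self._has_next = msg, True
        return True

    def consume(self) -> int:
        if not self._has_next or self._next is None:
            raise RuntimeError("consume() called before ready() succeeded")
        self._has_next = False
        return self._next


async def demo() -> List[int]:
    queue: "asyncio.Queue[Optional[int]]" = asyncio.Queue()
    for item in (10, 20, None):
        queue.put_nowait(item)
    receiver = QueueReceiver(queue)
    seen: List[int] = []
    while await receiver.ready():
        await receiver.ready()  # already ready: returns at once, skips nothing
        seen.append(receiver.consume())
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here demo() collects [10, 20]. The extra ready() call inside the loop shows that waiting twice does not drop a value, because the pending value is held until consume() takes it.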

Source code in frequenz/channels/_bidirectional.py
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    return await self._receiver.ready()  # pylint: disable=protected-access

send(msg) async

Send a value to the other side.

PARAMETER DESCRIPTION
msg

The value to send.

TYPE: V

RAISES DESCRIPTION
SenderError

if the underlying channel was closed. A ChannelClosedError is set as the cause.
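
The cause chain mentioned above can be inspected through the standard __cause__ attribute. The sketch below uses stand-in exception classes (DemoChannelError and DemoSenderError, both hypothetical) to show the general technique of inserting an extra error between an exception and its original cause, as the source listing of send() does. It does not use the library's own classes.

```python
from typing import List, Optional


class DemoChannelError(Exception):
    """Stand-in for a channel error that remembers its channel."""

    def __init__(self, message: str, channel: object) -> None:
        super().__init__(message)
        self.channel = channel


class DemoSenderError(Exception):
    """Stand-in for an error raised by a sender."""


def rewrap(err: DemoSenderError, outer_channel: object) -> DemoSenderError:
    """Insert an error naming the outer channel between err and its cause."""
    cause = err.__cause__
    if isinstance(cause, DemoChannelError):
        outer = DemoChannelError(
            f"Error in the underlying channel {cause.channel}: {cause}", outer_channel
        )
        outer.__cause__ = cause
        err.__cause__ = outer
    return err


def cause_chain(err: BaseException) -> List[BaseException]:
    """Follow __cause__ links from err down to the root cause."""
    chain: List[BaseException] = []
    current: Optional[BaseException] = err
    while current is not None:
        chain.append(current)
        current = current.__cause__
    return chain


def demo() -> List[str]:
    try:
        try:
            raise DemoChannelError("channel closed", "inner-broadcast")
        except DemoChannelError as inner:
            raise DemoSenderError("send failed") from inner
    except DemoSenderError as err:
        rewrap(err, "outer-bidirectional")
        return [str(e) for e in cause_chain(err)]


if __name__ == "__main__":
    print(demo())
```

After rewrapping, the chain has three links: the sender error, the injected error that names the outer channel, and the original channel error.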

Source code in frequenz/channels/_bidirectional.py
async def send(self, msg: V) -> None:
    """Send a value to the other side.

    Args:
        msg: The value to send.

    Raises:
        SenderError: if the underlying channel was closed.
            A [ChannelClosedError][frequenz.channels.ChannelClosedError]
            is set as the cause.
    """
    try:
        await self._sender.send(msg)
    except SenderError as err:
        # If this comes from a channel error, then we inject another
        # ChannelError having the information about the Bidirectional
        # channel to hide (at least partially) the underlying
        # Broadcast channels we use.
        if isinstance(err.__cause__, ChannelError):
            this_chan_error = ChannelError(
                f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}",
                self._chan,  # pylint: disable=protected-access
            )
            this_chan_error.__cause__ = err.__cause__
            err.__cause__ = this_chan_error
        raise err

Functions
__init__(client_id, service_id)

Create a Bidirectional instance.

PARAMETER DESCRIPTION
client_id

A name for the client, used to name the channels.

TYPE: str

service_id

A name for the service end of the channels.

TYPE: str

Source code in frequenz/channels/_bidirectional.py
def __init__(self, client_id: str, service_id: str) -> None:
    """Create a `Bidirectional` instance.

    Args:
        client_id: A name for the client, used to name the channels.
        service_id: A name for the service end of the channels.
    """
    self._client_id = client_id
    self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}")
    self._response_channel: Broadcast[U] = Broadcast(
        f"resp_{service_id}_{client_id}"
    )

    self._client_handle = Bidirectional.Handle(
        self,
        self._request_channel.new_sender(),
        self._response_channel.new_receiver(),
    )
    self._service_handle = Bidirectional.Handle(
        self,
        self._response_channel.new_sender(),
        self._request_channel.new_receiver(),
    )

frequenz.channels.Broadcast

Bases: Generic[T]

A channel to broadcast messages to multiple receivers.

Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders is received by all of the receivers.

Internally, a broadcast receiver's buffer is implemented with just append/pop operations on either side of a deque, which are thread-safe. Because of this, Broadcast channels are thread-safe.

When there are multiple channel receivers, they can be awaited simultaneously using Select, Merge or MergeNamed.

Example
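import asyncio
import random

import frequenz.channels as channel


async def send(sender: channel.Sender) -> None:
    # Send random numbers forever.
    while True:
        value = random.randint(3, 17)
        print(f"sending: {value}")
        await sender.send(value)


async def recv(receiver_id: int, receiver: channel.Receiver) -> None:
    # Print every message this receiver gets.
    while True:
        value = await receiver.receive()
        print(f"receiver_{receiver_id} received {value}")
        await asyncio.sleep(0.1)  # sleep (or work) with the data


async def main() -> None:
    # Broadcast requires a name, used to identify the channel in the logs.
    bcast = channel.Broadcast("numbers")

    sender = bcast.new_sender()
    receiver_1 = bcast.new_receiver()

    # Keep a reference so the background task is not garbage-collected.
    send_task = asyncio.create_task(send(sender))

    await recv(1, receiver_1)


asyncio.run(main())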

Check the tests and benchmarks directories for more examples.
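
To contrast with Anycast, the "received by all of the receivers" rule can be modelled by giving every receiver its own queue and copying each message into all of them. TinyBroadcast and demo are hypothetical names for this sketch, which is an assumption-laden model rather than the library's implementation.

```python
import asyncio
from typing import List


class TinyBroadcast:
    """Hypothetical model: every receiver owns a queue; send copies to all."""

    def __init__(self) -> None:
        self._queues: List["asyncio.Queue[int]"] = []

    def new_receiver(self) -> "asyncio.Queue[int]":
        queue: "asyncio.Queue[int]" = asyncio.Queue()
        self._queues.append(queue)
        return queue

    async def send(self, msg: int) -> None:
        for queue in self._queues:
            await queue.put(msg)  # each receiver gets its own copy


async def demo() -> List[List[int]]:
    bcast = TinyBroadcast()
    first, second = bcast.new_receiver(), bcast.new_receiver()
    for msg in (1, 2, 3):
        await bcast.send(msg)
    return [
        [first.get_nowait() for _ in range(3)],
        [second.get_nowait() for _ in range(3)],
    ]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both receivers end up with [1, 2, 3], whereas in the anycast case each message would reach only one of them.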

Source code in frequenz/channels/_broadcast.py
class Broadcast(Generic[T]):
    """A channel to broadcast messages to multiple receivers.

    `Broadcast` channels can have multiple senders and multiple receivers. Each
    message sent through any of the senders is received by all of the
    receivers.

    Internally, a broadcast receiver's buffer is implemented with just
    append/pop operations on either side of a [deque][collections.deque], which
    are thread-safe.  Because of this, `Broadcast` channels are thread-safe.

    When there are multiple channel receivers, they can be awaited
    simultaneously using [Select][frequenz.channels.util.Select],
    [Merge][frequenz.channels.util.Merge] or
    [MergeNamed][frequenz.channels.util.MergeNamed].

    Example:
        ``` python
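        import asyncio
        import random

        import frequenz.channels as channel


        async def send(sender: channel.Sender) -> None:
            # Send random numbers forever.
            while True:
                value = random.randint(3, 17)
                print(f"sending: {value}")
                await sender.send(value)


        async def recv(receiver_id: int, receiver: channel.Receiver) -> None:
            # Print every message this receiver gets.
            while True:
                value = await receiver.receive()
                print(f"receiver_{receiver_id} received {value}")
                await asyncio.sleep(0.1)  # sleep (or work) with the data


        async def main() -> None:
            # Broadcast requires a name, used to identify the channel in the logs.
            bcast = channel.Broadcast("numbers")

            sender = bcast.new_sender()
            receiver_1 = bcast.new_receiver()

            # Keep a reference so the background task is not garbage-collected.
            send_task = asyncio.create_task(send(sender))

            await recv(1, receiver_1)


        asyncio.run(main())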
        ```

        Check the `tests` and `benchmarks` directories for more examples.
    """

    def __init__(self, name: str, resend_latest: bool = False) -> None:
        """Create a Broadcast channel.

        Args:
            name: A name for the broadcast channel, typically based on the type
                of data sent through it.  Used to identify the channel in the
                logs.
            resend_latest: When True, every time a new receiver is created with
                `new_receiver`, it will automatically get sent the latest value
                on the channel.  This allows new receivers on slow streams to
                get the latest value as soon as they are created, without having
                to wait for the next message on the channel to arrive.
        """
        self.name: str = name
        self._resend_latest = resend_latest

        self.recv_cv: Condition = Condition()
        self.receivers: Dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
        self.closed: bool = False
        self._latest: Optional[T] = None

    async def close(self) -> None:
        """Close the Broadcast channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will return `False`.

        Receivers will still be able to drain the pending items on their queues,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will return `None`
        immediately.
        """
        self._latest = None
        self.closed = True
        async with self.recv_cv:
            self.recv_cv.notify_all()

    def new_sender(self) -> Sender[T]:
        """Create a new broadcast sender.

        Returns:
            A Sender instance attached to the broadcast channel.
        """
        return Sender(self)

    def new_receiver(
        self, name: Optional[str] = None, maxsize: int = 50
    ) -> Receiver[T]:
        """Create a new broadcast receiver.

        Broadcast receivers have their own buffer, and when messages are not
        being consumed fast enough and the buffer fills up, old messages will
        get dropped just in this receiver.

        Args:
            name: A name to identify the receiver in the logs.
            maxsize: Size of the receiver's buffer.

        Returns:
            A Receiver instance attached to the broadcast channel.
        """
        uuid = uuid4()
        if name is None:
            name = str(uuid)
        recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
        self.receivers[uuid] = weakref.ref(recv)
        if self._resend_latest and self._latest is not None:
            recv.enqueue(self._latest)
        return recv

    def new_peekable(self) -> Peekable[T]:
        """Create a new Peekable for the broadcast channel.

        A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
        that allows the user to get a peek at the latest value in the channel,
        without consuming anything.

        Returns:
            A Peekable to peek into the broadcast channel with.
        """
        return Peekable(self)

Functions
__init__(name, resend_latest=False)

Create a Broadcast channel.

PARAMETER DESCRIPTION
name

A name for the broadcast channel, typically based on the type of data sent through it. Used to identify the channel in the logs.

TYPE: str

resend_latest

When True, every time a new receiver is created with new_receiver, it will automatically get sent the latest value on the channel. This allows new receivers on slow streams to get the latest value as soon as they are created, without having to wait for the next message on the channel to arrive.

TYPE: bool DEFAULT: False
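
The effect of resend_latest on receivers that join late can be sketched as follows. LatestAwareBroadcast and late_joiner_sees are hypothetical names. The model remembers the most recent message and seeds each new receiver's buffer with it, which matches the behaviour described above but is not the library's code.

```python
from collections import deque
from typing import Deque, List, Optional


class LatestAwareBroadcast:
    """Hypothetical model of a broadcast channel that remembers the last message."""

    def __init__(self, resend_latest: bool = False) -> None:
        self._resend_latest = resend_latest
        self._latest: Optional[int] = None
        self._buffers: List[Deque[int]] = []

    def send(self, msg: int) -> None:
        self._latest = msg
        for buffer in self._buffers:
            buffer.append(msg)

    def new_receiver(self) -> Deque[int]:
        buffer: Deque[int] = deque()
        self._buffers.append(buffer)
        if self._resend_latest and self._latest is not None:
            buffer.append(self._latest)  # seed the new receiver with the latest value
        return buffer


def late_joiner_sees(resend_latest: bool) -> List[int]:
    """Send two messages, then create a receiver and report what it holds."""
    bcast = LatestAwareBroadcast(resend_latest=resend_latest)
    bcast.send(1)
    bcast.send(2)
    return list(bcast.new_receiver())


if __name__ == "__main__":
    print(late_joiner_sees(True), late_joiner_sees(False))
```

With resend_latest enabled the late receiver starts with [2]; with it disabled the receiver starts empty and has to wait for the next message.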

Source code in frequenz/channels/_broadcast.py
def __init__(self, name: str, resend_latest: bool = False) -> None:
    """Create a Broadcast channel.

    Args:
        name: A name for the broadcast channel, typically based on the type
            of data sent through it.  Used to identify the channel in the
            logs.
        resend_latest: When True, every time a new receiver is created with
            `new_receiver`, it will automatically get sent the latest value
            on the channel.  This allows new receivers on slow streams to
            get the latest value as soon as they are created, without having
            to wait for the next message on the channel to arrive.
    """
    self.name: str = name
    self._resend_latest = resend_latest

    self.recv_cv: Condition = Condition()
    self.receivers: Dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
    self.closed: bool = False
    self._latest: Optional[T] = None

close() async

Close the Broadcast channel.

Any further attempts to send() data will return False.

Receivers will still be able to drain the pending items on their queues, but after that, subsequent receive() calls will return None immediately.

Source code in frequenz/channels/_broadcast.py
async def close(self) -> None:
    """Close the Broadcast channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will raise a [SenderError][frequenz.channels.SenderError].

    Receivers will still be able to drain the pending items on their queues,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will raise a
    [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]
    immediately.
    """
    self._latest = None
    self.closed = True
    async with self.recv_cv:
        self.recv_cv.notify_all()
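
The last two lines of `close()` wake every task waiting on the condition variable so that each can notice the channel is closed. The sketch below reproduces that pattern with `asyncio.Condition`. It assumes the `Condition` in the listing is the asyncio one, and `ToyChannel` and `wait_until_closed` are hypothetical names.

```python
import asyncio
from typing import List


class ToyChannel:
    """Hypothetical channel holding only the state close() touches."""

    def __init__(self) -> None:
        self.closed = False
        self.recv_cv = asyncio.Condition()

    async def close(self) -> None:
        self.closed = True
        async with self.recv_cv:
            # Wake every waiter so each one can re-check `closed`.
            self.recv_cv.notify_all()


async def wait_until_closed(chan: ToyChannel, name: str, log: List[str]) -> None:
    async with chan.recv_cv:
        await chan.recv_cv.wait_for(lambda: chan.closed)
    log.append(name)


async def main() -> List[str]:
    chan = ToyChannel()
    log: List[str] = []
    waiters = [
        asyncio.create_task(wait_until_closed(chan, name, log))
        for name in ("a", "b")
    ]
    await asyncio.sleep(0)  # give both waiters a chance to start waiting
    await chan.close()
    await asyncio.gather(*waiters)
    return sorted(log)
```

Running `asyncio.run(main())` completes only because both waiters are released by the single `notify_all()` call.
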
new_peekable() ¤

Create a new Peekable for the broadcast channel.

A Peekable provides a peek() method that allows the user to get a peek at the latest value in the channel, without consuming anything.

RETURNS DESCRIPTION
Peekable[T]

A Peekable to peek into the broadcast channel with.

Source code in frequenz/channels/_broadcast.py
def new_peekable(self) -> Peekable[T]:
    """Create a new Peekable for the broadcast channel.

    A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
    that allows the user to get a peek at the latest value in the channel,
    without consuming anything.

    Returns:
        A Peekable to peek into the broadcast channel with.
    """
    return Peekable(self)
new_receiver(name=None, maxsize=50) ¤

Create a new broadcast receiver.

Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver.

PARAMETER DESCRIPTION
name

A name to identify the receiver in the logs.

TYPE: Optional[str] DEFAULT: None

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[T]

A Receiver instance attached to the broadcast channel.

Source code in frequenz/channels/_broadcast.py
def new_receiver(
    self, name: Optional[str] = None, maxsize: int = 50
) -> Receiver[T]:
    """Create a new broadcast receiver.

    Broadcast receivers have their own buffer, and when messages are not
    being consumed fast enough and the buffer fills up, old messages will
    get dropped just in this receiver.

    Args:
        name: A name to identify the receiver in the logs.
        maxsize: Size of the receiver's buffer.

    Returns:
        A Receiver instance attached to the broadcast channel.
    """
    uuid = uuid4()
    if name is None:
        name = str(uuid)
    recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
    self.receivers[uuid] = weakref.ref(recv)
    if self._resend_latest and self._latest is not None:
        recv.enqueue(self._latest)
    return recv
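
The per-receiver buffering described above can be pictured with a bounded `deque`. This is a sketch under an assumption: the listing does not show how the receiver stores messages, so `deque(maxlen=...)` is used here only because it drops the oldest entry when full, which matches the documented behaviour. `buffered` is a hypothetical helper.

```python
from collections import deque
from typing import Deque, List


def buffered(messages: List[int], maxsize: int) -> List[int]:
    """Return what a receiver with a bounded buffer would still hold."""
    # A full deque evicts its oldest entry on append.
    buffer: Deque[int] = deque(maxlen=maxsize)
    for msg in messages:
        buffer.append(msg)
    return list(buffer)


burst = list(range(1, 8))            # seven messages arrive before anyone reads
slow = buffered(burst, maxsize=3)    # small buffer: only the newest three remain
roomy = buffered(burst, maxsize=50)  # default-sized buffer: nothing is dropped
```

Because each receiver has its own buffer, a slow consumer with a small `maxsize` loses old messages without affecting any other receiver.
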
new_sender() ¤

Create a new broadcast sender.

RETURNS DESCRIPTION
Sender[T]

A Sender instance attached to the broadcast channel.

Source code in frequenz/channels/_broadcast.py
def new_sender(self) -> Sender[T]:
    """Create a new broadcast sender.

    Returns:
        A Sender instance attached to the broadcast channel.
    """
    return Sender(self)

frequenz.channels.ChannelClosedError ¤

Bases: ChannelError

Error raised when trying to operate on a closed channel.

Source code in frequenz/channels/_exceptions.py
class ChannelClosedError(ChannelError):
    """Error raised when trying to operate on a closed channel."""

    def __init__(self, channel: Any):
        """Create a `ChannelClosedError` instance.

        Args:
            channel: A reference to the channel that was closed.
        """
        super().__init__(f"Channel {channel} was closed", channel)
Functions¤
__init__(channel) ¤

Create a ChannelClosedError instance.

PARAMETER DESCRIPTION
channel

A reference to the channel that was closed.

TYPE: Any

Source code in frequenz/channels/_exceptions.py
def __init__(self, channel: Any):
    """Create a `ChannelClosedError` instance.

    Args:
        channel: A reference to the channel that was closed.
    """
    super().__init__(f"Channel {channel} was closed", channel)

frequenz.channels.ChannelError ¤

Bases: Error

An error produced in a channel.

All exceptions generated by channels inherit from this exception.

Source code in frequenz/channels/_exceptions.py
class ChannelError(Error):
    """An error produced in a channel.

    All exceptions generated by channels inherit from this exception.
    """

    def __init__(self, message: Any, channel: Any):
        """Create a ChannelError instance.

        Args:
            message: An error message.
            channel: A reference to the channel that encountered the error.
        """
        super().__init__(message)
        self.channel: Any = channel
Functions¤
__init__(message, channel) ¤

Create a ChannelError instance.

PARAMETER DESCRIPTION
message

An error message.

TYPE: Any

channel

A reference to the channel that encountered the error.

TYPE: Any

Source code in frequenz/channels/_exceptions.py
def __init__(self, message: Any, channel: Any):
    """Create a ChannelError instance.

    Args:
        message: An error message.
        channel: A reference to the channel that encountered the error.
    """
    super().__init__(message)
    self.channel: Any = channel

frequenz.channels.Error ¤

Bases: RuntimeError

Base error.

All exceptions generated by this library inherit from this exception.

Source code in frequenz/channels/_exceptions.py
class Error(RuntimeError):
    """Base error.

    All exceptions generated by this library inherit from this exception.
    """

    def __init__(self, message: Any):
        """Create an Error instance.

        Args:
            message: An error message.
        """
        super().__init__(message)
Functions¤
__init__(message) ¤

Create an Error instance.

PARAMETER DESCRIPTION
message

An error message.

TYPE: Any

Source code in frequenz/channels/_exceptions.py
def __init__(self, message: Any):
    """Create an Error instance.

    Args:
        message: An error message.
    """
    super().__init__(message)
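
Since `ChannelClosedError` derives from `ChannelError`, which derives from `Error`, a handler for the base class also catches the more specific errors. The sketch below redefines the three classes locally, copying their constructors from the listings above, so that it runs without the library. `classify` is a hypothetical helper.

```python
from typing import Any


class Error(RuntimeError):
    def __init__(self, message: Any):
        super().__init__(message)


class ChannelError(Error):
    def __init__(self, message: Any, channel: Any):
        super().__init__(message)
        self.channel: Any = channel


class ChannelClosedError(ChannelError):
    def __init__(self, channel: Any):
        super().__init__(f"Channel {channel} was closed", channel)


def classify(channel: Any) -> str:
    try:
        raise ChannelClosedError(channel)
    except Error as err:  # the library-wide base class catches it too
        assert isinstance(err, ChannelError)
        return f"{type(err).__name__}: {err} (channel={err.channel!r})"
```

Calling `classify("readings")` shows that the caught object keeps its concrete type and its `channel` attribute even when caught as `Error`.
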

frequenz.channels.Peekable ¤

Bases: ABC, Generic[T]

A channel peekable.

A Peekable provides a peek() method that allows the user to get a peek at the latest value in the channel, without consuming anything.
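
A minimal sketch of the peek contract follows. The abstract class is redefined locally so the example is self-contained, and `LatestValue` with its `update()` method is a hypothetical implementation, not one shipped by the library.

```python
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class Peekable(ABC, Generic[T]):
    """Local copy of the abstract interface documented here."""

    @abstractmethod
    def peek(self) -> Optional[T]:
        """Return the latest value, or None if nothing was sent yet."""


class LatestValue(Peekable[T]):
    """Hypothetical peekable that remembers only the last value it was given."""

    def __init__(self) -> None:
        self._latest: Optional[T] = None

    def update(self, value: T) -> None:
        self._latest = value

    def peek(self) -> Optional[T]:
        # Peeking never removes the value, so repeated calls agree.
        return self._latest
```

Before any `update()`, `peek()` returns `None`. Afterwards it keeps returning the same value until a newer one replaces it.
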

Source code in frequenz/channels/_base_classes.py
class Peekable(ABC, Generic[T]):
    """A channel peekable.

    A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method that
    allows the user to get a peek at the latest value in the channel, without
    consuming anything.
    """

    @abstractmethod
    def peek(self) -> Optional[T]:
        """Return the latest value that was sent to the channel.

        Returns:
            The latest value received by the channel, or `None` if nothing
                has been sent to the channel yet.
        """
Functions¤
peek() abstractmethod ¤

Return the latest value that was sent to the channel.

RETURNS DESCRIPTION
Optional[T]

The latest value received by the channel, or None if nothing has been sent to the channel yet.

Source code in frequenz/channels/_base_classes.py
@abstractmethod
def peek(self) -> Optional[T]:
    """Return the latest value that was sent to the channel.

    Returns:
        The latest value received by the channel, or `None` if nothing
            has been sent to the channel yet.
    """

frequenz.channels.Receiver ¤

Bases: ABC, Generic[T]

A channel Receiver.

Source code in frequenz/channels/_base_classes.py
class Receiver(ABC, Generic[T]):
    """A channel Receiver."""

    async def __anext__(self) -> T:
        """Await the next value in the async iteration over received values.

        Returns:
            The next value received.

        Raises:
            StopAsyncIteration: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        try:
            await self.ready()
            return self.consume()
        except ReceiverStoppedError as exc:
            raise StopAsyncIteration() from exc

    @abstractmethod
    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()`, either directly or through `receive()` or
        async iteration. The receiver will remain ready (this method will
        return immediately) until the value is consumed.

        Returns:
            Whether the receiver is still active.
        """

    @abstractmethod
    def consume(self) -> T:
        """Return the latest value once `ready()` is complete.

        `ready()` must be called before each call to `consume()`.

        Returns:
            The next value received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """

    def __aiter__(self) -> Receiver[T]:
        """Initialize the async iterator over received values.

        Returns:
            `self`, since no extra setup is needed for the iterator.
        """
        return self

    async def receive(self) -> T:
        """Receive a message from the channel.

        Returns:
            The received message.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.

        # noqa: DAR401 __cause__ (https://github.com/terrencepreilly/darglint/issues/181)
        """
        try:
            received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
        except StopAsyncIteration as exc:
            # If we already had a cause and it was the receiver was stopped,
            # then reuse that error, as StopAsyncIteration is just an artifact
            # introduced by __anext__.
            if (
                isinstance(exc.__cause__, ReceiverStoppedError)
                # pylint is not smart enough to figure out we checked above
                # this is a ReceiverStoppedError and thus it does have
                # a receiver member
                and exc.__cause__.receiver is self  # pylint: disable=no-member
            ):
                raise exc.__cause__
            raise ReceiverStoppedError(self) from exc
        return received

    def map(self, call: Callable[[T], U]) -> Receiver[U]:
        """Return a receiver with `call` applied on incoming messages.

        Args:
            call: function to apply on incoming messages.

        Returns:
            A `Receiver` to read results of the given function from.
        """
        return _Map(self, call)

    def into_peekable(self) -> Peekable[T]:
        """Convert the `Receiver` implementation into a `Peekable`.

        Once this function has been called, the receiver will no longer be
        usable, and calling `receive` on the receiver will raise an exception.

        Raises:
            NotImplementedError: when a `Receiver` implementation doesn't have
                a custom `into_peekable` implementation.
        """
        raise NotImplementedError("This receiver does not implement `into_peekable`")
Functions¤
__aiter__() ¤

Initialize the async iterator over received values.

RETURNS DESCRIPTION
Receiver[T]

self, since no extra setup is needed for the iterator.

Source code in frequenz/channels/_base_classes.py
def __aiter__(self) -> Receiver[T]:
    """Initialize the async iterator over received values.

    Returns:
        `self`, since no extra setup is needed for the iterator.
    """
    return self
__anext__() async ¤

Await the next value in the async iteration over received values.

RETURNS DESCRIPTION
T

The next value received.

RAISES DESCRIPTION
StopAsyncIteration

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in frequenz/channels/_base_classes.py
async def __anext__(self) -> T:
    """Await the next value in the async iteration over received values.

    Returns:
        The next value received.

    Raises:
        StopAsyncIteration: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
consume() abstractmethod ¤

Return the latest value once ready() is complete.

ready() must be called before each call to consume().

RETURNS DESCRIPTION
T

The next value received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in frequenz/channels/_base_classes.py
@abstractmethod
def consume(self) -> T:
    """Return the latest value once `ready()` is complete.

    `ready()` must be called before each call to `consume()`.

    Returns:
        The next value received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
into_peekable() ¤

Convert the Receiver implementation into a Peekable.

Once this function has been called, the receiver will no longer be usable, and calling receive on the receiver will raise an exception.

RAISES DESCRIPTION
NotImplementedError

when a Receiver implementation doesn't have a custom into_peekable implementation.

Source code in frequenz/channels/_base_classes.py
def into_peekable(self) -> Peekable[T]:
    """Convert the `Receiver` implementation into a `Peekable`.

    Once this function has been called, the receiver will no longer be
    usable, and calling `receive` on the receiver will raise an exception.

    Raises:
        NotImplementedError: when a `Receiver` implementation doesn't have
            a custom `into_peekable` implementation.
    """
    raise NotImplementedError("This receiver does not implement `into_peekable`")
map(call) ¤

Return a receiver with call applied on incoming messages.

PARAMETER DESCRIPTION
call

function to apply on incoming messages.

TYPE: Callable[[T], U]

RETURNS DESCRIPTION
Receiver[U]

A Receiver to read results of the given function from.

Source code in frequenz/channels/_base_classes.py
def map(self, call: Callable[[T], U]) -> Receiver[U]:
    """Return a receiver with `call` applied on incoming messages.

    Args:
        call: function to apply on incoming messages.

    Returns:
        A `Receiver` to read results of the given function from.
    """
    return _Map(self, call)
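
The idea behind `map()` is that every message passes through `call` before the consumer sees it. The sketch below shows the same transformation with plain async generators. It is an analogy only: `numbers` and `mapped` are hypothetical names, and the private `_Map` class is not shown in this reference, so nothing here describes its internals.

```python
import asyncio
from typing import AsyncIterator, Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def numbers() -> AsyncIterator[int]:
    """Hypothetical message source standing in for a receiver."""
    for value in (1, 2, 3):
        yield value


async def mapped(
    source: AsyncIterator[T], call: Callable[[T], U]
) -> AsyncIterator[U]:
    """Yield call(msg) for each message produced by the source."""
    async for msg in source:
        yield call(msg)


async def main() -> List[str]:
    # The consumer only ever sees the transformed messages.
    return [msg async for msg in mapped(numbers(), lambda n: f"{n * 10} W")]
```

With the real API, the equivalent call would be `receiver.map(call)`, which returns a `Receiver[U]` that can be iterated in the same way.
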
ready() async abstractmethod ¤

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume(), either directly or through receive() or async iteration. The receiver will remain ready (this method will return immediately) until the value is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/_base_classes.py
@abstractmethod
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()`, either directly or through `receive()` or
    async iteration. The receiver will remain ready (this method will
    return immediately) until the value is consumed.

    Returns:
        Whether the receiver is still active.
    """
receive() async ¤

Receive a message from the channel.

RETURNS DESCRIPTION
T

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in frequenz/channels/_base_classes.py
async def receive(self) -> T:
    """Receive a message from the channel.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.

    # noqa: DAR401 __cause__ (https://github.com/terrencepreilly/darglint/issues/181)
    """
    try:
        received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            # pylint is not smart enough to figure out we checked above
            # this is a ReceiverStoppedError and thus it does have
            # a receiver member
            and exc.__cause__.receiver is self  # pylint: disable=no-member
        ):
            raise exc.__cause__
        raise ReceiverStoppedError(self) from exc
    return received
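
The relationship between `ready()`, `consume()`, `__anext__()` and `receive()` can be seen in a toy receiver. This is a simplified sketch: `ListReceiver` is hypothetical, `ReceiverStoppedError` is a local stand-in with a simpler constructor than the real one, and `receive()` omits the cause-reuse logic shown in the listing above.

```python
import asyncio
from collections import deque
from typing import Deque, Iterable, List


class ReceiverStoppedError(Exception):
    """Local stand-in for frequenz.channels.ReceiverStoppedError."""


class ListReceiver:
    """Hypothetical receiver that replays a fixed list of messages."""

    def __init__(self, items: Iterable[int]) -> None:
        self._pending: Deque[int] = deque(items)

    async def ready(self) -> bool:
        # A real receiver would wait for a message; this one only yields control.
        await asyncio.sleep(0)
        return bool(self._pending)

    def consume(self) -> int:
        if not self._pending:
            raise ReceiverStoppedError("receiver was stopped")
        return self._pending.popleft()

    def __aiter__(self) -> "ListReceiver":
        return self

    async def __anext__(self) -> int:
        # Same shape as the listing above: ready(), then consume().
        try:
            await self.ready()
            return self.consume()
        except ReceiverStoppedError as exc:
            raise StopAsyncIteration() from exc

    async def receive(self) -> int:
        # Outside a loop, the end of the stream surfaces as an error.
        try:
            return await self.__anext__()
        except StopAsyncIteration as exc:
            raise ReceiverStoppedError("receiver was stopped") from exc


async def main() -> List[object]:
    seen: List[object] = [msg async for msg in ListReceiver([1, 2, 3])]
    try:
        await ListReceiver([]).receive()
    except ReceiverStoppedError:
        seen.append("stopped")
    return seen
```

An `async for` loop ends quietly when the receiver stops, whereas a direct `receive()` call reports the same condition as `ReceiverStoppedError`.
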

frequenz.channels.ReceiverError ¤

Bases: Error, Generic[T]

An error produced in a Receiver.

All exceptions generated by receivers inherit from this exception.

Source code in frequenz/channels/_exceptions.py
class ReceiverError(Error, Generic[T]):
    """An error produced in a [Receiver][frequenz.channels.Receiver].

    All exceptions generated by receivers inherit from this exception.
    """

    def __init__(self, message: Any, receiver: _base_classes.Receiver[T]):
        """Create an instance.

        Args:
            message: An error message.
            receiver: The [Receiver][frequenz.channels.Receiver] where the
                error happened.
        """
        super().__init__(message)
        self.receiver: _base_classes.Receiver[T] = receiver
Functions¤
__init__(message, receiver) ¤

Create an instance.

PARAMETER DESCRIPTION
message

An error message.

TYPE: Any

receiver

The Receiver where the error happened.

TYPE: _base_classes.Receiver[T]

Source code in frequenz/channels/_exceptions.py
def __init__(self, message: Any, receiver: _base_classes.Receiver[T]):
    """Create an instance.

    Args:
        message: An error message.
        receiver: The [Receiver][frequenz.channels.Receiver] where the
            error happened.
    """
    super().__init__(message)
    self.receiver: _base_classes.Receiver[T] = receiver

frequenz.channels.ReceiverInvalidatedError ¤

Bases: ReceiverError[T]

The Receiver was invalidated.

This happens when the Receiver is converted into a Peekable.

Source code in frequenz/channels/_exceptions.py
class ReceiverInvalidatedError(ReceiverError[T]):
    """The [Receiver][frequenz.channels.Receiver] was invalidated.

    This happens when the Receiver is converted
    [into][frequenz.channels.Receiver.into_peekable]
    a [Peekable][frequenz.channels.Peekable].
    """

frequenz.channels.ReceiverStoppedError ¤

Bases: ReceiverError[T]

The Receiver stopped producing messages.

Source code in frequenz/channels/_exceptions.py
class ReceiverStoppedError(ReceiverError[T]):
    """The [Receiver][frequenz.channels.Receiver] stopped producing messages."""

    def __init__(self, receiver: _base_classes.Receiver[T]):
        """Create an instance.

        Args:
            receiver: The [Receiver][frequenz.channels.Receiver] where the
                error happened.
        """
        super().__init__(f"Receiver {receiver} was stopped", receiver)
Functions¤
__init__(receiver) ¤

Create an instance.

PARAMETER DESCRIPTION
receiver

The Receiver where the error happened.

TYPE: _base_classes.Receiver[T]

Source code in frequenz/channels/_exceptions.py
def __init__(self, receiver: _base_classes.Receiver[T]):
    """Create an instance.

    Args:
        receiver: The [Receiver][frequenz.channels.Receiver] where the
            error happened.
    """
    super().__init__(f"Receiver {receiver} was stopped", receiver)

frequenz.channels.Sender ¤

Bases: ABC, Generic[T]

A channel Sender.

Source code in frequenz/channels/_base_classes.py
class Sender(ABC, Generic[T]):
    """A channel Sender."""

    @abstractmethod
    async def send(self, msg: T) -> None:
        """Send a message to the channel.

        Args:
            msg: The message to be sent.

        Raises:
            SenderError: if there was an error sending the message.
        """
Functions¤
send(msg) async abstractmethod ¤

Send a message to the channel.

PARAMETER DESCRIPTION
msg

The message to be sent.

TYPE: T

RAISES DESCRIPTION
SenderError

if there was an error sending the message.

Source code in frequenz/channels/_base_classes.py
@abstractmethod
async def send(self, msg: T) -> None:
    """Send a message to the channel.

    Args:
        msg: The message to be sent.

    Raises:
        SenderError: if there was an error sending the message.
    """

frequenz.channels.SenderError ¤

Bases: Error, Generic[T]

An error produced in a Sender.

All exceptions generated by senders inherit from this exception.

Source code in frequenz/channels/_exceptions.py
class SenderError(Error, Generic[T]):
    """An error produced in a [Sender][frequenz.channels.Sender].

    All exceptions generated by senders inherit from this exception.
    """

    def __init__(self, message: Any, sender: _base_classes.Sender[T]):
        """Create an instance.

        Args:
            message: An error message.
            sender: The [Sender][frequenz.channels.Sender] where the error
                happened.
        """
        super().__init__(message)
        self.sender: _base_classes.Sender[T] = sender
Functions¤
__init__(message, sender) ¤

Create an instance.

PARAMETER DESCRIPTION
message

An error message.

TYPE: Any

sender

The Sender where the error happened.

TYPE: _base_classes.Sender[T]

Source code in frequenz/channels/_exceptions.py
def __init__(self, message: Any, sender: _base_classes.Sender[T]):
    """Create an instance.

    Args:
        message: An error message.
        sender: The [Sender][frequenz.channels.Sender] where the error
            happened.
    """
    super().__init__(message)
    self.sender: _base_classes.Sender[T] = sender
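
Since `send()` returns `None` and signals failure by raising, callers typically wrap it in `try`/`except SenderError`. The sketch below shows that pattern with local stand-ins. `ListSender` and `send_until_error` are hypothetical, and the local `SenderError` copies only the constructor shown above (the real class derives from `Error`).

```python
import asyncio
from typing import Any, List


class SenderError(RuntimeError):
    """Local stand-in with the same constructor as the listing above."""

    def __init__(self, message: Any, sender: Any):
        super().__init__(message)
        self.sender: Any = sender


class ListSender:
    """Hypothetical sender that appends to a list until it is closed."""

    def __init__(self) -> None:
        self.delivered: List[int] = []
        self.closed = False

    async def send(self, msg: int) -> None:
        if self.closed:
            raise SenderError("The channel was closed", self)
        self.delivered.append(msg)


async def send_until_error(sender: ListSender, messages: List[int]) -> int:
    """Send messages in order and return how many were accepted."""
    accepted = 0
    for msg in messages:
        try:
            await sender.send(msg)
        except SenderError as err:
            # The exception carries the sender that failed.
            assert err.sender is sender
            break
        accepted += 1
    return accepted


async def main() -> int:
    sender = ListSender()
    await sender.send(1)
    sender.closed = True  # simulate the channel closing underneath the sender
    return await send_until_error(sender, [2, 3])
```

The `sender` attribute on the exception lets a handler that manages several senders tell which one failed.
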