Skip to content

channels

frequenz.channels ¤

Channel implementations.

Classes¤

frequenz.channels.Anycast ¤

Bases: Generic[T]

A channel for sending data across async tasks.

Anycast channels support multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.

In cases where each message need to be received by every receiver, a Broadcast channel may be used.

Uses an deque internally, so Anycast channels are not thread-safe.

When there are multiple channel receivers, they can be awaited simultaneously using Select, Merge or MergeNamed.

Example
async def send(sender: channel.Sender) -> None:
    while True:
        next = random.randint(3, 17)
        print(f"sending: {next}")
        await sender.send(next)


async def recv(id: int, receiver: channel.Receiver) -> None:
    while True:
        next = await receiver.receive()
        print(f"receiver_{id} received {next}")
        await asyncio.sleep(0.1) # sleep (or work) with the data


acast = channel.Anycast()

sender = acast.get_sender()
receiver_1 = acast.get_receiver()

asyncio.create_task(send(sender))

await recv(1, receiver_1)

Check the tests and benchmarks directories for more examples.

Source code in frequenz/channels/anycast.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class Anycast(Generic[T]):
    """A channel for sending data across async tasks.

    Anycast channels support multiple senders and multiple receivers.  A message sent
    through a sender will be received by exactly one receiver.

    In cases where each message need to be received by every receiver, a
    [Broadcast][frequenz.channels.Broadcast] channel may be used.

    Uses an [deque][collections.deque] internally, so Anycast channels are not
    thread-safe.

    When there are multiple channel receivers, they can be awaited
    simultaneously using [Select][frequenz.channels.Select],
    [Merge][frequenz.channels.Merge] or
    [MergeNamed][frequenz.channels.MergeNamed].

    Example:
        ``` python
        async def send(sender: channel.Sender) -> None:
            while True:
                next = random.randint(3, 17)
                print(f"sending: {next}")
                await sender.send(next)


        async def recv(id: int, receiver: channel.Receiver) -> None:
            while True:
                next = await receiver.receive()
                print(f"receiver_{id} received {next}")
                await asyncio.sleep(0.1) # sleep (or work) with the data


        acast = channel.Anycast()

        sender = acast.get_sender()
        receiver_1 = acast.get_receiver()

        asyncio.create_task(send(sender))

        await recv(1, receiver_1)
        ```

        Check the `tests` and `benchmarks` directories for more examples.
    """

    def __init__(self, maxsize: int = 10) -> None:
        """Create an Anycast channel.

        Args:
            maxsize: Size of the channel's buffer.
        """
        self.limit: int = maxsize
        self.deque: Deque[T] = deque(maxlen=maxsize)
        self.send_cv: Condition = Condition()
        self.recv_cv: Condition = Condition()
        self.closed: bool = False

    async def close(self) -> None:
        """Close the channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will return `False`.

        Receivers will still be able to drain the pending items on the channel,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will return `None`
        immediately.

        """
        self.closed = True
        async with self.send_cv:
            self.send_cv.notify_all()
        async with self.recv_cv:
            self.recv_cv.notify_all()

    def get_sender(self) -> Sender[T]:
        """Create a new sender.

        Returns:
            A Sender instance attached to the Anycast channel.
        """
        return Sender(self)

    def get_receiver(self) -> Receiver[T]:
        """Create a new receiver.

        Returns:
            A Receiver instance attached to the Anycast channel.
        """
        return Receiver(self)
Functions¤
__init__(maxsize=10) ¤

Create an Anycast channel.

PARAMETER DESCRIPTION
maxsize

Size of the channel's buffer.

TYPE: int DEFAULT: 10

Source code in frequenz/channels/anycast.py
63
64
65
66
67
68
69
70
71
72
73
def __init__(self, maxsize: int = 10) -> None:
    """Create an Anycast channel.

    Args:
        maxsize: Size of the channel's buffer.
    """
    self.limit: int = maxsize
    self.deque: Deque[T] = deque(maxlen=maxsize)
    self.send_cv: Condition = Condition()
    self.recv_cv: Condition = Condition()
    self.closed: bool = False
close() async ¤

Close the channel.

Any further attempts to send() data will return False.

Receivers will still be able to drain the pending items on the channel, but after that, subsequent receive() calls will return None immediately.

Source code in frequenz/channels/anycast.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async def close(self) -> None:
    """Close the channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will return `False`.

    Receivers will still be able to drain the pending items on the channel,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will return `None`
    immediately.

    """
    self.closed = True
    async with self.send_cv:
        self.send_cv.notify_all()
    async with self.recv_cv:
        self.recv_cv.notify_all()
get_receiver() ¤

Create a new receiver.

RETURNS DESCRIPTION
Receiver[T]

A Receiver instance attached to the Anycast channel.

Source code in frequenz/channels/anycast.py
101
102
103
104
105
106
107
def get_receiver(self) -> Receiver[T]:
    """Create a new receiver.

    Returns:
        A Receiver instance attached to the Anycast channel.
    """
    return Receiver(self)
get_sender() ¤

Create a new sender.

RETURNS DESCRIPTION
Sender[T]

A Sender instance attached to the Anycast channel.

Source code in frequenz/channels/anycast.py
93
94
95
96
97
98
99
def get_sender(self) -> Sender[T]:
    """Create a new sender.

    Returns:
        A Sender instance attached to the Anycast channel.
    """
    return Sender(self)

frequenz.channels.Bidirectional ¤

Bases: Generic[T, U]

A wrapper class for simulating bidirectional channels.

Source code in frequenz/channels/bidirectional.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Bidirectional(Generic[T, U]):
    """A wrapper class for simulating bidirectional channels."""

    def __init__(self, client_id: str, service_id: str) -> None:
        """Create a `Bidirectional` instance.

        Args:
            client_id: A name for the client, used to name the channels.
            service_id: A name for the service end of the channels.
        """
        self._client_id = client_id
        self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}")
        self._response_channel: Broadcast[U] = Broadcast(
            f"resp_{service_id}_{client_id}"
        )

        self._client_handle = BidirectionalHandle(
            self._request_channel.get_sender(),
            self._response_channel.get_receiver(),
        )
        self._service_handle = BidirectionalHandle(
            self._response_channel.get_sender(),
            self._request_channel.get_receiver(),
        )

    @property
    def client_handle(self) -> BidirectionalHandle[T, U]:
        """Get a BidirectionalHandle for the client to use.

        Returns:
            Object to send/receive messages with.
        """
        return self._client_handle

    @property
    def service_handle(self) -> BidirectionalHandle[U, T]:
        """Get a `BidirectionalHandle` for the service to use.

        Returns:
            Object to send/receive messages with.
        """
        return self._service_handle
Functions¤
__init__(client_id, service_id) ¤

Create a Bidirectional instance.

PARAMETER DESCRIPTION
client_id

A name for the client, used to name the channels.

TYPE: str

service_id

A name for the service end of the channels.

TYPE: str

Source code in frequenz/channels/bidirectional.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, client_id: str, service_id: str) -> None:
    """Create a `Bidirectional` instance.

    Args:
        client_id: A name for the client, used to name the channels.
        service_id: A name for the service end of the channels.
    """
    self._client_id = client_id
    self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}")
    self._response_channel: Broadcast[U] = Broadcast(
        f"resp_{service_id}_{client_id}"
    )

    self._client_handle = BidirectionalHandle(
        self._request_channel.get_sender(),
        self._response_channel.get_receiver(),
    )
    self._service_handle = BidirectionalHandle(
        self._response_channel.get_sender(),
        self._request_channel.get_receiver(),
    )
client_handle() property ¤

Get a BidirectionalHandle for the client to use.

RETURNS DESCRIPTION
BidirectionalHandle[T, U]

Object to send/receive messages with.

Source code in frequenz/channels/bidirectional.py
39
40
41
42
43
44
45
46
@property
def client_handle(self) -> BidirectionalHandle[T, U]:
    """Get a BidirectionalHandle for the client to use.

    Returns:
        Object to send/receive messages with.
    """
    return self._client_handle
service_handle() property ¤

Get a BidirectionalHandle for the service to use.

RETURNS DESCRIPTION
BidirectionalHandle[U, T]

Object to send/receive messages with.

Source code in frequenz/channels/bidirectional.py
48
49
50
51
52
53
54
55
@property
def service_handle(self) -> BidirectionalHandle[U, T]:
    """Get a `BidirectionalHandle` for the service to use.

    Returns:
        Object to send/receive messages with.
    """
    return self._service_handle

frequenz.channels.BidirectionalHandle ¤

Bases: Sender[T], Receiver[U]

A handle to a Bidirectional instance.

It can be used to send/receive values between the client and service.

Source code in frequenz/channels/bidirectional.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class BidirectionalHandle(Sender[T], Receiver[U]):
    """A handle to a [Bidirectional][frequenz.channels.Bidirectional] instance.

    It can be used to send/receive values between the client and service.
    """

    def __init__(self, sender: Sender[T], receiver: Receiver[U]) -> None:
        """Create a `BidirectionalHandle` instance.

        Args:
            sender: A sender to send values with.
            receiver: A receiver to receive values from.
        """
        self._sender = sender
        self._receiver = receiver

    async def send(self, msg: T) -> bool:
        """Send a value to the other side.

        Args:
            msg: The value to send.

        Returns:
            Whether the send was successful or not.
        """
        return await self._sender.send(msg)

    async def receive(self) -> Optional[U]:
        """Receive a value from the other side.

        Returns:
            Received value, or `None` if the channels are closed.
        """
        return await self._receiver.receive()
Functions¤
__init__(sender, receiver) ¤

Create a BidirectionalHandle instance.

PARAMETER DESCRIPTION
sender

A sender to send values with.

TYPE: Sender[T]

receiver

A receiver to receive values from.

TYPE: Receiver[U]

Source code in frequenz/channels/bidirectional.py
64
65
66
67
68
69
70
71
72
def __init__(self, sender: Sender[T], receiver: Receiver[U]) -> None:
    """Create a `BidirectionalHandle` instance.

    Args:
        sender: A sender to send values with.
        receiver: A receiver to receive values from.
    """
    self._sender = sender
    self._receiver = receiver
receive() async ¤

Receive a value from the other side.

RETURNS DESCRIPTION
Optional[U]

Received value, or None if the channels are closed.

Source code in frequenz/channels/bidirectional.py
85
86
87
88
89
90
91
async def receive(self) -> Optional[U]:
    """Receive a value from the other side.

    Returns:
        Received value, or `None` if the channels are closed.
    """
    return await self._receiver.receive()
send(msg) async ¤

Send a value to the other side.

PARAMETER DESCRIPTION
msg

The value to send.

TYPE: T

RETURNS DESCRIPTION
bool

Whether the send was successful or not.

Source code in frequenz/channels/bidirectional.py
74
75
76
77
78
79
80
81
82
83
async def send(self, msg: T) -> bool:
    """Send a value to the other side.

    Args:
        msg: The value to send.

    Returns:
        Whether the send was successful or not.
    """
    return await self._sender.send(msg)

frequenz.channels.Broadcast ¤

Bases: Generic[T]

A channel to broadcast messages to multiple receivers.

Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders is received by all of the receivers.

Internally, a broadcast receiver's buffer is implemented with just append/pop operations on either side of a deque, which are thread-safe. Because of this, Broadcast channels are thread-safe.

When there are multiple channel receivers, they can be awaited simultaneously using Select, Merge or MergeNamed.

Example
async def send(sender: channel.Sender) -> None:
    while True:
        next = random.randint(3, 17)
        print(f"sending: {next}")
        await sender.send(next)


async def recv(id: int, receiver: channel.Receiver) -> None:
    while True:
        next = await receiver.receive()
        print(f"receiver_{id} received {next}")
        await asyncio.sleep(0.1) # sleep (or work) with the data


bcast = channel.Broadcast()

sender = bcast.get_sender()
receiver_1 = bcast.get_receiver()

asyncio.create_task(send(sender))

await recv(1, receiver_1)

Check the tests and benchmarks directories for more examples.

Source code in frequenz/channels/broadcast.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class Broadcast(Generic[T]):
    """A channel to broadcast messages to multiple receivers.

    `Broadcast` channels can have multiple senders and multiple receivers. Each
    message sent through any of the senders is received by all of the
    receivers.

    Internally, a broadcast receiver's buffer is implemented with just
    append/pop operations on either side of a [deque][collections.deque], which
    are thread-safe.  Because of this, `Broadcast` channels are thread-safe.

    When there are multiple channel receivers, they can be awaited
    simultaneously using [Select][frequenz.channels.Select],
    [Merge][frequenz.channels.Merge] or
    [MergeNamed][frequenz.channels.MergeNamed].

    Example:
        ``` python
        async def send(sender: channel.Sender) -> None:
            while True:
                next = random.randint(3, 17)
                print(f"sending: {next}")
                await sender.send(next)


        async def recv(id: int, receiver: channel.Receiver) -> None:
            while True:
                next = await receiver.receive()
                print(f"receiver_{id} received {next}")
                await asyncio.sleep(0.1) # sleep (or work) with the data


        bcast = channel.Broadcast()

        sender = bcast.get_sender()
        receiver_1 = bcast.get_receiver()

        asyncio.create_task(send(sender))

        await recv(1, receiver_1)
        ```

        Check the `tests` and `benchmarks` directories for more examples.
    """

    def __init__(self, name: str, resend_latest: bool = False) -> None:
        """Create a Broadcast channel.

        Args:
            name: A name for the broadcast channel, typically based on the type
                of data sent through it.  Used to identify the channel in the
                logs.
            resend_latest: When True, every time a new receiver is created with
                `get_receiver`, it will automatically get sent the latest value
                on the channel.  This allows new receivers on slow streams to
                get the latest value as soon as they are created, without having
                to wait for the next message on the channel to arrive.
        """
        self.name: str = name
        self._resend_latest = resend_latest

        self.recv_cv: Condition = Condition()
        self.receivers: Dict[UUID, Receiver[T]] = {}
        self.closed: bool = False
        self._latest: Optional[T] = None

    async def close(self) -> None:
        """Close the Broadcast channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will return `False`.

        Receivers will still be able to drain the pending items on their queues,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will return `None`
        immediately.
        """
        self._latest = None
        self.closed = True
        async with self.recv_cv:
            self.recv_cv.notify_all()

    def _drop_receiver(self, uuid: UUID) -> None:
        """Drop a specific receiver from the list of broadcast receivers.

        Called from the destructors of receivers.

        Args:
            uuid: a uuid identifying the receiver to be dropped.
        """
        if uuid in self.receivers:
            del self.receivers[uuid]

    def get_sender(self) -> Sender[T]:
        """Create a new broadcast sender.

        Returns:
            A Sender instance attached to the broadcast channel.
        """
        return Sender(self)

    def get_receiver(
        self, name: Optional[str] = None, maxsize: int = 50
    ) -> Receiver[T]:
        """Create a new broadcast receiver.

        Broadcast receivers have their own buffer, and when messages are not
        being consumed fast enough and the buffer fills up, old messages will
        get dropped just in this receiver.

        Args:
            name: A name to identify the receiver in the logs.
            maxsize: Size of the receiver's buffer.

        Returns:
            A Receiver instance attached to the broadcast channel.
        """
        uuid = uuid4()
        if name is None:
            name = str(uuid)
        recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
        self.receivers[uuid] = recv
        if self._resend_latest and self._latest is not None:
            recv.enqueue(self._latest)
        return recv

    def get_peekable(self) -> Peekable[T]:
        """Create a new Peekable for the broadcast channel.

        A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
        that allows the user to get a peek at the latest value in the channel,
        without consuming anything.

        Returns:
            A Peekable to peek into the broadcast channel with.
        """
        return Peekable(self)
Functions¤
__init__(name, resend_latest=False) ¤

Create a Broadcast channel.

PARAMETER DESCRIPTION
name

A name for the broadcast channel, typically based on the type of data sent through it. Used to identify the channel in the logs.

TYPE: str

resend_latest

When True, every time a new receiver is created with get_receiver, it will automatically get sent the latest value on the channel. This allows new receivers on slow streams to get the latest value as soon as they are created, without having to wait for the next message on the channel to arrive.

TYPE: bool DEFAULT: False

Source code in frequenz/channels/broadcast.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(self, name: str, resend_latest: bool = False) -> None:
    """Create a Broadcast channel.

    Args:
        name: A name for the broadcast channel, typically based on the type
            of data sent through it.  Used to identify the channel in the
            logs.
        resend_latest: When True, every time a new receiver is created with
            `get_receiver`, it will automatically get sent the latest value
            on the channel.  This allows new receivers on slow streams to
            get the latest value as soon as they are created, without having
            to wait for the next message on the channel to arrive.
    """
    self.name: str = name
    self._resend_latest = resend_latest

    self.recv_cv: Condition = Condition()
    self.receivers: Dict[UUID, Receiver[T]] = {}
    self.closed: bool = False
    self._latest: Optional[T] = None
close() async ¤

Close the Broadcast channel.

Any further attempts to send() data will return False.

Receivers will still be able to drain the pending items on their queues, but after that, subsequent receive() calls will return None immediately.

Source code in frequenz/channels/broadcast.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def close(self) -> None:
    """Close the Broadcast channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will return `False`.

    Receivers will still be able to drain the pending items on their queues,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will return `None`
    immediately.
    """
    self._latest = None
    self.closed = True
    async with self.recv_cv:
        self.recv_cv.notify_all()
get_peekable() ¤

Create a new Peekable for the broadcast channel.

A Peekable provides a peek() method that allows the user to get a peek at the latest value in the channel, without consuming anything.

RETURNS DESCRIPTION
Peekable[T]

A Peekable to peek into the broadcast channel with.

Source code in frequenz/channels/broadcast.py
148
149
150
151
152
153
154
155
156
157
158
def get_peekable(self) -> Peekable[T]:
    """Create a new Peekable for the broadcast channel.

    A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
    that allows the user to get a peek at the latest value in the channel,
    without consuming anything.

    Returns:
        A Peekable to peek into the broadcast channel with.
    """
    return Peekable(self)
get_receiver(name=None, maxsize=50) ¤

Create a new broadcast receiver.

Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver.

PARAMETER DESCRIPTION
name

A name to identify the receiver in the logs.

TYPE: Optional[str] DEFAULT: None

maxsize

Size of the receiver's buffer.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[T]

A Receiver instance attached to the broadcast channel.

Source code in frequenz/channels/broadcast.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def get_receiver(
    self, name: Optional[str] = None, maxsize: int = 50
) -> Receiver[T]:
    """Create a new broadcast receiver.

    Broadcast receivers have their own buffer, and when messages are not
    being consumed fast enough and the buffer fills up, old messages will
    get dropped just in this receiver.

    Args:
        name: A name to identify the receiver in the logs.
        maxsize: Size of the receiver's buffer.

    Returns:
        A Receiver instance attached to the broadcast channel.
    """
    uuid = uuid4()
    if name is None:
        name = str(uuid)
    recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
    self.receivers[uuid] = recv
    if self._resend_latest and self._latest is not None:
        recv.enqueue(self._latest)
    return recv
get_sender() ¤

Create a new broadcast sender.

RETURNS DESCRIPTION
Sender[T]

A Sender instance attached to the broadcast channel.

Source code in frequenz/channels/broadcast.py
115
116
117
118
119
120
121
def get_sender(self) -> Sender[T]:
    """Create a new broadcast sender.

    Returns:
        A Sender instance attached to the broadcast channel.
    """
    return Sender(self)

frequenz.channels.BufferedReceiver ¤

Bases: Receiver[T]

A channel receiver with a buffer.

Source code in frequenz/channels/base_classes.py
107
108
109
110
111
112
113
114
115
116
class BufferedReceiver(Receiver[T]):
    """A channel receiver with a buffer."""

    @abstractmethod
    def enqueue(self, msg: T) -> None:
        """Put a message into this buffered receiver's queue.

        Args:
            msg: The message to be added to the queue.
        """
Functions¤
enqueue(msg) abstractmethod ¤

Put a message into this buffered receiver's queue.

PARAMETER DESCRIPTION
msg

The message to be added to the queue.

TYPE: T

Source code in frequenz/channels/base_classes.py
110
111
112
113
114
115
116
@abstractmethod
def enqueue(self, msg: T) -> None:
    """Put a message into this buffered receiver's queue.

    Args:
        msg: The message to be added to the queue.
    """

frequenz.channels.FileWatcher ¤

Bases: Receiver[pathlib.Path]

A channel receiver that watches for file events.

Source code in frequenz/channels/utils/file_watcher.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class FileWatcher(Receiver[pathlib.Path]):
    """A channel receiver that watches for file events."""

    def __init__(
        self,
        paths: List[Union[pathlib.Path, str]],
        event_types: Optional[Set[EventType]] = None,
    ) -> None:
        """Create a `FileWatcher` instance.

        Args:
            paths: Paths to watch for changes.
            event_types: Types of events to watch for or `None` to watch for
                all event types.
        """
        if event_types is None:
            event_types = {EventType.CREATE, EventType.MODIFY, EventType.DELETE}

        self.event_types = event_types
        self._stop_event = asyncio.Event()
        self._paths = [
            path if isinstance(path, pathlib.Path) else pathlib.Path(path)
            for path in paths
        ]
        self._awatch = awatch(
            *self._paths,
            stop_event=self._stop_event,
            watch_filter=lambda change, path_str: (
                change in [event_type.value for event_type in event_types]  # type: ignore
                and pathlib.Path(path_str).is_file()
            ),
        )

    def __del__(self) -> None:
        """Cleanup registered watches.

        `awatch` passes the `stop_event` to a separate task/thread. This way
        `awatch` getting destroyed properly. The background task will continue
        until the signal is received.
        """
        self._stop_event.set()

    async def receive(self) -> Optional[pathlib.Path]:
        """Wait for the next file event and return its path.

        Returns:
            Path of next file.
        """
        while True:
            changes = await self._awatch.__anext__()
            for change in changes:
                # Tuple of (Change, path) returned by watchfiles
                if change is None or len(change) != 2:
                    return None

                _, path_str = change
                path = pathlib.Path(path_str)
                return path
Functions¤
__del__() ¤

Cleanup registered watches.

awatch passes the stop_event to a separate task/thread. This way awatch getting destroyed properly. The background task will continue until the signal is received.

Source code in frequenz/channels/utils/file_watcher.py
56
57
58
59
60
61
62
63
def __del__(self) -> None:
    """Cleanup registered watches.

    `awatch` passes the `stop_event` to a separate task/thread. This way
    `awatch` getting destroyed properly. The background task will continue
    until the signal is received.
    """
    self._stop_event.set()
__init__(paths, event_types=None) ¤

Create a FileWatcher instance.

PARAMETER DESCRIPTION
paths

Paths to watch for changes.

TYPE: List[Union[pathlib.Path, str]]

event_types

Types of events to watch for or None to watch for all event types.

TYPE: Optional[Set[EventType]] DEFAULT: None

Source code in frequenz/channels/utils/file_watcher.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    paths: List[Union[pathlib.Path, str]],
    event_types: Optional[Set[EventType]] = None,
) -> None:
    """Create a `FileWatcher` instance.

    Args:
        paths: Paths to watch for changes.
        event_types: Types of events to watch for or `None` to watch for
            all event types.
    """
    if event_types is None:
        event_types = {EventType.CREATE, EventType.MODIFY, EventType.DELETE}

    self.event_types = event_types
    self._stop_event = asyncio.Event()
    self._paths = [
        path if isinstance(path, pathlib.Path) else pathlib.Path(path)
        for path in paths
    ]
    self._awatch = awatch(
        *self._paths,
        stop_event=self._stop_event,
        watch_filter=lambda change, path_str: (
            change in [event_type.value for event_type in event_types]  # type: ignore
            and pathlib.Path(path_str).is_file()
        ),
    )
receive() async ¤

Wait for the next file event and return its path.

RETURNS DESCRIPTION
Optional[pathlib.Path]

Path of next file.

Source code in frequenz/channels/utils/file_watcher.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def receive(self) -> Optional[pathlib.Path]:
    """Wait for the next file event and return its path.

    Returns:
        Path of next file.
    """
    while True:
        changes = await self._awatch.__anext__()
        for change in changes:
            # Tuple of (Change, path) returned by watchfiles
            if change is None or len(change) != 2:
                return None

            _, path_str = change
            path = pathlib.Path(path_str)
            return path

frequenz.channels.Merge ¤

Bases: Receiver[T]

Merge messages coming from multiple channels into a single stream.

Example

For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using Merge like this:

merge = Merge(receiver1, receiver2)
while msg := await merge.receive():
    # do something with msg
    pass
Source code in frequenz/channels/merge.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class Merge(Receiver[T]):
    """Merge messages coming from multiple channels into a single stream.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream, by using `Merge` like this:

        ```python
        merge = Merge(receiver1, receiver2)
        while msg := await merge.receive():
            # do something with msg
            pass
        ```
    """

    def __init__(self, *args: Receiver[T]) -> None:
        """Create a `Merge` instance.

        Args:
            *args: sequence of channel receivers.
        """
        self._receivers = {str(id): recv for id, recv in enumerate(args)}
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.receive(), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: Deque[T] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()

    async def receive(self) -> Optional[T]:
        """Wait until there's a message in any of the channels.

        Returns:
            The next message that was received, or `None`, if all channels have
                closed.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            if len(self._results) > 0:
                return self._results.popleft()

            if len(self._pending) == 0:
                return None
            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                result = item.result()
                # if channel is closed, don't add a task for it again.
                if result is None:
                    continue
                self._results.append(result)
                self._pending.add(
                    asyncio.create_task(self._receivers[name].receive(), name=name)
                )
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/merge.py
42
43
44
45
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
__init__(*args) ¤

Create a Merge instance.

PARAMETER DESCRIPTION
*args

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: ()

Source code in frequenz/channels/merge.py
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, *args: Receiver[T]) -> None:
    """Create a `Merge` instance.

    Args:
        *args: sequence of channel receivers.
    """
    self._receivers = {str(id): recv for id, recv in enumerate(args)}
    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(recv.receive(), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: Deque[T] = deque(maxlen=len(self._receivers))
receive() async ¤

Wait until there's a message in any of the channels.

RETURNS DESCRIPTION
Optional[T]

The next message that was received, or None, if all channels have closed.

Source code in frequenz/channels/merge.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async def receive(self) -> Optional[T]:
    """Wait until there's a message in any of the channels.

    Returns:
        The next message that was received, or `None`, if all channels have
            closed.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        if len(self._results) > 0:
            return self._results.popleft()

        if len(self._pending) == 0:
            return None
        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            result = item.result()
            # if channel is closed, don't add a task for it again.
            if result is None:
                continue
            self._results.append(result)
            self._pending.add(
                asyncio.create_task(self._receivers[name].receive(), name=name)
            )

frequenz.channels.MergeNamed ¤

Bases: Receiver[Tuple[str, T]]

Merge messages coming from multiple named channels into a single stream.

Source code in frequenz/channels/merge_named.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class MergeNamed(Receiver[Tuple[str, T]]):
    """Merge messages coming from multiple named channels into a single stream."""

    def __init__(self, **kwargs: Receiver[T]) -> None:
        """Create a `MergeNamed` instance.

        Args:
            **kwargs: sequence of channel receivers.
        """
        self._receivers = kwargs
        self._pending: Set[asyncio.Task[Any]] = {
            asyncio.create_task(recv.receive(), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()

    async def receive(self) -> Optional[Tuple[str, T]]:
        """Wait until there's a message in any of the channels.

        Returns:
            The next message that was received, or `None`, if all channels have
                closed.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            if len(self._results) > 0:
                return self._results.popleft()

            if len(self._pending) == 0:
                return None
            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                result = item.result()
                # if channel is closed, don't add a task for it again.
                if result is None:
                    continue
                self._results.append((name, result))
                self._pending.add(
                    asyncio.create_task(self._receivers[name].receive(), name=name)
                )
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/merge_named.py
29
30
31
32
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
__init__(**kwargs) ¤

Create a MergeNamed instance.

PARAMETER DESCRIPTION
**kwargs

sequence of channel receivers.

TYPE: Receiver[T] DEFAULT: {}

Source code in frequenz/channels/merge_named.py
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, **kwargs: Receiver[T]) -> None:
    """Create a `MergeNamed` instance.

    Args:
        **kwargs: sequence of channel receivers.
    """
    self._receivers = kwargs
    self._pending: Set[asyncio.Task[Any]] = {
        asyncio.create_task(recv.receive(), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))
receive() async ¤

Wait until there's a message in any of the channels.

RETURNS DESCRIPTION
Optional[Tuple[str, T]]

The next message that was received, or None, if all channels have closed.

Source code in frequenz/channels/merge_named.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def receive(self) -> Optional[Tuple[str, T]]:
    """Wait until there's a message in any of the channels.

    Returns:
        The next message that was received, or `None`, if all channels have
            closed.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        if len(self._results) > 0:
            return self._results.popleft()

        if len(self._pending) == 0:
            return None
        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            result = item.result()
            # if channel is closed, don't add a task for it again.
            if result is None:
                continue
            self._results.append((name, result))
            self._pending.add(
                asyncio.create_task(self._receivers[name].receive(), name=name)
            )

frequenz.channels.Peekable ¤

Bases: ABC, Generic[T]

A channel peekable.

A Peekable provides a peek() method that allows the user to get a peek at the latest value in the channel, without consuming anything.

Source code in frequenz/channels/base_classes.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class Peekable(ABC, Generic[T]):
    """A channel peekable.

    A Peekable provides a [peek()][frequenz.channels.Peekable] method that
    allows the user to get a peek at the latest value in the channel, without
    consuming anything.
    """

    @abstractmethod
    def peek(self) -> Optional[T]:
        """Return the latest value that was sent to the channel.

        Returns:
            The latest value received by the channel, and `None`, if nothing
                has been sent to the channel yet.
        """
Functions¤
peek() abstractmethod ¤

Return the latest value that was sent to the channel.

RETURNS DESCRIPTION
Optional[T]

The latest value received by the channel, and None, if nothing has been sent to the channel yet.

Source code in frequenz/channels/base_classes.py
 97
 98
 99
100
101
102
103
104
@abstractmethod
def peek(self) -> Optional[T]:
    """Return the latest value that was sent to the channel.

    Returns:
        The latest value received by the channel, and `None`, if nothing
            has been sent to the channel yet.
    """

frequenz.channels.Receiver ¤

Bases: ABC, Generic[T]

A channel Receiver.

Source code in frequenz/channels/base_classes.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class Receiver(ABC, Generic[T]):
    """A channel Receiver."""

    @abstractmethod
    async def receive(self) -> Optional[T]:
        """Receive a message from the channel.

        Returns:
            `None`, if the channel is closed, a message otherwise.
        """

    def __aiter__(self) -> Receiver[T]:
        """Initialize the async iterator over received values.

        Returns:
            `self`, since no extra setup is needed for the iterator.
        """
        return self

    async def __anext__(self) -> T:
        """Await the next value in the async iteration over received values.

        Returns:
            The next value received.

        Raises:
            StopAsyncIteration: if we receive `None`, i.e. if the underlying
                channel is closed.
        """
        received = await self.receive()
        if received is None:
            raise StopAsyncIteration
        return received

    def map(self, call: Callable[[T], U]) -> Receiver[U]:
        """Return a receiver with `call` applied on incoming messages.

        Args:
            call: function to apply on incoming messages.

        Returns:
            A `Receiver` to read results of the given function from.
        """
        return _Map(self, call)

    def into_peekable(self) -> Peekable[T]:
        """Convert the `Receiver` implementation into a `Peekable`.

        Once this function has been called, the receiver will no longer be
        usable, and calling `receive` on the receiver will raise an exception.

        Raises:
            NotImplementedError: when a `Receiver` implementation doesn't have
                a custom `get_peekable` implementation.
        """
        raise NotImplementedError("This receiver does not implement `into_peekable`")
Functions¤
__aiter__() ¤

Initialize the async iterator over received values.

RETURNS DESCRIPTION
Receiver[T]

self, since no extra setup is needed for the iterator.

Source code in frequenz/channels/base_classes.py
42
43
44
45
46
47
48
def __aiter__(self) -> Receiver[T]:
    """Initialize the async iterator over received values.

    Returns:
        `self`, since no extra setup is needed for the iterator.
    """
    return self
__anext__() async ¤

Await the next value in the async iteration over received values.

RETURNS DESCRIPTION
T

The next value received.

RAISES DESCRIPTION
StopAsyncIteration

if we receive None, i.e. if the underlying channel is closed.

Source code in frequenz/channels/base_classes.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
async def __anext__(self) -> T:
    """Await the next value in the async iteration over received values.

    Returns:
        The next value received.

    Raises:
        StopAsyncIteration: if we receive `None`, i.e. if the underlying
            channel is closed.
    """
    received = await self.receive()
    if received is None:
        raise StopAsyncIteration
    return received
into_peekable() ¤

Convert the Receiver implementation into a Peekable.

Once this function has been called, the receiver will no longer be usable, and calling receive on the receiver will raise an exception.

RAISES DESCRIPTION
NotImplementedError

when a Receiver implementation doesn't have a custom get_peekable implementation.

Source code in frequenz/channels/base_classes.py
76
77
78
79
80
81
82
83
84
85
86
def into_peekable(self) -> Peekable[T]:
    """Convert the `Receiver` implementation into a `Peekable`.

    Once this function has been called, the receiver will no longer be
    usable, and calling `receive` on the receiver will raise an exception.

    Raises:
        NotImplementedError: when a `Receiver` implementation doesn't have
            a custom `get_peekable` implementation.
    """
    raise NotImplementedError("This receiver does not implement `into_peekable`")
map(call) ¤

Return a receiver with call applied on incoming messages.

PARAMETER DESCRIPTION
call

function to apply on incoming messages.

TYPE: Callable[[T], U]

RETURNS DESCRIPTION
Receiver[U]

A Receiver to read results of the given function from.

Source code in frequenz/channels/base_classes.py
65
66
67
68
69
70
71
72
73
74
def map(self, call: Callable[[T], U]) -> Receiver[U]:
    """Return a receiver with `call` applied on incoming messages.

    Args:
        call: function to apply on incoming messages.

    Returns:
        A `Receiver` to read results of the given function from.
    """
    return _Map(self, call)
receive() abstractmethod async ¤

Receive a message from the channel.

RETURNS DESCRIPTION
Optional[T]

None, if the channel is closed, a message otherwise.

Source code in frequenz/channels/base_classes.py
34
35
36
37
38
39
40
@abstractmethod
async def receive(self) -> Optional[T]:
    """Receive a message from the channel.

    Returns:
        `None`, if the channel is closed, a message otherwise.
    """

frequenz.channels.Select ¤

Select the next available message from a group of AsyncIterators.

If Select was created with more AsyncIterator than what are read in the if-chain after each call to ready(), messages coming in the additional async iterators are dropped, and a warning message is logged.

Receivers also function as AsyncIterator.

Example

For example, if there are two async iterators that you want to simultaneously wait on, this can be done with:

select = Select(name1 = receiver1, name2 = receiver2)
while await select.ready():
    if msg := select.name1:
        if val := msg.inner:
            # do something with `val`
            pass
        else:
            # handle closure of receiver.
            pass
    elif msg := select.name2:
        # do something with `msg.inner`
        pass
Source code in frequenz/channels/select.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class Select:
    """Select the next available message from a group of AsyncIterators.

    If `Select` was created with more `AsyncIterator` than what are read in
    the if-chain after each call to [ready()][frequenz.channels.Select.ready],
    messages coming in the additional async iterators are dropped, and
    a warning message is logged.

    [Receiver][frequenz.channels.Receiver]s also function as `AsyncIterator`.

    Example:
        For example, if there are two async iterators that you want to
        simultaneously wait on, this can be done with:

        ```python
        select = Select(name1 = receiver1, name2 = receiver2)
        while await select.ready():
            if msg := select.name1:
                if val := msg.inner:
                    # do something with `val`
                    pass
                else:
                    # handle closure of receiver.
                    pass
            elif msg := select.name2:
                # do something with `msg.inner`
                pass
        ```
    """

    def __init__(self, **kwargs: AsyncIterator[Any]) -> None:
        """Create a `Select` instance.

        Args:
            **kwargs: sequence of async iterators
        """
        self._receivers = kwargs
        self._pending: Set[asyncio.Task[Any]] = set()

        for name, recv in self._receivers.items():
            # can replace __anext__() to anext() (Only Python 3.10>=)
            msg = recv.__anext__()  # pylint: disable=unnecessary-dunder-call
            self._pending.add(asyncio.create_task(msg, name=name))  # type: ignore

        self._ready_count = 0
        self._prev_ready_count = 0
        self._result: Dict[str, Optional[_Selected]] = {
            name: None for name in self._receivers
        }

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()

    async def ready(self) -> bool:
        """Wait until there is a message in any of the async iterators.

        Returns `True` if there is a message available, and `False` if all
        async iterators have closed.

        Returns:
            Whether there are further messages or not.
        """
        if self._ready_count > 0:
            if self._ready_count == self._prev_ready_count:
                dropped_names: List[str] = []
                for name, value in self._result.items():
                    if value is not None:
                        dropped_names.append(name)
                        self._result[name] = None
                self._ready_count = 0
                self._prev_ready_count = 0
                logger.warning(
                    "Select.ready() dropped data from async iterator(s): %s, "
                    "because no messages have been fetched since the last call to ready().",
                    dropped_names,
                )
            else:
                self._prev_ready_count = self._ready_count
                return True
        if len(self._pending) == 0:
            return False

        # once all the pending messages have been consumed, reset the
        # `_prev_ready_count` as well, and wait for new messages.
        self._prev_ready_count = 0

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            if isinstance(item.exception(), StopAsyncIteration):
                result = None
            else:
                result = item.result()
            self._ready_count += 1
            self._result[name] = _Selected(result)
            # if channel or AsyncIterator is closed
            # don't add a task for it again.
            if result is None:
                continue
            msg = self._receivers[  # pylint: disable=unnecessary-dunder-call
                name
            ].__anext__()
            self._pending.add(asyncio.create_task(msg, name=name))  # type: ignore
        return True

    def __getattr__(self, name: str) -> Optional[Any]:
        """Return the latest unread message from a `AsyncIterator`, if available.

        Args:
            name: Name of the channel.

        Returns:
            Latest unread message for the specified `AsyncIterator`, or `None`.

        Raises:
            KeyError: when the name was not specified when creating the
                `Select` instance.
        """
        result = self._result[name]
        if result is None:
            return result
        self._result[name] = None
        self._ready_count -= 1
        return result
Functions¤
__del__() ¤

Cleanup any pending tasks.

Source code in frequenz/channels/select.py
81
82
83
84
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
__getattr__(name) ¤

Return the latest unread message from a AsyncIterator, if available.

PARAMETER DESCRIPTION
name

Name of the channel.

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

Latest unread message for the specified AsyncIterator, or None.

RAISES DESCRIPTION
KeyError

when the name was not specified when creating the Select instance.

Source code in frequenz/channels/select.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def __getattr__(self, name: str) -> Optional[Any]:
    """Return the latest unread message from a `AsyncIterator`, if available.

    Args:
        name: Name of the channel.

    Returns:
        Latest unread message for the specified `AsyncIterator`, or `None`.

    Raises:
        KeyError: when the name was not specified when creating the
            `Select` instance.
    """
    result = self._result[name]
    if result is None:
        return result
    self._result[name] = None
    self._ready_count -= 1
    return result
__init__(**kwargs) ¤

Create a Select instance.

PARAMETER DESCRIPTION
**kwargs

sequence of async iterators

TYPE: AsyncIterator[Any] DEFAULT: {}

Source code in frequenz/channels/select.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __init__(self, **kwargs: AsyncIterator[Any]) -> None:
    """Create a `Select` instance.

    Args:
        **kwargs: sequence of async iterators
    """
    self._receivers = kwargs
    self._pending: Set[asyncio.Task[Any]] = set()

    for name, recv in self._receivers.items():
        # can replace __anext__() to anext() (Only Python 3.10>=)
        msg = recv.__anext__()  # pylint: disable=unnecessary-dunder-call
        self._pending.add(asyncio.create_task(msg, name=name))  # type: ignore

    self._ready_count = 0
    self._prev_ready_count = 0
    self._result: Dict[str, Optional[_Selected]] = {
        name: None for name in self._receivers
    }
ready() async ¤

Wait until there is a message in any of the async iterators.

Returns True if there is a message available, and False if all async iterators have closed.

RETURNS DESCRIPTION
bool

Whether there are further messages or not.

Source code in frequenz/channels/select.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def ready(self) -> bool:
    """Wait until there is a message in any of the async iterators.

    Returns `True` if there is a message available, and `False` if all
    async iterators have closed.

    Returns:
        Whether there are further messages or not.
    """
    if self._ready_count > 0:
        if self._ready_count == self._prev_ready_count:
            dropped_names: List[str] = []
            for name, value in self._result.items():
                if value is not None:
                    dropped_names.append(name)
                    self._result[name] = None
            self._ready_count = 0
            self._prev_ready_count = 0
            logger.warning(
                "Select.ready() dropped data from async iterator(s): %s, "
                "because no messages have been fetched since the last call to ready().",
                dropped_names,
            )
        else:
            self._prev_ready_count = self._ready_count
            return True
    if len(self._pending) == 0:
        return False

    # once all the pending messages have been consumed, reset the
    # `_prev_ready_count` as well, and wait for new messages.
    self._prev_ready_count = 0

    done, self._pending = await asyncio.wait(
        self._pending, return_when=asyncio.FIRST_COMPLETED
    )
    for item in done:
        name = item.get_name()
        if isinstance(item.exception(), StopAsyncIteration):
            result = None
        else:
            result = item.result()
        self._ready_count += 1
        self._result[name] = _Selected(result)
        # if channel or AsyncIterator is closed
        # don't add a task for it again.
        if result is None:
            continue
        msg = self._receivers[  # pylint: disable=unnecessary-dunder-call
            name
        ].__anext__()
        self._pending.add(asyncio.create_task(msg, name=name))  # type: ignore
    return True

frequenz.channels.Sender ¤

Bases: ABC, Generic[T]

A channel Sender.

Source code in frequenz/channels/base_classes.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Sender(ABC, Generic[T]):
    """A channel Sender."""

    @abstractmethod
    async def send(self, msg: T) -> bool:
        """Send a message to the channel.

        Args:
            msg: The message to be sent.

        Returns:
            Whether the message was sent, based on whether the channel is open
                or not.
        """
Functions¤
send(msg) abstractmethod async ¤

Send a message to the channel.

PARAMETER DESCRIPTION
msg

The message to be sent.

TYPE: T

RETURNS DESCRIPTION
bool

Whether the message was sent, based on whether the channel is open or not.

Source code in frequenz/channels/base_classes.py
18
19
20
21
22
23
24
25
26
27
28
@abstractmethod
async def send(self, msg: T) -> bool:
    """Send a message to the channel.

    Args:
        msg: The message to be sent.

    Returns:
        Whether the message was sent, based on whether the channel is open
            or not.
    """

frequenz.channels.Timer ¤

Bases: Receiver[datetime]

A timer receiver that returns the timestamp every interval seconds.

Primarily for use with Select.

Example

When you want something to happen with a fixed period:

timer = channel.Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
        else:
            logging.warn("battery channel closed")
    if ts := select.timer:
        # something to do once every 30 seconds
        pass

When you want something to happen when nothing else has happened in a certain interval:

timer = channel.Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
    timer.reset()
    if msg := select.bat_1:
        if val := msg.inner:
            process_data(val)
        else:
            logging.warn("battery channel closed")
    if ts := select.timer:
        # something to do if there's no battery data for 30 seconds
        pass
Source code in frequenz/channels/utils/timer.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class Timer(Receiver[datetime]):
    """A timer receiver that returns the timestamp every `interval` seconds.

    Primarily for use with [Select][frequenz.channels.Select].

    Example:
        When you want something to happen with a fixed period:

        ```python
        timer = channel.Timer(30.0)
        select = Select(bat_1 = receiver1, timer = timer)
        while await select.ready():
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                else:
                    logging.warn("battery channel closed")
            if ts := select.timer:
                # something to do once every 30 seconds
                pass
        ```

        When you want something to happen when nothing else has happened in a
        certain interval:

        ```python
        timer = channel.Timer(30.0)
        select = Select(bat_1 = receiver1, timer = timer)
        while await select.ready():
            timer.reset()
            if msg := select.bat_1:
                if val := msg.inner:
                    process_data(val)
                else:
                    logging.warn("battery channel closed")
            if ts := select.timer:
                # something to do if there's no battery data for 30 seconds
                pass
        ```
    """

    def __init__(self, interval: float) -> None:
        """Create a `Timer` instance.

        Args:
            interval: number of seconds between messages.
        """
        self._stopped = False
        self._interval = timedelta(seconds=interval)
        self._next_msg_time = datetime.now() + self._interval

    def reset(self) -> None:
        """Reset the timer to start timing from `now`."""
        self._next_msg_time = datetime.now() + self._interval

    def stop(self) -> None:
        """Stop the timer.

        Once `stop` has been called, all subsequent calls to
        [receive()][frequenz.channels.Timer.receive] will immediately return
        `None`.
        """
        self._stopped = True

    async def receive(self) -> Optional[datetime]:
        """Return the current time once the next tick is due.

        Returns:
            The time of the next tick or `None` if
                [stop()][frequenz.channels.Timer.stop] has been called on the
                timer.
        """
        if self._stopped:
            return None
        now = datetime.now()
        diff = self._next_msg_time - now
        while diff.total_seconds() > 0:
            await asyncio.sleep(diff.total_seconds())
            now = datetime.now()
            diff = self._next_msg_time - now

        self._next_msg_time = now + self._interval

        return now
Functions¤
__init__(interval) ¤

Create a Timer instance.

PARAMETER DESCRIPTION
interval

number of seconds between messages.

TYPE: float

Source code in frequenz/channels/utils/timer.py
54
55
56
57
58
59
60
61
62
def __init__(self, interval: float) -> None:
    """Create a `Timer` instance.

    Args:
        interval: number of seconds between messages.
    """
    self._stopped = False
    self._interval = timedelta(seconds=interval)
    self._next_msg_time = datetime.now() + self._interval
receive() async ¤

Return the current time once the next tick is due.

RETURNS DESCRIPTION
Optional[datetime]

The time of the next tick or None if stop() has been called on the timer.

Source code in frequenz/channels/utils/timer.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
async def receive(self) -> Optional[datetime]:
    """Return the current time once the next tick is due.

    Returns:
        The time of the next tick or `None` if
            [stop()][frequenz.channels.Timer.stop] has been called on the
            timer.
    """
    if self._stopped:
        return None
    now = datetime.now()
    diff = self._next_msg_time - now
    while diff.total_seconds() > 0:
        await asyncio.sleep(diff.total_seconds())
        now = datetime.now()
        diff = self._next_msg_time - now

    self._next_msg_time = now + self._interval

    return now
reset() ¤

Reset the timer to start timing from now.

Source code in frequenz/channels/utils/timer.py
64
65
66
def reset(self) -> None:
    """Reset the timer to start timing from `now`."""
    self._next_msg_time = datetime.now() + self._interval
stop() ¤

Stop the timer.

Once stop has been called, all subsequent calls to receive() will immediately return None.

Source code in frequenz/channels/utils/timer.py
68
69
70
71
72
73
74
75
def stop(self) -> None:
    """Stop the timer.

    Once `stop` has been called, all subsequent calls to
    [receive()][frequenz.channels.Timer.receive] will immediately return
    `None`.
    """
    self._stopped = True