util
frequenz.channels.util
¤
Channel utilities.
A module with several utilities to work with channels:
-
FileWatcher: A receiver that watches for file events.
-
Merge: A receiver that merge messages coming from multiple receivers into a single stream.
-
MergeNamed: A receiver that merge messages coming from multiple receivers into a single named stream, allowing to identify the origin of each message.
-
Select: A helper to select the next available message for each receiver in a group of receivers.
-
Timer: A receiver that emits a now
timestamp
everyinterval
seconds.
Classes¤
frequenz.channels.util.FileWatcher
¤
A channel receiver that watches for file events.
Source code in frequenz/channels/util/_file_watcher.py
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
|
Classes¤
EventType
¤
Bases: Enum
Available types of changes to watch for.
Source code in frequenz/channels/util/_file_watcher.py
20 21 22 23 24 25 |
|
Functions¤
__del__()
¤
Cleanup registered watches.
awatch
passes the stop_event
to a separate task/thread. This way
awatch
getting destroyed properly. The background task will continue
until the signal is received.
Source code in frequenz/channels/util/_file_watcher.py
59 60 61 62 63 64 65 66 |
|
__init__(paths, event_types=None)
¤
Create a FileWatcher
instance.
PARAMETER | DESCRIPTION |
---|---|
paths |
Paths to watch for changes. |
event_types |
Types of events to watch for or |
Source code in frequenz/channels/util/_file_watcher.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
|
consume()
¤
Return the latest change once ready
is complete.
RETURNS | DESCRIPTION |
---|---|
pathlib.Path
|
The next change that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if there is some problem with the receiver. |
Source code in frequenz/channels/util/_file_watcher.py
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
|
ready()
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready()
has finished, the value should be read with
a call to consume()
(receive()
or iterated over). The receiver will
remain ready (this method will return immediately) until it is
consumed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the receiver is still active. |
Source code in frequenz/channels/util/_file_watcher.py
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
|
frequenz.channels.util.Merge
¤
Bases: Receiver[T]
Merge messages coming from multiple channels into a single stream.
Example
For example, if there are two channel receivers with the same type,
they can be awaited together, and their results merged into a single
stream, by using Merge
like this:
merge = Merge(receiver1, receiver2)
while msg := await merge.receive():
# do something with msg
pass
When merge
is no longer needed, then it should be stopped using
self.stop()
method. This will cleanup any internal pending async tasks.
Source code in frequenz/channels/util/_merge.py
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
|
Functions¤
__del__()
¤
Cleanup any pending tasks.
Source code in frequenz/channels/util/_merge.py
46 47 48 49 |
|
__init__(*args)
¤
Create a Merge
instance.
PARAMETER | DESCRIPTION |
---|---|
*args |
sequence of channel receivers.
TYPE:
|
Source code in frequenz/channels/util/_merge.py
33 34 35 36 37 38 39 40 41 42 43 44 |
|
consume()
¤
Return the latest value once ready
is complete.
RETURNS | DESCRIPTION |
---|---|
T
|
The next value that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if the receiver stopped producing messages. |
ReceiverError
|
if there is some problem with the receiver. |
Source code in frequenz/channels/util/_merge.py
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
|
ready()
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready()
has finished, the value should be read with
a call to consume()
(receive()
or iterated over). The receiver will
remain ready (this method will return immediately) until it is
consumed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the receiver is still active. |
Source code in frequenz/channels/util/_merge.py
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
|
stop()
async
¤
Stop the Merge
instance and cleanup any pending tasks.
Source code in frequenz/channels/util/_merge.py
51 52 53 54 55 56 |
|
frequenz.channels.util.MergeNamed
¤
Bases: Receiver[Tuple[str, T]]
Merge messages coming from multiple named channels into a single stream.
When MergeNamed
is no longer needed, then it should be stopped using
self.stop()
method. This will cleanup any internal pending async tasks.
Source code in frequenz/channels/util/_merge_named.py
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
|
Functions¤
__del__()
¤
Cleanup any pending tasks.
Source code in frequenz/channels/util/_merge_named.py
34 35 36 37 |
|
__init__(**kwargs)
¤
Create a MergeNamed
instance.
PARAMETER | DESCRIPTION |
---|---|
**kwargs |
sequence of channel receivers.
TYPE:
|
Source code in frequenz/channels/util/_merge_named.py
21 22 23 24 25 26 27 28 29 30 31 32 |
|
consume()
¤
Return the latest value once ready
is complete.
RETURNS | DESCRIPTION |
---|---|
Tuple[str, T]
|
The next key, value that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if the receiver stopped producing messages. |
ReceiverError
|
if there is some problem with the receiver. |
Source code in frequenz/channels/util/_merge_named.py
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
|
ready()
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready()
has finished, the value should be read with
a call to consume()
(receive()
or iterated over). The receiver will
remain ready (this method will return immediately) until it is
consumed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the receiver is still active. |
Source code in frequenz/channels/util/_merge_named.py
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
|
stop()
async
¤
Stop the MergeNamed
instance and cleanup any pending tasks.
Source code in frequenz/channels/util/_merge_named.py
39 40 41 42 43 44 |
|
frequenz.channels.util.Select
¤
Select the next available message from a group of Receivers.
If Select
was created with more Receiver
than what are read in
the if-chain after each call to
ready(), messages coming in the
additional receivers are dropped, and a warning message is logged.
Receivers also function as Receiver
.
When Select is no longer needed, then it should be stopped using
self.stop()
method. This would cleanup any internal pending async tasks.
Example
For example, if there are two receivers that you want to simultaneously wait on, this can be done with:
select = Select(name1 = receiver1, name2 = receiver2)
while await select.ready():
if msg := select.name1:
if val := msg.inner:
# do something with `val`
pass
else:
# handle closure of receiver.
pass
elif msg := select.name2:
# do something with `msg.inner`
pass
Source code in frequenz/channels/util/_select.py
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
|
Functions¤
__del__()
¤
Cleanup any pending tasks.
Source code in frequenz/channels/util/_select.py
108 109 110 111 |
|
__getattr__(name)
¤
Return the latest unread message from a Receiver
, if available.
PARAMETER | DESCRIPTION |
---|---|
name |
Name of the channel.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Optional[Any]
|
Latest unread message for the specified |
RAISES | DESCRIPTION |
---|---|
KeyError
|
when the name was not specified when creating the
|
Source code in frequenz/channels/util/_select.py
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
|
__init__(**kwargs)
¤
Create a Select
instance.
PARAMETER | DESCRIPTION |
---|---|
**kwargs |
sequence of receivers |
Source code in frequenz/channels/util/_select.py
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
|
ready()
async
¤
Wait until there is a message in any of the receivers.
Returns True
if there is a message available, and False
if all
receivers have closed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether there are further messages or not. |
Source code in frequenz/channels/util/_select.py
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
|
stop()
async
¤
Stop the Select
instance and cleanup any pending tasks.
Source code in frequenz/channels/util/_select.py
113 114 115 116 117 118 |
|
frequenz.channels.util.Timer
¤
A timer receiver that returns the timestamp every interval
seconds.
Primarily for use with Select.
The timestamp generated is a timezone-aware datetime using UTC as timezone.
Example
When you want something to happen with a fixed period:
timer = Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
if msg := select.bat_1:
if val := msg.inner:
process_data(val)
else:
logging.warn("battery channel closed")
if ts := select.timer:
# something to do once every 30 seconds
pass
When you want something to happen when nothing else has happened in a certain interval:
timer = Timer(30.0)
select = Select(bat_1 = receiver1, timer = timer)
while await select.ready():
timer.reset()
if msg := select.bat_1:
if val := msg.inner:
process_data(val)
else:
logging.warn("battery channel closed")
if ts := select.timer:
# something to do if there's no battery data for 30 seconds
pass
Source code in frequenz/channels/util/_timer.py
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
|
Functions¤
__init__(interval)
¤
Create a Timer
instance.
PARAMETER | DESCRIPTION |
---|---|
interval |
number of seconds between messages.
TYPE:
|
Source code in frequenz/channels/util/_timer.py
57 58 59 60 61 62 63 64 65 66 |
|
consume()
¤
Return the latest value once ready
is complete.
RETURNS | DESCRIPTION |
---|---|
datetime
|
The time of the next tick in UTC or |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if the receiver stopped producing messages. |
ReceiverError
|
if there is some problem with the receiver. |
Changelog
- v0.11.0: Returns a timezone-aware datetime with UTC timezone instead of a native datetime object.
Source code in frequenz/channels/util/_timer.py
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
|
ready()
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready()
has finished, the value should be read with
a call to consume()
(receive()
or iterated over). The receiver will
remain ready (this method will return immediately) until it is
consumed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the receiver is still active. |
Source code in frequenz/channels/util/_timer.py
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
|
reset()
¤
Reset the timer to start timing from now
.
Source code in frequenz/channels/util/_timer.py
68 69 70 |
|
stop()
¤
Stop the timer.
Once stop
has been called, all subsequent calls to
receive() will immediately
return None
.
Source code in frequenz/channels/util/_timer.py
72 73 74 75 76 77 78 79 |
|