util
frequenz.channels.util
Channel utilities.
A module with several utilities to work with channels:
- FileWatcher: A receiver that watches for file events.
- Merge: A receiver that merges messages coming from multiple receivers into a single stream.
- MergeNamed: A receiver that merges messages coming from multiple receivers into a single named stream, making it possible to identify the origin of each message.
- Select: A helper to select the next available message for each receiver in a group of receivers.
- Timer: A receiver that emits a `now` timestamp every `interval` seconds.
Classes
frequenz.channels.util.FileWatcher
A channel receiver that watches for file events.
Source code in frequenz/channels/util/_file_watcher.py
Classes
EventType
Bases: Enum
Available types of changes to watch for.
Source code in frequenz/channels/util/_file_watcher.py
Functions
__del__()
Clean up registered watches.
`awatch` passes the `stop_event` to a separate task/thread. This ensures that `awatch` is destroyed properly; the background task will continue until the signal is received.
Source code in frequenz/channels/util/_file_watcher.py
__init__(paths, event_types=None)
Create a `FileWatcher` instance.
PARAMETER | DESCRIPTION |
---|---|
paths | Paths to watch for changes. |
event_types | Types of events to watch for, or `None` (the default). |
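To make the role of `event_types` more concrete, here is a minimal sketch that detects file events by comparing two directory snapshots and then filters them by type. It is a polling stand-in written with the standard library only, not how `FileWatcher` works internally. The `EventType` members, the `snapshot` and `diff` helpers, and the reading of `event_types=None` as "all types" are assumptions made for illustration.

```python
import tempfile
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple


class EventType(Enum):
    """Hypothetical members; the real enum's members are not listed on this page."""

    CREATE = "create"
    MODIFY = "modify"
    DELETE = "delete"


def snapshot(directory: Path) -> Dict[Path, int]:
    """Map every regular file in `directory` to its modification time (ns)."""
    return {p: p.stat().st_mtime_ns for p in directory.iterdir() if p.is_file()}


def diff(
    before: Dict[Path, int],
    after: Dict[Path, int],
    event_types: Optional[Set[EventType]] = None,
) -> List[Tuple[EventType, Path]]:
    """Compare two snapshots; `None` is assumed to mean "report all types"."""
    wanted = set(EventType) if event_types is None else event_types
    events = [(EventType.CREATE, p) for p in after.keys() - before.keys()]
    events += [(EventType.DELETE, p) for p in before.keys() - after.keys()]
    events += [
        (EventType.MODIFY, p)
        for p in after.keys() & before.keys()
        if after[p] != before[p]
    ]
    # Keep only the kinds of events the caller asked for.
    return [(kind, path) for kind, path in events if kind in wanted]


def demo() -> List[EventType]:
    """Create one file in a temporary directory and report what changed."""
    with tempfile.TemporaryDirectory() as tmp:
        directory = Path(tmp)
        before = snapshot(directory)
        (directory / "config.toml").write_text("a = 1\n")
        return [kind for kind, _ in diff(before, snapshot(directory))]


if __name__ == "__main__":
    print(demo())
```

Polling is used here only because it needs no third-party dependency; an event-driven watcher avoids the repeated directory scans.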
Source code in frequenz/channels/util/_file_watcher.py
consume()
Return the latest change once `ready()` is complete.
RAISES | DESCRIPTION |
---|---|
ChannelClosedError | When the channel is closed. |
RETURNS | DESCRIPTION |
---|---|
pathlib.Path | The next change that was received. |
Source code in frequenz/channels/util/_file_watcher.py
ready() async
Wait for the next file event and return its path.
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | When the channel is closed. |
RETURNS | DESCRIPTION |
---|---|
None | Path of next file. |
Source code in frequenz/channels/util/_file_watcher.py
frequenz.channels.util.Merge
Bases: Receiver[T]
Merge messages coming from multiple channels into a single stream.
Example
For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream, by using `Merge` like this:

    from frequenz.channels.util import Merge

    merge = Merge(receiver1, receiver2)
    while msg := await merge.receive():
        # do something with msg
        pass
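To illustrate what merging several sources into one stream involves, here is a minimal standard-library sketch. It is not the `Merge` implementation: `merge_queues` and `collect` are hypothetical names, `asyncio.Queue` stands in for channel receivers, and a `None` item is assumed to mark a closed source.

```python
import asyncio
from typing import Any, AsyncIterator, List


async def merge_queues(*queues: asyncio.Queue) -> AsyncIterator[Any]:
    """Yield items from several queues as they arrive (`None` closes a queue)."""
    # One pending `get()` task per queue that is still open.
    pending = {asyncio.create_task(q.get()): q for q in queues}
    while pending:
        done, _ = await asyncio.wait(
            set(pending), return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            queue = pending.pop(task)
            item = task.result()
            if item is None:
                continue  # this source is closed; stop listening to it
            # Ask the same queue for its next item before handing this one out.
            pending[asyncio.create_task(queue.get())] = queue
            yield item


async def collect() -> List[Any]:
    """Feed two queues and drain them through the merged stream."""
    q1: asyncio.Queue = asyncio.Queue()
    q2: asyncio.Queue = asyncio.Queue()
    for item in ("a1", "a2", None):
        q1.put_nowait(item)
    for item in ("b1", None):
        q2.put_nowait(item)
    return [item async for item in merge_queues(q1, q2)]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Items from the same queue keep their order, while the interleaving between queues depends on which `get()` completes first. The merged stream ends only once every source has closed.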
Source code in frequenz/channels/util/_merge.py
Functions
__del__()
Clean up any pending tasks.
Source code in frequenz/channels/util/_merge.py
__init__(*args)
Create a `Merge` instance.
PARAMETER | DESCRIPTION |
---|---|
*args | Sequence of channel receivers. |
Source code in frequenz/channels/util/_merge.py
consume()
Return the latest value once `ready()` is complete.
RAISES | DESCRIPTION |
---|---|
EOFError | When called before a call to `ready()` completes. |
RETURNS | DESCRIPTION |
---|---|
T | The next value that was received. |
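The split between `ready()` and `consume()` can be illustrated with a minimal sketch. `QueueReceiver` is a hypothetical stand-in built on `asyncio.Queue`, not the library's implementation. It assumes only what this page states: `ready()` waits for a value, `consume()` hands it out, and consuming too early raises `EOFError`.

```python
import asyncio
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class QueueReceiver(Generic[T]):
    """Hypothetical receiver that splits receiving into `ready()` and `consume()`."""

    def __init__(self, queue: "asyncio.Queue[T]") -> None:
        self._queue = queue
        self._next: Optional[T] = None
        self._has_next = False

    async def ready(self) -> None:
        """Wait until a value is available, without handing it out yet."""
        if not self._has_next:
            self._next = await self._queue.get()
            self._has_next = True

    def consume(self) -> T:
        """Return the value fetched by `ready()`; each value is handed out once."""
        if not self._has_next:
            raise EOFError("consume() called before ready() completed")
        value, self._next, self._has_next = self._next, None, False
        return value  # type: ignore[return-value]

    async def receive(self) -> T:
        """Convenience wrapper: wait for a value, then consume it."""
        await self.ready()
        return self.consume()
```

Keeping the waiting step separate from the consuming step lets a helper wait on many receivers at once and decide afterwards which values to read.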
Source code in frequenz/channels/util/_merge.py
ready() async
Wait until the receiver is ready with a value.
RAISES | DESCRIPTION |
---|---|
ChannelClosedError | If the underlying channel is closed. |
Source code in frequenz/channels/util/_merge.py
frequenz.channels.util.MergeNamed
Bases: Receiver[Tuple[str, T]]
Merge messages coming from multiple named channels into a single stream.
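As a rough illustration of a named merged stream, the sketch below tags each item with the keyword its source was registered under and yields `(name, item)` pairs, mirroring the `Tuple[str, T]` base type shown above. It is a standard-library stand-in, not the `MergeNamed` implementation: `merge_named` is a hypothetical function, `asyncio.Queue` replaces channel receivers, and a `None` item is assumed to mark a closed source.

```python
import asyncio
from typing import Any, AsyncIterator, Tuple


async def merge_named(**queues: asyncio.Queue) -> AsyncIterator[Tuple[str, Any]]:
    """Yield `(name, item)` pairs; a `None` item marks that source as closed."""
    out: asyncio.Queue = asyncio.Queue()

    async def forward(name: str, queue: asyncio.Queue) -> None:
        # Tag every item with the keyword its queue was registered under.
        while (item := await queue.get()) is not None:
            await out.put((name, item))
        await out.put((name, None))  # tell the consumer this source is done

    # Keep references to the tasks so they are not garbage-collected early.
    tasks = [asyncio.create_task(forward(n, q)) for n, q in queues.items()]
    open_sources = len(tasks)
    while open_sources:
        name, item = await out.get()
        if item is None:
            open_sources -= 1
        else:
            yield name, item


async def collect() -> list:
    """Feed two named queues and drain the merged, tagged stream."""
    meter: asyncio.Queue = asyncio.Queue()
    battery: asyncio.Queue = asyncio.Queue()
    for item in (1.0, 2.0, None):
        meter.put_nowait(item)
    for item in (50, None):
        battery.put_nowait(item)
    return [pair async for pair in merge_named(meter=meter, battery=battery)]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Because every item carries its name, the consumer can dispatch on the origin without keeping a separate loop per source.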
Source code in frequenz/channels/util/_merge_named.py
Functions
__del__()
Clean up any pending tasks.
Source code in frequenz/channels/util/_merge_named.py
__init__(**kwargs)
Create a `MergeNamed` instance.
PARAMETER | DESCRIPTION |
---|---|
**kwargs | Channel receivers, passed as keyword arguments; each keyword is the name returned alongside that receiver's messages. |
Source code in frequenz/channels/util/_merge_named.py
consume()
Return the latest value once `ready()` is complete.
RAISES | DESCRIPTION |
---|---|
EOFError | When called before a call to `ready()` completes. |
RETURNS | DESCRIPTION |
---|---|
Tuple[str, T] | The next value that was received, along with its name. |
Source code in frequenz/channels/util/_merge_named.py
ready() async
Wait until there's a message in any of the channels.
RAISES | DESCRIPTION |
---|---|
ChannelClosedError | When all the channels are closed. |
Source code in frequenz/channels/util/_merge_named.py
frequenz.channels.util.Select
Select the next available message from a group of Receivers.
If `Select` was created with more `Receiver`s than are read in the if-chain after each call to `ready()`, messages arriving on the additional receivers are dropped, and a warning message is logged.
Receivers also function as `Receiver`.
Example
For example, if there are two receivers that you want to wait on simultaneously, this can be done with:

    from frequenz.channels.util import Select

    select = Select(name1=receiver1, name2=receiver2)
    while await select.ready():
        if msg := select.name1:
            if val := msg.inner:
                # do something with `val`
                pass
            else:
                # handle closure of receiver.
                pass
        elif msg := select.name2:
            # do something with `msg.inner`
            pass
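The pattern above (wait on all sources, then read each one by name) can be sketched with the standard library alone. `MiniSelect` and `Message` are hypothetical stand-ins, not the library's classes: `asyncio.Queue` replaces receivers, a `None` item is assumed to mark a closed source, and dropped messages are discarded silently rather than logged.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class Message:
    """Wrapper, so a closed source (`inner is None`) still yields a truthy object."""

    inner: Any


class MiniSelect:
    """Hypothetical stand-in for `Select`, built on named `asyncio.Queue`s."""

    def __init__(self, **queues: asyncio.Queue) -> None:
        self._names = set(queues)
        self._open = dict(queues)  # sources that have not been closed yet
        self._pending: Dict[str, asyncio.Task] = {}
        self._unread: Dict[str, Message] = {}

    async def ready(self) -> bool:
        """Return True once any source has a message, False when all are closed."""
        self._unread.clear()  # messages not read in the if-chain are dropped
        for name, queue in self._open.items():
            if name not in self._pending:
                self._pending[name] = asyncio.create_task(queue.get())
        if not self._pending:
            return False
        await asyncio.wait(
            set(self._pending.values()), return_when=asyncio.FIRST_COMPLETED
        )
        for name in [n for n, t in self._pending.items() if t.done()]:
            item = self._pending.pop(name).result()
            if item is None:  # assumed closure marker
                del self._open[name]
            self._unread[name] = Message(item)
        return True

    def __getattr__(self, name: str) -> Optional[Message]:
        # Called only when normal attribute lookup fails, i.e. for source names.
        if name.startswith("_"):
            raise AttributeError(name)
        if name not in self._names:
            raise KeyError(name)
        return self._unread.pop(name, None)


async def demo() -> List[Tuple[str, Any]]:
    """Drain two named queues through the select loop."""
    q1: asyncio.Queue = asyncio.Queue()
    q2: asyncio.Queue = asyncio.Queue()
    for item in (1, 2, None):
        q1.put_nowait(item)
    for item in ("x", None):
        q2.put_nowait(item)
    select = MiniSelect(name1=q1, name2=q2)
    seen: List[Tuple[str, Any]] = []
    while await select.ready():
        if msg := select.name1:
            seen.append(("name1", msg.inner))
        if msg := select.name2:
            seen.append(("name2", msg.inner))
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The demo reads both names with independent `if` statements. With an `if`/`elif` chain, a message that arrives on the second source in the same round as one on the first would be dropped by this sketch on the next `ready()` call.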
Source code in frequenz/channels/util/_select.py
Functions
__del__()
Clean up any pending tasks.
Source code in frequenz/channels/util/_select.py
__getattr__(name)
Return the latest unread message from a `Receiver`, if available.
PARAMETER | DESCRIPTION |
---|---|
name | Name of the channel. |
RETURNS | DESCRIPTION |
---|---|
Optional[Any] | Latest unread message for the specified `Receiver`, if available. |
RAISES | DESCRIPTION |
---|---|
KeyError | When the name was not specified when creating the `Select` instance. |
Source code in frequenz/channels/util/_select.py
__init__(**kwargs)
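Create a `Select` instance.
PARAMETER | DESCRIPTION |
---|---|
**kwargs | Receivers to select from, passed as keyword arguments; each keyword is the name later used to read that receiver's messages. |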
Source code in frequenz/channels/util/_select.py
ready() async
Wait until there is a message in any of the receivers.
Returns `True` if there is a message available, and `False` if all receivers have closed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether there are further messages or not. |
Source code in frequenz/channels/util/_select.py
frequenz.channels.util.Timer
A timer receiver that returns the timestamp every `interval` seconds.
Primarily for use with `Select`.
The timestamp generated is a timezone-aware datetime using UTC as timezone.
Example
When you want something to happen with a fixed period:

    import logging

    from frequenz.channels.util import Select, Timer

    timer = Timer(30.0)
    select = Select(bat_1=receiver1, timer=timer)
    while await select.ready():
        if msg := select.bat_1:
            if val := msg.inner:
                process_data(val)
            else:
                logging.warning("battery channel closed")
        if ts := select.timer:
            # something to do once every 30 seconds
            pass

When you want something to happen when nothing else has happened in a certain interval:

    import logging

    from frequenz.channels.util import Select, Timer

    timer = Timer(30.0)
    select = Select(bat_1=receiver1, timer=timer)
    while await select.ready():
        timer.reset()
        if msg := select.bat_1:
            if val := msg.inner:
                process_data(val)
            else:
                logging.warning("battery channel closed")
        if ts := select.timer:
            # something to do if there's no battery data for 30 seconds
            pass
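The difference between the two examples comes down to what `reset()` does to the next deadline. The sketch below shows one way such a timer could behave, using only the standard library. `MiniTimer` is a hypothetical stand-in, not the `Timer` implementation, and its `receive()` method is an assumption chosen to keep the sketch short.

```python
import asyncio
import time
from datetime import datetime, timezone


class MiniTimer:
    """Hypothetical stand-in: ticks every `interval` seconds."""

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._next_tick = time.monotonic() + interval

    def reset(self) -> None:
        """Start timing from now: the next tick is a full interval away."""
        self._next_tick = time.monotonic() + self._interval

    async def receive(self) -> datetime:
        """Sleep until the next tick is due, then return the current UTC time."""
        # Re-check after each sleep, because reset() may have moved the deadline.
        while (delay := self._next_tick - time.monotonic()) > 0:
            await asyncio.sleep(delay)
        self._next_tick += self._interval
        return datetime.now(timezone.utc)


async def demo() -> None:
    timer = MiniTimer(0.05)
    print("first tick at", await timer.receive())
    timer.reset()  # pushes the following tick a full interval into the future
    print("tick after reset at", await timer.receive())


if __name__ == "__main__":
    asyncio.run(demo())
```

Calling `reset()` whenever other data arrives turns the periodic timer into an inactivity timeout, which is what the second example relies on.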
Source code in frequenz/channels/util/_timer.py
Functions
__init__(interval)
Create a `Timer` instance.
PARAMETER | DESCRIPTION |
---|---|
interval | Number of seconds between messages. |
Source code in frequenz/channels/util/_timer.py
consume()
Return the latest value once `ready()` is complete.
RAISES | DESCRIPTION |
---|---|
EOFError | When called before a call to `ready()` completes. |
RETURNS | DESCRIPTION |
---|---|
datetime | The timestamp for the next tick. |
Changelog
- v0.11.0: Returns a timezone-aware datetime with UTC timezone instead of a naive datetime object.
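The practical effect of this change is that the returned timestamps can no longer be ordered against naive datetimes. A small standard-library illustration follows; `can_compare` is a hypothetical helper added only for this example.

```python
from datetime import datetime, timezone

# Timezone-aware timestamp in UTC, like the values described above.
aware = datetime.now(timezone.utc)
# Same wall-clock fields, but with no timezone attached (a naive datetime).
naive = aware.replace(tzinfo=None)


def can_compare(a: datetime, b: datetime) -> bool:
    """Return False if Python refuses to order the two datetimes."""
    try:
        a < b
    except TypeError:
        return False
    return True


if __name__ == "__main__":
    print(can_compare(aware, naive))
    print(can_compare(aware, datetime.now(timezone.utc)))
```

Code that stored naive timestamps before v0.11.0 would need to attach a timezone, for instance with `naive.replace(tzinfo=timezone.utc)`, before comparing them with timer ticks.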
Source code in frequenz/channels/util/_timer.py
ready() async
Return the current time (in UTC) once the next tick is due.
RAISES | DESCRIPTION |
---|---|
ChannelClosedError | If `stop()` has been called on the timer. |
RETURNS | DESCRIPTION |
---|---|
None | The time of the next tick in UTC or |
Source code in frequenz/channels/util/_timer.py
reset()
Reset the timer to start timing from `now`.
Source code in frequenz/channels/util/_timer.py
stop()
Stop the timer.
Once `stop` has been called, all subsequent calls to `receive()` will immediately return `None`.
Source code in frequenz/channels/util/_timer.py