file_watcher
frequenz.channels.file_watcher ¤
A receiver for watching for new, modified or deleted files.
Tip
Read the FileWatcher documentation for more information.
This module contains the following:
FileWatcher: A receiver that watches for file events.
Event: A file change event.
EventType: The types of file events that can be observed.
Classes¤
frequenz.channels.file_watcher.Event dataclass ¤
A file change event.
Source code in frequenz/channels/file_watcher.py
frequenz.channels.file_watcher.EventType ¤
Bases: Enum
The types of file events that can be observed.
Source code in frequenz/channels/file_watcher.py
frequenz.channels.file_watcher.FileWatcher ¤
A receiver that watches for file events.
Usage¤
A FileWatcher receiver can be used to watch for changes in a set of files. It will generate an Event message every time a file is created, modified or deleted, depending on the type of events that it is configured to watch for.
The event message contains the type of change that was observed and the path where the change was observed.
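As a rough illustration of the message shape described above, the following sketch models an event that carries a `type` and a `path`. `SketchEventType`, `SketchEvent` and `describe` are hypothetical stand-ins written for this example, not the library's classes, and the member names are assumed from the created/modified/deleted wording.

```python
from dataclasses import dataclass
from enum import Enum
from pathlib import Path


class SketchEventType(Enum):
    """Hypothetical stand-in for EventType (member names are assumptions)."""

    CREATE = "create"
    MODIFY = "modify"
    DELETE = "delete"


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for Event: what changed and where."""

    type: SketchEventType
    path: Path


def describe(event: SketchEvent) -> str:
    """Format an event as '<file name>: <event type name>'."""
    return f"{event.path.name}: {event.type.name}"


event = SketchEvent(type=SketchEventType.MODIFY, path=Path("/tmp/test.txt"))
message = describe(event)
```

A consumer would typically branch on `event.type` and act on `event.path`.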
Note
The owner of the FileWatcher receiver is responsible for recreating the FileWatcher after it has been cancelled or stopped.
For example, if a Task uses an asynchronous iterator to consume events from the FileWatcher and the task is cancelled, the FileWatcher will also stop. Therefore, the same FileWatcher instance cannot be reused for a new task to consume events. In this case, a new FileWatcher instance must be created.
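The behaviour in this note can be sketched with the standard library alone. `SketchWatcher` and `drain` below are hypothetical stand-ins that only mimic the described behaviour (cancelling the consuming task stops the watcher for good); they are not the library's implementation.

```python
import asyncio
from typing import List


class SketchWatcher:
    """Hypothetical stand-in: an async iterator that cannot be restarted."""

    def __init__(self) -> None:
        self.stopped = False

    def __aiter__(self) -> "SketchWatcher":
        return self

    async def __anext__(self) -> str:
        if self.stopped:
            raise StopAsyncIteration
        try:
            await asyncio.sleep(0.01)  # pretend to wait for a file change
        except asyncio.CancelledError:
            self.stopped = True  # cancellation stops this watcher for good
            raise
        return "event"


async def drain(watcher: SketchWatcher, seen: List[str]) -> None:
    """Consume events from the watcher until it stops."""
    async for event in watcher:
        seen.append(event)


async def main():
    first = SketchWatcher()
    task = asyncio.create_task(drain(first, []))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # Reusing the stopped instance yields no events at all.
    reused: List[str] = []
    await drain(first, reused)

    # A freshly created instance produces events again.
    fresh: List[str] = []
    async for event in SketchWatcher():
        fresh.append(event)
        break
    return first.stopped, reused, fresh


stopped, reused, fresh = asyncio.run(main())
```

In practice, one way to apply this is to pass a factory (a function that builds a new watcher) to whatever code restarts the consuming task, rather than passing a single shared instance.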
Event Types¤
The following event types are available: CREATE, MODIFY and DELETE, corresponding to a file being created, modified or deleted.
Example¤
Watch for changes and exit after the file is modified
import asyncio

from frequenz.channels.file_watcher import EventType, FileWatcher

PATH = "/tmp/test.txt"
file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY])


async def update_file() -> None:
    await asyncio.sleep(1)
    with open(PATH, "w", encoding="utf-8") as file:
        file.write("Hello, world!")


async def main() -> None:
    # Create file
    with open(PATH, "w", encoding="utf-8") as file:
        file.write("Hello, world!")
    async with asyncio.TaskGroup() as group:
        group.create_task(update_file())
        async for event in file_watcher:
            print(f"File {event.path}: {event.type.name}")
            break


asyncio.run(main())
Source code in frequenz/channels/file_watcher.py
Attributes¤
event_types instance-attribute ¤
event_types: frozenset[EventType] = frozenset(event_types)
The types of events to watch for.
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__ async ¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
__del__ ¤
__init__ ¤
__init__(
paths: list[Path | str],
event_types: Iterable[EventType] = frozenset(EventType),
*,
force_polling: bool = True,
polling_interval: timedelta = timedelta(seconds=1)
) -> None
Initialize this file watcher.
PARAMETER | DESCRIPTION |
---|---|
paths | The paths to watch for changes. |
event_types | The types of events to watch for. Defaults to watch for all event types. |
force_polling | Whether to explicitly force file polling to check for changes. Note that even if set to False, file polling will still be used as a fallback when the underlying file system does not support event-based notifications. |
polling_interval | The interval to poll for changes. Only relevant if polling is enabled. |
Source code in frequenz/channels/file_watcher.py
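To make the idea of file polling more concrete, here is a minimal standard-library sketch of what a polling pass could look like: take a snapshot of modification times, take another one later, and classify the differences. This is only an illustration of the general technique, not the library's implementation; `snapshot` and `diff` are hypothetical helpers, and the labels CREATE, MODIFY and DELETE are plain strings chosen for this example.

```python
import os
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

Snapshot = Dict[Path, Optional[int]]


def snapshot(paths: List[Path]) -> Snapshot:
    """Record each path's modification time in ns, or None if it is missing."""
    result: Snapshot = {}
    for path in paths:
        try:
            result[path] = path.stat().st_mtime_ns
        except FileNotFoundError:
            result[path] = None
    return result


def diff(before: Snapshot, after: Snapshot) -> List[str]:
    """Classify the differences between two snapshots."""
    changes: List[str] = []
    for path, new in after.items():
        old = before.get(path)
        if old is None and new is not None:
            changes.append(f"CREATE {path.name}")
        elif old is not None and new is None:
            changes.append(f"DELETE {path.name}")
        elif old != new:
            changes.append(f"MODIFY {path.name}")
    return changes


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "test.txt"
    watched = [target]
    s0 = snapshot(watched)
    target.write_text("Hello, world!", encoding="utf-8")
    s1 = snapshot(watched)
    # Bump the modification time by one second to simulate a later write.
    os.utime(target, ns=(0, s1[target] + 1_000_000_000))
    s2 = snapshot(watched)
    target.unlink()
    s3 = snapshot(watched)

changes = diff(s0, s1) + diff(s1, s2) + diff(s2, s3)
```

A real poller would take a new snapshot once per polling interval, which is why a shorter interval detects changes sooner at the cost of more file system calls.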
__str__ ¤
__str__() -> str
Return a string representation of this receiver.
Source code in frequenz/channels/file_watcher.py
consume ¤
consume() -> Event
Return the latest event once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
Event | The next event that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
Source code in frequenz/channels/file_watcher.py
filter ¤
filter(
filter_function: (
Callable[[ReceiverMessageT_co], bool]
| Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
]
)
) -> (
Receiver[ReceiverMessageT_co]
| Receiver[FilteredMessageT_co]
)
Apply a filter function on the messages on a receiver.
Note
You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function | The function to be applied on incoming messages to determine if they should be received. |
RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] \| Receiver[FilteredMessageT_co] | A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
]
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function | The function to be applied on incoming messages. |
RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
Source code in frequenz/channels/_receiver.py
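As a small illustration of what applying a mapping function means, the sketch below transforms each message of a plain iterable. `map_messages` is a hypothetical stand-in used only for this example, and the (event type, path) tuples are made-up sample data; a real receiver would be asynchronous.

```python
from typing import Callable, Iterable, Iterator, Tuple, TypeVar

InT = TypeVar("InT")
OutT = TypeVar("OutT")


def map_messages(
    messages: Iterable[InT], mapping_function: Callable[[InT], OutT]
) -> Iterator[OutT]:
    """Hypothetical stand-in for mapping: apply the function to every message."""
    for message in messages:
        yield mapping_function(message)


def path_of(event: Tuple[str, str]) -> str:
    """Keep only the path part of an (event type, path) pair."""
    return event[1]


events = [("MODIFY", "/tmp/test.txt"), ("DELETE", "/tmp/old.txt")]
paths = list(map_messages(events, path_of))
```

The consumer of the mapped stream only ever sees the transformed messages, which is why the note above suggests keeping a reference to the original receiver if its own methods are still needed.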
ready async ¤
ready() -> bool
Wait until the receiver is ready with a message or an error.
Once a call to ready() has finished, the message should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
Source code in frequenz/channels/file_watcher.py
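The ready()/consume() contract described above can be sketched with a small stand-in. `SketchReceiver` is hypothetical and only mimics the documented behaviour: ready() waits for a message, keeps returning immediately until that message is consumed, and returns False once nothing more will arrive.

```python
import asyncio
from typing import List, Optional


class SketchReceiver:
    """Hypothetical stand-in following the ready()/consume() contract."""

    def __init__(self, messages: List[str]) -> None:
        self._pending = list(messages)
        self._current: Optional[str] = None

    async def ready(self) -> bool:
        # Stay ready (return immediately) until the message is consumed.
        if self._current is not None:
            return True
        if not self._pending:
            return False  # nothing more will arrive: no longer active
        await asyncio.sleep(0)  # pretend to wait for the next message
        self._current = self._pending.pop(0)
        return True

    def consume(self) -> str:
        if self._current is None:
            raise RuntimeError("consume() called before ready() completed")
        message, self._current = self._current, None
        return message


async def main():
    receiver = SketchReceiver(["created", "modified"])
    received: List[str] = []
    ready_again: List[bool] = []
    while await receiver.ready():
        # A second call returns immediately because nothing was consumed yet.
        ready_again.append(await receiver.ready())
        received.append(receiver.consume())
    return received, ready_again


received, ready_again = asyncio.run(main())
```

Most code does not need this two-step protocol directly and can use receive() or `async for` instead, which combine both steps.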
receive async ¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
triggered ¤
triggered(
selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether this receiver was selected by select().
This method is used in conjunction with the Selected class to determine which receiver was selected in select() iteration.
It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.
Please see select() for an example.
PARAMETER | DESCRIPTION |
---|---|
selected | The result of a select() iteration. |
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[ReceiverMessageT_co]] | Whether this receiver was selected. |