file_watcher
frequenz.channels.file_watcher ¤
A receiver for watching for new, modified or deleted files.
Tip
Read the FileWatcher documentation for more information.
This module contains the following:
FileWatcher: A receiver that watches for file events.
Event: A file change event.
EventType: The types of file events that can be observed.
Classes¤
frequenz.channels.file_watcher.Event dataclass ¤
A file change event.
Source code in frequenz/channels/file_watcher.py
frequenz.channels.file_watcher.EventType ¤
Bases: Enum
The types of file events that can be observed.
Source code in frequenz/channels/file_watcher.py
frequenz.channels.file_watcher.FileWatcher ¤
A receiver that watches for file events.
Usage¤
A FileWatcher receiver can be used to watch for changes in a set of files. It will generate an Event message every time a file is created, modified or deleted, depending on the type of events that it is configured to watch for.
The event message contains the type of change that was observed and the path where the change was observed.
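As a rough illustration of how the two fields might be used together, the sketch below branches on the event type and reports the affected path. `Event` and `EventType` here are local stand-ins that mirror the documented fields, not the library's classes; the member names `CREATE` and `DELETE`, the enum values, and the `describe` helper are assumptions made for this example.

```python
from dataclasses import dataclass
from enum import Enum
from pathlib import Path


class EventType(Enum):
    """Stand-in for the documented EventType (values are assumptions)."""

    CREATE = "create"
    MODIFY = "modify"
    DELETE = "delete"


@dataclass(frozen=True)
class Event:
    """Stand-in for the documented Event: a change type plus a path."""

    type: EventType
    path: Path


# Human-readable wording for each kind of change.
_VERBS = {
    EventType.CREATE: "created",
    EventType.MODIFY: "modified",
    EventType.DELETE: "deleted",
}


def describe(event: Event) -> str:
    """Build a short log line from the event's type and path."""
    return f"{event.path.name}: {_VERBS[event.type]}"


line = describe(Event(type=EventType.MODIFY, path=Path("/tmp/test.txt")))
print(line)
```

With the real receiver, the same kind of function could be called on each message produced by the async iteration.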
Note
The owner of the FileWatcher receiver is responsible for recreating the FileWatcher after it has been cancelled or stopped.
For example, if a Task uses an asynchronous iterator to consume events from the FileWatcher and the task is cancelled, the FileWatcher will also stop. Therefore, the same FileWatcher instance cannot be reused for a new task to consume events. In this case, a new FileWatcher instance must be created.
Event Types¤
The following event types are available:
CREATE: The file was created.
MODIFY: The file was modified.
DELETE: The file was deleted.
Example¤
Watch for changes and exit after the file is modified
import asyncio

from frequenz.channels.file_watcher import EventType, FileWatcher

PATH = "/tmp/test.txt"
file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY])


async def update_file() -> None:
    await asyncio.sleep(1)
    with open(PATH, "w", encoding="utf-8") as file:
        file.write("Hello, world!")


async def main() -> None:
    # Create file
    with open(PATH, "w", encoding="utf-8") as file:
        file.write("Hello, world!")
    async with asyncio.TaskGroup() as group:
        group.create_task(update_file())
        async for event in file_watcher:
            print(f"File {event.path}: {event.type.name}")
            break


asyncio.run(main())
Source code in frequenz/channels/file_watcher.py
Attributes¤
event_types instance-attribute ¤
event_types: frozenset[EventType] = frozenset(event_types)
The types of events to watch for.
Functions¤
__aiter__ ¤
__aiter__() -> Self
Get an async iterator over the received messages.
RETURNS | DESCRIPTION |
---|---|
Self | This receiver, as it is already an async iterator. |
__anext__ async ¤
__anext__() -> ReceiverMessageT_co
Await the next message in the async iteration over received messages.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The next received message. |

RAISES | DESCRIPTION |
---|---|
StopAsyncIteration | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
__del__ ¤
__init__ ¤
__init__(
paths: list[Path | str],
event_types: Iterable[EventType] = frozenset(EventType),
*,
force_polling: bool = True,
polling_interval: timedelta = timedelta(seconds=1)
) -> None
Initialize this file watcher.
PARAMETER | DESCRIPTION |
---|---|
paths | The paths to watch for changes. |
event_types | The types of events to watch for. Defaults to watch for all event types. |
force_polling | Whether to explicitly force file polling to check for changes. Note that even if set to False, file polling will still be used as a fallback when the underlying file system does not support event-based notifications. |
polling_interval | The interval to poll for changes. Only relevant if polling is enabled. |
Source code in frequenz/channels/file_watcher.py
__str__ ¤
__str__() -> str
Return a string representation of this receiver.
Source code in frequenz/channels/file_watcher.py
consume ¤
consume() -> Event
Return the latest event once ready() is complete.
RETURNS | DESCRIPTION |
---|---|
Event | The next event that was received. |

RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
Source code in frequenz/channels/file_watcher.py
filter ¤
filter(
filter_function: Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
]
) -> Receiver[FilteredMessageT_co]
filter(
filter_function: Callable[[ReceiverMessageT_co], bool]
) -> Receiver[ReceiverMessageT_co]
filter(
filter_function: (
Callable[[ReceiverMessageT_co], bool]
| Callable[
[ReceiverMessageT_co],
TypeGuard[FilteredMessageT_co],
]
)
) -> (
Receiver[ReceiverMessageT_co]
| Receiver[FilteredMessageT_co]
)
Apply a filter function on the messages on a receiver.
Note
You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
filter_function | The function to be applied on incoming messages to determine if they should be received. |

RETURNS | DESCRIPTION |
---|---|
Receiver[ReceiverMessageT_co] \| Receiver[FilteredMessageT_co] | A new receiver that only receives messages that pass the filter. |
Source code in frequenz/channels/_receiver.py
map ¤
map(
mapping_function: Callable[
[ReceiverMessageT_co], MappedMessageT_co
]
) -> Receiver[MappedMessageT_co]
Apply a mapping function on the received message.
Tip
The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.
PARAMETER | DESCRIPTION |
---|---|
mapping_function | The function to be applied on incoming messages. |

RETURNS | DESCRIPTION |
---|---|
Receiver[MappedMessageT_co] | A new receiver that applies the function on the received messages. |
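A mapping function is a plain callable that takes one message and returns the transformed value. As a hedged sketch, the function below reduces an event-like object to the name of the affected file; the `HasPath` protocol, `to_filename`, and the `SimpleNamespace` stand-in events are assumptions for illustration, and the built-in `map` stands in for the receiver method. Going by the signature above, the receiver call would look like `file_watcher.map(to_filename)`.

```python
from pathlib import Path
from types import SimpleNamespace
from typing import Protocol


class HasPath(Protocol):
    """Anything that exposes a `path` attribute, as the documented Event does."""

    path: Path


def to_filename(event: HasPath) -> str:
    """Mapping function: keep only the file name of the affected path."""
    return event.path.name


# Stand-in events carrying just the attribute the mapping function needs.
events = [
    SimpleNamespace(path=Path("/tmp/test.txt")),
    SimpleNamespace(path=Path("/var/log/app.log")),
]
names = list(map(to_filename, events))
print(names)
```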
Source code in frequenz/channels/_receiver.py
ready async ¤
ready() -> bool
Wait until the receiver is ready with a message or an error.
Once a call to ready() has finished, the message should be read with a call to consume(), with receive(), or by iterating over the receiver. The receiver will remain ready (this method will return immediately) until it is consumed.
RETURNS | DESCRIPTION |
---|---|
bool | Whether the receiver is still active. |
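The ready()/consume() contract described above can be sketched with a small stand-in receiver backed by a queue. `QueueReceiver`, `drain`, and the use of `None` as a stop marker are hypothetical and only mimic the documented behavior: ready() keeps returning True until the pending message is consumed, and returns False once the receiver is no longer active.

```python
from __future__ import annotations

import asyncio


class QueueReceiver:
    """Hypothetical stand-in that follows the ready()/consume() contract."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue
        self._pending: str | None = None
        self._stopped = False

    async def ready(self) -> bool:
        if self._pending is not None:
            return True  # still ready: the message has not been consumed yet
        if self._stopped:
            return False
        item = await self._queue.get()
        if item is None:  # assumed stop marker for this sketch
            self._stopped = True
            return False
        self._pending = item
        return True

    def consume(self) -> str:
        if self._pending is None:
            raise RuntimeError("consume() called before ready()")
        message, self._pending = self._pending, None
        return message


async def drain(receiver: QueueReceiver) -> list[str]:
    """Read messages until the receiver reports it is no longer active."""
    received = []
    while await receiver.ready():
        received.append(receiver.consume())
    return received


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    for item in ("a", "b", None):
        queue.put_nowait(item)
    receiver = QueueReceiver(queue)
    # Calling ready() twice without consuming does not skip a message.
    assert await receiver.ready() and await receiver.ready()
    return await drain(receiver)


drained = asyncio.run(main())
print(drained)
```

In most code, receive() or async iteration is more convenient; the two-step form matters mainly when readiness and consumption have to happen at different points.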
Source code in frequenz/channels/file_watcher.py
receive async ¤
receive() -> ReceiverMessageT_co
Receive a message.
RETURNS | DESCRIPTION |
---|---|
ReceiverMessageT_co | The received message. |

RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError | If the receiver stopped producing messages. |
ReceiverError | If there is some problem with the receiver. |
Source code in frequenz/channels/_receiver.py
triggered ¤
triggered(
selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]
Check whether this receiver was selected by select().
This method is used in conjunction with the Selected class to determine which receiver was selected in a select() iteration.
It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.
Please see select() for an example.
PARAMETER | DESCRIPTION |
---|---|
selected
|
The result of a |
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[ReceiverMessageT_co]] | Whether this receiver was selected. |