File Watchers¤
A receiver that watches for file events.
Usage¤
A FileWatcher
receiver can be used
to watch for changes in a set of files. It will generate an
Event
message every time a file is
created, modified or deleted, depending on the type of events that it is configured
to watch for.
The event message contains the
type
of change that was observed and
the path
where the change was
observed.
Event Types¤
The following event types are available:
Example¤
Watch for changes and exit after the file is modified
import asyncio
from frequenz.channels.file_watcher import EventType, FileWatcher
PATH = "/tmp/test.txt"
file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY])
async def update_file() -> None:
await asyncio.sleep(1)
with open(PATH, "w", encoding="utf-8") as file:
file.write("Hello, world!")
async def main() -> None:
# Create file
with open(PATH, "w", encoding="utf-8") as file:
file.write("Hello, world!")
async with asyncio.TaskGroup() as group:
group.create_task(update_file())
async for event in file_watcher:
print(f"File {event.path}: {event.type.name}")
break
asyncio.run(main())