file_watcher

frequenz.channels.file_watcher

A receiver for watching for new, modified or deleted files.

Tip

Read the FileWatcher documentation for more information.

This module contains the following:

  • FileWatcher: A receiver that watches for file events.
  • Event: A file change event.
  • EventType: The types of file events that can be observed.

Classes

frequenz.channels.file_watcher.Event dataclass

A file change event.

Source code in frequenz/channels/file_watcher.py
@dataclass(frozen=True)
class Event:
    """A file change event."""

    type: EventType
    """The type of change that was observed."""

    path: pathlib.Path
    """The path where the change was observed."""
Attributes
path instance-attribute
path: Path

The path where the change was observed.

type instance-attribute
type: EventType

The type of change that was observed.
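
Because `Event` is declared with `@dataclass(frozen=True)`, its instances are immutable, compare by value, and are hashable. The sketch below illustrates this with local stand-ins for `Event` and `EventType`. They are redefined here, with placeholder enum values, only so the snippet runs without the library installed.

```python
import dataclasses
import enum
import pathlib


class EventType(enum.Enum):
    """Stand-in for the library's EventType (placeholder values)."""

    CREATE = "create"
    MODIFY = "modify"
    DELETE = "delete"


@dataclasses.dataclass(frozen=True)
class Event:
    """Stand-in mirroring the frozen dataclass shown above."""

    type: EventType
    path: pathlib.Path


event = Event(type=EventType.MODIFY, path=pathlib.Path("/tmp/test.txt"))

# Frozen dataclasses reject attribute assignment.
try:
    event.path = pathlib.Path("/tmp/other.txt")  # type: ignore[misc]
    was_mutated = True
except dataclasses.FrozenInstanceError:
    was_mutated = False

# Events compare by value and are hashable, so duplicates collapse in a set.
duplicate = Event(type=EventType.MODIFY, path=pathlib.Path("/tmp/test.txt"))
unique_events = {event, duplicate}
```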

frequenz.channels.file_watcher.EventType

Bases: Enum

The types of file events that can be observed.

Source code in frequenz/channels/file_watcher.py
class EventType(Enum):
    """The types of file events that can be observed."""

    CREATE = Change.added
    """The file was created."""

    MODIFY = Change.modified
    """The file was modified."""

    DELETE = Change.deleted
    """The file was deleted."""
Attributes
CREATE class-attribute instance-attribute
CREATE = added

The file was created.

DELETE class-attribute instance-attribute
DELETE = deleted

The file was deleted.

MODIFY class-attribute instance-attribute
MODIFY = modified

The file was modified.
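
Each member's value is the corresponding watchfiles `Change` member, which is what allows a raw change to be converted with `EventType(change)` and compared against the selected event types. A minimal sketch of that lookup by value, using a local `Change` stand-in; the integer values are an assumption and do not matter for the lookup itself.

```python
import enum


class Change(enum.IntEnum):
    """Stand-in for watchfiles.Change; the integer values are assumed."""

    added = 1
    modified = 2
    deleted = 3


class EventType(enum.Enum):
    """Mirrors the enum above: each value is a Change member."""

    CREATE = Change.added
    MODIFY = Change.modified
    DELETE = Change.deleted


# Lookup by value maps a raw change to the public event type.
event_type = EventType(Change.modified)

# A filter can compare raw changes against the values of the selected types.
selected = frozenset({EventType.CREATE, EventType.DELETE})
selected_changes = [selected_type.value for selected_type in selected]
passes_filter = Change.modified in selected_changes
```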

frequenz.channels.file_watcher.FileWatcher

Bases: Receiver[Event]

A receiver that watches for file events.

Usage

A FileWatcher receiver can be used to watch for changes in a set of files. It will generate an Event message every time a file is created, modified or deleted, depending on the type of events that it is configured to watch for.

The event message contains the type of change that was observed and the path where the change was observed.

Note

The owner of the FileWatcher receiver is responsible for recreating the FileWatcher after it has been cancelled or stopped. For example, if a Task uses an asynchronous iterator to consume events from the FileWatcher and the task is cancelled, the FileWatcher will also stop. Therefore, the same FileWatcher instance cannot be reused for a new task to consume events. In this case, a new FileWatcher instance must be created.

Event Types

The following event types are available:

  • CREATE: The file was created.
  • MODIFY: The file was modified.
  • DELETE: The file was deleted.
Example
Watch for changes and exit after the file is modified
import asyncio

from frequenz.channels.file_watcher import EventType, FileWatcher

PATH = "/tmp/test.txt"
file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY])


async def update_file() -> None:
    await asyncio.sleep(1)
    with open(PATH, "w", encoding="utf-8") as file:
        file.write("Hello, world!")


async def main() -> None:
    # Create file
    with open(PATH, "w", encoding="utf-8") as file:
        file.write("Hello, world!")
    async with asyncio.TaskGroup() as group:
        group.create_task(update_file())
        async for event in file_watcher:
            print(f"File {event.path}: {event.type.name}")
            break


asyncio.run(main())
Source code in frequenz/channels/file_watcher.py
class FileWatcher(Receiver[Event]):
    """A receiver that watches for file events.

    # Usage

    A [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher] receiver can be used
    to watch for changes in a set of files. It will generate an
    [`Event`][frequenz.channels.file_watcher.Event] message every time a file is
    created, modified or deleted, depending on the type of events that it is configured
    to watch for.

    The [event][frequenz.channels.file_watcher.Event] message contains the
    [`type`][frequenz.channels.file_watcher.Event.type] of change that was observed and
    the [`path`][frequenz.channels.file_watcher.Event.path] where the change was
    observed.

    Note:
        The owner of the [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher]
        receiver is responsible for recreating the `FileWatcher` after it has been
        cancelled or stopped.
        For example, if a [`Task`][asyncio.Task] uses an asynchronous iterator to consume
        events from the `FileWatcher` and the task is cancelled, the `FileWatcher` will
        also stop. Therefore, the same `FileWatcher` instance cannot be reused for a new
        task to consume events. In this case, a new FileWatcher instance must be created.

    # Event Types

    The following event types are available:

    * [`CREATE`][frequenz.channels.file_watcher.EventType.CREATE]:
        {{docstring_summary("frequenz.channels.file_watcher.EventType.CREATE")}}
    * [`MODIFY`][frequenz.channels.file_watcher.EventType.MODIFY]:
        {{docstring_summary("frequenz.channels.file_watcher.EventType.MODIFY")}}
    * [`DELETE`][frequenz.channels.file_watcher.EventType.DELETE]:
        {{docstring_summary("frequenz.channels.file_watcher.EventType.DELETE")}}

    # Example

    Example: Watch for changes and exit after the file is modified
        ```python
        import asyncio

        from frequenz.channels.file_watcher import EventType, FileWatcher

        PATH = "/tmp/test.txt"
        file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY])


        async def update_file() -> None:
            await asyncio.sleep(1)
            with open(PATH, "w", encoding="utf-8") as file:
                file.write("Hello, world!")


        async def main() -> None:
            # Create file
            with open(PATH, "w", encoding="utf-8") as file:
                file.write("Hello, world!")
            async with asyncio.TaskGroup() as group:
                group.create_task(update_file())
                async for event in file_watcher:
                    print(f"File {event.path}: {event.type.name}")
                    break


        asyncio.run(main())
        ```
    """

    def __init__(
        self,
        paths: list[pathlib.Path | str],
        event_types: abc.Iterable[EventType] = frozenset(EventType),
        *,
        force_polling: bool = True,
        polling_interval: timedelta = timedelta(seconds=1),
    ) -> None:
        """Initialize this file watcher.

        Args:
            paths: The paths to watch for changes.
            event_types: The types of events to watch for. Defaults to watch for
                all event types.
            force_polling: Whether to explicitly force file polling to check for
                changes. Note that even if set to False, file polling will still
                be used as a fallback when the underlying file system does not
                support event-based notifications.
            polling_interval: The interval to poll for changes. Only relevant if
                polling is enabled.
        """
        self.event_types: frozenset[EventType] = frozenset(event_types)
        """The types of events to watch for."""

        self._stop_event: asyncio.Event = asyncio.Event()
        self._paths: list[pathlib.Path] = [
            path if isinstance(path, pathlib.Path) else pathlib.Path(path)
            for path in paths
        ]
        self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch(
            *self._paths,
            stop_event=self._stop_event,
            watch_filter=self._filter_events,
            force_polling=force_polling,
            poll_delay_ms=int(polling_interval.total_seconds() * 1_000),
        )
        self._awatch_stopped_exc: Exception | None = None
        self._changes: set[FileChange] = set()

    def _filter_events(
        self,
        change: Change,
        path: str,  # pylint: disable=unused-argument
    ) -> bool:
        """Filter events based on the event type and path.

        Args:
            change: The type of change to be notified.
            path: The path of the file that changed.

        Returns:
            Whether the event should be notified.
        """
        return change in [event_type.value for event_type in self.event_types]

    def __del__(self) -> None:
        """Finalize this file watcher."""
        # We need to set the stop event to make sure that the awatch background task
        # is stopped.
        self._stop_event.set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a message or an error.

        Once a call to `ready()` has finished, the message should be read with
        a call to `consume()`, either directly or through `receive()` or
        iteration. The receiver will remain ready (this method will return
        immediately) until the message is consumed.

        Returns:
            Whether the receiver is still active.
        """
        # if there are messages waiting to be consumed, return immediately.
        if self._changes:
            return True

        # if it was already stopped, return immediately.
        if self._awatch_stopped_exc is not None:
            return False

        try:
            self._changes = await anext(self._awatch)
        except StopAsyncIteration as err:
            self._awatch_stopped_exc = err
            return False

        return True

    def consume(self) -> Event:
        """Return the latest event once `ready` is complete.

        Returns:
            The next event that was received.

        Raises:
            ReceiverStoppedError: If the receiver stopped producing messages.
        """
        if not self._changes and self._awatch_stopped_exc is not None:
            raise ReceiverStoppedError(self) from self._awatch_stopped_exc

        assert self._changes, "`consume()` must be preceded by a call to `ready()`"
        # Tuple of (Change, path) returned by watchfiles
        change, path_str = self._changes.pop()
        return Event(type=EventType(change), path=pathlib.Path(path_str))

    def __str__(self) -> str:
        """Return a string representation of this receiver."""
        if len(self._paths) > 3:
            paths = [str(p) for p in self._paths[:3]]
            paths.append("…")
        else:
            paths = [str(p) for p in self._paths]
        event_types = [event_type.name for event_type in self.event_types]
        return f"{type(self).__name__}:{','.join(event_types)}:{','.join(paths)}"

    def __repr__(self) -> str:
        """Return a string representation of this receiver."""
        return f"{type(self).__name__}({self._paths!r}, {self.event_types!r})"
Attributes
event_types instance-attribute
event_types: frozenset[EventType] = frozenset(event_types)

The types of events to watch for.

Functions
__aiter__
__aiter__() -> Self

Get an async iterator over the received messages.

RETURNS DESCRIPTION
Self

This receiver, as it is already an async iterator.

Source code in frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Get an async iterator over the received messages.

    Returns:
        This receiver, as it is already an async iterator.
    """
    return self
__anext__ async
__anext__() -> ReceiverMessageT_co

Await the next message in the async iteration over received messages.

RETURNS DESCRIPTION
ReceiverMessageT_co

The next received message.

RAISES DESCRIPTION
StopAsyncIteration

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def __anext__(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Await the next message in the async iteration over received messages.

    Returns:
        The next received message.

    Raises:
        StopAsyncIteration: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
__del__
__del__() -> None

Finalize this file watcher.

Source code in frequenz/channels/file_watcher.py
def __del__(self) -> None:
    """Finalize this file watcher."""
    # We need to set the stop event to make sure that the awatch background task
    # is stopped.
    self._stop_event.set()
__init__
__init__(
    paths: list[Path | str],
    event_types: Iterable[EventType] = frozenset(EventType),
    *,
    force_polling: bool = True,
    polling_interval: timedelta = timedelta(seconds=1)
) -> None

Initialize this file watcher.

PARAMETER DESCRIPTION
paths

The paths to watch for changes.

TYPE: list[Path | str]

event_types

The types of events to watch for. Defaults to watch for all event types.

TYPE: Iterable[EventType] DEFAULT: frozenset(EventType)

force_polling

Whether to explicitly force file polling to check for changes. Note that even if set to False, file polling will still be used as a fallback when the underlying file system does not support event-based notifications.

TYPE: bool DEFAULT: True

polling_interval

The interval to poll for changes. Only relevant if polling is enabled.

TYPE: timedelta DEFAULT: timedelta(seconds=1)
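
The constructor source converts `polling_interval` to whole milliseconds with `int(polling_interval.total_seconds() * 1_000)`. A small sketch of that conversion follows; `to_poll_delay_ms` is a hypothetical helper name used only for illustration. It shows that the conversion truncates, so a sub-millisecond interval ends up as 0.

```python
from datetime import timedelta


def to_poll_delay_ms(polling_interval: timedelta) -> int:
    """Mirror the conversion used in the constructor (hypothetical helper)."""
    return int(polling_interval.total_seconds() * 1_000)


one_second = to_poll_delay_ms(timedelta(seconds=1))
quarter_second = to_poll_delay_ms(timedelta(milliseconds=250))
# int() truncates, so a sub-millisecond interval becomes 0.
sub_millisecond = to_poll_delay_ms(timedelta(microseconds=500))
```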

Source code in frequenz/channels/file_watcher.py
def __init__(
    self,
    paths: list[pathlib.Path | str],
    event_types: abc.Iterable[EventType] = frozenset(EventType),
    *,
    force_polling: bool = True,
    polling_interval: timedelta = timedelta(seconds=1),
) -> None:
    """Initialize this file watcher.

    Args:
        paths: The paths to watch for changes.
        event_types: The types of events to watch for. Defaults to watch for
            all event types.
        force_polling: Whether to explicitly force file polling to check for
            changes. Note that even if set to False, file polling will still
            be used as a fallback when the underlying file system does not
            support event-based notifications.
        polling_interval: The interval to poll for changes. Only relevant if
            polling is enabled.
    """
    self.event_types: frozenset[EventType] = frozenset(event_types)
    """The types of events to watch for."""

    self._stop_event: asyncio.Event = asyncio.Event()
    self._paths: list[pathlib.Path] = [
        path if isinstance(path, pathlib.Path) else pathlib.Path(path)
        for path in paths
    ]
    self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch(
        *self._paths,
        stop_event=self._stop_event,
        watch_filter=self._filter_events,
        force_polling=force_polling,
        poll_delay_ms=int(polling_interval.total_seconds() * 1_000),
    )
    self._awatch_stopped_exc: Exception | None = None
    self._changes: set[FileChange] = set()
__repr__
__repr__() -> str

Return a string representation of this receiver.

Source code in frequenz/channels/file_watcher.py
def __repr__(self) -> str:
    """Return a string representation of this receiver."""
    return f"{type(self).__name__}({self._paths!r}, {self.event_types!r})"
__str__
__str__() -> str

Return a string representation of this receiver.

Source code in frequenz/channels/file_watcher.py
def __str__(self) -> str:
    """Return a string representation of this receiver."""
    if len(self._paths) > 3:
        paths = [str(p) for p in self._paths[:3]]
        paths.append("…")
    else:
        paths = [str(p) for p in self._paths]
    event_types = [event_type.name for event_type in self.event_types]
    return f"{type(self).__name__}:{','.join(event_types)}:{','.join(paths)}"
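
To make the resulting format concrete, the sketch below reproduces the same formatting in a standalone function. `describe` is a hypothetical helper, and `PurePosixPath` is used only so the output looks the same on every platform.

```python
import pathlib
from collections.abc import Sequence


def describe(
    name: str, event_types: Sequence[str], paths: Sequence[pathlib.PurePath]
) -> str:
    """Mirror the `__str__` formatting above (hypothetical helper)."""
    shown = [str(path) for path in paths[:3]]
    if len(paths) > 3:
        # Only the first three paths are listed; the rest are elided.
        shown.append("…")
    return f"{name}:{','.join(event_types)}:{','.join(shown)}"


paths = [pathlib.PurePosixPath(f"/tmp/{stem}.txt") for stem in "abcd"]
short = describe("FileWatcher", ["MODIFY"], paths[:2])
long = describe("FileWatcher", ["MODIFY"], paths)
```

Since `event_types` is stored as a `frozenset`, the order of the event names in the real output may vary when more than one type is selected.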
consume
consume() -> Event

Return the latest event once ready is complete.

RETURNS DESCRIPTION
Event

The next event that was received.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

Source code in frequenz/channels/file_watcher.py
def consume(self) -> Event:
    """Return the latest event once `ready` is complete.

    Returns:
        The next event that was received.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
    """
    if not self._changes and self._awatch_stopped_exc is not None:
        raise ReceiverStoppedError(self) from self._awatch_stopped_exc

    assert self._changes, "`consume()` must be preceded by a call to `ready()`"
    # Tuple of (Change, path) returned by watchfiles
    change, path_str = self._changes.pop()
    return Event(type=EventType(change), path=pathlib.Path(path_str))
filter
filter(
    filter_function: Callable[
        [ReceiverMessageT_co],
        TypeGuard[FilteredMessageT_co],
    ]
) -> Receiver[FilteredMessageT_co]
filter(
    filter_function: Callable[[ReceiverMessageT_co], bool]
) -> Receiver[ReceiverMessageT_co]
filter(
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[
            [ReceiverMessageT_co],
            TypeGuard[FilteredMessageT_co],
        ]
    )
) -> (
    Receiver[ReceiverMessageT_co]
    | Receiver[FilteredMessageT_co]
)

Apply a filter function on the messages on a receiver.

Note

You can pass a type guard as the filter function to narrow the type of the messages that pass the filter.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
filter_function

The function to be applied on incoming messages to determine if they should be received.

TYPE: Callable[[ReceiverMessageT_co], bool] | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]

RETURNS DESCRIPTION
Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]

A new receiver that only receives messages that pass the filter.
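
A filter function is an ordinary callable that takes a message and returns a boolean. The sketch below shows a predicate that keeps only `.txt` paths. It uses a minimal `Event` stand-in and the built-in `filter()` in place of a live receiver, so both are assumptions made for illustration.

```python
import dataclasses
import pathlib


@dataclasses.dataclass(frozen=True)
class Event:
    """Stand-in with only the `path` attribute used here."""

    path: pathlib.PurePath


def is_text_file(event: Event) -> bool:
    """Predicate suitable as a filter function: keep only `.txt` paths."""
    return event.path.suffix == ".txt"


events = [Event(pathlib.PurePosixPath(p)) for p in ("/tmp/a.txt", "/tmp/b.log")]
# The built-in filter() stands in for Receiver.filter() in this sketch.
kept = [str(event.path) for event in filter(is_text_file, events)]
```

With a real watcher the same predicate would be passed positionally, as in `file_watcher.filter(is_text_file)`, since the source listing marks the parameter as positional-only.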

Source code in frequenz/channels/_receiver.py
def filter(
    self,
    filter_function: (
        Callable[[ReceiverMessageT_co], bool]
        | Callable[[ReceiverMessageT_co], TypeGuard[FilteredMessageT_co]]
    ),
    /,
) -> Receiver[ReceiverMessageT_co] | Receiver[FilteredMessageT_co]:
    """Apply a filter function on the messages on a receiver.

    Note:
        You can pass a [type guard][typing.TypeGuard] as the filter function to
        narrow the type of the messages that pass the filter.

    Tip:
        The returned receiver type won't have all the methods of the original
        receiver. If you need to access methods of the original receiver that are
        not part of the `Receiver` interface you should save a reference to the
        original receiver and use that instead.

    Args:
        filter_function: The function to be applied on incoming messages to
            determine if they should be received.

    Returns:
        A new receiver that only receives messages that pass the filter.
    """
    return _Filter(receiver=self, filter_function=filter_function)
map
map(
    mapping_function: Callable[
        [ReceiverMessageT_co], MappedMessageT_co
    ]
) -> Receiver[MappedMessageT_co]

Apply a mapping function on the received message.

Tip

The returned receiver type won't have all the methods of the original receiver. If you need to access methods of the original receiver that are not part of the Receiver interface you should save a reference to the original receiver and use that instead.

PARAMETER DESCRIPTION
mapping_function

The function to be applied on incoming messages.

TYPE: Callable[[ReceiverMessageT_co], MappedMessageT_co]

RETURNS DESCRIPTION
Receiver[MappedMessageT_co]

A new receiver that applies the function on the received messages.

Source code in frequenz/channels/_receiver.py
def map(
    self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
) -> Receiver[MappedMessageT_co]:
    """Apply a mapping function on the received message.

    Tip:
        The returned receiver type won't have all the methods of the original
        receiver. If you need to access methods of the original receiver that are
        not part of the `Receiver` interface you should save a reference to the
        original receiver and use that instead.

    Args:
        mapping_function: The function to be applied on incoming messages.

    Returns:
        A new receiver that applies the function on the received messages.
    """
    return _Mapper(receiver=self, mapping_function=mapping_function)
ready async
ready() -> bool

Wait until the receiver is ready with a message or an error.

Once a call to ready() has finished, the message should be read with a call to consume(), either directly or through receive() or iteration. The receiver will remain ready (this method will return immediately) until the message is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in frequenz/channels/file_watcher.py
async def ready(self) -> bool:
    """Wait until the receiver is ready with a message or an error.

    Once a call to `ready()` has finished, the message should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # if there are messages waiting to be consumed, return immediately.
    if self._changes:
        return True

    # if it was already stopped, return immediately.
    if self._awatch_stopped_exc is not None:
        return False

    try:
        self._changes = await anext(self._awatch)
    except StopAsyncIteration as err:
        self._awatch_stopped_exc = err
        return False

    return True
receive async
receive() -> ReceiverMessageT_co

Receive a message.

RETURNS DESCRIPTION
ReceiverMessageT_co

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

If the receiver stopped producing messages.

ReceiverError

If there is some problem with the receiver.

Source code in frequenz/channels/_receiver.py
async def receive(self) -> ReceiverMessageT_co:  # noqa: DOC503
    """Receive a message.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: If the receiver stopped producing messages.
        ReceiverError: If there is some problem with the receiver.
    """
    try:
        received = await anext(self)
    except StopAsyncIteration as exc:
        # If we already had a cause and it was that the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            and exc.__cause__.receiver is self
        ):
            # This is a false positive, we are actually checking __cause__ is a
            # ReceiverStoppedError which is an exception.
            raise exc.__cause__  # pylint: disable=raising-non-exception
        raise ReceiverStoppedError(self) from exc
    return received
triggered
triggered(
    selected: Selected[Any],
) -> TypeGuard[Selected[ReceiverMessageT_co]]

Check whether this receiver was selected by select().

This method is used in conjunction with the Selected class to determine which receiver was selected in select() iteration.

It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.

Please see select() for an example.

PARAMETER DESCRIPTION
selected

The result of a select() iteration.

TYPE: Selected[Any]

RETURNS DESCRIPTION
TypeGuard[Selected[ReceiverMessageT_co]]

Whether this receiver was selected.

Source code in frequenz/channels/_receiver.py
def triggered(
    self, selected: Selected[Any]
) -> TypeGuard[Selected[ReceiverMessageT_co]]:
    """Check whether this receiver was selected by [`select()`][frequenz.channels.select].

    This method is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class to determine which receiver was
    selected in `select()` iteration.

    It also works as a [type guard][typing.TypeGuard] to narrow the type of the
    `Selected` instance to the type of the receiver.

    Please see [`select()`][frequenz.channels.select] for an example.

    Args:
        selected: The result of a `select()` iteration.

    Returns:
        Whether this receiver was selected.
    """
    if handled := selected._recv is self:  # pylint: disable=protected-access
        selected._handled = True  # pylint: disable=protected-access
    return handled