Skip to content

file_watcher

frequenz.channels.utils.file_watcher ¤

A Channel receiver for watching for new (or modified) files.

Classes¤

frequenz.channels.utils.file_watcher.EventType ¤

Bases: Enum

Available types of changes to watch for.

Source code in frequenz/channels/utils/file_watcher.py
15
16
17
18
19
20
class EventType(Enum):
    """Available types of changes to watch for."""

    CREATE = Change.added
    MODIFY = Change.modified
    DELETE = Change.deleted

frequenz.channels.utils.file_watcher.FileWatcher ¤

Bases: Receiver[pathlib.Path]

A channel receiver that watches for file events.

Source code in frequenz/channels/utils/file_watcher.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class FileWatcher(Receiver[pathlib.Path]):
    """A channel receiver that watches for file events."""

    def __init__(
        self,
        paths: List[Union[pathlib.Path, str]],
        event_types: Optional[Set[EventType]] = None,
    ) -> None:
        """Create a `FileWatcher` instance.

        Args:
            paths: Paths to watch for changes.
            event_types: Types of events to watch for or `None` to watch for
                all event types.
        """
        if event_types is None:
            event_types = {EventType.CREATE, EventType.MODIFY, EventType.DELETE}

        self.event_types = event_types
        self._stop_event = asyncio.Event()
        self._paths = [
            path if isinstance(path, pathlib.Path) else pathlib.Path(path)
            for path in paths
        ]
        self._awatch = awatch(
            *self._paths,
            stop_event=self._stop_event,
            watch_filter=lambda change, path_str: (
                change in [event_type.value for event_type in event_types]  # type: ignore
                and pathlib.Path(path_str).is_file()
            ),
        )

    def __del__(self) -> None:
        """Cleanup registered watches.

        `awatch` passes the `stop_event` to a separate task/thread. This way
        `awatch` getting destroyed properly. The background task will continue
        until the signal is received.
        """
        self._stop_event.set()

    async def receive(self) -> Optional[pathlib.Path]:
        """Wait for the next file event and return its path.

        Returns:
            Path of next file.
        """
        while True:
            changes = await self._awatch.__anext__()
            for change in changes:
                # Tuple of (Change, path) returned by watchfiles
                if change is None or len(change) != 2:
                    return None

                _, path_str = change
                path = pathlib.Path(path_str)
                return path
Functions¤
__del__() ¤

Cleanup registered watches.

awatch passes the stop_event to a separate task/thread. This way awatch getting destroyed properly. The background task will continue until the signal is received.

Source code in frequenz/channels/utils/file_watcher.py
56
57
58
59
60
61
62
63
def __del__(self) -> None:
    """Cleanup registered watches.

    `awatch` passes the `stop_event` to a separate task/thread. This way
    `awatch` getting destroyed properly. The background task will continue
    until the signal is received.
    """
    self._stop_event.set()
__init__(paths, event_types=None) ¤

Create a FileWatcher instance.

PARAMETER DESCRIPTION
paths

Paths to watch for changes.

TYPE: List[Union[pathlib.Path, str]]

event_types

Types of events to watch for or None to watch for all event types.

TYPE: Optional[Set[EventType]] DEFAULT: None

Source code in frequenz/channels/utils/file_watcher.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    paths: List[Union[pathlib.Path, str]],
    event_types: Optional[Set[EventType]] = None,
) -> None:
    """Create a `FileWatcher` instance.

    Args:
        paths: Paths to watch for changes.
        event_types: Types of events to watch for or `None` to watch for
            all event types.
    """
    if event_types is None:
        event_types = {EventType.CREATE, EventType.MODIFY, EventType.DELETE}

    self.event_types = event_types
    self._stop_event = asyncio.Event()
    self._paths = [
        path if isinstance(path, pathlib.Path) else pathlib.Path(path)
        for path in paths
    ]
    self._awatch = awatch(
        *self._paths,
        stop_event=self._stop_event,
        watch_filter=lambda change, path_str: (
            change in [event_type.value for event_type in event_types]  # type: ignore
            and pathlib.Path(path_str).is_file()
        ),
    )
receive() async ¤

Wait for the next file event and return its path.

RETURNS DESCRIPTION
Optional[pathlib.Path]

Path of next file.

Source code in frequenz/channels/utils/file_watcher.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def receive(self) -> Optional[pathlib.Path]:
    """Wait for the next file event and return its path.

    Returns:
        Path of next file.
    """
    while True:
        changes = await self._awatch.__anext__()
        for change in changes:
            # Tuple of (Change, path) returned by watchfiles
            if change is None or len(change) != 2:
                return None

            _, path_str = change
            path = pathlib.Path(path_str)
            return path