from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from typing import Callable, Optional
from pathlib import Path
class FileWatcher:
    """Watch a filesystem path and report file changes to a callback.

    The watcher owns an ``Observer`` and a ``FileChangeHandler`` wrapping
    the supplied callback.  It can be driven explicitly with ``start()`` /
    ``stop()`` or used as a context manager::

        with FileWatcher("some/dir", on_change):
            ...  # changes are reported while this block runs

    Args:
        path: Directory (or file) to watch; stored as a ``Path``.
        callback: Callable handed to ``FileChangeHandler``, which invokes it
            as ``callback(kind, src_path)`` where ``kind`` is ``'created'``,
            ``'modified'`` or ``'deleted'``.
        recursive: Whether the observer should also watch subdirectories.
            Defaults to ``True``.
    """

    def __init__(self, path: str, callback: Callable, recursive: bool = True):
        # The default used to be spelled ``true``, which is not a Python
        # name and raised NameError the moment the class was defined.
        self.path = Path(path)
        self.callback = callback
        self.recursive = recursive
        self.observer = Observer()
        self.handler = FileChangeHandler(callback)

    def start(self) -> None:
        """Start watching for file changes."""
        self.observer.schedule(
            self.handler, str(self.path), recursive=self.recursive
        )
        self.observer.start()

    def stop(self) -> None:
        """Stop watching for file changes.

        Safe to call even if ``start()`` was never called: the observer is
        always told to stop, but it is only joined when its thread is
        running, because joining a never-started thread raises
        ``RuntimeError``.
        """
        self.observer.stop()
        if self.observer.is_alive():
            self.observer.join()

    def __enter__(self) -> "FileWatcher":
        """Start watching and return this watcher."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Stop watching; exceptions from the ``with`` body are not suppressed."""
        self.stop()
class FileChangeHandler(FileSystemEventHandler):
    """Forward file-level filesystem events to a plain callback.

    Create, modify and delete events are reported as
    ``callback(kind, event.src_path)`` where ``kind`` is ``'created'``,
    ``'modified'`` or ``'deleted'``.  Events whose ``is_directory`` flag is
    set are ignored.  This class defines no handler for any other event
    type.

    Args:
        callback: Callable invoked with the event kind and the source path.
    """

    def __init__(self, callback: Callable):
        # Run the base-class initializer so any setup it performs is not
        # skipped by this override.
        super().__init__()
        self.callback = callback

    def _notify(self, kind: str, event) -> None:
        """Invoke the callback for *event* unless it concerns a directory."""
        if not event.is_directory:
            self.callback(kind, event.src_path)

    def on_created(self, event):
        """Report a newly created file."""
        self._notify('created', event)

    def on_modified(self, event):
        """Report a modified file."""
        self._notify('modified', event)

    def on_deleted(self, event):
        """Report a deleted file."""
        self._notify('deleted', event)