"""Atomic state-file I/O and instance lock. The conversation history lives at ``state/conversation.json``. Mutated in-memory by the daemon and written via temp+rename to avoid partial writes if the process is killed mid-write. A ``state/.lock`` advisory file stops two daemons from running against the same directory. """ from __future__ import annotations import errno import fcntl import json import os from dataclasses import dataclass from pathlib import Path from tempfile import NamedTemporaryFile class StateError(RuntimeError): pass @dataclass class InstanceLock: """Holds a flock on state/.lock for the daemon's lifetime. Released automatically on process exit (kernel closes the fd) or by calling ``release()``. ``acquire()`` raises StateError if another daemon already holds the lock. """ lock_path: Path _fd: int | None = None def acquire(self) -> None: self._fd = os.open(self.lock_path, os.O_RDWR | os.O_CREAT, 0o600) try: fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except OSError as exc: os.close(self._fd) self._fd = None if exc.errno in {errno.EAGAIN, errno.EACCES}: raise StateError( f"Another daemon is holding {self.lock_path}; refusing to start" ) from exc raise os.write(self._fd, str(os.getpid()).encode()) def release(self) -> None: if self._fd is None: return try: fcntl.flock(self._fd, fcntl.LOCK_UN) finally: os.close(self._fd) self._fd = None def write_atomic(path: Path, data: str) -> None: """Atomically write text to ``path`` via temp file + rename. Crash-safe: a partial write leaves the temp file but does not overwrite the target. fsync the data file (not the directory) so the rename atomicity gives us durability up to the OS-level rename barrier. """ path.parent.mkdir(parents=True, exist_ok=True) with NamedTemporaryFile( mode="w", encoding="utf-8", dir=str(path.parent), prefix=f".{path.name}.", suffix=".tmp", delete=False, ) as tmp: tmp.write(data) tmp.flush() os.fsync(tmp.fileno()) tmp_path = Path(tmp.name) os.replace(tmp_path, path) def write_json_atomic(path: Path, value: object) -> None: write_atomic(path, json.dumps(value, indent=2, ensure_ascii=False, sort_keys=False)) def read_json(path: Path, default: object) -> object: """Read JSON or return default when the file is missing or empty.""" if not path.exists(): return default raw = path.read_text(encoding="utf-8").strip() if not raw: return default try: return json.loads(raw) except json.JSONDecodeError as exc: raise StateError(f"Corrupt JSON at {path}: {exc}") from exc