130 lines
3.7 KiB
Python
130 lines
3.7 KiB
Python
|
|
"""Queue intake.
|
||
|
|
|
||
|
|
CC sessions drop JSON envelopes into ``queue/``. The envelope shape:
|
||
|
|
|
||
|
|
{
|
||
|
|
"session_id": "session-1",
|
||
|
|
"timestamp": "2026-05-02T12:34:56Z",
|
||
|
|
"content": "..."
|
||
|
|
}
|
||
|
|
|
||
|
|
The daemon picks the oldest entry by mtime, validates the envelope,
|
||
|
|
returns it, and deletes the file once processing succeeds. A
|
||
|
|
malformed envelope is moved to ``queue/.rejected/`` so the daemon
|
||
|
|
doesn't retry it forever.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
import shutil
|
||
|
|
import time
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class QueueError(RuntimeError):
|
||
|
|
pass
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
|
||
|
|
class QueueEntry:
|
||
|
|
path: Path
|
||
|
|
session_id: str
|
||
|
|
timestamp: str
|
||
|
|
content: str
|
||
|
|
mtime: float
|
||
|
|
|
||
|
|
|
||
|
|
def _validate_envelope(payload: object, *, path: Path) -> tuple[str, str, str]:
|
||
|
|
if not isinstance(payload, dict):
|
||
|
|
raise QueueError(f"{path}: envelope must be a JSON object")
|
||
|
|
for required in ("session_id", "timestamp", "content"):
|
||
|
|
if required not in payload:
|
||
|
|
raise QueueError(f"{path}: missing required key '{required}'")
|
||
|
|
session_id = str(payload["session_id"]).strip()
|
||
|
|
timestamp = str(payload["timestamp"]).strip()
|
||
|
|
content = str(payload["content"])
|
||
|
|
if not session_id:
|
||
|
|
raise QueueError(f"{path}: session_id is empty")
|
||
|
|
if not timestamp:
|
||
|
|
raise QueueError(f"{path}: timestamp is empty")
|
||
|
|
if not content.strip():
|
||
|
|
raise QueueError(f"{path}: content is empty")
|
||
|
|
return session_id, timestamp, content
|
||
|
|
|
||
|
|
|
||
|
|
def _reject(path: Path, reason: str) -> None:
|
||
|
|
rejected_dir = path.parent / ".rejected"
|
||
|
|
rejected_dir.mkdir(exist_ok=True)
|
||
|
|
target = rejected_dir / path.name
|
||
|
|
shutil.move(str(path), str(target))
|
||
|
|
logger.warning("rejected queue entry %s -> %s: %s", path, target, reason)
|
||
|
|
|
||
|
|
|
||
|
|
def list_pending(queue_dir: Path) -> list[Path]:
|
||
|
|
"""Return queue entries oldest-first, excluding the .rejected dir."""
|
||
|
|
if not queue_dir.exists():
|
||
|
|
return []
|
||
|
|
entries = [
|
||
|
|
p
|
||
|
|
for p in queue_dir.iterdir()
|
||
|
|
if p.is_file() and p.suffix == ".json" and not p.name.startswith(".")
|
||
|
|
]
|
||
|
|
entries.sort(key=lambda p: p.stat().st_mtime)
|
||
|
|
return entries
|
||
|
|
|
||
|
|
|
||
|
|
def take_oldest(queue_dir: Path) -> QueueEntry | None:
|
||
|
|
"""Read and validate the oldest queue entry. Returns None if queue empty.
|
||
|
|
|
||
|
|
Rejects malformed envelopes by moving them to .rejected/. The caller
|
||
|
|
is responsible for calling ``ack(entry)`` once processing succeeds
|
||
|
|
to delete the file; until then it remains in the queue.
|
||
|
|
"""
|
||
|
|
pending = list_pending(queue_dir)
|
||
|
|
if not pending:
|
||
|
|
return None
|
||
|
|
path = pending[0]
|
||
|
|
try:
|
||
|
|
raw = path.read_text(encoding="utf-8")
|
||
|
|
payload = json.loads(raw)
|
||
|
|
session_id, timestamp, content = _validate_envelope(payload, path=path)
|
||
|
|
except (OSError, json.JSONDecodeError) as exc:
|
||
|
|
_reject(path, f"unreadable / not JSON: {exc}")
|
||
|
|
return None
|
||
|
|
except QueueError as exc:
|
||
|
|
_reject(path, str(exc))
|
||
|
|
return None
|
||
|
|
|
||
|
|
return QueueEntry(
|
||
|
|
path=path,
|
||
|
|
session_id=session_id,
|
||
|
|
timestamp=timestamp,
|
||
|
|
content=content,
|
||
|
|
mtime=path.stat().st_mtime,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def ack(entry: QueueEntry) -> None:
|
||
|
|
"""Delete the queue file. Called by the daemon after the turn is dispatched."""
|
||
|
|
try:
|
||
|
|
entry.path.unlink()
|
||
|
|
except FileNotFoundError:
|
||
|
|
pass
|
||
|
|
|
||
|
|
|
||
|
|
def stuck_age_seconds(queue_dir: Path) -> float:
|
||
|
|
"""How long the oldest pending entry has been waiting. 0 if queue empty.
|
||
|
|
|
||
|
|
Used by the daemon's heartbeat to fire a ntfy alert when the queue
|
||
|
|
is stuck (e.g., persistent API failure).
|
||
|
|
"""
|
||
|
|
pending = list_pending(queue_dir)
|
||
|
|
if not pending:
|
||
|
|
return 0.0
|
||
|
|
return time.time() - pending[0].stat().st_mtime
|