This repository has been archived on 2026-05-02. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
risv3-relay/relay/dispatch.py

72 lines
2.6 KiB
Python
Raw Permalink Normal View History

feat: relay daemon skeleton — queue, dispatch, conversation, ntfy (#1) First-PR scope from #1. Single-process Python daemon that relays between Claude Code instances and chat-Claude (Anthropic API). Components: * relay.config — .env + config.yaml loader. Auto-generates ntfy topic on first run and persists it back to .env. * relay.state — atomic file I/O via tempfile + rename, advisory flock at state/.lock to enforce single-instance. * relay.conversation — append-only history with summarization. Triggers a summarize call when total chars exceed HISTORY_CHAR_CAP (default 400k); replaces history with the summary plus the most recent 10 turns. * relay.anthropic_client — SDK wrapper. Marks the system prompt cacheable (5-min ephemeral cache); concatenates text blocks; estimates per-call cost from the Anthropic price table with cache-write/read accounted for. * relay.queue — JSON envelope intake; oldest-by-mtime; malformed envelopes moved to queue/.rejected/. * relay.dispatch — one-input-at-a-time per session (dispatch/<session_id>/input.txt). Won't overwrite a pending dispatch; queues internally and waits for CC to delete. * relay.ntfy — best-effort POST to https://ntfy.sh/<topic>; failures logged but never block the main loop. * relay.daemon — main loop. Polls jc_input.txt (priority) then queue/. Detects [NEEDS-JC] in the first 200 chars of any response and pauses dispatch until JC writes jc_input.txt. JC override supports @session-N: prefix for direct dispatch without an API call. * relay.__main__ — CLI: relay run / relay status / relay topic. Tests: 57 unit tests pass (config, state, conversation, queue, dispatch, anthropic_client, ntfy, full daemon loop with a fake client). One real-API smoke test marked real_api, opt-in via pytest -m real_api; skips cleanly on credit-balance errors. Out of scope for this PR (deferred to follow-ups): Flask status endpoint, multi-session config in production, exponential backoff, systemd unit, cost-tracking aggregation. Closes #1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 15:24:47 +00:00
"""Dispatch writer.
Each registered CC session has a directory ``dispatch/<session_id>/``.
The daemon delivers a chat-side response by writing it to
``dispatch/<session_id>/input.txt``. CC's polling loop reads, acts on
the content, and **deletes** the file as the acknowledgement.
The "only write when prior is consumed" rule means the daemon must not
overwrite a pending dispatch otherwise CC could miss a turn. If the
daemon has new content for a session that hasn't yet consumed the
previous one, it queues internally and waits.
"""
from __future__ import annotations
import logging
from collections import defaultdict, deque
from dataclasses import dataclass, field
from pathlib import Path
from relay.state import write_atomic
logger = logging.getLogger(__name__)
@dataclass
class DispatchManager:
dispatch_dir: Path
_pending: dict[str, deque[str]] = field(default_factory=lambda: defaultdict(deque))
def session_dir(self, session_id: str) -> Path:
return self.dispatch_dir / session_id
def input_path(self, session_id: str) -> Path:
return self.session_dir(session_id) / "input.txt"
def has_pending(self, session_id: str) -> bool:
return bool(self._pending[session_id])
def session_input_present(self, session_id: str) -> bool:
"""True iff the session's input.txt exists (CC hasn't consumed yet)."""
return self.input_path(session_id).exists()
def queue_or_write(self, session_id: str, content: str) -> bool:
"""Try to deliver content. If session's input.txt is still present,
queue internally and return False; otherwise write and return True.
"""
self._pending[session_id].append(content)
return self._flush_session(session_id)
def flush_all(self) -> int:
"""Try to flush queued dispatches for every session. Returns count delivered."""
delivered = 0
for session_id in list(self._pending.keys()):
while self._pending[session_id]:
if not self._flush_session(session_id):
break
delivered += 1
return delivered
def _flush_session(self, session_id: str) -> bool:
if not self._pending[session_id]:
return False
if self.session_input_present(session_id):
return False
next_content = self._pending[session_id].popleft()
path = self.input_path(session_id)
path.parent.mkdir(parents=True, exist_ok=True)
write_atomic(path, next_content)
logger.info("dispatched %d chars to %s -> %s", len(next_content), session_id, path)
return True