This repository has been archived on 2026-05-02. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
risv3-relay/relay/conversation.py

100 lines
3.4 KiB
Python
Raw Permalink Normal View History

feat: relay daemon skeleton — queue, dispatch, conversation, ntfy (#1) First-PR scope from #1. Single-process Python daemon that relays between Claude Code instances and chat-Claude (Anthropic API). Components: * relay.config — .env + config.yaml loader. Auto-generates ntfy topic on first run and persists it back to .env. * relay.state — atomic file I/O via tempfile + rename, advisory flock at state/.lock to enforce single-instance. * relay.conversation — append-only history with summarization. Triggers a summarize call when total chars exceed HISTORY_CHAR_CAP (default 400k); replaces history with the summary plus the most recent 10 turns. * relay.anthropic_client — SDK wrapper. Marks the system prompt cacheable (5-min ephemeral cache); concatenates text blocks; estimates per-call cost from the Anthropic price table with cache-write/read accounted for. * relay.queue — JSON envelope intake; oldest-by-mtime; malformed envelopes moved to queue/.rejected/. * relay.dispatch — one-input-at-a-time per session (dispatch/<session_id>/input.txt). Won't overwrite a pending dispatch; queues internally and waits for CC to delete. * relay.ntfy — best-effort POST to https://ntfy.sh/<topic>; failures logged but never block the main loop. * relay.daemon — main loop. Polls jc_input.txt (priority) then queue/. Detects [NEEDS-JC] in the first 200 chars of any response and pauses dispatch until JC writes jc_input.txt. JC override supports @session-N: prefix for direct dispatch without an API call. * relay.__main__ — CLI: relay run / relay status / relay topic. Tests: 57 unit tests pass (config, state, conversation, queue, dispatch, anthropic_client, ntfy, full daemon loop with a fake client). One real-API smoke test marked real_api, opt-in via pytest -m real_api; skips cleanly on credit-balance errors. Out of scope for this PR (deferred to follow-ups): Flask status endpoint, multi-session config in production, exponential backoff, systemd unit, cost-tracking aggregation. Closes #1. 
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 15:24:47 +00:00
"""Conversation history with summarization.

History shape: list of turns ``{role, content, ts, session_id?, meta?}``
where ``role`` is ``"user"`` or ``"assistant"``. The user role represents
either CC output or JC override; the assistant role represents the
chat-Claude response.

Summarization fires when the total content character count exceeds
``HISTORY_CHAR_CAP``. The summarization prompt is sent as a normal user
turn, and the API's response replaces all earlier turns. The most recent
``RECENT_TURNS_KEPT`` turns are appended verbatim after the summary, but
only when the history holds more than ``RECENT_TURNS_KEPT`` turns;
otherwise the summary alone remains. The summary turn is marked with
``meta="summary"`` so the daemon can recognize it when paginating.
"""
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from relay.state import read_json, write_json_atomic
# How many of the most recent turns Conversation.replace_with_summary keeps
# verbatim after the summary turn (applied only when the history is longer
# than this; otherwise the summary replaces everything).
RECENT_TURNS_KEPT = 10
def _utc_timestamp() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware UTC
    # time, then drop tzinfo so ``isoformat`` emits no "+00:00" offset; the
    # literal "Z" suffix keeps the persisted format unchanged.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now.isoformat(timespec="seconds") + "Z"


@dataclass
class Turn:
    """One entry in the conversation history.

    Instances are serialized with ``dataclasses.asdict`` when the history is
    persisted and rebuilt with ``Turn(**entry)`` when it is loaded, so the
    field names double as the on-disk JSON keys.
    """

    role: str  # 'user' | 'assistant'
    content: str
    # Creation time in UTC, e.g. "2026-05-02T15:24:47Z".
    ts: str = field(default_factory=_utc_timestamp)
    session_id: str | None = None
    meta: str | None = None  # 'summary' for replaced summary turns

    def to_api_message(self) -> dict[str, str]:
        """Return the Anthropic Messages API shape — only role + content needed."""
        return {"role": self.role, "content": self.content}
class Conversation:
    """Conversation history kept in memory and mirrored to a JSON file.

    Every mutating method rewrites the whole history file (via
    ``write_json_atomic``) right after changing the in-memory list.
    """

    def __init__(self, history_path: Path):
        """Load existing history from *history_path*.

        ``read_json`` is called with ``default=[]`` — presumably it returns
        that default when the file is absent (TODO confirm in relay.state).

        Raises:
            ValueError: if the stored JSON is not a list.
            TypeError: if an entry carries a key that is not a ``Turn`` field.
        """
        self._path = history_path
        loaded = read_json(self._path, default=[])
        if not isinstance(loaded, list):
            raise ValueError(f"Expected list at {self._path}, got {type(loaded).__name__}")
        self._turns: list[Turn] = [Turn(**dict(entry)) for entry in loaded]

    @property
    def turns(self) -> list[Turn]:
        """Shallow copy of the turn list; mutating it does not affect history."""
        return self._turns[:]

    def append(
        self, role: str, content: str, *, session_id: str | None = None, meta: str | None = None
    ) -> Turn:
        """Add a new turn to the end of the history, persist, and return it."""
        new_turn = Turn(role=role, content=content, session_id=session_id, meta=meta)
        self._turns.append(new_turn)
        self._persist()
        return new_turn

    def replace_with_summary(self, summary_text: str) -> None:
        """Collapse the history into one summary turn plus a recent tail.

        The summary becomes an ``assistant`` turn tagged ``meta="summary"``.
        When the history holds more than ``RECENT_TURNS_KEPT`` turns, the
        last ``RECENT_TURNS_KEPT`` of them follow it verbatim; otherwise all
        existing turns are discarded and only the summary remains.

        NOTE(review): the summary is the first message ``to_api_messages``
        returns, with role ``assistant``, and the kept tail may itself begin
        with an assistant turn — confirm the daemon/API accepts that order.
        """
        kept: list[Turn] = []
        if len(self._turns) > RECENT_TURNS_KEPT:
            kept = self._turns[-RECENT_TURNS_KEPT:]
        self._turns = [Turn(role="assistant", content=summary_text, meta="summary"), *kept]
        self._persist()

    def total_chars(self) -> int:
        """Sum of ``len(content)`` across all turns."""
        total = 0
        for turn in self._turns:
            total += len(turn.content)
        return total

    def needs_summarization(self, cap: int) -> bool:
        """True when the total content length is strictly greater than *cap*."""
        return cap < self.total_chars()

    def to_api_messages(self) -> list[dict[str, str]]:
        """History as a list of ``{role, content}`` dicts, oldest first."""
        return [turn.to_api_message() for turn in self._turns]

    def _persist(self) -> None:
        """Write the full history to disk as a list of plain dicts."""
        payload = [asdict(turn) for turn in self._turns]
        write_json_atomic(self._path, payload)

    # Test/debug helper
    def reset(self) -> None:
        """Drop every turn and persist the now-empty history."""
        self._turns = []
        self._persist()
def render_for_log(turn: Turn, max_chars: int = 200) -> dict[str, Any]:
    """Build a compact, log-friendly dict for *turn*.

    Content longer than *max_chars* is cut to its first *max_chars*
    characters and suffixed with a note giving how many were elided; all
    other fields are copied through unchanged.
    """
    text = turn.content
    overflow = len(text) - max_chars
    if overflow > 0:
        text = text[:max_chars] + f"... <{overflow} more chars>"
    return dict(
        role=turn.role,
        ts=turn.ts,
        session_id=turn.session_id,
        meta=turn.meta,
        content=text,
    )