This repository has been archived on 2026-05-02. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
risv3-relay/tests/test_anthropic_client.py

123 lines
4.4 KiB
Python
Raw Permalink Normal View History

feat: relay daemon skeleton — queue, dispatch, conversation, ntfy (#1) First-PR scope from #1. Single-process Python daemon that relays between Claude Code instances and chat-Claude (Anthropic API). Components: * relay.config — .env + config.yaml loader. Auto-generates ntfy topic on first run and persists it back to .env. * relay.state — atomic file I/O via tempfile + rename, advisory flock at state/.lock to enforce single-instance. * relay.conversation — append-only history with summarization. Triggers a summarize call when total chars exceed HISTORY_CHAR_CAP (default 400k); replaces history with the summary plus the most recent 10 turns. * relay.anthropic_client — SDK wrapper. Marks the system prompt cacheable (5-min ephemeral cache); concatenates text blocks; estimates per-call cost from the Anthropic price table with cache-write/read accounted for. * relay.queue — JSON envelope intake; oldest-by-mtime; malformed envelopes moved to queue/.rejected/. * relay.dispatch — one-input-at-a-time per session (dispatch/<session_id>/input.txt). Won't overwrite a pending dispatch; queues internally and waits for CC to delete. * relay.ntfy — best-effort POST to https://ntfy.sh/<topic>; failures logged but never block the main loop. * relay.daemon — main loop. Polls jc_input.txt (priority) then queue/. Detects [NEEDS-JC] in the first 200 chars of any response and pauses dispatch until JC writes jc_input.txt. JC override supports @session-N: prefix for direct dispatch without an API call. * relay.__main__ — CLI: relay run / relay status / relay topic. Tests: 57 unit tests pass (config, state, conversation, queue, dispatch, anthropic_client, ntfy, full daemon loop with a fake client). One real-API smoke test marked real_api, opt-in via pytest -m real_api; skips cleanly on credit-balance errors. Out of scope for this PR (deferred to follow-ups): Flask status endpoint, multi-session config in production, exponential backoff, systemd unit, cost-tracking aggregation. Closes #1. 
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 15:24:47 +00:00
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import MagicMock
from relay.anthropic_client import AnthropicClient
def _fake_message(
text: str, *, in_tokens: int = 100, out_tokens: int = 50, cache_w: int = 0, cache_r: int = 0
):
"""Build a stand-in for anthropic.types.Message with .content + .usage."""
return SimpleNamespace(
content=[SimpleNamespace(type="text", text=text)],
usage=SimpleNamespace(
input_tokens=in_tokens,
output_tokens=out_tokens,
cache_creation_input_tokens=cache_w,
cache_read_input_tokens=cache_r,
),
)
def _client_with_mock(model: str = "claude-opus-4-7") -> tuple[AnthropicClient, MagicMock]:
    """Build an ``AnthropicClient`` whose SDK is swapped for a stub.

    The client's ``_sdk`` attribute is replaced with a namespace exposing
    ``messages.create`` as a ``MagicMock``; by default that mock returns a
    fake message whose text is ``"response text"``.

    Returns the ``(client, create_mock)`` pair so a test can both drive the
    client and inspect how ``create`` was called.
    """
    stubbed_client = AnthropicClient(api_key="sk-fake", model=model)
    create_stub = MagicMock(return_value=_fake_message("response text"))
    messages_api = SimpleNamespace(create=create_stub)
    stubbed_client._sdk = SimpleNamespace(messages=messages_api)
    return stubbed_client, create_stub
def test_send_returns_assistant_text_and_usage() -> None:
    """``send`` exposes the reply text and token counts of the stubbed response."""
    client, _create = _client_with_mock()
    conversation = [{"role": "user", "content": "u1"}]

    result = client.send(system_prompt="sys", messages=conversation)

    # Expected values are the defaults baked into the stubbed response.
    assert result.text == "response text"
    assert result.input_tokens == 100
    assert result.output_tokens == 50
def test_send_passes_system_prompt_with_cache_control() -> None:
    """The ``system`` kwarg given to the SDK carries the prompt and a cache marker."""
    client, mock_create = _client_with_mock()
    client.send(system_prompt="SYSTEM PROMPT", messages=[{"role": "user", "content": "u"}])

    # Inspect the keyword arguments of the most recent ``create`` call.
    forwarded_kwargs = mock_create.call_args.kwargs
    system_arg = forwarded_kwargs["system"]

    assert system_arg[0]["text"] == "SYSTEM PROMPT"
    assert system_arg[0]["cache_control"] == {"type": "ephemeral"}
def test_cost_estimation_opus() -> None:
    """One million input tokens on the default (Opus) model is estimated at $15."""
    client, _ = _client_with_mock()
    # Replace the canned response with one that has exact token counts.
    million_input_reply = _fake_message("x", in_tokens=1_000_000, out_tokens=0)
    client._sdk.messages.create = MagicMock(return_value=million_input_reply)

    result = client.send(system_prompt="s", messages=[{"role": "user", "content": "u"}])

    # Opus: $15/M input → $15
    deviation = abs(result.estimated_cost_usd - 15.0)
    assert deviation < 0.01
def test_cost_estimation_includes_cache_savings() -> None:
    """Cache-write and cache-read tokens both contribute to the estimated cost."""
    client, _ = _client_with_mock()
    cache_only_reply = _fake_message(
        "x",
        in_tokens=0,
        out_tokens=0,
        cache_w=1_000_000,  # 1M cache write at $18.75/M
        cache_r=1_000_000,  # 1M cache read at $1.50/M
    )
    client._sdk.messages.create = MagicMock(return_value=cache_only_reply)

    result = client.send(system_prompt="s", messages=[{"role": "user", "content": "u"}])

    # cache_w 1M @ $18.75 + cache_r 1M @ $1.50 = $20.25
    deviation = abs(result.estimated_cost_usd - 20.25)
    assert deviation < 0.01
def test_cost_estimation_unknown_model_falls_back_to_opus() -> None:
    """A model name the test treats as unknown is still priced at the Opus input rate."""
    client = AnthropicClient(api_key="sk-fake", model="claude-future-9000")
    million_input_reply = _fake_message("x", in_tokens=1_000_000, out_tokens=0)
    create_stub = MagicMock(return_value=million_input_reply)
    client._sdk = SimpleNamespace(messages=SimpleNamespace(create=create_stub))

    result = client.send(system_prompt="s", messages=[{"role": "user", "content": "u"}])

    # Falls back to Opus pricing
    deviation = abs(result.estimated_cost_usd - 15.0)
    assert deviation < 0.01
def test_send_concatenates_multiple_text_blocks() -> None:
    """Several text blocks in one response are joined, in order, into ``result.text``."""
    client, _ = _client_with_mock()

    text_blocks = [
        SimpleNamespace(type="text", text="hello "),
        SimpleNamespace(type="text", text="world"),
    ]
    # All counters are zero: only the text handling is under test here.
    zero_usage = SimpleNamespace(
        input_tokens=0,
        output_tokens=0,
        cache_creation_input_tokens=0,
        cache_read_input_tokens=0,
    )
    multi = SimpleNamespace(content=text_blocks, usage=zero_usage)
    client._sdk.messages.create = MagicMock(return_value=multi)

    result = client.send(system_prompt="s", messages=[{"role": "user", "content": "u"}])

    assert result.text == "hello world"
def test_send_ignores_non_text_blocks() -> None:
    """Blocks whose type is not ``text`` contribute nothing to ``result.text``."""
    client, _ = _client_with_mock()

    text_part = SimpleNamespace(type="text", text="text part")
    # Deliberately has no ``.text`` attribute, so reading ``.text`` on every
    # block without filtering by type would raise.
    tool_part = SimpleNamespace(type="tool_use")
    # All counters are zero: only the block filtering is under test here.
    zero_usage = SimpleNamespace(
        input_tokens=0,
        output_tokens=0,
        cache_creation_input_tokens=0,
        cache_read_input_tokens=0,
    )
    mixed = SimpleNamespace(content=[text_part, tool_part], usage=zero_usage)
    client._sdk.messages.create = MagicMock(return_value=mixed)

    result = client.send(system_prompt="s", messages=[{"role": "user", "content": "u"}])

    assert result.text == "text part"