"""Anthropic API wrapper.

Wraps the SDK call so the daemon can send a turn and get back the assistant
text + token usage + estimated cost.

Implements prompt caching on the system prompt: subsequent calls within the
5-minute TTL get a cache hit on the (large, repeated) system prompt and pay
the cheaper cache-hit rate. Per-turn user/assistant content is never marked
cacheable because it changes every call.

Cost estimation uses a model→price table; the table is the source of truth
and is easy to update when pricing changes.
"""

from __future__ import annotations

from dataclasses import dataclass, field

from anthropic import Anthropic
from anthropic.types import Message

# Pricing per 1M tokens (USD), pulled from Anthropic's published schedule.
# Cache-hit input is billed at the cache-read rate (~10% of standard input).
# Cache-write is ~25% more than standard input. These numbers are
# approximate and used only for log-line cost estimation; the source of
# truth for billing is Anthropic's invoice.
_PRICES_PER_MILLION: dict[str, dict[str, float]] = {
    "claude-opus-4-7": {
        "input": 15.0,
        "output": 75.0,
        "cache_write": 18.75,
        "cache_read": 1.50,
    },
    "claude-opus-4-7-1m": {
        "input": 15.0,
        "output": 75.0,
        "cache_write": 18.75,
        "cache_read": 1.50,
    },
    "claude-sonnet-4-6": {
        "input": 3.0,
        "output": 15.0,
        "cache_write": 3.75,
        "cache_read": 0.30,
    },
    # NOTE(review): these figures look like an older Haiku rate card; confirm
    # against the currently published price for this model ID.
    "claude-haiku-4-5-20251001": {
        "input": 0.80,
        "output": 4.0,
        "cache_write": 1.00,
        "cache_read": 0.08,
    },
}

# Model whose price row is used when the requested model is not in the table.
_FALLBACK_PRICE_MODEL = "claude-opus-4-7"


def _price_for(model: str) -> dict[str, float]:
    """Return the per-million-token price row for ``model``.

    Args:
        model: Model identifier as passed to the Messages API.

    Returns:
        A mapping with the keys ``input``, ``output``, ``cache_write`` and
        ``cache_read`` (USD per 1M tokens).
    """
    # Fallback: charge as Opus 4.7 (worst-case) so estimates don't
    # under-report for unknown models. Logged once at startup (this function
    # itself does not log).
    return _PRICES_PER_MILLION.get(model, _PRICES_PER_MILLION[_FALLBACK_PRICE_MODEL])


def _usage_count(usage: object, name: str) -> int:
    """Read one token counter from a usage object, treating missing/None as 0."""
    return int(getattr(usage, name, 0) or 0)


@dataclass
class TurnResult:
    """Outcome of one API turn: assistant text, token counts and cost estimate."""

    # Concatenation of every ``text``-typed content block in the response.
    text: str
    # Token counters copied from ``response.usage`` (0 when absent).
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: int
    cache_read_input_tokens: int
    # Estimate derived from the price table; not authoritative for billing.
    estimated_cost_usd: float
    # The unmodified SDK response, for callers that need more than the text.
    raw: Message


@dataclass
class AnthropicClient:
    """Thin wrapper around the Anthropic SDK for a single configured model.

    Attributes:
        api_key: Secret used to construct the SDK client. Excluded from the
            generated ``repr`` so the key cannot leak into logs or tracebacks.
        model: Model identifier sent with every request and used for pricing.
        max_output_tokens: Value passed as ``max_tokens`` on every request.
    """

    api_key: str = field(repr=False)
    model: str
    max_output_tokens: int = 4096

    def __post_init__(self) -> None:
        """Build the underlying SDK client from the configured API key."""
        self._sdk = Anthropic(api_key=self.api_key)

    def send(
        self,
        *,
        system_prompt: str,
        messages: list[dict[str, str]],
    ) -> TurnResult:
        """Send a single API turn. The system prompt is marked cacheable.

        ``messages`` is the full conversation history shaped for the Messages
        API. The most recent message is the user turn we're responding to.
        The daemon is responsible for appending the assistant text it gets
        back into history before the next call.

        Args:
            system_prompt: System prompt text. When it is empty or only
                whitespace, no ``system`` argument is sent at all.
            messages: Conversation history passed through to the SDK as-is.

        Returns:
            A ``TurnResult`` holding the joined text blocks, the usage
            counters, the estimated cost in USD and the raw SDK response.

        Raises:
            Whatever the SDK raises for the request; nothing is caught here.
        """
        request_kwargs: dict[str, object] = {
            "model": self.model,
            "max_tokens": self.max_output_tokens,
            "messages": messages,
        }
        # An empty text block (and a cache marker on one) is rejected by the
        # API, so only attach the cacheable system block when there is text.
        if system_prompt.strip():
            request_kwargs["system"] = [
                {
                    "type": "text",
                    "text": system_prompt,
                    "cache_control": {"type": "ephemeral"},
                }
            ]
        response = self._sdk.messages.create(**request_kwargs)

        # Non-text blocks are skipped; only text blocks contribute.
        text = "".join(
            block.text
            for block in response.content
            if getattr(block, "type", None) == "text"
        )

        usage = response.usage
        in_tokens = _usage_count(usage, "input_tokens")
        out_tokens = _usage_count(usage, "output_tokens")
        cache_create = _usage_count(usage, "cache_creation_input_tokens")
        cache_read = _usage_count(usage, "cache_read_input_tokens")

        # Each counter is priced at its own per-million rate.
        prices = _price_for(self.model)
        cost = (
            in_tokens * prices["input"]
            + out_tokens * prices["output"]
            + cache_create * prices["cache_write"]
            + cache_read * prices["cache_read"]
        ) / 1_000_000.0

        return TurnResult(
            text=text,
            input_tokens=in_tokens,
            output_tokens=out_tokens,
            cache_creation_input_tokens=cache_create,
            cache_read_input_tokens=cache_read,
            estimated_cost_usd=cost,
            raw=response,
        )