72 lines
2.6 KiB
Python
72 lines
2.6 KiB
Python
|
|
"""Dispatch writer.
|
||
|
|
|
||
|
|
Each registered CC session has a directory ``dispatch/<session_id>/``.
|
||
|
|
The daemon delivers a chat-side response by writing it to
|
||
|
|
``dispatch/<session_id>/input.txt``. CC's polling loop reads, acts on
|
||
|
|
the content, and **deletes** the file as the acknowledgement.
|
||
|
|
|
||
|
|
The "only write when prior is consumed" rule means the daemon must not
|
||
|
|
overwrite a pending dispatch — otherwise CC could miss a turn. If the
|
||
|
|
daemon has new content for a session that hasn't yet consumed the
|
||
|
|
previous one, it queues internally and waits.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import logging
|
||
|
|
from collections import defaultdict, deque
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
from relay.state import write_atomic
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class DispatchManager:
|
||
|
|
dispatch_dir: Path
|
||
|
|
_pending: dict[str, deque[str]] = field(default_factory=lambda: defaultdict(deque))
|
||
|
|
|
||
|
|
def session_dir(self, session_id: str) -> Path:
|
||
|
|
return self.dispatch_dir / session_id
|
||
|
|
|
||
|
|
def input_path(self, session_id: str) -> Path:
|
||
|
|
return self.session_dir(session_id) / "input.txt"
|
||
|
|
|
||
|
|
def has_pending(self, session_id: str) -> bool:
|
||
|
|
return bool(self._pending[session_id])
|
||
|
|
|
||
|
|
def session_input_present(self, session_id: str) -> bool:
|
||
|
|
"""True iff the session's input.txt exists (CC hasn't consumed yet)."""
|
||
|
|
return self.input_path(session_id).exists()
|
||
|
|
|
||
|
|
def queue_or_write(self, session_id: str, content: str) -> bool:
|
||
|
|
"""Try to deliver content. If session's input.txt is still present,
|
||
|
|
queue internally and return False; otherwise write and return True.
|
||
|
|
"""
|
||
|
|
self._pending[session_id].append(content)
|
||
|
|
return self._flush_session(session_id)
|
||
|
|
|
||
|
|
def flush_all(self) -> int:
|
||
|
|
"""Try to flush queued dispatches for every session. Returns count delivered."""
|
||
|
|
delivered = 0
|
||
|
|
for session_id in list(self._pending.keys()):
|
||
|
|
while self._pending[session_id]:
|
||
|
|
if not self._flush_session(session_id):
|
||
|
|
break
|
||
|
|
delivered += 1
|
||
|
|
return delivered
|
||
|
|
|
||
|
|
def _flush_session(self, session_id: str) -> bool:
|
||
|
|
if not self._pending[session_id]:
|
||
|
|
return False
|
||
|
|
if self.session_input_present(session_id):
|
||
|
|
return False
|
||
|
|
next_content = self._pending[session_id].popleft()
|
||
|
|
path = self.input_path(session_id)
|
||
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
|
write_atomic(path, next_content)
|
||
|
|
logger.info("dispatched %d chars to %s -> %s", len(next_content), session_id, path)
|
||
|
|
return True
|