91 lines
2.9 KiB
Python
91 lines
2.9 KiB
Python
|
|
from __future__ import annotations

import json
import os
import time
from pathlib import Path

from relay.queue import ack, list_pending, stuck_age_seconds, take_oldest
|
||
|
|
|
||
|
|
|
||
|
|
def _drop(queue_dir: Path, name: str, payload: dict) -> Path:
    """Serialize *payload* as JSON into ``queue_dir / name`` and return that path.

    ``queue_dir`` is created first, together with any missing parents; an
    already existing directory is accepted.
    """
    queue_dir.mkdir(parents=True, exist_ok=True)
    target = queue_dir.joinpath(name)
    serialized = json.dumps(payload)
    target.write_text(serialized)
    return target
|
||
|
|
|
||
|
|
|
||
|
|
def test_take_oldest_returns_none_on_empty_queue(tmp_path: Path) -> None:
    """``take_oldest`` yields ``None`` for a queue path this test never populates."""
    queue_location = tmp_path / "queue"
    result = take_oldest(queue_location)
    assert result is None
|
||
|
|
|
||
|
|
|
||
|
|
def test_take_oldest_returns_oldest_by_mtime(tmp_path: Path) -> None:
    """``take_oldest`` returns the entry with the earliest mtime and keeps the other file.

    The modification times are assigned explicitly with ``os.utime`` instead
    of sleeping between the two writes, so the ordering does not depend on
    the timestamp resolution of the filesystem backing ``tmp_path``.
    """
    queue_dir = tmp_path / "queue"
    older = _drop(queue_dir, "a.json", {"session_id": "s1", "timestamp": "t1", "content": "first"})
    newer = _drop(queue_dir, "b.json", {"session_id": "s2", "timestamp": "t2", "content": "second"})

    # Force an unambiguous, well-separated ordering of the two files.
    now = time.time()
    os.utime(older, (now - 60, now - 60))
    os.utime(newer, (now, now))

    entry = take_oldest(queue_dir)

    assert entry is not None
    assert entry.path == older
    assert entry.session_id == "s1"
    assert entry.content == "first"
    # Taking the oldest entry must not disturb the newer file.
    assert newer.exists()
|
||
|
|
|
||
|
|
|
||
|
|
def test_invalid_json_is_rejected(tmp_path: Path) -> None:
    """A non-JSON file produces no entry and ends up under ``.rejected``."""
    root = tmp_path / "queue"
    root.mkdir()
    garbage = root / "bad.json"
    garbage.write_text("not json")

    result = take_oldest(root)

    assert result is None
    # The file has left the queue directory itself ...
    assert not garbage.exists()
    # ... and is now present in the ``.rejected`` subdirectory.
    quarantined = root / ".rejected" / "bad.json"
    assert quarantined.exists()
|
||
|
|
|
||
|
|
|
||
|
|
def test_envelope_missing_keys_is_rejected(tmp_path: Path) -> None:
    """An envelope carrying only ``session_id`` produces no entry and lands in ``.rejected``."""
    root = tmp_path / "queue"
    incomplete = {"session_id": "s1"}  # no timestamp / content
    _drop(root, "x.json", incomplete)

    result = take_oldest(root)

    assert result is None
    quarantined = root / ".rejected" / "x.json"
    assert quarantined.exists()
|
||
|
|
|
||
|
|
|
||
|
|
def test_envelope_blank_content_is_rejected(tmp_path: Path) -> None:
    """An envelope whose ``content`` is a single space produces no entry."""
    root = tmp_path / "queue"
    blank_envelope = {"session_id": "s1", "timestamp": "t", "content": " "}
    _drop(root, "x.json", blank_envelope)

    result = take_oldest(root)

    assert result is None
|
||
|
|
|
||
|
|
|
||
|
|
def test_ack_deletes_file(tmp_path: Path) -> None:
    """After ``ack`` is called on a taken entry, the entry's file no longer exists."""
    root = tmp_path / "queue"
    envelope = {"session_id": "s1", "timestamp": "t", "content": "x"}
    _drop(root, "a.json", envelope)

    taken = take_oldest(root)
    assert taken is not None

    ack(taken)

    assert not taken.path.exists()
|
||
|
|
|
||
|
|
|
||
|
|
def test_list_pending_skips_rejected_dir(tmp_path: Path) -> None:
    """``list_pending`` reports only the queued file, not the one inside ``.rejected``."""
    root = tmp_path / "queue"
    root.mkdir()

    # Seed the ``.rejected`` subdirectory with a file that must be ignored.
    quarantine = root / ".rejected"
    quarantine.mkdir()
    ignored = quarantine / "old.json"
    ignored.write_text("{}")

    envelope = {"session_id": "s", "timestamp": "t", "content": "x"}
    queued = _drop(root, "a.json", envelope)

    found = list_pending(root)

    assert found == [queued]
|
||
|
|
|
||
|
|
|
||
|
|
def test_stuck_age_returns_zero_on_empty_queue(tmp_path: Path) -> None:
    """``stuck_age_seconds`` is ``0`` for a queue path this test never populates."""
    queue_location = tmp_path / "queue"
    age = stuck_age_seconds(queue_location)
    assert age == 0
|
||
|
|
|
||
|
|
|
||
|
|
def test_stuck_age_reports_oldest_age(tmp_path: Path) -> None:
    """A queued file backdated by 60 seconds is reported as at least 59 seconds old."""
    queue_dir = tmp_path / "queue"
    path = _drop(queue_dir, "a.json", {"session_id": "s", "timestamp": "t", "content": "x"})

    # Backdate the file: set both its access and modification time 60 s into the past.
    old_mtime = time.time() - 60
    os.utime(path, (old_mtime, old_mtime))

    age = stuck_age_seconds(queue_dir)

    # The bound is 59 rather than 60 — presumably slack for timestamp
    # truncation or rounding inside ``stuck_age_seconds`` (not visible here).
    assert age >= 59
|