feat(@projects/@claire): add session reaping logic

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-06-01 01:16:42 -06:00
parent 7492a4d4dd
commit c22cae1b74
5 changed files with 247 additions and 2 deletions

View file

@ -137,10 +137,21 @@ def agent_run(
(sync now; supervisor + telemetry later). Deployed as a systemd --user (sync now; supervisor + telemetry later). Deployed as a systemd --user
unit on apricot/black. unit on apricot/black.
""" """
import logging
import uvicorn import uvicorn
from .web.app import create_app from .web.app import create_app
# Surface the agent loops' INFO logs to stdout → journald. Without this,
# uvicorn only configures its own loggers and the `claire.*` loop logs
# (sync/supervisor/telemetry, incl. auto-continue dry-run) are invisible.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logging.getLogger("claire").setLevel(logging.INFO)
cfg = load_or_init() cfg = load_or_init()
bind_port = port or cfg.agent.port bind_port = port or cfg.agent.port
console.print( console.print(

View file

@ -19,7 +19,7 @@ import re as _re
import sqlite3 import sqlite3
from collections import Counter from collections import Counter
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timedelta, timezone
from uuid import UUID from uuid import UUID
from . import events, scheduler from . import events, scheduler
@ -28,6 +28,16 @@ from .hlc import HLCGenerator
from .rclaude import Rclaude, RclaudeError from .rclaude import Rclaude, RclaudeError
# A session must be `closed` AND silent for at least this long before its
# active assignments are reaped. `liveness` is a heuristic (freshest-N-panes
# by tmux-name match) and plum's view of *remote* tmux can transiently
# misflag a live worker as closed; a live worker writes to its JSONL
# constantly, so a fresh `last_seen_mtime` protects it from a misfire. The
# window only has to exceed the longest a genuinely-live session stays silent
# between transcript writes — 30 minutes is comfortably above that.
_REAP_GRACE_MINUTES = 30
@dataclass(frozen=True) @dataclass(frozen=True)
class PullStats: class PullStats:
sessions_observed: int sessions_observed: int
@ -35,6 +45,7 @@ class PullStats:
errors: list[str] errors: list[str]
sessions_alive: int = 0 sessions_alive: int = 0
sessions_closed: int = 0 sessions_closed: int = 0
assignments_reaped: int = 0
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -42,6 +53,63 @@ class PullStats:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _reap_stale_assignments(
    conn: sqlite3.Connection,
    generator: HLCGenerator,
    *,
    now: datetime | None = None,
    grace_minutes: int = _REAP_GRACE_MINUTES,
) -> int:
    """Deactivate active assignments whose worker session is gone for good.

    When a worker dies its tmux pane vanishes and `pull`'s liveness pass marks
    the session `closed`, but nothing ever flipped the assignment to inactive,
    so the task kept *looking* assigned (and `list_fleet`, which only shows
    alive sessions, could never reveal the orphan). The scheduler treats a
    todo/in_progress task with no active assignment as dispatchable work
    (`scheduler.suggest_assignments`), so deactivating the dead assignment is
    all that's needed to re-surface the task — no task-state change, which
    keeps the two-gate state machine intact (`in_progress` → `todo` is not a
    legal transition by design).

    Gated on staleness, not `closed` alone, to survive a liveness misfire on a
    quiet-but-live worker (see `_REAP_GRACE_MINUTES`). Idempotent: an already
    inactive assignment is never re-selected, so re-running is a no-op.

    Args:
        conn: Database connection; rows must support access by column name.
        generator: HLC generator handed through to `events.append`.
        now: Reference instant for the staleness cutoff. Defaults to the
            current UTC time. A naive value is interpreted as UTC, the same
            way a naive stored `last_seen_mtime` is.
        grace_minutes: How long a closed session must have been silent
            before its active assignments are reaped.

    Returns:
        The number of assignments for which an `AssignmentDeactivated` event
        was appended.
    """
    # `is None` rather than truthiness: only a missing value means "use now".
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        # Stored timestamps are always compared as aware datetimes below; a
        # naive `now` would make that comparison raise TypeError.
        now = now.replace(tzinfo=timezone.utc)
    cutoff = now - timedelta(minutes=grace_minutes)

    rows = conn.execute(
        """
        SELECT a.id AS assignment_id, s.last_seen_mtime AS last_seen
        FROM assignments a
        JOIN sessions s ON s.uuid = a.session_uuid
        WHERE a.active = 1 AND s.liveness = 'closed'
        """
    ).fetchall()

    reaped = 0
    for row in rows:
        last_seen = row["last_seen"]
        if not last_seen:
            # No timestamp to prove staleness — refuse to reap (conservative).
            continue
        try:
            seen_at = datetime.fromisoformat(last_seen)
        except (TypeError, ValueError):
            # Unparseable (ValueError) or not a string at all (TypeError):
            # staleness cannot be proven, so skip this row instead of letting
            # one bad value abort the whole reap pass.
            continue
        if seen_at.tzinfo is None:
            seen_at = seen_at.replace(tzinfo=timezone.utc)
        if seen_at >= cutoff:
            continue  # silent, but not long enough — could be a live misfire
        events.append(
            conn,
            generator,
            events.AssignmentDeactivated(assignment_id=UUID(row["assignment_id"])),
        )
        reaped += 1
    return reaped
def _session_snapshot( def _session_snapshot(
conn: sqlite3.Connection, conn: sqlite3.Connection,
) -> dict[UUID, tuple[str, str | None, str | None]]: ) -> dict[UUID, tuple[str, str | None, str | None]]:
@ -244,6 +312,17 @@ def pull(
else: else:
sessions_closed += 1 sessions_closed += 1
# --- Reap ------------------------------------------------------------
# A worker that died left its assignment marked active; deactivate any
# whose session is now closed-and-stale so the orphaned task re-surfaces
# as dispatchable work. Must run AFTER the liveness pass above so it sees
# this cycle's fresh `closed` flags. Never break a pull — count as error.
assignments_reaped = 0
try:
assignments_reaped = _reap_stale_assignments(conn, generator)
except Exception as exc: # noqa: BLE001 — defensive: never break pull
errors.append(f"reap: {exc}")
# --- Triage ----------------------------------------------------------- # --- Triage -----------------------------------------------------------
try: try:
triage_rows = rclaude.triage() triage_rows = rclaude.triage()
@ -299,6 +378,7 @@ def pull(
errors=errors, errors=errors,
sessions_alive=sessions_alive, sessions_alive=sessions_alive,
sessions_closed=sessions_closed, sessions_closed=sessions_closed,
assignments_reaped=assignments_reaped,
) )

View file

@ -408,7 +408,13 @@ def fleet_considered_get(cg: Dep) -> dict[str, Any]:
def pull_now(cg: Dep) -> dict[str, Any]: def pull_now(cg: Dep) -> dict[str, Any]:
conn, gen = cg conn, gen = cg
stats = service.pull(conn, gen) stats = service.pull(conn, gen)
return {"sessions_observed": stats.sessions_observed, "errors": stats.errors} return {
"sessions_observed": stats.sessions_observed,
"sessions_alive": stats.sessions_alive,
"sessions_closed": stats.sessions_closed,
"assignments_reaped": stats.assignments_reaped,
"errors": stats.errors,
}
@router.get("/budget") @router.get("/budget")

118
tests/test_pull_reap.py Normal file
View file

@ -0,0 +1,118 @@
"""Reap pass: a dead worker's assignment is deactivated so its task re-surfaces.
When a worker dies its tmux pane vanishes and the liveness pass marks the
session `closed`, but nothing ever flipped the assignment inactive, so the
task kept *looking* assigned and the scheduler never re-offered it. The reap
pass deactivates active assignments whose session is closed-and-stale, gated
on a grace window so a momentarily-misflagged live worker is never reaped.
"""
from __future__ import annotations
from datetime import datetime, timezone
from uuid import UUID
from claire.domain import TaskStatus
from claire.pull import pull
from claire.rclaude import SessionRow, TmuxRow
from claire.web import service
_CWD = "/var/home/lilith/Code/@projects/@claire"
_SLUG = "var-home-lilith-Code--projects--claire"
class _FakeRclaude:
def __init__(self, sessions: list[SessionRow], tmux: list[TmuxRow]):
self._sessions = sessions
self._tmux = tmux
def list_sessions(self) -> list[SessionRow]:
return list(self._sessions)
def list_tmux(self) -> list[TmuxRow]:
return list(self._tmux)
def triage(self) -> list:
return []
def _session(uuid_hex: str, mtime: int) -> SessionRow:
    """Build a `SessionRow` on host "apricot" at the shared test cwd.

    `uuid_hex` is parsed into a `UUID`; `mtime` becomes the row's
    `mtime_epoch`.
    """
    session_id = UUID(uuid_hex)
    return SessionRow(
        host="apricot",
        uuid=session_id,
        snippet="x",
        cwd=_CWD,
        mtime_epoch=mtime,
    )
def _active(conn, assignment_id: UUID) -> bool:
row = conn.execute(
"SELECT active FROM assignments WHERE id = ?", (str(assignment_id),)
).fetchone()
return bool(row["active"])
def _setup_in_progress_task(conn, gen) -> UUID:
    """Create project "proj" with one task moved to IN_PROGRESS; return its id."""
    service.create_project(conn, gen, name="proj")
    created = service.add_task(conn, gen, project="proj", title="t", priority=1)
    new_task_id = created.id
    service.transition_task_state(
        conn,
        gen,
        task_id=new_task_id,
        to_state=TaskStatus.IN_PROGRESS,
    )
    return created.id
def test_reaps_only_closed_and_stale_assignment(conn, gen) -> None:
    """One in-progress task, three workers: only the closed-and-stale one is
    reaped; the closed-but-fresh and the alive worker keep their assignments."""
    now = int(datetime.now(timezone.utc).timestamp())

    # session uuid -> transcript mtime (epoch seconds), in creation order
    mtimes = {
        "11111111-1111-1111-1111-111111111111": 1_700_000_000,  # years old → closed + stale
        "22222222-2222-2222-2222-222222222222": now - 300,  # 5 min ago — inside the 30-min grace
        "33333333-3333-3333-3333-333333333333": now - 60,  # freshest → claims the one live pane
    }
    dead, fresh_closed, alive = mtimes

    task_id = _setup_in_progress_task(conn, gen)
    assignment_for = {
        uuid_hex: service.create_assignment(
            conn, gen, task_id=task_id, session_uuid=UUID(uuid_hex)
        )
        for uuid_hex in mtimes
    }

    # One live pane at the workspace → only the freshest session is alive.
    live_pane = TmuxRow(
        host="apricot",
        session_name=f"claude-natalie-{_SLUG}-1779320000",
        detail="",
    )
    fake = _FakeRclaude(
        sessions=[_session(uuid_hex, mtime) for uuid_hex, mtime in mtimes.items()],
        tmux=[live_pane],
    )

    stats = pull(conn, gen, rclaude=fake)

    assert stats.assignments_reaped == 1
    still_active = {dead: False, fresh_closed: True, alive: True}
    for uuid_hex, expected in still_active.items():
        assert _active(conn, assignment_for[uuid_hex].id) is expected
def test_reap_is_idempotent(conn, gen) -> None:
    """Pulling twice over the same fleet reaps once: after the first pull the
    dead assignment is inactive, so the second finds nothing to do."""
    session_hex = "44444444-4444-4444-4444-444444444444"
    task_id = _setup_in_progress_task(conn, gen)
    service.create_assignment(
        conn, gen, task_id=task_id, session_uuid=UUID(session_hex)
    )
    stale_session = _session(session_hex, 1_700_000_000)
    fake = _FakeRclaude(sessions=[stale_session], tmux=[])

    reaped_per_pull = [
        pull(conn, gen, rclaude=fake).assignments_reaped for _ in range(2)
    ]

    assert reaped_per_pull[0] == 1
    assert reaped_per_pull[1] == 0
def test_reaped_task_becomes_dispatchable_again(conn, gen) -> None:
    """Once the dead worker is reaped, the in-progress task is left with no
    active assignment — the condition the scheduler reads as open work."""
    session_hex = "55555555-5555-5555-5555-555555555555"
    task_id = _setup_in_progress_task(conn, gen)
    service.create_assignment(
        conn, gen, task_id=task_id, session_uuid=UUID(session_hex)
    )
    stale_session = _session(session_hex, 1_700_000_000)

    pull(conn, gen, rclaude=_FakeRclaude(sessions=[stale_session], tmux=[]))

    remaining = service.read.list_assignments(
        conn, task_id=task_id, active_only=True
    )
    assert remaining == []

30
uv.lock generated
View file

@ -181,6 +181,7 @@ dependencies = [
{ name = "httpx" }, { name = "httpx" },
{ name = "jinja2" }, { name = "jinja2" },
{ name = "mcp" }, { name = "mcp" },
{ name = "psutil" },
{ name = "pydantic" }, { name = "pydantic" },
{ name = "python-multipart" }, { name = "python-multipart" },
{ name = "rich" }, { name = "rich" },
@ -204,6 +205,7 @@ requires-dist = [
{ name = "jinja2", specifier = ">=3.1" }, { name = "jinja2", specifier = ">=3.1" },
{ name = "mcp", specifier = ">=1.27.1" }, { name = "mcp", specifier = ">=1.27.1" },
{ name = "mypy", marker = "extra == 'dev'", specifier = ">=1.10" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.10" },
{ name = "psutil", specifier = ">=5.9" },
{ name = "pydantic", specifier = ">=2.7" }, { name = "pydantic", specifier = ">=2.7" },
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=8" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8" },
{ name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23" },
@ -776,6 +778,34 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
] ]
[[package]]
name = "psutil"
version = "7.2.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/aa/c6/d1ddf4abb55e93cebc4f2ed8b5d6dbad109ecb8d63748dd2b20ab5e57ebe/psutil-7.2.2.tar.gz", hash = "sha256:0746f5f8d406af344fd547f1c8daa5f5c33dbc293bb8d6a16d80b4bb88f59372", size = 493740, upload-time = "2026-01-28T18:14:54.428Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/51/08/510cbdb69c25a96f4ae523f733cdc963ae654904e8db864c07585ef99875/psutil-7.2.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:2edccc433cbfa046b980b0df0171cd25bcaeb3a68fe9022db0979e7aa74a826b", size = 130595, upload-time = "2026-01-28T18:14:57.293Z" },
{ url = "https://files.pythonhosted.org/packages/d6/f5/97baea3fe7a5a9af7436301f85490905379b1c6f2dd51fe3ecf24b4c5fbf/psutil-7.2.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:e78c8603dcd9a04c7364f1a3e670cea95d51ee865e4efb3556a3a63adef958ea", size = 131082, upload-time = "2026-01-28T18:14:59.732Z" },
{ url = "https://files.pythonhosted.org/packages/37/d6/246513fbf9fa174af531f28412297dd05241d97a75911ac8febefa1a53c6/psutil-7.2.2-cp313-cp313t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1a571f2330c966c62aeda00dd24620425d4b0cc86881c89861fbc04549e5dc63", size = 181476, upload-time = "2026-01-28T18:15:01.884Z" },
{ url = "https://files.pythonhosted.org/packages/b8/b5/9182c9af3836cca61696dabe4fd1304e17bc56cb62f17439e1154f225dd3/psutil-7.2.2-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:917e891983ca3c1887b4ef36447b1e0873e70c933afc831c6b6da078ba474312", size = 184062, upload-time = "2026-01-28T18:15:04.436Z" },
{ url = "https://files.pythonhosted.org/packages/16/ba/0756dca669f5a9300d0cbcbfae9a4c30e446dfc7440ffe43ded5724bfd93/psutil-7.2.2-cp313-cp313t-win_amd64.whl", hash = "sha256:ab486563df44c17f5173621c7b198955bd6b613fb87c71c161f827d3fb149a9b", size = 139893, upload-time = "2026-01-28T18:15:06.378Z" },
{ url = "https://files.pythonhosted.org/packages/1c/61/8fa0e26f33623b49949346de05ec1ddaad02ed8ba64af45f40a147dbfa97/psutil-7.2.2-cp313-cp313t-win_arm64.whl", hash = "sha256:ae0aefdd8796a7737eccea863f80f81e468a1e4cf14d926bd9b6f5f2d5f90ca9", size = 135589, upload-time = "2026-01-28T18:15:08.03Z" },
{ url = "https://files.pythonhosted.org/packages/81/69/ef179ab5ca24f32acc1dac0c247fd6a13b501fd5534dbae0e05a1c48b66d/psutil-7.2.2-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:eed63d3b4d62449571547b60578c5b2c4bcccc5387148db46e0c2313dad0ee00", size = 130664, upload-time = "2026-01-28T18:15:09.469Z" },
{ url = "https://files.pythonhosted.org/packages/7b/64/665248b557a236d3fa9efc378d60d95ef56dd0a490c2cd37dafc7660d4a9/psutil-7.2.2-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:7b6d09433a10592ce39b13d7be5a54fbac1d1228ed29abc880fb23df7cb694c9", size = 131087, upload-time = "2026-01-28T18:15:11.724Z" },
{ url = "https://files.pythonhosted.org/packages/d5/2e/e6782744700d6759ebce3043dcfa661fb61e2fb752b91cdeae9af12c2178/psutil-7.2.2-cp314-cp314t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1fa4ecf83bcdf6e6c8f4449aff98eefb5d0604bf88cb883d7da3d8d2d909546a", size = 182383, upload-time = "2026-01-28T18:15:13.445Z" },
{ url = "https://files.pythonhosted.org/packages/57/49/0a41cefd10cb7505cdc04dab3eacf24c0c2cb158a998b8c7b1d27ee2c1f5/psutil-7.2.2-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e452c464a02e7dc7822a05d25db4cde564444a67e58539a00f929c51eddda0cf", size = 185210, upload-time = "2026-01-28T18:15:16.002Z" },
{ url = "https://files.pythonhosted.org/packages/dd/2c/ff9bfb544f283ba5f83ba725a3c5fec6d6b10b8f27ac1dc641c473dc390d/psutil-7.2.2-cp314-cp314t-win_amd64.whl", hash = "sha256:c7663d4e37f13e884d13994247449e9f8f574bc4655d509c3b95e9ec9e2b9dc1", size = 141228, upload-time = "2026-01-28T18:15:18.385Z" },
{ url = "https://files.pythonhosted.org/packages/f2/fc/f8d9c31db14fcec13748d373e668bc3bed94d9077dbc17fb0eebc073233c/psutil-7.2.2-cp314-cp314t-win_arm64.whl", hash = "sha256:11fe5a4f613759764e79c65cf11ebdf26e33d6dd34336f8a337aa2996d71c841", size = 136284, upload-time = "2026-01-28T18:15:19.912Z" },
{ url = "https://files.pythonhosted.org/packages/e7/36/5ee6e05c9bd427237b11b3937ad82bb8ad2752d72c6969314590dd0c2f6e/psutil-7.2.2-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:ed0cace939114f62738d808fdcecd4c869222507e266e574799e9c0faa17d486", size = 129090, upload-time = "2026-01-28T18:15:22.168Z" },
{ url = "https://files.pythonhosted.org/packages/80/c4/f5af4c1ca8c1eeb2e92ccca14ce8effdeec651d5ab6053c589b074eda6e1/psutil-7.2.2-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:1a7b04c10f32cc88ab39cbf606e117fd74721c831c98a27dc04578deb0c16979", size = 129859, upload-time = "2026-01-28T18:15:23.795Z" },
{ url = "https://files.pythonhosted.org/packages/b5/70/5d8df3b09e25bce090399cf48e452d25c935ab72dad19406c77f4e828045/psutil-7.2.2-cp36-abi3-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:076a2d2f923fd4821644f5ba89f059523da90dc9014e85f8e45a5774ca5bc6f9", size = 155560, upload-time = "2026-01-28T18:15:25.976Z" },
{ url = "https://files.pythonhosted.org/packages/63/65/37648c0c158dc222aba51c089eb3bdfa238e621674dc42d48706e639204f/psutil-7.2.2-cp36-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b0726cecd84f9474419d67252add4ac0cd9811b04d61123054b9fb6f57df6e9e", size = 156997, upload-time = "2026-01-28T18:15:27.794Z" },
{ url = "https://files.pythonhosted.org/packages/8e/13/125093eadae863ce03c6ffdbae9929430d116a246ef69866dad94da3bfbc/psutil-7.2.2-cp36-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:fd04ef36b4a6d599bbdb225dd1d3f51e00105f6d48a28f006da7f9822f2606d8", size = 148972, upload-time = "2026-01-28T18:15:29.342Z" },
{ url = "https://files.pythonhosted.org/packages/04/78/0acd37ca84ce3ddffaa92ef0f571e073faa6d8ff1f0559ab1272188ea2be/psutil-7.2.2-cp36-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:b58fabe35e80b264a4e3bb23e6b96f9e45a3df7fb7eed419ac0e5947c61e47cc", size = 148266, upload-time = "2026-01-28T18:15:31.597Z" },
{ url = "https://files.pythonhosted.org/packages/b4/90/e2159492b5426be0c1fef7acba807a03511f97c5f86b3caeda6ad92351a7/psutil-7.2.2-cp37-abi3-win_amd64.whl", hash = "sha256:eb7e81434c8d223ec4a219b5fc1c47d0417b12be7ea866e24fb5ad6e84b3d988", size = 137737, upload-time = "2026-01-28T18:15:33.849Z" },
{ url = "https://files.pythonhosted.org/packages/8c/c7/7bb2e321574b10df20cbde462a94e2b71d05f9bbda251ef27d104668306a/psutil-7.2.2-cp37-abi3-win_arm64.whl", hash = "sha256:8c233660f575a5a89e6d4cb65d9f938126312bca76d8fe087b947b3a1aaac9ee", size = 134617, upload-time = "2026-01-28T18:15:36.514Z" },
]
[[package]] [[package]]
name = "pycparser" name = "pycparser"
version = "3.0" version = "3.0"