diff --git a/src/clare/config.py b/src/clare/config.py index cd3fdac..60df837 100644 --- a/src/clare/config.py +++ b/src/clare/config.py @@ -54,6 +54,11 @@ class OrchestratorConfig(_Strict): # the fleet, identifies blockers, and surfaces next-step recommendations # — closing the loop on "reduce required user interaction." 0 disables. rounds_interval_s: int = Field(default=0, ge=0, le=86400) + # When False (default), Clare's rounds loop only *proposes* dispatch — + # spawning a session is a real external action, so it waits for the + # user. Set True to let Clare dispatch eligible work autonomously + # (still gated by the budget envelope + per-host caps). + autonomous_dispatch: bool = False class BudgetConfig(_Strict): @@ -155,6 +160,8 @@ def _serialize(cfg: ClareConfig) -> str: lines.append(f"reply_timeout_s = {orch.reply_timeout_s}") if orch.rounds_interval_s != 0: lines.append(f"rounds_interval_s = {orch.rounds_interval_s}") + if orch.autonomous_dispatch: + lines.append("autonomous_dispatch = true") # Emit [budget] / [limits] only when non-default. bud = cfg.budget if bud.daily_token_cap != 0 or bud.low_priority_floor != 0.8: diff --git a/src/clare/orchestrator/mcp_server.py b/src/clare/orchestrator/mcp_server.py index 481b4be..1b7606c 100644 --- a/src/clare/orchestrator/mcp_server.py +++ b/src/clare/orchestrator/mcp_server.py @@ -479,6 +479,21 @@ def build_server() -> FastMCP: lambda c, _g: tools.fleet_load(c), ) + @mcp.tool() + async def dispatch_task(task_id: str, host: str, cwd: str) -> dict[str, Any]: + """Spawn a Claude session to work a task on `host` at `cwd`. + + Gated: refuses (with a `reason`) when over the daily token cap, + past the low-priority floor for non-P0/P1 work, or the host is at + its parallelism cap. Check `budget_status` + `fleet_load` first. + On success, creates an Assignment linking the task to the new + session. Spawning a session is a real action — only call this + when the user has opted into autonomous dispatch or confirmed.""" + return await _call_tool( + "dispatch_task", {"task_id": task_id, "host": host, "cwd": cwd}, + lambda c, g: tools.dispatch_task(c, g, task_id=task_id, host=host, cwd=cwd), + ) + # Send tool ------------------------------------------------------------- @mcp.tool() async def send_to_session(session_ref: str, text: str) -> dict[str, Any]: diff --git a/src/clare/orchestrator/tools.py b/src/clare/orchestrator/tools.py index cff200f..f89d5ef 100644 --- a/src/clare/orchestrator/tools.py +++ b/src/clare/orchestrator/tools.py @@ -240,6 +240,7 @@ def help_text() -> dict[str, Any]: {"name": "set_project_status", "summary": "active/paused/done/archived — archive hides stale projects."}, {"name": "budget_status", "summary": "Today's token burn vs daily cap — check before low-priority dispatch."}, {"name": "fleet_load", "summary": "Live sessions per host vs cap + rate headroom — check before dispatch."}, + {"name": "dispatch_task", "summary": "Spawn a session for a task (gated by budget + host caps)."}, ], } @@ -404,6 +405,35 @@ def suggest_assignments(conn: sqlite3.Connection) -> dict[str, Any]: return scheduler.suggest_assignments(conn, per_host_max=cap) +def dispatch_task( + conn: sqlite3.Connection, + gen: HLCGenerator, + *, + task_id: str, + host: str, + cwd: str, +) -> dict[str, Any]: + """Spawn a Claude session for a task, gated by budget + host caps. + + Refuses (with a reason) when over the daily token cap, past the + low-priority floor for a non-P0/P1 task, or the host is at its + parallelism cap. On success spawns via rclaude, discovers the new + session, and creates the Assignment. + """ + result = _wrap_service( + service.dispatch_task, + conn=conn, gen=gen, task_id=UUID(task_id), host=host, cwd=cwd, + ) + return { + "dispatched": result.dispatched, + "task_id": result.task_id, + "reason": result.reason, + "session_uuid": result.session_uuid, + "host": result.host, + "assignment_id": result.assignment_id, + } + + def fleet_load(conn: sqlite3.Connection) -> dict[str, Any]: """Live-session count per host vs the per-host cap + rate headroom. @@ -784,6 +814,7 @@ __all__ = [ "create_tag", "get_session", "budget_status", + "dispatch_task", "fleet_load", "list_projects", "set_project_org", diff --git a/src/clare/web/app.py b/src/clare/web/app.py index 69cd72e..3cdad30 100644 --- a/src/clare/web/app.py +++ b/src/clare/web/app.py @@ -156,8 +156,22 @@ def create_app( " • `set_project_status name_or_id= status=archived` for " "anything that's clearly a sync/demo/test artifact — never " "delete, just archive.\n" - "5. ALWAYS `report_status` with your own one-line summary.\n" - "6. ALWAYS Write the formatted output below to " + "5. CONSIDER DISPATCH — call `budget_status` and " + "`fleet_load`. If there is unassigned open work, pick the " + "top-priority task that fits the envelope (under the daily " + "token cap; P0/P1 only if past the low-priority floor; a " + "host with capacity). " + + ( + "Autonomous dispatch is ON — `dispatch_task(task_id, " + "host, cwd)` it directly.\n" + if cfg.orchestrator.autonomous_dispatch + else "Autonomous dispatch is OFF — do NOT dispatch; " + "instead surface the single best dispatch in NEEDS YOU " + "as a proposal (task, host, cwd, why) for the user to " + "approve.\n" + ) + + "6. ALWAYS `report_status` with your own one-line summary.\n" + "7. ALWAYS Write the formatted output below to " "`.claude/plans/clare-mobile-app-plan.md` in your cwd. This " "file IS the user's HUD when they connect via " "/remote-control clare from claude.ai/code or mobile.\n" diff --git a/src/clare/web/service.py b/src/clare/web/service.py index 4e82ef7..7ebbcde 100644 --- a/src/clare/web/service.py +++ b/src/clare/web/service.py @@ -990,3 +990,103 @@ def budget_status(conn: sqlite3.Connection, *, day: str | None = None) -> dict: "by_source": rollup["by_source"], "calls": rollup["calls"], } + + +# --------------------------------------------------------------------------- +# Dispatch — the rate/load-aware engine actually ACTS on work. Spawns a +# Claude session for a task, gated by the budget envelope + host caps. +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class DispatchResult: + dispatched: bool + task_id: str + reason: str # why refused, or "ok" + session_uuid: str | None = None # the spawned session, when dispatched + host: str | None = None + assignment_id: str | None = None + + +def dispatch_task( + conn: sqlite3.Connection, + gen: HLCGenerator, + *, + task_id: UUID, + host: str, + cwd: str, + rclaude: Rclaude | None = None, + discover_timeout_s: int = 30, +) -> DispatchResult: + """Spawn a Claude session for `task_id` on `host` at `cwd`, gated by + the budget envelope and per-host load cap. + + Refuses (dispatched=False, reason explains) when: the task is closed, + the daily token cap is hit, the task is low-priority and we're past + the low-priority floor, or the host is at its parallelism cap. On + success: spawns via rclaude, discovers the new session uuid, creates + the Assignment, returns it. + """ + from ..config import load_or_init + from ..orchestrator.bootstrap import discover_session + + task = read.get_task(conn, task_id) + if task is None: + raise NotFound(f"no such task: {task_id}") + if task.status in {TaskStatus.DONE}: + return DispatchResult(False, str(task_id), "task already done") + + # Budget gate. + bs = budget_status(conn) + if bs["over_cap"]: + return DispatchResult( + False, str(task_id), + f"daily token cap reached ({bs['used_tokens']}/{bs['daily_token_cap']}) — dispatch refused", + ) + if bs["over_low_priority_floor"] and task.priority > 1: + return DispatchResult( + False, str(task_id), + f"past low-priority floor ({bs['fraction_used']:.0%} of cap) — only P0/P1 dispatch allowed", + ) + + # Host-load gate. + cfg = load_or_init() + from .. import scheduler as _sched + if not _sched.host_has_capacity(conn, host, per_host_max=cfg.limits.per_host_max): + return DispatchResult( + False, str(task_id), + f"host {host!r} at capacity ({cfg.limits.per_host_max} live sessions)", + ) + + # Spawn + discover. + rcl = rclaude or Rclaude() + try: + pre_rows = rcl.list_sessions() + except RclaudeError: + pre_rows = [] + pre_uuids = { + str(r.uuid) for r in pre_rows + if r.host == host and r.cwd and r.cwd.rstrip("/") == cwd.rstrip("/") + } + try: + rcl.spawn(host=host, cwd=cwd) + except RclaudeError as exc: + return DispatchResult(False, str(task_id), f"rclaude spawn failed: {exc}") + + new_uuid = discover_session( + cwd=cwd, host=host, rclaude=rcl, + timeout_s=discover_timeout_s, ignore_uuids=pre_uuids, + ) + if new_uuid is None: + return DispatchResult( + False, str(task_id), + "session spawned but not discovered within timeout", + ) + + assignment = create_assignment( + conn, gen, task_id=task_id, session_uuid=UUID(new_uuid), + ) + return DispatchResult( + dispatched=True, task_id=str(task_id), reason="ok", + session_uuid=new_uuid, host=host, assignment_id=str(assignment.id), + ) diff --git a/tests/test_dispatch.py b/tests/test_dispatch.py new file mode 100644 index 0000000..d097b04 --- /dev/null +++ b/tests/test_dispatch.py @@ -0,0 +1,191 @@ +"""Tests for the rate/load-gated dispatch engine.""" + +from __future__ import annotations + +import uuid as _uuid +from dataclasses import dataclass +from uuid import UUID + +import pytest + +from clare import events as ev +from clare.db import migrate, open_db +from clare.hlc import HLCGenerator +from clare import scheduler +from clare.web import service + + +def _setup() -> tuple: + conn = open_db(":memory:") + migrate(conn) + return conn, HLCGenerator("test-machine") + + +# --- host load / caps ------------------------------------------------------ + + +def _alive_session(conn, gen, host: str, cwd: str) -> UUID: + sid = _uuid.uuid4() + ev.append(conn, gen, ev.SessionObserved(session_uuid=sid, host=host, cwd=cwd)) + conn.execute("UPDATE sessions SET liveness = 'alive' WHERE uuid = ?", (str(sid),)) + return sid + + +def test_host_load_counts_only_alive() -> None: + conn, gen = _setup() + _alive_session(conn, gen, "apricot", "/a") + _alive_session(conn, gen, "apricot", "/b") + # An observed-but-not-alive session must not count. + dead = _uuid.uuid4() + ev.append(conn, gen, ev.SessionObserved(session_uuid=dead, host="apricot", cwd="/c")) + load = scheduler.host_load(conn) + assert load.get("apricot") == 2 + + +def test_host_has_capacity() -> None: + conn, gen = _setup() + for i in range(3): + _alive_session(conn, gen, "apricot", f"/p{i}") + assert scheduler.host_has_capacity(conn, "apricot", per_host_max=3) is False + assert scheduler.host_has_capacity(conn, "apricot", per_host_max=4) is True + assert scheduler.host_has_capacity(conn, "plum", per_host_max=3) is True + + +# --- fake rclaude for dispatch --------------------------------------------- + + +@dataclass +class _FakeSessionRow: + host: str + uuid: UUID + cwd: str + mtime_epoch: int + snippet: str = "" + + +class _FakeRclaude: + """spawn() records the call and makes a new session discoverable.""" + + def __init__(self, *, spawn_ok: bool = True) -> None: + self._rows: list[_FakeSessionRow] = [] + self.spawn_calls: list[dict] = [] + self._spawn_ok = spawn_ok + + def list_sessions(self) -> list[_FakeSessionRow]: + return list(self._rows) + + def list_tmux(self) -> list: + return [] + + def spawn(self, *, host: str, cwd: str, mcp_config: str | None = None) -> str: + from clare.rclaude import RclaudeError + if not self._spawn_ok: + raise RclaudeError("spawn failed") + self.spawn_calls.append({"host": host, "cwd": cwd}) + self._rows.append(_FakeSessionRow( + host=host, uuid=_uuid.uuid4(), cwd=cwd, mtime_epoch=999, + )) + return f"claude-tester-{len(self.spawn_calls)}" + + +def _project_with_task(conn, gen, *, priority: int = 2): + proj_id = _uuid.uuid4() + task_id = _uuid.uuid4() + ev.append(conn, gen, ev.ProjectCreated(project_id=proj_id, name="p")) + ev.append(conn, gen, ev.TaskAdded( + task_id=task_id, project_id=proj_id, title="t", priority=priority, + )) + return task_id + + +def test_dispatch_success() -> None: + conn, gen = _setup() + task_id = _project_with_task(conn, gen) + rcl = _FakeRclaude() + result = service.dispatch_task( + conn, gen, task_id=task_id, host="plum", cwd="/work", + rclaude=rcl, discover_timeout_s=2, + ) + assert result.dispatched is True + assert result.reason == "ok" + assert result.session_uuid is not None + assert result.assignment_id is not None + assert len(rcl.spawn_calls) == 1 + # Assignment row exists. + rows = conn.execute( + "SELECT COUNT(*) FROM assignments WHERE task_id = ?", (str(task_id),) + ).fetchone() + assert rows[0] == 1 + + +def test_dispatch_refused_host_at_cap() -> None: + conn, gen = _setup() + task_id = _project_with_task(conn, gen) + # Fill plum to the default cap of 3. + for i in range(3): + _alive_session(conn, gen, "plum", f"/filler{i}") + rcl = _FakeRclaude() + result = service.dispatch_task( + conn, gen, task_id=task_id, host="plum", cwd="/work", + rclaude=rcl, discover_timeout_s=2, + ) + assert result.dispatched is False + assert "capacity" in result.reason + assert rcl.spawn_calls == [] # never spawned + + +def test_dispatch_refused_over_budget(monkeypatch, tmp_path) -> None: + # Point config at an isolated file with a tiny daily cap. + cfg_path = tmp_path / "config" / "clare" / "clare.toml" + monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "config")) + cfg_path.parent.mkdir(parents=True, exist_ok=True) + cfg_path.write_text( + 'machine_id = "m"\n\n[web]\nhost = "127.0.0.1"\nport = 8765\n' + "\n[budget]\ndaily_token_cap = 100\nlow_priority_floor = 0.8\n", + encoding="utf-8", + ) + conn, gen = _setup() + task_id = _project_with_task(conn, gen, priority=3) + # Burn past the cap. + service.record_usage(conn, gen, source="nl", model="haiku", + input_tokens=120, output_tokens=10) + rcl = _FakeRclaude() + result = service.dispatch_task( + conn, gen, task_id=task_id, host="plum", cwd="/work", + rclaude=rcl, discover_timeout_s=2, + ) + assert result.dispatched is False + assert "cap" in result.reason.lower() + assert rcl.spawn_calls == [] + + +def test_dispatch_low_priority_floor(monkeypatch, tmp_path) -> None: + monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "config")) + cfg_path = tmp_path / "config" / "clare" / "clare.toml" + cfg_path.parent.mkdir(parents=True, exist_ok=True) + cfg_path.write_text( + 'machine_id = "m"\n\n[web]\nhost = "127.0.0.1"\nport = 8765\n' + "\n[budget]\ndaily_token_cap = 1000\nlow_priority_floor = 0.5\n", + encoding="utf-8", + ) + conn, gen = _setup() + # 600/1000 used = 60% — past the 50% floor. + service.record_usage(conn, gen, source="nl", model="haiku", + input_tokens=600, output_tokens=0) + rcl = _FakeRclaude() + # A P3 task is refused past the floor... + low = _project_with_task(conn, gen, priority=3) + r_low = service.dispatch_task(conn, gen, task_id=low, host="plum", + cwd="/w", rclaude=rcl, discover_timeout_s=2) + assert r_low.dispatched is False + assert "low-priority" in r_low.reason + # ...but a P0 task still goes through. + proj_id = _uuid.uuid4() + p0 = _uuid.uuid4() + ev.append(conn, gen, ev.TaskAdded(task_id=p0, project_id=conn.execute( + "SELECT project_id FROM tasks LIMIT 1").fetchone()[0] and + UUID(conn.execute("SELECT project_id FROM tasks LIMIT 1").fetchone()[0]), + title="urgent", priority=0)) + r_p0 = service.dispatch_task(conn, gen, task_id=p0, host="plum", + cwd="/w", rclaude=rcl, discover_timeout_s=2) + assert r_p0.dispatched is True diff --git a/tests/test_usage.py b/tests/test_usage.py new file mode 100644 index 0000000..04d4155 --- /dev/null +++ b/tests/test_usage.py @@ -0,0 +1,69 @@ +"""Tests for migration 0007_usage — token-budget accounting.""" + +from __future__ import annotations + +from clare import events as ev +from clare.db import migrate, open_db +from clare.hlc import HLCGenerator +from clare.web import service + + +def _setup() -> tuple: + conn = open_db(":memory:") + migrate(conn) + return conn, HLCGenerator("test-machine") + + +def test_usage_recorded_projects() -> None: + conn, gen = _setup() + ev.append(conn, gen, ev.UsageRecorded( + source="nl", model="haiku", + input_tokens=120, output_tokens=40, cache_read_tokens=2000, + )) + row = conn.execute("SELECT * FROM usage").fetchone() + assert row["source"] == "nl" + assert row["input_tokens"] == 120 + assert row["output_tokens"] == 40 + assert row["cache_read_tokens"] == 2000 + assert len(row["day"]) == 10 # YYYY-MM-DD + + +def test_record_usage_service() -> None: + conn, gen = _setup() + service.record_usage(conn, gen, source="triage", model="haiku", + input_tokens=50, output_tokens=10) + service.record_usage(conn, gen, source="nl", model="haiku", + input_tokens=80, output_tokens=20) + from clare import read + roll = read.usage_rollup(conn) + assert roll["total_tokens"] == 160 + assert roll["calls"] == 2 + sources = {s["source"] for s in roll["by_source"]} + assert sources == {"triage", "nl"} + + +def test_budget_status_no_cap() -> None: + conn, gen = _setup() + service.record_usage(conn, gen, source="nl", model="haiku", + input_tokens=100, output_tokens=100) + b = service.budget_status(conn) + assert b["used_tokens"] == 200 + assert b["daily_token_cap"] == 0 # default — unlimited + assert b["over_cap"] is False + assert b["over_low_priority_floor"] is False + + +def test_usage_replay_roundtrip() -> None: + conn, gen = _setup() + for i in range(5): + service.record_usage(conn, gen, source="nl", model="haiku", + input_tokens=10 * i, output_tokens=i) + before = conn.execute( + "SELECT COALESCE(SUM(input_tokens+output_tokens),0) FROM usage" + ).fetchone()[0] + ev.replay(conn) + after = conn.execute( + "SELECT COALESCE(SUM(input_tokens+output_tokens),0) FROM usage" + ).fetchone()[0] + assert before == after + assert conn.execute("SELECT COUNT(*) FROM usage").fetchone()[0] == 5