feat(@projects/@claire): ✨ add blocker status checks in scheduler
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
7138724b88
commit
c78ca31fa8
3 changed files with 240 additions and 5 deletions
|
|
@ -9,6 +9,7 @@ ranked ordering. This makes the logic trivially unit-testable.
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
from collections.abc import Iterable
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
|
@ -19,6 +20,37 @@ from .domain import ProjectStatus, Session, Task, TaskStatus
|
|||
from .hlc import HLC
|
||||
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def unfinished_blockers(conn: sqlite3.Connection, task: Task) -> list[Task]:
|
||||
"""Return the tasks in `task.blocked_by` that are NOT yet `done`.
|
||||
|
||||
A task is "effectively blocked" iff this list is non-empty. A blocker
|
||||
id that resolves to no existing task is treated as satisfied (it does
|
||||
NOT block) so a stale/deleted id can't permanently wedge a task — such
|
||||
ids are logged at WARNING level.
|
||||
"""
|
||||
unfinished: list[Task] = []
|
||||
for blocker_id in task.blocked_by:
|
||||
blocker = read.get_task(conn, blocker_id)
|
||||
if blocker is None:
|
||||
_log.warning(
|
||||
"task %s lists blocker %s which no longer exists — "
|
||||
"treating as satisfied",
|
||||
task.id, blocker_id,
|
||||
)
|
||||
continue
|
||||
if blocker.status != TaskStatus.DONE:
|
||||
unfinished.append(blocker)
|
||||
return unfinished
|
||||
|
||||
|
||||
def is_blocked(conn: sqlite3.Connection, task: Task) -> bool:
|
||||
"""True iff `task` has at least one unfinished blocker."""
|
||||
return bool(unfinished_blockers(conn, task))
|
||||
|
||||
|
||||
def rank_open_tasks(tasks: Iterable[Task]) -> list[Task]:
|
||||
"""Order tasks by (priority asc, created_hlc asc).
|
||||
|
||||
|
|
@ -285,9 +317,12 @@ def suggest_assignments(
|
|||
assigned_task_ids = {a.task_id for a in active}
|
||||
assigned_session_ids = {a.session_uuid for a in active}
|
||||
|
||||
unassigned_tasks = rank_open_tasks(
|
||||
[t for t in open_tasks if t.id not in assigned_task_ids]
|
||||
)
|
||||
# Exclude effectively-blocked tasks: a task with an unfinished blocker
|
||||
# is not workable, so Clare must not propose it for pairing.
|
||||
unassigned_tasks = rank_open_tasks([
|
||||
t for t in open_tasks
|
||||
if t.id not in assigned_task_ids and not is_blocked(conn, t)
|
||||
])
|
||||
free_sessions = sorted(
|
||||
[s for s in read.list_sessions(conn) if s.uuid not in assigned_session_ids],
|
||||
key=lambda s: (-session_attention_score(s), s.host, str(s.uuid)),
|
||||
|
|
|
|||
|
|
@ -1326,6 +1326,20 @@ def dispatch_task(
|
|||
if task.status in {TaskStatus.DONE}:
|
||||
return DispatchResult(False, str(task_id), "task already done")
|
||||
|
||||
from .. import scheduler as _sched
|
||||
|
||||
# Dependency gate — checked FIRST: being blocked is intrinsic to the
|
||||
# task, unlike the transient budget/host gates below, so it's the
|
||||
# truest reason to surface. Refuse a task whose `blocked_by` list
|
||||
# still has an unfinished blocker — its prerequisites aren't met.
|
||||
blockers = _sched.unfinished_blockers(conn, task)
|
||||
if blockers:
|
||||
titles = ", ".join(b.title for b in blockers)
|
||||
return DispatchResult(
|
||||
False, str(task_id),
|
||||
f"blocked by {len(blockers)} unfinished task(s): {titles}",
|
||||
)
|
||||
|
||||
# Budget gate.
|
||||
bs = budget_status(conn)
|
||||
if bs["over_cap"]:
|
||||
|
|
@ -1341,7 +1355,6 @@ def dispatch_task(
|
|||
|
||||
# Host-load gate.
|
||||
cfg = load_or_init()
|
||||
from .. import scheduler as _sched
|
||||
if not _sched.host_has_capacity(conn, host, per_host_max=cfg.limits.per_host_max):
|
||||
return DispatchResult(
|
||||
False, str(task_id),
|
||||
|
|
|
|||
|
|
@ -10,11 +10,15 @@ the projection + read layer.
|
|||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import uuid as _uuid
|
||||
from dataclasses import dataclass
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
|
||||
from claire import events as ev
|
||||
from claire import read
|
||||
from claire import read, scheduler
|
||||
from claire.domain import TaskStatus
|
||||
from claire.hlc import HLCGenerator
|
||||
from claire.web import service
|
||||
|
||||
|
|
@ -200,3 +204,186 @@ def test_list_tasks_returns_blocked_by(
|
|||
assert by_id[task.id].blocked_by == [blocker.id]
|
||||
# A task with no blockers projects to an empty list.
|
||||
assert by_id[blocker.id].blocked_by == []
|
||||
|
||||
|
||||
# --- blocked-check helper --------------------------------------------------
|
||||
|
||||
|
||||
def test_unfinished_blockers_excludes_done(
|
||||
conn: sqlite3.Connection, gen: HLCGenerator
|
||||
) -> None:
|
||||
service.create_project(conn, gen, name="alpha")
|
||||
task = service.add_task(conn, gen, project="alpha", title="ship it")
|
||||
b_open = service.add_task(conn, gen, project="alpha", title="open blocker")
|
||||
b_done = service.add_task(conn, gen, project="alpha", title="done blocker")
|
||||
service.transition_task_state(
|
||||
conn, gen, task_id=b_done.id, to_state=TaskStatus.DONE
|
||||
)
|
||||
service.set_task_blockers(
|
||||
conn, gen, task_ref=str(task.id),
|
||||
blocker_refs=[str(b_open.id), str(b_done.id)],
|
||||
)
|
||||
|
||||
fresh = read.get_task(conn, task.id)
|
||||
assert fresh is not None
|
||||
unfinished = scheduler.unfinished_blockers(conn, fresh)
|
||||
|
||||
# Only the still-open blocker counts.
|
||||
assert [b.id for b in unfinished] == [b_open.id]
|
||||
assert scheduler.is_blocked(conn, fresh) is True
|
||||
|
||||
|
||||
def test_unfinished_blockers_nonexistent_does_not_block(
|
||||
conn: sqlite3.Connection, gen: HLCGenerator
|
||||
) -> None:
|
||||
"""A blocker id with no matching task is treated as satisfied."""
|
||||
service.create_project(conn, gen, name="alpha")
|
||||
task = service.add_task(conn, gen, project="alpha", title="ship it")
|
||||
blocker = service.add_task(conn, gen, project="alpha", title="prereq")
|
||||
service.set_task_blockers(
|
||||
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
|
||||
)
|
||||
# Hard-delete the blocker row directly so the id is now stale. The
|
||||
# service layer never deletes tasks, so this simulates a stale id.
|
||||
conn.execute("DELETE FROM tasks WHERE id = ?", (str(blocker.id),))
|
||||
|
||||
fresh = read.get_task(conn, task.id)
|
||||
assert fresh is not None
|
||||
assert scheduler.unfinished_blockers(conn, fresh) == []
|
||||
assert scheduler.is_blocked(conn, fresh) is False
|
||||
|
||||
|
||||
# --- dispatch enforcement --------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class _FakeSessionRow:
|
||||
host: str
|
||||
uuid: UUID
|
||||
cwd: str
|
||||
mtime_epoch: int
|
||||
snippet: str = ""
|
||||
|
||||
|
||||
class _FakeRclaude:
|
||||
"""spawn() records the call and makes a new session discoverable."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._rows: list[_FakeSessionRow] = []
|
||||
self.spawn_calls: list[dict] = []
|
||||
self.send_calls: list[dict] = []
|
||||
|
||||
def list_sessions(self) -> list[_FakeSessionRow]:
|
||||
return list(self._rows)
|
||||
|
||||
def list_tmux(self) -> list:
|
||||
return []
|
||||
|
||||
def spawn(
|
||||
self, *, host: str, cwd: str,
|
||||
mcp_config: str | None = None, name: str | None = None,
|
||||
) -> str:
|
||||
self.spawn_calls.append(
|
||||
{"host": host, "cwd": cwd, "mcp_config": mcp_config, "name": name}
|
||||
)
|
||||
self._rows.append(_FakeSessionRow(
|
||||
host=host, uuid=_uuid.uuid4(), cwd=cwd, mtime_epoch=999,
|
||||
))
|
||||
return f"claude-tester-{len(self.spawn_calls)}"
|
||||
|
||||
def send(self, *, text: str, match: str, yes: bool = False, dry_run: bool = False): # noqa: ARG002
|
||||
self.send_calls.append({"text": text, "match": match, "yes": yes})
|
||||
return None
|
||||
|
||||
|
||||
def _no_mcp_stager(host: str) -> str | None: # noqa: ARG001
|
||||
return None
|
||||
|
||||
|
||||
def test_dispatch_refuses_blocked_task(
|
||||
conn: sqlite3.Connection, gen: HLCGenerator
|
||||
) -> None:
|
||||
service.create_project(conn, gen, name="alpha")
|
||||
task = service.add_task(conn, gen, project="alpha", title="ship it")
|
||||
blocker = service.add_task(conn, gen, project="alpha", title="prereq work")
|
||||
service.set_task_blockers(
|
||||
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
|
||||
)
|
||||
rcl = _FakeRclaude()
|
||||
|
||||
result = service.dispatch_task(
|
||||
conn, gen, task_id=task.id, host="plum", cwd="/work",
|
||||
rclaude=rcl, discover_timeout_s=2, mcp_stager=_no_mcp_stager,
|
||||
)
|
||||
|
||||
assert result.dispatched is False
|
||||
assert "blocked by 1 unfinished task" in result.reason
|
||||
assert "prereq work" in result.reason
|
||||
assert rcl.spawn_calls == [] # never spawned
|
||||
|
||||
|
||||
def test_dispatch_allows_task_once_blockers_done(
|
||||
conn: sqlite3.Connection, gen: HLCGenerator
|
||||
) -> None:
|
||||
service.create_project(conn, gen, name="alpha")
|
||||
task = service.add_task(conn, gen, project="alpha", title="ship it")
|
||||
blocker = service.add_task(conn, gen, project="alpha", title="prereq work")
|
||||
service.set_task_blockers(
|
||||
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
|
||||
)
|
||||
# Complete the blocker.
|
||||
service.transition_task_state(
|
||||
conn, gen, task_id=blocker.id, to_state=TaskStatus.DONE
|
||||
)
|
||||
rcl = _FakeRclaude()
|
||||
|
||||
result = service.dispatch_task(
|
||||
conn, gen, task_id=task.id, host="plum", cwd="/work",
|
||||
rclaude=rcl, discover_timeout_s=2, mcp_stager=_no_mcp_stager,
|
||||
)
|
||||
|
||||
assert result.dispatched is True
|
||||
assert result.reason == "ok"
|
||||
assert len(rcl.spawn_calls) == 1
|
||||
|
||||
|
||||
# --- scheduler enforcement -------------------------------------------------
|
||||
|
||||
|
||||
def _alive_session(conn: sqlite3.Connection, gen: HLCGenerator, host: str) -> UUID:
|
||||
sid = _uuid.uuid4()
|
||||
ev.append(conn, gen, ev.SessionObserved(session_uuid=sid, host=host, cwd="/w"))
|
||||
conn.execute("UPDATE sessions SET liveness = 'alive' WHERE uuid = ?", (str(sid),))
|
||||
return sid
|
||||
|
||||
|
||||
def test_suggest_assignments_excludes_blocked_task(
|
||||
conn: sqlite3.Connection, gen: HLCGenerator
|
||||
) -> None:
|
||||
service.create_project(conn, gen, name="alpha")
|
||||
task = service.add_task(conn, gen, project="alpha", title="ship it")
|
||||
blocker = service.add_task(conn, gen, project="alpha", title="prereq work")
|
||||
service.set_task_blockers(
|
||||
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
|
||||
)
|
||||
_alive_session(conn, gen, "plum")
|
||||
|
||||
# While the blocker is open, the blocked task must not be paired or
|
||||
# surfaced as a remaining (workable) task. The blocker itself is open
|
||||
# and unassigned, so it CAN be surfaced.
|
||||
out = scheduler.suggest_assignments(conn)
|
||||
surfaced = {p["task_id"] for p in out["pairings"]} | {
|
||||
t["id"] for t in out["remaining_tasks"]
|
||||
}
|
||||
assert str(task.id) not in surfaced
|
||||
assert str(blocker.id) in surfaced
|
||||
|
||||
# Once the blocker is done, the previously-blocked task becomes eligible.
|
||||
service.transition_task_state(
|
||||
conn, gen, task_id=blocker.id, to_state=TaskStatus.DONE
|
||||
)
|
||||
out2 = scheduler.suggest_assignments(conn)
|
||||
surfaced2 = {p["task_id"] for p in out2["pairings"]} | {
|
||||
t["id"] for t in out2["remaining_tasks"]
|
||||
}
|
||||
assert str(task.id) in surfaced2
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue