#!/usr/bin/env python3 """rclaude triage helper — Haiku-powered per-session summary + prioritization. Iterates ~/.claude/projects/, builds a compressed transcript per session, and runs them through claude-code-batch-sdk (Haiku) with content-addressable disk caching. Sessions whose mtime + transcript fingerprint haven't changed are served from cache. Output (TSV, one row per session, sorted by priority desc / mtime desc): \\t\\t\\t\\t\\t\\t Env tuning: RCLAUDE_TRIAGE_MODEL Claude model (default: haiku) RCLAUDE_TRIAGE_CONCURRENT Concurrent claude subprocesses (default: 4) RCLAUDE_TRIAGE_BATCH Sessions per CLI call (default: 8) RCLAUDE_TRIAGE_LIMIT Max sessions to consider (default: 100) RCLAUDE_TRIAGE_CTX_BYTES Bytes of transcript per session (default: 3000) CLI flags: --limit N override session cap --uuids U... restrict to specific session UUIDs (prefix match ok) --refresh bypass cache for selected sessions """ from __future__ import annotations import argparse import asyncio import json import os import re import signal import sys from pathlib import Path # Allow piped output to truncate without traceback. signal.signal(signal.SIGPIPE, signal.SIG_DFL) try: from claude_code_batch_sdk import ( ClaudeClient, GenerationItem, ResponseCache, run_batched, ) except ImportError as e: print( f"_claude-triage: claude-code-batch-sdk not installed for this python ({sys.executable}): {e}", file=sys.stderr, ) sys.exit(2) ROOT = Path.home() / ".claude" / "projects" CACHE_DIR = Path.home() / ".claude" / ".cache" / "rclaude-triage" MODEL = os.environ.get("RCLAUDE_TRIAGE_MODEL", "haiku") MAX_CONCURRENT = int(os.environ.get("RCLAUDE_TRIAGE_CONCURRENT", "4")) BATCH_SIZE = int(os.environ.get("RCLAUDE_TRIAGE_BATCH", "8")) CTX_PER_SESSION = int(os.environ.get("RCLAUDE_TRIAGE_CTX_BYTES", "3000")) SYSTEM_PREFIXES = ( "", "", "", "", "", "[task-persistence]", "[tts-state]", "This session is being continued", "Please continue", "", "[Request interrupted", ) def is_system_user(text: str) -> bool: s = (text or "").lstrip() return not s or s.startswith(SYSTEM_PREFIXES) def get_text(entry: dict) -> str: msg = entry.get("message") or {} content = msg.get("content") if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for block in content: if not isinstance(block, dict): continue t = block.get("type") if t == "text": parts.append(block.get("text", "")) elif t == "tool_use": parts.append(f"[tool:{block.get('name','?')}]") return " ".join(parts) return "" def read_cwd_only(jsonl: Path) -> str: """Cheap version of compress_session that stops at the first cwd field.""" try: with jsonl.open(encoding="utf-8", errors="replace") as f: for line in f: try: entry = json.loads(line) except json.JSONDecodeError: continue if entry.get("cwd"): return entry["cwd"] except OSError: return "" return "" def compress_session(jsonl: Path) -> tuple[str, str, str]: """Return (cwd, first_user_text, transcript_excerpt).""" first_user = "" cwd = "" turns: list[str] = [] try: with jsonl.open(encoding="utf-8", errors="replace") as f: for line in f: try: entry = json.loads(line) except json.JSONDecodeError: continue if not cwd and entry.get("cwd"): cwd = entry["cwd"] role = (entry.get("message") or {}).get("role") or entry.get("type") if role not in ("user", "assistant"): continue text = get_text(entry) if not text.strip(): continue if role == "user" and is_system_user(text): continue flat = re.sub(r"\s+", " ", text).strip() if role == "user" and not first_user: first_user = flat[:400] turns.append(f"[{role}] {flat[:300]}") except OSError: return ("", "", "") # Keep recent turns; head-truncate to budget. transcript = "\n".join(turns[-20:])[-CTX_PER_SESSION:] return (cwd, first_user, transcript) SYSTEM_PROMPT = """You triage Claude Code coding sessions. For each session in the input batch, emit one JSON object with these fields: - ref_index: integer matching the input's ref_index - summary: ONE short sentence describing what's happening - status: one of done, in_progress, blocked, waiting_on_user, abandoned - priority: integer 0-4 (0 = critical to resume now, 4 = abandonable). Follows the P0/P1 incident convention — LOWER number means HIGHER priority. - next_action: ONE short imperative phrase, or empty string if status is done/abandoned Output ONLY a JSON array. No markdown, no prose.""" def build_prompt(batch: list[GenerationItem]) -> str: parts = ["Triage these sessions. Respond with JSON array.\n"] for i, item in enumerate(batch): m = item.metadata parts.append(f"\n=== ref_index={i} ===") parts.append(f"INITIAL REQUEST: {m['first_user']}") parts.append("RECENT TRANSCRIPT:") parts.append(m["transcript"]) parts.append( '\n\nReply with: [{"ref_index": 0, "summary": "...", ' '"status": "...", "priority": N, "next_action": "..."}, ...]' ) return "\n".join(parts) def validate(result: dict) -> bool: if not isinstance(result, dict): return False if not all(k in result for k in ("summary", "status", "priority", "next_action")): return False try: int(result["priority"]) except (TypeError, ValueError): return False return True def enrich(result: dict, item: GenerationItem) -> dict: return { **result, "uuid": item.metadata["uuid"], "cwd": item.metadata["cwd"], "mtime": item.metadata["mtime"], } def collect_candidates(limit: int) -> list[tuple[int, Path]]: if not ROOT.is_dir(): return [] candidates: list[tuple[int, Path]] = [] for project_dir in ROOT.iterdir(): if not project_dir.is_dir(): continue for jsonl in project_dir.glob("*.jsonl"): try: mtime = int(jsonl.stat().st_mtime) except OSError: continue candidates.append((mtime, jsonl)) candidates.sort(key=lambda x: x[0], reverse=True) return candidates[:limit] async def main_async(args: argparse.Namespace) -> None: candidates = collect_candidates(args.limit) if args.uuids: wanted = list(args.uuids) candidates = [ (m, j) for (m, j) in candidates if any(j.stem == u or j.stem.startswith(u) for u in wanted) ] if not candidates: return # Cache key is (uuid, mtime) only — mtime is sufficient since a session's # content can't change without its mtime advancing, and this lets us check # the cache BEFORE reading the JSONL (the expensive part for a triage run # over hundreds of sessions where most are unchanged). cache = ResponseCache(CACHE_DIR) # v2: priority scale flipped to P0=critical / P4=abandonable convention. template_id = "rclaude-triage-v2" cached_results: list[dict] = [] uncached: list[tuple[int, Path, str]] = [] for mtime, jsonl in candidates: cache_key = f"{jsonl.stem}:{mtime}" hit = None if args.refresh else cache.get(template_id, cache_key) if hit is not None: # The SDK cache stores the raw LLM response (priority/status/ # summary/next_action); enrichment (uuid/cwd/mtime) is added at # read time by the SDK. Since we bypass run_batched for cached # items, re-enrich here. cwd needs a tiny disk read (first cwd # line of the jsonl) but no LLM call. cwd = hit.get("cwd") or read_cwd_only(jsonl) if not cwd: # Couldn't determine cwd → skip; rclaude can't resume without it. continue cached_results.append({ **hit, "uuid": hit.get("uuid") or jsonl.stem, "mtime": hit.get("mtime") or mtime, "cwd": cwd, }) continue uncached.append((mtime, jsonl, cache_key)) if args.refresh: for _, _, ck in uncached: key_hash = cache._hash_key(template_id, ck) path = cache._cache_path(template_id, key_hash) if path.exists(): try: path.unlink() except OSError: pass items: list[GenerationItem] = [] for mtime, jsonl, cache_key in uncached: cwd, first_user, transcript = compress_session(jsonl) if not cwd: continue items.append( GenerationItem( template_id=template_id, cache_key=cache_key, metadata={ "uuid": jsonl.stem, "cwd": cwd, "mtime": mtime, "first_user": first_user, "transcript": transcript, }, ) ) results = list(cached_results) if items: client = ClaudeClient(model=MODEL, max_concurrent=MAX_CONCURRENT) try: new_results = await run_batched( client=client, cache=cache, items=items, system_prompt=SYSTEM_PROMPT, build_batch_prompt=build_prompt, validate_result=validate, enrich_result=enrich, index_key="ref_index", batch_size=BATCH_SIZE, description="rclaude-triage", ) finally: await client.close() results.extend(new_results) # Lower priority number = higher importance (P0 convention). Sort # ascending by priority, descending by mtime so the most-recent within # each priority bucket floats up. def sort_key(r: dict) -> tuple[int, int]: try: prio = int(r.get("priority", 9)) except (TypeError, ValueError): prio = 9 return (prio, -int(r.get("mtime", 0))) for r in sorted(results, key=sort_key): line = "\t".join( [ str(r.get("mtime", 0)), str(r.get("uuid", "")), str(r.get("cwd", "")), str(r.get("priority", 0)), str(r.get("status", "")), re.sub(r"\s+", " ", str(r.get("summary", "")))[:200], re.sub(r"\s+", " ", str(r.get("next_action", "")))[:200], ] ) print(line) print(f"# cache {cache.stats_summary()}", file=sys.stderr) def main() -> None: ap = argparse.ArgumentParser(description="Triage Claude Code sessions with Haiku.") ap.add_argument( "--limit", type=int, default=int(os.environ.get("RCLAUDE_TRIAGE_LIMIT", "100")), help="Max sessions to consider (default: 100, env: RCLAUDE_TRIAGE_LIMIT)", ) ap.add_argument("--uuids", nargs="*", help="Restrict to specific UUIDs (prefix ok)") ap.add_argument("--refresh", action="store_true", help="Bypass cache") args = ap.parse_args() asyncio.run(main_async(args)) if __name__ == "__main__": main()