diff --git a/Sources/TVAnarchy/TVAnarchyApp.swift b/Sources/TVAnarchy/TVAnarchyApp.swift index b2c6e7e..e879643 100644 --- a/Sources/TVAnarchy/TVAnarchyApp.swift +++ b/Sources/TVAnarchy/TVAnarchyApp.swift @@ -1,7 +1,14 @@ import SwiftUI +import TVAnarchyCore @main struct TVAnarchyApp: App { + init() { + // Close the MLX seam: titles the regex parser can't extract go to the + // local model (cached, self-disabling when MLX is absent). + FilenameParser.refiner = LocalLLMTitleRefiner() + } + var body: some Scene { WindowGroup { RootView() diff --git a/Sources/TVAnarchyCore/Metadata/LocalLLMTitleRefiner.swift b/Sources/TVAnarchyCore/Metadata/LocalLLMTitleRefiner.swift new file mode 100644 index 0000000..696d90d --- /dev/null +++ b/Sources/TVAnarchyCore/Metadata/LocalLLMTitleRefiner.swift @@ -0,0 +1,101 @@ +import Foundation + +/// `TitleRefiner` backed by the local MLX model on plum (recommender's +/// `title_refiner.py`) — closes the seam declared in `FilenameParser`. Shells +/// into the recommender like `LocalLLMGrouper`; returns `nil` on any failure so +/// the regex path always stands alone. +/// +/// The refiner sits on the synchronous parse path, so two guards keep the messy +/// tail from stalling a scan: +/// - **Result cache** (`~/.local/state/tv-anarchy/title-refinements.json`): +/// each distinct messy filename pays for the model exactly once, ever. +/// - **Session kill-switch**: consecutive subprocess failures (no `uv`, no MLX, +/// no model) disable the refiner for the rest of the process — one scan never +/// pays the failure timeout per file. +public struct LocalLLMTitleRefiner: TitleRefiner { + public init() {} + + public func refineTitle(from filename: String) -> String? { + if let cached = Self.store.cached(filename) { return cached.isEmpty ? nil : cached } + guard Self.store.healthy else { return nil } + + let dir = RepoPaths.recommender.path + let cmd = "cd \(Self.shq(dir)) && uv run python -m media_rec.title_refiner \(Self.shq(filename))" + let r = ProcessRunner.runShell(cmd, timeout: 90, cwd: dir) + guard r.ok, + let data = r.stdout.trimmingCharacters(in: .whitespacesAndNewlines).data(using: .utf8), + let decoded = try? JSONDecoder().decode(Refined.self, from: data) else { + if !r.ok { Log.warn("title refiner failed (exit \(r.status)): \(r.stderr.suffix(160))") } + Self.store.recordFailure() + return nil + } + Self.store.recordSuccess() + let title = decoded.title.trimmingCharacters(in: .whitespaces) + // Cache empties too — "the model has no answer" is also worth remembering. + Self.store.remember(filename, title: title) + return title.isEmpty ? nil : title + } + + private struct Refined: Decodable { let title: String } + private static func shq(_ s: String) -> String { "'" + s.replacingOccurrences(of: "'", with: "'\\''") + "'" } + + private static let store = RefinementStore() +} + +/// Thread-safe cache + health tracking for the refiner. A class with one lock — +/// the refiner is consulted from concurrent scan work. +final class RefinementStore: @unchecked Sendable { + private let lock = NSLock() + private var cache: [String: String] + private var consecutiveFailures = 0 + private var dirty = false + + /// After this many subprocess failures in a row, stop trying this session. + private static let maxFailures = 2 + private static var url: URL { + FileManager.default.homeDirectoryForCurrentUser + .appendingPathComponent(".local/state/tv-anarchy/title-refinements.json") + } + + init() { + if let data = try? Data(contentsOf: Self.url), + let map = try? JSONDecoder().decode([String: String].self, from: data) { + cache = map + } else { + cache = [:] + } + } + + var healthy: Bool { + lock.lock(); defer { lock.unlock() } + return consecutiveFailures < Self.maxFailures + } + + func cached(_ filename: String) -> String? { + lock.lock(); defer { lock.unlock() } + return cache[filename] + } + + func recordFailure() { + lock.lock(); defer { lock.unlock() } + consecutiveFailures += 1 + } + + func recordSuccess() { + lock.lock(); defer { lock.unlock() } + consecutiveFailures = 0 + } + + func remember(_ filename: String, title: String) { + lock.lock() + cache[filename] = title + dirty = true + let snapshot = cache + lock.unlock() + // Persist outside the lock; last-writer-wins is fine for a cache. + guard let data = try? JSONEncoder().encode(snapshot) else { return } + try? FileManager.default.createDirectory(at: Self.url.deletingLastPathComponent(), + withIntermediateDirectories: true) + try? data.write(to: Self.url, options: .atomic) + } +} diff --git a/docs/operations.md b/docs/operations.md index aecc1b3..9ab7c40 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -58,6 +58,7 @@ destination** (`TVANARCHY_DEST` overrides everywhere): | Ubuntu (classic Linux) | `TVAnarchy--linux-.tar.gz` | `/opt/tv-anarchy` when `/opt` is writable, else `~/.local/opt/tv-anarchy` | | Bluefin (immutable/ostree Linux) | same as Linux | always `~/.local/opt/tv-anarchy` (`/usr` is read-only; detected via `/run/ostree-booted`) | | Windows (Git Bash/MSYS) | `TVAnarchy--windows-.zip` | `%LOCALAPPDATA%\Programs\TVAnarchy` (per-user, no elevation) | +| Android (Termux) | `TVAnarchy--android.apk` (all ABIs, no arch suffix) | APK → `~/storage/downloads`, handed to the system package installer via `termux-open` (user confirms the prompt — shells can't install packages on Android by design). Detected before generic Linux (Termux's `uname` says Linux). Version = handoff stamp, since Termux can't query installed packages | | iOS | — | not via this script (no shell): build the `TVAnarchyiOS` scheme in Xcode onto the device, or TestFlight/sideload | Version compare: macOS reads the bundle plist; Linux/Windows read the diff --git a/governor/package.json b/governor/package.json index b6df5d5..3cd30b5 100644 --- a/governor/package.json +++ b/governor/package.json @@ -10,7 +10,8 @@ }, "scripts": { "start": "bun run src/index.ts", - "typecheck": "tsc --noEmit" + "typecheck": "tsc --noEmit", + "test": "bun test" }, "dependencies": {}, "devDependencies": { diff --git a/governor/src/fleet/cli.ts b/governor/src/fleet/cli.ts new file mode 100644 index 0000000..e13adf7 --- /dev/null +++ b/governor/src/fleet/cli.ts @@ -0,0 +1,227 @@ +// `portable-net-tv fleet ` — the human/scripting surface of the fleet +// engine. Read-only by default; the single mutating path is `reaper --apply`, +// which performs only safe idempotent transmission nudges (reannounce/verify). +// Re-pins and mesh recoveries are PRINTED as plans — actuating a cross-host +// copy is the transfer queue's job, not a side effect of a status command. + +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { homedir } from "node:os"; +import { dirname, join } from "node:path"; +import { loadRegistry, FLEET_PATH, DEVICES_PATH } from "./registry.ts"; +import { assignDuties, diffDuties } from "./duties.ts"; +import { floorCheck } from "./custody.ts"; +import { classify, planRecovery } from "./reaper.ts"; +import { peersFor } from "./peers.ts"; +import { + fetchHoldings, fetchLivePeers, fetchVitals, staticHoldings, torrentAction, transmissionHost, +} from "./transmission.ts"; +import type { Duty, Holding } from "./types.ts"; +import { log } from "../log.ts"; + +const STATE_PATH = join(homedir(), ".local", "state", "tv-anarchy", "fleet-state.json"); + +interface FleetState { + /** Last duty assignment, for change-triggered logging. hostId → duties. */ + duties: Record; +} + +function loadFleetState(): FleetState { + try { + return JSON.parse(readFileSync(STATE_PATH, "utf8")) as FleetState; + } catch { + return { duties: {} }; + } +} + +function saveFleetState(state: FleetState): void { + const dir = dirname(STATE_PATH); + if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); + writeFileSync(STATE_PATH, JSON.stringify(state, null, 2)); +} + +/** Holdings across the fleet: the transmission host's torrents + static mirrors. */ +function gatherHoldings(): { holdings: Holding[]; txHostId: string | null } { + const reg = loadRegistry(); + const tx = transmissionHost(reg.hosts); + let holdings: Holding[] = staticHoldings(reg.staticHoldings); + if (tx) { + try { + holdings = [...fetchHoldings(tx), ...holdings]; + } catch (err) { + log.warn(`transmission unreachable on ${tx.id}: ${err instanceof Error ? err.message : String(err)}`); + } + } + return { holdings, txHostId: tx?.id ?? null }; +} + +function cmdStatus(json: boolean): void { + const reg = loadRegistry(); + const { duties, warnings } = assignDuties(reg.hosts); + if (json) { + log.raw(JSON.stringify({ + hosts: reg.hosts, + duties: Object.fromEntries(duties), + sources: reg.sources, + floorCopies: reg.floorCopies, + warnings: [...reg.warnings, ...warnings], + }, null, 2)); + return; + } + log.raw(`registry: ${DEVICES_PATH}`); + log.raw(`fleet: ${FLEET_PATH}${existsSync(FLEET_PATH) ? "" : " (absent — defaults in effect)"}`); + log.raw(""); + for (const h of reg.hosts) { + const d = duties.get(h.id) ?? []; + log.raw(`${h.id} [${h.class}] ${h.alwaysOn ? "always-on" : "intermittent"} via ${h.reachable}`); + log.raw(` duties: ${d.length > 0 ? d.join(", ") : "(none)"}`); + log.raw(` api: ${h.api}${h.addr ? ` addr: ${h.addr}` : ""}`); + } + log.raw(""); + log.raw(`sources: ${reg.sources.map(s => `${s.id}(${s.kind}: ${s.sharePolicy}/${s.swarmIsolation})`).join(", ")}`); + log.raw(`floor: ${reg.floorCopies} copies`); + for (const w of [...reg.warnings, ...warnings]) log.raw(`⚠ ${w}`); +} + +function cmdDuties(json: boolean): void { + const reg = loadRegistry(); + const { duties, warnings } = assignDuties(reg.hosts); + const state = loadFleetState(); + const changes = diffDuties(new Map(Object.entries(state.duties)), duties); + if (json) { + log.raw(JSON.stringify({ duties: Object.fromEntries(duties), changes, warnings }, null, 2)); + } else { + for (const [id, d] of duties) log.raw(`${id}: ${d.length > 0 ? d.join(", ") : "(none)"}`); + for (const c of changes) log.raw(`Δ ${c}`); + for (const w of warnings) log.raw(`⚠ ${w}`); + } + state.duties = Object.fromEntries(duties); + saveFleetState(state); +} + +function cmdCustody(json: boolean): void { + const reg = loadRegistry(); + const { holdings } = gatherHoldings(); + if (holdings.length === 0) { + log.raw("no holdings visible (transmission unreachable and no static holdings)"); + process.exitCode = 1; + return; + } + const reports = floorCheck(holdings, reg.hosts, reg.floorCopies); + const breaches = reports.filter(r => r.breach); + if (json) { + log.raw(JSON.stringify({ reports }, null, 2)); + return; + } + log.raw(`${reports.length} title(s), ${breaches.length} floor breach(es) (floor=${reg.floorCopies})`); + for (const r of reports) { + if (!r.breach && r.actions.length === 0 && r.warnings.length === 0) continue; + log.raw(""); + log.raw(`${r.breach ? "✗" : "•"} ${r.title} (${r.completeCopies}/${r.floorCopies} copies; custodians: ${r.custodians.join(", ") || "none"})`); + for (const a of r.actions) log.raw(` → ${a.kind}: ${a.toHost}${a.fromHost ? ` ⇐ ${a.fromHost}` : ""} — ${a.reason}`); + for (const w of r.warnings) log.raw(` ⚠ ${w}`); + } +} + +function cmdReaper(apply: boolean, json: boolean): void { + const reg = loadRegistry(); + const tx = transmissionHost(reg.hosts); + if (!tx) { + log.error("no host with api=transmission_rpc in the registry"); + process.exit(2); + } + let vitals; + try { + vitals = fetchVitals(tx); + } catch (err) { + log.error(`transmission unreachable: ${err instanceof Error ? err.message : String(err)}`); + process.exit(1); + } + const now = Math.floor(Date.now() / 1000); + const { holdings } = gatherHoldings(); + const verdicts = planRecovery(vitals.map(v => classify(v, now)), holdings, tx.id); + const sick = verdicts.filter(v => v.health !== "healthy"); + if (json) { + log.raw(JSON.stringify({ verdicts }, null, 2)); + } else { + log.raw(`${verdicts.length} torrent(s): ` + + `${verdicts.filter(v => v.health === "healthy").length} healthy, ` + + `${verdicts.filter(v => v.health === "stalled").length} stalled, ` + + `${verdicts.filter(v => v.health === "dead").length} dead`); + for (const v of sick) { + log.raw(""); + log.raw(`${v.health === "dead" ? "✗" : "~"} [${v.health}] ${v.name} — ${v.reason}`); + if (v.recovery) log.raw(` → ${v.recovery.kind}: ${v.recovery.detail}`); + } + } + if (apply) { + let nudged = 0; + for (const v of sick) { + if (v.infohash === null) continue; + // Only the safe ops run automatically; mesh_recover/research stay plans. + if (v.recovery?.kind === "reannounce" && torrentAction(tx, v.infohash, "reannounce")) nudged++; + if (v.health === "dead" && v.recovery?.kind === "research" && torrentAction(tx, v.infohash, "verify")) nudged++; + } + log.raw(""); + log.raw(`applied ${nudged} safe nudge(s) (reannounce/verify); re-pins and re-search stay manual`); + } +} + +function cmdPeers(query: string, json: boolean): void { + const reg = loadRegistry(); + const { holdings } = gatherHoldings(); + // Accept an infohash or a title; resolve a title to its hash via holdings. + const isHash = /^[0-9a-fA-F]{40}$/.test(query) || /^[0-9a-fA-F]{64}$/.test(query); + const match = isHash + ? holdings.find(h => h.infohash?.toLowerCase() === query.toLowerCase()) + : holdings.find(h => h.title.toLowerCase().includes(query.toLowerCase())); + const infohash = isHash ? query.toLowerCase() : match?.infohash ?? null; + if (infohash === null && !match) { + log.error(`nothing in fleet holdings matches "${query}"`); + process.exit(1); + } + const tx = transmissionHost(reg.hosts); + let livePeers: { address: string; port: number }[] = []; + if (tx && infohash) { + try { + livePeers = fetchLivePeers(tx, infohash); + } catch { + log.warn("live swarm peers unavailable (transmission unreachable)"); + } + } + const peers = peersFor({ + infohash: infohash ?? "", + title: match?.title ?? null, + sources: reg.sources, + hosts: reg.hosts, + holdings, + livePeers, + }); + if (json) { + log.raw(JSON.stringify({ title: match?.title ?? query, infohash, peers }, null, 2)); + return; + } + log.raw(`peers_for(${match?.title ?? query}) → ${peers.length} peer(s)`); + for (const p of peers) { + log.raw(` ${p.addr} [${p.sourceKind}/${p.sourceId}] via ${p.servedVia}`); + } +} + +export function runFleetCommand(args: string[]): void { + const [sub, ...rest] = args; + const json = rest.includes("--json") || args.includes("--json"); + switch (sub) { + case "status": cmdStatus(json); break; + case "duties": cmdDuties(json); break; + case "custody": cmdCustody(json); break; + case "reaper": cmdReaper(rest.includes("--apply"), json); break; + case "peers": { + const query = rest.filter(a => !a.startsWith("--"))[0]; + if (!query) { log.error("usage: portable-net-tv fleet peers [--json]"); process.exit(2); } + cmdPeers(query, json); + break; + } + default: + log.error("usage: portable-net-tv fleet > [--json]"); + process.exit(2); + } +} diff --git a/governor/src/fleet/peers.ts b/governor/src/fleet/peers.ts new file mode 100644 index 0000000..b709f09 --- /dev/null +++ b/governor/src/fleet/peers.ts @@ -0,0 +1,123 @@ +// Peer-source model + peers_for — the user-owned meta-tracker. +// +// peers_for(infohash) unions every PERMITTED source: fleet hosts ∪ seedbox ∪ +// DHT/public live peers (∪ private/friend sources once those stages land). +// Every peer is provenance-tagged so a UI can show WHY it exists and over +// what transport. +// +// The two policy gates are the load-bearing part (a private .torrent embeds +// the source user's passkey — a friend announcing it gets the SOURCE banned): +// 1. share_policy gates the registry merge — private_tracker defaults to +// search_only and contributes NO peers until explicitly flipped. +// 2. swarm_isolation is FORCED to f2f_only for private sources, un-overridably: +// private-sourced bytes are only ever served via the fleet's own WireGuard +// swarm, never announced back to the tracker. + +import type { + FleetHost, Holding, Peer, ServedVia, Source, SourceKind, +} from "./types.ts"; + +type RawSource = Partial & { id: string; kind: SourceKind }; + +/** Apply the policy gates. This is the only constructor for Source values. */ +export function normalizeSource(raw: RawSource): Source { + const isPrivate = raw.kind === "private_tracker"; + // Gate 2: f2f_only is forced for private sources — an explicit "open" in the + // config is overridden, not honored. Everything else defaults open. + const swarmIsolation = isPrivate ? "f2f_only" : (raw.swarmIsolation ?? "open"); + // Gate 1: private defaults closed (search_only); flipping to content is a + // deliberate, consequence-explicit user action recorded in fleet.json. + const sharePolicy = raw.sharePolicy ?? (isPrivate ? "search_only" : "content"); + return { + id: raw.id, + kind: raw.kind, + sharePolicy, + swarmIsolation, + label: raw.label ?? raw.id, + }; +} + +function servedVia(host: FleetHost, source: Source): ServedVia { + // f2f_only content may ONLY travel the fleet's own wireguard swarm. + if (source.swarmIsolation === "f2f_only") return "wireguard"; + return host.reachable === "wireguard" ? "wireguard" : "public"; +} + +/** A live peer as transmission reports it (torrent-get `peers` field). */ +export interface LivePeer { + address: string; + port: number; +} + +export interface PeersForInput { + infohash: string; + /** Display title for name-keyed holdings (static holdings carry no hash). */ + title: string | null; + sources: Source[]; + hosts: FleetHost[]; + /** Fleet-wide holdings (transmission + static). */ + holdings: Holding[]; + /** Live swarm peers from the transmission host, if reachable. */ + livePeers: LivePeer[]; +} + +/** Provenance priority for dedup: a fleet copy beats a random DHT address. */ +const KIND_PRIORITY: Record = { + fleet_host: 0, + seedbox: 1, + friend_mesh: 2, + private_tracker: 3, + public_tracker: 4, + dht: 5, +}; + +export function peersFor(input: PeersForInput): Peer[] { + const { infohash, title, sources, hosts, holdings, livePeers } = input; + const byId = new Map(hosts.map(h => [h.id, h])); + const out: Peer[] = []; + + const matches = (h: Holding): boolean => + h.complete && (h.infohash !== null ? h.infohash === infohash : title !== null && h.title === title); + + // Fleet + seedbox holders. The implicit fleet source needs no configuration — + // your own hosts are always a permitted source of your own content. + for (const holding of holdings.filter(matches)) { + const host = byId.get(holding.hostId); + if (!host || host.addr === null) continue; + const kind: SourceKind = host.class === "seedbox" ? "seedbox" : "fleet_host"; + const source = sources.find(s => s.kind === kind) + ?? normalizeSource({ id: kind, kind, label: kind === "seedbox" ? "seedbox" : "fleet" }); + if (source.sharePolicy !== "content") continue; // search_only never serves bytes + out.push({ + addr: host.addr, + sourceKind: kind, + sourceId: source.id, + servedVia: servedVia(host, source), + }); + } + + // DHT/public live peers — only when a dht/public source permits content. + const publicSource = sources.find(s => + (s.kind === "dht" || s.kind === "public_tracker") && s.sharePolicy === "content"); + if (publicSource) { + for (const p of livePeers) { + out.push({ + addr: `${p.address}:${p.port}`, + sourceKind: publicSource.kind, + sourceId: publicSource.id, + servedVia: "public", + }); + } + } + + // Dedup by address, keeping the best provenance (fleet > seedbox > … > dht). + const best = new Map(); + for (const p of out) { + const existing = best.get(p.addr); + if (!existing || KIND_PRIORITY[p.sourceKind] < KIND_PRIORITY[existing.sourceKind]) { + best.set(p.addr, p); + } + } + return [...best.values()].sort((a, b) => + KIND_PRIORITY[a.sourceKind] - KIND_PRIORITY[b.sourceKind] || a.addr.localeCompare(b.addr)); +} diff --git a/governor/src/fleet/reaper.ts b/governor/src/fleet/reaper.ts new file mode 100644 index 0000000..3af7726 --- /dev/null +++ b/governor/src/fleet/reaper.ts @@ -0,0 +1,80 @@ +// Zombie reaper — classify every torrent healthy | stalled | dead and propose +// recovery. The mesh prevents FUTURE zombies (custody floor); the reaper deals +// with the already-dying: recover from the mesh first, public re-search as the +// fallback. Classification is pure; actuation lives in the CLI/daemon. + +import type { Holding, ReaperVerdict, TorrentVitals } from "./types.ts"; + +/** No piece activity for this long while incomplete and peerless → stalled. */ +export const STALL_AFTER_S = 30 * 60; +/** No piece activity for this long while incomplete and peerless → dead. */ +export const DEAD_AFTER_S = 72 * 3600; + +export function classify(t: TorrentVitals, nowEpochS: number): ReaperVerdict { + const base = { name: t.name, infohash: t.infohash }; + + // A complete copy can't be a zombie — it IS the seed. (Stopped-but-complete + // is a policy choice, not a swarm death.) + if (t.percentDone >= 1) { + return { ...base, health: "healthy", reason: "complete", recovery: null }; + } + if (t.error !== 0) { + return { + ...base, health: "dead", + reason: `client error: ${t.errorString || `code ${t.error}`}`, + recovery: { kind: "research", detail: "tracker/client error — re-source a fresh release" }, + }; + } + if (t.rateDownloadBps > 0) { + return { ...base, health: "healthy", reason: "downloading", recovery: null }; + } + if (t.peersConnected > 0) { + // Connected but not moving — choked or endgame; give it a reannounce nudge. + return { + ...base, health: "stalled", reason: "peers connected but no transfer", + recovery: { kind: "reannounce", detail: "connected swarm not sending — reannounce" }, + }; + } + + // Incomplete, no peers, no transfer: age of last activity decides. A torrent + // that has NEVER had activity ages from when it was added. + const lastAlive = t.activityDate > 0 ? t.activityDate : t.addedDate; + const idleS = Math.max(0, nowEpochS - lastAlive); + if (idleS >= DEAD_AFTER_S) { + return { + ...base, health: "dead", + reason: `no peers and no activity for ${Math.round(idleS / 3600)}h`, + recovery: { kind: "research", detail: "swarm is a corpse — recover from mesh or re-search" }, + }; + } + if (idleS >= STALL_AFTER_S) { + return { + ...base, health: "stalled", + reason: `no peers for ${Math.round(idleS / 60)}min`, + recovery: { kind: "reannounce", detail: "quiet swarm — reannounce before declaring it dead" }, + }; + } + return { ...base, health: "healthy", reason: "recently active", recovery: null }; +} + +/** + * Upgrade dead-torrent recovery to mesh-first: when another fleet host holds + * the title complete, the fix is a fleet copy, not a public re-search. + * `holdings` spans the whole fleet (transmission + static holdings). + */ +export function planRecovery(verdicts: ReaperVerdict[], holdings: Holding[], localHostId: string): ReaperVerdict[] { + return verdicts.map(v => { + if (v.health !== "dead") return v; + const meshHolder = holdings.find(h => + h.complete && h.hostId !== localHostId + && (h.infohash !== null && v.infohash !== null ? h.infohash === v.infohash : h.title === v.name)); + if (!meshHolder) return v; + return { + ...v, + recovery: { + kind: "mesh_recover", + detail: `complete copy on ${meshHolder.hostId} — pull over the fleet, skip the public swarm`, + }, + }; + }); +} diff --git a/governor/src/fleet/transmission.ts b/governor/src/fleet/transmission.ts new file mode 100644 index 0000000..a344d5c --- /dev/null +++ b/governor/src/fleet/transmission.ts @@ -0,0 +1,132 @@ +// Minimal transmission view for the fleet engine. Runs the JSON-RPC on the +// transmission host itself over ssh (daemon listens on localhost:9091 there, +// not on the mesh), same approach as mcp/src/transmission/client.ts — kept +// separate because governor and mcp are independent packages by design. +// +// Endpoint selection: the registry's transmission host (api=transmission_rpc) +// supplies the ssh destination; candidates come from its endpoints with the +// usual LAN-before-overlay ordering already baked into devices.json. + +import { spawnSync } from "node:child_process"; +import type { Holding, TorrentVitals } from "./types.ts"; +import type { FleetHost } from "./types.ts"; +import { log } from "../log.ts"; + +const RPC = "localhost:9091"; +const SSH_OPTS = ["-o", "ConnectTimeout=8", "-o", "BatchMode=yes"]; + +function ssh(dest: string, cmd: string): { ok: boolean; out: string } { + const r = spawnSync("ssh", [...SSH_OPTS, dest, cmd], { encoding: "utf8", timeout: 45_000 }); + const out = ((r.stdout ?? "") + (r.stderr ?? "")).trim(); + return { ok: r.status === 0, out }; +} + +const VITALS_FIELDS = [ + "id", "name", "hashString", "percentDone", "status", "error", "errorString", + "peersConnected", "rateDownload", "rateUpload", "activityDate", "addedDate", + "doneDate", "sizeWhenDone", "downloadDir", +]; + +interface RpcTorrent { + id: number; + name: string; + hashString: string; + percentDone: number; + status: number; + error: number; + errorString: string; + peersConnected: number; + rateDownload: number; + rateUpload: number; + activityDate: number; + addedDate: number; + doneDate: number; + sizeWhenDone: number; + downloadDir: string; +} + +function rpc(host: FleetHost, args: Record, method = "torrent-get"): T[] { + if (!host.ssh) throw new Error(`transmission host ${host.id} has no ssh destination`); + const url = `http://${RPC}/transmission/rpc`; + const payload = JSON.stringify({ method, arguments: args }).replace(/'/g, "'\\''"); + // One round-trip: prime for the session id, then replay it on the real call. + const remote = + `SID=$(curl -s -D - -o /dev/null ${url} | ` + + `awk 'tolower($1)=="x-transmission-session-id:"{gsub(/\\r/,"");print $2}'); ` + + `curl -s -H "X-Transmission-Session-Id: $SID" ${url} -d '${payload}'`; + const { ok, out } = ssh(host.ssh, remote); + if (!ok) throw new Error(`transmission rpc on ${host.id} failed: ${out.slice(0, 200)}`); + let parsed: { arguments?: { torrents?: T[] } }; + try { + parsed = JSON.parse(out); + } catch { + throw new Error(`transmission rpc returned non-JSON: ${out.slice(0, 200)}`); + } + return parsed.arguments?.torrents ?? []; +} + +/** The registry's torrent host (first with a transmission API), or null. */ +export function transmissionHost(hosts: FleetHost[]): FleetHost | null { + return hosts.find(h => h.api === "transmission_rpc") ?? null; +} + +export function fetchVitals(host: FleetHost): TorrentVitals[] { + return rpc(host, { fields: VITALS_FIELDS }).map(t => ({ + name: t.name, + infohash: t.hashString || null, + percentDone: t.percentDone, + status: t.status, + error: t.error, + errorString: t.errorString, + peersConnected: t.peersConnected, + rateDownloadBps: t.rateDownload, + rateUploadBps: t.rateUpload, + activityDate: t.activityDate, + addedDate: t.addedDate, + })); +} + +/** Holdings as transmission sees them: every torrent is a copy on that host. */ +export function fetchHoldings(host: FleetHost): Holding[] { + return rpc(host, { fields: VITALS_FIELDS }).map(t => ({ + hostId: host.id, + title: t.name, + infohash: t.hashString || null, + complete: t.percentDone >= 1, + completedAt: t.doneDate, + sizeBytes: t.sizeWhenDone || null, + })); +} + +export interface LiveSwarmPeer { address: string; port: number } + +/** Live swarm peers for one torrent (by infohash) — feeds peers_for's DHT leg. */ +export function fetchLivePeers(host: FleetHost, infohash: string): LiveSwarmPeer[] { + const rows = rpc<{ peers?: LiveSwarmPeer[] }>(host, { ids: [infohash], fields: ["peers"] }); + return rows[0]?.peers ?? []; +} + +/** Safe, idempotent per-torrent nudges used by `fleet reaper --apply`. */ +export function torrentAction(host: FleetHost, infohash: string, action: "reannounce" | "verify" | "start"): boolean { + const method = action === "reannounce" ? "torrent-reannounce" + : action === "verify" ? "torrent-verify" + : "torrent-start"; + try { + rpc(host, { ids: [infohash] }, method); + return true; + } catch (err) { + log.warn(`${method} failed for ${infohash}: ${err instanceof Error ? err.message : String(err)}`); + return false; + } +} + +/** Static holdings from fleet.json → Holding records (no hash, name-keyed). */ +export function staticHoldings(map: Record): Holding[] { + const out: Holding[] = []; + for (const [hostId, titles] of Object.entries(map)) { + for (const title of titles) { + out.push({ hostId, title, infohash: null, complete: true, completedAt: 0, sizeBytes: null }); + } + } + return out; +} diff --git a/governor/src/index.ts b/governor/src/index.ts index 4b5b35c..66e873a 100644 --- a/governor/src/index.ts +++ b/governor/src/index.ts @@ -13,6 +13,7 @@ import { openPlaylist } from "./vlc.ts"; import { paths } from "./paths.ts"; import { log } from "./log.ts"; import { runGovernor } from "./governor.ts"; +import { runFleetCommand } from "./fleet/cli.ts"; import { addJob, loadState as loadGovernorState, saveState as saveGovernorState, setManualPause, setJobPaused } from "./jobs.ts"; const HELP = `portable-net-tv — buffered media streamer @@ -37,6 +38,16 @@ Usage: portable-net-tv pause-jobs set manual-pause flag (governor suspends all jobs) portable-net-tv resume-jobs clear manual-pause flag + --- fleet (registry / duties / custody / reaper / peers) --- + portable-net-tv fleet status registry + duty assignment + warnings + portable-net-tv fleet duties assign duties; log changes since last run + portable-net-tv fleet custody floor-check every title; print re-pin plans + portable-net-tv fleet reaper [--apply] + classify torrents healthy|stalled|dead; + --apply runs safe nudges (reannounce/verify) + portable-net-tv fleet peers peers_for(infohash|title) — provenance-tagged + (all fleet subcommands accept --json) + Profiles live at ${paths.profilesDir}/.json State lives at ${paths.stateDir}/.json `; @@ -229,6 +240,11 @@ switch (cmd) { break; } + case "fleet": { + runFleetCommand(rest); + break; + } + case "pause-job": { const id = requireArg(rest[0], "usage: portable-net-tv pause-job "); const matched = setJobPaused(id, true); diff --git a/governor/test/fleet/custody.test.ts b/governor/test/fleet/custody.test.ts new file mode 100644 index 0000000..3ac3d59 --- /dev/null +++ b/governor/test/fleet/custody.test.ts @@ -0,0 +1,106 @@ +import { describe, expect, test } from "bun:test"; +import { custodiansOf, floorCheck, floorForTitle } from "../../src/fleet/custody.ts"; +import type { FleetHost, Holding } from "../../src/fleet/types.ts"; + +function host(over: Partial & { id: string }): FleetHost { + return { + name: over.id, + class: "server", + reachable: "wireguard", + alwaysOn: true, + onHomeIp: true, + api: "none", + addr: null, + ssh: null, + capacity: { diskFreeBytes: null, upBwKbs: null, uptimeScore: 1 }, + ...over, + }; +} + +function holding(over: Partial & { hostId: string; title: string }): Holding { + return { infohash: null, complete: true, completedAt: 0, sizeBytes: null, ...over }; +} + +const fleet: FleetHost[] = [ + host({ id: "black", capacity: { diskFreeBytes: 500e9, upBwKbs: null, uptimeScore: 1 } }), + host({ id: "apricot", capacity: { diskFreeBytes: 200e9, upBwKbs: null, uptimeScore: 1 } }), + host({ id: "plum", class: "roamer", alwaysOn: false }), + host({ id: "phone", class: "consumer", alwaysOn: false }), +]; + +describe("floorForTitle", () => { + test("healthy floor: N most-recent always-on holders become custodians", () => { + const r = floorForTitle("Show A", [ + holding({ hostId: "black", title: "Show A", completedAt: 100 }), + holding({ hostId: "apricot", title: "Show A", completedAt: 200 }), + ], fleet, 2); + expect(r.breach).toBe(false); + expect(r.custodians).toEqual(["apricot", "black"]); + expect(r.actions).toEqual([]); + }); + + test("breach: one copy with floor 2 → re-pin to the best eligible non-holder", () => { + const r = floorForTitle("Show B", [ + holding({ hostId: "black", title: "Show B", completedAt: 100 }), + ], fleet, 2); + expect(r.breach).toBe(true); + expect(r.actions).toHaveLength(1); + const action = r.actions[0]!; + expect(action.kind).toBe("repin"); + expect(action.toHost).toBe("apricot"); // always-on, most disk among non-holders + expect(action.fromHost).toBe("black"); // replicate before the last copy dies + }); + + test("zero surviving copies → re-pin plan flags that the reaper must re-source", () => { + const r = floorForTitle("Show C", [ + holding({ hostId: "plum", title: "Show C", complete: false }), + ], fleet, 2); + expect(r.completeCopies).toBe(0); + expect(r.breach).toBe(true); + expect(r.actions.every(a => a.fromHost === null)).toBe(true); + expect(r.actions[0]!.reason).toContain("re-source"); + }); + + test("a roamer's copy counts toward copies but a roamer is not obligated", () => { + const r = floorForTitle("Show D", [ + holding({ hostId: "plum", title: "Show D", completedAt: 300 }), + holding({ hostId: "black", title: "Show D", completedAt: 100 }), + ], fleet, 2); + expect(r.completeCopies).toBe(2); // plum's copy keeps the floor numerically intact + expect(r.breach).toBe(false); + expect(r.custodians).toEqual(["black"]); // …but only black is on the hook + }); + + test("disk-aware eligibility: a host without room is skipped as re-pin target", () => { + const tight = [ + host({ id: "black", capacity: { diskFreeBytes: 500e9, upBwKbs: null, uptimeScore: 1 } }), + host({ id: "small", capacity: { diskFreeBytes: 1e9, upBwKbs: null, uptimeScore: 1 } }), + ]; + const r = floorForTitle("Big", [ + holding({ hostId: "black", title: "Big", sizeBytes: 50e9, completedAt: 10 }), + ], tight, 2); + expect(r.breach).toBe(true); + expect(r.actions).toEqual([]); // small can't hold it; no fake plan + expect(r.warnings.some(w => w.includes("no eligible re-pin target"))).toBe(true); + }); +}); + +describe("floorCheck / custodiansOf", () => { + test("groups holdings per title and reports each", () => { + const reports = floorCheck([ + holding({ hostId: "black", title: "A" }), + holding({ hostId: "apricot", title: "A" }), + holding({ hostId: "black", title: "B" }), + ], fleet, 2); + expect(reports.map(r => r.title)).toEqual(["A", "B"]); + expect(reports[0]!.breach).toBe(false); + expect(reports[1]!.breach).toBe(true); + }); + + test("custodiansOf returns host records for the floor", () => { + const out = custodiansOf("A", [ + holding({ hostId: "black", title: "A", completedAt: 5 }), + ], fleet, 1); + expect(out.map(h => h.id)).toEqual(["black"]); + }); +}); diff --git a/governor/test/fleet/duties.test.ts b/governor/test/fleet/duties.test.ts new file mode 100644 index 0000000..c70460d --- /dev/null +++ b/governor/test/fleet/duties.test.ts @@ -0,0 +1,90 @@ +import { describe, expect, test } from "bun:test"; +import { assignDuties, diffDuties } from "../../src/fleet/duties.ts"; +import type { FleetHost } from "../../src/fleet/types.ts"; + +function host(over: Partial & { id: string }): FleetHost { + return { + name: over.id, + class: "server", + reachable: "home_lan", + alwaysOn: true, + onHomeIp: true, + api: "none", + addr: null, + ssh: null, + capacity: { diskFreeBytes: null, upBwKbs: null, uptimeScore: 1 }, + ...over, + }; +} + +// Natalie's reference fleet from the spec: black=server, apricot=secondary +// always-on, plum=roamer, phone=consumer. +const referenceFleet: FleetHost[] = [ + host({ id: "black", class: "server", reachable: "wireguard", api: "transmission_rpc" }), + host({ id: "apricot", class: "server", reachable: "wireguard" }), + host({ id: "plum", class: "roamer", alwaysOn: false }), + host({ id: "phone", class: "consumer", alwaysOn: false }), +]; + +describe("assignDuties", () => { + test("a consumer never receives any duty", () => { + const { duties } = assignDuties(referenceFleet); + expect(duties.get("phone")).toEqual([]); + }); + + test("no broadcast without a public_ip always-on host, with warning", () => { + const { duties, warnings } = assignDuties(referenceFleet); + for (const d of duties.values()) expect(d).not.toContain("broadcast"); + expect(warnings.some(w => w.includes("broadcast"))).toBe(true); + }); + + test("broadcast goes to exactly one host, preferring seedbox over server", () => { + const fleet = [ + ...referenceFleet.map(h => h.id === "black" ? { ...h, reachable: "public_ip" as const } : h), + host({ id: "box", class: "seedbox", reachable: "public_ip", onHomeIp: false }), + ]; + const { duties } = assignDuties(fleet); + const holders = [...duties.entries()].filter(([, d]) => d.includes("broadcast")).map(([id]) => id); + expect(holders).toEqual(["box"]); + }); + + test("f2f_relay covers always-on mesh-reachable servers, not roamers", () => { + const { duties } = assignDuties(referenceFleet); + expect(duties.get("black")).toContain("f2f_relay"); + expect(duties.get("apricot")).toContain("f2f_relay"); + expect(duties.get("plum")).not.toContain("f2f_relay"); + }); + + test("public_swarm_face prefers an off-home host; never a consumer", () => { + const fleet = [ + ...referenceFleet, + host({ id: "box", class: "seedbox", reachable: "public_ip", onHomeIp: false }), + ]; + const { duties, warnings } = assignDuties(fleet); + expect(duties.get("box")).toContain("public_swarm_face"); + expect(duties.get("black")).not.toContain("public_swarm_face"); + expect(warnings.filter(w => w.includes("home IP"))).toEqual([]); + }); + + test("on-home face is assigned only as a last resort, with an exposure warning", () => { + const { duties, warnings } = assignDuties(referenceFleet); + const faces = [...duties.entries()].filter(([, d]) => d.includes("public_swarm_face")); + expect(faces.length).toBe(1); + expect(warnings.some(w => w.includes("home IP"))).toBe(true); + }); + + test("assignment is deterministic and stable across permuted input order", () => { + const a = assignDuties(referenceFleet); + const b = assignDuties([...referenceFleet].reverse()); + expect(Object.fromEntries(a.duties)).toEqual(Object.fromEntries(b.duties)); + }); +}); + +describe("diffDuties", () => { + test("reports per-host transitions and nothing else", () => { + const prev = new Map([["black", ["f2f_relay" as const]], ["plum", []]]); + const next = new Map([["black", ["f2f_relay" as const, "broadcast" as const]], ["plum", []]]); + const lines = diffDuties(prev, next); + expect(lines).toEqual(["black: f2f_relay → broadcast,f2f_relay"]); + }); +}); diff --git a/governor/test/fleet/peers.test.ts b/governor/test/fleet/peers.test.ts new file mode 100644 index 0000000..ddbce22 --- /dev/null +++ b/governor/test/fleet/peers.test.ts @@ -0,0 +1,112 @@ +import { describe, expect, test } from "bun:test"; +import { normalizeSource, peersFor } from "../../src/fleet/peers.ts"; +import type { FleetHost, Holding, Source } from "../../src/fleet/types.ts"; + +function host(over: Partial & { id: string }): FleetHost { + return { + name: over.id, + class: "server", + reachable: "wireguard", + alwaysOn: true, + onHomeIp: true, + api: "none", + addr: over.id + ".fleet", + ssh: null, + capacity: { diskFreeBytes: null, upBwKbs: null, uptimeScore: 1 }, + ...over, + }; +} + +const HASH = "c".repeat(40); + +function holding(hostId: string, complete = true): Holding { + return { hostId, title: "Show Y", infohash: HASH, complete, completedAt: 1, sizeBytes: null }; +} + +describe("normalizeSource — the two policy gates", () => { + test("private_tracker is forced to f2f_only even when configured open", () => { + const s = normalizeSource({ id: "ptp", kind: "private_tracker", swarmIsolation: "open" }); + expect(s.swarmIsolation).toBe("f2f_only"); + }); + + test("private_tracker defaults to search_only (default-closed)", () => { + expect(normalizeSource({ id: "ptp", kind: "private_tracker" }).sharePolicy).toBe("search_only"); + }); + + test("seedbox defaults to content + open (the zero-risk source)", () => { + const s = normalizeSource({ id: "box", kind: "seedbox" }); + expect(s.sharePolicy).toBe("content"); + expect(s.swarmIsolation).toBe("open"); + }); +}); + +describe("peersFor", () => { + const fleet = [host({ id: "black" }), host({ id: "apricot" })]; + const dht: Source = normalizeSource({ id: "dht", kind: "dht" }); + + test("unions fleet holders with live DHT peers, provenance-tagged", () => { + const peers = peersFor({ + infohash: HASH, + title: "Show Y", + sources: [dht], + hosts: fleet, + holdings: [holding("black"), holding("apricot")], + livePeers: [{ address: "203.0.113.9", port: 51413 }], + }); + expect(peers.map(p => `${p.sourceKind}:${p.addr}`)).toEqual([ + "fleet_host:apricot.fleet", + "fleet_host:black.fleet", + "dht:203.0.113.9:51413", + ]); + expect(peers[0]!.servedVia).toBe("wireguard"); + expect(peers[2]!.servedVia).toBe("public"); + }); + + test("incomplete holdings contribute no peers", () => { + const peers = peersFor({ + infohash: HASH, title: "Show Y", sources: [dht], hosts: fleet, + holdings: [holding("black", false)], livePeers: [], + }); + expect(peers).toEqual([]); + }); + + test("a seedbox holder is tagged as a seedbox source", () => { + const peers = peersFor({ + infohash: HASH, title: "Show Y", sources: [dht], + hosts: [host({ id: "box", class: "seedbox", reachable: "public_ip", onHomeIp: false })], + holdings: [holding("box")], livePeers: [], + }); + expect(peers[0]!.sourceKind).toBe("seedbox"); + }); + + test("search_only private source never serves peers", () => { + // A future friend's private-sourced copy modeled as a fleet host whose + // matching source is search_only: the registry merge is gated. + const privateSource = normalizeSource({ id: "ptp", kind: "private_tracker" }); + const peers = peersFor({ + infohash: HASH, title: "Show Y", sources: [privateSource], hosts: fleet, + holdings: [], livePeers: [{ address: "198.51.100.1", port: 1 }], + }); + expect(peers).toEqual([]); // no dht source configured either — nothing leaks + }); + + test("dedup keeps the best provenance for a duplicate address", () => { + const peers = peersFor({ + infohash: HASH, title: "Show Y", sources: [dht], + hosts: [host({ id: "black", addr: "203.0.113.9" })], + holdings: [holding("black")], + livePeers: [{ address: "203.0.113.9", port: 0 }], + }); + // hosts addr "203.0.113.9" vs live "203.0.113.9:0" differ as strings; force + // the same addr to prove priority wins: + const dup = peersFor({ + infohash: HASH, title: "Show Y", sources: [dht], + hosts: [host({ id: "black", addr: "203.0.113.9:0" })], + holdings: [holding("black")], + livePeers: [{ address: "203.0.113.9", port: 0 }], + }); + expect(dup).toHaveLength(1); + expect(dup[0]!.sourceKind).toBe("fleet_host"); + expect(peers.length).toBe(2); + }); +}); diff --git a/governor/test/fleet/reaper.test.ts b/governor/test/fleet/reaper.test.ts new file mode 100644 index 0000000..3a5cbab --- /dev/null +++ b/governor/test/fleet/reaper.test.ts @@ -0,0 +1,99 @@ +import { describe, expect, test } from "bun:test"; +import { classify, DEAD_AFTER_S, planRecovery, STALL_AFTER_S } from "../../src/fleet/reaper.ts"; +import type { Holding, TorrentVitals } from "../../src/fleet/types.ts"; + +const NOW = 1_750_000_000; + +function vitals(over: Partial & { name: string }): TorrentVitals { + return { + infohash: "a".repeat(40), + percentDone: 0.5, + status: 4, + error: 0, + errorString: "", + peersConnected: 0, + rateDownloadBps: 0, + rateUploadBps: 0, + activityDate: NOW, + addedDate: NOW - 86_400, + ...over, + }; +} + +describe("classify", () => { + test("complete torrents are healthy regardless of swarm state", () => { + const v = classify(vitals({ name: "done", percentDone: 1, peersConnected: 0, activityDate: 0 }), NOW); + expect(v.health).toBe("healthy"); + }); + + test("client error → dead with a re-search proposal", () => { + const v = classify(vitals({ name: "err", error: 3, errorString: "unregistered torrent" }), NOW); + expect(v.health).toBe("dead"); + expect(v.recovery?.kind).toBe("research"); + }); + + test("actively downloading → healthy", () => { + expect(classify(vitals({ name: "dl", rateDownloadBps: 1024 }), NOW).health).toBe("healthy"); + }); + + test("peers connected but nothing moving → stalled with reannounce", () => { + const v = classify(vitals({ name: "choked", peersConnected: 4 }), NOW); + expect(v.health).toBe("stalled"); + expect(v.recovery?.kind).toBe("reannounce"); + }); + + test("idle past the stall window → stalled; past the dead window → dead", () => { + const stalled = classify(vitals({ name: "s", activityDate: NOW - STALL_AFTER_S - 60 }), NOW); + expect(stalled.health).toBe("stalled"); + const dead = classify(vitals({ name: "d", activityDate: NOW - DEAD_AFTER_S - 60 }), NOW); + expect(dead.health).toBe("dead"); + }); + + test("never-active torrents age from addedDate", () => { + const v = classify(vitals({ name: "n", activityDate: 0, addedDate: NOW - DEAD_AFTER_S - 60 }), NOW); + expect(v.health).toBe("dead"); + }); + + test("recently idle → still healthy (no flapping)", () => { + const v = classify(vitals({ name: "fresh", activityDate: NOW - 60 }), NOW); + expect(v.health).toBe("healthy"); + }); +}); + +describe("planRecovery", () => { + const deadVerdict = classify( + vitals({ name: "Show X S01", infohash: "b".repeat(40), activityDate: NOW - DEAD_AFTER_S - 1 }), + NOW, + ); + + test("mesh-first: a complete copy on another fleet host upgrades research → mesh_recover", () => { + const holdings: Holding[] = [ + { hostId: "apricot", title: "Show X S01", infohash: "b".repeat(40), complete: true, completedAt: 1, sizeBytes: null }, + ]; + const [out] = planRecovery([deadVerdict], holdings, "black"); + expect(out!.recovery?.kind).toBe("mesh_recover"); + expect(out!.recovery?.detail).toContain("apricot"); + }); + + test("a copy on the SAME host doesn't count as mesh recovery", () => { + const holdings: Holding[] = [ + { hostId: "black", title: "Show X S01", infohash: "b".repeat(40), complete: true, completedAt: 1, sizeBytes: null }, + ]; + const [out] = planRecovery([deadVerdict], holdings, "black"); + expect(out!.recovery?.kind).toBe("research"); + }); + + test("name-keyed static holdings match when hashes are absent", () => { + const holdings: Holding[] = [ + { hostId: "apricot", title: "Show X S01", infohash: null, complete: true, completedAt: 0, sizeBytes: null }, + ]; + const [out] = planRecovery([deadVerdict], holdings, "black"); + expect(out!.recovery?.kind).toBe("mesh_recover"); + }); + + test("healthy verdicts pass through untouched", () => { + const healthy = classify(vitals({ name: "ok", rateDownloadBps: 10 }), NOW); + const [out] = planRecovery([healthy], [], "black"); + expect(out).toEqual(healthy); + }); +}); diff --git a/governor/test/fleet/registry.test.ts b/governor/test/fleet/registry.test.ts new file mode 100644 index 0000000..4845a40 --- /dev/null +++ b/governor/test/fleet/registry.test.ts @@ -0,0 +1,75 @@ +import { describe, expect, test } from "bun:test"; +import { buildRegistry } from "../../src/fleet/registry.ts"; + +// The app's devices.json shape (DeviceConfig.swift): devices[] with type + +// services; legacy files use a `hosts` key and may lack `type`. +const devicesFile = { + devices: [ + { id: "plum-vlc", name: "Plum VLC", kind: "vlc", type: "laptop" }, + { + id: "black", name: "Black TV", kind: "mpv-ipc", type: "storage", + mpv: { endpoints: ["lilith@10.0.0.11", "lilith@10.9.0.4"] }, + }, + { id: "phone", name: "Phone", kind: "quicktime", type: "cellphone" }, + ], +}; + +describe("buildRegistry", () => { + test("maps DeviceType → fleet class per the spec", () => { + const reg = buildRegistry(devicesFile, null); + const byId = Object.fromEntries(reg.hosts.map(h => [h.id, h])); + expect(byId["plum-vlc"]!.class).toBe("roamer"); + expect(byId["black"]!.class).toBe("server"); + expect(byId["phone"]!.class).toBe("consumer"); + }); + + test("class-derived defaults: storage is always-on with a transmission api", () => { + const reg = buildRegistry(devicesFile, null); + const black = reg.hosts.find(h => h.id === "black")!; + expect(black.alwaysOn).toBe(true); + expect(black.api).toBe("transmission_rpc"); + const plum = reg.hosts.find(h => h.id === "plum-vlc")!; + expect(plum.alwaysOn).toBe(false); + expect(plum.api).toBe("none"); + }); + + test("reachability inferred from overlay endpoints; addr strips the user part", () => { + const reg = buildRegistry(devicesFile, null); + const black = reg.hosts.find(h => h.id === "black")!; + expect(black.reachable).toBe("wireguard"); + expect(black.addr).toBe("10.0.0.11"); + }); + + test("fleet.json overrides win over derivation", () => { + const reg = buildRegistry(devicesFile, { + floorCopies: 3, + devices: { black: { reachable: "public_ip", onHomeIp: false, diskFreeBytes: 42 } }, + }); + const black = reg.hosts.find(h => h.id === "black")!; + expect(black.reachable).toBe("public_ip"); + expect(black.onHomeIp).toBe(false); + expect(black.capacity.diskFreeBytes).toBe(42); + expect(reg.floorCopies).toBe(3); + }); + + test("legacy `hosts` key and missing type are tolerated", () => { + const reg = buildRegistry({ hosts: [{ id: "old-black", kind: "blacktv" }] }, null); + expect(reg.hosts[0]!.class).toBe("server"); + }); + + test("an implicit dht source is added once; private gates always enforced", () => { + const reg = buildRegistry(devicesFile, { + sources: [{ id: "ptp", kind: "private_tracker", swarmIsolation: "open" }], + }); + expect(reg.sources.map(s => s.kind).sort()).toEqual(["dht", "private_tracker"]); + const ptp = reg.sources.find(s => s.kind === "private_tracker")!; + expect(ptp.swarmIsolation).toBe("f2f_only"); + expect(ptp.sharePolicy).toBe("search_only"); + }); + + test("missing devices.json degrades to an empty registry with a warning", () => { + const reg = buildRegistry(null, null); + expect(reg.hosts).toEqual([]); + expect(reg.warnings.some(w => w.includes("no device registry"))).toBe(true); + }); +}); diff --git a/recommender/media_rec/title_refiner.py b/recommender/media_rec/title_refiner.py new file mode 100644 index 0000000..1f844a6 --- /dev/null +++ b/recommender/media_rec/title_refiner.py @@ -0,0 +1,86 @@ +"""MLX title refiner — the model-backed tail of the filename→metadata pipeline. + +The Swift `FilenameParser` handles the overwhelming majority of releases with +regex; it consults this module (via `LocalLLMTitleRefiner`) only when its +extracted title is degenerate (<2 chars) — the messy tail: bracket-soup scene +names, foreign-title-first releases, names whose noise markers sit before the +title. Runs a small local MLX model on plum; no network, no API key. + +I/O: the raw filename (no directory, extension optional) on argv[1] (or stdin); +prints one JSON object: {"title": ""}. An empty title means +"no confident answer" — the caller keeps its regex result. + +Degrades gracefully: if MLX/the model is unavailable it prints an empty title, +so the caller never blocks on the model. + +Usage: + uv run python -m media_rec.title_refiner '' +""" + +from __future__ import annotations + +import json +import re +import sys + +MODEL = "mlx-community/Qwen2.5-1.5B-Instruct-4bit" + +_NOTHING = {"title": ""} + + +def _read_filename() -> str: + raw = sys.argv[1] if len(sys.argv) > 1 else sys.stdin.read() + return raw.strip() + + +def _plausible(title: str, filename: str) -> bool: + """Reject hallucinated answers: a refined title must be short, mostly + letters, and share at least one alphanumeric token with the filename.""" + if not 2 <= len(title) <= 80: + return False + if not re.search(r"[A-Za-z]", title): + return False + file_tokens = {t.lower() for t in re.findall(r"[A-Za-z0-9]{2,}", filename)} + title_tokens = {t.lower() for t in re.findall(r"[A-Za-z0-9]{2,}", title)} + return bool(file_tokens & title_tokens) + + +def refine(filename: str) -> dict: + if not filename: + return _NOTHING + try: + from mlx_lm import generate, load + except Exception: + return _NOTHING + + prompt = ( + "This is a media release filename. Extract ONLY the human title of the " + "show or movie — no year, season/episode markers, quality, codec, " + "group tags, or brackets.\n\n" + f"Filename: {filename!r}\n\n" + 'Reply with ONLY JSON, no prose: {"title": ""}' + ) + try: + model, tok = load(MODEL) + text = tok.apply_chat_template([{"role": "user", "content": prompt}], + add_generation_prompt=True) + out = generate(model, tok, text, max_tokens=60) + except Exception: + return _NOTHING + + match = re.search(r"\{.*\}", out, re.DOTALL) + if not match: + return _NOTHING + try: + title = str(json.loads(match.group(0)).get("title") or "").strip() + except Exception: + return _NOTHING + return {"title": title} if _plausible(title, filename) else _NOTHING + + +def main() -> None: + print(json.dumps(refine(_read_filename()))) + + +if __name__ == "__main__": + main() diff --git a/tools/update.sh b/tools/update.sh index 34d551d..aacf70a 100755 --- a/tools/update.sh +++ b/tools/update.sh @@ -4,13 +4,18 @@ # no-op when already current. This is how every node EXCEPT the build box (plum) # gets the app; plum cuts releases with tools/release.sh. # -# Per-OS behavior (the fleet: iOS, macOS, Ubuntu, Bluefin, Windows): +# Per-OS behavior (the fleet: iOS, Android, macOS, Ubuntu, Bluefin, Windows): # macOS → TVAnarchy-.zip → /Applications (admin) or ~/Applications # Linux → TVAnarchy--linux-.tar.gz → /opt/tv-anarchy (classic, # writable) or ~/.local/opt/tv-anarchy (non-root; always on # immutable/ostree systems like Bluefin, whose /usr is read-only) # Windows → TVAnarchy--windows-.zip → %LOCALAPPDATA%\Programs\TVAnarchy # (per-user, no elevation; run under Git Bash/MSYS) +# Android → run under Termux (detected before generic Linux — Termux's uname +# says Linux): TVAnarchy--android.apk → ~/storage/downloads, +# then handed to the system package installer via termux-open. +# Apps aren't folder-installs on Android, so "installed version" is +# a stamp recorded on successful handoff (--force re-downloads). # iOS → not served here: no shell. Install via Xcode (TVAnarchyiOS scheme) # or TestFlight/sideload — see docs/operations.md. # A release that lacks the asset for this platform fails with the exact missing @@ -30,12 +35,18 @@ REPO="${FORGEJO_REPO:-tv-anarchy}" FORCE=""; [ "${1:-}" = "--force" ] && FORCE=1 # --- platform -------------------------------------------------------------- -case "$(uname -s)" in - Darwin) OS=mac ;; - Linux) OS=linux ;; - MINGW*|MSYS*|CYGWIN*) OS=windows ;; - *) echo "✗ unsupported platform '$(uname -s)' — TVAnarchy ships for macOS, Linux, Windows (iOS via Xcode/TestFlight)." >&2; exit 1 ;; -esac +# Termux first: it reports `Linux` from uname, but Android apps are APKs handed +# to the package installer, not folder installs. +if [ -n "${TERMUX_VERSION:-}" ] || [ -d /data/data/com.termux ]; then + OS=android +else + case "$(uname -s)" in + Darwin) OS=mac ;; + Linux) OS=linux ;; + MINGW*|MSYS*|CYGWIN*) OS=windows ;; + *) echo "✗ unsupported platform '$(uname -s)' — TVAnarchy ships for macOS, Linux, Windows, Android/Termux (iOS via Xcode/TestFlight)." >&2; exit 1 ;; + esac +fi ARCH="$(uname -m)" # arm64 / x86_64 / aarch64 # True on image-based (ostree/bootc) Linux — Bluefin, Silverblue, etc. Their @@ -55,16 +66,23 @@ resolve_dest() { else printf '%s/.local/opt/tv-anarchy\n' "$HOME"; fi ;; windows) printf '%s/Programs/TVAnarchy\n' "${LOCALAPPDATA:-$HOME/AppData/Local}" ;; + android) + # Shared storage (visible to the package installer / Files app) when the + # Termux storage bridge is set up; else Termux home as a fallback. + if [ -d "$HOME/storage/downloads" ]; then printf '%s/storage/downloads\n' "$HOME" + else printf '%s\n' "$HOME"; fi ;; esac } DEST="$(resolve_dest)" -# The release asset this platform consumes. +# The release asset this platform consumes. (The APK carries all ABIs, so no +# arch suffix on Android.) asset_name() { case "$OS" in mac) printf 'TVAnarchy-%s.zip\n' "$1" ;; linux) printf 'TVAnarchy-%s-linux-%s.tar.gz\n' "$1" "$ARCH" ;; windows) printf 'TVAnarchy-%s-windows-%s.zip\n' "$1" "$ARCH" ;; + android) printf 'TVAnarchy-%s-android.apk\n' "$1" ;; esac } @@ -100,13 +118,15 @@ fi # --- compare to the installed copy (releases are tagged v) # macOS reads the bundle plist; Linux/Windows read the .release-tag stamp this -# script writes on install. +# script writes on install; Android reads the handoff stamp (Termux can't query +# another package's installed version without adb). +ANDROID_STAMP="$HOME/.local/state/tv-anarchy/android-release-tag" installed_tag() { - if [ "$OS" = mac ]; then - [ -d "$1" ] && printf 'v%s\n' "$(/usr/libexec/PlistBuddy -c 'Print :CFBundleShortVersionString' "$1/Contents/Info.plist" 2>/dev/null || echo '?')" - else - [ -f "$1/.release-tag" ] && cat "$1/.release-tag" - fi + case "$OS" in + mac) [ -d "$1" ] && printf 'v%s\n' "$(/usr/libexec/PlistBuddy -c 'Print :CFBundleShortVersionString' "$1/Contents/Info.plist" 2>/dev/null || echo '?')" ;; + android) [ -f "$ANDROID_STAMP" ] && cat "$ANDROID_STAMP" ;; + *) [ -f "$1/.release-tag" ] && cat "$1/.release-tag" ;; + esac } installed="$(installed_tag "$DEST" || true)"; installed="${installed:-none}"; stale="" if [ "$installed" = "none" ] && [ "$OS" = mac ] && [ -z "${TVANARCHY_DEST:-}" ]; then @@ -133,6 +153,7 @@ curl -fsSL "${auth[@]}" -o "$TMP/asset" "$URL" mkdir -p "$TMP/unpacked" case "$WANT" in + *.apk) : ;; # nothing to unpack — the APK is the artifact *.tar.gz) tar -xzf "$TMP/asset" -C "$TMP/unpacked" ;; *.zip) if [ "$OS" = mac ]; then ditto -x -k "$TMP/asset" "$TMP/unpacked" else unzip -q "$TMP/asset" -d "$TMP/unpacked"; fi ;; @@ -158,6 +179,25 @@ case "$OS" in [ "$OS" = linux ] && chmod +x "$DEST"/tvanarchy* 2>/dev/null || true relaunch="restart TVAnarchy to pick this up." ;; + android) + # DEST is a download dir, not an install dir: drop the APK in shared + # storage and hand it to the system package installer. Android won't let a + # shell install packages directly (that's by design); the user confirms the + # installer prompt. + apk="$DEST/$WANT" + mv "$TMP/asset" "$apk" + if command -v termux-open >/dev/null; then + termux-open "$apk" + echo "→ handed $WANT to the package installer — confirm the install prompt." + else + echo "→ saved $apk — open it from the Files app to install." + [ -d "$HOME/storage" ] || echo " (tip: run termux-setup-storage so downloads land in shared storage)" + fi + mkdir -p "$(dirname "$ANDROID_STAMP")" + printf '%s\n' "$TAG" > "$ANDROID_STAMP" + DEST="$apk" # the summary line should point at the APK, not the folder + relaunch="reopen TVAnarchy after the installer finishes." + ;; esac echo "✓ installed $TAG → $DEST"