feat(metadata): add local llm title refiner integration

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-06-09 21:10:47 -07:00
parent ef3ed6dcfe
commit 8f12f470b7
16 changed files with 1311 additions and 15 deletions

View file

@ -1,7 +1,14 @@
import SwiftUI
import TVAnarchyCore
@main
struct TVAnarchyApp: App {
init() {
// Close the MLX seam: titles the regex parser can't extract go to the
// local model (cached, self-disabling when MLX is absent).
FilenameParser.refiner = LocalLLMTitleRefiner()
}
var body: some Scene {
WindowGroup {
RootView()

View file

@ -0,0 +1,101 @@
import Foundation
/// `TitleRefiner` backed by the local MLX model on plum (recommender's
/// `title_refiner.py`) closes the seam declared in `FilenameParser`. Shells
/// into the recommender like `LocalLLMGrouper`; returns `nil` on any failure so
/// the regex path always stands alone.
///
/// The refiner sits on the synchronous parse path, so two guards keep the messy
/// tail from stalling a scan:
/// - **Result cache** (`~/.local/state/tv-anarchy/title-refinements.json`):
/// each distinct messy filename pays for the model exactly once, ever.
/// - **Session kill-switch**: consecutive subprocess failures (no `uv`, no MLX,
/// no model) disable the refiner for the rest of the process one scan never
/// pays the failure timeout per file.
public struct LocalLLMTitleRefiner: TitleRefiner {
public init() {}
public func refineTitle(from filename: String) -> String? {
if let cached = Self.store.cached(filename) { return cached.isEmpty ? nil : cached }
guard Self.store.healthy else { return nil }
let dir = RepoPaths.recommender.path
let cmd = "cd \(Self.shq(dir)) && uv run python -m media_rec.title_refiner \(Self.shq(filename))"
let r = ProcessRunner.runShell(cmd, timeout: 90, cwd: dir)
guard r.ok,
let data = r.stdout.trimmingCharacters(in: .whitespacesAndNewlines).data(using: .utf8),
let decoded = try? JSONDecoder().decode(Refined.self, from: data) else {
if !r.ok { Log.warn("title refiner failed (exit \(r.status)): \(r.stderr.suffix(160))") }
Self.store.recordFailure()
return nil
}
Self.store.recordSuccess()
let title = decoded.title.trimmingCharacters(in: .whitespaces)
// Cache empties too "the model has no answer" is also worth remembering.
Self.store.remember(filename, title: title)
return title.isEmpty ? nil : title
}
private struct Refined: Decodable { let title: String }
private static func shq(_ s: String) -> String { "'" + s.replacingOccurrences(of: "'", with: "'\\''") + "'" }
private static let store = RefinementStore()
}
/// Thread-safe cache + health tracking for the refiner. A class with one lock
/// the refiner is consulted from concurrent scan work.
final class RefinementStore: @unchecked Sendable {
private let lock = NSLock()
private var cache: [String: String]
private var consecutiveFailures = 0
private var dirty = false
/// After this many subprocess failures in a row, stop trying this session.
private static let maxFailures = 2
private static var url: URL {
FileManager.default.homeDirectoryForCurrentUser
.appendingPathComponent(".local/state/tv-anarchy/title-refinements.json")
}
init() {
if let data = try? Data(contentsOf: Self.url),
let map = try? JSONDecoder().decode([String: String].self, from: data) {
cache = map
} else {
cache = [:]
}
}
var healthy: Bool {
lock.lock(); defer { lock.unlock() }
return consecutiveFailures < Self.maxFailures
}
func cached(_ filename: String) -> String? {
lock.lock(); defer { lock.unlock() }
return cache[filename]
}
func recordFailure() {
lock.lock(); defer { lock.unlock() }
consecutiveFailures += 1
}
func recordSuccess() {
lock.lock(); defer { lock.unlock() }
consecutiveFailures = 0
}
func remember(_ filename: String, title: String) {
lock.lock()
cache[filename] = title
dirty = true
let snapshot = cache
lock.unlock()
// Persist outside the lock; last-writer-wins is fine for a cache.
guard let data = try? JSONEncoder().encode(snapshot) else { return }
try? FileManager.default.createDirectory(at: Self.url.deletingLastPathComponent(),
withIntermediateDirectories: true)
try? data.write(to: Self.url, options: .atomic)
}
}

View file

@ -58,6 +58,7 @@ destination** (`TVANARCHY_DEST` overrides everywhere):
| Ubuntu (classic Linux) | `TVAnarchy-<tag>-linux-<arch>.tar.gz` | `/opt/tv-anarchy` when `/opt` is writable, else `~/.local/opt/tv-anarchy` |
| Bluefin (immutable/ostree Linux) | same as Linux | always `~/.local/opt/tv-anarchy` (`/usr` is read-only; detected via `/run/ostree-booted`) |
| Windows (Git Bash/MSYS) | `TVAnarchy-<tag>-windows-<arch>.zip` | `%LOCALAPPDATA%\Programs\TVAnarchy` (per-user, no elevation) |
| Android (Termux) | `TVAnarchy-<tag>-android.apk` (all ABIs, no arch suffix) | APK → `~/storage/downloads`, handed to the system package installer via `termux-open` (user confirms the prompt — shells can't install packages on Android by design). Detected before generic Linux (Termux's `uname` says Linux). Version = handoff stamp, since Termux can't query installed packages |
| iOS | — | not via this script (no shell): build the `TVAnarchyiOS` scheme in Xcode onto the device, or TestFlight/sideload |
Version compare: macOS reads the bundle plist; Linux/Windows read the

View file

@ -10,7 +10,8 @@
},
"scripts": {
"start": "bun run src/index.ts",
"typecheck": "tsc --noEmit"
"typecheck": "tsc --noEmit",
"test": "bun test"
},
"dependencies": {},
"devDependencies": {

227
governor/src/fleet/cli.ts Normal file
View file

@ -0,0 +1,227 @@
// `portable-net-tv fleet <sub>` — the human/scripting surface of the fleet
// engine. Read-only by default; the single mutating path is `reaper --apply`,
// which performs only safe idempotent transmission nudges (reannounce/verify).
// Re-pins and mesh recoveries are PRINTED as plans — actuating a cross-host
// copy is the transfer queue's job, not a side effect of a status command.
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { homedir } from "node:os";
import { dirname, join } from "node:path";
import { loadRegistry, FLEET_PATH, DEVICES_PATH } from "./registry.ts";
import { assignDuties, diffDuties } from "./duties.ts";
import { floorCheck } from "./custody.ts";
import { classify, planRecovery } from "./reaper.ts";
import { peersFor } from "./peers.ts";
import {
fetchHoldings, fetchLivePeers, fetchVitals, staticHoldings, torrentAction, transmissionHost,
} from "./transmission.ts";
import type { Duty, Holding } from "./types.ts";
import { log } from "../log.ts";
const STATE_PATH = join(homedir(), ".local", "state", "tv-anarchy", "fleet-state.json");
interface FleetState {
/** Last duty assignment, for change-triggered logging. hostId → duties. */
duties: Record<string, Duty[]>;
}
function loadFleetState(): FleetState {
try {
return JSON.parse(readFileSync(STATE_PATH, "utf8")) as FleetState;
} catch {
return { duties: {} };
}
}
function saveFleetState(state: FleetState): void {
const dir = dirname(STATE_PATH);
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
writeFileSync(STATE_PATH, JSON.stringify(state, null, 2));
}
/** Holdings across the fleet: the transmission host's torrents + static mirrors. */
function gatherHoldings(): { holdings: Holding[]; txHostId: string | null } {
const reg = loadRegistry();
const tx = transmissionHost(reg.hosts);
let holdings: Holding[] = staticHoldings(reg.staticHoldings);
if (tx) {
try {
holdings = [...fetchHoldings(tx), ...holdings];
} catch (err) {
log.warn(`transmission unreachable on ${tx.id}: ${err instanceof Error ? err.message : String(err)}`);
}
}
return { holdings, txHostId: tx?.id ?? null };
}
function cmdStatus(json: boolean): void {
const reg = loadRegistry();
const { duties, warnings } = assignDuties(reg.hosts);
if (json) {
log.raw(JSON.stringify({
hosts: reg.hosts,
duties: Object.fromEntries(duties),
sources: reg.sources,
floorCopies: reg.floorCopies,
warnings: [...reg.warnings, ...warnings],
}, null, 2));
return;
}
log.raw(`registry: ${DEVICES_PATH}`);
log.raw(`fleet: ${FLEET_PATH}${existsSync(FLEET_PATH) ? "" : " (absent — defaults in effect)"}`);
log.raw("");
for (const h of reg.hosts) {
const d = duties.get(h.id) ?? [];
log.raw(`${h.id} [${h.class}] ${h.alwaysOn ? "always-on" : "intermittent"} via ${h.reachable}`);
log.raw(` duties: ${d.length > 0 ? d.join(", ") : "(none)"}`);
log.raw(` api: ${h.api}${h.addr ? ` addr: ${h.addr}` : ""}`);
}
log.raw("");
log.raw(`sources: ${reg.sources.map(s => `${s.id}(${s.kind}: ${s.sharePolicy}/${s.swarmIsolation})`).join(", ")}`);
log.raw(`floor: ${reg.floorCopies} copies`);
for (const w of [...reg.warnings, ...warnings]) log.raw(`${w}`);
}
function cmdDuties(json: boolean): void {
const reg = loadRegistry();
const { duties, warnings } = assignDuties(reg.hosts);
const state = loadFleetState();
const changes = diffDuties(new Map(Object.entries(state.duties)), duties);
if (json) {
log.raw(JSON.stringify({ duties: Object.fromEntries(duties), changes, warnings }, null, 2));
} else {
for (const [id, d] of duties) log.raw(`${id}: ${d.length > 0 ? d.join(", ") : "(none)"}`);
for (const c of changes) log.raw(`Δ ${c}`);
for (const w of warnings) log.raw(`${w}`);
}
state.duties = Object.fromEntries(duties);
saveFleetState(state);
}
function cmdCustody(json: boolean): void {
const reg = loadRegistry();
const { holdings } = gatherHoldings();
if (holdings.length === 0) {
log.raw("no holdings visible (transmission unreachable and no static holdings)");
process.exitCode = 1;
return;
}
const reports = floorCheck(holdings, reg.hosts, reg.floorCopies);
const breaches = reports.filter(r => r.breach);
if (json) {
log.raw(JSON.stringify({ reports }, null, 2));
return;
}
log.raw(`${reports.length} title(s), ${breaches.length} floor breach(es) (floor=${reg.floorCopies})`);
for (const r of reports) {
if (!r.breach && r.actions.length === 0 && r.warnings.length === 0) continue;
log.raw("");
log.raw(`${r.breach ? "✗" : "•"} ${r.title} (${r.completeCopies}/${r.floorCopies} copies; custodians: ${r.custodians.join(", ") || "none"})`);
for (const a of r.actions) log.raw(`${a.kind}: ${a.toHost}${a.fromHost ? `${a.fromHost}` : ""}${a.reason}`);
for (const w of r.warnings) log.raw(`${w}`);
}
}
function cmdReaper(apply: boolean, json: boolean): void {
const reg = loadRegistry();
const tx = transmissionHost(reg.hosts);
if (!tx) {
log.error("no host with api=transmission_rpc in the registry");
process.exit(2);
}
let vitals;
try {
vitals = fetchVitals(tx);
} catch (err) {
log.error(`transmission unreachable: ${err instanceof Error ? err.message : String(err)}`);
process.exit(1);
}
const now = Math.floor(Date.now() / 1000);
const { holdings } = gatherHoldings();
const verdicts = planRecovery(vitals.map(v => classify(v, now)), holdings, tx.id);
const sick = verdicts.filter(v => v.health !== "healthy");
if (json) {
log.raw(JSON.stringify({ verdicts }, null, 2));
} else {
log.raw(`${verdicts.length} torrent(s): `
+ `${verdicts.filter(v => v.health === "healthy").length} healthy, `
+ `${verdicts.filter(v => v.health === "stalled").length} stalled, `
+ `${verdicts.filter(v => v.health === "dead").length} dead`);
for (const v of sick) {
log.raw("");
log.raw(`${v.health === "dead" ? "✗" : "~"} [${v.health}] ${v.name}${v.reason}`);
if (v.recovery) log.raw(`${v.recovery.kind}: ${v.recovery.detail}`);
}
}
if (apply) {
let nudged = 0;
for (const v of sick) {
if (v.infohash === null) continue;
// Only the safe ops run automatically; mesh_recover/research stay plans.
if (v.recovery?.kind === "reannounce" && torrentAction(tx, v.infohash, "reannounce")) nudged++;
if (v.health === "dead" && v.recovery?.kind === "research" && torrentAction(tx, v.infohash, "verify")) nudged++;
}
log.raw("");
log.raw(`applied ${nudged} safe nudge(s) (reannounce/verify); re-pins and re-search stay manual`);
}
}
function cmdPeers(query: string, json: boolean): void {
const reg = loadRegistry();
const { holdings } = gatherHoldings();
// Accept an infohash or a title; resolve a title to its hash via holdings.
const isHash = /^[0-9a-fA-F]{40}$/.test(query) || /^[0-9a-fA-F]{64}$/.test(query);
const match = isHash
? holdings.find(h => h.infohash?.toLowerCase() === query.toLowerCase())
: holdings.find(h => h.title.toLowerCase().includes(query.toLowerCase()));
const infohash = isHash ? query.toLowerCase() : match?.infohash ?? null;
if (infohash === null && !match) {
log.error(`nothing in fleet holdings matches "${query}"`);
process.exit(1);
}
const tx = transmissionHost(reg.hosts);
let livePeers: { address: string; port: number }[] = [];
if (tx && infohash) {
try {
livePeers = fetchLivePeers(tx, infohash);
} catch {
log.warn("live swarm peers unavailable (transmission unreachable)");
}
}
const peers = peersFor({
infohash: infohash ?? "",
title: match?.title ?? null,
sources: reg.sources,
hosts: reg.hosts,
holdings,
livePeers,
});
if (json) {
log.raw(JSON.stringify({ title: match?.title ?? query, infohash, peers }, null, 2));
return;
}
log.raw(`peers_for(${match?.title ?? query}) → ${peers.length} peer(s)`);
for (const p of peers) {
log.raw(` ${p.addr} [${p.sourceKind}/${p.sourceId}] via ${p.servedVia}`);
}
}
export function runFleetCommand(args: string[]): void {
const [sub, ...rest] = args;
const json = rest.includes("--json") || args.includes("--json");
switch (sub) {
case "status": cmdStatus(json); break;
case "duties": cmdDuties(json); break;
case "custody": cmdCustody(json); break;
case "reaper": cmdReaper(rest.includes("--apply"), json); break;
case "peers": {
const query = rest.filter(a => !a.startsWith("--"))[0];
if (!query) { log.error("usage: portable-net-tv fleet peers <infohash|title> [--json]"); process.exit(2); }
cmdPeers(query, json);
break;
}
default:
log.error("usage: portable-net-tv fleet <status|duties|custody|reaper [--apply]|peers <q>> [--json]");
process.exit(2);
}
}

123
governor/src/fleet/peers.ts Normal file
View file

@ -0,0 +1,123 @@
// Peer-source model + peers_for — the user-owned meta-tracker.
//
// peers_for(infohash) unions every PERMITTED source: fleet hosts seedbox
// DHT/public live peers ( private/friend sources once those stages land).
// Every peer is provenance-tagged so a UI can show WHY it exists and over
// what transport.
//
// The two policy gates are the load-bearing part (a private .torrent embeds
// the source user's passkey — a friend announcing it gets the SOURCE banned):
// 1. share_policy gates the registry merge — private_tracker defaults to
// search_only and contributes NO peers until explicitly flipped.
// 2. swarm_isolation is FORCED to f2f_only for private sources, un-overridably:
// private-sourced bytes are only ever served via the fleet's own WireGuard
// swarm, never announced back to the tracker.
import type {
FleetHost, Holding, Peer, ServedVia, Source, SourceKind,
} from "./types.ts";
type RawSource = Partial<Source> & { id: string; kind: SourceKind };
/** Apply the policy gates. This is the only constructor for Source values. */
export function normalizeSource(raw: RawSource): Source {
const isPrivate = raw.kind === "private_tracker";
// Gate 2: f2f_only is forced for private sources — an explicit "open" in the
// config is overridden, not honored. Everything else defaults open.
const swarmIsolation = isPrivate ? "f2f_only" : (raw.swarmIsolation ?? "open");
// Gate 1: private defaults closed (search_only); flipping to content is a
// deliberate, consequence-explicit user action recorded in fleet.json.
const sharePolicy = raw.sharePolicy ?? (isPrivate ? "search_only" : "content");
return {
id: raw.id,
kind: raw.kind,
sharePolicy,
swarmIsolation,
label: raw.label ?? raw.id,
};
}
function servedVia(host: FleetHost, source: Source): ServedVia {
// f2f_only content may ONLY travel the fleet's own wireguard swarm.
if (source.swarmIsolation === "f2f_only") return "wireguard";
return host.reachable === "wireguard" ? "wireguard" : "public";
}
/** A live peer as transmission reports it (torrent-get `peers` field). */
export interface LivePeer {
address: string;
port: number;
}
export interface PeersForInput {
infohash: string;
/** Display title for name-keyed holdings (static holdings carry no hash). */
title: string | null;
sources: Source[];
hosts: FleetHost[];
/** Fleet-wide holdings (transmission + static). */
holdings: Holding[];
/** Live swarm peers from the transmission host, if reachable. */
livePeers: LivePeer[];
}
/** Provenance priority for dedup: a fleet copy beats a random DHT address. */
const KIND_PRIORITY: Record<SourceKind, number> = {
fleet_host: 0,
seedbox: 1,
friend_mesh: 2,
private_tracker: 3,
public_tracker: 4,
dht: 5,
};
export function peersFor(input: PeersForInput): Peer[] {
const { infohash, title, sources, hosts, holdings, livePeers } = input;
const byId = new Map(hosts.map(h => [h.id, h]));
const out: Peer[] = [];
const matches = (h: Holding): boolean =>
h.complete && (h.infohash !== null ? h.infohash === infohash : title !== null && h.title === title);
// Fleet + seedbox holders. The implicit fleet source needs no configuration —
// your own hosts are always a permitted source of your own content.
for (const holding of holdings.filter(matches)) {
const host = byId.get(holding.hostId);
if (!host || host.addr === null) continue;
const kind: SourceKind = host.class === "seedbox" ? "seedbox" : "fleet_host";
const source = sources.find(s => s.kind === kind)
?? normalizeSource({ id: kind, kind, label: kind === "seedbox" ? "seedbox" : "fleet" });
if (source.sharePolicy !== "content") continue; // search_only never serves bytes
out.push({
addr: host.addr,
sourceKind: kind,
sourceId: source.id,
servedVia: servedVia(host, source),
});
}
// DHT/public live peers — only when a dht/public source permits content.
const publicSource = sources.find(s =>
(s.kind === "dht" || s.kind === "public_tracker") && s.sharePolicy === "content");
if (publicSource) {
for (const p of livePeers) {
out.push({
addr: `${p.address}:${p.port}`,
sourceKind: publicSource.kind,
sourceId: publicSource.id,
servedVia: "public",
});
}
}
// Dedup by address, keeping the best provenance (fleet > seedbox > … > dht).
const best = new Map<string, Peer>();
for (const p of out) {
const existing = best.get(p.addr);
if (!existing || KIND_PRIORITY[p.sourceKind] < KIND_PRIORITY[existing.sourceKind]) {
best.set(p.addr, p);
}
}
return [...best.values()].sort((a, b) =>
KIND_PRIORITY[a.sourceKind] - KIND_PRIORITY[b.sourceKind] || a.addr.localeCompare(b.addr));
}

View file

@ -0,0 +1,80 @@
// Zombie reaper — classify every torrent healthy | stalled | dead and propose
// recovery. The mesh prevents FUTURE zombies (custody floor); the reaper deals
// with the already-dying: recover from the mesh first, public re-search as the
// fallback. Classification is pure; actuation lives in the CLI/daemon.
import type { Holding, ReaperVerdict, TorrentVitals } from "./types.ts";
/** No piece activity for this long while incomplete and peerless → stalled. */
export const STALL_AFTER_S = 30 * 60;
/** No piece activity for this long while incomplete and peerless → dead. */
export const DEAD_AFTER_S = 72 * 3600;
export function classify(t: TorrentVitals, nowEpochS: number): ReaperVerdict {
const base = { name: t.name, infohash: t.infohash };
// A complete copy can't be a zombie — it IS the seed. (Stopped-but-complete
// is a policy choice, not a swarm death.)
if (t.percentDone >= 1) {
return { ...base, health: "healthy", reason: "complete", recovery: null };
}
if (t.error !== 0) {
return {
...base, health: "dead",
reason: `client error: ${t.errorString || `code ${t.error}`}`,
recovery: { kind: "research", detail: "tracker/client error — re-source a fresh release" },
};
}
if (t.rateDownloadBps > 0) {
return { ...base, health: "healthy", reason: "downloading", recovery: null };
}
if (t.peersConnected > 0) {
// Connected but not moving — choked or endgame; give it a reannounce nudge.
return {
...base, health: "stalled", reason: "peers connected but no transfer",
recovery: { kind: "reannounce", detail: "connected swarm not sending — reannounce" },
};
}
// Incomplete, no peers, no transfer: age of last activity decides. A torrent
// that has NEVER had activity ages from when it was added.
const lastAlive = t.activityDate > 0 ? t.activityDate : t.addedDate;
const idleS = Math.max(0, nowEpochS - lastAlive);
if (idleS >= DEAD_AFTER_S) {
return {
...base, health: "dead",
reason: `no peers and no activity for ${Math.round(idleS / 3600)}h`,
recovery: { kind: "research", detail: "swarm is a corpse — recover from mesh or re-search" },
};
}
if (idleS >= STALL_AFTER_S) {
return {
...base, health: "stalled",
reason: `no peers for ${Math.round(idleS / 60)}min`,
recovery: { kind: "reannounce", detail: "quiet swarm — reannounce before declaring it dead" },
};
}
return { ...base, health: "healthy", reason: "recently active", recovery: null };
}
/**
* Upgrade dead-torrent recovery to mesh-first: when another fleet host holds
* the title complete, the fix is a fleet copy, not a public re-search.
* `holdings` spans the whole fleet (transmission + static holdings).
*/
export function planRecovery(verdicts: ReaperVerdict[], holdings: Holding[], localHostId: string): ReaperVerdict[] {
return verdicts.map(v => {
if (v.health !== "dead") return v;
const meshHolder = holdings.find(h =>
h.complete && h.hostId !== localHostId
&& (h.infohash !== null && v.infohash !== null ? h.infohash === v.infohash : h.title === v.name));
if (!meshHolder) return v;
return {
...v,
recovery: {
kind: "mesh_recover",
detail: `complete copy on ${meshHolder.hostId} — pull over the fleet, skip the public swarm`,
},
};
});
}

View file

@ -0,0 +1,132 @@
// Minimal transmission view for the fleet engine. Runs the JSON-RPC on the
// transmission host itself over ssh (daemon listens on localhost:9091 there,
// not on the mesh), same approach as mcp/src/transmission/client.ts — kept
// separate because governor and mcp are independent packages by design.
//
// Endpoint selection: the registry's transmission host (api=transmission_rpc)
// supplies the ssh destination; candidates come from its endpoints with the
// usual LAN-before-overlay ordering already baked into devices.json.
import { spawnSync } from "node:child_process";
import type { Holding, TorrentVitals } from "./types.ts";
import type { FleetHost } from "./types.ts";
import { log } from "../log.ts";
const RPC = "localhost:9091";
const SSH_OPTS = ["-o", "ConnectTimeout=8", "-o", "BatchMode=yes"];
function ssh(dest: string, cmd: string): { ok: boolean; out: string } {
const r = spawnSync("ssh", [...SSH_OPTS, dest, cmd], { encoding: "utf8", timeout: 45_000 });
const out = ((r.stdout ?? "") + (r.stderr ?? "")).trim();
return { ok: r.status === 0, out };
}
const VITALS_FIELDS = [
"id", "name", "hashString", "percentDone", "status", "error", "errorString",
"peersConnected", "rateDownload", "rateUpload", "activityDate", "addedDate",
"doneDate", "sizeWhenDone", "downloadDir",
];
interface RpcTorrent {
id: number;
name: string;
hashString: string;
percentDone: number;
status: number;
error: number;
errorString: string;
peersConnected: number;
rateDownload: number;
rateUpload: number;
activityDate: number;
addedDate: number;
doneDate: number;
sizeWhenDone: number;
downloadDir: string;
}
function rpc<T>(host: FleetHost, args: Record<string, unknown>, method = "torrent-get"): T[] {
if (!host.ssh) throw new Error(`transmission host ${host.id} has no ssh destination`);
const url = `http://${RPC}/transmission/rpc`;
const payload = JSON.stringify({ method, arguments: args }).replace(/'/g, "'\\''");
// One round-trip: prime for the session id, then replay it on the real call.
const remote =
`SID=$(curl -s -D - -o /dev/null ${url} | ` +
`awk 'tolower($1)=="x-transmission-session-id:"{gsub(/\\r/,"");print $2}'); ` +
`curl -s -H "X-Transmission-Session-Id: $SID" ${url} -d '${payload}'`;
const { ok, out } = ssh(host.ssh, remote);
if (!ok) throw new Error(`transmission rpc on ${host.id} failed: ${out.slice(0, 200)}`);
let parsed: { arguments?: { torrents?: T[] } };
try {
parsed = JSON.parse(out);
} catch {
throw new Error(`transmission rpc returned non-JSON: ${out.slice(0, 200)}`);
}
return parsed.arguments?.torrents ?? [];
}
/** The registry's torrent host (first with a transmission API), or null. */
export function transmissionHost(hosts: FleetHost[]): FleetHost | null {
return hosts.find(h => h.api === "transmission_rpc") ?? null;
}
export function fetchVitals(host: FleetHost): TorrentVitals[] {
return rpc<RpcTorrent>(host, { fields: VITALS_FIELDS }).map(t => ({
name: t.name,
infohash: t.hashString || null,
percentDone: t.percentDone,
status: t.status,
error: t.error,
errorString: t.errorString,
peersConnected: t.peersConnected,
rateDownloadBps: t.rateDownload,
rateUploadBps: t.rateUpload,
activityDate: t.activityDate,
addedDate: t.addedDate,
}));
}
/** Holdings as transmission sees them: every torrent is a copy on that host. */
export function fetchHoldings(host: FleetHost): Holding[] {
return rpc<RpcTorrent>(host, { fields: VITALS_FIELDS }).map(t => ({
hostId: host.id,
title: t.name,
infohash: t.hashString || null,
complete: t.percentDone >= 1,
completedAt: t.doneDate,
sizeBytes: t.sizeWhenDone || null,
}));
}
export interface LiveSwarmPeer { address: string; port: number }
/** Live swarm peers for one torrent (by infohash) — feeds peers_for's DHT leg. */
export function fetchLivePeers(host: FleetHost, infohash: string): LiveSwarmPeer[] {
const rows = rpc<{ peers?: LiveSwarmPeer[] }>(host, { ids: [infohash], fields: ["peers"] });
return rows[0]?.peers ?? [];
}
/** Safe, idempotent per-torrent nudges used by `fleet reaper --apply`. */
export function torrentAction(host: FleetHost, infohash: string, action: "reannounce" | "verify" | "start"): boolean {
const method = action === "reannounce" ? "torrent-reannounce"
: action === "verify" ? "torrent-verify"
: "torrent-start";
try {
rpc(host, { ids: [infohash] }, method);
return true;
} catch (err) {
log.warn(`${method} failed for ${infohash}: ${err instanceof Error ? err.message : String(err)}`);
return false;
}
}
/** Static holdings from fleet.json → Holding records (no hash, name-keyed). */
export function staticHoldings(map: Record<string, string[]>): Holding[] {
const out: Holding[] = [];
for (const [hostId, titles] of Object.entries(map)) {
for (const title of titles) {
out.push({ hostId, title, infohash: null, complete: true, completedAt: 0, sizeBytes: null });
}
}
return out;
}

View file

@ -13,6 +13,7 @@ import { openPlaylist } from "./vlc.ts";
import { paths } from "./paths.ts";
import { log } from "./log.ts";
import { runGovernor } from "./governor.ts";
import { runFleetCommand } from "./fleet/cli.ts";
import { addJob, loadState as loadGovernorState, saveState as saveGovernorState, setManualPause, setJobPaused } from "./jobs.ts";
const HELP = `portable-net-tv — buffered media streamer
@ -37,6 +38,16 @@ Usage:
portable-net-tv pause-jobs set manual-pause flag (governor suspends all jobs)
portable-net-tv resume-jobs clear manual-pause flag
--- fleet (registry / duties / custody / reaper / peers) ---
portable-net-tv fleet status registry + duty assignment + warnings
portable-net-tv fleet duties assign duties; log changes since last run
portable-net-tv fleet custody floor-check every title; print re-pin plans
portable-net-tv fleet reaper [--apply]
classify torrents healthy|stalled|dead;
--apply runs safe nudges (reannounce/verify)
portable-net-tv fleet peers <q> peers_for(infohash|title) provenance-tagged
(all fleet subcommands accept --json)
Profiles live at ${paths.profilesDir}/<name>.json
State lives at ${paths.stateDir}/<name>.json
`;
@ -229,6 +240,11 @@ switch (cmd) {
break;
}
case "fleet": {
runFleetCommand(rest);
break;
}
case "pause-job": {
const id = requireArg(rest[0], "usage: portable-net-tv pause-job <job-id-or-prefix>");
const matched = setJobPaused(id, true);

View file

@ -0,0 +1,106 @@
import { describe, expect, test } from "bun:test";
import { custodiansOf, floorCheck, floorForTitle } from "../../src/fleet/custody.ts";
import type { FleetHost, Holding } from "../../src/fleet/types.ts";
function host(over: Partial<FleetHost> & { id: string }): FleetHost {
return {
name: over.id,
class: "server",
reachable: "wireguard",
alwaysOn: true,
onHomeIp: true,
api: "none",
addr: null,
ssh: null,
capacity: { diskFreeBytes: null, upBwKbs: null, uptimeScore: 1 },
...over,
};
}
function holding(over: Partial<Holding> & { hostId: string; title: string }): Holding {
return { infohash: null, complete: true, completedAt: 0, sizeBytes: null, ...over };
}
const fleet: FleetHost[] = [
host({ id: "black", capacity: { diskFreeBytes: 500e9, upBwKbs: null, uptimeScore: 1 } }),
host({ id: "apricot", capacity: { diskFreeBytes: 200e9, upBwKbs: null, uptimeScore: 1 } }),
host({ id: "plum", class: "roamer", alwaysOn: false }),
host({ id: "phone", class: "consumer", alwaysOn: false }),
];
describe("floorForTitle", () => {
test("healthy floor: N most-recent always-on holders become custodians", () => {
const r = floorForTitle("Show A", [
holding({ hostId: "black", title: "Show A", completedAt: 100 }),
holding({ hostId: "apricot", title: "Show A", completedAt: 200 }),
], fleet, 2);
expect(r.breach).toBe(false);
expect(r.custodians).toEqual(["apricot", "black"]);
expect(r.actions).toEqual([]);
});
test("breach: one copy with floor 2 → re-pin to the best eligible non-holder", () => {
const r = floorForTitle("Show B", [
holding({ hostId: "black", title: "Show B", completedAt: 100 }),
], fleet, 2);
expect(r.breach).toBe(true);
expect(r.actions).toHaveLength(1);
const action = r.actions[0]!;
expect(action.kind).toBe("repin");
expect(action.toHost).toBe("apricot"); // always-on, most disk among non-holders
expect(action.fromHost).toBe("black"); // replicate before the last copy dies
});
test("zero surviving copies → re-pin plan flags that the reaper must re-source", () => {
const r = floorForTitle("Show C", [
holding({ hostId: "plum", title: "Show C", complete: false }),
], fleet, 2);
expect(r.completeCopies).toBe(0);
expect(r.breach).toBe(true);
expect(r.actions.every(a => a.fromHost === null)).toBe(true);
expect(r.actions[0]!.reason).toContain("re-source");
});
test("a roamer's copy counts toward copies but a roamer is not obligated", () => {
const r = floorForTitle("Show D", [
holding({ hostId: "plum", title: "Show D", completedAt: 300 }),
holding({ hostId: "black", title: "Show D", completedAt: 100 }),
], fleet, 2);
expect(r.completeCopies).toBe(2); // plum's copy keeps the floor numerically intact
expect(r.breach).toBe(false);
expect(r.custodians).toEqual(["black"]); // …but only black is on the hook
});
test("disk-aware eligibility: a host without room is skipped as re-pin target", () => {
const tight = [
host({ id: "black", capacity: { diskFreeBytes: 500e9, upBwKbs: null, uptimeScore: 1 } }),
host({ id: "small", capacity: { diskFreeBytes: 1e9, upBwKbs: null, uptimeScore: 1 } }),
];
const r = floorForTitle("Big", [
holding({ hostId: "black", title: "Big", sizeBytes: 50e9, completedAt: 10 }),
], tight, 2);
expect(r.breach).toBe(true);
expect(r.actions).toEqual([]); // small can't hold it; no fake plan
expect(r.warnings.some(w => w.includes("no eligible re-pin target"))).toBe(true);
});
});
describe("floorCheck / custodiansOf", () => {
test("groups holdings per title and reports each", () => {
const reports = floorCheck([
holding({ hostId: "black", title: "A" }),
holding({ hostId: "apricot", title: "A" }),
holding({ hostId: "black", title: "B" }),
], fleet, 2);
expect(reports.map(r => r.title)).toEqual(["A", "B"]);
expect(reports[0]!.breach).toBe(false);
expect(reports[1]!.breach).toBe(true);
});
test("custodiansOf returns host records for the floor", () => {
const out = custodiansOf("A", [
holding({ hostId: "black", title: "A", completedAt: 5 }),
], fleet, 1);
expect(out.map(h => h.id)).toEqual(["black"]);
});
});

View file

@ -0,0 +1,90 @@
import { describe, expect, test } from "bun:test";
import { assignDuties, diffDuties } from "../../src/fleet/duties.ts";
import type { FleetHost } from "../../src/fleet/types.ts";
function host(over: Partial<FleetHost> & { id: string }): FleetHost {
return {
name: over.id,
class: "server",
reachable: "home_lan",
alwaysOn: true,
onHomeIp: true,
api: "none",
addr: null,
ssh: null,
capacity: { diskFreeBytes: null, upBwKbs: null, uptimeScore: 1 },
...over,
};
}
// Natalie's reference fleet from the spec: black=server, apricot=secondary
// always-on, plum=roamer, phone=consumer.
const referenceFleet: FleetHost[] = [
host({ id: "black", class: "server", reachable: "wireguard", api: "transmission_rpc" }),
host({ id: "apricot", class: "server", reachable: "wireguard" }),
host({ id: "plum", class: "roamer", alwaysOn: false }),
host({ id: "phone", class: "consumer", alwaysOn: false }),
];
describe("assignDuties", () => {
test("a consumer never receives any duty", () => {
const { duties } = assignDuties(referenceFleet);
expect(duties.get("phone")).toEqual([]);
});
test("no broadcast without a public_ip always-on host, with warning", () => {
const { duties, warnings } = assignDuties(referenceFleet);
for (const d of duties.values()) expect(d).not.toContain("broadcast");
expect(warnings.some(w => w.includes("broadcast"))).toBe(true);
});
test("broadcast goes to exactly one host, preferring seedbox over server", () => {
const fleet = [
...referenceFleet.map(h => h.id === "black" ? { ...h, reachable: "public_ip" as const } : h),
host({ id: "box", class: "seedbox", reachable: "public_ip", onHomeIp: false }),
];
const { duties } = assignDuties(fleet);
const holders = [...duties.entries()].filter(([, d]) => d.includes("broadcast")).map(([id]) => id);
expect(holders).toEqual(["box"]);
});
test("f2f_relay covers always-on mesh-reachable servers, not roamers", () => {
const { duties } = assignDuties(referenceFleet);
expect(duties.get("black")).toContain("f2f_relay");
expect(duties.get("apricot")).toContain("f2f_relay");
expect(duties.get("plum")).not.toContain("f2f_relay");
});
test("public_swarm_face prefers an off-home host; never a consumer", () => {
const fleet = [
...referenceFleet,
host({ id: "box", class: "seedbox", reachable: "public_ip", onHomeIp: false }),
];
const { duties, warnings } = assignDuties(fleet);
expect(duties.get("box")).toContain("public_swarm_face");
expect(duties.get("black")).not.toContain("public_swarm_face");
expect(warnings.filter(w => w.includes("home IP"))).toEqual([]);
});
test("on-home face is assigned only as a last resort, with an exposure warning", () => {
const { duties, warnings } = assignDuties(referenceFleet);
const faces = [...duties.entries()].filter(([, d]) => d.includes("public_swarm_face"));
expect(faces.length).toBe(1);
expect(warnings.some(w => w.includes("home IP"))).toBe(true);
});
test("assignment is deterministic and stable across permuted input order", () => {
const a = assignDuties(referenceFleet);
const b = assignDuties([...referenceFleet].reverse());
expect(Object.fromEntries(a.duties)).toEqual(Object.fromEntries(b.duties));
});
});
describe("diffDuties", () => {
test("reports per-host transitions and nothing else", () => {
const prev = new Map([["black", ["f2f_relay" as const]], ["plum", []]]);
const next = new Map([["black", ["f2f_relay" as const, "broadcast" as const]], ["plum", []]]);
const lines = diffDuties(prev, next);
expect(lines).toEqual(["black: f2f_relay → broadcast,f2f_relay"]);
});
});

View file

@ -0,0 +1,112 @@
import { describe, expect, test } from "bun:test";
import { normalizeSource, peersFor } from "../../src/fleet/peers.ts";
import type { FleetHost, Holding, Source } from "../../src/fleet/types.ts";
function host(over: Partial<FleetHost> & { id: string }): FleetHost {
return {
name: over.id,
class: "server",
reachable: "wireguard",
alwaysOn: true,
onHomeIp: true,
api: "none",
addr: over.id + ".fleet",
ssh: null,
capacity: { diskFreeBytes: null, upBwKbs: null, uptimeScore: 1 },
...over,
};
}
const HASH = "c".repeat(40);
function holding(hostId: string, complete = true): Holding {
return { hostId, title: "Show Y", infohash: HASH, complete, completedAt: 1, sizeBytes: null };
}
describe("normalizeSource — the two policy gates", () => {
test("private_tracker is forced to f2f_only even when configured open", () => {
const s = normalizeSource({ id: "ptp", kind: "private_tracker", swarmIsolation: "open" });
expect(s.swarmIsolation).toBe("f2f_only");
});
test("private_tracker defaults to search_only (default-closed)", () => {
expect(normalizeSource({ id: "ptp", kind: "private_tracker" }).sharePolicy).toBe("search_only");
});
test("seedbox defaults to content + open (the zero-risk source)", () => {
const s = normalizeSource({ id: "box", kind: "seedbox" });
expect(s.sharePolicy).toBe("content");
expect(s.swarmIsolation).toBe("open");
});
});
describe("peersFor", () => {
const fleet = [host({ id: "black" }), host({ id: "apricot" })];
const dht: Source = normalizeSource({ id: "dht", kind: "dht" });
test("unions fleet holders with live DHT peers, provenance-tagged", () => {
const peers = peersFor({
infohash: HASH,
title: "Show Y",
sources: [dht],
hosts: fleet,
holdings: [holding("black"), holding("apricot")],
livePeers: [{ address: "203.0.113.9", port: 51413 }],
});
expect(peers.map(p => `${p.sourceKind}:${p.addr}`)).toEqual([
"fleet_host:apricot.fleet",
"fleet_host:black.fleet",
"dht:203.0.113.9:51413",
]);
expect(peers[0]!.servedVia).toBe("wireguard");
expect(peers[2]!.servedVia).toBe("public");
});
test("incomplete holdings contribute no peers", () => {
const peers = peersFor({
infohash: HASH, title: "Show Y", sources: [dht], hosts: fleet,
holdings: [holding("black", false)], livePeers: [],
});
expect(peers).toEqual([]);
});
test("a seedbox holder is tagged as a seedbox source", () => {
const peers = peersFor({
infohash: HASH, title: "Show Y", sources: [dht],
hosts: [host({ id: "box", class: "seedbox", reachable: "public_ip", onHomeIp: false })],
holdings: [holding("box")], livePeers: [],
});
expect(peers[0]!.sourceKind).toBe("seedbox");
});
test("search_only private source never serves peers", () => {
// A future friend's private-sourced copy modeled as a fleet host whose
// matching source is search_only: the registry merge is gated.
const privateSource = normalizeSource({ id: "ptp", kind: "private_tracker" });
const peers = peersFor({
infohash: HASH, title: "Show Y", sources: [privateSource], hosts: fleet,
holdings: [], livePeers: [{ address: "198.51.100.1", port: 1 }],
});
expect(peers).toEqual([]); // no dht source configured either — nothing leaks
});
test("dedup keeps the best provenance for a duplicate address", () => {
const peers = peersFor({
infohash: HASH, title: "Show Y", sources: [dht],
hosts: [host({ id: "black", addr: "203.0.113.9" })],
holdings: [holding("black")],
livePeers: [{ address: "203.0.113.9", port: 0 }],
});
// hosts addr "203.0.113.9" vs live "203.0.113.9:0" differ as strings; force
// the same addr to prove priority wins:
const dup = peersFor({
infohash: HASH, title: "Show Y", sources: [dht],
hosts: [host({ id: "black", addr: "203.0.113.9:0" })],
holdings: [holding("black")],
livePeers: [{ address: "203.0.113.9", port: 0 }],
});
expect(dup).toHaveLength(1);
expect(dup[0]!.sourceKind).toBe("fleet_host");
expect(peers.length).toBe(2);
});
});

View file

@ -0,0 +1,99 @@
import { describe, expect, test } from "bun:test";
import { classify, DEAD_AFTER_S, planRecovery, STALL_AFTER_S } from "../../src/fleet/reaper.ts";
import type { Holding, TorrentVitals } from "../../src/fleet/types.ts";
const NOW = 1_750_000_000;
function vitals(over: Partial<TorrentVitals> & { name: string }): TorrentVitals {
return {
infohash: "a".repeat(40),
percentDone: 0.5,
status: 4,
error: 0,
errorString: "",
peersConnected: 0,
rateDownloadBps: 0,
rateUploadBps: 0,
activityDate: NOW,
addedDate: NOW - 86_400,
...over,
};
}
describe("classify", () => {
test("complete torrents are healthy regardless of swarm state", () => {
const v = classify(vitals({ name: "done", percentDone: 1, peersConnected: 0, activityDate: 0 }), NOW);
expect(v.health).toBe("healthy");
});
test("client error → dead with a re-search proposal", () => {
const v = classify(vitals({ name: "err", error: 3, errorString: "unregistered torrent" }), NOW);
expect(v.health).toBe("dead");
expect(v.recovery?.kind).toBe("research");
});
test("actively downloading → healthy", () => {
expect(classify(vitals({ name: "dl", rateDownloadBps: 1024 }), NOW).health).toBe("healthy");
});
test("peers connected but nothing moving → stalled with reannounce", () => {
const v = classify(vitals({ name: "choked", peersConnected: 4 }), NOW);
expect(v.health).toBe("stalled");
expect(v.recovery?.kind).toBe("reannounce");
});
test("idle past the stall window → stalled; past the dead window → dead", () => {
const stalled = classify(vitals({ name: "s", activityDate: NOW - STALL_AFTER_S - 60 }), NOW);
expect(stalled.health).toBe("stalled");
const dead = classify(vitals({ name: "d", activityDate: NOW - DEAD_AFTER_S - 60 }), NOW);
expect(dead.health).toBe("dead");
});
test("never-active torrents age from addedDate", () => {
const v = classify(vitals({ name: "n", activityDate: 0, addedDate: NOW - DEAD_AFTER_S - 60 }), NOW);
expect(v.health).toBe("dead");
});
test("recently idle → still healthy (no flapping)", () => {
const v = classify(vitals({ name: "fresh", activityDate: NOW - 60 }), NOW);
expect(v.health).toBe("healthy");
});
});
describe("planRecovery", () => {
const deadVerdict = classify(
vitals({ name: "Show X S01", infohash: "b".repeat(40), activityDate: NOW - DEAD_AFTER_S - 1 }),
NOW,
);
test("mesh-first: a complete copy on another fleet host upgrades research → mesh_recover", () => {
const holdings: Holding[] = [
{ hostId: "apricot", title: "Show X S01", infohash: "b".repeat(40), complete: true, completedAt: 1, sizeBytes: null },
];
const [out] = planRecovery([deadVerdict], holdings, "black");
expect(out!.recovery?.kind).toBe("mesh_recover");
expect(out!.recovery?.detail).toContain("apricot");
});
test("a copy on the SAME host doesn't count as mesh recovery", () => {
const holdings: Holding[] = [
{ hostId: "black", title: "Show X S01", infohash: "b".repeat(40), complete: true, completedAt: 1, sizeBytes: null },
];
const [out] = planRecovery([deadVerdict], holdings, "black");
expect(out!.recovery?.kind).toBe("research");
});
test("name-keyed static holdings match when hashes are absent", () => {
const holdings: Holding[] = [
{ hostId: "apricot", title: "Show X S01", infohash: null, complete: true, completedAt: 0, sizeBytes: null },
];
const [out] = planRecovery([deadVerdict], holdings, "black");
expect(out!.recovery?.kind).toBe("mesh_recover");
});
test("healthy verdicts pass through untouched", () => {
const healthy = classify(vitals({ name: "ok", rateDownloadBps: 10 }), NOW);
const [out] = planRecovery([healthy], [], "black");
expect(out).toEqual(healthy);
});
});

View file

@ -0,0 +1,75 @@
import { describe, expect, test } from "bun:test";
import { buildRegistry } from "../../src/fleet/registry.ts";
// The app's devices.json shape (DeviceConfig.swift): devices[] with type +
// services; legacy files use a `hosts` key and may lack `type`.
const devicesFile = {
devices: [
{ id: "plum-vlc", name: "Plum VLC", kind: "vlc", type: "laptop" },
{
id: "black", name: "Black TV", kind: "mpv-ipc", type: "storage",
mpv: { endpoints: ["lilith@10.0.0.11", "lilith@10.9.0.4"] },
},
{ id: "phone", name: "Phone", kind: "quicktime", type: "cellphone" },
],
};
describe("buildRegistry", () => {
test("maps DeviceType → fleet class per the spec", () => {
const reg = buildRegistry(devicesFile, null);
const byId = Object.fromEntries(reg.hosts.map(h => [h.id, h]));
expect(byId["plum-vlc"]!.class).toBe("roamer");
expect(byId["black"]!.class).toBe("server");
expect(byId["phone"]!.class).toBe("consumer");
});
test("class-derived defaults: storage is always-on with a transmission api", () => {
const reg = buildRegistry(devicesFile, null);
const black = reg.hosts.find(h => h.id === "black")!;
expect(black.alwaysOn).toBe(true);
expect(black.api).toBe("transmission_rpc");
const plum = reg.hosts.find(h => h.id === "plum-vlc")!;
expect(plum.alwaysOn).toBe(false);
expect(plum.api).toBe("none");
});
test("reachability inferred from overlay endpoints; addr strips the user part", () => {
const reg = buildRegistry(devicesFile, null);
const black = reg.hosts.find(h => h.id === "black")!;
expect(black.reachable).toBe("wireguard");
expect(black.addr).toBe("10.0.0.11");
});
test("fleet.json overrides win over derivation", () => {
const reg = buildRegistry(devicesFile, {
floorCopies: 3,
devices: { black: { reachable: "public_ip", onHomeIp: false, diskFreeBytes: 42 } },
});
const black = reg.hosts.find(h => h.id === "black")!;
expect(black.reachable).toBe("public_ip");
expect(black.onHomeIp).toBe(false);
expect(black.capacity.diskFreeBytes).toBe(42);
expect(reg.floorCopies).toBe(3);
});
test("legacy `hosts` key and missing type are tolerated", () => {
const reg = buildRegistry({ hosts: [{ id: "old-black", kind: "blacktv" }] }, null);
expect(reg.hosts[0]!.class).toBe("server");
});
test("an implicit dht source is added once; private gates always enforced", () => {
const reg = buildRegistry(devicesFile, {
sources: [{ id: "ptp", kind: "private_tracker", swarmIsolation: "open" }],
});
expect(reg.sources.map(s => s.kind).sort()).toEqual(["dht", "private_tracker"]);
const ptp = reg.sources.find(s => s.kind === "private_tracker")!;
expect(ptp.swarmIsolation).toBe("f2f_only");
expect(ptp.sharePolicy).toBe("search_only");
});
test("missing devices.json degrades to an empty registry with a warning", () => {
const reg = buildRegistry(null, null);
expect(reg.hosts).toEqual([]);
expect(reg.warnings.some(w => w.includes("no device registry"))).toBe(true);
});
});

View file

@ -0,0 +1,86 @@
"""MLX title refiner — the model-backed tail of the filename→metadata pipeline.
The Swift `FilenameParser` handles the overwhelming majority of releases with
regex; it consults this module (via `LocalLLMTitleRefiner`) only when its
extracted title is degenerate (<2 chars) the messy tail: bracket-soup scene
names, foreign-title-first releases, names whose noise markers sit before the
title. Runs a small local MLX model on plum; no network, no API key.
I/O: the raw filename (no directory, extension optional) on argv[1] (or stdin);
prints one JSON object: {"title": "<clean human title>"}. An empty title means
"no confident answer" the caller keeps its regex result.
Degrades gracefully: if MLX/the model is unavailable it prints an empty title,
so the caller never blocks on the model.
Usage:
uv run python -m media_rec.title_refiner '<filename>'
"""
from __future__ import annotations
import json
import re
import sys
MODEL = "mlx-community/Qwen2.5-1.5B-Instruct-4bit"
_NOTHING = {"title": ""}
def _read_filename() -> str:
raw = sys.argv[1] if len(sys.argv) > 1 else sys.stdin.read()
return raw.strip()
def _plausible(title: str, filename: str) -> bool:
"""Reject hallucinated answers: a refined title must be short, mostly
letters, and share at least one alphanumeric token with the filename."""
if not 2 <= len(title) <= 80:
return False
if not re.search(r"[A-Za-z]", title):
return False
file_tokens = {t.lower() for t in re.findall(r"[A-Za-z0-9]{2,}", filename)}
title_tokens = {t.lower() for t in re.findall(r"[A-Za-z0-9]{2,}", title)}
return bool(file_tokens & title_tokens)
def refine(filename: str) -> dict:
if not filename:
return _NOTHING
try:
from mlx_lm import generate, load
except Exception:
return _NOTHING
prompt = (
"This is a media release filename. Extract ONLY the human title of the "
"show or movie — no year, season/episode markers, quality, codec, "
"group tags, or brackets.\n\n"
f"Filename: {filename!r}\n\n"
'Reply with ONLY JSON, no prose: {"title": "<clean title>"}'
)
try:
model, tok = load(MODEL)
text = tok.apply_chat_template([{"role": "user", "content": prompt}],
add_generation_prompt=True)
out = generate(model, tok, text, max_tokens=60)
except Exception:
return _NOTHING
match = re.search(r"\{.*\}", out, re.DOTALL)
if not match:
return _NOTHING
try:
title = str(json.loads(match.group(0)).get("title") or "").strip()
except Exception:
return _NOTHING
return {"title": title} if _plausible(title, filename) else _NOTHING
def main() -> None:
print(json.dumps(refine(_read_filename())))
if __name__ == "__main__":
main()

View file

@ -4,13 +4,18 @@
# no-op when already current. This is how every node EXCEPT the build box (plum)
# gets the app; plum cuts releases with tools/release.sh.
#
# Per-OS behavior (the fleet: iOS, macOS, Ubuntu, Bluefin, Windows):
# Per-OS behavior (the fleet: iOS, Android, macOS, Ubuntu, Bluefin, Windows):
# macOS → TVAnarchy-<tag>.zip → /Applications (admin) or ~/Applications
# Linux → TVAnarchy-<tag>-linux-<arch>.tar.gz → /opt/tv-anarchy (classic,
# writable) or ~/.local/opt/tv-anarchy (non-root; always on
# immutable/ostree systems like Bluefin, whose /usr is read-only)
# Windows → TVAnarchy-<tag>-windows-<arch>.zip → %LOCALAPPDATA%\Programs\TVAnarchy
# (per-user, no elevation; run under Git Bash/MSYS)
# Android → run under Termux (detected before generic Linux — Termux's uname
# says Linux): TVAnarchy-<tag>-android.apk → ~/storage/downloads,
# then handed to the system package installer via termux-open.
# Apps aren't folder-installs on Android, so "installed version" is
# a stamp recorded on successful handoff (--force re-downloads).
# iOS → not served here: no shell. Install via Xcode (TVAnarchyiOS scheme)
# or TestFlight/sideload — see docs/operations.md.
# A release that lacks the asset for this platform fails with the exact missing
@ -30,12 +35,18 @@ REPO="${FORGEJO_REPO:-tv-anarchy}"
FORCE=""; [ "${1:-}" = "--force" ] && FORCE=1
# --- platform --------------------------------------------------------------
case "$(uname -s)" in
Darwin) OS=mac ;;
Linux) OS=linux ;;
MINGW*|MSYS*|CYGWIN*) OS=windows ;;
*) echo "✗ unsupported platform '$(uname -s)' — TVAnarchy ships for macOS, Linux, Windows (iOS via Xcode/TestFlight)." >&2; exit 1 ;;
esac
# Termux first: it reports `Linux` from uname, but Android apps are APKs handed
# to the package installer, not folder installs.
if [ -n "${TERMUX_VERSION:-}" ] || [ -d /data/data/com.termux ]; then
OS=android
else
case "$(uname -s)" in
Darwin) OS=mac ;;
Linux) OS=linux ;;
MINGW*|MSYS*|CYGWIN*) OS=windows ;;
*) echo "✗ unsupported platform '$(uname -s)' — TVAnarchy ships for macOS, Linux, Windows, Android/Termux (iOS via Xcode/TestFlight)." >&2; exit 1 ;;
esac
fi
ARCH="$(uname -m)" # arm64 / x86_64 / aarch64
# True on image-based (ostree/bootc) Linux — Bluefin, Silverblue, etc. Their
@ -55,16 +66,23 @@ resolve_dest() {
else printf '%s/.local/opt/tv-anarchy\n' "$HOME"; fi ;;
windows)
printf '%s/Programs/TVAnarchy\n' "${LOCALAPPDATA:-$HOME/AppData/Local}" ;;
android)
# Shared storage (visible to the package installer / Files app) when the
# Termux storage bridge is set up; else Termux home as a fallback.
if [ -d "$HOME/storage/downloads" ]; then printf '%s/storage/downloads\n' "$HOME"
else printf '%s\n' "$HOME"; fi ;;
esac
}
DEST="$(resolve_dest)"
# The release asset this platform consumes.
# The release asset this platform consumes. (The APK carries all ABIs, so no
# arch suffix on Android.)
asset_name() {
case "$OS" in
mac) printf 'TVAnarchy-%s.zip\n' "$1" ;;
linux) printf 'TVAnarchy-%s-linux-%s.tar.gz\n' "$1" "$ARCH" ;;
windows) printf 'TVAnarchy-%s-windows-%s.zip\n' "$1" "$ARCH" ;;
android) printf 'TVAnarchy-%s-android.apk\n' "$1" ;;
esac
}
@ -100,13 +118,15 @@ fi
# --- compare to the installed copy (releases are tagged v<marketing version>)
# macOS reads the bundle plist; Linux/Windows read the .release-tag stamp this
# script writes on install.
# script writes on install; Android reads the handoff stamp (Termux can't query
# another package's installed version without adb).
ANDROID_STAMP="$HOME/.local/state/tv-anarchy/android-release-tag"
installed_tag() {
if [ "$OS" = mac ]; then
[ -d "$1" ] && printf 'v%s\n' "$(/usr/libexec/PlistBuddy -c 'Print :CFBundleShortVersionString' "$1/Contents/Info.plist" 2>/dev/null || echo '?')"
else
[ -f "$1/.release-tag" ] && cat "$1/.release-tag"
fi
case "$OS" in
mac) [ -d "$1" ] && printf 'v%s\n' "$(/usr/libexec/PlistBuddy -c 'Print :CFBundleShortVersionString' "$1/Contents/Info.plist" 2>/dev/null || echo '?')" ;;
android) [ -f "$ANDROID_STAMP" ] && cat "$ANDROID_STAMP" ;;
*) [ -f "$1/.release-tag" ] && cat "$1/.release-tag" ;;
esac
}
installed="$(installed_tag "$DEST" || true)"; installed="${installed:-none}"; stale=""
if [ "$installed" = "none" ] && [ "$OS" = mac ] && [ -z "${TVANARCHY_DEST:-}" ]; then
@ -133,6 +153,7 @@ curl -fsSL "${auth[@]}" -o "$TMP/asset" "$URL"
mkdir -p "$TMP/unpacked"
case "$WANT" in
*.apk) : ;; # nothing to unpack — the APK is the artifact
*.tar.gz) tar -xzf "$TMP/asset" -C "$TMP/unpacked" ;;
*.zip) if [ "$OS" = mac ]; then ditto -x -k "$TMP/asset" "$TMP/unpacked"
else unzip -q "$TMP/asset" -d "$TMP/unpacked"; fi ;;
@ -158,6 +179,25 @@ case "$OS" in
[ "$OS" = linux ] && chmod +x "$DEST"/tvanarchy* 2>/dev/null || true
relaunch="restart TVAnarchy to pick this up."
;;
android)
# DEST is a download dir, not an install dir: drop the APK in shared
# storage and hand it to the system package installer. Android won't let a
# shell install packages directly (that's by design); the user confirms the
# installer prompt.
apk="$DEST/$WANT"
mv "$TMP/asset" "$apk"
if command -v termux-open >/dev/null; then
termux-open "$apk"
echo "→ handed $WANT to the package installer — confirm the install prompt."
else
echo "→ saved $apk — open it from the Files app to install."
[ -d "$HOME/storage" ] || echo " (tip: run termux-setup-storage so downloads land in shared storage)"
fi
mkdir -p "$(dirname "$ANDROID_STAMP")"
printf '%s\n' "$TAG" > "$ANDROID_STAMP"
DEST="$apk" # the summary line should point at the APK, not the folder
relaunch="reopen TVAnarchy after the installer finishes."
;;
esac
echo "✓ installed $TAG$DEST"