tv-anarchy/Sources/TVAnarchyCore/Library/WatchHistory.swift
Natalie ee3da4e101 perf(watch-history): stop the background poll freezing the main thread
The 8s watch-history poll ran refresh() on the main actor, which read and
JSON-decoded the unioned watch log THREE times (playedPaths, resumePositions,
episodeProgress each re-read) and called MediaPaths.toRemote() per event — and
every toRemote rebuilt ProcessInfo.environment (~22µs each, the whole env dict
is reconstructed on every access) plus a homeDirectory lookup. A live sample
caught the main thread 100% in this path; the app sat at 78–113% CPU.

- Cache MediaPaths.remoteRoot / mappings (process-constant) → kills the
  per-call env-dictionary rebuild storm.
- WatchHistory.derivedState(): read+decode the log ONCE, feed all three
  derived computations → 3× fewer reads/decodes per refresh.
- WatchHistoryController.refreshAsync(): the background poll now parses off the
  main thread on a utility task and only assigns the small results on main.

Settled CPU drops from ~78% sustained to ~0% idle.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 01:44:06 -04:00

458 lines
No EOL
20 KiB
Swift

import Foundation
/// Continue watching from shared watch state:
/// plum log `~/.local/state/tv-anarchy/watched.jsonl` (app, mcp, bridge, governor),
/// black mirror `~/.local/state/tv-anarchy/black-watched.jsonl` (synced TV plays).
public enum WatchHistory {
private struct WatchEvent: Codable {
let ts: String
let event: String
let show: String
let season: Int
let episode: Int
let label: String
let path: String
let resumeSeconds: Double?
let durationSeconds: Double?
let client: String
}
private static let validClients: Set<String> = ["app", "mcp", "bridge", "governor", "black"]
private static var stateOverride: URL? {
guard let dir = ProcessInfo.processInfo.environment["TV_ANARCHY_STATE_DIR"],
!dir.isEmpty else { return nil }
return URL(fileURLWithPath: dir, isDirectory: true)
}
private static func watchlogURL() -> URL {
if let dir = stateOverride { return dir.appendingPathComponent("watched.jsonl") }
return FileManager.default.homeDirectoryForCurrentUser
.appendingPathComponent(".local/state/tv-anarchy/watched.jsonl")
}
static func blackCacheURL() -> URL {
if let dir = stateOverride { return dir.appendingPathComponent("black-watched.jsonl") }
return FileManager.default.homeDirectoryForCurrentUser
.appendingPathComponent(".local/state/tv-anarchy/black-watched.jsonl")
}
static func parseTS(_ s: String, _ plain: ISO8601DateFormatter,
_ fractional: ISO8601DateFormatter) -> Date? {
plain.date(from: s) ?? fractional.date(from: s)
}
static func fractionalFormatter() -> ISO8601DateFormatter {
let f = ISO8601DateFormatter()
f.formatOptions = [.withInternetDateTime, .withFractionalSeconds]
return f
}
/// Read+decode the unioned watch logs **once** and compute every derived set the
/// controller needs. Returns only public result types (so it crosses the type
/// boundary to `WatchHistoryController`); the `[WatchEvent]` array stays private
/// inside this enum. Replaces three separate read+decode passes per refresh.
static func derivedState() -> (played: Set<String>,
resume: [String: Double],
episodes: [String: EpisodeProgress]) {
let events = readWatchlog()
return (playedPaths(from: events),
resumePositions(from: events),
episodeProgress(from: events))
}
public static func continueItems(limit: Int = 24) -> [ContinueItem] {
continueItems(from: readWatchlog(), limit: limit)
}
private static func continueItems(from events: [WatchEvent], limit: Int = 24) -> [ContinueItem] {
var byKey: [String: ContinueItem] = [:]
let iso = ISO8601DateFormatter()
let isoFrac = fractionalFormatter()
for ev in events {
let when = parseTS(ev.ts, iso, isoFrac)
let item = ContinueItem(
title: ev.label.isEmpty ? ev.show : ev.label,
path: ev.path, show: ev.show, season: ev.season, episode: ev.episode,
positionSeconds: ev.resumeSeconds, lastSeen: when, source: ev.client)
if let existing = byKey[ev.path], let e = existing.lastSeen, let w = when, e >= w { continue }
byKey[ev.path] = item
}
let items = byKey.values.sorted { lhs, rhs in
switch (lhs.lastSeen, rhs.lastSeen) {
case let (l?, r?): return l > r
case (_?, nil): return true
case (nil, _?): return false
case (nil, nil): return (lhs.positionSeconds ?? 0) > (rhs.positionSeconds ?? 0)
}
}
return Array(items.prefix(limit))
}
public static func resumePositions() -> [String: Double] {
resumePositions(from: readWatchlog())
}
private static func resumePositions(from events: [WatchEvent]) -> [String: Double] {
var out: [String: Double] = [:]
for item in continueItems(from: events, limit: 2000) {
if let p = item.positionSeconds, p > 1 { out[MediaPaths.toRemote(item.path)] = p }
}
return out
}
/// Per-episode watch progress derived from the latest event carrying a resume position
/// for that (remote) path. Duration is captured when the reporting player knew it
/// (live poll or finish check). Callers treat a path present in playedPaths() as 1.0
/// regardless of stored fraction.
public struct EpisodeProgress: Sendable, Equatable {
public let path: String
public let positionSeconds: Double
public let durationSeconds: Double?
public var fraction: Double? {
guard let d = durationSeconds, d > 0 else { return nil }
let f = positionSeconds / d
return max(0, min(1, f))
}
}
public static func episodeProgress() -> [String: EpisodeProgress] {
episodeProgress(from: readWatchlog())
}
private static func episodeProgress(from events: [WatchEvent]) -> [String: EpisodeProgress] {
var best: [String: WatchEvent] = [:]
let iso = ISO8601DateFormatter()
let isoFrac = fractionalFormatter()
for ev in events where isRealVideo(ev.path) {
guard let pos = ev.resumeSeconds, pos >= 0 else { continue }
let key = MediaPaths.toRemote(ev.path)
if let prev = best[key] {
if let pts = parseTS(ev.ts, iso, isoFrac),
let prevTS = parseTS(prev.ts, iso, isoFrac),
pts <= prevTS { continue }
}
best[key] = ev
}
var out: [String: EpisodeProgress] = [:]
for (k, ev) in best {
if let pos = ev.resumeSeconds {
out[k] = EpisodeProgress(path: k, positionSeconds: pos, durationSeconds: ev.durationSeconds)
}
}
return out
}
public static func playedPaths() -> Set<String> {
playedPaths(from: readWatchlog())
}
private static func playedPaths(from events: [WatchEvent]) -> Set<String> {
var out = Set<String>()
// Compute last reset per show so a rewatch clears the "started" set for
// badges and nextUnwatched while preserving append-only history.
var lastResetByShow: [String: Date] = [:]
let iso = ISO8601DateFormatter()
let isoFrac = fractionalFormatter()
for ev in events where ev.event == "reset" {
if let ts = parseTS(ev.ts, iso, isoFrac) {
let prev = lastResetByShow[ev.show] ?? .distantPast
if ts > prev { lastResetByShow[ev.show] = ts }
}
}
for ev in events where isRealVideo(ev.path) && ev.event == "play" {
let resetTS = lastResetByShow[ev.show] ?? .distantPast
if let pts = parseTS(ev.ts, iso, isoFrac), pts > resetTS {
out.insert(MediaPaths.toRemote(ev.path))
}
}
return out
}
/// Append a reset marker for the show. Subsequent playedPaths() and
/// nextUnwatched calculations for that show will ignore prior "play" events
/// (rewatch starts the badges and "watch next" over). History remains
/// append-only for auditing/continue.
public static func resetWatch(show: String) {
let ev = WatchEvent(
ts: ISO8601DateFormatter().string(from: Date()),
event: "reset",
show: show,
season: 0,
episode: 0,
label: "",
path: "",
resumeSeconds: nil,
durationSeconds: nil,
client: "app"
)
guard let data = try? JSONEncoder().encode(ev),
let line = String(data: data, encoding: .utf8) else { return }
let url = watchlogURL()
try? FileManager.default.createDirectory(at: url.deletingLastPathComponent(), withIntermediateDirectories: true)
if let h = try? FileHandle(forWritingTo: url) {
defer { try? h.close() }
_ = try? h.seekToEnd()
try? h.write(contentsOf: Data((line + "\n").utf8))
} else {
try? (line + "\n").write(to: url, atomically: true, encoding: .utf8)
}
}
public struct ShowProgress: Sendable, Equatable {
public let show: String
public let path: String
public let season: Int
public let episode: Int
public let lastSeen: Date?
}
public static func progressPerShow() -> [ShowProgress] {
progress(from: readWatchlog())
}
private static func progress(from events: [WatchEvent]) -> [ShowProgress] {
let iso = ISO8601DateFormatter()
let isoFrac = fractionalFormatter()
var byShow: [String: [WatchEvent]] = [:]
// Only finished episodes (`play`) advance the per-show frontier. `resume`
// lines track mid-watch position but must not bump continue-watching past
// an episode the user hasn't completed (playlist auto-advance used to log
// every skipped title as `play`).
for ev in events where isRealVideo(ev.path) && ev.event == "play" {
byShow[ev.show, default: []].append(ev)
}
let sustained: TimeInterval = 5 * 60
var out: [ShowProgress] = []
for (show, evs) in byShow {
let sorted = evs.sorted { $0.ts < $1.ts }
var latest: WatchEvent?
for (i, cur) in sorted.enumerated() {
let next = i + 1 < sorted.count ? sorted[i + 1] : nil
let isSustained: Bool
if let next, let cT = parseTS(cur.ts, iso, isoFrac), let nT = parseTS(next.ts, iso, isoFrac) {
isSustained = nT.timeIntervalSince(cT) >= sustained
} else {
isSustained = true
}
if isSustained { latest = cur }
}
guard let l = latest else { continue }
out.append(ShowProgress(show: show, path: l.path, season: l.season,
episode: l.episode, lastSeen: parseTS(l.ts, iso, isoFrac)))
}
return out.sorted { ($0.lastSeen ?? .distantPast) > ($1.lastSeen ?? .distantPast) }
}
static func isRealVideo(_ path: String) -> Bool {
if path.contains(":Zone.Identifier") || path.contains(".mp4:") { return false }
let ext = (path as NSString).pathExtension.lowercased()
return ["mp4", "mkv", "avi", "mov", "m4v", "webm"].contains(ext)
}
/// `event` is `"play"` when the episode finished (or was intentionally started
/// from the top), `"resume"` when the user jumped in mid-way. Continue-watching
/// only advances on `"play"`.
public static func recordPlay(show: String, season: Int, episode: Int,
label: String, path: String, resumeSeconds: Double? = nil,
durationSeconds: Double? = nil,
event: String = "play") {
let ev = WatchEvent(ts: ISO8601DateFormatter().string(from: Date()), event: event,
show: show, season: season, episode: episode,
label: label, path: path, resumeSeconds: resumeSeconds,
durationSeconds: durationSeconds,
client: "app")
guard let data = try? JSONEncoder().encode(ev),
let line = String(data: data, encoding: .utf8) else { return }
let url = watchlogURL()
try? FileManager.default.createDirectory(at: url.deletingLastPathComponent(),
withIntermediateDirectories: true)
if let h = try? FileHandle(forWritingTo: url) {
defer { try? h.close() }
_ = try? h.seekToEnd()
try? h.write(contentsOf: Data((line + "\n").utf8))
} else {
try? (line + "\n").write(to: url, atomically: true, encoding: .utf8)
}
}
private static func readWatchlog() -> [WatchEvent] {
parse(at: watchlogURL()) + parse(at: blackCacheURL())
}
private static func parse(at url: URL) -> [WatchEvent] {
guard let text = try? String(contentsOf: url, encoding: .utf8) else { return [] }
let dec = JSONDecoder()
var out: [WatchEvent] = []
for line in text.split(separator: "\n") {
guard let data = line.data(using: .utf8),
let ev = try? dec.decode(WatchEvent.self, from: data),
validClients.contains(ev.client) else { continue }
out.append(ev)
}
return out
}
}
// Path-only record helper (internal to the module) so live progress updates from the
// player (which only know the path) can append without a full show lookup (readers
// re-resolve show/ep from the library at use time). Also used by controller facade.
extension WatchHistory {
static func recordPlayForPath(path: String, resumeSeconds: Double?, durationSeconds: Double?, event: String) {
let (show, season, episode, label) = parseShowMetaFromPath(path)
recordPlay(show: show, season: season, episode: episode, label: label,
path: path, resumeSeconds: resumeSeconds, durationSeconds: durationSeconds,
event: event)
}
private static func parseShowMetaFromPath(_ path: String) -> (show: String, season: Int, episode: Int, label: String) {
let name = ((path as NSString).lastPathComponent as NSString).deletingPathExtension
let m = name.range(of: "S(\\d{1,2})E(\\d{1,3})", options: .regularExpression)
guard let r = m else { return (name, 0, 0, name) }
let before = name[..<r.lowerBound].trimmingCharacters(in: .whitespaces.union(.punctuationCharacters))
let nums = name[r].dropFirst().split(separator: "E")
let s = Int(nums.first ?? "0") ?? 0
let e = Int(nums.last ?? "0") ?? 0
let show = before.isEmpty ? name : String(before)
let label = name
return (show, s, e, label)
}
}
/// The unified runtime source of truth for watch history, resume positions, per-episode
/// progress (for Netflix-style bars), continue-watching frontier, and played state.
///
/// Static WatchHistory remains the pure log reader/writer + test seam. This controller
/// owns the observable derived state, throttled black sync, and a background poll that
/// picks up external appends (governor, bridge, other devices) so that continue rails,
/// badges, resume menus, and offline cache decisions stay current without every view
/// re-reading files or holding its own @State snapshot.
///
/// Single instance is created in RootView and attached to LibraryController (which
/// derives its continueWatching rail and delegates resume/played/reset through it)
/// and to PlayerController via progress callbacks (live position updates during play
/// and at finish). OfflineCacheController continues to read the derived continue list
/// from LibraryController (no direct dep, keeps layering clean).
@Observable
@MainActor
public final class WatchHistoryController {
public private(set) var playedPaths: Set<String> = []
public private(set) var resumePositions: [String: Double] = [:]
/// Per-episode (remote path) latest known position + duration (when captured at report time).
/// Used for Netflix-style episode progress bars in lists/grids. Fraction is nil when
/// duration unknown; callers should treat membership in playedPaths as 1.0.
public private(set) var episodeProgress: [String: WatchHistory.EpisodeProgress] = [:]
public private(set) var lastRefresh: Date?
private var pollTask: Task<Void, Never>?
private var lastBlackSync = Date.distantPast
private let blackSyncMinInterval: TimeInterval = 60
public init() {
refresh()
}
deinit {
// pollTask is cancelled implicitly on release; explicit cancel here would require main-actor isolation
}
/// Re-parse the unioned watch logs (plum + black mirror) and recompute all derived sets.
/// Reads+decodes the log **once** and feeds it to every derived computation (the old
/// path read and JSON-decoded the whole log three times). Synchronous used right
/// after our own appends so callers see fresh state immediately; the background poll
/// uses `refreshAsync()` instead to keep this work off the main thread.
public func refresh() {
let d = WatchHistory.derivedState()
playedPaths = d.played
resumePositions = d.resume
episodeProgress = d.episodes
lastRefresh = Date()
}
/// Off-main refresh for the background poll: the read+decode+derive runs on a
/// utility thread, and only the (small) result assignment happens on the main
/// actor. This is what stops the 8-second poll from periodically freezing the UI.
public func refreshAsync() async {
let derived = await Task.detached(priority: .utility) {
WatchHistory.derivedState()
}.value
playedPaths = derived.played
resumePositions = derived.resume
episodeProgress = derived.episodes
lastRefresh = Date()
}
/// Append a play/resume event (used for both launch and live position updates).
/// Always refreshes derived state immediately.
public func recordPlay(show: String, season: Int, episode: Int, label: String, path: String,
resumeSeconds: Double? = nil, durationSeconds: Double? = nil,
event: String = "play") {
WatchHistory.recordPlay(show: show, season: season, episode: episode, label: label,
path: path, resumeSeconds: resumeSeconds,
durationSeconds: durationSeconds, event: event)
refresh()
}
/// Record (or update) a mid-watch resume position for a path, with dur if known.
/// This is what keeps resume targets and episode bars live while a show is playing.
public func recordPosition(path: String, resumeSeconds: Double, durationSeconds: Double? = nil) {
guard WatchHistory.isRealVideo(path) else { return }
// We don't have show/season/ep here; the recordPlay overload that takes raw path
// lives in LibraryController (it resolves against current shows). For direct use
// (e.g. live from player) we go through the path-only record that the legacy
// did not expose so delegate to a lightweight path-only writer that still
// appends with client=app and lets the readers resolve show later.
WatchHistory.recordPlayForPath(path: path, resumeSeconds: resumeSeconds,
durationSeconds: durationSeconds, event: "resume")
refresh()
}
/// Append reset marker for a show (rewatch semantics).
public func resetWatch(show: String) {
WatchHistory.resetWatch(show: show)
refresh()
}
/// Pull the black-side watchlog mirror (TV plays). Throttled. Returns true if the
/// cache changed (caller may refreshContinue etc).
@discardableResult
public func syncBlack() async -> Bool {
let now = Date()
if now.timeIntervalSince(lastBlackSync) < blackSyncMinInterval { return false }
lastBlackSync = now
let updated = await Task.detached(priority: .utility) { BlackWatchlog.sync() }.value
if updated { refresh() }
return updated
}
/// Background poller: keeps our derived state fresh from disk so external writers
/// (governor on plum VLC, black sync for TV, mcp/bridge, other install members) are
/// reflected in rails/badges/resume/ cache planning without per-view timers.
public func start() {
pollTask?.cancel()
pollTask = Task { [weak self] in
while !Task.isCancelled {
await self?.refreshAsync()
// Occasional black sync (the call itself throttles internally)
_ = await self?.syncBlack()
try? await Task.sleep(for: .seconds(8))
}
}
}
public func stop() {
pollTask?.cancel()
pollTask = nil
}
public func isRealVideo(_ path: String) -> Bool {
WatchHistory.isRealVideo(path)
}
public func progressPerShow() -> [WatchHistory.ShowProgress] {
WatchHistory.progressPerShow()
}
}