diff --git a/CHANGELOG.md b/CHANGELOG.md index 76cbbed3e..7275277af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Cost usage: replace repeated Foundation metadata/root checks with one portable file-stat pass so expired Codex history refreshes stay responsive on very large session archives (#1392). Thanks @TheAngryPit and @ProspectOre! - Cursor: show the Safari Full Disk Access recovery hint before the long browser login list so permission guidance remains visible when menu errors truncate (#1419, fixes #1417). Thanks @hhh2210! - Cursor: present legacy request-based plans as one Requests quota with the raw used/limit count instead of unrelated token-based Auto/API bars (#1420, fixes #1418). Thanks @hhh2210! +- Cost usage: memoize Codex priority-turn trace metadata incrementally so warm refreshes scan only appended rows instead of rescanning large trace databases (#1404). Thanks @ProspectOre! ## 0.33.0 — 2026-06-11 diff --git a/Sources/CodexBarCore/Generated/CodexParserHash.generated.swift b/Sources/CodexBarCore/Generated/CodexParserHash.generated.swift index dda51b387..4075dacef 100644 --- a/Sources/CodexBarCore/Generated/CodexParserHash.generated.swift +++ b/Sources/CodexBarCore/Generated/CodexParserHash.generated.swift @@ -1,5 +1,5 @@ // Generated by Scripts/regenerate-codex-parser-hash.sh. Do not edit by hand. enum CodexParserHash { - static let value = "41322b25ff12b545" + static let value = "ac951c637b111d71" } diff --git a/Sources/CodexBarCore/Vendored/CostUsage/CostUsageScanner+CodexPriority.swift b/Sources/CodexBarCore/Vendored/CostUsage/CostUsageScanner+CodexPriority.swift index 9583a0c1c..678b6da37 100644 --- a/Sources/CodexBarCore/Vendored/CostUsage/CostUsageScanner+CodexPriority.swift +++ b/Sources/CodexBarCore/Vendored/CostUsage/CostUsageScanner+CodexPriority.swift @@ -19,6 +19,133 @@ extension CostUsageScanner { .appendingPathComponent("logs_2.sqlite", isDirectory: false) } + #if canImport(SQLite3) + /// Accumulated priority-turn state for one trace database. The `logs` table uses an + /// `INTEGER PRIMARY KEY AUTOINCREMENT` id, so rowids are monotonic and never + /// reused. Codex prunes old rows in place, so source row IDs are retained and cheaply + /// revalidated before each incremental scan. + struct CodexPriorityTurnsMemoState: Codable { + var observationID: UInt64 + var coverageSinceEpoch: Int64 + var lastRowID: Int64 + var fileIdentity: UInt64? + var turns: [String: CodexPriorityTurnMetadata] + var requestSourcesByTurnID: [String: [Int64: CodexPriorityTurnMetadata]] + var priorityCompletedModelsByTurnID: [String: [Int64: String]] + var completedModelsByTurnID: [String: [Int64: String]] + var completedTurnIDInsertionOrder: [String] + var completedTurnIDInsertionOrderStartIndex: Int + } + + /// Completion models for known priority turns are retained with those turns. Completions + /// seen before their request are pending and may belong to non-priority turns, so that + /// separate map is bounded to keep memory constant while preserving ordering. + static let codexPriorityCompletedModelRetentionLimit = 4096 + + private final class CodexPriorityLockedState: @unchecked Sendable { + private let lock = NSLock() + private var state: State + + init(_ state: State) { + self.state = state + } + + func withLock(_ body: (inout State) throws -> Result) rethrows -> Result { + self.lock.lock() + defer { self.lock.unlock() } + return try body(&self.state) + } + } + + private static let codexPriorityTurnsMemo = + CodexPriorityLockedState<[String: CodexPriorityTurnsMemoState]>([:]) + private static let codexPriorityTurnsObservationCounter = CodexPriorityLockedState(0) + + private static func nextCodexPriorityTurnsObservationID() -> UInt64 { + self.codexPriorityTurnsObservationCounter.withLock { + $0 &+= 1 + return $0 + } + } + + /// Scans run outside the lock, so overlapping refreshes can write back out of order. + /// A monotonically increasing observation ID makes the later-started scan authoritative; + /// same-observation test snapshots still use coverage/cursor dominance. + static func storeCodexPriorityTurnsMemoIfNewer( + _ updated: CodexPriorityTurnsMemoState, + forPath path: String) + { + let stored = self.codexPriorityTurnsMemo.withLock { memo in + if let existing = memo[path], + existing.observationID > updated.observationID + { + return false + } + if let existing = memo[path], + existing.observationID == updated.observationID, + existing.fileIdentity == updated.fileIdentity, + existing.coverageSinceEpoch <= updated.coverageSinceEpoch, + existing.lastRowID >= updated.lastRowID + { + return false + } + memo[path] = updated + return true + } + if stored { self.markCodexPriorityTurnsMemoDirty() } + } + + static func _test_resetCodexPriorityTurnsMemo() { + self.codexPriorityTurnsMemo.withLock { $0.removeAll() } + self.codexPriorityTurnsObservationCounter.withLock { $0 = 0 } + } + + static func _test_codexPriorityTurnsMemoState(forPath path: String) -> CodexPriorityTurnsMemoState? { + self.codexPriorityTurnsMemo.withLock { $0[path] } + } + + static func _test_removeCodexPriorityTurnsMemoState(forPath path: String) { + self.codexPriorityTurnsMemo.withLock { $0[path] = nil } + } + + static func invalidateCodexPriorityTurnsMemo(forPath path: String) { + let removed = self.codexPriorityTurnsMemo.withLock { + $0.removeValue(forKey: path) != nil + } + if removed { self.markCodexPriorityTurnsMemoDirty() } + } + + static func _test_accumulateCodexPriorityTurns( + _ db: OpaquePointer?, + into state: inout CodexPriorityTurnsMemoState) -> Bool + { + self.accumulateCodexPriorityTurns(db, into: &state) + } + + static func _test_codexPriorityAccumulationQuery( + _ db: OpaquePointer?, + lastRowID: Int64, + coverageSinceEpoch: Int64) -> String + { + self.codexPriorityAccumulationPlan( + db, + lastRowID: lastRowID, + coverageSinceEpoch: coverageSinceEpoch).query + } + + static func codexPriorityTurnsMemoSnapshot() -> [String: CodexPriorityTurnsMemoState] { + self.codexPriorityTurnsMemo.withLock { $0 } + } + #endif + + /// Resolves priority turn metadata from the codex CLI trace database. The full-table + /// `LIKE` scan over `feedback_log_body` grows with the database (hundreds of megabytes on + /// active machines) and used to run on every refresh past the scan interval. For windows + /// that extend through today — every live refresh — the result is now accumulated per + /// database in process memory and only rows appended since the last call are examined; the + /// database shrinking or being replaced, or the requested window expanding earlier than + /// the accumulated coverage, triggers a full rescan. Windows that end before today keep + /// the original bounded one-shot query so historical lookups never pay an open-ended scan. static func codexPriorityTurns( databaseURL: URL? = nil, sinceDayKey: String? = nil, @@ -28,41 +155,136 @@ extension CostUsageScanner { guard FileManager.default.fileExists(atPath: url.path) else { return [:] } #if canImport(SQLite3) + if let untilDayKey, untilDayKey < CostUsageDayRange.dayKey(from: Date()) { + return self.boundedCodexPriorityTurns( + databaseURL: url, + sinceDayKey: sinceDayKey, + untilDayKey: untilDayKey) + } + + guard let opened = self.openCodexPriorityDatabase(at: url) else { return [:] } + let db = opened.db + let fileIdentity = opened.fileIdentity + defer { sqlite3_close(db) } + sqlite3_busy_timeout(db, 250) + + let observationID = self.nextCodexPriorityTurnsObservationID() + guard let maxRowID = self.maxCodexLogsRowID(db) else { return [:] } + + let requestedSinceEpoch: Int64 = if sinceDayKey != nil || untilDayKey != nil { + self.epochSeconds(forDayKey: sinceDayKey ?? "0000-01-01") ?? 0 + } else { + 0 + } + + var state = self.codexPriorityTurnsMemo.withLock { $0[url.path] } + if let memo = state, + maxRowID < memo.lastRowID + || requestedSinceEpoch < memo.coverageSinceEpoch + || memo.fileIdentity != fileIdentity + { + state = nil + } + var resolved = state ?? CodexPriorityTurnsMemoState( + observationID: observationID, + coverageSinceEpoch: requestedSinceEpoch, + lastRowID: 0, + fileIdentity: fileIdentity, + turns: [:], + requestSourcesByTurnID: [:], + priorityCompletedModelsByTurnID: [:], + completedModelsByTurnID: [:], + completedTurnIDInsertionOrder: [], + completedTurnIDInsertionOrderStartIndex: 0) + resolved.observationID = observationID + + var prunedDeletedSources = false + if state != nil { + var pruned = resolved + guard let didPrune = self.pruneDeletedCodexPrioritySources(db, from: &pruned) else { + return self.filteredResolvedCodexPriorityTurns( + resolved, + sinceDayKey: sinceDayKey, + untilDayKey: untilDayKey) + } + resolved = pruned + prunedDeletedSources = didPrune + } + + if maxRowID > resolved.lastRowID { + var updated = resolved + guard self.accumulateCodexPriorityTurns(db, into: &updated) else { + return self.filteredResolvedCodexPriorityTurns( + resolved, + sinceDayKey: sinceDayKey, + untilDayKey: untilDayKey) + } + updated.lastRowID = maxRowID + self.storeCodexPriorityTurnsMemoIfNewer(updated, forPath: url.path) + resolved = updated + } else if state == nil || prunedDeletedSources { + self.storeCodexPriorityTurnsMemoIfNewer(resolved, forPath: url.path) + } + + return self.filteredResolvedCodexPriorityTurns( + resolved, + sinceDayKey: sinceDayKey, + untilDayKey: untilDayKey) + #else + return [:] + #endif + } + + #if canImport(SQLite3) + private static func filteredResolvedCodexPriorityTurns( + _ state: CodexPriorityTurnsMemoState, + sinceDayKey: String?, + untilDayKey: String?) -> [String: CodexPriorityTurnMetadata] + { + var turns = state.turns + for (turnID, completedModels) in state.priorityCompletedModelsByTurnID { + turns[turnID]?.model = self.latestCodexCompletedModel(completedModels) + } + guard sinceDayKey != nil || untilDayKey != nil else { return turns } + return turns.filter { _, turn in + self.timestamp(turn.timestamp, isInRangeSince: sinceDayKey, until: untilDayKey) + } + } + + private static func latestCodexCompletedModel(_ modelsByRowID: [Int64: String]) -> String? { + modelsByRowID.max { $0.key < $1.key }?.value + } + + /// The pre-memo one-shot query, kept for windows that end before today: both `ts` bounds + /// stay in SQL, so a narrow historical window never scans the database tail. + private static func boundedCodexPriorityTurns( + databaseURL: URL, + sinceDayKey: String?, + untilDayKey: String?) -> [String: CodexPriorityTurnMetadata] + { var db: OpaquePointer? - guard sqlite3_open_v2(url.path, &db, SQLITE_OPEN_READONLY, nil) == SQLITE_OK else { + guard sqlite3_open_v2(databaseURL.path, &db, SQLITE_OPEN_READONLY, nil) == SQLITE_OK else { sqlite3_close(db) return [:] } defer { sqlite3_close(db) } sqlite3_busy_timeout(db, 250) - let query = if sinceDayKey != nil || untilDayKey != nil { - """ - select ts, feedback_log_body - from logs - where ts >= ? and ts < ? - and (feedback_log_body like '%websocket request:%' - or feedback_log_body like '%response.completed%') - """ - } else { - """ - select ts, feedback_log_body - from logs - where feedback_log_body like '%websocket request:%' - or feedback_log_body like '%response.completed%' - """ - } + let query = """ + select ts, feedback_log_body + from logs + where ts >= ? and ts < ? + and (feedback_log_body like '%websocket request:%' + or feedback_log_body like '%response.completed%') + """ var stmt: OpaquePointer? guard sqlite3_prepare_v2(db, query, -1, &stmt, nil) == SQLITE_OK else { return [:] } defer { sqlite3_finalize(stmt) } - - if sinceDayKey != nil || untilDayKey != nil { - let start = self.epochSeconds(forDayKey: sinceDayKey ?? "0000-01-01") ?? 0 - let end = self.epochSeconds(forDayKey: self.nextDayKey(after: untilDayKey ?? "9999-12-30")) - ?? Int64.max - sqlite3_bind_int64(stmt, 1, start) - sqlite3_bind_int64(stmt, 2, end) - } + let start = self.epochSeconds(forDayKey: sinceDayKey ?? "0000-01-01") ?? 0 + let end = self.epochSeconds(forDayKey: self.nextDayKey(after: untilDayKey ?? "9999-12-30")) + ?? Int64.max + sqlite3_bind_int64(stmt, 1, start) + sqlite3_bind_int64(stmt, 2, end) var turns: [String: CodexPriorityTurnMetadata] = [:] var completedModelsByTurnID: [String: String] = [:] @@ -87,11 +309,252 @@ extension CostUsageScanner { turns[parsed.turnID] = parsed } return turns - #else - return [:] - #endif } + private static func maxCodexLogsRowID(_ db: OpaquePointer?) -> Int64? { + var stmt: OpaquePointer? + guard sqlite3_prepare_v2(db, "select max(rowid) from logs", -1, &stmt, nil) == SQLITE_OK + else { return nil } + defer { sqlite3_finalize(stmt) } + guard sqlite3_step(stmt) == SQLITE_ROW else { return nil } + return sqlite3_column_int64(stmt, 0) + } + + static func openCodexPriorityDatabase( + at url: URL, + afterOpen: (() -> Void)? = nil) -> (db: OpaquePointer?, fileIdentity: UInt64)? + { + guard let fileIdentity = self.codexPriorityDatabaseFileIdentity(at: url) else { return nil } + var db: OpaquePointer? + guard sqlite3_open_v2(url.path, &db, SQLITE_OPEN_READONLY, nil) == SQLITE_OK else { + sqlite3_close(db) + return nil + } + afterOpen?() + guard self.codexPriorityDatabaseFileIdentity(at: url) == fileIdentity else { + sqlite3_close(db) + return nil + } + return (db, fileIdentity) + } + + private static func codexPriorityDatabaseFileIdentity(at url: URL) -> UInt64? { + (try? FileManager.default.attributesOfItem(atPath: url.path))?[.systemFileNumber] + .flatMap { $0 as? UInt64 } + } + + private static func pruneDeletedCodexPrioritySources( + _ db: OpaquePointer?, + from state: inout CodexPriorityTurnsMemoState) -> Bool? + { + let sourceRowIDs = state.requestSourcesByTurnID.values.flatMap(\.keys) + + state.priorityCompletedModelsByTurnID.values.flatMap(\.keys) + + state.completedModelsByTurnID.values.flatMap(\.keys) + guard let retainedRowIDs = self.retainedCodexPrioritySourceRowIDs(db, rowIDs: sourceRowIDs) else { + return nil + } + + var didPrune = false + for (turnID, sources) in state.requestSourcesByTurnID { + let retainedSources = sources.filter { retainedRowIDs.contains($0.key) } + guard retainedSources.count != sources.count else { continue } + didPrune = true + if retainedSources.isEmpty { + state.requestSourcesByTurnID.removeValue(forKey: turnID) + state.turns.removeValue(forKey: turnID) + if let completedModels = state.priorityCompletedModelsByTurnID.removeValue(forKey: turnID) { + self.storePendingCodexCompletedModels(completedModels, turnID: turnID, in: &state) + } + } else { + state.requestSourcesByTurnID[turnID] = retainedSources + state.turns[turnID] = retainedSources.max { $0.key < $1.key }?.value + } + } + + didPrune = self.pruneDeletedCodexCompletedModels( + retainedRowIDs: retainedRowIDs, + from: &state.priorityCompletedModelsByTurnID) || didPrune + didPrune = self.pruneDeletedCodexCompletedModels( + retainedRowIDs: retainedRowIDs, + from: &state.completedModelsByTurnID) || didPrune + self.compactCodexPendingCompletionOrderPrefix(in: &state) + state.completedTurnIDInsertionOrder.removeAll { state.completedModelsByTurnID[$0] == nil } + return didPrune + } + + private static func pruneDeletedCodexCompletedModels( + retainedRowIDs: Set, + from modelsByTurnID: inout [String: [Int64: String]]) -> Bool + { + var didPrune = false + for (turnID, modelsByRowID) in modelsByTurnID { + let retainedModels = modelsByRowID.filter { retainedRowIDs.contains($0.key) } + guard retainedModels.count != modelsByRowID.count else { continue } + didPrune = true + if retainedModels.isEmpty { + modelsByTurnID.removeValue(forKey: turnID) + } else { + modelsByTurnID[turnID] = retainedModels + } + } + return didPrune + } + + private static func retainedCodexPrioritySourceRowIDs( + _ db: OpaquePointer?, + rowIDs: [Int64]) -> Set? + { + guard !rowIDs.isEmpty else { return [] } + + var retained: Set = [] + let chunkSize = 500 + for start in stride(from: 0, to: rowIDs.count, by: chunkSize) { + let end = min(start + chunkSize, rowIDs.count) + let chunk = rowIDs[start.. self.codexPriorityCompletedModelRetentionLimit { + let evicted = state.completedTurnIDInsertionOrder[ + state.completedTurnIDInsertionOrderStartIndex, + ] + state.completedTurnIDInsertionOrderStartIndex += 1 + state.completedModelsByTurnID.removeValue(forKey: evicted) + if state.completedTurnIDInsertionOrderStartIndex + >= self.codexPriorityCompletedModelRetentionLimit + { + self.compactCodexPendingCompletionOrderPrefix(in: &state) + } + } + } + state.completedModelsByTurnID[turnID, default: [:]].merge(completedModels) { _, new in new } + } + + private static func compactCodexPendingCompletionOrderPrefix( + in state: inout CodexPriorityTurnsMemoState) + { + guard state.completedTurnIDInsertionOrderStartIndex > 0 else { return } + state.completedTurnIDInsertionOrder.removeFirst(state.completedTurnIDInsertionOrderStartIndex) + state.completedTurnIDInsertionOrderStartIndex = 0 + } + + private static func accumulateCodexPriorityTurns( + _ db: OpaquePointer?, + into state: inout CodexPriorityTurnsMemoState) -> Bool + { + let plan = self.codexPriorityAccumulationPlan( + db, + lastRowID: state.lastRowID, + coverageSinceEpoch: state.coverageSinceEpoch) + var stmt: OpaquePointer? + guard sqlite3_prepare_v2(db, plan.query, -1, &stmt, nil) == SQLITE_OK else { return false } + defer { sqlite3_finalize(stmt) } + if plan.usesTimestampIndex { + sqlite3_bind_int64(stmt, 1, state.coverageSinceEpoch) + } else { + sqlite3_bind_int64(stmt, 1, state.lastRowID) + sqlite3_bind_int64(stmt, 2, state.coverageSinceEpoch) + } + + while true { + let stepResult = sqlite3_step(stmt) + guard stepResult == SQLITE_ROW else { return stepResult == SQLITE_DONE } + let rowID = sqlite3_column_int64(stmt, 0) + let timestamp = self.timestamp(stmt: stmt, index: 1) + guard let body = self.text(stmt: stmt, index: 2) else { continue } + if let completed = self.parseCodexCompletedTraceRow(body: body) { + if state.turns[completed.turnID] != nil { + state.priorityCompletedModelsByTurnID[completed.turnID, default: [:]][rowID] = completed.model + } else { + self.storePendingCodexCompletedModels( + [rowID: completed.model], + turnID: completed.turnID, + in: &state) + } + continue + } + guard let parsed = self.parseCodexPriorityTraceRow(timestamp: timestamp, body: body) + else { continue } + state.turns[parsed.turnID] = parsed + state.requestSourcesByTurnID[parsed.turnID, default: [:]][rowID] = parsed + if let completedModels = state.completedModelsByTurnID.removeValue(forKey: parsed.turnID) { + self.compactCodexPendingCompletionOrderPrefix(in: &state) + state.completedTurnIDInsertionOrder.removeAll { $0 == parsed.turnID } + state.priorityCompletedModelsByTurnID[parsed.turnID] = completedModels + } + } + } + + private static func codexPriorityAccumulationPlan( + _ db: OpaquePointer?, + lastRowID: Int64, + coverageSinceEpoch: Int64) -> (query: String, usesTimestampIndex: Bool) + { + if lastRowID == 0, + coverageSinceEpoch > 0, + self.hasCodexLogsTimestampIndex(db) + { + return ( + """ + select rowid, ts, feedback_log_body + from logs indexed by idx_logs_ts + where ts >= ? + and (feedback_log_body like '%websocket request:%' + or feedback_log_body like '%response.completed%') + order by rowid + """, + true) + } + return ( + """ + select rowid, ts, feedback_log_body + from logs + where rowid > ? and ts >= ? + and (feedback_log_body like '%websocket request:%' + or feedback_log_body like '%response.completed%') + order by rowid + """, + false) + } + + private static func hasCodexLogsTimestampIndex(_ db: OpaquePointer?) -> Bool { + var stmt: OpaquePointer? + let query = """ + select 1 + from sqlite_master + where type = 'index' and tbl_name = 'logs' and name = 'idx_logs_ts' + limit 1 + """ + guard sqlite3_prepare_v2(db, query, -1, &stmt, nil) == SQLITE_OK else { return false } + defer { sqlite3_finalize(stmt) } + return sqlite3_step(stmt) == SQLITE_ROW + } + #endif + static func parseCodexPriorityTraceRow(timestamp: String?, body: String) -> CodexPriorityTurnMetadata? { guard let markerRange = body.range(of: self.requestMarker) else { return nil } let prefix = String(body[.. URL { + let root = cacheRoot + ?? FileManager.default.urls(for: .cachesDirectory, in: .userDomainMask).first! + .appendingPathComponent("CodexBar", isDirectory: true) + return root + .appendingPathComponent("cost-usage", isDirectory: true) + .appendingPathComponent( + "codex-priority-turns-v\(self.artifactVersion).json", + isDirectory: false) + } + + static func currentProducerKey(parserHash: String = CodexParserHash.value) -> String { + "codex:pt:p\(parserHash)" + } + + static func load( + cacheRoot: URL? = nil, + producerKey: String? = nil) -> [String: CostUsageScanner.CodexPriorityTurnsMemoState]? + { + let url = self.artifactURL(cacheRoot: cacheRoot) + guard let data = try? Data(contentsOf: url) else { return nil } + guard let decoded = try? JSONDecoder().decode(CodexPriorityTurnsMemoArtifact.self, from: data) + else { return nil } + guard decoded.version == self.artifactVersion else { return nil } + guard decoded.producerKey == (producerKey ?? self.currentProducerKey()) else { return nil } + return decoded.states + } + + static func save( + states: [String: CostUsageScanner.CodexPriorityTurnsMemoState], + cacheRoot: URL? = nil, + producerKey: String? = nil) + { + let url = self.artifactURL(cacheRoot: cacheRoot) + let dir = url.deletingLastPathComponent() + try? FileManager.default.createDirectory(at: dir, withIntermediateDirectories: true) + + let artifact = CodexPriorityTurnsMemoArtifact( + producerKey: producerKey ?? self.currentProducerKey(), + states: states) + guard let data = try? JSONEncoder().encode(artifact) else { return } + + let tmp = dir.appendingPathComponent(".tmp-\(UUID().uuidString).json", isDirectory: false) + do { + try data.write(to: tmp, options: [.atomic]) + if FileManager.default.fileExists(atPath: url.path) { + _ = try FileManager.default.replaceItemAt(url, withItemAt: tmp) + } else { + try FileManager.default.moveItem(at: tmp, to: url) + } + } catch { + try? FileManager.default.removeItem(at: tmp) + } + } +} + +extension CostUsageScanner { + private final class CodexPriorityDiskLockedState: @unchecked Sendable { + private let lock = NSLock() + private var state: State + + init(_ state: State) { + self.state = state + } + + func withLock(_ body: (inout State) throws -> Result) rethrows -> Result { + self.lock.lock() + defer { self.lock.unlock() } + return try body(&self.state) + } + } + + private static let codexPriorityTurnsMemoDiskState = + CodexPriorityDiskLockedState<(loaded: Bool, dirty: Bool)>((loaded: false, dirty: false)) + + /// Seeds the in-process memo from the persisted artifact once per process, before the + /// first priority-turns scan. Seeding goes through the monotonic store, so a scan that + /// somehow completed first can never be regressed by older on-disk state. + static func loadCodexPriorityTurnsMemoFromDiskIfNeeded(cacheRoot: URL? = nil) { + let shouldLoad = self.codexPriorityTurnsMemoDiskState.withLock { state in + if state.loaded { return false } + state.loaded = true + return true + } + guard shouldLoad, let persisted = CodexPriorityTurnsMemoIO.load(cacheRoot: cacheRoot) + else { return } + for (path, persistedState) in persisted { + var state = persistedState + // Observation IDs only order overlapping scans within one process. + state.observationID = 0 + self.storeCodexPriorityTurnsMemoIfNewer(state, forPath: path) + } + // Seeding marks the memo dirty through the store; the disk already holds this state. + self.codexPriorityTurnsMemoDiskState.withLock { $0.dirty = false } + } + + /// Persists the memo after a scan advanced it. Callers run on the cost-usage scan + /// executor, so the synchronous write stays off the cooperative pool and the main actor. + static func persistCodexPriorityTurnsMemoIfDirty(cacheRoot: URL? = nil) { + let shouldPersist = self.codexPriorityTurnsMemoDiskState.withLock { state in + if !state.dirty { return false } + state.dirty = false + return true + } + guard shouldPersist else { return } + let snapshot = self.codexPriorityTurnsMemoSnapshot() + CodexPriorityTurnsMemoIO.save(states: snapshot, cacheRoot: cacheRoot) + } + + static func markCodexPriorityTurnsMemoDirty() { + self.codexPriorityTurnsMemoDiskState.withLock { $0.dirty = true } + } + + static func _test_resetCodexPriorityTurnsMemoDiskState() { + self.codexPriorityTurnsMemoDiskState.withLock { $0 = (loaded: false, dirty: false) } + } +} +#else +extension CostUsageScanner { + static func loadCodexPriorityTurnsMemoFromDiskIfNeeded(cacheRoot: URL? = nil) {} + static func persistCodexPriorityTurnsMemoIfDirty(cacheRoot: URL? = nil) {} +} +#endif diff --git a/Sources/CodexBarCore/Vendored/CostUsage/CostUsageScanner.swift b/Sources/CodexBarCore/Vendored/CostUsage/CostUsageScanner.swift index 41e952afc..6ad99d30b 100644 --- a/Sources/CodexBarCore/Vendored/CostUsage/CostUsageScanner.swift +++ b/Sources/CodexBarCore/Vendored/CostUsage/CostUsageScanner.swift @@ -2190,10 +2190,20 @@ enum CostUsageScanner { || refreshMs == 0 || cache.lastScanUnixMs == 0 || nowMs - cache.lastScanUnixMs > refreshMs + if shouldInspectPriorityTurns { + Self.loadCodexPriorityTurnsMemoFromDiskIfNeeded(cacheRoot: options.cacheRoot) + if options.forceRescan { + let databaseURL = options.codexTraceDatabaseURL ?? Self.defaultCodexPriorityDatabaseURL() + Self.invalidateCodexPriorityTurnsMemo(forPath: databaseURL.path) + } + } let priorityTurns = shouldInspectPriorityTurns ? Self.codexPriorityTurns( databaseURL: options.codexTraceDatabaseURL, sinceDayKey: range.scanSinceKey, untilDayKey: range.scanUntilKey) : [:] + if shouldInspectPriorityTurns { + Self.persistCodexPriorityTurnsMemoIfDirty(cacheRoot: options.cacheRoot) + } let priorityTurnKeys = Self.codexPriorityTurnKeys(priorityTurns) let priorityTurnIDsByDay = Self.codexPriorityTurnIDsByDay(priorityTurns) let priorityTurnsChanged = shouldInspectPriorityTurns diff --git a/Tests/CodexBarTests/CostUsageScannerCodexPriorityTests.swift b/Tests/CodexBarTests/CostUsageScannerCodexPriorityTests.swift index de842296f..ea394fa5b 100644 --- a/Tests/CodexBarTests/CostUsageScannerCodexPriorityTests.swift +++ b/Tests/CodexBarTests/CostUsageScannerCodexPriorityTests.swift @@ -9,13 +9,14 @@ struct CostUsageScannerCodexPriorityTests { func `parses priority turn metadata without exposing request body`() { let body = "INFO thread_id=11111111-1111-1111-1111-111111111111 " + "turn.id=22222222-2222-2222-2222-222222222222 websocket request: " - + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority","instructions":"secret prompt"}"# + + #"{"type":"response.create","model":"request-model","service_tier":"priority","# + + #""instructions":"secret prompt"}"# let parsed = CostUsageScanner.parseCodexPriorityTraceRow(timestamp: "2026-05-10T12:00:00Z", body: body) #expect(parsed?.threadID == "11111111-1111-1111-1111-111111111111") #expect(parsed?.turnID == "22222222-2222-2222-2222-222222222222") - #expect(parsed?.model == "gpt-5.5") + #expect(parsed?.model == "request-model") #expect(parsed?.timestamp == "2026-05-10T12:00:00Z") } @@ -40,12 +41,12 @@ struct CostUsageScannerCodexPriorityTests { @Test func `parses completed response model without exposing response body`() { let body = "INFO thread_id=thread turn.id=turn websocket event: " - + #"{"type":"response.completed","response":{"model":"gpt-5.4","output":[{"content":"private"}]}}"# + + #"{"type":"response.completed","response":{"model":"completed-model","output":[{"content":"private"}]}}"# let parsed = CostUsageScanner.parseCodexCompletedTraceRow(body: body) #expect(parsed?.turnID == "turn") - #expect(parsed?.model == "gpt-5.4") + #expect(parsed?.model == "completed-model") } @Test @@ -58,19 +59,58 @@ struct CostUsageScannerCodexPriorityTests { dbURL: dbURL, timestamp: "2026-05-10T12:00:00Z", body: "thread_id=thread-a turn.id=turn-a websocket request: " - + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority","input":"private"}"#) + + #"{"type":"response.create","model":"request-model","service_tier":"priority","input":"private"}"#) try Self.insertTestLog( dbURL: dbURL, timestamp: "2026-05-10T12:01:00Z", body: """ - thread_id=thread-b turn.id=turn-b websocket request: {"type":"response.create","model":"gpt-5.5"} + thread_id=thread-b turn.id=turn-b websocket request: {"type":"response.create","model":"request-model"} """) let turns = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) #expect(turns.keys.sorted() == ["turn-a"]) #expect(turns["turn-a"]?.threadID == "thread-a") - #expect(turns["turn-a"]?.model == "gpt-5.5") + #expect(turns["turn-a"]?.model == "request-model") + } + + @Test + func `cold scan uses timestamp index and warm scan uses rowid cursor`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + + var db: OpaquePointer? + guard sqlite3_open_v2(dbURL.path, &db, SQLITE_OPEN_READONLY, nil) == SQLITE_OK else { + throw SQLiteTestError.open + } + defer { sqlite3_close(db) } + + let coldQuery = CostUsageScanner._test_codexPriorityAccumulationQuery( + db, + lastRowID: 0, + coverageSinceEpoch: 1) + let coldPlan = try Self.queryPlan(db: db, query: coldQuery, bindings: [1]) + #expect(coldPlan.contains { $0.contains("USING INDEX idx_logs_ts") }) + + let unboundedColdQuery = CostUsageScanner._test_codexPriorityAccumulationQuery( + db, + lastRowID: 0, + coverageSinceEpoch: 0) + let unboundedColdPlan = try Self.queryPlan( + db: db, + query: unboundedColdQuery, + bindings: [0, 0]) + #expect(unboundedColdPlan.contains { $0.contains("USING INTEGER PRIMARY KEY") }) + #expect(!unboundedColdPlan.contains { $0.contains("USE TEMP B-TREE") }) + + let warmQuery = CostUsageScanner._test_codexPriorityAccumulationQuery( + db, + lastRowID: 1, + coverageSinceEpoch: 0) + let warmPlan = try Self.queryPlan(db: db, query: warmQuery, bindings: [1, 0]) + #expect(warmPlan.contains { $0.contains("USING INTEGER PRIMARY KEY") }) } @Test @@ -83,16 +123,16 @@ struct CostUsageScannerCodexPriorityTests { dbURL: dbURL, timestamp: "2026-05-10T12:00:00Z", body: "thread_id=thread turn.id=turn websocket request: " - + #"{"type":"response.create","model":"codex-auto-review","service_tier":"priority"}"#) + + #"{"type":"response.create","model":"request-alias","service_tier":"priority"}"#) try Self.insertTestLog( dbURL: dbURL, timestamp: "2026-05-10T12:00:01Z", body: "thread_id=thread turn.id=turn websocket event: " - + #"{"type":"response.completed","response":{"model":"gpt-5.4","input":"private"}}"#) + + #"{"type":"response.completed","response":{"model":"completed-model","input":"private"}}"#) let turns = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) - #expect(turns["turn"]?.model == "gpt-5.4") + #expect(turns["turn"]?.model == "completed-model") } @Test @@ -105,16 +145,16 @@ struct CostUsageScannerCodexPriorityTests { dbURL: dbURL, timestamp: "2026-05-10T12:00:00Z", body: "thread_id=thread turn.id=turn websocket request: " - + #"{"type":"response.create","model":"codex-auto-review","service_tier":"priority"}"#) + + #"{"type":"response.create","model":"request-alias","service_tier":"priority"}"#) try Self.insertTestLog( dbURL: dbURL, timestamp: "2026-05-10T12:00:01Z", body: "thread_id=thread turn.id=turn websocket event: " - + #"{"type": "response.completed", "response": {"model": "gpt-5.4"}}"#) + + #"{"type": "response.completed", "response": {"model": "completed-model"}}"#) let turns = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) - #expect(turns["turn"]?.model == "gpt-5.4") + #expect(turns["turn"]?.model == "completed-model") } @Test @@ -130,12 +170,12 @@ struct CostUsageScannerCodexPriorityTests { dbURL: dbURL, timestamp: env.isoString(for: previousDay), body: "thread_id=thread-old turn.id=turn-old websocket request: " - + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority"}"#) + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) try Self.insertTestLog( dbURL: dbURL, timestamp: env.isoString(for: day), body: "thread_id=thread-new turn.id=turn-new websocket request: " - + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority"}"#) + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) let turns = CostUsageScanner.codexPriorityTurns( databaseURL: dbURL, @@ -166,12 +206,12 @@ struct CostUsageScannerCodexPriorityTests { dbURL: dbURL, epochSeconds: Int64(previousSecond.timeIntervalSince1970), body: "thread_id=thread-before turn.id=turn-before websocket request: " - + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority"}"#) + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) try Self.insertTestLog( dbURL: dbURL, epochSeconds: Int64(nextSecond.timeIntervalSince1970), body: "thread_id=thread-after turn.id=turn-after websocket request: " - + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority"}"#) + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) let turns = CostUsageScanner.codexPriorityTurns( databaseURL: dbURL, @@ -181,11 +221,508 @@ struct CostUsageScannerCodexPriorityTests { #expect(turns.keys.sorted() == ["turn-after"]) } + @Test + func `incremental memo picks up rows appended after the first query`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:00Z", + body: "thread_id=thread-a turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + + #expect(CostUsageScanner.codexPriorityTurns(databaseURL: dbURL).keys.sorted() == ["turn-a"]) + + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:05:00Z", + body: "thread_id=thread-b turn.id=turn-b websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:05:01Z", + body: "thread_id=thread-a turn.id=turn-a websocket event: " + + #"{"type":"response.completed","response":{"model":"resolved-model"}}"#) + + let merged = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) + #expect(merged.keys.sorted() == ["turn-a", "turn-b"]) + // A completed event appended later still upgrades the model of a turn accumulated earlier. + #expect(merged["turn-a"]?.model == "resolved-model") + } + + @Test + func `memo drops pruned requests while ids keep increasing`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:00Z", + body: "thread_id=thread-a turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:01:00Z", + body: "thread_id=thread-b turn.id=turn-b websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + #expect(CostUsageScanner.codexPriorityTurns(databaseURL: dbURL).keys.sorted() == ["turn-a", "turn-b"]) + + try Self.execDatabase(dbURL: dbURL, sql: "delete from logs where rowid = 1") + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:02:00Z", + body: "thread_id=thread-c turn.id=turn-c websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + + let rebuilt = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) + #expect(rebuilt.keys.sorted() == ["turn-b", "turn-c"]) + } + + @Test + func `memo drops a pruned completion model without losing its request`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:00Z", + body: "thread_id=thread-a turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"request-alias","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:01Z", + body: "thread_id=thread-a turn.id=turn-a websocket event: " + + #"{"type":"response.completed","response":{"model":"resolved-model"}}"#) + #expect(CostUsageScanner.codexPriorityTurns(databaseURL: dbURL)["turn-a"]?.model == "resolved-model") + + try Self.execDatabase(dbURL: dbURL, sql: "delete from logs where rowid = 2") + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:01:00Z", + body: "thread_id=thread-b turn.id=turn-b websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + + let pruned = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) + #expect(pruned["turn-a"]?.model == "request-alias") + #expect(pruned["turn-b"]?.model == "request-model") + } + + @Test + func `memo falls back to retained duplicate request and completion rows`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:00Z", + body: "thread_id=thread-old turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"request-old","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:01Z", + body: "thread_id=thread-new turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"request-new","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:02Z", + body: "thread_id=thread-old turn.id=turn-a websocket event: " + + #"{"type":"response.completed","response":{"model":"completed-old"}}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:03Z", + body: "thread_id=thread-new turn.id=turn-a websocket event: " + + #"{"type":"response.completed","response":{"model":"completed-new"}}"#) + + let initial = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) + #expect(initial["turn-a"]?.threadID == "thread-new") + #expect(initial["turn-a"]?.model == "completed-new") + + try Self.execDatabase(dbURL: dbURL, sql: "delete from logs where rowid in (2, 4)") + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:01:00Z", + body: "thread_id=thread-b turn.id=turn-b websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + + let pruned = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) + #expect(pruned["turn-a"]?.threadID == "thread-old") + #expect(pruned["turn-a"]?.model == "completed-old") + #expect(pruned["turn-b"]?.model == "request-model") + } + + @Test + func `failed incremental scan does not report completion`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:00Z", + body: "thread_id=thread-a turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + + var db: OpaquePointer? + guard sqlite3_open_v2(dbURL.path, &db, SQLITE_OPEN_READONLY, nil) == SQLITE_OK else { + throw SQLiteTestError.open + } + defer { sqlite3_close(db) } + sqlite3_progress_handler(db, 1, { _ in 1 }, nil) + + var state = CostUsageScanner.CodexPriorityTurnsMemoState( + observationID: 1, + coverageSinceEpoch: 0, + lastRowID: 0, + fileIdentity: nil, + turns: [:], + requestSourcesByTurnID: [:], + priorityCompletedModelsByTurnID: [:], + completedModelsByTurnID: [:], + completedTurnIDInsertionOrder: [], + completedTurnIDInsertionOrderStartIndex: 0) + + #expect(!CostUsageScanner._test_accumulateCodexPriorityTurns(db, into: &state)) + } + + @Test + func `memo rescans when requested window expands earlier than accumulated coverage`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + // Live refreshes always query through today, which is the memoized path. + let today = Date() + let yesterday = try #require(Calendar.current.date(byAdding: .day, value: -1, to: today)) + let todayKey = CostUsageScanner.CostUsageDayRange.dayKey(from: today) + let yesterdayKey = CostUsageScanner.CostUsageDayRange.dayKey(from: yesterday) + let formatter = ISO8601DateFormatter() + try Self.insertTestLog( + dbURL: dbURL, + timestamp: formatter.string(from: yesterday), + body: "thread_id=thread-old turn.id=turn-old websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: formatter.string(from: today), + body: "thread_id=thread-new turn.id=turn-new websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + + let narrow = CostUsageScanner.codexPriorityTurns( + databaseURL: dbURL, + sinceDayKey: todayKey, + untilDayKey: todayKey) + #expect(narrow.keys.sorted() == ["turn-new"]) + + let expanded = CostUsageScanner.codexPriorityTurns( + databaseURL: dbURL, + sinceDayKey: yesterdayKey, + untilDayKey: todayKey) + #expect(expanded.keys.sorted() == ["turn-new", "turn-old"]) + } + + @Test + func `memo rescans when the database shrinks or is replaced`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:00Z", + body: "thread_id=thread-a turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:01:00Z", + body: "thread_id=thread-b turn.id=turn-b websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + #expect(CostUsageScanner.codexPriorityTurns(databaseURL: dbURL).count == 2) + + try FileManager.default.removeItem(at: dbURL) + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-11T09:00:00Z", + body: "thread_id=thread-c turn.id=turn-c websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + + let replaced = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) + #expect(replaced.keys.sorted() == ["turn-c"]) + } + + @Test + func `database replacement during open is rejected`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + let oldURL = env.root.appendingPathComponent("logs-old.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + var replacementError: Error? + + let opened = CostUsageScanner.openCodexPriorityDatabase(at: dbURL) { + do { + try FileManager.default.moveItem(at: dbURL, to: oldURL) + try Self.createTestLogsDatabase(at: dbURL) + } catch { + replacementError = error + } + } + if let opened { + sqlite3_close(opened.db) + } + + #expect(replacementError == nil) + #expect(opened == nil) + } + + @Test + func `overlapping refresh writeback cannot replace newer memo state`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:00Z", + body: "thread_id=thread-a turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:01:00Z", + body: "thread_id=thread-b turn.id=turn-b websocket request: " + + #"{"type":"response.create","model":"request-model","service_tier":"priority"}"#) + #expect(CostUsageScanner.codexPriorityTurns(databaseURL: dbURL).count == 2) + let stored = try #require(CostUsageScanner._test_codexPriorityTurnsMemoState(forPath: dbURL.path)) + + // A slower overlapping refresh writes back a snapshot read before the second row was + // appended: an older cursor that only observed the first turn. It must not win. + var stale = stored + stale.lastRowID -= 1 + stale.turns = stored.turns.filter { $0.key == "turn-a" } + CostUsageScanner.storeCodexPriorityTurnsMemoIfNewer(stale, forPath: dbURL.path) + + let retained = try #require(CostUsageScanner._test_codexPriorityTurnsMemoState(forPath: dbURL.path)) + #expect(retained.lastRowID == stored.lastRowID) + #expect(retained.turns.keys.sorted() == ["turn-a", "turn-b"]) + + // A snapshot with a newer cursor still replaces the stored state. + var newer = stored + newer.lastRowID += 1 + CostUsageScanner.storeCodexPriorityTurnsMemoIfNewer(newer, forPath: dbURL.path) + #expect( + CostUsageScanner._test_codexPriorityTurnsMemoState(forPath: dbURL.path)? + .lastRowID == stored.lastRowID + 1) + + // A full rescan that expanded coverage earlier than the stored window also replaces, + // even when its cursor is not ahead, so broader history is never discarded. + var broader = stored + broader.coverageSinceEpoch -= 1 + CostUsageScanner.storeCodexPriorityTurnsMemoIfNewer(broader, forPath: dbURL.path) + #expect( + CostUsageScanner._test_codexPriorityTurnsMemoState(forPath: dbURL.path)? + .coverageSinceEpoch == broader.coverageSinceEpoch) + } + + @Test + func `memo bounds retained completion metadata for non-priority turns`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + let limit = CostUsageScanner.codexPriorityCompletedModelRetentionLimit + let overflow = limit + 8 + let epoch = Self.epochSeconds("2026-05-10T12:00:00Z") + + // A known priority turn keeps its resolved completion outside the bounded pending + // cache while thousands of unrelated completions flow through the process. + var rows = [ + ( + epochSeconds: epoch, + body: "thread_id=priority turn.id=priority websocket request: " + + #"{"type":"response.create","model":"priority-alias","service_tier":"priority"}"#), + ( + epochSeconds: epoch, + body: "thread_id=priority turn.id=priority websocket event: " + + #"{"type":"response.completed","response":{"model":"resolved-model"}}"#), + ] + rows.append(contentsOf: (0..<(limit + overflow)).map { index in + ( + epochSeconds: epoch, + body: "thread_id=thread-\(index) turn.id=turn-\(index) websocket event: " + + #"{"type":"response.completed","response":{"model":"completed-model"}}"#) + }) + rows.append(( + epochSeconds: epoch, + body: "thread_id=thread-0 turn.id=turn-0 websocket request: " + + #"{"type":"response.create","model":"alias-evicted","service_tier":"priority"}"#)) + let newest = limit + overflow - 1 + rows.append(( + epochSeconds: epoch, + body: "thread_id=thread-\(newest) turn.id=turn-\(newest) " + + "websocket request: " + + #"{"type":"response.create","model":"alias-retained","service_tier":"priority"}"#)) + try Self.insertTestLogs(dbURL: dbURL, rows: rows) + + let turns = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) + + let memo = try #require(CostUsageScanner._test_codexPriorityTurnsMemoState(forPath: dbURL.path)) + #expect(memo.completedModelsByTurnID.count == limit - 1) + #expect( + memo.completedTurnIDInsertionOrder.count + - memo.completedTurnIDInsertionOrderStartIndex == limit - 1) + #expect(memo.completedTurnIDInsertionOrder.count < limit * 2) + #expect(memo.priorityCompletedModelsByTurnID.count == 2) + // The oldest completions were evicted, so the early request keeps its alias; the + // recent completion is still retained and upgrades its request. + #expect(turns["priority"]?.model == "resolved-model") + #expect(turns["turn-0"]?.model == "alias-evicted") + #expect(turns["turn-\(newest)"]?.model == "completed-model") + } + + @Test + func `persisted memo survives a simulated relaunch and keeps refreshes incremental`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try Self.createTestLogsDatabase(at: dbURL) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:00:00Z", + body: "thread_id=thread-a turn.id=turn-a websocket request: " + + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority"}"#) + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:01:00Z", + body: "thread_id=thread-b turn.id=turn-b websocket request: " + + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority"}"#) + #expect(CostUsageScanner.codexPriorityTurns(databaseURL: dbURL).count == 2) + let scanned = try #require(CostUsageScanner._test_codexPriorityTurnsMemoState(forPath: dbURL.path)) + + CostUsageScanner.persistCodexPriorityTurnsMemoIfDirty(cacheRoot: env.root) + #expect(FileManager.default.fileExists( + atPath: CodexPriorityTurnsMemoIO.artifactURL(cacheRoot: env.root).path)) + var persisted = try #require(CodexPriorityTurnsMemoIO.load(cacheRoot: env.root)) + persisted[dbURL.path]?.observationID = UInt64.max + CodexPriorityTurnsMemoIO.save(states: persisted, cacheRoot: env.root) + + // Simulated relaunch: in-process state is gone, the artifact remains. + CostUsageScanner._test_removeCodexPriorityTurnsMemoState(forPath: dbURL.path) + CostUsageScanner._test_resetCodexPriorityTurnsMemoDiskState() + + CostUsageScanner.loadCodexPriorityTurnsMemoFromDiskIfNeeded(cacheRoot: env.root) + let seeded = try #require(CostUsageScanner._test_codexPriorityTurnsMemoState(forPath: dbURL.path)) + #expect(seeded.observationID == 0) + #expect(seeded.lastRowID == scanned.lastRowID) + #expect(seeded.turns.keys.sorted() == ["turn-a", "turn-b"]) + + // The next refresh resumes from the persisted cursor instead of rescanning. + try Self.insertTestLog( + dbURL: dbURL, + timestamp: "2026-05-10T12:02:00Z", + body: "thread_id=thread-c turn.id=turn-c websocket request: " + + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority"}"#) + let turns = CostUsageScanner.codexPriorityTurns(databaseURL: dbURL) + #expect(turns.keys.sorted() == ["turn-a", "turn-b", "turn-c"]) + let advanced = try #require(CostUsageScanner._test_codexPriorityTurnsMemoState(forPath: dbURL.path)) + #expect(advanced.observationID > 0) + #expect(advanced.lastRowID == scanned.lastRowID + 1) + } + + @Test + func `persisted memo from a different parser hash or version is discarded`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let state = CostUsageScanner.CodexPriorityTurnsMemoState( + observationID: 7, + coverageSinceEpoch: 0, + lastRowID: 7, + fileIdentity: 42, + turns: [:], + requestSourcesByTurnID: [:], + priorityCompletedModelsByTurnID: [:], + completedModelsByTurnID: [:], + completedTurnIDInsertionOrder: [], + completedTurnIDInsertionOrderStartIndex: 0) + + CodexPriorityTurnsMemoIO.save(states: ["/tmp/db": state], cacheRoot: env.root, producerKey: "codex:pt:pstale") + #expect(CodexPriorityTurnsMemoIO.load(cacheRoot: env.root) == nil) + + CodexPriorityTurnsMemoIO.save(states: ["/tmp/db": state], cacheRoot: env.root) + #expect(CodexPriorityTurnsMemoIO.load(cacheRoot: env.root)?["/tmp/db"]?.lastRowID == 7) + #expect(CodexPriorityTurnsMemoIO.load(cacheRoot: env.root, producerKey: "codex:pt:pother") == nil) + } + + @Test + func `corrupted persisted memo artifact is ignored`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + let url = CodexPriorityTurnsMemoIO.artifactURL(cacheRoot: env.root) + try FileManager.default.createDirectory( + at: url.deletingLastPathComponent(), + withIntermediateDirectories: true) + try Data("not json".utf8).write(to: url) + + #expect(CodexPriorityTurnsMemoIO.load(cacheRoot: env.root) == nil) + } + + static func insertTestLogs(dbURL: URL, rows: [(epochSeconds: Int64, body: String)]) throws { + var db: OpaquePointer? + guard sqlite3_open(dbURL.path, &db) == SQLITE_OK else { throw SQLiteTestError.open } + defer { sqlite3_close(db) } + + var stmt: OpaquePointer? + guard sqlite3_prepare_v2(db, "insert into logs (ts, feedback_log_body) values (?, ?)", -1, &stmt, nil) + == SQLITE_OK + else { throw SQLiteTestError.prepare } + defer { sqlite3_finalize(stmt) } + + try self.exec(db, "begin transaction") + let transient = unsafeBitCast(-1, to: sqlite3_destructor_type.self) + for row in rows { + sqlite3_bind_int64(stmt, 1, row.epochSeconds) + sqlite3_bind_text(stmt, 2, row.body, -1, transient) + guard sqlite3_step(stmt) == SQLITE_DONE else { throw SQLiteTestError.step } + sqlite3_reset(stmt) + } + try self.exec(db, "commit") + } + static func createTestLogsDatabase(at dbURL: URL) throws { var db: OpaquePointer? guard sqlite3_open(dbURL.path, &db) == SQLITE_OK else { throw SQLiteTestError.open } defer { sqlite3_close(db) } - try self.exec(db, "create table logs (ts integer not null, feedback_log_body text)") + try self.exec( + db, + "create table logs (id integer primary key autoincrement, ts integer not null, feedback_log_body text)") + try self.exec(db, "create index idx_logs_ts on logs(ts desc, id desc)") + } + + static func queryPlan( + db: OpaquePointer?, + query: String, + bindings: [Int64]) throws -> [String] + { + var stmt: OpaquePointer? + guard sqlite3_prepare_v2(db, "explain query plan \(query)", -1, &stmt, nil) == SQLITE_OK else { + throw SQLiteTestError.prepare + } + defer { sqlite3_finalize(stmt) } + for (offset, value) in bindings.enumerated() { + sqlite3_bind_int64(stmt, Int32(offset + 1), value) + } + + var details: [String] = [] + while sqlite3_step(stmt) == SQLITE_ROW { + if let detail = sqlite3_column_text(stmt, 3) { + details.append(String(cString: detail)) + } + } + return details } static func insertTestLog(dbURL: URL, timestamp: String, body: String) throws { @@ -209,6 +746,13 @@ struct CostUsageScannerCodexPriorityTests { guard sqlite3_step(stmt) == SQLITE_DONE else { throw SQLiteTestError.step } } + private static func execDatabase(dbURL: URL, sql: String) throws { + var db: OpaquePointer? + guard sqlite3_open(dbURL.path, &db) == SQLITE_OK else { throw SQLiteTestError.open } + defer { sqlite3_close(db) } + try self.exec(db, sql) + } + private static func epochSeconds(_ timestamp: String) -> Int64 { let formatter = ISO8601DateFormatter() formatter.formatOptions = [.withInternetDateTime] diff --git a/Tests/CodexBarTests/CostUsageScannerPriorityTests.swift b/Tests/CodexBarTests/CostUsageScannerPriorityTests.swift index 8bdaaf5e2..7a4e60048 100644 --- a/Tests/CodexBarTests/CostUsageScannerPriorityTests.swift +++ b/Tests/CodexBarTests/CostUsageScannerPriorityTests.swift @@ -1,9 +1,59 @@ import Foundation #if canImport(SQLite3) +import SQLite3 import Testing @testable import CodexBarCore struct CostUsageScannerPriorityTests { + private enum SQLiteMutationError: Error { + case open + case prepare + case step + } + + @Test + func `forced codex rescan discards the priority memo cursor`() throws { + let env = try CostUsageTestEnvironment() + defer { env.cleanup() } + + let day = try env.makeLocalNoon(year: 2026, month: 5, day: 10) + let iso0 = env.isoString(for: day) + let iso1 = env.isoString(for: day.addingTimeInterval(1)) + let entries: [[String: Any]] = [ + ["type": "turn_context", "timestamp": iso0, "payload": ["model": "gpt-5.5"]], + ["type": "event_msg", "timestamp": iso1, "payload": ["type": "task_started", "turn_id": "priority-turn"]], + self.tokenCount(timestamp: iso1, input: 100, cached: 20, output: 10), + ] + _ = try env.writeCodexSessionFile(day: day, filename: "session.jsonl", contents: env.jsonl(entries)) + + let dbURL = env.root.appendingPathComponent("logs_2.sqlite") + try CostUsageScannerCodexPriorityTests.createTestLogsDatabase(at: dbURL) + try CostUsageScannerCodexPriorityTests.insertTestLog( + dbURL: dbURL, + timestamp: iso1, + body: "thread_id=thread turn.id=priority-turn websocket request: " + + #"{"type":"response.create","model":"gpt-5.5","service_tier":"default"}"#) + #expect(CostUsageScanner.codexPriorityTurns(databaseURL: dbURL).isEmpty) + + try self.replaceOnlyTraceBodyWithPriority(dbURL: dbURL) + + var options = CostUsageScanner.Options( + codexSessionsRoot: env.codexSessionsRoot, + cacheRoot: env.cacheRoot, + codexTraceDatabaseURL: dbURL, + forceRescan: true) + options.refreshMinIntervalSeconds = 0 + let report = CostUsageScanner.loadDailyReport( + provider: .codex, + since: day, + until: day, + now: day, + options: options) + let priorityCost = (80.0 * 1.25e-5) + (20.0 * 1.25e-6) + (10.0 * 7.5e-5) + + #expect(report.summary?.totalCostUSD == priorityCost) + } + @Test func `codex daily report applies gpt55 priority rates`() throws { let env = try CostUsageTestEnvironment() @@ -676,5 +726,22 @@ struct CostUsageScannerPriorityTests { body: "thread_id=thread turn.id=priority-turn websocket request: " + #"{"type":"response.create","model":""# + model + #"","service_tier":"priority"}"#) } + + private func replaceOnlyTraceBodyWithPriority(dbURL: URL) throws { + var db: OpaquePointer? + guard sqlite3_open(dbURL.path, &db) == SQLITE_OK else { throw SQLiteMutationError.open } + defer { sqlite3_close(db) } + + let body = "thread_id=thread turn.id=priority-turn websocket request: " + + #"{"type":"response.create","model":"gpt-5.5","service_tier":"priority"}"# + var statement: OpaquePointer? + guard sqlite3_prepare_v2(db, "update logs set feedback_log_body = ? where id = 1", -1, &statement, nil) + == SQLITE_OK + else { throw SQLiteMutationError.prepare } + defer { sqlite3_finalize(statement) } + + sqlite3_bind_text(statement, 1, body, -1, unsafeBitCast(-1, to: sqlite3_destructor_type.self)) + guard sqlite3_step(statement) == SQLITE_DONE else { throw SQLiteMutationError.step } + } } #endif