diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 80486fe..2682f62 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -66,6 +66,11 @@ import { import packageJson from "../package.json"; import {isJetBrains2026_1Client} from "./JBUtils"; import {resolveTerminalOutputMode, type TerminalOutputMode} from "./TerminalOutputMode"; +import { + createAgentTextMessageChunk, + createAgentTextThoughtChunk, + createUserMessageChunk, +} from "./ContentChunks"; export interface ThreadGoalSnapshot { objective: string; @@ -966,6 +971,7 @@ export class CodexAcpServer { case "agentMessage": return [{ sessionUpdate: "agent_message_chunk", + messageId: item.id, content: { type: "text", text: item.text }, }]; case "reasoning": @@ -1005,13 +1011,11 @@ export class CodexAcpServer { private createUserMessageUpdates(item: ThreadItem & { type: "userMessage" }): UpdateSessionEvent[] { const updates: UpdateSessionEvent[] = []; + const messageId = item.id; for (const input of item.content) { const blocks = this.userInputToContentBlocks(input); for (const block of blocks) { - updates.push({ - sessionUpdate: "user_message_chunk", - content: block, - }); + updates.push(createUserMessageChunk(block, messageId)); } } return updates; @@ -1019,10 +1023,8 @@ export class CodexAcpServer { private createReasoningUpdates(item: ThreadItem & { type: "reasoning" }): UpdateSessionEvent[] { const parts = item.summary.length > 0 ? item.summary : item.content; - return parts.map((text) => ({ - sessionUpdate: "agent_thought_chunk", - content: { type: "text", text: text }, - })); + const messageId = item.id; + return parts.map((text) => createAgentTextThoughtChunk(text, messageId)); } private createWebSearchUpdate( @@ -1589,13 +1591,7 @@ export class CodexAcpServer { } await this.connection.notify(acp.methods.client.session.update, { sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: "*Conversation interrupted*" - } - } + update: createAgentTextMessageChunk("*Conversation interrupted*"), }); } @@ -1674,26 +1670,33 @@ function mergeHistoryUpdates( merged.push(update); }; - const flushFallbackThrough = (targetKey: string): boolean => { + const flushFallbackBeforeMatchingDuplicate = (targetUpdate: UpdateSessionEvent): void => { + const targetKey = historyUpdateKey(targetUpdate); + const targetContentKey = historyUpdateContentKey(targetUpdate); + if (!targetKey && !targetContentKey) { + return; + } + const matchIndex = responseItemFallbackUpdates.findIndex((update, index) => ( - index >= fallbackIndex && historyUpdateKey(update) === targetKey + index >= fallbackIndex + && ( + (targetKey !== null && historyUpdateKey(update) === targetKey) + || (targetContentKey !== null && historyUpdateContentKey(update) === targetContentKey) + ) )); if (matchIndex === -1) { - return false; + return; } - while (fallbackIndex <= matchIndex) { + while (fallbackIndex < matchIndex) { pushUpdate(responseItemFallbackUpdates[fallbackIndex]!); fallbackIndex += 1; } - return true; + fallbackIndex += 1; }; for (const update of threadUpdates) { - const key = historyUpdateKey(update); - if (key && flushFallbackThrough(key)) { - continue; - } + flushFallbackBeforeMatchingDuplicate(update); pushUpdate(update); } @@ -1710,7 +1713,7 @@ function historyUpdateKey(update: UpdateSessionEvent): string | null { case "user_message_chunk": case "agent_message_chunk": case "agent_thought_chunk": - return `${update.sessionUpdate}:${JSON.stringify(update.content)}`; + return `${update.sessionUpdate}:${update.messageId ?? ""}:${JSON.stringify(update.content)}`; case "tool_call": return `tool_call:${update.toolCallId}:start`; case "tool_call_update": @@ -1720,6 +1723,17 @@ function historyUpdateKey(update: UpdateSessionEvent): string | null { } } +function historyUpdateContentKey(update: UpdateSessionEvent): string | null { + switch (update.sessionUpdate) { + case "user_message_chunk": + case "agent_message_chunk": + case "agent_thought_chunk": + return `${update.sessionUpdate}:${JSON.stringify(update.content)}`; + default: + return historyUpdateKey(update); + } +} + function getRequestedMcpServerNames(mcpServers: Array): Array { return Array.from(new Set(mcpServers.map(server => sanitizeMcpServerName(server.name)))); } diff --git a/src/CodexCommands.ts b/src/CodexCommands.ts index bd3c81e..57df6e9 100644 --- a/src/CodexCommands.ts +++ b/src/CodexCommands.ts @@ -7,6 +7,7 @@ import type {SessionState} from "./CodexAcpServer"; import type {RateLimitsMap} from "./RateLimitsMap"; import type {TokenCount} from "./TokenCount"; import {logger} from "./Logger"; +import {createAgentTextMessageChunk} from "./ContentChunks"; type ParsedSlashCommand = { name: string; @@ -210,20 +211,14 @@ export class CodexCommands { case "status": { const session = new ACPSessionConnection(this.connection, sessionId); const message = this.buildStatusMessage(sessionState); - await session.update({ - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: message } - }); + await session.update(createAgentTextMessageChunk(message)); return { handled: true }; } case "logout": { await this.runWithProcessCheck(() => this.codexAcpClient.logout()); await this.onLogout(); const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: "Logged out from Codex account." } - }); + await session.update(createAgentTextMessageChunk("Logged out from Codex account.")); return { handled: true }; } case "skills": { @@ -237,10 +232,7 @@ export class CodexCommands { ? ["Available skills:", ...lines].join("\n") : "No skills configured."; const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ - sessionUpdate: "agent_message_chunk", - content: { type: "text", text } - }); + await session.update(createAgentTextMessageChunk(text)); return { handled: true }; } case "mcp": { @@ -258,10 +250,7 @@ export class CodexCommands { ? ["Configured MCP servers:", ...lines].join("\n") : "No MCP servers configured."; const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ - sessionUpdate: "agent_message_chunk", - content: { type: "text", text } - }); + await session.update(createAgentTextMessageChunk(text)); return { handled: true }; } default: @@ -316,13 +305,7 @@ export class CodexCommands { if (argument.length > 4000) { const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: 'Command "/goal" requires goal text of at most 4000 characters.' - } - }); + await session.update(createAgentTextMessageChunk('Command "/goal" requires goal text of at most 4000 characters.')); return { handled: true }; } @@ -371,13 +354,7 @@ export class CodexCommands { private async sendCommandUsageMessage(name: string, inputHint: string, sessionId: string): Promise { const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: `Command "/${name}" requires ${inputHint}.` - } - }); + await session.update(createAgentTextMessageChunk(`Command "/${name}" requires ${inputHint}.`)); } private async sendUnknownCommandMessage(name: string, sessionId: string): Promise { @@ -390,10 +367,7 @@ export class CodexCommands { text.push(...lines); } const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: text.join("\n") } - }); + await session.update(createAgentTextMessageChunk(text.join("\n"))); } private buildStatusMessage(sessionState: SessionState): string { diff --git a/src/CodexEventHandler.ts b/src/CodexEventHandler.ts index 4ed0cdb..9f61574 100644 --- a/src/CodexEventHandler.ts +++ b/src/CodexEventHandler.ts @@ -55,6 +55,10 @@ import { } from "./CodexToolCallMapper"; import { stripShellPrefix } from "./CommandUtils"; import {createTerminalOutputMeta, type TerminalOutputMode} from "./TerminalOutputMode"; +import { + createAgentTextMessageChunk, + createAgentTextThoughtChunk, +} from "./ContentChunks"; export { stripShellPrefix }; @@ -226,44 +230,20 @@ export class CodexEventHandler { } private async createTextEvent(event: AgentMessageDeltaNotification): Promise { - return { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: event.delta - } - } + return createAgentTextMessageChunk(event.delta, event.itemId); } private async createConfigWarningEvent(event: ConfigWarningNotification): Promise { const detailsText = event.details ? `\n\n${event.details}` : ""; - return { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: `Config warning: ${event.summary}${detailsText}\n\n` - } - } + return createAgentTextMessageChunk(`Config warning: ${event.summary}${detailsText}\n\n`); } private createWarningEvent(event: WarningNotification): UpdateSessionEvent { - return { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: `Warning: ${event.message}\n\n` - } - }; + return createAgentTextMessageChunk(`Warning: ${event.message}\n\n`); } private createModelReroutedEvent(event: ModelReroutedNotification): UpdateSessionEvent { - return { - sessionUpdate: "agent_thought_chunk", - content: { - type: "text", - text: `Model rerouted from ${event.fromModel} to ${event.toModel} (${event.reason}).\n\n` - } - }; + return createAgentTextThoughtChunk(`Model rerouted from ${event.fromModel} to ${event.toModel} (${event.reason}).\n\n`); } private createThreadGoalUpdatedEvent(event: ThreadGoalUpdatedNotification): UpdateSessionEvent | null { @@ -278,13 +258,7 @@ export class CodexEventHandler { const text = objective.includes("\n") ? `Goal updated (${status}):\n${objective}` : `Goal updated (${status}): ${objective}`; - return { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: `\n\n${text}\n\n`, - }, - }; + return createAgentTextMessageChunk(`\n\n${text}\n\n`); } private formatThreadGoalStatus(status: ThreadGoalUpdatedNotification["goal"]["status"]): string { @@ -310,13 +284,7 @@ export class CodexEventHandler { } this.sessionState.currentGoal = null; - return { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: "\n\nGoal cleared.\n\n", - }, - }; + return createAgentTextMessageChunk("\n\nGoal cleared.\n\n"); } private createThreadGoalSnapshot(event: ThreadGoalUpdatedNotification): ThreadGoalSnapshot { @@ -342,22 +310,16 @@ export class CodexEventHandler { event: ReasoningSummaryTextDeltaNotification | ReasoningTextDeltaNotification ): UpdateSessionEvent { this.seenReasoningDeltaItemIds.add(event.itemId); - return this.createAgentThoughtEvent(event.delta); + return this.createAgentThoughtEvent(event.delta, event.itemId); } private createReasoningSectionBreakEvent(event: ReasoningSummaryPartAddedNotification): UpdateSessionEvent { this.seenReasoningDeltaItemIds.add(event.itemId); - return this.createAgentThoughtEvent("\n\n"); + return this.createAgentThoughtEvent("\n\n", event.itemId); } - private createAgentThoughtEvent(text: string): UpdateSessionEvent { - return { - sessionUpdate: "agent_thought_chunk", - content: { - type: "text", - text, - } - }; + private createAgentThoughtEvent(text: string, messageId: string): UpdateSessionEvent { + return createAgentTextThoughtChunk(text, messageId); } private async createItemEvent(event: ItemStartedNotification): Promise { @@ -462,7 +424,7 @@ export class CodexEventHandler { if (text.length === 0) { return null; } - return this.createAgentThoughtEvent(text); + return this.createAgentThoughtEvent(text, item.id); } private createExitedReviewModeEvent(item: ThreadItem & { type: "exitedReviewMode" }): UpdateSessionEvent | null { @@ -470,23 +432,11 @@ export class CodexEventHandler { if (text.length === 0) { return null; } - return { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text, - } - }; + return createAgentTextMessageChunk(text); } private createContextCompactedEvent(): UpdateSessionEvent { - return { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: "*Context compacted to fit the model's context window.*\n\n" - } - }; + return createAgentTextMessageChunk("*Context compacted to fit the model's context window.*\n\n"); } private createCommandOutputDeltaEvent(event: CommandExecutionOutputDeltaNotification): UpdateSessionEvent { @@ -629,13 +579,7 @@ export class CodexEventHandler { ? RequestError.internalError(this.createTurnErrorData(params.error)) : RequestError.authRequired(this.createTurnErrorData(params.error), params.error.message); } - return { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: `${params.error.message}\n\n` - } - } + return createAgentTextMessageChunk(`${params.error.message}\n\n`); } private isAuthenticationRequiredError(error: CodexErrorInfo | null): boolean { diff --git a/src/ContentChunks.ts b/src/ContentChunks.ts new file mode 100644 index 0000000..b111d34 --- /dev/null +++ b/src/ContentChunks.ts @@ -0,0 +1,52 @@ +import type {ContentBlock} from "@agentclientprotocol/sdk"; +import type {UpdateSessionEvent} from "./ACPSessionConnection"; + +export function createUserMessageChunk(content: ContentBlock, messageId?: string): UpdateSessionEvent { + if (messageId) { + return { + sessionUpdate: "user_message_chunk", + messageId, + content, + }; + } + return { + sessionUpdate: "user_message_chunk", + content, + }; +} + +export function createAgentMessageChunk(content: ContentBlock, messageId?: string): UpdateSessionEvent { + if (messageId) { + return { + sessionUpdate: "agent_message_chunk", + messageId, + content, + }; + } + return { + sessionUpdate: "agent_message_chunk", + content, + }; +} + +export function createAgentThoughtChunk(content: ContentBlock, messageId?: string): UpdateSessionEvent { + if (messageId) { + return { + sessionUpdate: "agent_thought_chunk", + messageId, + content, + }; + } + return { + sessionUpdate: "agent_thought_chunk", + content, + }; +} + +export function createAgentTextMessageChunk(text: string, messageId?: string): UpdateSessionEvent { + return createAgentMessageChunk({type: "text", text}, messageId); +} + +export function createAgentTextThoughtChunk(text: string, messageId?: string): UpdateSessionEvent { + return createAgentThoughtChunk({type: "text", text}, messageId); +} diff --git a/src/ResponseItemHistoryFallback.ts b/src/ResponseItemHistoryFallback.ts index 2591c6a..11e49d6 100644 --- a/src/ResponseItemHistoryFallback.ts +++ b/src/ResponseItemHistoryFallback.ts @@ -27,7 +27,7 @@ function historyFallbackUpdateKey(update: UpdateSessionEvent): string | null { case "user_message_chunk": case "agent_message_chunk": case "agent_thought_chunk": - return `${update.sessionUpdate}:${JSON.stringify(update.content)}`; + return `${update.sessionUpdate}:${update.messageId ?? ""}:${JSON.stringify(update.content)}`; case "tool_call": return `tool_call:${update.toolCallId}:start`; case "tool_call_update": diff --git a/src/__tests__/CodexACPAgent/data/follow-up-no-duplicates.json b/src/__tests__/CodexACPAgent/data/follow-up-no-duplicates.json index efd040d..d992b60 100644 --- a/src/__tests__/CodexACPAgent/data/follow-up-no-duplicates.json +++ b/src/__tests__/CodexACPAgent/data/follow-up-no-duplicates.json @@ -5,6 +5,7 @@ "sessionId": "id", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "string", "content": { "type": "text", "text": "He" @@ -20,6 +21,7 @@ "sessionId": "id", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "string", "content": { "type": "text", "text": "ll" @@ -35,6 +37,7 @@ "sessionId": "id", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "string", "content": { "type": "text", "text": "o!" diff --git a/src/__tests__/CodexACPAgent/data/load-session-history.json b/src/__tests__/CodexACPAgent/data/load-session-history.json index bb272cf..454bcf7 100644 --- a/src/__tests__/CodexACPAgent/data/load-session-history.json +++ b/src/__tests__/CodexACPAgent/data/load-session-history.json @@ -71,6 +71,7 @@ "sessionId": "session-1", "update": { "sessionUpdate": "user_message_chunk", + "messageId": "item-user-1", "content": { "type": "text", "text": "Hi" @@ -86,6 +87,7 @@ "sessionId": "session-1", "update": { "sessionUpdate": "user_message_chunk", + "messageId": "item-user-1", "content": { "type": "text", "text": "[@image](https://example.com/image.png)" @@ -101,6 +103,7 @@ "sessionId": "session-1", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "item-agent-1", "content": { "type": "text", "text": "Hello!" @@ -116,6 +119,7 @@ "sessionId": "session-1", "update": { "sessionUpdate": "agent_thought_chunk", + "messageId": "item-reason-1", "content": { "type": "text", "text": "Thinking..." diff --git a/src/__tests__/CodexACPAgent/data/load-session-response-item-history-fallback.json b/src/__tests__/CodexACPAgent/data/load-session-response-item-history-fallback.json index c5d03f9..3fd4f91 100644 --- a/src/__tests__/CodexACPAgent/data/load-session-response-item-history-fallback.json +++ b/src/__tests__/CodexACPAgent/data/load-session-response-item-history-fallback.json @@ -71,6 +71,7 @@ "sessionId": "session-legacy", "update": { "sessionUpdate": "user_message_chunk", + "messageId": "item-user-1", "content": { "type": "text", "text": "List the files" @@ -86,6 +87,7 @@ "sessionId": "session-legacy", "update": { "sessionUpdate": "agent_thought_chunk", + "messageId": "item-reasoning-1", "content": { "type": "text", "text": "Need to inspect the directory." @@ -245,6 +247,7 @@ "sessionId": "session-legacy", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "item-agent-1", "content": { "type": "text", "text": "The directory contains README.md and src." diff --git a/src/__tests__/CodexACPAgent/data/multiple-sessions.json b/src/__tests__/CodexACPAgent/data/multiple-sessions.json index 895ec17..4c88430 100644 --- a/src/__tests__/CodexACPAgent/data/multiple-sessions.json +++ b/src/__tests__/CodexACPAgent/data/multiple-sessions.json @@ -5,6 +5,7 @@ "sessionId": "session-1", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "string", "content": { "type": "text", "text": "Hello-1" @@ -20,6 +21,7 @@ "sessionId": "session-2", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "string", "content": { "type": "text", "text": "Hello-2" diff --git a/src/__tests__/CodexACPAgent/data/output-acp-events.json b/src/__tests__/CodexACPAgent/data/output-acp-events.json index efd040d..d992b60 100644 --- a/src/__tests__/CodexACPAgent/data/output-acp-events.json +++ b/src/__tests__/CodexACPAgent/data/output-acp-events.json @@ -5,6 +5,7 @@ "sessionId": "id", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "string", "content": { "type": "text", "text": "He" @@ -20,6 +21,7 @@ "sessionId": "id", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "string", "content": { "type": "text", "text": "ll" @@ -35,6 +37,7 @@ "sessionId": "id", "update": { "sessionUpdate": "agent_message_chunk", + "messageId": "string", "content": { "type": "text", "text": "o!" diff --git a/src/__tests__/CodexACPAgent/data/reasoning-completed-parts.json b/src/__tests__/CodexACPAgent/data/reasoning-completed-parts.json index 1a8241e..0f8d7b4 100644 --- a/src/__tests__/CodexACPAgent/data/reasoning-completed-parts.json +++ b/src/__tests__/CodexACPAgent/data/reasoning-completed-parts.json @@ -5,6 +5,7 @@ "sessionId": "test-session-id", "update": { "sessionUpdate": "agent_thought_chunk", + "messageId": "reasoning-2", "content": { "type": "text", "text": "First summary\n\nSecond summary" diff --git a/src/__tests__/CodexACPAgent/data/reasoning-deltas-and-section-break.json b/src/__tests__/CodexACPAgent/data/reasoning-deltas-and-section-break.json index 43ff845..894bff9 100644 --- a/src/__tests__/CodexACPAgent/data/reasoning-deltas-and-section-break.json +++ b/src/__tests__/CodexACPAgent/data/reasoning-deltas-and-section-break.json @@ -5,6 +5,7 @@ "sessionId": "test-session-id", "update": { "sessionUpdate": "agent_thought_chunk", + "messageId": "reasoning-1", "content": { "type": "text", "text": "First thought" @@ -20,6 +21,7 @@ "sessionId": "test-session-id", "update": { "sessionUpdate": "agent_thought_chunk", + "messageId": "reasoning-1", "content": { "type": "text", "text": "\n\n" @@ -35,6 +37,7 @@ "sessionId": "test-session-id", "update": { "sessionUpdate": "agent_thought_chunk", + "messageId": "reasoning-1", "content": { "type": "text", "text": "Raw reasoning detail"