Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 39 additions & 25 deletions src/CodexAcpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ import {
import packageJson from "../package.json";
import {isJetBrains2026_1Client} from "./JBUtils";
import {resolveTerminalOutputMode, type TerminalOutputMode} from "./TerminalOutputMode";
import {
createAgentTextMessageChunk,
createAgentTextThoughtChunk,
createUserMessageChunk,
} from "./ContentChunks";

export interface ThreadGoalSnapshot {
objective: string;
Expand Down Expand Up @@ -966,6 +971,7 @@ export class CodexAcpServer {
case "agentMessage":
return [{
sessionUpdate: "agent_message_chunk",
messageId: item.id,
content: { type: "text", text: item.text },
}];
case "reasoning":
Expand Down Expand Up @@ -1005,24 +1011,20 @@ export class CodexAcpServer {

private createUserMessageUpdates(item: ThreadItem & { type: "userMessage" }): UpdateSessionEvent[] {
const updates: UpdateSessionEvent[] = [];
const messageId = item.id;
for (const input of item.content) {
const blocks = this.userInputToContentBlocks(input);
for (const block of blocks) {
updates.push({
sessionUpdate: "user_message_chunk",
content: block,
});
updates.push(createUserMessageChunk(block, messageId));
}
}
return updates;
}

private createReasoningUpdates(item: ThreadItem & { type: "reasoning" }): UpdateSessionEvent[] {
const parts = item.summary.length > 0 ? item.summary : item.content;
return parts.map((text) => ({
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: text },
}));
const messageId = item.id;
return parts.map((text) => createAgentTextThoughtChunk(text, messageId));
}

private createWebSearchUpdate(
Expand Down Expand Up @@ -1589,13 +1591,7 @@ export class CodexAcpServer {
}
await this.connection.notify(acp.methods.client.session.update, {
sessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: "*Conversation interrupted*"
}
}
update: createAgentTextMessageChunk("*Conversation interrupted*"),
});
}

Expand Down Expand Up @@ -1674,26 +1670,33 @@ function mergeHistoryUpdates(
merged.push(update);
};

const flushFallbackThrough = (targetKey: string): boolean => {
const flushFallbackBeforeMatchingDuplicate = (targetUpdate: UpdateSessionEvent): void => {
const targetKey = historyUpdateKey(targetUpdate);
const targetContentKey = historyUpdateContentKey(targetUpdate);
if (!targetKey && !targetContentKey) {
return;
}

const matchIndex = responseItemFallbackUpdates.findIndex((update, index) => (
index >= fallbackIndex && historyUpdateKey(update) === targetKey
index >= fallbackIndex
&& (
(targetKey !== null && historyUpdateKey(update) === targetKey)
|| (targetContentKey !== null && historyUpdateContentKey(update) === targetContentKey)
)
));
if (matchIndex === -1) {
return false;
return;
}

while (fallbackIndex <= matchIndex) {
while (fallbackIndex < matchIndex) {
pushUpdate(responseItemFallbackUpdates[fallbackIndex]!);
fallbackIndex += 1;
}
return true;
fallbackIndex += 1;
};

for (const update of threadUpdates) {
const key = historyUpdateKey(update);
if (key && flushFallbackThrough(key)) {
continue;
}
flushFallbackBeforeMatchingDuplicate(update);
pushUpdate(update);
}

Expand All @@ -1710,7 +1713,7 @@ function historyUpdateKey(update: UpdateSessionEvent): string | null {
case "user_message_chunk":
case "agent_message_chunk":
case "agent_thought_chunk":
return `${update.sessionUpdate}:${JSON.stringify(update.content)}`;
return `${update.sessionUpdate}:${update.messageId ?? ""}:${JSON.stringify(update.content)}`;
case "tool_call":
return `tool_call:${update.toolCallId}:start`;
case "tool_call_update":
Expand All @@ -1720,6 +1723,17 @@ function historyUpdateKey(update: UpdateSessionEvent): string | null {
}
}

function historyUpdateContentKey(update: UpdateSessionEvent): string | null {
switch (update.sessionUpdate) {
case "user_message_chunk":
case "agent_message_chunk":
case "agent_thought_chunk":
return `${update.sessionUpdate}:${JSON.stringify(update.content)}`;
default:
return historyUpdateKey(update);
}
}

function getRequestedMcpServerNames(mcpServers: Array<acp.McpServer>): Array<string> {
return Array.from(new Set(mcpServers.map(server => sanitizeMcpServerName(server.name))));
}
42 changes: 8 additions & 34 deletions src/CodexCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {SessionState} from "./CodexAcpServer";
import type {RateLimitsMap} from "./RateLimitsMap";
import type {TokenCount} from "./TokenCount";
import {logger} from "./Logger";
import {createAgentTextMessageChunk} from "./ContentChunks";

type ParsedSlashCommand = {
name: string;
Expand Down Expand Up @@ -210,20 +211,14 @@ export class CodexCommands {
case "status": {
const session = new ACPSessionConnection(this.connection, sessionId);
const message = this.buildStatusMessage(sessionState);
await session.update({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: message }
});
await session.update(createAgentTextMessageChunk(message));
return { handled: true };
}
case "logout": {
await this.runWithProcessCheck(() => this.codexAcpClient.logout());
await this.onLogout();
const session = new ACPSessionConnection(this.connection, sessionId);
await session.update({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Logged out from Codex account." }
});
await session.update(createAgentTextMessageChunk("Logged out from Codex account."));
return { handled: true };
}
case "skills": {
Expand All @@ -237,10 +232,7 @@ export class CodexCommands {
? ["Available skills:", ...lines].join("\n")
: "No skills configured.";
const session = new ACPSessionConnection(this.connection, sessionId);
await session.update({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text }
});
await session.update(createAgentTextMessageChunk(text));
return { handled: true };
}
case "mcp": {
Expand All @@ -258,10 +250,7 @@ export class CodexCommands {
? ["Configured MCP servers:", ...lines].join("\n")
: "No MCP servers configured.";
const session = new ACPSessionConnection(this.connection, sessionId);
await session.update({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text }
});
await session.update(createAgentTextMessageChunk(text));
return { handled: true };
}
default:
Expand Down Expand Up @@ -316,13 +305,7 @@ export class CodexCommands {

if (argument.length > 4000) {
const session = new ACPSessionConnection(this.connection, sessionId);
await session.update({
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: 'Command "/goal" requires goal text of at most 4000 characters.'
}
});
await session.update(createAgentTextMessageChunk('Command "/goal" requires goal text of at most 4000 characters.'));
return { handled: true };
}

Expand Down Expand Up @@ -371,13 +354,7 @@ export class CodexCommands {

private async sendCommandUsageMessage(name: string, inputHint: string, sessionId: string): Promise<void> {
const session = new ACPSessionConnection(this.connection, sessionId);
await session.update({
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: `Command "/${name}" requires ${inputHint}.`
}
});
await session.update(createAgentTextMessageChunk(`Command "/${name}" requires ${inputHint}.`));
}

private async sendUnknownCommandMessage(name: string, sessionId: string): Promise<void> {
Expand All @@ -390,10 +367,7 @@ export class CodexCommands {
text.push(...lines);
}
const session = new ACPSessionConnection(this.connection, sessionId);
await session.update({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: text.join("\n") }
});
await session.update(createAgentTextMessageChunk(text.join("\n")));
}

private buildStatusMessage(sessionState: SessionState): string {
Expand Down
92 changes: 18 additions & 74 deletions src/CodexEventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ import {
} from "./CodexToolCallMapper";
import { stripShellPrefix } from "./CommandUtils";
import {createTerminalOutputMeta, type TerminalOutputMode} from "./TerminalOutputMode";
import {
createAgentTextMessageChunk,
createAgentTextThoughtChunk,
} from "./ContentChunks";

export { stripShellPrefix };

Expand Down Expand Up @@ -226,44 +230,20 @@ export class CodexEventHandler {
}

private async createTextEvent(event: AgentMessageDeltaNotification): Promise<UpdateSessionEvent> {
return {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: event.delta
}
}
return createAgentTextMessageChunk(event.delta, event.itemId);
}

private async createConfigWarningEvent(event: ConfigWarningNotification): Promise<UpdateSessionEvent> {
const detailsText = event.details ? `\n\n${event.details}` : "";
return {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: `Config warning: ${event.summary}${detailsText}\n\n`
}
}
return createAgentTextMessageChunk(`Config warning: ${event.summary}${detailsText}\n\n`);
}

private createWarningEvent(event: WarningNotification): UpdateSessionEvent {
return {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: `Warning: ${event.message}\n\n`
}
};
return createAgentTextMessageChunk(`Warning: ${event.message}\n\n`);
}

private createModelReroutedEvent(event: ModelReroutedNotification): UpdateSessionEvent {
return {
sessionUpdate: "agent_thought_chunk",
content: {
type: "text",
text: `Model rerouted from ${event.fromModel} to ${event.toModel} (${event.reason}).\n\n`
}
};
return createAgentTextThoughtChunk(`Model rerouted from ${event.fromModel} to ${event.toModel} (${event.reason}).\n\n`);
}

private createThreadGoalUpdatedEvent(event: ThreadGoalUpdatedNotification): UpdateSessionEvent | null {
Expand All @@ -278,13 +258,7 @@ export class CodexEventHandler {
const text = objective.includes("\n")
? `Goal updated (${status}):\n${objective}`
: `Goal updated (${status}): ${objective}`;
return {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: `\n\n${text}\n\n`,
},
};
return createAgentTextMessageChunk(`\n\n${text}\n\n`);
}

private formatThreadGoalStatus(status: ThreadGoalUpdatedNotification["goal"]["status"]): string {
Expand All @@ -310,13 +284,7 @@ export class CodexEventHandler {
}
this.sessionState.currentGoal = null;

return {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: "\n\nGoal cleared.\n\n",
},
};
return createAgentTextMessageChunk("\n\nGoal cleared.\n\n");
}

private createThreadGoalSnapshot(event: ThreadGoalUpdatedNotification): ThreadGoalSnapshot {
Expand All @@ -342,22 +310,16 @@ export class CodexEventHandler {
event: ReasoningSummaryTextDeltaNotification | ReasoningTextDeltaNotification
): UpdateSessionEvent {
this.seenReasoningDeltaItemIds.add(event.itemId);
return this.createAgentThoughtEvent(event.delta);
return this.createAgentThoughtEvent(event.delta, event.itemId);
}

private createReasoningSectionBreakEvent(event: ReasoningSummaryPartAddedNotification): UpdateSessionEvent {
this.seenReasoningDeltaItemIds.add(event.itemId);
return this.createAgentThoughtEvent("\n\n");
return this.createAgentThoughtEvent("\n\n", event.itemId);
}

private createAgentThoughtEvent(text: string): UpdateSessionEvent {
return {
sessionUpdate: "agent_thought_chunk",
content: {
type: "text",
text,
}
};
private createAgentThoughtEvent(text: string, messageId: string): UpdateSessionEvent {
return createAgentTextThoughtChunk(text, messageId);
}

private async createItemEvent(event: ItemStartedNotification): Promise<UpdateSessionEvent | null> {
Expand Down Expand Up @@ -462,31 +424,19 @@ export class CodexEventHandler {
if (text.length === 0) {
return null;
}
return this.createAgentThoughtEvent(text);
return this.createAgentThoughtEvent(text, item.id);
}

private createExitedReviewModeEvent(item: ThreadItem & { type: "exitedReviewMode" }): UpdateSessionEvent | null {
const text = item.review.trim();
if (text.length === 0) {
return null;
}
return {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text,
}
};
return createAgentTextMessageChunk(text);
}

private createContextCompactedEvent(): UpdateSessionEvent {
return {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: "*Context compacted to fit the model's context window.*\n\n"
}
};
return createAgentTextMessageChunk("*Context compacted to fit the model's context window.*\n\n");
}

private createCommandOutputDeltaEvent(event: CommandExecutionOutputDeltaNotification): UpdateSessionEvent {
Expand Down Expand Up @@ -629,13 +579,7 @@ export class CodexEventHandler {
? RequestError.internalError(this.createTurnErrorData(params.error))
: RequestError.authRequired(this.createTurnErrorData(params.error), params.error.message);
}
return {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: `${params.error.message}\n\n`
}
}
return createAgentTextMessageChunk(`${params.error.message}\n\n`);
}

private isAuthenticationRequiredError(error: CodexErrorInfo | null): boolean {
Expand Down
Loading