init claude-code

This commit is contained in:
2026-04-01 17:32:37 +02:00
commit 73b208c009
1902 changed files with 513237 additions and 0 deletions
+282
View File
@@ -0,0 +1,282 @@
import axios, { type AxiosError } from 'axios'
import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
import { logForDebugging } from '../../utils/debug.js'
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js'
import { SerialBatchEventUploader } from './SerialBatchEventUploader.js'
import {
WebSocketTransport,
type WebSocketTransportOptions,
} from './WebSocketTransport.js'
const BATCH_FLUSH_INTERVAL_MS = 100
// Per-attempt POST timeout. Bounds how long a single stuck POST can block
// the serialized queue. Without this, a hung connection stalls all writes.
const POST_TIMEOUT_MS = 15_000
// Grace period for queued writes on close(). Covers a healthy POST (~100ms)
// plus headroom; best-effort, not a delivery guarantee under degraded network.
// Void-ed (nothing awaits it) so this is a last resort — replBridge teardown
// now closes AFTER archive so archive latency is the primary drain window.
// NOTE: gracefulShutdown's cleanup budget is 2s (not the 5s outer failsafe);
// 3s here exceeds it, but the process lives ~2s longer for hooks+analytics.
const CLOSE_GRACE_MS = 3000
/**
* Hybrid transport: WebSocket for reads, HTTP POST for writes.
*
* Write flow:
*
* write(stream_event) ─┐
* │ (100ms timer)
* │
* ▼
* write(other) ────► uploader.enqueue() (SerialBatchEventUploader)
* ▲ │
* writeBatch() ────────┘ │ serial, batched, retries indefinitely,
* │ backpressure at maxQueueSize
* ▼
* postOnce() (single HTTP POST, throws on retryable)
*
* stream_event messages accumulate in streamEventBuffer for up to 100ms
* before enqueue (reduces POST count for high-volume content deltas). A
* non-stream write flushes any buffered stream_events first to preserve order.
*
* Serialization + retry + backpressure are delegated to SerialBatchEventUploader
* (same primitive CCR uses). At most one POST in-flight; events arriving during
* a POST batch into the next one. On failure, the uploader re-queues and retries
* with exponential backoff + jitter. If the queue fills past maxQueueSize,
* enqueue() blocks — giving awaiting callers backpressure.
*
* Why serialize? Bridge mode fires writes via `void transport.write()`
* (fire-and-forget). Without this, concurrent POSTs → concurrent Firestore
* writes to the same document → collisions → retry storms → pages oncall.
*/
export class HybridTransport extends WebSocketTransport {
private postUrl: string
private uploader: SerialBatchEventUploader<StdoutMessage>
// stream_event delay buffer — accumulates content deltas for up to
// BATCH_FLUSH_INTERVAL_MS before enqueueing (reduces POST count)
private streamEventBuffer: StdoutMessage[] = []
private streamEventTimer: ReturnType<typeof setTimeout> | null = null
constructor(
url: URL,
headers: Record<string, string> = {},
sessionId?: string,
refreshHeaders?: () => Record<string, string>,
options?: WebSocketTransportOptions & {
maxConsecutiveFailures?: number
onBatchDropped?: (batchSize: number, failures: number) => void
},
) {
super(url, headers, sessionId, refreshHeaders, options)
const { maxConsecutiveFailures, onBatchDropped } = options ?? {}
this.postUrl = convertWsUrlToPostUrl(url)
this.uploader = new SerialBatchEventUploader<StdoutMessage>({
// Large cap — session-ingress accepts arbitrary batch sizes. Events
// naturally batch during in-flight POSTs; this just bounds the payload.
maxBatchSize: 500,
// Bridge callers use `void transport.write()` — backpressure doesn't
// apply (they don't await). A batch >maxQueueSize deadlocks (see
// SerialBatchEventUploader backpressure check). So set it high enough
// to be a memory bound only. Wire real backpressure in a follow-up
// once callers await.
maxQueueSize: 100_000,
baseDelayMs: 500,
maxDelayMs: 8000,
jitterMs: 1000,
// Optional cap so a persistently-failing server can't pin the drain
// loop for the lifetime of the process. Undefined = indefinite retry.
// replBridge sets this; the 1P transportUtils path does not.
maxConsecutiveFailures,
onBatchDropped: (batchSize, failures) => {
logForDiagnosticsNoPII(
'error',
'cli_hybrid_batch_dropped_max_failures',
{
batchSize,
failures,
},
)
onBatchDropped?.(batchSize, failures)
},
send: batch => this.postOnce(batch),
})
logForDebugging(`HybridTransport: POST URL = ${this.postUrl}`)
logForDiagnosticsNoPII('info', 'cli_hybrid_transport_initialized')
}
/**
* Enqueue a message and wait for the queue to drain. Returning flush()
* preserves the contract that `await write()` resolves after the event is
* POSTed (relied on by tests and replBridge's initial flush). Fire-and-forget
* callers (`void transport.write()`) are unaffected — they don't await,
* so the later resolution doesn't add latency.
*/
override async write(message: StdoutMessage): Promise<void> {
if (message.type === 'stream_event') {
// Delay: accumulate stream_events briefly before enqueueing.
// Promise resolves immediately — callers don't await stream_events.
this.streamEventBuffer.push(message)
if (!this.streamEventTimer) {
this.streamEventTimer = setTimeout(
() => this.flushStreamEvents(),
BATCH_FLUSH_INTERVAL_MS,
)
}
return
}
// Immediate: flush any buffered stream_events (ordering), then this event.
await this.uploader.enqueue([...this.takeStreamEvents(), message])
return this.uploader.flush()
}
async writeBatch(messages: StdoutMessage[]): Promise<void> {
await this.uploader.enqueue([...this.takeStreamEvents(), ...messages])
return this.uploader.flush()
}
/** Snapshot before/after writeBatch() to detect silent drops. */
get droppedBatchCount(): number {
return this.uploader.droppedBatchCount
}
/**
* Block until all pending events are POSTed. Used by bridge's initial
* history flush so onStateChange('connected') fires after persistence.
*/
flush(): Promise<void> {
void this.uploader.enqueue(this.takeStreamEvents())
return this.uploader.flush()
}
/** Take ownership of buffered stream_events and clear the delay timer. */
private takeStreamEvents(): StdoutMessage[] {
if (this.streamEventTimer) {
clearTimeout(this.streamEventTimer)
this.streamEventTimer = null
}
const buffered = this.streamEventBuffer
this.streamEventBuffer = []
return buffered
}
/** Delay timer fired — enqueue accumulated stream_events. */
private flushStreamEvents(): void {
this.streamEventTimer = null
void this.uploader.enqueue(this.takeStreamEvents())
}
override close(): void {
if (this.streamEventTimer) {
clearTimeout(this.streamEventTimer)
this.streamEventTimer = null
}
this.streamEventBuffer = []
// Grace period for queued writes — fallback. replBridge teardown now
// awaits archive between write and close (see CLOSE_GRACE_MS), so
// archive latency is the primary drain window and this is a last
// resort. Keep close() sync (returns immediately) but defer
// uploader.close() so any remaining queue gets a chance to finish.
const uploader = this.uploader
let graceTimer: ReturnType<typeof setTimeout> | undefined
void Promise.race([
uploader.flush(),
new Promise<void>(r => {
// eslint-disable-next-line no-restricted-syntax -- need timer ref for clearTimeout
graceTimer = setTimeout(r, CLOSE_GRACE_MS)
}),
]).finally(() => {
clearTimeout(graceTimer)
uploader.close()
})
super.close()
}
/**
* Single-attempt POST. Throws on retryable failures (429, 5xx, network)
* so SerialBatchEventUploader re-queues and retries. Returns on success
* and on permanent failures (4xx non-429, no token) so the uploader moves on.
*/
private async postOnce(events: StdoutMessage[]): Promise<void> {
const sessionToken = getSessionIngressAuthToken()
if (!sessionToken) {
logForDebugging('HybridTransport: No session token available for POST')
logForDiagnosticsNoPII('warn', 'cli_hybrid_post_no_token')
return
}
const headers: Record<string, string> = {
Authorization: `Bearer ${sessionToken}`,
'Content-Type': 'application/json',
}
let response
try {
response = await axios.post(
this.postUrl,
{ events },
{
headers,
validateStatus: () => true,
timeout: POST_TIMEOUT_MS,
},
)
} catch (error) {
const axiosError = error as AxiosError
logForDebugging(`HybridTransport: POST error: ${axiosError.message}`)
logForDiagnosticsNoPII('warn', 'cli_hybrid_post_network_error')
throw error
}
if (response.status >= 200 && response.status < 300) {
logForDebugging(`HybridTransport: POST success count=${events.length}`)
return
}
// 4xx (except 429) are permanent — drop, don't retry.
if (
response.status >= 400 &&
response.status < 500 &&
response.status !== 429
) {
logForDebugging(
`HybridTransport: POST returned ${response.status} (permanent), dropping`,
)
logForDiagnosticsNoPII('warn', 'cli_hybrid_post_client_error', {
status: response.status,
})
return
}
// 429 / 5xx — retryable. Throw so uploader re-queues and backs off.
logForDebugging(
`HybridTransport: POST returned ${response.status} (retryable)`,
)
logForDiagnosticsNoPII('warn', 'cli_hybrid_post_retryable_error', {
status: response.status,
})
throw new Error(`POST failed with ${response.status}`)
}
}
/**
* Convert a WebSocket URL to the HTTP POST endpoint URL.
* From: wss://api.example.com/v2/session_ingress/ws/<session_id>
* To: https://api.example.com/v2/session_ingress/session/<session_id>/events
*/
function convertWsUrlToPostUrl(wsUrl: URL): string {
const protocol = wsUrl.protocol === 'wss:' ? 'https:' : 'http:'
// Replace /ws/ with /session/ and append /events
let pathname = wsUrl.pathname
pathname = pathname.replace('/ws/', '/session/')
if (!pathname.endsWith('/events')) {
pathname = pathname.endsWith('/')
? pathname + 'events'
: pathname + '/events'
}
return `${protocol}//${wsUrl.host}${pathname}${wsUrl.search}`
}
+711
View File
@@ -0,0 +1,711 @@
import axios, { type AxiosError } from 'axios'
import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
import { logForDebugging } from '../../utils/debug.js'
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
import { errorMessage } from '../../utils/errors.js'
import { getSessionIngressAuthHeaders } from '../../utils/sessionIngressAuth.js'
import { sleep } from '../../utils/sleep.js'
import { jsonParse, jsonStringify } from '../../utils/slowOperations.js'
import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
import type { Transport } from './Transport.js'
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------
const RECONNECT_BASE_DELAY_MS = 1000
const RECONNECT_MAX_DELAY_MS = 30_000
/** Time budget for reconnection attempts before giving up (10 minutes). */
const RECONNECT_GIVE_UP_MS = 600_000
/** Server sends keepalives every 15s; treat connection as dead after 45s of silence. */
const LIVENESS_TIMEOUT_MS = 45_000
/**
* HTTP status codes that indicate a permanent server-side rejection.
* The transport transitions to 'closed' immediately without retrying.
*/
const PERMANENT_HTTP_CODES = new Set([401, 403, 404])
// POST retry configuration (matches HybridTransport)
const POST_MAX_RETRIES = 10
const POST_BASE_DELAY_MS = 500
const POST_MAX_DELAY_MS = 8000
/** Hoisted TextDecoder options to avoid per-chunk allocation in readStream. */
const STREAM_DECODE_OPTS: TextDecodeOptions = { stream: true }
/** Hoisted axios validateStatus callback to avoid per-request closure allocation. */
function alwaysValidStatus(): boolean {
return true
}
// ---------------------------------------------------------------------------
// SSE Frame Parser
// ---------------------------------------------------------------------------
type SSEFrame = {
event?: string
id?: string
data?: string
}
/**
* Incrementally parse SSE frames from a text buffer.
* Returns parsed frames and the remaining (incomplete) buffer.
*
* @internal exported for testing
*/
export function parseSSEFrames(buffer: string): {
frames: SSEFrame[]
remaining: string
} {
const frames: SSEFrame[] = []
let pos = 0
// SSE frames are delimited by double newlines
let idx: number
while ((idx = buffer.indexOf('\n\n', pos)) !== -1) {
const rawFrame = buffer.slice(pos, idx)
pos = idx + 2
// Skip empty frames
if (!rawFrame.trim()) continue
const frame: SSEFrame = {}
let isComment = false
for (const line of rawFrame.split('\n')) {
if (line.startsWith(':')) {
// SSE comment (e.g., `:keepalive`)
isComment = true
continue
}
const colonIdx = line.indexOf(':')
if (colonIdx === -1) continue
const field = line.slice(0, colonIdx)
// Per SSE spec, strip one leading space after colon if present
const value =
line[colonIdx + 1] === ' '
? line.slice(colonIdx + 2)
: line.slice(colonIdx + 1)
switch (field) {
case 'event':
frame.event = value
break
case 'id':
frame.id = value
break
case 'data':
// Per SSE spec, multiple data: lines are concatenated with \n
frame.data = frame.data ? frame.data + '\n' + value : value
break
// Ignore other fields (retry:, etc.)
}
}
// Only emit frames that have data (or are pure comments which reset liveness)
if (frame.data || isComment) {
frames.push(frame)
}
}
return { frames, remaining: buffer.slice(pos) }
}
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
type SSETransportState =
| 'idle'
| 'connected'
| 'reconnecting'
| 'closing'
| 'closed'
/**
* Payload for `event: client_event` frames, matching the StreamClientEvent
* proto message in session_stream.proto. This is the only event type sent
* to worker subscribers — delivery_update, session_update, ephemeral_event,
* and catch_up_truncated are client-channel-only (see notifier.go and
* event_stream.go SubscriberClient guard).
*/
export type StreamClientEvent = {
event_id: string
sequence_num: number
event_type: string
source: string
payload: Record<string, unknown>
created_at: string
}
// ---------------------------------------------------------------------------
// SSETransport
// ---------------------------------------------------------------------------
/**
* Transport that uses SSE for reading and HTTP POST for writing.
*
* Reads events via Server-Sent Events from the CCR v2 event stream endpoint.
* Writes events via HTTP POST with retry logic (same pattern as HybridTransport).
*
* Each `event: client_event` frame carries a StreamClientEvent proto JSON
* directly in `data:`. The transport extracts `payload` and passes it to
* `onData` as newline-delimited JSON for StructuredIO consumers.
*
* Supports automatic reconnection with exponential backoff and Last-Event-ID
* for resumption after disconnection.
*/
export class SSETransport implements Transport {
private state: SSETransportState = 'idle'
private onData?: (data: string) => void
private onCloseCallback?: (closeCode?: number) => void
private onEventCallback?: (event: StreamClientEvent) => void
private headers: Record<string, string>
private sessionId?: string
private refreshHeaders?: () => Record<string, string>
private readonly getAuthHeaders: () => Record<string, string>
// SSE connection state
private abortController: AbortController | null = null
private lastSequenceNum = 0
private seenSequenceNums = new Set<number>()
// Reconnection state
private reconnectAttempts = 0
private reconnectStartTime: number | null = null
private reconnectTimer: NodeJS.Timeout | null = null
// Liveness detection
private livenessTimer: NodeJS.Timeout | null = null
// POST URL (derived from SSE URL)
private postUrl: string
// Runtime epoch for CCR v2 event format
constructor(
private readonly url: URL,
headers: Record<string, string> = {},
sessionId?: string,
refreshHeaders?: () => Record<string, string>,
initialSequenceNum?: number,
/**
* Per-instance auth header source. Omit to read the process-wide
* CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers). Required
* for concurrent multi-session callers — the env-var path is a process
* global and would stomp across sessions.
*/
getAuthHeaders?: () => Record<string, string>,
) {
this.headers = headers
this.sessionId = sessionId
this.refreshHeaders = refreshHeaders
this.getAuthHeaders = getAuthHeaders ?? getSessionIngressAuthHeaders
this.postUrl = convertSSEUrlToPostUrl(url)
// Seed with a caller-provided high-water mark so the first connect()
// sends from_sequence_num / Last-Event-ID. Without this, a fresh
// SSETransport always asks the server to replay from sequence 0 —
// the entire session history on every transport swap.
if (initialSequenceNum !== undefined && initialSequenceNum > 0) {
this.lastSequenceNum = initialSequenceNum
}
logForDebugging(`SSETransport: SSE URL = ${url.href}`)
logForDebugging(`SSETransport: POST URL = ${this.postUrl}`)
logForDiagnosticsNoPII('info', 'cli_sse_transport_initialized')
}
/**
* High-water mark of sequence numbers seen on this stream. Callers that
* recreate the transport (e.g. replBridge onWorkReceived) read this before
* close() and pass it as `initialSequenceNum` to the next instance so the
* server resumes from the right point instead of replaying everything.
*/
getLastSequenceNum(): number {
return this.lastSequenceNum
}
async connect(): Promise<void> {
if (this.state !== 'idle' && this.state !== 'reconnecting') {
logForDebugging(
`SSETransport: Cannot connect, current state is ${this.state}`,
{ level: 'error' },
)
logForDiagnosticsNoPII('error', 'cli_sse_connect_failed')
return
}
this.state = 'reconnecting'
const connectStartTime = Date.now()
// Build SSE URL with sequence number for resumption
const sseUrl = new URL(this.url.href)
if (this.lastSequenceNum > 0) {
sseUrl.searchParams.set('from_sequence_num', String(this.lastSequenceNum))
}
// Build headers -- use fresh auth headers (supports Cookie for session keys).
// Remove stale Authorization header from this.headers when Cookie auth is used,
// since sending both confuses the auth interceptor.
const authHeaders = this.getAuthHeaders()
const headers: Record<string, string> = {
...this.headers,
...authHeaders,
Accept: 'text/event-stream',
'anthropic-version': '2023-06-01',
'User-Agent': getClaudeCodeUserAgent(),
}
if (authHeaders['Cookie']) {
delete headers['Authorization']
}
if (this.lastSequenceNum > 0) {
headers['Last-Event-ID'] = String(this.lastSequenceNum)
}
logForDebugging(`SSETransport: Opening ${sseUrl.href}`)
logForDiagnosticsNoPII('info', 'cli_sse_connect_opening')
this.abortController = new AbortController()
try {
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
const response = await fetch(sseUrl.href, {
headers,
signal: this.abortController.signal,
})
if (!response.ok) {
const isPermanent = PERMANENT_HTTP_CODES.has(response.status)
logForDebugging(
`SSETransport: HTTP ${response.status}${isPermanent ? ' (permanent)' : ''}`,
{ level: 'error' },
)
logForDiagnosticsNoPII('error', 'cli_sse_connect_http_error', {
status: response.status,
})
if (isPermanent) {
this.state = 'closed'
this.onCloseCallback?.(response.status)
return
}
this.handleConnectionError()
return
}
if (!response.body) {
logForDebugging('SSETransport: No response body')
this.handleConnectionError()
return
}
// Successfully connected
const connectDuration = Date.now() - connectStartTime
logForDebugging('SSETransport: Connected')
logForDiagnosticsNoPII('info', 'cli_sse_connect_connected', {
duration_ms: connectDuration,
})
this.state = 'connected'
this.reconnectAttempts = 0
this.reconnectStartTime = null
this.resetLivenessTimer()
// Read the SSE stream
await this.readStream(response.body)
} catch (error) {
if (this.abortController?.signal.aborted) {
// Intentional close
return
}
logForDebugging(
`SSETransport: Connection error: ${errorMessage(error)}`,
{ level: 'error' },
)
logForDiagnosticsNoPII('error', 'cli_sse_connect_error')
this.handleConnectionError()
}
}
/**
* Read and process the SSE stream body.
*/
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
private async readStream(body: ReadableStream<Uint8Array>): Promise<void> {
const reader = body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, STREAM_DECODE_OPTS)
const { frames, remaining } = parseSSEFrames(buffer)
buffer = remaining
for (const frame of frames) {
// Any frame (including keepalive comments) proves the connection is alive
this.resetLivenessTimer()
if (frame.id) {
const seqNum = parseInt(frame.id, 10)
if (!isNaN(seqNum)) {
if (this.seenSequenceNums.has(seqNum)) {
logForDebugging(
`SSETransport: DUPLICATE frame seq=${seqNum} (lastSequenceNum=${this.lastSequenceNum}, seenCount=${this.seenSequenceNums.size})`,
{ level: 'warn' },
)
logForDiagnosticsNoPII('warn', 'cli_sse_duplicate_sequence')
} else {
this.seenSequenceNums.add(seqNum)
// Prevent unbounded growth: once we have many entries, prune
// old sequence numbers that are well below the high-water mark.
// Only sequence numbers near lastSequenceNum matter for dedup.
if (this.seenSequenceNums.size > 1000) {
const threshold = this.lastSequenceNum - 200
for (const s of this.seenSequenceNums) {
if (s < threshold) {
this.seenSequenceNums.delete(s)
}
}
}
}
if (seqNum > this.lastSequenceNum) {
this.lastSequenceNum = seqNum
}
}
}
if (frame.event && frame.data) {
this.handleSSEFrame(frame.event, frame.data)
} else if (frame.data) {
// data: without event: — server is emitting the old envelope format
// or a bug. Log so incidents show as a signal instead of silent drops.
logForDebugging(
'SSETransport: Frame has data: but no event: field — dropped',
{ level: 'warn' },
)
logForDiagnosticsNoPII('warn', 'cli_sse_frame_missing_event_field')
}
}
}
} catch (error) {
if (this.abortController?.signal.aborted) return
logForDebugging(
`SSETransport: Stream read error: ${errorMessage(error)}`,
{ level: 'error' },
)
logForDiagnosticsNoPII('error', 'cli_sse_stream_read_error')
} finally {
reader.releaseLock()
}
// Stream ended — reconnect unless we're closing
if (this.state !== 'closing' && this.state !== 'closed') {
logForDebugging('SSETransport: Stream ended, reconnecting')
this.handleConnectionError()
}
}
/**
* Handle a single SSE frame. The event: field names the variant; data:
* carries the inner proto JSON directly (no envelope).
*
* Worker subscribers only receive client_event frames (see notifier.go) —
* any other event type indicates a server-side change that CC doesn't yet
* understand. Log a diagnostic so we notice in telemetry.
*/
private handleSSEFrame(eventType: string, data: string): void {
if (eventType !== 'client_event') {
logForDebugging(
`SSETransport: Unexpected SSE event type '${eventType}' on worker stream`,
{ level: 'warn' },
)
logForDiagnosticsNoPII('warn', 'cli_sse_unexpected_event_type', {
event_type: eventType,
})
return
}
let ev: StreamClientEvent
try {
ev = jsonParse(data) as StreamClientEvent
} catch (error) {
logForDebugging(
`SSETransport: Failed to parse client_event data: ${errorMessage(error)}`,
{ level: 'error' },
)
return
}
const payload = ev.payload
if (payload && typeof payload === 'object' && 'type' in payload) {
const sessionLabel = this.sessionId ? ` session=${this.sessionId}` : ''
logForDebugging(
`SSETransport: Event seq=${ev.sequence_num} event_id=${ev.event_id} event_type=${ev.event_type} payload_type=${String(payload.type)}${sessionLabel}`,
)
logForDiagnosticsNoPII('info', 'cli_sse_message_received')
// Pass the unwrapped payload as newline-delimited JSON,
// matching the format that StructuredIO/WebSocketTransport consumers expect
this.onData?.(jsonStringify(payload) + '\n')
} else {
logForDebugging(
`SSETransport: Ignoring client_event with no type in payload: event_id=${ev.event_id}`,
)
}
this.onEventCallback?.(ev)
}
/**
* Handle connection errors with exponential backoff and time budget.
*/
private handleConnectionError(): void {
this.clearLivenessTimer()
if (this.state === 'closing' || this.state === 'closed') return
// Abort any in-flight SSE fetch
this.abortController?.abort()
this.abortController = null
const now = Date.now()
if (!this.reconnectStartTime) {
this.reconnectStartTime = now
}
const elapsed = now - this.reconnectStartTime
if (elapsed < RECONNECT_GIVE_UP_MS) {
// Clear any existing timer
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
// Refresh headers before reconnecting
if (this.refreshHeaders) {
const freshHeaders = this.refreshHeaders()
Object.assign(this.headers, freshHeaders)
logForDebugging('SSETransport: Refreshed headers for reconnect')
}
this.state = 'reconnecting'
this.reconnectAttempts++
const baseDelay = Math.min(
RECONNECT_BASE_DELAY_MS * Math.pow(2, this.reconnectAttempts - 1),
RECONNECT_MAX_DELAY_MS,
)
// Add ±25% jitter
const delay = Math.max(
0,
baseDelay + baseDelay * 0.25 * (2 * Math.random() - 1),
)
logForDebugging(
`SSETransport: Reconnecting in ${Math.round(delay)}ms (attempt ${this.reconnectAttempts}, ${Math.round(elapsed / 1000)}s elapsed)`,
)
logForDiagnosticsNoPII('error', 'cli_sse_reconnect_attempt', {
reconnectAttempts: this.reconnectAttempts,
})
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null
void this.connect()
}, delay)
} else {
logForDebugging(
`SSETransport: Reconnection time budget exhausted after ${Math.round(elapsed / 1000)}s`,
{ level: 'error' },
)
logForDiagnosticsNoPII('error', 'cli_sse_reconnect_exhausted', {
reconnectAttempts: this.reconnectAttempts,
elapsedMs: elapsed,
})
this.state = 'closed'
this.onCloseCallback?.()
}
}
/**
* Bound timeout callback. Hoisted from an inline closure so that
* resetLivenessTimer (called per-frame) does not allocate a new closure
* on every SSE frame.
*/
private readonly onLivenessTimeout = (): void => {
this.livenessTimer = null
logForDebugging('SSETransport: Liveness timeout, reconnecting', {
level: 'error',
})
logForDiagnosticsNoPII('error', 'cli_sse_liveness_timeout')
this.abortController?.abort()
this.handleConnectionError()
}
/**
* Reset the liveness timer. If no SSE frame arrives within the timeout,
* treat the connection as dead and reconnect.
*/
private resetLivenessTimer(): void {
this.clearLivenessTimer()
this.livenessTimer = setTimeout(this.onLivenessTimeout, LIVENESS_TIMEOUT_MS)
}
private clearLivenessTimer(): void {
if (this.livenessTimer) {
clearTimeout(this.livenessTimer)
this.livenessTimer = null
}
}
// -----------------------------------------------------------------------
// Write (HTTP POST) — same pattern as HybridTransport
// -----------------------------------------------------------------------
async write(message: StdoutMessage): Promise<void> {
const authHeaders = this.getAuthHeaders()
if (Object.keys(authHeaders).length === 0) {
logForDebugging('SSETransport: No session token available for POST')
logForDiagnosticsNoPII('warn', 'cli_sse_post_no_token')
return
}
const headers: Record<string, string> = {
...authHeaders,
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': getClaudeCodeUserAgent(),
}
logForDebugging(
`SSETransport: POST body keys=${Object.keys(message as Record<string, unknown>).join(',')}`,
)
for (let attempt = 1; attempt <= POST_MAX_RETRIES; attempt++) {
try {
const response = await axios.post(this.postUrl, message, {
headers,
validateStatus: alwaysValidStatus,
})
if (response.status === 200 || response.status === 201) {
logForDebugging(`SSETransport: POST success type=${message.type}`)
return
}
logForDebugging(
`SSETransport: POST ${response.status} body=${jsonStringify(response.data).slice(0, 200)}`,
)
// 4xx errors (except 429) are permanent - don't retry
if (
response.status >= 400 &&
response.status < 500 &&
response.status !== 429
) {
logForDebugging(
`SSETransport: POST returned ${response.status} (client error), not retrying`,
)
logForDiagnosticsNoPII('warn', 'cli_sse_post_client_error', {
status: response.status,
})
return
}
// 429 or 5xx - retry
logForDebugging(
`SSETransport: POST returned ${response.status}, attempt ${attempt}/${POST_MAX_RETRIES}`,
)
logForDiagnosticsNoPII('warn', 'cli_sse_post_retryable_error', {
status: response.status,
attempt,
})
} catch (error) {
const axiosError = error as AxiosError
logForDebugging(
`SSETransport: POST error: ${axiosError.message}, attempt ${attempt}/${POST_MAX_RETRIES}`,
)
logForDiagnosticsNoPII('warn', 'cli_sse_post_network_error', {
attempt,
})
}
if (attempt === POST_MAX_RETRIES) {
logForDebugging(
`SSETransport: POST failed after ${POST_MAX_RETRIES} attempts, continuing`,
)
logForDiagnosticsNoPII('warn', 'cli_sse_post_retries_exhausted')
return
}
const delayMs = Math.min(
POST_BASE_DELAY_MS * Math.pow(2, attempt - 1),
POST_MAX_DELAY_MS,
)
await sleep(delayMs)
}
}
// -----------------------------------------------------------------------
// Transport interface
// -----------------------------------------------------------------------
isConnectedStatus(): boolean {
return this.state === 'connected'
}
isClosedStatus(): boolean {
return this.state === 'closed'
}
setOnData(callback: (data: string) => void): void {
this.onData = callback
}
setOnClose(callback: (closeCode?: number) => void): void {
this.onCloseCallback = callback
}
setOnEvent(callback: (event: StreamClientEvent) => void): void {
this.onEventCallback = callback
}
close(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
this.clearLivenessTimer()
this.state = 'closing'
this.abortController?.abort()
this.abortController = null
}
}
// ---------------------------------------------------------------------------
// URL Conversion
// ---------------------------------------------------------------------------
/**
* Convert an SSE URL to the HTTP POST endpoint URL.
* The SSE stream URL and POST URL share the same base; the POST endpoint
* is at `/events` (without `/stream`).
*
* From: https://api.example.com/v2/session_ingress/session/<session_id>/events/stream
* To: https://api.example.com/v2/session_ingress/session/<session_id>/events
*/
function convertSSEUrlToPostUrl(sseUrl: URL): string {
let pathname = sseUrl.pathname
// Remove /stream suffix to get the POST events endpoint
if (pathname.endsWith('/stream')) {
pathname = pathname.slice(0, -'/stream'.length)
}
return `${sseUrl.protocol}//${sseUrl.host}${pathname}`
}
+275
View File
@@ -0,0 +1,275 @@
import { jsonStringify } from '../../utils/slowOperations.js'
/**
* Serial ordered event uploader with batching, retry, and backpressure.
*
* - enqueue() adds events to a pending buffer
* - At most 1 POST in-flight at a time
* - Drains up to maxBatchSize items per POST
* - New events accumulate while in-flight
* - On failure: exponential backoff (clamped), retries indefinitely
* until success or close() — unless maxConsecutiveFailures is set,
* in which case the failing batch is dropped and drain advances
* - flush() blocks until pending is empty and kicks drain if needed
* - Backpressure: enqueue() blocks when maxQueueSize is reached
*/
/**
* Throw from config.send() to make the uploader wait a server-supplied
* duration before retrying (e.g. 429 with Retry-After). When retryAfterMs
* is set, it overrides exponential backoff for that attempt — clamped to
* [baseDelayMs, maxDelayMs] and jittered so a misbehaving server can
* neither hot-loop nor stall the client, and many sessions sharing a rate
* limit don't all pounce at the same instant. Without retryAfterMs, behaves
* like any other thrown error (exponential backoff).
*/
export class RetryableError extends Error {
constructor(
message: string,
readonly retryAfterMs?: number,
) {
super(message)
}
}
type SerialBatchEventUploaderConfig<T> = {
  /** Max items per POST (1 = no batching) */
  maxBatchSize: number
  /**
   * Max serialized bytes per POST. First item always goes in regardless of
   * size; subsequent items only if cumulative JSON bytes stay under this.
   * Undefined = no byte limit (count-only batching).
   */
  maxBatchBytes?: number
  /** Max pending items before enqueue() blocks */
  maxQueueSize: number
  /**
   * The actual HTTP call — caller controls payload format.
   * Throw to trigger a retry with exponential backoff; throw
   * RetryableError with retryAfterMs to use a server-hinted delay.
   */
  send: (batch: T[]) => Promise<void>
  /** Base delay for exponential backoff (ms) */
  baseDelayMs: number
  /** Max delay cap (ms) */
  maxDelayMs: number
  /** Random jitter range added to retry delay (ms) */
  jitterMs: number
  /**
   * After this many consecutive send() failures, drop the failing batch
   * and move on to the next pending item with a fresh failure budget.
   * Undefined = retry indefinitely (default).
   */
  maxConsecutiveFailures?: number
  /** Called when a batch is dropped for hitting maxConsecutiveFailures. */
  onBatchDropped?: (batchSize: number, failures: number) => void
}
export class SerialBatchEventUploader<T> {
  private pending: T[] = []
  private pendingAtClose = 0
  private draining = false
  private closed = false
  private backpressureResolvers: Array<() => void> = []
  private sleepResolve: (() => void) | null = null
  // Handle of the armed backoff timer. Tracked so close() can
  // clearTimeout() it — merely resolving the sleep promise (as the old
  // code did) left the timer pending and kept the event loop alive for
  // up to maxDelayMs after shutdown.
  private sleepTimer: ReturnType<typeof setTimeout> | null = null
  private flushResolvers: Array<() => void> = []
  private droppedBatches = 0
  private readonly config: SerialBatchEventUploaderConfig<T>
  constructor(config: SerialBatchEventUploaderConfig<T>) {
    this.config = config
  }
  /**
   * Monotonic count of batches dropped via maxConsecutiveFailures. Callers
   * can snapshot before flush() and compare after to detect silent drops
   * (flush() resolves normally even when batches were dropped).
   */
  get droppedBatchCount(): number {
    return this.droppedBatches
  }
  /**
   * Pending queue depth. After close(), returns the count at close time —
   * close() clears the queue but shutdown diagnostics may read this after.
   */
  get pendingCount(): number {
    return this.closed ? this.pendingAtClose : this.pending.length
  }
  /**
   * Add events to the pending buffer. Returns immediately if space is
   * available. Blocks (awaits) if the buffer is full — caller pauses
   * until drain frees space.
   */
  async enqueue(events: T | T[]): Promise<void> {
    if (this.closed) return
    const items = Array.isArray(events) ? events : [events]
    if (items.length === 0) return
    // Backpressure: wait until there's space
    while (
      this.pending.length + items.length > this.config.maxQueueSize &&
      !this.closed
    ) {
      await new Promise<void>(resolve => {
        this.backpressureResolvers.push(resolve)
      })
    }
    if (this.closed) return
    this.pending.push(...items)
    void this.drain()
  }
  /**
   * Block until all pending events have been sent.
   * Used at turn boundaries and graceful shutdown.
   */
  flush(): Promise<void> {
    if (this.pending.length === 0 && !this.draining) {
      return Promise.resolve()
    }
    void this.drain()
    return new Promise<void>(resolve => {
      this.flushResolvers.push(resolve)
    })
  }
  /**
   * Drop pending events and stop processing.
   * Resolves any blocked enqueue() and flush() callers, and cancels any
   * armed retry timer so it can't keep the process alive.
   */
  close(): void {
    if (this.closed) return
    this.closed = true
    this.pendingAtClose = this.pending.length
    this.pending = []
    // Cancel the backoff timer BEFORE waking the sleeper; otherwise the
    // setTimeout stays armed for up to maxDelayMs after shutdown.
    if (this.sleepTimer) {
      clearTimeout(this.sleepTimer)
      this.sleepTimer = null
    }
    this.sleepResolve?.()
    this.sleepResolve = null
    for (const resolve of this.backpressureResolvers) resolve()
    this.backpressureResolvers = []
    for (const resolve of this.flushResolvers) resolve()
    this.flushResolvers = []
  }
  /**
   * Drain loop. At most one instance runs at a time (guarded by this.draining).
   * Sends batches serially. On failure, backs off and retries indefinitely
   * (or until maxConsecutiveFailures drops the batch).
   */
  private async drain(): Promise<void> {
    if (this.draining || this.closed) return
    this.draining = true
    let failures = 0
    try {
      while (this.pending.length > 0 && !this.closed) {
        const batch = this.takeBatch()
        if (batch.length === 0) continue
        try {
          await this.config.send(batch)
          failures = 0
        } catch (err) {
          failures++
          if (
            this.config.maxConsecutiveFailures !== undefined &&
            failures >= this.config.maxConsecutiveFailures
          ) {
            this.droppedBatches++
            // Guard the observer: a throwing onBatchDropped would escape
            // this catch, reject the void-ed drain() promise, and leave
            // flush() waiters hanging with pending items never sent.
            try {
              this.config.onBatchDropped?.(batch.length, failures)
            } catch {
              // Intentionally swallowed — telemetry must not break drain.
            }
            failures = 0
            this.releaseBackpressure()
            continue
          }
          // Re-queue the failed batch at the front. Use concat (single
          // allocation) instead of unshift(...batch) which shifts every
          // pending item batch.length times. Only hit on failure path.
          this.pending = batch.concat(this.pending)
          const retryAfterMs =
            err instanceof RetryableError ? err.retryAfterMs : undefined
          await this.sleep(this.retryDelay(failures, retryAfterMs))
          continue
        }
        // Release backpressure waiters if space opened up
        this.releaseBackpressure()
      }
    } finally {
      this.draining = false
      // Notify flush waiters if queue is empty
      if (this.pending.length === 0) {
        for (const resolve of this.flushResolvers) resolve()
        this.flushResolvers = []
      }
    }
  }
  /**
   * Pull the next batch from pending. Respects both maxBatchSize and
   * maxBatchBytes. The first item is always taken; subsequent items only
   * if adding them keeps the cumulative JSON size under maxBatchBytes.
   *
   * Un-serializable items (BigInt, circular refs, throwing toJSON) are
   * dropped in place — they can never be sent and leaving them at
   * pending[0] would poison the queue and hang flush() forever.
   */
  private takeBatch(): T[] {
    const { maxBatchSize, maxBatchBytes } = this.config
    if (maxBatchBytes === undefined) {
      return this.pending.splice(0, maxBatchSize)
    }
    let bytes = 0
    let count = 0
    while (count < this.pending.length && count < maxBatchSize) {
      let itemBytes: number
      try {
        itemBytes = Buffer.byteLength(jsonStringify(this.pending[count]))
      } catch {
        this.pending.splice(count, 1)
        continue
      }
      if (count > 0 && bytes + itemBytes > maxBatchBytes) break
      bytes += itemBytes
      count++
    }
    return this.pending.splice(0, count)
  }
  /** Compute the delay before the next retry attempt (ms). */
  private retryDelay(failures: number, retryAfterMs?: number): number {
    const jitter = Math.random() * this.config.jitterMs
    if (retryAfterMs !== undefined) {
      // Jitter on top of the server's hint prevents thundering herd when
      // many sessions share a rate limit and all receive the same
      // Retry-After. Clamp first, then spread — same shape as the
      // exponential path (effective ceiling is maxDelayMs + jitterMs).
      const clamped = Math.max(
        this.config.baseDelayMs,
        Math.min(retryAfterMs, this.config.maxDelayMs),
      )
      return clamped + jitter
    }
    const exponential = Math.min(
      this.config.baseDelayMs * 2 ** (failures - 1),
      this.config.maxDelayMs,
    )
    return exponential + jitter
  }
  /** Wake every blocked enqueue() caller so it can re-check queue space. */
  private releaseBackpressure(): void {
    const resolvers = this.backpressureResolvers
    this.backpressureResolvers = []
    for (const resolve of resolvers) resolve()
  }
  /**
   * Cancelable backoff sleep. close() can either clearTimeout the armed
   * timer or resolve early via sleepResolve.
   */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => {
      this.sleepResolve = resolve
      this.sleepTimer = setTimeout(() => {
        // Normal expiry: clear both handles before resolving so a later
        // sleep can't be clobbered by this stale callback.
        this.sleepTimer = null
        this.sleepResolve = null
        resolve()
      }, ms)
    })
  }
}
+800
View File
@@ -0,0 +1,800 @@
import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
import type WsWebSocket from 'ws'
import { logEvent } from '../../services/analytics/index.js'
import { CircularBuffer } from '../../utils/CircularBuffer.js'
import { logForDebugging } from '../../utils/debug.js'
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
import { isEnvTruthy } from '../../utils/envUtils.js'
import { getWebSocketTLSOptions } from '../../utils/mtls.js'
import {
getWebSocketProxyAgent,
getWebSocketProxyUrl,
} from '../../utils/proxy.js'
import {
registerSessionActivityCallback,
unregisterSessionActivityCallback,
} from '../../utils/sessionActivity.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import type { Transport } from './Transport.js'
// Pre-serialized newline-delimited keep_alive data frame, sent on a timer
// by startKeepaliveInterval() to reset intermediary idle timers.
const KEEP_ALIVE_FRAME = '{"type":"keep_alive"}\n'
// Capacity of the replay CircularBuffer (messages kept for redelivery).
const DEFAULT_MAX_BUFFER_SIZE = 1000
// Exponential-backoff bounds for reconnect scheduling (ms).
const DEFAULT_BASE_RECONNECT_DELAY = 1000
const DEFAULT_MAX_RECONNECT_DELAY = 30000
/** Time budget for reconnection attempts before giving up (10 minutes). */
const DEFAULT_RECONNECT_GIVE_UP_MS = 600_000
// Cadence of ping frames used for dead-connection detection (ms).
const DEFAULT_PING_INTERVAL = 10000
const DEFAULT_KEEPALIVE_INTERVAL = 300_000 // 5 minutes
/**
 * Threshold for detecting system sleep/wake. If the gap between consecutive
 * reconnection attempts exceeds this, the machine likely slept. We reset
 * the reconnection budget and retry — the server will reject with permanent
 * close codes (4001/1002) if the session was reaped during sleep.
 */
const SLEEP_DETECTION_THRESHOLD_MS = DEFAULT_MAX_RECONNECT_DELAY * 2 // 60s
/**
 * WebSocket close codes that indicate a permanent server-side rejection.
 * The transport transitions to 'closed' immediately without retrying.
 */
const PERMANENT_CLOSE_CODES = new Set([
  1002, // protocol error — server rejected handshake (e.g. session reaped)
  4001, // session expired / not found
  4003, // unauthorized
])
export type WebSocketTransportOptions = {
  /** When false, the transport does not attempt automatic reconnection on
   * disconnect. Use this when the caller has its own recovery mechanism
   * (e.g. the REPL bridge poll loop). Defaults to true. */
  autoReconnect?: boolean
  /** Gates the tengu_ws_transport_* telemetry events. Set true at the
   * REPL-bridge construction site so only Remote Control sessions (the
   * Cloudflare-idle-timeout population) emit; print-mode workers stay
   * silent. Defaults to false. */
  isBridge?: boolean
}
// Lifecycle states. 'closing' is entered by close(); 'closed' by permanent
// rejection, disabled auto-reconnect, or an exhausted reconnect budget.
type WebSocketTransportState =
  | 'idle'
  | 'connected'
  | 'reconnecting'
  | 'closing'
  | 'closed'
// Common interface between globalThis.WebSocket and ws.WebSocket
type WebSocketLike = {
  close(): void
  send(data: string): void
  ping?(): void // Bun & ws both support this
}
/**
 * Reconnecting WebSocket transport for newline-delimited JSON messages.
 *
 * Runs on either Bun's native WebSocket or the `ws` package under Node,
 * with a separate handler set for each runtime. Outbound messages that
 * carry a `uuid` are retained in a circular buffer and replayed after
 * reconnect; the server deduplicates by UUID. Liveness is maintained two
 * ways: ping/pong for dead-connection detection, and periodic keep_alive
 * data frames to reset proxy idle timers.
 */
export class WebSocketTransport implements Transport {
  private ws: WebSocketLike | null = null
  // UUID of the most recent buffered outbound message; sent to the server
  // as X-Last-Request-Id on reconnect so it can report what it received.
  private lastSentId: string | null = null
  protected url: URL
  protected state: WebSocketTransportState = 'idle'
  protected onData?: (data: string) => void
  private onCloseCallback?: (closeCode?: number) => void
  private onConnectCallback?: () => void
  private headers: Record<string, string>
  private sessionId?: string
  private autoReconnect: boolean
  private isBridge: boolean
  // Reconnection state
  private reconnectAttempts = 0
  private reconnectStartTime: number | null = null
  private reconnectTimer: NodeJS.Timeout | null = null
  private lastReconnectAttemptTime: number | null = null
  // Wall-clock of last WS data-frame activity (inbound message or outbound
  // ws.send). Used to compute idle time at close — the signal for diagnosing
  // proxy idle-timeout RSTs (e.g. Cloudflare 5-min). Excludes ping/pong
  // control frames (proxies don't count those).
  private lastActivityTime = 0
  // Ping interval for connection health checks
  private pingInterval: NodeJS.Timeout | null = null
  private pongReceived = true
  // Periodic keep_alive data frames to reset proxy idle timers
  private keepAliveInterval: NodeJS.Timeout | null = null
  // Message buffering for replay on reconnection
  private messageBuffer: CircularBuffer<StdoutMessage>
  // Track which runtime's WS we're using so we can detach listeners
  // with the matching API (removeEventListener vs. off).
  private isBunWs = false
  // Captured at connect() time for handleOpenEvent timing. Stored as an
  // instance field so the onOpen handler can be a stable class-property
  // arrow function (removable in doDisconnect) instead of a closure over
  // a local variable.
  private connectStartTime = 0
  private refreshHeaders?: () => Record<string, string>
  /**
   * @param url WebSocket endpoint to connect to.
   * @param headers Upgrade-request headers (e.g. auth).
   * @param sessionId Optional label included in send logs.
   * @param refreshHeaders Optional provider of fresh headers, consulted
   *   before reconnect attempts and on 4003 (unauthorized) closes.
   * @param options See WebSocketTransportOptions.
   */
  constructor(
    url: URL,
    headers: Record<string, string> = {},
    sessionId?: string,
    refreshHeaders?: () => Record<string, string>,
    options?: WebSocketTransportOptions,
  ) {
    this.url = url
    this.headers = headers
    this.sessionId = sessionId
    this.refreshHeaders = refreshHeaders
    this.autoReconnect = options?.autoReconnect ?? true
    this.isBridge = options?.isBridge ?? false
    this.messageBuffer = new CircularBuffer(DEFAULT_MAX_BUFFER_SIZE)
  }
  /**
   * Open a WebSocket to this.url. Only legal from 'idle' or 'reconnecting';
   * any other state logs an error and returns without side effects.
   * Selects the Bun-native or `ws` implementation at runtime and wires the
   * matching handler set.
   */
  public async connect(): Promise<void> {
    if (this.state !== 'idle' && this.state !== 'reconnecting') {
      logForDebugging(
        `WebSocketTransport: Cannot connect, current state is ${this.state}`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'cli_websocket_connect_failed')
      return
    }
    this.state = 'reconnecting'
    this.connectStartTime = Date.now()
    logForDebugging(`WebSocketTransport: Opening ${this.url.href}`)
    logForDiagnosticsNoPII('info', 'cli_websocket_connect_opening')
    // Start with provided headers and add runtime headers
    const headers = { ...this.headers }
    if (this.lastSentId) {
      headers['X-Last-Request-Id'] = this.lastSentId
      logForDebugging(
        `WebSocketTransport: Adding X-Last-Request-Id header: ${this.lastSentId}`,
      )
    }
    if (typeof Bun !== 'undefined') {
      // Bun's WebSocket supports headers/proxy options but the DOM typings don't
      // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
      const ws = new globalThis.WebSocket(this.url.href, {
        headers,
        proxy: getWebSocketProxyUrl(this.url.href),
        tls: getWebSocketTLSOptions() || undefined,
      } as unknown as string[])
      this.ws = ws
      this.isBunWs = true
      ws.addEventListener('open', this.onBunOpen)
      ws.addEventListener('message', this.onBunMessage)
      ws.addEventListener('error', this.onBunError)
      // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
      ws.addEventListener('close', this.onBunClose)
      // 'pong' is Bun-specific — not in DOM typings.
      ws.addEventListener('pong', this.onPong)
    } else {
      const { default: WS } = await import('ws')
      const ws = new WS(this.url.href, {
        headers,
        agent: getWebSocketProxyAgent(this.url.href),
        ...getWebSocketTLSOptions(),
      })
      this.ws = ws
      this.isBunWs = false
      ws.on('open', this.onNodeOpen)
      ws.on('message', this.onNodeMessage)
      ws.on('error', this.onNodeError)
      ws.on('close', this.onNodeClose)
      ws.on('pong', this.onPong)
    }
  }
  // --- Bun (native WebSocket) event handlers ---
  // Stored as class-property arrow functions so they can be removed in
  // doDisconnect(). Without removal, each reconnect orphans the old WS
  // object + its 5 closures until GC, which accumulates under network
  // instability. Mirrors the pattern in src/utils/mcpWebSocketTransport.ts.
  private onBunOpen = () => {
    this.handleOpenEvent()
    // Bun's WebSocket doesn't expose upgrade response headers,
    // so replay all buffered messages. The server deduplicates by UUID.
    if (this.lastSentId) {
      this.replayBufferedMessages('')
    }
  }
  private onBunMessage = (event: MessageEvent) => {
    const message =
      typeof event.data === 'string' ? event.data : String(event.data)
    this.lastActivityTime = Date.now()
    logForDiagnosticsNoPII('info', 'cli_websocket_message_received', {
      length: message.length,
    })
    if (this.onData) {
      this.onData(message)
    }
  }
  private onBunError = () => {
    logForDebugging('WebSocketTransport: Error', {
      level: 'error',
    })
    logForDiagnosticsNoPII('error', 'cli_websocket_connect_error')
    // close event fires after error — let it call handleConnectionError
  }
  // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
  private onBunClose = (event: CloseEvent) => {
    const isClean = event.code === 1000 || event.code === 1001
    logForDebugging(
      `WebSocketTransport: Closed: ${event.code}`,
      isClean ? undefined : { level: 'error' },
    )
    logForDiagnosticsNoPII('error', 'cli_websocket_connect_closed')
    this.handleConnectionError(event.code)
  }
  // --- Node (ws package) event handlers ---
  private onNodeOpen = () => {
    // Capture ws before handleOpenEvent() invokes onConnectCallback — if the
    // callback synchronously closes the transport, this.ws becomes null.
    // The old inline-closure code had this safety implicitly via closure capture.
    const ws = this.ws
    this.handleOpenEvent()
    if (!ws) return
    // Check for last-id in upgrade response headers (ws package only)
    // NOTE(review): `upgradeReq` is not a documented property of ws client
    // sockets (upgrade response headers surface via the 'upgrade' event),
    // so this branch may never fire — confirm against the ws version in
    // use. The Bun path compensates by replaying everything.
    const nws = ws as unknown as WsWebSocket & {
      upgradeReq?: { headers?: Record<string, string> }
    }
    const upgradeResponse = nws.upgradeReq
    if (upgradeResponse?.headers?.['x-last-request-id']) {
      const serverLastId = upgradeResponse.headers['x-last-request-id']
      this.replayBufferedMessages(serverLastId)
    }
  }
  private onNodeMessage = (data: Buffer) => {
    const message = data.toString()
    this.lastActivityTime = Date.now()
    logForDiagnosticsNoPII('info', 'cli_websocket_message_received', {
      length: message.length,
    })
    if (this.onData) {
      this.onData(message)
    }
  }
  private onNodeError = (err: Error) => {
    logForDebugging(`WebSocketTransport: Error: ${err.message}`, {
      level: 'error',
    })
    logForDiagnosticsNoPII('error', 'cli_websocket_connect_error')
    // close event fires after error — let it call handleConnectionError
  }
  private onNodeClose = (code: number, _reason: Buffer) => {
    const isClean = code === 1000 || code === 1001
    logForDebugging(
      `WebSocketTransport: Closed: ${code}`,
      isClean ? undefined : { level: 'error' },
    )
    logForDiagnosticsNoPII('error', 'cli_websocket_connect_closed')
    this.handleConnectionError(code)
  }
  // --- Shared handlers ---
  private onPong = () => {
    this.pongReceived = true
  }
  /**
   * Shared open-handler: records timing telemetry, resets reconnect
   * bookkeeping, transitions to 'connected', notifies the connect
   * callback, and starts the ping/keepalive timers.
   */
  private handleOpenEvent(): void {
    const connectDuration = Date.now() - this.connectStartTime
    logForDebugging('WebSocketTransport: Connected')
    logForDiagnosticsNoPII('info', 'cli_websocket_connect_connected', {
      duration_ms: connectDuration,
    })
    // Reconnect success — capture attempt count + downtime before resetting.
    // reconnectStartTime is null on first connect, non-null on reopen.
    if (this.isBridge && this.reconnectStartTime !== null) {
      logEvent('tengu_ws_transport_reconnected', {
        attempts: this.reconnectAttempts,
        downtimeMs: Date.now() - this.reconnectStartTime,
      })
    }
    this.reconnectAttempts = 0
    this.reconnectStartTime = null
    this.lastReconnectAttemptTime = null
    this.lastActivityTime = Date.now()
    this.state = 'connected'
    this.onConnectCallback?.()
    // Start periodic pings to detect dead connections
    this.startPingInterval()
    // Start periodic keep_alive data frames to reset proxy idle timers
    this.startKeepaliveInterval()
    // Register callback for session activity signals
    registerSessionActivityCallback(() => {
      void this.write({ type: 'keep_alive' })
    })
  }
  /**
   * Send one already-serialized line over the socket. Returns false when
   * not connected or when send throws — in the latter case the connection
   * is torn down via handleConnectionError.
   */
  protected sendLine(line: string): boolean {
    if (!this.ws || this.state !== 'connected') {
      logForDebugging('WebSocketTransport: Not connected')
      logForDiagnosticsNoPII('info', 'cli_websocket_send_not_connected')
      return false
    }
    try {
      this.ws.send(line)
      this.lastActivityTime = Date.now()
      return true
    } catch (error) {
      logForDebugging(`WebSocketTransport: Failed to send: ${error}`, {
        level: 'error',
      })
      logForDiagnosticsNoPII('error', 'cli_websocket_send_error')
      // Don't null this.ws here — let doDisconnect() (via handleConnectionError)
      // handle cleanup so listeners are removed before the WS is released.
      this.handleConnectionError()
      return false
    }
  }
  /**
   * Remove all listeners attached in connect() for the given WebSocket.
   * Without this, each reconnect orphans the old WS object + its closures
   * until GC — these accumulate under network instability. Mirrors the
   * pattern in src/utils/mcpWebSocketTransport.ts.
   */
  private removeWsListeners(ws: WebSocketLike): void {
    if (this.isBunWs) {
      const nws = ws as unknown as globalThis.WebSocket
      nws.removeEventListener('open', this.onBunOpen)
      nws.removeEventListener('message', this.onBunMessage)
      nws.removeEventListener('error', this.onBunError)
      // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
      nws.removeEventListener('close', this.onBunClose)
      // 'pong' is Bun-specific — not in DOM typings
      nws.removeEventListener('pong' as 'message', this.onPong)
    } else {
      const nws = ws as unknown as WsWebSocket
      nws.off('open', this.onNodeOpen)
      nws.off('message', this.onNodeMessage)
      nws.off('error', this.onNodeError)
      nws.off('close', this.onNodeClose)
      nws.off('pong', this.onPong)
    }
  }
  /** Tear down the current socket, periodic timers, and activity callback. */
  protected doDisconnect(): void {
    // Stop pinging and keepalive when disconnecting
    this.stopPingInterval()
    this.stopKeepaliveInterval()
    // Unregister session activity callback
    unregisterSessionActivityCallback()
    if (this.ws) {
      // Remove listeners BEFORE close() so the old WS + closures can be
      // GC'd promptly instead of lingering until the next mark-and-sweep.
      this.removeWsListeners(this.ws)
      this.ws.close()
      this.ws = null
    }
  }
  /**
   * Central close/error path. Emits diagnostics, tears down the socket,
   * then decides between: permanent closure (server-rejected close codes),
   * caller-driven recovery (autoReconnect=false), or scheduling an
   * exponential-backoff reconnect bounded by a 10-minute budget — with
   * sleep/wake detection that resets the budget after long gaps.
   */
  private handleConnectionError(closeCode?: number): void {
    logForDebugging(
      `WebSocketTransport: Disconnected from ${this.url.href}` +
        (closeCode != null ? ` (code ${closeCode})` : ''),
    )
    logForDiagnosticsNoPII('info', 'cli_websocket_disconnected')
    if (this.isBridge) {
      // Fire on every close — including intermediate ones during a reconnect
      // storm (those never surface to the onCloseCallback consumer). For the
      // Cloudflare-5min-idle hypothesis: cluster msSinceLastActivity; if the
      // peak sits at ~300s with closeCode 1006, that's the proxy RST.
      logEvent('tengu_ws_transport_closed', {
        closeCode,
        msSinceLastActivity:
          this.lastActivityTime > 0 ? Date.now() - this.lastActivityTime : -1,
        // 'connected' = healthy drop (the Cloudflare case); 'reconnecting' =
        // connect-rejection mid-storm. State isn't mutated until the branches
        // below, so this reads the pre-close value.
        wasConnected: this.state === 'connected',
        reconnectAttempts: this.reconnectAttempts,
      })
    }
    this.doDisconnect()
    if (this.state === 'closing' || this.state === 'closed') return
    // Permanent codes: don't retry — server has definitively ended the session.
    // Exception: 4003 (unauthorized) can be retried when refreshHeaders is
    // available and returns a new token (e.g. after the parent process mints
    // a fresh session ingress token during reconnection).
    let headersRefreshed = false
    if (closeCode === 4003 && this.refreshHeaders) {
      const freshHeaders = this.refreshHeaders()
      if (freshHeaders.Authorization !== this.headers.Authorization) {
        Object.assign(this.headers, freshHeaders)
        headersRefreshed = true
        logForDebugging(
          'WebSocketTransport: 4003 received but headers refreshed, scheduling reconnect',
        )
        logForDiagnosticsNoPII('info', 'cli_websocket_4003_token_refreshed')
      }
    }
    if (
      closeCode != null &&
      PERMANENT_CLOSE_CODES.has(closeCode) &&
      !headersRefreshed
    ) {
      logForDebugging(
        `WebSocketTransport: Permanent close code ${closeCode}, not reconnecting`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'cli_websocket_permanent_close', {
        closeCode,
      })
      this.state = 'closed'
      this.onCloseCallback?.(closeCode)
      return
    }
    // When autoReconnect is disabled, go straight to closed state.
    // The caller (e.g. REPL bridge poll loop) handles recovery.
    if (!this.autoReconnect) {
      this.state = 'closed'
      this.onCloseCallback?.(closeCode)
      return
    }
    // Schedule reconnection with exponential backoff and time budget
    const now = Date.now()
    if (!this.reconnectStartTime) {
      this.reconnectStartTime = now
    }
    // Detect system sleep/wake: if the gap since our last reconnection
    // attempt greatly exceeds the max delay, the machine likely slept
    // (e.g. laptop lid closed). Reset the budget and retry from scratch —
    // the server will reject with permanent close codes (4001/1002) if
    // the session was reaped while we were asleep.
    if (
      this.lastReconnectAttemptTime !== null &&
      now - this.lastReconnectAttemptTime > SLEEP_DETECTION_THRESHOLD_MS
    ) {
      logForDebugging(
        `WebSocketTransport: Detected system sleep (${Math.round((now - this.lastReconnectAttemptTime) / 1000)}s gap), resetting reconnection budget`,
      )
      logForDiagnosticsNoPII('info', 'cli_websocket_sleep_detected', {
        gapMs: now - this.lastReconnectAttemptTime,
      })
      this.reconnectStartTime = now
      this.reconnectAttempts = 0
    }
    this.lastReconnectAttemptTime = now
    const elapsed = now - this.reconnectStartTime
    if (elapsed < DEFAULT_RECONNECT_GIVE_UP_MS) {
      // Clear any existing reconnection timer to avoid duplicates
      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer)
        this.reconnectTimer = null
      }
      // Refresh headers before reconnecting (e.g. to pick up a new session token).
      // Skip if already refreshed by the 4003 path above.
      if (!headersRefreshed && this.refreshHeaders) {
        const freshHeaders = this.refreshHeaders()
        Object.assign(this.headers, freshHeaders)
        logForDebugging('WebSocketTransport: Refreshed headers for reconnect')
      }
      this.state = 'reconnecting'
      this.reconnectAttempts++
      const baseDelay = Math.min(
        DEFAULT_BASE_RECONNECT_DELAY * Math.pow(2, this.reconnectAttempts - 1),
        DEFAULT_MAX_RECONNECT_DELAY,
      )
      // Add ±25% jitter to avoid thundering herd
      const delay = Math.max(
        0,
        baseDelay + baseDelay * 0.25 * (2 * Math.random() - 1),
      )
      logForDebugging(
        `WebSocketTransport: Reconnecting in ${Math.round(delay)}ms (attempt ${this.reconnectAttempts}, ${Math.round(elapsed / 1000)}s elapsed)`,
      )
      logForDiagnosticsNoPII('error', 'cli_websocket_reconnect_attempt', {
        reconnectAttempts: this.reconnectAttempts,
      })
      if (this.isBridge) {
        logEvent('tengu_ws_transport_reconnecting', {
          attempt: this.reconnectAttempts,
          elapsedMs: elapsed,
          delayMs: Math.round(delay),
        })
      }
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null
        void this.connect()
      }, delay)
    } else {
      logForDebugging(
        `WebSocketTransport: Reconnection time budget exhausted after ${Math.round(elapsed / 1000)}s for ${this.url.href}`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'cli_websocket_reconnect_exhausted', {
        reconnectAttempts: this.reconnectAttempts,
        elapsedMs: elapsed,
      })
      this.state = 'closed'
      // Notify close callback
      if (this.onCloseCallback) {
        this.onCloseCallback(closeCode)
      }
    }
  }
  /**
   * Stop the transport: cancels reconnect/ping/keepalive timers and closes
   * the socket.
   * NOTE(review): state is left at 'closing' and never reaches 'closed' on
   * this path (doDisconnect removed the listeners, so no close event fires),
   * which means isClosedStatus() returns false after close() — confirm
   * intended.
   */
  close(): void {
    // Clear any pending reconnection timer
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    // Clear ping and keepalive intervals
    this.stopPingInterval()
    this.stopKeepaliveInterval()
    // Unregister session activity callback
    unregisterSessionActivityCallback()
    this.state = 'closing'
    this.doDisconnect()
  }
  /**
   * Resend buffered messages after reconnect. When the server reported its
   * last-received UUID (`lastId`), confirmed messages are evicted first;
   * with an empty `lastId` (Bun path) everything is replayed and the server
   * dedups by UUID.
   */
  private replayBufferedMessages(lastId: string): void {
    const messages = this.messageBuffer.toArray()
    if (messages.length === 0) return
    // Find where to start replay based on server's last received message
    let startIndex = 0
    if (lastId) {
      const lastConfirmedIndex = messages.findIndex(
        message => 'uuid' in message && message.uuid === lastId,
      )
      if (lastConfirmedIndex >= 0) {
        // Server confirmed messages up to lastConfirmedIndex — evict them
        startIndex = lastConfirmedIndex + 1
        // Rebuild the buffer with only unconfirmed messages
        const remaining = messages.slice(startIndex)
        this.messageBuffer.clear()
        this.messageBuffer.addAll(remaining)
        if (remaining.length === 0) {
          this.lastSentId = null
        }
        logForDebugging(
          `WebSocketTransport: Evicted ${startIndex} confirmed messages, ${remaining.length} remaining`,
        )
        logForDiagnosticsNoPII(
          'info',
          'cli_websocket_evicted_confirmed_messages',
          {
            evicted: startIndex,
            remaining: remaining.length,
          },
        )
      }
    }
    const messagesToReplay = messages.slice(startIndex)
    if (messagesToReplay.length === 0) {
      logForDebugging('WebSocketTransport: No new messages to replay')
      logForDiagnosticsNoPII('info', 'cli_websocket_no_messages_to_replay')
      return
    }
    logForDebugging(
      `WebSocketTransport: Replaying ${messagesToReplay.length} buffered messages`,
    )
    logForDiagnosticsNoPII('info', 'cli_websocket_messages_to_replay', {
      count: messagesToReplay.length,
    })
    for (const message of messagesToReplay) {
      const line = jsonStringify(message) + '\n'
      const success = this.sendLine(line)
      if (!success) {
        this.handleConnectionError()
        break
      }
    }
    // Do NOT clear the buffer after replay — messages remain buffered until
    // the server confirms receipt on the next reconnection. This prevents
    // message loss if the connection drops after replay but before the server
    // processes the messages.
  }
  /** True while the socket is open and usable. */
  isConnectedStatus(): boolean {
    return this.state === 'connected'
  }
  /** True once the transport has permanently given up (see close codes). */
  isClosedStatus(): boolean {
    return this.state === 'closed'
  }
  /** Set the handler invoked with each raw inbound message string. */
  setOnData(callback: (data: string) => void): void {
    this.onData = callback
  }
  /** Set the handler invoked on every (re)connect. */
  setOnConnect(callback: () => void): void {
    this.onConnectCallback = callback
  }
  /** Set the handler invoked when the transport permanently closes. */
  setOnClose(callback: (closeCode?: number) => void): void {
    this.onCloseCallback = callback
  }
  /** Current lifecycle state, for logging/diagnostics. */
  getStateLabel(): string {
    return this.state
  }
  /**
   * Serialize and send a message. UUID-carrying messages are always added
   * to the replay buffer first; when not connected, the send is skipped and
   * such messages ride the buffer until the next reconnect replay. Messages
   * without a uuid are not buffered and are dropped while disconnected.
   */
  async write(message: StdoutMessage): Promise<void> {
    if ('uuid' in message && typeof message.uuid === 'string') {
      this.messageBuffer.add(message)
      this.lastSentId = message.uuid
    }
    const line = jsonStringify(message) + '\n'
    if (this.state !== 'connected') {
      // Message buffered for replay when connected (if it has a UUID)
      return
    }
    const sessionLabel = this.sessionId ? ` session=${this.sessionId}` : ''
    const detailLabel = this.getControlMessageDetailLabel(message)
    logForDebugging(
      `WebSocketTransport: Sending message type=${message.type}${sessionLabel}${detailLabel}`,
    )
    this.sendLine(line)
  }
  /** Build the extra log detail for control_request / control_response. */
  private getControlMessageDetailLabel(message: StdoutMessage): string {
    if (message.type === 'control_request') {
      const { request_id, request } = message
      const toolName =
        request.subtype === 'can_use_tool' ? request.tool_name : ''
      return ` subtype=${request.subtype} request_id=${request_id}${toolName ? ` tool=${toolName}` : ''}`
    }
    if (message.type === 'control_response') {
      const { subtype, request_id } = message.response
      return ` subtype=${subtype} request_id=${request_id}`
    }
    return ''
  }
  /**
   * Start the 10s ping loop: detects dead connections (missed pong) and
   * process suspension (large wall-clock gap between ticks), forcing a
   * reconnect in either case.
   */
  private startPingInterval(): void {
    // Clear any existing interval
    this.stopPingInterval()
    this.pongReceived = true
    let lastTickTime = Date.now()
    // Send ping periodically to detect dead connections.
    // If the previous ping got no pong, treat the connection as dead.
    this.pingInterval = setInterval(() => {
      if (this.state === 'connected' && this.ws) {
        const now = Date.now()
        const gap = now - lastTickTime
        lastTickTime = now
        // Process-suspension detector. If the wall-clock gap between ticks
        // greatly exceeds the 10s interval, the process was suspended
        // (laptop lid, SIGSTOP, VM pause). setInterval does not queue
        // missed ticks — it coalesces — so on wake this callback fires
        // once with a huge gap. The socket is almost certainly dead:
        // NAT mappings drop in 30s-5min, and the server has been
        // retransmitting into the void. Don't wait for a ping/pong
        // round-trip to confirm (ws.ping() on a dead socket returns
        // immediately with no error — bytes go into the kernel send
        // buffer). Assume dead and reconnect now. A spurious reconnect
        // after a short sleep is cheap — replayBufferedMessages() handles
        // it and the server dedups by UUID.
        if (gap > SLEEP_DETECTION_THRESHOLD_MS) {
          logForDebugging(
            `WebSocketTransport: ${Math.round(gap / 1000)}s tick gap detected — process was suspended, forcing reconnect`,
          )
          logForDiagnosticsNoPII(
            'info',
            'cli_websocket_sleep_detected_on_ping',
            { gapMs: gap },
          )
          this.handleConnectionError()
          return
        }
        if (!this.pongReceived) {
          logForDebugging(
            'WebSocketTransport: No pong received, connection appears dead',
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_websocket_pong_timeout')
          this.handleConnectionError()
          return
        }
        this.pongReceived = false
        try {
          this.ws.ping?.()
        } catch (error) {
          logForDebugging(`WebSocketTransport: Ping failed: ${error}`, {
            level: 'error',
          })
          logForDiagnosticsNoPII('error', 'cli_websocket_ping_failed')
        }
      }
    }, DEFAULT_PING_INTERVAL)
  }
  private stopPingInterval(): void {
    if (this.pingInterval) {
      clearInterval(this.pingInterval)
      this.pingInterval = null
    }
  }
  /**
   * Start the 5-minute keep_alive data-frame loop (skipped in CCR sessions
   * where session-activity heartbeats cover it).
   */
  private startKeepaliveInterval(): void {
    this.stopKeepaliveInterval()
    // In CCR sessions, session activity heartbeats handle keep-alives
    if (isEnvTruthy(process.env.CLAUDE_CODE_REMOTE)) {
      return
    }
    this.keepAliveInterval = setInterval(() => {
      if (this.state === 'connected' && this.ws) {
        try {
          this.ws.send(KEEP_ALIVE_FRAME)
          this.lastActivityTime = Date.now()
          logForDebugging(
            'WebSocketTransport: Sent periodic keep_alive data frame',
          )
        } catch (error) {
          logForDebugging(
            `WebSocketTransport: Periodic keep_alive failed: ${error}`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_websocket_keepalive_failed')
        }
      }
    }, DEFAULT_KEEPALIVE_INTERVAL)
  }
  private stopKeepaliveInterval(): void {
    if (this.keepAliveInterval) {
      clearInterval(this.keepAliveInterval)
      this.keepAliveInterval = null
    }
  }
}
+131
View File
@@ -0,0 +1,131 @@
import { sleep } from '../../utils/sleep.js'
/**
* Coalescing uploader for PUT /worker (session state + metadata).
*
* - 1 in-flight PUT + 1 pending patch
* - New calls coalesce into pending (never grows beyond 1 slot)
* - On success: send pending if exists
* - On failure: exponential backoff (clamped), retries indefinitely
* until success or close(). Absorbs any pending patches before each retry.
* - No backpressure needed — naturally bounded at 2 slots
*
* Coalescing rules:
* - Top-level keys (worker_status, external_metadata) — last value wins
* - Inside external_metadata / internal_metadata — RFC 7396 merge:
* keys are added/overwritten, null values preserved (server deletes)
*/
type WorkerStateUploaderConfig = {
  /**
   * Perform one PUT /worker attempt with the coalesced body. Resolves
   * true on success, false on a retryable failure. Implementations
   * should report failure by resolving `false` rather than throwing.
   */
  send: (body: Record<string, unknown>) => Promise<boolean>
  /** Base delay for exponential backoff (ms) */
  baseDelayMs: number
  /** Max delay cap (ms) */
  maxDelayMs: number
  /** Random jitter range added to retry delay (ms) */
  jitterMs: number
}
export class WorkerStateUploader {
  /** Settles when the in-flight PUT (and its retries) finish; null when idle. */
  private inflight: Promise<void> | null = null
  /** Single coalesced patch waiting for the next send slot. */
  private pending: Record<string, unknown> | null = null
  private closed = false
  private readonly config: WorkerStateUploaderConfig
  constructor(config: WorkerStateUploaderConfig) {
    this.config = config
  }
  /**
   * Enqueue a patch to PUT /worker. Coalesces with any existing pending
   * patch. Fire-and-forget — callers don't need to await.
   */
  enqueue(patch: Record<string, unknown>): void {
    if (this.closed) return
    this.pending = this.pending ? coalescePatches(this.pending, patch) : patch
    void this.drain()
  }
  /** Stop the uploader: drops the pending patch and ends any retry loop. */
  close(): void {
    this.closed = true
    this.pending = null
  }
  /** Move the pending patch into the single in-flight slot, if free. */
  private async drain(): Promise<void> {
    if (this.inflight || this.closed) return
    if (!this.pending) return
    const payload = this.pending
    this.pending = null
    this.inflight = this.sendWithRetry(payload).then(() => {
      this.inflight = null
      if (this.pending && !this.closed) {
        void this.drain()
      }
    })
  }
  /** Retries indefinitely with exponential backoff until success or close(). */
  private async sendWithRetry(payload: Record<string, unknown>): Promise<void> {
    let current = payload
    let failures = 0
    while (!this.closed) {
      // A rejected send() must count as a failure, not escape: an uncaught
      // throw would reject this promise, skip drain()'s .then continuation,
      // and leave `inflight` pointing at a rejected promise forever —
      // wedging the uploader and surfacing an unhandled rejection.
      let ok: boolean
      try {
        ok = await this.config.send(current)
      } catch {
        ok = false
      }
      if (ok) return
      failures++
      await sleep(this.retryDelay(failures))
      // Absorb any patches that arrived during the retry
      if (this.pending && !this.closed) {
        current = coalescePatches(current, this.pending)
        this.pending = null
      }
    }
  }
  /** Exponential backoff clamped to maxDelayMs, plus uniform random jitter. */
  private retryDelay(failures: number): number {
    const exponential = Math.min(
      this.config.baseDelayMs * 2 ** (failures - 1),
      this.config.maxDelayMs,
    )
    const jitter = Math.random() * this.config.jitterMs
    return exponential + jitter
  }
}
/**
 * Coalesce two patches for PUT /worker.
 *
 * Top-level keys: overlay replaces base (last value wins).
 * Metadata keys (external_metadata, internal_metadata): RFC 7396 merge
 * one level deep — overlay keys are added/overwritten, null values
 * preserved for server-side delete. Per RFC 7396, arrays are not merge
 * targets: when either side is an array, the overlay value replaces the
 * base value wholesale instead of being spread into an object.
 */
function coalescePatches(
  base: Record<string, unknown>,
  overlay: Record<string, unknown>,
): Record<string, unknown> {
  const merged = { ...base }
  for (const [key, value] of Object.entries(overlay)) {
    const baseValue = merged[key]
    const mergeable =
      (key === 'external_metadata' || key === 'internal_metadata') &&
      typeof baseValue === 'object' &&
      baseValue !== null &&
      !Array.isArray(baseValue) &&
      typeof value === 'object' &&
      value !== null &&
      !Array.isArray(value)
    if (mergeable) {
      // RFC 7396 merge — overlay keys win, nulls preserved for server
      merged[key] = {
        ...(baseValue as Record<string, unknown>),
        ...(value as Record<string, unknown>),
      }
    } else {
      merged[key] = value
    }
  }
  return merged
}
+998
View File
@@ -0,0 +1,998 @@
import { randomUUID } from 'crypto'
import type {
SDKPartialAssistantMessage,
StdoutMessage,
} from 'src/entrypoints/sdk/controlTypes.js'
import { decodeJwtExpiry } from '../../bridge/jwtUtils.js'
import { logForDebugging } from '../../utils/debug.js'
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
import { errorMessage, getErrnoCode } from '../../utils/errors.js'
import { createAxiosInstance } from '../../utils/proxy.js'
import {
registerSessionActivityCallback,
unregisterSessionActivityCallback,
} from '../../utils/sessionActivity.js'
import {
getSessionIngressAuthHeaders,
getSessionIngressAuthToken,
} from '../../utils/sessionIngressAuth.js'
import type {
RequiresActionDetails,
SessionState,
} from '../../utils/sessionState.js'
import { sleep } from '../../utils/sleep.js'
import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
import {
RetryableError,
SerialBatchEventUploader,
} from './SerialBatchEventUploader.js'
import type { SSETransport, StreamClientEvent } from './SSETransport.js'
import { WorkerStateUploader } from './WorkerStateUploader.js'
/** Default interval between heartbeat events (20s; server TTL is 60s). */
const DEFAULT_HEARTBEAT_INTERVAL_MS = 20_000
/**
 * stream_event messages accumulate in a delay buffer for up to this many ms
 * before enqueue. Mirrors HybridTransport's batching window. text_delta
 * events for the same content block accumulate into a single full-so-far
 * snapshot per flush — each emitted event is self-contained so a client
 * connecting mid-stream sees complete text, not a fragment.
 */
const STREAM_EVENT_FLUSH_INTERVAL_MS = 100
/**
 * Hoisted axios validateStatus callback to avoid per-request closure allocation.
 * Returning true for every status keeps axios from throwing on non-2xx, so
 * request()/getWithRetry() can branch on response.status themselves.
 */
function alwaysValidStatus(): boolean {
  return true
}
export type CCRInitFailReason =
  | 'no_auth_headers'
  | 'missing_epoch'
  | 'worker_register_failed'
/** Thrown by initialize(); carries a typed reason for the diag classifier. */
export class CCRInitError extends Error {
  readonly reason: CCRInitFailReason
  constructor(reason: CCRInitFailReason) {
    super(`CCRClient init failed: ${reason}`)
    this.reason = reason
  }
}
/**
 * Consecutive 401/403 with a VALID-LOOKING token before giving up. An
 * expired JWT short-circuits this (exits immediately — deterministic,
 * retry is futile). This threshold is for the uncertain case: token's
 * exp is in the future but server says 401 (userauth down, KMS hiccup,
 * clock skew). 10 × 20s heartbeat ≈ 200s to ride it out.
 * The counter resets to 0 on any 2xx response (see request()).
 */
const MAX_CONSECUTIVE_AUTH_FAILURES = 10
/** Generic outbound event payload: a typed message with a UUID used for
 * server-side idempotency on retried POSTs. */
type EventPayload = {
  uuid: string
  type: string
  [key: string]: unknown
}
/**
 * One item of the POST /worker/events body. `ephemeral` is set on buffered
 * stream_event snapshots (see flushStreamEventBuffer) — presumably a hint
 * that the server need not durably persist them; confirm against the CCR
 * endpoint contract.
 */
type ClientEvent = {
  payload: EventPayload
  ephemeral?: boolean
}
/**
 * Structural subset of a stream_event carrying a text_delta. Not a narrowing
 * of SDKPartialAssistantMessage — RawMessageStreamEvent's delta is a union and
 * narrowing through two levels defeats the discriminant.
 */
type CoalescedStreamEvent = {
  type: 'stream_event'
  uuid: string
  session_id: string
  parent_tool_use_id: string | null
  event: {
    type: 'content_block_delta'
    index: number
    delta: { type: 'text_delta'; text: string }
  }
}
/**
 * Accumulator state for text_delta coalescing. Keyed by API message ID so
 * lifetime is tied to the assistant message — cleared when the complete
 * SDKAssistantMessage arrives (writeEvent), which is reliable even when
 * abort/error paths skip content_block_stop/message_stop delivery.
 */
export type StreamAccumulatorState = {
  /** API message ID (msg_...) → blocks[blockIndex] → chunk array. */
  byMessage: Map<string, string[][]>
  /**
   * {session_id}:{parent_tool_use_id} → active message ID.
   * content_block_delta events don't carry the message ID (only
   * message_start does), so we track which message is currently streaming
   * for each scope. At most one message streams per scope at a time.
   */
  scopeToMessage: Map<string, string>
}
/** Build a fresh, empty accumulator (no messages, no active scopes). */
export function createStreamAccumulator(): StreamAccumulatorState {
  const byMessage = new Map<string, string[][]>()
  const scopeToMessage = new Map<string, string>()
  return { byMessage, scopeToMessage }
}
function scopeKey(m: {
  session_id: string
  parent_tool_use_id: string | null
}): string {
  const parent = m.parent_tool_use_id ?? ''
  return m.session_id + ':' + parent
}
/**
 * Accumulate text_delta stream_events into full-so-far snapshots per content
 * block. Each flush emits ONE event per touched block containing the FULL
 * accumulated text from the start of the block — a client connecting
 * mid-stream receives a self-contained snapshot, not a fragment.
 *
 * Non-text-delta events pass through unchanged. message_start records the
 * active message ID for the scope; content_block_delta appends chunks;
 * the snapshot event reuses the first text_delta UUID seen for that block in
 * this flush so server-side idempotency remains stable across retries.
 *
 * Cleanup happens in writeEvent when the complete assistant message arrives
 * (reliable), not here on stop events (abort/error paths skip those).
 */
export function accumulateStreamEvents(
  buffer: SDKPartialAssistantMessage[],
  state: StreamAccumulatorState,
): EventPayload[] {
  const result: EventPayload[] = []
  // chunk-array → snapshot already emitted this flush. Keyed by the array
  // reference (stable per {messageId, index}) so later deltas for the same
  // block rewrite that snapshot in place instead of pushing a new event.
  const emittedSnapshots = new Map<string[], CoalescedStreamEvent>()
  for (const message of buffer) {
    const event = message.event
    if (event.type === 'message_start') {
      const scope = scopeKey(message)
      const messageId = event.message.id
      const previous = state.scopeToMessage.get(scope)
      if (previous) state.byMessage.delete(previous)
      state.scopeToMessage.set(scope, messageId)
      state.byMessage.set(messageId, [])
      result.push(message)
      continue
    }
    if (
      event.type !== 'content_block_delta' ||
      event.delta.type !== 'text_delta'
    ) {
      // Anything that is not a text delta passes through untouched.
      result.push(message)
      continue
    }
    const activeId = state.scopeToMessage.get(scopeKey(message))
    const blocks =
      activeId !== undefined ? state.byMessage.get(activeId) : undefined
    if (blocks === undefined) {
      // Delta without a preceding message_start (reconnect mid-stream, or
      // message_start was in a prior buffer that got dropped). Pass through
      // raw — a full-so-far snapshot is impossible without the earlier
      // chunks anyway.
      result.push(message)
      continue
    }
    const chunks = (blocks[event.index] ??= [])
    chunks.push(event.delta.text)
    const priorSnapshot = emittedSnapshots.get(chunks)
    if (priorSnapshot) {
      // Same block already snapshotted this flush — rewrite it in place.
      priorSnapshot.event.delta.text = chunks.join('')
      continue
    }
    const snapshot: CoalescedStreamEvent = {
      type: 'stream_event',
      uuid: message.uuid,
      session_id: message.session_id,
      parent_tool_use_id: message.parent_tool_use_id,
      event: {
        type: 'content_block_delta',
        index: event.index,
        delta: { type: 'text_delta', text: chunks.join('') },
      },
    }
    emittedSnapshots.set(chunks, snapshot)
    result.push(snapshot)
  }
  return result
}
/**
 * Clear accumulator entries for a completed assistant message. Called from
 * writeEvent when the SDKAssistantMessage arrives — the reliable end-of-stream
 * signal that fires even when abort/interrupt/error skip SSE stop events.
 */
export function clearStreamAccumulatorForMessage(
  state: StreamAccumulatorState,
  assistant: {
    session_id: string
    parent_tool_use_id: string | null
    message: { id: string }
  },
): void {
  const messageId = assistant.message.id
  state.byMessage.delete(messageId)
  const scope = scopeKey(assistant)
  // Only drop the scope mapping when it still points at this message — a
  // newer message_start may have already taken over the scope.
  if (state.scopeToMessage.get(scope) === messageId) {
    state.scopeToMessage.delete(scope)
  }
}
/** Outcome of one authenticated request; retryAfterMs carries the server's
 * 429 Retry-After hint (ms) when present. */
type RequestResult = { ok: true } | { ok: false; retryAfterMs?: number }
/** One item of the POST /worker/internal-events body. */
type WorkerEvent = {
  payload: EventPayload
  is_compaction?: boolean
  agent_id?: string
}
/** Stored worker-internal event as returned by GET /worker/internal-events;
 * used to rebuild the transcript on session resume. */
export type InternalEvent = {
  event_id: string
  event_type: string
  payload: Record<string, unknown>
  event_metadata?: Record<string, unknown> | null
  is_compaction: boolean
  created_at: string
  agent_id?: string
}
/** Paginated list envelope; next_cursor is present while more pages remain. */
type ListInternalEventsResponse = {
  data: InternalEvent[]
  next_cursor?: string
}
/** Subset of the GET /worker response read back on init (restored metadata). */
type WorkerStateResponse = {
  worker?: {
    external_metadata?: Record<string, unknown>
  }
}
/**
 * Manages the worker lifecycle protocol with CCR v2:
 * - Epoch management: reads worker_epoch from CLAUDE_CODE_WORKER_EPOCH env var
 * - Runtime state reporting: PUT /sessions/{id}/worker
 * - Heartbeat: POST /sessions/{id}/worker/heartbeat for liveness detection
 *
 * All writes go through this.request().
 */
export class CCRClient {
  // Echoed in every write body; the server answers 409 when a newer epoch
  // has superseded this worker (see handleEpochMismatch).
  private workerEpoch = 0
  private readonly heartbeatIntervalMs: number
  private readonly heartbeatJitterFraction: number
  private heartbeatTimer: NodeJS.Timeout | null = null
  // Skips a tick while the previous heartbeat POST is still in flight.
  private heartbeatInFlight = false
  private closed = false
  // 401/403 streak with a valid-looking token; see MAX_CONSECUTIVE_AUTH_FAILURES.
  private consecutiveAuthFailures = 0
  // Last status reported via reportState() — used to dedupe no-op updates.
  private currentState: SessionState | null = null
  private readonly sessionBaseUrl: string
  private readonly sessionId: string
  // Keep-alive axios instance shared by every request from this client.
  private readonly http = createAxiosInstance({ keepAlive: true })
  // stream_event delay buffer — accumulates content deltas for up to
  // STREAM_EVENT_FLUSH_INTERVAL_MS before enqueueing (reduces POST count
  // and enables text_delta coalescing). Mirrors HybridTransport's pattern.
  private streamEventBuffer: SDKPartialAssistantMessage[] = []
  private streamEventTimer: ReturnType<typeof setTimeout> | null = null
  // Full-so-far text accumulator. Persists across flushes so each emitted
  // text_delta event carries the complete text from the start of the block —
  // mid-stream reconnects see a self-contained snapshot. Keyed by API message
  // ID; cleared in writeEvent when the complete assistant message arrives.
  private streamTextAccumulator = createStreamAccumulator()
  private readonly workerState: WorkerStateUploader
  private readonly eventUploader: SerialBatchEventUploader<ClientEvent>
  private readonly internalEventUploader: SerialBatchEventUploader<WorkerEvent>
  private readonly deliveryUploader: SerialBatchEventUploader<{
    eventId: string
    status: 'received' | 'processing' | 'processed'
  }>
  /**
   * Called when the server returns 409 (a newer worker epoch superseded ours).
   * Default: process.exit(1) — correct for spawn-mode children where the
   * parent bridge re-spawns. In-process callers (replBridge) MUST override
   * this to close gracefully instead; exit would kill the user's REPL.
   */
  private readonly onEpochMismatch: () => never
  /**
   * Auth header source. Defaults to the process-wide session-ingress token
   * (CLAUDE_CODE_SESSION_ACCESS_TOKEN env var). Callers managing multiple
   * concurrent sessions with distinct JWTs MUST inject this — the env-var
   * path is a process global and would stomp across sessions.
   */
  private readonly getAuthHeaders: () => Record<string, string>
  /**
   * Builds the client and wires all uploaders. Performs no network I/O —
   * call initialize() to register the worker and start heartbeats.
   *
   * @param transport Read-side transport; its onEvent callback is wired here
   *   (not in initialize()) so delivery acks cannot race the first catch-up
   *   frame — see the comment above setOnEvent below.
   * @param sessionUrl Session URL of the form https://host/v1/code/sessions/{id}.
   * @param opts Optional overrides: epoch-mismatch handling, heartbeat
   *   cadence/jitter, and the auth-header source.
   * @throws Error when sessionUrl is not http(s).
   */
  constructor(
    transport: SSETransport,
    sessionUrl: URL,
    opts?: {
      onEpochMismatch?: () => never
      heartbeatIntervalMs?: number
      heartbeatJitterFraction?: number
      /**
       * Per-instance auth header source. Omit to read the process-wide
       * CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers — REPL,
       * daemon). Required for concurrent multi-session callers.
       */
      getAuthHeaders?: () => Record<string, string>
    },
  ) {
    this.onEpochMismatch =
      opts?.onEpochMismatch ??
      (() => {
        // eslint-disable-next-line custom-rules/no-process-exit
        process.exit(1)
      })
    this.heartbeatIntervalMs =
      opts?.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS
    this.heartbeatJitterFraction = opts?.heartbeatJitterFraction ?? 0
    this.getAuthHeaders = opts?.getAuthHeaders ?? getSessionIngressAuthHeaders
    // Session URL: https://host/v1/code/sessions/{id}
    if (sessionUrl.protocol !== 'http:' && sessionUrl.protocol !== 'https:') {
      throw new Error(
        `CCRClient: Expected http(s) URL, got ${sessionUrl.protocol}`,
      )
    }
    const pathname = sessionUrl.pathname.replace(/\/$/, '')
    this.sessionBaseUrl = `${sessionUrl.protocol}//${sessionUrl.host}${pathname}`
    // Extract session ID from the URL path (last segment)
    this.sessionId = pathname.split('/').pop() || ''
    this.workerState = new WorkerStateUploader({
      send: body =>
        this.request(
          'put',
          '/worker',
          { worker_epoch: this.workerEpoch, ...body },
          'PUT worker',
        ).then(r => r.ok),
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })
    this.eventUploader = new SerialBatchEventUploader<ClientEvent>({
      maxBatchSize: 100,
      maxBatchBytes: 10 * 1024 * 1024,
      // flushStreamEventBuffer() enqueues a full 100ms window of accumulated
      // stream_events in one call. A burst of mixed delta types that don't
      // fold into a single snapshot could exceed the old cap (50) and deadlock
      // on the SerialBatchEventUploader backpressure check. Match
      // HybridTransport's bound — high enough to be memory-only.
      maxQueueSize: 100_000,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/events',
          { worker_epoch: this.workerEpoch, events: batch },
          'client events',
        )
        if (!result.ok) {
          throw new RetryableError(
            'client event POST failed',
            result.retryAfterMs,
          )
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })
    this.internalEventUploader = new SerialBatchEventUploader<WorkerEvent>({
      maxBatchSize: 100,
      maxBatchBytes: 10 * 1024 * 1024,
      maxQueueSize: 200,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/internal-events',
          { worker_epoch: this.workerEpoch, events: batch },
          'internal events',
        )
        if (!result.ok) {
          throw new RetryableError(
            'internal event POST failed',
            result.retryAfterMs,
          )
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })
    this.deliveryUploader = new SerialBatchEventUploader<{
      eventId: string
      status: 'received' | 'processing' | 'processed'
    }>({
      maxBatchSize: 64,
      maxQueueSize: 64,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/events/delivery',
          {
            worker_epoch: this.workerEpoch,
            updates: batch.map(d => ({
              event_id: d.eventId,
              status: d.status,
            })),
          },
          'delivery batch',
        )
        if (!result.ok) {
          throw new RetryableError('delivery POST failed', result.retryAfterMs)
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })
    // Ack each received client_event so CCR can track delivery status.
    // Wired here (not in initialize()) so the callback is registered the
    // moment new CCRClient() returns — remoteIO must be free to call
    // transport.connect() immediately after without racing the first
    // SSE catch-up frame against an unwired onEventCallback.
    transport.setOnEvent((event: StreamClientEvent) => {
      this.reportDelivery(event.event_id, 'received')
    })
  }
  /**
   * Initialize the session worker:
   * 1. Take worker_epoch from the argument, or fall back to
   *    CLAUDE_CODE_WORKER_EPOCH (set by env-manager / bridge spawner)
   * 2. Report state as 'idle'
   * 3. Start heartbeat timer
   *
   * In-process callers (replBridge) pass the epoch directly — they
   * registered the worker themselves and there is no parent process
   * setting env vars.
   */
  async initialize(epoch?: number): Promise<Record<string, unknown> | null> {
    const startMs = Date.now()
    if (Object.keys(this.getAuthHeaders()).length === 0) {
      throw new CCRInitError('no_auth_headers')
    }
    if (epoch === undefined) {
      const rawEpoch = process.env.CLAUDE_CODE_WORKER_EPOCH
      epoch = rawEpoch ? parseInt(rawEpoch, 10) : NaN
    }
    if (isNaN(epoch)) {
      throw new CCRInitError('missing_epoch')
    }
    this.workerEpoch = epoch
    // Concurrent with the init PUT — neither depends on the other.
    const restoredPromise = this.getWorkerState()
    const result = await this.request(
      'put',
      '/worker',
      {
        worker_status: 'idle',
        worker_epoch: this.workerEpoch,
        // Clear stale pending_action/task_summary left by a prior
        // worker crash — the in-session clears don't survive process restart.
        external_metadata: {
          pending_action: null,
          task_summary: null,
        },
      },
      'PUT worker (init)',
    )
    if (!result.ok) {
      // 409 → onEpochMismatch may throw, but request() catches it and returns
      // false. Without this check we'd continue to startHeartbeat(), leaking a
      // 20s timer against a dead epoch. Throw so connect()'s rejection handler
      // fires instead of the success path.
      throw new CCRInitError('worker_register_failed')
    }
    this.currentState = 'idle'
    this.startHeartbeat()
    // sessionActivity's refcount-gated timer fires while an API call or tool
    // is in-flight; without a write the container lease can expire mid-wait.
    // v1 wires this in WebSocketTransport per-connection.
    registerSessionActivityCallback(() => {
      void this.writeEvent({ type: 'keep_alive' })
    })
    logForDebugging(`CCRClient: initialized, epoch=${this.workerEpoch}`)
    logForDiagnosticsNoPII('info', 'cli_worker_lifecycle_initialized', {
      epoch: this.workerEpoch,
      duration_ms: Date.now() - startMs,
    })
    // Await the concurrent GET and log state_restored here, after the PUT
    // has succeeded — logging inside getWorkerState() raced: if the GET
    // resolved before the PUT failed, diagnostics showed both init_failed
    // and state_restored for the same session.
    const { metadata, durationMs } = await restoredPromise
    if (!this.closed) {
      logForDiagnosticsNoPII('info', 'cli_worker_state_restored', {
        duration_ms: durationMs,
        had_state: metadata !== null,
      })
    }
    return metadata
  }
  // Control_requests are marked processed and not re-delivered on
  // restart, so read back what the prior worker wrote.
  private async getWorkerState(): Promise<{
    metadata: Record<string, unknown> | null
    durationMs: number
  }> {
    const startMs = Date.now()
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) {
      return { metadata: null, durationMs: 0 }
    }
    const data = await this.getWithRetry<WorkerStateResponse>(
      `${this.sessionBaseUrl}/worker`,
      authHeaders,
      'worker_state',
    )
    return {
      metadata: data?.worker?.external_metadata ?? null,
      durationMs: Date.now() - startMs,
    }
  }
  /**
   * Send an authenticated HTTP request to CCR. Handles auth headers,
   * 409 epoch mismatch, and error logging. Returns { ok: true } on 2xx.
   * On 429, reads Retry-After (integer seconds) so the uploader can honor
   * the server's backoff hint instead of blindly exponentiating.
   */
  private async request(
    method: 'post' | 'put',
    path: string,
    body: unknown,
    label: string,
    { timeout = 10_000 }: { timeout?: number } = {},
  ): Promise<RequestResult> {
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) return { ok: false }
    try {
      const response = await this.http[method](
        `${this.sessionBaseUrl}${path}`,
        body,
        {
          headers: {
            ...authHeaders,
            'Content-Type': 'application/json',
            'anthropic-version': '2023-06-01',
            'User-Agent': getClaudeCodeUserAgent(),
          },
          validateStatus: alwaysValidStatus,
          timeout,
        },
      )
      if (response.status >= 200 && response.status < 300) {
        this.consecutiveAuthFailures = 0
        return { ok: true }
      }
      if (response.status === 409) {
        this.handleEpochMismatch()
      }
      if (response.status === 401 || response.status === 403) {
        // A 401 with an expired JWT is deterministic — no retry will
        // ever succeed. Check the token's own exp before burning
        // wall-clock on the threshold loop.
        const tok = getSessionIngressAuthToken()
        const exp = tok ? decodeJwtExpiry(tok) : null
        if (exp !== null && exp * 1000 < Date.now()) {
          logForDebugging(
            `CCRClient: session_token expired (exp=${new Date(exp * 1000).toISOString()}) — no refresh was delivered, exiting`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_worker_token_expired_no_refresh')
          this.onEpochMismatch()
        }
        // Token looks valid but server says 401 — possible server-side
        // blip (userauth down, KMS hiccup). Count toward threshold.
        this.consecutiveAuthFailures++
        if (this.consecutiveAuthFailures >= MAX_CONSECUTIVE_AUTH_FAILURES) {
          logForDebugging(
            `CCRClient: ${this.consecutiveAuthFailures} consecutive auth failures with a valid-looking token — server-side auth unrecoverable, exiting`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_worker_auth_failures_exhausted')
          this.onEpochMismatch()
        }
      }
      logForDebugging(`CCRClient: ${label} returned ${response.status}`, {
        level: 'warn',
      })
      logForDiagnosticsNoPII('warn', 'cli_worker_request_failed', {
        method,
        path,
        status: response.status,
      })
      if (response.status === 429) {
        const raw = response.headers?.['retry-after']
        const seconds = typeof raw === 'string' ? parseInt(raw, 10) : NaN
        if (!isNaN(seconds) && seconds >= 0) {
          return { ok: false, retryAfterMs: seconds * 1000 }
        }
      }
      return { ok: false }
    } catch (error) {
      logForDebugging(`CCRClient: ${label} failed: ${errorMessage(error)}`, {
        level: 'warn',
      })
      logForDiagnosticsNoPII('warn', 'cli_worker_request_error', {
        method,
        path,
        error_code: getErrnoCode(error),
      })
      return { ok: false }
    }
  }
  /** Report worker state to CCR via PUT /sessions/{id}/worker. */
  reportState(state: SessionState, details?: RequiresActionDetails): void {
    if (state === this.currentState && !details) return
    this.currentState = state
    this.workerState.enqueue({
      worker_status: state,
      requires_action_details: details
        ? {
            tool_name: details.tool_name,
            action_description: details.action_description,
            request_id: details.request_id,
          }
        : null,
    })
  }
  /** Report external metadata to CCR via PUT /worker. */
  reportMetadata(metadata: Record<string, unknown>): void {
    this.workerState.enqueue({ external_metadata: metadata })
  }
  /**
   * Handle epoch mismatch (409 Conflict). A newer CC instance has replaced
   * this one — exit immediately.
   */
  private handleEpochMismatch(): never {
    logForDebugging('CCRClient: Epoch mismatch (409), shutting down', {
      level: 'error',
    })
    logForDiagnosticsNoPII('error', 'cli_worker_epoch_mismatch')
    this.onEpochMismatch()
  }
  /** Start periodic heartbeat. */
  private startHeartbeat(): void {
    this.stopHeartbeat()
    const schedule = (): void => {
      const jitter =
        this.heartbeatIntervalMs *
        this.heartbeatJitterFraction *
        (2 * Math.random() - 1)
      this.heartbeatTimer = setTimeout(tick, this.heartbeatIntervalMs + jitter)
    }
    const tick = (): void => {
      void this.sendHeartbeat()
      // stopHeartbeat nulls the timer; check after the fire-and-forget send
      // but before rescheduling so close() during sendHeartbeat is honored.
      if (this.heartbeatTimer === null) return
      schedule()
    }
    schedule()
  }
  /** Stop heartbeat timer. */
  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearTimeout(this.heartbeatTimer)
      this.heartbeatTimer = null
    }
  }
  /** Send a heartbeat via POST /sessions/{id}/worker/heartbeat. */
  private async sendHeartbeat(): Promise<void> {
    if (this.heartbeatInFlight) return
    this.heartbeatInFlight = true
    try {
      const result = await this.request(
        'post',
        '/worker/heartbeat',
        { session_id: this.sessionId, worker_epoch: this.workerEpoch },
        'Heartbeat',
        { timeout: 5_000 },
      )
      if (result.ok) {
        logForDebugging('CCRClient: Heartbeat sent')
      }
    } finally {
      this.heartbeatInFlight = false
    }
  }
  /**
   * Write a StdoutMessage as a client event via POST /sessions/{id}/worker/events.
   * These events are visible to frontend clients via the SSE stream.
   * Injects a UUID if missing to ensure server-side idempotency on retry.
   *
   * stream_event messages are held in a 100ms delay buffer and accumulated
   * (text_deltas for the same content block emit a full-so-far snapshot per
   * flush). A non-stream_event write flushes the buffer first so downstream
   * ordering is preserved.
   */
  async writeEvent(message: StdoutMessage): Promise<void> {
    if (message.type === 'stream_event') {
      this.streamEventBuffer.push(message)
      if (!this.streamEventTimer) {
        this.streamEventTimer = setTimeout(
          () => void this.flushStreamEventBuffer(),
          STREAM_EVENT_FLUSH_INTERVAL_MS,
        )
      }
      return
    }
    await this.flushStreamEventBuffer()
    if (message.type === 'assistant') {
      clearStreamAccumulatorForMessage(this.streamTextAccumulator, message)
    }
    await this.eventUploader.enqueue(this.toClientEvent(message))
  }
  /** Wrap a StdoutMessage as a ClientEvent, injecting a UUID if missing. */
  private toClientEvent(message: StdoutMessage): ClientEvent {
    const msg = message as unknown as Record<string, unknown>
    return {
      payload: {
        ...msg,
        uuid: typeof msg.uuid === 'string' ? msg.uuid : randomUUID(),
      } as EventPayload,
    }
  }
  /**
   * Drain the stream_event delay buffer: accumulate text_deltas into
   * full-so-far snapshots, clear the timer, enqueue the resulting events.
   * Called from the timer, from writeEvent on a non-stream message, and from
   * flush(). close() drops the buffer — call flush() first if you need
   * delivery.
   */
  private async flushStreamEventBuffer(): Promise<void> {
    if (this.streamEventTimer) {
      clearTimeout(this.streamEventTimer)
      this.streamEventTimer = null
    }
    if (this.streamEventBuffer.length === 0) return
    const buffered = this.streamEventBuffer
    this.streamEventBuffer = []
    const payloads = accumulateStreamEvents(
      buffered,
      this.streamTextAccumulator,
    )
    await this.eventUploader.enqueue(
      payloads.map(payload => ({ payload, ephemeral: true })),
    )
  }
  /**
   * Write an internal worker event via POST /sessions/{id}/worker/internal-events.
   * These events are NOT visible to frontend clients — they store worker-internal
   * state (transcript messages, compaction markers) needed for session resume.
   */
  async writeInternalEvent(
    eventType: string,
    payload: Record<string, unknown>,
    {
      isCompaction = false,
      agentId,
    }: {
      isCompaction?: boolean
      agentId?: string
    } = {},
  ): Promise<void> {
    const event: WorkerEvent = {
      payload: {
        type: eventType,
        ...payload,
        uuid: typeof payload.uuid === 'string' ? payload.uuid : randomUUID(),
      } as EventPayload,
      ...(isCompaction && { is_compaction: true }),
      ...(agentId && { agent_id: agentId }),
    }
    await this.internalEventUploader.enqueue(event)
  }
  /**
   * Flush pending internal events. Call between turns and on shutdown
   * to ensure transcript entries are persisted.
   */
  flushInternalEvents(): Promise<void> {
    return this.internalEventUploader.flush()
  }
  /**
   * Flush pending client events (writeEvent queue). Call before close()
   * when the caller needs delivery confirmation — close() abandons the
   * queue. Resolves once the uploader drains or rejects; returns
   * regardless of whether individual POSTs succeeded (check server state
   * separately if that matters).
   */
  async flush(): Promise<void> {
    await this.flushStreamEventBuffer()
    return this.eventUploader.flush()
  }
  /**
   * Read foreground agent internal events from
   * GET /sessions/{id}/worker/internal-events.
   * Returns transcript entries from the last compaction boundary, or null on failure.
   * Used for session resume.
   */
  async readInternalEvents(): Promise<InternalEvent[] | null> {
    return this.paginatedGet('/worker/internal-events', {}, 'internal_events')
  }
  /**
   * Read all subagent internal events from
   * GET /sessions/{id}/worker/internal-events?subagents=true.
   * Returns a merged stream across all non-foreground agents, each from its
   * compaction point. Used for session resume.
   */
  async readSubagentInternalEvents(): Promise<InternalEvent[] | null> {
    return this.paginatedGet(
      '/worker/internal-events',
      { subagents: 'true' },
      'subagent_events',
    )
  }
  /**
   * Paginated GET with retry. Fetches all pages from a list endpoint,
   * retrying each page on failure with exponential backoff + jitter.
   */
  private async paginatedGet(
    path: string,
    params: Record<string, string>,
    context: string,
  ): Promise<InternalEvent[] | null> {
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) return null
    const allEvents: InternalEvent[] = []
    let cursor: string | undefined
    do {
      const url = new URL(`${this.sessionBaseUrl}${path}`)
      for (const [k, v] of Object.entries(params)) {
        url.searchParams.set(k, v)
      }
      if (cursor) {
        url.searchParams.set('cursor', cursor)
      }
      const page = await this.getWithRetry<ListInternalEventsResponse>(
        url.toString(),
        authHeaders,
        context,
      )
      if (!page) return null
      allEvents.push(...(page.data ?? []))
      cursor = page.next_cursor
    } while (cursor)
    logForDebugging(
      `CCRClient: Read ${allEvents.length} internal events from ${path}${params.subagents ? ' (subagents)' : ''}`,
    )
    return allEvents
  }
  /**
   * Single GET request with retry. Returns the parsed response body
   * on success, null if all retries are exhausted.
   */
  private async getWithRetry<T>(
    url: string,
    authHeaders: Record<string, string>,
    context: string,
  ): Promise<T | null> {
    for (let attempt = 1; attempt <= 10; attempt++) {
      let response
      try {
        response = await this.http.get<T>(url, {
          headers: {
            ...authHeaders,
            'anthropic-version': '2023-06-01',
            'User-Agent': getClaudeCodeUserAgent(),
          },
          validateStatus: alwaysValidStatus,
          timeout: 30_000,
        })
      } catch (error) {
        logForDebugging(
          `CCRClient: GET ${url} failed (attempt ${attempt}/10): ${errorMessage(error)}`,
          { level: 'warn' },
        )
        if (attempt < 10) {
          const delay =
            Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
          await sleep(delay)
        }
        continue
      }
      if (response.status >= 200 && response.status < 300) {
        return response.data
      }
      if (response.status === 409) {
        this.handleEpochMismatch()
      }
      logForDebugging(
        `CCRClient: GET ${url} returned ${response.status} (attempt ${attempt}/10)`,
        { level: 'warn' },
      )
      if (attempt < 10) {
        const delay =
          Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
        await sleep(delay)
      }
    }
    logForDebugging('CCRClient: GET retries exhausted', { level: 'error' })
    logForDiagnosticsNoPII('error', 'cli_worker_get_retries_exhausted', {
      context,
    })
    return null
  }
  /**
   * Report delivery status for a client-to-worker event.
   * POST /v1/code/sessions/{id}/worker/events/delivery (batch endpoint)
   */
  reportDelivery(
    eventId: string,
    status: 'received' | 'processing' | 'processed',
  ): void {
    void this.deliveryUploader.enqueue({ eventId, status })
  }
  /** Get the current epoch (for external use). */
  getWorkerEpoch(): number {
    return this.workerEpoch
  }
  /** Internal-event queue depth — shutdown-snapshot backpressure signal. */
  get internalEventsPending(): number {
    return this.internalEventUploader.pendingCount
  }
  /** Clean up uploaders and timers. */
  close(): void {
    this.closed = true
    this.stopHeartbeat()
    unregisterSessionActivityCallback()
    if (this.streamEventTimer) {
      clearTimeout(this.streamEventTimer)
      this.streamEventTimer = null
    }
    this.streamEventBuffer = []
    this.streamTextAccumulator.byMessage.clear()
    this.streamTextAccumulator.scopeToMessage.clear()
    this.workerState.close()
    this.eventUploader.close()
    this.internalEventUploader.close()
    this.deliveryUploader.close()
  }
}
+45
View File
@@ -0,0 +1,45 @@
import { URL } from 'url'
import { isEnvTruthy } from '../../utils/envUtils.js'
import { HybridTransport } from './HybridTransport.js'
import { SSETransport } from './SSETransport.js'
import type { Transport } from './Transport.js'
import { WebSocketTransport } from './WebSocketTransport.js'
/**
 * Helper function to get the appropriate transport for a URL.
 *
 * Transport selection priority:
 * 1. SSETransport (SSE reads + POST writes) when CLAUDE_CODE_USE_CCR_V2 is set
 * 2. HybridTransport (WS reads + POST writes) when CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2 is set
 * 3. WebSocketTransport (WS reads + WS writes) — default
 */
export function getTransportForUrl(
  url: URL,
  headers: Record<string, string> = {},
  sessionId?: string,
  refreshHeaders?: () => Record<string, string>,
): Transport {
  if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
    // v2: SSE for reads, HTTP POST for writes.
    // --sdk-url is the session URL (.../sessions/{id}); derive the SSE
    // stream URL by appending /worker/events/stream.
    const sseUrl = new URL(url.href)
    // Normalize a websocket scheme to its HTTP counterpart.
    sseUrl.protocol =
      sseUrl.protocol === 'wss:'
        ? 'https:'
        : sseUrl.protocol === 'ws:'
          ? 'http:'
          : sseUrl.protocol
    const basePath = sseUrl.pathname.replace(/\/$/, '')
    sseUrl.pathname = `${basePath}/worker/events/stream`
    return new SSETransport(sseUrl, headers, sessionId, refreshHeaders)
  }
  if (url.protocol !== 'ws:' && url.protocol !== 'wss:') {
    throw new Error(`Unsupported protocol: ${url.protocol}`)
  }
  return isEnvTruthy(process.env.CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2)
    ? new HybridTransport(url, headers, sessionId, refreshHeaders)
    : new WebSocketTransport(url, headers, sessionId, refreshHeaders)
}