Claude Code 核心源码深度解析
目录
1. 核心查询引擎 (query.ts) 深度解析
1.1 文件概览
1.2 query() 函数签名
export async function* query(
params: QueryParams,
): AsyncGenerator<
StreamEvent | RequestStartEvent | Message | TombstoneMessage | ToolUseSummaryMessage,
Terminal // 返回值类型
>
设计精妙之处:使用 AsyncGenerator 实现「边迭代边产出」,调用方(REPL)可以在工具执行、API 流式响应等各个阶段实时拿到更新并渲染 UI,而不必等整个轮次结束。
1.3 核心状态机 (State 类型)
type State = {
messages: Message[] // 当前对话历史
toolUseContext: ToolUseContext // 工具执行上下文(可变)
autoCompactTracking: AutoCompactTrackingState | undefined
maxOutputTokensRecoveryCount: number // max_output_tokens 重试计数
hasAttemptedReactiveCompact: boolean // 是否已尝试响应式压缩
maxOutputTokensOverride: number | undefined
pendingToolUseSummary: Promise<...> | undefined
stopHookActive: boolean | undefined
turnCount: number // 当前轮次计数
transition: Continue | undefined // 上一轮的延续原因
}
1.4 主循环伪代码流程
function* queryLoop(params, consumedCommandUuids):
1. 初始化 State(messages, toolUseContext 等)
2. 创建 budgetTracker(TOKEN_BUDGET 特性门控)
3. 构建 QueryConfig(一次性快照,避免每轮重复读取)
↓
LOOP(无限循环,直到 return 或 throw):
├── 4. 启动预取:
│ ├── startRelevantMemoryPrefetch() — 记忆文件预取
│ └── startSkillDiscoveryPrefetch() — 技能发现预取
│
├── 5. 上下文预处理(压缩三部曲):
│ ├── [a] snipCompactIfNeeded() — HISTORY_SNIP:裁剪旧消息
│ ├── [b] microcompact() — CACHED_MICROCOMPACT:缓存编辑压缩
│ └── [c] applyCollapsesIfNeeded() — CONTEXT_COLLAPSE:上下文折叠
│
├── 6. autoCompact() — 自动压缩(超 token 阈值时触发)
│
├── 7. 构建 API 请求参数:
│ ├── messagesForQuery = getMessagesAfterCompactBoundary(messages)
│ ├── applyToolResultBudget() — 限制单条消息的工具结果大小
│ ├── fullSystemPrompt = appendSystemContext(systemPrompt, systemContext)
│ └── 确定 currentModel(考虑 plan 模式 + 200k 阈值)
│
├── 8. 阻塞限制检查:
│ └── calculateTokenWarningState() — 超阈值则拒绝,要求手动 /compact
│
├── 9. 调用 API(callModel):
│ └── for await (const message of deps.callModel(...)):
│ ├── text_delta → 累积文本,yield 给 UI
│ ├── tool_use → 收集 toolUseBlocks,加入 streamingToolExecutor
│ ├── thinking → 累积思考过程
│ └── error → withholding(延迟到恢复路径确定后再 yield)
│
├── 10. 工具执行阶段:
│ └── yield* runTools(toolUseBlocks, assistantMessages, ...)
│ ├── 并发安全工具(read-only)→ 并发执行
│ └── 非并发安全工具(write)→ 串行执行
│
├── 11. 后处理:
│ ├── 执行 stop hooks
│ ├── 检查 needsFollowUp(是否有待处理工具调用)
│ └── 如果 needsFollowUp = false → break(轮次结束)
│
└── 12. yield tombstone 消息,清理,进入下一轮
1.5 错误恢复机制
query.ts 实现了多层错误恢复:
1.6 Task Budget 跟踪
// task_budget 跨压缩边界跟踪
let taskBudgetRemaining: number | undefined = undefined
// 每次压缩前捕获「压缩前的 context window」
// 压缩后,服务器只看到摘要,会少算 spend
// remaining 告诉服务器「被压缩掉的那部分 context 也占 budget」
if (params.taskBudget) {
const preCompactContext = finalContextTokensFromLastResponse(messagesForQuery)
taskBudgetRemaining = Math.max(
0,
(taskBudgetRemaining ?? params.taskBudget.total) - preCompactContext,
)
}
1.7 设计亮点
AsyncGenerator双向流:API 流 → 工具执行 → UI 渲染,全部在一个生成器内完成,调用方通过for await驱动State对象集中管理可变状态:每次迭代解构state,continue站点统一写回state = { ... },避免散落各处的赋值withheld错误延迟机制:可恢复错误先不 yield,等恢复路径(collapse / reactive compact)完成再决定是 yield 错误还是 yield 恢复结果QueryConfig一次性快照:buildQueryConfig()在循环外调用一次,避免每轮重复读取 GrowthBook / env / session,减少不必要的重新渲染
2. 工具编排系统 (toolOrchestration.ts) 深度解析
2.1 文件概览
2.2 runTools() 函数签名
export async function* runTools(
toolUseMessages: ToolUseBlock[],
assistantMessages: AssistantMessage[],
canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void>
2.3 并发安全分区算法 (partitionToolCalls)
这是整个工具系统的核心调度算法:
type Batch = {
isConcurrencySafe: boolean // true = 可并发(read-only)
blocks: ToolUseBlock[] // 本批的工具调用块
}
function partitionToolCalls(
toolUseMessages: ToolUseBlock[],
toolUseContext: ToolUseContext,
): Batch[] {
return toolUseMessages.reduce((acc: Batch[], toolUse) => {
const tool = findToolByName(toolUseContext.options.tools, toolUse.name)
const parsedInput = tool?.inputSchema.safeParse(toolUse.input)
const isConcurrencySafe = parsedInput?.success
? (() => {
try {
return tool!.isConcurrencySafe(parsedInput.data, toolUseContext)
} catch {
return false // 保守:异常时假定不安全
}
})()
: false
const lastBatch = acc[acc.length - 1]
if (isConcurrencySafe) {
// 连续多个 read-only 工具合并到同一批
if (lastBatch?.isConcurrencySafe) {
lastBatch.blocks.push(toolUse)
} else {
acc.push({ isConcurrencySafe: true, blocks: [toolUse] })
}
} else {
// 每个非并发安全工具独占一批(保证串行)
acc.push({ isConcurrencySafe: false, blocks: [toolUse] })
}
return acc
}, [])
}
分区规则总结:
Read-only 工具(如
FileReadTool、GlobTool、GrepTool)→isConcurrencySafe = true→ 合并并发Write 工具(如
FileEditTool、FileWriteTool、BashTool)→isConcurrencySafe = false→ 每个独占一批,串行执行并发上限:
CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY(默认 10)
2.4 并发执行路径 (runToolsConcurrently)
async function* runToolsConcurrently(...) {
// 使用 Promise.all 并发执行所有工具
const results = await Promise.all(
blocks.map(block =>
runToolUse(block, assistantMessage, ...)
.next() // 取第一个 yield(最终结果)
)
)
// 收集 contextModifier(用于跨工具上下文传递)
for (const result of results) {
if (result.value.contextModifier) {
// 排队,等所有并发完成后统一应用
queuedContextModifiers[block.id].push(modifyContext)
}
}
// 统一应用所有 contextModifier
for (const block of blocks) {
for (const modifier of queuedContextModifiers[block.id]) {
currentContext = modifier(currentContext)
}
}
}
为什么 contextModifier 要排队后统一应用?
并发执行时,工具 A 和工具 B 同时修改 context,如果各自直接修改,会产生竞态条件。排队后在所有并发完成后按固定顺序应用,保证确定性。
2.5 串行执行路径 (runToolsSerially)
async function* runToolsSerially(...) {
for (const block of blocks) {
// 逐个执行,每次 yield 中间状态
for await (const update of runToolUse(block, ...)) {
if (update.newContext) {
currentContext = update.newContext // 串行:立即应用上下文变更
}
yield update
}
}
}
2.6 设计亮点
分区算法保证正确性:read-only 并发,write 串行,无需分布式锁
contextModifier排队机制:并发工具的上下文修改延迟到所有工具完成后统一应用,避免竞态MessageUpdateLazy惰性更新:工具执行产生的中间状态通过生成器流式 yield,UI 可以实时渲染进度
3. 工具执行管道 (toolExecution.ts) 深度解析
3.1 文件概览
3.2 runToolUse() 完整生命周期
runToolUse(toolUse, assistantMessage, canUseTool, toolUseContext)
│
├── 1. 查找工具
│ ├── findToolByName(tools, toolName) → 在可用工具列表中查找
│ └── 未找到?→ 检查别名(aliases)→ 仍找不到 → yield 错误信息
│
├── 2. 检查取消
│ └── abortController.signal.aborted?→ yield CANCEL_MESSAGE,返回
│
├── 3. 权限检查 + 执行(streamedCheckPermissionsAndCallTool)
│ ├── 3a. pre-tool-use hooks(runPreToolUseHooks)
│ ├── 3b. canUseTool() → 权限决策
│ │ ├── allow → 继续执行
│ │ ├── deny → yield 错误结果,触发 denial tracking
│ │ └── ask → 暂停,等待用户批准(通过 Stream 桥接)
│ ├── 3c. 启动 OTel tracing span(isBetaTracingEnabled)
│ ├── 3d. 执行工具:tool.call(input, context, canUseTool, assistantMessage)
│ ├── 3e. post-tool-use hooks(runPostToolUseHooks)
│ └── 3f. 后处理 hooks(runPostToolUseFailureHooks / runPostToolUseSuccessHooks)
│
├── 4. 处理工具结果
│ ├── 成功:yield { message: UserMessage (tool_result), newContext }
│ └── 失败:yield { message: UserMessage (is_error: true), newContext }
│
└── 5. 返回 (done)
3.3 权限检查详细流程 (checkPermissionsAndCallTool 内部)
async function checkPermissionsAndCallTool(...) {
// 1. 解析工具输入(Zod schema 校验)
const parseResult = tool.inputSchema.safeParse(toolInput)
if (!parseResult.success) {
// 输入校验失败 → 返回 Zod 错误信息
return yield { message: createUserMessage({ content: formatZodValidationError(...) }) }
}
// 2. 检查工具是否被禁用(isEnabled)
if (!tool.isEnabled?.(toolInput, toolUseContext.options)) {
return yield { message: createUserMessage({ content: 'Tool is disabled' }) }
}
// 3. 权限决策(核心)
const permissionResult: PermissionResult = await canUseTool(
tool.name,
parsedInput.data,
toolUseContext,
assistantMessage,
)
// 4. 根据决策执行
if (permissionResult.behavior === 'allow') {
// 执行工具
const toolResult = await tool.call(parsedInput.data, toolUseContext, canUseTool, assistantMessage)
// 处理 toolResult(可能是 ToolResult 或 AsyncGenerator)
...
} else if (permissionResult.behavior === 'deny') {
// 记录 denial tracking(触发多次拒绝后的自动建议)
recordDenial(tool.name, toolUseContext)
// 执行 denial hooks
await executePermissionDeniedHooks(...)
// yield 拒绝消息
yield { message: createUserMessage({ content: [{ type: 'tool_result', is_error: true, ... }] }) }
} else {
// 'ask' — 暂停,等待用户响应
// 通过 Stream 桥接实现「暂停-恢复」
const decision = await resolveHookPermissionDecision(...)
// 递归调用自身处理用户响应
return yield* checkPermissionsAndCallTool(...)
}
}
3.4 MCP 工具特殊处理
function findMcpServerConnection(
toolName: string,
mcpClients: MCPServerConnection[],
): MCPServerConnection | undefined {
if (!toolName.startsWith('mcp__')) return undefined
const mcpInfo = mcpInfoFromString(toolName)
// 归一化比较:工具名可能是归一化后的(下划线),client.name 可能是原始名(空格)
return mcpClients.find(
client => normalizeNameForMCP(client.name) === mcpInfo.serverName
)
}
MCP 工具的 call() 实现在 MCPTool 类里,它会:
通过
MCPServerConnection找到对应的 MCP 客户端序列化工具参数为 JSON-RPC 请求
通过
StdioClientTransport/StreamableHTTPClientTransport发送到 MCP 服务器将 MCP 服务器的 JSON-RPC 响应反序列化为
ToolResult
3.5 工具执行结果处理 (processToolResultBlock)
function processToolResultBlock(
toolResult: ToolResult,
toolUseID: string,
tool: Tool,
): UserMessage {
// 1. 提取 data(工具返回的主要结果)
// 2. 检查是否有 contextModifier(需要修改 ToolUseContext)
// 3. 检查是否有附加消息(newMessages)
// 4. 组装标准的 tool_result content block
return createUserMessage({
content: [{
type: 'tool_result',
tool_use_id: toolUseID,
content: toolResult.data, // 可能是 string | object | binary
is_error: toolResult.isError ?? false,
}],
toolUseResult: toolResult.data,
sourceToolAssistantUUID: assistantMessage.uuid,
})
}
3.6 错误分类遥测 (classifyToolError)
function classifyToolError(error: unknown): string {
// 1. TelemetrySafeError → 使用其 telemetryMessage(已脱敏)
if (error instanceof TelemetrySafeError) {
return error.telemetryMessage.slice(0, 200)
}
// 2. Node.js fs 错误 → 返回错误码(ENOENT, EACCES 等,安全且有用)
const errnoCode = getErrnoCode(error)
if (typeof errnoCode === 'string') return `Error:${errnoCode}`
// 3. 已知错误类型(ShellError, ImageSizeError 等)→ 返回 name(minification 安全)
if (error.name && error.name !== 'Error' && error.name.length > 3) {
return error.name.slice(0, 60)
}
// 4. 兜底 → 'Error'(比 minified 后的 'nJT' 更有意义)
return 'Error'
}
3.7 设计亮点
Stream实现「暂停-等待用户响应」:权限询问时,函数暂停在await resolveHookPermissionDecision(),用户响应后通过 Stream.enqueue 恢复执行OTel tracing 集成:每个工具调用都有完整的 span 生命周期(
startToolSpan→endToolSpan),支持分布式追踪classifyToolError遥测安全:minified 生产环境中error.constructor.name是乱码,这个函数提供了结构化的、安全的错误分类Hook 生命周期完整:pre-tool-use → 执行 → post-tool-use → success/failure hooks,完整覆盖
4. API 通信层 (claude.ts) 深度解析
4.1 文件概览
4.2 callModel() 调用链
deps.callModel(options)
│
├── callModel 内部调用 setupCallModel(deps)()
│
└── setupCallModel(deps)(options)
│
├── 1. 构建请求参数(BetaMessageStreamParams)
│ ├── messages: normalizeMessagesForAPI(messages)
│ ├── system: fullSystemPrompt(可能包含缓存断点)
│ ├── tools: toolToAPISchema(tools)(转换为 Anthropic API schema)
│ ├── model: currentModel
│ ├── thinking: thinkingConfig
│ ├── betas: 合并所有 beta headers
│ └── extra_body: getExtraBodyParams(betaHeaders)
│
├── 2. 注入自定义 headers
│ ├── anthropic-beta → beta headers
│ ├── X-API-Key → API Key
│ ├── Claude-Code-* → 自定义业务 headers
│ └──(client_request_id) → 请求追踪 ID
│
├── 3. 调用 Anthropic SDK
│ └── anthropic.beta.messages.stream(params, options)
│ └── 返回 AsyncStream<BetaRawMessageStreamEvent>
│
├── 4. 流式事件处理(for await)
│ ├── message_start → 初始化 assistantMessage
│ ├── content_block_start → 开始新的 content block(text / tool_use / thinking)
│ ├── content_block_delta → 增量更新(text_delta / input_json_delta)
│ ├── content_block_stop → 结束当前 block
│ ├── message_delta → 更新 usage(input_tokens, output_tokens)
│ └── message_stop → 流结束
│
├── 5. 错误处理 + 重试
│ ├── withRetry() 包裹整个调用
│ ├── FallbackTriggeredError → 切换模型重试
│ ├── 529 (Overloaded) → 指数退避重试
│ └── 其他 APIError → 有限次数重试
│
└── 6. 返回最终结果
└── yield assistantMessage(完整组装好的 AssistantMessage)
4.3 Beta Headers 管理
claude.ts 中处理了大量的 beta 功能开关,通过 headers 传递给 API:
// 从 getMergedBetas() 收集所有需要的 beta headers
const betaHeaders = [
// 1. 基础 betas(从工具、MCP 等收集)
...toolBetas,
...mcpBetas,
// 2. 模型特定 betas
...getModelBetas(model),
// 3. 功能特定 betas
isFastModeEnabled && FAST_MODE_BETA_HEADER,
isAssistantMode && AFK_MODE_BETA_HEADER,
isContextManagementEnabled && CONTEXT_MANAGEMENT_BETA_HEADER,
// ...
].filter(Boolean)
// 合并到 extra_body
const extraBody = getExtraBodyParams(betaHeaders)
extraBody.anthropic_beta = betaHeaders // 传递给 API
4.4 Prompt Caching 实现
export function getPromptCachingEnabled(model: string): boolean {
// 1. 全局禁用检查
if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING)) return false
// 2. 按模型禁用检查(Haiku / Sonnet / Opus 可分别禁用)
if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_HAIKU)) { ... }
// 3. 检查模型是否支持 caching
return modelSupportsCaching(model)
}
// 在系统提示中插入缓存断点:
// Anthropic API 使用 cache_control 字段标记缓存断点
// claude.ts 通过 splitSysPromptPrefix 在系统提示的合适位置插入断点
const { prefix, rest } = splitSysPromptPrefix(fullSystemPrompt)
// prefix 部分:cache_control = { type: 'ephemeral' }
// rest 部分:不缓存
4.5 withRetry() 重试机制
// services/api/withRetry.ts
export async function withRetry<T>(
fn: () => Promise<T>,
shouldRetry: (error: unknown, context: RetryContext) => boolean,
maxRetries: number = 3,
): Promise<T> {
let attempt = 0
while (true) {
try {
return await fn()
} catch (error) {
attempt++
if (attempt > maxRetries || !shouldRetry(error, { attempt, error })) {
throw error
}
// 指数退避
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 30000)
await sleep(delay)
}
}
}
可重试错误:
529 Overloaded:Anthropic 服务器过载APIConnectionTimeoutError:网络超时FallbackTriggeredError:模型不可用,切换后重试
4.6 dumpPromptsFetch — 提示词调试
// 创建 fetch wrapper 来捕获完整的 API 请求体(用于调试)
const dumpPromptsFetch = config.gates.isAnt
? createDumpPromptsFetch(toolUseContext.agentId ?? config.sessionId)
: undefined
// createDumpPromptsFetch 返回一个自定义的 fetch 函数
// 它拦截所有发往 api.anthropic.com 的请求
// 将 request body(完整 prompt)写入调试文件
// 每个 query() 调用创建一个闭包,只捕获最新请求(避免内存泄漏)
4.7 设计亮点
BetaMessageStreamParams类型安全:完整映射 Anthropic Beta API 的所有参数,编译时检查withRetry策略可插拔:shouldRetry函数作为参数传入,不同调用方可以定制重试策略Prompt Caching 透明化:上层代码无需关心缓存断点插入逻辑,
getPromptCachingEnabled()+splitSysPromptPrefix()自动处理dumpPromptsFetch内存安全:每个query()调用创建一个新闭包,只保留最新请求体(~700KB),而不是所有历史(可能 500MB+)
5. 权限系统 (permissions.ts) 深度解析
5.1 文件概览
5.2 权限决策流程
canUseTool(toolName, input, context, assistantMessage)
│
├── 1. 规则匹配(最快路径,无 I/O)
│ ├── 检查 alwaysDenyRules → 命中 → return { behavior: 'deny', reason: { type: 'rule', rule } }
│ ├── 检查 alwaysAllowRules → 命中 → return { behavior: 'allow', reason: { type: 'rule', rule } }
│ └── 未命中 → 继续
│
├── 2. Hook 检查(可异步,可阻断)
│ ├── runPreToolUseHooks(toolName, input)
│ │ └── 任何 hook 返回 deny → return { behavior: 'deny', reason: { type: 'hook', hookName } }
│ └── Hook 可修改工具输入(通过 contextModifier)
│
├── 3. 分类器检查(BASH_CLASSIFIER 特性)
│ ├── classifierDecisionModule?.classify(input) → 返回分类结果
│ ├── 高风险命令 → return { behavior: 'ask', reason: { type: 'classifier', ... } }
│ └── 低风险 → 继续
│
├── 4. 沙箱检查(Sandbox 模式)
│ ├── shouldUseSandbox() → true → 在沙箱中执行,无需用户批准
│ └── 沙箱外的操作 → 继续
│
├── 5. 模式检查(最后一道门)
│ ├── mode = 'default' → return { behavior: 'ask' }(询问用户)
│ ├── mode = 'acceptEdits' → allow 编辑类工具,ask 其他
│ ├── mode = 'bypass' → return { behavior: 'allow' }(全部允许)
│ └── mode = 'plan' → 只允许查看类工具
│
└── 6. 返回最终决策
return { behavior: 'allow' | 'deny' | 'ask', reason?: PermissionDecisionReason }
5.3 权限规则系统
type PermissionRuleSource =
| 'cliArg' // 命令行参数(最高优先级)
| 'session' // 会话内临时规则
| 'localSettings' // .claude/settings.json(项目级)
| 'userSettings' // ~/.claude/settings.json(用户级)
| 'policySettings' // MDM / 策略配置
| 'command' // 通过 /allow 命令添加
type PermissionRule = {
source: PermissionRuleSource
ruleBehavior: 'allow' | 'deny' | 'ask'
ruleValue: PermissionRuleValue // 解析后的规则(工具名 + 可选的内容匹配器)
}
// 规则匹配示例:
// "Bash" → 匹配整个 Bash 工具
// "Bash(prefix:git)" → 匹配 Bash 工具中以 git 开头的命令
// "mcp__server1" → 匹配来自 server1 的所有 MCP 工具
// "mcp__server1__*" → 通配符,匹配 server1 的所有工具
5.4 toolMatchesRule() — 规则匹配算法
function toolMatchesRule(
tool: Pick<Tool, 'name' | 'mcpInfo'>,
rule: PermissionRule,
): boolean {
// 1. 规则没有 content 匹配器 → 匹配整个工具
if (rule.ruleValue.ruleContent === undefined) {
// 直接比较工具名
return rule.ruleValue.toolName === getToolNameForPermissionCheck(tool)
}
// 2. 规则有 content 匹配器 → 需要检查工具输入
// 例如 "Bash(prefix:rm)" 只匹配 rm 命令,不匹配 ls
if (tool.name === 'Bash') {
const bashInput = toolInput as BashToolInput
return matchesPrefix(bashInput.command, rule.ruleValue.ruleContent)
}
// ... 其他工具的 content 匹配逻辑
}
5.5 Denial Tracking — 智能建议系统
// utils/permissions/denialTracking.ts
const DENIAL_LIMITS = [1, 3, 5] // 拒绝次数阈值
function recordDenial(toolName: string, context: ToolUseContext): void {
const state = getDenialTrackingState(context)
state.denials[toolName] = (state.denials[toolName] || 0) + 1
// 达到阈值?→ 建议用户添加规则
if (DENIAL_LIMITS.includes(state.denials[toolName])) {
showSuggestion(`Allow ${toolName} for this session? [y/n]`)
}
}
用户体验优化:连续拒绝同一工具 3 次后,Claude Code 会主动询问「是否要永久允许这个工具?」,减少重复确认。
5.6 OTel 遥测集成
// decisionReasonToOTelSource() — 将权限决策原因映射到 OTel 标准词汇
function decisionReasonToOTelSource(
reason: PermissionDecisionReason | undefined,
behavior: 'allow' | 'deny',
): string {
switch (reason?.type) {
case 'rule':
return reason.rule.source === 'session'
? (behavior === 'allow' ? 'user_temporary' : 'user_reject')
: (behavior === 'allow' ? 'user_permanent' : 'user_reject')
case 'hook':
return 'hook'
case 'mode':
return 'config'
// ...
}
}
5.7 设计亮点
三层决策(规则 → Hook → 模式):规则最快(纯内存查找),Hook 灵活(可调用外部脚本),模式兜底(用户体验保证)
source精细化控制:规则来源影响持久化策略(session = 临时,localSettings = 永久)Denial Tracking 主动建议:减少用户重复操作,提升体验
OTel 标准遥测:权限决策全程可追踪,支持
user_temporary/user_permanent/user_reject/hook/config等标准词汇
6. 上下文压缩系统 (compact.ts) 深度解析
6.1 文件概览
6.2 压缩触发条件
export function isAutoCompactEnabled(): boolean {
// 1. 检查特性开关
if (!feature('AUTO_COMPACT')) return false
// 2. 检查用户设置
const settings = getSettings_DEPRECATED()
if (settings.disableAutoCompact) return false
return true
}
function shouldCompact(
messages: Message[],
model: string,
autoCompactTracking?: AutoCompactTrackingState,
): boolean {
const tokenCount = tokenCountWithEstimation(messages)
const threshold = getAutoCompactThreshold(model) // 通常是模型限制的 80%
return tokenCount > threshold
}
6.3 压缩流程
autocompact(messages, toolUseContext, options, querySource, tracking, snipTokensFreed)
│
├── 1. 检查是否应该压缩
│ └── shouldCompact() → false → return null(不压缩)
│
├── 2. 执行 pre-compact hooks
│ └── executePreCompactHooks()
│
├── 3. 构建压缩提示词
│ ├── 选取要压缩的消息(通常是最早的 N 条)
│ ├── 保留最近的消息(最近 10 条或更少)
│ └── 组装成压缩请求(发给 API,让模型生成摘要)
│
├── 4. 调用 API 生成压缩摘要
│ └── API 返回一个简洁的摘要(替代被压缩的消息)
│
├── 5. 构建压缩后的消息列表
│ ├── SystemCompactBoundaryMessage(标记压缩边界)
│ ├── 压缩摘要消息(AssistantMessage)
│ └── 未压缩的近期消息
│
├── 6. 执行 post-compact hooks
│ └── executePostCompactHooks(compactionResult)
│
├── 7. 返回压缩结果
│ └── return {
│ preCompactTokenCount,
│ postCompactTokenCount,
│ summaryMessages,
│ compactionUsage, // API 调用的 token 使用
│ }
│
└── 8. 如果压缩失败(consecutiveFailures > 0)
└── 触发熔断:暂时禁用自动压缩,避免无限重试
6.4 三种压缩策略对比
执行顺序(在 query.ts 主循环中):
snipCompact → microCompact → contextCollapse → autoCompact
6.5 压缩边界消息
// 压缩后在消息列表中插入边界标记
function createCompactBoundaryMessage(
preCount: number,
postCount: number,
toolUseSummary?: ToolUseSummaryMessage,
): SystemCompactBoundaryMessage {
return {
type: 'system',
systemMessageType: 'compact_boundary',
content: `Context compressed: ${preCount} → ${postCount} messages`,
...
}
}
6.6 设计亮点
三层压缩策略:Snip(快)→ Micro(中)→ Auto(质量高),逐级尝试
压缩熔断机制:
consecutiveFailures跟踪连续失败次数,超过阈值后暂时禁用,避免无限重试循环Hook 生命周期:pre-compact → 压缩 → post-compact,完整可扩展
Task Budget 感知:压缩后正确更新
taskBudgetRemaining,避免预算计算错误
7. 会话持久化 (sessionStorage.ts) 深度解析
7.1 文件概览
7.2 存储格式
每条消息写入 ~/.claude/projects/<cwd-hash>/sessions/<session-id>.jsonl,每行一个 JSON 对象(JSONL 格式,便于追加和流式读取):
{"type":"user","uuid":"...","message":{"role":"user","content":"Hello"}}
{"type":"assistant","uuid":"...","message":{"role":"assistant","content":[{"type":"text","text":"Hi!"}]}}
{"type":"user","uuid":"...","message":{"role":"user","content":[{"type":"tool_result",...}]}}
7.3 appendTranscript() — 追加写入
export function appendTranscript(
sessionId: SessionId,
messages: TranscriptMessage[], // 只追加 TranscriptMessage(不含 ProgressMessage)
options?: LogOption,
): void {
const filePath = getTranscriptPath(sessionId)
// 使用 fs.createWriteStream 追加,而不是读入内存再重写
const stream = fs.createWriteStream(filePath, { flags: 'a' })
for (const msg of messages) {
stream.write(JSON.stringify(msg) + '\n')
}
stream.end()
}
为什么用 JSONL 而不是 JSON 数组?
会话可能非常大(GB 级),JSON 数组需要全部读入内存才能解析。JSONL 可以流式读取,且追加时只需写文件末尾,不需要重写整个文件。
7.4 loadTranscript() — 流式加载
export async function* loadTranscript(
sessionId: SessionId,
): AsyncGenerator<TranscriptMessage, void> {
const filePath = getTranscriptPath(sessionId)
// 使用 readline 逐行读取(内存友好)
const fileStream = createReadStream(filePath)
const rl = createInterface({ input: fileStream })
for await (const line of rl) {
const entry = JSON.parse(line)
if (isTranscriptMessage(entry)) {
yield entry // 只 yield TranscriptMessage,跳过 ProgressMessage
}
}
}
7.5 resumeSession() — 会话恢复
resumeSession(sessionId)
│
├── 1. 加载完整会话文件
│ └── loadTranscript(sessionId) → Message[]
│
├── 2. 截断到最后一个 compact_boundary
│ └── 只保留最近一个压缩边界后的消息(避免 context 过大)
│
├── 3. 重建 ToolUseContext
│ ├── 恢复 messages
│ ├── 恢复 abortController(新的 AbortController)
│ └── 恢复其他上下文状态
│
└── 4. 返回恢复后的状态
return { messages, toolUseContext, ... }
7.6 Tombstone 机制
// 当 API 调用中途失败(如 fallback 切换模型)时,
// 已产生的部分 AssistantMessage 属于「孤儿消息」,
// 不能参与后续 API 调用(tool_use_id 不匹配)
// 解决:写入 tombstone 标记,加载时跳过这些消息
function createTombstoneMessage(message: AssistantMessage): TombstoneMessage {
return {
type: 'tombstone',
tombstoneOf: message.uuid,
// 不包含实际内容,只标记「此消息已废弃」
}
}
7.7 设计亮点
JSONL 格式:追加友好、流式读取、内存占用低
Tombstone 机制:处理「API 调用中途失败」的边缘情况,保证会话文件一致性
isTranscriptMessage()类型守卫:明确区分TranscriptMessage和ProgressMessage,避免进度消息污染对话链大文件处理:
readHeadAndTail()只读取文件头和尾(用于预览),不需要加载整个文件
8. 应用状态管理 (AppStateStore.ts) 深度解析
8.1 文件概览
8.2 AppState 类型(精简版)
export type AppState = DeepImmutable<{
// 基础设置
settings: SettingsJson
verbose: boolean
mainLoopModel: ModelSetting
statusLineText: string | undefined
// 权限
toolPermissionContext: ToolPermissionContext
// MCP
mcp: {
clients: MCPServerConnection[]
tools: Tool[]
commands: Command[]
resources: Record<string, ServerResource[]>
pluginReconnectKey: number
}
// 任务(Agent / Teammate / Workflow)
tasks: { [taskId: string]: TaskState }
// Kairos(持久助手)
kairosEnabled: boolean
remoteSessionUrl: string | undefined
remoteBackgroundTaskCount: number
// Bridge(远程控制)
replBridgeEnabled: boolean
replBridgeConnected: boolean
replBridgeSessionActive: boolean
// ...
// 同伴(BUDDY 宠物)
companionReaction?: string
companionPetAt?: number
}> & {
// 非 DeepImmutable 部分(包含函数类型)
plugins: { ... }
}
8.3 状态更新机制
class AppStateStore {
private state: AppState
private listeners: Set<() => void> = new Set()
// React 组件通过此函数订阅状态更新
subscribe(listener: () => void): () => void {
this.listeners.add(listener)
return () => this.listeners.delete(listener) // 返回取消订阅函数
}
// 更新状态(不变性保证)
setState(update: (state: AppState) => AppState): void {
const oldState = this.state
const newState = update(oldState)
this.state = newState
// 通知所有监听器(触发 React 重新渲染)
for (const listener of this.listeners) {
listener()
}
}
// 在 ToolUseContext 中使用(工具执行时读取/修改状态)
getAppState(): AppState {
return this.state
}
setAppState(update: (state: AppState) => AppState): void {
this.setState(update)
}
}
8.4 ToolUseContext 中的状态传递
type ToolUseContext = {
// ...
getAppState: () => AppState // 读取当前状态
setAppState: (update: (state: AppState) => AppState) => void // 修改状态
// ...
}
设计意图:工具执行时可能需要修改应用状态(例如 TodoWriteTool 更新 todo 列表,AgentTool 更新 tasks)。通过 ToolUseContext 传递 getAppState / setAppState,工具可以安全地修改状态,而不需要直接依赖 AppStateStore 单例。
8.5 React 集成(推测,基于 Ink 模式)
// REPL.tsx(主循环组件)
function REPL() {
const [appState, setAppState] = useState(AppStateStore.getAppState())
useEffect(() => {
// 订阅状态更新
return AppStateStore.subscribe(() => {
setAppState(AppStateStore.getAppState())
})
}, [])
return (
<AppStateContext.Provider value={appState}>
{/* 渲染 UI */}
</AppStateContext.Provider>
)
}
8.6 设计亮点
DeepImmutable<AppState>:编译时强制不可变性,防止意外修改爱挑剔更新模式:
setState(update)接收函数而非值,避免竞态条件ToolUseContext解耦:工具不需要知道AppStateStore的存在,通过 context 传递状态操作函数MCP 状态集中管理:
state.mcp统一维护所有 MCP 连接、工具、资源,避免散落各处
9. 关键时序图汇总
9.1 完整工具执行时序图
9.2 权限检查详细时序图
9.3 会话恢复时序图
10. 设计模式与架构亮点总结
10.1 异步生成器模式(AsyncGenerator)
使用场景:query()、runTools()、runToolUse()优势:
流式产出中间结果,UI 可以实时渲染
调用方通过
for await驱动,控制节奏支持「暂停-恢复」(通过
Stream桥接)
10.2 依赖注入模式(deps 参数)
// query.ts
type QueryDeps = {
callModel: (options) => AsyncGenerator<...>
microcompact: (messages, ...) => Promise<...>
autocompact: (messages, ...) => Promise<...>
uuid: () => string
// ...
}
function query(params: QueryParams & { deps?: QueryDeps }) {
const deps = params.deps ?? productionDeps() // 生产环境使用真实实现
// 测试环境可以传入 mock deps
}
优势:可测试性、可替换性(如 VCR 录制/回放)
10.3 分区并发模式(partitionToolCalls)
问题:如何安全地并发执行工具调用?
解决:将工具调用分成「可并发批」和「必须串行批」,read-only 工具并发,write 工具串行。
10.4 规则- Hook-模式三层决策
问题:如何灵活且高效地做权限决策?解决:
规则匹配(快,无 I/O)
Hook 检查(灵活,可调用外部脚本)
模式检查(兜底,保证用户体验)
10.5 JSONL 持久化模式
问题:会话可能增长到 GB 级,如何高效持久化?解决:
JSONL 格式:追加友好、流式读取
Tombstone 机制:处理中途失败的边缘情况
isTranscriptMessage()类型守卫:明确区分对话消息和进度消息
10.6 上下文压缩三级策略
问题:对话历史太长,超过 API 限制怎么办?解决:
Snip Compact(快,直接删除)
Micro Compact(中,缓存编辑)
Auto Compact(慢,调用 API 生成摘要)
附录:文件索引
- 感谢你赐予我前进的力量
