一聚教程网:一个值得你收藏的教程网站

最新下载

热门教程

BoxAgnts 工具系统(6):多 Provider 适配与 Agent 查询循环

时间:2026-07-02 09:35:47 编辑:袖梨 来源:一聚教程网

BoxAgnts 的工具系统从底层的 WASM 沙箱到上层的 Tool trait,解决了"工具怎么安全地跑"。但工具最终要被 AI 模型调用——这就涉及两个工程问题:不同 AI 厂商的 API 格式完全不兼容,以及对话流与工具执行的交替编排。这两个问题分别由 Provider 抽象层和 Agent 查询循环解决。

BoxAgnts 工具系统(6)——多 Provider 适配与 Agent 查询循环


Provider 抽象:做一个 LLM 厂商不可知论者

不同类型的 AI 模型 API 在请求格式、响应格式和错误处理上差异很大。

先看请求侧。Anthropic 把角色分为 user 和 assistant,系统 Prompt 是一个独立的顶层字段 system;OpenAI 把系统 Prompt 当作一个 role: "system" 的消息;Google Gemini 把 system_instruction 放在请求体顶层但格式又和 Anthropic 不同。如果让上层的 Agent 循环直接处理这些差异,代码会变成一个巨大的 match provider_id { ... } 分支。

BoxAgnts 的解法是引入三层抽象:

第一层:ProviderRequest / ProviderResponse 统一数据模型

// provider_types.rs
pub struct ProviderRequest {
    pub messages: Vec<ApiMessage>,
    pub system: Option<String>,
    pub tools: Vec<ApiToolDefinition>,
    pub max_tokens: u32,
    pub temperature: Option<f32>,
}pub struct ProviderResponse {
    pub content: Vec<ContentBlock>,
    pub usage: UsageInfo,
    pub stop_reason: String,
}

Agent 循环只和这两个结构打交道,不需要知道用户配置的是 Anthropic 还是 OpenAI。

第二层:LlmProvider trait

pub trait LlmProvider: Send + Sync {
    fn id(&self) -> &ProviderId;
    async fn create_message_stream(
        &self, request: ProviderRequest
    ) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, ProviderError>> + Send>>>;
    async fn list_models(&self) -> Result<Vec<ModelInfo>>;
}

create_message_stream 返回的是一个 Pin<Box<dyn Stream>>,这是 Rust 异步生态收拢多种流类型的标准写法(类似于 Java 的 Stream<T> 或 Python 的 AsyncIterator)。每个 Provider 的实现内部处理自己的 HTTP 请求构建、认证、SSE 解析,对外暴露统一的 StreamEvent

第三层:Transformer(消息格式转换)

Transformer 负责消除厂商格式差异的"最后一公里":

// transformers/anthropic.rs
pub fn to_anthropic_request(req: &ProviderRequest) -> AnthropicMessagesRequest { ... }// transformers/openai_chat.rs
pub fn to_openai_request(req: &ProviderRequest) -> OpenAIChatRequest { ... }

Transformer 是纯函数——输入统一格式,输出厂商格式。加入新 Provider 只需要实现一个新的 Transformer 和对应的 LlmProvider 实现。共享的 ProviderRegistry 通过 Provider ID 查找对应的实现:

pub struct ProviderRegistry {
    providers: HashMap<ProviderId, Arc<dyn LlmProvider>>,
    default_provider_id: ProviderId,
}

流式协议与 SSE 解析

所有 Provider 的流式交互都依赖 SSE(Server-Sent Events)。但每个厂商的 SSE 事件粒度和语义又不同:

  • Anthropic 的 content_block_start / content_block_delta / content_block_stop 是三级事件,一个 ContentBlock 从 start 到 stop 横跨多条 SSE 消息
  • OpenAI 的 choices[0].delta 是扁平增量,没有显式的 block start/stop
  • Google Gemini 用的是 gRPC-web 协议,有自己的流式格式

BoxAgnts 的 stream_parser 模块把所有这些差异消化掉,向上暴露一个统一的 StreamEvent 枚举:

pub enum StreamEvent {
    TextDelta { text: String },
    ToolUseStart { id: String, name: String },
    ToolUseDelta { id: String, json: String },
    ToolUseEnd { id: String },
    ThinkingDelta { text: String },
    UsageUpdate { input_tokens: u32, output_tokens: u32 },
    MessageStop,
}

每个 Provider 的 stream parser 内部是一个有限状态机。以 Anthropic 为例:

等待 message_start
  │
  ├── message_start ──► 提取 model, usage 初始值
  │
  ├── content_block_start
  │     │ type = "text"        → 创建 TextBlock 状态
  │     │ type = "tool_use"    → 创建 ToolUseBlock 状态,emit ToolUseStart
  │     │ type = "thinking"    → 创建 ThinkingBlock 状态
  │
  ├── content_block_delta
  │     │ text_delta           → 追加到当前 TextBlock,emit TextDelta
  │     │ input_json_delta     → 拼接 JSON 片段到 ToolUseBlock,emit ToolUseDelta
  │     │ thinking_delta       → 追加到 ThinkingBlock,emit ThinkingDelta
  │
  ├── content_block_stop
  │     │ 对应 tool_use block  → emit ToolUseEnd
  │
  └── message_stop ──► emit MessageStop,累加最终 usage

StreamAccumulator 维护当前消息的所有 ContentBlock 状态:

pub struct StreamAccumulator {
    text_blocks: Vec<TextBlock>,
    tool_use_blocks: HashMap<String, ToolUseBlock>,
    thinking_block: Option<String>,
    usage: UsageInfo,
}

当 MessageStop 到达时,finish() 将累积的各块组装成完整的 Message,返回 stop_reason 和最终 UsageInfo


Agent 查询循环

流解析器把 SSE 事件转成了结构化的 Message。接下来,query::run_query_loop() 把这个 Message 交给工具系统。

核心流程:

loop {
    // 1. 把消息历史 + 系统 Prompt + 工具列表发给 AI 模型
    let request = CreateMessageRequest::builder(model, max_tokens)
        .messages(messages)
        .tools(all_tools_as_definitions(tools))
        .build();    // 2. 发起流式请求,解析 SSE 事件
    let mut rx = client.create_message_stream(request).await?;
    let mut acc = StreamAccumulator::new();    while let Some(evt) = rx.recv().await {
        acc.on_event(&evt);
        match evt {
            StreamEvent::ToolUseStart { .. } | StreamEvent::ToolUseDelta { .. } => {
                // 实时发送给前端(通过 WebSocket),让用户看到模型在用什么工具
            }
            StreamEvent::MessageStop => break,
            _ => {}
        }
    }    // 3. 组装完成的 Message,检查 stop_reason
    let (msg, usage, stop_reason) = acc.finish();    match stop_reason {
        "end_turn" => return QueryOutcome::EndTurn { message: msg, usage },
        "tool_use" => {
            // 4. 对每个 tool_use ContentBlock 调用对应工具
            for block in msg.content.iter() {
                if let ContentBlock::ToolUse { name, input, .. } = block {
                    let tool = find_tool(tools, name);
                    let result = tool.execute(input, &ctx).await;
                    messages.push(result_to_message(result));
                }
            }
            // 回到 loop 顶部,继续下一轮
        }
        "max_tokens" => {
            // 5. MaxTokens 恢复:注入提示消息,让模型继续
            messages.push(UserMessage("Output token limit hit. Resume directly."));
            max_tokens_count += 1;
            if max_tokens_count > 3 { return MaxTokens { ... }; }
        }
        _ => return Error(...),
    }    turn += 1;
    if turn >= config.max_turns { break; }
}

几个需要关注的细节:

工具列表注入策略。每轮 API 调用都会把完整的工具列表(所有工具的 name、description 和 input_schema)作为 tools 字段发送给 AI 模型。这会产生固定的 token 开销——工具数量越多,每轮的"工具描述 tokens"就越高。当工具超过 20 个时,这个开销开始显著(可能达到几千 tokens/轮)。BoxAgnts 当前的策略是全量注入,未来考虑加入工具选择和分组机制(类似 Anthropic 的 tool_choice)。

MaxTokens 恢复。如果模型在回答中途耗尽输出 token 限制,它不是真的"失败了"——它只是没说完。BoxAgnts 会自动注入一条恢复消息("Output token limit hit. Resume directly..."),让模型继续。这个循环最多执行 3 次——如果 3 次之后仍 hit max_tokens,说明任务真的太长,放弃并返回部分结果。

取消机制CancellationToken 从 tokio 生态中借用。当用户在前端点击"停止"按钮时,WebSocket 处理器会取消对应的 token,run_query_loop 在下一次检查时返回 QueryOutcome::Cancelled

费用追踪。每轮 API 调用后,CostTracker 累加当前模型的 pricing(按 input/output token 分别计价,不同模型价格不同)。如果累计费用超过 budget_limit_usd,返回 QueryOutcome::BudgetExceeded。费用信息通过 WebSocket 实时推送到前端 Dashboard。


错误处理与重试策略

AI API 调用有几种典型的失败模式:

错误类型典型 HTTP 码策略
限流 (Rate Limit)429指数退避重试,尊重 Retry-After 头
过载 (Overloaded)529指数退避重试,可选 fallback 模型
认证失败401/403不重试,直接返回错误
请求错误400不重试(参数错误重试无意义)
服务端错误500+有限重试(最多 3 次)
网络超时有限重试

指数退避使用 1s → 2s → 4s → 8s 的间隔,在 Duration 上做乘法。529(Overloaded)额外支持模型切换——如果用户配置了 fallback 模型(比如 claude-sonnet-4-5 过载时切换到 claude-haiku-4-5),后续调用自动使用 fallback。


Provider 扩展性

加入新 Provider 的步骤很清晰:

  1. 在 providers/ 下新增一个模块,实现 LlmProvider trait
  2. 实现对应的 Transformer(如果需要格式转换)
  3. 在 registry.rs 的 provider_from_key() 中注册
  4. 在 model_registry.rs 中添加该 Provider 支持的模型列表

openai_compat_providers 模块是一个快捷方式:对于使用 OpenAI API 格式的服务(DeepSeek、OpenCode、一部分国产模型),只需要配置 API base URL 和 API key,不需要写任何 Provider 代码。这些服务共享同一个 OpenAI 兼容的 SSE 解析器和 Request 构建器,只是配置不同。

// 配置示例
"deepseek": {
    "provider_id": "deepseek",
    "api_base": "https://api.deepseek.com/v1",
    "api_key": "sk-...",
    "provider_type": "openai_compat"
}

总结

Provider 抽象和 Agent 查询循环构成了 BoxAgnts 工具系统的"动力系统":

  • Provider 抽象通过三层解耦(ProviderRequest/Response 统一数据模型 → LlmProvider trait → Transformer 格式转换)解决了 12 种 AI API 的接入问题。新增 Provider 只需实现 trait + 注册,共享的 SSE 解析器和 Request 构建器通过 openai_compat 模块进一步降低了接入成本。
  • Agent 查询循环通过 SSE 状态机解析、ToolUse 检测、工具派发、结果反馈的闭环,实现了对话和工具执行的交替编排。MaxTokens 自动恢复(最多 3 次)和指数退避重试策略保证了长任务的可靠性。
  • 两层模块的共同特征是依赖倒置——Agent 循环不依赖具体的 AI 厂商,Provider 实现不依赖具体的对话编排逻辑。所有耦合通过 trait 接口解耦。

费用追踪(CostTracker + AtomicF64)和取消机制(CancellationToken)为生产环境提供了必要的运维可观测性和用户控制能力。

参考资源

  • BoxAgnts 源代码:github.com/guyoung/box…
  • Anthropic Messages API 文档:docs.anthropic.com/en/api/mess…
  • OpenAI Chat Completions API:platform.openai.com/docs/api-re…
  • Server-Sent Events 规范:html.spec.whatwg.org/multipage/s…
  • Codex CLI Agent Loop 设计:openai.com/index/unrol…
  • Claude Code 架构分析:blog.promptlayer.com/claude-code…
  • tokio-cron-scheduler:docs.rs/tokio-cron-…

热门栏目