一聚教程网:一个值得你收藏的教程网站

热门教程

中间件系统:在 Agent 执行流里插入自定义逻辑

时间:2026-07-01 10:20:46 编辑:袖梨 来源:一聚教程网

读完这篇你会知道

一、为什么需要中间件

Callback 是只读的观测——你看到发生了什么,但不能改它。

中间件系统:在 Agent 执行流中插入自定义逻辑

中间件是可写的拦截——你可以:

  • 在模型调用前改写消息历史(删掉太长的工具输出)
  • 在工具调用返回后修改结果(把 error 转成友好字符串)
  • 在 Agent 开始时动态加工具或修改 instruction
  • 用自己的模型替换框架的模型(failover)

这些能力都需要中间件,Callback 做不到。

二、接口:9 个钩子

// adk/handler.gotype TypedChatModelAgentMiddleware[M MessageType] interface {// 1. Agent 运行前:改 instruction / 工具列表BeforeAgent(ctx, runCtx *ChatModelAgentContext) (ctx, *ChatModelAgentContext, error)// 2. Agent 成功结束后(最终回答 / ReturnDirectly)AfterAgent(ctx, state *TypedChatModelAgentState[M]) (ctx, error)// 3. 每次调模型前:改写消息历史 + ToolInfosBeforeModelRewriteState(ctx, state *TypedChatModelAgentState[M], mc *TypedModelContext[M]) (ctx, *TypedChatModelAgentState[M], error)// 4. 模型返回后:对模型回复做后处理AfterModelRewriteState(ctx, state *TypedChatModelAgentState[M], mc *TypedModelContext[M]) (ctx, *TypedChatModelAgentState[M], error)// 5-6. 包装同步 / 流式工具调用WrapInvokableToolCall(ctx, endpoint InvokableToolCallEndpoint, tCtx *ToolContext) (InvokableToolCallEndpoint, error)WrapStreamableToolCall(ctx, endpoint StreamableToolCallEndpoint, tCtx *ToolContext) (StreamableToolCallEndpoint, error)// 7-8. 包装 Enhanced 同步 / 流式工具调用WrapEnhancedInvokableToolCall(ctx, endpoint EnhancedInvokableToolCallEndpoint, tCtx *ToolContext) (EnhancedInvokableToolCallEndpoint, error)WrapEnhancedStreamableToolCall(ctx, endpoint EnhancedStreamableToolCallEndpoint, tCtx *ToolContext) (EnhancedStreamableToolCallEndpoint, error)// 9. 包装模型本身(retry / failover / 流式事件注入)WrapModel(ctx, m model.BaseModel[M], mc *TypedModelContext[M]) (model.BaseModel[M], error)}

9 个方法全部实现太繁琐。嵌入 TypedBaseChatModelAgentMiddleware 可以得到所有方法的空实现(no-op),只需覆盖关心的那个:

type MyMiddleware struct {*adk.TypedBaseChatModelAgentMiddleware[*schema.Message]// 提供 8 个默认 no-op}// 只实现需要的钩子func (m *MyMiddleware) BeforeModelRewriteState(ctx context.Context,state *adk.ChatModelAgentState, mc *adk.ModelContext) (context.Context, *adk.ChatModelAgentState, error) {// 在这里改写消息历史return ctx, state, nil}

注册到 Agent:

agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{Name:"my_agent",Model: m,Middlewares: []adk.ChatModelAgentMiddleware{&MyMiddleware{}},})

三、钩子时序

BeforeAgent│├── [ReAct 循环开始]│├── BeforeModelRewriteState← 改消息历史、改 ToolInfos│ ││ ├── WrapModel → 模型调用│ ││ └── AfterModelRewriteState← 处理模型回复│├── WrapInvokableToolCall / WrapStreamableToolCall← 工具执行时包装│├── [如果还需继续:回到 BeforeModelRewriteState]│└── AfterAgent← 成功结束

关键点:

  • BeforeModelRewriteState 返回的 state 会持久化到 Agent 内部状态,影响后续所有轮次;WrapModel 对消息的修改不持久化,只影响当次调用
  • WrapInvokableToolCall 在每次工具执行时调用,不是编译时一次性调用
  • AfterAgent 只在成功结束时触发;超出最大迭代次数、context 取消时不触发

四、工具错误处理:WrapToolWithErrorHandler

最常用的工具层中间件——把工具的 error 转成字符串,避免 Agent 因工具报错而崩溃(来自 components/tool/utils/error_handler.go):

import "github.com/cloudwego/eino/components/tool/utils"wrappedTool := utils.WrapToolWithErrorHandler(myTool,func(ctx context.Context, err error) string {// 把 error 变成对 LLM 友好的文字return fmt.Sprintf("工具调用失败:%v。请尝试其他方式。", err)})

内部逻辑(error_handler.go:141):

func (s *errorHelper) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {result, err := s.i(ctx, argumentsInJSON, opts...)if _, ok := compose.IsInterruptRerunError(err); ok {return result, err// 中断错误不拦截,透传}if err != nil {return s.h(ctx, err), nil// 普通错误 → 转字符串,不报错}return result, nil}

一个关键细节:中断错误(InterruptRerunError)不会被拦截——HITL 中断需要透传给框架,ErrorHandler 不应该把它吃掉。

五、patchtoolcalls:修补悬空工具调用

什么是悬空工具调用?

模型输出了一个 tool_call(assistant 消息),但还没等工具返回结果,用户就发了新消息(比如说"算了,不用了")。这时候消息历史里有一个 assistant 消息声称调用了某工具,但后面没有对应的 tool 消息。许多 LLM API 对这种情况会报错。

patchtoolcalls 中间件在每次调模型前扫描消息历史,自动为没有对应响应的工具调用插入占位 tool 消息:

import "github.com/cloudwego/eino/adk/middlewares/patchtoolcalls"mw, _ := patchtoolcalls.New(ctx, &patchtoolcalls.Config{// 可选:自定义占位内容,不填用默认值PatchedContentGenerator: func(ctx context.Context, toolName, toolCallID string) (string, error) {return fmt.Sprintf("工具 %s(ID: %s)的调用已被取消", toolName, toolCallID), nil},})agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{Middlewares: []adk.ChatModelAgentMiddleware{mw},// ...})

默认占位内容(patchtoolcalls.go:261):

"Tool call %s with id %s was canceled - another message came in before it could be completed."// 中文环境:"工具调用 %s(ID 为 %s)已被取消——在其完成之前收到了另一条消息。"

框架会根据运行环境自动选中文还是英文提示(internal.SelectPrompt)。

六、reduction:两阶段上下文压缩

这是最复杂也最实用的中间件,解决一个长期运行的 Agent 的核心问题:工具输出越积越多,Token 总量超过模型上下文窗口限制。

它分两个阶段处理:

阶段一:截断(Truncation)

触发时机:工具执行完毕后立即。

如果工具输出超过 MaxLengthForTrunc(默认 50000 字符),把全文保存到 Backend(文件系统或自定义存储),把工具消息替换成一段"截断通知",通知 Agent 可以用 read_file 工具去取完整内容:

[原始工具输出 123456 字符已被截断。文件路径:/tmp/trunc/tool_call_id_xxx预览(前 12500 字):... 预览(后 12500 字):...]

阶段二:清理(Clear)

触发时机:每次调模型前(BeforeModelRewriteState)。

如果当前消息历史的总 Token 数超过 MaxTokensForClear(默认 160000),遍历历史消息,把老的工具调用+响应对替换成占位符,并把原内容卸载到 Backend。Agent 需要时可以用 read_file 取回。

import ("github.com/cloudwego/eino/adk/middlewares/reduction""github.com/cloudwego/eino/adk/filesystem")mw, _ := reduction.New(ctx, &reduction.Config{Backend:filesystem.NewLocalBackend("/tmp/agent_offload"),// 存储后端ReadFileToolName: "read_file",// Agent 用这个工具取回卸载的内容MaxLengthForTrunc: 50000, // 单次工具输出超过这个字符数就截断MaxTokensForClear: 160000,// 总 Token 超过这个就触发清理ClearRetentionSuffixLimit: 2, // 保留最近 2 轮工具调用不清理TokenCounter: func(ctx context.Context, msgs []*schema.Message, tools []*schema.ToolInfo) (int64, error) {// 推荐用真实 tokenizer 代替默认的 字符数/4 估算return countTokens(msgs, tools), nil},})

两阶段的分工:

  • 截断是即时的——单个超大工具输出立刻处理,避免单次就撑爆上下文
  • 清理是累积触发的——多轮积累后统一压缩,减少频繁清理的开销

ClearAtLeastTokens(可选):如果设置了这个值,只有当清理操作能实际节省至少 N 个 Token 时才执行,避免因清理量太少而破坏 Prompt Cache(KV Cache)。

七、自定义中间件示例:限速工具调用

type RateLimitMiddleware struct {*adk.TypedBaseChatModelAgentMiddleware[*schema.Message]limiter *rate.Limiter}func NewRateLimitMiddleware(rps float64) *RateLimitMiddleware {return &RateLimitMiddleware{limiter: rate.NewLimiter(rate.Limit(rps), 1),}}// 只覆盖 WrapInvokableToolCall,其余 8 个方法用 TypedBase 提供的 no-opfunc (m *RateLimitMiddleware) WrapInvokableToolCall(ctx context.Context,endpoint adk.InvokableToolCallEndpoint,tCtx *adk.ToolContext) (adk.InvokableToolCallEndpoint, error) {return func(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {if err := m.limiter.Wait(ctx); err != nil {return "", fmt.Errorf("rate limit: %w", err)}return endpoint(ctx, argumentsInJSON, opts...)}, nil}

八、中间件 vs Callback:选哪个

需求用什么
记录日志 / 上报 traceCallback
统计 Token 用量Callback
不改数据,只观测Callback
改写消息历史Middleware(BeforeModelRewriteState
工具出错转字符串WrapToolWithErrorHandler
给工具调用加限速Middleware(WrapInvokableToolCall
动态修改 instructionMiddleware(BeforeAgent
模型 failover / retryMiddleware(WrapModel
上下文 Token 超限压缩reduction 中间件
修复悬空工具调用patchtoolcalls 中间件

小结

Eino 的中间件系统通过 TypedChatModelAgentMiddleware 接口提供 9 个钩子,覆盖 Agent 执行流的每一个关键节点。TypedBaseChatModelAgentMiddleware 嵌入让你只实现关心的方法。三个现成中间件解决了最常见的工程问题:WrapToolWithErrorHandler 把工具错误变成友好字符串;patchtoolcalls 修补用户打断造成的悬空工具调用;reduction 用两阶段策略(立即截断 + 累积清理)管理长对话的 Token 上限。核心区别:Callback 只读,Middleware 可写——观测用 Callback,改行为用 Middleware。

下篇继续。

代码来源:cloudwego/eino · cloudwego/eino-examples

热门栏目