如果你的 Agent 只有一条命(一个 Main Loop 线程),它可能最终会陷入“崩溃”。哪怕我们有 Compactor 机制,大模型依然需要在一轮轮的 ReAct 循环中,使用 read_file 翻阅成百上千个文件,使用 bash 调用 grep 搜索关键词。
在这个漫长的“探索”阶段,主线程的上下文会被海量的尝试、报错、无关的代码片段塞满。最终,当它终于找到关键代码,准备开始“写代码”时,它可能早就忘记了你最初要求它“翻译成 Go 语言”的那个小细节了。
在 Harness 驾驭工程中,突破单体大模型能力天花板的解法,就是向现代企业管理学习:任务委派(Delegation)与多智能体(Multi-Agent / Subagent)架构。
为什么需要 Subagent 物理隔离?
很多初学者觉得“多智能体(Multi-Agent)”非常玄乎,其实在底层的驾驭工程中,它的原理极其简单,就是上下文环境的物理隔离。
对于上述的翻译 C++ 代码的任务,我们理想的流程应该是这样的:
主 Agent(架构师)保持极其干净、清醒的头脑。它主要负责读写 PLAN.md 和 TODO.md,并在脑海里维护最终的目标。它遇到需要阅读几百个 C++ 文件的脏活时,它不自己去读,而是派出一个 “探索子智能体(Explorer Subagent)”。
子 Agent(探路者)拥有一个全新的、纯净的 contextHistory。它开始疯狂调用 read_file 和 bash (grep) 去探索。哪怕它看了 50 个文件,触发了 5 次 Compactor 掩码压缩,它的疯狂试错也绝对不会污染主 Agent 的大脑。
子 Agent 探索完毕后,将自己看到的 5 万行代码浓缩成一段极其精炼的几百字总结,回传给主 Agent。主 Agent 看到总结后,继续清晰地推进 TODO.md。
极简驾驭:Subagent 就是一个底层的“普通工具”
很多第三方框架(如 AutoGen)会将 Multi-Agent 包装成极其复杂的聊天图谱(Chat Graph)。但按照 OpenClaw / pi 的极简哲学,子智能体不应该是一个玄学的新概念,它的实现可以仅仅是 Tool Registry 里注册的一个普通工具。
我们只需要编写一个名为 spawn_subagent 的特殊工具,它的执行逻辑就是:新建一个 AgentEngine,传入一个纯净的 Session,然后阻塞等待这个子 Engine 跑完,最后把输出作为 ToolResult 返回。
其实说到底,如果你有更好的方法能够做到在执行这个子任务的的时候,可以完全不影响主agent的记忆,进程等状态,还能复用之前的各种能力,比如ReAct,middleware等,那也完全可以。只是这个目标天然会映射回Agent这个概念,所以就启动一个子Agent来做这种隔离
为了拉起一个新的 AgentEngine,这个工具在被初始化时,必须拿到大模型的 Provider 和当前工作区的信息。
更关键的是,为了防止子智能体乱搞破坏(比如误删文件),我们在把它丢出去探索时,通常只会给它挂载“只读”工具(如 read_file、bash),绝对不给它发 edit_file。这也是驾驭工程中限制“爆炸半径(Blast Radius)”的经典做法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package tools
import ( "context" "encoding/json" "fmt" "log"
"github.com/yourname/go-tiny-claw/internal/provider" "github.com/yourname/go-tiny-claw/internal/schema" )
type AgentRunner interface { RunSub(ctx context.Context, taskPrompt string, readOnlyRegistry Registry, reporter interface{}) (string, error) }
type SubagentTool struct { runner AgentRunner
readOnlyRegistry Registry reporter interface{} }
func NewSubagentTool(runner AgentRunner, readOnlyRegistry Registry, reporter interface{}) *SubagentTool { return &SubagentTool{ runner: runner, readOnlyRegistry: readOnlyRegistry, reporter: reporter, } }
func (t *SubagentTool) Name() string { return "spawn_subagent" }
func (t *SubagentTool) Definition() schema.ToolDefinition { return schema.ToolDefinition{ Name: t.Name(), Description: "派出一个专门用于深度探索(Exploration)的子智能体。当你需要阅读大量代码、跨文件查找逻辑时请调用此工具。它在探索完毕后,会给你返回一份极度精炼的摘要报告。", InputSchema: map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ "task_prompt": map[string]interface{}{ "type": "string", "description": "给子智能体下达的明确指令。", }, }, "required": []string{"task_prompt"}, }, } }
type subagentArgs struct { TaskPrompt string `json:"task_prompt"` }
|
实现拉起与执行逻辑
当主 Agent 发起 spawn_subagent 时,这个方法会被 Registry 触发。它会阻塞主线程,利用传入的 runner 接口,默默地在后台跑完一个完整的 ReAct 子循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
func (t *SubagentTool) Execute(ctx context.Context, args json.RawMessage) (string, error) { var input subagentArgs if err := json.Unmarshal(args, &input); err != nil { return "", fmt.Errorf("解析参数失败: %w", err) }
log.Printf("[Subagent] 🚀 主 Agent 发起委派!正在拉起探路者: [%s]...\n", input.TaskPrompt)
summary, err := t.runner.RunSub(ctx, input.TaskPrompt, t.readOnlyRegistry, t.reporter)
if err != nil { return fmt.Errorf("子智能体执行失败: %v", err).Error(), nil }
log.Printf("[Subagent] ✅ 子智能体任务结束。报告返回给主干...")
return fmt.Sprintf("【子智能体探索报告】:\n%s", summary), nil }
|
在 Engine 中实现 RunSub 接口支持
为了满足 AgentRunner 接口,我们需要回到 internal/engine/loop.go,为主引擎增加一个轻量级的 RunSub 方法。它实际上就是去掉了 Session 管理的“一次性”变体。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
func (e *AgentEngine) RunSub(ctx context.Context, taskPrompt string, readOnlyRegistry tools.Registry, reporter any) (string, error) {
contextHistory := []schema.Message{ { Role: schema.RoleSystem, Content: `你是一个专门负责深度探索的探路者 (Explorer Subagent)。 你的任务是根据主架构师的指令,在当前工作区内仔细阅读代码、查阅日志,搜集足够的信息。
【核心纪律】 1. 你必须、且只能依靠内置工具(如 bash 的 find/grep,或 read_file)去寻找答案。绝对不允许凭空捏造或猜测! 2. 如果你没有找到确切的答案,你必须继续使用工具深入搜索。 3. 当且仅当你找到了确切的线索后,停止调用工具,直接输出一段纯文本作为你的终极汇报。主架构师会根据你的汇报来做下一步决策。`, }, { Role: schema.RoleUser, Content: taskPrompt, }, }
const maxSubTurns = 10 turnCount := 0
for { turnCount++ if turnCount > maxSubTurns { return "", fmt.Errorf("子智能体探索过于深入,超过 %d 轮被强制召回,请主 Agent 给它更明确的指令", maxSubTurns) }
availableTools := readOnlyRegistry.GetAvailableTools()
compactedContext := e.compactor.Compact(contextHistory)
actionResp, err := e.provider.Generate(ctx, compactedContext, availableTools) if err != nil { return "", fmt.Errorf("子智能体推理失败: %w", err) }
contextHistory = append(contextHistory, *actionResp)
if len(actionResp.ToolCalls) == 0 { return actionResp.Content, nil }
observationMsgs := make([]schema.Message, len(actionResp.ToolCalls)) var wg sync.WaitGroup
for i, toolCall := range actionResp.ToolCalls { wg.Add(1) go func(idx int, call schema.ToolCall) { defer wg.Done()
var r Reporter if reporter != nil { r = reporter.(Reporter) r.OnToolCall(ctx, fmt.Sprintf("[Subagent] %s", call.Name), string(call.Arguments)) }
result := readOnlyRegistry.Execute(ctx, call)
finalOutput := result.Output if result.IsError { finalOutput = e.recovery.AnalyzeAndInject(call.Name, result.Output) }
if reporter != nil { display := finalOutput if len(display) > 200 { display = display[:200] + "... (已截断)" } r.OnToolResult(ctx, fmt.Sprintf("[Subagent] %s", call.Name), display, result.IsError) }
observationMsgs[idx] = schema.Message{ Role: schema.RoleUser, Content: finalOutput, ToolCallID: call.ID, } }(i, toolCall) }
wg.Wait() contextHistory = append(contextHistory, observationMsgs...) } }
|