从零搭建 Agent Harness 系列(八)多智能体

如果你的 Agent 只有一条命(一个 Main Loop 线程),它可能最终会陷入“崩溃”。哪怕我们有 Compactor 机制,大模型依然需要在一轮轮的 ReAct 循环中,使用 read_file 翻阅成百上千个文件,使用 bash 调用 grep 搜索关键词。

在这个漫长的“探索”阶段,主线程的上下文会被海量的尝试、报错、无关的代码片段塞满。最终,当它终于找到关键代码,准备开始“写代码”时,它可能早就忘记了你最初要求它“翻译成 Go 语言”的那个小细节了。

在 Harness 驾驭工程中,突破单体大模型能力天花板的解法,就是向现代企业管理学习:任务委派(Delegation)与多智能体(Multi-Agent / Subagent)架构。

为什么需要 Subagent 物理隔离?

很多初学者觉得“多智能体(Multi-Agent)”非常玄乎,其实在底层的驾驭工程中,它的原理极其简单,就是上下文环境的物理隔离

对于上述的翻译 C++ 代码的任务,我们理想的流程应该是这样的:

主 Agent(架构师)保持极其干净、清醒的头脑。它主要负责读写 PLAN.mdTODO.md,并在脑海里维护最终的目标。它遇到需要阅读几百个 C++ 文件的脏活时,它不自己去读,而是派出一个 “探索子智能体(Explorer Subagent)”。

子 Agent(探路者)拥有一个全新的、纯净的 contextHistory。它开始疯狂调用 read_file 和 bash (grep) 去探索。哪怕它看了 50 个文件,触发了 5 次 Compactor 掩码压缩,它的疯狂试错也绝对不会污染主 Agent 的大脑。

子 Agent 探索完毕后,将自己看到的 5 万行代码浓缩成一段极其精炼的几百字总结,回传给主 Agent。主 Agent 看到总结后,继续清晰地推进 TODO.md

极简驾驭:Subagent 就是一个底层的“普通工具”

很多第三方框架(如 AutoGen)会将 Multi-Agent 包装成极其复杂的聊天图谱(Chat Graph)。但按照 OpenClaw / pi 的极简哲学,子智能体不应该是一个玄学的新概念,它的实现可以仅仅是 Tool Registry 里注册的一个普通工具。

我们只需要编写一个名为 spawn_subagent 的特殊工具,它的执行逻辑就是:新建一个 AgentEngine,传入一个纯净的 Session,然后阻塞等待这个子 Engine 跑完,最后把输出作为 ToolResult 返回。

其实说到底,如果你有更好的方法能够做到在执行这个子任务的的时候,可以完全不影响主agent的记忆,进程等状态,还能复用之前的各种能力,比如ReAct,middleware等,那也完全可以。只是这个目标天然会映射回Agent这个概念,所以就启动一个子Agent来做这种隔离

定义 SubagentTool 及防污染机制

为了拉起一个新的 AgentEngine,这个工具在被初始化时,必须拿到大模型的 Provider 和当前工作区的信息。

更关键的是,为了防止子智能体乱搞破坏(比如误删文件),我们在把它丢出去探索时,通常只会给它挂载“只读”工具(如 read_file、bash),绝对不给它发 edit_file。这也是驾驭工程中限制“爆炸半径(Blast Radius)”的经典做法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// internal/tools/subagent.go
package tools

import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/yourname/go-tiny-claw/internal/provider"
"github.com/yourname/go-tiny-claw/internal/schema"
)

// AgentRunner 是一个打破循环依赖的抽象接口。
// 因为 SubagentTool 存在于 tools 包,而完整的 AgentEngine 存在于 engine 包。
// 为了让 Tool 能拉起 Engine,我们定义一个接口供外部注入。
type AgentRunner interface {
// RunSub 启动一个匿名的、一次性的子智能体任务,并返回其最终梳理出的纯文本总结
RunSub(ctx context.Context, taskPrompt string, readOnlyRegistry Registry, reporter interface{}) (string, error)
}

type SubagentTool struct {
runner AgentRunner

// 为子智能体准备的专属、受限的“只读”注册表
readOnlyRegistry Registry
reporter interface{} // 暂时用 interface 规避包循环依赖,底层通过断言使用
}

// NewSubagentTool 构造函数
func NewSubagentTool(runner AgentRunner, readOnlyRegistry Registry, reporter interface{}) *SubagentTool {
return &SubagentTool{
runner: runner,
readOnlyRegistry: readOnlyRegistry,
reporter: reporter,
}
}

func (t *SubagentTool) Name() string {
return "spawn_subagent"
}

// Definition 向主 Agent 暴露这个工具的强大能力
func (t *SubagentTool) Definition() schema.ToolDefinition {
return schema.ToolDefinition{
Name: t.Name(),
Description: "派出一个专门用于深度探索(Exploration)的子智能体。当你需要阅读大量代码、跨文件查找逻辑时请调用此工具。它在探索完毕后,会给你返回一份极度精炼的摘要报告。",
InputSchema: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"task_prompt": map[string]interface{}{
"type": "string",
"description": "给子智能体下达的明确指令。",
},
},
"required": []string{"task_prompt"},
},
}
}

type subagentArgs struct {
TaskPrompt string `json:"task_prompt"`
}

实现拉起与执行逻辑

当主 Agent 发起 spawn_subagent 时,这个方法会被 Registry 触发。它会阻塞主线程,利用传入的 runner 接口,默默地在后台跑完一个完整的 ReAct 子循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// internal/tools/subagent.go (续)

func (t *SubagentTool) Execute(ctx context.Context, args json.RawMessage) (string, error) {
var input subagentArgs
if err := json.Unmarshal(args, &input); err != nil {
return "", fmt.Errorf("解析参数失败: %w", err)
}

log.Printf("[Subagent] 🚀 主 Agent 发起委派!正在拉起探路者: [%s]...\n", input.TaskPrompt)

// 【核心降维打击】:拉起一个完全物理隔离的子循环
// 我们把针对该任务的专项指令传给子智能体,并仅提供 readOnlyRegistry。
// (子智能体只能读文件或执行只读的 bash,不能搞破坏)
summary, err := t.runner.RunSub(ctx, input.TaskPrompt, t.readOnlyRegistry, t.reporter)

if err != nil {
return fmt.Errorf("子智能体执行失败: %v", err).Error(), nil
}

log.Printf("[Subagent] ✅ 子智能体任务结束。报告返回给主干...")

// 最终,几万字的代码探索,化作了这一段轻量级的 Summary,
// 就像一次普通的 API 调用一样,返回给了始终保持清醒的主 Agent。
return fmt.Sprintf("【子智能体探索报告】:\n%s", summary), nil
}

在 Engine 中实现 RunSub 接口支持

为了满足 AgentRunner 接口,我们需要回到 internal/engine/loop.go,为主引擎增加一个轻量级的 RunSub 方法。它实际上就是去掉了 Session 管理的“一次性”变体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// internal/engine/loop.go (续加在末尾)

// RunSub 是专为 Subagent 拉起的一次性受限循环。
// 它不依赖外部 Session,打完就跑。
// Reporter:为了让用户在终端看到子智能体的工作轨迹,我们将主线程的 Reporter 透传进来,并打上特殊标记。
func (e *AgentEngine) RunSub(ctx context.Context, taskPrompt string, readOnlyRegistry tools.Registry, reporter any) (string, error) {

// 【核心优化】:子智能体极其容易偷懒。我们必须在 System Prompt 中严厉警告它必须使用工具!
contextHistory := []schema.Message{
{
Role: schema.RoleSystem,
Content: `你是一个专门负责深度探索的探路者 (Explorer Subagent)。
你的任务是根据主架构师的指令,在当前工作区内仔细阅读代码、查阅日志,搜集足够的信息。

【核心纪律】
1. 你必须、且只能依靠内置工具(如 bash 的 find/grep,或 read_file)去寻找答案。绝对不允许凭空捏造或猜测!
2. 如果你没有找到确切的答案,你必须继续使用工具深入搜索。
3. 当且仅当你找到了确切的线索后,停止调用工具,直接输出一段纯文本作为你的终极汇报。主架构师会根据你的汇报来做下一步决策。`,
},
{
Role: schema.RoleUser,
Content: taskPrompt,
},
}

// 限制子智能体最多只能跑 10 个 Turn,防止它自己卡死
const maxSubTurns = 10
turnCount := 0

for {
turnCount++
if turnCount > maxSubTurns {
return "", fmt.Errorf("子智能体探索过于深入,超过 %d 轮被强制召回,请主 Agent 给它更明确的指令", maxSubTurns)
}

// 【驾驭底线】:子智能体仅能获取传入的只读工具注册表
availableTools := readOnlyRegistry.GetAvailableTools()

compactedContext := e.compactor.Compact(contextHistory)

// 子任务要求急速响应,强制关闭主体的慢思考,直接预测行动
actionResp, err := e.provider.Generate(ctx, compactedContext, availableTools)
if err != nil {
return "", fmt.Errorf("子智能体推理失败: %w", err)
}

contextHistory = append(contextHistory, *actionResp)

// 【核心退出条件】:子智能体一旦不调用工具了,说明它做好了总结汇报
if len(actionResp.ToolCalls) == 0 {
// 直接将它的这段汇报内容剥离出来返回给上层
return actionResp.Content, nil
}

// 执行只读工具的并发循环
observationMsgs := make([]schema.Message, len(actionResp.ToolCalls))
var wg sync.WaitGroup

for i, toolCall := range actionResp.ToolCalls {
wg.Add(1)
go func(idx int, call schema.ToolCall) {
defer wg.Done()

// 【可视化的关键】:让终端用户看到 Subagent 正在干嘛
var r Reporter
if reporter != nil {
r = reporter.(Reporter)
r.OnToolCall(ctx, fmt.Sprintf("[Subagent] %s", call.Name), string(call.Arguments))
}

result := readOnlyRegistry.Execute(ctx, call)

finalOutput := result.Output
if result.IsError {
finalOutput = e.recovery.AnalyzeAndInject(call.Name, result.Output)
}

if reporter != nil {
display := finalOutput
if len(display) > 200 {
display = display[:200] + "... (已截断)"
}
r.OnToolResult(ctx, fmt.Sprintf("[Subagent] %s", call.Name), display, result.IsError)
}

observationMsgs[idx] = schema.Message{
Role: schema.RoleUser,
Content: finalOutput,
ToolCallID: call.ID,
}
}(i, toolCall)
}

wg.Wait()
contextHistory = append(contextHistory, observationMsgs...)
}
}