从零搭建 Agent Harness 系列(五)并行调用多个互相独立的工具

在真实的工业级开发场景中,效率同样是极其重要的指标。

试想这样一个场景:你对 Agent 说:“请帮我分析一下 handler.go、model.go、router.go 和 config.yaml 这四个文件,找出它们在鉴权逻辑上的关联。”

由于我们底层接驳的前沿大模型(如 Claude 4.x Sonnet 和 GLM-5.x)都原生支持 Parallel Tool Calling(并行工具调用) 功能,模型在经过思考后,非常聪明地在一次 API 返回中,同时吐出了 4 个 read_file 的调用请求。

但是我们之前的ReAct中的核心循环仍然是串行代码

1
2
3
4
for _, toolCall := range actionResp.ToolCalls {
result := e.registry.Execute(ctx, toolCall) // 串行执行,阻塞等待
// ...追加到 Context...
}

我们的引擎依然是在串行(Sequential)处理这些请求。它会先读取文件 A,等 I/O 结束返回后,再去读取文件 B……如果大模型同时发起了 3 个耗时较长的网络搜索请求(比如后续扩展的 fetch_url 工具),这种串行排队的机制会浪费很多时间,让用户在终端前等得不耐烦。

既然我们使用的是以并发闻名的 Go 语言,在驾驭工程中,我们就必须榨干它底层的每一滴性能。今天,我们将通过重构 Main Loop 的工具执行逻辑,引入 Goroutine 和并发控制,让我们的 Agent 拥有真正并发探索物理世界的高性能引擎。

Parallel Tool Calling 的独立性假设

在将串行改为并行之前,我们需要先在驾驭工程的理论层面上厘清一个关键问题:同一轮(Turn)中的多个工具调用,它们之间存在依赖关系吗?

假设大模型在同一个 Turn 的 JSON 数组里,同时发出了两个请求:

  1. write_file:创建一个 main.go。

  2. bash:执行 go run main.go。

如果你把这两个操作并行丢给底层去跑,大概率 bash 会报错 file not found,因为 write_file 的 Goroutine 可能还没把文件写进磁盘。

那我们到底该不该并行?

业界顶级 Harness(如 OpenClaw / Claude Code 内部逻辑)的做法是基于一个强有力的独立性假设:

如果大模型在同一个 Turn(单次 Response)中并行下发了多个工具调用,Harness 引擎必须假设这些调用是互不依赖、互相独立的。引擎应当无脑并行执行它们。为什么?因为大模型在经过大量 RLHF(基于人类反馈的强化学习)微调后,它非常清楚:如果有强先后依赖的操作,必须分两个 Turn 来完成。

它应该在 Turn 1 先输出 write_file,等引擎在下一个 Turn 把 ToolResult(文件写入成功)带回来后,它再去输出 bash 请求。

如果大模型犯傻,在同一个 Turn 里下发了存在依赖的工具导致报错,那是模型系统规划的问题。我们在 06 讲确立了 YOLO(全权信任)与自纠错(Self-Correction)哲学:错误的原样回传,会让模型在下一轮自己吸取教训,改为分步执行。

所以,作为底层 OS,我们要做的就是:放开手脚,拥抱并发

简单来说,相信LLM有能力去分该不该并发,如果它判断错了,相信它能修复报错

架构演进:从串行到并发(Fork-Join 模式)

我们要将 Main Loop 中的工具分发环节,重构为经典的 Fork-Join(分支 - 聚合)模式。我们可以用一张示意图来对比改造前后的性能差异。

Fork-Join 对比

思维实验:假设大模型在同一个 turn 中生成了有数据竞争的并发工具调用

前面提到的 Parallel Tool Calling 的独立性假设,是否一定能保证大模型不会在同一个 Turn 中生成两个针对同一个文件的工具调用呢?比如在边缘侧使用一些小模型或考虑大小模型混合使用的场景。

在正式编写实战代码之前,作为一名严谨的驾驭工程架构师,如果我们要在 Harness 引擎层完全规避掉模型并发工具调用的“竞争风险”,我们应该怎样做呢?接下来,我们就来做一个思维实验,思考一下可以用来规避风险的可行方案。

假设大模型在同一个 Turn 中,非常鲁莽地生成了两个针对同一个文件的工具调用:

  1. edit_file:试图修改 main.go 中的某行代码。

  2. read_file:试图读取 main.go 的内容。

或者更糟糕的,两个并行的 edit_file 试图同时修改同一个文件。

由于我们在接下来的代码中将使用纯并行的 Goroutine(不加任何锁),这两个工具会在底层同时触发针对 main.go 的物理 I/O 操作。这必然会导致物理文件层面的 Data Race(数据竞争):

  1. 读取工具可能会读到只写了一半的“脏数据”。

  2. 并发写入可能会导致文件内容被互相覆盖,甚至彻底损坏。

那么 Harness 引擎应该如何解决这个问题?

一个可行的思路是在 Registry 层面引入一种“基于文件路径的细粒度锁(File-Path based Mutex)”策略,使用 sync.Map 为每个文件路径维护一把独立的 RWMutex。前提是所有协程必须严格遵守“先获锁,后操作文件”的规范,RWMutex 才能将并发的文件 I/O 序列化,从而消除 Data Race。

一个可行的思路是在 Registry 层面引入一种“基于文件路径的细粒度锁(File-Path based Mutex)”策略,使用 sync.Map 为每个文件路径维护一把独立的 RWMutex。前提是所有协程必须严格遵守“先获锁,后操作文件”的规范,RWMutex 才能将并发的文件 I/O 序列化,从而消除 Data Race。

具体规则是这样的:

  1. 分发时解析路径:当 Registry 分发 ToolCall 时,首先解析参数中的 path 字段,找到对应该路径的 RWMutex。

  2. 读操作获取读锁(RLock):对于 read_file,获取该路径对应的读锁(RLock)。RWMutex 允许多个读操作同时持有读锁并发执行,但一旦有写锁存在,所有新的读锁请求都会阻塞。

  3. 写操作获取写锁(Lock):对于 write_file 或 edit_file,必须获取该路径对应的写锁(Lock)。写锁是完全排他的——它会阻塞所有新的读锁和写锁请求,并等待当前已持有的所有读锁或写锁释放后,才能被授予。这保证了写操作期间没有任何其他读写操作能够并发访问该文件。

然而,RWMutex 只是必要条件,而非充分条件。它解决的是同一时刻的互斥性问题,却无法保证跨操作的顺序语义。我们考虑以下两种典型场景:

  1. 先读后写(Read-then-Write):假设工具 A 需要“读取 main.go 的内容、依据内容决策、再执行修改”,而工具 B 是一个并发的写操作。即便 RWMutex 保证了每次单次 I/O 的原子性,工具 A 在“读完、写之前”这个窗口期内,其读到的内容已经被工具 B 悄然改变。整个“读 - 决策 - 写”序列的一致性被破坏,这是经典的 TOCTOU(Time of Check to Time of Use) 问题。

  2. 先写后读的顺序依赖:如果某个工具调用必须读到另一个写操作的最新结果(即存在明确的 happens-before 依赖),纯粹的并发模型根本无法表达和保证这种顺序关系。

这意味着,RWMutex 的保护边界,仅仅是单次 I/O 操作的原子性。一旦业务语义要求多个操作之间存在顺序依赖,并发模型就会从根本上失效。

从任务阶段的视角来看,这个问题有一个更优雅的解法思路:与其在引擎底层用复杂的锁机制去修补语义漏洞,不如从源头约束并发的适用场景。观察真实的复杂长程任务,其执行过程往往天然地呈现出两个阶段:

探索阶段:AI 模型发起 read_file、list_dir、grep 等工具调用,对代码库或环境进行全局扫描和理解。这些操作彼此完全独立、无顺序依赖,是并发加速的黄金场景。

执行阶段:模型依据探索结果,开始 edit_file、write_file、执行命令等。这些写操作之间往往存在数据依赖和顺序约束,强行并发弊大于利。

因此,一个更健壮的 Harness 并发策略可以是:由 Harness 引擎(而非模型本身)在分发 ToolCall 批次时,检查本批次是否全部为只读工具调用。若是,则启用并发 Goroutine;若批次中存在任何写操作,则退化为顺序执行。这种“只读并发、涉写串行”的策略,以极低的复杂度,在绝大多数场景下同时保证了性能与正确性。

并发架构的实现

在并发编程中,如果不加锁直接往一个共享的 []schema.Message 中 append 数据,会引发极其严重的 Data Race(数据竞争)甚至导致程序崩溃。

但加锁(Mutex)又显得过于笨重。Go 语言处理这种聚合任务的一个优秀实践是:预先分配好固定长度的切片,然后在 Goroutine 中通过确定的索引(Index)并发写入,最后通过WaitGroup等待全部完成。 这样既保证了绝对的并发安全,又完美保留了工具调用的原始顺序(这对大模型阅读上下文体验更好)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// internal/engine/loop.go
package engine

import (
"context"
"fmt"
"log"
"sync" // 【新增】引入 sync 包

"github.com/yourname/go-tiny-claw/internal/provider"
"github.com/yourname/go-tiny-claw/internal/schema"
"github.com/yourname/go-tiny-claw/internal/tools"
)

// ... 前面的结构体和初始化逻辑保持不变 ...

func (e *AgentEngine) Run(ctx context.Context, userPrompt string) error {
// ... 省略前面的引擎启动、Phase 1 慢思考、Phase 2 动作请求等逻辑 ...
// (完整代码见附录)
// 假设我们已经走到了判断是否需要调用工具的环节:

if len(actionResp.ToolCalls) == 0 {
log.Println("[Engine] 模型未请求调用工具,任务宣告完成。")
break
}

log.Printf("[Engine] 模型请求并发调用 %d 个工具...\n", len(actionResp.ToolCalls))

// 【核心改造开始】: 从串行 (Sequential) 演进为并行 (Parallel)

// 1. 预分配一个固定长度的切片,用于安全地存放各个并发工具的执行结果(Observation)
// 长度与 ToolCalls 的数量完全一致
observationMsgs := make([]schema.Message, len(actionResp.ToolCalls))

// 2. 声明 WaitGroup 用于阻塞等待所有协程完成
var wg sync.WaitGroup

// 3. 遍历模型请求的所有工具,为每一个工具单独 Fork 出一个 Goroutine
for i, toolCall := range actionResp.ToolCalls {
wg.Add(1) // 增加计数器

// 开启协程。注意:一定要将索引 i 和 toolCall 作为参数传入匿名函数,防止闭包变量捕获陷阱!
go func(idx int, call schema.ToolCall) {
defer wg.Done() // 协程结束时计数器减一

log.Printf(" -> [Go-%d] 🛠️ 触发并行执行: %s\n", idx, call.Name)

// 调用底层 Registry 执行工具(物理操作)
result := e.registry.Execute(ctx, call)

if result.IsError {
log.Printf(" -> [Go-%d] ❌ 工具执行报错: %s\n", idx, result.Output)
} else {
log.Printf(" -> [Go-%d] ✅ 工具执行成功 (返回 %d 字节)\n", idx, len(result.Output))
}

// 将执行结果封装为一条用户消息 (RoleUser)
obsMsg := schema.Message{
Role: schema.RoleUser,
Content: result.Output,
ToolCallID: call.ID,
}

// 【线程安全】: 由于每个 Goroutine 操作的是预分配切片的不同索引,
// 这里不需要加锁 (Mutex),性能极高!
observationMsgs[idx] = obsMsg

}(i, toolCall) // 闭包传参
}

// 4. Join 阻塞等待:主循环挂起,直到所有的并发协程全部执行完毕
wg.Wait()
log.Println("[Engine] 所有并发工具执行完毕,开始聚合观察结果 (Observation)...")

// 5. 聚合装填:将并行的结果,按照原本的顺序,一次性追加到上下文时间线中
// 这等价于 contextHistory = append(contextHistory, observationMsgs...)
for _, obs := range observationMsgs {
contextHistory = append(contextHistory, obs)
}

// 循环回到开头,模型将带着这一批新的 Observation 继续它的下一轮思考...
// }

// return nil
}