从零搭建 Agent Harness 系列(七)稳定性

在前面的模块中,我们已经为为Agent赋予了一定的稳定性和自驱力:它有了一颗带“慢思考”的心脏(Main Loop),能操作底层操作系统的手脚(极简工具集),还能在 Plan 模式下利用文件系统(PLAN.md / TODO.md)进行超长记忆和规划。甚至学会了在遇到底层报错时通过系统注入的模板协助 Agent 进行“错误自愈(Error Recovery)”。

Doom Loop(死循环)

可以说,我们的 Agent 已经是一个极其勤奋且不轻言放弃的“初级程序员”了。但是,只要你带过新人程序员,你就一定经历过这样的崩溃瞬间:新人遇到一个报错,比如某个环境变量没配好导致命令找不到,他没有停下来去 Google 搜一下根本原因,也没有去向导师求助,而是开始疯狂地凭直觉在命令行里盲试:加 sudo、改绝对路径、加 ./ 前缀…… 他陷入了逻辑的死胡同,整整一下午都在原地打转。

在驾驭工程中,我们称这种现象为 **Doom Loop(死循环)**或者 Exploration Spiral(探索螺旋)。这是阻碍大模型走向全自动工业级可用的一大拦路虎。

如果你的 Harness 不能像一位资深导师那样,在 Agent 钻牛角尖时及时拍拍它的肩膀说:“停下,你这条路走不通,换个思路吧”,那么你的引擎就是不合格的。

为什么 System Prompt 拦不住死循环?

你可能会问一个极其尖锐的问题:“既然我们在每一轮循环的开头,都把 System Prompt 重新塞进了上下文数组的最前面,大模型怎么可能会忘记写在里面的规则呢?”。

是的,拥有 128k 以上窗口的前沿模型在字面意义上并没有忘记。如果你直接问它:“系统规则第 3 条是什么?”,它能一字不差地背出来。

导致死循环的真正原因,是驾驭工程中极具挑战性的两个大模型行为陷阱:

  1. 上下文内容分布偏移:当模型连续几次遇到同一个棘手的 Error 时,上下文末尾会堆积大量结构相似的错误信息(ToolResult)。这些高度重复的 token 在内容分布上占据了绝对主导,使得模型的下一步生成被这些近期输入强力牵引,表现出“只想解决眼前报错”的行为倾向。这并非注意力机制本身发生了结构性故障,而是输入内容的分布决定了输出的走向。

  2. 近因偏差(Recency Bias):这一现象在学术上有实证支撑——研究表明,当关键信息位于长上下文的头部或中部时,模型对其的响应权重会显著低于位于上下文末尾的信息(即 Lost in the Middle 效应)。相比于写在上下文最顶端、长达数千字的、泛泛而谈的系统规则,模型更倾向于对距离它最近的输入(即刚刚返回的那个 ToolResult 报错信息)做出强烈反应。

简单来说就是大模型注意力机制的缺陷问题,越靠后的注意力权重越大,即使一段话在上下文窗口中,如果很靠前,也是可能被无视。甚至如果上下文中有两段矛盾的话,靠后的会更容易被采纳。

这两个因素叠加,就导致了一种典型的行为失控:模型陷入“只要我再微调一下这个 bash 命令的参数,下一秒肯定能成功”的局部最优幻觉,从而完全无视了位于前部的系统规则中连续失败请停止的宏观警告。

解决方案:Reminder

要让“陷入疯魔”的大模型立刻清醒过来,你不能指望远在天边的 System Prompt。你必须在它做决定的前一刻(Point of decision),也就是即将发起下一次 LLM 推理调用的地方,将高优先级的引导指令伪装成最新的一条 User Message,直接怼到它的脸上!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// internal/engine/reminder.go
package engine

import (
"crypto/md5"
"encoding/hex"
"fmt"
"log"

"github.com/yourname/go-tiny-claw/internal/schema"
)

// ReminderInjector 负责在运行时监控上下文,并在模型陷入执念时动态注入强力打断信息
type ReminderInjector struct {
// 用于记录连续失败的工具调用指纹 (ToolName + Arguments 的 Hash)
consecutiveFailures map[string]int
}

func NewReminderInjector() *ReminderInjector {
return &ReminderInjector{
consecutiveFailures: make(map[string]int),
}
}

// generateFingerprint 生成工具调用的唯一指纹,用于判断大模型是否在重复相同的动作
func generateFingerprint(toolName string, args []byte) string {
hasher := md5.New()
hasher.Write([]byte(toolName))
hasher.Write(args)
return hex.EncodeToString(hasher.Sum(nil))
}

// CheckAndInject 分析本轮的执行结果,决定是否要在 Context 尾部追加 Reminder
// 返回的 schema.Message 将作为最新的用户输入,强制大模型优先阅读。
func (r *ReminderInjector) CheckAndInject(lastToolCall schema.ToolCall, lastResult schema.ToolResult) *schema.Message {
fingerprint := generateFingerprint(lastToolCall.Name, lastToolCall.Arguments)

// 如果工具执行成功,说明 Agent 在这条路径上走通了,清空所有失败计数器
if !lastResult.IsError {
r.consecutiveFailures = make(map[string]int)
return nil
}

// 如果执行失败,累加该特征的失败次数
r.consecutiveFailures[fingerprint]++
failCount := r.consecutiveFailures[fingerprint]

log.Printf("[Reminder] 监控到工具 %s 执行失败,该参数特征连续失败次数: %d\n", lastToolCall.Name, failCount)

// 【驾驭底线】:触发死循环打断机制!
// 我们设定阈值为 3 次。如果大模型连续 3 次都在同一个地方跌倒,必须强行打断它的局部执念。
if failCount >= 3 {
log.Println("[Reminder] ⚠️ 触发死循环干预!注入强力修正指令。")

// 构造一条极其严厉的行动指南
nudgeMsg := fmt.Sprintf(`[SYSTEM REMINDER 警告]
你似乎陷入了死循环。你刚刚连续 %d 次使用相同的参数调用了 '%s' 工具,并且都失败了。
请立即停止这种无效的重试!你的注意力被当前的报错过度吸引了。
你需要:
1. 停止猜测参数。跳出当前的局部思维。
2. 彻底改变你的策略。
3. 如果你确实无法通过系统工具解决当前问题,请直接结束任务并向用户说明你需要什么人工帮助,而不是继续盲目消耗 API 资源尝试。`, failCount, lastToolCall.Name)

return &schema.Message{
Role: schema.RoleUser, // 【核心】必须是 RoleUser,以保证在下一次 API 请求时拥有最高的近因效应权重
Content: nudgeMsg,
}
}

return nil
}

高危操作

想象一下:在一个深夜的运维群里,你让 Agent 帮忙清理一下某台机器上无用的日志。Agent “聪明”地组合出了一条命令:bash: “rm -rf /var/log/*”。

如果此时系统依然处于 YOLO 模式,它会瞬间清空这台机器的日志目录,第二天你可能就会收到公司的严重警告。

在驾驭工程中,安全性绝对不能依赖于大模型的“理智”,更不能寄希望于写在 System Prompt 里的那句“千万别删库”。 我们必须在底层的执行节点,构筑一道坚不可摧的物理防线。

Middleware 实现 Human-in-Loop

优秀的 Harness 引擎采用了 Middleware/ Hook 模式。

  1. 统一拦截点:在 Registry 接收到大模型的 ToolCall 请求后,但在真正调用底层 tool.Execute() 之前。

  2. 审批通道:当检测到高危操作(如 bash 匹配到了 rm、sudo 等黑名单正则)时,Middleware 会阻塞当前的执行协程。

  3. Human-in-the-loop:通过第 9 讲建立的 Reporter 通道,向飞书发送一张包含“同意”和“拒绝”指令的交互信息。

  4. 放行或阻断:人类在飞书上确认回复后,触发 Webhook 回调,通过 Go 的 channel 发送信号解除阻塞。同意则继续执行;拒绝则直接向模型返回“人类拒绝执行”的报错。

Middleware

架构权衡

我们极力推崇了 YOLO 的极简哲学:放弃本地“安全剧场”,默认全权信任,从而换取较高的执行效率。而今天,我们却要大费周章地引入飞书人工审批。这矛盾吗?并不矛盾。驾驭工程的本质,就是可以针对不同的物理环境,进行动态的安全与效率折中。

  • 在本地单机开发时(CLI 场景): 总是通过飞书或者弹窗进行人工审批,效率极低,会严重打断开发者的心流。在这一场景下,业界公认的“既要效率、又要安全”的做法是:沙箱(Sandboxing) + YOLO 机制。开发者可以将 Agent 运行在隔离的 Docker 容器、轻量级沙箱或 MicroVM 中。由于环境是完全物理隔离且易于销毁重置的,Agent 即使在里面执行了 rm -rf / 也无伤大雅。这种“物理层隔离”打消了权限配置的顾虑,让 Agent 能在沙箱内享受极致的 YOLO 执行快感。

  • 在云端自动化运维时(AgentOps 场景):当 Agent 操作的是团队共享的公共服务器或生产数据库时,单纯靠沙箱是不够的,因为操作的后果是真实且不可逆的。此时,必须引入细粒度的权限体系(Permission System)。 我们可以通过 Middleware 模拟类似 allow / ask / deny 的三态控制:

    • allow:白名单命令(如 git status),直接放行。

    • ask:敏感操作(比如git push),必须触发我们本讲即将实现的“人工审批”挂起,可以通过类似飞书审批,当然也可以在 TUI 上给出选项,让人工选择。

    • deny:黑名单操作,直接拦截并报错。

实现方案

改造 Registry,引入 Middleware 机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// internal/tools/registry.go
package tools

import (
"context"
"fmt"
"log"

"github.com/yourname/go-tiny-claw/internal/schema"
)

// MiddlewareFunc 定义了中间件的签名。
// 它接收当前的 ToolCall,并返回一个是否允许执行的布尔值 (allowed),以及拦截时的原因 (rejectReason)。
type MiddlewareFunc func(ctx context.Context, call schema.ToolCall) (allowed bool, rejectReason string)

// BaseTool 接口保持不变...

// Registry 接口增加挂载 Middleware 的方法
type Registry interface {
Register(tool BaseTool)
Use(mw MiddlewareFunc) // 【新增】全局 Middleware 挂载点
GetAvailableTools() []schema.ToolDefinition
Execute(ctx context.Context, call schema.ToolCall) schema.ToolResult
}

type registryImpl struct {
tools map[string]BaseTool
middlewares []MiddlewareFunc // 【新增】保存挂载的中间件链
}

func NewRegistry() Registry {
return &registryImpl{
tools: make(map[string]BaseTool),
middlewares: make([]MiddlewareFunc, 0),
}
}

func (r *registryImpl) Use(mw MiddlewareFunc) {
r.middlewares = append(r.middlewares, mw)
}

// ... Register 和 GetAvailableTools 保持不变 ...

func (r *registryImpl) Execute(ctx context.Context, call schema.ToolCall) schema.ToolResult {
// 1. 路由查找
tool, exists := r.tools[call.Name]
if !exists {
return schema.ToolResult{
ToolCallID: call.ID,
Output: fmt.Sprintf("Error: 系统中不存在名为 '%s' 的工具。", call.Name),
IsError: true,
}
}

// 2. 【核心防御】在执行底层逻辑前,依次运行所有的 Middleware
for _, mw := range r.middlewares {
allowed, reason := mw(ctx, call)
if !allowed {
log.Printf("[Registry] ⚠️ 工具 %s 被 Middleware 拦截: %s\n", call.Name, reason)
return schema.ToolResult{
ToolCallID: call.ID,
Output: fmt.Sprintf("执行被系统拦截。原因: %s", reason),
IsError: true, // 必须返回 Error,强制大模型阅读拒绝理由
}
}
}

// 3. 执行工具逻辑 (如果所有 Middleware 都放行了)
output, err := tool.Execute(ctx, call.Arguments)
if err != nil {
return schema.ToolResult{
ToolCallID: call.ID,
Output: fmt.Sprintf("Error executing %s: %v", call.Name, err),
IsError: true,
}
}

return schema.ToolResult{
ToolCallID: call.ID,
Output: output,
IsError: false,
}
}

现在,Registry 拥有了一道坚固的防火墙。只要任何一个 Middleware 返回 allowed: false,工具的底层 Execute 就绝对不会被触发。

实现跨协程的审批中枢(Approval Manager)

当 Middleware 判断需要拦截时,它必须把当前大模型的请求“挂起”。但同时,我们的飞书 Webhook 回调(监听用户的指令)是运行在另一个 Goroutine 中的。因此,我们需要一个基于 channel 的并发安全管理器,用于在两者之间传递“放行”或“拒绝”的信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// internal/feishu/approval.go
package feishu

import (
"fmt"
"log"
"regexp"
"sync"
)

// ApprovalResult 审批结果包
type ApprovalResult struct {
Allowed bool
Reason string
}

// ApprovalManager 统一管理当前正在等待人类审批的任务
type ApprovalManager struct {
mu sync.RWMutex
// Key 是用于审批的唯一 TaskID,Value 是接收审批结果的 Channel
pendingTasks map[string]chan ApprovalResult
}

// 全局单例,方便在 Registry Middleware 和 Feishu Webhook 之间共享状态
var GlobalApprovalMgr = &ApprovalManager{
pendingTasks: make(map[string]chan ApprovalResult),
}

// WaitForApproval 发送飞书通知,并阻塞当前协程等待回调结果
func (m *ApprovalManager) WaitForApproval(taskID string, toolName string, args string, reporter *FeishuReporter) (bool, string) {
// 1. 创建用于阻塞当前引擎协程的 channel (容量为 1 防止死锁)
ch := make(chan ApprovalResult, 1)

m.mu.Lock()
m.pendingTasks[taskID] = ch
m.mu.Unlock()

// 2. 通过 Reporter 向飞书发送请求信息
// (在实际的高级应用中,这里可以构建一张带有交互 Button 的精致飞书卡片)
noticeMsg := fmt.Sprintf(`⚠️ **高危操作审批请求**
Agent 试图执行以下动作:
- 工具: %s
- 参数: %s

任务 ID: **%s**

👉 请在此消息下方回复 "approve %s" 或 "reject %s" 来决定是否放行。`, toolName, args, taskID, taskID, taskID)

// 注意:因为 Middleware 的签名里没有带 Reporter,我们在 main.go 里初始化时必须把 reporter 传进来
if reporter != nil {
reporter.sendMsg(noticeMsg)
} else {
// 回退到终端打印 (兼容本地 CLI 模式)
fmt.Printf("\n\033[31m[需要审批 TaskID: %s]\033[0m %s\n", taskID, noticeMsg)
}

log.Printf("[Approval] 已发送审批请求 (TaskID: %s),协程挂起等待...\n", taskID)

// 3. 【驾驭核心】:死死阻塞,等待飞书 Webhook 唤醒!
result := <-ch

// 4. 获取到结果后,清理内存资源
m.mu.Lock()
delete(m.pendingTasks, taskID)
m.mu.Unlock()

return result.Allowed, result.Reason
}

// ResolveApproval 由飞书 Webhook 回调触发,向 channel 发送信号解开阻塞
func (m *ApprovalManager) ResolveApproval(taskID string, allowed bool, reason string) {
m.mu.RLock()
ch, exists := m.pendingTasks[taskID]
m.mu.RUnlock()

if exists {
log.Printf("[Approval] 收到来自飞书的审批结果 (TaskID: %s, Allowed: %v)\n", taskID, allowed)
ch <- ApprovalResult{Allowed: allowed, Reason: reason}
} else {
log.Printf("[Approval] 找不到对应的 TaskID: %s,可能已超时或处理完毕\n", taskID)
}
}

// IsDangerousCommand 简单的正则检查黑名单,判断该工具调用是否需要审批
func IsDangerousCommand(toolName string, args string) bool {
// 对于纯读取的工具,默认 YOLO 模式,全部放行
if toolName != "bash" && toolName != "write_file" && toolName != "edit_file" {
return false
}

// 针对 bash 的高危模式匹配
if toolName == "bash" {
dangerousPatterns := []string{
`rm\s+-r`, // 级联删除
`sudo\s+`, // 提权
`drop\s+`, // 数据库删除
`>.*\.go`, // 恶意覆盖源代码
}
for _, p := range dangerousPatterns {
matched, _ := regexp.MatchString(p, args)
if matched {
return true
}
}
}
return false
}

在飞书 Bot 中监听审批口令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// internal/feishu/bot.go
package feishu

import (
"context"
"strings"
// ... 保持原有导入 ...
)

type FeishuBot struct {
client *lark.Client
appID string
appSecret string
engine *engine.AgentEngine
sess *ctxpkg.Session // 新增session信息
r *FeishuReporter // 新增实现Reporter接口的FeishuReporter实例
}

func NewFeishuBot(eng *engine.AgentEngine, sess *ctxpkg.Session) *FeishuBot {
appID := os.Getenv("FEISHU_APP_ID")
appSecret := os.Getenv("FEISHU_APP_SECRET")

if appID == "" || appSecret == "" {
log.Fatal("请设置 FEISHU_APP_ID 和 FEISHU_APP_SECRET")
}
// ... 保持原有代码 ...

client := lark.NewClient(appID, appSecret)

return &FeishuBot{
client: client,
appID: appID,
appSecret: appSecret,
engine: eng,
sess: sess, // 绑定session信息
}
}

func (b *FeishuBot) GetEventDispatcher() *dispatcher.EventDispatcher {
encryptKey := os.Getenv("FEISHU_ENCRYPT_KEY")
verifyToken := os.Getenv("FEISHU_VERIFY_TOKEN")

handler := dispatcher.NewEventDispatcher(verifyToken, encryptKey).
OnP2MessageReceiveV1(func(ctx context.Context, event *larkim.P2MessageReceiveV1) error {
contentStr := *event.Event.Message.Content
contentStr = strings.TrimPrefix(contentStr, `{"text":"`)
contentStr = strings.TrimSuffix(contentStr, `"}`)

chatId := *event.Event.Message.ChatId
log.Printf("[Feishu] 收到会话 %s 消息: %s\n", chatId, contentStr)

// 【新增】:拦截人工审批的特殊口令
if strings.HasPrefix(contentStr, "approve ") {
taskID := strings.TrimPrefix(contentStr, "approve ")
taskID = strings.TrimSpace(taskID)
// 唤醒挂起的引擎协程!
GlobalApprovalMgr.ResolveApproval(taskID, true, "人类管理员已批准操作")
log.Printf("[Feishu] 会话 %s: ✅ 已为您批准任务 %s", chatId, taskID)
return nil
}
if strings.HasPrefix(contentStr, "reject ") {
taskID := strings.TrimPrefix(contentStr, "reject ")
taskID = strings.TrimSpace(taskID)
// 唤醒挂起的引擎协程,并反馈拒绝理由!
GlobalApprovalMgr.ResolveApproval(taskID, false, "人类管理员认为该操作存在极高风险,已无情拒绝")
log.Printf("[Feishu] 会话 %s: 🚫 已拒绝任务 %s", chatId, taskID)
return nil
}

// 如果不是审批命令,则是正常对话,启动一个新的 Agent 任务去处理
go b.handleAgentRun(chatId, contentStr)

return nil
}).
OnP2MessageReadV1(func(ctx context.Context, event *larkim.P2MessageReadV1) error {
// 消息已读事件,静默忽略
return nil
})

return handler
}

// 新增一个方法,返回FeishuBot绑定的Reporter
func (b *FeishuBot) Reporter() *FeishuReporter {
return b.r
}

func (b *FeishuBot) handleAgentRun(chatId string, prompt string) {
reporter := &FeishuReporter{
client: b.client,
chatId: chatId,
}
b.r = reporter
b.sess.Append(schema.Message{Role: schema.RoleUser, Content: prompt}) // 将prompt加入会话中
err := b.engine.Run(context.Background(), b.sess, reporter)
if err != nil {
reporter.sendMsg(fmt.Sprintf("❌ Agent 运行崩溃: %v", err))
}
}

// FeishuReporter 的实现保持不变 ...

在入口组装并挂载 Middleware

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// cmd/claw/main.go

func main() {
if os.Getenv("ZHIPU_API_KEY") == "" {
log.Fatal("请先导出 ZHIPU_API_KEY 环境变量")
}

workDir, _ := os.Getwd()
workDir += "/workspace"

llmProvider := provider.NewZhipuOpenAIProvider("glm-4.5-air")

registry := tools.NewRegistry()
registry.Register(tools.NewReadFileTool(workDir))
registry.Register(tools.NewWriteFileTool(workDir))
registry.Register(tools.NewBashTool(workDir))
registry.Register(tools.NewEditFileTool(workDir))

eng := engine.NewAgentEngine(llmProvider, registry, false, false)

// 假设一个bot绑定一个session
sessionID := "test_command_intercept_001"
sess := ctxpkg.GlobalSessionMgr.GetOrCreate(sessionID, workDir)
sess.Append(schema.Message{Role: schema.RoleUser, Content: ""})

bot := feishu.NewFeishuBot(eng, sess)
handler := httpserverext.NewEventHandlerFunc(bot.GetEventDispatcher())

// 【核心注入】注册安全拦截 Middleware
registry.Use(func(ctx context.Context, call schema.ToolCall) (bool, string) {
argsStr := string(call.Arguments)

// 检查是否命中高危特征库
if feishu.IsDangerousCommand(call.Name, argsStr) {
taskID := call.ID // 使用大模型生成的唯一 ToolCallID 作为 TaskID

// 挂起当前协程,发送消息给飞书,死死等待人类的审批!
allowed, reason := feishu.GlobalApprovalMgr.WaitForApproval(taskID, call.Name, argsStr, bot.Reporter())

if !allowed {
return false, reason // 拒绝,将理由传回给大模型
}
return true, "" // 同意,放行底层工具
}

// 没命中黑名单,直接 YOLO 放行
return true, ""
})

// 3. 注册路由并启动 HTTP 服务
http.HandleFunc("/webhook/event", handler)

port := ":48080"
log.Printf("🚀 go-tiny-claw 飞书服务端已启动,正在监听 %s 端口\n", port)

err := http.ListenAndServe(port, nil)
if err != nil {
log.Fatalf("服务器启动失败: %v", err)
}
}