从零搭建 Agent Harness 系列(九)可观测性

在过去的几个模块中,我们如同打造一辆超级跑车般,为 go-tiny-claw 组装了强大的 V8 引擎(Main Loop)、防抱死刹车(Safety Middleware)、甚至是能自动寻路的“副驾驶”(Subagent)。但是,如果这辆跑车没有“仪表盘(Dashboard)”,你敢把它开上真实的赛道吗?

想象一下,你把 go-tiny-claw 部署到了公司的生产环境中,团队的 10 个开发人员每天都在飞书里唤醒它去做代码 Review 和 Bug 排查。月底结算时,老板拿着一张高达几万元的 API 账单质问你:为什么这个月的大模型费用这么高?到底是哪一个任务、调了哪个工具消耗了最多的 Token?Agent 每次回复都要等 30 秒,到底是网络慢、还是它在本地执行 go test 慢、还是大模型推理慢?

如果你无法回答这些问题,你的 Agent 依然只能是一个“玩具”,老板不会批准你将其投入到日常生产,也无法成为企业级的数字资产。

我们将通过极简的代码,在 Harness 层(而非业务层)拦截大模型的返回包,精确记录 Token 消耗、金钱成本和执行耗时。

成本追踪

成本由哪些构成

在调用大模型 API 时,成本主要由两部分构成:

  1. Prompt Tokens(输入 Token):这是大模型阅读系统提示词、对话历史和文件内容的成本。在 go-tiny-claw 中,由于上下文是在不断累加的,输入 Token 会随着对话轮数呈现出近似 O(n²) 的增长趋势。

  2. Completion Tokens(输出 Token):这是大模型生成回答、思考过程(Thinking Trace)和工具调用参数(JSON)的成本。通常比输入 Token 贵 3-5 倍。

除了金钱成本,时间成本也是决定 Agent 体验的关键。

一个 Turn 的耗时 = 大模型推理耗时 + 工具在本地的物理执行耗时(如 go build)。

代码实战:构建 Cost Tracker 中间件

接下来,我们将用 Go 语言将这个优雅的架构变现。

第 1 步:扩展基础数据结构

大模型 API 会在返回结果中附带 Token 消耗的元数据(Metadata)。我们需要在 schema 中找个地方接住它们。打开 internal/schema/message.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// internal/schema/message.go
package schema

import "encoding/json"

// Usage 记录了单次大模型 API 调用的 Token 消耗
type Usage struct {
PromptTokens int `json:"prompt_tokens"` // 输入的 Token 数量
CompletionTokens int `json:"completion_tokens"` // 产生的 Token 数量
}

// Message 代表上下文中传递的单条消息
type Message struct {
Role Role `json:"role"`
Content string `json:"content"`
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`

// 【新增】如果这是大模型 (Assistant) 的回复,此字段存放本次调用的 Token 消耗
Usage *Usage `json:"usage,omitempty"`
}

// ... 其余定义保持不变 ...

接着,我们需要让 Session 能够记住自己“这辈子”一共花了多少钱。打开 internal/engine/session.go,修改 Session 结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// internal/engine/session.go
package engine

import (
// ... 保持原有导入 ...
)

type Session struct {
ID string
CreatedAt time.Time
UpdatedAt time.Time

// 【新增】用于统计该 Session 累计消耗的资源
TotalPromptTokens int
TotalCompletionTokens int
TotalCostCNY float64

history []schema.Message
mu sync.RWMutex
}

// RecordUsage 是一个给外部 Tracker 调用的辅助方法,用于累加账单
func (s *Session) RecordUsage(prompt int, completion int, cost float64) {
s.mu.Lock()
defer s.mu.Unlock()
s.TotalPromptTokens += prompt
s.TotalCompletionTokens += completion
s.TotalCostCNY += cost
}

// ... 其余方法保持不变 ...

第 2 步:在 Provider 适配层提取 Token

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// internal/provider/openai.go
package provider

import (
// ... 保持原有导入 ...
)

// ... NewZhipuOpenAIProvider 等保持不变 ...

func (p *OpenAIProvider) Generate(ctx context.Context, msgs []schema.Message, availableTools []schema.ToolDefinition) (*schema.Message, error) {
// ... 前面组装请求的代码完全保持不变 ...

resp, err := p.client.Chat.Completions.New(ctx, params)
if err != nil {
return nil, fmt.Errorf("OpenAI/Zhipu API 请求失败: %w", err)
}

choice := resp.Choices[0].Message
resultMsg := &schema.Message{
Role: schema.RoleAssistant,
Content: choice.Content,
}

// 【新增】提取 Usage 信息
if resp.Usage.PromptTokens > 0 || resp.Usage.CompletionTokens > 0 {
resultMsg.Usage = &schema.Usage{
PromptTokens: int(resp.Usage.PromptTokens),
CompletionTokens: int(resp.Usage.CompletionTokens),
}
}

// ... 后面解析 ToolCalls 的代码完全保持不变 ...

return resultMsg, nil
}

第 3 步:编写优雅的 Cost Tracker 装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// internal/observability/tracker.go
package observability

import (
"context"
"log"
"time"

"github.com/yourname/go-tiny-claw/internal/provider"
"github.com/yourname/go-tiny-claw/internal/schema"
ctxpkg "github.com/yourname/go-tiny-claw/internal/context"
)

// PricingModel 定义了不同大模型的计费标准 (单位: 美元/1M Tokens)
// 为了演示,这里硬编码了当前市面上几个主流模型的官方大致定价。
var PricingModel = map[string]struct {
InputPrice float64
OutputPrice float64
}{
"glm-4.5-air": {InputPrice: 0.15, OutputPrice: 0.15}, // 这里假定的大模型价格(每百万Token,tk)
}

// CostTracker 是一个包装了真实 LLMProvider 的装饰器中间件
type CostTracker struct {
nextProvider provider.LLMProvider
modelName string
session *ctxpkg.Session // 当前所属的会话 (用于累加总成本)
}

// NewCostTracker 构造函数:接收一个现有的 Provider,返回一个被监控的 Provider
func NewCostTracker(next provider.LLMProvider, modelName string, session *ctxpkg.Session) *CostTracker {
return &CostTracker{
nextProvider: next,
modelName: modelName,
session: session,
}
}

// Generate 实现了 LLMProvider 接口!这意味着它可以被无缝注入到 Main Loop 中。
func (t *CostTracker) Generate(ctx context.Context, msgs []schema.Message, availableTools []schema.ToolDefinition) (*schema.Message, error) {

// 1. 记录请求发起的时刻
startTime := time.Now()

// 2. 调用真实的底层大模型去执行耗时的网络请求
respMsg, err := t.nextProvider.Generate(ctx, msgs, availableTools)

// 3. 计算耗时
latency := time.Since(startTime)

// 如果报错了,只打印报错时间,不计费
if err != nil {
log.Printf("[Tracker] ❌ API 调用失败,耗时: %v\n", latency)
return respMsg, err
}

// 4. 解析 Token 并计算成本
if respMsg.Usage != nil {
promptTokens := respMsg.Usage.PromptTokens
completionTokens := respMsg.Usage.CompletionTokens

var cost float64
if price, exists := PricingModel[t.modelName]; exists {
// 计算美元花费 = (输入Tokens * 输入单价 + 输出Tokens * 输出单价) / 1000000
cost = (float64(promptTokens)*price.InputPrice + float64(completionTokens)*price.OutputPrice) / 1000000.0
}

// 5. 打印精美的仪表盘日志
log.Printf("[Tracker] 📊 API 调用完成 | 耗时: %v | 输入: %d tk | 输出: %d tk | 花费: ¥%.6f\n",
latency, promptTokens, completionTokens, cost)

// 6. 将账单累加到当前的 Session 中,供人类后续随时查询
if t.session != nil {
t.session.RecordUsage(promptTokens, completionTokens, cost)
log.Printf("[Tracker] 💰 当前会话 (%s) 累计花费: ¥%.6f\n", t.session.ID, t.session.TotalCostCNY)
}
} else {
log.Printf("[Tracker] ⚠️ API 调用完成,但未返回 Usage 数据 | 耗时: %v\n", latency)
}

return respMsg, nil
}

第 4 步:在 Main 函数中像组装乐高一样串联它们

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// cmd/claw/main.go
package main

import (
"context"
"log"
"os"

ctxpkg "github.com/yourname/go-tiny-claw/internal/context"
"github.com/yourname/go-tiny-claw/internal/engine"
"github.com/yourname/go-tiny-claw/internal/observability" // 导入监控包
"github.com/yourname/go-tiny-claw/internal/provider"
"github.com/yourname/go-tiny-claw/internal/schema"
"github.com/yourname/go-tiny-claw/internal/tools"
)

func main() {
if os.Getenv("ZHIPU_API_KEY") == "" {
log.Fatal("请先导出 ZHIPU_API_KEY 环境变量")
}

workDir, _ := os.Getwd()
modelName := "glm-4.5-air"

// 1. 初始化真实的底层大脑
realProvider := provider.NewZhipuOpenAIProvider(modelName)

sessionID := "test_observability_001"
sess := ctxpkg.GlobalSessionMgr.GetOrCreate(sessionID, workDir)

// 2. 核心拼装:用 Tracker 将真实的大脑包裹起来
trackedProvider := observability.NewCostTracker(realProvider, modelName, sess)

registry := tools.NewRegistry()
registry.Register(tools.NewBashTool(workDir))

// 3. 将被包裹的 Provider 注入给 Engine (Engine 毫不知情)
eng := engine.NewAgentEngine(trackedProvider, registry, false, false)
reporter := engine.NewTerminalReporter()

prompt := `请用 bash 帮我用 date 命令查一下现在的时间。`

log.Println("\n>>> 🚀 启动带仪表盘的可观测性测试...")
sess.Append(schema.Message{Role: schema.RoleUser, Content: prompt})

err := eng.Run(context.Background(), sess, reporter)
if err != nil {
log.Fatalf("引擎运行崩溃: %v", err)
}

log.Printf("\n================ 财务报表 ================\n")
log.Printf("会话 ID: %s\n", sess.ID)
log.Printf("总消耗 Input Tokens: %d\n", sess.TotalPromptTokens)
log.Printf("总消耗 Output Tokens: %d\n", sess.TotalCompletionTokens)
log.Printf("总计费用 (CNY): ¥%.6f\n", sess.TotalCostCNY)
log.Printf("==========================================\n")
}

总结

  1. 算明经济账是落地的关键:在驾驭工程中,衡量一个 Agent 是否优秀,除了看它能不能把代码跑通,更要看它的 Token 效率。如果不把成本监控落到代码实处,就无法优化 System Prompt 的长度,也无从判断上下文压缩是否真的起到了省钱的作用。

  2. 装饰器模式的优雅应用:为了保持核心引擎(Main Loop)的纯粹性,我们没有在里面混入任何一行记录时间或计费的代码。我们通过实现一个包装了真实 LLMProvider 的 CostTracker,实现了功能的无缝外挂(运用了类似 AOP 面向切面编程的思想)。

  3. 长期价值的沉淀:通过将会话总账单挂载到 Session 对象上

Tracing机制

大模型本身是一个不可控的“黑盒(Black Box)”。如果在驾驭工程(Harness Engineering)中,我们不能提供透视这个黑盒的“X 光机”,一旦 Agent 发生智障行为,我们将陷入无法调试的境地。

我们将补齐可观测性体系(Observability)中最具技术含量的一环:链路追踪(Tracing)。我们将像微服务架构那样,用纯 Go 语言实现一套轻量级的上下文级联追踪机制,将 Agent 的每一次“思考 - 行动”完整固化为可供回放的 JSON 决策树。

Agent 链路追踪的本质是树(Tree)

在 Agent 的驾驭工程中,Tracing 的理念是完全一致的。只不过,我们的追踪对象从网络节点变成了智能体的决策层级。一个完整的 Agent 运行周期,天然具备一棵极度工整的树状结构:

  1. Root Span(根跨度):代表一次完整的 Run 任务。

  2. Child Spans(子跨度):代表 ReAct 循环中的每一个 Turn。

  3. Leaf Spans(叶子节点):代表每一个 Turn 内部的细分操作,例如 Generate(LLM 调用)、Execute(工具执行)、Compaction(内存压缩)。

代码实战

我们将所有的追踪代码收敛在 internal/observability/trace.go 中,并在 engine 和 tools 层进行埋点。

第 1 步:实现 Trace 数据结构与上下文传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// internal/observability/trace.go
package observability

import (
"context"
"encoding/json"
"os"
"path/filepath"
"sync"
"time"
)

// traceKey 是 Context 中存放 Span 的专属 Key
type traceKey struct{}

// Span 代表链路追踪中的一个时间跨度和操作节点
type Span struct {
Name string `json:"name"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
DurationMs int64 `json:"duration_ms"`
Attributes map[string]interface{} `json:"attributes,omitempty"` // 存放元数据 (如消耗的 Token, 执行的命令)
Children []*Span `json:"children,omitempty"` // 子跨度

mu sync.Mutex // 保护 Children 的并发写入
}

// StartSpan 开启一个新的追踪跨度,并将其级联到 Context 中
func StartSpan(ctx context.Context, name string) (context.Context, *Span) {
span := &Span{
Name: name,
StartTime: time.Now(),
Attributes: make(map[string]interface{}),
}

// 从 context 中尝试获取父 Span
if parent, ok := ctx.Value(traceKey{}).(*Span); ok {
parent.mu.Lock()
parent.Children = append(parent.Children, span)
parent.mu.Unlock()
}

// 将当前新创建的 Span 作为最新的父节点,塞入衍生 Context 并返回
newCtx := context.WithValue(ctx, traceKey{}, span)
return newCtx, span
}

// EndSpan 结束跨度,计算耗时
func (s *Span) EndSpan() {
s.EndTime = time.Now()
s.DurationMs = s.EndTime.Sub(s.StartTime).Milliseconds()
}

// AddAttribute 为当前 Span 记录关键的元数据
func (s *Span) AddAttribute(key string, value interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.Attributes[key] = value
}

// ExportTraceToFile 当整个根 Span 结束时,将其序列化并保存为本地 JSON 文件
func ExportTraceToFile(rootSpan *Span, workDir string, sessionID string) error {
traceDir := filepath.Join(workDir, ".claw", "traces")
os.MkdirAll(traceDir, 0755)

filename := filepath.Join(traceDir, fmt.Sprintf("trace_%s_%d.json", sessionID, time.Now().Unix()))

// 美化输出 JSON,便于人类和工具阅读
data, err := json.MarshalIndent(rootSpan, "", " ")
if err != nil {
return err
}

return os.WriteFile(filename, data, 0644)
}

这段代码完美利用了 Go 语言 context.WithValue 的特性。我们通过每次进入新函数时调用 ctx, span := StartSpan(ctx, “Name”),在不知不觉中构建出了一棵完整的调用树,而且完全不用担心并发安全问题。

第 2 步:在核心代码中埋点 (Instrumentation)

有了工具,接下来我们要在 Harness 的关键生命周期节点进行“埋点”。埋点在驾驭工程中是一项艺术:埋得太多,性能下降、日志噪音大;埋得太少,关键信息丢失。

在 Main Loop 中埋点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// internal/engine/loop.go
package engine

import (
"context"
// ... 其他导入保持不变 ...
"github.com/yourname/go-tiny-claw/internal/observability"
)

// ... AgentEngine 定义保持不变 ...

func (e *AgentEngine) Run(ctx context.Context, session *Session, reporter Reporter) error {
log.Printf("[Engine] 唤醒会话 [%s],锁定工作区: %s (PlanMode: %v)\n", session.ID, session.WorkDir, e.PlanMode)

// 【埋点 1】:开启 Root Span,记录整个任务的生命周期
ctx, rootSpan := observability.StartSpan(ctx, "Agent.Run")
rootSpan.AddAttribute("SessionID", session.ID)
rootSpan.AddAttribute("WorkDir", session.WorkDir)

// defer 保证在引擎退出时,无论成功失败,都能结束根 Span 并导出 Trace 报告
defer func() {
rootSpan.EndSpan()
_ = observability.ExportTraceToFile(rootSpan, session.WorkDir, session.ID)
log.Printf("📊 [Tracing] 本次任务的执行回放链路已保存至工作区的 .claw/traces 目录下\n")
}()

composer := ctxpkg.NewPromptComposer(session.WorkDir, e.PlanMode)
systemMsg := composer.Build()

turnCount := 0
for {
turnCount++

// 【埋点 2】:记录单次 Turn 循环
turnCtx, turnSpan := observability.StartSpan(ctx, fmt.Sprintf("Turn-%d", turnCount))
defer turnSpan.EndSpan() // 利用 defer,哪怕遇到了 break 或 error 也会计算耗时

availableTools := e.registry.GetAvailableTools()
workingMemory := session.GetWorkingMemory(20)

var contextHistory []schema.Message
contextHistory = append(contextHistory, systemMsg)
contextHistory = append(contextHistory, workingMemory...)
compactedContext := e.compactor.Compact(contextHistory)

// 记录发给模型的实际上下文大小,非常有助于排查幻觉
turnSpan.AddAttribute("context_message_count", len(compactedContext))

// ================= Phase 1: Thinking =================
var currentTurnThinkingContent string

if e.EnableThinking {
if reporter != nil { reporter.OnThinking(turnCtx) } // 传递带有 trace 的 turnCtx

// 【埋点 3】:记录 Thinking 调用
thinkCtx, thinkSpan := observability.StartSpan(turnCtx, "LLM.Thinking")
thinkResp, err := e.provider.Generate(thinkCtx, compactedContext, nil)
thinkSpan.EndSpan() // 结束思考跨度

if err != nil {
return fmt.Errorf("Thinking 阶段失败: %w", err)
}
if thinkResp.Content != "" {
currentTurnThinkingContent = thinkResp.Content
compactedContext = append(compactedContext, *thinkResp)
}
}

// ================= Phase 2: Action =================
// 【埋点 4】:记录 Action 调用
actCtx, actSpan := observability.StartSpan(turnCtx, "LLM.Action")
actionResp, err := e.provider.Generate(actCtx, compactedContext, availableTools)
actSpan.EndSpan() // 结束行动跨度

if err != nil {
return fmt.Errorf("Action 阶段失败: %w", err)
}

session.Append(*actionResp)
// ... 输出回调逻辑不变 ...

if len(actionResp.ToolCalls) == 0 {
turnSpan.EndSpan() // 没有工具调用,正常结束
break
}

// ================= 并发执行工具 =================
observationMsgs := make([]schema.Message, len(actionResp.ToolCalls))
var wg sync.WaitGroup

for i, toolCall := range actionResp.ToolCalls {
wg.Add(1)
go func(idx int, call schema.ToolCall) {
defer wg.Done()
... ...

// 此时,传给 Registry 的 ctx 是带有当前 Turn 的上下文。
// 并且由于是并发执行,多个工具的 Span 会平行地挂在 Turn 节点下!
result := e.registry.Execute(turnCtx, call)

// ... 错误注入等不变 ...

observationMsgs[idx] = schema.Message{
Role: schema.RoleUser,
Content: result.Output, // 生产环境为了 json 不至于过大,可考虑此处不塞入全量 Output
ToolCallID: call.ID,
}
}(i, toolCall)
}

wg.Wait()
session.Append(observationMsgs...)

// 结束本轮 Turn 的 Span
turnSpan.EndSpan()

// ... System Reminder 干预逻辑不变 ...
}

return nil
}
在 Tool Registry 中埋点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// internal/tools/registry.go (局部修改)
package tools

import (
// ... 导入保持不变 ...
"github.com/yourname/go-tiny-claw/internal/observability"
)

func (r *registryImpl) Execute(ctx context.Context, call schema.ToolCall) schema.ToolResult {
// 【埋点 5】:开启工具执行的 Span
ctx, span := observability.StartSpan(ctx, "Tool.Execute")
span.AddAttribute("tool_name", call.Name)
// 将 JSON 参数存入以备调试
span.AddAttribute("arguments", string(call.Arguments))

defer span.EndSpan() // 无论成功失败,确保结束

tool, exists := r.tools[call.Name]
if !exists {
// ...
}

for _, mw := range r.middlewares {
allowed, reason := mw(ctx, call)
if !allowed {
span.AddAttribute("intercepted", true)
span.AddAttribute("reject_reason", reason)
// ...
}
}

output, err := tool.Execute(ctx, call.Arguments)

if err != nil {
span.AddAttribute("error", err.Error())
// ...
}

// 我们甚至可以只截取输出的前 100 字符放入 Trace,防止 Trace 文件过度膨胀
span.AddAttribute("output_preview", truncate(output, 100))

return schema.ToolResult{
// ...
}
}

func truncate(s string, max int) string {
if len(s) > max {
return s[:max] + "..."
}
return s
}