Chapter 22: Observability
Observability makes your system as transparent as glass: metrics quantify performance, traces pinpoint bottlenecks, and logs capture the details. But over-observing has a cost of its own: metric explosion, storage bills, performance overhead. Choosing what to observe matters more than observing for its own sake.
⏱️ Fast Track (the essentials in 5 minutes)
- Each of the three pillars has its strength: Metrics show trends, Traces follow the request path, Logs hold the details
- Prometheus metric names follow the {system}_{module}_{action}_{unit} format
- OpenTelemetry traces stitch together cross-service calls, with a Span recording each hop
- Structured logs must carry a trace_id so all three pillars can be correlated
- Sampling strategy: 10-20% in production, raised to 50% on critical paths
10-minute path: 22.1-22.3 → 22.5 → Shannon Lab
Monday morning, a user complains: "The agent is responding way too slowly. It was fine last week."
You open the logs and dig through them for an hour:
- Did the LLM API get slower, or did our code?
- Are all requests slow, or only certain kinds of tasks?
- Is the token consumption normal or abnormal?
- Which agent takes the longest? What is its failure rate?
The answer: you don't know. You can only guess.
That is why you need observability.
22.1 The Three Pillars of Observability
Each pillar has its own focus, and they complement one another:
| Pillar | Strength | Weakness | Typical question |
|---|---|---|---|
| Metrics | Aggregation, trends, alerting | No detail on individual requests | "What is the failure rate?" |
| Traces | Full path of a single request | High storage cost | "Where did this request get stuck?" |
| Logs | Rich context | Hard to aggregate and analyze | "What were the parameters when it failed?" |
22.2 Prometheus Metrics
Why Prometheus?
- A time-series database, a natural fit for performance metrics
- Pull model: the service does not need to know the monitoring system exists
- A powerful query language, PromQL
- A mature ecosystem: Grafana, Alertmanager
Shannon's Metrics
Shannon defines pattern-level metrics in go/orchestrator/internal/workflows/metrics/pattern_metrics.go:
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// Pattern execution counter
PatternExecutions = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "shannon_pattern_executions_total",
Help: "Total number of pattern executions by type",
},
[]string{"pattern", "workflow_version"},
)
// Pattern execution duration
PatternDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "shannon_pattern_duration_seconds",
Help: "Duration of pattern executions in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"pattern", "workflow_version"},
)
// Agent executions per pattern
AgentExecutionsByPattern = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "shannon_agents_by_pattern_total",
Help: "Total number of agents executed by pattern type",
},
[]string{"pattern", "workflow_version"},
)
// Token usage
TokenUsageByPattern = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "shannon_tokens_by_pattern_total",
Help: "Total tokens used by pattern type",
},
[]string{"pattern", "workflow_version"},
)
// Reflection improvement count
ReflectionImprovements = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "shannon_reflection_improvements_total",
Help: "Number of times reflection improved quality",
},
[]string{"workflow_version"},
)
)
Metric Categories
Which metrics does a complete agent system need?
1. Workflow-level metrics
// Workflow completion counter
WorkflowsCompleted = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "shannon_workflows_completed_total",
Help: "Total number of workflows completed",
},
[]string{"workflow_type", "mode", "status"}, // research/react/dag, sync/async, success/failed
)
// Workflow latency distribution
WorkflowDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "shannon_workflow_duration_seconds",
Help: "Workflow execution duration in seconds",
Buckets: []float64{1, 5, 10, 30, 60, 120, 300}, // 1s to 5min
},
[]string{"workflow_type", "mode"},
)
2. Token / cost metrics
// Token consumption per task
TaskTokensUsed = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "shannon_task_tokens_used",
Help: "Number of tokens used per task",
Buckets: []float64{100, 500, 1000, 5000, 10000, 50000},
},
)
// Cost in USD
TaskCostUSD = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "shannon_task_cost_usd",
Help: "Cost in USD per task",
Buckets: []float64{0.001, 0.01, 0.05, 0.1, 0.5, 1, 5},
},
)
3. Memory system metrics
// Memory fetch operations
MemoryFetches = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "shannon_memory_fetches_total",
Help: "Total number of memory fetch operations",
},
[]string{"type", "source", "result"}, // session/semantic, qdrant/redis, hit/miss
)
// Compression ratio
CompressionRatio = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "shannon_compression_ratio",
Help: "Compression ratio (original/compressed)",
Buckets: []float64{1.5, 2, 3, 5, 10, 20},
},
)
4. Vector search metrics
// Vector search count
VectorSearches = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "shannon_vector_search_total",
Help: "Total number of vector searches",
},
[]string{"collection", "status"},
)
// Vector search latency
VectorSearchLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "shannon_vector_search_latency_seconds",
Help: "Vector search latency in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"collection"},
)
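With the definitions above in place, recording values at runtime is just a matter of choosing bounded label values and calling Inc() or Observe(). A minimal sketch of a call site (the collection name and the searchVectors helper are illustrative assumptions, not Shannon's actual code):
```go
// Illustrative call site: time a vector search and record both metrics.
start := time.Now()
results, err := searchVectors(ctx, "semantic_memory", queryVector) // hypothetical helper

status := "success"
if err != nil {
	status = "error"
}
VectorSearches.WithLabelValues("semantic_memory", status).Inc()
VectorSearchLatency.WithLabelValues("semantic_memory").Observe(time.Since(start).Seconds())
_ = results
```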
Recording Metrics
Shannon wraps metric recording in activities/metrics.go:
// PatternMetricsInput contains pattern execution metrics
type PatternMetricsInput struct {
Pattern string
Version string
AgentCount int
TokensUsed int
Duration time.Duration
Improved bool // For reflection pattern
WorkflowType string
}
// RecordPatternMetrics records pattern execution metrics
func RecordPatternMetrics(ctx context.Context, input PatternMetricsInput) error {
// Record pattern execution
metrics.RecordPatternExecution(input.Pattern, input.Version)
// Record duration if provided
if input.Duration > 0 {
metrics.RecordPatternDuration(input.Pattern, input.Version, input.Duration.Seconds())
}
// Record agent executions
if input.AgentCount > 0 {
metrics.RecordAgentExecution(input.Pattern, input.Version, input.AgentCount)
}
// Record token usage
if input.TokensUsed > 0 {
metrics.RecordTokenUsage(input.Pattern, input.Version, input.TokensUsed)
}
// Record reflection improvement
if input.Pattern == "reflection" && input.Improved {
metrics.RecordReflectionImprovement(input.Version)
}
return nil
}
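Because this lives in the activities package, a workflow records metrics by invoking it as a Temporal activity. A hedged sketch of what such a call site could look like inside workflow code (the activity options, input values, and surrounding variables are assumptions for illustration, not Shannon's actual workflow code):
```go
// Sketch: record pattern metrics from workflow code. A recording failure
// should never fail the workflow itself, so the error is only logged.
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
actx := workflow.WithActivityOptions(ctx, ao)

input := PatternMetricsInput{
	Pattern:    "react",
	Version:    workflowVersion,   // assumed variable
	AgentCount: len(agentResults), // assumed variable
	TokensUsed: totalTokens,       // assumed variable
	Duration:   workflow.Now(actx).Sub(startedAt),
}
if err := workflow.ExecuteActivity(actx, RecordPatternMetrics, input).Get(actx, nil); err != nil {
	workflow.GetLogger(actx).Warn("Failed to record pattern metrics", "error", err)
}
```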
The Metrics Server
go func() {
http.Handle("/metrics", promhttp.Handler())
port := cfg.MetricsPort(2112)
addr := ":" + fmt.Sprintf("%d", port)
logger.Info("Metrics server listening", zap.String("address", addr))
if err := http.ListenAndServe(addr, nil); err != nil {
logger.Error("Failed to start metrics server", zap.Error(err))
}
}()
22.3 OpenTelemetry Tracing
Why tracing?
Metrics tell you "the system as a whole got slower", but not "where this particular request got stuck".
Tracing answers that: it records the complete path of a request from the moment it enters the system until it returns.
Initialization
Shannon initializes tracing in go/orchestrator/internal/tracing/tracing.go:
package tracing
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv/v1.27.0"
oteltrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
var tracer oteltrace.Tracer
// Config holds tracing configuration
type Config struct {
Enabled bool `mapstructure:"enabled"`
ServiceName string `mapstructure:"service_name"`
OTLPEndpoint string `mapstructure:"otlp_endpoint"`
}
// Initialize sets up minimal OTLP tracing
func Initialize(cfg Config, logger *zap.Logger) error {
// Always initialize a tracer handle
if cfg.ServiceName == "" {
cfg.ServiceName = "shannon-orchestrator"
}
tracer = otel.Tracer(cfg.ServiceName)
if !cfg.Enabled {
logger.Info("Tracing disabled")
return nil
}
if cfg.OTLPEndpoint == "" {
cfg.OTLPEndpoint = "localhost:4317"
}
// Create OTLP exporter
exporter, err := otlptracegrpc.New(
context.Background(),
otlptracegrpc.WithEndpoint(cfg.OTLPEndpoint),
otlptracegrpc.WithInsecure(),
)
if err != nil {
return fmt.Errorf("failed to create OTLP exporter: %w", err)
}
// Create resource
res, _ := resource.New(context.Background(),
resource.WithAttributes(
semconv.ServiceName(cfg.ServiceName),
semconv.ServiceVersion("1.0.0"),
),
)
// Create tracer provider
tp := trace.NewTracerProvider(
trace.WithBatcher(exporter),
trace.WithResource(res),
)
otel.SetTracerProvider(tp)
tracer = otel.Tracer(cfg.ServiceName)
logger.Info("Tracing initialized", zap.String("endpoint", cfg.OTLPEndpoint))
return nil
}
Creating Spans
// StartSpan creates a new span with the given name
func StartSpan(ctx context.Context, spanName string) (context.Context, oteltrace.Span) {
return tracer.Start(ctx, spanName)
}
// StartHTTPSpan creates a span for HTTP operations with method and URL
func StartHTTPSpan(ctx context.Context, method, url string) (context.Context, oteltrace.Span) {
if tracer == nil {
tracer = otel.Tracer("shannon-orchestrator")
}
spanName := fmt.Sprintf("HTTP %s", method)
ctx, span := tracer.Start(ctx, spanName)
span.SetAttributes(
semconv.HTTPRequestMethodKey.String(method),
semconv.URLFull(url),
)
return ctx, span
}
W3C Traceparent Propagation
When calling across services, the trace context has to be propagated:
// W3CTraceparent generates a W3C traceparent header value
func W3CTraceparent(ctx context.Context) string {
span := oteltrace.SpanFromContext(ctx)
if !span.SpanContext().IsValid() {
return ""
}
sc := span.SpanContext()
return fmt.Sprintf("00-%s-%s-%02x",
sc.TraceID().String(),
sc.SpanID().String(),
sc.TraceFlags(),
)
}
// InjectTraceparent adds W3C traceparent header to HTTP request
func InjectTraceparent(ctx context.Context, req *http.Request) {
if traceparent := W3CTraceparent(ctx); traceparent != "" {
req.Header.Set("traceparent", traceparent)
}
}
A Cross-Service Tracing Example
func (a *Activities) ExecuteAgent(ctx context.Context, input AgentExecutionInput) (...) {
// Create an HTTP span
ctx, span := tracing.StartHTTPSpan(ctx, "POST", llmServiceURL)
defer span.End()
// Build the request
req, _ := http.NewRequestWithContext(ctx, "POST", llmServiceURL, body)
// Inject the trace context
tracing.InjectTraceparent(ctx, req)
// Execute the request
resp, err := client.Do(req)
// Record the outcome
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
return result, nil
}
This way, the entire path of a request from the Orchestrator to the LLM Service shows up in Jaeger or any other tracing backend.
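On the receiving side, the downstream service extracts the traceparent header and continues the same trace. A minimal sketch of what this could look like in a Go HTTP handler using the standard OpenTelemetry propagator (the handler and tracer names are illustrative, not Shannon's actual service code):
```go
import (
	"net/http"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

// handleExecute joins the caller's trace before doing any work.
func handleExecute(w http.ResponseWriter, r *http.Request) {
	// Extract the W3C trace context injected by the caller.
	ctx := propagation.TraceContext{}.Extract(r.Context(), propagation.HeaderCarrier(r.Header))

	// Spans started from ctx share the caller's TraceID, so the whole
	// request appears as a single trace in Jaeger.
	ctx, span := otel.Tracer("llm-service").Start(ctx, "handle_execute")
	defer span.End()

	_ = ctx // ... handle the request ...
	w.WriteHeader(http.StatusOK)
}
```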
22.4 Structured Logging
Zap Logger Configuration
Shannon uses Zap as its logging library:
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("Failed to initialize logger: %v", err)
}
defer logger.Sync()
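zap.NewProduction() gives JSON output with sensible defaults, including built-in sampling of repeated entries. If you need to adjust the level or timestamp format, a sketch along these lines works (illustrative tweaks, not Shannon's actual configuration):
```go
// Customize the production config instead of taking the defaults.
cfg := zap.NewProductionConfig()
cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)       // minimum level to emit
cfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder // human-readable timestamps
logger, err := cfg.Build()
if err != nil {
	log.Fatalf("Failed to initialize logger: %v", err)
}
defer logger.Sync()
```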
Logging Best Practices
1. Include context
// Good: carries enough context
logger.Info("Workflow started",
zap.String("workflow_id", workflowID),
zap.String("workflow_type", workflowType),
zap.String("session_id", sessionID),
zap.Int("subtasks", len(subtasks)),
)
// Bad: no context
logger.Info("Workflow started") // which workflow?
2. Make error logs detailed
logger.Error("Agent execution failed",
zap.Error(err),
zap.String("agent_id", agentID),
zap.String("workflow_id", workflowID),
zap.Int("attempt", attempt),
zap.Duration("duration", duration),
)
3. Use Warn for performance issues
logger.Warn("Slow LLM response",
zap.Duration("duration", duration),
zap.String("model", modelUsed),
zap.Int("tokens", tokensUsed),
)
4. Keep Debug log volume under control
// Summarize instead of logging per item
logger.Debug("Processing items",
zap.Int("count", len(items)),
zap.Duration("total_duration", totalDuration),
)
22.5 Health Checks
The Health Manager
Shannon's health check system lives in go/orchestrator/internal/health/manager.go:
// Manager implements the HealthManager interface
type Manager struct {
checkers map[string]*CheckerState
lastResults map[string]CheckResult
config *HealthConfiguration
started bool
checkInterval time.Duration
stopCh chan struct{}
logger *zap.Logger
mu sync.RWMutex
}
// CheckerState represents the runtime state of a health checker
type CheckerState struct {
checker Checker
enabled bool
interval time.Duration
timeout time.Duration
critical bool // critical vs. non-critical dependency
lastCheck time.Time
}
The key design decision: distinguish Critical from Non-Critical checks.
// Compute the overall health status
func (m *Manager) calculateOverallStatus(components map[string]CheckResult, summary HealthSummary) OverallHealth {
criticalFailures := 0
nonCriticalFailures := 0
degradedComponents := 0
for _, result := range components {
if result.Status == StatusUnhealthy {
if result.Critical {
criticalFailures++
} else {
nonCriticalFailures++
}
}
}
// Only failures of critical dependencies make the service Unhealthy
if criticalFailures > 0 {
return OverallHealth{
Status: StatusUnhealthy,
Message: fmt.Sprintf("%d critical component(s) failing", criticalFailures),
Ready: false,
Live: true, // still alive, just not ready
}
} else if nonCriticalFailures > 0 {
return OverallHealth{
Status: StatusDegraded, // degraded but still usable
Message: fmt.Sprintf("%d non-critical component(s) failing", nonCriticalFailures),
Ready: true,
Live: true,
}
}
// ...
}
Registering Checkers
// Create the health manager
hm := health.NewManager(logger)
// Register the individual checkers
if dbClient != nil {
dbChecker := health.NewDatabaseHealthChecker(dbClient.GetDB(), dbClient.Wrapper(), logger)
_ = hm.RegisterChecker(dbChecker)
}
// Redis check
if rw := orchestratorService.SessionManager().RedisWrapper(); rw != nil {
rc := health.NewRedisHealthChecker(rw.GetClient(), rw, logger)
_ = hm.RegisterChecker(rc)
}
// Agent Core check
if agentAddr != "" {
conn, err := grpc.Dial(agentAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err == nil {
client := agentpb.NewAgentServiceClient(conn)
ac := health.NewAgentCoreHealthChecker(client, conn, logger)
_ = hm.RegisterChecker(ac)
}
}
// LLM Service check
lc := health.NewLLMServiceHealthChecker(llmBase, logger)
_ = hm.RegisterChecker(lc)
Health Endpoints
# Liveness check (used by the Kubernetes liveness probe)
GET /health/live
# Readiness check (used by the Kubernetes readiness probe)
GET /health/ready
# Detailed status
GET /health
Example response:
{
"status": "healthy",
"ready": true,
"live": true,
"checks": {
"database": {"status": "healthy", "critical": true, "latency_ms": 5},
"redis": {"status": "healthy", "critical": false, "latency_ms": 2},
"llm_service": {"status": "healthy", "critical": true, "latency_ms": 150},
"agent_core": {"status": "healthy", "critical": true, "latency_ms": 10}
}
}
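Wiring these endpoints up is a thin layer over the manager. A hedged sketch, assuming the Manager exposes an overall-status accessor (the OverallStatus method name is hypothetical; the Ready and Message fields come from the OverallHealth struct shown above):
```go
// Hypothetical wiring of the health endpoints; hm is the health.Manager created above.
mux := http.NewServeMux()

// Liveness: the process is up and answering; restart it only if this fails.
mux.HandleFunc("/health/live", func(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
})

// Readiness: only route traffic when the critical dependencies are healthy.
mux.HandleFunc("/health/ready", func(w http.ResponseWriter, r *http.Request) {
	overall := hm.OverallStatus() // hypothetical accessor returning OverallHealth
	if !overall.Ready {
		http.Error(w, overall.Message, http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
})
```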
22.6 Alerting Strategy
Prometheus Alerting Rules
groups:
- name: shannon-alerts
rules:
# Workflow failure rate too high
- alert: HighWorkflowFailureRate
expr: |
sum(rate(shannon_workflows_completed_total{status="failed"}[5m]))
/ sum(rate(shannon_workflows_completed_total[5m])) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "Workflow failure rate > 10%"
description: "{{ $value | humanizePercentage }} of workflows failed in last 5m"
# LLM latency too high
- alert: HighLLMLatency
expr: |
histogram_quantile(0.95,
sum(rate(shannon_pattern_duration_seconds_bucket{pattern="react"}[5m])) by (le)
) > 30
for: 10m
labels:
severity: warning
annotations:
summary: "P95 pattern execution > 30s"
# Abnormal token consumption
- alert: AbnormalTokenUsage
expr: |
sum(rate(shannon_tokens_by_pattern_total[1h])) > 1000000
for: 1h
labels:
severity: warning
annotations:
summary: "Token consumption > 1M/hour"
# Critical dependency unhealthy
- alert: CriticalDependencyDown
expr: |
shannon_health_check_status{critical="true"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Critical dependency {{ $labels.component }} is down"
Alert Severity Levels
| Level | Trigger | Response time | Notification channel |
|---|---|---|---|
| Critical | Critical dependency down, failure rate > 10% | Immediately | PagerDuty / phone |
| Warning | Rising latency, non-critical dependency issues | Within 1 hour | Slack |
| Info | Resource usage approaching thresholds | Next business day | Email |
22.7 Dashboard Design
Grafana Panel Layout
Overview row
- Active workflows (Gauge)
- Requests per minute (Counter rate)
- Success rate (Percentage)
- Total token consumption (Counter)
Performance row
- Workflow latency distribution (Heatmap)
- Pattern execution time (Histogram)
- P50/P95/P99 trends (Graph)
Resource row
- Token consumption trend (Graph)
- Cost trend (Graph)
- Usage share by pattern (Pie)
Error row
- Error rate over time (Graph)
- Error type distribution (Pie)
- Recent errors (Table)
PromQL Examples
# Workflow success rate
sum(rate(shannon_workflows_completed_total{status="success"}[5m]))
/ sum(rate(shannon_workflows_completed_total[5m]))
# P99 latency
histogram_quantile(0.99,
sum(rate(shannon_workflow_duration_seconds_bucket[5m])) by (le, workflow_type)
)
# Token consumption rate (per minute)
sum(rate(shannon_tokens_by_pattern_total[5m])) * 60
# Usage share by pattern (scalar() makes the division work despite the extra pattern label)
sum by (pattern) (rate(shannon_pattern_executions_total[1h]))
/ scalar(sum(rate(shannon_pattern_executions_total[1h])))
# Reflection improvement rate
sum(rate(shannon_reflection_improvements_total[1h]))
/ sum(rate(shannon_pattern_executions_total{pattern="reflection"}[1h]))
22.8 Common Pitfalls
Pitfall 1: Cardinality explosion
// Wrong: high-cardinality labels
AgentExecutions.WithLabelValues(userID, taskID, timestamp)
// userID * taskID * timestamp = unbounded combinations; Prometheus will run out of memory
// Right: bounded cardinality
AgentExecutions.WithLabelValues(agentType, mode)
// agentType * mode = a bounded set of combinations (e.g. 5 * 3 = 15)
Rule of thumb: keep the number of label combinations per metric under about 1,000.
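A simple way to enforce this is to normalize free-form values into a small fixed set before they ever become label values. A sketch (the helper below is illustrative, not part of Shannon):
```go
// Collapse arbitrary input into a bounded set of label values.
var allowedAgentTypes = map[string]bool{
	"research": true,
	"react":    true,
	"dag":      true,
}

func normalizeAgentType(t string) string {
	if allowedAgentTypes[t] {
		return t
	}
	return "other" // everything unexpected lands in one bucket
}

// Usage: AgentExecutions.WithLabelValues(normalizeAgentType(agentType), mode).Inc()
```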
Pitfall 2: Poor trace sampling
// Wrong: sample every request (storage cost is prohibitive in production)
tp := trace.NewTracerProvider(
trace.WithSampler(trace.AlwaysSample()),
)
// Right: probabilistic sampling
tp := trace.NewTracerProvider(
trace.WithSampler(trace.TraceIDRatioBased(0.1)), // sample 10%
)
// Better: ParentBased honors the caller's sampling decision and samples 10% of
// new root traces. (Sampling every error trace requires tail-based sampling,
// e.g. in the OpenTelemetry Collector, since errors aren't known at span start.)
tp := trace.NewTracerProvider(
trace.WithSampler(trace.ParentBased(
trace.TraceIDRatioBased(0.1),
)),
)
Pitfall 3: Too many logs
// Wrong: logging inside a loop
for _, item := range items {
logger.Info("Processing item", zap.String("id", item.ID))
}
// With 10,000 items you get 10,000 log lines
// Right: summarize in one entry
logger.Info("Processing items",
zap.Int("count", len(items)),
zap.Duration("total_duration", totalDuration),
)
Pitfall 4: Overly strict health checks
// Wrong: report unhealthy if any dependency fails
if !db.Ping() || !redis.Ping() || !llm.Ping() {
return unhealthy
}
// A transient Redis blip marks the whole service unhealthy, and Kubernetes restarts it
// Right: distinguish critical from non-critical
type CheckerState struct {
Critical bool // only Critical=true affects overall health
}
// Redis is a cache and does not affect core functionality, so mark it Non-Critical
Pitfall 5: Missing trace ID correlation
// Wrong: logs and traces are not correlated
logger.Error("Request failed", zap.Error(err))
// Right: include the trace ID in the log
traceID := oteltrace.SpanFromContext(ctx).SpanContext().TraceID().String()
logger.Error("Request failed",
zap.Error(err),
zap.String("trace_id", traceID),
)
// Now you can jump from a log line straight to the trace details
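To avoid repeating this at every call site, you can derive a child logger that always carries the trace context. A small sketch (this helper is not in Shannon's codebase):
```go
// loggerWithTrace returns a logger that tags every entry with the current
// trace_id and span_id, so log lines can be joined with traces.
func loggerWithTrace(ctx context.Context, logger *zap.Logger) *zap.Logger {
	sc := oteltrace.SpanFromContext(ctx).SpanContext()
	if !sc.IsValid() {
		return logger
	}
	return logger.With(
		zap.String("trace_id", sc.TraceID().String()),
		zap.String("span_id", sc.SpanID().String()),
	)
}
```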
What This Chapter Covered
- Division of labor among the three pillars: Metrics quantify trends, Traces pinpoint individual requests, Logs record the details
- Metric design: categorize by layer (workflow / agent / memory / vector) and keep label cardinality bounded
- Trace propagation: W3C Traceparent carries context across services
- Health checks: distinguish Critical from Non-Critical dependencies
- Alert severity: Critical demands an immediate response, Warning within an hour, Info by the next business day
Shannon Lab (10 minutes hands-on)
This section maps the chapter's concepts onto the Shannon source code in about 10 minutes.
Must read (1 file)
go/orchestrator/internal/workflows/metrics/pattern_metrics.go: pattern-level metric definitions; study the metric naming and label design
Optional deep dives (2 files, pick by interest)
go/orchestrator/internal/tracing/tracing.go: tracing initialization and W3C Traceparent propagation
go/orchestrator/internal/health/manager.go: the health check manager; note the Critical/Non-Critical distinction
Exercises
Exercise 1: Design a metrics scheme
Design a metrics scheme for a "code review agent", covering:
- Workflow-level metrics
- Agent-level metrics
- LLM call metrics
- Cost metrics
List the metric names, labels, and types (Counter/Histogram/Gauge).
Exercise 2: Alerting rules
Based on the metrics from Exercise 1, design three alerting rules:
- 1 at Critical severity
- 1 at Warning severity
- 1 at Info severity
Write out the PromQL expressions and trigger conditions.
Exercise 3 (advanced): A debugging scenario
A user reports: "Yesterday between 3pm and 4pm the agent was extremely slow."
Describe how you would proceed:
- Which metrics you would look at first
- If the metrics show rising LLM latency, what you would check next
- How you would find the specific slow requests
- How you would confirm the root cause
Further Reading
- Prometheus best practices: naming conventions, label design
- OpenTelemetry documentation: sampling strategies, span design
- SRE Book, Monitoring chapter: Google's monitoring experience
Next Chapter
This is the last chapter of Part 7, Production Architecture. We covered the three-layer architecture, Temporal workflows, and observability: the foundations that make an agent system actually run.
Starting with Part 8, we move on to enterprise-grade features.
Chapter 23 covers token budget control: how to enforce hard cost limits at the task, session, and tenant levels, so a runaway agent cannot blow up your bill.