第 20 章:三层架构设计
三层架构不是为了炫技,而是为了让每种语言做它最擅长的事——Go 编排、Rust 隔离、Python 对接 AI 生态。 但它也有代价:部署复杂度、调试成本、层间延迟。选之前想清楚你的规模是否需要。
你用 Python 写了一个 Agent。单进程,asyncio 并发,工具直接在进程里执行。
跑了一周,挺顺的。然后用户量上来了,问题开始冒出来:
- 有个用户提交了一段恶意 Python 代码,你的 Agent 老老实实执行了,结果把服务器的
/etc/passwd读出来返回给他了 - 10 个请求同时来,GIL 卡得死死的,响应时间从 2 秒变成 20 秒
- 半夜进程崩了,正在执行的 5 个研究任务全丢了,第二天用户投诉
- 一个工具调用吃光了 8GB 内存,整个服务挂掉,连日志都来不及写
这些问题不是"代码写得不好",而是单体架构的天花板。
三层架构就是用来突破这个天花板的——把编排、安全执行、LLM 调用分离到不同服务,让每种语言发挥最大优势。
20.1 为什么要分层?
单体架构的局限
我见过很多 Agent 框架都是这个结构:
┌───────────────────────────────────────┐
│ Python Monolith │
│ ┌─────────────────────────────────┐ │
│ │ Orchestration (asyncio) │ │
│ │ Agent Execution (同进程) │ │
│ │ Tool Calling (直接执行) │ │
│ │ LLM API (requests) │ │
│ └─────────────────────────────────┘ │
└───────────────────────────────────────┘
这个架构开发快、部署简单、调试方便。对于原型和小规模使用,完全够用。
但它有几个绕不过去的问题:
| 问题 | 根因 | 后果 |
|---|---|---|
| 安全边界模糊 | 工具代码与主进程共享内存空间 | 恶意代码可以访问任何数据 |
| 并发受限 | Python GIL 限制真正的并行 | 10 个请求就能把服务卡死 |
| 状态易丢失 | 内存中的状态没有持久化 | 进程崩溃 = 任务全丢 |
| 资源隔离差 | 所有工具共享同一个进程资源 | 一个工具 OOM,整个服务挂 |
如果你的 Agent 只是自己用,或者用户可控,这些问题可能不会暴露。
但如果你要做一个面向外部用户的生产系统,这些问题迟早会变成事故。
三层架构怎么解决
三层架构把这些职责分开:
为什么是这三种语言?
这不是随便选的。每种语言在它的位置上有独特优势:
| 层级 | 语言 | 为什么选它 | 如果换成 Python 会怎样 |
|---|---|---|---|
| Orchestrator | Go | 高并发、Temporal 原生支持、编译型语言不易出运行时错误 | asyncio 能做,但 Temporal SDK 不如 Go 成熟 |
| Agent Core | Rust | 内存安全、WASI 沙箱支持、零成本抽象 | 无法提供同等级别的安全隔离 |
| LLM Service | Python | LLM SDK 生态最丰富、AI 库最全、迭代最快 | 本来就是 Python,生态优势明显 |
但我必须说:三层架构不是银弹。
它的代价是:
- 部署复杂度上升(至少 3 个服务 + Temporal + 数据库)
- 调试困难(问题可能在任意一层)
- 层间通信有延迟开销
如果你的用户量不大、安全要求不高、可以接受偶尔的服务重启,单体 Python 可能更适合你。
20.2 Orchestrator 层 (Go)
Orchestrator 是系统的"大脑"——它不执行具体任务,而是决定谁来执行、按什么顺序执行、结果怎么综合。
核心职责
| 职责 | 说明 | 关键实现 |
|---|---|---|
| 工作流编排 | 基于 Temporal 的持久化执行 | 崩溃自动恢复 |
| 预算控制 | Token 预算、成本追踪 | 超预算自动停止 |
| 路由决策 | 选择执行策略(ReAct/DAG/Supervisor) | 基于复杂度评分 |
| 结果综合 | 合并多 Agent 结果 | LLM 辅助 + 规则综合 |
入口程序结构
Shannon 的 Orchestrator 入口在 main.go。我挑几个关键部分讲:
1. 健康检查最先启动
// 健康检查先于其他组件启动
hm := health.NewManager(logger)
healthHandler := health.NewHTTPHandler(hm, logger)
healthHandler.RegisterRoutes(httpMux)
go func() {
_ = hm.Start(ctx)
server := &http.Server{
Addr: ":" + strconv.Itoa(healthPort),
Handler: httpMux,
}
logger.Info("Admin HTTP server listening", zap.Int("port", healthPort))
server.ListenAndServe()
}()
为什么要先启动健康检查?因为 Kubernetes 的 readiness probe 会在服务启动后立刻开始检查。如果健康端点还没起来,Pod 会被判定为不健康然后被杀掉,形成无限重启循环。
2. 数据库连接和健康检查
dbClient, err := db.NewClient(dbConfig, logger)
if err != nil {
logger.Fatal("Failed to initialize database client", zap.Error(err))
}
defer dbClient.Close()
// 注册数据库健康检查
if dbClient != nil {
dbChecker := health.NewDatabaseHealthChecker(dbClient.GetDB(), dbClient.Wrapper(), logger)
_ = hm.RegisterChecker(dbChecker)
}
3. Temporal Worker 启动(带重试)
// TCP 预检查
for i := 1; i <= 60; i++ {
c, err := net.DialTimeout("tcp", host, 2*time.Second)
if err == nil {
_ = c.Close()
break
}
logger.Warn("Waiting for Temporal TCP endpoint", zap.String("host", host), zap.Int("attempt", i))
time.Sleep(1 * time.Second)
}
// SDK 连接重试
var tClient client.Client
for attempt := 1; ; attempt++ {
tClient, err = client.Dial(client.Options{
HostPort: host,
Logger: temporal.NewZapAdapter(logger),
})
if err == nil {
break
}
delay := time.Duration(min(attempt, 15)) * time.Second
logger.Warn("Temporal not ready, retrying", zap.Int("attempt", attempt), zap.Duration("sleep", delay))
time.Sleep(delay)
}
为什么要两层检查?TCP 检查快(2 秒超时),可以快速判断 Temporal 服务是否可达。SDK 连接更重,失败后用指数退避重试。
4. 工作流和活动注册
orchestratorRegistry := registry.NewOrchestratorRegistry(registryConfig, logger, dbClient.GetDB(), sessionManager)
startWorker := func(queue string, actSize, wfSize int) worker.Worker {
wk := worker.New(tClient, queue, worker.Options{
MaxConcurrentActivityExecutionSize: actSize,
MaxConcurrentWorkflowTaskExecutionSize: wfSize,
})
if err := orchestratorRegistry.RegisterWorkflows(wk); err != nil {
logger.Error("Failed to register workflows", zap.String("queue", queue), zap.Error(err))
}
if err := orchestratorRegistry.RegisterActivities(wk); err != nil {
logger.Error("Failed to register activities", zap.String("queue", queue), zap.Error(err))
}
go wk.Run(worker.InterruptCh())
return wk
}
优先级队列
Shannon 支持多队列模式,不同优先级的任务走不同队列:
if priorityQueues {
_ = startWorker("shannon-tasks-critical", 12, 12) // 关键任务,高并发
_ = startWorker("shannon-tasks-high", 10, 10)
w = startWorker("shannon-tasks", 8, 8) // 普通任务
_ = startWorker("shannon-tasks-low", 4, 4) // 低优先级
} else {
w = startWorker("shannon-tasks", 10, 10) // 单队列模式
}
优先级队列的典型用途:
- Critical:用户正在等待的实时请求
- High:重要但可以稍等的任务
- Normal:常规后台任务
- Low:报告生成、数据清理等
20.3 Agent Core 层 (Rust)
Agent Core 是系统的"保镖"——它负责在受控环境中执行可能不安全的操作。
核心职责
| 职责 | 说明 | 实现方式 |
|---|---|---|
| 沙箱执行 | 隔离运行用户代码 | WASI 沙箱 |
| 资源限制 | 内存、CPU、网络 | cgroups + WASI 能力限制 |
| 超时控制 | 强制终止长时任务 | 系统级超时 |
| 工具运行 | 安全的工具调用 | 白名单 + 参数校验 |
为什么是 Rust?
Python 可以做资源限制吗?可以,但很难做到 Rust 的级别:
| 能力 | Python | Rust |
|---|---|---|
| 内存安全 | 运行时检查 | 编译时保证 |
| WASI 沙箱 | 需要外部进程 | 原生集成(wasmtime) |
| 资源隔离 | 进程级别 | 线程级别 |
| 性能开销 | 高 | 极低 |
如果你的安全要求不高,用 Python 的 subprocess + ulimit 也能做基本隔离。但如果你要面向外部用户,Rust 的安全保证更可靠。
gRPC 服务定义
Agent Core 通过 gRPC 暴露服务:
service AgentService {
rpc ExecuteTask(ExecuteTaskRequest) returns (ExecuteTaskResponse);
rpc StreamExecuteTask(ExecuteTaskRequest) returns (stream TaskUpdate);
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
rpc DiscoverTools(DiscoverToolsRequest) returns (DiscoverToolsResponse);
}
message ExecuteTaskRequest {
TaskMetadata metadata = 1;
string query = 2;
google.protobuf.Struct context = 3;
ExecutionMode mode = 4;
repeated string available_tools = 5;
AgentConfig config = 6;
}
message AgentConfig {
int32 max_iterations = 1; // 最大迭代次数
int32 timeout_seconds = 2; // 超时时间
bool enable_sandbox = 3; // 启用沙箱
int64 memory_limit_mb = 4; // 内存限制
}
工具能力描述
每个工具都有详细的能力描述:
message ToolCapability {
string id = 1;
string name = 2;
string description = 3;
string category = 4;
google.protobuf.Struct input_schema = 5; // JSON Schema
google.protobuf.Struct output_schema = 6;
repeated string required_permissions = 7; // 需要的权限
int64 estimated_duration_ms = 8; // 预估耗时
bool is_dangerous = 9; // 危险标记
RateLimit rate_limit = 14; // 限流配置
}
is_dangerous 标记用于触发额外的审批流程或沙箱隔离。比如 code_execution 和 file_system 工具会被标记为危险。
20.4 LLM Service 层 (Python)
LLM Service 是系统的"嘴"——它负责和各种 AI 模型对话。
核心职责
| 职责 | 说明 | 实现方式 |
|---|---|---|
| 多 Provider 调用 | OpenAI、Anthropic、Google 等 | Provider 抽象层 |
| 工具选择 | 基于查询选择合适工具 | 语义匹配 + 规则 |
| 向量存储 | Embedding 生成和检索 | Qdrant + 缓存 |
| 流式响应 | Token 级别的流式输出 | SSE/WebSocket |
为什么是 Python?
AI 生态几乎都是 Python 优先:
- OpenAI SDK: Python 版本最先更新
- Anthropic SDK: Python 版本功能最全
- LangChain/LlamaIndex: Python 原生
- 向量数据库客户端: Python 支持最好
用 Go 或 Rust 调用 LLM 当然可以,但你会花大量时间在 SDK 适配上,而不是业务逻辑。
关键端点
# /agent/query - 主查询端点
@app.post("/agent/query")
async def query(request: QueryRequest):
response = await llm_client.query(
query=request.query,
context=request.context,
tools=request.allowed_tools,
model_tier=request.model_tier,
max_tokens=request.max_tokens,
)
return {
"success": True,
"response": response.content,
"tokens_used": response.usage.total_tokens,
"model_used": response.model,
}
# /embeddings - 向量生成
@app.post("/embeddings")
async def embeddings(request: EmbeddingRequest):
vectors = await embedding_service.generate(
texts=request.texts,
model=request.model or "text-embedding-3-small",
)
return {"embeddings": vectors}
20.5 层间通信
三层之间通过 gRPC 和 HTTP 通信。这里有几个关键设计:
Workflow ID 传播
跨服务调用时,自动注入 Workflow ID 用于追踪:
// HTTP 请求自动注入 Workflow ID
type WorkflowHTTPRoundTripper struct {
base http.RoundTripper
}
func (w *WorkflowHTTPRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
info := activity.GetInfo(req.Context())
if info.WorkflowExecution.ID != "" {
req.Header.Set("X-Workflow-ID", info.WorkflowExecution.ID)
req.Header.Set("X-Run-ID", info.WorkflowExecution.RunID)
}
return w.base.RoundTrip(req)
}
这样做的好处:
- 分布式追踪:日志可以按 Workflow ID 聚合
- 资源归属:Token 消耗可以归属到具体任务
- 问题定位:出问题时能看到完整调用链
超时层级
层间超时必须由外向内递减:
# 正确的超时配置
orchestrator: timeout=120s # 最外层最长
agent-core: timeout=60s
llm-service: timeout=30s # 最内层最短
# 错误的配置 - 会导致意外超时
# orchestrator: timeout=60s
# agent-core: timeout=30s # 可能在 orchestrator 等待时超时
如果内层超时比外层长,外层会先超时,内层的工作就白费了。
20.6 配置管理
热重载配置
Shannon 支持配置热重载,不需要重启服务:
shannonCfgMgr.RegisterCallback(func(oldConfig, newConfig *cfg.ShannonConfig) error {
// 更新健康检查配置
newHealthConfig := &health.HealthConfiguration{
Enabled: newConfig.Health.Enabled,
CheckInterval: newConfig.Health.CheckInterval,
GlobalTimeout: newConfig.Health.Timeout,
}
hm.UpdateConfiguration(newHealthConfig)
// 策略引擎变更则重新初始化
if policyChanged(oldConfig, newConfig) {
activities.InitializePolicyEngineFromShannonConfig(&newConfig.Policy)
}
return nil
})
// 模型定价热重载
configMgr.RegisterHandler("models.yaml", func(ev cfg.ChangeEvent) error {
pricing.Reload()
logger.Info("Pricing configuration reloaded")
return nil
})
环境变量优先级
生产环境中,环境变量应该覆盖配置文件:
// 环境变量 > 配置文件
jwtSecret := shCfgForAuth.Auth.JWTSecret
if envSecret := os.Getenv("JWT_SECRET"); envSecret != "" {
jwtSecret = envSecret // 环境变量覆盖
}
这样可以在 Kubernetes 中通过 Secret 注入敏感配置,而不用把密钥写进配置文件。
20.7 请求流转示例
一个查询如何在三层间流转:
用户请求: "分析某 AI 公司"
↓
[Orchestrator (Go)]
1. gRPC 接收请求
2. 创建 Temporal Workflow
3. 路由选择: ResearchWorkflow
4. 分解任务: 公司概况、产品、融资...
↓ gRPC
[Agent Core (Rust)]
5. 沙箱执行搜索工具
6. 资源限制: 1GB 内存, 30s 超时
7. 返回搜索结果
↓ HTTP
[LLM Service (Python)]
8. 调用 LLM API 分析结果
9. 生成结构化摘要
10. 向量化存储到 Qdrant
↓
[Orchestrator (Go)]
11. 综合多 Agent 结果
12. 评估覆盖率
13. 返回最终报告
20.8 常见的坑
坑 1:跨层事务一致性
三层架构没有跨服务的事务保证。如果你这样写:
// 错误:假设跨层原子性
func processTask() {
orchestrator.StartWorkflow() // 成功
agentCore.ExecuteTask() // 可能失败
llmService.Synthesize() // 状态不一致
}
正确做法是用 Temporal 的 Activity 包装:
// 正确:使用 Temporal 保证
workflow.ExecuteActivity(ctx, activities.ExecuteAgent, ...)
// Temporal 自动处理重试和恢复
坑 2:资源泄漏
gRPC 连接不关闭会导致资源泄漏:
// 错误:未关闭连接
conn, _ := grpc.Dial(addr, ...)
// 忘记 defer conn.Close()
// 正确:优雅关闭
defer func() {
grpcServer.GracefulStop()
w.Stop()
dbClient.Close()
}()
坑 3:配置不一致
三层配置独立管理容易出现不一致:
# 错误:超时配置不匹配
# orchestrator: token_budget=10000
# llm-service: max_tokens=20000 # 比 orchestrator 预算还大,会被截断
# 正确:保持一致性
# orchestrator: token_budget=10000
# llm-service: max_tokens=10000 # 与 orchestrator 一致
坑 4:调试困难
问题可能在任意一层。建议:
- 统一使用 Workflow ID 关联日志
- 每层都输出关键指标
- 使用分布式追踪(OpenTelemetry)
这章说了什么
- 三层分工:Orchestrator 编排、Agent Core 隔离、LLM Service 对接 AI
- 语言选择:Go 高并发、Rust 安全、Python 生态
- 层间通信:gRPC + Workflow ID 传播
- 配置管理:热重载 + 环境变量优先
- 代价意识:部署复杂度、调试成本、层间延迟
Shannon Lab(10 分钟上手)
本节帮你在 10 分钟内把本章概念对应到 Shannon 源码。
必读(1 个文件)
docs/multi-agent-workflow-architecture.md:系统全局图,理解 Router/Strategy/Pattern 三层怎么分工
选读深挖(2 个,按兴趣挑)
main.go:看服务启动顺序——健康检查为什么最先、Temporal 连接怎么重试health/manager.go:健康检查管理器,理解 Critical 和 Non-Critical 检查的区别
练习
练习 1:画出请求链路
画一个序列图,展示"用户发送查询 -> 返回结果"的完整调用链路,标注:
- 每个服务的职责
- 层间通信协议
- 可能失败的点
练习 2:超时配置设计
设计一个三层的超时配置:
- 用户最多等待 2 分钟
- 单个工具调用最多 30 秒
- 写出每层应该配置的超时值,解释为什么
练习 3(进阶):降级策略
如果 Agent Core 层不可用,Orchestrator 应该怎么处理?设计一个降级策略:
- 哪些任务可以降级处理
- 降级后的行为是什么
- 怎么通知用户
想深入?
- gRPC 最佳实践:连接池、负载均衡、健康检查
- 微服务模式:Sidecar、Service Mesh、Circuit Breaker
- Temporal 架构:理解持久化执行的原理
下一章预告
三层架构解决了"怎么分工"的问题,但还有一个问题没解决:如果中间崩了怎么办?
搜索花了 30 秒,然后进程崩了——这 30 秒就白费了。
下一章讲 Temporal 工作流:如何让工作流持久化执行,崩溃后从最近检查点恢复,还能"时间旅行"到任意时刻看执行状态。
下一章我们继续。