第 20 章:三层架构设计

三层架构不是为了炫技,而是为了让每种语言做它最擅长的事——Go 编排、Rust 隔离、Python 对接 AI 生态。 但它也有代价:部署复杂度、调试成本、层间延迟。选之前想清楚你的规模是否需要。


你用 Python 写了一个 Agent。单进程,asyncio 并发,工具直接在进程里执行。

跑了一周,挺顺的。然后用户量上来了,问题开始冒出来:

  1. 有个用户提交了一段恶意 Python 代码,你的 Agent 老老实实执行了,结果把服务器的 /etc/passwd 读出来返回给他了
  2. 10 个请求同时来,GIL 卡得死死的,响应时间从 2 秒变成 20 秒
  3. 半夜进程崩了,正在执行的 5 个研究任务全丢了,第二天用户投诉
  4. 一个工具调用吃光了 8GB 内存,整个服务挂掉,连日志都来不及写

这些问题不是"代码写得不好",而是单体架构的天花板

三层架构就是用来突破这个天花板的——把编排、安全执行、LLM 调用分离到不同服务,让每种语言发挥最大优势。


20.1 为什么要分层?

单体架构的局限

我见过很多 Agent 框架都是这个结构:

┌───────────────────────────────────────┐
  Python Monolith                      
  ┌─────────────────────────────────┐  
   Orchestration (asyncio)           
   Agent Execution (同进程)           
   Tool Calling (直接执行)            
   LLM API (requests)                
  └─────────────────────────────────┘  
└───────────────────────────────────────┘

这个架构开发快、部署简单、调试方便。对于原型和小规模使用,完全够用。

但它有几个绕不过去的问题:

问题根因后果
安全边界模糊工具代码与主进程共享内存空间恶意代码可以访问任何数据
并发受限Python GIL 限制真正的并行10 个请求就能把服务卡死
状态易丢失内存中的状态没有持久化进程崩溃 = 任务全丢
资源隔离差所有工具共享同一个进程资源一个工具 OOM,整个服务挂

如果你的 Agent 只是自己用,或者用户可控,这些问题可能不会暴露。

但如果你要做一个面向外部用户的生产系统,这些问题迟早会变成事故。

三层架构怎么解决

三层架构把这些职责分开:

生产级三层架构

为什么是这三种语言?

这不是随便选的。每种语言在它的位置上有独特优势:

层级语言为什么选它如果换成 Python 会怎样
OrchestratorGo高并发、Temporal 原生支持、编译型语言不易出运行时错误asyncio 能做,但 Temporal SDK 不如 Go 成熟
Agent CoreRust内存安全、WASI 沙箱支持、零成本抽象无法提供同等级别的安全隔离
LLM ServicePythonLLM SDK 生态最丰富、AI 库最全、迭代最快本来就是 Python,生态优势明显

但我必须说:三层架构不是银弹。

它的代价是:

  • 部署复杂度上升(至少 3 个服务 + Temporal + 数据库)
  • 调试困难(问题可能在任意一层)
  • 层间通信有延迟开销

如果你的用户量不大、安全要求不高、可以接受偶尔的服务重启,单体 Python 可能更适合你。


20.2 Orchestrator 层 (Go)

Orchestrator 是系统的"大脑"——它不执行具体任务,而是决定谁来执行、按什么顺序执行、结果怎么综合

核心职责

职责说明关键实现
工作流编排基于 Temporal 的持久化执行崩溃自动恢复
预算控制Token 预算、成本追踪超预算自动停止
路由决策选择执行策略(ReAct/DAG/Supervisor)基于复杂度评分
结果综合合并多 Agent 结果LLM 辅助 + 规则综合

入口程序结构

Shannon 的 Orchestrator 入口在 main.go。我挑几个关键部分讲:

1. 健康检查最先启动

// 健康检查先于其他组件启动
hm := health.NewManager(logger)
healthHandler := health.NewHTTPHandler(hm, logger)
healthHandler.RegisterRoutes(httpMux)

go func() {
    _ = hm.Start(ctx)
    server := &http.Server{
        Addr:    ":" + strconv.Itoa(healthPort),
        Handler: httpMux,
    }
    logger.Info("Admin HTTP server listening", zap.Int("port", healthPort))
    server.ListenAndServe()
}()

为什么要先启动健康检查?因为 Kubernetes 的 readiness probe 会在服务启动后立刻开始检查。如果健康端点还没起来,Pod 会被判定为不健康然后被杀掉,形成无限重启循环。

2. 数据库连接和健康检查

dbClient, err := db.NewClient(dbConfig, logger)
if err != nil {
    logger.Fatal("Failed to initialize database client", zap.Error(err))
}
defer dbClient.Close()

// 注册数据库健康检查
if dbClient != nil {
    dbChecker := health.NewDatabaseHealthChecker(dbClient.GetDB(), dbClient.Wrapper(), logger)
    _ = hm.RegisterChecker(dbChecker)
}

3. Temporal Worker 启动(带重试)

// TCP 预检查
for i := 1; i <= 60; i++ {
    c, err := net.DialTimeout("tcp", host, 2*time.Second)
    if err == nil {
        _ = c.Close()
        break
    }
    logger.Warn("Waiting for Temporal TCP endpoint", zap.String("host", host), zap.Int("attempt", i))
    time.Sleep(1 * time.Second)
}

// SDK 连接重试
var tClient client.Client
for attempt := 1; ; attempt++ {
    tClient, err = client.Dial(client.Options{
        HostPort: host,
        Logger:   temporal.NewZapAdapter(logger),
    })
    if err == nil {
        break
    }
    delay := time.Duration(min(attempt, 15)) * time.Second
    logger.Warn("Temporal not ready, retrying", zap.Int("attempt", attempt), zap.Duration("sleep", delay))
    time.Sleep(delay)
}

为什么要两层检查?TCP 检查快(2 秒超时),可以快速判断 Temporal 服务是否可达。SDK 连接更重,失败后用指数退避重试。

4. 工作流和活动注册

orchestratorRegistry := registry.NewOrchestratorRegistry(registryConfig, logger, dbClient.GetDB(), sessionManager)

startWorker := func(queue string, actSize, wfSize int) worker.Worker {
    wk := worker.New(tClient, queue, worker.Options{
        MaxConcurrentActivityExecutionSize:     actSize,
        MaxConcurrentWorkflowTaskExecutionSize: wfSize,
    })
    if err := orchestratorRegistry.RegisterWorkflows(wk); err != nil {
        logger.Error("Failed to register workflows", zap.String("queue", queue), zap.Error(err))
    }
    if err := orchestratorRegistry.RegisterActivities(wk); err != nil {
        logger.Error("Failed to register activities", zap.String("queue", queue), zap.Error(err))
    }
    go wk.Run(worker.InterruptCh())
    return wk
}

优先级队列

Shannon 支持多队列模式,不同优先级的任务走不同队列:

if priorityQueues {
    _ = startWorker("shannon-tasks-critical", 12, 12)  // 关键任务,高并发
    _ = startWorker("shannon-tasks-high", 10, 10)
    w = startWorker("shannon-tasks", 8, 8)              // 普通任务
    _ = startWorker("shannon-tasks-low", 4, 4)          // 低优先级
} else {
    w = startWorker("shannon-tasks", 10, 10)            // 单队列模式
}

优先级队列的典型用途:

  • Critical:用户正在等待的实时请求
  • High:重要但可以稍等的任务
  • Normal:常规后台任务
  • Low:报告生成、数据清理等

20.3 Agent Core 层 (Rust)

Agent Core 是系统的"保镖"——它负责在受控环境中执行可能不安全的操作。

核心职责

职责说明实现方式
沙箱执行隔离运行用户代码WASI 沙箱
资源限制内存、CPU、网络cgroups + WASI 能力限制
超时控制强制终止长时任务系统级超时
工具运行安全的工具调用白名单 + 参数校验

为什么是 Rust?

Python 可以做资源限制吗?可以,但很难做到 Rust 的级别:

能力PythonRust
内存安全运行时检查编译时保证
WASI 沙箱需要外部进程原生集成(wasmtime)
资源隔离进程级别线程级别
性能开销极低

如果你的安全要求不高,用 Python 的 subprocess + ulimit 也能做基本隔离。但如果你要面向外部用户,Rust 的安全保证更可靠。

gRPC 服务定义

Agent Core 通过 gRPC 暴露服务:

service AgentService {
  rpc ExecuteTask(ExecuteTaskRequest) returns (ExecuteTaskResponse);
  rpc StreamExecuteTask(ExecuteTaskRequest) returns (stream TaskUpdate);
  rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
  rpc DiscoverTools(DiscoverToolsRequest) returns (DiscoverToolsResponse);
}

message ExecuteTaskRequest {
  TaskMetadata metadata = 1;
  string query = 2;
  google.protobuf.Struct context = 3;
  ExecutionMode mode = 4;
  repeated string available_tools = 5;
  AgentConfig config = 6;
}

message AgentConfig {
  int32 max_iterations = 1;      // 最大迭代次数
  int32 timeout_seconds = 2;     // 超时时间
  bool enable_sandbox = 3;       // 启用沙箱
  int64 memory_limit_mb = 4;     // 内存限制
}

工具能力描述

每个工具都有详细的能力描述:

message ToolCapability {
  string id = 1;
  string name = 2;
  string description = 3;
  string category = 4;
  google.protobuf.Struct input_schema = 5;   // JSON Schema
  google.protobuf.Struct output_schema = 6;
  repeated string required_permissions = 7;  // 需要的权限
  int64 estimated_duration_ms = 8;           // 预估耗时
  bool is_dangerous = 9;                     // 危险标记
  RateLimit rate_limit = 14;                 // 限流配置
}

is_dangerous 标记用于触发额外的审批流程或沙箱隔离。比如 code_executionfile_system 工具会被标记为危险。


20.4 LLM Service 层 (Python)

LLM Service 是系统的"嘴"——它负责和各种 AI 模型对话。

核心职责

职责说明实现方式
多 Provider 调用OpenAI、Anthropic、Google 等Provider 抽象层
工具选择基于查询选择合适工具语义匹配 + 规则
向量存储Embedding 生成和检索Qdrant + 缓存
流式响应Token 级别的流式输出SSE/WebSocket

为什么是 Python?

AI 生态几乎都是 Python 优先:

  • OpenAI SDK: Python 版本最先更新
  • Anthropic SDK: Python 版本功能最全
  • LangChain/LlamaIndex: Python 原生
  • 向量数据库客户端: Python 支持最好

用 Go 或 Rust 调用 LLM 当然可以,但你会花大量时间在 SDK 适配上,而不是业务逻辑。

关键端点

# /agent/query - 主查询端点
@app.post("/agent/query")
async def query(request: QueryRequest):
    response = await llm_client.query(
        query=request.query,
        context=request.context,
        tools=request.allowed_tools,
        model_tier=request.model_tier,
        max_tokens=request.max_tokens,
    )
    return {
        "success": True,
        "response": response.content,
        "tokens_used": response.usage.total_tokens,
        "model_used": response.model,
    }

# /embeddings - 向量生成
@app.post("/embeddings")
async def embeddings(request: EmbeddingRequest):
    vectors = await embedding_service.generate(
        texts=request.texts,
        model=request.model or "text-embedding-3-small",
    )
    return {"embeddings": vectors}

20.5 层间通信

三层之间通过 gRPC 和 HTTP 通信。这里有几个关键设计:

Workflow ID 传播

跨服务调用时,自动注入 Workflow ID 用于追踪:

// HTTP 请求自动注入 Workflow ID
type WorkflowHTTPRoundTripper struct {
    base http.RoundTripper
}

func (w *WorkflowHTTPRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    info := activity.GetInfo(req.Context())
    if info.WorkflowExecution.ID != "" {
        req.Header.Set("X-Workflow-ID", info.WorkflowExecution.ID)
        req.Header.Set("X-Run-ID", info.WorkflowExecution.RunID)
    }
    return w.base.RoundTrip(req)
}

这样做的好处:

  • 分布式追踪:日志可以按 Workflow ID 聚合
  • 资源归属:Token 消耗可以归属到具体任务
  • 问题定位:出问题时能看到完整调用链

超时层级

层间超时必须由外向内递减

# 正确的超时配置
orchestrator: timeout=120s  # 最外层最长
agent-core: timeout=60s
llm-service: timeout=30s    # 最内层最短

# 错误的配置 - 会导致意外超时
# orchestrator: timeout=60s
# agent-core: timeout=30s  # 可能在 orchestrator 等待时超时

如果内层超时比外层长,外层会先超时,内层的工作就白费了。


20.6 配置管理

热重载配置

Shannon 支持配置热重载,不需要重启服务:

shannonCfgMgr.RegisterCallback(func(oldConfig, newConfig *cfg.ShannonConfig) error {
    // 更新健康检查配置
    newHealthConfig := &health.HealthConfiguration{
        Enabled:       newConfig.Health.Enabled,
        CheckInterval: newConfig.Health.CheckInterval,
        GlobalTimeout: newConfig.Health.Timeout,
    }
    hm.UpdateConfiguration(newHealthConfig)

    // 策略引擎变更则重新初始化
    if policyChanged(oldConfig, newConfig) {
        activities.InitializePolicyEngineFromShannonConfig(&newConfig.Policy)
    }

    return nil
})

// 模型定价热重载
configMgr.RegisterHandler("models.yaml", func(ev cfg.ChangeEvent) error {
    pricing.Reload()
    logger.Info("Pricing configuration reloaded")
    return nil
})

环境变量优先级

生产环境中,环境变量应该覆盖配置文件:

// 环境变量 > 配置文件
jwtSecret := shCfgForAuth.Auth.JWTSecret
if envSecret := os.Getenv("JWT_SECRET"); envSecret != "" {
    jwtSecret = envSecret  // 环境变量覆盖
}

这样可以在 Kubernetes 中通过 Secret 注入敏感配置,而不用把密钥写进配置文件。


20.7 请求流转示例

一个查询如何在三层间流转:

用户请求: "分析某 AI 公司"
          
[Orchestrator (Go)]
  1. gRPC 接收请求
  2. 创建 Temporal Workflow
  3. 路由选择: ResearchWorkflow
  4. 分解任务: 公司概况、产品、融资...
           gRPC
[Agent Core (Rust)]
  5. 沙箱执行搜索工具
  6. 资源限制: 1GB 内存, 30s 超时
  7. 返回搜索结果
           HTTP
[LLM Service (Python)]
  8. 调用 LLM API 分析结果
  9. 生成结构化摘要
  10. 向量化存储到 Qdrant
          
[Orchestrator (Go)]
  11. 综合多 Agent 结果
  12. 评估覆盖率
  13. 返回最终报告

20.8 常见的坑

坑 1:跨层事务一致性

三层架构没有跨服务的事务保证。如果你这样写:

// 错误:假设跨层原子性
func processTask() {
    orchestrator.StartWorkflow()  // 成功
    agentCore.ExecuteTask()       // 可能失败
    llmService.Synthesize()       // 状态不一致
}

正确做法是用 Temporal 的 Activity 包装:

// 正确:使用 Temporal 保证
workflow.ExecuteActivity(ctx, activities.ExecuteAgent, ...)
// Temporal 自动处理重试和恢复

坑 2:资源泄漏

gRPC 连接不关闭会导致资源泄漏:

// 错误:未关闭连接
conn, _ := grpc.Dial(addr, ...)
// 忘记 defer conn.Close()

// 正确:优雅关闭
defer func() {
    grpcServer.GracefulStop()
    w.Stop()
    dbClient.Close()
}()

坑 3:配置不一致

三层配置独立管理容易出现不一致:

# 错误:超时配置不匹配
# orchestrator: token_budget=10000
# llm-service: max_tokens=20000  #  orchestrator 预算还大,会被截断

# 正确:保持一致性
# orchestrator: token_budget=10000
# llm-service: max_tokens=10000  #  orchestrator 一致

坑 4:调试困难

问题可能在任意一层。建议:

  • 统一使用 Workflow ID 关联日志
  • 每层都输出关键指标
  • 使用分布式追踪(OpenTelemetry)

这章说了什么

  1. 三层分工:Orchestrator 编排、Agent Core 隔离、LLM Service 对接 AI
  2. 语言选择:Go 高并发、Rust 安全、Python 生态
  3. 层间通信:gRPC + Workflow ID 传播
  4. 配置管理:热重载 + 环境变量优先
  5. 代价意识:部署复杂度、调试成本、层间延迟

Shannon Lab(10 分钟上手)

本节帮你在 10 分钟内把本章概念对应到 Shannon 源码。

必读(1 个文件)

选读深挖(2 个,按兴趣挑)

  • main.go:看服务启动顺序——健康检查为什么最先、Temporal 连接怎么重试
  • health/manager.go:健康检查管理器,理解 Critical 和 Non-Critical 检查的区别

练习

练习 1:画出请求链路

画一个序列图,展示"用户发送查询 -> 返回结果"的完整调用链路,标注:

  • 每个服务的职责
  • 层间通信协议
  • 可能失败的点

练习 2:超时配置设计

设计一个三层的超时配置:

  • 用户最多等待 2 分钟
  • 单个工具调用最多 30 秒
  • 写出每层应该配置的超时值,解释为什么

练习 3(进阶):降级策略

如果 Agent Core 层不可用,Orchestrator 应该怎么处理?设计一个降级策略:

  • 哪些任务可以降级处理
  • 降级后的行为是什么
  • 怎么通知用户

想深入?


下一章预告

三层架构解决了"怎么分工"的问题,但还有一个问题没解决:如果中间崩了怎么办?

搜索花了 30 秒,然后进程崩了——这 30 秒就白费了。

下一章讲 Temporal 工作流:如何让工作流持久化执行,崩溃后从最近检查点恢复,还能"时间旅行"到任意时刻看执行状态。

下一章我们继续。

引用本文 / Cite
Zhang, W. (2026). 第 20 章:三层架构设计. In AI Agent 架构:从单体到企业级多智能体. https://waylandz.com/ai-agent-book/第20章-三层架构设计
@incollection{zhang2026aiagent_第20章_三层架构设计,
  author = {Zhang, Wayland},
  title = {第 20 章:三层架构设计},
  booktitle = {AI Agent 架构:从单体到企业级多智能体},
  year = {2026},
  url = {https://waylandz.com/ai-agent-book/第20章-三层架构设计}
}