第 26 章:多租户设计

多租户隔离不是"加个 WHERE 条件"就完事——它要求每一层数据访问都经过租户验证,一个遗漏就是一次数据泄露。


⏱️ 快速通道(5 分钟掌握核心)

  1. 三种隔离模式:行级(便宜)、Schema 级(中等)、实例级(最贵最安全)
  2. 租户 ID 必须从认证 Context 提取,永远不信任用户输入
  3. 向量数据库用命名空间隔离,不同租户数据物理分开
  4. 每层数据访问都要验证租户,中间件只是第一道防线
  5. 隔离测试用专门的跨租户检查用例,CI 必须包含

10 分钟路径:26.1-26.3 → 26.5 → Shannon Lab


先讲个真实的事故

你的 Agent 系统开始服务多个企业客户。Acme Inc 和 TechCorp 都在用,一切看起来很美好。

有一天 Acme 的管理员发现,他们的会话列表里出现了几条奇怪的记录——内容是 TechCorp 的内部项目讨论。

更糟的是,他好奇地试了一下向量搜索,居然能搜到 TechCorp 的技术文档。

事后复盘,原因很简单:有人在写 Session 列表查询的时候,忘了加 WHERE tenant_id = ?

就这一行代码的遗漏,导致了跨租户数据泄露。

这就是多租户设计的挑战:它不是一个功能,它是一个贯穿所有数据访问的系统约束。


26.1 为什么需要多租户?

单租户架构的问题

先看一个"天真"的架构——所有用户共享同一个数据空间:

┌──────────────────────────────────────┐
           共享数据库                  
  ┌──────┐ ┌──────┐ ┌──────┐         
   用户A   用户B   用户C   混在一起 
   数据    数据    数据           
  └──────┘ └──────┘ └──────┘         
└──────────────────────────────────────┘

问题在哪?

问题影响为什么严重
数据泄露风险一个 API 漏洞可能暴露所有用户数据合规要求(GDPR、等保)不允许
无法独立计费难以按组织统计资源使用无法实现 SaaS 商业模式
无法定制配置所有用户共享相同配额限制大客户和免费用户混用资源
合规困难无法按组织隔离删除数据"被遗忘权"无法实现

多租户架构的目标

多租户架构

三个核心保证:

  1. 数据隔离:租户 A 看不到租户 B 的任何数据
  2. 故障隔离:租户 A 的异常不影响租户 B(至少在应用层)
  3. 配额隔离:每个租户独立的资源限制和计费

26.2 多租户实现策略

在讨论具体实现之前,先理解三种常见的多租户策略:

策略说明隔离程度成本适用场景
每租户独立数据库每个租户一个物理数据库最强最高金融、医疗等强合规场景
共享数据库独立 Schema同一数据库,每个租户一个 schema中等中等大型企业客户
共享数据库共享 Schema所有租户共享表,通过 tenant_id 列隔离最弱最低SaaS、中小客户

大多数 SaaS 选择第三种——成本最低,扩展最灵活。但代价是:你必须在每一层数据访问都强制 tenant_id 过滤。

一个遗漏就是一次泄露。


26.3 数据模型设计

核心表结构

以下是一个典型的多租户数据模型。关键点:每个业务表都有 tenant_id 字段

-- 租户表(组织)
CREATE TABLE auth.tenants (
    id UUID PRIMARY KEY,
    name VARCHAR(255),
    slug VARCHAR(100) UNIQUE,           -- 用户可见的标识符
    plan VARCHAR(50),                   -- free, pro, enterprise
    token_limit INTEGER,                -- 月度 Token 配额
    monthly_token_usage INTEGER DEFAULT 0,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 用户表(绑定到租户)
CREATE TABLE auth.users (
    id UUID PRIMARY KEY,
    email VARCHAR(255) UNIQUE,
    username VARCHAR(100) UNIQUE,
    password_hash VARCHAR(255),
    tenant_id UUID REFERENCES auth.tenants(id),  -- 租户绑定
    role VARCHAR(50),                   -- owner, admin, user
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 会话表(带租户隔离)
CREATE TABLE sessions (
    id VARCHAR(255) PRIMARY KEY,
    user_id VARCHAR(255),
    tenant_id UUID,                     -- 租户隔离
    created_at TIMESTAMP,
    deleted_at TIMESTAMP,               -- 软删除
    metadata JSONB
);

-- 任务执行表(带租户隔离)
CREATE TABLE task_executions (
    id UUID PRIMARY KEY,
    workflow_id VARCHAR(255) UNIQUE NOT NULL,
    user_id UUID,
    tenant_id UUID,                     -- 租户隔离
    status VARCHAR(50),
    created_at TIMESTAMP
);

设计原则

  1. tenant_id 是必填字段:业务表的 tenant_id 不能为 NULL
  2. 软删除优于硬删除:用 deleted_at 而不是 DELETE,方便审计和恢复
  3. slug 用于用户可见场景:UUID 是内部用的,slug 是给用户看的

26.4 五层租户隔离

这是多租户设计的核心。光在数据库层加 tenant_id 是不够的——你需要在每一层都验证租户归属。

五层租户隔离

让我逐层解释。

第一层:认证层隔离

用户登录后,tenant_id 嵌入 JWT Token:

实现参考 (Shannon): go/orchestrator/internal/auth/jwt.go - CustomClaims 结构

// JWT 声明中包含 tenant_id
type CustomClaims struct {
    jwt.RegisteredClaims
    TenantID string   `json:"tenant_id"`  // 租户隔离关键
    Username string   `json:"username"`
    Email    string   `json:"email"`
    Role     string   `json:"role"`
    Scopes   []string `json:"scopes"`
}

func (j *JWTManager) ValidateAccessToken(tokenString string) (*UserContext, error) {
    token, err := jwt.ParseWithClaims(tokenString, &CustomClaims{}, func(token *jwt.Token) (interface{}, error) {
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
        }
        return j.signingKey, nil
    })
    // ... 验证后返回 UserContext
    return &UserContext{
        UserID:   userID,
        TenantID: tenantID,  // 从 token 解析出租户
        // ...
    }, nil
}

设计点:tenant_id 在认证时就绑定到上下文,后续所有层都从这里读取,不依赖用户输入。

第二层:会话层隔离

Session 查询必须过滤 tenant_id,而且跨租户访问要静默失败:

实现参考 (Shannon): go/orchestrator/internal/session/manager.go - GetSession 方法

func (m *Manager) GetSession(ctx context.Context, sessionID string) (*Session, error) {
    // ... 从 Redis 加载 session ...

    // 关键:强制租户隔离
    if userCtx, err := authFromContext(ctx); err == nil && userCtx.TenantID != "" {
        if session.TenantID != "" && session.TenantID != userCtx.TenantID {
            // 静默失败,不泄露存在性
            return nil, ErrSessionNotFound
        }
    }

    return &session, nil
}

为什么是 NotFound 而不是 AccessDenied?

如果返回"Access Denied",攻击者就知道这个 session 存在但不属于他。返回"Not Found"则不泄露任何信息——这叫"静默失败"。

第三层:工作流层隔离

Temporal 工作流通过 Memo 传递租户信息:

func (h *TaskHandler) SubmitTask(ctx context.Context, req *SubmitTaskRequest) (*SubmitTaskResponse, error) {
    userCtx, err := auth.GetUserContext(ctx)
    if err != nil {
        return nil, err
    }

    workflowOptions := client.StartWorkflowOptions{
        ID:        workflowID,
        TaskQueue: "shannon-tasks",
        Memo: map[string]interface{}{
            "tenant_id": userCtx.TenantID.String(),  // 存入 Memo
            "user_id":   userCtx.UserID.String(),
        },
    }

    _, err = h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, ...)
    // ...
}

func (h *TaskHandler) GetTaskStatus(ctx context.Context, taskID string) (*TaskStatus, error) {
    userCtx, err := auth.GetUserContext(ctx)
    if err != nil {
        return nil, err
    }

    wf := h.temporalClient.GetWorkflow(ctx, taskID, "")
    desc, _ := wf.Describe(ctx)

    // 从 Memo 检查租户
    var memoTenantID string
    if desc.WorkflowExecutionInfo.Memo != nil {
        if tid, ok := desc.WorkflowExecutionInfo.Memo.Fields["tenant_id"]; ok {
            memoTenantID = tid.GetStringValue()
        }
    }

    // 验证租户所有权
    if memoTenantID != userCtx.TenantID.String() {
        return nil, ErrTaskNotFound  // 静默失败
    }
    // ...
}

为什么用 Memo?

Temporal Workflow 有自己的存储,不在你的数据库里。Memo 是 Workflow 的元数据字段,专门用来存这类业务上下文。

第四层:向量存储层隔离

向量搜索必须加 tenant 过滤器,否则用户能搜到别人的私有文档:

实现参考 (Shannon): go/orchestrator/internal/vectordb/search.go - FindSimilarQueries 方法

func (c *Client) FindSimilarQueries(ctx context.Context, embedding []float32, limit int) ([]SimilarQuery, error) {
    // 从上下文提取 tenant_id
    var filter map[string]interface{}
    if userCtx, ok := ctx.Value(auth.UserContextKey).(*auth.UserContext); ok &&
       userCtx.TenantID.String() != "00000000-0000-0000-0000-000000000000" {
        filter = map[string]interface{}{
            "must": []map[string]interface{}{
                {"key": "tenant_id", "match": map[string]interface{}{"value": userCtx.TenantID.String()}},
            },
        }
    }

    // 搜索时强制带 filter
    pts, err := c.search(ctx, c.cfg.TaskEmbeddings, embedding, limit, c.cfg.Threshold, filter)
    // ...
}

这是最容易遗漏的一层。 因为向量搜索通常是"相似度匹配",开发者容易忘记还需要"权限过滤"。

第五层:数据库层隔离

所有 SQL 查询都带 tenant_id 条件:

func (r *TaskRepository) GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error) {
    userCtx, err := auth.GetUserContext(ctx)
    if err != nil {
        return nil, err
    }

    var task Task
    err = r.db.QueryRowContext(ctx, `
        SELECT id, workflow_id, user_id, tenant_id, status, created_at
        FROM task_executions
        WHERE id = $1 AND tenant_id = $2  -- 强制 tenant 过滤
    `, taskID, userCtx.TenantID).Scan(...)

    if err == sql.ErrNoRows {
        return nil, ErrTaskNotFound
    }
    return &task, err
}

26.5 认证中间件设计

多租户隔离的第一道防线是认证中间件——它负责从请求中提取用户身份,并注入 tenant_id 到上下文。

HTTP 中间件

实现参考 (Shannon): go/orchestrator/internal/auth/middleware.go - HTTPMiddleware 方法

func (m *Middleware) HTTPMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // 开发模式跳过认证
        if m.skipAuth {
            ctx := context.WithValue(r.Context(), UserContextKey, &UserContext{
                UserID:   uuid.MustParse("00000000-0000-0000-0000-000000000002"),
                TenantID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
                Username: "dev",
                Role:     RoleOwner,
            })
            next.ServeHTTP(w, r.WithContext(ctx))
            return
        }

        // 尝试 API Key
        apiKey := r.Header.Get("X-API-Key")
        if apiKey != "" {
            userCtx, err := m.authService.ValidateAPIKey(r.Context(), apiKey)
            if err != nil {
                http.Error(w, "Invalid API key", http.StatusUnauthorized)
                return
            }
            ctx := context.WithValue(r.Context(), UserContextKey, userCtx)
            next.ServeHTTP(w, r.WithContext(ctx))
            return
        }

        // 尝试 Bearer Token
        authHeader := r.Header.Get("Authorization")
        if authHeader == "" {
            http.Error(w, `{"error":"API key is required"}`, http.StatusUnauthorized)
            return
        }

        token, err := ExtractBearerToken(authHeader)
        if err != nil {
            http.Error(w, "Invalid authorization header", http.StatusUnauthorized)
            return
        }

        userCtx, err := m.jwtManager.ValidateAccessToken(token)
        if err != nil {
            http.Error(w, "Invalid token", http.StatusUnauthorized)
            return
        }

        // 注入用户上下文(包含 TenantID)
        ctx := context.WithValue(r.Context(), UserContextKey, userCtx)
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

gRPC 拦截器

gRPC 服务也需要类似的认证拦截器:

实现参考 (Shannon): go/orchestrator/internal/auth/middleware.go - UnaryServerInterceptor 方法

func (m *Middleware) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // 健康检查跳过认证
        if strings.HasSuffix(info.FullMethod, "/Health") {
            return handler(ctx, req)
        }

        // 开发模式可以通过 metadata 传递用户信息
        if m.skipAuth {
            userID := uuid.MustParse("00000000-0000-0000-0000-000000000002")
            tenantID := uuid.MustParse("00000000-0000-0000-0000-000000000001")

            if md, ok := metadata.FromIncomingContext(ctx); ok {
                if vals := md.Get("x-user-id"); len(vals) > 0 {
                    if parsed, err := uuid.Parse(vals[0]); err == nil {
                        userID = parsed
                    }
                }
                if vals := md.Get("x-tenant-id"); len(vals) > 0 {
                    if parsed, err := uuid.Parse(vals[0]); err == nil {
                        tenantID = parsed
                    }
                }
            }

            ctx = context.WithValue(ctx, UserContextKey, &UserContext{
                UserID:   userID,
                TenantID: tenantID,
                Username: "dev",
                Role:     RoleOwner,
            })
            return handler(ctx, req)
        }

        // 生产环境:验证 API Key 或 JWT
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            return nil, status.Error(codes.Unauthenticated, "missing metadata")
        }

        var userCtx *UserContext
        if apiKeys := md.Get("x-api-key"); len(apiKeys) > 0 {
            var err error
            userCtx, err = m.authService.ValidateAPIKey(ctx, apiKeys[0])
            if err != nil {
                return nil, status.Error(codes.Unauthenticated, "invalid API key")
            }
        }

        if userCtx == nil {
            return nil, status.Error(codes.Unauthenticated, "missing authentication")
        }

        ctx = context.WithValue(ctx, UserContextKey, userCtx)
        return handler(ctx, req)
    }
}

26.6 租户配额管理

不同租户有不同的资源配额——这是 SaaS 定价的基础。

配额结构

实现参考 (Shannon): go/orchestrator/internal/auth/types.go - Tenant 结构

type Tenant struct {
    ID                uuid.UUID  `json:"id" db:"id"`
    Name              string     `json:"name" db:"name"`
    Slug              string     `json:"slug" db:"slug"`
    Plan              string     `json:"plan" db:"plan"` // free, pro, enterprise
    TokenLimit        int        `json:"token_limit" db:"token_limit"`
    MonthlyTokenUsage int        `json:"monthly_token_usage" db:"monthly_token_usage"`
    DailyTokenLimit   *int       `json:"daily_token_limit,omitempty" db:"daily_token_limit"`
    DailyTokenUsage   int        `json:"daily_token_usage" db:"daily_token_usage"`
    RateLimitPerHour  int        `json:"rate_limit_per_hour" db:"rate_limit_per_hour"`
    IsActive          bool       `json:"is_active" db:"is_active"`
    // ...
}

按 Plan 差异化配额

type TenantQuotas struct {
    MonthlyTokenLimit   int `json:"monthly_token_limit"`
    RateLimitPerMinute  int `json:"rate_limit_per_minute"`
    RateLimitPerHour    int `json:"rate_limit_per_hour"`
    MaxSessions         int `json:"max_sessions"`
    MaxVectorDocs       int `json:"max_vector_docs"`
}

var PlanQuotas = map[string]TenantQuotas{
    "free": {
        MonthlyTokenLimit:  100000,    // 10万 tokens/月
        RateLimitPerMinute: 20,
        RateLimitPerHour:   500,
        MaxSessions:        10,
        MaxVectorDocs:      1000,
    },
    "pro": {
        MonthlyTokenLimit:  1000000,   // 100万 tokens/月
        RateLimitPerMinute: 60,
        RateLimitPerHour:   2000,
        MaxSessions:        100,
        MaxVectorDocs:      50000,
    },
    "enterprise": {
        MonthlyTokenLimit:  -1,        // 无限制(按用量计费)
        RateLimitPerMinute: 300,
        RateLimitPerHour:   10000,
        MaxSessions:        -1,
        MaxVectorDocs:      -1,
    },
}

配额检查

func (s *QuotaService) CheckQuota(ctx context.Context, quotaType string, amount int) error {
    userCtx, err := auth.GetUserContext(ctx)
    if err != nil {
        return err
    }

    tenant, err := s.tenantRepo.GetTenant(ctx, userCtx.TenantID)
    if err != nil {
        return err
    }

    quotas := PlanQuotas[tenant.Plan]

    switch quotaType {
    case "tokens":
        if quotas.MonthlyTokenLimit > 0 {  // -1 表示无限制
            usage, _ := s.usageRepo.GetMonthlyTokens(ctx, tenant.ID)
            if usage+amount > quotas.MonthlyTokenLimit {
                return ErrQuotaExceeded
            }
        }
    case "sessions":
        if quotas.MaxSessions > 0 {
            count, _ := s.sessionRepo.CountSessions(ctx, tenant.ID)
            if count >= quotas.MaxSessions {
                return ErrQuotaExceeded
            }
        }
    }

    return nil
}

26.7 API Key 管理

API Key 是多租户系统的重要认证方式——它绑定到特定用户和租户。

安全存储

API Key 只存哈希,不存原文:

实现参考 (Shannon): go/orchestrator/internal/auth/types.go - APIKey 结构

type APIKey struct {
    ID               uuid.UUID      `json:"id" db:"id"`
    KeyHash          string         `json:"-" db:"key_hash"`      // 只存哈希
    KeyPrefix        string         `json:"key_prefix" db:"key_prefix"` // sk_live_
    UserID           uuid.UUID      `json:"user_id" db:"user_id"`
    TenantID         uuid.UUID      `json:"tenant_id" db:"tenant_id"`
    Name             string         `json:"name" db:"name"`
    Scopes           pq.StringArray `json:"scopes" db:"scopes"`
    RateLimitPerHour int            `json:"rate_limit_per_hour" db:"rate_limit_per_hour"`
    LastUsed         *time.Time     `json:"last_used,omitempty" db:"last_used"`
    ExpiresAt        *time.Time     `json:"expires_at,omitempty" db:"expires_at"`
    IsActive         bool           `json:"is_active" db:"is_active"`
    // ...
}

验证 API Key

func (s *AuthService) ValidateAPIKey(ctx context.Context, key string) (*UserContext, error) {
    // 标准化格式:sk-shannon-xxx → sk_xxx
    if strings.HasPrefix(key, "sk-shannon-") {
        key = "sk_" + strings.TrimPrefix(key, "sk-shannon-")
    }

    // 哈希查找
    keyHash := sha256.Sum256([]byte(key))
    hashStr := hex.EncodeToString(keyHash[:])

    apiKey, err := s.apiKeyRepo.GetByHash(ctx, hashStr)
    if err != nil {
        return nil, ErrInvalidAPIKey
    }

    if !apiKey.IsActive {
        return nil, ErrAPIKeyRevoked
    }

    // 检查过期
    if apiKey.ExpiresAt != nil && apiKey.ExpiresAt.Before(time.Now()) {
        return nil, ErrAPIKeyExpired
    }

    // 更新最后使用时间(异步,不阻塞请求)
    go func() {
        apiKey.LastUsed = &time.Time{}
        *apiKey.LastUsed = time.Now()
        s.apiKeyRepo.Update(context.Background(), apiKey)
    }()

    // 获取用户信息,构建 UserContext
    user, err := s.userRepo.GetByID(ctx, apiKey.UserID)
    if err != nil {
        return nil, err
    }

    return &UserContext{
        UserID:   user.ID,
        TenantID: user.TenantID,  // 租户来自用户,不来自 Key
        Username: user.Username,
        Email:    user.Email,
        Role:     user.Role,
        Scopes:   apiKey.Scopes,
        IsAPIKey: true,
        APIKeyID: apiKey.ID,
    }, nil
}

26.8 常见的坑

我见过太多多租户实现在这些地方翻车:

坑 1:遗漏 tenant_id 过滤

这是最常见的问题。代码 review 时要特别注意没有 tenant_id 的查询:

// 错误:忘记添加 tenant_id 过滤
rows, _ := db.Query("SELECT * FROM sessions WHERE user_id = $1", userID)

// 正确:始终过滤 tenant_id
rows, _ := db.Query(`
    SELECT * FROM sessions
    WHERE user_id = $1 AND tenant_id = $2
`, userID, tenantID)

防护措施:可以考虑在 SQL 执行层加一个检测,警告没有 tenant_id 条件的查询。

坑 2:向量存储未隔离

向量搜索是最容易遗漏的,因为它不像 SQL 那样有 WHERE 语法:

// 错误:查询时不过滤租户
searchResult, _ := v.client.Search(ctx, &qdrant.SearchPoints{
    CollectionName: "memory",
    Vector:         embedding,
    Limit:          10,
    // 缺少 Filter!
})

// 正确:添加租户过滤器
filter := &qdrant.Filter{
    Must: []*qdrant.Condition{{
        ConditionOneOf: &qdrant.Condition_Field{
            Field: &qdrant.FieldCondition{
                Key: "tenant_id",
                Match: &qdrant.Match{
                    MatchValue: &qdrant.Match_Keyword{Keyword: tenantID},
                },
            },
        },
    }},
}

searchResult, _ := v.client.Search(ctx, &qdrant.SearchPoints{
    CollectionName: "memory",
    Vector:         embedding,
    Limit:          10,
    Filter:         filter,  // 强制租户过滤
})

坑 3:Temporal Memo 遗漏

Workflow 数据不在你的数据库里,所以 tenant_id 要通过 Memo 传递:

// 错误:不在 Memo 中存储租户
workflowOptions := client.StartWorkflowOptions{
    ID: workflowID,
    // Memo 为空!
}

// 正确:存储租户 ID
workflowOptions := client.StartWorkflowOptions{
    ID: workflowID,
    Memo: map[string]interface{}{
        "tenant_id": tenantID,
        "user_id":   userID,
    },
}

坑 4:开发模式泄露到生产

开发时为了方便,经常跳过认证。但这个配置绝对不能带到生产:

# 错误:生产环境启用 skip_auth
auth:
  skip_auth: true  # 危险!

# 正确:生产环境必须关闭
auth:
  enabled: true
  skip_auth: false

防护措施:在生产环境启动时检测 skip_auth,如果为 true 就报错退出。

坑 5:返回 AccessDenied 而不是 NotFound

跨租户访问不应该返回"无权限",而应该返回"不存在":

// 错误:泄露存在性
if task.TenantID != userCtx.TenantID {
    return nil, ErrAccessDenied  // 攻击者知道资源存在
}

// 正确:静默失败
if task.TenantID != userCtx.TenantID {
    return nil, ErrTaskNotFound  // 不泄露存在性
}

26.9 安全最佳实践

实践说明为什么重要
JWT Secret 长度至少 32 字符防止暴力破解
Token 过期时间Access 30 分钟,Refresh 7 天平衡安全与体验
API Key 存储只存储哈希,不存原文数据库泄露也不暴露 Key
密码哈希使用 bcrypt防止彩虹表攻击
HTTPS 强制生产环境必须防止中间人攻击
跨租户访问静默返回 NotFound不泄露数据存在性
审计日志记录所有访问尝试合规与事故排查

审计日志示例

func (a *AuditLogger) LogAccess(ctx context.Context, event AuditEvent) {
    userCtx, _ := auth.GetUserContext(ctx)

    a.logger.Info("access_audit",
        zap.String("user_id", userCtx.UserID.String()),
        zap.String("tenant_id", userCtx.TenantID.String()),
        zap.String("action", event.Action),
        zap.String("resource", event.Resource),
        zap.String("resource_id", event.ResourceID),
        zap.Bool("allowed", event.Allowed),
        zap.String("ip_address", event.IPAddress),
        zap.Time("timestamp", time.Now()),
    )
}

26.10 框架对比

特性ShannonLangGraphDifyFlowise
JWT 认证原生支持原生支持原生支持原生支持
API Keys原生支持原生支持原生支持原生支持
多租户隔离五层完整隔离部分支持完整支持仅 UI 层
会话隔离Redis + tenant_id支持支持不支持
向量隔离Qdrant Filter不支持部分支持不支持
工作流隔离Temporal Memo支持支持不适用
审计日志完整支持部分支持完整支持不支持

这章讲完了

核心就一句话:多租户隔离要做到五层——认证、会话、工作流、向量存储、数据库——每一层都必须验证 tenant_id,一个遗漏就可能导致数据泄露。


回顾

  1. 五层隔离:认证/会话/工作流/向量/数据库,每层都要验证 tenant_id
  2. 静默失败:跨租户访问返回 NotFound,不返回 AccessDenied
  3. JWT 绑定:tenant_id 在认证时注入上下文,后续所有层从这里读取
  4. 向量过滤:向量搜索必须带 tenant 过滤器,这是最容易遗漏的层
  5. 配额管理:按 Plan 差异化配额,实现 SaaS 定价模型

Shannon Lab(10 分钟上手)

本节帮你在 10 分钟内把本章概念对应到 Shannon 源码。

必读(1 个文件)

  • go/orchestrator/internal/auth/middleware.go:看 HTTPMiddleware 和 UnaryServerInterceptor 函数,理解 tenant_id 怎么从 Token 提取并注入到 Context

选读深挖(2 个,按兴趣挑)

  • go/orchestrator/internal/session/manager.go:找 GetSession 方法中的租户检查逻辑,理解为什么返回 ErrSessionNotFound 而不是 ErrAccessDenied
  • go/orchestrator/internal/vectordb/search.go:找 FindSimilarQueries 方法中的 filter 构建,理解向量搜索怎么实现租户隔离

练习

练习 1:代码审计

review 以下代码,找出多租户隔离的问题:

func (r *DocumentRepo) GetDocument(ctx context.Context, docID string) (*Document, error) {
    var doc Document
    err := r.db.QueryRowContext(ctx, `
        SELECT id, title, content, user_id
        FROM documents
        WHERE id = $1
    `, docID).Scan(&doc.ID, &doc.Title, &doc.Content, &doc.UserID)
    return &doc, err
}

练习 2:设计向量隔离

设计一个向量搜索的隔离方案:

  • 每个租户的向量存在哪里?
  • 查询时怎么过滤?
  • 如果要支持"跨租户公共知识库"怎么设计?

练习 3(进阶):PostgreSQL RLS

了解 PostgreSQL 的 Row-Level Security (RLS) 特性:

  • 它能在数据库层强制租户隔离吗?
  • 和应用层过滤相比,优缺点是什么?
  • Shannon 为什么选择应用层过滤而不是 RLS?

进一步阅读


Part 8 完结

Part 8 企业级特性到此完结。我们讲了:

  • Token 预算控制(第 23 章)
  • OPA 策略治理(第 24 章)
  • WASI 安全沙箱(第 25 章)
  • 多租户设计(第 26 章)

这四章解决的是"Agent 怎么在企业环境安全运行"的问题。

下一部分 Part 9,我们进入前沿实践——Computer Use、Agentic Coding、Hooks 系统、Plugin 架构。这些是 2025-2026 年 Agent 领域最热的方向。

引用本文 / Cite
Zhang, W. (2026). 第 26 章:多租户设计. In AI Agent 架构:从单体到企业级多智能体. https://waylandz.com/ai-agent-book/第26章-多租户设计
@incollection{zhang2026aiagent_第26章_多租户设计,
  author = {Zhang, Wayland},
  title = {第 26 章:多租户设计},
  booktitle = {AI Agent 架构:从单体到企业级多智能体},
  year = {2026},
  url = {https://waylandz.com/ai-agent-book/第26章-多租户设计}
}