
Core LLM Gateway Design

Version: v0.5.0
Date: 2025-12-17
Description: Detailed design document for the LLM Gateway


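1. Overview

1.1 Positioning

The LLM Gateway is the unified LLM access layer provided by the Core platform. It gives all Apps controlled access to LLM calls.

1.2 Core Principles

  • Unified access: every LLM call goes through the LLM Gateway
  • Controlled calls: rate limiting, caching, retries, and auditing
  • Structured output: supports JSON Schema validation
  • Context assembly: inputs carry references only; the context pack is assembled through the Core Query API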

2. Architecture

2.1 Module Structure

backend/app/core/llm/
├── __init__.py
├── gateway.py          # LLM Gateway core
├── providers/          # LLM Provider implementations
│   ├── __init__.py
│   ├── base.py         # Provider base class
│   ├── openai.py       # OpenAI Provider
│   ├── dify.py         # Dify Provider
│   └── local.py        # Local model Provider
├── context_builder.py  # Context Pack assembly
├── rate_limiter.py     # Rate limiting
├── cache.py            # Caching
├── retry.py            # Retry
├── validator.py        # Output validation
└── config.py           # Configuration management

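2.2 Data Flow

Rule/Action
    ↓
LLM Gateway API
    ↓
Context Builder (assemble the context)
    ↓
Prompt Renderer (render the prompt)
    ↓
Rate Limiter (check rate limits)
    ↓
Cache (check the cache)
    ↓
LLM Provider (call the LLM)
    ↓
Retry (if needed)
    ↓
Validator (validate the output)
    ↓
Audit Logger (record the audit entry)
    ↓
Return the result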
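
The flow above can be read as a chain of stages applied to one call state. The sketch below is only an illustration under stated assumptions: the stage functions, the `FAKE_CACHE` dict and the `handle` entry point are hypothetical stand-ins, the provider call is replaced by an echo, and the Retry stage is left out for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

State = Dict[str, Any]
Stage = Callable[[State], Awaitable[State]]

FAKE_CACHE: Dict[str, str] = {}  # hypothetical stand-in for the Redis-backed cache


async def build_context(state: State) -> State:
    state["context"] = "context for " + ",".join(state["request"]["context_refs"])
    return state


async def render_prompt(state: State) -> State:
    state["prompt"] = f"{state['request']['template']}\n{state['context']}"
    return state


async def check_rate_limit(state: State) -> State:
    if state["request"].get("over_limit"):
        raise RuntimeError("RATE_LIMIT_EXCEEDED")
    return state


async def check_cache(state: State) -> State:
    if state["prompt"] in FAKE_CACHE:
        state["content"] = FAKE_CACHE[state["prompt"]]
        state["cached"] = True
    return state


async def call_provider(state: State) -> State:
    if not state["cached"]:  # a cache hit skips the provider call
        state["content"] = "echo: " + state["prompt"]  # placeholder for a real LLM call
        FAKE_CACHE[state["prompt"]] = state["content"]
    return state


async def validate(state: State) -> State:
    state["validation_passed"] = bool(state["content"])
    return state


async def audit(state: State) -> State:
    state["audited"] = True  # a real gateway would persist an audit record here
    return state


PIPELINE: List[Stage] = [
    build_context, render_prompt, check_rate_limit,
    check_cache, call_provider, validate, audit,
]


async def handle(request: Dict[str, Any]) -> State:
    """Run every stage in order, threading one state dict through them."""
    state: State = {"request": request, "cached": False}
    for stage in PIPELINE:
        state = await stage(state)
    return state
```

Keeping validation and audit after the cache check means cached responses are still validated and audited, which matches the order drawn in the flow.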

3. API Design

3.1 LLM Gateway API

POST /api/v1/llm/chat

Request

{
  "prompt_template_id": "project-mngt.task_decomposition@1.0.0",
  "context_refs": {
    "eventIds": ["evt_123", "evt_456"],
    "entityIds": ["proj_789"],
    "alertIds": ["alert_001"],
    "timeRange": {
      "start": "2025-01-01T00:00:00Z",
      "end": "2025-01-07T23:59:59Z"
    },
    "viewPreset": "project-mngt.view_project_events"
  },
  "variables": {
    "project_name": "我的项目",
    "project_type": "software"
  },
  "provider": "openai",  // optional; the default provider is used if omitted
  "model": "gpt-4",      // optional; the provider's default model is used if omitted
  "temperature": 0.7,
  "max_tokens": 1000,
  "stream": false        // whether to stream the output
}

Response

{
  "id": "llm_call_123",
  "status": "success",
  "result": {
    "content": "...",
    "parsed": {
      // Parsed structured data (present when an output_schema exists)
    }
  },
  "metadata": {
    "prompt_template_id": "project-mngt.task_decomposition@1.0.0",
    "prompt_version": "1.0.0",
    "model_provider": "openai",
    "model_name": "gpt-4",
    "input_tokens": 150,
    "output_tokens": 300,
    "total_tokens": 450,
    "execution_time_ms": 2500,
    "cached": false,
    "validation_passed": true
  }
}
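
The response metadata reports `prompt_version` separately from `prompt_template_id`, which suggests that the identifier can be split at the `@`. The sketch below assumes an `<app>.<name>@<version>` layout inferred from the example value; the `PromptTemplateRef` type and `parse_prompt_template_id` function are hypothetical names, not part of the documented API.

```python
from typing import NamedTuple


class PromptTemplateRef(NamedTuple):
    app: str
    name: str
    version: str


def parse_prompt_template_id(template_id: str) -> PromptTemplateRef:
    """Split '<app>.<name>@<version>' into its parts (assumed layout)."""
    qualified_name, sep, version = template_id.rpartition("@")
    if not sep or not version:
        raise ValueError(f"Missing '@<version>' in {template_id!r}")
    app, dot, name = qualified_name.partition(".")
    if not dot or not app or not name:
        raise ValueError(f"Expected '<app>.<name>' before '@' in {template_id!r}")
    return PromptTemplateRef(app=app, name=name, version=version)
```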

Error response

{
  "id": "llm_call_123",
  "status": "error",
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "Rate limit exceeded for tenant",
    "retry_after": 60
  }
}

3.2 Streaming API (optional)

POST /api/v1/llm/chat/stream

Request: same as /api/v1/llm/chat, but with stream: true

Response: Server-Sent Events (SSE)

data: {"chunk": "Hello", "done": false}
data: {"chunk": " world", "done": false}
data: {"chunk": "", "done": true, "metadata": {...}}
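
As a rough illustration of the event format above, the following sketch serializes chunks into SSE `data:` lines (each event terminated by a blank line) and parses them back on the client side. The helper names `sse_event` and `parse_sse` are hypothetical.

```python
import json
from typing import Any, Dict, Iterable, Iterator, Optional


def sse_event(chunk: str, done: bool, metadata: Optional[Dict[str, Any]] = None) -> str:
    """Format one chunk as a Server-Sent Events message."""
    payload: Dict[str, Any] = {"chunk": chunk, "done": done}
    if metadata is not None:
        payload["metadata"] = metadata
    # An SSE event is one or more "data:" lines followed by a blank line.
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield the decoded JSON payload of every data line."""
    for line in lines:
        line = line.strip()
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())
```

A client would concatenate the `chunk` values until it sees an event with `done` set to true, then read the final `metadata`.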

4. Core Features

4.1 Context Pack Assembly

Purpose: call the Query API according to context_refs to assemble the context

Implementation (context_builder.py):

from typing import Any, Dict


class ContextBuilder:
    async def build_context(
        self,
        context_refs: Dict[str, Any],
        query_api: "QueryAPI",  # Core Query API client, defined elsewhere
    ) -> str:
        """
        Assemble the context text.

        context_refs:
          - eventIds: events to look up
          - entityIds: entities to look up
          - alertIds: alerts to look up
          - timeRange: time range
          - viewPreset: view preset
        """
        context_parts = []

        # Handle eventIds
        if "eventIds" in context_refs:
            events = await query_api.get_events(context_refs["eventIds"])
            context_parts.append(self._format_events(events))

        # Handle entityIds
        if "entityIds" in context_refs:
            entities = await query_api.get_entities(context_refs["entityIds"])
            context_parts.append(self._format_entities(entities))

        # Handle alertIds
        if "alertIds" in context_refs:
            alerts = await query_api.get_alerts(context_refs["alertIds"])
            context_parts.append(self._format_alerts(alerts))

        # Handle timeRange + viewPreset (both are required for a view query)
        if "timeRange" in context_refs and "viewPreset" in context_refs:
            view_data = await query_api.get_view_data(
                view_id=context_refs["viewPreset"],
                time_range=context_refs["timeRange"],
            )
            context_parts.append(self._format_view_data(view_data))

        # Sections are separated by a blank line
        return "\n\n".join(context_parts)
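
The lookups above are independent of each other, so one possible variation is to issue them concurrently. The sketch below shows that idea with `asyncio.gather`, which returns results in the order the awaitables were passed. `StubQueryAPI` and `build_context_concurrently` are hypothetical names, and the stub returns canned strings instead of calling the real Query API.

```python
import asyncio
from typing import Any, Dict, List


class StubQueryAPI:
    """Hypothetical stand-in for the Core Query API; returns canned strings."""

    async def get_events(self, ids: List[str]) -> List[str]:
        await asyncio.sleep(0.01)  # simulate I/O latency
        return [f"event {i}" for i in ids]

    async def get_entities(self, ids: List[str]) -> List[str]:
        await asyncio.sleep(0.01)
        return [f"entity {i}" for i in ids]

    async def get_alerts(self, ids: List[str]) -> List[str]:
        await asyncio.sleep(0.01)
        return [f"alert {i}" for i in ids]


async def build_context_concurrently(
    context_refs: Dict[str, Any],
    query_api: StubQueryAPI,
) -> str:
    lookups = [
        ("eventIds", query_api.get_events),
        ("entityIds", query_api.get_entities),
        ("alertIds", query_api.get_alerts),
    ]
    # Create one coroutine per reference type that is actually present.
    pending = [fetch(context_refs[key]) for key, fetch in lookups if key in context_refs]
    if not pending:
        return ""
    # gather runs the lookups concurrently and preserves their order.
    results = await asyncio.gather(*pending)
    return "\n\n".join("\n".join(part) for part in results)
```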

4.2 Rate Limiting

Purpose: rate-limit by tenant / App / user

Implementation (rate_limiter.py):

from typing import Tuple


class RateLimiter:
    async def check_rate_limit(
        self,
        tenant_id: str,
        app_id: str,
        user_id: str,
    ) -> Tuple[bool, int]:
        """
        Check whether the call exceeds a rate limit.

        Returns: (allowed, retry_after), with retry_after in seconds
        """
        # `redis` is an async Redis client created elsewhere.
        # Limits are checked from the widest scope to the narrowest.
        checks = [
            (f"llm:rate_limit:tenant:{tenant_id}", self.config.tenant_daily_limit),
            (f"llm:rate_limit:app:{app_id}", self.config.app_daily_limit),
            (f"llm:rate_limit:user:{user_id}", self.config.user_daily_limit),
        ]
        for key, limit in checks:
            count = await redis.incr(key)
            if count == 1:
                await redis.expire(key, 86400)  # start a 24-hour window
            if count > limit:
                # Report the time left in the current window
                ttl = await redis.ttl(key)
                return False, ttl if ttl > 0 else 86400

        return True, 0

Configuration

llm_rate_limit:
  per_tenant:
    daily_tokens: 1000000
    daily_calls: 10000
  per_app:
    daily_tokens: 500000
    daily_calls: 5000
  per_user:
    daily_tokens: 100000
    daily_calls: 1000
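
The configuration lists both `daily_calls` and `daily_tokens`, so a limiter would need to track two counters per scope. The following is a minimal in-memory sketch of that bookkeeping, not the Redis-based implementation: `Quota`, `Usage` and `InMemoryBudget` are hypothetical names, the window is a fixed 24 hours, and usage is recorded only after a call completes so that rejected calls do not consume quota.

```python
import time
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

WINDOW_SECONDS = 86400  # 24-hour fixed window


@dataclass
class Quota:
    daily_calls: int
    daily_tokens: int


@dataclass
class Usage:
    window_start: float
    calls: int = 0
    tokens: int = 0


class InMemoryBudget:
    def __init__(self, quotas: Dict[str, Quota]):
        self.quotas = quotas  # keyed by scope name, e.g. "per_tenant"
        self.usage: Dict[str, Usage] = {}

    def _current(self, key: str, now: float) -> Usage:
        usage = self.usage.get(key)
        if usage is None or now - usage.window_start >= WINDOW_SECONDS:
            usage = Usage(window_start=now)  # start a fresh window
            self.usage[key] = usage
        return usage

    def check(self, scopes: Dict[str, str], now: Optional[float] = None) -> Tuple[bool, int]:
        """scopes maps a scope name to an id, e.g. {"per_user": "u1"}."""
        now = time.time() if now is None else now
        for scope, ident in scopes.items():
            quota = self.quotas[scope]
            usage = self._current(f"{scope}:{ident}", now)
            if usage.calls >= quota.daily_calls or usage.tokens >= quota.daily_tokens:
                retry_after = int(usage.window_start + WINDOW_SECONDS - now)
                return False, max(retry_after, 1)
        return True, 0

    def record(self, scopes: Dict[str, str], tokens: int, now: Optional[float] = None) -> None:
        """Count one completed call and its token usage against every scope."""
        now = time.time() if now is None else now
        for scope, ident in scopes.items():
            usage = self._current(f"{scope}:{ident}", now)
            usage.calls += 1
            usage.tokens += tokens
```

Token counts are only known once the provider responds, which is why `check` and `record` are separate steps here.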

4.3 Caching

Purpose: cache results for identical inputs

Implementation (cache.py):

import hashlib
import json
from typing import Any, Dict, Optional


class LLMCache:
    # `redis` is an async Redis client created elsewhere.

    async def get(
        self,
        cache_key: str,
    ) -> Optional[Dict[str, Any]]:
        """Return the cached value, or None on a cache miss."""
        cached = await redis.get(f"llm:cache:{cache_key}")
        if cached:
            return json.loads(cached)
        return None

    async def set(
        self,
        cache_key: str,
        value: Dict[str, Any],
        ttl: int = 3600,
    ) -> None:
        """Store a value with a TTL in seconds."""
        await redis.setex(
            f"llm:cache:{cache_key}",
            ttl,
            json.dumps(value),
        )

    def generate_key(
        self,
        prompt_template_id: str,
        context_refs: Dict,
        variables: Dict,
    ) -> str:
        """Generate the cache key from the call inputs."""
        key_data = {
            "prompt": prompt_template_id,
            "context": context_refs,
            "variables": variables,
        }
        # sort_keys makes the key independent of dict ordering
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()
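
The key above covers the prompt template, context references and variables. Since the request also accepts `provider`, `model`, `temperature` and `max_tokens`, one option worth considering is to fold those into the key as well, so that calls with different models do not share a cache entry. The function below is a sketch of that variation; `generate_cache_key` and its parameter list are assumptions, not the documented interface.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def generate_cache_key(
    prompt_template_id: str,
    context_refs: Dict[str, Any],
    variables: Dict[str, Any],
    provider: str,
    model: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
) -> str:
    key_data = {
        "prompt": prompt_template_id,
        "context": context_refs,
        "variables": variables,
        "provider": provider,
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    # Canonical JSON: sorted keys and fixed separators give a stable string.
    key_str = json.dumps(key_data, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(key_str.encode("utf-8")).hexdigest()
```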

4.4 Retry

Purpose: retry with exponential backoff

Implementation (retry.py):

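import asyncio
from typing import Any, Awaitable, Callable


class RetryableError(Exception):
    """Transient failure that is worth retrying."""


class NonRetryableError(Exception):
    """Permanent failure that must not be retried."""


class RetryHandler:
    async def call_with_retry(
        self,
        func: Callable[[], Awaitable[Any]],
        max_retries: int = 3,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
    ) -> Any:
        """Call func, retrying on RetryableError with exponential backoff.

        max_retries is the total number of attempts. Any other exception,
        including NonRetryableError, propagates immediately.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        delay = initial_delay
        for attempt in range(max_retries):
            try:
                return await func()
            except RetryableError:
                if attempt == max_retries - 1:
                    raise  # out of attempts: surface the last error
                await asyncio.sleep(min(delay, max_delay))
                delay *= 2  # exponential backoff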
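
To make the backoff schedule concrete, the sketch below computes the delays slept between attempts, capped by the `max_delay` value that appears in the retry configuration. The optional jitter is an addition for illustration, not something the design specifies; `backoff_delays` is a hypothetical helper name.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the sleep before each retry; N attempts imply N - 1 sleeps.

    jitter is a fraction in [0, 1]: each delay is spread uniformly within
    +/- jitter of its capped value (an illustrative extension).
    """
    rng = rng or random.Random()
    delays: List[float] = []
    delay = initial_delay
    for _ in range(max(max_retries - 1, 0)):
        capped = min(delay, max_delay)
        spread = capped * jitter
        delays.append(capped + rng.uniform(-spread, spread))
        delay *= 2  # double the uncapped delay for the next retry
    return delays
```

With the defaults (three attempts, 1.0 s initial delay) the handler sleeps 1 s and then 2 s; the cap only matters once the doubled delay exceeds `max_delay`.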

4.5 Output Validation

Purpose: validate the output against a JSON Schema

Implementation (validator.py):

import json
from typing import Dict, Optional, Tuple

import jsonschema


class OutputValidator:
    def validate(
        self,
        output: str,
        output_schema: Dict,
    ) -> Tuple[bool, Optional[str], Optional[Dict]]:
        """
        Validate that the output conforms to the schema.

        Returns: (is_valid, error_message, parsed_data)
        """
        try:
            # Try to parse the raw output as JSON
            parsed = json.loads(output)
        except json.JSONDecodeError:
            return False, "Output is not valid JSON", None

        # Validate the parsed data with jsonschema
        try:
            jsonschema.validate(parsed, output_schema)
            return True, None, parsed
        except jsonschema.ValidationError as e:
            return False, e.message, None
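
Model output is sometimes wrapped in a Markdown code fence, in which case the `json.loads` step above would reject it. A possible pre-processing step, sketched here with the standard library only, is to unwrap the first fenced block before parsing. `extract_json` is a hypothetical helper, and whether the gateway should tolerate fenced output at all is a design decision this document does not state.

```python
import json
import re
from typing import Any, Optional, Tuple

FENCE = "`" * 3  # three backticks, built programmatically
_FENCE_RE = re.compile(
    FENCE + r"(?:json)?\s*(.*?)\s*" + FENCE,
    re.DOTALL | re.IGNORECASE,
)


def extract_json(output: str) -> Tuple[Optional[Any], Optional[str]]:
    """Return (parsed, error); unwraps a fenced block if one is present."""
    candidate = output.strip()
    match = _FENCE_RE.search(candidate)
    if match:
        candidate = match.group(1)
    try:
        return json.loads(candidate), None
    except json.JSONDecodeError as exc:
        return None, f"Output is not valid JSON: {exc.msg}"
```

The parsed value could then be passed to the schema validation step unchanged.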

5. LLM Provider Design

5.1 Provider Interface

Base class (providers/base.py):

from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Dict, List, Optional


class LLMProvider(ABC):
    @abstractmethod
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Chat completion.

        Returns:
        {
            "content": "...",
            "usage": {
                "prompt_tokens": 100,
                "completion_tokens": 200,
                "total_tokens": 300
            }
        }
        """
        pass

    @abstractmethod
    def stream_chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: Optional[str] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Streaming chat completion.

        Declared without `async` so that implementations written as async
        generators (async def + yield) match this signature.

        Yields: {"chunk": "...", "done": False}
        """
        pass
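
To show how the interface might be exercised without a network call, the sketch below implements a toy provider and a small name-to-class registry. `EchoProvider`, `PROVIDERS`, `get_provider` and `collect_stream` are hypothetical names, the base class is repeated in abbreviated form so the example is self-contained, and token counts are approximated by whitespace-separated words.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Dict, List, Optional, Type

Messages = List[Dict[str, str]]


class LLMProvider(ABC):  # abbreviated copy of the interface above
    @abstractmethod
    async def chat_completion(self, messages: Messages, model: Optional[str] = None,
                              temperature: float = 0.7,
                              max_tokens: Optional[int] = None) -> Dict[str, Any]: ...

    @abstractmethod
    def stream_chat_completion(self, messages: Messages,
                               model: Optional[str] = None) -> AsyncIterator[Dict[str, Any]]: ...


class EchoProvider(LLMProvider):
    """Toy provider that echoes the last message; useful for tests only."""

    async def chat_completion(self, messages, model=None, temperature=0.7, max_tokens=None):
        content = messages[-1]["content"]
        prompt_tokens = sum(len(m["content"].split()) for m in messages)
        completion_tokens = len(content.split())
        return {
            "content": content,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
        }

    async def stream_chat_completion(self, messages, model=None):
        for word in messages[-1]["content"].split():
            yield {"chunk": word + " ", "done": False}
        yield {"chunk": "", "done": True}


PROVIDERS: Dict[str, Type[LLMProvider]] = {"echo": EchoProvider}


def get_provider(name: str) -> LLMProvider:
    if name not in PROVIDERS:
        raise ValueError(f"Unknown provider: {name}")
    return PROVIDERS[name]()


async def collect_stream(provider: LLMProvider, messages: Messages) -> List[Dict[str, Any]]:
    """Drain a streaming completion into a list of chunks."""
    return [chunk async for chunk in provider.stream_chat_completion(messages)]
```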

5.2 OpenAI Provider

Implementation (providers/openai.py):

class OpenAIProvider(LLMProvider):
    def __init__(self, config: Dict):
        self.api_key = config["api_key"]
        self.base_url = config.get("base_url", "https://api.openai.com/v1")
        self.default_model = config.get("default_model", "gpt-4")
        self.client = openai.AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)
    
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = None,
    ) -> Dict[str, Any]:
        response = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        
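        return {
            "content": response.choices[0].message.content,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            },
        }

    async def stream_chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        # Sketch, assuming the openai>=1.0 async client: with stream=True the
        # call returns an async iterator of chunks whose
        # choices[0].delta.content carries the incremental text.
        # Needed because the base class declares this method abstract.
        stream = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=messages,
            stream=True,
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield {"chunk": chunk.choices[0].delta.content, "done": False}
        yield {"chunk": "", "done": True}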

5.3 Dify Provider

Implementation (providers/dify.py):

class DifyProvider(LLMProvider):
    def __init__(self, config: Dict):
        self.api_key = config["api_key"]
        self.base_url = config.get("base_url", "https://api.dify.ai/v1")
        self.default_model = config.get("default_model", "gpt-4")
    
    async def chat_completion(
        self,
        messages: List[Dict[str, str]],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = None,
    ) -> Dict[str, Any]:
        # Call the Dify API here.
        # The implementation is similar to the OpenAI Provider.
        raise NotImplementedError

6. Configuration Management

6.1 Configuration Files

Location: backend/.env, backend/config/llm.yaml

Example configuration

llm:
  default_provider: openai
  providers:
    openai:
      api_key: ${OPENAI_API_KEY}
      base_url: https://api.openai.com/v1
      default_model: gpt-4
      timeout: 30
    dify:
      api_key: ${DIFY_API_KEY}
      base_url: https://api.dify.ai/v1
      default_model: gpt-4
      timeout: 30
    local:
      base_url: http://localhost:8000/v1
      default_model: llama-2
      timeout: 60

rate_limit:
  per_tenant:
    daily_tokens: 1000000
    daily_calls: 10000
  per_app:
    daily_tokens: 500000
    daily_calls: 5000
  per_user:
    daily_tokens: 100000
    daily_calls: 1000

cache:
  enabled: true
  ttl: 3600
  max_size: 10000

retry:
  max_retries: 3
  initial_delay: 1.0
  max_delay: 60.0
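
The `${OPENAI_API_KEY}` and `${DIFY_API_KEY}` placeholders imply that environment variables are substituted when the YAML is loaded. The document does not say how, so the following is only one plausible sketch: it walks an already-parsed config (plain dicts, lists and strings) and expands `${NAME}` placeholders. `expand_env` is a hypothetical helper, and failing on a missing variable is an assumed policy.

```python
import os
import re
from typing import Any, Mapping

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Recursively replace ${NAME} in strings with env[NAME]."""
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    if isinstance(value, str):
        def replace(match: "re.Match[str]") -> str:
            name = match.group(1)
            if name not in env:
                # Fail fast instead of sending a literal "${...}" as an API key.
                raise KeyError(f"Environment variable {name} is not set")
            return env[name]

        return _PLACEHOLDER.sub(replace, value)
    return value  # numbers, booleans and None pass through unchanged
```

Passing the environment as a parameter keeps the function easy to test without touching the real process environment.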

7. Audit Logging

7.1 Audit Records

Location: tool_calls

Recorded fields

  • prompt_template_id
  • prompt_version
  • model_provider
  • model_name
  • model_version
  • context_refs
  • input_tokens
  • output_tokens
  • total_tokens
  • output_hash
  • validation_passed
  • execution_time_ms
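
The field list includes `output_hash` rather than the output itself. As a sketch of how such a record might be assembled, the function below hashes the content with SHA-256 and derives `prompt_version` from the part of `prompt_template_id` after `@`. The hash algorithm, the function name `build_audit_record` and its parameters are assumptions for illustration.

```python
import hashlib
from typing import Any, Dict, Optional


def build_audit_record(
    prompt_template_id: str,
    model_provider: str,
    model_name: str,
    context_refs: Dict[str, Any],
    content: str,
    input_tokens: int,
    output_tokens: int,
    validation_passed: bool,
    execution_time_ms: int,
    model_version: Optional[str] = None,
) -> Dict[str, Any]:
    # Version is the text after the last "@", if the id carries one.
    prompt_version = (
        prompt_template_id.rpartition("@")[2] if "@" in prompt_template_id else None
    )
    return {
        "prompt_template_id": prompt_template_id,
        "prompt_version": prompt_version,
        "model_provider": model_provider,
        "model_name": model_name,
        "model_version": model_version,
        "context_refs": context_refs,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
        # Only a digest of the output is stored, not the raw text.
        "output_hash": hashlib.sha256(content.encode("utf-8")).hexdigest(),
        "validation_passed": validation_passed,
        "execution_time_ms": execution_time_ms,
    }
```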

7.2 Audit Queries

API: GET /api/v1/audit/tool_calls

Supported filters

  • By prompt_template_id
  • By model_provider
  • By validation_passed
  • By time range
  • By tenant / App / user

Document version: v1.0
Last updated: 2025-12-17