Core LLM Gateway 设计
版本: v0.5.0
日期: 2025-12-17
说明: LLM Gateway 详细设计文档
一、概述
1.1 定位
LLM Gateway 是 Core 平台提供的统一 LLM 接入层,为所有 Apps 提供受控的 LLM 调用能力。
1.2 核心原则
- 统一接入:所有 LLM 调用都通过 LLM Gateway
- 受控调用:限流、缓存、重试、审计
- 结构化输出:支持 JSON Schema 校验
- 上下文组装:输入只用引用,由 Core Query API 组装 context pack
二、架构设计
2.1 模块结构
backend/app/core/llm/
├── __init__.py
├── gateway.py # LLM Gateway 核心
├── providers/ # LLM Provider 实现
│ ├── __init__.py
│ ├── base.py # Provider 基类
│ ├── openai.py # OpenAI Provider
│ ├── dify.py # Dify Provider
│ └── local.py # 本地模型 Provider
├── context_builder.py # Context Pack 组装
├── rate_limiter.py # 限流
├── cache.py # 缓存
├── retry.py # 重试
├── validator.py # 输出校验
└── config.py # 配置管理
2.2 数据流
Rule/Action
↓
LLM Gateway API
↓
Context Builder (组装上下文)
↓
Prompt Renderer (渲染 Prompt)
↓
Rate Limiter (限流检查)
↓
Cache (缓存检查)
↓
LLM Provider (调用 LLM)
↓
Retry (如果需要)
↓
Validator (输出校验)
↓
Audit Logger (记录审计)
↓
返回结果
三、API 设计
3.1 LLM Gateway API
POST /api/v1/llm/chat
请求:
{
"prompt_template_id": "project-mngt.task_decomposition@1.0.0",
"context_refs": {
"eventIds": ["evt_123", "evt_456"],
"entityIds": ["proj_789"],
"alertIds": ["alert_001"],
"timeRange": {
"start": "2025-01-01T00:00:00Z",
"end": "2025-01-07T23:59:59Z"
},
"viewPreset": "project-mngt.view_project_events"
},
"variables": {
"project_name": "我的项目",
"project_type": "software"
},
"provider": "openai", // 可选,使用默认
"model": "gpt-4", // 可选,使用默认
"temperature": 0.7,
"max_tokens": 1000,
"stream": false // 是否流式输出
}
响应:
{
"id": "llm_call_123",
"status": "success",
"result": {
"content": "...",
"parsed": {
// 解析后的结构化数据(如果 output_schema 存在)
}
},
"metadata": {
"prompt_template_id": "project-mngt.task_decomposition@1.0.0",
"prompt_version": "1.0.0",
"model_provider": "openai",
"model_name": "gpt-4",
"input_tokens": 150,
"output_tokens": 300,
"total_tokens": 450,
"execution_time_ms": 2500,
"cached": false,
"validation_passed": true
}
}
错误响应:
{
"id": "llm_call_123",
"status": "error",
"error": {
"code": "RATE_LIMIT_EXCEEDED",
"message": "Rate limit exceeded for tenant",
"retry_after": 60
}
}
3.2 流式输出 API(可选)
POST /api/v1/llm/chat/stream
请求:同 /api/v1/llm/chat,但 stream: true
响应:Server-Sent Events (SSE)
data: {"chunk": "Hello", "done": false}
data: {"chunk": " world", "done": false}
data: {"chunk": "", "done": true, "metadata": {...}}
四、核心功能设计
4.1 Context Pack 组装
功能:根据 context_refs 调用 Query API 组装上下文
实现(context_builder.py):
class ContextBuilder:
async def build_context(
self,
context_refs: Dict[str, Any],
query_api: QueryAPI,
) -> str:
"""
组装上下文文本
context_refs:
- eventIds: 查询事件
- entityIds: 查询实体
- alertIds: 查询告警
- timeRange: 时间范围
- viewPreset: 视图预设
"""
context_parts = []
# 处理 eventIds
if "eventIds" in context_refs:
events = await query_api.get_events(context_refs["eventIds"])
context_parts.append(self._format_events(events))
# 处理 entityIds
if "entityIds" in context_refs:
entities = await query_api.get_entities(context_refs["entityIds"])
context_parts.append(self._format_entities(entities))
# 处理 alertIds
if "alertIds" in context_refs:
alerts = await query_api.get_alerts(context_refs["alertIds"])
context_parts.append(self._format_alerts(alerts))
# 处理 timeRange + viewPreset
if "timeRange" in context_refs and "viewPreset" in context_refs:
view_data = await query_api.get_view_data(
view_id=context_refs["viewPreset"],
time_range=context_refs["timeRange"],
)
context_parts.append(self._format_view_data(view_data))
return "\n\n".join(context_parts)
4.2 限流
功能:按租户/App/用户限流
实现(rate_limiter.py):
class RateLimiter:
async def check_rate_limit(
self,
tenant_id: str,
app_id: str,
user_id: str,
) -> bool:
"""
检查是否超过限流
返回: (allowed, retry_after)
"""
# 检查租户限流
tenant_key = f"llm:rate_limit:tenant:{tenant_id}"
tenant_count = await redis.incr(tenant_key)
if tenant_count == 1:
await redis.expire(tenant_key, 86400) # 24小时
if tenant_count > self.config.tenant_daily_limit:
return False, 86400
# 检查 App 限流
app_key = f"llm:rate_limit:app:{app_id}"
app_count = await redis.incr(app_key)
if app_count == 1:
await redis.expire(app_key, 86400)
if app_count > self.config.app_daily_limit:
return False, 86400
# 检查用户限流
user_key = f"llm:rate_limit:user:{user_id}"
user_count = await redis.incr(user_key)
if user_count == 1:
await redis.expire(user_key, 86400)
if user_count > self.config.user_daily_limit:
return False, 86400
return True, 0
配置:
llm_rate_limit:
per_tenant:
daily_tokens: 1000000
daily_calls: 10000
per_app:
daily_tokens: 500000
daily_calls: 5000
per_user:
daily_tokens: 100000
daily_calls: 1000
4.3 缓存
功能:相同输入缓存结果
实现(cache.py):
class LLMCache:
async def get(
self,
cache_key: str,
) -> Optional[Dict[str, Any]]:
"""获取缓存"""
cached = await redis.get(f"llm:cache:{cache_key}")
if cached:
return json.loads(cached)
return None
async def set(
self,
cache_key: str,
value: Dict[str, Any],
ttl: int = 3600,
):
"""设置缓存"""
await redis.setex(
f"llm:cache:{cache_key}",
ttl,
json.dumps(value),
)
def generate_key(
self,
prompt_template_id: str,
context_refs: Dict,
variables: Dict,
) -> str:
"""生成缓存键"""
key_data = {
"prompt": prompt_template_id,
"context": context_refs,
"variables": variables,
}
key_str = json.dumps(key_data, sort_keys=True)
return hashlib.sha256(key_str.encode()).hexdigest()
4.4 重试
功能:指数退避重试
实现(retry.py):
class RetryHandler:
async def call_with_retry(
self,
func: Callable,
max_retries: int = 3,
initial_delay: float = 1.0,
) -> Any:
"""带重试的调用"""
delay = initial_delay
last_error = None
for attempt in range(max_retries):
try:
return await func()
except RetryableError as e:
last_error = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= 2 # 指数退避
else:
raise
except NonRetryableError as e:
raise
raise last_error
4.5 输出校验
功能:使用 JSON Schema 校验输出
实现(validator.py):
class OutputValidator:
def validate(
self,
output: str,
output_schema: Dict,
) -> Tuple[bool, Optional[str], Optional[Dict]]:
"""
校验输出是否符合 schema
返回: (is_valid, error_message, parsed_data)
"""
try:
# 尝试解析 JSON
parsed = json.loads(output)
except json.JSONDecodeError:
return False, "Output is not valid JSON", None
# 使用 jsonschema 校验
try:
jsonschema.validate(parsed, output_schema)
return True, None, parsed
except jsonschema.ValidationError as e:
return False, str(e), None
五、LLM Provider 设计
5.1 Provider 接口
基类(providers/base.py):
class LLMProvider(ABC):
@abstractmethod
async def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = None,
temperature: float = 0.7,
max_tokens: int = None,
) -> Dict[str, Any]:
"""
聊天完成
返回:
{
"content": "...",
"usage": {
"prompt_tokens": 100,
"completion_tokens": 200,
"total_tokens": 300
}
}
"""
pass
@abstractmethod
async def stream_chat_completion(
self,
messages: List[Dict[str, str]],
model: str = None,
) -> AsyncIterator[Dict[str, Any]]:
"""
流式聊天完成
返回: AsyncIterator[{"chunk": "...", "done": False}]
"""
pass
5.2 OpenAI Provider
实现(providers/openai.py):
class OpenAIProvider(LLMProvider):
def __init__(self, config: Dict):
self.api_key = config["api_key"]
self.base_url = config.get("base_url", "https://api.openai.com/v1")
self.default_model = config.get("default_model", "gpt-4")
self.client = openai.AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)
async def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = None,
temperature: float = 0.7,
max_tokens: int = None,
) -> Dict[str, Any]:
response = await self.client.chat.completions.create(
model=model or self.default_model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
return {
"content": response.choices[0].message.content,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
},
}
5.3 Dify Provider
实现(providers/dify.py):
class DifyProvider(LLMProvider):
def __init__(self, config: Dict):
self.api_key = config["api_key"]
self.base_url = config.get("base_url", "https://api.dify.ai/v1")
self.default_model = config.get("default_model", "gpt-4")
async def chat_completion(
self,
messages: List[Dict[str, str]],
model: str = None,
temperature: float = 0.7,
max_tokens: int = None,
) -> Dict[str, Any]:
# 调用 Dify API
# 实现类似 OpenAI Provider
pass
六、配置管理
6.1 配置文件
位置: backend/.env 或 backend/config/llm.yaml
配置示例:
llm:
default_provider: openai
providers:
openai:
api_key: ${OPENAI_API_KEY}
base_url: https://api.openai.com/v1
default_model: gpt-4
timeout: 30
dify:
api_key: ${DIFY_API_KEY}
base_url: https://api.dify.ai/v1
default_model: gpt-4
timeout: 30
local:
base_url: http://localhost:8000/v1
default_model: llama-2
timeout: 60
rate_limit:
per_tenant:
daily_tokens: 1000000
daily_calls: 10000
per_app:
daily_tokens: 500000
daily_calls: 5000
per_user:
daily_tokens: 100000
daily_calls: 1000
cache:
enabled: true
ttl: 3600
max_size: 10000
retry:
max_retries: 3
initial_delay: 1.0
max_delay: 60.0
七、审计日志
7.1 审计记录
位置: tool_calls 表
记录内容:
- prompt_template_id
- prompt_version
- model_provider
- model_name
- model_version
- context_refs
- input_tokens
- output_tokens
- total_tokens
- output_hash
- validation_passed
- execution_time_ms
7.2 审计查询
API: GET /api/v1/audit/tool_calls
支持过滤:
- 按 prompt_template_id
- 按 model_provider
- 按 validation_passed
- 按时间范围
- 按租户/App/用户
文档版本: v1.0
最后更新: 2025-12-17