基于TypeScript构建AI代理网关:统一LLM调用、智能缓存与监控
1. 项目概述为什么我们需要一个AI代理网关在当前的AI应用开发浪潮中直接调用大型语言模型LLM的API比如OpenAI的ChatGPT、Anthropic的Claude或者开源的Llama模型已经成为一种标准操作。然而随着项目规模的扩大和团队协作的深入这种“直连”模式很快就会暴露出诸多痛点。想象一下你的团队有十个开发者在同时调试每个人都用自己的API密钥调用成本难以追踪或者某个高频调用的提示词Prompt被反复发送产生了大量重复的计费又或者你需要临时切换模型的供应商却发现代码里硬编码了某个服务商的SDK调用改动起来牵一发而动全身。这正是“构建你自己的AI代理”这个项目要解决的核心问题。它不是一个简单的API封装而是一个位于你的应用程序与众多LLM服务商之间的智能中间层。你可以把它理解为你私有AI服务的“交通枢纽”或“总控中心”。通过这个用TypeScript构建的代理网关你可以实现请求的路由转发比如根据策略将请求分发给不同模型、响应内容的智能缓存对相同或相似的请求直接返回历史结果以节省成本和延迟以及全面的请求监控与审计记录每一次调用的详情用于分析和优化。对于前端工程师、全栈开发者或AI应用架构师而言亲手搭建这样一个工具不仅能让你对LLM的调用有前所未有的掌控力更是深入理解现代API网关设计、Node.js高性能服务、以及TypeScript工程化实践的绝佳机会。它适合那些不满足于仅仅调用API而是希望构建可维护、可观测、高性价比的AI能力中台的开发者。2. 核心架构设计与技术选型2.1 整体架构思路拆解一个健壮的AI代理网关其核心职责可以抽象为三个层面接入层、处理层和输出层。接入层负责接收来自客户端可能是你的Web应用、移动端或后端服务的标准化请求。这里的关键是定义一套与具体LLM供应商无关的、统一的请求协议。例如你的代理网关可以暴露一个POST /v1/chat/completions的端点其请求体格式可能融合了OpenAI、Anthropic等多家厂商的共性字段并附加一些你自己的控制参数如providercache_ttl等。处理层是代理的“大脑”也是我们项目的核心。它包含几个关键模块路由引擎根据请求中的配置如指定的模型名称、成本策略、负载情况决定将请求转发给哪个后端的LLM API。例如你可以设置规则“所有gpt-4的请求走Azure OpenAI服务所有claude-3的请求走Anthropic如果Azure服务不可用则自动降级到OpenAI官方API。”缓存引擎在将请求转发出去之前先检查缓存中是否有相同或语义相似的请求结果。这里的挑战在于如何设计缓存键Cache Key。简单的做法是对整个请求体做哈希如MD5或SHA-256但这无法处理语义相同但表述微调的Prompt。更高级的做法可以引入轻量级的文本嵌入模型来计算Prompt的向量相似度。监控与装饰器在请求的生命周期中需要埋点记录开始时间、结束时间、使用的Token数量、模型名称、响应状态、成本估算等。这些数据可以实时输出到控制台也可以发送到像Prometheus、Datadog这样的监控系统或者存入数据库供日后分析。输出层负责将处理后的LLM响应重新封装成统一的格式返回给客户端并确保响应流Streaming Response的正确透传以支持类似ChatGPT那样的打字机效果。2.2 技术栈选型与理由选择TypeScript作为实现语言是项目的基石。TypeScript的强类型系统在构建一个需要与多种外部API各有不同的SDK类型定义打交道的复杂网关时提供了无与伦比的可靠性和开发体验。它能极大减少因字段名拼写错误、类型不匹配导致的运行时错误。对于Web框架Express.js或Fastify都是成熟的选择。Express生态庞大、中间件机制灵活适合快速原型。Fastify则在性能上更胜一筹对JSON序列化等操作有深度优化更适合高并发场景。本项目选择Fastify因为它对TypeScript的支持一流且其插件生态系统能很好地模块化我们的路由、缓存等功能。缓存存储方面Redis是不二之选。它支持丰富的数据结构、设置过期时间TTL并且性能极高非常适合作为临时存储响应结果的缓存数据库。对于简单的部署或开发环境也可以先用内存缓存如node-cache起步。对于监控和数据记录我们不需要一开始就引入重型武器。可以使用Winston或Pino作为日志库结构化地输出JSON日志。这些日志可以被Fluentd、Logstash等工具收集并导入到Elasticsearch或时序数据库中。为了更直观我们可以同时实现一个简单的内存统计器在网关内聚合最近一段时间的请求指标。注意在涉及多模型路由时务必妥善管理各家的API密钥。绝对不要将密钥硬编码在代码或提交到版本库中。必须使用环境变量或专业的密钥管理服务如AWS Secrets Manager, HashiCorp Vault来注入这些敏感信息。3. 核心模块实现详解3.1 统一请求/响应协议设计这是实现供应商无关性的关键。我们需要定义一套内部通用的ChatMessage和ChatCompletionRequest接口。// 统一的消息接口 interface UnifiedMessage { role: system | user | assistant; content: string; // 可扩展其他通用字段如 name } // 统一的请求体 interface UnifiedChatRequest { messages: UnifiedMessage[]; model: string; // 这里可以是逻辑模型名如 “gpt-4-turbo” 路由层会映射到具体供应商 stream?: boolean; temperature?: number; max_tokens?: number; // 代理网关的自定义控制参数 provider?: openai | anthropic | azure; // 强制指定供应商 cache_enabled?: boolean; cache_ttl?: number; // 缓存生存时间秒 user?: string; // 用于监控和审计 }相应地我们也需要定义统一的响应接口它需要能兼容流式和非流式响应。// 非流式统一响应 interface UnifiedChatResponse { id: string; choices: { message: UnifiedMessage; finish_reason: string; index: number; }[]; usage: { prompt_tokens: number; completion_tokens: number; total_tokens: number; }; model: string; provider: string; // 实际调用的供应商 cached?: boolean; // 标记是否来自缓存 } // 流式响应则是一个可读流其中每个chunk也需要遵循一定的格式在网关的入口处我们将客户端的请求转换为UnifiedChatRequest。在处理层结束后再将内部统一的响应结构转换回客户端期望的格式可以是完全自定义的也可以模仿OpenAI的格式以保持兼容。3.2 智能路由引擎的实现路由引擎的核心是一个路由解析器Router和一个供应商适配器ProviderAdapter的注册表。class RoutingEngine { private adapters: Mapstring, ProviderAdapter new Map(); registerProvider(name: string, adapter: ProviderAdapter) { this.adapters.set(name, adapter); } async route(request: UnifiedChatRequest): Promise{ adapter: ProviderAdapter; modelMapping: string } { // 1. 优先级如果请求中明确指定了 provider则直接使用 if (request.provider this.adapters.has(request.provider)) { const adapter this.adapters.get(request.provider)!; // 可能需要一个模型映射表将通用模型名映射到该供应商的具体模型名 const concreteModel this.getConcreteModelForProvider(request.model, request.provider); return { adapter, modelMapping: concreteModel }; } // 2. 基于规则的动态路由 // 例如根据模型名前缀路由 if (request.model.startsWith(gpt-)) { // 检查成本如果 gpt-4 成本超预算则降级到 gpt-3.5-turbo // 或者根据负载均衡选择不同的 OpenAI 端点 return this.routeToOpenAI(request); } else if (request.model.startsWith(claude-)) { return this.routeToAnthropic(request); } else if (request.model.includes(llama)) { return this.routeToSelfHostedLlama(request); } // 3. 默认路由 throw new Error(Unable to route request for model: ${request.model}); } private routeToOpenAI(req: UnifiedChatRequest): { adapter: ProviderAdapter; modelMapping: string } { // 这里可以实现复杂的逻辑检查各端点健康状态、当前成本、限流情况 const adapter this.adapters.get(openai)!; // 简单的模型名映射例如内部‘gpt-4’映射为官方‘gpt-4-turbo-preview’ const mapping MODEL_MAPPING_OPENAI[req.model] || req.model; return { adapter, modelMapping: mapping }; } // ... 其他路由方法 }每个ProviderAdapter负责封装与特定LLM API的通信细节处理认证、错误重试、特定参数转换等。这样新增一个供应商支持就只需要实现一个新的Adapter并注册到路由引擎即可。3.3 缓存策略与实现缓存是降低成本和延迟的利器。我们实现一个CacheManager类。import Redis from ioredis; class CacheManager { private client: Redis; constructor(redisUrl: string) { this.client new Redis(redisUrl); } // 生成缓存键对关键参数进行哈希 private generateCacheKey(request: UnifiedChatRequest): string { const keyObject { model: request.model, messages: request.messages, // 注意需要稳定序列化 temperature: request.temperature, max_tokens: request.max_tokens, // 可能不需要把所有参数都放进去根据业务决定 }; const str JSON.stringify(keyObject); return ai_proxy:cache:${sha256(str)}; } async get(request: UnifiedChatRequest): PromiseUnifiedChatResponse | null { if (!request.cache_enabled) return null; const key this.generateCacheKey(request); const cached await this.client.get(key); return cached ? JSON.parse(cached) : null; } async set(request: UnifiedChatRequest, response: UnifiedChatResponse, ttl?: number): Promisevoid { const key this.generateCacheKey(request); const effectiveTtl ttl || request.cache_ttl || 300; // 默认5分钟 await this.client.setex(key, effectiveTtl, JSON.stringify(response)); } }高级缓存技巧语义缓存如上文所述简单的哈希缓存对Prompt的微小变动非常敏感。你可以集成一个轻量级的句子编码器例如通过xenova/transformers在Node.js中运行MiniLM模型计算消息列表的嵌入向量。缓存时不仅存储请求-响应对还存储该请求的向量。查询时计算新请求的向量并在向量数据库如Redis的RediSearch模块中进行相似度搜索如果找到相似度高于阈值如0.95的旧请求则返回其缓存响应。这能显著提高缓存命中率。缓存分区可以为不同的用户、项目或模型设置不同的缓存命名空间namespace:key便于管理和清理。缓存预热对于已知的高频、固定提示词如某些系统指令可以在服务启动时主动调用并缓存结果。3.4 监控、日志与指标收集监控模块应该像切面一样无侵入地集成到请求处理流程中。我们可以利用Fastify的钩子Hooks来实现。// 监控装饰器 function createMonitoringPlugin() { return { name: monitoring-plugin, async onRequest(request: FastifyRequest, reply: FastifyReply) { request.startTime Date.now(); request.requestId generateRequestId(); // 记录结构化日志 logger.info({ type: request_started, requestId: request.requestId, path: request.url, model: request.body?.model, user: request.body?.user }); }, async onResponse(request: FastifyRequest, reply: FastifyReply) { const duration Date.now() - request.startTime; const responseBody reply.getResponseBody(); // 注意获取流式响应体可能需要额外处理 logger.info({ type: request_completed, requestId: request.requestId, durationMs: duration, statusCode: reply.statusCode, tokenUsage: responseBody?.usage, // 从响应中提取 cached: responseBody?.cached }); // 更新内存中的实时指标 metricsCollector.recordRequest({ duration, model: request.body?.model, provider: responseBody?.provider, cached: responseBody?.cached, tokenCount: responseBody?.usage?.total_tokens }); } }; }此外可以暴露一个GET /metrics端点返回符合Prometheus格式的指标数据方便被监控系统拉取。指标可以包括各模型请求总数、平均延迟、错误率、缓存命中率、Token消耗总量等。4. 完整集成与部署实战4.1 项目初始化与结构搭建首先创建一个新的TypeScript项目并安装核心依赖。mkdir ai-proxy-gateway cd ai-proxy-gateway npm init -y npm install typescript ts-node types/node --save-dev npm install fastify fastify-plugin fastify/redis ioredis npm install openai anthropic-ai/sdk # 或其他LLM SDK npm install winston # 日志 npx tsc --init调整tsconfig.json设置target: ES2020,module: commonjs(或NodeNext),outDir: ./dist并确保strict: true。建议的目录结构如下src/ ├── index.ts # 应用入口 ├── server.ts # Fastify服务器配置 ├── types/ │ └── unified.ts # 统一类型定义 ├── core/ │ ├── routing/ │ │ ├── engine.ts # 路由引擎 │ │ └── adapters/ # 各供应商适配器 │ ├── cache/ │ │ └── manager.ts # 缓存管理器 │ └── monitoring/ │ ├── logger.ts # 日志配置 │ └── metrics.ts # 指标收集器 ├── plugins/ # Fastify插件 │ └── monitoring.ts └── routes/ └── v1/ └── chat.ts # 主要的聊天补全路由4.2 主服务与路由集成在server.ts中我们初始化Fastify注册插件并连接Redis。import Fastify from fastify; import fastifyRedis from fastify/redis; import { monitoringPlugin } from ./plugins/monitoring; import { registerRoutes } from ./routes/v1/chat; export async function buildServer() { const server Fastify({ logger: true // 使用Fastify内置日志或替换为Winston }); // 注册Redis插件 await server.register(fastifyRedis, { host: process.env.REDIS_HOST || 127.0.0.1, port: parseInt(process.env.REDIS_PORT || 6379), // password: process.env.REDIS_PASSWORD }); // 注册自定义监控插件 await server.register(monitoringPlugin); // 注册业务路由 await registerRoutes(server); return server; }在routes/v1/chat.ts中实现核心的聊天端点。import { FastifyInstance, FastifyRequest, FastifyReply } from fastify; import { UnifiedChatRequest } from ../../types/unified; import { RoutingEngine } from ../../core/routing/engine; import { CacheManager } from ../../core/cache/manager; export async function registerRoutes(server: FastifyInstance) { const routingEngine new RoutingEngine(); const cacheManager new CacheManager(server.redis); // 初始化并注册供应商适配器 const openAIAdapter new OpenAIAdapter(process.env.OPENAI_API_KEY!); routingEngine.registerProvider(openai, openAIAdapter); // ... 注册其他适配器 server.post(/v1/chat/completions, async (request: FastifyRequest, reply: FastifyReply) { const unifiedReq request.body as UnifiedChatRequest; // 1. 检查缓存 if (unifiedReq.cache_enabled ! false) { // 默认开启缓存 const cachedResponse await cacheManager.get(unifiedReq); if (cachedResponse) { reply.header(X-Cache, HIT); // 添加自定义头方便客户端识别 return { ...cachedResponse, cached: true }; } } // 2. 路由决策 const { adapter, modelMapping } await routingEngine.route(unifiedReq); // 3. 调用实际供应商API let response; try { // 将统一请求转换为供应商特定请求并调用 const providerReq adapter.formatRequest(unifiedReq, modelMapping); response await adapter.createChatCompletion(providerReq); } catch (error) { // 这里可以加入重试逻辑、降级策略等 server.log.error(Provider call failed: ${error}); throw error; // 或返回一个友好的错误响应 } // 4. 将供应商响应转换为统一响应 const unifiedResp adapter.formatResponse(response, modelMapping); // 5. 写入缓存如果是非流式响应且成功 if (!unifiedReq.stream unifiedReq.cache_enabled ! false) { await cacheManager.set(unifiedReq, unifiedResp, unifiedReq.cache_ttl).catch(err { server.log.warn(Failed to set cache: ${err}); // 缓存失败不应阻塞主响应 }); } reply.header(X-Cache, MISS); return unifiedResp; }); // 流式响应的处理逻辑类似但需要处理Node.js Stream // server.post(/v1/chat/completions/stream, ...) }4.3 环境配置与部署创建一个.env.example文件列出所有需要的环境变量。NODE_ENVproduction PORT3000 REDIS_HOSTlocalhost REDIS_PORT6379 OPENAI_API_KEYyour_openai_key_here ANTHROPIC_API_KEYyour_anthropic_key_here AZURE_OPENAI_ENDPOINTyour_azure_endpoint AZURE_OPENAI_API_KEYyour_azure_key # ... 其他配置使用dotenv或fastify-env插件在启动时加载配置。对于部署你可以将编译后的JavaScript代码在dist目录部署到任何Node.js环境如传统服务器使用pm2或systemd来守护进程。容器化创建Dockerfile基于node:18-alpine镜像复制代码安装生产依赖npm ci --onlyproduction然后运行。这便于在Kubernetes或云服务商的容器平台上进行弹性伸缩。Serverless可以将网关逻辑包装成云函数如AWS Lambda Google Cloud Functions但需要注意冷启动、运行时长限制以及对HTTP流式响应的支持情况。5. 高级特性与优化方向5.1 实现请求限流与配额管理为了防止滥用或控制成本必须引入限流。可以为不同的API密钥、用户ID或IP地址设置速率限制。import rateLimit from fastify/rate-limit; await server.register(rateLimit, { global: false, // 我们将在特定路由上启用 keyGenerator: (request) { // 根据请求头中的API Key或用户ID生成限流键 return request.headers[x-api-key] || request.ip; }, // 其他配置max, timeWindow, redis等 }); // 然后在聊天路由上启用 server.post(/v1/chat/completions, { config: { rateLimit: { max: 100, timeWindow: 1 minute } } }, handler);更精细的配额管理可以基于Token数量或成本。你需要维护一个数据库如PostgreSQL来记录每个用户/项目的每日/每月Token消耗在路由处理前进行校验。5.2 支持流式响应Server-Sent Events流式响应对于提升用户体验至关重要。处理流式响应的关键在于管道化Piping将上游LLM API返回的流通常是ReadableStream直接通过代理网关转发给客户端并在流转过程中插入监控和日志。// 伪代码展示流式处理思路 server.post(/v1/chat/completions/stream, async (request, reply) { const unifiedReq request.body as UnifiedChatRequest; unifiedReq.stream true; // 强制开启流式 // ... 路由决策等前置逻辑 const providerStream await adapter.createChatCompletionStream(providerReq); reply.raw.writeHead(200, { Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive }); // 将供应商的流管道到客户端响应流 providerStream.on(data, (chunk) { // 可选在这里解析chunk记录部分Token使用情况 // 将chunk转换为统一的SSE格式 const formattedChunk data: ${JSON.stringify(chunk)}\n\n; reply.raw.write(formattedChunk); }); providerStream.on(end, () { reply.raw.write(data: [DONE]\n\n); reply.raw.end(); }); providerStream.on(error, (error) { // 处理错误发送错误事件给客户端 reply.raw.write(event: error\ndata: ${JSON.stringify({ error: error.message })}\n\n); reply.raw.end(); }); });实操心得处理流式响应时要特别注意错误处理和连接保持。客户端的网络可能不稳定要做好请求超时和连接中断的清理工作避免服务器端资源泄漏。5.3 熔断、降级与重试机制为了提高网关的韧性需要实现针对下游LLM API的熔断器Circuit Breaker。当某个供应商的API在短时间内失败次数超过阈值时熔断器“跳闸”短时间内后续请求直接失败而不再尝试调用给下游服务恢复的时间。可以使用opossum或brakes这样的库。降级策略可以配置在路由引擎中。例如当gpt-4的请求因成本或速率限制失败时自动降级到gpt-3.5-turbo或者当主要供应商不可用时切换到备用供应商。重试逻辑对于处理网络抖动或供应商的瞬时过载很有用。但要注意对于非幂等的操作虽然聊天补全通常是幂等的和已经消耗了Token的请求重试需要谨慎。最好只在特定的、可重试的错误状态码如429 Too Many Requests, 5xx服务器错误上进行并设置指数退避Exponential Backoff策略。6. 常见问题、调试与性能调优6.1 开发与调试技巧使用结构化日志确保你的日志输出是JSON格式并包含requestId、model、durationMs等关键字段。这样可以使用像jq这样的工具在终端快速过滤或者直接导入到日志分析平台进行聚合查询。实现一个调试端点创建一个GET /debug/requests端点返回最近N个请求的详细日志从内存或临时存储中。这在排查特定问题时非常有用。模拟与测试为你的供应商适配器编写单元测试并使用像nock这样的库来模拟HTTP响应。对于缓存和路由逻辑也要编写集成测试。6.2 典型问题排查清单问题现象可能原因排查步骤请求返回401 UnauthorizedAPI密钥错误、过期或未正确注入。1. 检查环境变量名是否正确加载。2. 检查密钥字符串是否完整前后有无空格。3. 确认该密钥在对应供应商控制台是否启用、是否有权限调用目标模型。请求超时或响应极慢下游LLM API服务延迟高、网络问题、代理网关自身阻塞。1. 查看网关监控中的请求延迟分布。2. 检查网关服务器的CPU、内存和网络I/O。3. 尝试直接调用下游API排除供应商侧问题。4. 检查是否有慢查询阻塞了Redis。缓存命中率始终为0缓存键生成逻辑有误、缓存未启用、Redis连接失败。1. 在代码中打印出生成的缓存键对比不同但期望命中缓存的请求键是否一致。2. 检查请求中cache_enabled参数是否为false。3. 使用redis-cli检查键是否存在测试Redis连通性。流式响应中断或格式错误流式响应处理逻辑有bug未正确转发SSE格式或未处理背压。1. 使用curl或Postman直接测试代理的流式端点观察原始数据流。2. 对比直接调用供应商API的流式响应检查代理转发的数据块格式是否正确。3. 检查在流式传输过程中是否有未捕获的异常导致连接中断。路由到错误的供应商路由规则配置错误或模型名映射表不完整。1. 在路由决策点打印日志查看request.model和最终决定的provider。2. 检查路由规则配置文件的优先级和匹配逻辑。6.3 性能与扩展性考量无状态设计确保代理网关本身是无状态的。所有的状态如缓存、限流计数器都应存储在外部服务Redis、数据库中。这样你可以轻松地水平扩展网关实例通过负载均衡器分发流量。连接池与复用对下游LLM API的HTTP客户端如axios实例要配置连接池复用TCP连接避免为每个请求都建立新的连接这能大幅提升高并发下的性能。监控告警为关键指标设置告警。例如当错误率超过5%、平均延迟超过10秒、或缓存命中率低于某个阈值时通过邮件、Slack或钉钉通知负责人。成本仪表盘将监控数据中的Token使用情况结合各模型的官方定价如$0.01 / 1K tokens计算出实时成本和预算消耗情况并展示在一个简单的仪表盘上。这是控制AI项目支出的重要工具。构建自己的AI代理网关从零到一的过程充满挑战但带来的控制力、可观测性和成本优化能力是直接调用API无法比拟的。它不仅是技术基础设施的一部分更是你理解和驾驭AI能力的关键一步。随着功能的不断迭代你可以逐步加入A/B测试、提示词版本管理、自动化评估等更高级的特性最终演变成一个功能完备的内部AI平台。