批量调用 Embedding API 怕限流怕断点重算我写了一个 PyPI 包帮你自动控制速率、避免重复计算。前言作为一名 RAG检索增强生成开发者我每天都在跟“向量化”打交道把几千篇文档切块再调用 Embedding 模型转成向量。这个过程看似简单却藏着三大痛点怕限流调用 OpenAI、阿里云等云端 API 时速率稍快就报 429代码中断还得写一堆重试逻辑。怕中断一次性处理几万块网络一闪断或电脑休眠所有向量全白算第二天从头再来。怕拖垮batch_size 设大了爆显存设小了太慢每次调参全凭感觉。于是我利用 LangChain 的底层能力造了一个专门解决这些问题的轮子——ratelimited-embedder。它就像一个带“定速巡航”和“断点记忆”的向量化司机你只管给文本它自动按你设定的速度跑跑累了自动减速中途熄火了还能接着上次的位置继续跑。今天这篇文章我会把它的设计思路、用法以及如何接入云端大模型阿里云 Qwen / OpenAI完整分享出来。 GitHubhttps://github.com/lowers/ratelimited-embedder PyPIpip install ratelimited-embedder一、为什么需要专用的向量化控制器LangChain 虽然提供了很好的 Embedding 抽象但它并没有内置“批量速率控制”、“自动降速”、“断点续传”这些能力。开发者仍然需要自己写 while 循环、手动管理缓存、处理重试。而ratelimited-embedder的目标就是让你写一行代码就能获得工业级的向量化可靠性。它的核心特性特性解决的问题速率控制设置batch_size和delay平滑调用自动降速单批耗时过长时自动减半 batch 并增加延迟断点续传基于 SQLite 的 MD5 缓存重启后不重复计算硬件建议根据内存和 CPU 自动推荐初始 batch_sizeLangChain 无侵入适配wrap_embeddings一行代码包装现有模型二、快速上手1. 安装pipinstallratelimited-embedder如果需要本地 Ollama 嵌入免费提前启动 Ollama 并拉取模型ollama pull nomic-embed-text# 或 qwen3-embedding:0.6b2. 直接使用RateControlledEmbedder假设你已经有一个嵌入函数例如封装了 OpenAI API 调用你可以直接把它交给RateControlledEmbedderfromratelimited_embedderimportRateControlledEmbedderdefmy_embed_func(texts:list[str])-list[list[float]]:# 实际调用 OpenAI / Cohere / 本地模型return[[0.1,0.2,0.3]for_intexts]embedderRateControlledEmbedder(embeddingsmy_embed_func,# 注意参数名是 embeddingsbatch_size16,delay0.5,slow_threshold2.0,cache_dir./my_cache,# 断点续传的目录)texts[三国演义 第一回,桃园三结义,温酒斩华雄]vectorsembedder.run(texts)# 返回 list[list[float]]print(f生成{len(vectors)}个向量)cache_dir是实现断点续传的关键第一次运行会把向量存入 SQLite 文件第二次运行相同文本时瞬间返回。3. 包装 LangChain 嵌入模型无侵入如果你已经在用 LangChain 的嵌入对象比如OllamaEmbeddings可以用wrap_embeddings包装完全不影响原有代码fromlangchain_ollamaimportOllamaEmbeddingsfromratelimited_embedderimportwrap_embeddings original_embOllamaEmbeddings(modelnomic-embed-text)wrapped_embwrap_embeddings(original_emb,batch_size8,delay0.5,cache_dir./cache,)# 之后使用 wrapped_emb 与原来的 emb 完全一样vectorswrapped_emb.embed_documents([hello,world])wrap_embeddings返回的对象仍实现了embed_documents和embed_query可直接替换 LangChain 链条中的嵌入组件。4. 获取硬件建议不知道batch_size设多少合适调用get_hardware_suggestion()即可fromratelimited_embedderimportget_hardware_suggestion suggestionget_hardware_suggestion()print(suggestion)# 输出示例{batch_size: 16, delay: 0.5, mem_gb: 15.9, mem_percent: 45.2, cpu_count: 16}三、核心设计自动降速与断点续传自动降速工具内部会监控每一批的处理耗时。如果某批耗时超过slow_threshold默认 2 秒意味着当前批次过大或后端压力大它会自动执行batch_size max(1, batch_size // 2)delay min(delay * 1.5, 30.0)这个策略是指数后退的变体能快速适应后端负载变化。降速事件会被记录在统计信息中。断点续传缓存缓存基于文本内容的MD5 哈希存储到 SQLite 数据库中。每个文本块独立缓存即使程序崩溃或电脑休眠下次启动也会自动跳过已计算的部分。缓存目录示例my_cache/ └─ vector_cache.db你可以随时删除该目录来清空缓存。不同项目只要指定不同的cache_dir就不会互相干扰。统计信息调用embedder.get_stats()会返回以下关键指标total_chunks总处理块数cache_hits/cache_misses缓存命中次数batches总批次数degrade_count降速发生次数avg_batch_time平均每批耗时这些数据对调优batch_size和delay非常有帮助。四、接入云端大模型阿里云 Qwen / OpenAIratelimited-embedder不仅支持本地模型也能完美集成云端大模型的 Embedding API。你只需提供符合 LangChain 接口的嵌入对象工具会自动为其加上速率控制、自动降速和断点续传——这对于调用付费 API 尤其重要能有效避免限流、节省成本。1. 安装云端模型对应的依赖云服务依赖安装命令阿里云 DashScopepip install langchain-community已包含并设置环境变量DASHSCOPE_API_KEYOpenAIpip install langchain-openai并设置环境变量OPENAI_API_KEY其他 OpenAI 兼容服务同上只需修改base_url2. 示例使用阿里云 Qwen Embedding 模型fromlangchain_community.embeddingsimportDashScopeEmbeddingsfromratelimited_embedderimportRateControlledEmbedder# 初始化云端嵌入模型会自动从环境变量读取 API Keyqwen_embDashScopeEmbeddings(modeltext-embedding-v3)controllerRateControlledEmbedder(embeddingsqwen_emb,# 关键只需替换这里的对象batch_size16,# 根据 API 限制调整delay1.0,# 控制请求间隔避免 429slow_threshold2.0,cache_dir./qwen_cache,)texts[你的文本块1,文本块2]vectorscontroller.run(texts)# 自动批量调用 Qwen APIprint(f生成{len(vectors)}个向量)3. 示例使用 OpenAI Embedding 模型fromlangchain_openaiimportOpenAIEmbeddingsfromratelimited_embedderimportRateControlledEmbedder openai_embOpenAIEmbeddings(modeltext-embedding-3-small)controllerRateControlledEmbedder(embeddingsopenai_emb,batch_size8,# OpenAI 建议较小 batchdelay0.5,cache_dir./openai_cache,)vectorscontroller.run(texts)# 第二次运行相同 texts 时直接从缓存读取不再调用 API4. 配合wrap_embeddings无侵入使用如果你已经在 LangChain 流程中使用了from_documents等方法可以用wrap_embeddings包装原有嵌入对象自动增加速率控制和缓存fromlangchain_community.embeddingsimportDashScopeEmbeddingsfromratelimited_embedderimportwrap_embeddingsfromlangchain_milvusimportMilvus raw_embDashScopeEmbeddings(modeltext-embedding-v3)wrapped_embwrap_embeddings(raw_emb,cache_dir./milvus_cache)# 直接替换原有代码无需改动vector_storeMilvus.from_documents(documents,wrapped_emb,connection_args{uri:http://localhost:19530})五、与 FAISS / Milvus 集成ratelimited-embedder只负责生成向量不绑定任何存储后端。你可以自由选择向量数据库。importfaissimportnumpyasnpfromratelimited_embedderimportRateControlledEmbedder embedderRateControlledEmbedder(embeddings,...)vectorsembedder.run(texts)# 构建 FAISS 索引dimlen(vectors[0])indexfaiss.IndexFlatL2(dim)index.add(np.array(vectors).astype(float32))如果需要生产级向量数据库 Milvusfromlangchain_milvusimportMilvus vector_storeMilvus.from_documents(documents,wrapped_emb,connection_args{uri:http://localhost:19530})这种松耦合设计让你可以自由选择存储层不被工具绑定。六、本地 vs 云端如何选择对比项本地模型 (Ollama)云端模型 (Qwen/OpenAI)成本免费电费按 token 付费新用户有免费额度速度受本地 GPU 限制快但依赖网络隐私数据不出本地数据上传至云端适用场景开发测试、隐私敏感、离线环境生产环境、高精度、不想维护硬件你的工具在这两种模式下表现同样出色使用本地模型时确保资源稳定使用云端模型时控制成本 避免限流。七、发布到 PyPI 后的真实体验自从我把这个工具发布到 PyPI版本 0.1.1我在自己的几个 RAG 项目里全面替换了手写的 while 循环。最直观的感受代码量锐减原来 50 行的批量处理逻辑变成了 5 行。不再焦虑限流设置了batch_size8, delay0.5后OpenAI 的 429 再也没有出现过。半夜跑数不再害怕断点续传让我可以放心关掉电脑第二天接着跑。社区有朋友用它处理百万级文档配合 Milvus 取得了很好的效果。八、未来计划异步版本增加async支持适配高性能场景。更多统计指标可选 token 计数器针对 OpenAI 等模型。支持更多缓存后端Redis用于分布式场景。集成到 LangChain 官方生态争取成为 LangChain 社区推荐的速率控制组件。欢迎到 GitHub 仓库提 issue 或 PR。九、总结ratelimited-embedder是我为解决实际问题而写的一个小工具它专注做一件事让向量化过程稳定、可控、可恢复。它把那些重复、繁琐、容易出错的“基础设施”代码封装起来让业务代码更干净。如果你也经常被 Embedding API 的速率限制和大规模重复计算困扰不妨试试这个工具。一键安装pipinstallratelimited-embedder然后像这样使用fromratelimited_embedderimportRateControlledEmbedder embedderRateControlledEmbedder(embeddings,cache_dir./cache)vectorsembedder.run(texts)希望它能给你带来和给我一样的轻松感。更多用法和 API 细节请参考 GitHub 仓库的 README。你的 RAG 项目值得一个“永不抛锚”的向量化引擎。如果这篇文章对你有帮助欢迎点赞、收藏、转发也欢迎在评论区交流你的使用心得。关注我后续还会分享更多 AI 工程化实战经验。