4. 聊天模型--流式传输流式处理对于使基于 LLM 的应用程序能够响应最终用户至关重要。其通过逐步显示输出甚至在完整的响应准备就绪之前流式传输可以显着改善用户体验。之前直接使用invoke 的调用方式属于非流式传输看到的现象是聊天模型直接返回全量内容若模型思考时间较长则我们等待的时间就越长。我们使用的deepseek客户端就是属于流式输出。可以看到输出时间快了很多对于 LangChain 的聊天模型来说它同样支持流式返回。4.1 stream() 同步传输LangChain 聊天模型中的.stream() 方法来同步生成流式响应的效果.stream() 方法返回一个迭代器该迭代器在生成输出时同步产生输出 消息块 。可以 使用 for 循环实时处理每个块。from langchain_deepseek import ChatDeepSeek modelChatDeepSeek(modeldeepseek-v4-flash) #流式输出返回一个迭代器产生的消息块 chunks[] for chunk in model.stream(写一段关于龟兔赛跑的故事20字左右): chunks.append(chunk) #chunks:AIMessage print(chunk.content,end|,flushTrue) #我们得到了一个叫做 AIMessageChunk 的东西它代表 AIMessage 的一部分也就是消息块。 #消息块还可以直接相加 print(\n) temp_chunkschunks[0]chunks[1]chunks[2]chunks[3]chunks[4]chunks[5] print(temp_chunks)4.2 astream() 异步传输想象一个场景你需要煮一壶水同时还要给朋友发一条短信。我们分别用同步传统和异步两种 方式来完成以此对比并引入 协程 和 事件循环 的概念• 同步阻塞方式这就像是一个“死心眼”的人做事必须一件一件来import time #同步io def boil_water(): print(开始烧水...) time.sleep(5)#模拟烧水5scpu完全空闲 print(烧水完成...) def send_message(): print(开始发消息...) time.sleep(2) print(发消息完成...) def main(): boil_water() send_message() #耗时七秒 main()问题 在 boil_water 函数等待的5秒里CPU 完全空闲但却不能去做 send_message 任务 效率低下。• 异步方式我们请出 asyncio 、 协程 和 事件循环 。什么是协程• 多进程通常利用的是多核 CPU 的优势同时执行多个计算任务。每个进程有自己独立的内存管 理所以不同进程之间要进行数据通信比较麻烦。• 多线程是在一个 cpu 上创建多个子任务当某一个子任务休息的时候其他任务接着执行。多线程 的控制是由 python 自己控制的。线程存在数据同步问题所以要有锁机制。• 协程的实现是在一个线程内实现的相当于流水线作业。由于线程切换的消耗比较大所以对于 并发编程可以优先使用协程。协程作为一种轻量级的并发编程模型可以被视为用户态的“轻量级线程”。其核心优势在于其调度完全由用户空间掌控避免了操作系统内核的频繁介入从而显著降低了上 下文切换的开销。在诸如网络数据刷新、资源加载、用户界面更新、以及 I/O 读写等场景下如果并 发任务的计算量相对较小、对系统资源占用较低则不必动用操作系统级别的线程。协程的切换则由程序员和编程语言控制程序员决定在何时暂停或恢复协程。协程是一个特殊的函 数它可以在执行过程中暂停并在稍后恢复执行。它用async def定义并在需要暂停的地方使 用await。#异步IO import asyncio #协程 async def boil_water(): print(开始烧水...) await asyncio.sleep(5) #await表示等待这个操作完成但期间可以做别的事 print(烧水完成...) #协程 async def send_message(): print(开始发消息...) await asyncio.sleep(2) print(发消息完毕...) #协程调度 #事件循环 async def main(): #创建两个任务并交给事件循环去调度 #1、烧水任务1 task1asyncio.create_task(boil_water()) #2、发消息。任务2 task2asyncio.create_task(send_message()) #等待两个任务完成 await task1 await task2 #run会创建一个事件循环 #5s asyncio.run(main())事件循环事件循环是asynciopython标准库中的模块用于编写异步I/O操作的代码的核心可以把它当作一个总调度员工作流程1、他维护一个任务列表2、不断循环检查每个任务若任务处于“等待I/O状态”比如等水烧开..就暂停他立即去执行下一个已经就绪的任务若任务等待时间到了或者I.O操作完成了事件循环就恢复执行该任务# 主程序也是一个协程 async def main(): # 创建两个任务并交给事件循环去调度 task1 asyncio.create_task(boil_water_async()) task2 asyncio.create_task(send_message_async()) # 等待两个任务都完成 await task1 await task2 # 它负责创建事件循环并将第一个协程主程序放入其中运行。 asyncio.run(main())通过asyncio可以在单线程中同时处理多个任务。一个在单线程内调度和管理所有协程 的核心机制就是事件循环。它不停地检查哪些协程可以执行哪些在等待。总结• 协程是 asyncio 的核心概念之一。他是一个特殊函数可以在执行过程中暂停稍后可以恢复协程通过async def关键字定义。通过await关键字暂停执行等待异步操作完成使用asyncio.run()函数运行一个协程。他会创建一个事件循环并运行指定的协程。事件循环是 asyncio 的核心组件负责调度和执行协程。它不断地检查是否有任务需要执 行并在任务完成后调用相应的回调函数。使用.astream异步生成流式响应使用 .astream() 方法来异步生成流式响应的效果这专为非阻塞工作流程而设计。可以在 异步代码中使用它来实现相同的实时流式处理行为。import asyncio from langchain_deepseek import ChatDeepSeek modelChatDeepSeek(modeldeepseek-v4-flash) #异步流式输出 async def async_stream(): print(异步调用) async for chunk in model.astream(写一段关于春天的故事。100字): print(chunk.content,end|,flushTrue) asyncio.run(async_stream())4.3 使用 StrOutputParser 解析模型的输出StrOutputParser是 LangChain 中最基础的输出解析器。它的核心作用是将聊天模型返回的复杂AIMessage对象解析并提取为最纯净的字符串文本。LLM 返回的原始结果是一个包含大量元数据的对象如令牌用量、模型名称、响应ID等。在大多数应用场景下我们只关心回复的文本内容。StrOutputParser就是专门负责完成这个“过滤与提取”工作的组件。from langchain_core.output_parsers import StrOutputParser from langchain_deepseek import ChatDeepSeek #组件1聊天模型 modelChatDeepSeek(modeldeepseek-v4-flash) #2、初始化解析器 parserStrOutputParser() chainmodel | parser resultchain.invoke(简单介绍deepseek-v4) print(result)流式输出的兼容由于构建出的链本身也是标准的Runnable对象它完美地继承了流式处理能力。在流式场景下解析器会自动将每一个流式消息块转换为对应的字符串片段。from langchain_core.output_parsers import StrOutputParser from langchain_deepseek import ChatDeepSeek #组件1聊天模型 modelChatDeepSeek(modeldeepseek-v4-flash) #2、初始化解析器 parserStrOutputParser() chainmodel | parser for chunk in chain.stream(简单介绍Claude): print(chunk,end)注意流式传输不是聊天模型的专属特性而是所有遵循Runnable接口标准的组件所共有的“标准能力”。可以借助“USB接口标准”这个类比来理解聊天模型、输出解析器、处理链等都像是按照“Runnable”USB标准设计的不同设备U盘、鼠标、键盘。流式传输.stream()就像“即插即用”一样是这个标准本身定义好的一项通用功能。因此只要是Runnable的实例原则上都可以调用.stream()方法这是一种基于接口规范的、而不是具体类别的能力。实践中需要注意的三点“流出”的数据类型各不相同虽然都能流式输出但不同组件流出的“数据块”类型完全不同。例如聊天模型直接.stream()产出的是AIMessageChunk对象。“模型 解析器”构成的链对流式输出的每个AIMessageChunk做了处理再.stream()产出的就是纯净的str字符串。并非所有组件都支持流式即便实现了Runnable接口部分组件因其工作性质不提供流式处理或流式无意义。例如检索器 (Retrievers)是一次性返回全量检索结果的过程无法拆分因此对它调用.stream()也只会获得一次性返回的结果没有逐令牌生成的过程。链式调用是常态在实际开发中几乎不会单独使用模型的原生流绝大多数情况下都是通过构建链Chain的方式利用解析器将流式消息处理为最终想要的格式如字符串再进行流式输出。4.4 自定义流式输出解析器一个思维模式的转变从这一节开始我们要明白自定义流式解析器的任务就是对流经的数据进行任意的加工和重组。它不再局限于提取内容而是可以改变数据的形态。StrOutputParser它能帮我们把大模型返回的复杂消息块变成纯净的字符串并且在流式输出时也是一个字一个字地往外蹦。但这带来了一个新问题如果我们不想看一个字一个字地蹦而是想等模型“说”完一句话再把这句完整的话显示出来该怎么办这就引出了这一节的核心自定义流式输出解析器。它允许我们通过写一个生成器函数来任意地改变流式输出的“节奏”和“样式”。在链中使用 生成器函数即可完成自定义流式输出的能力。聊天模型的 .stream() 方法返回的是一个迭代器该迭代器在生成输出时同步产 生输出 消息块 。那么我们的将实现的这些生成器的签名应该是Iterator[Input] - Iterator[Output]。它规定了我们写的生成器函数必须接收一个“流”迭代器作为输入并且也返回一个“流”迭代器作为输出。这样就能完美地嵌入到model | parser这个管道里。或者对于异步生成器AsyncIterator[Input] - AsyncIterator[Output] 。from typing import Iterator, List from langchain_core.output_parsers import StrOutputParser from langchain_deepseek import ChatDeepSeek #组件1聊天模型 modelChatDeepSeek(modeldeepseek-v4-flash) #组件2输出解析器str parserStrOutputParser() #自定义生成器 def split_into_list(input:Iterator[str])-Iterator[List[str]]: #创建一个空字符串变量buffer buffer for chunk in input: buffer chunk #遇到句号需要刷新buffer while。 in buffer: #找到句号的位置 stop_indexbuffer.index(。) #yield用于创造生成器 #buffer[:stop_index]是切片取开头到句号位置stop_index之前的所有字符 #.strip() 去掉字符串前后的空白字符空格、换行等。 yield [buffer[:stop_index].strip()]#外面套上 [ ... ] 表示把这个字符串放进一个列表里。 #yield [buffer[:stop_index].strip()] —— 这是生成器的关键。 # yield 有点像 return但不会结束函数而是“返回一个值然后暂停函数下次调用时继续”。 # 这里返回的是一个列表列表中只有一个元素buffer[:stop_index].strip()。 bufferbuffer[stop_index1:] #处理buffer最后几个字 yield [buffer.strip()] #定义链。 # 构建处理链模型 - 字符串解析器 - 我们自定义的句子分割器 chainmodel | parser | split_into_list #流式输出返回一个迭代器产生的消息块 for chunk in chain.stream(写一段关于爱情的歌词需要5句话,每句话中文句号隔开): #使用parser结果就是str print(chunk,flushTrue)4.5 深度探索流式传输我们需要从源码层面回答三个根本性问题LangChain 请求 OpenAI 时底层使用什么网络协议LangChain 如何实现并支持流式传输OpenAI 返回的原始数据块是何格式 LangChain 如何将其转换为标准的AIMessageChunk对象SSE 协议介绍流式传输并非 LangChain 自身创造其根基在于大模型服务商如 OpenAI提供的SSEServer-Sent Events服务器发送事件支持。可以将其理解为一种基于 HTTP 的“长连接热线”。相比传统“一问一答”即关闭的短连接SSE 建立连接后服务器可持续向客户端单向推送数据流直到传输结束。这为流式输出提供了底层物理通道。HTTP 协议本身设计为无状态的请求-响应模式严格来说是无法做到服务器主动推送消息到客户端但通过Server-Sent Events 服务器发送事件简称 SSE技术可实现流式传输允许服务器主动向浏览器推送数据流。SSEServer-Sent Events是一种基于 HTTP 的轻量级实时通信协议浏览器可以通过内置的EventSource API 接收并处理这些实时事件。核心特点• 基于 HTTP 协议复用标准 HTTP/HTTPS 协议无需额外端口或协议兼容性好且易于部署。• 单向通信机制SSE 仅支持服务器向客户端的单向数据推送客户端通过普通 HTTP 请求建立连接后服务器可持续发送数据流但客户端无法通过同一连接向服务器发送数据。• 自动重连机制支持断线重连连接中断时浏览器会自动尝试重新连接支持 retry 字段指定重连间隔。• 自定义消息类型客户端发起请求后服务器保持连接开放。响应头设置Content-Type: text/eventstream标识为事件流格式持续推送事件流。数据格式服务端向浏览器发送 SSE 数据需要设置必要的 HTTP 头信息Content-Type: text/event-stream;charsetutf-8 Connection: keep-aliveLangChain 的流式处理流程源码层面通过分析BaseChatOpenAI类中的_stream方法可梳理出以下核心生命周期1. 发起请求建立热线LangChain 本身并不“创造”或“规定”一个底层的网络传输协议而是依赖于其底层的大模型供应商如 OpenAI的协议。因此当我们发起请求时会在请求中设置 streamTrue _stream() 源码中的第一步表示OpenAI 服务器将在生成 Response 时向客户端发出数据server-sent eventsSSE。此时 API会保持 HTTP 连接打开并以特定格式发送数据。在构建 HTTP 请求时LangChain 将参数stream强制设置为True。此标志告知 OpenAI 服务器本次对话需采用 SSE 模式持续推送生成结果。# 源码逻辑 kwargs[stream] True payload self._get_request_payload(messages, stopstop, **kwargs)2. 接入与接收监听数据流LangChain 使用openai官方 SDK 构建的 HTTP 客户端具体为_SyncHttpxClientWrapper类发起调用并持续监听来自https://api.openai.com/v1的 SSE 事件流。3. 核心转换数据“本土化”这是最关键的一步。OpenAI 通过 SSE 推送的原始数据是一系列包含delta字段的 JSON 对象例如delta:{content:你}而非 LangChain 可直接使用的消息对象。这一步的转换工作由_convert_chunk_to_generation_chunk方法完成。它的职责是输入OpenAI 推送的原始 JSON 数据块chunk。处理提取choices[0].delta中的内容。输出一个标准的ChatGenerationChunk对象其核心属性message就是我们所熟悉的AIMessageChunk实例。至此每一块原始的流式数据便被封装成了包含content、response_metadata等完整信息的AIMessageChunk对象可供下游组件统一处理。数据转换结构对比阶段数据格式示例内容OpenAI 原始 SSE 数据块JSON 对象{choices:[{delta:{content:你}}]}LangChain 转换后AIMessageChunkcontent你, additional_kwargs{...}总结技术基石流式传输能力由模型服务商基于 SSE 协议提供LangChain 是这一能力的使用者和封装者。核心机制LangChain 通过在请求中开启streamTrue启用 SSE并在客户端内部实现了一套从“原始 JSON 事件流”到“AIMessageChunk对象序列”的完整转换逻辑。深入价值掌握此流程有助于开发者理解流式处理的全链路便于日后进行定制化开发或排查相关问题。至此我们从底层的网络协议到上层的对象封装完成了对流式传输的深度剖析。LangChain流式传输的完整流程与底层协议。总结一下1. langchain-openai 包通过集成 OpenAI Python SDK提供了一个 HTTP 客户端。2. 因此支持 LangChain 向 OpenAI 的 API 发起调用请求。3. 若希望发起流式传输请求则需在请求中加入 streamTrue 向 OpenAI 说明以 SSE 协议进行流式返回。4. LangChain 接收 OpenAI 的 SSE 格式的响应并将其转换为 LangChain 自封装的消息格式如AIMessageChunk 消息。这样就可以以统一的方式处理来自不同模型提供商OpenAI,Anthropic等的流式响应。5. 使用 LangSmith 跟踪 LLM 应用使用 LangSmith 时没有代码介入只需要配置下环境就可以直接监控我们的应用。使用 LangChain 构建的许多应用程序可能会包含多个步骤和多次的 LLM 调用。随着这些应用程序变得越来越复杂作为开发者我们能够检查链或代理内部到底发生了什么变得至关重要。最好的方法是使用 LangSmith。LangSmith 与框架无关它可以与 langchain 和 langgraph 一起使用也可以不使用。LangSmith 是一个用于帮助我们构建生产级 LLM 应用程序的平台它将密切监控和评估我们的应用。LangSmith 平台地址https://smith.langchain.com/ 新用户需要注册要想让 LangSmith 跟踪 LLM 应用第一步申请 LangSmith API Key点击 Settings就会跳转到API Keys设置页面若没有跳转可以在左侧 tab 栏中找到进入。创建完成后保存好你的 API Key。接下来配置两个环境变量LANGSMITH_TRACINGtrueLANGSMITH_API_KEY你的 LangSmith API Key配置完成后我们任意执行代码查看 LangSmith 平台这将在 LangSmith 的默认跟踪项目中生成调用的跟踪跟踪会以瀑布流形式展示调用的完整步骤以及每个步骤的详细信息和耗时。让我们能够检查内部到底发生了什么