一、模式核心思想流水线模式本质是任务分阶段串行处理把一个完整业务任务拆分成多道独立工序本例拆分为 TaskA、TaskB、TaskC 三个阶段。每个阶段由独立线程池负责消费处理上一阶段处理完成后自动把任务提交给下一阶段线程池形成生产者→阶段A→阶段B→阶段C→任务收尾的流水线链路。// 核心特点工序解耦、各阶段线程池独立管控、任务异步串行流转、支持高并发持续生产任务二、整体结构分层任务实体类 Task封装业务数据 各阶段业务处理逻辑阶段任务处理类 TaskA/TaskB/TaskC实现 Runnable绑定对应业务阶段处理完自动投递下一线程池主线程启动类 Main初始化多组独立线程池、任务队列、完成任务集合启动生产者线程持续生成任务启动监控线程观测队列积压与任务计算正确性三、核心代码实现1. 任务实体类 Task封装任务核心变量 num提供三段流水线处理方法每个方法模拟业务耗时。packageduoxiancheng.xq0529;publicclassTask{// 任务核心运算数据intnum;// 流水线第一阶段处理publicvoidtaskA(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段A业务逻辑数值20num20;}// 流水线第二阶段处理publicvoidtaskB(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段B业务逻辑数值*10num*10;}// 流水线第三阶段处理publicvoidtaskC(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段C业务逻辑数值平方num*num;}}// 备注三个方法严格串行依赖必须按 A→B→C 顺序执行最终预期结果(020)*10 再平方 400002. 阶段任务处理器2.1 第一阶段 TaskA执行完 taskA 后将任务交给第二阶段线程池packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.Executor;// 流水线第一阶段任务publicclassTaskAimplementsRunnable{ExecutorexecutorB;// 绑定第二阶段线程池ExecutorexecutorC;// 绑定第三阶段线程池ArrayListTasktaskDoneList;// 存放最终完成的任务Tasktask;// 当前待处理任务publicTaskA(ExecutorexecutorB,ExecutorexecutorC,ArrayListTasktaskDoneList,Tasktask){this.executorBexecutorB;this.executorCexecutorC;this.taskDoneListtaskDoneList;this.tasktask;}Overridepublicvoidrun(){// 执行第一阶段业务逻辑task.taskA();// 流转提交任务给第二阶段线程池executorB.execute(newTaskB(task,executorC,taskDoneList));}}2.2 第二阶段 TaskB执行完 taskB 后将任务交给第三阶段线程池packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.Executor;// 流水线第二阶段任务classTaskBimplementsRunnable{Tasktask;ExecutorexecutorC;ArrayListTasktaskDoneList;publicTaskB(Tasktask,ExecutorexecutorC,ArrayListTasktaskDoneList){this.tasktask;this.executorCexecutorC;this.taskDoneListtaskDoneList;}Overridepublicvoidrun(){// 执行第二阶段业务逻辑task.taskB();// 流转提交任务给第三阶段线程池executorC.execute(newTaskC(task,taskDoneList));}}2.3 第三阶段 TaskC最后阶段处理完毕把任务加入完成集合packageduoxiancheng.xq0529;importjava.util.ArrayList;// 流水线第三阶段任务classTaskCimplementsRunnable{Tasktask;ArrayListTasktaskDoneList;publicTaskC(Tasktask,ArrayListTasktaskDoneList){this.tasktask;this.taskDoneListtaskDoneList;}Overridepublicvoidrun(){// 执行第三阶段业务逻辑task.taskC();// 整个流水线处理完成加入完成列表taskDoneList.add(task);}}// 备注各阶段通过构造器传递下一级线程池与任务上下文实现任务自动流转无需手动阻塞等待3. 主线程工厂启动类 Main初始化三级线程池、队列启动生产者持续造任务启动监控线程观测运行状态。packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.Executor;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;publicclassMain{publicstaticvoidmain(String[]args){// 为每个阶段独立定义阻塞队列容量20ArrayBlockingQueueRunnablequeueAnewArrayBlockingQueue(20);ArrayBlockingQueueRunnablequeueBnewArrayBlockingQueue(20);ArrayBlockingQueueRunnablequeueCnewArrayBlockingQueue(20);// 保存所有流水线执行完成的任务ArrayListTasktaskDoneListnewArrayList();// 初始化三级独立线程池 // 核心线程5、最大线程10、空闲超时1000msExecutorexecutorAnewThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueA);ExecutorexecutorBnewThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueB);ExecutorexecutorCnewThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueC);// 生产者线程循环不断生成新任务提交给第一阶段线程池newThread(()-{intcount0;while(true){try{// 控制生产速率Thread.sleep(50);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 新建任务实例TasktasknewTask();// 提交到流水线首阶段executorA.execute(newTaskA(executorB,executorC,taskDoneList,task));count;System.out.println(生产者已生成任务数:count);}}).start();// 监控线程定时查看队列积压、完成任务数、计算正确率newThread(()-{while(true){try{// 每秒监控一次Thread.sleep(1000);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 打印三个阶段队列积压大小System.out.println(队列A大小:queueA.size() 队列B大小:queueB.size() 队列C大小:queueC.size());// 统计结果正确的任务数量intcorrectCount0;for(inti0;itaskDoneList.size();i){// 符合预期结果40000即为正确if(taskDoneList.get(i).num40000){correctCount;}}System.out.println(已完成任务总数:taskDoneList.size() 运算正确任务数:correctCount);}}).start();}}// 关键备注1每个阶段独享线程池和队列互不阻塞可单独调优核心线程数、队列容量// 关键备注2生产者无限循环生成任务通过 Thread.sleep 控流避免瞬间打满队列// 关键备注3监控线程实时观测队列积压可用来判断线程池参数是否合理、是否有任务堆积瓶颈四、执行流程梳理Main 初始化 A/B/C 三组线程池 对应阻塞队列生产者线程循环创建 Task 对象提交给 executorAexecutorA 调度执行 TaskA完成后把任务封装为 TaskB 提交给 executorBexecutorB 调度执行 TaskB完成后封装为 TaskC 提交给 executorCexecutorC 调度执行 TaskC完成后将任务加入完成列表监控线程每秒打印队列积压、完成任务数、业务计算正确数五、技术要点总结流水线拆分复杂任务拆分为多阶段单一职责便于维护和单独扩容线程池隔离各阶段独立线程池某一阶段阻塞不影响其他阶段运行任务自动流转通过 Runnable 构造器传递下一级线程池实现链式投递流量可控生产者通过休眠控制任务生产速率配合有界队列防止无限积压可观测性内置监控线程实时查看队列水位与任务处理正确率方便调参和排查瓶颈