并发是Java进阶的分水岭也是拉开薪资差距的核心能力。为了帮大家避开碎片化学习误区我整理了一套​​《Java并发编程实战》​​视频课覆盖线程基础、JMM底层、锁机制、JUC工具、线程池、并发设计模式与线上实战。全程原理源码实战调优一站式打通零基础也能轻松吃透Java并发彻底摆脱技术瓶颈。CountDownLatch计数器一个线程等待其他线程CyclicBarrier栅栏所有线程互相等待Semaphore信号量控制同时访问的线程数1.核心特性对比特性COUNTDOWNLATCHCYCLICBARRIERSEMAPHORE核心功能等待多个线程完成任务多个线程互相等待到达屏障控制并发访问数量重用性一次性使用计数到0后失效可循环使用自动重置可重复使用计数方向递减countDown递增await可增可减主被动主线程等待工作线程所有线程互相等待控制资源访问异常处理计数不会重置重置计数抛出BrokenBarrierException计数不变2.详细原理分析2.1 CountDownLatch - 倒计时门闩使用场景// 场景1主线程等待多个子线程完成 public class MainWaitWorker { public static void main(String[] args) throws InterruptedException { int threadCount 5; CountDownLatch doneSignal new CountDownLatch(threadCount); for (int i 0; i threadCount; i) { new Thread(() - { System.out.println(Thread.currentThread().getName() 开始工作); // 执行任务 doneSignal.countDown(); }).start(); } System.out.println(准备开始...); Thread.sleep(1000); doneSignal.await(); // 等待所有线程完成 System.out.println(所有工作完成); }2.2 CyclicBarrier - 循环屏障基本用法import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CyclicBarrierBasic { public static void main(String[] args) { // 1. 基本创建方式 CyclicBarrier barrier1 new CyclicBarrier(3); // 2. 带回调函数的创建方式 CyclicBarrier barrier2 new CyclicBarrier(3, () - { System.out.println(所有线程已到达屏障执行回调函数); }); // 3. 使用示例 demoBasicUsage(barrier1); } static void demoBasicUsage(CyclicBarrier barrier) { for (int i 0; i 3; i) { final int threadId i; new Thread(() - { try { System.out.println(线程 threadId 开始执行任务); Thread.sleep((threadId 1) * 1000); // 模拟任务执行时间 System.out.println(线程 threadId 到达屏障等待其他线程); // 等待其他线程 barrier.await(); System.out.println(线程 threadId 继续执行后续任务); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }多阶段任务协调public class MultiPhaseTask { public static void main(String[] args) { int workerCount 4; int phases 3; // 创建CyclicBarrier可以重用 CyclicBarrier barrier new CyclicBarrier(workerCount, () - { System.out.println( 当前阶段所有工作完成 ); }); // 创建多个工作线程 for (int i 0; i workerCount; i) { final int workerId i; new Thread(() - { try { for (int phase 1; phase phases; phase) { // 阶段开始 System.out.printf(工人%d 开始第%d阶段工作%n, workerId, phase); // 模拟工作 Thread.sleep((workerId 1) * 500); System.out.printf(工人%d 完成第%d阶段工作等待其他人%n, workerId, phase); // 等待所有工人完成当前阶段 barrier.await(); // 阶段间的休息 Thread.sleep(200); } System.out.printf(工人%d 完成所有工作%n, workerId); } catch (InterruptedException | BrokenBarrierException e) { Thread.currentThread().interrupt(); } }).start(); } } }并行计算示例public class ParallelComputation { public static void main(String[] args) { int[] data new int[1000]; for (int i 0; i data.length; i) { data[i] i 1; } int threadCount 4; int chunkSize data.length / threadCount; // 存储每个线程的结果 int[] partialSums new int[threadCount]; CyclicBarrier barrier new CyclicBarrier(threadCount, () - { // 所有线程计算完成后汇总结果 int totalSum 0; for (int sum : partialSums) { totalSum sum; } System.out.println(总和: totalSum); System.out.println(验证: (data.length * (data.length 1) / 2)); }); // 创建计算线程 for (int i 0; i threadCount; i) { final int threadId i; final int start i * chunkSize; final int end (i threadCount - 1) ? data.length : start chunkSize; new Thread(() - { try { // 计算部分和 int sum 0; for (int j start; j end; j) { sum data[j]; } partialSums[threadId] sum; System.out.printf(线程%d 计算完成: %d (范围: %d-%d)%n, threadId, sum, start, end - 1); // 等待其他线程 barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { Thread.currentThread().interrupt(); } }).start(); } } }2.3 Semaphore - 信号量基本用法import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class SemaphoreBasic { public static void main(String[] args) { // 1. 创建信号量公平模式 Semaphore semaphore1 new Semaphore(3, true); // 2. 创建信号量非公平模式 Semaphore semaphore2 new Semaphore(3); // 3. 基本使用示例 demoBasicUsage(semaphore1); } static void demoBasicUsage(Semaphore semaphore) { for (int i 0; i 10; i) { final int threadId i; new Thread(() - { try { System.out.println(线程 threadId 尝试获取许可); semaphore.acquire(); // 获取许可 System.out.println(线程 threadId 获得许可开始执行任务); Thread.sleep(2000); // 模拟任务执行 System.out.println(线程 threadId 任务完成释放许可); semaphore.release(); // 释放许可 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); } } }使用场景// 场景1控制并发线程数 public class ConnectionPool { private static final int MAX_CONNECTIONS 5; private final Semaphore semaphore new Semaphore(MAX_CONNECTIONS); private final SetConnection connections new HashSet(); public Connection getConnection() throws InterruptedException { semaphore.acquire(); // 获取许可 synchronized (this) { Connection conn createConnection(); connections.add(conn); return conn; } } public void releaseConnection(Connection conn) { synchronized (this) { connections.remove(conn); closeConnection(conn); } semaphore.release(); // 释放许可 } } // 场景2限流器 public class RateLimiter { private final Semaphore semaphore; private final int maxPermits; private final ScheduledExecutorService scheduler; public RateLimiter(int permitsPerSecond) { this.maxPermits permitsPerSecond; this.semaphore new Semaphore(permitsPerSecond); this.scheduler Executors.newScheduledThreadPool(1); // 每秒重置许可 scheduler.scheduleAtFixedRate(() - { int current semaphore.availablePermits(); if (current maxPermits) { semaphore.release(maxPermits - current); } }, 0, 1, TimeUnit.SECONDS); } public boolean tryAcquire() { return semaphore.tryAcquire(); } } // 场景3资源池 public class ObjectPoolT { private final ListT pool; private final Semaphore semaphore; public ObjectPool(int size, SupplierT factory) { pool new ArrayList(size); for (int i 0; i size; i) { pool.add(factory.get()); } semaphore new Semaphore(size); } public T borrow() throws InterruptedException { semaphore.acquire(); synchronized (pool) { return pool.remove(0); } } public void returnObj(T obj) { synchronized (pool) { pool.add(obj); } semaphore.release(); } }3.三者的核心区别3.1 设计目的不同// CountDownLatch: 一个或多个线程等待其他线程完成 // 典型使用模式 CountDownLatch startSignal new CountDownLatch(1); CountDownLatch doneSignal new CountDownLatch(N); // 工作线程 startSignal.await(); // 等待开始信号 // 执行任务... doneSignal.countDown(); // 完成任务 // 主线程 startSignal.countDown(); // 发送开始信号 doneSignal.await(); // 等待所有任务完成 // CyclicBarrier: 所有线程互相等待 // 典型使用模式 CyclicBarrier barrier new CyclicBarrier(N, barrierAction); // 每个线程 // 执行第一阶段... barrier.await(); // 等待其他线程 // 执行第二阶段... barrier.await(); // 再次等待 // Semaphore: 控制资源访问 // 典型使用模式 Semaphore semaphore new Semaphore(N); // 访问资源 semaphore.acquire(); try { // 使用资源 } finally { semaphore.release(); }3.2 状态重置机制// CountDownLatch: 不可重置 CountDownLatch latch new CountDownLatch(3); latch.countDown(); // 2 latch.countDown(); // 1 latch.countDown(); // 0门闩打开 // latch.await() 会立即返回无法再次使用 // CyclicBarrier: 自动重置 CyclicBarrier barrier new CyclicBarrier(3); // 第一次使用 barrier.await(); // 所有线程到达后重置 // 可以立即再次使用 barrier.await(); // 重新开始计数 // Semaphore: 手动控制 Semaphore semaphore new Semaphore(3); semaphore.acquire(); // 许可数: 2 semaphore.acquire(); // 许可数: 1 semaphore.release(); // 许可数: 2 // 可以动态调整 semaphore.release(2); // 许可数: 43.3 异常处理差异// CountDownLatch: 计数不受线程中断影响 CountDownLatch latch new CountDownLatch(3); Thread t new Thread(() - { try { Thread.sleep(1000); latch.countDown(); } catch (InterruptedException e) { // 即使中断计数仍然有效 latch.countDown(); } }); t.start(); t.interrupt(); // 中断线程但计数依然减少 // CyclicBarrier: 中断会导致屏障破坏 CyclicBarrier barrier new CyclicBarrier(3); Thread t1 new Thread(() - { try { try { barrier.await(); } catch (InterruptedException e) { barrier.await(); // 会导致 BrokenBarrierException } } catch (BrokenBarrierException | InterruptedException e) { System.out.println(屏障被破坏); } }); t1.start(); t1.interrupt(); // Semaphore: 中断会影响获取许可 Semaphore semaphore new Semaphore(0); Thread t2 new Thread(() - { try { semaphore.acquire(); // 会被中断 } catch (InterruptedException e) { System.out.println(获取许可被中断); } }); t2.start(); t2.interrupt();并发是Java进阶的分水岭也是拉开薪资差距的核心能力。为了帮大家避开碎片化学习误区我整理了一套​​《Java并发编程实战》​​视频课覆盖线程基础、JMM底层、锁机制、JUC工具、线程池、并发设计模式与线上实战。全程原理源码实战调优一站式打通零基础也能轻松吃透Java并发彻底摆脱技术瓶颈。