同步工具类
一、CountDownLatch(倒计时门闩)
知识点说明
CountDownLatch 允许一个或多个线程等待其他线程完成操作后再继续执行。
类比:教室里的学生(多个线程各自做自己的事),班长(等待线程)等所有人离开后才锁门。
初始化:CountDownLatch latch = new CountDownLatch(5); // 倒计时 5
countDown() → 计数器减 1(学生离开)
await() → 等待计数器为 0(班长等着)
示例代码
java
// 场景:主线程等待 3 个任务都执行完
CountDownLatch latch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {
int taskId = i;
new Thread(() -> {
try {
System.out.println("任务 " + taskId + " 开始执行");
Thread.sleep((long) (Math.random() * 2000));
System.out.println("任务 " + taskId + " 执行完毕");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 每个任务完成后减 1
}
}).start();
}
System.out.println("主线程等待所有任务完成...");
latch.await(); // 阻塞等待计数器归零
System.out.println("所有任务完成,主线程继续");
// 带超时的等待
boolean completed = latch.await(5, TimeUnit.SECONDS);
if (!completed) {
System.out.println("超时了,部分任务未完成");
}
// 模拟并发:100 个线程同时开始
CountDownLatch startSignal = new CountDownLatch(1);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
startSignal.await(); // 所有线程在这里等待
doWork();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
startSignal.countDown(); // 100 个线程同时开始执行
注意事项
- CountDownLatch 是一次性的——计数器归零后不能再重置
- countDown() 必须在 finally 中调用,否则异常导致计数器永远不为 0 时,所有等待线程永远阻塞
await()会阻塞当前线程,小心死锁- CountDownLatch 的计数器不能在构造后修改
- 适合场景:并行任务的汇总等待
二、CyclicBarrier(循环屏障)
知识点说明
CyclicBarrier 让一组线程互相等待,当所有线程都到达屏障点时,才一起继续执行。
类比:团建时人齐了才出发,人没齐就先等着。下次出发还能再用。
初始化:CyclicBarrier barrier = new CyclicBarrier(5); // 5 个线程到齐后继续
CyclicBarrier barrier = new CyclicBarrier(5, () -> { // 到齐后优先执行 barrierAction
System.out.println("人齐了,出发!");
});
与 CountDownLatch 的区别(面试高频)
| 对比 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 使用方式 | 等待者调用 await(),任务调用 countDown() | 所有线程都调用 await() 等待彼此 |
| 能否重置 | ❌ 不能(一次性) | ✅ 可循环使用(reset()) |
| 参与者 | 等待线程 vs 执行线程(两个角色) | 所有线程都是参与者(一个角色) |
| 构造参数 | 计数值 | 参与的线程数 |
| barrierAction | ❌ 不支持 | ✅ 支持,到齐后执行 |
| 适用场景 | 一个线程等 N 个线程 | N 个线程互相等待 |
示例代码
java
// 场景:3 个玩家都准备好了才开始游戏
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("=== 所有玩家已就位,游戏开始!===");
});
for (int i = 1; i <= 3; i++) {
int playerId = i;
new Thread(() -> {
try {
System.out.println("玩家 " + playerId + " 正在加载资源...");
Thread.sleep((long) (Math.random() * 3000));
System.out.println("玩家 " + playerId + " 已就位");
barrier.await(); // 等待其他玩家
System.out.println("玩家 " + playerId + " 开始游戏");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
// 循环使用(重置)
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int round = 0; round < 3; round++) {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
Thread.sleep((long) (Math.random() * 1000));
cyclicBarrier.await(); // 每轮 3 个线程到齐才继续
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
// 注意:实际需要控制循环时序,这里简化展示
}
BrokenBarrierException
当一个线程在等待时被中断或超时,屏障会损坏(Broken),其他等待线程会抛出 BrokenBarrierException。
java
// 屏障损坏的情况
CyclicBarrier barrier = new CyclicBarrier(3);
new Thread(() -> {
try {
barrier.await(1, TimeUnit.SECONDS); // 超时 → 屏障损坏
} catch (TimeoutException e) {
System.out.println("超时了");
}
}).start();
// 其他线程调用 await() 时会抛出 BrokenBarrierException
注意事项
- CyclicBarrier 可循环使用——调用
reset()重置计数器 BrokenBarrierException——如果某个线程在屏障点被中断/超时,整个屏障损坏,其他线程抛出该异常- barrierAction 的执行线程是最后一个到达屏障的线程
- 适合场景:多线程分阶段计算(如并行计算中的阶段同步、游戏中的玩家匹配)
三、Semaphore(信号量)
知识点说明
Semaphore 用来控制同时访问特定资源的线程数量,常用于限流。
类比:停车场门口的计数器显示剩余车位
acquire() → 拿一个许可证(车位减 1,没有车位则等待)
release() → 归还一个许可证(车位加 1)
初始化:
Semaphore semaphore = new Semaphore(5); // 5 个许可证,非公平
Semaphore semaphore = new Semaphore(5, true); // 5 个许可证,公平
示例代码
java
// 场景:限流,最多 3 个线程同时访问数据库
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 10; i++) {
int threadId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可证
System.out.println("线程 " + threadId + " 正在访问数据库...");
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 归还许可证
System.out.println("线程 " + threadId + " 释放许可证");
}
}).start();
}
// 尝试获取,获取不到就放弃
if (semaphore.tryAcquire()) {
try {
// 获取到许可证
} finally {
semaphore.release();
}
}
// 尝试获取(带超时)
if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
try {
// 3 秒内获取到许可证
} finally {
semaphore.release();
}
}
// 一次性获取多个许可证
semaphore.acquire(2); // 一次获取 2 个
注意事项
- release() 不一定要由 acquire() 的线程调用——任何线程都可以释放许可证(区别于锁)
- acquire() 必须在 try 块外或 finally 中处理 release(),确保异常时也能释放
- 许可证的数量不是上限也不是下限——可以通过 release() 增加许可证数量(超过初始值)
- 公平模式下,acquire 的线程按 FIFO 顺序获取;非公平模式下插队
- 适合场景:连接池限流、接口限流、数据库连接数控制
Semaphore 的底层原理
Semaphore 基于 AQS 共享模式实现:
state= 剩余的许可证数量tryAcquireShared:CAS 减少 statetryReleaseShared:CAS 增加 state
四、Exchanger(数据交换)
知识点说明
Exchanger 用于两个线程在同一个屏障点交换数据。两个线程同时到达交换点时,互换数据。
线程 A ─────── 数据 A ──────┐
├────── 交换 ──────→ 线程 A 得到数据 B
线程 B ─────── 数据 B ──────┘
└────── 交换 ──────→ 线程 B 得到数据 A
示例代码
java
// 场景:两个线程交换缓冲区数据
Exchanger<List<Integer>> exchanger = new Exchanger<>();
// 生产者线程
new Thread(() -> {
List<Integer> buffer = new ArrayList<>();
for (int i = 0; i < 10; i++) {
buffer.add(i);
if (buffer.size() == 5) {
try {
// 装满 5 个后交换
buffer = exchanger.exchange(buffer);
System.out.println("生产者收到空的缓冲区");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}).start();
// 消费者线程
new Thread(() -> {
List<Integer> buffer = new ArrayList<>();
for (int i = 0; i < 10; i++) {
try {
// 等待生产者交换数据
buffer = exchanger.exchange(buffer);
System.out.println("消费者处理数据: " + buffer);
buffer.clear();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
注意事项
- 只支持两个线程交换,超过两个线程无法使用
exchange()是阻塞的,直到另一个线程也到达交换点- 支持超时版本:
exchange(V x, long timeout, TimeUnit unit) - 适用场景有限:两个线程间的数据交换、生产者消费者缓冲区交换、遗传算法中的配对交换
- 使用率不如前三个工具高,理解即可
面试常问:三个同步工具的使用场景
| 工具 | 核心区别 | 使用场景 |
|---|---|---|
| CountDownLatch | 一个线程等 N 个线程做完 | 主线程等待多个子任务完成汇总 |
| CyclicBarrier | N 个线程互相等待到齐 | 多阶段并行计算、游戏多人在线匹配 |
| Semaphore | 控制 N 个线程同时访问 | 限流、连接池控制 |
记忆口诀:
- CountDownLatch = 倒计时(等所有人跑完再走)
- CyclicBarrier = 人齐了发车(人没到齐就等)
- Semaphore = 红灯绿灯(控制车流量)