并发集合
一、CopyOnWriteArrayList
知识点说明
CopyOnWriteArrayList 是 ArrayList 的线程安全版本,核心原理:写时复制(Copy-On-Write)。
读操作:无锁,直接读原数组(读的性能极高)
写操作:加锁,复制整个数组,修改后替换引用(写的性能极低)
原理
java
// 核心源码示意
public class CopyOnWriteArrayList<E> {
private volatile transient Object[] array;
// 读 — 无锁,直接读
public E get(int index) {
return get(getArray(), index); // 不需要加锁!
}
// 写 — 加锁,复制数组
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 复制
newElements[len] = e;
setArray(newElements); // 替换引用(volatile 保证可见性)
return true;
} finally {
lock.unlock();
}
}
}
示例代码
java
// 使用场景:读多写少(白名单、缓存、配置)
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 可以安全地在遍历时修改
list.add("A");
list.add("B");
list.add("C");
// 遍历时不抛 ConcurrentModificationException
for (String s : list) {
if ("B".equals(s)) {
list.remove(s); // 不会抛异常!
}
}
// 增强 for 循环仍然遍历旧数组(快照语义)
// 所以在遍历过程中修改不会影响当前遍历
CopyOnWriteArraySet
基于 CopyOnWriteArrayList 实现,用法类似:
java
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("A");
set.add("B");
// 去重,但添加时底层会遍历检查是否存在
与 Collections.synchronizedList 的对比
| 对比 | CopyOnWriteArrayList | synchronizedList |
|---|---|---|
| 读性能 | 很高(无锁) | 低(加锁) |
| 写性能 | 低(复制数组) | 较高(仅加锁) |
| 迭代 | 快照迭代,不抛异常 | 加锁迭代,修改可能抛异常 |
| 内存 | 占用高(写时复制) | 低 |
| 适用场景 | 读多写少 | 读写均衡 |
注意事项
- 不适合写多的场景——每次写都复制整个数组,导致 GC 压力大
- 数据不一致性——get() 可能返回过期的数据(但是弱一致性,对大部分场景可接受)
- 不能保证实时一致性——迭代器获取的是创建时的快照
- 内存开销大——写操作同时存在两份数组
- 适合:配置中心的白名单、事件监听器列表、路由表等读多写少的数据
二、ConcurrentLinkedQueue
知识点说明
ConcurrentLinkedQueue 是一个基于链表的、无界、非阻塞的线程安全队列。使用 CAS 实现,没有锁。
特点:
- 无界(不会阻塞)
- 基于 CAS 的无锁并发
- FIFO(先进先出)
- 弱一致性
示例代码
java
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// 入队
queue.offer("A");
queue.offer("B");
queue.offer("C");
// 出队(队列为空返回 null)
String item = queue.poll();
// 查看队首(不移除)
String head = queue.peek();
// 批量消费
while ((item = queue.poll()) != null) {
System.out.println("处理: " + item);
}
注意事项
size()是 O(n) 操作(遍历链表计数),性能差- 适合高并发但不需要阻塞的场景
- 无界队列,注意生产速度 > 消费速度时的内存问题
三、ConcurrentSkipListMap
知识点说明
ConcurrentSkipListMap 是 TreeMap 的线程安全版本,基于**跳表(Skip List)**实现,支持排序。
跳表结构示意(多层链表):
Level 3: 1 ──────────────────→ 9
Level 2: 1 ──────→ 5 ──────→ 9 ──→ 13
Level 1: 1 ──→ 3 ──→ 5 ──→ 7 ──→ 9 ──→ 11 ──→ 13
Level 0: 1→2→3→4→5→6→7→8→9→10→11→12→13→14
查找元素 12:从 Level 3 开始 → 1 → 9(往下)→ Level2 9→13(往下)→ Level1 9→11(往下)→ Level0 11→12 ✅
示例代码
java
// 替代 TreeMap(线程安全 + 排序)
ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("b", "banana");
map.put("a", "apple");
map.put("c", "cherry");
// 遍历输出:a=apple, b=banana, c=cherry(自动排序)
for (Map.Entry<String, String> entry : map.entrySet()) {
System.out.println(entry.getKey() + "=" + entry.getValue());
}
// 范围查询
NavigableMap<String, String> sub = map.subMap("a", "c"); // ["a", "c")
System.out.println(sub.get("b")); // banana
// 首个/最后一个 entry
Map.Entry<String, String> first = map.firstEntry(); // a=apple
Map.Entry<String, String> last = map.lastEntry(); // c=cherry
// 小于/大于某个 key
Map.Entry<String, String> lower = map.lowerEntry("b"); // a=apple
Map.Entry<String, String> higher = map.higherEntry("b"); // c=cherry
与 ConcurrentHashMap 区别
| 对比 | ConcurrentHashMap | ConcurrentSkipListMap |
|---|---|---|
| 排序 | ❌ 无序 | ✅ 有序(自然排序/Comparator) |
| 性能 | O(1) 平均 | O(log n) |
| 适用 | 通用场景 | 需要排序的场景 |
注意事项
- 支持排序,适合需要范围查询的并发场景
subMap()/headMap()/tailMap()返回的是视图,修改会影响原 map- 底层是跳表,比 TreeMap 的并发控制更简单(CAS 操作节点指针)
- ConcurrentSkipListSet 是基于 ConcurrentSkipListMap 的 Set 实现
四、BlockingQueue(阻塞队列,非常重要)
知识点说明
BlockingQueue 是带有阻塞功能的队列,是生产-消费者模式的核心组件。
队列满了 → put() 阻塞
队列空了 → take() 阻塞
四种操作方式
| 操作 | 抛异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 检查 | element() | peek() | — | — |
常见实现类
java
// 1. ArrayBlockingQueue — 有界,数组实现
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(10); // 容量固定
// 特点:公平性可设置(true=公平,哪个线程等得久先服务哪个)
// 2. LinkedBlockingQueue — 可选有界,链表实现
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(); // 无界
BlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100); // 有界
// 特点:生产者和消费者各有一把锁(锁分离),吞吐量比 ArrayBlockingQueue 高
// 3. PriorityBlockingQueue — 优先级无界队列
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
// Task 需要实现 Comparable,或传入 Comparator
// 4. SynchronousQueue — 不存储元素
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
// 每个 put() 必须等待一个 take(),反之亦然(容量为 0)
// 特点:适用于生产者消费者一一对应,如 CachedThreadPool
// 5. DelayQueue — 延迟队列
BlockingQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 元素需要实现 Delayed 接口,到期才能被 take() 取出
// 适用:定时任务、过期缓存清理
生产者-消费者经典模式
java
// 生产者
class Producer implements Runnable {
private final BlockingQueue<String> queue;
Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
String data = "数据-" + i;
queue.put(data); // 队列满时阻塞
System.out.println("生产: " + data);
Thread.sleep((long) (Math.random() * 1000));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
class Consumer implements Runnable {
private final BlockingQueue<String> queue;
Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String data = queue.take(); // 队列空时阻塞
System.out.println("消费: " + data);
Thread.sleep((long) (Math.random() * 2000));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 使用
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
各实现类对比
| 实现 | 有界/无界 | 数据结构 | 锁机制 | 特性 |
|---|---|---|---|---|
| ArrayBlockingQueue | 有界 | 数组(循环队列) | 一把锁 | 公平/非公平可选 |
| LinkedBlockingQueue | 可选 | 链表 | 两把锁(分离) | 锁分离提高吞吐量 |
| PriorityBlockingQueue | 无界 | 堆(数组) | 一把锁 | 优先级排序 |
| SynchronousQueue | 容量为0 | 传递(不存储) | 两种模式 | 每个 put 等一个 take |
| DelayQueue | 无界 | 堆 | 一把锁 | 延迟获取 |
| LinkedTransferQueue | 无界 | 链表 | CAS | 提供 transfer() 方法 |
注意事项
- 无界队列要小心 OOM——
LinkedBlockingQueue默认大小是Integer.MAX_VALUE ArrayBlockingQueue默认非公平——可通过构造参数设为公平,但性能会降低SynchronousQueue容量为 0——误以为能存数据- 阻塞队列是线程池的核心——不同的队列类型决定了线程池的特性
offer(e, time, unit)带超时——时间过了还放不进就返回 falseDrainTo()——可以一次性取多个元素,减少锁竞争次数