← 返回 JUC 列表

并发集合

并发集合

一、CopyOnWriteArrayList

知识点说明

CopyOnWriteArrayListArrayList 的线程安全版本,核心原理:写时复制(Copy-On-Write)

读操作:无锁,直接读原数组(读的性能极高)
写操作:加锁,复制整个数组,修改后替换引用(写的性能极低)

原理

java
// 核心源码示意
public class CopyOnWriteArrayList<E> {
    private volatile transient Object[] array;

    // 读 — 无锁,直接读
    public E get(int index) {
        return get(getArray(), index);  // 不需要加锁!
    }

    // 写 — 加锁,复制数组
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1); // 复制
            newElements[len] = e;
            setArray(newElements); // 替换引用(volatile 保证可见性)
            return true;
        } finally {
            lock.unlock();
        }
    }
}

示例代码

java
// 使用场景:读多写少(白名单、缓存、配置)
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

// 可以安全地在遍历时修改
list.add("A");
list.add("B");
list.add("C");

// 遍历时不抛 ConcurrentModificationException
for (String s : list) {
    if ("B".equals(s)) {
        list.remove(s); // 不会抛异常!
    }
}

// 增强 for 循环仍然遍历旧数组(快照语义)
// 所以在遍历过程中修改不会影响当前遍历

CopyOnWriteArraySet

基于 CopyOnWriteArrayList 实现,用法类似:

java
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("A");
set.add("B");
// 去重,但添加时底层会遍历检查是否存在

与 Collections.synchronizedList 的对比

对比 CopyOnWriteArrayList synchronizedList
读性能 很高(无锁) 低(加锁)
写性能 低(复制数组) 较高(仅加锁)
迭代 快照迭代,不抛异常 加锁迭代,修改可能抛异常
内存 占用高(写时复制)
适用场景 读多写少 读写均衡

注意事项

  1. 不适合写多的场景——每次写都复制整个数组,导致 GC 压力大
  2. 数据不一致性——get() 可能返回过期的数据(但是弱一致性,对大部分场景可接受)
  3. 不能保证实时一致性——迭代器获取的是创建时的快照
  4. 内存开销大——写操作同时存在两份数组
  5. 适合:配置中心的白名单、事件监听器列表、路由表等读多写少的数据

二、ConcurrentLinkedQueue

知识点说明

ConcurrentLinkedQueue 是一个基于链表的、无界非阻塞的线程安全队列。使用 CAS 实现,没有锁。

特点:
  - 无界(不会阻塞)
  - 基于 CAS 的无锁并发
  - FIFO(先进先出)
  - 弱一致性

示例代码

java
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

// 入队
queue.offer("A");
queue.offer("B");
queue.offer("C");

// 出队(队列为空返回 null)
String item = queue.poll();

// 查看队首(不移除)
String head = queue.peek();

// 批量消费
while ((item = queue.poll()) != null) {
    System.out.println("处理: " + item);
}

注意事项

  • size() 是 O(n) 操作(遍历链表计数),性能差
  • 适合高并发但不需要阻塞的场景
  • 无界队列,注意生产速度 > 消费速度时的内存问题

三、ConcurrentSkipListMap

知识点说明

ConcurrentSkipListMapTreeMap 的线程安全版本,基于**跳表(Skip List)**实现,支持排序。

跳表结构示意(多层链表):
Level 3:  1 ──────────────────→ 9
Level 2:  1 ──────→ 5 ──────→ 9 ──→ 13
Level 1:  1 ──→ 3 ──→ 5 ──→ 7 ──→ 9 ──→ 11 ──→ 13
Level 0:  1→2→3→4→5→6→7→8→9→10→11→12→13→14

查找元素 12:从 Level 3 开始 → 1 → 9(往下)→ Level2 9→13(往下)→ Level1 9→11(往下)→ Level0 11→12 ✅

示例代码

java
// 替代 TreeMap(线程安全 + 排序)
ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();

map.put("b", "banana");
map.put("a", "apple");
map.put("c", "cherry");

// 遍历输出:a=apple, b=banana, c=cherry(自动排序)
for (Map.Entry<String, String> entry : map.entrySet()) {
    System.out.println(entry.getKey() + "=" + entry.getValue());
}

// 范围查询
NavigableMap<String, String> sub = map.subMap("a", "c"); // ["a", "c")
System.out.println(sub.get("b")); // banana

// 首个/最后一个 entry
Map.Entry<String, String> first = map.firstEntry();  // a=apple
Map.Entry<String, String> last = map.lastEntry();    // c=cherry

// 小于/大于某个 key
Map.Entry<String, String> lower = map.lowerEntry("b"); // a=apple
Map.Entry<String, String> higher = map.higherEntry("b"); // c=cherry

与 ConcurrentHashMap 区别

对比 ConcurrentHashMap ConcurrentSkipListMap
排序 ❌ 无序 ✅ 有序(自然排序/Comparator)
性能 O(1) 平均 O(log n)
适用 通用场景 需要排序的场景

注意事项

  • 支持排序,适合需要范围查询的并发场景
  • subMap()/headMap()/tailMap() 返回的是视图,修改会影响原 map
  • 底层是跳表,比 TreeMap 的并发控制更简单(CAS 操作节点指针)
  • ConcurrentSkipListSet 是基于 ConcurrentSkipListMap 的 Set 实现

四、BlockingQueue(阻塞队列,非常重要)

知识点说明

BlockingQueue带有阻塞功能的队列,是生产-消费者模式的核心组件。

队列满了 → put() 阻塞
队列空了 → take() 阻塞

四种操作方式

操作 抛异常 返回特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek()

常见实现类

java
// 1. ArrayBlockingQueue — 有界,数组实现
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(10); // 容量固定
// 特点:公平性可设置(true=公平,哪个线程等得久先服务哪个)

// 2. LinkedBlockingQueue — 可选有界,链表实现
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>();   // 无界
BlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100); // 有界
// 特点:生产者和消费者各有一把锁(锁分离),吞吐量比 ArrayBlockingQueue 高

// 3. PriorityBlockingQueue — 优先级无界队列
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
// Task 需要实现 Comparable,或传入 Comparator

// 4. SynchronousQueue — 不存储元素
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
// 每个 put() 必须等待一个 take(),反之亦然(容量为 0)
// 特点:适用于生产者消费者一一对应,如 CachedThreadPool

// 5. DelayQueue — 延迟队列
BlockingQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 元素需要实现 Delayed 接口,到期才能被 take() 取出
// 适用:定时任务、过期缓存清理

生产者-消费者经典模式

java
// 生产者
class Producer implements Runnable {
    private final BlockingQueue<String> queue;

    Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String data = "数据-" + i;
                queue.put(data); // 队列满时阻塞
                System.out.println("生产: " + data);
                Thread.sleep((long) (Math.random() * 1000));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 消费者
class Consumer implements Runnable {
    private final BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String data = queue.take(); // 队列空时阻塞
                System.out.println("消费: " + data);
                Thread.sleep((long) (Math.random() * 2000));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 使用
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();

各实现类对比

实现 有界/无界 数据结构 锁机制 特性
ArrayBlockingQueue 有界 数组(循环队列) 一把锁 公平/非公平可选
LinkedBlockingQueue 可选 链表 两把锁(分离) 锁分离提高吞吐量
PriorityBlockingQueue 无界 堆(数组) 一把锁 优先级排序
SynchronousQueue 容量为0 传递(不存储) 两种模式 每个 put 等一个 take
DelayQueue 无界 一把锁 延迟获取
LinkedTransferQueue 无界 链表 CAS 提供 transfer() 方法

注意事项

  1. 无界队列要小心 OOM——LinkedBlockingQueue 默认大小是 Integer.MAX_VALUE
  2. ArrayBlockingQueue 默认非公平——可通过构造参数设为公平,但性能会降低
  3. SynchronousQueue 容量为 0——误以为能存数据
  4. 阻塞队列是线程池的核心——不同的队列类型决定了线程池的特性
  5. offer(e, time, unit) 带超时——时间过了还放不进就返回 false
  6. DrainTo()——可以一次性取多个元素,减少锁竞争次数