0%

hippo4j的可变长度阻塞队列为什么不需要volatile

为什么 hippo4j 的 ResizableCapacityLinkedBlockingQueue 中的 capacity 变量不需要 volatile 保证内存可见性呢

hippo4j 支持修改线程池的阻塞队列大小,这个修改操作只有使用了其提供的 ResizableCapacityLinkedBlockingQueue 可变长度阻塞队列才可以。这个阻塞队列的实现也很简单,可以看下代码注释中的描述:

A clone of java. Util. Concurrent. LinkedBlockingQueue with the addition of a setCapacity (int) method, allowing us to change the capacity of the queue while it is in use

其实只是把 LinkedBlockingQueue 中的 capacity 的 final 去掉了,并且增加了一个 setCapacity 的方法。这里可能会有疑惑🤔,不是说 LinkedBlockingQueue 是无界的吗,其实虽然我们通常称其为一个无界队列,但是是可以人为指定队列大小的,而且由于其用于记录队列大小的参数是 int 类型字段,所以通常意义上的无界其实就是队列长度为 Integer. MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer. MAX_VALUE。

关于 ResizableCapacityLinkedBlockingQueue 有一些不同的实现方案,可以参考今天,说一说线程池 “动态更新”(三)-鸿蒙开发者社区-51CTO.COM,其实 hippo4j 的实现类似于第一种,但是由于本场景并不涉及多个线程同时修改容量,所以也没有加 synchronized 锁。其原理如下:

private final ReentrantLock putLock = new ReentrantLock();  
private final Condition notFull = putLock.newCondition();
private int capacity;

public void put(E o) throws InterruptedException {
if (o == null) {
throw new NullPointerException();
}
// Note: convention in all put/take/etc is to preset
// local var holding count negative to indicate failure unless set. int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is not protected by lock. This works because count can only decrease at this point (all other puts are shut out by lock), and we (or * some other waiting put) are signalled if it ever changes from capacity. Similarly for all other uses of count in other wait guards. */ try {
while (count.get() >= capacity) {
notFull.await();
}
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
insert(o);
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
}


public void setCapacity(int capacity) {
final int oldCapacity = this.capacity;
this.capacity = capacity;
final int size = count.get();
if (capacity > size && size >= oldCapacity) {
// 如果变大了,会声明自己没有满,这样就可以取消put操作的阻塞
signalNotFull();
}
}

private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

Put 操作的时候会首先申请可重入锁,如果 count.get() >= capacity 就会调用 notFull 这个 Condition 的 await 方法阻塞,这时会释放可重入锁,此时另一个线程扩大容量就会调用 signalNotFull 方法,该方法会申请到可重入锁,同时会调用 notFull 这个 Condition 的 Signal 方法通知阻塞的 put 线程,容量未满,并释放锁。这样 put 线程就可以被唤醒并重新获取可重入锁,完成 put 操作。

但这里我比较疑惑的是为什么 capacity 并没有加 volatile 关键字来保证多线程情况下的可见性,这难道不会导致增大了容量但该修改对其他正在执行 put 操作的阻塞线程不可见进而造成该线程一直阻塞吗?

为此我去查了一下相关 Issue:ResizableCapacityLinkedBlockingQueue可变队列的capacity字段可见性问题 · Issue #808 · opengoofy/hippo4j · GitHub
20250106140557.png

项目开发者给出的说法是:

ResizableCapacityLinkedBlockingQueue 引用自 RabbitMQ 可变队列,目前经过测试及使用没有发现问题。

我尝试进行了测试:

ResizableCapacityLinkedBlockingQueue<Integer> queue = new ResizableCapacityLinkedBlockingQueue<>(1);  
queue.add(0);
Thread t1 = new Thread(() -> {
for (int i = 1; i < 100; i++){
queue.setCapacity(i);
System.out.println("set capacity to" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 1; i < 100; i++){
try {
queue.put(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("add " + i);
}
});
t2.start();
t1.start();

结果如下:

set capacity to1
set capacity to2
add 1
set capacity to3
add 2
set capacity to4
add 3
set capacity to5
add 4
set capacity to6
add 5
set capacity to7
add 6
set capacity to8
add 7
set capacity to9
add 8
set capacity to10
add 9
set capacity to11
add 10
set capacity to12
add 11

看起来并没有出现想象中的阻塞问题,这是为什么呢? 通过查阅资料发现是由可重入锁带来的,可重入锁在执行 lock 时会把内存中已经修改了的变量都强制刷新会主内存(之前一直以为只是在锁释放的时候会把锁期间的修改写回内存)。
1736150249816.png
那么为什么会这样呢? 我从这篇文章中获取到了答案:深度好文 | Java 可重入锁内存可见性分析,由于释放锁时会修改 volatile 修饰的 state 变量,根据 Happens-before 原则,对一个 volatile 的写 happens-before 后续对这个 volatile 的写(其他获取锁的线程),而根据程序顺序规则一个线程中的每个操作 happens-before 于该线程中的后续操作,也就是修改 capacity 这个步骤 happens-before 于释放锁这个操作,而根据传递原则,修改 capacity 也会 happens-before 于后续线程获取锁,因此对容量的修改是会对后续线程可见的。

private volatile int state; // 关键 volatile 变量
protected final int getState() {
return state;
}
// 获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 重要!!!读 volatile 变量
... // 竞争获取锁逻辑,省略
}

// 释放锁
protected final boolean tryRelease(int releases) {
boolean free = false;
... // 根据状态判断是否成功释放,省略
setState(c); // 重要!!!写 volatile 变量
return free;
}

通过上述分析可以发现,由于可重入锁的存在,capacity 的修改会对其他 put 线程可见,而不需要添加 volatile 关键字。