随笔分类
Reactor中无界队列 SpscLinkedQueue
Reactor为我们提供了一下无界队列,其本身又是怎么去实现 "无界"的,我们对此比较好奇
@SuppressWarnings("rawtypes")
static final Supplier SMALL_UNBOUNDED =
() -> Hooks.wrapQueue(new SpscLinkedArrayQueue<>(SMALL_BUFFER_SIZE));
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
SpscLinkedArrayQueue(int linkSize) {
int c = Queues.ceilingNextPowerOfTwo(Math.max(8, linkSize));
// 可以看到, 初始时也只有一个队列:producerArray、consumerArray
// 初始长度:c:256, 总 257, 0 ~ 256
// 为什么要有 consumerArray, 又要有 producerArray, 其实就是为了区分出 "存"与"取"这两个动作.
this.producerArray = this.consumerArray = new AtomicReferenceArray<>(c + 1);
// mask:255, 为什么要取 c - 1, 实际桶位有 257个, 这其实也是其 "断桥流水"的主要逻辑,
// 将原数组的最后一位用来存储新数组的索引, 对应的其实也就是引用 ref
this.mask = c - 1;
}
可以看到,其构造中去初始化了两个原子数组,producerArray、consumerArray,为什么要去有这两个数组呢,其实就是为了区分 "存" 与 "取"两个动作
volatile long producerIndex;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<SpscLinkedArrayQueue> PRODUCER_INDEX =
AtomicLongFieldUpdater.newUpdater(SpscLinkedArrayQueue.class,
"producerIndex");
AtomicReferenceArray<Object> producerArray;
volatile long consumerIndex;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<SpscLinkedArrayQueue> CONSUMER_INDEX =
AtomicLongFieldUpdater.newUpdater(SpscLinkedArrayQueue.class,
"consumerIndex");
AtomicReferenceArray<Object> consumerArray;
于此对应的便是两个数组下表,producerIndex、consumerIndex,其也是去基于 AtomicXXXFieldUpdater的技巧予以实现,此前我们已经述说过相关实现,此篇中不再去做过多描述
其实 SpscLinkedArrayQueue实现无界队列的方式不难,基于 "断桥流水"的方式实现无界,断桥流水指的便是当判断出队列中已经要填满时,对数组进行扩容,这这里的扩容并非值得是纯粹去创建一个更大的数组,再将原数组中的内容拷贝进去,而是在原数组的尾部去接一个新数组,以来实现别样的扩容,那如何去续接呢,其实也很简单,只需要占用原数组的一个槽位,在其中放置索引即可,注意,此出的索引指的是指向下一个数组的引用
此次的突破重点,同样如此:offer()、poll()
@Override
public boolean offer(T e) {
Objects.requireNonNull(e);
long pi = producerIndex;
AtomicReferenceArray<Object> a = producerArray;
// 假设 m:255
int m = mask;
// 获取偏移量
// 假设:pi:255, offset:0
int offset = (int) (pi + 1) & m;
// 条件成立:相当于就是回到了原点, 当前队列已经满了, 此时要怎么做呢?
if (a.get(offset) != null) {
// offset:255
offset = (int) pi & m;
// 新数组长度:257
AtomicReferenceArray<Object> b = new AtomicReferenceArray<>(m + 2);
// 赋予 producerArray以新数组
producerArray = b;
// 注意到, 这里并不是将元素存储到开头, 而是原有偏移量对应的位置上去,
// 个人觉得这么设计其实就是为了在不改变偏移量的前提下当判断出需要到下一个数组中取元素时偏移量不变便也能够取得到元素
b.lazySet(offset, e);
// 256, 也就是原数组的最后一个元素, 存储索引
a.lazySet(m + 1, b);
// NEXT其实是个标志信号, 代表着需要去找下一个数组去了
a.lazySet(offset, NEXT);
PRODUCER_INDEX.lazySet(this, pi + 1);
}
else {
offset = (int) pi & m;
a.lazySet(offset, e);
PRODUCER_INDEX.lazySet(this, pi + 1);
}
return true;
}
当我们想要往数组中存放元素时,首先获取生产者数组对于下标 pi,然后判断指定偏移量位置的桶位是否有存放着元素,没有的话直接存放即可,有的话说明当前数组已经不能再去存放元素了,需要进行扩容,这判断的依据是什么,可以看到其获取的并不是 pi位置处的偏移量,而是其下一个位置的偏移量,上述讲过,mask代表着便是数组中除了索引位置外其余可以去存放的元素,pi + 1可以让我们越过数组数组最后一个位置,配合上路由元素可回到数组起始点,这样我们便能判断出当前数组是否还有剩余可用的元素存储空间,没有剩余空间了,那就去扩容,但实际上还是剩余有两个空间的,其用于存放了 Next、以及数组索引,详细可以去代码注释,此出不做过多讲述
再来看看取这个动作:
@SuppressWarnings("unchecked")
@Override
@Nullable
public T poll() {
long ci = consumerIndex;
AtomicReferenceArray<Object> a = consumerArray;
int m = mask;
int offset = (int) ci & m;
Object o = a.get(offset);
if (o == null) {
return null;
}
// 条件成立:说明需要到下一个数组中去找相应元素去
if (o == NEXT) {
AtomicReferenceArray<Object> b = (AtomicReferenceArray<Object>) a.get(m + 1);
// 清除强引用, help GC.
a.lazySet(m + 1, null);
o = b.get(offset);
a = b;
consumerArray = b;
}
a.lazySet(offset, null);
CONSUMER_INDEX.lazySet(this, ci + 1);
return (T) o;
}
取动作里实现也比较简单,对指定偏移量位置处的元素判断是不是 Next,这也就承接上前述所讲,条件成立说明当前数组已经没有可以取得元素了,那就去下个数组中去取,也就是 Next的下一个元素,存放的也就是下个数组的索引,取之即可,offset偏移量不变,获取的便是其想要的元素
综上,Reatcor中无界队列阐述完毕!