随笔分类
FluxCreate解读
// FluxSink具有消费数据的能力, 结合 accept(T t)
// 参数可以理解为 逻辑上的一个动作 - 懒加载, 出于机器性能考虑
public static <T> Flux<T> create(Consumer< ? super FluxSink<T>> emitter) {
return create(emitter, OverflowStrategy.BUFFER);
}
// 参数一:FluxSink emitter
// 参数二:背压策略 - 默认的模式:推拉兼备
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
}
create()中传递的参数最终会作为 FluxCreate的构造参数,传递了 FluxSink对象、背压策略、创建 Flux所去选择的模式
这里选择的背压策略:Buffer,这也是默认所选择的 BackPressureStrategy,还有的背压策略:
/**
* Enumeration for backpressure handling.
*/
// 可选择的背压策略
enum OverflowStrategy {
/**
* Completely ignore downstream backpressure requests.
* <p>
* This may yield {@link IllegalStateException} when queues get full downstream.
*/
// 完全忽略下游的背压请求
IGNORE,
/**
* Signal an {@link IllegalStateException} when the downstream can't keep up
*/
// 当下游消费无法跟上上游发布速度时, 则发布 error信号
ERROR,
/**
* Drop the incoming signal if the downstream is not ready to receive it.
*/
// 如果下游没有准备好接受元素, 则去丢弃元素
DROP,
/**
* Downstream will get only the latest signals from upstream.
*/
// 下游仅会从上游获取最新元素
LATEST,
/**
* Buffer all signals if the downstream can't keep up.
* <p>
* Warning! This does unbounded buffering and may lead to {@link OutOfMemoryError}.
*/
// 如果下游跟不上, 会去缓冲元素
BUFFER
}
来到 FluxCreate:
/**
* Provides a multi-valued sink API for a callback that is called for each individual
* Subscriber.
*
* @param <T> the value type
*/
final class FluxCreate<T> extends Flux<T> implements SourceProducer<T> {
enum CreateMode {
PUSH_ONLY, PUSH_PULL
}
final Consumer<? super FluxSink<T>> source;
final OverflowStrategy backpressure;
final CreateMode createMode;
FluxCreate(Consumer<? super FluxSink<T>> source,
FluxSink.OverflowStrategy backpressure,
CreateMode createMode) {
this.source = Objects.requireNonNull(source, "source");
this.backpressure = Objects.requireNonNull(backpressure, "backpressure");
this.createMode = createMode;
}
可以看到,FluxMode便是 Flux的创建模式,主要有两种:PUSH_ONLY、PUSH_PULL,PUSH、PULL是两个动作,前者代表着发布者下发元素的一个动作,后者则是订阅者主动去拉取元素的一个动作,而 backPressure实则是会在 PULL这个动作中会体现出来,因此当我们选择以 PUSH_PULL的方式去创建 Flux时,我们会去考虑背压,实则这也是我们默认去创建 Flux时所去选择的模式,实则我们也可以换种角度思考下,如果没有订阅者 PULL这个动作,即仅存在发布者 PUSH元素,此时背压毫无意义,我们也无需去考虑背压
对于 Flux,我们显然关注于 subscribe():
// 这个实际上是我们关心的方法 - 将此方法看成是整个业务链的触发者
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
// 根据背压策略去创建 FluxSink - 背压会在 BaseSink这块体现出来
// 元素下发的整个过程会在 Subscription中体现出来
// BaseSink具备 Subscription的能力
BaseSink<T> sink = createSink(actual, backpressure);
// 回调 onSubscription()
actual.onSubscribe(sink);
try {
// consumer在 这得以执行 - 此时将去调用我们 Flux.create(Consumer<? super FluxSink<T>> emitter) 的逻辑
source.accept(
// 这块默认下会是 PUSH_PULL, 我们会去考虑背压, 因此需要有一个缓冲队列, BaseSink、Queue都被封装在 SerializedFluxSink中
createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
sink);
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
sink.error(Operators.onOperatorError(ex, actual.currentContext()));
}
}
对于 subscribe(),我们可以将其看成是整个业务调用链的触发者,在该调用链中的每一个动作,我们知道其本身是去做什么的,但实际实现我们并不清楚,我们也无需关注于此
首先,我们便是通过传递进来的 subscriber、背压策略去创建具有下发元素能力的 sink对象:BaseSink的具体实现
// 根据背压策略去创建具有下发元素能力的 sink对象
static <T> BaseSink<T> createSink(CoreSubscriber<? super T> t,
OverflowStrategy backpressure) {
switch (backpressure) {
case IGNORE: {
return new IgnoreSink<>(t);
}
case ERROR: {
return new ErrorAsyncSink<>(t);
}
case DROP: {
return new DropAsyncSink<>(t);
}
case LATEST: {
return new LatestAsyncSink<>(t);
}
default: { /** 默认下, 我们创建的其实就是 BufferAsyncSink **/
// 参数一:Subscriber
// 参数二:队列长度 - 默认会是 256
return new BufferAsyncSink<>(t, Queues.SMALL_BUFFER_SIZE);
}
}
}
BufferAsyncSink(CoreSubscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = Queues.<T>unbounded(capacityHint).get();
}
可知,我们会去创建出 BufferAsyncSink对象,bufferAsyncSink对象包装着 subscriber以及一个队列:
/**
* Returns an unbounded, linked-array-based Queue. Integer.max sized link will
* return the default {@link #SMALL_BUFFER_SIZE} size.
* @param linkSize the link size
* @param <T> the reified {@link Queue} generic type
* @return an unbounded {@link Queue} {@link Supplier}
*/
@SuppressWarnings("unchecked")
public static <T> Supplier<Queue<T>> unbounded(int linkSize) {
if (linkSize == XS_BUFFER_SIZE) {
return XS_UNBOUNDED;
}
else if (linkSize == Integer.MAX_VALUE || linkSize == SMALL_BUFFER_SIZE) { /** choose here **/
return unbounded();
}
return () -> Hooks.wrapQueue(new SpscLinkedArrayQueue<>(linkSize));
}
/**
*
* @param <T> the reified {@link Queue} generic type
* @return an unbounded {@link Queue} {@link Supplier}
*/
@SuppressWarnings("unchecked")
public static <T> Supplier<Queue<T>> unbounded() {
return SMALL_UNBOUNDED;
}
static final Supplier SMALL_UNBOUNDED =
() -> Hooks.wrapQueue(new SpscLinkedArrayQueue<>(SMALL_BUFFER_SIZE));
可知,该队列类型:SpscLinkedArrayQueue,队列长度:256
其次,去调用了 subscriber.onSubscribe(),传递了 sink对象进去,首先调用该方法的原因是什么,其次该 sink对象与 Subscription之间有什么关系?
联想 JDK中融入的 Flow Api,我们回调此方法实际上就是为了去确认下游对上游元素的请求数量,这是在确立订阅关系时首先需要搞清楚的;其次,sink默认下实际上会是 BaseSink的实现类:BufferAsyncSink
// BaseSink implements Subscription.
static abstract class BaseSink<T> extends AtomicBoolean
implements FluxSink<T>, InnerProducer<T> {
static final Disposable TERMINATED = OperatorDisposables.DISPOSED;
static final Disposable CANCELLED = Disposables.disposed();
// actual - 消费者, 即订阅者
final CoreSubscriber<? super T> actual;
final Context ctx;
/**
*
* {@link InnerProducer} is a {@link reactor.core.Scannable} {@link Subscription} that produces
* data to an {@link #actual()} {@link Subscriber}
*
* @param <O> output operator produced type
*
* @author Stephane Maldini
*/
interface InnerProducer<O>
extends Scannable, Subscription {
CoreSubscriber<? super O> actual();
此时,我们便清楚了 BaseSink具备 Subscription的能力
此时,我们已然创建出了具有下发元素的 sink对象,以及产生了订阅关系,显然,接下来我们需要做的便是去下发元素,由于我们所选择的 Flux创建模式为 PUSH_PULL,因此,这块也是去创建了 SerializedFluxSink对象,其封装着 BaseSink对象,其作为参数触发了 FluxSinkConsumer的回调:accept()
此时上游变会去生产元素,通过 emitter.next()去下发元素:
/**
* 这是我们所关注的方法
* 如果去考虑背压的话, 我们要去消费的其实就是缓冲队列中的元素
* @param t the value to emit, not null
* @return
*/
@Override
public FluxSink<T> next(T t) {
// 往队列中添加元素
queue.offer(t);
// 消费控制
drain();
return this;
}
由于考虑到了背压,此时下发的元素会往长度为 256的 queue中存储,从这我们也不能看出,背压,本质上就是位于上游的一个缓冲队列
//impl note: don't use isTerminated() in the drain loop,
//it needs to first check the `done` status before setting `disposable` to TERMINATED
//otherwise it would either loose the ability to drain or the ability to invoke the
//handler at the right time.
void drain() {
// 第一次下发元素时, WIP == 0, 此时队列中还没有元素
if (WIP.getAndIncrement(this) != 0) {
return;
}
// 来到这, 已经不是第一个下发元素了
// 订阅者
final Subscriber<? super T> a = actual;
// 缓存队列
final Queue<T> q = queue;
// 在此自旋中消费元素
// 为什么要在自旋中?
// 下游每一个 next()中都有可能获取重新 request上游下发元素, 因此自旋是需要的
for (; ; ) {
// 获取下游请求元素个数
long r = requested;
long e = 0L;
while (e != r) {
// 条件成立 - 下游有去调用 disposable.dispose()来去取消订阅
if (isCancelled()) {
// 调用钩子尝试去清除队列中元素
Operators.onDiscardQueueWithClear(q, ctx, null);
// 如果上述钩子没有清除完队列中元素,则会跳过当前循环,下一个循环冲继续会去调用钩子来去处理该队列
if (WIP.decrementAndGet(this) != 0) {
continue;
}
else {
return;
}
}
boolean d = done;
// 取队头元素
T o = q.poll();
// 标记队列是否空
boolean empty = o == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.error(ex);
}
else {
super.complete();
}
return;
}
if (empty) {
break;
}
// 消费元素
a.onNext(o);
// 标识已经下发的元素个数
e++;
}
if (e == r) {
if (isCancelled()) {
Operators.onDiscardQueueWithClear(q, ctx, null);
if (WIP.decrementAndGet(this) != 0) {
continue;
}
else {
return;
}
}
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.error(ex);
}
else {
super.complete();
}
return;
}
}
if (e != 0) {
// 更新 REQUESTED
Operators.produced(REQUESTED, this, e);
}
// 条件成立:说明进入该方法时获取的下游请求数量对应的元素已经下发完毕, 此时需要做的便是在此进入自旋中,因为下游可能又重新 request了新的元素
if (WIP.decrementAndGet(this) == 0) {
break;
}
}
}
不难看出,drain()中做的便是消费控制,当下游请求上游下发元素时 (考虑有背压的情况),上游会将元素存储到队列中,然后配合上一个自旋以及 while循环,后者对应于循环中每一个获取到的下游请求元素下发量,通过 while循环来去下发元素,而前者实际上控制着的便是整个订阅过程中下游请求的元素下发量,即下游在每一个 onNext()中都可能会继续去 request元素的下发,因此整体控制元素下发请求量还是十分有必要的
那下游进行 request时会触发哪些操作呢?有上述我们知道,此时的 subscription将会是 BaseSink:
@Override
public final void request(long n) {
// 校验
if (Operators.validate(n)) {
Operators.addCap(REQUESTED, this, n);
// 第一次来到这里时, requestConsumer == null
LongConsumer consumer = requestConsumer;
// 第一次时, 该条件不会成立
if (n > 0 && consumer != null && !isCancelled()) {
consumer.accept(n);
}
onRequestedFromDownstream();
}
}
/**
* Concurrent addition bound to Long.MAX_VALUE.
* Any concurrent write will "happen before" this operation.
*
* @param <T> the parent instance type
* @param updater current field updater
* @param instance current instance to update
* @param toAdd delta to add
* @return value before addition or Long.MAX_VALUE
*/
public static <T> long addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) {
long r, u;
// 自旋以保证更新成功
for (;;) {
r = updater.get(instance);
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
// 累计后的值
u = addCap(r, toAdd);
// 进行累计
if (updater.compareAndSet(instance, r, u)) {
return r;
}
}
}
/**
* Cap an addition to Long.MAX_VALUE
*
* @param a left operand
* @param b right operand
*
* @return Addition result or Long.MAX_VALUE if overflow
*/
public static long addCap(long a, long b) {
long res = a + b;
if (res < 0L) {
return Long.MAX_VALUE;
}
return res;
}
由上不难看出,REQUESTED其实就是去累计下游请求下发的元素数量,这样上游就会根据下游实际需要而去下发元素
volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<BaseSink> REQUESTED =
AtomicLongFieldUpdater.newUpdater(BaseSink.class, "requested");
这块实际上也是 AtomicXXXFieldUpdater的玩法,这也来提一下
此时去回调了 onRequtestedFromDownStream
@Override
void onRequestedFromDownstream() {
drain();
}
看,此时去调用了 drain(),并且 REQUESTED中有值,这不就承接着我们上述的描述了么?
再比如说,我们有去使用 emitter.onRequest(LongConsumer consumer),即当 request()调用后,将触发onRequest()的调用
@Override
public FluxSink<T> onRequest(LongConsumer consumer) {
// 参数三:此次下游请求的下发元素个数
sink.onRequest(consumer, consumer, sink.requested);
return this;
}
// 下游调用 request请求元素时会触发此回调
protected void onRequest(LongConsumer initialRequestConsumer,
LongConsumer requestConsumer,
long value) {
// 于此 REQUEST_CONSUMER
if (!REQUEST_CONSUMER.compareAndSet(this, null, requestConsumer)) {
throw new IllegalStateException(
"A consumer has already been assigned to consume requests");
}
else if (value > 0) {
// 自定义逻辑的处理 - 这块还是蛮有想法的哈
initialRequestConsumer.accept(value);
}
}
可以看到,一方面 REQUEST_CONSUMER被赋予了 requestConsumer,其实也就是我们所传递的 LongConsumer,其次 LongConsumer.accept()也得以执行
其次,我们再来考虑下游第 N次请求下发元素的情况
@Override
public final void request(long n) {
// 校验
if (Operators.validate(n)) {
Operators.addCap(REQUESTED, this, n);
// 第一次来到这里时, requestConsumer == null
LongConsumer consumer = requestConsumer;
// 第一次时, 该条件不会成立
if (n > 0 && consumer != null && !isCancelled()) {
// 第n次,这块 LongConsumer也将得以执行
consumer.accept(n);
}
onRequestedFromDownstream();
}
}
说实在的,有时真的好奇这块代码的设计到底是如何想出来的,真的只能说是精彩无比!
经上述,我们也了解了下游请求元素,上游下发元素的整个过程
来个 demo:
public void testForFluxCreate() {
Flux.create(emitter -> {
for (int i = 0; i < 1000; i++) {
if (emitter.isCancelled()) {
return;
}
System.out.println("source create: " + i);
emitter.next(i);
}
}).doOnNext(s -> System.out.println("source push: " + s))
.publishOn(Schedulers.single())
.subscribe(data -> {
sleep(10L);
System.out.println("customerId: " + data);
});
sleep(1000000L);
}
}
执行结果:
source create: 0
source push: 0
source create: 1
source push: 1
source create: 2
source push: 2
source create: 3
source push: 3
source create: 4
source push: 4
...
source create: 250
source push: 250
source create: 251
source push: 251
source create: 252
source push: 252
source create: 253
source push: 253
source create: 254
source push: 254
source create: 255
source push: 255
source create: 256
source create: 257
...
source create: 294
source create: 295
source create: 296
source create: 297
source create: 298
source create: 299
customerId: 1
customerId: 2
customerId: 3
customerId: 4
customerId: 5
customerId: 6
customerId: 7
customerId: 8
customerId: 9
...
customerId: 190
customerId: 191
source push: 256
source push: 257
source push: 258
source push: 259
source push: 260
source push: 261
source push: 262
source push: 263
source push: 264
source push: 265
source push: 266
source push: 267
source push: 268
source push: 269
source push: 270
source push: 271
source push: 272
source push: 273
source push: 274
source push: 275
source push: 276
source push: 277
source push: 278
source push: 279
source push: 280
source push: 281
source push: 282
source push: 283
source push: 284
source push: 285
source push: 286
source push: 287
source push: 288
source push: 289
source push: 290
source push: 291
source push: 292
source push: 293
source push: 294
source push: 295
source push: 296
source push: 297
source push: 298
source push: 299
customerId: 192
customerId: 193
customerId: 194
customerId: 195
customerId: 196
...
customerId: 298
customerId: 299