当前位置:网站首页>Mono 的执行流程
Mono 的执行流程
2022-06-28 22:15:00 【_lrs】
前言
本文主要同时简单的示例来分析一下Mono在发布订阅过程中的执行流程。
一、示例
@Test
public void executeProcessTest() {
Mono.just("hello mono")
.filter(v -> v != null)
.map(v -> v + " map")
.defaultIfEmpty("default value")
.subscribe(System.out::println);
}
二、流程
1、构建数据发布者
(1)Mono.just(“hello mono”)
返回 MonoJust,包装值
public static <T> Mono<T> just(T data) {
return onAssembly(new MonoJust<>(data));
}
MonoJust(T value) {
this.value = Objects.requireNonNull(value, "value");
}
(2)filter
返回 MonoFilterFuseable ,包装 MonoJust 和 predicate
public final Mono<T> filter(final Predicate<? super T> tester) {
if (this instanceof Fuseable) {
return onAssembly(new MonoFilterFuseable<>(this, tester));
}
return onAssembly(new MonoFilter<>(this, tester));
}
MonoFilterFuseable(Mono<? extends T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = Objects.requireNonNull(predicate, "predicate");
}
(3)map
返回 MonoMapFuseable 包装 MonoFilterFuseable 和 mapper
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
if (this instanceof Fuseable) {
return onAssembly(new MonoMapFuseable<>(this, mapper));
}
return onAssembly(new MonoMap<>(this, mapper));
}
MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {
super(source);
this.mapper = Objects.requireNonNull(mapper, "mapper");
}
(4)defaultIfEmpty
返回MonoDefaultIfEmpty,包装 MonoMapFuseable 和 defaultValue
public final Mono<T> defaultIfEmpty(T defaultV) {
if (this instanceof Fuseable.ScalarCallable) {
try {
T v = block();
if (v == null) {
return Mono.just(defaultV);
}
}
catch (Throwable e) {
//leave MonoError returns as this
}
return this;
}
return onAssembly(new MonoDefaultIfEmpty<>(this, defaultV));
}
MonoDefaultIfEmpty(Mono<? extends T> source, T defaultValue) {
super(source);
this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");
}
数据发布者的发布流程:
数据 -> MonoJust -> MonoFilterFuseable -> MonoMapFuseable -> MonoDefaultIfEmpty
2、构建数据订阅者
从示例中的 subscribe() 开始
(1) subscribe()
传入 consumer 消费者
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return subscribe(consumer, null, null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {
return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Context initialContext) {
return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
completeConsumer, null, initialContext));
}
创建 LambdaMonoSubscriber 对象,包装最终的消费者consumer
(2)subscribeWith()
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
subscribe(subscriber);
return subscriber;
}
public final void subscribe(Subscriber<? super T> actual) {
//最后一层发布者,这里是 MonoDefaultIfEmpty
CorePublisher publisher = Operators.onLastAssembly(this);
//最后一层订阅者,这里是 LambdaMonoSubscriber
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
//发布者与订阅者建立联系
try {
if (publisher instanceof OptimizableOperator) {
OptimizableOperator operator = (OptimizableOperator) publisher;
while (true) {
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
// null means "I will subscribe myself", returning...
return;
}
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
publisher = operator.source();
break;
}
operator = newSource;
}
}
//发布者发布数据给订阅者
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
(3)发布者与订阅者建立联系的过程
核心方法:
subscriber = operator.subscribeOrReturn(subscriber);
a). operator 是 MonoDefaultIfEmpty,subscriber 是 LambdaMonoSubscriber
返回 DefaultIfEmptySubscriber,作为 LambdaMonoSubscriber 的发布者
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
return new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, defaultValue);
}
b). operator 是 MonoMapFuseable ,subscriber 是 DefaultIfEmptySubscriber
返回 MapFuseableSubscriber,作为 DefaultIfEmptySubscriber 的发布者
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
if (actual instanceof ConditionalSubscriber) {
ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);
}
return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);
}
c).operator 是 MonoFilterFuseable ,subscriber 是 MapFuseableSubscriber
返回 FilterFuseableSubscriber,作为 MapFuseableSubscriber 的发布者
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
if (actual instanceof ConditionalSubscriber) {
return new FluxFilterFuseable.FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);
}
return new FluxFilterFuseable.FilterFuseableSubscriber<>(actual, predicate);
}
此时发布者与订阅者关系:
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber -> consumer
3、建立订阅关系
publisher.subscribe(subscriber);
此时publisher 是 MonoJust, subscriber 是 FilterFuseableSubscriber
创建 scalarSubscription ,包装 FilterFuseableSubscriber
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
根据发布订阅关系依次调用订阅者的 onSubscribe() 建立订阅关系
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
进入 LambdaMonoSubscriber 的 onSubscribe()
@Override
public final void onSubscribe(Subscription s) {
if (Operators.validate(subscription, s)) {
this.subscription = s;
if (subscriptionConsumer != null) {
try {
subscriptionConsumer.accept(s);
}
catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
onError(t);
}
}
else {
//请求数据
s.request(Long.MAX_VALUE);
}
}
}
4、请求数据
通过订阅关系调用 request() 请求数据,
s.request(Long.MAX_VALUE);
即根据下面的关系链反向请求数据
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
最终到了 Operators 类中
@Override
public void request(long n) {
if (validate(n)) {
if (ONCE.compareAndSet(this, 0, 1)) {
Subscriber<? super T> a = actual;
//发布数据
a.onNext(value);
if(once != 2) {
//发布完成
a.onComplete();
}
}
}
}
5、发布数据
从 FilterFuseableSubscriber 开始调用 onNext() 发布数据,根据依次发布给各自的订阅者,最终数据到了最后一个订阅者 LambdaMonoSubscriber
LambdaMonoSubscriber.java
@Override
public final void onNext(T x) {
Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
if (s == Operators.cancelledSubscription()) {
Operators.onNextDropped(x, this.initialContext);
return;
}
if (consumer != null) {
try {
//最终调用 consumer 消费数据
consumer.accept(x);
}
catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
doError(t);
}
}
if (completeConsumer != null) {
try {
completeConsumer.run();
}
catch (Throwable t) {
Operators.onErrorDropped(t, this.initialContext);
}
}
}
6、发布完成
在数据发布依次到消费者消费后,进入第4步中的 a.onComplete();
依次调用各自的订阅者调用 onComplete()。
边栏推荐
- The example application of histogram in data visualization makes the public performance results clear at a glance
- Dart的类扩展、可选类型扩展
- Interpretation of papers (DCN) towards k-means-friendly spaces: simultaneous deep learning and clustering
- Database basic notes
- Pytorch builds transformer to realize multivariable and multi step time series forecasting (load forecasting)
- Competition rules for the "network security" event of the secondary vocational group in the skills competition of Guangxi Vocational Colleges in 2022
- 终于有人把云原生架构讲明白了
- 昨天晚上失眠
- Oracle删除归档日志及添加定时任务
- 稳!上千微服务如何快速接入 Zadig(Helm Chart 篇)
猜你喜欢

Axure custom components

Webrtc audio and video development - experience

windows mysql5.7 开启binlog日志

What does project management really manage?

数据可视化中柱状图的实例应用,让乘风破浪公演结果一目了然

Linux Installation mysql5.7 (centos7.6) tutorial

Zadig officially launched vs code plug-in, making local development more efficient

宜明昂科在港交所递表:2021年亏损翻倍,过往融资额存在夸大情形

IDC:阿里云获2021中国数据治理平台市场份额第一

分享im即时通讯开发之WebSocket:概念、原理、易错常识
随机推荐
Research Report on workers: middle-aged people account for the highest proportion of naked words
oracle设置密码复杂度及设置超时退出的功能
以产业互联网的发展为开端,行业才能进入到一个全新的发展阶段
现在还能入“坑”数据分析吗?看看2022年数据分析热门岗位!
用指针计算数组的个数
如何制作精美的图片
IPv6 comprehensive experiment
重磅!CDA认证考试备考答疑上线
2022-06-28: what does the following golang code output? A:true; B:false; C:panic; D: Compilation failed. package main import “fmt“ func main() {
00 後雲原生工程師:用 Zadig 為思創科技(廣州公交)研發開源節流
The technology giants set up the meta universe standard forum to open up or build a besieged city?
Career consultation | in the data analysis interview, it is only reliable to introduce yourself in this way
解读 | 数据分析的发展和演变都经过哪几个阶段?
硬件开发笔记(七): 硬件开发基本流程,制作一个USB转RS232的模块(六):创建0603封装并关联原理图元器件
TCP三次握手四次挥手
code review
稳!上千微服务如何快速接入 Zadig(Helm Chart 篇)
C#/VB. Net to convert PDF to excel
What does project management really manage?
时间序列预测系列文章总结(代码使用方法)