Mono's execution process
2022-06-28 22:38:00 【_ lrs】
Preface
This article uses a simple example to trace how a Mono executes, from assembling the publisher chain, through subscription, to delivering the data.
I. Example
@Test
public void executeProcessTest() {
Mono.just("hello mono")
.filter(v -> v != null)
.map(v -> v + " map")
.defaultIfEmpty("default value")
.subscribe(System.out::println);
}
II. Process
1. Build the data publisher
(1) Mono.just("hello mono")
Returns a MonoJust that wraps the value.
public static <T> Mono<T> just(T data) {
return onAssembly(new MonoJust<>(data));
}
MonoJust(T value) {
this.value = Objects.requireNonNull(value, "value");
}
(2) filter
Returns a MonoFilterFuseable that wraps the MonoJust and the predicate. The Fuseable variant is chosen because MonoJust is Fuseable.
public final Mono<T> filter(final Predicate<? super T> tester) {
if (this instanceof Fuseable) {
return onAssembly(new MonoFilterFuseable<>(this, tester));
}
return onAssembly(new MonoFilter<>(this, tester));
}
MonoFilterFuseable(Mono<? extends T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = Objects.requireNonNull(predicate, "predicate");
}
(3) map
Returns a MonoMapFuseable that wraps the MonoFilterFuseable and the mapper.
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
if (this instanceof Fuseable) {
return onAssembly(new MonoMapFuseable<>(this, mapper));
}
return onAssembly(new MonoMap<>(this, mapper));
}
MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {
super(source);
this.mapper = Objects.requireNonNull(mapper, "mapper");
}
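(4) defaultIfEmpty
Returns a MonoDefaultIfEmpty that wraps the MonoMapFuseable and the default value. The ScalarCallable shortcut at the top of the method is skipped, because MonoMapFuseable is not a ScalarCallable.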
public final Mono<T> defaultIfEmpty(T defaultV) {
if (this instanceof Fuseable.ScalarCallable) {
try {
T v = block();
if (v == null) {
return Mono.just(defaultV);
}
}
catch (Throwable e) {
//leave MonoError returns as this
}
return this;
}
return onAssembly(new MonoDefaultIfEmpty<>(this, defaultV));
}
MonoDefaultIfEmpty(Mono<? extends T> source, T defaultValue) {
super(source);
this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");
}
The resulting publisher chain, from innermost to outermost:
data -> MonoJust -> MonoFilterFuseable -> MonoMapFuseable -> MonoDefaultIfEmpty
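The wrapping described above can be sketched without Reactor. The following is a simplified model, not Reactor's code: `AssemblySketch` and `Stage` are hypothetical names, and each stage only records its payload and the upstream it wraps. It illustrates that chaining operators builds nested objects and runs nothing.

```java
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified model of assembly-time wrapping (not Reactor classes).
class AssemblySketch {

    /** One stage in the chain: a name, its payload, and the upstream stage it wraps. */
    static final class Stage {
        final String name;
        final Object payload; // value, predicate, mapper, or default value
        final Stage source;   // null for the root publisher

        Stage(String name, Object payload, Stage source) {
            this.name = name;
            this.payload = Objects.requireNonNull(payload, name);
            this.source = source;
        }

        static Stage just(String value) { return new Stage("Just", value, null); }

        Stage filter(Predicate<String> predicate) { return new Stage("Filter", predicate, this); }

        Stage map(Function<String, String> mapper) { return new Stage("Map", mapper, this); }

        Stage defaultIfEmpty(String defaultValue) { return new Stage("DefaultIfEmpty", defaultValue, this); }

        /** Walks from this (outermost) stage back to the root. */
        String describe() {
            StringBuilder sb = new StringBuilder(name);
            for (Stage s = source; s != null; s = s.source) {
                sb.append(" wraps ").append(s.name);
            }
            return sb.toString();
        }
    }

    static String run() {
        Stage chain = Stage.just("hello mono")
                .filter(v -> v != null)
                .map(v -> v + " map")
                .defaultIfEmpty("default value");
        // Neither the predicate nor the mapper has been invoked at this point.
        return chain.describe();
    }

    public static void main(String[] args) {
        System.out.println(run()); // DefaultIfEmpty wraps Map wraps Filter wraps Just
    }
}
```

Reading the output right to left gives the same order as the chain listed above.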
2. Build the data subscribers
This phase starts at subscribe().
(1) subscribe()
Pass in the consumer.
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return subscribe(consumer, null, null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {
return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Context initialContext) {
return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
completeConsumer, null, initialContext));
}
This creates a LambdaMonoSubscriber that wraps the final consumer.
(2)subscribeWith()
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
subscribe(subscriber);
return subscriber;
}
public final void subscribe(Subscriber<? super T> actual) {
// The outermost publisher; here it is MonoDefaultIfEmpty
CorePublisher publisher = Operators.onLastAssembly(this);
// The last subscriber in the chain; here it is LambdaMonoSubscriber
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
// Connect the publishers with the subscribers
try {
if (publisher instanceof OptimizableOperator) {
OptimizableOperator operator = (OptimizableOperator) publisher;
while (true) {
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
// null means "I will subscribe myself", returning...
return;
}
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
publisher = operator.source();
break;
}
operator = newSource;
}
}
// Subscribe the wrapped subscriber chain to the innermost source (here MonoJust)
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
(3) How publishers and subscribers are connected
The core call:
subscriber = operator.subscribeOrReturn(subscriber);
a). operator is MonoDefaultIfEmpty, subscriber is LambdaMonoSubscriber
Returns a DefaultIfEmptySubscriber, which wraps LambdaMonoSubscriber and sits directly upstream of it.
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
return new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, defaultValue);
}
b). operator is MonoMapFuseable, subscriber is DefaultIfEmptySubscriber
Returns a MapFuseableSubscriber, which wraps DefaultIfEmptySubscriber and sits directly upstream of it.
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
if (actual instanceof ConditionalSubscriber) {
ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);
}
return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);
}
c). operator is MonoFilterFuseable, subscriber is MapFuseableSubscriber
Returns a FilterFuseableSubscriber, which wraps MapFuseableSubscriber and sits directly upstream of it.
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
if (actual instanceof ConditionalSubscriber) {
return new FluxFilterFuseable.FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);
}
return new FluxFilterFuseable.FilterFuseableSubscriber<>(actual, predicate);
}
At this point the subscriber chain, from upstream to downstream, is:
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber -> consumer
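The loop that builds this chain can be modelled in a few lines. This is a sketch under stated assumptions, not Reactor's implementation: `Sub`, `Op`, `wrap` and `connect` are hypothetical names, `wrap` plays the role of subscribeOrReturn, and the predicate is changed to reject empty strings so that the default-value path can also be exercised.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified model of the subscribeOrReturn loop (not Reactor classes).
class SubscribeLoopSketch {

    interface Sub {
        void onNext(String v);
        void onComplete();
    }

    abstract static class Op {
        final Op source; // upstream operator, or null when the upstream is the root source
        Op(Op source) { this.source = source; }
        abstract Sub wrap(Sub actual); // stands in for subscribeOrReturn
    }

    static final class FilterOp extends Op {
        final Predicate<String> predicate;
        FilterOp(Op source, Predicate<String> predicate) { super(source); this.predicate = predicate; }
        @Override Sub wrap(Sub actual) {
            return new Sub() {
                public void onNext(String v) { if (predicate.test(v)) actual.onNext(v); }
                public void onComplete() { actual.onComplete(); }
            };
        }
    }

    static final class MapOp extends Op {
        final Function<String, String> mapper;
        MapOp(Op source, Function<String, String> mapper) { super(source); this.mapper = mapper; }
        @Override Sub wrap(Sub actual) {
            return new Sub() {
                public void onNext(String v) { actual.onNext(mapper.apply(v)); }
                public void onComplete() { actual.onComplete(); }
            };
        }
    }

    static final class DefaultIfEmptyOp extends Op {
        final String defaultValue;
        DefaultIfEmptyOp(Op source, String defaultValue) { super(source); this.defaultValue = defaultValue; }
        @Override Sub wrap(Sub actual) {
            return new Sub() {
                boolean hasValue;
                public void onNext(String v) { hasValue = true; actual.onNext(v); }
                public void onComplete() {
                    if (!hasValue) actual.onNext(defaultValue); // nothing arrived: emit the default
                    actual.onComplete();
                }
            };
        }
    }

    /** Wraps the terminal subscriber operator by operator, from the outermost inward. */
    static Sub connect(Op outermost, Sub terminal) {
        Sub subscriber = terminal;
        for (Op op = outermost; op != null; op = op.source) {
            subscriber = op.wrap(subscriber);
        }
        return subscriber; // the head of the chain, handed to the root source
    }

    static List<String> run(String value) {
        List<String> out = new ArrayList<>();
        Sub terminal = new Sub() {
            public void onNext(String v) { out.add(v); }
            public void onComplete() { out.add("<complete>"); }
        };
        Op chain = new DefaultIfEmptyOp(
                new MapOp(new FilterOp(null, v -> !v.isEmpty()), v -> v + " map"),
                "default value");
        Sub head = connect(chain, terminal);
        head.onNext(value); // the root source emits into the head subscriber
        head.onComplete();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("hello mono")); // [hello mono map, <complete>]
        System.out.println(run(""));           // [default value, <complete>]
    }
}
```

Because wrapping starts at the outermost operator, the last subscriber created (the filter) ends up at the head of the chain, which matches the order listed above.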
3. Establish the subscription
publisher.subscribe(subscriber);
Here publisher is MonoJust and subscriber is FilterFuseableSubscriber.
A scalarSubscription is created that wraps the FilterFuseableSubscriber and the value.
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
onSubscribe() is then called on each subscriber in turn, following the chain:
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
The call ends in LambdaMonoSubscriber's onSubscribe():
@Override
public final void onSubscribe(Subscription s) {
if (Operators.validate(subscription, s)) {
this.subscription = s;
if (subscriptionConsumer != null) {
try {
subscriptionConsumer.accept(s);
}
catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
onError(t);
}
}
else {
// Request data
s.request(Long.MAX_VALUE);
}
}
}
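The branch in onSubscribe() above can be tried in isolation with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. This is only a sketch: `LambdaLikeSubscriber` and `RecordingSubscription` are hypothetical names, and the validation and error handling of the original are left out.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical sketch of the onSubscribe() branch, using the JDK Flow interfaces.
class OnSubscribeSketch {

    /** Records the last demand it receives so the behaviour can be observed. */
    static final class RecordingSubscription implements Flow.Subscription {
        long requested;
        boolean cancelled;
        @Override public void request(long n) { requested = n; }
        @Override public void cancel() { cancelled = true; }
    }

    static final class LambdaLikeSubscriber<T> implements Flow.Subscriber<T> {
        private final Consumer<? super Flow.Subscription> subscriptionConsumer; // may be null

        LambdaLikeSubscriber(Consumer<? super Flow.Subscription> subscriptionConsumer) {
            this.subscriptionConsumer = subscriptionConsumer;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            if (subscriptionConsumer != null) {
                subscriptionConsumer.accept(s); // the caller decides how much to request
            } else {
                s.request(Long.MAX_VALUE);      // default: unbounded demand
            }
        }
        @Override public void onNext(T item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    static long demandWithoutConsumer() {
        RecordingSubscription s = new RecordingSubscription();
        new LambdaLikeSubscriber<String>(null).onSubscribe(s);
        return s.requested;
    }

    static long demandWithConsumer() {
        RecordingSubscription s = new RecordingSubscription();
        new LambdaLikeSubscriber<String>(sub -> sub.request(1)).onSubscribe(s);
        return s.requested;
    }

    public static void main(String[] args) {
        System.out.println(demandWithoutConsumer() == Long.MAX_VALUE); // true
        System.out.println(demandWithConsumer());                      // 1
    }
}
```

In the article's example no subscription consumer is passed, so the unbounded request is the path taken.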
4. Request data
request() is called on the subscription to request data:
s.request(Long.MAX_VALUE);
The request travels upstream, that is, through the following chain in reverse (from right to left):
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
It finally reaches request() of the scalar subscription created in step 3, which is defined in the Operators class:
@Override
public void request(long n) {
if (validate(n)) {
if (ONCE.compareAndSet(this, 0, 1)) {
Subscriber<? super T> a = actual;
// Emit the data
a.onNext(value);
if(once != 2) {
// Signal completion
a.onComplete();
}
}
}
}
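The compare-and-set guard in request() is what makes the value go out at most once, however many times demand is signalled. A minimal sketch with the JDK's Flow interfaces follows; `OneShotSubscription` is a hypothetical name, the `n <= 0` check is a simplified stand-in for validate(n), and it assumes, as the `once != 2` check suggests, that state 2 marks cancellation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a one-shot scalar subscription, using the JDK Flow interfaces.
class ScalarSubscriptionSketch {

    static final class OneShotSubscription<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> actual;
        private final T value;
        // Assumed states: 0 = nothing requested yet, 1 = emitted, 2 = cancelled
        private final AtomicInteger once = new AtomicInteger();

        OneShotSubscription(Flow.Subscriber<? super T> actual, T value) {
            this.actual = actual;
            this.value = value;
        }

        @Override public void request(long n) {
            if (n <= 0) {
                return; // simplified stand-in for validate(n)
            }
            if (once.compareAndSet(0, 1)) { // only the first valid request wins
                actual.onNext(value);
                if (once.get() != 2) {      // skip completion if cancelled during onNext
                    actual.onComplete();
                }
            }
        }

        @Override public void cancel() { once.set(2); }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
                s.request(1); // ignored: the value has already been emitted
            }
            @Override public void onNext(String item) { events.add("onNext:" + item); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        subscriber.onSubscribe(new OneShotSubscription<>(subscriber, "hello mono"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onNext:hello mono, onComplete]
    }
}
```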
5. Emit data
Starting from FilterFuseableSubscriber, onNext() passes the data to each downstream subscriber in turn, until it reaches the last subscriber, LambdaMonoSubscriber.
LambdaMonoSubscriber.java
@Override
public final void onNext(T x) {
Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
if (s == Operators.cancelledSubscription()) {
Operators.onNextDropped(x, this.initialContext);
return;
}
if (consumer != null) {
try {
// Finally, call the consumer to consume the data
consumer.accept(x);
}
catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
doError(t);
}
}
if (completeConsumer != null) {
try {
completeConsumer.run();
}
catch (Throwable t) {
Operators.onErrorDropped(t, this.initialContext);
}
}
}
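The getAndSet at the top of onNext() swaps in a "cancelled" marker, so only the first signal is processed and anything later is dropped. The sketch below models that guard with an AtomicReference. `OnceConsumer` and `CANCELLED` are hypothetical names, and the assumption that onComplete() checks the same state is not shown in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the single-delivery guard (not Reactor classes).
class SingleDeliverySketch {

    // Sentinel that stands in for Operators.cancelledSubscription()
    private static final Object CANCELLED = new Object();

    static final class OnceConsumer {
        private final AtomicReference<Object> state = new AtomicReference<>(new Object());
        final List<String> log = new ArrayList<>();

        void onNext(String x) {
            Object previous = state.getAndSet(CANCELLED);
            if (previous == CANCELLED) {
                log.add("dropped:" + x); // a signal was already handled
                return;
            }
            log.add("consumed:" + x);
            log.add("completed");        // the complete callback runs right after the value
        }

        // Assumption: completion is guarded by the same state check.
        void onComplete() {
            Object previous = state.getAndSet(CANCELLED);
            if (previous == CANCELLED) {
                return;
            }
            log.add("completed");
        }
    }

    static List<String> run() {
        OnceConsumer consumer = new OnceConsumer();
        consumer.onNext("hello mono map");
        consumer.onComplete();         // no-op: onNext already set the marker
        consumer.onNext("late value"); // dropped
        return consumer.log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [consumed:hello mono map, completed, dropped:late value]
    }
}
```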
6. Signal completion
After the data has been emitted and consumed, the a.onComplete() call from step 4 runs,
and onComplete() is called on each subscriber in turn.