Mono's execution process
2022-06-28 22:38:00 【_ lrs】
Preface
This article uses a simple example to walk through how a Mono executes, from assembling the publisher chain to subscribing and consuming the value.
I. Example
@Test
public void executeProcessTest() {
    Mono.just("hello mono")
        .filter(v -> v != null)
        .map(v -> v + " map")
        .defaultIfEmpty("default value")
        .subscribe(System.out::println);
}
II. Execution flow
1. Build the data publisher
(1) Mono.just("hello mono")
Returns a MonoJust that wraps the value.
public static <T> Mono<T> just(T data) {
return onAssembly(new MonoJust<>(data));
}
MonoJust(T value) {
this.value = Objects.requireNonNull(value, "value");
}
(2) filter
Returns a MonoFilterFuseable that wraps the MonoJust and the predicate. MonoJust implements Fuseable, so the first branch is taken.
public final Mono<T> filter(final Predicate<? super T> tester) {
if (this instanceof Fuseable) {
return onAssembly(new MonoFilterFuseable<>(this, tester));
}
return onAssembly(new MonoFilter<>(this, tester));
}
MonoFilterFuseable(Mono<? extends T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = Objects.requireNonNull(predicate, "predicate");
}
(3) map
Returns a MonoMapFuseable that wraps the MonoFilterFuseable and the mapper.
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
if (this instanceof Fuseable) {
return onAssembly(new MonoMapFuseable<>(this, mapper));
}
return onAssembly(new MonoMap<>(this, mapper));
}
MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {
super(source);
this.mapper = Objects.requireNonNull(mapper, "mapper");
}
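(4) defaultIfEmpty
Returns a MonoDefaultIfEmpty that wraps the MonoMapFuseable and the default value. MonoMapFuseable is not a Fuseable.ScalarCallable, so the shortcut branch at the top of the method is skipped.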
public final Mono<T> defaultIfEmpty(T defaultV) {
if (this instanceof Fuseable.ScalarCallable) {
try {
T v = block();
if (v == null) {
return Mono.just(defaultV);
}
}
catch (Throwable e) {
//leave MonoError returns as this
}
return this;
}
return onAssembly(new MonoDefaultIfEmpty<>(this, defaultV));
}
MonoDefaultIfEmpty(Mono<? extends T> source, T defaultValue) {
super(source);
this.defaultValue = Objects.requireNonNull(defaultValue, "defaultValue");
}
The resulting publisher chain, from the data to the outermost operator:
data -> MonoJust -> MonoFilterFuseable -> MonoMapFuseable -> MonoDefaultIfEmpty
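The nesting built at assembly time can be illustrated with a small stand-alone model. This is only a sketch using the JDK: `Node`, `wrap`, `chainOf`, and `example` are hypothetical names introduced for illustration, not Reactor classes or methods. It shows that each operator call returns a new object holding a reference to its source, and that nothing is executed at this stage.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of assembly time: each operator wraps the previous publisher.
// Node is a hypothetical stand-in for MonoJust, MonoFilterFuseable, and so on.
class AssemblySketch {
    static final class Node {
        final String name;
        final Node source; // null for the root publisher

        Node(String name, Node source) {
            this.name = name;
            this.source = source;
        }

        // Counterpart of an operator call such as filter() or map().
        Node wrap(String operatorName) {
            return new Node(operatorName, this);
        }
    }

    // Walks from the outermost operator back to the root and returns the
    // names in root-to-outermost order.
    static List<String> chainOf(Node outermost) {
        List<String> names = new ArrayList<>();
        for (Node n = outermost; n != null; n = n.source) {
            names.add(0, n.name);
        }
        return names;
    }

    static Node example() {
        return new Node("MonoJust", null)
                .wrap("MonoFilterFuseable")
                .wrap("MonoMapFuseable")
                .wrap("MonoDefaultIfEmpty");
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", chainOf(example())));
    }
}
```

The object returned by the last operator call is the one subscribe() is eventually invoked on, which is why the subscription phase starts from MonoDefaultIfEmpty.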
2. Build the data subscribers
Everything starts from subscribe().
(1) subscribe()
The consumer is passed in:
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return subscribe(consumer, null, null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {
return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Context initialContext) {
return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
completeConsumer, null, initialContext));
}
This creates a LambdaMonoSubscriber that wraps the final consumer.
(2)subscribeWith()
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
subscribe(subscriber);
return subscriber;
}
public final void subscribe(Subscriber<? super T> actual) {
// The outermost publisher; here it is MonoDefaultIfEmpty
CorePublisher publisher = Operators.onLastAssembly(this);
// The final subscriber; here it is LambdaMonoSubscriber
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
// Connect the publishers to the subscribers
try {
if (publisher instanceof OptimizableOperator) {
OptimizableOperator operator = (OptimizableOperator) publisher;
while (true) {
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
// null means "I will subscribe myself", returning...
return;
}
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
publisher = operator.source();
break;
}
operator = newSource;
}
}
// Subscribe the outermost wrapped subscriber to the source publisher
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
(3) Connecting publishers and subscribers
The core call is:
subscriber = operator.subscribeOrReturn(subscriber);
a) operator is MonoDefaultIfEmpty, subscriber is LambdaMonoSubscriber
Returns a DefaultIfEmptySubscriber, which sits directly upstream of LambdaMonoSubscriber.
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
return new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, defaultValue);
}
b) operator is MonoMapFuseable, subscriber is DefaultIfEmptySubscriber
Returns a MapFuseableSubscriber, which sits directly upstream of DefaultIfEmptySubscriber.
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
if (actual instanceof ConditionalSubscriber) {
ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);
}
return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);
}
c) operator is MonoFilterFuseable, subscriber is MapFuseableSubscriber
Returns a FilterFuseableSubscriber, which sits directly upstream of MapFuseableSubscriber.
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
if (actual instanceof ConditionalSubscriber) {
return new FluxFilterFuseable.FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);
}
return new FluxFilterFuseable.FilterFuseableSubscriber<>(actual, predicate);
}
At this point the subscriber chain, from upstream to downstream, is:
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber -> consumer
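The loop in subscribe() can be modelled in a few lines. The sketch below uses only the JDK, and `Op`, `Sub`, `connect`, and `downstreamChain` are hypothetical names for illustration; it leaves out the null return of subscribeOrReturn and all error handling. It shows only the idea that the loop walks the operators from the outermost one back towards the source, wrapping the current subscriber at each step.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the loop in Mono.subscribe(): walk the operators from the
// outermost one back to the source, wrapping the subscriber at each step.
// Op and Sub are hypothetical stand-ins, not Reactor types.
class SubscribeLoopSketch {
    static final class Sub {
        final String name;
        final Sub actual; // downstream subscriber, null for the final one

        Sub(String name, Sub actual) {
            this.name = name;
            this.actual = actual;
        }
    }

    static final class Op {
        final String subscriberName;
        final Op source; // next operator upstream, null when the source is the root

        Op(String subscriberName, Op source) {
            this.subscriberName = subscriberName;
            this.source = source;
        }

        // Counterpart of subscribeOrReturn: wrap the downstream subscriber.
        Sub subscribeOrReturn(Sub actual) {
            return new Sub(subscriberName, actual);
        }
    }

    // Returns the subscriber that the root publisher is finally subscribed with.
    static Sub connect(Op outermost, Sub last) {
        Sub subscriber = last;
        for (Op op = outermost; op != null; op = op.source) {
            subscriber = op.subscribeOrReturn(subscriber);
        }
        return subscriber;
    }

    // Lists subscriber names from the most upstream one to the final one.
    static List<String> downstreamChain(Sub first) {
        List<String> names = new ArrayList<>();
        for (Sub s = first; s != null; s = s.actual) {
            names.add(s.name);
        }
        return names;
    }

    static Sub example() {
        Op filter = new Op("FilterFuseableSubscriber", null);
        Op map = new Op("MapFuseableSubscriber", filter);
        Op defaultIfEmpty = new Op("DefaultIfEmptySubscriber", map);
        return connect(defaultIfEmpty, new Sub("LambdaMonoSubscriber", null));
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", downstreamChain(example())));
    }
}
```

Note that the subscribers are created in the reverse order of the operators: the operator assembled last wraps the final subscriber first, so the operator closest to the source ends up with the outermost subscriber.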
3. Establish the subscription
publisher.subscribe(subscriber);
Here publisher is MonoJust and subscriber is FilterFuseableSubscriber.
A scalar subscription is created that wraps the FilterFuseableSubscriber and the value.
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
onSubscribe() is then called on each subscriber in turn, following the chain downstream:
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
This ends in LambdaMonoSubscriber's onSubscribe():
@Override
public final void onSubscribe(Subscription s) {
if (Operators.validate(subscription, s)) {
this.subscription = s;
if (subscriptionConsumer != null) {
try {
subscriptionConsumer.accept(s);
}
catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
onError(t);
}
}
else {
// Request data
s.request(Long.MAX_VALUE);
}
}
}
4. Request data
request() is called through the subscription to request data:
s.request(Long.MAX_VALUE);
The request travels back up the following chain, from right to left:
FilterFuseableSubscriber -> MapFuseableSubscriber -> DefaultIfEmptySubscriber -> LambdaMonoSubscriber
It finally reaches the scalar subscription defined in the Operators class:
@Override
public void request(long n) {
if (validate(n)) {
if (ONCE.compareAndSet(this, 0, 1)) {
Subscriber<? super T> a = actual;
// Emit the value
a.onNext(value);
if(once != 2) {
// Signal completion
a.onComplete();
}
}
}
}
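The once-only guard in this request() method can be tried out in isolation. The following is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces rather than on Reactor; the class `ScalarSubscriptionSketch`, its nested `ScalarSubscription`, and the `run` helper are hypothetical names, and the state values (0 = not requested, 1 = requested, 2 = cancelled) are an assumption inferred from the `once != 2` check above. Unlike the real code, this model simply ignores non-positive requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of a scalar subscription: emits one value, at most once.
class ScalarSubscriptionSketch {
    static final class ScalarSubscription<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> actual;
        private final T value;
        // Assumed states: 0 = not requested, 1 = requested, 2 = cancelled
        private final AtomicInteger once = new AtomicInteger();

        ScalarSubscription(Flow.Subscriber<? super T> actual, T value) {
            this.actual = actual;
            this.value = value;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                return; // this model ignores invalid requests
            }
            // Only the first valid request wins the 0 -> 1 transition.
            if (once.compareAndSet(0, 1)) {
                actual.onNext(value);
                // Skip completion if the subscriber cancelled during onNext.
                if (once.get() != 2) {
                    actual.onComplete();
                }
            }
        }

        @Override
        public void cancel() {
            once.set(2);
        }
    }

    // Subscribes a recording subscriber and returns the signals it received.
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        Flow.Subscriber<String> recorder = new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
                s.request(1); // ignored: the value has already been emitted
            }

            @Override
            public void onNext(String item) {
                signals.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        };
        recorder.onSubscribe(new ScalarSubscription<>(recorder, "hello mono"));
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The compare-and-set is what makes repeated or concurrent request() calls safe: however many times request() is called, the value and the completion signal are delivered only once.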
5. Emit data
onNext() is called starting from FilterFuseableSubscriber, and each subscriber passes the value on to its downstream subscriber until it reaches the last one, LambdaMonoSubscriber.
LambdaMonoSubscriber.java
@Override
public final void onNext(T x) {
Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
if (s == Operators.cancelledSubscription()) {
Operators.onNextDropped(x, this.initialContext);
return;
}
if (consumer != null) {
try {
// Finally, call the consumer to consume the value
consumer.accept(x);
}
catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
doError(t);
}
}
if (completeConsumer != null) {
try {
completeConsumer.run();
}
catch (Throwable t) {
Operators.onErrorDropped(t, this.initialContext);
}
}
}
6. Signal completion
After the value has been emitted and consumed, the a.onComplete() call from step 4 runs.
onComplete() is then called on each subscriber in turn.
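The direction of each signal in steps 3 to 6 can be checked with a small stand-alone model. This sketch uses the JDK's java.util.concurrent.Flow interfaces instead of Reactor; `Stage` and `run` are hypothetical names, and the filter and defaultIfEmpty stages are modelled as plain pass-throughs because the example value is non-null and present. It does not reproduce fusion or any other Reactor optimisation, only the order in which the signals travel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.UnaryOperator;

// Simplified model of steps 3 to 6: onSubscribe travels downstream, request
// travels upstream, then onNext and onComplete travel downstream.
class SignalOrderSketch {
    // A stage is both a subscriber (to its upstream) and the subscription
    // it hands to its downstream subscriber.
    static final class Stage implements Flow.Subscriber<String>, Flow.Subscription {
        final String name;
        final UnaryOperator<String> fn;
        final Flow.Subscriber<String> actual;
        final List<String> log;
        Flow.Subscription upstream;

        Stage(String name, UnaryOperator<String> fn,
              Flow.Subscriber<String> actual, List<String> log) {
            this.name = name;
            this.fn = fn;
            this.actual = actual;
            this.log = log;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            log.add(name + ".onSubscribe");
            actual.onSubscribe(this);
        }

        @Override public void request(long n) {
            log.add(name + ".request");
            upstream.request(n);
        }

        @Override public void cancel() {
            upstream.cancel();
        }

        @Override public void onNext(String v) {
            log.add(name + ".onNext");
            actual.onNext(fn.apply(v));
        }

        @Override public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override public void onComplete() {
            log.add(name + ".onComplete");
            actual.onComplete();
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Stand-in for LambdaMonoSubscriber: requests as soon as it is subscribed.
        Flow.Subscriber<String> last = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                log.add("lambda.onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override public void onNext(String v) { log.add("lambda.onNext:" + v); }
            @Override public void onError(Throwable t) { log.add("lambda.onError"); }
            @Override public void onComplete() { log.add("lambda.onComplete"); }
        };
        Stage defaultIfEmpty = new Stage("defaultIfEmpty", v -> v, last, log);
        Stage map = new Stage("map", v -> v + " map", defaultIfEmpty, log);
        Stage filter = new Stage("filter", v -> v, map, log);
        // Stand-in for MonoJust.subscribe(): a one-shot source subscription.
        filter.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                filter.onNext("hello mono");
                filter.onComplete();
            }
            @Override public void cancel() { }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the recorded log, the four onSubscribe entries appear first in downstream order, followed by three request entries in upstream order, and then the onNext and onComplete entries in downstream order again.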