当前位置:网站首页>Mono 的创建
Mono 的创建
2022-06-21 22:12:00 【_lrs】
前言
Reactor 是一个基于 JVM 之上的异步应用基础库。为 Java 、Groovy 和其他 JVM 语言提供了构建基于事件和数据驱动应用的抽象库,可用它构建时效性流式数据应用,实现应用高效、异步地传递消息。
Reactor 性能相当高,在最新的硬件平台上,使用无堵塞分发器每秒钟可处理 1500 万事件。
Reactor 中的两个重要的概念是Flux 和 Mono。Mono 表示包含 0 或者 1 个元素的异步序列。Flux 表示包含 0 到 N 个元素的异步序列。他们包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。
一、响应式数据流
响应式数据流 作为一种新的数据流规范应用于 Java 9 及其后续版本,旨在提供同/异步数据序列流式控制机制。
1、响应式数据流接口
org.reactivestreams.Pubslisher:数据流发布者(信号从 0 到 N,N 可为无穷)。提供两个可选终端事件:错误和完成。
org.reactivestreams.Subscriber:数据流消费者(信号从 0 到 N,N 可为无穷)。消费者初始化过程中,会请求生产者当前需要订阅多少数据。其他情况,通过接口回调与数据生产方交互: 下一条(新消息)和状态。状态包括:完成/错误,可选。
org.reactivestreams.Subscription:初始化阶段将一个小追踪器传递给订阅者。它控制着我们准备好来消费多少数据,以及我们想要什么时候停止消费(取消)。
org.reactivestreams.Processor:同时作为发布者和订阅者的组件的标记。
2、响应式数据流发布协议
订阅者有两种方式向发布者请求数据:
无界的:订阅者只需要调用 Subscription#request(Long.MAX_VALUE) 即可。
有界的:订阅者保留数据引用,调用request(long) 方法消费。
二、Mono的创建
(1)empty
不包含任何元素,可以发布结束消息
@Test
public void empty(){
Mono.empty().subscribe(System.out::println);
}
(2)just
包含指定元素
@Test
public void just(){
Mono.just("hello mono").subscribe(System.out::println);
}
(3)justOrEmpty
有元素时相当于just,
没有元素时相当于empty,
元素是Optional时,则根据Optional里是否有值来创建just或empty。
@Test
public void justOrEmpty(){
Mono.justOrEmpty("hello mono").subscribe(System.out::println);
}
(4)never
不包含任何元素
@Test
public void never(){
Mono.never().subscribe(System.out::println);
}
(5)from
@Test
public void from() {
//从 Publisher 生成 Mono
Mono.from(Mono.just("hello mono")).subscribe(System.out::println);
//从 Publisher 生成 Mono,会对Flux类型进行包装
Mono.fromDirect(Mono.just("hello mono")).subscribe(System.out::println);
//从 Supplier 生成 Mono
Mono.fromSupplier(() -> "hello mono").subscribe(System.out::println);
//从 Runnable 生成 Mono
Mono.fromRunnable(() -> System.out.println("hello mono")).subscribe(System.out::println);
//从 Callable 生成 Mono
Mono.fromCallable(() ->"hello mono").subscribe(System.out::println);
//从 CompletableFuture 生成 Mono
Mono.fromFuture(CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);
//从 CompletionStage 生成 Mono
Mono.fromCompletionStage(CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);
//从 Supplier<? extends CompletionStage<? extends T> 生成 Mono
Mono.fromCompletionStage(() -> CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);
}
(6)defer
@Test
public void defer() {
//从 Supplier 获取 mono
Mono.defer(() -> Mono.just("hello mono")).subscribe(System.out::println);
//从 Function<ContextView, ? extends Mono<? extends T>> 获取 mono
Mono.deferContextual(view -> Mono.just("hello mono")).subscribe(System.out::println);
}
(7)delay
@Test
public void delay() throws InterruptedException {
//指定延时时间,发布值是0
Mono.delay(Duration.of(5, ChronoUnit.SECONDS)).subscribe(System.out::println);
Mono.delay(Duration.of(5, ChronoUnit.SECONDS), Schedulers.parallel()).subscribe(System.out::println);
TimeUnit.SECONDS.sleep(15);
}
(8)error
@Test
public void error() throws InterruptedException {
//包含异常的 mono
Mono.error(new RuntimeException("出错了")).subscribe(System.out::println);
//Supplier 提供包含异常的 mono
Mono.error(() -> new RuntimeException("出错了")).subscribe(System.out::println);
}
(9)first
@Test
public void first() throws InterruptedException {
//处理第一个 mono,如果第一个取消了,处理第二个...
Mono.firstWithSignal(Mono.empty(), Mono.just("mono hello")).subscribe(System.out::println);
Mono.firstWithValue(Mono.just("hello mono"), Mono.just("mono hello")).subscribe(System.out::println);
Mono.firstWithSignal(List.of(Mono.just("hello mono"), Mono.just("mono hello"))).subscribe(System.out::println);
Mono.firstWithValue(List.of(Mono.just("hello mono"), Mono.just("mono hello"))).subscribe(System.out::println);
}
(10)sequenceEqual
@Test
public void sequenceEqual() throws InterruptedException {
//比较两个 mono 是都相同
Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono")).subscribe(System.out::println);
Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono"), Object::equals).subscribe(System.out::println);
Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono"), Object::equals, 16).subscribe(System.out::println);
}
(11)using
@Test
public void using() throws InterruptedException {
//callable返回mono,
//function对mono进行操作
//consumer执行清理操作
//eager 为true时,consumer在subscribe之前调用
Mono.using(() -> Mono.just("hello mono"), Function.identity(), t -> System.out.println(t)).subscribe(System.out::println);
}
(12)when
@Test
public void when() throws InterruptedException {
//执行预设的操作
Mono.when(Mono.just("hello mono").filter(a -> a.equals("hello mono"))).subscribe(System.out::println);
}
(13)zip
压缩操作
边栏推荐
- spacy. load(“en_core_web_sm“)###OSError: [E050] Can‘t find model ‘en_core_web_sm‘.
- Win11 hotspot connection successful but no network? Solution of win11 mobile hotspot and network conflict
- Hardware development notes (V): basic process of hardware development, making a USB to RS232 module (IV): creating con connection device package and associating principle element devices
- [highly recommended] markdown grammar
- 麒麟系统开发笔记(五):制作安装麒麟系统的启动U盘、物理机安装麒麟系统以及搭建Qt开发环境
- Unity network development (II)
- 洞見數據價值,啟迪數字未來,《數字化的力量》問世
- Voir la valeur des données, éclairer l'avenir numérique, le pouvoir numérique est sorti
- 项目变更管理
- 转载:网络加载框架 - Retrofit
猜你喜欢

211 thèse de maîtrise en divinité à l'Université! 75 lignes et 20 mauvaises lignes! Réponse de l'école: le tuteur a arrêté d'inscrire...

Redis master-slave replication (9)

RK3568开发笔记(三):RK3568虚拟机基础环境搭建之更新源、安装网络工具、串口调试、网络连接、文件传输、安装vscode和samba共享服务

211高校神级硕士论文刷屏!75行字错了20行!学校回应:导师停招...
![spacy.load(“en_core_web_sm“)###OSError: [E050] Can‘t find model ‘en_core_web_sm‘.](/img/f5/e6e480f69481bef826b155fab57669.png)
spacy.load(“en_core_web_sm“)###OSError: [E050] Can‘t find model ‘en_core_web_sm‘.

QT practical skill: close unnecessary warning prompt on the right in the qtcreator editing area
![Jmter test command [note]](/img/96/4290b92beb0755c6724a3b8d7dc635.png)
Jmter test command [note]

Layout roadmap, the perfect combination of spatial layout and data visualization

基于Arduino框架下VSCode PlatformIO一个项目配置两种不同开发板的兼容模式

Qt文档阅读笔记-staticMetaObject解析与实例
随机推荐
数据库主键一定要自增吗?有哪些场景不建议自增?
Produced by Ali! Graphical ant script - idea plug-in cloudtoolkit
You have a chance, here is a stage
211高校神级硕士论文刷屏!75行字错了20行!学校回应:导师停招...
组件传值:兄弟间传值(非父子组件进行传值)
Why applets and the industrial Internet can complement each other
Software project lawyer due diligence white paper - full text 19 pages, please contact the author
QT scrollarea qscrollarea
组件传值:父组件与子组件传值用props
[技术杂谈][转载]ffmpeg压缩视频几个参数解析
关于 国产麒麟Qt编译报错“xxx.pri has modification time xxxx s in the futrue“ 的解决方法
Basic contents of external sorting
项目变更管理
Win11 how to change the desktop file path to disk D
洞见数据价值,启迪数字未来,《数字化的力量》问世
Flexer series: indexedstack in Flexer
Project change management
关于 SecureFx传输远程服务器中文显示乱码 的解决方法
The solution to the error "xxx.pri has modification time XXXX s in the futrue" in the compilation of domestic Kirin QT
Unity network development (II)