当前位置:网站首页>Kotlin asynchronous flow
Kotlin asynchronous flow
2022-06-24 13:39:00 【day_ moon】
// 1. Representing multiple values: a Flow is used to return multiple asynchronously computed values
/** Returns the values 1, 2 and 3 as a list, i.e. all values are available at once. */
fun foo(): List<Int> = (1..3).toList()
/** Prints every element returned by [foo], one per line. */
fun forEachList() {
    for (item in foo()) {
        println(item)
    }
}
//2. Sequence
/**
 * Produces 1, 2 and 3 lazily as a [Sequence].
 *
 * Each element is preceded by a blocking [Thread.sleep] of 1000 ms, so
 * iterating the sequence blocks the calling thread.
 */
fun foos(): Sequence<Int> = sequence {
    (1..3).forEach { number ->
        Thread.sleep(1000) // blocks the current thread for 1000 ms
        yield(number) // hand the next value to the consumer
    }
}
/** Iterates the sequence from [foos] and prints each value as it is produced. */
fun forEachSequences() {
    for (item in foos()) {
        println(item)
    }
}
//3 Suspend function
/**
 * Suspends for 1000 ms and then returns the list 1, 2, 3.
 *
 * Because the result type is [List], all values are returned together
 * in a single step rather than one by one.
 */
suspend fun foo_Suspending(): List<Int> {
    val values = listOf(1, 2, 3)
    delay(1000) // suspends the coroutine instead of blocking the thread
    return values
}
/** Runs [foo_Suspending] inside `runBlocking` and prints each returned value. */
fun main_Suspending() = runBlocking {
    for (item in foo_Suspending()) {
        println(item)
    }
}
//4.Flows
/**
 * Builds a flow that emits 1, 2 and 3, delaying 2000 ms before each value.
 *
 * The function itself is not marked `suspend`; the code inside the
 * `flow { ... }` builder is what may suspend.
 */
fun foo_flows(): Flow<Int> = flow {
    (1..3).forEach { number ->
        delay(2000)
        emit(number) // values leave the flow through emit
    }
}
/**
 * Collects [foo_flows] while a sibling coroutine prints "k 1".."k 3"
 * once per second, which shows whether the thread is being blocked.
 */
fun main_flows() = runBlocking<Unit> {
    // Concurrent ticker used to observe that collection does not block.
    launch {
        repeat(3) { tick ->
            println("k ${tick + 1}")
            delay(1000) // suspends for 1000 ms without blocking the thread
        }
    }
    // collect receives the values emitted by the flow.
    foo_flows().collect { item -> println(item) }
}
//5. The flow is cold
/**
 * Builds a cold flow that announces its start once and then emits 1, 2 and 3,
 * delaying 1000 ms before each value.
 *
 * The start message is printed before the loop so that it appears exactly
 * once per collection; previously it sat inside the loop and was printed
 * before every element, which hid the "runs again on each collect" behaviour
 * this example is meant to show.
 */
fun foo_cold(): Flow<Int> = flow {
    println("Flow Turn on ")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}
/**
 * Creates the flow from [foo_cold], prints the flow object, collects it
 * and finally prints a closing marker.
 */
fun main_cold() = runBlocking {
    val coldFlow = foo_cold()
    println("...$coldFlow")
    // The builder body only starts running once collect is called.
    coldFlow.collect { item -> println(item) }
    println("... collect ")
}
//6. Cancel flow
/** Builds a flow that emits 1, 2 and 3, delaying 1000 ms before each value. */
fun foo_cancel(): Flow<Int> = flow {
    (1..3).forEach { number ->
        delay(1000)
        emit(number)
    }
}
/**
 * Collects [foo_cancel] under a 1200 ms timeout and then prints "end".
 *
 * The flow needs 1000 ms per element, so the timeout expires after the
 * first value has been printed.
 */
fun main_cancel() = runBlocking {
    withTimeoutOrNull(1200) {
        foo_cancel().collect { item -> println(item) }
    }
    println("end")
}
//7. Stream builder asFlow
/** Converts the range 1..3 into a flow with `asFlow` and prints each value. */
fun main_asFlow() = runBlocking {
    val numbers = (1..3).asFlow()
    numbers.collect { println(it) }
}
//8. Intermediate flow operator
/**
 * Simulates a slow request: suspends for 1000 ms and returns the
 * decimal string form of [requst].
 */
suspend fun per_request(requst: Int): String {
    delay(1000)
    return requst.toString()
}
/** Maps each value of 1..3 through [per_request] and prints the results. */
fun main_map() = runBlocking {
    val requests = (1..3).asFlow() // build the source flow
    val responses = requests.map { per_request(it) } // intermediate operator
    responses.collect { println(it) }
}
//9. Conversion operators
/**
 * Simulates a slow request: suspends for 1000 ms and returns the
 * decimal string form of [requst].
 */
suspend fun per_transform(requst: Int): String {
    delay(1000)
    return requst.toString()
}
/**
 * Uses `transform` to emit two strings per input value: a "request N"
 * marker first, then the result of [per_transform].
 */
fun main_transform() = runBlocking {
    val requests = (1..3).asFlow() // build the source flow
    val lines = requests.transform { id ->
        emit("request $id") // announce the request before performing it
        emit(per_transform(id)) // then emit the response
    }
    lines.collect { line -> println(line) }
}
//10 Length limiting operator
/**
 * Builds a flow that emits 1 and 2, prints "end", emits 3, and always
 * prints "finally" when the builder body exits.
 *
 * When collected through `take(2)` (see main_take) the flow is cut short
 * after the second value; the `finally` block still runs.
 */
fun number(): Flow<Int> = flow {
    try {
        for (value in 1..2) {
            emit(value)
        }
        println("end")
        emit(3)
    } finally {
        println("finally")
    }
}
/** Collects at most two values from [number] using `take` and prints them. */
fun main_take() = runBlocking {
    val firstTwo = number().take(2) // size-limiting operator
    firstTwo.collect { println(it) }
}
// 11. Terminal flow operators
/** Squares each value of 1..3, sums the squares with `reduce`, and prints the total. */
fun main_reduce() = runBlocking {
    val squares = (1..3).asFlow().map { n -> n * n }
    // reduce is terminal: it collects the flow and folds it into one value.
    val total = squares.reduce { acc, next -> acc + next }
    println(total)
}
// 12. Flows are sequential: each value passes through all operators in turn, like an assembly line
/**
 * Shows that each value travels through `filter`, `map` and `collect`
 * one at a time: odd values of 1..10 pass the filter, are logged in `map`
 * and are then printed by the collector.
 *
 * The `map` lambda now ends with the value itself. Previously its last
 * expression was `println(...)`, so every element was mapped to `Unit`
 * and the collector printed "kotlin.Unit".
 */
fun main_flowd() = runBlocking {
    (1..10).asFlow()
        .filter {
            println(" Request value $it")
            it % 2 != 0
        }
        .map {
            println(" The return value is $it")
            it // pass the value downstream instead of Unit
        }
        .collect { value -> println("$value") }
}
// 13. Flow context preservation: a flow is collected in the context of the calling coroutine
/**
 * Filters the even values of 1..5, logs each one in `map`, and prints
 * the value that reaches the collector.
 *
 * The `map` lambda now ends with the value itself. Previously its last
 * expression was `println(...)`, so the collector received `Unit` and
 * printed " Finally back to kotlin.Unit".
 */
fun main_save() = runBlocking {
    (1..5).asFlow()
        .filter { it % 2 == 0 }
        .map {
            println(" The return value is $it")
            it // pass the value downstream instead of Unit
        }
        .collect { value -> println(" Finally back to $value") }
}
//14 Misused withContext
/**
 * Deliberately incorrect example: emits from inside `withContext`.
 *
 * Per the original author's note, collecting this flow fails with a
 * "Flow invariant is violated" error because emission happens in a
 * different context than collection.
 */
fun flow_withContext(): Flow<Int> = flow {
    withContext(Dispatchers.Default) { // the misuse being demonstrated
        (1..3).forEach { number ->
            emit(number)
            println(number)
        }
    }
}
/** Collects [flow_withContext] and prints each value it manages to deliver. */
fun main_withContext() = runBlocking {
    val misused = flow_withContext()
    misused.collect { println(it) }
}
// 15. flowOn operator: changes the context in which the upstream flow emits its values
/** Prints [msg] prefixed with the current thread's name in square brackets. */
fun log(msg: String) {
    val threadName = Thread.currentThread().name
    println("[$threadName] $msg")
}
/**
 * Builds a flow that logs and emits 1, 2 and 3, with `flowOn` moving the
 * emitting code onto `Dispatchers.IO`.
 */
fun flow_flowOn(): Flow<Int> {
    val source = flow {
        (1..3).forEach { number ->
            log("flow_flowOn $number")
            emit(number)
        }
    }
    return source.flowOn(Dispatchers.IO) // upstream runs on an IO thread
}
/**
 * Logs once from the `runBlocking` coroutine, then collects [flow_flowOn]
 * and logs every received value from the collector side.
 */
fun main_flowOn() = runBlocking {
    log(" Lord ") // logged from the runBlocking (calling) thread
    flow_flowOn().collect { item ->
        log("main_flowOn $item") // collection stays in the caller's context
    }
}
//16 buffer Reduce collection time
/**
 * Builds a flow that emits 1, 2 and 3, delaying 2000 ms before each value
 * (6000 ms of emission time in total).
 */
fun flow_buffer(): Flow<Int> = flow {
    (1..3).forEach { number ->
        delay(2000)
        emit(number)
    }
}
/**
 * Measures how long it takes to collect [flow_buffer] through `buffer(1)`
 * when the collector spends 300 ms on each value, then prints the elapsed
 * time (the original author observed about 6326 ms).
 */
fun main_buffer() = runBlocking {
    val elapsed = measureTimeMillis {
        flow_buffer()
            .buffer(1)
            .collect { item ->
                delay(300) // simulated per-item processing time
                println(item)
            }
    }
    println(" The time required $elapsed")
}
// 17. Conflation (conflate)
/** Builds a flow that emits 1, 2 and 3 back to back with no delay. */
fun flows_conflate(): Flow<Int> = flow {
    (1..3).forEach { number -> emit(number) }
}
/**
 * Collects [flows_conflate] through `conflate()` and prints what arrives.
 *
 * Per the original author's note, the second value is conflated (dropped)
 * in this run.
 */
fun main_conflate() = runBlocking {
    val conflated = flows_conflate().conflate()
    conflated.collect { item -> println(item) }
}
//18 Process latest collectLatest
/** Builds a flow that emits 1, 2 and 3 back to back with no delay. */
fun flow_collectLatest(): Flow<Int> = flow {
    (1..3).forEach { number -> emit(number) }
}
/**
 * Collects [flow_collectLatest] with `collectLatest`, whose handler waits
 * 300 ms before printing.
 *
 * Per the original author's note, only the last value ends up printed.
 */
fun main_collectLatest() = runBlocking {
    flow_collectLatest().collectLatest { item ->
        delay(300) // 300 ms of simulated work per value
        println(item)
    }
}
//19 Combined flow zip combine The role of combined flow
/** Source flow of the integers 1 and 2, used by the zip/combine example. */
val one: Flow<Int> = (1..2).asFlow()

/** Source flow of the strings "4", "5" and "6", used by the zip/combine example. */
val two: Flow<String> = flowOf("4", "5", "6")
/**
 * Combines the flows [one] and [two] with `combine` and prints each
 * "a->b" pairing.
 *
 * The `zip` alternative is kept as a comment; per the original author's
 * notes, zip never prints "6", while combine repeats 2 and pairs it with 6.
 */
fun main_zip() = runBlocking {
    // one.zip(two) { a, b -> "$a->$b" }.collect { value -> println("$value") } // does not print 6
    val paired = one.combine(two) { number, text -> "$number->$text" }
    paired.collect { line -> println(line) }
}
// 20. Flattening flows
// flatMapConcat waits for the inner flow to complete before it starts collecting the next one
// flatMapMerge collects all incoming flows concurrently and merges their values into a single flow, so values are emitted as soon as possible
// flatMapLatest cancels the previous inner flow as soon as a new value arrives and processes only the latest one
/**
 * Builds a flow that emits "old [i]" immediately and "new [i]" after a
 * 1000 ms delay.
 */
fun flow_Flattening(i: Int): Flow<String> {
    return flow {
        emit("old $i")
        delay(1000)
        emit("new $i")
    }
}
/**
 * Feeds 1..3 (spaced 100 ms apart) through `flatMapLatest` into
 * [flow_Flattening] and prints each received value together with the
 * milliseconds elapsed since the start.
 */
fun main_Flattening() = runBlocking {
    val startedAt = System.currentTimeMillis()
    (1..3).asFlow()
        .onEach { delay(100) }
        .flatMapLatest { index -> flow_Flattening(index) }
        .collect { item ->
            val elapsed = System.currentTimeMillis() - startedAt
            println("$item Collection time $elapsed ms ")
        }
}
//21 Catch exception flow
//sampleStart
/**
 * Builds a flow that logs and emits 1, 2 and 3, then maps each value to
 * "string N". The mapping stage fails with an [IllegalStateException]
 * for any value greater than 1.
 *
 * The lazy message passed to `check` now returns a descriptive string.
 * Previously it called `println` and therefore returned `Unit`, which made
 * the exception message "kotlin.Unit".
 */
fun flow_catch(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }.map { value ->
        // Deliberately fail on values above 1 to give callers an exception to catch.
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }
/**
 * Demonstrates several ways of handling the exception raised by
 * [flow_catch]. Variants 1 and 2 are kept as commented-out alternatives;
 * variants 3 and 4 run one after the other.
 */
fun main_catch() = runBlocking<Unit> {
    // Variant 1: wrap the collection in try/catch, which catches any exception.
    // try {
    //     flow_catch().collect { value -> println(value) }
    // } catch (e: Throwable) {
    //     println("Caught $e")
    // }

    // Variant 2: use the catch operator instead of try/catch.
    // flow_catch().catch { e -> emit("Caught $e") }.collect { value -> println("$value") }

    // Variant 3: the catch operator only covers upstream failures; an exception
    // thrown inside collect (downstream of catch) would not be handled by it.
    flow_catch()
        .catch { error -> emit("Caught $error") }
        .collect { item ->
            // check(value <= "1") { "Collected $value" }
            println(item)
        }

    // Variant 4: catch declaratively by moving the collector body into onEach,
    // placed before catch, and finishing with a parameterless collect().
    flow_catch()
        .onEach { item ->
            // NOTE(review): this compares Strings lexicographically ("string 1" vs "1"),
            // not numbers - confirm that this is the intended condition.
            check(item <= "1") { " Check $item" }
            println(item)
        }
        .catch { error -> println("Caught $error") }
        .collect()
}
// 22. Flow completion: besides finally, there is also onCompletion
/** Builds a flow that emits 1 and then fails with a [RuntimeException]. */
fun flows_oncompletion(): Flow<Int> {
    return flow {
        emit(1)
        throw RuntimeException() // fail right after the first value
    }
}
/**
 * Collects [flows_oncompletion] with an `onCompletion` stage followed by
 * a `catch` stage, printing what each of them observes.
 */
fun main_onCompletion() = runBlocking {
    flows_oncompletion()
        // onCompletion receives a nullable Throwable: non-null means the flow
        // ended with a failure rather than normally.
        .onCompletion { failure ->
            if (failure != null) println(" There are exceptions ")
        }
        // onCompletion does not handle the exception; it keeps flowing
        // downstream, where catch deals with it.
        .catch { failure -> println(" Exception is $failure") }
        .collect { item -> println(item) }
}
// 23. Launching a flow: handling events with onEach followed by collect()
/**
 * Treats the values 1..3 (spaced 1000 ms apart) as events, prints each one
 * in an `onEach` handler, and prints "end .." once collection has finished.
 */
fun main_even() = runBlocking {
    val events = (1..3).asFlow()
        .onEach { delay(1000) }
        .onEach { event -> println("$event") } // handler registered for each incoming event
    events.collect() // suspends here until the whole flow has been collected
    println("end ..") // runs only after collection has completed
}
// 24. Launching a flow in a separate coroutine with launchIn
/**
 * Same event pipeline as main_even, but started with `launchIn(this)`,
 * so collection runs in a separate coroutine of the `runBlocking` scope and
 * "end .." is printed first.
 */
fun main_launchIn() = runBlocking {
    val events = (1..3).asFlow()
        .onEach { delay(1000) }
        .onEach { event -> println("$event") } // handler registered for each incoming event
    events.launchIn(this) // starts collection in a new coroutine and returns immediately
    println("end ..") // printed before any event is handled
}
// Sidebar recommendations
- Process basic properties
- Resolve symbol conflicts for dynamic libraries
- Integrate API interface parameter Dictionary of accounts of multiple local distribution companies - Express 100
- Hardware development notes (6): basic process of hardware development, making a USB to RS232 module (5): creating USB package library and associating principle graphic devices
- 这几个默认路由、静态路由的配置部署都不会,还算什么网工!
- 源码解析 Handler 面试宝典
- kotlin 协程上下文和调度器
- Comparator sort functional interface
- Go deep into high-performance JSON parsing libraries in go
- 《中国数据库安全能力市场洞察,2022》报告研究正式启动
猜你喜欢

敏捷之道 | 敏捷开发真的过时了么?

Beauty of script │ VBS introduction interactive practice

【5G NR】5G NR系统架构

数据科学家面临的七大挑战及解决方法

Vulnerability management mistakes that CIOs still make

Comparator sort functional interface

DTU上报的数据值无法通过腾讯云规则引擎填入腾讯云数据库中

One article explains R & D efficiency! Your concerns are

青藤入选工信部网安中心“2021年数字技术融合创新应用典型解决方案”

每日一题day8-515. 在每个树行中找最大值
随机推荐
国内首款开源MySQL HTAP数据库即将发布,三大看点提前告知
金鱼哥RHCA回忆录:DO447管理项目和开展作业--为ansible剧本创建一个项目
Integrated API interface code of domestic express companies for intra city distribution and ordering - Express 100
Comparator sort functional interface
Kotlin anonymous function and lambda
Why is open source technology so popular in the development of audio and video streaming media platform?
系统测试主要步骤
源码解析 Handler 面试宝典
Evolution of the message module of the play live series (3)
How long will it take to open a mobile account? Is online account opening safe?
One hour is worth seven days! Ingenuity in the work of programmers
#云原生征文#Ingress案例实战
数据科学家面临的七大挑战及解决方法
手机开户后多久才能通过?在线开户安全么?
[one picture series] one picture to understand Tencent Qianfan ipaas
Hands on data analysis unit 3 model building and evaluation
The agile way? Is agile development really out of date?
Interviewer: the MySQL database is slow to query. What are the possible reasons besides the index problem?
Use abp Zero builds a third-party login module (I): Principles
Implement Domain Driven Design - use ABP framework - update operational entities