Kotlin data flow - flow
2022-06-13 06:26:00 【m0_47914001】
I. Flow features and usage
1. Returning multiple values with Flow
1. The builder function for the Flow type is named flow.
2. Code inside the flow { ... } builder block can suspend.
3. The simpleFlow function is no longer marked with the suspend modifier.
4. Values are emitted with the emit function.
5. Values are collected with the collect function.

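// Imports used by the snippets in this article
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis // needed by the back-pressure example

// Return multiple values through a flow; the code inside flow { } can suspend
fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000) // suspend without blocking the thread
        emit(i) // emit the next element
    }
}

// Collect the flow asynchronously inside a coroutine
fun testMultipleValue3() = runBlocking {
    simpleFlow().collect { value ->
        print(value)
    }
}

Output:
123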

2. Flow is a cold stream
A flow is a cold stream, similar to a sequence: the code inside the flow builder does not run until the flow is collected.
fun simpleFlow02() = flow<Int> {
    println("Flow started")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

// Cold stream: the flow only starts producing elements when collect is called
fun testFlowIsCold() = runBlocking {
    val simpleFlow02 = simpleFlow02()
    println("Calling collect...")
    simpleFlow02.collect { value ->
        println(value)
    }
    println("Calling collect again...")
    simpleFlow02.collect { value ->
        println(value)
    }
}

Output:
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

3. Sequential processing in Flow
1. Each individual collection of a flow is performed sequentially, unless special operators are used.
2. Each emitted value is processed by every intermediate operator from upstream to downstream, and is then delivered to the terminal operator.
// Sequential processing in a flow (chained calls)
fun testFlowContinuation() = runBlocking {
    // asFlow() is a flow builder
    (1..5).asFlow().filter {
        it % 2 == 0
    }.map {
        "string $it" // convert the element type
    }.collect {
        println("Collect $it")
    }
}

Output:
Collect string 2
Collect string 4

4. Flow builders
1. The flowOf builder defines a flow that emits a fixed set of values.
2. The .asFlow() extension function converts various collections and sequences into flows.
// Flow builders: flowOf and asFlow
fun testFlowBuilder() = runBlocking {
    flowOf("one", "two")
        // onEach returns a flow that invokes the given action before each upstream value is emitted downstream
        .onEach {
            delay(1000)
        }.collect {
            println(it)
        }
    listOf(1, 2, 3).asFlow().collect { value -> println(value) }
}

5. Flow context
1. A flow is always collected in the context of the calling coroutine. This property of flows is called context preservation.
2. Code in the flow { ... } builder must respect context preservation and is not allowed to emit from a different context.
3. The flowOn operator changes the context in which the flow emits.
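
Point 2 can be seen in a minimal sketch, assuming the kotlinx.coroutines library is on the classpath. The names wrongContextFlow and emissionFromOtherContextFails are made up for this illustration. Emitting from inside withContext breaks context preservation, and collecting such a flow is expected to fail with an IllegalStateException.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical name: a flow that breaks context preservation on purpose
fun wrongContextFlow() = flow<Int> {
    // Emitting from a context other than the collector's is not allowed
    withContext(Dispatchers.Default) {
        emit(1)
    }
}

// Returns true when collecting the flow fails with IllegalStateException
fun emissionFromOtherContextFails(): Boolean = runBlocking {
    try {
        wrongContextFlow().collect { }
        false
    } catch (e: IllegalStateException) {
        true
    }
}

fun main() {
    println(emissionFromOtherContextFails())
}
```

This is why flowOn exists: it moves the emitting side to another dispatcher without the builder code switching contexts by hand.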
// Change the context in which the flow emits
fun simpleFlow05() = flow<Int> {
    println("Flow started ${Thread.currentThread().name}")
    for (i in 1..5) {
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun testFlowChangeContext() = runBlocking {
    simpleFlow05().collect { value ->
        println("Collected $value ${Thread.currentThread().name}")
    }
    println("")
}

Output:
Flow started DefaultDispatcher-worker-1
Collected 1 main
Collected 2 main
Collected 3 main
Collected 4 main
Collected 5 main

6. Launching a flow in a coroutine
Using launchIn instead of collect starts the collection of the flow in a separate coroutine.
// A flow of events
fun events() = (1..3).asFlow().onEach {
    delay(100)
}

// Collect the flow in a separate coroutine
fun testFlowLaunch() = runBlocking {
    val job = events().onEach { event ->
        println("Event: $event ${Thread.currentThread().name}")
    }.launchIn(
        CoroutineScope(Dispatchers.IO)
    )
    delay(1000)
    job.cancelAndJoin()
}

Output:
Event: 1 DefaultDispatcher-worker-1
Event: 2 DefaultDispatcher-worker-1
Event: 3 DefaultDispatcher-worker-1

7. Flow cancellation
Flows use the same cooperative cancellation as coroutines. As usual, the collection of a flow can be cancelled while the flow is suspended in a cancellable suspending function (for example, delay).
1. Cancellation on timeout
fun simpleFlow06() = flow<Int> {
    for (i in 1..5) {
        emit(i)
        delay(1000)
        println("Emitting $i")
    }
}

// Cancel the collection after a timeout
fun testCancelFlow() = runBlocking {
    withTimeoutOrNull(2500) {
        simpleFlow06().collect { value ->
            println(value)
        }
        println("Done")
    }
}

Output:
1
Emitting 1
2
Emitting 2
3

The flow was cancelled after 2.5 seconds, so "Done" is never printed.
2. Cancellation checks
fun simpleFlow06() = flow<Int> {
    for (i in 1..5) {
        emit(i)
        delay(1000)
        println("Emitting $i")
    }
}

fun testCancelFlowCheck() = runBlocking {
    // Case 1: in a flow { } builder every element passes through emit, which checks
    // for cancellation, so calling cancel() is enough to stop the flow
    /*
    simpleFlow06().collect { value ->
        print(value)
        if (value == 3) cancel()
    }
    */
    // Case 2: (1..5).asFlow() does not check for cancellation on its own.
    // cancellable() makes the flow check for cancellation on every element
    (1..5).asFlow().cancellable().collect { value ->
        println(value)
        if (value == 3) cancel()
    }
}

Output (cancel() cancels the runBlocking coroutine itself, so the run then ends with a cancellation exception):
1
2
3

8. Handling back pressure in Flow
1. buffer() runs the code that emits elements concurrently with the collector.
2. conflate() conflates emissions, so not every value is processed.
3. collectLatest() cancels the collector for the previous value and restarts it on the latest value.
4. When you have to change the CoroutineDispatcher, the flowOn operator uses the same buffering mechanism, but the buffer function requests buffering explicitly and does not change the execution context.
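
Points 2 and 3 can be compared in a small sketch. The helper names fastEmitter, collectConflated and collectOnlyLatest are hypothetical and chosen only for this illustration. The emitter produces a value every 100 ms while the collector needs 300 ms per value, so conflate() is expected to skip intermediate values, and collectLatest only finishes its block for the last one.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 1, 2, 3 with 100 ms between values
fun fastEmitter() = flow<Int> {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Slow collector behind conflate(): values that arrive while it is busy may be dropped
fun collectConflated(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    fastEmitter().conflate().collect { value ->
        delay(300) // simulate slow processing
        seen.add(value)
    }
    seen
}

// collectLatest cancels the running block whenever a newer value arrives
fun collectOnlyLatest(): List<Int> = runBlocking {
    val finished = mutableListOf<Int>()
    fastEmitter().collectLatest { value ->
        delay(300) // cancelled here if a newer value is emitted in the meantime
        finished.add(value)
    }
    finished
}

fun main() {
    println("conflate: ${collectConflated()}")
    println("collectLatest: ${collectOnlyLatest()}")
}
```

Both variants trade completeness for speed. When every value must be processed, buffer() is the option that keeps all of them.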
fun simpleFlow07() = flow<Int> {
    for (i in 1..5) {
        delay(100)
        emit(i)
        println("Emitting $i")
    }
}

fun testBackPressure() = runBlocking {
    // The emitter is faster than the collector, but each emit only returns once the
    // collector has finished with the value, so emission and collection times add up
    val measureTimeMillis = measureTimeMillis {
        simpleFlow07()
            // flowOn moves the emitter to another thread, so emission runs in parallel with
            // collection. It also buffers, and the main thread still collects the elements in order
            //.flowOn(Dispatchers.Default)
            //.buffer(50) // buffering lets the emitting code run concurrently
            //.conflate() // conflate emissions; note that some elements will be skipped
            //.collectLatest {} // only the latest value is fully processed
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println(measureTimeMillis)
}

II. Flow operators
1. Transformation operators
suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

// Operators that change the element type
fun testTransformFlowOperator() = runBlocking {
    // map converts each element exactly once
    /*
    (1..3).asFlow()
        .map { value ->
            performRequest(value)
        }.collect {
            println(it)
        }
    */
    (1..3).asFlow()
        // transform() can emit several values for each element
        .transform { value ->
            emit("Making request $value")
            emit(performRequest(value))
        }.collect {
            println(it)
        }
}

Output:
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

2. Size-limiting operators
fun numbers() = flow<Int> {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun testLimitLengthOperator() = runBlocking {
    // take() is a size-limiting operator: it cancels the flow once the limit is reached
    numbers().take(2).collect { value -> println(value) }
}

Output:
1
2
Finally in numbers

3. Terminal operators
fun testTerminalOperator() = runBlocking {
    // map transforms each element with the given function
    // reduce accumulates starting from the first element, applying the operation
    // to the current accumulator value and each following element
    val sum = (1..5).asFlow().map { it * it }.reduce { a, b ->
        // Sum of the squares of 1 to 5
        a + b
    }
    // single() returns the only element of a one-element flow
    val single = flowOf(1).single()
    // first() returns the first element; flowOf(1..6) emits one element, the range 1..6 itself
    val first = flowOf(1..6).first()
    // fold starts from the given initial value (3) and applies the operation to each element
    val sum1 = (1..5).asFlow().fold(3) { a, b ->
        a + b
    }
    println(sum)
    println(single)
    println(first)
    println(sum1)
}

Output:
55
1
1..6
18

4. Combining operators
// zip() combines two flows pairwise
fun testZip() = runBlocking {
    val asFlow = (1..3).asFlow().onEach { delay(300) }
    val flowOf = flowOf("one", "Two", "Three").onEach { delay(400) }
    val currentTimeMillis = System.currentTimeMillis()
    // The elapsed time is printed for each pair. Each pair takes about 400 ms (the slower flow),
    // not 700 ms, which shows that the two flows run concurrently
    asFlow.zip(flowOf) { a, b -> "$a $b" }
        .collect { value -> println("$value at ${System.currentTimeMillis() - currentTimeMillis}") }
}

// Helper used by the flattening example
fun requestFlow(i: Int) = flow<String> {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

Output:
1 one at 435
2 Two at 844
3 Three at 1255

5. Flattening flows
A flow represents an asynchronously received sequence of values, so it is easy to end up in a situation where each value triggers a request for another sequence of values. Because flows are asynchronous, they call for different flattening modes, and so there is a family of flattening operators:
1. flatMapConcat: concatenating mode.
2. flatMapMerge: merging mode.
3. flatMapLatest: latest-value mode.
// When a flow is nested inside another flow, e.g. Flow<Flow<String>>, it has to be flattened into a single flow
fun testFlatMapConcat() = runBlocking {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        // Alternatives to try in place of flatMapConcat:
        // .flatMapLatest { requestFlow(it) }
        // flatMapMerge emits the same values as flatMapConcat, but in a different order
        // .flatMapMerge { requestFlow(it) }
        .flatMapConcat {
            // With map, the result would be Flow<Flow<String>>; flatMapConcat unwraps one level
            requestFlow(it)
        }.collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime}")
        }
}

flatMapLatest output:
1: First at 149
2: First at 266
3: First at 370
3: Second at 882

flatMapMerge output:
1: First at 143
2: First at 240
3: First at 347
1: Second at 655
2: Second at 748
3: Second at 860

flatMapConcat output:
1: First at 129
1: Second at 641
2: First at 749
2: Second at 1257
3: First at 1367
3: Second at 1874

III. Flow exceptions
1. Handling exceptions in Flow
1. An exception thrown while collecting
fun simpleFlow08() = flow<Int> {
    for (i in 1..4) {
        println("Emitting $i")
        emit(i)
    }
}

fun testFlowException() = runBlocking {
    try {
        simpleFlow08().collect { value ->
            println(value)
            // check throws IllegalStateException as soon as value exceeds 1
            check(value <= 1) {
                "Collected $value"
            }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

Output:
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

2. An exception thrown while emitting
fun testFlowException2() = runBlocking<Unit> {
    flow {
        emit(1)
        throw ArithmeticException("Div 0")
        // The exception occurs upstream. To avoid breaking the design principles of flows,
        // handle it with the catch operator
    }.catch { e: Throwable ->
        println("Caught $e")
        emit(10) // catch can also emit a replacement value
    }
        .flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }
}

Output:
Caught java.lang.ArithmeticException: Div 0
1
10

2. Flow completion
1. Normal completion
fun testCompleteInFinally() = runBlocking {
    try {
        simpleFlow().collect { println(it) }
    } finally {
        println("Done")
    }
}

// ActivityNotFoundException comes from android.content; any exception type behaves the same way here
fun simpleFlow() = flow<Int> {
    emit(3)
    delay(100)
    throw ActivityNotFoundException()
}

2. Abnormal termination
fun simpleFlow() = flow<Int> {
    emit(3)
    delay(100)
    throw ActivityNotFoundException()
}

fun testCompleteInCompletion() = runBlocking {
    /*
    simpleFlow().onCompletion { exception -> println("Flow completed $exception") }
        .catch { e -> println(e) }
        .collect {
            println(it)
            // check(it <= 1) { "Collected $it" }
        }
    */
    // On abnormal termination onCompletion receives the exception; it also runs after normal completion
    simpleFlow().onCompletion { println("Done") }.collect { println(it) }
}

Output (the uncaught exception then propagates out of runBlocking):
3
Done

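The two cases mentioned in the comment above can be told apart through the cause parameter of onCompletion. The following is a minimal sketch, not the author's code. The names numbersThatMayFail and describeCompletion are hypothetical, and IllegalStateException stands in for the Android exception so that the example runs on a plain JVM.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical flow: emits one value and optionally fails afterwards
fun numbersThatMayFail(fail: Boolean) = flow<Int> {
    emit(1)
    if (fail) throw IllegalStateException("boom")
}

// Reports how the flow completed, based on the cause passed to onCompletion
fun describeCompletion(fail: Boolean): String = runBlocking {
    var summary = "not completed"
    numbersThatMayFail(fail)
        .onCompletion { cause ->
            // cause is null after normal completion, otherwise it holds the upstream exception
            summary = if (cause == null) "completed normally" else "failed with ${cause.message}"
        }
        .catch { } // handle the exception so that it does not escape from collect
        .collect { }
    summary
}

fun main() {
    println(describeCompletion(fail = false))
    println(describeCompletion(fail = true))
}
```

onCompletion only observes the exception and does not handle it, which is why the catch operator is still placed after it in this sketch.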