Kotlin data flow - flow
2022-06-13 06:26:00 【m0_47914001】
I. Characteristics and use of Flow
1. Returning multiple values with Flow
1. flow is the builder function that creates a Flow.
2. Code inside the flow { ... } builder block can suspend.
3. The simpleFlow function is no longer marked with the suspend modifier.
4. Values are emitted with the emit function.
5. Values are collected with the collect function.

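// Imports used by the examples in this article
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Return multiple values through a Flow; the body of the flow builder can suspend
fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000) // suspend
        emit(i) // emit one element
    }
}
// Collect the flow asynchronously inside a coroutine
fun testMultipleValue3() = runBlocking {
    simpleFlow().collect { value ->
        print(value)
    }
}
Result: 123 is printed, one digit per second.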

2. Flow is a cold stream
A Flow is a cold stream, similar to a sequence: the code inside the flow builder does not run until the flow is collected.
fun simpleFlow02() = flow<Int> {
    println("Flow started")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}
// Cold stream: the flow only produces elements when collect is called
fun testFlowIsCold() = runBlocking {
    val simpleFlow02 = simpleFlow02()
    println("Calling collect...")
    simpleFlow02.collect { value ->
        println(value)
    }
    println("Calling collect again...")
    simpleFlow02.collect { value ->
        println(value)
    }
}
Print the results:
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

3. Sequential processing in Flow
1. Each individual collection of a flow is performed sequentially, unless special operators are used.
2. Each emitted value is processed by every intermediate operator from upstream to downstream and is then handed to the terminal operator.
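The two points above can be made visible by logging every stage. The following is a minimal sketch, not code from the original article: traceOperators is a hypothetical helper that records the order in which filter, map and collect see each value.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records which stage handled which value, in order.
fun traceOperators(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    (1..3).asFlow()
        .filter { log += "filter $it"; it % 2 == 0 }
        .map { log += "map $it"; "string $it" }
        .collect { log += "collect $it" }
    log
}

fun main() {
    // Each value travels through the whole chain before the next one starts:
    // filter 1, filter 2, map 2, collect string 2, filter 3
    traceOperators().forEach(::println)
}
```

The value 2 reaches collect before 3 is even filtered, which is what "sequential" means here: no stage runs ahead of the others unless an operator such as buffer is added.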
// Sequential processing in a Flow (chained calls)
fun testFlowContinuation() = runBlocking {
    // asFlow is a Flow builder
    (1..5).asFlow().filter {
        it % 2 == 0
    }.map {
        "string $it" // convert the type
    }.collect {
        println("Collect $it")
    }
}
Print the results:
Collect string 2
Collect string 4

4. Flow builders
1. The flowOf builder defines a flow that emits a fixed set of values.
2. The .asFlow() extension function converts various collections and sequences into flows.
// Flow builders: flowOf and asFlow
fun testFlowBuilder() = runBlocking {
    flowOf("one", "two")
        // onEach returns a flow that invokes the given action before each upstream value is emitted downstream
        .onEach {
            delay(1000)
        }.collect {
            println(it)
        }
    listOf(1, 2, 3).asFlow().collect { value -> println(value) }
}
5. Flow context
1. A flow is always collected in the context of the calling coroutine. This property is called context preservation.
2. Code inside the flow { ... } builder must honor context preservation and may not emit from a different context.
3. The flowOn operator changes the context in which the flow emits.
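Point 2 is easy to trip over, so here is a small sketch of what happens when it is broken. It is an illustration under assumptions, not code from the original article: wrongContextFlow, correctContextFlow and violatesContextPreservation are hypothetical names. Emitting from inside withContext makes collection fail with an IllegalStateException ("Flow invariant is violated"), while flowOn achieves the same intent legally.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Breaks context preservation: emit is called from a different context.
fun wrongContextFlow(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        emit(1)
    }
}

// Same intent, expressed with flowOn instead of withContext.
fun correctContextFlow(): Flow<Int> = flow {
    emit(1)
}.flowOn(Dispatchers.Default)

// Returns true if collecting wrongContextFlow fails with IllegalStateException.
fun violatesContextPreservation(): Boolean = runBlocking {
    try {
        wrongContextFlow().collect { }
        false
    } catch (e: IllegalStateException) {
        true
    }
}

fun main() {
    println(violatesContextPreservation())                 // true
    println(runBlocking { correctContextFlow().toList() }) // [1]
}
```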
// Change the context of a flow
fun simpleFlow05() = flow<Int> {
    println("Flow started ${Thread.currentThread().name}")
    for (i in 1..5) {
        emit(i)
    }
}.flowOn(Dispatchers.Default)
fun testFlowChangeContext() = runBlocking {
    simpleFlow05().collect { value ->
        println("Collected $value ${Thread.currentThread().name}")
    }
    println("")
}
Print the results:
Flow started DefaultDispatcher-worker-1
Collected 1 main
Collected 2 main
Collected 3 main
Collected 4 main
Collected 5 main
6. Using Flow with coroutines
Using launchIn instead of collect starts the collection of a flow in a separate coroutine.
// A flow of events
fun events() = (1..3).asFlow().onEach {
    delay(100)
}
// Using a flow together with a coroutine
fun testFlowLaunch() = runBlocking {
    val job = events().onEach { event ->
        println("Event: $event ${Thread.currentThread().name}")
    }.launchIn(
        CoroutineScope(Dispatchers.IO)
    )
    delay(1000)
    job.cancelAndJoin()
}
Print the results:
Event: 1 DefaultDispatcher-worker-1
Event: 2 DefaultDispatcher-worker-1
Event: 3 DefaultDispatcher-worker-1
7. Cancelling a Flow
Flows follow the same cooperative cancellation as coroutines. As usual, flow collection can be cancelled while the flow is suspended in a cancellable suspending function (for example delay).
1. Cancellation on timeout
fun simpleFlow06() = flow<Int> {
    for (i in 1..5) {
        emit(i)
        delay(1000)
        println("Emitting $i")
    }
}
// Cancellation on timeout
fun testCancelFlow() = runBlocking {
    withTimeoutOrNull(2500) {
        simpleFlow06().collect { value ->
            println(value)
        }
        println("Done")
    }
}
Result: 1, Emitting 1, 2, Emitting 2 and 3 are printed.

The flow is cancelled after 2.5 seconds, so "Done" is never printed.
2. Cancellation checks
fun simpleFlow06() = flow<Int> {
    for (i in 1..5) {
        emit(i)
        delay(1000)
        println("Emitting $i")
    }
}
fun testCancelFlowCheck() = runBlocking {
    // Case 1: a flow built with flow { } checks for cancellation on every emit, so cancel() takes effect
    /* simpleFlow06().collect { value ->
        print(value)
        if (value == 3) cancel()
    }*/
    // Case 2: asFlow() does not perform these checks
    // cancellable() makes the flow check for cancellation on every collected element
    (1..5).asFlow().cancellable().collect { value ->
        println(value)
        if (value == 3) cancel()
    }
}
Result: 1, 2 and 3 are printed, then collection stops with a cancellation exception.

8. Handling back pressure in Flow
1. buffer(): runs the emitting code concurrently with the collecting code.
2. conflate(): conflates emissions, skipping intermediate values instead of processing every one.
3. collectLatest(): cancels the processing of the previous value and restarts on the latest value.
4. When the CoroutineDispatcher has to change, the flowOn operator uses the same buffering mechanism, whereas the buffer function requests buffering explicitly and does not change the execution context.
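The differences between these operators are easiest to see side by side. The sketch below is illustrative only: slowEmitter, consume and consumeLatest are hypothetical helpers, and the 100 ms and 300 ms delays are arbitrary choices. No exact timings are claimed, since they depend on the machine.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical producer that needs about 100 ms per value.
fun slowEmitter(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Slow consumer (300 ms per value); returns the values it finished processing.
suspend fun consume(source: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    source.collect { value ->
        delay(300)
        seen += value
    }
    return seen
}

// Same consumer with collectLatest: processing is cancelled when a newer value arrives.
suspend fun consumeLatest(source: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    source.collectLatest { value ->
        delay(300)
        seen += value
    }
    return seen
}

fun main() = runBlocking {
    val plain = measureTimeMillis { println("collect: ${consume(slowEmitter())}") }
    val buffered = measureTimeMillis { println("buffer: ${consume(slowEmitter().buffer())}") }
    println("plain took $plain ms, buffered took $buffered ms")
    println("conflate: ${consume(slowEmitter().conflate())}")
    println("collectLatest: ${consumeLatest(slowEmitter())}")
}
```

With these delays, plain collection and buffer both deliver every value, but the buffered run should finish sooner because emission overlaps with processing. conflate typically drops the middle value, and collectLatest typically completes only the last one.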
fun simpleFlow07() = flow<Int> {
    for (i in 1..5) {
        delay(100)
        emit(i)
        println("Emitting $i")
    }
}
fun testBackPressure() = runBlocking {
    // Emission is faster than collection, but each value must be collected before the next one is emitted, so the emission and collection times add up
    val elapsedMillis = measureTimeMillis {
        simpleFlow07()
            // flowOn moves emission to another thread, so it runs in parallel with collection; flowOn buffers internally, and the main thread still collects the elements in order
            //.flowOn(Dispatchers.Default)
            //.buffer(50) // buffering lets the emitting code run concurrently with the collector
            //.conflate() // conflates emissions; note that intermediate elements are skipped
            //.collectLatest {} // only the processing of the latest value runs to completion
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println(elapsedMillis)
}
II. Flow operators
1. Transform operators
suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}
// Operators that transform values
fun testTransformFlowOperator() = runBlocking {
    // map converts each value exactly once
    /* (1..3).asFlow()
        .map { value ->
            performRequest(value)
        }.collect {
            println(it)
        }*/
    (1..3).asFlow()
        // transform can emit any number of values for each element
        .transform { value ->
            emit("Making request $value")
            emit(performRequest(value))
        }.collect {
            println(it)
        }
}
Print the results:
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
2. Size-limiting operators
fun numbers() = flow<Int> {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}
fun testLimitLengthOperator() = runBlocking {
    // take() is a size-limiting operator: it cancels the flow once the limit is reached
    numbers().take(2).collect { value -> println(value) }
}
Print the results:
1
2
Finally in numbers
3. Terminal operators
fun testTerminalOperator() = runBlocking {
    // map returns the result of applying the function to each value
    // reduce accumulates from the first element, applying the operation to the accumulator and each following element
    val sum = (1..5).asFlow().map { it * it }.reduce { a, b ->
        // sum of the squares of 1 to 5
        a + b
    }
    // single returns the only element of a one-element flow
    val single = flowOf(1).single()
    // first returns the first element; flowOf(1..6) emits one element, the range itself, so the result is 1..6
    val first = flowOf(1..6).first()
    // fold starts from the initial value (3) and applies the operation to each element
    val sum1 = (1..5).asFlow().fold(3) { a, b ->
        a + b
    }
    println(sum)
    println(single)
    println(first)
    println(sum1)
}
Print the results:
55
1
1..6
18
4. Combining operators
// zip combines the corresponding values of two flows
fun testZip() = runBlocking {
    val numbers = (1..3).asFlow().onEach { delay(300) }
    val words = flowOf("one", "Two", "Three").onEach { delay(400) }
    val startTime = System.currentTimeMillis()
    // Print the elapsed time for each pair; the two flows run concurrently, so a pair arrives roughly every 400 ms, the pace of the slower flow
    numbers.zip(words) { a, b -> "$a $b" }
        .collect { value -> println("$value at ${System.currentTimeMillis() - startTime}") }
}
// Returns a flow of two strings; used by the flattening examples
fun requestFlow(i: Int) = flow<String> {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}
Print the results:
1 one at 435
2 Two at 844
3 Three at 1255
5. Flattening flows
A flow represents a sequence of values received asynchronously, so it is easy to end up in a situation where each value triggers a request for another sequence of values. Because flows are asynchronous, they can be flattened in different modes, and there is a family of flattening operators for that:
1. flatMapConcat: concatenation mode.
2. flatMapMerge: merge mode.
3. flatMapLatest: latest-value mode.
// When a flow emits other flows, for example Flow<Flow<String>>, it has to be flattened into a single flow
fun testFlatMapConcat() = runBlocking {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        // .flatMapLatest { requestFlow(it) }
        // flatMapMerge yields the same elements as flatMapConcat, but in a different order
        // .flatMapMerge { requestFlow(it) }
        .flatMapConcat {
            // Calling map with requestFlow here would return Flow<Flow<String>>; flatMapConcat unwraps that one layer
            requestFlow(it)
        }.collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime}")
        }
}
flatMapLatest print results:
1: First at 149
2: First at 266
3: First at 370
3: Second at 882
flatMapMerge print results:
1: First at 143
2: First at 240
3: First at 347
1: Second at 655
2: Second at 748
3: Second at 860
flatMapConcat print results:
1: First at 129
1: Second at 641
2: First at 749
2: Second at 1257
3: First at 1367
3: Second at 1874
III. Flow exceptions
1. Handling exceptions in Flow
1. Exception while collecting
fun simpleFlow08() = flow<Int> {
    for (i in 1..4) {
        println("Emitting $i")
        emit(i)
    }
}
fun testFlowException() = runBlocking {
    try {
        simpleFlow08().collect { value ->
            println(value)
            // check throws IllegalStateException when the condition is false
            check(value <= 1) {
                "Collected $value"
            }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
Print the results:
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
2. Exception while emitting
fun testFlowException2() = runBlocking<Unit> {
    flow {
        emit(1)
        throw ArithmeticException("Div 0")
        // For an upstream exception, the catch operator is the recommended way to handle it without breaking the flow's design principles
    }.catch { e: Throwable ->
        println("Caught $e")
        emit(10) // catch may also emit a replacement value
    }
        .flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }
}
Print the results:
Caught java.lang.ArithmeticException: Div 0
1
10
2. Handling Flow completion
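1. Normal completion
fun testCompleteInFinally() = runBlocking {
    try {
        simpleFlow().collect { println(it) }
    } finally {
        // The finally block runs whether the flow completes normally or with an exception
        println("Done")
    }
}
// ActivityNotFoundException is the Android class android.content.ActivityNotFoundException
fun simpleFlow() = flow<Int> {
    emit(3)
    delay(100)
    throw ActivityNotFoundException()
}
2. Abnormal termination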
fun simpleFlow() = flow<Int> {
    emit(3)
    delay(100)
    throw ActivityNotFoundException()
}
fun testCompleteInCompletion() = runBlocking {
    /* simpleFlow().onCompletion { exception -> println("Flow completed $exception") }
        .catch { e -> println(e) }
        .collect {
            println(it)
            /* check(it <= 1) { "Collected $it" } */
        }*/
    // On abnormal termination onCompletion receives the exception; it also runs after normal completion
    simpleFlow().onCompletion { println("Done") }.collect { println(it) }
}
Print information: 3 and Done are printed, then the uncaught exception propagates out of runBlocking.
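Because ActivityNotFoundException only exists on Android, the snippet above cannot be run on a plain JVM. The sketch below shows the same idea in a form that can. It is an assumption-laden illustration, not the author's code: sampleFlow and completionLog are hypothetical helpers and IllegalStateException stands in for the Android exception. It records what onCompletion and catch observe for a normal and a failing flow.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical flow: emits one value and optionally fails afterwards.
fun sampleFlow(fail: Boolean): Flow<Int> = flow {
    emit(3)
    if (fail) throw IllegalStateException("boom")
}

// Hypothetical helper: logs what collect, onCompletion and catch each observe.
fun completionLog(fail: Boolean): List<String> = runBlocking {
    val log = mutableListOf<String>()
    sampleFlow(fail)
        .onCompletion { cause -> log += "completed, cause=${cause?.message}" }
        .catch { e -> log += "caught ${e.message}" }
        .collect { log += "value $it" }
    log
}

fun main() {
    println(completionLog(fail = false)) // [value 3, completed, cause=null]
    println(completionLog(fail = true))  // [value 3, completed, cause=boom, caught boom]
}
```

onCompletion only observes the exception and lets it continue downstream, so a catch operator (or a try/catch around collect) is still needed to actually handle it.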
