当前位置:网站首页>Kotlin data flow - flow
Kotlin data flow - flow
2022-06-13 06:26:00 【m0_ forty-seven million nine hundred and fourteen thousand one 】
One .Flow The characteristics and use of
1. Use Flow Return multiple values
1. be known as flow Of Flow Type builder function .
2.flow{…} The code in the building block can be suspended .
3. function simpleFlow No longer marked suspend Modifier .
4. Use emit Function emission value .
5. Use collect Function to collect values .
// adopt flow Return multiple values flow Functions can be suspended
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)// Hang up
emit(i)// Emission produces an element
}
}
// Here we write an asynchronism with a coroutine
fun testMultipleValue3() = runBlocking {
simpleFlow().collect { value ->
print(value)
}
}
Result demonstration :
2.Flow Cold flow
flow Is a kind of similar to sequence Cold flow ,flow The code in the builder does not run until the stream is sent to the phone .
fun simpleFlow02() = flow<Int> {
println("Flow started")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
// Cold flow Only a call collect When Flow To collect elements
fun testFlowIsCold() = runBlocking {
val simpleFlow02 = simpleFlow02()
println("Calling collect...")
simpleFlow02.collect { value ->
println(value)
}
println("Calling collect again...")
simpleFlow02.collect { value ->
println(value)
}
}
Print the results :
3.Flow The continuity of
1. Each individual collection of streams is performed sequentially , Unless you use a special operator .
2. Each transition operator from upstream to downstream processes each emitted value , Then give it to the end operator .
//Flow The continuity of ( call chaining )
fun testFlowContinuation() = runBlocking {
//Flow Builder for
(1..5).asFlow().filter {
it % 2 == 0
}.map {
"string $it" // Transformation type
}.collect {
println("Collect $it")
}
}
Result demonstration :
4.Flow Builder for
1.fIowOf The builder defines a stream that emits a fixed set of values .
2. Use .asFlow() spread function , Various sets and sequences can be converted into streams .
//Flow Builder for flowOf asFlow
fun testFlowBuilder() = runBlocking {
flowOf("one", "two")
//onEach Back to a stream , The flow invokes the given operation before each value of the upstream flow is issued downstream .
.onEach {
delay(1000)
}.collect {
println(it)
}
listOf(1,2,3).asFlow().collect { value -> println(value) }
}
5.flow The context of
1. The collection of flows always takes place in the context of calling a coroutine , This property of the stream is called context saving .
2.flow{...} The code in the builder must follow the context to save properties , And emission from other contexts is not allowed (emit).
3.fIowOn The operator , This function is used to change the context of stream emission .
// change flow The context of
fun simpleFlow05() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..5) {
emit(i)
}
}.flowOn(Dispatchers.Default)
fun testFlowChangeContext() = runBlocking {
simpleFlow05().collect { value ->
println(
"Collected $value ${
Thread.currentThread().name
}"
)
}
println("")
}
Print the results :
Flow started DefaultDispatcher-worker-1
Collected 1 main
Collected 2 main
Collected 3 main
Collected 4 main
Collected 5 main
6.Flow The coordination process uses
Use launchIn Replace collect We can start the collection of flows in a separate process
//flow event
fun events() = (1..3).asFlow().onEach {
delay(100)
}
//flow The flow is used in conjunction with the process
fun testFlowLaunch() = runBlocking {
var job = events().onEach { event ->
println("Event: $event ${Thread.currentThread().name}")
}.launchIn(
CoroutineScope(Dispatchers.IO)
)
delay(1000)
job.cancelAndJoin()
}
Print the results :
Event: 1 DefaultDispatcher-worker-1
Event: 2 DefaultDispatcher-worker-1
Event: 3 DefaultDispatcher-worker-1
7.Flow Cancel
The flow adopts the same cooperative cancellation as the collaboration process . As usual , The collection of streams can be when the stream is in a cancelable suspend function ( for example delay) Cancel when suspended in .
1. Time out cancellation
fun simpleFlow06() = flow<Int> {
for (i in 1..5) {
emit(i)
delay(1000)
println("Emitting $i")
}
}
// Time out cancellation
fun testCancelFlow() = runBlocking {
withTimeoutOrNull(2500) {
simpleFlow06().collect { value ->
println(value)
}
println("Done")
}
}
Result demonstration :
This stream is in 2.5 Seconds was canceled
2. Judge whether the condition is cancelled
fun simpleFlow06() = flow<Int> {
for (i in 1..5) {
emit(i)
delay(1000)
println("Emitting $i")
}
}
fun testCancelFlowCheck() = runBlocking {
// This is the case because the elements all pass through emit Function can be used cancel Cancel
/* simpleFlow06().collect {
value ->
print(value)
if (value==3) cancel()
}
}*/
// Another situation
//cancellable Definitely want to cancel , Each element collected will determine whether the cancellation condition is met
(1..5).asFlow().cancellable().collect { value ->
println(value)
if (value == 3) cancel()
}
}
Effect demonstration :
8. Handle Flow Back pressure
1.buffer(), Code that emits elements in a concurrent run stream .
2.conflate(), Merge launch items , Do not process each value .
3.collectLatest(), Cancel and retransmit the last value .
4. When you have to change CoroutineDispatcher when , To flowOn The operator uses the same buffering mechanism , however buffer The function explicitly requests a buffer and Do not change the execution context .
fun simpleFlow07() = flow<Int> {
for (i in 1..5) {
delay(100)
emit(i)
println("Emitting $i")
}
}
fun testBackPressure()= runBlocking {
// Because the time of occurrence here is faster than that of collection , But only after collection can it be considered as the end , So the time required for launch and collection adds up
val measureTimeMillis = measureTimeMillis {
simpleFlow07()
// Switch threads due to this method and simpleFlow07 Functions are not in the same thread , Therefore, the launch can be carried out in parallel, and because of its own buffer , Here, the main thread will collect the emitted elements in order
//.flowOn(Dispatchers.Default)
//.buffer(50)// The cache can run the code in the stream concurrently
//.conflate()// Emit element merge , Note that this will lack elements
//.collectLatest {}// This will collect the last one
.collect { value ->
delay(300)
println(value)
}
}
println(measureTimeMillis)
}
Two .Flow The operator
1. Conversion operators
suspend fun performRequest(request: Int):String{
delay(1000)
return "response $request"
}
// Modify the type operator
fun testTransformFlowOperator()= runBlocking {
// Every time you change the type
/* (1..3).asFlow()
.map { value ->
performRequest(value)
}.collect {
println(it)
}*/
(1..3).asFlow()
//transform() Each element is modified multiple times
.transform { value ->
emit("Making request $value")
emit(performRequest(value))
}.collect {
println(it)
}
}
Print the results :
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
2. The length limit operator
fun numbers()= flow<Int> {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
}finally {
println("Finally in numbers")
}
}
fun testLimitLengthOperator()= runBlocking {
// The length limit operator take(): Limit output length
numbers().take(2).collect {value -> println(value) }
}
Print the results :
1
2
Finally in numbers
3. End operator
fun testTerminalOperator()= runBlocking {
//map Returns the result of a function operation
//reduce Accumulate values from the first element , And apply the operation... To the current accumulator value and to each element
val sum=(1..5).asFlow().map { it*it }.reduce{
// Here is the 1 To 5 Sum of squares
a,b->a+b
}
// When there is only one element single
val single = flowOf(1).single()
// Get first element
val first = flowOf(1..6).first()
// First set the elements of the operation , And finally with fold Parameter operation of
val sum1=(1..5).asFlow().fold(3){
a,b->a+b
}
println(sum)
println(sum1)
}
Print the results :
55
1
1..6
18
4. Combination operator
//zip() Merge operators
fun testZip()= runBlocking {
val asFlow = (1..3).asFlow().onEach { delay(300) }
val flowOf = flowOf("one", "Two", "Three").onEach { delay(400) }
val currentTimeMillis = System.currentTimeMillis()
// The time required for merging is printed out here , Prove that the whole process is asynchronous (400m)
asFlow.zip(flowOf){a,b->"$a $b"}.collect { value -> println("$value at ${System.currentTimeMillis()-currentTimeMillis}") }
}
fun requestFlow(i:Int)= flow<String> {
emit("$i: First")
delay(500)
emit("$i: Second")
}
Results the print :
1 one at 435
2 Two at 844
3 Three at 1255
5. Advection flow
A stream represents a sequence of values received asynchronously , So it's easy to encounter such a situation : Each value triggers a request for another sequence of values , However , Because of the asynchronous nature of flow , Therefore, different flattening modes are required , So , There are a series of flow flattening operators :
1.flatMapConcat Connection mode .
2.flatMapMerge Merge mode
3.flatMapLatest Latest flattening mode
// When one flow It's wrapped in another flow such as flow<flow<String>>, I need to use advection to turn into a flow
fun estFlatMapConcat()= runBlocking {
val startTime=System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
//flatMapLatest{}
//flatMapMerge The previous effect is similar to flatMapConcat The same order is different
// .flatMapMerge { }
.flatMapConcat{
// This call map Words requestFlow The return value is flow<flow<String>>, So call flatMapConcat I will untie one layer by myself Print the results
requestFlow(it)
}.collect {
value -> println("$value at ${System.currentTimeMillis()-startTime}")
}
}
flatMapLatest Print the results :
1: First at 149
2: First at 266
3: First at 370
3: Second at 882
flatMapMerge Print the results :
//1: First at 143
//2: First at 240
//3: First at 347
//1: Second at 655
//2: Second at 748
//3: Second at 860
flatMapConcat Print the results :
1: First at 129
1: Second at 641
2: First at 749
2: Second at 1257
3: First at 1367
3: Second at 1874
3、 ... and .Flow abnormal
1.Flow Handling exceptions
1. An exception occurred while receiving
fun simpleFlow08()= flow<Int> {
for (i in 1..4){
println("Emitting $i")
emit(i)
}
}
fun testFlowException()= runBlocking {
try {
simpleFlow08().collect {value ->
println(value)
check(value <=1 ){
"Collected $value"
}
}
}catch (e:Throwable){
println("Caught $e")
}
}
Print the results :
Emitting 1
1
Emitting 2
2
2. An exception occurred during the launch
fun testFlowException2()= runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")
// There is an abnormality in the upstream , Under the guarantee of not breaking flow Where principles are involved, it is recommended to use catch function
}.catch { e:Throwable-> println("Caught $e")
emit(10) // You can write like this
}
.flowOn(Dispatchers.IO)
.collect {
println(it)
}
}
Print the results :
Caught java.lang.ArithmeticException: Div 0
1
10
2.Flow Handling of exceptions
1. Normal completion
fun testCompleteInFinally()= runBlocking {
try {
simpleFlow().collect { println(it) }
}finally {
println("Done")
}
}
fun simpleFlow() =flow<Int> {
emit(3)
delay(100)
throw ActivityNotFoundException()
}
2. Abnormal death
fun simpleFlow() = flow<Int> {
emit(3)
delay(100)
throw ActivityNotFoundException()
}
fun testCompleteInCompletion() = runBlocking {
/* simpleFlow().onCompletion { exception -> println("Flow completed $exception") }
.catch { e -> println(e) }
.collect {
println(it)
/* check(
it<=1){"Collected $it"}*/
}*/
// When he dies abnormally, he will get abnormal information It will also be executed after normal completion
simpleFlow().onCompletion { println("Done") }.collect { println(it) }
}
Print information :
边栏推荐
- BlockingQueue source code
- 不在以下合法域名列表中,微信小程序解决办法
- View绘制整体流程简析
- Commit specification
- Use of kotlin basic common sets list, set and map
- Uniapp (upload local pictures, preview pictures, convert Base64 format, upload audio files)
- MFS详解(五)——MFS元数据日志服务器安装与配置
- Differences among concurrent, parallel, serial, synchronous and asynchronous
- App performance test: (IV) power
- Detailed explanation of PHP distributed transaction principle
猜你喜欢
Super model logo online design and production tool
端午安康,使用祝福话语生成词云吧
Local file search tool everything
不在以下合法域名列表中,微信小程序解决办法
‘ipconfig‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件。
The web server failed to start Port 7001 was already in use
Binary search
The processing and application of C language to documents
无刷直流电机矢量控制(四):基于滑模观测器的无传感器控制
Wechat applet: click the event to obtain the current device information (basic)
随机推荐
Applet pull-up loading data
Uniapp mobile terminal uses canvas to draw background convex arc
超有范的 logo 在线设计制作工具
Recent problems
Omron Ping replaces the large domestic product jy-v640 semiconductor wafer box reader
端午安康,使用祝福话语生成词云吧
MFS details (vii) - - MFS client and Web Monitoring installation configuration
The web server failed to start Port 7001 was already in use
【js】var、let、const
Intelligent digital asset management helps enterprises win the post epidemic Era
BlockingQueue source code
Detailed explanation of Yanghui triangle
347. top k high frequency elements heap sort + bucket sort +map
High burst solution 2
Wechat applet (get location)
Kotlin foundation extension
Echart柱状图:x轴显示value,y轴显示类别
MFS explanation (VI) -- MFS chunk server installation and configuration
Use of kotlin basic common sets list, set and map
The processing and application of C language to documents