An analysis of how Flow works
2022-07-29 06:01:00 【Code and thinking】
Flow is an asynchronous data stream built on coroutines, so you need a working knowledge of coroutines before studying how Flow works.
Enough preamble, let's get started.
First, the simplest way to create a flow, as we all know, is flow { emit(value) }. Here a lambda passes the block straight into flow. We also know that a flow is a cold stream: the block inside runs only when we collect the flow. Nobody disputes that, but how is it achieved? It turns out to be a clever use of coroutine suspend functions.
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}
As you can see, flow { } returns a SafeFlow object, and block is a suspending lambda whose receiver is the FlowCollector interface. Does this remind you of the block: suspend CoroutineScope.() -> Unit parameter in the source of launch? Yes, it is the same pattern!
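To make the "stored block with a receiver" idea concrete, here is a minimal sketch that uses only the Kotlin standard library. MiniCollector, MiniFlow, miniFlow, runNow and coldDemo are hypothetical names invented for this illustration, not part of kotlinx.coroutines, and the sketch leaves out everything the real SafeFlow does beyond storing and invoking the block.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for FlowCollector: a single suspending emit method.
interface MiniCollector<T> {
    suspend fun emit(value: T)
}

// Hypothetical stand-in for SafeFlow: constructing it only stores the block.
class MiniFlow<T>(private val block: suspend MiniCollector<T>.() -> Unit) {
    // The stored block runs only here, with the collector as its receiver.
    suspend fun collect(collector: MiniCollector<T>) = collector.block()
}

// Hypothetical builder with the same shape as flow { }.
fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit) = MiniFlow(block)

// Runs a suspend block that never actually suspends, using only the stdlib.
fun runNow(block: suspend () -> Unit) {
    block.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
}

fun coldDemo(): List<String> {
    val log = mutableListOf<String>()
    val numbers = miniFlow<Int> {
        log += "block started"
        emit(1) // resolves to MiniCollector.emit through the lambda's receiver
        emit(2)
    }
    log += "flow created" // at this point the block has not run yet
    runNow {
        numbers.collect(object : MiniCollector<Int> {
            override suspend fun emit(value: Int) {
                log += "got $value"
            }
        })
    }
    return log
}
```

Calling coldDemo() returns the entries "flow created", "block started", "got 1", "got 2" in that order: the block is recorded as started only after collect is called.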
public interface FlowCollector<in T> {
    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)
}
FlowCollector is very important. The official documentation explains it as follows: FlowCollector is used as an intermediate or terminal collector of the flow and represents an entity that accepts values emitted by the flow. This interface should not be implemented directly; instead, it should be used as a receiver in a flow builder when implementing a custom operator. Implementations of this interface are not thread-safe. The interface has a single method, emit, which we all know well: it is the method we use to push data into the stream, and you can see that emit is a suspend function. (The coroutine-based liveData builder makes a useful analogy here.)
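As an illustration of "use it as a receiver in a flow builder when implementing a custom operator", here is a small sketch. The operator name everyOther and the helper everyOtherDemo are made up for this example. It assumes kotlinx-coroutines-core 1.6 or newer is on the classpath, where collect { } needs no extra import.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical custom operator: forwards the 1st, 3rd, 5th, ... upstream value.
fun <T> Flow<T>.everyOther(): Flow<T> = flow {
    var keep = true
    // collect is called on the upstream flow (this@everyOther);
    // emit is called on the FlowCollector receiver of flow { }.
    collect { value ->
        if (keep) emit(value)
        keep = !keep
    }
}

// Small helper so the result can be inspected from non-suspending code.
fun everyOtherDemo(): List<Int> = runBlocking {
    flowOf(1, 2, 3, 4, 5).everyOther().toList()
}
```

The operator never implements FlowCollector itself. It only calls emit on the receiver that the flow builder provides, which is the usage the documentation recommends. everyOtherDemo() returns [1, 3, 5].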
Now look at SafeFlow: it overrides the collectSafely method. To see how that method is used, look at the AbstractFlow class that SafeFlow extends.
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    /**
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
     *
     * A valid implementation of this method has the following constraints:
     * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
     *    The emission should happen in the context of the [collect] call.
     *    Please refer to the top-level [Flow] documentation for more details.
     * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not
     *    thread-safe by default.
     *    To automatically serialize emissions [channelFlow] builder can be used instead of [flow]
     *
     * @throws IllegalStateException if any of the invariants are violated.
     */
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
AbstractFlow contains the implementation of collect. AbstractFlow implements the Flow interface, and Flow itself has no implemented methods, only the abstract collect method. So let's focus on how collect in AbstractFlow ends up executing emit.
public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
Whether you use collect() or collect { }, both call Flow's abstract collect method, whose real implementation lives in AbstractFlow. The extension merely wraps the block passed to collect in a FlowCollector whose overridden emit method executes that block.
That FlowCollector, together with the current CoroutineContext, is then wrapped again in a SafeCollector, and the instance is passed to collectSafely, which SafeFlow implements by running the flow's block.
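The wrapping steps described above can be sketched with a stripped-down model built on the standard library alone. Every name here (Collector, Stream, ForwardingCollector, AbstractStream, BlockStream, runNow, chainDemo) is hypothetical, and ForwardingCollector only records and forwards values, whereas the real SafeCollector also checks the coroutine context.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface Collector<in T> {
    suspend fun emit(value: T)
}

interface Stream<out T> {
    suspend fun collect(collector: Collector<T>)
}

// Simplified stand-in for SafeCollector: records the call, then forwards it.
class ForwardingCollector<T>(
    private val downstream: Collector<T>,
    private val trace: MutableList<String>
) : Collector<T> {
    override suspend fun emit(value: T) {
        trace += "wrapper.emit($value)"
        downstream.emit(value)
    }
}

// Mirrors the shape of AbstractFlow: a final collect that wraps the collector.
abstract class AbstractStream<T>(protected val trace: MutableList<String>) : Stream<T> {
    final override suspend fun collect(collector: Collector<T>) {
        trace += "collect"
        collectSafely(ForwardingCollector(collector, trace))
    }

    abstract suspend fun collectSafely(collector: Collector<T>)
}

// Mirrors the shape of SafeFlow: collectSafely runs the stored block.
class BlockStream<T>(
    trace: MutableList<String>,
    private val block: suspend Collector<T>.() -> Unit
) : AbstractStream<T>(trace) {
    override suspend fun collectSafely(collector: Collector<T>) {
        trace += "collectSafely"
        collector.block()
    }
}

// Mirrors collect { }: the lambda becomes the emit of an anonymous Collector.
suspend fun <T> Stream<T>.collect(action: suspend (T) -> Unit): Unit =
    collect(object : Collector<T> {
        override suspend fun emit(value: T) = action(value)
    })

// Runs a suspend block that never actually suspends, using only the stdlib.
fun runNow(block: suspend () -> Unit) {
    block.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
}

fun chainDemo(): List<String> {
    val trace = mutableListOf<String>()
    val stream = BlockStream<Int>(trace) { emit(7) }
    runNow { stream.collect { value -> trace += "action($value)" } }
    return trace
}
```

chainDemo() returns "collect", "collectSafely", "wrapper.emit(7)", "action(7)", which is the order of the calls described in the two paragraphs above.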
@Suppress("UNCHECKED_CAST")
private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    ......
    /**
     * This is a clever implementation of state-machine reuse. First, it checks that the collector is not used
     * concurrently (which is explicitly prohibited), then it caches a single completion instance to avoid an
     * extra allocation on each emit, making it effectively garbage-free on its hot path.
     */
    override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                emit(uCont, value)
            } catch (e: Throwable) {
                // Record the fact that emit (or even the context check) threw an exception
                lastEmissionContext = DownstreamExceptionElement(e)
                throw e
            }
        }
    }
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        // Check whether the flow is still emitting in the same context
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }
    // Check the context
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        if (previousContext is DownstreamExceptionElement) {
            exceptionTransparencyViolated(previousContext, value)
        }
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }
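The context check can be observed from outside the library. The sketch below assumes kotlinx-coroutines-core is on the classpath; the function names violatesContext and respectsContext are invented for this example. It relies on the documented rule quoted earlier (collectSafely must not change the coroutine context when emitting, and an IllegalStateException is thrown if an invariant is violated).

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Emits from inside withContext, so emit runs in a different context than collect.
fun violatesContext(): Boolean = runBlocking {
    val broken = flow<Int> {
        withContext(Dispatchers.Default) { emit(1) }
    }
    try {
        broken.toList()
        false
    } catch (e: IllegalStateException) {
        true // the context check rejected the emission
    }
}

// Changing the upstream context with flowOn keeps the invariant intact.
fun respectsContext(): List<Int> = runBlocking {
    flow<Int> {
        emit(1)
        emit(2)
    }.flowOn(Dispatchers.Default).toList()
}
```

violatesContext() returns true because the emission is rejected, while respectsContext() returns [1, 2]. flowOn is the supported way to run the producer block on another dispatcher.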
From the analysis above we can see that if collect is not called, collectSafely is not called either, and the data we pass to emit is never collected. Only a call to collect reaches collectSafely, which then invokes the flow's block as an extension function (a lambda with receiver). Inside the block we call emit with our data, and because the receiver is the SafeCollector, it is SafeCollector's emit method that gets called.
Inside SafeCollector, the value is handed on to the FlowCollector that was passed in: the private emit finally returns the result of calling emitFun, a variable that holds a reference to FlowCollector's emit cast to a plain function type.
Looking at emitFun, you may wonder why it takes one more parameter. That is because a suspend function such as emit has an implicit Continuation parameter. The Kotlin compiler supplies it for us in ordinary suspend calls, but when the function is invoked as a plain function, as Java code would have to do, the Continuation must be passed in explicitly. Passing the same Continuation in this way keeps the Continuation consistent.
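The hidden Continuation parameter can be made visible with the standard library alone. The sketch below does not reproduce the emitFun cast; it only shows that a suspend function with no Continuation in its signature still receives one, and that non-suspending code has to supply it explicitly. Tag, callerTag and continuationDemo are hypothetical names introduced for the illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.startCoroutine

// Hypothetical context element, used only to make the continuation recognizable.
class Tag(val name: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

// No Continuation appears in the signature, yet the function can reach the hidden one.
suspend fun callerTag(): String? =
    suspendCoroutineUninterceptedOrReturn { cont: Continuation<String?> ->
        cont.context[Tag]?.name // returned directly, so the coroutine never suspends
    }

fun continuationDemo(): String? {
    var seen: String? = null
    val body: suspend () -> String? = { callerTag() }
    // Non-suspending code must pass the Continuation in explicitly.
    body.startCoroutine(Continuation(Tag("from-demo")) { result -> seen = result.getOrThrow() })
    return seen
}
```

continuationDemo() returns "from-demo": the context attached to the explicitly supplied Continuation is the one callerTag observes through its implicit parameter.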
After all that, let's summarize:
Thanks to the extension-function (lambda with receiver) mechanism, running the flow's block calls SafeCollector's emit, which in turn calls emit on the FlowCollector that was passed in. That emit was overridden to invoke the block given to collect, so the collect block runs. All of this starts only when collect is called: calling collect on the SafeFlow (implemented in AbstractFlow) calls collectSafely, which executes the code inside flow { }. So if collect is never called, the flow's code block is never executed; at most you get back a SafeFlow object.
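The summary can be checked against the real library with a short sketch, assuming kotlinx-coroutines-core is available. The function name collectTwiceDemo is made up for this example. It also shows a consequence of the same mechanism: each terminal call runs the block again.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun collectTwiceDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = flow<Int> {
        log += "block runs"
        emit(1)
    }
    log += "created" // building the flow did not run the block
    numbers.toList() // first collection runs the block
    numbers.toList() // second collection runs it again
    log
}
```

collectTwiceDemo() returns "created", "block runs", "block runs": nothing runs when the flow is built, and the block runs once per collection.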
Author: Lu_Hsiang
Link: https://juejin.cn/post/7120112404393885733