Analysis of the Principles of Flow
2022-07-29 06:01:00 【Code and thinking】
Flow is an asynchronous data stream built on coroutines, so you need a working knowledge of coroutines before studying how Flow works.
Enough preamble, let's dive in.
First, the simplest way to create a flow is one we all know: flow { emit(value) }. Here the lambda passes a block into flow. We also know that a flow is cold: the block only runs when we collect the flow. Nobody disputes that, but how is it actually implemented? It turns out to be a clever use of the coroutine's suspend functions.
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}
You can see that flow {} returns a SafeFlow object. The block is a suspend lambda whose receiver is the FlowCollector interface. Does this remind you of the block: suspend CoroutineScope.() -> Unit parameter in the launch source code? Yes, it is the same pattern!
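The cold behaviour described above can be observed with a small sketch. It assumes kotlinx.coroutines is on the classpath; `coldFlowDemo` and the event strings are names made up for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order of side effects so we can see when the builder block runs.
// (coldFlowDemo is a hypothetical helper, not part of kotlinx.coroutines.)
fun coldFlowDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        events += "block started"
        emit(1)
        emit(2)
    }
    // At this point flow {} has only wrapped the block; nothing has executed.
    events += "flow created"
    numbers.collect { events += "collected $it" }
    // Collecting again runs the stored block again from the start.
    numbers.collect { events += "collected $it" }
    events
}

fun main() {
    coldFlowDemo().forEach(::println)
}
```

"flow created" is recorded before the first "block started", and the block runs once per collect call.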
public interface FlowCollector<in T> {
    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)
}
FlowCollector is very important. The official description says: FlowCollector is used as an intermediate or terminal collector of a flow and represents an entity that accepts the values the flow emits. The interface should not be implemented directly; instead it should be used as the receiver in a flow builder when implementing a custom operator. Implementations of this interface are not thread-safe. The interface has a single method, emit, which we all know: it is the method we use to push data into the flow, and it is a suspend function. (This is comparable to how the coroutine-based liveData builder works.)
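As a minimal sketch of "use it as a receiver in a flow builder", here is a custom operator that never implements FlowCollector itself. `repeatEach` is a hypothetical operator invented for this example; it assumes kotlinx.coroutines is available.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator (not part of kotlinx.coroutines): emits every upstream value twice.
fun <T> Flow<T>.repeatEach(): Flow<T> = flow {
    // Inside this lambda `this` is a FlowCollector<T>, so emit() is available.
    // collect resolves to the upstream Flow (this@repeatEach).
    collect { value ->
        emit(value)
        emit(value)
    }
}

// Small helper so the result can be inspected outside a coroutine.
fun repeatEachDemo(): List<String> = runBlocking {
    flowOf("a", "b").repeatEach().toList()
}

fun main() {
    println(repeatEachDemo()) // [a, a, b, b]
}
```

Because the operator is built with flow {}, it gets the same SafeFlow wrapping as any other flow.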
Next, note that SafeFlow overrides a collectSafely method. To see where it is called, look at the AbstractFlow class that SafeFlow extends.
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    /**
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
     *
     * A valid implementation of this method has the following constraints:
     * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
     *    The emission should happen in the context of the [collect] call.
     *    Please refer to the top-level [Flow] documentation for more details.
     * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not
     *    thread-safe by default.
     *    To automatically serialize emissions [channelFlow] builder can be used instead of [flow]
     *
     * @throws IllegalStateException if any of the invariants are violated.
     */
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
AbstractFlow is where collect is implemented. AbstractFlow implements the Flow interface, and Flow itself contains no implementation, only the abstract collect method. So let's focus on how collect in AbstractFlow ends up executing emit.
public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
Whether you use collect() or collect {}, both end up calling Flow's abstract collect method, whose real implementation lives in AbstractFlow. The extension function simply wraps the collect block in a FlowCollector whose overridden emit runs the block.
That FlowCollector and the current CoroutineContext are then wrapped again in a SafeCollector, and this instance is passed to collectSafely, which SafeFlow implements by running the flow block.
@Suppress("UNCHECKED_CAST")
private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    ......
    /**
     * This is a crafty implementation of state-machine reuse. First it checks that it is not used
     * concurrently (which we explicitly prohibit), then it caches a single instance of the completion
     * to avoid an extra allocation on each emit, making it effectively garbage-free on its hot path.
     */
    override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                emit(uCont, value)
            } catch (e: Throwable) {
                // Save the fact that an exception was thrown from emit (or even from the context check)
                lastEmissionContext = DownstreamExceptionElement(e)
                throw e
            }
        }
    }
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        // Check whether the flow is still in the same context
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

    // Check the context
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        if (previousContext is DownstreamExceptionElement) {
            exceptionTransparencyViolated(previousContext, value)
        }
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }
From the analysis above we can see that if collect is never called, collectSafely is never called either, and the data we pass to emit is never collected. Only a call to collect reaches collectSafely, which then invokes the flow block as an extension function on the collector. Inside the block we call emit with our data, so the emit method of the receiver, SafeCollector, is what gets called.
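The context check inside SafeCollector.emit is what enforces the first constraint in the collectSafely documentation. The sketch below, which assumes kotlinx.coroutines is available and uses made-up names (`wrong`, `right`, `contextViolation`), emits from inside withContext and then shows flowOn as the supported alternative.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Breaks the rule: emit is called from a different coroutine context than collect.
val wrong: Flow<Int> = flow {
    withContext(Dispatchers.Default) { emit(1) }
}

// Supported alternative: flowOn moves the upstream block to another dispatcher.
val right: Flow<Int> = flow { emit(1) }.flowOn(Dispatchers.Default)

// Returns the exception raised by the context check, or null if none was thrown.
fun contextViolation(): Throwable? = runBlocking {
    runCatching { wrong.collect() }.exceptionOrNull()
}

fun collectRight(): List<Int> = runBlocking { right.toList() }

fun main() {
    println(contextViolation() is IllegalStateException) // true
    println(collectRight())                              // [1]
}
```

The IllegalStateException matches the @throws clause quoted in the AbstractFlow documentation above.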
Inside SafeCollector the emitted value is forwarded to the wrapped FlowCollector, and the call finally returns the result of emitFun. emitFun is a reference to FlowCollector's emit, cast to a plain function type.
Looking at emitFun, you may wonder why it takes one extra parameter. Every suspend function has an implicit Continuation parameter. The Kotlin compiler normally supplies it for us, but when the function is called from Java, or through a plain function type as here, the Continuation must be passed explicitly. SafeCollector passes itself (this as Continuation<Unit>) on every call, which guarantees that the same Continuation is used each time.
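The extra parameter can be seen with the standard library alone. This is a JVM-only sketch that relies on the same cast trick as emitFun; `describe`, `describeFun` and `callDescribe` are hypothetical names, and the cast may produce a compiler warning.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical suspend function standing in for FlowCollector.emit.
suspend fun describe(value: Int): String = "value=$value"

// On the JVM a suspend (Int) -> String is compiled to a function that takes an
// extra Continuation and returns Any? (the result or a "suspended" marker).
@Suppress("UNCHECKED_CAST")
val describeFun = ::describe as Function2<Int, Continuation<String>, Any?>

fun callDescribe(value: Int): Any? {
    // describe never suspends, so this continuation is never resumed.
    val continuation = Continuation<String>(EmptyCoroutineContext) { }
    // The Continuation is passed by hand, just as SafeCollector does with emitFun.
    return describeFun(value, continuation)
}

fun main() {
    println(callDescribe(7)) // value=7
}
```

Because describe has no suspension point, the call returns its result directly instead of the suspended marker.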
With all that said, let's summarize:
Thanks to extension functions, invoking the flow block ends up calling SafeCollector's emit, which in turn calls the emit of the FlowCollector that was passed in. That emit was overridden to call the collect block, so the collect block runs. And because SafeFlow's collect function is only reached when collect is called, and only then calls collectSafely to run the flow's code, the flow builder block never executes without a call to collect. At most you get back a SafeFlow object.
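The whole chain can be reproduced in miniature with only the Kotlin standard library. This is a simplified sketch, not the real implementation: every Mini* name and `runNow` are invented for illustration, and the SafeCollector context checks are left out.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Simplified stand-ins for FlowCollector and Flow (hypothetical names).
fun interface MiniCollector<T> {
    suspend fun emit(value: T)
}

interface MiniFlow<T> {
    suspend fun collect(collector: MiniCollector<T>)
}

// Like SafeFlow: stores the block and runs it only when collect is called.
class MiniSafeFlow<T>(private val block: suspend MiniCollector<T>.() -> Unit) : MiniFlow<T> {
    override suspend fun collect(collector: MiniCollector<T>) = collector.block()
}

fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit): MiniFlow<T> = MiniSafeFlow(block)

// Runs a coroutine that never actually suspends, using only the standard library.
fun <T> runNow(block: suspend () -> T): T {
    val outcomes = mutableListOf<Result<T>>()
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcomes += it })
    return outcomes.single().getOrThrow()
}

fun miniFlowDemo(): List<String> {
    val log = mutableListOf<String>()
    val numbers = miniFlow<Int> {
        log += "block runs"
        emit(1) // calls the collector's emit, i.e. the collect lambda below
        emit(2)
    }
    log += "created" // the block has not run yet
    runNow { numbers.collect { value -> log += "got $value" } }
    return log
}

fun main() = println(miniFlowDemo()) // [created, block runs, got 1, got 2]
```

Even in this reduced form, "created" is logged before "block runs", because the block is only stored at construction time.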
Author: Lu_Hsiang
Link: https://juejin.cn/post/7120112404393885733