当前位置:网站首页>Kotlin协程:续体、续体拦截器、调度器
Kotlin协程:续体、续体拦截器、调度器
2022-07-31 16:38:00 【LeeDuo.】
一.Continuation
Continuation接口是协程中最核心的接口,代表着挂起点之后的续体,代码如下:
public interface Continuation<in T> {
// 续体的上下文
public val context: CoroutineContext
// 该方法用于恢复续体的执行
// result为挂起点执行完成的返回值,T为返回值的类型
public fun resumeWith(result: Result<T>)
}
1.Continuation图解
二.ContinuationInterceptor
ContinuationInterceptor接口继承自Element接口,是协程中的续体拦截器,代码如下:
public interface ContinuationInterceptor : CoroutineContext.Element {
// 拦截器的Key
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
// 拦截器对续体进行拦截时会调用该方法,并对continuation进行缓存
// 拦截判断:根据传入的continuation对象与返回的continuation对象是否相同
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
// 当interceptContinuation方法拦截的协程执行完毕后,会调用该方法
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
// get方法多态实现
public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
@OptIn(ExperimentalStdlibApi::class)
if (key is AbstractCoroutineContextKey<*, *>) {
@Suppress("UNCHECKED_CAST")
return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
}
@Suppress("UNCHECKED_CAST")
return if (ContinuationInterceptor === key) this as E else null
}
// minusKey方法多态实现
public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
@OptIn(ExperimentalStdlibApi::class)
if (key is AbstractCoroutineContextKey<*, *>) {
return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
}
return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
}
}
三.CoroutineDispatcher
CoroutineDispatcher类继承自AbstractCoroutineContextElement类,实现了ContinuationInterceptor接口,是协程调度器的基类,代码如下:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// ContinuationInterceptor的多态实现,调度器本质上就是拦截器
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
ContinuationInterceptor,
{
it as? CoroutineDispatcher })
// 用于判断调度器是否要调用dispatch方法进行调度,默认为true
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
// 调度的核心方法,在这里进行调度,执行block
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 如果调度是由Yield方法触发的,默认通过dispatch方法实现
@InternalCoroutinesApi
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
// ContinuationInterceptor接口的方法,将续体包裹成DispatchedContinuation,并传入当前调度器
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
// 释放父协程与子协程的关联。
@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
}
// 重载了"+"操作,直接返回others
// 因为两个调度器相加没有意义,同一个上下文中只能有一个调度器
// 如果需要加的是调度器对象,则直接替换成最新的,因此直接返回
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
override fun toString(): String = "$classSimpleName@$hexAddress"
}
四.EventLoop
EventLoop类继承自CoroutineDispatcher类,用于协程中任务的分发执行,只在runBlocking方法中和Dispatchers.Unconfined调度器中使用。与Handler中的Looper类似,在创建后会存储在当前线程的ThreadLocal中。EventLoop本身不支持延时执行任务,如果需要可以自行继承EventLoop并实现Delay接口,EventLoop中预留了一部分变量和方法用于延时需求的扩展。
为什么协程需要EventLoop呢?协程的本质是续体传递,而续体传递的本质是回调,假设在Dispatchers.Unconfined调度下,要连续执行多个suspend方法,就会有多个续体传递,假设suspend方法达到一定数量后,就会造成StackOverflow,进而引起崩溃。同样的,我们知道调用runBlocking会阻塞当前线程,而runBlocking阻塞的原理就是执行“死循环”,因此需要在循环中做任务的分发,去执行内部协程在Dispatchers.Unconfined调度器下加入的任务。
EventLoop代码如下:
internal abstract class EventLoop : CoroutineDispatcher() {
// 用于记录使用当前EventLoop的runBlocking方法和Dispatchers.Unconfined调度器的数量
private var useCount = 0L
// 表示当前的EventLoop是否被暴露给其他的线程
// runBlocking会将EventLoop暴露给其他线程
// 因此,当runBlocking使用时,shared必须为true
private var shared = false
// Dispatchers.Unconfined调度器的任务执行队列
private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
// 处理任务队列的下一个任务,该方法只能在EventLoop所在的线程调用
// 返回值<=0,说明立刻执行下一个任务
// 返回值>0,说明等待这段时间后,执行下一个任务
// 返回值为Long.MAX_VALUE,说明队列里没有任务了
public open fun processNextEvent(): Long {
if (!processUnconfinedEvent()) return Long.MAX_VALUE
return 0
}
// 队列是否为空
protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
// 下一个任务多长时间后执行
protected open val nextTime: Long
get() {
val queue = unconfinedQueue ?: return Long.MAX_VALUE
return if (queue.isEmpty) Long.MAX_VALUE else 0L
}
// 任务的核心处理方法
public fun processUnconfinedEvent(): Boolean {
// 若队列为空,则返回
val queue = unconfinedQueue ?: return false
// 从队首取出一个任务,如果为空,则返回
val task = queue.removeFirstOrNull() ?: return false
// 执行
task.run()
return true
}
// 表示当前EventLoop是否可以在协程上下文中被调用
// EventLoop本质上也是协程上下文
// 如果EventLoop在runBlocking方法中使用,必须返回true
public open fun shouldBeProcessedFromContext(): Boolean = false
// 向队列中添加一个任务
public fun dispatchUnconfined(task: DispatchedTask<*>) {
// 若队列为空,则创建一个新的队列
val queue = unconfinedQueue ?:
ArrayQueue<DispatchedTask<*>>().also {
unconfinedQueue = it }
queue.addLast(task)
}
// EventLoop当前是否还在被使用
public val isActive: Boolean
get() = useCount > 0
// EventLoop当前是否还在被Unconfined调度器使用
public val isUnconfinedLoopActive: Boolean
get() = useCount >= delta(unconfined = true)
// 判断队列是否为空
public val isUnconfinedQueueEmpty: Boolean
get() = unconfinedQueue?.isEmpty ?: true
// 下面三个方法用于计算使用当前的EventLoop的runBlocking方法和Unconfined调度器的数量
// useCount是一个64位的数,
// 它的高32位用于记录Unconfined调度器的数量,低32位用于记录runBlocking方法的数量
private fun delta(unconfined: Boolean) =
if (unconfined) (1L shl 32) else 1L
fun incrementUseCount(unconfined: Boolean = false) {
useCount += delta(unconfined)
// runBlocking中使用,shared为true
if (!unconfined) shared = true
}
fun decrementUseCount(unconfined: Boolean = false) {
useCount -= delta(unconfined)
// 如果EventLoop还在被使用
if (useCount > 0) return
assert {
useCount == 0L }
// 如果EventLoop不被使用了,并且在EventLoop中使用过
if (shared) {
// 关闭相关资源,并在ThreadLocal中移除
shutdown()
}
}
protected open fun shutdown() {
}
}
协程中提供了EventLoopImplBase类,间接继承自EventLoop,实现了Delay接口,用来延时执行任务。同时,协程中还提供单例对象ThreadLocalEventLoop用于EventLoop在ThreadLocal中的存储。
边栏推荐
- MySQL多表联合查询
- 【Yugong Series】July 2022 Go Teaching Course 022-Dictionary of Go Containers
- Stuck in sill idealTree buildDeps during npm installation, npm installation is slow, npm installation is stuck in one place
- 小程序:matlab解微分方程「建议收藏」
- 研发过程中的文档管理与工具
- js的toString方法
- 【愚公系列】2022年07月 Go教学课程 020-Go容器之数组
- 多主复制下处理写冲突(1)-同步与异步冲突检测及避免冲突
- 动态规划之线性dp(下)
- 网站漏洞修复服务商关于越权漏洞分析
猜你喜欢
Golang——从入门到放弃
仿生毛毛虫机器人源码
GP 6总体架构学习笔记
C language - function
【TypeScript】深入学习TypeScript类型操作
[pytorch] pytorch automatic derivation, Tensor and Autograd
【7.29】代码源 - 【排列】【石子游戏 II】【Cow and Snacks】【最小生成数】【数列】
复杂高维医学数据挖掘与疾病风险分类研究
SringMVC中个常见的几个问题
Huawei's top engineers lasted nine years "anecdotal stories network protocol" PDF document summary, is too strong
随机推荐
Flutter gets the height of the status bar statusbar
JS基础小练习
研发过程中的文档管理与工具
AcWing 1282. 搜索关键词 题解((AC自动机)Trie+KMP)+bfs)
Golang go-redis cluster模式下不断创建新连接,效率下降问题解决
[TypeScript]OOP
LevelSequence源码分析
【愚公系列】2022年07月 Go教学课程 021-Go容器之切片操作
并发性,时间和相对性
Single-cell sequencing workflow (single-cell RNA sequencing)
How to switch remote server in gerrit
联邦学习:联邦场景下的多源知识图谱嵌入
Flink_CDC搭建及简单使用
基于C语言的编译器设计与实现
华为顶级工程师历时9年总结的“趣谈网络协议”PDF文档,太强了
2022年整理LeetCode最新刷题攻略分享(附中文详细题解)
Multi-datacenter operation and detection of concurrent writes
flowable工作流所有业务概念
最后写入胜利(丢弃并发写入)
6-22 Vulnerability exploit - postgresql database password cracking