Kotlin Coroutines: Context
2022-07-03 17:39:00 【okclouderx】
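The coroutine context in Kotlin is called CoroutineContext. Its most common use is to choose the thread pool (dispatcher) a coroutine runs on.
Using CoroutineContext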
launch
The first parameter of launch, context, is a CoroutineContext and defaults to EmptyCoroutineContext.
To choose the thread pool that launch runs on, pass a CoroutineContext (for example, a dispatcher) for this parameter.
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
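As a small sketch of passing a context to launch, the following runs one block on Dispatchers.Default and reports which thread executed it. The helper name threadOfLaunch is hypothetical, and kotlinx-coroutines-core is assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of the article): launches a coroutine on
// Dispatchers.Default and returns the name of the thread that ran the block.
fun threadOfLaunch(): String = runBlocking {
    var name = ""
    val job = launch(Dispatchers.Default) {
        name = Thread.currentThread().name
    }
    job.join() // wait until the launched coroutine has finished
    name
}

fun main() {
    // The name starts with "DefaultDispatcher-worker"; the worker number varies.
    println(threadOfLaunch())
}
```

Without the Dispatchers.Default argument, the launched coroutine would inherit the context of runBlocking and run on the calling thread instead.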
withContext
withContext runs a block of code in a different context, typically on a different thread. Its first parameter is a CoroutineContext, which selects the thread pool.
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    // ... body omitted
}
getUserInfoIo below passes Dispatchers.IO to withContext.
fun main() = runBlocking {
    val user = getUserInfoIo()
    logX(user)
}
suspend fun getUserInfoIo(): String {
    logX("Before IO Context.")
    withContext(Dispatchers.IO) {
        logX("In IO Context.")
        delay(1000)
    }
    logX("After IO Context.")
    return "BoyCoder"
}
Output:
================================
Before IO Context.
Thread:main @coroutine#1, time:1656560405319
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656560405351
================================
================================
After IO Context.
Thread:main @coroutine#1, time:1656560406360
================================
================================
BoyCoder
Thread:main @coroutine#1, time:1656560406361
================================
Only the withContext block runs on the DefaultDispatcher-worker-1 thread (still as coroutine#1). Everything else runs on the main thread.
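The logX helper used in these examples is never defined in the article. A minimal sketch that produces the same layout might look like the following; the name formatLog and the exact separator width are assumptions. The " @coroutine#1" suffix in the output above is not added by the helper: it appears in the thread name when kotlinx.coroutines debug mode is enabled (for example with -Dkotlinx.coroutines.debug).

```kotlin
// Hypothetical reconstruction of the article's logging helper.
// formatLog builds the four-line block; logX prints it.
fun formatLog(any: Any?): String {
    val bar = "=".repeat(32) // separator width is an assumption
    val thread = Thread.currentThread().name
    val time = System.currentTimeMillis()
    return listOf(bar, any.toString(), "Thread:$thread, time:$time", bar)
        .joinToString("\n")
}

fun logX(any: Any?) = println(formatLog(any))

fun main() {
    logX("Before IO Context.")
}
```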
suspend main
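main also has a suspend version, which can call suspending functions directly.
suspend fun main() {
    val user = getUserInfoIo()
    logX(user)
}
Unlike the runBlocking version, once withContext has switched threads, the remaining code keeps running on DefaultDispatcher-worker-1. A suspend main has no dispatcher of its own to switch back to, so execution simply continues on the thread where withContext finished.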
================================
Before IO Context.
Thread:main, time:1656569681374
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-1, time:1656569681440
================================
================================
After IO Context.
Thread:DefaultDispatcher-worker-1, time:1656569682449
================================
================================
BoyCoder
Thread:DefaultDispatcher-worker-1, time:1656569682449
================================
runBlocking
The first parameter of runBlocking is also a CoroutineContext, defaulting to EmptyCoroutineContext.
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    // ... body omitted
}
A custom CoroutineContext can be passed to runBlocking.
fun main() = runBlocking(Dispatchers.IO) {
    val user = getUserInfoIo()
    logX(user)
}
Output:
================================
Before IO Context.
Thread:DefaultDispatcher-worker-2 @coroutine#1, time:1656570311862
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-2 @coroutine#1, time:1656570311896
================================
================================
After IO Context.
Thread:DefaultDispatcher-worker-2 @coroutine#1, time:1656570312903
================================
================================
BoyCoder
Thread:DefaultDispatcher-worker-2 @coroutine#1, time:1656570312903
================================
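With Dispatchers.IO added, the coroutine runs on DefaultDispatcher-worker-2 the whole time.
Dispatchers
Kotlin ships four built-in dispatchers. Each one is a CoroutineDispatcher, which in turn is a CoroutineContext.
- Default: the default dispatcher, meant for CPU-bound work. Its parallelism equals the number of CPU cores, with a minimum of two.
- Main: the main-thread dispatcher. It is only available on UI platforms such as Android, JavaFX, or Swing; a plain JVM project cannot use it.
- Unconfined: not confined to any thread; the coroutine may end up running on any thread.
- IO: meant for blocking IO work. It allows more threads: by default up to 64 or the number of cores, whichever is larger.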
public actual object Dispatchers {
    /**
     * The default [CoroutineDispatcher] that is used by all standard builders like
     * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
     * if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
     *
     * It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
     * by this dispatcher is equal to the number of CPU cores, but is at least two.
     * Level of parallelism X guarantees that no more than X tasks can be executed in this dispatcher in parallel.
     */
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    /**
     * A coroutine dispatcher that is confined to the Main thread operating with UI objects.
     * This dispatcher can be used either directly or via [MainScope] factory.
     * Usually such dispatcher is single-threaded.
     *
     * Access to this property may throw [IllegalStateException] if no main thread dispatchers are present in the classpath.
     *
     * Depending on platform and classpath it can be mapped to different dispatchers:
     * - On JS and Native it is equivalent of [Default] dispatcher.
     * - On JVM it is either Android main thread dispatcher, JavaFx or Swing EDT dispatcher. It is chosen by
     *   [`ServiceLoader`](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html).
     *
     * In order to work with `Main` dispatcher, the following artifacts should be added to project runtime dependencies:
     *  - `kotlinx-coroutines-android` for Android Main thread dispatcher
     *  - `kotlinx-coroutines-javafx` for JavaFx Application thread dispatcher
     *  - `kotlinx-coroutines-swing` for Swing EDT dispatcher
     *
     * In order to set a custom `Main` dispatcher for testing purposes, add the `kotlinx-coroutines-test` artifact to
     * project test dependencies.
     *
     * Implementation note: [MainCoroutineDispatcher.immediate] is not supported on Native and JS platforms.
     */
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    /**
     * A coroutine dispatcher that is not confined to any specific thread.
     * It executes initial continuation of the coroutine in the current call-frame
     * and lets the coroutine resume in whatever thread that is used by the corresponding suspending function, without
     * mandating any specific threading policy. Nested coroutines launched in this dispatcher form an event-loop to avoid
     * stack overflows.
     *
     * ### Event loop
     * Event loop semantics is a purely internal concept and have no guarantees on the order of execution
     * except that all queued coroutines will be executed on the current thread in the lexical scope of the outermost
     * unconfined coroutine.
     *
     * For example, the following code:
     * ```
     * withContext(Dispatchers.Unconfined) {
     *    println(1)
     *    withContext(Dispatchers.Unconfined) { // Nested unconfined
     *        println(2)
     *    }
     *    println(3)
     * }
     * println("Done")
     * ```
     * Can print both "1 2 3" and "1 3 2", this is an implementation detail that can be changed.
     * But it is guaranteed that "Done" will be printed only when both `withContext` are completed.
     *
     *
     * Note that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
     * but still want to execute it in the current call-frame until its first suspension, then you can use
     * an optional [CoroutineStart] parameter in coroutine builders like
     * [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to the
     * the value of [CoroutineStart.UNDISPATCHED].
     */
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    /**
     * The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
     *
     * Additional threads in this pool are created and are shutdown on demand.
     * The number of threads used by this dispatcher is limited by the value of
     * "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
     * It defaults to the limit of 64 threads or the number of cores (whichever is larger).
     *
     * Moreover, the maximum configurable number of threads is capped by the
     * `kotlinx.coroutines.scheduler.max.pool.size` system property.
     * If you need a higher number of parallel threads,
     * you should use a custom dispatcher backed by your own thread pool.
     *
     * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using
     * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread &mdash;
     * typically execution continues in the same thread.
     */
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
Dispatchers.IO
Dispatchers.IO may reuse threads that belong to Dispatchers.Default. The earlier examples show this: although Dispatchers.IO was requested, the code actually ran on a DefaultDispatcher-worker thread.
Change the CoroutineContext of runBlocking to Dispatchers.Default:
fun main() = runBlocking(Dispatchers.Default) {
    val user = getUserInfoIo()
    logX(user)
}
Output:
================================
Before IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575072324
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575072358
================================
================================
After IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575073366
================================
================================
BoyCoder
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575073367
================================
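After withContext switches to IO, the code still runs on DefaultDispatcher-worker-1, because Dispatchers.IO and Dispatchers.Default share the same pool of threads.
Custom dispatchers
A custom Executor can be converted into a dispatcher and used as a CoroutineContext.
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher

// Single-threaded executor whose only thread is a daemon named "mySingleThread".
val mySingleDispatcher = Executors.newSingleThreadExecutor {
    Thread(it, "mySingleThread").apply {
        isDaemon = true
    }
}.asCoroutineDispatcher()

fun main() = runBlocking(mySingleDispatcher) {
    val user = getUserInfoIo()
    logX(user)
}
Output: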
================================
Before IO Context.
Thread:mySingleThread @coroutine#1, time:1656575809476
================================
================================
In IO Context.
Thread:DefaultDispatcher-worker-1 @coroutine#1, time:1656575809510
================================
================================
After IO Context.
Thread:mySingleThread @coroutine#1, time:1656575810521
================================
================================
BoyCoder
Thread:mySingleThread @coroutine#1, time:1656575810521
================================
Only "In IO Context." runs on DefaultDispatcher-worker-1; all the other code runs on the custom dispatcher.
Dispatchers.Unconfined
When launch uses the default context, the execution order is 1, 4, 2, 3.
fun main() = runBlocking {
    logX("Before launch") // 1
    launch {
        logX("In launch") // 2
        delay(1000)
        logX("End launch") // 3
    }
    logX("After launch") // 4
}
Output:
================================
Before launch
Thread:main @coroutine#1, time:1656576229645
================================
================================
After launch
Thread:main @coroutine#1, time:1656576229677
================================
================================
In launch
Thread:main @coroutine#2, time:1656576229679
================================
================================
End launch
Thread:main @coroutine#2, time:1656576230686
================================
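With Dispatchers.Unconfined, the order changes: the launched block starts right away on the calling thread, and the thread it resumes on after a suspension is not fixed.
fun main() = runBlocking {
    logX("Before launch") // 1
    launch(Dispatchers.Unconfined) {
        logX("In launch") // 2
        delay(1000)
        logX("End launch") // 3
    }
    logX("After launch") // 4
}
Output: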
================================
Before launch
Thread:main @coroutine#1, time:1656576759031
================================
================================
In launch
Thread:main @coroutine#2, time:1656576759055
================================
================================
After launch
Thread:main @coroutine#1, time:1656576759060
================================
================================
End launch
Thread:kotlinx.coroutines.DefaultExecutor @coroutine#2, time:1656576760059
================================
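The order becomes 1, 2, 4, 3, and step 3 runs on the kotlinx.coroutines.DefaultExecutor thread, because an unconfined coroutine resumes on whichever thread the suspending function (here delay) resumes it.
Dispatchers.Unconfined can end up on any thread, so it should not be used casually.
CoroutineScope
launch needs a CoroutineScope, because launch is an extension function of CoroutineScope.
CoroutineScope itself is very simple: it is an interface with a single member, coroutineContext.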
public interface CoroutineScope {
    /**
     * The context of this scope.
     * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
     * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
     *
     * By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
     */
    public val coroutineContext: CoroutineContext
}
Create a custom CoroutineScope and use it to launch three coroutines, so that they can be managed together (structured concurrency).
fun main() = runBlocking {
    val scope = CoroutineScope(Job())
    scope.launch {
        logX("first start")
        delay(1000)
        logX("first end")
    }
    scope.launch {
        logX("second start")
        delay(1000)
        logX("second end")
    }
    scope.launch {
        logX("third start")
        delay(1000)
        logX("third end")
    }
    delay(500)
    scope.cancel()
    delay(1000)
}
Output:
================================
first start
Thread:DefaultDispatcher-worker-2 @coroutine#2, time:1656577491658
================================
================================
third start
Thread:DefaultDispatcher-worker-3 @coroutine#4, time:1656577491663
================================
================================
second start
Thread:DefaultDispatcher-worker-1 @coroutine#3, time:1656577491660
================================
Every coroutine launched from scope was cancelled, so none of the "end" messages were printed.
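The cancellation can also be checked programmatically instead of by reading the log. The sketch below uses a hypothetical helper, cancelledChildren, and shorter delays than the article's example; it assumes kotlinx-coroutines-core is available.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts `count` children in a fresh scope, cancels the
// scope while they are still suspended, and counts the cancelled children.
fun cancelledChildren(count: Int): Int = runBlocking {
    val scope = CoroutineScope(Job())
    val jobs = List(count) {
        scope.launch { delay(1000) }
    }
    delay(100)
    scope.cancel()   // cancels the scope's Job and therefore every child
    jobs.joinAll()   // wait until each child has finished cancelling
    jobs.count { it.isCancelled }
}

fun main() {
    println(cancelledChildren(3)) // prints 3
}
```

One call to scope.cancel() reaches all three children because each launch made its coroutine a child of the scope's Job.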
Job and Dispatcher
Job
Job extends CoroutineContext.Element.
public interface Job : CoroutineContext.Element {
    // ...
}
Element extends CoroutineContext.
@SinceKotlin("1.3")
public interface CoroutineContext {
    public interface Element : CoroutineContext {
        // ...
    }
}
So a Job is itself a CoroutineContext.
Dispatchers
Dispatchers is a singleton object that holds the four predefined dispatchers.
public actual object Dispatchers {
    // ...
}
Take Default as an example. It is a CoroutineDispatcher, whose inheritance chain is:
CoroutineDispatcher -> ContinuationInterceptor -> CoroutineContext.Element -> CoroutineContext
So a dispatcher is also a CoroutineContext.
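The two conclusions above can be checked with a few lines. This is only an illustrative sketch (the variable names are made up); it relies on the fact that a single element, viewed as a context, returns itself when looked up by its own key.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job

fun main() {
    // Both assignments compile because Job and CoroutineDispatcher are
    // subtypes of CoroutineContext.Element, and Element is a CoroutineContext.
    val jobAsContext: CoroutineContext = Job()
    val dispatcherAsContext: CoroutineContext = Dispatchers.Default

    // Looking an element up by its own key returns the element itself.
    println(jobAsContext[Job] === jobAsContext)                                   // true
    println(dispatcherAsContext[ContinuationInterceptor] === dispatcherAsContext) // true
}
```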
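The design of CoroutineContext
The API of CoroutineContext closely resembles that of a Map.
public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    public operator fun plus(context: CoroutineContext): CoroutineContext // default implementation omitted
    public fun minusKey(key: Key<*>): CoroutineContext
}
- get corresponds to Map.get
- fold corresponds to Map.forEach
- plus corresponds to Map.put, except that it returns a new context instead of modifying the existing one
- minusKey corresponds to Map.remove, again returning a new context
This interface makes collection-style operators such as + and [] possible: because CoroutineContext declares plus and get as operator functions, + can be used in place of plus and [] in place of get.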
@OptIn(ExperimentalStdlibApi::class)
fun main() = runBlocking {
    val scope = CoroutineScope(Job() + mySingleDispatcher)
    scope.launch {
        logX(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
        delay(500)
        logX("first end")
    }
    delay(500)
    scope.cancel()
    delay(1000)
}
Output:
================================
true
Thread:mySingleThread @coroutine#2, time:1656584502696
================================
This shows that the dispatcher stored in the CoroutineContext is mySingleDispatcher.
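To make the Map analogy concrete, here is a sketch that uses only the Kotlin standard library. UserId, TraceTag, and the size extension are hypothetical names invented for this illustration; they are not part of kotlinx.coroutines.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Two hypothetical elements. Each uses its companion object as its key,
// the same pattern CoroutineName follows.
class UserId(val value: Int) : AbstractCoroutineContextElement(UserId) {
    companion object Key : CoroutineContext.Key<UserId>
}

class TraceTag(val value: String) : AbstractCoroutineContextElement(TraceTag) {
    companion object Key : CoroutineContext.Key<TraceTag>
}

// fold visits every element, so it can be used to count them.
fun CoroutineContext.size(): Int = fold(0) { count, _ -> count + 1 }

fun main() {
    val context = UserId(1) + TraceTag("a")       // plus: like put
    println(context[UserId]?.value)               // 1
    println(context[TraceTag]?.value)             // a
    println(context.size())                       // 2

    val replaced = context + UserId(2)            // same key: the value is replaced
    println(replaced[UserId]?.value)              // 2
    println(replaced.size())                      // 2
    println(context[UserId]?.value)               // 1 (the original is unchanged)

    val removed = replaced.minusKey(UserId)       // minusKey: like remove
    println(removed[UserId])                      // null
    println(removed.size())                       // 1
    println(EmptyCoroutineContext.size())         // 0
}
```

The key difference from a mutable Map is visible in the middle block: plus and minusKey return new contexts, and the context they were called on stays as it was.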
CoroutineName: naming coroutines
CoroutineName gives a coroutine a name. It is also a CoroutineContext.
@OptIn(ExperimentalStdlibApi::class)
fun main() = runBlocking {
    val scope = CoroutineScope(Job() + mySingleDispatcher)
    scope.launch(CoroutineName("MyFirstCoroutine")) {
        logX(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
        delay(500)
        logX("first end")
    }
    delay(500)
    scope.cancel()
    delay(1000)
}
Output:
================================
true
Thread:mySingleThread @MyFirstCoroutine#2, time:1656584886943
================================
The coroutine name is now MyFirstCoroutine, as set by CoroutineName; #2 is the coroutine id.
CoroutineExceptionHandler: handling exceptions
CoroutineExceptionHandler catches uncaught exceptions thrown inside a coroutine. It is also a CoroutineContext.
@OptIn(ExperimentalStdlibApi::class)
suspend fun main() {
    val scope = CoroutineScope(Job() + mySingleDispatcher)
    val handler = CoroutineExceptionHandler { _, throwable ->
        println("catch exception $throwable")
    }
    val job = scope.launch(handler) {
        logX(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
        val str: String? = null
        str!!.length // deliberately throws NullPointerException
        logX("first end")
    }
    job.join()
}
Output:
================================
true
Thread:mySingleThread @coroutine#1, time:1656585762125
================================
catch exception java.lang.NullPointerException
A null pointer exception is thrown on purpose and is then caught by the CoroutineExceptionHandler.
Suspending functions and CoroutineContext
A suspending function can access coroutineContext directly.
suspend fun testCoroutineContext() = coroutineContext
fun main() = runBlocking {
    print(testCoroutineContext())
}
Output:
[CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@63e2203c, BlockingEventLoop@1efed156]
A suspending function has to run inside a coroutine or another suspending function, which is why it can reach the coroutine's CoroutineContext.
coroutineContext is a top-level property declared in Continuation.kt.
public suspend inline val coroutineContext: CoroutineContext
    get() {
        throw NotImplementedError("Implemented as intrinsic")
    }
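As an illustration of why this matters, a suspending function can read individual elements of its caller's context without receiving them as parameters. The sketch below assumes kotlinx-coroutines-core; currentCoroutineName is a hypothetical helper.

```kotlin
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: returns the CoroutineName of whichever coroutine
// calls it, or null if the context carries no CoroutineName element.
suspend fun currentCoroutineName(): String? = coroutineContext[CoroutineName]?.name

fun main() = runBlocking(CoroutineName("outer")) {
    println(currentCoroutineName())       // outer
    withContext(CoroutineName("inner")) {
        println(currentCoroutineName())   // inner
    }
}
```

The same function reports a different name depending on where it is called from, because coroutineContext always refers to the context of the coroutine currently executing it.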
Summary
- CoroutineContext is very similar to a Map: like a Map, it lets you add and look up Elements.
- Job, the dispatchers, CoroutineName, and CoroutineExceptionHandler are all CoroutineContext at heart.
- CoroutineScope wraps a CoroutineContext; the context is a member of the scope.
- Suspending functions are tied to CoroutineContext as well. A suspending function must run in a coroutine, every coroutine has a CoroutineContext, and so the suspending function can access it.