当前位置:网站首页>深潜Kotlin协程(十六):Channel
深潜Kotlin协程(十六):Channel
2022-06-30 10:09:00 【RikkaTheWorld】
系列电子书:传送门
Channel API 用于在协程间的原语通信。许多人把 channel 想象成 pipe(管道)。但我更喜欢一个不同的比喻,你熟悉用于交换书籍的公共书柜吗?一个人会在里面放上另一个人所需要找的书,这与 kotlinx.coroutines 的 Channel 非常相似。

Channel 支持任意数量的发送方和接收方。并且发送到 Channel 的每个值只会被一个协程接收(一次)。


Channel 是一个接口,它实现了另外两个接口:
SendChannel: 用于发送(添加元素)和关闭管道ReceiveChannel: 用于接收元素
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
//...
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
fun cancel(cause: CancellationException? = null)
// ...
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
由于这种区别,我们可以只暴露 ReceiveChannel 或 SendChannel 来限制 channel 的入口/出口点。
你可能注意到了,send 和 receive 都是挂起函数,这是一个基本特性:
- 当我们尝试
receive而 channel 中没有元素时,协程将被挂起,直到该元素可用。就像我们的“书柜”一样,当有人去书架上找一本书,而书架是空的时候,这个人就需挂起,直到有人在那里放了一个他要的书 - 另一方面,当 channel 达到了容量阈值时,
send将会被挂起。我们很快就会看到,大多数 channel 的容量都是有限的。就像我们的“书柜”一样,当有人想把一本书放在书架上,而书架已经放满了书时,这个人就得挂起,直到有人拿走一本书,从而腾出空间
If you need to send or receive from a non-suspending function, you can use trySend and tryReceive. Both 如果需要从非挂起函数中发送或接收信息,可以使用 trySend 和 tryReceive。这两个操作都是即时的,并返回 ChannelResult,包含了有关操作成功或失败的结果信息。我们只能对容量有限的 channel 使用 trySend 和 tryReceive,因为它们不适用于交会的 channel。
A channel might have any number of senders and receivers. However, the most common situation is when there is one coroutine on both sides of the channel.
一个 channel 可以有任意数量的发送方和接收方,然而, channel 最常见的情况是两端只有一个协程。


想要看看 channel 的简单示例,我们需要在单独的协程中有一个生产者(发送方)和一个消费者(接收方)。生产者将发送元素,消费者将接收它们,以下是它们的实现方式:
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) {
index ->
delay(1000)
println("Producing next one")
channel.send(index * 2)
}
}
launch {
repeat(5) {
val received = channel.receive()
println(received)
}
}
}
// (1 sec)
// Producing next one
// 0
// (1 sec)
// Producing next one
// 2
// (1 sec)
// Producing next one
// 4
// (1 sec)
// Producing next one
// 6
// (1 sec
// Producing next one
// 8
这样的实现并不好。接收方需要知道发送方发送了多少个元素,所以上述的情况很少会发生,我们宁愿一直监听,直到发送者发送。要接收 channel 上的元素,可以使用 for循环 或 consumeEach 函数,它会一直监听发送直到 channel 关闭。
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) {
index ->
println("Producing next one")
delay(1000)
channel.send(index * 2)
}
channel.close()
}
launch {
for (element in channel) {
println(element)
}
// 或者
// channel.consumeEach { element ->
// println(element)
// }
}
}
使用这种方式发送元素的常见问题是:很容易忘记关闭 channel,特别是在异常情况下。如果一个协程因为异常而停止生产,另一个协程将会永远的等待元素。使用 produce 函数要方便的多,它是一个返回 ReceiveChannel 的协程构建器。
// 这个函数将会创建一个 channel,并且一直在
// 上面生产正整数,直至输入的最大值
fun CoroutineScope.produceNumbers(
max: Int
): ReceiveChannel<Int> = produce {
var x = 0
while (x < max) send(x++)
}
当协程以任何方式结束(完成、停止、取消)时, produce 函数都会关闭通道。多亏了这一点,我们永远不会忘记调用 close。produce 构建器是一个非常受欢迎创建 channel 的方式,理由很充分:它提供了很多安全保障,并且方便。
suspend fun main(): Unit = coroutineScope {
val channel = produce {
repeat(5) {
index ->
println("Producing next one")
delay(1000)
send(index * 2)
}
}
for (element in channel) {
println(element)
}
}
Channel 的类型
根据 channel 设置的容量大小,我们区分了四种类型的 channel:
- 无限制 —— 容量设置为
Channel.UNLIMITED的 channel,拥有一个无限的缓冲区,并且send永远不会挂起 - 缓冲 —— 有一个具体数量的缓冲区,或者设置为
Channel.BUFFERED(默认为64,可以通过在 JVM 中设置 kotlinx.coruoutines.channels.defaultBuffer 系统属性来重写)的 channel - 交会(默认) —— 容量为0,或者设置为
Channel.RENDEZVOUS(等于0) 的 channel。这意味着只有在发送者和接收者相遇时,才会发生数据通信。(所以它像一个图书交换点,而不是一个书柜) - 合并 —— 容量为1,或者设置为
Channel.CONFLATED的缓冲区,每个新元素会替换前一个元素
现在让我们来看看这些能力的实际应用。我们可以在 Channel 上设置它们,也可以在调用 produce 时设置。
我们将生产者快起来,接收者慢起来。在无限容量的情况下,channel 应该可以容纳所有的元素,然后让它们一个个的被接收处理。
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = Channel.UNLIMITED) {
repeat(5) {
index ->
send(index * 2)
delay(100)
println("发送")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (1 - 4 * 0.1 = 0.6 sec)
// 0
// (1 sec)
// 2
// (1 sec)
// 4
// (1 sec)
// 6
// (1 sec)
// 8
// (1 sec)
如果 channel 有具体的容量,我们首先生产到缓冲区满了,之后生产者将开始等待接收。
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = 3) {
repeat(5) {
index ->
send(index * 2)
delay(100)
println("发送")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (1 - 2 * 0.1 = 0.8 sec)
// 0
// 发送
// (1 sec)
// 2
// 发送
// (1 sec)
// 4
// (1 sec)
// 6
// (1 sec)
// 8
// (1 sec)
对于默认容量(Channel.RENDEZVOUS)的 channel,生产者将始终等待接收者。
suspend fun main(): Unit = coroutineScope {
val channel = produce {
// 或者 produce(capacity = Channel.RENDEZVOUS) {
repeat(5) {
index ->
send(index * 2)
delay(100)
println("发送")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 发送
// (1 sec)
// 2
// 发送
// (1 sec)
// 4
// 发送
// (1 sec)
// 6
// 发送
// (1 sec)
// 8
// 发送
// (1 sec)
最后,在使用 Channel.CONFLATED 时,我们不会存储过去的元素。新元素将会替换之前的元素,因此我们将只能接收最后一个元素,丢失之前发送的元素。
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = Channel.CONFLATED) {
repeat(5) {
index ->
send(index * 2)
delay(100)
println("发送")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (1 - 4 * 0.1 = 0.6 sec)
// 8
onBufferOverflow
为了进一步定制 channel,我们可以控制缓冲区堆满时发生的情况(onBufferOverflow 参数),有以下选项:
SUSPEND(默认) —— 当缓冲区塞满时,send函数挂起DROP_OLDEST—— 当缓冲区塞满时,删除最老的元素DROP_LATEST—— 当缓冲区塞满时,删除最新的元素
正如你所想那样,channel 设置容量为 Channel.CONFLATED 的效果就等于设置容量数为1并且 onBufferOverFlow 为 DROP_OLDEST。目前, produce 函数不允许我们自定义 onBufferOverflow,因此要设置它,我们需要使用 Channel 函数来定义一个 channel。
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>(
capacity = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
launch {
repeat(5) {
index ->
channel.send(index * 2)
delay(100)
println("发送")
}
channel.close()
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (0.1 sec)
// 发送
// (1 - 4 * 0.1 = 0.6 sec)
// 6
// (1 sec)
// 8
onUndeleliveredElement
我们应该知道的另一个 Channel 函数的参数是 onUndeleliveredElement。当某个元素由于某些原因无法处理时会调用它。大多数情况下,它意味着 channel 被关闭或者取消后,有可能在 send、receive、receiveOrNull 或 hasNext 抛出错误时发生。我们通常使用它来关闭由该通道发送的资源。
val channel = Channel<Resource>(capacity) {
resource ->
resource.close()
}
// 或者
// val channel = Channel<Resource>(
// capacity,
// onUndeliveredElement = { resource ->
// resource.close()
// }
// )
// 生产者代码
val resourceToSend = openResource()
channel.send(resourceToSend)
// 消费者代码
val resourceReceived = channel.receive()
try {
// 接收工作
} finally {
resourceReceived.close()
}
Fan-out
多个协程可以从单个 channel 接收元素,然而,为了正确地接收它们,我们应该使用 for 循环(多个协程使用 consumeEach 是不安全的)。

fun CoroutineScope.produceNumbers() = produce {
repeat(10) {
delay(100)
send(it)
}
}
fun CoroutineScope.launchProcessor(
id: Int,
channel: ReceiveChannel<Int>
) = launch {
for (msg in channel) {
println("#$id received $msg")
}
}
suspend fun main(): Unit = coroutineScope {
val channel = produceNumbers()
repeat(3) {
id ->
delay(10)
launchProcessor(id, channel)
}
}
// #0 received 0
// #1 received 1
// #2 received 2
// #0 received 3
// #1 received 4
// #2 received 5
// #0 received 6
// ...
元素均匀分布。 channel 有一个 FIFO(先进先出)的协程队列等待一个元素。这就是为什么在上面的例子中,可以看到每个元素被下一个协程接收(0,1,2,0,1,2…)。
为了更好地理解为什么,想象一下幼儿园的孩子们在排队买糖果,一旦他们得到一些,他们就会立即吃掉它们,然后走到队列的最后一个位置。这样的分配是公平的(假设糖果的数量是孩子数量的倍数,并且假设他们的父母对孩子吃糖果没有什么意见)。
Fan-in
多个协程可以发送到同一个 channel。在下面的例子中,你可以看到两个协程将元素发送到同一个 channel。
suspend fun sendString(
channel: SendChannel<String>,
text: String,
time: Long
) {
while (true) {
delay(time)
channel.send(text)
}
}
fun main() = runBlocking {
val channel = Channel<String>()
launch {
sendString(channel, "foo", 200L) }
launch {
sendString(channel, "BAR!", 500L) }
repeat(50) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
有时候,我们需要将多个渠道合并为一个渠道,为此,你可能会发现下面的函数很有用,因为它使用 produce 合并多个 channel。
fun <T> CoroutineScope.fanIn(
channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {
for (channel in channels) {
launch {
for (elem in channel) {
send(elem)
}
}
}
}
Pipelines
有时我们设置两个 channel,其中一个产生的元素是基于从另一个接收到元素。在这种情况下,我们称之为管道。
// 一个 Channel 发送从 1 到 3
fun CoroutineScope.numbers(): ReceiveChannel<Int> =
produce {
repeat(3) {
num ->
send(num + 1)
}
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
produce {
for (num in numbers) {
send(num * num)
}
}
suspend fun main() = coroutineScope {
val numbers = numbers()
val squared = square(numbers)
for (num in squared) {
println(num)
}
}
// 1
// 4
// 9
Channel 是原语通信的
当不同的协程需要互相通信时, Channel 很有用,它们保证没有冲突(例如,共享状态没有问题)并且公平。
想象一下不同的咖啡师正在冲咖啡。每个咖啡师都应该是独立工作的协程。不同的咖啡类型需要不同准备时间,但我们希望按照顺序处理订单。解决这个问题最简单的方案是在 Channel 中同时发送订单和生成的咖啡结果,可以使用 produce 生成器定义咖啡师:
suspend fun CoroutineScope.serveOrders(
orders: ReceiveChannel<Order>,
baristaName: String
): ReceiveChannel<CoffeeResult> = produce {
for (order in orders) {
val coffee = prepareCoffee(order.type)
send(
CoffeeResult(
coffee = coffee,
customer = order.customer,
baristaName = baristaName
)
)
}
}
当我们设置一个管道时,我们可以使用前面定义的 fanIn 函数将不同的咖啡师产生的结果合并为一个:
val coffeeResults = fanIn(
serveOrders(ordersChannel, "Alex"),
serveOrders(ordersChannel, "Bob"),
serveOrders(ordersChannel, "Celine"),
)
在下一章中,你将会看到更多实际的例子。
Practical usage
实际使用情况
我们使用 channel 的一个典型情况是:一端产生值,另一端处理。这些例子包括响应用户点击、来自服务器的新通知或随着时间推移更新搜索结果(一个很好的例子是 SkyScanner,它通过查询多个航空公司网站来搜索最便宜的航班)。然而,在大多数情况下,最好使用 channelFlow 或 callbackFlow,它们都是 Channel 和 Flow 的混合体(我们将在构建 Flow 的章节中介绍它们)。

纯粹形式而言,我发现 channel 在一些更复杂的情况下很有用。例如,假设我们正在维护一个线上商店,比如亚马逊。让我们假设你的服务器收到了大量卖家的更改产品信息的提交。对于每个更改,我们首先需要找到最新的报价列表,然后逐个更新它们。

传统方法并不是最佳的,一个卖家甚至可能有成千上万的售价更改。在一个漫长的过程中完成这一切并不是一个好的主意。
首先,内部异常或服务器重启可能会让我们不知道停在哪里。其次,一个大卖家可能会阻塞服务器很长一段时间,从而让小卖家等待它们的更改被应用。此外,我们不应该同时发送太多的网络请求,以避免需要处理这些请求的服务(以及我们的网络接口)过载。
这个问题的解决方法可能是建立一个管道。第一个通道包含要处理的卖家,而第二个通道包含要更新的报价。这些通道会有一个缓冲区。当已经有太多的提交在等待时,第二个提交的缓冲区可以防止我们的服务有得更多的提交。因此,我们的服务器将能够平衡我们在同一时间更新的报价数量。
我们还可以很容易地添加一些中间步骤,例如删除重复项。通过定义在每个通道上监听的协程的数量,我们可以决定服务发送多少并发请求。控制这些参数给了我们很大的自由。还可以很容易地添加许多改进,如持久性(用于服务器重启的情况)或元素唯一性(用于卖家在前一个更改被处理之前进行另一个更改的情况)。

// 一个简单的实现
suspend fun handleOfferUpdates() = coroutineScope {
val sellerChannel = listenOnSellerChanges()
val offerToUpdateChannel = produce(capacity = UNLIMITED) {
repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) {
launch {
for (seller in sellerChannel) {
val offers = offerService
.requestOffers(seller.id)
offers.forEach {
send(it) }
}
}
}
}
repeat(NUMBER_OF_CONCURRENT_UPDATE_SENDERS) {
launch {
for (offer in offerToUpdateChannel) {
sendOfferUpdate(offer)
}
}
}
}
总结
Channel 是一个强大的协程间原语通信的工具。它支持任意数量的发送方和接收方,并且发送到通道的每个值只能被接收一次。我们通常使用 produce 构建器来创建 channel,在 channel 中可以控制处理某些任务的协程数量。入金,我们最常用的是与 Flow 相关的 channel,这将在本书的后面介绍。
边栏推荐
- 技能梳理[email protected]体感机械臂
- From introduction to mastery of MySQL 50 lectures (32) -scylladb production environment cluster building
- 【深度学习】深度学习检测小目标常用方法
- 在 sCrypt 中实现高效的椭圆曲线点加法和乘法
- Remember the experience of an internship. It is necessary to go to the pit (I)
- 吴恩达2022机器学习专项课测评来了!
- Overview of currency
- Skill sorting [email protected]+adxl345+ Motor vibration + serial port output
- 马斯克推特粉丝过亿了,但他在线失联已一周
- Matplotlib notes: contour & Contour
猜你喜欢

Sarsa笔记

同事的接口文档我每次看着就头大,毛病多多。。。

Pytorch notes torch nn. BatchNorm1d

今晚19:00知识赋能第2期直播丨OpenHarmony智能家居项目之控制面板界面设计

最新SCI影响因子公布:国产期刊最高破46分!网友:算是把IF玩明白了

CVPR 2022 | 清华&字节&京东提出BrT:用于视觉和点云3D目标检测的桥接Transformer

Auto SEG loss: automatic loss function design
[email protected] control a dog's running on OLED"/>Skill combing [email protected] control a dog's running on OLED

I found a wave of "alchemy artifact" in the goose factory. The developer should pack it quickly

19:00 p.m. tonight, knowledge empowerment phase 2 live broadcast - control panel interface design of openharmony smart home project
随机推荐
Compétences Comb 27 @ Body sense Manipulator
nvm、nrm、npx使用(安装、基本命令、参数、curl、wget)
Q-Learning笔记
转卡通学习笔记
腾讯云数据库工程师能力认证重磅推出,各界共话人才培养难题
June training (day 30) - topology sorting
R语言aov函数进行重复测量方差分析(Repeated measures ANOVA、其中一个组内因素和一个组间因素)、分别使用interaction.plot函数和boxplot对交互作用进行可视化
7 大轻量易用的工具,给开发者减压提效,助力企业敏捷上云 | Techo Day 精彩回顾...
05_ Node JS file management module FS
19:00 p.m. tonight, knowledge empowerment phase 2 live broadcast - control panel interface design of openharmony smart home project
I found a wave of "alchemy artifact" in the goose factory. The developer should pack it quickly
Matplotlib notes: contour & Contour
Overview of currency
马斯克推特粉丝过亿了,但他在线失联已一周
Apple's 5g chip was revealed to have failed in research and development, and the QQ password bug caused heated discussion. Wei Lai responded to the short selling rumors. Today, more big news is here
透过华为军团看科技之变(五):智慧园区
The performance of arm's new CPU has been improved by 22%, up to 12 cores can be combined, and the GPU is first equipped with hardware optical tracking. Netizen: the gap with apple is growing
RobotFramework学习笔记:环境安装以及robotframework-browser插件的安装
LVGL 8.2 Image
无心剑中译狄金森《灵魂择其伴侣》