Deep Dive into Kotlin Coroutines (16): Channel
2022-06-30 10:52:00 【RikkaTheWorld】
Series eBook: Portal
The Channel API was added as a primitive for communication between coroutines. Many people picture a channel as a pipe, but I prefer a different metaphor: do you know the public bookcases used for exchanging books? Someone leaves a book there for another person to pick up. That is very similar to how Channel from kotlinx.coroutines works.

A Channel supports any number of senders and receivers, and every value sent to a channel is received exactly once (by a single coroutine).


Channel is an interface that implements two other interfaces:
- SendChannel: used to send (add) elements and to close the channel
- ReceiveChannel: used to receive elements
interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
    //...
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
    fun cancel(cause: CancellationException? = null)
    // ...
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
Thanks to this separation, we can expose only a ReceiveChannel or only a SendChannel to restrict a channel's entry or exit points.
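For illustration, here is a minimal sketch (the class and property names are my own, not from the original text) of how a component can keep the full Channel private and hand out only the restricted views:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical holder: the full Channel stays private,
// so producers can only send and consumers can only receive.
class MessageBus<T> {
    private val channel = Channel<T>()

    // entry point exposed to producers
    val input: SendChannel<T> get() = channel

    // exit point exposed to consumers
    val output: ReceiveChannel<T> get() = channel
}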
You might have noticed that both send and receive are suspending functions. This is an essential feature:
- When we call receive and there are no elements in the channel, the coroutine suspends until an element is available. Like our bookcase: when someone goes to the shelf to find a book and the shelf is empty, that person has to wait until somebody puts the book they want there.
- On the other hand, send suspends when the channel reaches its capacity. As we will soon see, most channels have a limited capacity. Like our bookcase again: when someone wants to put a book on the shelf but it is already full, that person has to wait until somebody takes a book away and makes room.
If you need to send or receive from a non-suspending function, you can use trySend and tryReceive. Both operations are immediate and return a ChannelResult, which carries information about the success or failure of the operation as well as its result. Use trySend and tryReceive only for channels with a limited capacity, because they do not work for rendezvous channels.
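As a quick illustration, here is a minimal sketch (the values are arbitrary) of trySend and tryReceive on a buffered channel; neither call suspends, so no coroutine builder is needed:

import kotlinx.coroutines.channels.Channel

fun main() {
    val channel = Channel<Int>(capacity = 2)

    // trySend returns a ChannelResult; isSuccess tells us whether the element was buffered
    println(channel.trySend(1).isSuccess) // true
    println(channel.trySend(2).isSuccess) // true
    println(channel.trySend(3).isSuccess) // false, the buffer is full

    // tryReceive also returns a ChannelResult; getOrNull() yields the element or null
    println(channel.tryReceive().getOrNull()) // 1
    println(channel.tryReceive().getOrNull()) // 2
    println(channel.tryReceive().getOrNull()) // null, the channel is empty
}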
A channel can have any number of senders and receivers; however, the most common situation is one coroutine on each side of the channel.


To see a simple channel in action, we need a producer (sender) and a consumer (receiver) in separate coroutines. The producer sends elements and the consumer receives them. Here is how this can be implemented:
suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            delay(1000)
            println("Producing next one")
            channel.send(index * 2)
        }
    }
    launch {
        repeat(5) {
            val received = channel.receive()
            println(received)
        }
    }
}
// (1 sec)
// Producing next one
// 0
// (1 sec)
// Producing next one
// 2
// (1 sec)
// Producing next one
// 4
// (1 sec)
// Producing next one
// 6
// (1 sec)
// Producing next one
// 8
Such an implementation is far from perfect. The receiver needs to know how many elements the sender will send, which is rarely the case; we would rather listen for as long as the sender is willing to send. To receive elements from a channel until it is closed, we can use a for-loop or the consumeEach function.
suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            channel.send(index * 2)
        }
        channel.close()
    }
    launch {
        for (element in channel) {
            println(element)
        }
        // or
        // channel.consumeEach { element ->
        //     println(element)
        // }
    }
}
A common problem with sending elements this way is that it is easy to forget to close the channel, especially in the presence of exceptions. If one coroutine stops producing because of an exception, the other will wait for elements forever. It is much more convenient to use the produce function, a coroutine builder that returns a ReceiveChannel.
// This function creates a channel that produces
// consecutive integers from 0 up to the given max
fun CoroutineScope.produceNumbers(
    max: Int
): ReceiveChannel<Int> = produce {
    var x = 0
    while (x < max) send(x++)
}
The produce function closes the channel whenever its coroutine ends in any way (completed, stopped, cancelled). Thanks to this, we never forget to call close. The produce builder is a very popular way to create a channel, and for good reason: it offers a lot of safety and convenience.
suspend fun main(): Unit = coroutineScope {
    val channel = produce {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            send(index * 2)
        }
    }
    for (element in channel) {
        println(element)
    }
}
Channel types
Depending on the capacity we set, we distinguish four types of channel:
- Unlimited: a channel with capacity Channel.UNLIMITED has an unlimited buffer, and send never suspends.
- Buffered: a channel with a concrete capacity, or Channel.BUFFERED (which defaults to 64 and can be overridden on the JVM by setting the kotlinx.coroutines.channels.defaultBuffer system property).
- Rendezvous (default): a channel with capacity 0, or Channel.RENDEZVOUS (which equals 0), meaning an exchange happens only when the sender and the receiver meet (so it works like a book exchange spot rather than a bookcase).
- Conflated: a channel with capacity 1, or Channel.CONFLATED, where each new element replaces the previous one.
Now let's see these capacities in action. We can set them directly when creating a Channel, or when calling produce.
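For reference, a minimal sketch of creating each type directly with the Channel function; the examples below pass the same constants to produce(capacity = ...) instead:

import kotlinx.coroutines.channels.Channel

val unlimited = Channel<Int>(Channel.UNLIMITED)  // send never suspends
val buffered = Channel<Int>(Channel.BUFFERED)    // default buffer size (64)
val bufferedBy3 = Channel<Int>(3)                // explicit capacity
val rendezvous = Channel<Int>()                  // capacity 0, the default
val conflated = Channel<Int>(Channel.CONFLATED)  // keeps only the newest element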
We will make our producer fast and our receiver slow. With unlimited capacity, the channel should accept all the elements and then let them be received one after another.
suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.UNLIMITED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }
    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (1 - 4 * 0.1 = 0.6 sec)
// 0
// (1 sec)
// 2
// (1 sec)
// 4
// (1 sec)
// 6
// (1 sec)
// 8
// (1 sec)
If the channel has a concrete capacity, we first produce until the buffer is full; after that, the producer needs to start waiting for the receiver.
suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = 3) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }
    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (1 - 2 * 0.1 = 0.8 sec)
// 0
// Sent
// (1 sec)
// 2
// Sent
// (1 sec)
// 4
// (1 sec)
// 6
// (1 sec)
// 8
// (1 sec)
With a channel of the default (Channel.RENDEZVOUS) capacity, the producer will always wait for a receiver.
suspend fun main(): Unit = coroutineScope {
    val channel = produce {
        // or produce(capacity = Channel.RENDEZVOUS) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }
    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// 0
// Sent
// (1 sec)
// 2
// Sent
// (1 sec)
// 4
// Sent
// (1 sec)
// 6
// Sent
// (1 sec)
// 8
// Sent
// (1 sec)
Finally, with Channel.CONFLATED we do not store past elements. New elements replace the previous ones, so we receive only the last one and lose the elements sent earlier.
suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.CONFLATED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }
    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (1 - 4 * 0.1 = 0.6 sec)
// 8
onBufferOverflow
To customize channels further, we can control what happens when the buffer is full (the onBufferOverflow parameter). There are three options:
- SUSPEND (default): when the buffer is full, send suspends.
- DROP_OLDEST: when the buffer is full, drop the oldest element.
- DROP_LATEST: when the buffer is full, drop the latest element.
As you might guess, setting the channel capacity to Channel.CONFLATED is equivalent to setting the capacity to 1 and onBufferOverflow to DROP_OLDEST. Currently, the produce function does not let us customize onBufferOverflow, so to set it we need to define a channel with the Channel function.
suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>(
        capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    launch {
        repeat(5) { index ->
            channel.send(index * 2)
            delay(100)
            println("Sent")
        }
        channel.close()
    }
    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (1 - 4 * 0.1 = 0.6 sec)
// 6
// (1 sec)
// 8
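For comparison, here is a minimal sketch (my own variation, not from the original text) of the same setup with DROP_LATEST: once the two-element buffer is full, newly sent elements are dropped instead of the oldest ones, so only the first two values survive.

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>(
        capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    launch {
        repeat(5) { index ->
            channel.send(index * 2) // never suspends; overflowing elements are dropped
        }
        channel.close()
    }
    delay(1000)
    for (element in channel) {
        println(element)
    }
}
// 0
// 2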
onUndeliveredElement
Another parameter of the Channel function we should know about is onUndeliveredElement. It is called when an element could not be handled for some reason. Most often this means the channel was closed or cancelled, but it can also happen when send, receive, receiveOrNull, or hasNext throws an error. We generally use it to close resources that are sent through the channel.
val channel = Channel<Resource>(capacity) { resource ->
    resource.close()
}
// or
// val channel = Channel<Resource>(
//     capacity,
//     onUndeliveredElement = { resource ->
//         resource.close()
//     }
// )

// Producer code
val resourceToSend = openResource()
channel.send(resourceToSend)

// Consumer code
val resourceReceived = channel.receive()
try {
    // work with the received resource
} finally {
    resourceReceived.close()
}
Fan-out
Multiple coroutines can receive from a single channel; however, to receive elements properly, we should use a for-loop (using consumeEach from multiple coroutines is not safe).

fun CoroutineScope.produceNumbers() = produce {
    repeat(10) {
        delay(100)
        send(it)
    }
}

fun CoroutineScope.launchProcessor(
    id: Int,
    channel: ReceiveChannel<Int>
) = launch {
    for (msg in channel) {
        println("#$id received $msg")
    }
}

suspend fun main(): Unit = coroutineScope {
    val channel = produceNumbers()
    repeat(3) { id ->
        delay(10)
        launchProcessor(id, channel)
    }
}
// #0 received 0
// #1 received 1
// #2 received 2
// #0 received 3
// #1 received 4
// #2 received 5
// #0 received 6
// ...
The elements are distributed fairly. The channel has a FIFO (first-in, first-out) queue of coroutines waiting for an element. That is why, in the example above, each element is received by the next coroutine in the queue (0, 1, 2, 0, 1, 2, ...).
To understand why, imagine children in a kindergarten queuing for candies. As soon as they get some, they eat them immediately and go to the back of the queue. Such a distribution is fair (assuming the number of candies is a multiple of the number of children, and assuming their parents are fine with their children eating candies).
Fan-in
Multiple coroutines can send to a single channel. In the example below, two coroutines send elements to the same channel.
suspend fun sendString(
    channel: SendChannel<String>,
    text: String,
    time: Long
) {
    while (true) {
        delay(time)
        channel.send(text)
    }
}

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(50) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}
Sometimes we need to merge multiple channels into one. For that, you might find the following function useful, as it merges multiple channels using produce:
fun <T> CoroutineScope.fanIn(
    channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (elem in channel) {
                send(elem)
            }
        }
    }
}
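Here is a short usage sketch built on the fanIn function above (the producers are made up for illustration): the merged channel interleaves whatever the source channels emit and closes once all sources are exhausted.

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope

suspend fun main(): Unit = coroutineScope {
    val channelA = produce { repeat(3) { send("A$it") } }
    val channelB = produce { repeat(3) { send("B$it") } }
    for (element in fanIn(listOf(channelA, channelB))) {
        println(element) // elements from both channels, in no guaranteed order
    }
}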
Pipelines
Sometimes we set up two channels so that one produces elements based on those received from another. In such a case, we call it a pipeline.
// A channel that sends the numbers 1 to 3
fun CoroutineScope.numbers(): ReceiveChannel<Int> =
    produce {
        repeat(3) { num ->
            send(num + 1)
        }
    }

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
    produce {
        for (num in numbers) {
            send(num * num)
        }
    }

suspend fun main() = coroutineScope {
    val numbers = numbers()
    val squared = square(numbers)
    for (num in squared) {
        println(num)
    }
}
// 1
// 4
// 9
Channels as a communication primitive
Channels are useful when different coroutines need to communicate with one another. They guarantee no conflicts (i.e., no problems with shared state) and fairness.
Imagine that several baristas are making coffee. Each barista should be a separate coroutine working independently. Different types of coffee take different amounts of time to prepare, but we want to handle the orders in the order in which they arrive. The simplest way to solve this is to send both the orders and the resulting coffees through channels. A barista can be defined using the produce builder:
suspend fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    baristaName: String
): ReceiveChannel<CoffeeResult> = produce {
    for (order in orders) {
        val coffee = prepareCoffee(order.type)
        send(
            CoffeeResult(
                coffee = coffee,
                customer = order.customer,
                baristaName = baristaName
            )
        )
    }
}
When we set up such a pipeline, we can use the previously defined fanIn function to merge the results from the different baristas into one channel:
val coffeeResults = fanIn(
    listOf(
        serveOrders(ordersChannel, "Alex"),
        serveOrders(ordersChannel, "Bob"),
        serveOrders(ordersChannel, "Celine"),
    )
)
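The snippet above leaves Order, CoffeeResult, prepareCoffee, and ordersChannel undefined; below is a minimal sketch of what they might look like (all of these declarations are assumptions, not part of the original example), together with a plain for-loop consuming the merged results.

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope

// Hypothetical types implied by serveOrders above
class Coffee
enum class CoffeeType { ESPRESSO, LATTE }
data class Order(val customer: String, val type: CoffeeType)
data class CoffeeResult(
    val coffee: Coffee,
    val customer: String,
    val baristaName: String
)

// Stand-in for the real (presumably slow) preparation step
private fun prepareCoffee(type: CoffeeType): Coffee = Coffee()

suspend fun main() = coroutineScope {
    val ordersChannel = produce {
        send(Order("Alice", CoffeeType.ESPRESSO))
        send(Order("Bob", CoffeeType.LATTE))
    }
    val coffeeResults = fanIn(
        listOf(
            serveOrders(ordersChannel, "Alex"),
            serveOrders(ordersChannel, "Bob"),
        )
    )
    for (result in coffeeResults) {
        println("${result.customer} got coffee from ${result.baristaName}")
    }
}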
You will see more practical examples in the next chapter.
Practical usage
A typical case in which we use channels is when values are produced on one side and processed on the other. Examples include reacting to user clicks, receiving new notifications from a server, or updating search results over time (a good example is SkyScanner, which finds the cheapest flights by querying multiple airline websites). However, in most of these cases it is better to use channelFlow or callbackFlow, which are hybrids of Channel and Flow (we describe them in the chapter on building Flows).

In their pure form, I find channels useful in more complex cases. For example, suppose we maintain an online store, like Amazon. Assume that our service receives a huge number of seller submissions that change product information. For each change, we first need to find the current list of offers and then update them one by one.

The traditional approach is not optimal: a single seller might have even thousands of offers to change, and doing everything in one long process is not a good idea. First, an internal exception or a server restart might leave us unsure where we stopped. Second, a big seller could block the server for a long time, making small sellers wait for their changes to be applied. Moreover, we should not send too many network requests at the same time, to avoid overloading the service that handles them (and our network interface).
The solution can be to set up a pipeline. The first channel contains the sellers to process, while the second contains the offers to update. These channels have buffers. The buffer of the second channel can stop our service from fetching more offers when too many are already waiting. This way, the server can balance the number of offers we update at the same time.
We could also easily add some intermediate steps, such as removing duplicates. By defining the number of coroutines listening on each channel, we decide how many concurrent requests we send to the external service. Manipulating these parameters gives us a lot of freedom. There are also many improvements that can be added quite easily, such as persistence (for the case of a server restart) or element uniqueness (for the case in which a seller makes another change before the previous one has been processed).

// A simple implementation
suspend fun handleOfferUpdates() = coroutineScope {
    val sellerChannel = listenOnSellerChanges()

    val offerToUpdateChannel = produce(capacity = UNLIMITED) {
        repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) {
            launch {
                for (seller in sellerChannel) {
                    val offers = offerService
                        .requestOffers(seller.id)
                    offers.forEach { send(it) }
                }
            }
        }
    }

    repeat(NUMBER_OF_CONCURRENT_UPDATE_SENDERS) {
        launch {
            for (offer in offerToUpdateChannel) {
                sendOfferUpdate(offer)
            }
        }
    }
}
Summary
Channel is a powerful primitive for communication between coroutines. It supports any number of senders and receivers, and every value sent to a channel is received exactly once. We usually create a channel using the produce builder. Channels can be used to control the number of coroutines working on certain tasks. Nowadays, though, we most often use channels in connection with Flow, which will be introduced later in the book.