当前位置:网站首页>kotlin 协程通道
kotlin 协程通道
2022-06-24 12:58:00 【day_moon】
// 1.通道 传输值流的方法
fun main_channel() = runBlocking {
val channel = Channel<Int>()//声明一个channel
launch {
for (i in 1..3) {
channel.send(i + i)
}//发送1+1 2+2 3+3
channel.close()//没有立即关闭通道
}
repeat(3) { // 这样写也是可以for (y in channel) println(y)
println("接收到的是${channel.receive()}")
}
println("end ..")
}
//2.构建通道生产者
fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {//构建通道生产者
for (i in 1..3) send(i + i)
}
fun main_produce() = runBlocking {
val produce = producer()
produce.consumeEach { println("$it") }//迭代每一个生产者
println("end ..")
}
//3 管道
fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce<Int> {//每次加1 生成无穷个 没有返回类型
var x = 1
while (true) send(x++)
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> =
produce {//先收到的,然后处理后再发出
for (x in numbers) send(x + x)
}
fun main_pip() = runBlocking {
val numbers = produceNumbers() //每次加1 生成无穷个 没有返回类型
val squares = square(numbers) //先收到的,然后处理后再发出
repeat(3) {//重复3次
println(squares.receive())
}
println("end ..") //完成
coroutineContext.cancelChildren() // 取消子协程
}
//4.管道的素数
fun main_filters() = runBlocking {
var cur = numberssend(2)//从二开始 2 3 4..
repeat(10) {//取10个数
val number = cur.receive()
println(" $number")
cur = filters(cur, number)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
fun CoroutineScope.numberssend(start: Int) = produce<Int> {//数据每次加1发送
var x = start
while (true) send(x++)
}
//从2开始一个数字流,从当前通道获取一个质数,并为找到的每个质数启动新的管道
fun CoroutineScope.filters(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)//可以被发送的数整除 然后发送
}
//5.扇出
fun mainFanout() = runBlocking<Unit> {
val producer = produceNumber()
repeat(5) {
launchProcessor(it, producer)
}
delay(950)
producer.cancel() //取消
}
fun CoroutineScope.produceNumber() = produce<Int> {
var x = 1
while (true) {
send(x++) //从1开始和每次加1
delay(100) //
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
//6.扇入
fun main_in() = runBlocking {
val channel = Channel<String>()
//两个协程来发送字符串
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
//7.带缓冲的通道
fun main_Buffered() = runBlocking<Unit> {
val channel = Channel<Int>(4) //容量为4
val sender = launch { // 启动协程
repeat(10) {
println("发送的数据 $it")//0-4
channel.send(it)
}
}
delay(100)//延迟1秒
sender.cancel() //取消
}
//8.通道是公平的
data class Ball(var hits: Int)
fun main_fair() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
//9.计时通道
fun main_Ticker() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}边栏推荐
- Interviewer: the MySQL database is slow to query. What are the possible reasons besides the index problem?
- 90%的项目经理都跳过的坑,你现在还在坑里吗?
- [sdx62] wcn685x IPA failure analysis and solution
- kotlin 组合挂起函数
- 万用表测量电阻图解及使用注意事项
- Why did the audio and video based cloud conference usher in a big explosion of development?
- How can the new webmaster avoid the ups and downs caused by SEO optimization?
- kotlin 接口 泛型 协变 逆变
- Introduction to reptile to give up 01: Hello, reptile!
- 真正的项目经理强者,都是闭环高手!
猜你喜欢

Hands on data analysis unit 3 model building and evaluation

Without home assistant, zhiting can also open source access homekit and green rice devices?

吉时利静电计宽测量范围

‘高并发&高性能&高可用服务程序’编写及运维指南

Seven challenges faced by data scientists and Solutions

Beauty of script │ VBS introduction interactive practice

Usage of multimeter

Activity生命周期

Who is the fish and who is the bait? Summary of honeypot recognition methods from the perspective of red team

**Unity中莫名其妙得小问题-灯光和天空盒
随机推荐
Party, Google's autoregressive Wensheng graph model
发扬连续作战优良作风 全力以赴确保北江大堤安全
Redis面试题
Huawei PC grows against the trend, and product power determines everything
Ask a question about SQL view
华为 PC 逆势增长,产品力决定一切
Process basic properties
Getting started with the go Cobra command line tool
10 个 Reduce 常用“奇技淫巧”
Vipshop's "special sale" business is no longer easy to do?
2022年施工升降机司机(建筑特殊工种)考试试题及在线模拟考试
Resolve symbol conflicts for dynamic libraries
Geological disaster early warning monitoring RTU
SYSTEMd common component description
项目经理的晋级之路
2022年氟化工艺考试模拟100题及答案
吉时利静电计宽测量范围
Main steps of system test
Hands on data analysis unit 3 model building and evaluation
3. caller service call - dapr