当前位置:网站首页>kotlin 协程通道
kotlin 协程通道
2022-06-24 12:58:00 【day_moon】
// 1.通道 传输值流的方法
fun main_channel() = runBlocking {
val channel = Channel<Int>()//声明一个channel
launch {
for (i in 1..3) {
channel.send(i + i)
}//发送1+1 2+2 3+3
channel.close()//没有立即关闭通道
}
repeat(3) { // 这样写也是可以for (y in channel) println(y)
println("接收到的是${channel.receive()}")
}
println("end ..")
}
//2.构建通道生产者
fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {//构建通道生产者
for (i in 1..3) send(i + i)
}
fun main_produce() = runBlocking {
val produce = producer()
produce.consumeEach { println("$it") }//迭代每一个生产者
println("end ..")
}
//3 管道
fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce<Int> {//每次加1 生成无穷个 没有返回类型
var x = 1
while (true) send(x++)
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> =
produce {//先收到的,然后处理后再发出
for (x in numbers) send(x + x)
}
fun main_pip() = runBlocking {
val numbers = produceNumbers() //每次加1 生成无穷个 没有返回类型
val squares = square(numbers) //先收到的,然后处理后再发出
repeat(3) {//重复3次
println(squares.receive())
}
println("end ..") //完成
coroutineContext.cancelChildren() // 取消子协程
}
//4.管道的素数
fun main_filters() = runBlocking {
var cur = numberssend(2)//从二开始 2 3 4..
repeat(10) {//取10个数
val number = cur.receive()
println(" $number")
cur = filters(cur, number)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
fun CoroutineScope.numberssend(start: Int) = produce<Int> {//数据每次加1发送
var x = start
while (true) send(x++)
}
//从2开始一个数字流,从当前通道获取一个质数,并为找到的每个质数启动新的管道
fun CoroutineScope.filters(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)//可以被发送的数整除 然后发送
}
//5.扇出
fun mainFanout() = runBlocking<Unit> {
val producer = produceNumber()
repeat(5) {
launchProcessor(it, producer)
}
delay(950)
producer.cancel() //取消
}
fun CoroutineScope.produceNumber() = produce<Int> {
var x = 1
while (true) {
send(x++) //从1开始和每次加1
delay(100) //
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
//6.扇入
fun main_in() = runBlocking {
val channel = Channel<String>()
//两个协程来发送字符串
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
//7.带缓冲的通道
fun main_Buffered() = runBlocking<Unit> {
val channel = Channel<Int>(4) //容量为4
val sender = launch { // 启动协程
repeat(10) {
println("发送的数据 $it")//0-4
channel.send(it)
}
}
delay(100)//延迟1秒
sender.cancel() //取消
}
//8.通道是公平的
data class Ball(var hits: Int)
fun main_fair() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
//9.计时通道
fun main_Ticker() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}边栏推荐
- Best practices of swagger in egg project
- 万用表的使用方法
- 项目经理的晋级之路
- 10 个 Reduce 常用“奇技淫巧”
- 源碼解析 Handler 面試寶典
- 真正的项目经理强者,都是闭环高手!
- One article explains R & D efficiency! Your concerns are
- MySQL interview questions
- Coinbase will launch the first encryption derivative for individual investors
- 8 - Format integers and floating point numbers
猜你喜欢

图扑软件数字孪生海上风电 | 向海图强,奋楫争先

如何避免严重网络安全事故的发生?

Vulnerability management mistakes that CIOs still make

Activity生命周期

10 个 Reduce 常用“奇技淫巧”

Comparator sort functional interface

Main steps of system test

青藤入选工信部网安中心“2021年数字技术融合创新应用典型解决方案”

工业物联网(IIoT)的八个主要趋势

Huawei PC grows against the trend, and product power determines everything
随机推荐
Talk about GC of JVM
Cloud native essay solicitation progress case practice
万用表测量电阻图解及使用注意事项
Liux command
不用Home Assistant,智汀也开源接入HomeKit、绿米设备?
Ask a question about SQL view
美国会参议院推进两党枪支安全法案
Interviewer: the MySQL database is slow to query. What are the possible reasons besides the index problem?
【AI玩家养成记】用AI识别邻居家旺财是什么品种
系统测试主要步骤
SYSTEMd common component description
90%的项目经理都跳过的坑,你现在还在坑里吗?
位于相同的分布式端口组但不同主机上的虚拟机无法互相通信
Seven challenges faced by data scientists and Solutions
Can inspection results be entered after the completion of inspection lot UD with long-term inspection characteristics in SAP QM?
源碼解析 Handler 面試寶典
Kotlin keyword extension function
Gateway processing flow of zuul source code analysis
10 reduce common "tricks"
常识知识点