当前位置:网站首页>Kotlin collaboration channel
Kotlin collaboration channel
2022-06-13 06:26:00 【m0_ forty-seven million nine hundred and fourteen thousand one 】
One .Channel
Channel It is actually a concurrent secure queue , It can be used to connect processes , Communication between different processes .

1.Channel Use
Import dependence
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.0-RC-native-mt'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.0-RC-native-mt'In different processes , use Channel To realize sending and receiving .
//Channel Send and receive
fun testKnowChannel()= runBlocking {
val channel = Channel<Int> ()
// sender
val launch = GlobalScope.launch {
while (true) {
var i = 0
delay(1000)
// send out
channel.send(++i)
println("send $i")
}
}
// The receiver
val launch1 = GlobalScope.launch {
while (true) {
val element = channel.receive()
println("receive $element")
}
}
joinAll(launch,launch1)
}Result demonstration :

2.Channel buffer
Channel It's actually a queue , There must be a buffer in the queue , So once the buffer is full , And no one has ever called receive And take the function ,send You need to suspend . Deliberately slow down the receiver , Find out send Always hang up , until receive After that, we will continue to carry out .
//Channel Send and receive
fun testKnowChannel()= runBlocking {
val channel = Channel<Int> ()
// sender
val launch = GlobalScope.launch {
while (true) {
var i = 0
delay(1000)
channel.send(++i)
println("send $i")
}
}
// The receiver The order will not change
val launch1 = GlobalScope.launch {
while (true) {
delay(2000)
val element = channel.receive()
println("receive $element")
}
}
joinAll(launch,launch1)
}Result demonstration :

What I am receiving is delay 2 second , Longer than the transmission delay , But it did not cause the problem of sending and receiving data for many times Channel It's buffered
3. iteration Channel
Channel The body really looks like a sequence , So we can directly get a Channel Of iterator.
//Channel The iterator
fun testIterateChannel()= runBlocking {
// Buffer size
val channel = Channel<Int> (Channel.UNLIMITED)
// sender
val launch = GlobalScope.launch {
for (x in 1..5){
channel.send(x*x)
println("send: ${x * x}")
}
}
// The receiver
val launch1 = GlobalScope.launch {
/* val iterator = channel.iterator()// iterator
while (iterator.hasNext()){
val next = iterator.next()//
println("receive $next")
delay(2000)
}*/
// This is a for Cyclic writing
for (element in channel){
println("receive $element")
delay(2000)
}
}
joinAll(launch,launch1)
}Result demonstration .

4.produce And actor
1. A convenient way to construct producers and consumers .
2. We can go through p℃duce Method to start a producer collaboration , And return a ReceiveChanneI, Other processes can use this Channe| To receive data . In turn, , We can use actor Start a consumer program .
1. Producer's agreement
// Producer's agreement This will be sent and received one by one
fun testFastProducerChannel()= runBlocking{
val produce: ReceiveChannel<Int> = GlobalScope.produce {
// repeat
repeat(100){
delay(1000)
send(it)
}
}
val launch = GlobalScope.launch {
for (i in produce) {
println("Received: $i")
}
}
launch.join()
}
2. Consumer collaboration
// Here, there is an endless loop of receivers on it , When the following cooperation process sends a message, the above will immediately receive
fun testFastConsumerChannel()= runBlocking {
val actor = GlobalScope.actor<Int> {
while (true){
val element=receive()
println(element)
}
}
val producer = GlobalScope.launch {
for (i in 0..3) {
actor.send(i)
}
}
producer.join()
}5.Channel The closing of the
1. produce and actor Back to Channel Will be closed after the execution of the corresponding collaboration , That's exactly what happened ,Ch
annel It is called thermal data flow .
2. For one Channel, If we call its close Method , It will immediately stop receiving new elements , In other words, it's isClosedForSend Will return immediately trueo And because the Channel The existence of buffer , At this time, there may be some elements that have not been processed , So wait until all the elements are read isClosedForReceive Will return true.
3. Channel The life cycle of is best maintained by the leading Party , It is suggested that the leading Party should realize the closure .
fun testCloseChannel()= runBlocking{
// Channel size refers to 3, Send several at a time
val channel = Channel<Int>(3)
val producer = GlobalScope.launch {
List(3) {
channel.send(it)
println("send $it")
}
//channel The best way to close is by the leading Party ( sender )
channel.close()
println("1: ${channel.isClosedForSend} ${channel.isClosedForReceive}")
}
val consumer = GlobalScope.launch {
for (element in channel) {
println("receive $element")
delay(1000)
println("2: ${channel.isClosedForSend} ${channel.isClosedForReceive}")
}
}
joinAll(producer,consumer)
}Print the results :
send 0
receive 0
send 1
send 2
1: true false
2: true false
receive 1
2: true false
receive 2
2: true true6.BroadcastChannel
Mentioned earlier , Sending end and receiving end Channe There is a one to many situation , In terms of data processing itself , Although there are multiple receivers , But the same element will only be read by one receiver . Radio is not , Multiple receivers do not have mutually exclusive behavior .
// Here we simulate Broadcast one to many situations
fun testBroadcast()= runBlocking{
val broadcastChannel =BroadcastChannel<Int>(Channel.BUFFERED)
val launch = GlobalScope.launch {
List(30) {
delay(1000)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(2){index->
GlobalScope.launch {
val openSubscription = broadcastChannel.openSubscription()
for (i in openSubscription){
println("[#$index] received: $i")
}
}
}.joinAll()
}Print the results :

Two .Channel Multiplexing
such as , In data communication system or computer network system , The bandwidth or capacity of the transmission media is often greater than the demand for transmitting a single signal , In order to make effective use of communication lines , We want a channel to transmit multiple signals at the same time , This is called multiplexing technology .
1. Reuse multiple await
Join the rely on
implementation 'com.squareup.retrofit2:retrofit:2.9.0'
implementation 'com.squareup.retrofit2:converter-moshi:2.9.0'
implementation 'com.google.code.gson:gson:2.8.6'data class User(
val `data`: List<Data>,
val name: String,
val address: String
) {
data class Data(
val category: String,
val icon: String,
val id: Int,
val link: String,
val name: String,
val order: Int,
val visible: Int
)
}val userServiceApi: UserServiceApi by lazy {
val retrofit = Retrofit.Builder()
.client(OkHttpClient.Builder().addInterceptor {
it.proceed(
it.request()
).apply {
// Log.d("jason", "request:${code()}")
}
}.build()).baseUrl("https://www.wanandroid.com/friend/").addConverterFactory(MoshiConverterFactory.create())
.build()
retrofit.create(UserServiceApi::class.java)
}
interface UserServiceApi {
@GET("json")
// Hang up
suspend fun User(name: String):User
}// The principle of multiplexing is the same as the following This is a async How to write it
// The local path
private val cachePath = "E://xx.text"
private val gson = Gson() //GSon object
// Outer data class
data class Response<T>(val value: T, val isLocal: Boolean)
//CoroutineScope The extension function of async Is a coroutine that returns a value // Start the process Context ( Sub thread )
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
//delay(1000) // Deliberate delay
// Read data in local file and use Gson analysis Reflection call
File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
// Request network data
userServiceApi.User(name)
}
// Start the main coordination process ( Synchronous blocking )
fun testSelectAwait() = runBlocking<Unit> {
GlobalScope.launch {
// Get data class
val localRequest = getUserFromLocal("xxx")
val remoteRequest =getUserFromRemote("xxx")
//select The function of determines who returns the data first and returns
val userResponse = select<Response<User>> {
//onAwait yes SelectClause1 Function of , Can return value (Await yes async The execution method of can return a value 《 Sync 》)
localRequest.onAwait { Response(it, true) }
remoteRequest.onAwait { Response(it, false) }
}
// Judge whether the value exists , Exist and print
userResponse.value?.let { println(it) }
}.join()
}
2. Reuse multiple Channel
Just like the last example , Returns the fastest return channel The news of
fun testSelectChannel()= runBlocking {
val listOf = listOf(Channel<Int>(), Channel<Int>())
GlobalScope.launch {
delay(100)
listOf[0].send(200)
}
GlobalScope.launch {
delay(50)
listOf[1].send(100)
}
// Select the fastest received value
val select = select<Int> {
listOf.forEach { channel ->
channel.onReceive {
it
}
}
}
println(select)
}Result demonstration :
1003.SelectClause
How do we know which events can be se《ect Well ? In fact, all can be select All the events are SelectClauseN type , Include :
1. SelectClause0: The corresponding event has no return value , for example join no return value , that onJoin Namely SelectClauseN type . When using ,onJoin The argument to is a parameterless function .
2. SelectClause1: The corresponding event has a return value , Ahead onAwait and onReceive This is the case .
3. SelectCIause2: The corresponding event has a return value , In addition, an additional parameter is required , for example ChanneI.onSend There are two parameters , The first is Channe| Value of data type , Indicates the value to be sent ; The second is the callback parameter when the sending succeeds .
If we want to confirm whether the suspend function supports select, Just check whether there is a corresponding Se|ectCIauseN The type can be called back .
1.SelectClause0 Test cases
// Multiplexing SelectClause0
fun testSelectClause0()= runBlocking {
val job1 = GlobalScope.launch {
delay(100)
println("job 1")
}
val job2 = GlobalScope.launch {
delay(10)
println("job 2")
}
select <Unit>{
job1.onJoin{
println("job 1 onJoin")
}
job2.onJoin{
println("job 2 onJoin")
}
}
}Print the results :
job 2
job 2 onJoin2.SelectCIause2 Test cases
// Multiplexing SelectClause2
fun testSelectClause2()= runBlocking {
val listOf = listOf(Channel<Int>(), Channel<Int>())
println(listOf)
launch (Dispatchers.IO){
select <Unit>{
launch {
delay(10)
listOf[0].onSend(200){
sendChannel ->
println("sent on $sendChannel")
}
}
launch {
delay(100)
// The first parameter
listOf[0].onSend(100){
// The second parameter
sendChannel ->
println("sent on $sendChannel")
}
}
}
}
GlobalScope.launch {
println(listOf[0].receive())
}
GlobalScope.launch {
println(listOf[1].receive())
}
}Print the results :
[[email protected]{EmptyQueue}, [email protected]{EmptyQueue}]
200
sent on [email protected]{EmptyQueue}4. Use flow Implement multiplexing
Add dependency
implementation 'org.jetbrains.kotlin:kotlin-reflect:1.5.0'// The principle of multiplexing is the same as the following This is a async How to write it
// The local path
private val cachePath = "E://xx.text"
private val gson = Gson() //GSon object
// Outer data class
data class Response<T>(val value: T, val isLocal: Boolean)
//CoroutineScope The extension function of async Is a coroutine that returns a value // Start the process Context ( Sub thread )
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
//delay(1000) // Deliberate delay
// Read data in local file and use Gson analysis Reflection call
File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
// Request network data
userServiceApi.User(name)
}
//flow Multiplexing
fun testSelectFlow() = runBlocking<Unit> {
// function -> coroutines -> Flow -> Flow Merge
val name = "xiaohua"
coroutineScope {
val collect = listOf(this::getUserFromLocal, this::getUserFromRemote)
.map { function ->
function.call(name) // Reference method
}.map { deferred ->
flow { emit(deferred.await()) }
}.merge().collect {
user -> println(user)
}
}
}3、 ... and . Concurrent security
Coroutines are thread based , But since threads have concurrency problems , The coordination process must also have
fun testSafeConcurrentTools()= runBlocking<Unit> {
var count = 0
List(1000){
GlobalScope.launch {
count++
}
}.joinAll()
println(count)
}Print the results :
9951. Concurrency security of coroutines
In addition to our common means of solving concurrency problems in threads , The concurrency framework also provides some concurrency security tools , Include :
Channel: Concurrent secure message channel , We are already very familiar with .
Mutex: Lightweight lock , its |ock and un|ock Semantically, it is similar to thread lock , The reason why it is lightweight is that it does not block threads when locks cannot be obtained , Instead, hang and wait for the release of the lock .
Semaphore: Lightweight semaphores , Semaphores can have multiple , The coroutine can perform concurrent operations after obtaining the semaphore .
When Semaphore The parameters for 1 when , The effect is equivalent to Mute×.
1.AtomicInteger( Atomic operation )
fun testSafeConcurrent()= runBlocking<Unit> {
// Atomic operation
var count = AtomicInteger(0)
List(1000){
GlobalScope.launch {
count.incrementAndGet()
}
}.joinAll()
println(count.get())
}Print the results :
10002.Mutex( Lightweight thread lock )
fun testSafeConcurrentTools1()= runBlocking<Unit> {
var count =0 // Lightweight thread lock
val mutex= Mutex()
List(1000){
GlobalScope.launch {
mutex.withLock {
count++
}
}
}.joinAll()
println(count)
}Print the results :
10003.Semaphore( The signal )
fun testSafeConcurrentTools12()= runBlocking<Unit> {
var count =0 // The signal
val semaphore= Semaphore(1)
List(1000){
GlobalScope.launch {
semaphore.withPermit {
count++
}
}
}.joinAll()
println(count)
}Print the results :
10004. Avoid accessing external states
When writing a function, it is required that it must not access external state , Operations can only be done based on parameters , Provide the result of the operation through the return value .
fun testAvoidAccessOuterVariable()= runBlocking<Unit> {
var count =0
// Cannot access external state , Operations can only be performed by parameters
val result= count + List(1000){
GlobalScope.async {
1
}
}.map {
it.await()
}.sum()
println(count)
}Print the results :
1000边栏推荐
- Uniapp (upload local pictures, preview pictures, convert Base64 format, upload audio files)
- Cross process two-way communication using messenger
- [2022 college entrance examination season] what I want to say as a passer-by
- Wechat applet custom tabbar (session customer service) vant
- 不在以下合法域名列表中,微信小程序解决办法
- 【新手上路常见问答】一步一步理解程序设计
- Echart折线图:当多条折线存在相同name,图例仍全部显示
- Wechat applet (pull-down refresh data) novice to
- Wechat applet (function transfer parameters, transfer multiple parameters, page Jump)
- 电镀挂具RFID工序管理解决方案
猜你喜欢

端午安康,使用祝福话语生成词云吧

Echart柱状图:x轴显示value,y轴显示类别
![[solution] camunda deployment process should point to a running platform rest API](/img/ef/5b893e9c315c10db6c1db46b4c3f5a.jpg)
[solution] camunda deployment process should point to a running platform rest API

本地文件秒搜工具 Everything

MFS详解(六)——MFS Chunk Server服务器安装与配置

【新手上路常见问答】关于技术管理

Echart histogram: X-axis displays value, Y-axis displays category

Omron Ping replaces the large domestic product jy-v640 semiconductor wafer box reader

Custom view - extensible collapsexpendview

Free screen recording software captura download and installation
随机推荐
Simple use of event bus
Adding classes dynamically in uni app
Relationship between fragment lifecycle and activity
Thread pool learning
View绘制整体流程简析
synchronized浅析
[MySQL] basic knowledge review
Applet Use of spaces
Win10 drqa installation
不在以下合法域名列表中,微信小程序解决办法
楊輝三角形詳解
欧姆龙平替国产大货—JY-V640半导体晶元盒读写器
App performance test: (I) startup time
Analysis of synchronized
App performance test: (III) traffic monitoring
High burst solution 2
Intelligent digital asset management helps enterprises win the post epidemic Era
[solution] camunda deployment process should point to a running platform rest API
Cross process two-way communication using messenger
Thread correlation point