当前位置:网站首页>Convert callback function to Flow

Convert callback function to Flow

2022-08-04 08:02:00 Mr_Tony

一、前言

在kotlin中,Languages ​​structure programs,提高了可读性,Transformation operations are also provided for legacy program logic,Here is how to convert the callback to Flow流,优化程序结构

二、代码示例

1、callbackFlow

这里演示callbackFlow的使用方式.callbackFlowIt belongs to multiple callbacks and can be triggered repeatedly,As the content is not usedChannel进行通信,所以可以使用Channel的相关函数.

   interface Listener{
    
        fun listener()
        fun end()
    }
inner class TouchModel{
    
        private var listener: Listener ?= null
        fun registerListener(sourceListener: Listener){
    
            listener = sourceListener
        }
        fun unregisterListener(){
    
            listener = null
        }

        fun emit(){
    
            listener?.listener()
        }
        fun end(){
    
            listener?.end()
        }
    }
   @Test
    fun test(){
    
        val model = TouchModel()
        runBlocking {
    

            val flow = flowFrom(model)

            flow.onEach {
    
                println("YM--->流:$it")
            }.launchIn(this)
            delay(1000)
            model.emit()
            delay(1000)
            model.emit()
            delay(1000)
            model.emit()
            delay(1000)
            println("YM--->stream is about to end")
            model.end()
            delay(1000)

        }
    }
    //callbackFlowIt belongs to multiple callbacks and can be triggered repeatedly,As the content is not usedChannel进行通信,所以可以使用Channel的相关函数
    fun flowFrom(model: TouchModel): Flow<Int> = callbackFlow {
    
        var count = 0
        val callback = object : Listener{
    
            override fun listener() {
    
// 为了避免阻塞,channelBuffered channels can be configured,I don't know how to deal with this at the moment
// trySend(count)//这两种方式都行
                    trySendBlocking(count)
                        .onFailure {
     throwable ->
                            // Downstream has been cancelled or failed, can log here
                        }
                    count++
            }

            override fun end() {
    
                //When the execution is over, it can be closed using the following methodschannel,或者抛出异常,该参数可选,
// channel.close(IllegalStateException("This state is not right"))
// close(IllegalStateException("This state is not right"))
// channel.close() 等同于 close()
                println("YM--->Channel关闭")
                close()
            }
        }
        model.registerListener(callback)
        //因为是冷流,所以需要使用awaitCloseDo pending blocking
        awaitClose {
    
            //关闭注册
            println("YM--->解除注册")
            model.unregisterListener()
        }
    }

2、suspendCancellableCoroutine

If for a single callback.可以使用suspendCancellableCoroutine进行处理.示例代码如下:

      interface Listener{
    
        fun listener()
        fun end()
    }

    inner class TouchModel{
    
        private var listener: Listener ?= null
        fun registerListener(sourceListener: Listener){
    
            listener = sourceListener
        }
        fun unregisterListener(){
    
            listener = null
        }

        fun emit(){
    
            listener?.listener()
        }
        fun end(){
    
            listener?.end()
        }
    }
       @Test
    fun test(){
    
              val model = TouchModel()
        runBlocking {
    
// val flow = flowFrom(model)
            val job = async {
    
                val flow = awaitCallback(model)
                println("YM--->流:$flow")
            }
// delay(1000)
// model.emit()
            delay(1000)
            println("YM--->stream is about to end")
            model.end()
// job.cancel()//The flow can be undone,If the task is not over yet,This task can be undone directly
            delay(1000)
        }
    }
  suspend fun awaitCallback(model: TouchModel): Int = suspendCancellableCoroutine {
     continuation ->
        val callback = object : Listener {
     // Implementation of some callback interface
            override fun listener() {
    
                continuation.resume(0){
    //Used when coroutine resumes
                    continuation.resumeWithException(it)
                }
// continuation.resumeWithException(cause)
       println("YM---->isActive:${
      continuation.isActive}--->isCancel:${
      continuation.isCancelled}")
            }

            override fun end() {
    
                continuation.cancel()
            }
        }
        // Register callback with an API
        model.registerListener(callback)
        // Remove callback on cancellation
        continuation.invokeOnCancellation {
    
            println("YM---->挂起关闭")
            model.unregisterListener()
        }
        // At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
    }

可以看到,Execute once and terminate directly,It should be noted that if the task is not completed,直接进行continuation.cancel().那么就会执行continuation.invokeOnCancellation函数.倘若,It has been executed againcontinuation.cancel().则不会执行continuation.invokeOnCancellation.

3、CompletableDeferred

This can also monitor the conversion of the callback function,如下:

class CompletableDeferredTest {
    
    val response = CompletableDeferred<Int>()
    @Test
    fun test(){
    
        request(response)
        runBlocking {
    
            val result = response.await()
            println("YM---->结果:${
      result}")
// response.cancel() //If the undo is performed before the result is returned,那么就会触发CompletableDeferred.invokeOnCompletion()函数
            delay(4000)
        }
    }

     fun request(rep: CompletableDeferred<Int>){
    

         Thread{
    //The main purpose of using threads instead of coroutines here is to prove that this function can be executed without a coroutine environment
             Thread.sleep(1000)//Delay the simulated request for two seconds
             rep.complete(2)
         }.start()
// rep.completeExceptionally(IllegalStateException("非法状态异常"))//This can throw exceptions
         rep.invokeOnCompletion {
    
             if (rep.isCancelled) {
    
                 println("Call cancelled")
             }
         }
    }

}

三、参考链接

  1. callbackFlow

  2. 使用更为安全的方式收集 Android UI 数据流

  3. Kotlin–suspendCancellableCoroutine和suspendCoroutine的区别及使用_Th.one的博客-CSDN博客

  4. CoroutineScope - CompletableDeferred cancellation

原网站

版权声明
本文为[Mr_Tony]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/216/202208040632311360.html