当前位置:网站首页>Convert callback function to Flow
Convert callback function to Flow
2022-08-04 08:02:00 【Mr_Tony】
一、前言
在kotlin中,Languages structure programs,提高了可读性,Transformation operations are also provided for legacy program logic,Here is how to convert the callback to Flow流,优化程序结构
二、代码示例
1、callbackFlow
这里演示callbackFlow的使用方式.callbackFlowIt belongs to multiple callbacks and can be triggered repeatedly,As the content is not usedChannel进行通信,所以可以使用Channel的相关函数.
interface Listener{
fun listener()
fun end()
}
inner class TouchModel{
private var listener: Listener ?= null
fun registerListener(sourceListener: Listener){
listener = sourceListener
}
fun unregisterListener(){
listener = null
}
fun emit(){
listener?.listener()
}
fun end(){
listener?.end()
}
}
@Test
fun test(){
val model = TouchModel()
runBlocking {
val flow = flowFrom(model)
flow.onEach {
println("YM--->流:$it")
}.launchIn(this)
delay(1000)
model.emit()
delay(1000)
model.emit()
delay(1000)
model.emit()
delay(1000)
println("YM--->stream is about to end")
model.end()
delay(1000)
}
}
//callbackFlowIt belongs to multiple callbacks and can be triggered repeatedly,As the content is not usedChannel进行通信,所以可以使用Channel的相关函数
fun flowFrom(model: TouchModel): Flow<Int> = callbackFlow {
var count = 0
val callback = object : Listener{
override fun listener() {
// 为了避免阻塞,channelBuffered channels can be configured,I don't know how to deal with this at the moment
// trySend(count)//这两种方式都行
trySendBlocking(count)
.onFailure {
throwable ->
// Downstream has been cancelled or failed, can log here
}
count++
}
override fun end() {
//When the execution is over, it can be closed using the following methodschannel,或者抛出异常,该参数可选,
// channel.close(IllegalStateException("This state is not right"))
// close(IllegalStateException("This state is not right"))
// channel.close() 等同于 close()
println("YM--->Channel关闭")
close()
}
}
model.registerListener(callback)
//因为是冷流,所以需要使用awaitCloseDo pending blocking
awaitClose {
//关闭注册
println("YM--->解除注册")
model.unregisterListener()
}
}
2、suspendCancellableCoroutine
If for a single callback.可以使用suspendCancellableCoroutine进行处理.示例代码如下:
interface Listener{
fun listener()
fun end()
}
inner class TouchModel{
private var listener: Listener ?= null
fun registerListener(sourceListener: Listener){
listener = sourceListener
}
fun unregisterListener(){
listener = null
}
fun emit(){
listener?.listener()
}
fun end(){
listener?.end()
}
}
@Test
fun test(){
val model = TouchModel()
runBlocking {
// val flow = flowFrom(model)
val job = async {
val flow = awaitCallback(model)
println("YM--->流:$flow")
}
// delay(1000)
// model.emit()
delay(1000)
println("YM--->stream is about to end")
model.end()
// job.cancel()//The flow can be undone,If the task is not over yet,This task can be undone directly
delay(1000)
}
}
suspend fun awaitCallback(model: TouchModel): Int = suspendCancellableCoroutine {
continuation ->
val callback = object : Listener {
// Implementation of some callback interface
override fun listener() {
continuation.resume(0){
//Used when coroutine resumes
continuation.resumeWithException(it)
}
// continuation.resumeWithException(cause)
println("YM---->isActive:${
continuation.isActive}--->isCancel:${
continuation.isCancelled}")
}
override fun end() {
continuation.cancel()
}
}
// Register callback with an API
model.registerListener(callback)
// Remove callback on cancellation
continuation.invokeOnCancellation {
println("YM---->挂起关闭")
model.unregisterListener()
}
// At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
}
可以看到,Execute once and terminate directly,It should be noted that if the task is not completed,直接进行continuation.cancel().那么就会执行continuation.invokeOnCancellation函数.倘若,It has been executed againcontinuation.cancel().则不会执行continuation.invokeOnCancellation.
3、CompletableDeferred
This can also monitor the conversion of the callback function,如下:
class CompletableDeferredTest {
val response = CompletableDeferred<Int>()
@Test
fun test(){
request(response)
runBlocking {
val result = response.await()
println("YM---->结果:${
result}")
// response.cancel() //If the undo is performed before the result is returned,那么就会触发CompletableDeferred.invokeOnCompletion()函数
delay(4000)
}
}
fun request(rep: CompletableDeferred<Int>){
Thread{
//The main purpose of using threads instead of coroutines here is to prove that this function can be executed without a coroutine environment
Thread.sleep(1000)//Delay the simulated request for two seconds
rep.complete(2)
}.start()
// rep.completeExceptionally(IllegalStateException("非法状态异常"))//This can throw exceptions
rep.invokeOnCompletion {
if (rep.isCancelled) {
println("Call cancelled")
}
}
}
}
三、参考链接
边栏推荐
猜你喜欢

form表单提交到数据库储存

leetcode 22.8.1 二进制加法

分布式计算MapReduce | Spark实验

Lightweight Backbone VGNetG Achieves "No Choice, All" Lightweight Backbone Network

分布式计算实验3 基于PRC的书籍信息管理系统

RT-Thread Studio学习(十二)W25Q128(SPI)的读写

【JS 逆向百例】某网站加速乐 Cookie 混淆逆向详解
![[想要访问若依后台]若依框架报错401请求访问:error认证失败,无法访问系统资源](/img/aa/701fef9d8d7eaf25082e1289799b77.png)
[想要访问若依后台]若依框架报错401请求访问:error认证失败,无法访问系统资源

一天搞定JDBC01:连接数据库并执行sql语句

LLVM编译技术应用分析
随机推荐
智能健身动作识别:PP-TinyPose打造AI虚拟健身教练!
【UE虚幻引擎】UE5实现动态导航样条线绘制
C语言strchr()函数以及strstr()函数的实现
MySQL BIGINT 数据类型
Lightweight Backbone VGNetG Achieves "No Choice, All" Lightweight Backbone Network
【剑指Offer】二分法例题
金仓数据库KingbaseES客户端编程接口指南-JDBC(6. JDBC 大对象数据处理)
一天搞定JDBC02:开启事务
js异步变同步、同步变异步
安装GBase 8c数据库的时候,报错显示“Resource:gbase8c already in use”,这怎么处理呢?
【虚幻引擎UE】UE5实现WEB和UE通讯思路
inject() can only be used inside setup() or functional components.
推荐几种可以直接翻译PDF英文文献的方法
金仓数据库KingbaseES客户端编程接口指南-JDBC(8. JDBC 元数据处理)
高等代数_证明_对称矩阵一定能够相似对角化
MySQL 8.0.29 详细安装(windows zip版)
怎么写专利更容易通过?
[Paper Notes] - Low Illumination Image Enhancement - Supervised - RetinexNet - 2018-BMVC
GBase 8c数据库集群中,怎么替换节点呢?比如设置A节点为gtm,换到B节点上。
小程序如何使用订阅消息(PHP代码+小程序js代码)