当前位置:网站首页>Convert callback function to Flow
Convert callback function to Flow
2022-08-04 08:02:00 【Mr_Tony】
一、前言
在kotlin中,Languages structure programs,提高了可读性,Transformation operations are also provided for legacy program logic,Here is how to convert the callback to Flow流,优化程序结构
二、代码示例
1、callbackFlow
这里演示callbackFlow
的使用方式.callbackFlow
It belongs to multiple callbacks and can be triggered repeatedly,As the content is not usedChannel
进行通信,所以可以使用Channel
的相关函数.
interface Listener{
fun listener()
fun end()
}
inner class TouchModel{
private var listener: Listener ?= null
fun registerListener(sourceListener: Listener){
listener = sourceListener
}
fun unregisterListener(){
listener = null
}
fun emit(){
listener?.listener()
}
fun end(){
listener?.end()
}
}
@Test
fun test(){
val model = TouchModel()
runBlocking {
val flow = flowFrom(model)
flow.onEach {
println("YM--->流:$it")
}.launchIn(this)
delay(1000)
model.emit()
delay(1000)
model.emit()
delay(1000)
model.emit()
delay(1000)
println("YM--->stream is about to end")
model.end()
delay(1000)
}
}
//callbackFlowIt belongs to multiple callbacks and can be triggered repeatedly,As the content is not usedChannel进行通信,所以可以使用Channel的相关函数
fun flowFrom(model: TouchModel): Flow<Int> = callbackFlow {
var count = 0
val callback = object : Listener{
override fun listener() {
// 为了避免阻塞,channelBuffered channels can be configured,I don't know how to deal with this at the moment
// trySend(count)//这两种方式都行
trySendBlocking(count)
.onFailure {
throwable ->
// Downstream has been cancelled or failed, can log here
}
count++
}
override fun end() {
//When the execution is over, it can be closed using the following methodschannel,或者抛出异常,该参数可选,
// channel.close(IllegalStateException("This state is not right"))
// close(IllegalStateException("This state is not right"))
// channel.close() 等同于 close()
println("YM--->Channel关闭")
close()
}
}
model.registerListener(callback)
//因为是冷流,所以需要使用awaitCloseDo pending blocking
awaitClose {
//关闭注册
println("YM--->解除注册")
model.unregisterListener()
}
}
2、suspendCancellableCoroutine
If for a single callback.可以使用suspendCancellableCoroutine
进行处理.示例代码如下:
interface Listener{
fun listener()
fun end()
}
inner class TouchModel{
private var listener: Listener ?= null
fun registerListener(sourceListener: Listener){
listener = sourceListener
}
fun unregisterListener(){
listener = null
}
fun emit(){
listener?.listener()
}
fun end(){
listener?.end()
}
}
@Test
fun test(){
val model = TouchModel()
runBlocking {
// val flow = flowFrom(model)
val job = async {
val flow = awaitCallback(model)
println("YM--->流:$flow")
}
// delay(1000)
// model.emit()
delay(1000)
println("YM--->stream is about to end")
model.end()
// job.cancel()//The flow can be undone,If the task is not over yet,This task can be undone directly
delay(1000)
}
}
suspend fun awaitCallback(model: TouchModel): Int = suspendCancellableCoroutine {
continuation ->
val callback = object : Listener {
// Implementation of some callback interface
override fun listener() {
continuation.resume(0){
//Used when coroutine resumes
continuation.resumeWithException(it)
}
// continuation.resumeWithException(cause)
println("YM---->isActive:${
continuation.isActive}--->isCancel:${
continuation.isCancelled}")
}
override fun end() {
continuation.cancel()
}
}
// Register callback with an API
model.registerListener(callback)
// Remove callback on cancellation
continuation.invokeOnCancellation {
println("YM---->挂起关闭")
model.unregisterListener()
}
// At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
}
可以看到,Execute once and terminate directly,It should be noted that if the task is not completed,直接进行continuation.cancel()
.那么就会执行continuation.invokeOnCancellation
函数.倘若,It has been executed againcontinuation.cancel()
.则不会执行continuation.invokeOnCancellation
.
3、CompletableDeferred
This can also monitor the conversion of the callback function,如下:
class CompletableDeferredTest {
val response = CompletableDeferred<Int>()
@Test
fun test(){
request(response)
runBlocking {
val result = response.await()
println("YM---->结果:${
result}")
// response.cancel() //If the undo is performed before the result is returned,那么就会触发CompletableDeferred.invokeOnCompletion()函数
delay(4000)
}
}
fun request(rep: CompletableDeferred<Int>){
Thread{
//The main purpose of using threads instead of coroutines here is to prove that this function can be executed without a coroutine environment
Thread.sleep(1000)//Delay the simulated request for two seconds
rep.complete(2)
}.start()
// rep.completeExceptionally(IllegalStateException("非法状态异常"))//This can throw exceptions
rep.invokeOnCompletion {
if (rep.isCancelled) {
println("Call cancelled")
}
}
}
}
三、参考链接
边栏推荐
- [Paper Notes] - Low Illumination Image Enhancement - Supervised - RetinexNet - 2018-BMVC
- 【电脑录制屏】如何使用bandicam录游戏 设置图文教程
- TCP协议详解
- 金仓数据库 KDTS 迁移工具使用指南 (7. 部署常见问题)
- 【UE虚幻引擎】UE5三步骤实现AI漫游与对话行为
- CSDN21天学习挑战赛——day1 正则表达式大总结
- 电脑系统数据丢失了是什么原因?找回方法有哪些?
- C# 实用的第三方库
- 安装GBase 8c数据库集群时,报错误码:80000306,显示Dcs cluster not healthy。怎么处理错误呢?
- 金仓数据库KingbaseES客户端编程接口指南-JDBC(5. JDBC 查询结果集处理)
猜你喜欢
随机推荐
Distributed Computing Experiment 4 Random Signal Analysis System
【STM32】STM32F103系列名称与封装、内存
JNI学习1.环境配置与简单函数实现
ExoPlayer添加Ffmpeg扩展实现软解功能
babylon 里面加gltf 模型
data:image/jpg;base64格式数据转化为图片
金仓数据库KingbaseES客户端编程接口指南-JDBC(10. JDBC 读写分离最佳实践)
int *p = &a、p = &a、*p = a的正确理解
研究性学习专题 3_LL(1)语法分析设计原理与实现
新特性解读 | MySQL 8.0 在线调整 REDO
金仓数据库KingbaseES客户端编程接口指南-JDBC(9. JDBC 读写分离)
CSDN21天学习挑战赛——day1 正则表达式大总结
虚拟机没有USB网卡选项怎么解决
redis分布式锁的实现
使用GBase 8c数据库的时候,遇到这种报错
金仓数据库的单节点如何转集群?
Mysql insert on duplicate key 死锁问题定位与解决
redis stream 实现消息队列
js-第一个出现两次的字母
异常值 识别与处理方法