当前位置:网站首页>深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow
深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow
2022-07-07 22:17:00 【RikkaTheWorld】
系列电子书:传送门
Flow 是典型的冷数据流,所以它的值是按需计算的。然而在某些情况下,我们希望多个接收者订阅一个会更改的数据源。这就是我们使用 SharedFlow
的地方,它在概念上类似于邮件列表。我们还有 StateFlow
,它近似与一个可观察对象。让我们一个个了解它们。
SharedFlow
让我们从 MutableSharedFlow
开始,它就像一个广播通道:每个人都可以发送(发射)信息,信息会被被每个正在监听(收集)的协程接收。
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(replay = 0)
// 或者 MutableSharedFlow<String>()
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
mutableSharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
// (program never ends)
上面的程序永远不会结束,因为 coroutineScope
会等待它里面用 launch
启动的协程结束,而这些协程一直监听 MutableSharedFlow
,显然, MutableSharedFlow
是不可关闭的,所以解决这个问题的唯一方法是取消整个作用域。
MutableSharedFlow
也可以持续发送信息。如果我们设置 replay
参数(默认设置为0),它会缓存最新的 n 个值,如果协程现在开始订阅,它将首先接收这些值。这个缓存也可以用 resetReplayCache
函数重置。
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>(
replay = 2,
)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
mutableSharedFlow.emit("Message3")
println(mutableSharedFlow.replayCache)
// [Message2, Message3]
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
// #1 received Message2
// #1 received Message3
}
delay(100)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache) // []
}
MutableSharedFlow
在概念上类似于 RxJava 的 Subject
。当 replay
参数设置为0时,它类似于 PuiblishSubject
。当 replay
是1时,它类似于 BehaviorSubject
。当 replay
是 Int.MAX_VALUE
时,它类似于 ReplaySubject
。
在 Kotlin 中,我们希望在用于监听的接口和用于修改的接口之间有一些区别。例如,我们已经看到了 SendCahnnel
、 ReceiveChannel
和 Channel
的区别。同样的情况也适用于这里。 MutableSharedFlow
继承自 SharedFlow
和 FlowCollector
,前者继承自 Flow
,用于订阅,而 FlowCollector
则用于发射值。
interface MutableSharedFlow<T> :
SharedFlow<T>, FlowCollector<T> {
fun tryEmit(value: T): Boolean
val subscriptionCount: StateFlow<Int>
fun resetReplayCache()
}
interface SharedFlow<out T> : Flow<T> {
val replayCache: List<T>
}
interface FlowCollector<in T> {
suspend fun emit(value: T)
}
这些接口通常只用于暴露函数、发射或收集函数。
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>()
val sharedFlow: SharedFlow<String> = mutableSharedFlow
val collector: FlowCollector<String> = mutableSharedFlow
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
sharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
collector.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
以下是 Android 上的典型用法:
class UserProfileViewModel {
private val _userChanges =
MutableSharedFlow<UserChange>()
val userChanges: SharedFlow<UserChange> = _userChanges
fun onCreate() {
viewModelScope.launch {
userChanges.collect(::applyUserChange)
}
}
fun onNameChanged(newName: String) {
// ...
_userChanges.emit(NameChange(newName))
}
fun onPublicKeyChanged(newPublicKey: String) {
// ...
_userChanges.emit(PublicKeyChange(newPublicKey))
}
}
shareIn
Flow 通常用于观察更改行为,如用户操作、数据库修改或新消息出现。我们已经知道了如何处理这些事件的方法,譬如已经学习了如何将多个 flow 合并为一个 flow。但是如果多个订阅者对这些更改感兴趣,或者我们想把一个 flow 变成多个 flow,该怎么解决呢?答案是使用 SharedFlow
,将一个 flow 转换成 SharedFlow
,最简单的方法是使用 sharIn
函数。
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach {
delay(1000) }
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
// replay = 0 (default)
)
delay(500)
launch {
sharedFlow.collect {
println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#3 $it") }
}
}
// (1 sec)
// #1 A
// (1 sec)
// #1 B
// #2 B
// (1 sec)
// #1 C
// #2 C
// #3 C
shareIn
函数创建了一个 SharedFlow
,并从它的 flow 上发射元素。因为我们需要启动一个协程来收集这些 flow 上的元素, 所以 shareIn
的第一个参数是协程作用域。 第三个参数是 replay
, 默认值为0。第二个参数很有趣: started
决定流上的数据合适被发送。支持下面选项:
SharingStated.Eagerly
—— 立即发送数据。注意,如果你有一个有限的relay
值,你会失去一些你订阅之前发出的值(如果你的replay
设置为0,你将失去所有那些值)。
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
)
delay(100)
launch {
sharedFlow.collect {
println("#1 $it") }
}
print("Done")
}
// (0.1 sec)
// Done
SharingStarted.Lazily
—— 当第一个订阅者观察时才开始发送数据。这保证了第一个订阅者能获得所有发射的值,而后续订阅者只保证获得最新的replay
数量的值。即使所有订阅者都消失了,上游仍然是激活的,只有最新的replay
数量的数据在没有订阅者的情况下才会被缓存
suspend fun main(): Unit = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D")
.onEach {
delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
)
delay(100)
launch {
sharedFlow.collect {
println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#2 $it") }
}
}
// (0.1 sec)
// #1 A
// #1 B
// #1 C
// (1 sec)
// #2 D
// #1 D
WhileSubscribed()
- 当第一个订阅者出现时,flow 发射数据;当最后一个订阅者消失时,该 flow 将停止。如果我们的SharedFlow
停止时出现了一个新的订阅者,它将会再次启动。而添加新的订阅者有额外的可选配置参数:stopTimeoutMulls
(在最后一个订阅者消失后保留多长时间,默认为 0) 和replayExpirationMills
(停止后保存replay
的缓存数量多久,默认为Long.MAX_VALUE
)
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart {
println("Started") }
.onCompletion {
println("Finished") }
.onEach {
delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch {
println("#1 ${
sharedFlow.first()}")
}
launch {
println("#2 ${
sharedFlow.take(2).toList()}")
}
delay(3000)
launch {
println("#3 ${
sharedFlow.first()}")
}
}
// (3 sec)
// Started
// (1 sec)
// #1 A
// (1 sec)
// #2 [A, B]
// Finished
// (1 sec)
// Started
// (1 sec)
// #3 A
// Finished
- 也可以通过实现
ShareingStated
接口来自定义一个策略
当多个服务对相同的更改感兴趣时,使用 sharedIn
非常方便。假设你需要观察数据库中位置信息是如何随时间变化的,下面就是 DTO(数据传输对象)在 Android 的 Room 上实现的:
@Dao
interface LocationDao {
@Insert(onConflict = OnConflictStrategy.IGNORE)
suspend fun insertLocation(location: Location)
@Query("DELETE FROM location_table")
suspend fun deleteLocations()
@Query("SELECT * FROM location_table ORDER BY time")
fun observeLocations(): Flow<List<Location>>
}
问题是,如果多个服务需要依赖于这些位置信息,那么让每个服务单独观察数据库并不是最优的。相反,我们可以创建一个服务来监听这些更改,并将它们共享到 SharedFlow
中。这就是我们使用 shareIn
的地方。但是我们应该如何配置它们呢?你需要自己做决定。你希望你的订阅者在订阅时立即收到最新的位置列表吗? 如果是,则设置 replay
为1。如果你只想对更改做出反应,则设置为 0 即可。started
应该设置成什么呢? WhileSubscribed
看起来适合这个场景。
class LocationService(
private val locationDao: LocationDao,
private val scope: CoroutineScope
) {
private val locations = locationDao.observeLocations()
.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(),
)
fun observeLocations(): Flow<List<Location>> = locations
}
小心!我们不要为每个调用都创建一个新的 SharedFlow
。你只需要创建一个,并将其作为一个属性存储即可。
StateFlow
StateFlow
是 SharedFlow
的一个衍生概念。它的工作原理和设置了 replay
为1的 SharedFlow
类似。它总是存储一个值,可以使用 value
属性来访问该值。
interface StateFlow<out T> : SharedFlow<T> {
val value: T
}
interface MutableStateFlow<T> :
StateFlow<T>, MutableSharedFlow<T> {
override var value: T
fun compareAndSet(expect: T, update: T): Boolean
}
初始值需要传递给构造函数。我们都使用 value
属性来访问和设置该值。正如你所看到的, MutableStateFlow
就像一个数据的可观察容器。
suspend fun main() = coroutineScope {
val state = MutableStateFlow("A")
println(state.value) // A
launch {
state.collect {
println("Value changed to $it") }
// Value changed to A
}
delay(1000)
state.value = "B" // Value changed to B
delay(1000)
launch {
state.collect {
println("and now it is $it") }
// and now it is B
}
delay(1000)
state.value = "C" // Value changed to C and now it is C
}
在 Android 上, StateFlow
被用作 LiveData
的现成替代品。首先,它完全支持协程,其次,它有一个初始值,所以它不需要为空。因此, StateFlow
经常用于表示 ViewModel
的状态。这个状态被观察着,并在此基础上显示和更新一个视图。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
private val _uiState =
MutableStateFlow<NewsState>(LoadingNews)
val uiState: StateFlow<NewsState> = _uiState
fun onCreate() {
scope.launch {
_uiState.value =
NewsLoaded(newsRepository.getNews())
}
}
}
stateIn
stateIn
是一个将 Flow<T>
转换为 StateFlow<T>
的函数。只能在作用域上去调用它,但它是一个挂起函数。请记住, StateFlow
始终都需要一个值,因此,如果你没有指定它,那么你需要等待直到第一个值被计算出来。
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach {
delay(1000) }
.onEach {
println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(this)
println("Listening")
println(stateFlow.value)
stateFlow.collect {
println("Received $it") }
}
// (1 sec)
// Produced A
// Listening
// A
// Received A
// (1 sec)
// Produced B
// Received B
// (1 sec)
// Produced C
// Received C
stateIn
的第二种变体不是挂起的,但它需要初始值和启动模式。此模式具有与 shareIn
相同的选项(如上所述)。
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B")
.onEach {
delay(1000) }
.onEach {
println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(
scope = this,
started = SharingStarted.Lazily,
initialValue = "Empty"
)
println(stateFlow.value)
delay(2000)
stateFlow.collect {
println("Received $it") }
}
// Empty
// (2 sec)
// Received Empty
// (1 sec)
// Produced A
// Received A
// (1 sec)
// Produced B
// Received B
当我们需要订阅来自单个更改源的数据流时,我们通常会使用 stateIn
。在这个过程中,可以处理这些更改。
class LocationsViewModel(
private val locationService: LocationService
) : ViewModel() {
private val location = locationService.observeLocations()
.map {
it.toLocationsDisplay() }
.stateIn(
scope = viewModelScope,
started = SharingStarted.Lazily,
initialValue = emptyList(),
)
// ...
}
总结
在本章中,我们学习了 SharedFlow
和 StateFlow
,这两个东西对于 Android 开发者来说都是特别重要的,因为它们通常被用作 MVVM 模式的一部分。记住它们并考虑使用它们,特别是如果你在 Android 开发中使用 view model 的时候。
边栏推荐
- paddle一个由三个卷积层组成的网络完成cifar10数据集的图像分类任务
- 3年经验,面试测试岗20K都拿不到了吗?这么坑?
- QT creator add custom new file / Project Template Wizard
- 【编程题】【Scratch二级】2019.03 垃圾分类
- [leetcode] 20. Valid brackets
- redis你到底懂不懂之list
- Teach you to make a custom form label by hand
- Reading notes 004: Wang Yangming's quotations
- Pypharm uses, and the third-party library has errors due to version problems
- 【leetcode】day1
猜你喜欢
Kubectl 好用的命令行工具:oh-my-zsh 技巧和窍门
Connect diodes in series to improve voltage withstand
Preliminary test of optical flow sensor: gl9306
SQL connection problem after downloading (2)
How to measure whether the product is "just needed, high frequency, pain points"
Les mots ont été écrits, la fonction est vraiment puissante!
Daily question brushing record (16)
Laser slam learning (2d/3d, partial practice)
Single machine high concurrency model design
Robomaster visual tutorial (1) camera
随机推荐
C# 泛型及性能比较
[programming questions] [scratch Level 2] March 2019 garbage classification
Open display PDF file in web page
【编程题】【Scratch二级】2019.03 垃圾分类
2022.7.7-----leetcode. six hundred and forty-eight
The function is really powerful!
[leetcode] 20. Valid brackets
【编程题】【Scratch二级】2019.12 绘制十个正方形
Install sqlserver2019
每日刷题记录 (十六)
redis你到底懂不懂之list
Single machine high concurrency model design
Teach you to make a custom form label by hand
去了字节跳动,才知道年薪 40w 的测试工程师有这么多?
第四期SFO销毁,Starfish OS如何对SFO价值赋能?
一鍵免費翻譯300多頁的pdf文檔
Tools for debugging makefiles - tool for debugging makefiles
paddle入门-使用LeNet在MNIST实现图像分类方法一
52歲的周鴻禕,還年輕嗎?
[programming problem] [scratch Level 2] 2019.09 make bat Challenge Game