当前位置:网站首页>Deep dive kotlin collaboration (the end of 23): sharedflow and stateflow
Deep dive kotlin collaboration (the end of 23): sharedflow and stateflow
2022-07-08 00:22:00 【RikkaTheWorld】
Series eBook : Portal
Flow It is a typical cold data flow , So its value is calculated on demand . In some cases, however , We want multiple recipients to subscribe to a data source that will change . That's what we use SharedFlow
The place of , It is conceptually similar to a mailing list . We also have StateFlow
, It approximates an observable object . Let's understand them one by one .
SharedFlow
Let's start with MutableSharedFlow
Start , It's like a broadcast channel : Everyone can send ( launch ) Information , The information will be monitored by everyone ( collect ) Co process reception of .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(replay = 0)
// perhaps MutableSharedFlow<String>()
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
mutableSharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
// (program never ends)
The above program will never end , because coroutineScope
Will wait for it to be used inside launch
The starting process ends , And these processes have been monitoring MutableSharedFlow
, obviously , MutableSharedFlow
It cannot be closed , So the only way to solve this problem is to cancel the whole scope .
MutableSharedFlow
You can also send messages continuously . If we set replay
Parameters ( The default setting is 0), It caches the latest n It's worth , If Xie Cheng starts subscribing now , It will first receive these values . This cache can also be used resetReplayCache
Function reset .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>(
replay = 2,
)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
mutableSharedFlow.emit("Message3")
println(mutableSharedFlow.replayCache)
// [Message2, Message3]
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
// #1 received Message2
// #1 received Message3
}
delay(100)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache) // []
}
MutableSharedFlow
Similar in concept to RxJava Of Subject
. When replay
Parameter set to 0 when , It is similar to PuiblishSubject
. When replay
yes 1 when , It is similar to BehaviorSubject
. When replay
yes Int.MAX_VALUE
when , It is similar to ReplaySubject
.
stay Kotlin in , We hope that there are some differences between the interface used for listening and the interface used for modification . for example , We have seen SendCahnnel
、 ReceiveChannel
and Channel
The difference between . The same applies here . MutableSharedFlow
Inherited from SharedFlow
and FlowCollector
, The former is inherited from Flow
, To subscribe to , and FlowCollector
Is used for the emission value .
interface MutableSharedFlow<T> :
SharedFlow<T>, FlowCollector<T> {
fun tryEmit(value: T): Boolean
val subscriptionCount: StateFlow<Int>
fun resetReplayCache()
}
interface SharedFlow<out T> : Flow<T> {
val replayCache: List<T>
}
interface FlowCollector<in T> {
suspend fun emit(value: T)
}
These interfaces are usually used only to expose functions 、 Launch or collect functions .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>()
val sharedFlow: SharedFlow<String> = mutableSharedFlow
val collector: FlowCollector<String> = mutableSharedFlow
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
sharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
collector.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
Here are Android Typical usage on :
class UserProfileViewModel {
private val _userChanges =
MutableSharedFlow<UserChange>()
val userChanges: SharedFlow<UserChange> = _userChanges
fun onCreate() {
viewModelScope.launch {
userChanges.collect(::applyUserChange)
}
}
fun onNameChanged(newName: String) {
// ...
_userChanges.emit(NameChange(newName))
}
fun onPublicKeyChanged(newPublicKey: String) {
// ...
_userChanges.emit(PublicKeyChange(newPublicKey))
}
}
shareIn
Flow Usually used to observe change behavior , Such as user operation 、 Database modification or new messages appear . We already know how to deal with these events , For example, I have learned how to integrate multiple flow Merge into one flow. But if multiple subscribers are interested in these changes , Or we want to put one flow Become more than one flow, How to solve it ? The answer is to use SharedFlow
, Will a flow convert to SharedFlow
, The easiest way is to use sharIn
function .
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach {
delay(1000) }
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
// replay = 0 (default)
)
delay(500)
launch {
sharedFlow.collect {
println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#3 $it") }
}
}
// (1 sec)
// #1 A
// (1 sec)
// #1 B
// #2 B
// (1 sec)
// #1 C
// #2 C
// #3 C
shareIn
The function creates a SharedFlow
, And from its flow Upper emission element . Because we need to start a collaborative process to collect these flow On the element , therefore shareIn
The first parameter of is the scope of the coroutine . The third parameter is replay
, The default value is 0. The second parameter is interesting : started
Decide that the data on the stream is suitable to be sent . The following options are supported :
SharingStated.Eagerly
—— Send data now . Be careful , If you have a limitedrelay
value , You will lose some values sent before you subscribe ( If yourreplay
Set to 0, You will lose all those values ).
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
)
delay(100)
launch {
sharedFlow.collect {
println("#1 $it") }
}
print("Done")
}
// (0.1 sec)
// Done
SharingStarted.Lazily
—— Data is sent only when the first subscriber observes . This ensures that the first subscriber can get all the transmitted values , And subsequent subscribers only guarantee to get the latestreplay
The value of quantity . Even if all subscribers disappear , Upstream is still active , Only the latestreplay
The amount of data will be cached without subscribers
suspend fun main(): Unit = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D")
.onEach {
delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
)
delay(100)
launch {
sharedFlow.collect {
println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#2 $it") }
}
}
// (0.1 sec)
// #1 A
// #1 B
// #1 C
// (1 sec)
// #2 D
// #1 D
WhileSubscribed()
- When the first subscriber appears ,flow Launch data ; When the last subscriber disappears , The flow Will stop . If ourSharedFlow
A new subscriber appears when it stops , It will start again . Adding new subscribers has additional optional configuration parameters :stopTimeoutMulls
( How long will it stay after the last subscriber disappears , The default is 0) andreplayExpirationMills
( Save after stopreplay
How long is the number of caches , The default isLong.MAX_VALUE
)
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart {
println("Started") }
.onCompletion {
println("Finished") }
.onEach {
delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch {
println("#1 ${
sharedFlow.first()}")
}
launch {
println("#2 ${
sharedFlow.take(2).toList()}")
}
delay(3000)
launch {
println("#3 ${
sharedFlow.first()}")
}
}
// (3 sec)
// Started
// (1 sec)
// #1 A
// (1 sec)
// #2 [A, B]
// Finished
// (1 sec)
// Started
// (1 sec)
// #3 A
// Finished
- It can also be achieved by
ShareingStated
Interface to define a policy
When multiple services are interested in the same change , Use sharedIn
Very convenient . Suppose you need to observe how the location information in the database changes over time , The following is DTO( Data transmission object ) stay Android Of Room Implemented on the :
@Dao
interface LocationDao {
@Insert(onConflict = OnConflictStrategy.IGNORE)
suspend fun insertLocation(location: Location)
@Query("DELETE FROM location_table")
suspend fun deleteLocations()
@Query("SELECT * FROM location_table ORDER BY time")
fun observeLocations(): Flow<List<Location>>
}
The problem is , If multiple services need to rely on these location information , It is not optimal for each service to observe the database alone . contrary , We can create a service to listen for these changes , And share them to SharedFlow
in . That's what we use shareIn
The place of . But how should we configure them ? You need to make your own decisions . Do you want your subscribers to receive the latest location list immediately when subscribing ? If it is , Is set replay
by 1. If you just want to react to changes , Is set to 0 that will do .started
What should be set ? WhileSubscribed
It looks suitable for this scene .
class LocationService(
private val locationDao: LocationDao,
private val scope: CoroutineScope
) {
private val locations = locationDao.observeLocations()
.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(),
)
fun observeLocations(): Flow<List<Location>> = locations
}
look out ! Let's not create a new one for every call SharedFlow
. You just need to create one , And store it as an attribute .
StateFlow
StateFlow
yes SharedFlow
A derived concept of . Its working principle and setting replay
by 1 Of SharedFlow
similar . It always stores a value , have access to value
Property to access the value .
interface StateFlow<out T> : SharedFlow<T> {
val value: T
}
interface MutableStateFlow<T> :
StateFlow<T>, MutableSharedFlow<T> {
override var value: T
fun compareAndSet(expect: T, update: T): Boolean
}
The initial value needs to be passed to the constructor . We all use value
Property to access and set the value . As you can see , MutableStateFlow
Like an observable container of data .
suspend fun main() = coroutineScope {
val state = MutableStateFlow("A")
println(state.value) // A
launch {
state.collect {
println("Value changed to $it") }
// Value changed to A
}
delay(1000)
state.value = "B" // Value changed to B
delay(1000)
launch {
state.collect {
println("and now it is $it") }
// and now it is B
}
delay(1000)
state.value = "C" // Value changed to C and now it is C
}
stay Android On , StateFlow
be used as LiveData
Ready made alternatives . First , It fully supports synergy , secondly , It has an initial value , So it doesn't need to be empty . therefore , StateFlow
Often used to express ViewModel
The state of . This state is observed , And on this basis, display and update a view .
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
private val _uiState =
MutableStateFlow<NewsState>(LoadingNews)
val uiState: StateFlow<NewsState> = _uiState
fun onCreate() {
scope.launch {
_uiState.value =
NewsLoaded(newsRepository.getNews())
}
}
}
stateIn
stateIn
It is a will. Flow<T>
Convert to StateFlow<T>
Function of . It can only be called on the scope , But it is a suspended function . please remember , StateFlow
Always need a value , therefore , If you don't specify it , Then you need to wait until the first value is calculated .
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach {
delay(1000) }
.onEach {
println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(this)
println("Listening")
println(stateFlow.value)
stateFlow.collect {
println("Received $it") }
}
// (1 sec)
// Produced A
// Listening
// A
// Received A
// (1 sec)
// Produced B
// Received B
// (1 sec)
// Produced C
// Received C
stateIn
The second variant of is not suspended , But it needs initial value and startup mode . This mode has the same characteristics as shareIn
Same options ( As mentioned above ).
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B")
.onEach {
delay(1000) }
.onEach {
println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(
scope = this,
started = SharingStarted.Lazily,
initialValue = "Empty"
)
println(stateFlow.value)
delay(2000)
stateFlow.collect {
println("Received $it") }
}
// Empty
// (2 sec)
// Received Empty
// (1 sec)
// Produced A
// Received A
// (1 sec)
// Produced B
// Received B
When we need to subscribe to data streams from a single change source , We usually use them stateIn
. In the process , You can handle these changes .
class LocationsViewModel(
private val locationService: LocationService
) : ViewModel() {
private val location = locationService.observeLocations()
.map {
it.toLocationsDisplay() }
.stateIn(
scope = viewModelScope,
started = SharingStarted.Lazily,
initialValue = emptyList(),
)
// ...
}
summary
In this chapter , We learned SharedFlow
and StateFlow
, These two things are for Android It is especially important for developers , Because they are often used as MVVM Part of the pattern . Remember them and consider using them , Especially if you are Android Used in development view model When .
边栏推荐
- 华泰证券官方网站开户安全吗?
- 2022-07-07:原本数组中都是大于0、小于等于k的数字,是一个单调不减的数组, 其中可能有相等的数字,总体趋势是递增的。 但是其中有些位置的数被替换成了0,我们需要求出所有的把0替换的方案数量:
- [研发人员必备]paddle 如何制作自己的数据集,并显示。
- 赞!idea 如何单窗口打开多个项目?
- Trust orbtk development issues 2022
- Common selectors are
- Qt添加资源文件,为QAction添加图标,建立信号槽函数并实现
- 全自动化处理每月缺卡数据,输出缺卡人员信息
- 智慧监管入场,美团等互联网服务平台何去何从
- paddle入门-使用LeNet在MNIST实现图像分类方法二
猜你喜欢
【编程题】【Scratch二级】2019.12 绘制十个正方形
[programming problem] [scratch Level 2] draw ten squares in December 2019
Single machine high concurrency model design
Is 35 really a career crisis? No, my skills are accumulating, and the more I eat, the better
51与蓝牙模块通讯,51驱动蓝牙APP点灯
Operating system principle --- summary of interview knowledge points
SQL knowledge summary 004: Postgres terminal command summary
【编程题】【Scratch二级】2019.03 垃圾分类
单机高并发模型设计
52岁的周鸿祎,还年轻吗?
随机推荐
Basic principle and usage of dynamic library, -fpic option context
浪潮云溪分布式数据库 Tracing(二)—— 源码解析
Experience of autumn recruitment in 22 years
Emotional post station 010: things that contemporary college students should understand
How to measure whether the product is "just needed, high frequency, pain points"
35岁真就成了职业危机?不,我的技术在积累,我还越吃越香了
数据库查询——第几高的数据?
ReentrantLock 公平锁源码 第0篇
[C language] objective questions - knowledge points
[programming problem] [scratch Level 2] March 2019 draw a square spiral
Development of a horse tourism website (optimization of servlet)
QT establish signal slots between different classes and transfer parameters
某马旅游网站开发(对servlet的优化)
QT and OpenGL: load 3D models using the open asset import library (assimp)
深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow
Single machine high concurrency model design
手写一个模拟的ReentrantLock
5G NR 系统消息
DNS 系列(一):为什么更新了 DNS 记录不生效?
paddle入门-使用LeNet在MNIST实现图像分类方法一