当前位置:网站首页>Deep dive kotlin collaboration (the end of 23): sharedflow and stateflow
Deep dive kotlin collaboration (the end of 23): sharedflow and stateflow
2022-07-08 00:22:00 【RikkaTheWorld】
Series eBook : Portal
Flow It is a typical cold data flow , So its value is calculated on demand . In some cases, however , We want multiple recipients to subscribe to a data source that will change . That's what we use SharedFlow The place of , It is conceptually similar to a mailing list . We also have StateFlow, It approximates an observable object . Let's understand them one by one .
SharedFlow
Let's start with MutableSharedFlow Start , It's like a broadcast channel : Everyone can send ( launch ) Information , The information will be monitored by everyone ( collect ) Co process reception of .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(replay = 0)
// perhaps MutableSharedFlow<String>()
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
mutableSharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
// (program never ends)
The above program will never end , because coroutineScope Will wait for it to be used inside launch The starting process ends , And these processes have been monitoring MutableSharedFlow, obviously , MutableSharedFlow It cannot be closed , So the only way to solve this problem is to cancel the whole scope .
MutableSharedFlow You can also send messages continuously . If we set replay Parameters ( The default setting is 0), It caches the latest n It's worth , If Xie Cheng starts subscribing now , It will first receive these values . This cache can also be used resetReplayCache Function reset .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>(
replay = 2,
)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
mutableSharedFlow.emit("Message3")
println(mutableSharedFlow.replayCache)
// [Message2, Message3]
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
// #1 received Message2
// #1 received Message3
}
delay(100)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache) // []
}
MutableSharedFlow Similar in concept to RxJava Of Subject. When replay Parameter set to 0 when , It is similar to PuiblishSubject. When replay yes 1 when , It is similar to BehaviorSubject. When replay yes Int.MAX_VALUE when , It is similar to ReplaySubject.
stay Kotlin in , We hope that there are some differences between the interface used for listening and the interface used for modification . for example , We have seen SendCahnnel 、 ReceiveChannel and Channel The difference between . The same applies here . MutableSharedFlow Inherited from SharedFlow and FlowCollector, The former is inherited from Flow , To subscribe to , and FlowCollector Is used for the emission value .
interface MutableSharedFlow<T> :
SharedFlow<T>, FlowCollector<T> {
fun tryEmit(value: T): Boolean
val subscriptionCount: StateFlow<Int>
fun resetReplayCache()
}
interface SharedFlow<out T> : Flow<T> {
val replayCache: List<T>
}
interface FlowCollector<in T> {
suspend fun emit(value: T)
}
These interfaces are usually used only to expose functions 、 Launch or collect functions .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>()
val sharedFlow: SharedFlow<String> = mutableSharedFlow
val collector: FlowCollector<String> = mutableSharedFlow
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
sharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
collector.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
Here are Android Typical usage on :
class UserProfileViewModel {
private val _userChanges =
MutableSharedFlow<UserChange>()
val userChanges: SharedFlow<UserChange> = _userChanges
fun onCreate() {
viewModelScope.launch {
userChanges.collect(::applyUserChange)
}
}
fun onNameChanged(newName: String) {
// ...
_userChanges.emit(NameChange(newName))
}
fun onPublicKeyChanged(newPublicKey: String) {
// ...
_userChanges.emit(PublicKeyChange(newPublicKey))
}
}
shareIn
Flow Usually used to observe change behavior , Such as user operation 、 Database modification or new messages appear . We already know how to deal with these events , For example, I have learned how to integrate multiple flow Merge into one flow. But if multiple subscribers are interested in these changes , Or we want to put one flow Become more than one flow, How to solve it ? The answer is to use SharedFlow, Will a flow convert to SharedFlow, The easiest way is to use sharIn function .
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach {
delay(1000) }
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
// replay = 0 (default)
)
delay(500)
launch {
sharedFlow.collect {
println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#3 $it") }
}
}
// (1 sec)
// #1 A
// (1 sec)
// #1 B
// #2 B
// (1 sec)
// #1 C
// #2 C
// #3 C
shareIn The function creates a SharedFlow, And from its flow Upper emission element . Because we need to start a collaborative process to collect these flow On the element , therefore shareIn The first parameter of is the scope of the coroutine . The third parameter is replay, The default value is 0. The second parameter is interesting : started Decide that the data on the stream is suitable to be sent . The following options are supported :
SharingStated.Eagerly—— Send data now . Be careful , If you have a limitedrelayvalue , You will lose some values sent before you subscribe ( If yourreplaySet to 0, You will lose all those values ).
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
)
delay(100)
launch {
sharedFlow.collect {
println("#1 $it") }
}
print("Done")
}
// (0.1 sec)
// Done
SharingStarted.Lazily—— Data is sent only when the first subscriber observes . This ensures that the first subscriber can get all the transmitted values , And subsequent subscribers only guarantee to get the latestreplayThe value of quantity . Even if all subscribers disappear , Upstream is still active , Only the latestreplayThe amount of data will be cached without subscribers
suspend fun main(): Unit = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D")
.onEach {
delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
)
delay(100)
launch {
sharedFlow.collect {
println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#2 $it") }
}
}
// (0.1 sec)
// #1 A
// #1 B
// #1 C
// (1 sec)
// #2 D
// #1 D
WhileSubscribed()- When the first subscriber appears ,flow Launch data ; When the last subscriber disappears , The flow Will stop . If ourSharedFlowA new subscriber appears when it stops , It will start again . Adding new subscribers has additional optional configuration parameters :stopTimeoutMulls( How long will it stay after the last subscriber disappears , The default is 0) andreplayExpirationMills( Save after stopreplayHow long is the number of caches , The default isLong.MAX_VALUE)
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart {
println("Started") }
.onCompletion {
println("Finished") }
.onEach {
delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch {
println("#1 ${
sharedFlow.first()}")
}
launch {
println("#2 ${
sharedFlow.take(2).toList()}")
}
delay(3000)
launch {
println("#3 ${
sharedFlow.first()}")
}
}
// (3 sec)
// Started
// (1 sec)
// #1 A
// (1 sec)
// #2 [A, B]
// Finished
// (1 sec)
// Started
// (1 sec)
// #3 A
// Finished
- It can also be achieved by
ShareingStatedInterface to define a policy
When multiple services are interested in the same change , Use sharedIn Very convenient . Suppose you need to observe how the location information in the database changes over time , The following is DTO( Data transmission object ) stay Android Of Room Implemented on the :
@Dao
interface LocationDao {
@Insert(onConflict = OnConflictStrategy.IGNORE)
suspend fun insertLocation(location: Location)
@Query("DELETE FROM location_table")
suspend fun deleteLocations()
@Query("SELECT * FROM location_table ORDER BY time")
fun observeLocations(): Flow<List<Location>>
}
The problem is , If multiple services need to rely on these location information , It is not optimal for each service to observe the database alone . contrary , We can create a service to listen for these changes , And share them to SharedFlow in . That's what we use shareIn The place of . But how should we configure them ? You need to make your own decisions . Do you want your subscribers to receive the latest location list immediately when subscribing ? If it is , Is set replay by 1. If you just want to react to changes , Is set to 0 that will do .started What should be set ? WhileSubscribed It looks suitable for this scene .
class LocationService(
private val locationDao: LocationDao,
private val scope: CoroutineScope
) {
private val locations = locationDao.observeLocations()
.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(),
)
fun observeLocations(): Flow<List<Location>> = locations
}
look out ! Let's not create a new one for every call SharedFlow. You just need to create one , And store it as an attribute .
StateFlow
StateFlow yes SharedFlow A derived concept of . Its working principle and setting replay by 1 Of SharedFlow similar . It always stores a value , have access to value Property to access the value .
interface StateFlow<out T> : SharedFlow<T> {
val value: T
}
interface MutableStateFlow<T> :
StateFlow<T>, MutableSharedFlow<T> {
override var value: T
fun compareAndSet(expect: T, update: T): Boolean
}
The initial value needs to be passed to the constructor . We all use value Property to access and set the value . As you can see , MutableStateFlow Like an observable container of data .
suspend fun main() = coroutineScope {
val state = MutableStateFlow("A")
println(state.value) // A
launch {
state.collect {
println("Value changed to $it") }
// Value changed to A
}
delay(1000)
state.value = "B" // Value changed to B
delay(1000)
launch {
state.collect {
println("and now it is $it") }
// and now it is B
}
delay(1000)
state.value = "C" // Value changed to C and now it is C
}
stay Android On , StateFlow be used as LiveData Ready made alternatives . First , It fully supports synergy , secondly , It has an initial value , So it doesn't need to be empty . therefore , StateFlow Often used to express ViewModel The state of . This state is observed , And on this basis, display and update a view .
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
private val _uiState =
MutableStateFlow<NewsState>(LoadingNews)
val uiState: StateFlow<NewsState> = _uiState
fun onCreate() {
scope.launch {
_uiState.value =
NewsLoaded(newsRepository.getNews())
}
}
}
stateIn
stateIn It is a will. Flow<T> Convert to StateFlow<T> Function of . It can only be called on the scope , But it is a suspended function . please remember , StateFlow Always need a value , therefore , If you don't specify it , Then you need to wait until the first value is calculated .
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach {
delay(1000) }
.onEach {
println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(this)
println("Listening")
println(stateFlow.value)
stateFlow.collect {
println("Received $it") }
}
// (1 sec)
// Produced A
// Listening
// A
// Received A
// (1 sec)
// Produced B
// Received B
// (1 sec)
// Produced C
// Received C
stateIn The second variant of is not suspended , But it needs initial value and startup mode . This mode has the same characteristics as shareIn Same options ( As mentioned above ).
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B")
.onEach {
delay(1000) }
.onEach {
println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(
scope = this,
started = SharingStarted.Lazily,
initialValue = "Empty"
)
println(stateFlow.value)
delay(2000)
stateFlow.collect {
println("Received $it") }
}
// Empty
// (2 sec)
// Received Empty
// (1 sec)
// Produced A
// Received A
// (1 sec)
// Produced B
// Received B
When we need to subscribe to data streams from a single change source , We usually use them stateIn. In the process , You can handle these changes .
class LocationsViewModel(
private val locationService: LocationService
) : ViewModel() {
private val location = locationService.observeLocations()
.map {
it.toLocationsDisplay() }
.stateIn(
scope = viewModelScope,
started = SharingStarted.Lazily,
initialValue = emptyList(),
)
// ...
}
summary
In this chapter , We learned SharedFlow and StateFlow, These two things are for Android It is especially important for developers , Because they are often used as MVVM Part of the pattern . Remember them and consider using them , Especially if you are Android Used in development view model When .
边栏推荐
- QT adds resource files, adds icons for qaction, establishes signal slot functions, and implements
- “一个优秀程序员可抵五个普通程序员”,差距就在这7个关键点
- Prompt configure: error: required tool not found: libtool solution when configuring and installing crosstool ng tool
- 大数据开源项目,一站式全自动化全生命周期运维管家ChengYing(承影)走向何方?
- Using Google test in QT
- Single machine high concurrency model design
- 【编程题】【Scratch二级】2019.09 绘制雪花图案
- CoinDesk评波场去中心化进程:让人们看到互联网的未来
- 52歲的周鴻禕,還年輕嗎?
- 80%的人答错,苹果logo上的叶子到底朝左还是朝右?
猜你喜欢

接口测试要测试什么?
![[programming problem] [scratch Level 2] 2019.09 make bat Challenge Game](/img/81/c84432a7d7c2fe8ef377d8c13991d6.png)
[programming problem] [scratch Level 2] 2019.09 make bat Challenge Game

An error is reported during the process of setting up ADG. Rman-03009 ora-03113

【编程题】【Scratch二级】2019.12 绘制十个正方形

How does the markdown editor of CSDN input mathematical formulas--- Latex syntax summary

Stm32f1 and stm32cubeide programming example - rotary encoder drive

Coindesk comments on the decentralization process of the wave field: let people see the future of the Internet

【編程題】【Scratch二級】2019.12 飛翔的小鳥

3年经验,面试测试岗20K都拿不到了吗?这么坑?

腾讯安全发布《BOT管理白皮书》|解读BOT攻击,探索防护之道
随机推荐
Operating system principle --- summary of interview knowledge points
Is Zhou Hongyi, 52, still young?
[the most detailed in history] statistical description of overdue days in credit
Cause analysis and solution of too laggy page of [test interview questions]
Teach you to make a custom form label by hand
关于组织2021-2022全国青少年电子信息智能创新大赛西南赛区(四川)复赛的通知
深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow
Experience of autumn recruitment in 22 years
搭建ADG过程中复制报错 RMAN-03009 ORA-03113
Reptile practice (VIII): reptile expression pack
【编程题】【Scratch二级】2019.03 绘制方形螺旋
韦东山第二期课程内容概要
The difference between -s and -d when downloading packages using NPM
"An excellent programmer is worth five ordinary programmers", and the gap lies in these seven key points
[basis of recommendation system] sampling and construction of positive and negative samples
【史上最详细】信贷中逾期天数统计说明
Play sonar
腾讯安全发布《BOT管理白皮书》|解读BOT攻击,探索防护之道
ReentrantLock 公平锁源码 第0篇
How to learn a new technology (programming language)