当前位置:网站首页>Deep dive kotlin collaboration (the end of 23): sharedflow and stateflow
Deep dive kotlin collaboration (the end of 23): sharedflow and stateflow
2022-07-08 00:22:00 【RikkaTheWorld】
Series eBook : Portal
Flow It is a typical cold data flow , So its value is calculated on demand . In some cases, however , We want multiple recipients to subscribe to a data source that will change . That's what we use SharedFlow The place of , It is conceptually similar to a mailing list . We also have StateFlow, It approximates an observable object . Let's understand them one by one .
SharedFlow
Let's start with MutableSharedFlow Start , It's like a broadcast channel : Everyone can send ( launch ) Information , The information will be monitored by everyone ( collect ) Co process reception of .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(replay = 0)
// perhaps MutableSharedFlow<String>()
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
mutableSharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
// (program never ends)
The above program will never end , because coroutineScope Will wait for it to be used inside launch The starting process ends , And these processes have been monitoring MutableSharedFlow, obviously , MutableSharedFlow It cannot be closed , So the only way to solve this problem is to cancel the whole scope .
MutableSharedFlow You can also send messages continuously . If we set replay Parameters ( The default setting is 0), It caches the latest n It's worth , If Xie Cheng starts subscribing now , It will first receive these values . This cache can also be used resetReplayCache Function reset .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>(
replay = 2,
)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
mutableSharedFlow.emit("Message3")
println(mutableSharedFlow.replayCache)
// [Message2, Message3]
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
// #1 received Message2
// #1 received Message3
}
delay(100)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache) // []
}
MutableSharedFlow Similar in concept to RxJava Of Subject. When replay Parameter set to 0 when , It is similar to PuiblishSubject. When replay yes 1 when , It is similar to BehaviorSubject. When replay yes Int.MAX_VALUE when , It is similar to ReplaySubject.
stay Kotlin in , We hope that there are some differences between the interface used for listening and the interface used for modification . for example , We have seen SendCahnnel 、 ReceiveChannel and Channel The difference between . The same applies here . MutableSharedFlow Inherited from SharedFlow and FlowCollector, The former is inherited from Flow , To subscribe to , and FlowCollector Is used for the emission value .
interface MutableSharedFlow<T> :
SharedFlow<T>, FlowCollector<T> {
fun tryEmit(value: T): Boolean
val subscriptionCount: StateFlow<Int>
fun resetReplayCache()
}
interface SharedFlow<out T> : Flow<T> {
val replayCache: List<T>
}
interface FlowCollector<in T> {
suspend fun emit(value: T)
}
These interfaces are usually used only to expose functions 、 Launch or collect functions .
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>()
val sharedFlow: SharedFlow<String> = mutableSharedFlow
val collector: FlowCollector<String> = mutableSharedFlow
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
sharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
collector.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
Here are Android Typical usage on :
class UserProfileViewModel {
private val _userChanges =
MutableSharedFlow<UserChange>()
val userChanges: SharedFlow<UserChange> = _userChanges
fun onCreate() {
viewModelScope.launch {
userChanges.collect(::applyUserChange)
}
}
fun onNameChanged(newName: String) {
// ...
_userChanges.emit(NameChange(newName))
}
fun onPublicKeyChanged(newPublicKey: String) {
// ...
_userChanges.emit(PublicKeyChange(newPublicKey))
}
}
shareIn
Flow Usually used to observe change behavior , Such as user operation 、 Database modification or new messages appear . We already know how to deal with these events , For example, I have learned how to integrate multiple flow Merge into one flow. But if multiple subscribers are interested in these changes , Or we want to put one flow Become more than one flow, How to solve it ? The answer is to use SharedFlow, Will a flow convert to SharedFlow, The easiest way is to use sharIn function .
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach {
delay(1000) }
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
// replay = 0 (default)
)
delay(500)
launch {
sharedFlow.collect {
println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#3 $it") }
}
}
// (1 sec)
// #1 A
// (1 sec)
// #1 B
// #2 B
// (1 sec)
// #1 C
// #2 C
// #3 C
shareIn The function creates a SharedFlow, And from its flow Upper emission element . Because we need to start a collaborative process to collect these flow On the element , therefore shareIn The first parameter of is the scope of the coroutine . The third parameter is replay, The default value is 0. The second parameter is interesting : started Decide that the data on the stream is suitable to be sent . The following options are supported :
SharingStated.Eagerly—— Send data now . Be careful , If you have a limitedrelayvalue , You will lose some values sent before you subscribe ( If yourreplaySet to 0, You will lose all those values ).
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
)
delay(100)
launch {
sharedFlow.collect {
println("#1 $it") }
}
print("Done")
}
// (0.1 sec)
// Done
SharingStarted.Lazily—— Data is sent only when the first subscriber observes . This ensures that the first subscriber can get all the transmitted values , And subsequent subscribers only guarantee to get the latestreplayThe value of quantity . Even if all subscribers disappear , Upstream is still active , Only the latestreplayThe amount of data will be cached without subscribers
suspend fun main(): Unit = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D")
.onEach {
delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
)
delay(100)
launch {
sharedFlow.collect {
println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect {
println("#2 $it") }
}
}
// (0.1 sec)
// #1 A
// #1 B
// #1 C
// (1 sec)
// #2 D
// #1 D
WhileSubscribed()- When the first subscriber appears ,flow Launch data ; When the last subscriber disappears , The flow Will stop . If ourSharedFlowA new subscriber appears when it stops , It will start again . Adding new subscribers has additional optional configuration parameters :stopTimeoutMulls( How long will it stay after the last subscriber disappears , The default is 0) andreplayExpirationMills( Save after stopreplayHow long is the number of caches , The default isLong.MAX_VALUE)
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart {
println("Started") }
.onCompletion {
println("Finished") }
.onEach {
delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch {
println("#1 ${
sharedFlow.first()}")
}
launch {
println("#2 ${
sharedFlow.take(2).toList()}")
}
delay(3000)
launch {
println("#3 ${
sharedFlow.first()}")
}
}
// (3 sec)
// Started
// (1 sec)
// #1 A
// (1 sec)
// #2 [A, B]
// Finished
// (1 sec)
// Started
// (1 sec)
// #3 A
// Finished
- It can also be achieved by
ShareingStatedInterface to define a policy
When multiple services are interested in the same change , Use sharedIn Very convenient . Suppose you need to observe how the location information in the database changes over time , The following is DTO( Data transmission object ) stay Android Of Room Implemented on the :
@Dao
interface LocationDao {
@Insert(onConflict = OnConflictStrategy.IGNORE)
suspend fun insertLocation(location: Location)
@Query("DELETE FROM location_table")
suspend fun deleteLocations()
@Query("SELECT * FROM location_table ORDER BY time")
fun observeLocations(): Flow<List<Location>>
}
The problem is , If multiple services need to rely on these location information , It is not optimal for each service to observe the database alone . contrary , We can create a service to listen for these changes , And share them to SharedFlow in . That's what we use shareIn The place of . But how should we configure them ? You need to make your own decisions . Do you want your subscribers to receive the latest location list immediately when subscribing ? If it is , Is set replay by 1. If you just want to react to changes , Is set to 0 that will do .started What should be set ? WhileSubscribed It looks suitable for this scene .
class LocationService(
private val locationDao: LocationDao,
private val scope: CoroutineScope
) {
private val locations = locationDao.observeLocations()
.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(),
)
fun observeLocations(): Flow<List<Location>> = locations
}
look out ! Let's not create a new one for every call SharedFlow. You just need to create one , And store it as an attribute .
StateFlow
StateFlow yes SharedFlow A derived concept of . Its working principle and setting replay by 1 Of SharedFlow similar . It always stores a value , have access to value Property to access the value .
interface StateFlow<out T> : SharedFlow<T> {
val value: T
}
interface MutableStateFlow<T> :
StateFlow<T>, MutableSharedFlow<T> {
override var value: T
fun compareAndSet(expect: T, update: T): Boolean
}
The initial value needs to be passed to the constructor . We all use value Property to access and set the value . As you can see , MutableStateFlow Like an observable container of data .
suspend fun main() = coroutineScope {
val state = MutableStateFlow("A")
println(state.value) // A
launch {
state.collect {
println("Value changed to $it") }
// Value changed to A
}
delay(1000)
state.value = "B" // Value changed to B
delay(1000)
launch {
state.collect {
println("and now it is $it") }
// and now it is B
}
delay(1000)
state.value = "C" // Value changed to C and now it is C
}
stay Android On , StateFlow be used as LiveData Ready made alternatives . First , It fully supports synergy , secondly , It has an initial value , So it doesn't need to be empty . therefore , StateFlow Often used to express ViewModel The state of . This state is observed , And on this basis, display and update a view .
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
private val _uiState =
MutableStateFlow<NewsState>(LoadingNews)
val uiState: StateFlow<NewsState> = _uiState
fun onCreate() {
scope.launch {
_uiState.value =
NewsLoaded(newsRepository.getNews())
}
}
}
stateIn
stateIn It is a will. Flow<T> Convert to StateFlow<T> Function of . It can only be called on the scope , But it is a suspended function . please remember , StateFlow Always need a value , therefore , If you don't specify it , Then you need to wait until the first value is calculated .
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach {
delay(1000) }
.onEach {
println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(this)
println("Listening")
println(stateFlow.value)
stateFlow.collect {
println("Received $it") }
}
// (1 sec)
// Produced A
// Listening
// A
// Received A
// (1 sec)
// Produced B
// Received B
// (1 sec)
// Produced C
// Received C
stateIn The second variant of is not suspended , But it needs initial value and startup mode . This mode has the same characteristics as shareIn Same options ( As mentioned above ).
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B")
.onEach {
delay(1000) }
.onEach {
println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(
scope = this,
started = SharingStarted.Lazily,
initialValue = "Empty"
)
println(stateFlow.value)
delay(2000)
stateFlow.collect {
println("Received $it") }
}
// Empty
// (2 sec)
// Received Empty
// (1 sec)
// Produced A
// Received A
// (1 sec)
// Produced B
// Received B
When we need to subscribe to data streams from a single change source , We usually use them stateIn. In the process , You can handle these changes .
class LocationsViewModel(
private val locationService: LocationService
) : ViewModel() {
private val location = locationService.observeLocations()
.map {
it.toLocationsDisplay() }
.stateIn(
scope = viewModelScope,
started = SharingStarted.Lazily,
initialValue = emptyList(),
)
// ...
}
summary
In this chapter , We learned SharedFlow and StateFlow, These two things are for Android It is especially important for developers , Because they are often used as MVVM Part of the pattern . Remember them and consider using them , Especially if you are Android Used in development view model When .
边栏推荐
- 快速上手使用本地测试工具postman
- Reptile practice (VIII): reptile expression pack
- 搭建ADG过程中复制报错 RMAN-03009 ORA-03113
- Trust orbtk development issues 2022
- Robomaster visual tutorial (10) target prediction
- 测试流程不完善,又遇到不积极的开发怎么办?
- SQL knowledge summary 004: Postgres terminal command summary
- 去了字节跳动,才知道年薪 40w 的测试工程师有这么多?
- [programming problem] [scratch Level 2] December 2019 flying birds
- Open display PDF file in web page
猜你喜欢

Coindesk comments on the decentralization process of the wave field: let people see the future of the Internet

Is 35 really a career crisis? No, my skills are accumulating, and the more I eat, the better

QT adds resource files, adds icons for qaction, establishes signal slot functions, and implements

Reptile practice (VIII): reptile expression pack

ROS from entry to mastery (IX) initial experience of visual simulation: turtlebot3

QT and OpenGL: load 3D models using the open asset import library (assimp)

大数据开源项目,一站式全自动化全生命周期运维管家ChengYing(承影)走向何方?

Development of a horse tourism website (optimization of servlet)
![[programming problem] [scratch Level 2] December 2019 flying birds](/img/5e/a105f8615f3991635c9ffd3a8e5836.png)
[programming problem] [scratch Level 2] December 2019 flying birds

Operating system principle --- summary of interview knowledge points
随机推荐
【编程题】【Scratch二级】2019.12 绘制十个正方形
How to put recyclerview in nestedscrollview- How to put RecyclerView inside NestedScrollView?
Prompt configure: error: required tool not found: libtool solution when configuring and installing crosstool ng tool
每日刷题记录 (十六)
[programming questions] [scratch Level 2] March 2019 garbage classification
商品的设计等整个生命周期,都可以将其纳入到产业互联网的范畴内
What if the testing process is not perfect and the development is not active?
[programming problem] [scratch Level 2] December 2019 flying birds
【史上最详细】信贷中逾期天数统计说明
赞!idea 如何单窗口打开多个项目?
Zhou Hongqi, 52 ans, est - il encore jeune?
“一个优秀程序员可抵五个普通程序员”,差距就在这7个关键点
Tencent security released the white paper on BOT Management | interpreting BOT attacks and exploring ways to protect
华泰证券官方网站开户安全吗?
第一讲:链表中环的入口结点
应用实践 | 数仓体系效率全面提升!同程数科基于 Apache Doris 的数据仓库建设
QT and OpenGL: loading 3D models using the open asset import library (assimp) - Part 2
When creating body middleware, express Is there any difference between setting extended to true and false in urlencoded?
Stm32f1 and stm32cubeide programming example - rotary encoder drive
Qt添加资源文件,为QAction添加图标,建立信号槽函数并实现