Kotlin Coroutines Deep Dive (Part 23, Final): SharedFlow and StateFlow

2022-07-08 00:22:00 RikkaTheWorld

Series e-book: Portal


Flow is typically cold, so its values are calculated on demand. However, there are cases in which we want multiple receivers to subscribe to a single source of changes. This is where we use SharedFlow, which is conceptually similar to a mailing list. We also have StateFlow, which is similar to an observable value. Let's go through them one by one.

SharedFlow

Let's start with MutableSharedFlow, which works like a broadcast channel: anyone can send (emit) messages, and they are received by every coroutine that is listening (collecting).

// The later snippets in this article assume the same two imports.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow =
        MutableSharedFlow<String>(replay = 0)
    // or MutableSharedFlow<String>()

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }
    launch {
        mutableSharedFlow.collect {
            println("#2 received $it")
        }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
}

// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
// (program never ends)

The above program never ends because coroutineScope waits for the coroutines started with launch inside it, and those coroutines keep listening to the MutableSharedFlow. A MutableSharedFlow cannot be closed, so the only way to fix this is to cancel the whole scope.
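To illustrate the point about cancellation, here is a minimal sketch in which the collecting coroutine is cancelled explicitly, so that coroutineScope can finish. The function name collectUntilCancelled and the 100 ms delay are assumptions made for this example only. It cancels the single collector job rather than the whole scope, which has the same effect here because the collector is the only child.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects emitted messages until the collecting coroutine is cancelled.
suspend fun collectUntilCancelled(): List<String> = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>()
    val received = mutableListOf<String>()

    val collector = launch {
        mutableSharedFlow.collect { received += it }
    }

    delay(100) // give the collector time to subscribe
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")

    // Without this call, coroutineScope would wait for the collector forever.
    collector.cancelAndJoin()
    received.toList()
}

fun main() = runBlocking {
    println(collectUntilCancelled())
}
```

With several collectors, each job would need to be cancelled, or the surrounding scope cancelled as a whole.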

MutableSharedFlow can also replay messages. If we set the replay parameter (0 by default), it keeps the last n values in a cache, and a coroutine that starts observing now receives those values first. The cache can be cleared with the resetReplayCache function.

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(
        replay = 2,
    )
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    mutableSharedFlow.emit("Message3")

    println(mutableSharedFlow.replayCache)
    // [Message2, Message3]

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
        // #1 received Message2
        // #1 received Message3
    }

    delay(100)
    mutableSharedFlow.resetReplayCache()
    println(mutableSharedFlow.replayCache) // []
}

MutableSharedFlow is conceptually similar to RxJava Subjects. When the replay parameter is 0, it is similar to PublishSubject. When replay is 1, it is similar to BehaviorSubject. When replay is Int.MAX_VALUE, it is similar to ReplaySubject.

In Kotlin, we like to keep the interface used for listening separate from the interface used for modification. For example, we have already seen the distinction between SendChannel, ReceiveChannel, and Channel. The same applies here. MutableSharedFlow inherits from both SharedFlow and FlowCollector. The former inherits from Flow and is used for observing, while FlowCollector is used for emitting values.

interface MutableSharedFlow<T> :
    SharedFlow<T>, FlowCollector<T> {

    fun tryEmit(value: T): Boolean
    val subscriptionCount: StateFlow<Int>
    fun resetReplayCache()
}

interface SharedFlow<out T> : Flow<T> {
    val replayCache: List<T>
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

These interfaces are often used to expose only the ability to emit or only the ability to collect.

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>()
    val sharedFlow: SharedFlow<String> = mutableSharedFlow
    val collector: FlowCollector<String> = mutableSharedFlow

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("#2 received $it")
        }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    collector.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2

Here is a typical usage on Android:

class UserProfileViewModel : ViewModel() {
    private val _userChanges =
        MutableSharedFlow<UserChange>()
    val userChanges: SharedFlow<UserChange> = _userChanges

    fun onCreate() {
        viewModelScope.launch {
            userChanges.collect { applyUserChange(it) }
        }
    }

    fun onNameChanged(newName: String) {
        // ...
        // emit is a suspending function, so it must be called from a coroutine
        viewModelScope.launch {
            _userChanges.emit(NameChange(newName))
        }
    }

    fun onPublicKeyChanged(newPublicKey: String) {
        // ...
        viewModelScope.launch {
            _userChanges.emit(PublicKeyChange(newPublicKey))
        }
    }
}

shareIn

Flow is often used to observe changes, such as user actions, database modifications, or new messages. We already know how to process such events, for example how to merge multiple flows into one. But what if multiple subscribers are interested in these changes, or we want to turn one flow into many? The answer is SharedFlow, and the easiest way to turn a Flow into a SharedFlow is the shareIn function.

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }

    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        // replay = 0 (default)
    )

    delay(500)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }

    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it") }
    }

    delay(1000)
    launch {
        sharedFlow.collect { println("#3 $it") }
    }
}
// (1 sec)
// #1 A
// (1 sec)
// #1 B
// #2 B
// (1 sec)
// #1 C
// #2 C
// #3 C

The shareIn function creates a SharedFlow and sends it the elements from the upstream Flow. Since a coroutine has to be started to collect those elements, shareIn takes a coroutine scope as its first argument. The third argument is replay, which is 0 by default. The second argument is the interesting one: started determines when listening to the upstream flow should begin. The following options are supported:

  • SharingStarted.Eagerly: starts listening immediately and sends values to the SharedFlow. Note that if you have a limited replay value, you may lose some values emitted before you subscribe (if replay is 0, you lose all such values).
suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
    )

    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }
    print("Done")
}
// (0.1 sec)
// Done
  • SharingStarted.Lazily: starts listening when the first subscriber appears. This guarantees that the first subscriber gets all the emitted values, while later subscribers are only guaranteed to get the most recent replay values. The upstream flow stays active even after all subscribers disappear, but only the most recent replay values are cached while there are no subscribers.
suspend fun main(): Unit = coroutineScope {
    val flow1 = flowOf("A", "B", "C")
    val flow2 = flowOf("D")
        .onEach { delay(1000) }

    val sharedFlow = merge(flow1, flow2).shareIn(
        scope = this,
        started = SharingStarted.Lazily,
    )

    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }

    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it") }
    }
}
// (0.1 sec)
// #1 A
// #1 B
// #1 C
// (1 sec)
// #2 D
// #1 D
  • WhileSubscribed(): starts listening to the upstream flow when the first subscriber appears and stops when the last subscriber disappears. If a new subscriber appears after our SharedFlow has stopped, it starts again. WhileSubscribed has additional optional configuration parameters: stopTimeoutMillis (how long to keep listening after the last subscriber disappears, 0 by default) and replayExpirationMillis (how long to keep the replay cache after stopping, Long.MAX_VALUE by default).
suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C", "D")
        .onStart { println("Started") }
        .onCompletion { println("Finished") }
        .onEach { delay(1000) }

    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.WhileSubscribed(),
    )

    delay(3000)
    launch {
        println("#1 ${sharedFlow.first()}")
    }
    launch {
        println("#2 ${sharedFlow.take(2).toList()}")
    }

    delay(3000)
    launch {
        println("#3 ${sharedFlow.first()}")
    }
}
// (3 sec)
// Started
// (1 sec)
// #1 A
// (1 sec)
// #2 [A, B]
// Finished
// (1 sec)
// Started
// (1 sec)
// #3 A
// Finished
  • A custom strategy can also be defined by implementing the SharingStarted interface.
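As a rough illustration of the last option, the sketch below implements SharingStarted by hand. The class StartAfterSubscribers and the function collectWithTwoSubscribers are hypothetical names invented for this example and are not part of kotlinx.coroutines. The strategy issues SharingCommand.START once the subscriber count reaches a threshold and never stops afterwards.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical strategy: start the upstream once at least `threshold`
// subscribers are present at the same time, and never stop it afterwards.
class StartAfterSubscribers(private val threshold: Int) : SharingStarted {
    override fun command(
        subscriptionCount: StateFlow<Int>
    ): Flow<SharingCommand> = subscriptionCount
        .filter { it >= threshold }
        .take(1)
        .map { SharingCommand.START }
}

suspend fun collectWithTwoSubscribers(): List<List<String>> = coroutineScope {
    val sharedFlow = flowOf("A", "B", "C").shareIn(
        scope = this,
        started = StartAfterSubscribers(threshold = 2),
    )
    val first = async { sharedFlow.take(3).toList() }
    delay(100) // only one subscriber so far, so the upstream has not started
    val second = async { sharedFlow.take(3).toList() }
    listOf(first.await(), second.await())
}

fun main() = runBlocking {
    println(collectWithTwoSubscribers())
}
```

Because the upstream only starts when the second subscriber arrives, neither subscriber misses a value even though replay is 0.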

shareIn is very convenient when multiple services are interested in the same changes. Suppose you need to observe how the locations stored in a database change over time. Here is how a DAO (Data Access Object) could be implemented on Android with Room:

@Dao
interface LocationDao {
    @Insert(onConflict = OnConflictStrategy.IGNORE)
    suspend fun insertLocation(location: Location)

    @Query("DELETE FROM location_table")
    suspend fun deleteLocations()

    @Query("SELECT * FROM location_table ORDER BY time")
    fun observeLocations(): Flow<List<Location>>
}

The problem is that if multiple services depend on these locations, it is not optimal for each of them to observe the database separately. Instead, we can create one service that listens to these changes and shares them through a SharedFlow. This is where shareIn comes in. But how should we configure it? That is your decision. Do you want subscribers to receive the latest list of locations immediately when they subscribe? If so, set replay to 1. If you only want to react to changes, set it to 0. What about started? WhileSubscribed looks like a good fit for this use case.

class LocationService(
    private val locationDao: LocationDao,
    private val scope: CoroutineScope
) {
    private val locations = locationDao.observeLocations()
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(),
        )

    fun observeLocations(): Flow<List<Location>> = locations
}
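If subscribers should receive the latest list as soon as they subscribe, the configuration could look like the sketch below. Everything here is an assumption made for illustration: LocationSource is a plain stand-in for the Room DAO, ReplayingLocationService and latestForLateSubscriber are invented names, and the 5-second stopTimeoutMillis is an arbitrary value rather than a recommendation from the article.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Location(val latitude: Double, val longitude: Double, val time: Long)

// Hypothetical stand-in for the Room DAO; only the observing function matters.
fun interface LocationSource {
    fun observeLocations(): Flow<List<Location>>
}

class ReplayingLocationService(
    source: LocationSource,
    scope: CoroutineScope,
) {
    private val locations = source.observeLocations()
        .shareIn(
            scope = scope,
            // Keep the upstream alive for 5 s after the last subscriber leaves
            // (an assumed value, tune it for your own case).
            started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
            replay = 1, // new subscribers immediately get the latest list
        )

    fun observeLocations(): Flow<List<Location>> = locations
}

suspend fun latestForLateSubscriber(): List<Location> {
    val first = Location(52.23, 21.01, time = 1)
    val second = Location(52.24, 21.02, time = 2)
    val source = LocationSource {
        flow<List<Location>> {
            emit(listOf(first))
            delay(50)
            emit(listOf(first, second))
            awaitCancellation() // like a database query, this flow never completes
        }
    }
    val sharingScope = CoroutineScope(Job())
    try {
        val service = ReplayingLocationService(source, sharingScope)
        // The first subscriber starts the upstream and sees both lists.
        service.observeLocations().take(2).collect()
        // A late subscriber gets the most recent list from the replay cache.
        return service.observeLocations().first()
    } finally {
        sharingScope.cancel()
    }
}

fun main() = runBlocking {
    println(latestForLateSubscriber().size)
}
```

With replay = 0, the late subscriber in this sketch would wait for the next change instead of getting the current list.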

Beware! Do not create a new SharedFlow on every call. Create one and store it in a property.

StateFlow

StateFlow is an extension of the SharedFlow concept. It works similarly to a SharedFlow with replay set to 1. It always stores exactly one value, which can be accessed through the value property.

interface StateFlow<out T> : SharedFlow<T> {
    val value: T
}

interface MutableStateFlow<T> :
    StateFlow<T>, MutableSharedFlow<T> {

    override var value: T
    fun compareAndSet(expect: T, update: T): Boolean
}

The initial value must be passed to the constructor. We both read and set the value through the value property. As you can see, MutableStateFlow is like an observable holder for a value.

suspend fun main() = coroutineScope {
    val state = MutableStateFlow("A")
    println(state.value) // A
    launch {
        state.collect { println("Value changed to $it") }
        // Value changed to A
    }

    delay(1000)
    state.value = "B" // Value changed to B

    delay(1000)
    launch {
        state.collect { println("and now it is $it") }
        // and now it is B
    }

    delay(1000)
    state.value = "C" // Value changed to C and now it is C
}
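The compareAndSet function from the MutableStateFlow interface above can be used for updates that depend on the current value. The following is a minimal sketch; the helper incrementAtomically is a name invented for this example.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

// Retries until the value is swapped without another writer interfering.
fun incrementAtomically(counter: MutableStateFlow<Int>) {
    while (true) {
        val current = counter.value
        if (counter.compareAndSet(current, current + 1)) return
    }
}

fun main() {
    val counter = MutableStateFlow(0)
    repeat(3) { incrementAtomically(counter) }
    println(counter.value) // 3

    // The expected value does not match, so nothing changes.
    println(counter.compareAndSet(expect = 10, update = 11)) // false
    println(counter.value) // 3
}
```

A plain `counter.value = counter.value + 1` could lose updates when several coroutines write concurrently, which is what the retry loop avoids.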

On Android, StateFlow is used as a modern alternative to LiveData. First, it has full support for coroutines. Second, it has an initial value, so it does not need to be nullable. For these reasons, StateFlow is often used in a ViewModel to represent its state. This state is observed, and a view is displayed and updated on its basis.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {
    private val _uiState =
        MutableStateFlow<NewsState>(LoadingNews)
    val uiState: StateFlow<NewsState> = _uiState

    fun onCreate() {
        // viewModelScope replaces the undefined `scope` of the original snippet
        viewModelScope.launch {
            _uiState.value =
                NewsLoaded(newsRepository.getNews())
        }
    }
}

stateIn

stateIn is a function that transforms a Flow<T> into a StateFlow<T>. It can be called with only a scope, but then it is a suspending function. Remember that StateFlow always needs a value, so if you do not specify one, you have to wait until the first value is calculated.

suspend fun main() = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }
    val stateFlow: StateFlow<String> = flow.stateIn(this)

    println("Listening")
    println(stateFlow.value)
    stateFlow.collect { println("Received $it") }
}
// (1 sec)
// Produced A
// Listening
// A
// Received A
// (1 sec)
// Produced B
// Received B
// (1 sec)
// Produced C
// Received C

The second variant of stateIn is not suspending, but it requires an initial value and a started mode. This mode has the same options as in shareIn (described above).

suspend fun main() = coroutineScope {
    val flow = flowOf("A", "B")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }

    val stateFlow: StateFlow<String> = flow.stateIn(
        scope = this,
        started = SharingStarted.Lazily,
        initialValue = "Empty"
    )

    println(stateFlow.value)

    delay(2000)
    stateFlow.collect { println("Received $it") }
}
// Empty
// (2 sec)
// Received Empty
// (1 sec)
// Produced A
// Received A
// (1 sec)
// Produced B
// Received B

We typically use stateIn when we need to observe a value that comes from a single source of changes. Along the way, these changes can be processed.

class LocationsViewModel(
    private val locationService: LocationService
) : ViewModel() {
    private val location = locationService.observeLocations()
        .map { it.toLocationsDisplay() }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.Lazily,
            initialValue = emptyList(),
        )

    // ...
}

Summary

In this chapter, we learned about SharedFlow and StateFlow. Both are especially important for Android developers because they are commonly used as part of the MVVM pattern. Remember them and consider using them, especially if you use view models in Android development.

Original site

Copyright notice
This article was written by [RikkaTheWorld]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/189/202207072217104087.html