Adetunji Dahunsi

Interactive tutorial: State production with unidirectional data flow and Kotlin Flows

An interactive overview of the state production pipeline with Flows

TJ Dahunsi

Feb 07 2023 · 15 min read

State is what is. A declaration of things known at a certain point in time. As time passes, however, state changes as the data sources backing it are updated and events happen. In mobile apps this presents a challenge: defining a convenient and concise means to produce state over time.

This page is a Jetpack Compose for web powered interactive experiment that highlights various ways of producing state with a Flow. At the end of it, you should have a mental framework to help choose a state production pipeline that is most beneficial to your use cases.

Code for the examples demonstrated, along with the source of this page and coordinates to a Kotlin Multiplatform library for the techniques shown, can be found here.

The following is my personal opinion and not of my employer.

Producing state

Producing state is, at its core, nothing more than consolidating sources of changes to state. A generally sound way of doing this is with unidirectional data flow (UDF), therefore all techniques covered on this page are implementations of UDF. Each illustration will also have a UDF visual aid to help convey the "state goes down and events go up" idiom.

While the tenets of UDF are simple, there's a bit more to implementing it properly, especially with Flows. Let's start simple. In the following we have a snail along a track. The snail has the following properties:

  • Its progress along the track.
  • Its color.

An easy way to represent the state production pipeline for the snail is with a single MutableStateFlow:

    data class Snail1State(
        val progress: Float = 0f,
        val color: Color = MutedColors.colors(false).first(),
        val colors: List<Color> = MutedColors.colors(false)
    )

    class Snail1StateHolder {

        private val _state = MutableStateFlow(Snail1State())
        val state: StateFlow<Snail1State> = _state.asStateFlow()

        fun setSnailColor(index: Int) {
            _state.update {
                it.copy(color = it.colors[index])
            }
        }

        fun setProgress(progress: Float) {
            _state.update {
                it.copy(progress = progress)
            }
        }
    }

Interact with the snail by dragging it around and changing its color.

In the above, the update function takes a function with the signature (State) -> State as its only argument. This is the fundamental unit of state change: a function that takes the current state and changes it to the next state.

The fundamental unit of state change is a function literal with the signature (State) -> State.

The above works provided the only sources of state change are from user actions that call synchronous or suspend functions. Things get a little more interesting when sources of state change include:

  • Flows.
  • Flows and suspend functions.

Let's start with just Flows. Consider the snail again, except this time, it has a single source of state change; time. As time passes, the snail slowly progresses on its track. Using a Flow, we can easily define the state for it.

    class Snail2StateHolder(
        scope: CoroutineScope
    ) {
        val progress: StateFlow<Float> = intervalFlow(500)
            .map { it.toFloat() % 100 }
            .stateIn(
                scope = scope,
                started = SharingStarted.WhileSubscribed(),
                initialValue = 0f
            )
    }

The snail's progress is dependent only on time.
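The intervalFlow helper used above is not defined in this article. As a minimal sketch, assuming it emits an incrementing counter at a fixed period in milliseconds, it might look like the following. The name matches the usage above, but the signature and body are assumptions made for illustration, as is the firstProgressValues helper.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 0, 1, 2, ... waiting `periodMillis` between emissions.
fun intervalFlow(periodMillis: Long): Flow<Long> = flow {
    var tick = 0L
    while (true) {
        emit(tick++)
        delay(periodMillis)
    }
}

// Collects the first few progress values, mapped the same way Snail2StateHolder maps them.
fun firstProgressValues(count: Int): List<Float> = runBlocking {
    intervalFlow(periodMillis = 10)
        .map { it.toFloat() % 100 }
        .take(count)
        .toList()
}

fun main() {
    println(firstProgressValues(3)) // [0.0, 1.0, 2.0]
}
```

Because the flow is cold and infinite, nothing runs until it is collected, and stateIn with WhileSubscribed stops the timer when nobody is observing the state.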

This works well since there's just a single source of state change. Things get a little more complicated if we have multiple sources that attempt to change state. First we introduce a new property to the snail: its speed.

    enum class Speed(val multiplier: Int) {
        One(1), Two(2), Three(3), Four(4)
    }

Next, we define a state for the snail, and a state holder that produces its state:

    data class Snail3State(
        val progress: Float = 0f,
        val speed: Speed = Speed.One,
    )

    class Snail3StateHolder(
        scope: CoroutineScope
    ) {
        private val speed: Flow<Speed> = ...

        private val progress: Flow<Float> = ...

        val state: StateFlow<Snail3State> = combine(
            progress,
            speed,
            ::Snail3State
        )
            .stateIn(...)
    }

The snail's state is now dependent on its progress and its speed.

In the above, we can see a general pattern emerging. For each source of state change, we can simply add its flow to the combine function to produce our state.
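The state holder above elides its source flows. A self-contained sketch of the same combine pattern follows, with single-value flowOf sources standing in for real data sources. The SnailState name, the stand-in values, and the firstState helper are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

enum class Speed(val multiplier: Int) { One(1), Two(2), Three(3), Four(4) }

data class SnailState(val progress: Float = 0f, val speed: Speed = Speed.One)

// Stand-in sources; a real state holder would back these with timers, databases, etc.
val progress: Flow<Float> = flowOf(42f)
val speed: Flow<Speed> = flowOf(Speed.Two)

// combine emits once every source has produced at least one value,
// and again whenever any source produces a new one.
val state: Flow<SnailState> = combine(progress, speed, ::SnailState)

fun firstState(): SnailState = runBlocking { state.first() }

fun main() {
    println(firstState()) // SnailState(progress=42.0, speed=Two)
}
```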

User actions as Flows

The sources of change mentioned above can be anything that can be modeled as a flow, be it reading data from a database or user actions. Imagine we want to be able to change the snail's color on a whim. We can add a MutableStateFlow for the color of the snail to our state holder class, and then combine it with the rest of the flows to produce the state.

    data class Snail4State(
        val progress: Float = 0f,
        val speed: Speed = Speed.One,
        val color: Color = Color.Blue,
        val colors: List<Color> = listOf()
    )

    class Snail4StateHolder(
        scope: CoroutineScope
    ) {

        private val speed: Flow<Speed> = ...

        private val progress: Flow<Float> = ...

        private val color: MutableStateFlow<Color> = MutableStateFlow(Color.Cyan)

        val state: StateFlow<Snail4State> = combine(
            progress,
            speed,
            color,
            ::Snail4State
        )
            .stateIn(...)

        fun setSnailColor(color: Color) {
            this.color.value = color
        }
    }

Tap on a color to change the snail's color.

When the user wants to change the color, the setSnailColor method is called, the color MutableStateFlow has its value updated, and that value is emitted into the stream produced by the combine function to produce the new state.

Combining changes in state

Combining sources of state as a general means of state production is robust, lends itself to a wide range of cases, and works well for simple to moderate state production pipelines. It also scales linearly; that is, each source of state change needs to be added to the combine function. This poses a problem for states with more than 5 sources of change, as the type-safe overloads of the combine function accept at most 5 flows. This limit is the maximum arity of those overloads.

One way around this is to combine the sources of state change into intermediate states, before combining them again into the final state.

    data class LargeState(
        val property1: Int,
        val property2: Int,
        val property3: Int,
        val property4: Int,
        val property5: Int,
        val property6: Int,
        val property7: Int,
        val property8: Int,
    )

    data class IntermediateState1(
        val property1: Int,
        val property2: Int,
        val property3: Int,
        val property4: Int,
    )

    data class IntermediateState2(
        val property5: Int,
        val property6: Int,
        val property7: Int,
        val property8: Int,
    )

    fun intFlow() = flowOf(1)

    class LargeStateHolder {
        private val intermediateState1 = combine(
            intFlow(),
            intFlow(),
            intFlow(),
            intFlow(),
            ::IntermediateState1
        )

        private val intermediateState2 = combine(
            intFlow(),
            intFlow(),
            intFlow(),
            intFlow(),
            ::IntermediateState2
        )

        val state = combine(
            intermediateState1,
            intermediateState2
        ) { state1, state2 ->
            LargeState(
                state1.property1,
                state1.property2,
                state1.property3,
                state1.property4,
                state2.property5,
                state2.property6,
                state2.property7,
                state2.property8,
            )
        }
    }

The above works, but can be difficult to maintain. As new sources of state are added or removed over time, especially those from user events, the signatures of the IntermediateState instances will need to change to accommodate the arity of the combine function causing cascading changes. This brings us to another state production approach, merging sources of change.

Merging changes in state

We've established that state production is fundamentally about introducing changes to state over time. That is, the new state is the old state plus the change in state. Expressed mathematically, this becomes:

newState = oldState + Δstate

Where Δstate is the change in state, and is how MutableStateFlow.update() works. We can make the Δstate more expressive and easier to read by representing it in code as a type alias:

    typealias Mutation<State> = State.() -> State

The signature State.() -> State is equivalent to the fundamental unit of state change seen earlier with the MutableStateFlow update method, (State) -> State; the only difference is that the state is passed as the receiver rather than as an argument.

The declaration above now defines the unit of state change (Δstate) for any State type as a Mutation: a lambda with State as the receiver that, when invoked, returns State. This is extremely powerful, as we can represent any state change for any state declaration with a single type.

To produce state then, we simply have to start with an initial state, and incrementally apply state Mutations to it over time to create our state production pipeline. This is sometimes called reducing changes into state.
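Before bringing Flows in, the reduction described above can be sketched with a plain list and fold from the standard library. The Counter type and the applyAll and reducedCounter helpers are hypothetical names chosen for illustration.

```kotlin
typealias Mutation<State> = State.() -> State

data class Counter(val count: Int = 0, val label: String = "")

// Applies each Mutation in order: newState = oldState + Δstate, repeated.
fun <State> applyAll(initial: State, mutations: List<Mutation<State>>): State =
    mutations.fold(initial) { state, mutation -> mutation(state) }

fun reducedCounter(): Counter {
    // A value of type (State) -> State can be assigned where a Mutation is expected.
    val plainIncrement: (Counter) -> Counter = { it.copy(count = it.count + 1) }
    val increment: Mutation<Counter> = plainIncrement
    val rename: Mutation<Counter> = { copy(label = "snail") }

    return applyAll(Counter(), listOf(increment, increment, rename))
}

fun main() {
    println(reducedCounter()) // Counter(count=2, label=snail)
}
```

Note that increment reads the existing count in order to produce the next one, which is something a plain "set this property" operation cannot express.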

Expressing this in code with Flows is very concise and requires just two operators:

  • merge: Used to merge all sources of Mutation (Δstate) into a single stream
  • scan: Used to reduce the stream of Mutation into the state being produced.

In our snail example, we can express the same state production pipeline with user actions as:

To help with readability, creating a Mutation inside a Flow transformation lambda is done with an inline function: aFlow.map { mutation { copy(...) } }

    data class Snail5State(
        ...,
    )

    class Snail5StateHolder(
        private val scope: CoroutineScope
    ) {

        private val speed: Flow<Speed> = ...

        private val speedChanges: Flow<Mutation<Snail5State>> = speed
            .map { mutation { copy(speed = it) } }

        private val progressChanges: Flow<Mutation<Snail5State>> = intervalFlow
            .map { mutation { copy(progress = (progress + 1) % 100) } }

        private val changeEvents = MutableSharedFlow<Mutation<Snail5State>>()

        val state: StateFlow<Snail5State> = merge(
            progressChanges,
            speedChanges,
            changeEvents,
        )
            .scan(Snail5State()) { state, mutation -> mutation(state) }
            .stateIn(...)

        fun setSnailColor(index: Int) {
            scope.launch {
                changeEvents.emit { copy(color = colors[index]) }
            }
        }
    }

Notice that user actions are now propagated with a MutableSharedFlow instead of a MutableStateFlow. This is because StateFlow conflates emissions which can make it prioritize only the latest events when shared between methods invoked by user events.

Snail5 is identical to Snail4; just with different state production semantics.
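The Snail5 listing elides several pieces, so here is a small runnable sketch of the same merge and scan pipeline with finite stand-in flows. The mutation helper is written as a plain function that only pins the lambda's type; its exact shape in the accompanying library is an assumption here, as are the Snail type, the stand-in flows, and the finalState helper.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.runBlocking

typealias Mutation<State> = State.() -> State

// Assumed shape of the `mutation` helper: it returns the lambda typed as a Mutation.
fun <State> mutation(block: State.() -> State): Mutation<State> = block

data class Snail(val progress: Int = 0, val color: String = "blue")

// Three ticks, each advancing progress based on the state at the time it is applied.
val progressChanges: Flow<Mutation<Snail>> = flowOf(Unit, Unit, Unit)
    .map { mutation { copy(progress = progress + 1) } }

// One color change, standing in for a user action.
val colorChanges: Flow<Mutation<Snail>> = flowOf("red")
    .map { newColor -> mutation { copy(color = newColor) } }

// merge funnels every Mutation into one stream; scan folds them into state.
fun finalState(): Snail = runBlocking {
    merge(progressChanges, colorChanges)
        .scan(Snail()) { state, mutation -> mutation(state) }
        .last()
}

fun main() {
    println(finalState()) // Snail(progress=3, color=red)
}
```

The two sources touch different properties, so the final state is the same regardless of how merge interleaves them.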

Why merge

This example has all the functionality the combine approach did, but with a slight complexity cost.

It however brings the following advantages:

  • merge is an n-ary function; it has no arity limits; you can merge as many Flows as you want.
  • New state can be a product of old state; a Mutation is an accumulating/reducing function.
  • All user actions that change state can share the same source Flow: changeEvents, a MutableSharedFlow.
  • All source flows can independently contribute their state mutations.
  • All source flows at the time of introducing the state mutation can read the existing state.
  • State changes can come from Flows or suspend functions easily.

Why MutableSharedFlow

The switch to MutableSharedFlow from MutableStateFlow for propagating user actions is because StateFlow conflates emissions. If two separate methods attempted to use the same MutableStateFlow to emit a Mutation of state, the StateFlow may only emit the latest Mutation. That is, MutableStateFlow does not guarantee that every update to its value property is seen by its collectors.

MutableSharedFlow, on the other hand, has an emit method which suspends until the Mutation is delivered. This means that multiple coroutines can be launched across several method invocations and call emit on the same MutableSharedFlow, and none of them will cancel out the other. The order in which they are applied also doesn't matter, as Mutation instances just describe changes to state; they are designed to be independent.

If the emission order of Mutation instances across multiple user events matters to you, keep reading until the Conflicts in state production and Conflict resolution in state production sections.
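The difference in delivery guarantees can be sketched with plain Int values instead of Mutation instances. In this illustrative example (function names are hypothetical), a collector is started first, then two values are written back to back: the StateFlow collector misses the intermediate value, while the SharedFlow collector receives both.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun collectedFromStateFlow(): List<Int> = runBlocking {
    val source = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    // UNDISPATCHED starts collecting right away, so the initial 0 is observed.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        source.take(2).toList(seen)
    }
    // Both writes land before the collector runs again; 1 is conflated away.
    source.value = 1
    source.value = 2
    job.join()
    seen
}

fun collectedFromSharedFlow(): List<Int> = runBlocking {
    val source = MutableSharedFlow<Int>()
    val seen = mutableListOf<Int>()
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        source.take(2).toList(seen)
    }
    // With a subscriber present, each emit suspends until the value is received.
    source.emit(1)
    source.emit(2)
    job.join()
    seen
}

fun main() {
    println(collectedFromStateFlow())  // [0, 2]
    println(collectedFromSharedFlow()) // [1, 2]
}
```

One caveat worth keeping in mind: a MutableSharedFlow created with default arguments has no replay or buffer, so emit returns immediately and the value is dropped if there are no subscribers at that moment.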

These advantages can be easily illustrated with an example. If the user wanted to manually move the snail to a certain position, it would simply be:

    class Snail6StateHolder(
        private val scope: CoroutineScope
    ) {

        private val progressChanges: Flow<Mutation<Snail6State>> = ...

        private val changeEvents = MutableSharedFlow<Mutation<Snail6State>>()

        fun setSnailColor(index: Int) {
            scope.launch {
                changeEvents.emit { copy(color = colors[index]) }
            }
        }

        fun setProgress(progress: Float) {
            scope.launch {
                changeEvents.emit { copy(progress = progress) }
            }
        }
    }

Drag the snail to place it anywhere on its track. Also, try to hold it in place and see what happens.

That is, we can simply introduce a state change to the progress property of the state despite the progressChanges flow also contributing changes to the same property. This is something that would be rather difficult with the combine approach, because the combine approach only lets you set properties of state to create new state. The merge approach instead lets you mutate or change properties of state and apply them to the existing state.

Furthermore, both setSnailColor and setProgress contribute their changes to state using the same MutableSharedFlow: changeEvents. This approach scales well because no matter how many methods are added that change the State from user events, they don't need any more variables to be declared in the state holder class.

Why Mutations

The method of state production described above may seem unfamiliar at first, however it simply builds on the pattern established in Snail1 with a single MutableStateFlow. It expands the pattern to work not just for state changes from synchronous and suspend methods, but for state changes from Flow instances as well.

Merging Mutation instances can be thought of as an extension to MutableStateFlow.update(), where updates can come from both Flows and suspend functions.

Formalizing state production

The merge approach can be formalized into an extension function on the CoroutineScope the state is produced in:

    fun <State : Any> CoroutineScope.stateFlowProducer(
        initialState: State,
        started: SharingStarted = SharingStarted.WhileSubscribed(),
        mutationFlows: List<Flow<Mutation<State>>>
    ): StateFlowProducer<State>

Where the use of it in the snail example becomes:

    class Snail7StateHolder(
        private val scope: CoroutineScope
    ) {

        private val speedChanges: Flow<Mutation<Snail7State>> = ...

        private val progressChanges: Flow<Mutation<Snail7State>> = ...

        private val stateProducer = scope.stateFlowProducer(
            initialState = Snail7State(),
            started = SharingStarted.WhileSubscribed(),
            mutationFlows = listOf(
                speedChanges,
                progressChanges,
            )
        )

        val state: StateFlow<Snail7State> = stateProducer.state

        fun setSnailColor(index: Int) = stateProducer.launch {
            mutate { copy(color = colors[index]) }
        }

        fun setProgress(progress: Float) = stateProducer.launch {
            mutate { copy(progress = progress) }
        }
    }

Snail7 is identical to Snail6; just with a formalized state production approach.
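To give a feel for what such an extension does, here is a deliberately simplified sketch built only from merge, scan and stateIn. It is not the library's implementation: the name sketchStateFlow is hypothetical, it returns a plain StateFlow rather than a StateFlowProducer, and it omits the launch and mutate entry points used for user events above.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

typealias Mutation<State> = State.() -> State

// Hypothetical stand-in: merges every source of Mutation and reduces them into state.
fun <State : Any> CoroutineScope.sketchStateFlow(
    initialState: State,
    started: SharingStarted = SharingStarted.WhileSubscribed(),
    mutationFlows: List<Flow<Mutation<State>>>,
): StateFlow<State> =
    mutationFlows
        .merge()
        .scan(initialState) { state, mutation -> mutation(state) }
        .stateIn(scope = this, started = started, initialValue = initialState)

fun producedCount(): Int = runBlocking {
    // A separate scope for state production, cancelled once the result is read.
    val scope = CoroutineScope(coroutineContext + Job())
    val increments = flowOf<Mutation<Int>>({ this + 1 }, { this + 1 })
    val state = scope.sketchStateFlow(initialState = 0, mutationFlows = listOf(increments))
    val result = state.first { it == 2 }
    scope.cancel()
    result
}

fun main() {
    println(producedCount()) // 2
}
```

A limitation of this sketch is that scan starts again from initialState whenever WhileSubscribed restarts the upstream, so a fuller implementation would need to carry the last produced state across restarts.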

Conflicts in state production

So far, all sources of state change have been relatively harmonious. That is, they don't conflict or compete with each other. Sometimes however, especially with asynchronous data sources, state changes can clash. This most often occurs when user events trigger a set of cascading state changes.

This is best illustrated with an example. Say we wanted to expose our snail to the experience of day and night. Not only that, we want the experience to animate smoothly. We can do this by adding a new method:

    class Snail8StateHolder(
        private val scope: CoroutineScope
    ) {

        private val stateProducer = scope.stateFlowProducer(
            ...
        )
        ...

        fun setMode(isDark: Boolean) = stateProducer.launch {
            mutate { copy(isDark = isDark) }
            /* Collect from a flow that animates color changes */
            interpolateColors(
                startColors = state.value.colors.map(Color::argb).toIntArray(),
                endColors = MutedColors.colors(isDark).map(Color::argb).toIntArray()
            ).collect { (progress, colors) ->
                mutate {
                    copy(
                        colorInterpolationProgress = progress,
                        colors = colors
                    )
                }
            }
        }
    }

In Jetpack Compose apps, animating color changes is best done with the animateColorAsState APIs instead of manually as shown in the example above. The example is merely used to demonstrate long running operations that cause state changes, like uploading a file with a progress bar.

Tap the toggle button to switch between light and dark modes for the snail. Notice that tapping in quick succession will cause the UI to flicker as state changes conflict.

In the above, each time setMode is called, we first update the state to the new mode, and then crucially, begin to collect from a finite flow that updates the colors available to choose from.

The source of conflict here is the interpolateColors Flow. If setMode is called twice in quick succession, there will be two instances of the interpolateColors Flow running which may cause the UI to flicker.

Conflict resolution in state production

There are two ways of dealing with the issue above:

State locking

We can add a new boolean field to the state called isInterpolating. If setMode is called when interpolateColors is running, we return immediately.

    data class Snail9State(
        ...
        val isInterpolating: Boolean = false,
    )

    class Snail9StateHolder(
        private val scope: CoroutineScope
    ) {

        private val stateProducer = scope.stateFlowProducer(
            ...
        )
        ...

        fun setMode(isDark: Boolean) = stateProducer.launch {
            if (state.value.isInterpolating) return@launch
            mutate { copy(isDark = isDark, isInterpolating = true) }
            interpolateColors(
                ...
            ).collect { (progress, colors) ->
                mutate {
                    copy(...)
                }
            }
            mutate { copy(isInterpolating = false) }
        }
    }

Tap the toggle button many times again. Notice that it ignores the toggle event while the animation is running.

This works, but it has the unfortunate side effect of making the user wait until we're done interpolating before they can change the mode again.

Conflict elimination

Eliminating the source of conflict in the example above would require being able to stop collecting from interpolateColors each time setMode is invoked. There are two ways of doing this:

Job cancelation

In the above, scope.launch() returns a Job for the suspending function that collects from interpolateColors. Keeping a reference to this Job allows for canceling collection of the Flow the next time the setMode method is invoked.

    class Snail10StateHolder(
        private val scope: CoroutineScope
    ) {

        private val stateProducer = scope.stateFlowProducer(
            ...
        )
        private var setModeJob: Job? = null
        ...

        fun setMode(isDark: Boolean) = stateProducer.launch {
            setModeJob?.cancel()
            setModeJob = currentCoroutineContext()[Job]
            mutate { copy(isDark = isDark) }
            interpolateColors(
                ...
            ).collect { (progress, colors) ->
                mutate {
                    copy(...)
                }
            }
        }
    }

Tap the toggle button many times. Notice that with each tap, the colors reverse their changes.

This works, although it has the caveat of something we've seen before; it scales linearly. Each method invocation that could have potential conflicts in state by virtue of it causing multiple changes in state will need a Job reference to allow for cancelling collection.
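The cancellation pattern itself can be sketched without the state producer. In this illustrative version, a delay stands in for collecting from interpolateColors, and the Job returned by launch is stored directly rather than read from the coroutine context as in the listing above. ModeHolder, its log, and callSetModeTwice are hypothetical names.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class ModeHolder(private val scope: CoroutineScope) {
    val log = mutableListOf<String>()
    private var setModeJob: Job? = null

    fun setMode(isDark: Boolean) {
        // Cancel whatever the previous call is still doing before starting over.
        setModeJob?.cancel()
        setModeJob = scope.launch {
            log += "start isDark=$isDark"
            delay(200) // stands in for collecting a long-running Flow
            log += "end isDark=$isDark"
        }
    }
}

fun callSetModeTwice(): List<String> = runBlocking {
    val holder = ModeHolder(this)
    holder.setMode(true)
    delay(50)             // the first call is still in flight...
    holder.setMode(false) // ...when the second call cancels it
    delay(300)
    holder.log.toList()
}

fun main() {
    // The first call never reaches its "end" entry.
    println(callSetModeTwice())
    // [start isDark=true, start isDark=false, end isDark=false]
}
```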

User events as flows

In this approach, the user event of setting the mode is modeled as a Flow, and by using the flatMapLatest Flow operator, the interpolateColors Flow is automatically canceled each time the SetMode Flow emits.

    data class SetMode(
        val isDark: Boolean,
        val startColors: List<Color>
    )

    private fun Flow<SetMode>.mutations(): Flow<Mutation<Snail11State>> =
        flatMapLatest { (isDark, startColors) ->
            flow {
                emit(mutation { copy(isDark = isDark) })
                emitAll(
                    interpolateColors(
                        startColors = startColors.map(Color::toArgb).toIntArray(),
                        endColors = MutedColors.colors(isDark)
                    )
                        .map { mutation { copy(colors = it) } }
                )
            }
        }

In the above, SetMode is a summary of the user event. It encapsulates everything needed to process the Mutation. The use of flatMapLatest cancels any ongoing state changes as a result of previous events and guarantees there are no conflicts.
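The cancelling behavior of flatMapLatest can be observed in isolation. In this minimal sketch, a Boolean stands in for the SetMode event and a delay stands in for the color interpolation; the function name and the timings are assumptions chosen so that the second event arrives while the first is still being processed.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun processedModeEvents(): List<String> = runBlocking {
    // Two user events, the second arriving 50 ms after the first.
    val setModeEvents: Flow<Boolean> = flow {
        emit(true)
        delay(50)
        emit(false)
    }

    setModeEvents
        .flatMapLatest { isDark ->
            flow {
                emit("start isDark=$isDark")
                delay(200) // long-running work, cancelled if a newer event arrives
                emit("end isDark=$isDark")
            }
        }
        .toList()
}

fun main() {
    // The inner flow for the first event is cancelled before it can emit its "end" value.
    println(processedModeEvents())
    // [start isDark=true, start isDark=false, end isDark=false]
}
```

Unlike the Job reference approach, no extra field is needed per event type; the operator owns the cancellation.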

Scaling the above to cover state production for our snail is nontrivial, and often requires a library. In the case of the snail example, applying the concept above yields:

    class Snail11StateHolder(
        scope: CoroutineScope
    ) {

        private val speedChanges: Flow<Mutation<Snail11State>> = ...

        private val progressChanges: Flow<Mutation<Snail11State>> = ...

        private val mutator = scope.actionStateFlowProducer<Action, Snail11State>(
            initialState = Snail11State(),
            started = SharingStarted.WhileSubscribed(),
            mutationFlows = listOf(
                speedChanges,
                progressChanges
            ),
            actionTransform = { actions ->
                actions.toMutationStream {
                    when (val type = type()) {
                        is Action.SetColor -> type.flow.colorMutations()
                        is Action.SetMode -> type.flow.modeMutations()
                        is Action.SetProgress -> type.flow.progressMutations()
                    }
                }
            }
        )

        val state: StateFlow<Snail11State> = mutator.state

        val actions: (Action) -> Unit = mutator.accept

        private fun Flow<Action.SetColor>.colorMutations(): Flow<Mutation<Snail11State>> =
            mapLatest {
                mutation { copy(colorIndex = it.index) }
            }

        private fun Flow<Action.SetProgress>.progressMutations(): Flow<Mutation<Snail11State>> =
            mapLatest {
                mutation { copy(progress = it.progress) }
            }

        private fun Flow<Action.SetMode>.modeMutations(): Flow<Mutation<Snail11State>> =
            flatMapLatest { (isDark, startColors) ->
                flow {
                    emit(mutation { copy(isDark = isDark) })
                    emitAll(
                        interpolateColors(
                            startColors = startColors.map(Color::argb).toIntArray(),
                            endColors = MutedColors.colors(isDark).map(Color::argb).toIntArray()
                        )
                            .map { (progress, colors) ->
                                mutation {
                                    copy(
                                        colorInterpolationProgress = progress,
                                        colors = colors
                                    )
                                }
                            }
                    )
                }
            }
    }

Snail11 is identical to Snail10; just with different state production semantics.

In the above, there are two sources of state Mutations:

  • Data sources in the mutationFlows argument; defined as Flow<Mutation>
  • User events in the actionTransform argument; defined as (Flow<Action>) -> Flow<Mutation>

Crucially the actionTransform takes a Flow of all Action instances, splits them out into individual Flows for each Action, and finally applies Flow transformations to each Action Flow to turn them into Flow<Mutation>:

  • colorMutations and progressMutations both use mapLatest to guarantee no conflicts, because mapLatest cancels any suspending work still running in its lambda when a new value arrives.
  • modeMutations does the same as the above but uses flatMapLatest and the flow { } builder because it collects from a Flow instead of just invoking a suspend function.

Choosing a state production pipeline

The above highlights a common motif in this document; the state production pipeline is only as complicated as the kind of state produced, and the state changes that can occur. Simple states require simple pipelines, and complicated states often require higher level abstractions that enforce guarantees in state production. These guarantees often come with a complexity cost.

Depending on the particulars of your state production pipeline, a rule of thumb that can be applied is:

Simple state production pipelines

For small and simple states, combine sources that contribute to your state. No library is required.

Intermediate state production pipelines

For intermediate states that have lots of different kinds of user events, merge changes to state so you do not have to create multiple MutableStateFlow instances to manage each user event. You may opt to use a library, or roll out a small custom implementation of the techniques described in the merge section above.

Large and complex state production pipelines

For state production pipelines that:

  • Have multiple contributors to the same state property that may conflict
  • Have user events that can cause multiple mutations of state to occur
  • Have state changes that involve collecting from other Flows

Model your state production pipeline as a Flow to allow for tighter control of sources of change, and use a library that offers ergonomic APIs for your transformations.
