Adetunji Dahunsi

Unidirectional Data Flow as a Functional declaration

Representing UI production pipelines as a function of a stream of inputs yielding a stream of outputs

TJ Dahunsi

Mar 15 2022 · 12 min read

Categories:
architecture
kotlin

There are a lot of interesting experiments and learnings from the android community about the production of UI state in the context of a Unidirectional Data Flow (UDF) app. Like with most things, while the “what”, in this case UDF, may be well defined, “how” is quite the moving target.

This article will briefly cover some approaches on the topic, and then introduce my personal take on it. Extra emphasis on personal, my work isn't peer reviewed. Its an experiment and not a reccomendation!

The goal of an UDF implementation is to represent the effects of a user’s actions with a UI state. The representation of this UI state is open ended, some opt to do this in a single encapsulating data class, others choose to do this with fields on a class body. The latter has some novel implementations, Fabio Collini demonstrated one using Jetpack Compose’s mutableStateOf functions here:

In the case of the former where a single data class is used to encapsulate the entirety of UI state, given that a user may take multiple actions over time, with each action resulting in one or more changes to state, one way of summarizing a UDF implementation is with the following:

1val udfImplementation: (Flow<Action>) -> Flow<UiState> = ... 2

A UDF implementation may be defined as a function that transforms a stream of actions into a stream of state

The above, while concise, is rather dense. It says that a UDF implementation is essentially a function that takes a stream of action and produces a stream of state. What I love about this statement is how applicable it is to existing UDF implementations.

Spotify’s Mobius:

1public static void main(String[] args) { 2 // Let's make a Mobius Loop 3 MobiusLoop<Integer, CounterEvent, CounterEffect> loop = Mobius 4 .loop(new CounterLogic(), new CounterEffectHandler()) 5 .startFrom(0); 6 7 // And start using our loop 8 loop.dispatchEvent(CounterEvent.INCREMENT); // Model is now 1 9 loop.dispatchEvent(CounterEvent.DECREMENT); // Model is now 0 10 loop.dispatchEvent(CounterEvent.DECREMENT); // Sound effect plays! Model is still 0 11} 12

MVI with Spotify’s Mobius

Arkadii Ivanov’s MVIKotlin:

1internal class CalculatorStoreFactory(private val storeFactory: StoreFactory) { 2 3 fun create(): CalculatorStore = 4 object : CalculatorStore, Store<Intent, State, Nothing> by storeFactory.create( 5 name = "CounterStore", 6 initialState = State(), 7 reducer = ReducerImpl 8 ) { 9 } 10 11 private object ReducerImpl : Reducer<State, Intent> { 12 override fun State.reduce(msg: Intent): State = 13 when (msg) { 14 is Intent.Increment -> copy(value = value + 1L) 15 is Intent.Decrement -> copy(value = value - 1L) 16 } 17 } 18} 19

MVI with Arkadii Ivanov’s MVIKotlin

…and Cash App’s Molecule:

1@Composable 2fun SomePresenter(events: Flow<EventType>): ModelType { 3 // ... 4} 5 6val models: StateFlow<ModelType> = scope.launchMolecule { 7 SomePresenter(events) 8} 9

UDF with Cash App’s Molecule

The implementations above differ in approach, side effect propagation, available levels of abstraction and so on, but they all effectively serve to reduce actions into state.

Any proponent of React’s Redux should find this familiar, the concepts are the same: action goes in, state comes out. Molecule is particularly interesting for reasons well summarized by the team behind it:

The reason this is exciting, and the reason for the history lesson above, is that Compose does enable a new way of writing our logic. The use of a compiler plugin unlocks the language in a way that otherwise could not be achieved with raw coroutine library APIs.

That set off something for me however, a little code challenge. How succinct / easy would it be to write an implementation of (Flow<Action>) -> Flow<State> with just simple coroutine library APIs?

Enter Mutator

A Mutator is an interface declaration specifying an input and its resulting output.

1interface Mutator<Action : Any, State : Any> { 2 val state: State 3 val accept: (Action) -> Unit 4} 5

A Mutator is a declaration of an input Action to it’s ensuing State output

However, an interface declaration is only as good as its implementation. Most MVI implementations require the use of a Reducer to process each action into state. With Kotlin’s receivers, we can define a generic reducer as a Mutation.

1data class Mutation<T : Any>( 2 val mutate: T.() -> T 3) 4

A generic reducer is a class describing the mutation of the reduction

In the above, rather than define a specific Reducer for actions, a data class holding a single lambda defining the reduction works just as well.

Next is the fun part. An MVI implementation of the type above with Kotlin Flows would have the type Mutator<Action, StateFlow<State>> . The above could be easily achieved if we could split the input Action stream by its type and specify a Flow of Mutation<State> for each one. The resulting Flow<Mutation<State>> for each Action can then be merged into a Flow<State> given an initial State to reduce into.

For example, consider the Jetpack Compose multiplatform desktop app in the image below:

alt_text

The state in the app is driven by a Mutator

The screen's UI state is defined as:

1data class State( 2 val gridSize: Int = 1, 3 val shouldScrollToTop: Boolean = true, 4 val isInNavRail: Boolean = false, 5 val hasFetchedAuthStatus: Boolean = false, 6 val isSignedIn: Boolean = false, 7 val queryState: QueryState, 8 val lastVisibleKey: Any? = null, 9 val items: List<ArchiveItem> = listOf() 10)

The user can also take a variety of actions that mutate the state, summarized with a sealed class as:

1sealed class Action(val key: String) { 2 sealed class Fetch : Action(key = "Fetch") { 3 abstract val query: ArchiveQuery 4 5 data class Reset(override val query: ArchiveQuery) : Fetch() 6 7 data class LoadMore(override val query: ArchiveQuery) : Fetch() 8 } 9 10 data class FilterChanged(val descriptor: Descriptor) : Action(key = "FilterChanged") 11 12 data class GridSize(val size: Int) : Action(key = "GridSize") 13 14 data class ToggleFilter(val isExpanded: Boolean? = null) : Action(key = "ToggleFilter") 15 16 data class LastVisibleKey(val itemKey: Any) : Action(key = "LastVisibleKey") 17} 18

This is standard MVI (Model-View-Intent) fare; each Action takes in the arguments it needs to perform its function, analogous to a regular method invocation. Each Action then causes its own Mutation of state. A StateFlow based Mutator for the screen can be defined as:

1fun archiveListMutator( 2 scope: CoroutineScope, 3 initialState: State? = null, 4 archiveRepository: ArchiveRepository, 5 authRepository: AuthRepository, 67): ArchiveListMutator = stateFlowMutator( 8 scope = scope, 9 initialState = initialState ?: State(...), 10 started = SharingStarted.WhileSubscribed(2_000L), 11 actionTransform = { actions -> 12 merge( 13, 14 authRepository.authMutations(), 15 actions.toMutationStream(keySelector = Action::key) { 16 when (val action = type()) { 17 is Action.Fetch -> action.flow.fetchMutations( 18 scope = scope, 19 repo = archiveRepository 20 ) 21 is Action.FilterChanged -> action.flow.filterChangedMutations() 22 is Action.ToggleFilter -> action.flow.filterToggleMutations() 23 is Action.LastVisibleKey -> action.flow.resetScrollMutations() 24 is Action.GridSize -> action.flow.gridSizeMutations() 25 } 26 } 27 ).monitorWhenActive(lifecycleStateFlow) 28 } 29) 30

ArchiveMutator definition. Note how each Action is responsible for its own mutations

In the above, each Action stream is mapped to its own Mutation stream using the Flow<Action>.toMutationStream() extension method. The implementations range from the simple:

1private fun Flow<Action.GridSize>.gridSizeMutations(): Flow<Mutation<State>> = 2 distinctUntilChanged() 3 .map { 4 Mutation<State> { 5 copy(gridSize = it.size) 6 } 7 } 8

A simple Action to Mutation stream: distinctUntilChanged, then mapped

to nuanced:

1/** 2 * Every toggle isExpanded == null should be processed, however every specific request to 3 * expand or collapse, should be distinct until changed. 4 */ 5private fun Flow<Action.ToggleFilter>.filterToggleMutations(): Flow<Mutation<State>> = 6 map { it.isExpanded } 7 .scan(listOf<Boolean?>()) { emissions, isExpanded -> 8 (emissions + isExpanded).takeLast(2) 9 } 10 .transformWhile { emissions -> 11 when { 12 emissions.isEmpty() -> Unit 13 emissions.size == 1 -> emit(emissions.first()) 14 else -> { 15 val (previous, current) = emissions 16 if (current == null || current != previous) emit(current) 17 } 18 } 19 true 20 } 21 .map { isExpanded -> 22 Mutation<State> { 23 copy(queryState = queryState.copy(expanded = isExpanded ?: !queryState.expanded)) 24 } 25 } 26

A more complex Action to Mutation stream: distinctUntilChanged unless null is emitted

The great thing above is each Action is independent and fully responsible for its resultant Mutation<State>. Using the appropriate Flow operators, one can:

  • Process each Action sequentially
  • Operate on only the most recently emitted
  • Or come up with whatever processing pipeline is needed on a per Action basis. Each Action subtype can even be split by its subtype for even more complex pipelines.

The source for the app shown above can be found here:

Fun fact, this article you're reading was written and edited with the app linked above!

Imperative vs Functional Programming

Interestingly, transforming each Action Flow into a cold Flow<Mutation<State>> with an extension function ends up mirroring the more conventional and imperative “invoke method to mutate state” paradigm, but in a more static and functional way. The Flow extensions have no reference to class variables; instead, they describe what changes to apply to the State at the time they are processed.

Consider the following imperative snippet where UI state is mutated via method calls on a private backing variable when fetching articles for a category:

1 fun fetchArticles(category: String) { 2 fetchJob?.cancel() 3 fetchJob = viewModelScope.launch { 4 try { 5 val newsItems = repository.newsItemsForCategory(category) 6 _uiState.update { 7 it.copy(newsItems = newsItems) 8 } 9 } catch (ioe: IOException) { 10 // Handle the error and notify the UI when appropriate. 11 _uiState.update { 12 val messages = getMessagesFromThrowable(ioe) 13 it.copy(userMessages = messages) 14 } 15 } 16 } 17 } 18

The equivalent functional representation of the above as a Flow<Mutation<State>> of the SetCategory Action is:

1fun Flow<Action.SetCategory>.categoryChangeMutations( 2 repository: Repository 3): Flow<Mutation<NewsUiState>> = 4 mapLatest { (category) -> 5 try { 6 val newsItems = repository.newsItemsForCategory(category) 7 Mutation<NewsUiState> { 8 copy(newsItems = newsItems) 9 } 10 } catch (ioe: IOException) { 11 // Handle the error and notify the UI when appropriate. 12 val messages = getMessagesFromThrowable(ioe) 13 Mutation<NewsUiState> { 14 copy(userMessages = messages) 15 } 16 } 17 } 18

Notice the use of maplatest to match the imperative version where a job reference is kept to prevent having two requests in flight.

I personally prefer the functional version because I get to:

  • Drop the backing _uiState variable that I would need to mutate imperatively
  • Drop the extra Job? class variable that prevented accidental simultaneous fetches
  • Define the changes to state caused by the SetCategory Action statically. The function has everything it needs to produce its Mutation, and does not need the call context of a class body to read class fields.

The disadvantages of the above being:

  • The classic: "readers and maintainers of the code need to be familiar with Flows and their operators".
  • Extra object allocation:
    • For every use of Action.SetCategory that the imperative version avoids by simply being a method call.
    • For every Mutation created
    • For every UI state created after the Mutation is applied

In my personal opinion, the benefits outweigh the caveats, leaving what is essentially the redux pattern without the hassle of Reducers and side effect handlers. That is, it has all the pros and cons of Redux minus boilerplate.

For what it's worth, I don't think functional programming is always better than imperative programming, I just prefer it here for working on user events/actions as a stream. Imperative code can be a lot easier to write or think about. A suspending function is a lot more readable to me than Single.just() or Maybe.just() or whatever the factory function for creating a stream is in your FRP library of choice.

Testing

No architectural pattern exploration is complete without a section on testing. Being able to represent changes to UI state as a result of a particular Action with a cold Flow offers a huge idiomatic advantage for asserting the (Flow<Action>) -> Flow<UiState> maxim.

For reasons why it is desirable to test cold Flows instead of hot Flows, especially StateFlow, consider reading Testing StateFlow Is Annoying by Bill Phillips.

Above, we outlined the nuanced business logic of "distinctUntilChanged unless null" in filterToggleMutations() for Action.ToggleFilter. A test to verify its behavior using Cash app's Turbine is:

1 @Test 2 fun testFilterToggleMutations() = runTest { 3 val initialQueryState = QueryState( 4 expanded = false, 5 startQuery = ArchiveQuery(kind = ArchiveKind.Articles), 6 currentQuery = ArchiveQuery(kind = ArchiveKind.Articles), 7 ) 8 val initialState = State( 9 queryState = initialQueryState 10 ) 11 listOf( 12 Action.ToggleFilter(isExpanded = null), 13 Action.ToggleFilter(isExpanded = null), 14 Action.ToggleFilter(isExpanded = true), 15 Action.ToggleFilter(isExpanded = true), 16 Action.ToggleFilter(isExpanded = true), 17 Action.ToggleFilter(isExpanded = true), 18 Action.ToggleFilter(isExpanded = false), 19 ) 20 .asFlow() 21 .filterToggleMutations() 22 .reduceInto(initialState) 23 .test { 24 // First item is initial state 25 assertEquals( 26 expected = initialState, 27 actual = awaitItem() 28 ) 29 // After processing null for `isExpanded`, the toggle should have flipped 30 assertEquals( 31 expected = initialState.queryState.copy(expanded = true), 32 actual = awaitItem().queryState 33 ) 34 // Toggle expanded again 35 assertEquals( 36 expected = initialState.queryState.copy(expanded = false), 37 actual = awaitItem().queryState 38 ) 39 // Explicit set expanded to true 40 assertEquals( 41 expected = initialState.queryState.copy(expanded = true), 42 actual = awaitItem().queryState 43 ) 44 45 // Consecutive emissions of true should have been ignored 46 47 // Explicit set expanded to false 48 assertEquals( 49 expected = initialState.queryState.copy(expanded = false), 50 actual = awaitItem().queryState 51 ) 52 53 awaitComplete() 54 } 55 } 56

The above is idiomatic and terse; for a given set of actions, assert the input yielded the expected output.

For Mutation Flow pipelines that take arguments, mocks need to only be provided for the arguments the method requires. In the archiveListMutator() definition above, you may have noticed other Mutation contributions outside actions taken by the user. Specifically AuthRepository.authMutations() which is defined as:

1internal fun AuthRepository.authMutations(): Flow<Mutation<State>> = 2 isSignedIn 3 .distinctUntilChanged() 4 .map { 5 Mutation { 6 copy( 7 isSignedIn = it, 8 hasFetchedAuthStatus = true, 9 ) 10 } 11 }

A test for this becomes:

1 @Test 2 fun testAuthMutations() = runTest { 3 val initialState = State(...) 4 val authRepository = mockk<AuthRepository>() 5 6 every { authRepository.isSignedIn } returns listOf( 7 false, 8 false, 9 true, 10 ).asFlow() 11 12 authRepository 13 .authMutations() 14 .reduceInto(initialState) 15 .test { 16 assertEquals( 17 expected = initialState, 18 actual = awaitItem() 19 ) 20 assertEquals( 21 expected = initialState.copy(isSignedIn = false), 22 actual = awaitItem() 23 ) 24 assertEquals( 25 expected = initialState.copy(isSignedIn = true), 26 actual = awaitItem() 27 ) 28 awaitComplete() 29 } 30 } 31

With a Mutator, not only is the expression (Flow<Action>) -> Flow<UiState> explicit, testing it also is.

Wrapping up

There are a lot more things to share about my findings from my experiments with Mutator, particularly those that produce a StateFlow. From handling UI events, to representing calls that suspend and return a value. Again, care should be taken to note, the above is not prescriptive. This is one way of several to represent a pipeline of the production of UI state.

The Mutator definition, stateFlowMutator() function and Flow<T>.toMutationStream{ } extension function can be found here:

The above being syntactic sugar around a Flow pipeline as an implementation of the generic UDF function (Flow<Action>) -> Flow<State>.

As promised, here’s the implementation for splitting a Flow into its subtypes:

1/** 2 * Class holding the context of the [Action] emitted that is being split out into 3 * a [Mutation] [Flow]. 4 * 5 * Use typically involves invoking [type] to identify the [Action] stream being transformed, and 6 * subsequently invoking [flow] to perform a custom transformation on the split out [Flow]. 7 */ 8data class TransformationContext<Action : Any>( 9 private val type: Action, 10 val backing: Flow<Action> 11) { 12 13 /** 14 * A convenience for the backing [Flow] of the [Action] subtype from the parent [Flow] 15 */ 16 inline val <reified Subtype : Action> Subtype.flow: Flow<Subtype> 17 get() = backing as Flow<Subtype> 18 19 /** 20 * the first [Action] of the specified type emitted from the parent [Flow] 21 */ 22 fun type() = type 23} 24 25/** 26 * Transforms a [Flow] of [Action] to a [Flow] of [Mutation] of [State], allowing for finer grained 27 * transforms on subtypes of [Action]. This allows for certain actions to be processed differently 28 * than others. For example: a certain action may need to only cause mutations on distinct 29 * emissions, whereas other actions may need to use more complex [Flow] transformations like 30 * [Flow.flatMapMerge] and so on. 31 * 32 * [transform]: a function for mapping independent [Flow]s of [Action] to [Flow]s of [State] 33 * [Mutation]s 34 * @see [splitByType] 35 */ 36fun <Action : Any, State : Any> Flow<Action>.toMutationStream( 37 // Ergonomic hack to simulate multiple receivers 38 transform: TransformationContext<Action>.() -> Flow<Mutation<State>> 39): Flow<Mutation<State>> = splitByType( 40 selector = { it }, 41 transform = transform 42) 43 44/** 45 * Transforms a [Flow] of [Input] to a [Flow] of [Output] by splitting the original into [Flow]s 46 * of type [Selector]. Each independent [Flow] of the [Selector] type can then be transformed 47 * into a [Flow] of [Output]. 48 * 49 * [selector]: The mapping to the type the [Input] [Flow] should be split into 50 * [transform]: a function for mapping independent [Flow]s of [Selector] to [Flow]s of [Output] 51 */ 52fun <Input : Any, Selector : Any, Output : Any> Flow<Input>.splitByType( 53 selector: (Input) -> Selector, 54 // Ergonomic hack to simulate multiple receivers 55 transform: TransformationContext<Selector>.() -> Flow<Output> 56): Flow<Output> = 57 channelFlow<Flow<Output>> [email protected]{ 58 val keysToFlowHolders = mutableMapOf<String, FlowHolder<Selector>>() 59 this@splitByType 60 .collect { item -> 61 val mapped = selector(item) 62 val flowKey = mapped::class.qualifiedName 63 ?: throw IllegalArgumentException("Only well defined classes can be split") 64 when (val existingHolder = keysToFlowHolders[flowKey]) { 65 null -> { 66 val holder = FlowHolder(mapped) 67 keysToFlowHolders[flowKey] = holder 68 val emission = TransformationContext(mapped, holder.exposedFlow) 69 val mutationFlow = transform(emission) 70 channel.send(mutationFlow) 71 } 72 else -> { 73 // Wait for downstream to be connected 74 existingHolder.internalSharedFlow.subscriptionCount.first { it > 0 } 75 existingHolder.internalSharedFlow.emit(mapped) 76 } 77 } 78 } 79 } 80 .flatMapMerge( 81 concurrency = Int.MAX_VALUE, 82 transform = { it } 83 ) 84 85fun <State : Any> Flow<Mutation<State>>.reduceInto(initialState: State): Flow<State> = 86 scan(initialState) { state, mutation -> mutation.mutate(state) } 87 88/** 89 * Container for representing a [Flow] of a subtype of [Action] that has been split out from 90 * a [Flow] of [Action] 91 */ 92private data class FlowHolder<Action>( 93 val firstEmission: Action, 94) { 95 val internalSharedFlow: MutableSharedFlow<Action> = MutableSharedFlow() 96 val exposedFlow: Flow<Action> = internalSharedFlow.onStart { emit(firstEmission) } 97} 98

Android Weekly Feature

10