UDF with Functional Reactive Programming: A case study
A practical example of unidirectional data flow with Kotlin coroutines and Flows on Android

TJ Dahunsi
Jul 10 2023 · 14 mins
The following is not official guidance. It is a zealous and very strict UDF implementation that aligns with my preference for functional reactive programming and immutable states. Please refer to the official Android architecture state production guide in your production applications.
Pure functions in state production
I try to build state production pipelines that consist of only pure functions. That is, to produce state, the functions never read variables outside their explicitly passed arguments. Consider the following snippet:
data class ItemUiState(
    val itemId: String,
    val item: Item? = null
)

class ItemViewModel(
    savedStateHandle: SavedStateHandle,
    private val itemRepository: ItemRepository
) : ViewModel() {

    private val _uiState = MutableStateFlow(
        ItemUiState(itemId = savedStateHandle.get<String>(ID_ARG)!!)
    )
    val uiState: StateFlow<ItemUiState> = _uiState.asStateFlow()

    fun loadItem() {
        viewModelScope.launch {
            try {
                val loadedItem = itemRepository.loadItem(
                    itemId = _uiState.value.itemId
                )
                _uiState.update { it.copy(item = loadedItem) }
            } catch (exception: Exception) {
                ...
            }
        }
    }
}
This is standard fare and good practice; however, the loadItem() method isn't a pure function: it requires access to the ViewModel field variables itemRepository and _uiState. Fixing this is simple, if inconvenient:
fun loadItem(
    scope: CoroutineScope,
    uiState: MutableStateFlow<ItemUiState>,
    itemRepository: ItemRepository,
) {
    ...
}
The change above does make it a pure function, but it's verbose to the point of being impractical. Nonetheless, it's something I obsess over and try to make all my state production pipelines resemble.
I go through this seemingly pedantic process because shared mutable states in class fields have no restrictions on concurrent access. loadItem() may be called multiple times, causing multiple coroutines to concurrently mutate fields with a class level scope. This can be mitigated by holding references to launched Jobs or using a Mutex, but scaling these across multiple fields, different screens and multiple contributors poses a new set of organizational and managerial challenges. Failing to address the issue, however, causes a whole class of bugs I would much rather not deal with in highly concurrent applications.
State production rules
Ultimately I strive for my state production pipelines to:
Consist of pure functions: State production functions may only read from the arguments explicitly passed to them.
Have a static context: State production functions must have no access to field variables of any class. This enforces the pure function rule above.
Be side effect free: State production functions must self contain task cancellation, retries, and asynchronous fetches in their bodies. Cancellation cannot be triggered from outside the function scope; there are no Job or Disposable references that may be called upon from outside the function.
Require an observer: State production functions must require the presence of an observer to run; no Schrödinger's state unless explicitly made hot. This implicitly means they must be restartable.
Be homogenous: State production semantics are homogenous across screens. If someone contributed to state production for screen A, the same mental model can be applied to all other screens.
The rest of this post outlines my method for achieving this.
Inline reducers as declarative state production functions
The CoroutineScope and MutableStateFlow arguments can be eliminated by modeling state production with flows. To familiarize yourself with this concept I recommend reading:
Theory: Unidirectional Data Flow as a Functional declaration
Interactive walkthrough: State production with unidirectional data flow and kotlin flows.
As a brief recap, a change in a State object can be defined inline as a Mutation<State>.
typealias Mutation<State> = State.() -> State
State production is effectively applying multiple Mutation instances to an initial State over time. The signature of the typealias is identical to the function argument of MutableStateFlow.update(). This is also the same idea popularized by Redux; however, instead of writing a single reducer function that all actions and/or effects share, multiple reducing functions are written inline as a Mutation.
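To make the recap concrete, here is a minimal sketch that uses only the Kotlin standard library. CounterState and reduce are hypothetical names invented for this illustration; they are not part of the Mutator library or the app.

```kotlin
// A state change expressed as a function from the old state to the new state.
typealias Mutation<State> = State.() -> State

// Hypothetical state, used only for this illustration.
data class CounterState(val count: Int = 0, val label: String = "")

// Applies each mutation in order, starting from the initial state.
fun <State> reduce(initial: State, mutations: List<Mutation<State>>): State =
    mutations.fold(initial) { state, mutation -> state.mutation() }

fun main() {
    val mutations = listOf<Mutation<CounterState>>(
        { copy(count = count + 1) },
        { copy(label = "loaded") },
        { copy(count = count * 10) },
    )
    println(reduce(CounterState(), mutations))
    // prints CounterState(count=10, label=loaded)
}
```

Each lambda only describes a change; nothing happens until the reducer applies it to the current state, which is what lets the changes be declared inline wherever they originate.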
Perhaps more conveniently, there is no need for side effect abstractions. All asynchronous actions can be performed inline in the Flow by suspending, mapping, flatmapping or what have you. This eliminates the boilerplate of having to define a wrapper Action/Effect for every possible event that can change your state. Instead, you simply declare and dispatch the change.
This lets us rewrite the example above as a pure function without having to pass the CoroutineScope and MutableStateFlow as arguments every time, and also to define loadItem() as an extension on ItemRepository, communicating clearly that the ItemRepository is the source of state change:
// Original function
fun loadItem() {
    viewModelScope.launch {
        try {
            val loadedItem = itemRepository.loadItem(
                itemId = _uiState.value.itemId
            )
            _uiState.update { it.copy(item = loadedItem) }
        } catch (exception: Exception) {
            ...
        }
    }
}

// Static pure function
fun ItemRepository.loadItem(itemId: String) = flow<Mutation<ItemUiState>> {
    try {
        val loadedItem = loadItem(itemId)
        // Equivalent to mutableStateFlow.update
        emit { copy(item = loadedItem) }
    } catch (exception: Exception) {
        ...
    }
}
For user actions, the extension is on the Flow of that action. If the user wanted to retry loading, for example, the method definition would be:
// The explicit return type lets the State type of mapToMutation be inferred
fun Flow<Action.Retry>.reloadMutations(
    repository: ItemRepository
): Flow<Mutation<ItemUiState>> =
    mapLatest { retryAction ->
        try {
            repository.loadItem(retryAction.itemId)
        } catch (exception: Exception) {
            ...
            // Return null to signal failure
            null
        }
    }
        // Equivalent to MutableStateFlow.update { }
        .mapToMutation { loadedItem ->
            // One may opt to add an
            // error message here if the loadedItem is null
            copy(item = loadedItem)
        }

inline fun <T, State> Flow<T>.mapToMutation(
    crossinline mapper: State.(T) -> State
): Flow<Mutation<State>> =
    // Emit the update/mutation/reducing function
    map { item -> mutation { mapper(item) } }
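The following is a small, self-contained sketch of how a Flow of mutations might be turned back into a stream of states. It assumes kotlinx-coroutines-core is on the classpath. Item and ItemUiState are simplified stand-ins, reduceInto is a hypothetical helper built on the standard scan operator, and mapToMutation is reimplemented locally without the library's mutation { } builder; none of this is the Mutator library's actual implementation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

typealias Mutation<State> = State.() -> State

// Simplified stand-ins for the article's types.
data class Item(val id: String)
data class ItemUiState(val itemId: String, val item: Item? = null)

// Local re-implementation of the helper: wraps each emitted value in a Mutation.
fun <T, State> Flow<T>.mapToMutation(
    mapper: State.(T) -> State
): Flow<Mutation<State>> = map { item ->
    val change: Mutation<State> = { mapper(this, item) }
    change
}

// Hypothetical helper: folds every mutation into the running state,
// much like calling MutableStateFlow.update for each one.
fun <State> Flow<Mutation<State>>.reduceInto(initial: State): Flow<State> =
    scan(initial) { state, mutation -> mutation(state) }

fun main() = runBlocking {
    val states = flowOf(Item("a"), Item("b"))
        .mapToMutation<Item, ItemUiState> { loaded -> copy(item = loaded) }
        .reduceInto(ItemUiState(itemId = "a"))
        .toList()
    println(states.last())
    // prints ItemUiState(itemId=a, item=Item(id=b))
}
```

Because scan emits the initial value first, the resulting list holds the initial state followed by one state per mutation.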
Case study
The following is a case study on the approach described above in my fork of the open source Musify Spotify Jetpack Compose clone on the playground branch. State production pipelines for all screens follow the rules above and are assembled with the Mutator library.
The screenshots that follow are for the podcast detail screen in the app:
The UI state that describes the screen and the actions a user can take on the screen are defined as:
sealed class PodcastShowDetailAction {
    object Retry : PodcastShowDetailAction()

    data class LoadAround(val podcastQuery: PodcastQuery?) : PodcastShowDetailAction()
}

data class PodcastShowDetailUiState(
    val currentQuery: PodcastQuery,
    val podcastShow: PodcastShow? = null,
    val currentlyPlayingEpisode: PodcastEpisode? = null,
    val isCurrentlyPlayingEpisodePaused: Boolean? = null,
    val loadingState: LoadingState = LoadingState.LOADING,
    val episodesForShow: TiledList<PodcastQuery, PodcastEpisode> = emptyTiledList()
) {
    enum class LoadingState { IDLE, LOADING, PLAYBACK_LOADING, ERROR }
}
The entry point to state production for the screen is a ViewModel. Hilt is used to inject all the inputs of the state production pipeline. These inputs are then used to assemble the PodcastShowDetailStateProducer. This separation between the ViewModel and the app's state production semantics makes state production:
Independent of and testable outside of the Android platform.
Portable across platforms.
The ViewModel becomes the Android entry point for state production and the business logic state holder for the platform's UI. This is the benefit espoused in the documentation.
@HiltViewModel
class PodcastShowDetailViewModel @Inject constructor(
    @ApplicationContext context: Context,
    savedStateHandle: SavedStateHandle,
    podcastsRepository: PodcastsRepository,
    getCurrentlyPlayingEpisodePlaybackStateUseCase: GetCurrentlyPlayingEpisodePlaybackStateUseCase
) : ViewModel() {

    private val stateProducer = viewModelScope.podcastShowDetailStateProducer(
        showId = savedStateHandle[
            MusifyNavigationDestinations.PodcastShowDetailScreen.NAV_ARG_PODCAST_SHOW_ID
        ]!!,
        countryCode = context.countryCode,
        podcastsRepository = podcastsRepository,
        getCurrentlyPlayingEpisodePlaybackStateUseCase = getCurrentlyPlayingEpisodePlaybackStateUseCase
    )

    val state = stateProducer.state
    val actions = stateProducer.accept
}
The state producer is a pure function that takes the state production inputs and user actions and converts them to state, following the official state production guide:
Inputs: Data sources belong here.
Pipeline assembly and initialization: Though varied, they fall into 2 distinct categories:
Application driven: A conversion of the aforementioned inputs to a List of Flow<Mutation<State>> instances.
User driven: The actions a user can perform to cause state changes. I model this as a Flow<Action> -> Flow<Mutation<State>> function.
Output: A UI state representation of the state production pipeline.
// Inputs are explicitly passed as function arguments
fun CoroutineScope.podcastShowDetailStateProducer(
    showId: String,
    countryCode: String,
    podcastsRepository: PodcastsRepository,
    getCurrentlyPlayingEpisodePlaybackStateUseCase: GetCurrentlyPlayingEpisodePlaybackStateUseCase,
) = actionStateFlowProducer<PodcastShowDetailAction, PodcastShowDetailUiState>(
    // The singular output of state production
    initialState = PodcastShowDetailUiState(
        ...
    ),
    // Inputs as sources of state change
    mutationFlows = listOf(
        ...
    ),
    // User actions as sources of state change
    actionTransform = { actions: Flow<PodcastShowDetailAction> ->
        ...
    }
)
Let's break down the snippet above.
Inputs
The inputs for state production are the arguments of the function and are the data sources for state production:
fun CoroutineScope.podcastShowDetailStateProducer(
    showId: String,
    countryCode: String,
    podcastsRepository: PodcastsRepository,
    getCurrentlyPlayingEpisodePlaybackStateUseCase: GetCurrentlyPlayingEpisodePlaybackStateUseCase,
): ActionStateFlowProducer<PodcastShowDetailAction, PodcastShowDetailUiState>
This is everything needed to produce the UI state wrapped up in a single function.
Pipeline assembly and initialization
The following describes the different sources of change in the state holder for the PodcastShowDetailUiState, and how they are used to initialize the state production pipeline.
Application driven
These produce state changes regardless of user actions, and are caused by changes in application state. They have the following definition:
mutationFlows: List<Flow<Mutation<State>>>
The state producer for the screen defined above has two application driven sources of change:
The currently playing episode use case.
The information for the podcast currently being viewed.
mutationFlows = listOf(
    getCurrentlyPlayingEpisodePlaybackStateUseCase.playbackStateMutations(),
    podcastsRepository.fetchShowMutations(
        showId = showId,
        countryCode = countryCode
    )
)
Currently Playing Episode Mutations
In the snippet below, the Flow from the use case is collected and used to update the UI state each time the playback state changes.
private fun GetCurrentlyPlayingEpisodePlaybackStateUseCase.playbackStateMutations(): Flow<Mutation<PodcastShowDetailUiState>> =
    currentlyPlayingEpisodePlaybackStateStream
        .mapToMutation { playbackState ->
            when (playbackState) {
                is GetCurrentlyPlayingEpisodePlaybackStateUseCase.PlaybackState.Ended -> copy(
                    isCurrentlyPlayingEpisodePaused = null,
                    currentlyPlayingEpisode = null
                )

                is GetCurrentlyPlayingEpisodePlaybackStateUseCase.PlaybackState.Loading -> copy(
                    loadingState = PodcastShowDetailUiState.LoadingState.PLAYBACK_LOADING
                )

                is GetCurrentlyPlayingEpisodePlaybackStateUseCase.PlaybackState.Paused -> copy(
                    currentlyPlayingEpisode = playbackState.pausedEpisode,
                    isCurrentlyPlayingEpisodePaused = true
                )

                is GetCurrentlyPlayingEpisodePlaybackStateUseCase.PlaybackState.Playing -> copy(
                    loadingState = PodcastShowDetailUiState.LoadingState.IDLE,
                    isCurrentlyPlayingEpisodePaused = when (isCurrentlyPlayingEpisodePaused) {
                        null, true -> false
                        else -> isCurrentlyPlayingEpisodePaused
                    },
                    currentlyPlayingEpisode = playbackState.playingEpisode
                )
            }
        }
PodcastsRepository fetch show Mutations
This performs the initial load in the state producer. It consumes a one-shot API from the PodcastsRepository to produce its state change. Note that the one-shot API is represented as a flow to make it compatible with the state changes caused by getCurrentlyPlayingEpisodePlaybackStateUseCase.playbackStateMutations(), as recommended in the official guide.
private fun PodcastsRepository.fetchShowMutations(
    showId: String,
    countryCode: String
) = flow<Mutation<PodcastShowDetailUiState>> {
    val result = fetchPodcastShow(
        showId = showId,
        countryCode = countryCode,
    )
    if (result is FetchedResource.Success) emit {
        copy(
            loadingState = PodcastShowDetailUiState.LoadingState.IDLE,
            podcastShow = result.data
        )
    }
    else emit {
        copy(loadingState = PodcastShowDetailUiState.LoadingState.ERROR)
    }
}
User driven
There are two user actions that cause state changes. They are processed in the actionTransform lambda that has the following type signature:
actionTransform: SuspendingStateHolder<State>.(Flow<Action>) -> Flow<Mutation<State>>
It is a function that takes a Flow of all the Action instances generated by the user, and creates a Flow of state changes. Its receiver also allows for suspending and reading the current state at a point in time. This communicates that state may change shortly after reading it.
typealias SuspendingStateHolder<State> = StateHolder<suspend () -> State>

interface StateHolder<State : Any> {
    val state: State
}
Use of it typically involves splitting the Flow<Action> stream into independent streams of its subtypes. This splitting allows for Flow transformations applied to each subtype to be self contained. Retries can be debounced and delays induced without affecting the rest of the state production pipeline.
actionTransform = { actions ->
    actions.toMutationStream {
        when (val action = type()) {
            // action.flow is an independent Flow<PodcastShowDetailAction.LoadAround>
            is PodcastShowDetailAction.LoadAround -> action.flow.episodeLoadMutations(
                podcastsRepository = podcastsRepository
            )
            // action.flow is an independent Flow<PodcastShowDetailAction.Retry>
            is PodcastShowDetailAction.Retry -> action.flow.retryMutations(
                podcastsRepository = podcastsRepository,
                showId = showId,
                countryCode = countryCode
            )
        }
    }
}
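The idea of splitting actions by subtype can be approximated with plain kotlinx.coroutines operators. The sketch below is not how toMutationStream is implemented; it is a simplified stand-in that uses filterIsInstance and merge, with hypothetical Action and UiState types, and it assumes kotlinx-coroutines-core is available. Note that with a cold upstream each branch collects the source separately, which a real implementation would likely avoid by sharing.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

typealias Mutation<State> = State.() -> State

// Hypothetical actions and state for this illustration.
sealed class Action {
    object Retry : Action()
    data class LoadAround(val page: Int) : Action()
}

data class UiState(val retries: Int = 0, val pages: List<Int> = emptyList())

// Each subtype gets its own self-contained transformation.
fun Flow<Action.Retry>.retryMutations(): Flow<Mutation<UiState>> =
    map<Action.Retry, Mutation<UiState>> { { copy(retries = retries + 1) } }

fun Flow<Action.LoadAround>.loadMutations(): Flow<Mutation<UiState>> =
    map<Action.LoadAround, Mutation<UiState>> { action -> { copy(pages = pages + action.page) } }

// Split by subtype, transform each stream independently, then merge the results.
fun Flow<Action>.toMutations(): Flow<Mutation<UiState>> = merge(
    filterIsInstance<Action.Retry>().retryMutations(),
    filterIsInstance<Action.LoadAround>().loadMutations(),
)

fun main() = runBlocking {
    val actions = flowOf(Action.LoadAround(1), Action.Retry, Action.LoadAround(2))
    val finalState = actions.toMutations()
        .fold(UiState()) { state, mutation -> mutation(state) }
    println(finalState)
    // prints UiState(retries=1, pages=[1, 2])
}
```

Operators such as debounce could be added to one branch without touching the other, which is the property the splitting is meant to provide.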
Paginated episode load Mutations
As the user scrolls the available episodes, it triggers requests to load more data around the user's current position. When the list is empty (the podcastQuery argument is null), it reads the start query from the existing UI state, which is always non-null. It then fetches episodes for the podcast and updates the UI state. This also allows for pagination to be stopped when the screen is left, and to resume from exactly where it left off when the screen is resumed, as seen in the following snippet:
context(SuspendingStateHolder<PodcastShowDetailUiState>)
private suspend fun Flow<PodcastShowDetailAction.LoadAround>.episodeLoadMutations(
    podcastsRepository: PodcastsRepository
): Flow<Mutation<PodcastShowDetailUiState>> =
    map { it.podcastQuery ?: state().currentQuery }
        .toTiledList(
            // Suspend and read where pagination should start from the current state.
            startQuery = state().currentQuery,
            queryFor = { copy(page = it) },
            fetcher = podcastsRepository::podcastsFor
        )
        .mapToMutation {
            copy(episodesForShow = it.distinctBy(PodcastEpisode::id))
        }
Load retry Mutations
If the initial load from PodcastsRepository.fetchShowMutations() fails, users may request a reload. When retrying, the state is first updated to signify that the UI is loading. After that, podcastsRepository.fetchShowMutations() for the initial load is reused. The use of the mapLatestToManyMutations (internally a flatMapLatest) extension below guarantees that only one retry coroutine exists at any one time, implementing cancellation and retry semantics implicitly and exclusively within the implementing state production function. All Mutation instances are also written inline without needing to create side effect abstractions.
private fun Flow<PodcastShowDetailAction.Retry>.retryMutations(
    podcastsRepository: PodcastsRepository,
    showId: String,
    countryCode: String
): Flow<Mutation<PodcastShowDetailUiState>> =
    // Uses flatMapLatest internally to make sure
    // only one retry is in progress at any one time
    mapLatestToManyMutations {
        // First update the loading state to loading
        emit {
            copy(
                loadingState = PodcastShowDetailUiState.LoadingState.LOADING,
            )
        }
        // Call the initial load function again
        emitAll(
            podcastsRepository.fetchShowMutations(
                showId = showId,
                countryCode = countryCode
            )
        )
    }

inline fun <T, State> Flow<T>.mapLatestToManyMutations(
    crossinline block: suspend FlowCollector<Mutation<State>>.(T) -> Unit
): Flow<Mutation<State>> =
    flatMapLatest { flow { block(it) } }
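The cancellation behaviour described here can be observed in isolation. The sketch below assumes kotlinx-coroutines-core; UiState and the Int "attempt" values are hypothetical, the 100 ms delay simulates a fetch, and mapLatestToManyMutations is re-declared locally as a non-inline function for simplicity.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.runBlocking

typealias Mutation<State> = State.() -> State

// Hypothetical state: which retry attempts actually finished loading.
data class UiState(val loading: Boolean = false, val loadedFor: List<Int> = emptyList())

// Local copy of the helper: a new upstream value cancels the previous block.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, State> Flow<T>.mapLatestToManyMutations(
    block: suspend FlowCollector<Mutation<State>>.(T) -> Unit
): Flow<Mutation<State>> =
    flatMapLatest { value -> flow<Mutation<State>> { block(this, value) } }

fun Flow<Int>.retryMutations(): Flow<Mutation<UiState>> =
    mapLatestToManyMutations<Int, UiState> { attempt ->
        emit { copy(loading = true) }
        delay(100) // simulated fetch; cancelled if a newer attempt arrives
        emit { copy(loading = false, loadedFor = loadedFor + attempt) }
    }

fun main() = runBlocking {
    // Three retries requested back to back: only the last one completes.
    val finalState = flowOf(1, 2, 3)
        .retryMutations()
        .fold(UiState()) { state, mutation -> mutation(state) }
    println(finalState)
    // prints UiState(loading=false, loadedFor=[3])
}
```

No Job reference is held anywhere; superseded attempts are cancelled by the operator itself, which keeps the cancellation logic inside the state production function.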
Output
The flows from the initialization of the state production pipeline are merged. This:
Makes each Mutation Flow independent; debouncing in one flow will not affect other flows.
Enforces strict concurrency in the same way a Mutex.lock() call would. Since each state change runs in the context of a Flow and has suspend semantics, the change is only applied atomically when the Mutation is invoked.
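As a rough illustration of the concurrency point, the sketch below merges several producers that emit from background threads and applies their mutations in a single collector. It assumes kotlinx-coroutines-core; Counter, increments and countWithMergedProducers are hypothetical names for this example only.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

typealias Mutation<State> = State.() -> State

data class Counter(val count: Int = 0)

// Each producer emits its mutations from a background dispatcher.
fun increments(times: Int): Flow<Mutation<Counter>> =
    flow<Mutation<Counter>> {
        repeat(times) { emit { copy(count = count + 1) } }
    }.flowOn(Dispatchers.Default)

// Mutations from all producers are merged and applied one at a time
// by the single collector, so no lock is needed around the state.
suspend fun countWithMergedProducers(producers: Int, times: Int): Counter =
    List(producers) { increments(times) }
        .merge()
        .fold(Counter()) { state, mutation -> mutation(state) }

fun main() = runBlocking {
    println(countWithMergedProducers(producers = 4, times = 1_000))
    // prints Counter(count=4000)
}
```

The producers run concurrently, but the state is only ever touched where the mutations are folded, so no increment is lost.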
The output of the state production pipeline assembled above is one that meets the requirements defined above:
Consist of pure functions: All arguments needed to produce state are explicitly passed to each state production function; the functions are all fully self contained.
Static: Each function shown above is written in a static context; there is no enclosing class body that the functions can reach back into.
Is side effect free: Task cancellation has to be modeled in the produced flow using Flow operators like mapLatest or flatMapLatest. The same goes for retries.
Require an observer: The StateFlow produced has the sharing semantics of SharingStarted.WhileSubscribed(5_000) and may be changed. There are no unbounded fire-and-forget coroutines launched in an init block of a ViewModel, as recommended by the official guide.
Homogenous: All screens in the app follow the same state production template. Actions and application state changes are reduced into an initial state.
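The observer requirement can be sketched with the standard stateIn operator. This is a simplified approximation under stated assumptions, not the Mutator library's implementation: produceState is a hypothetical helper, kotlinx-coroutines-core is assumed, and unlike a production implementation this version starts again from the initial state if the upstream is restarted after the timeout.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

typealias Mutation<State> = State.() -> State

// Hypothetical helper: merges mutation flows and only runs them while observed.
fun <State> CoroutineScope.produceState(
    initial: State,
    mutationFlows: List<Flow<Mutation<State>>>,
): StateFlow<State> =
    mutationFlows.merge()
        .scan(initial) { state, mutation -> mutation(state) }
        .stateIn(this, SharingStarted.WhileSubscribed(5_000), initial)

fun main() = runBlocking {
    // A separate scope so it can be cancelled without cancelling runBlocking.
    val scope = CoroutineScope(coroutineContext + Job())
    var upstreamStarted = false
    val mutations = flow<Mutation<Int>> {
        upstreamStarted = true
        emit { this + 1 }
    }

    val state = scope.produceState(0, listOf(mutations))
    println(upstreamStarted) // prints false: no observer yet, so nothing has run

    println(state.first { it == 1 }) // prints 1: subscribing starts the pipeline
    scope.cancel()
}
```

Nothing is collected until the first subscriber appears, which is the "no observer, no work" behaviour the rule asks for.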
Extra considerations
You should evaluate the following features of this UDF implementation to make sure it is compatible with your state production needs:
Pipeline feature | Consideration | Consideration guidelines |
---|---|---|
Each | Consider your state production pipeline. | This approach is justified if:
|
State is in a single object | Consider your consumer of state. | This is helpful to:
|
Functional reactive programming | Consider your team. | This approach requires willingness to use functional programming paradigms. Imperative APIs may still be used, but they are isolated to the static state production function they are housed in. For example the flow { } builder API may be used, and a while loop with a delay can be used for polling or other loop based operations. An example of this can be seen in the state production for the search screen. |
For:
Screens with a high number of contributions in response to user needs or product requirements
Product teams where ownership of a feature regularly changes hands
Product features where multiple teams contribute simultaneously
The benefits of this UDF implementation may justify its strictness. The major benefits are consistency, scalability and repeatability of the pipeline along with its strong concurrency and atomicity guarantees.
The same approach was used to produce state for all screens in the app listed below:
Here are some of their screenshots:
The entire static state production pipeline for PodcastShowDetailUiState follows:
sealed class PodcastShowDetailAction {
    object Retry : PodcastShowDetailAction()

    data class LoadAround(val podcastQuery: PodcastQuery?) : PodcastShowDetailAction()
}

data class PodcastShowDetailUiState(
    val currentQuery: PodcastQuery,
    val podcastShow: PodcastShow? = null,
    val currentlyPlayingEpisode: PodcastEpisode? = null,
    val isCurrentlyPlayingEpisodePaused: Boolean? = null,
    val loadingState: LoadingState = LoadingState.LOADING,
    val episodesForShow: TiledList<PodcastQuery, PodcastEpisode> = emptyTiledList()
) {
    enum class LoadingState { IDLE, LOADING, PLAYBACK_LOADING, ERROR }
}

fun CoroutineScope.podcastShowDetailStateProducer(
    showId: String,
    countryCode: String,
    podcastsRepository: PodcastsRepository,
    getCurrentlyPlayingEpisodePlaybackStateUseCase: GetCurrentlyPlayingEpisodePlaybackStateUseCase,
) = actionStateFlowProducer<PodcastShowDetailAction, PodcastShowDetailUiState>(
    initialState = PodcastShowDetailUiState(
        currentQuery = PodcastQuery(
            showId = showId,
            countryCode = countryCode,
            page = Page(offset = 0)
        )
    ),
    mutationFlows = listOf(
        getCurrentlyPlayingEpisodePlaybackStateUseCase.playbackStateMutations(),
        podcastsRepository.fetchShowMutations(
            showId = showId,
            countryCode = countryCode
        )
    ),
    actionTransform = { actions ->
        actions.toMutationStream {
            when (val action = type()) {
                is PodcastShowDetailAction.LoadAround -> action.flow.episodeLoadMutations(
                    podcastsRepository = podcastsRepository
                )

                is PodcastShowDetailAction.Retry -> action.flow.retryMutations(
                    podcastsRepository = podcastsRepository,
                    showId = showId,
                    countryCode = countryCode
                )
            }
        }
    }
)

private fun GetCurrentlyPlayingEpisodePlaybackStateUseCase.playbackStateMutations(): Flow<Mutation<PodcastShowDetailUiState>> =
    currentlyPlayingEpisodePlaybackStateStream
        .mapToMutation {
            when (it) {
                is GetCurrentlyPlayingEpisodePlaybackStateUseCase.PlaybackState.Ended -> copy(
                    isCurrentlyPlayingEpisodePaused = null,
                    currentlyPlayingEpisode = null
                )

                is GetCurrentlyPlayingEpisodePlaybackStateUseCase.PlaybackState.Loading -> copy(
                    loadingState = PodcastShowDetailUiState.LoadingState.PLAYBACK_LOADING
                )

                is GetCurrentlyPlayingEpisodePlaybackStateUseCase.PlaybackState.Paused -> copy(
                    currentlyPlayingEpisode = it.pausedEpisode,
                    isCurrentlyPlayingEpisodePaused = true
                )

                is GetCurrentlyPlayingEpisodePlaybackStateUseCase.PlaybackState.Playing -> copy(
                    loadingState = PodcastShowDetailUiState.LoadingState.IDLE,
                    isCurrentlyPlayingEpisodePaused = when (isCurrentlyPlayingEpisodePaused) {
                        null, true -> false
                        else -> isCurrentlyPlayingEpisodePaused
                    },
                    currentlyPlayingEpisode = it.playingEpisode
                )
            }
        }

private fun Flow<PodcastShowDetailAction.Retry>.retryMutations(
    podcastsRepository: PodcastsRepository,
    showId: String,
    countryCode: String
): Flow<Mutation<PodcastShowDetailUiState>> =
    mapLatestToManyMutations {
        emit {
            copy(
                loadingState = PodcastShowDetailUiState.LoadingState.LOADING,
            )
        }
        emitAll(
            podcastsRepository.fetchShowMutations(
                showId = showId,
                countryCode = countryCode
            )
        )
    }

context(SuspendingStateHolder<PodcastShowDetailUiState>)
private suspend fun Flow<PodcastShowDetailAction.LoadAround>.episodeLoadMutations(
    podcastsRepository: PodcastsRepository
): Flow<Mutation<PodcastShowDetailUiState>> =
    map { it.podcastQuery ?: state().currentQuery }
        .toTiledList(
            startQuery = state().currentQuery,
            queryFor = { copy(page = it) },
            fetcher = podcastsRepository::podcastsFor
        )
        .mapToMutation {
            copy(episodesForShow = it.distinctBy(PodcastEpisode::id))
        }

private fun PodcastsRepository.fetchShowMutations(
    showId: String,
    countryCode: String
) = flow<Mutation<PodcastShowDetailUiState>> {
    val result = fetchPodcastShow(
        showId = showId,
        countryCode = countryCode,
    )
    if (result is FetchedResource.Success) emit {
        copy(
            loadingState = PodcastShowDetailUiState.LoadingState.IDLE,
            podcastShow = result.data
        )
    }
    else emit {
        copy(loadingState = PodcastShowDetailUiState.LoadingState.ERROR)
    }
}