In modern Android and Kotlin development, handling asynchronous data streams efficiently is crucial. Whether you’re working with API responses, user input events, or real-time updates, Kotlin Flow provides a powerful, coroutine-based solution. In this guide, we’ll explore Kotlin Flow in depth, covering its concepts, builders, operators, cancellation and exception handling, StateFlow and SharedFlow, and typical use cases.
What is Kotlin Flow?
Kotlin Flow is a cold, asynchronous, and reactive stream API introduced as part of Kotlin Coroutines. It is designed to handle sequential data streams efficiently, making it a great alternative to RxJava and LiveData.
Key Characteristics of Flow
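- Cold Stream: A Flow only starts emitting values when it is collected, and its body runs again for each new collector.
- Sequential Execution: By default, emissions, transformations, and collection run one after another in the same coroutine.
- Backpressure Support: Because emit() and collect() are suspending functions, a slow collector naturally suspends the emitter, so no separate backpressure API (such as RxJava's Flowable strategies) is needed.
- Coroutine Friendly: Works seamlessly with Kotlin Coroutines.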
Basic Example of Flow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        emit(i) // Emitting values one by one
    }
}

fun main() = runBlocking {
    simpleFlow().collect { value -> println(value) }
}
Output:
1
2
3
4
5
Use Flow when dealing with data streams that require coroutine support and backpressure handling.
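To make the "cold" behaviour concrete, here is a minimal sketch. The names `coldFlow` and `startCount` are illustrative and not part of any library; the counter exists only to show that the flow body runs once per collector rather than once per flow instance.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Illustrative counter: how many times the flow body has started.
var startCount = 0

fun coldFlow(): Flow<Int> = flow {
    startCount++ // Runs every time a collector starts collecting
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val numbers = coldFlow()
    println("Starts before collecting: $startCount") // 0, nothing has run yet
    numbers.collect { println("First collector: $it") }
    numbers.collect { println("Second collector: $it") }
    println("Starts after two collections: $startCount") // 2
}
```

Creating the flow does no work by itself. Each call to collect re-runs the builder block from the top, which is why both collectors see 1 and 2.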
Flow Builders
1. flow {} – Custom Flow Creation
val customFlow: Flow<Int> = flow {
    for (i in 1..3) {
        emit(i) // Emit values
    }
}
2. flowOf() – Flow from Fixed Values
val numbersFlow = flowOf(1, 2, 3, 4, 5)
3. asFlow() – Convert Collections to Flow
val listFlow = listOf(1, 2, 3).asFlow()
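As a quick check that the three builders are interchangeable for simple cases, the sketch below collects each one into a list with toList(). The helper name `builderResults` is an assumption made for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Collects each builder's output into a list so the results can be compared.
suspend fun builderResults(): List<List<Int>> {
    val fromBuilder: Flow<Int> = flow { for (i in 1..3) emit(i) }
    val fromValues = flowOf(1, 2, 3)
    val fromList = listOf(1, 2, 3).asFlow()
    return listOf(fromBuilder.toList(), fromValues.toList(), fromList.toList())
}

fun main() = runBlocking {
    builderResults().forEach { println(it) } // Each line prints [1, 2, 3]
}
```

In practice, flow {} is the one to reach for when values are produced by suspending work; flowOf() and asFlow() are conveniences for data that already exists.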
Transforming Flow Data with Operators
1. map {} – Transform Each Element
simpleFlow().map { it * 2 }.collect { println(it) }
Output: 2, 4, 6, 8, 10
2. filter {} – Filter Elements
simpleFlow().filter { it % 2 == 0 }.collect { println(it) }
Output: 2, 4
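3. collectLatest {} – Cancel Previous Collection
Unlike map and filter, collectLatest is a terminal operator: when a new value arrives while the previous one is still being processed, the previous block is cancelled.
simpleFlow().collectLatest { value ->
    println("Processing $value")
    delay(100) // Slow work; cancelled if a new value arrives first
    println("Done $value")
}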
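The cancellation performed by collectLatest is easier to see when the emitter is faster than the collector. The following is a rough sketch with assumed timings (a value every 50 ms, 200 ms of work per value); `processLatest` is a hypothetical helper, not a library function.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns the values whose processing ran to completion.
suspend fun processLatest(): List<Int> {
    val completed = mutableListOf<Int>()
    flow<Int> {
        for (i in 1..3) {
            emit(i)
            delay(50) // A new value arrives every 50 ms
        }
    }.collectLatest { value ->
        println("Processing $value")
        delay(200) // Slow work: cancelled when the next value arrives
        completed += value
        println("Finished $value")
    }
    return completed
}

fun main() = runBlocking {
    println("Completed: ${processLatest()}")
}
```

With these timings, processing of 1 and 2 is cancelled before it finishes and only the last value is fully handled. This pattern suits cases like search-as-you-type, where only the most recent input matters.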
4. flatMapConcat {} – Sequential Mapping
val flow1 = flowOf(1, 2, 3)
flow1.flatMapConcat { value -> flowOf(value * 10) }
    .collect { println(it) }
Output: 10, 20, 30
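The "sequential" part of flatMapConcat shows up more clearly when each inner flow emits more than one value. This is a small sketch; `concatenated` is a name chosen for the example, and the opt-in annotation is included as an assumption because, depending on your kotlinx.coroutines version, flatMapConcat is marked as a preview or experimental API.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Opt-in covers both markers used for flatMapConcat across library versions.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun concatenated(): List<String> =
    flowOf(1, 2, 3)
        .flatMapConcat { n -> flowOf("$n-a", "$n-b") } // Inner flow per value
        .toList()

fun main() = runBlocking {
    println(concatenated()) // [1-a, 1-b, 2-a, 2-b, 3-a, 3-b]
}
```

Each inner flow is collected to completion before the next outer value is processed, so the output order always follows the order of the source flow.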
Handling Flow Cancellation & Exceptions
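1. Cancelling a Flow
// Assumes this runs inside a coroutine scope, e.g. within runBlocking { ... }
val job = launch {
    simpleFlow().collect { println(it) }
}
delay(2000)
// Cancels the collecting coroutine, which stops the Flow if it is still running.
// simpleFlow() finishes almost instantly, so this only matters for longer-lived flows.
job.cancel()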
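Cancellation is more visible with a flow that never completes on its own. The sketch below uses a hypothetical `tickingFlow` that emits a counter every 100 ms and a helper `collectFor` that cancels the collector after a given time; both names are assumptions for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// An endless flow: it only stops when its collector is cancelled.
fun tickingFlow(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(100) // Suspension point where cancellation takes effect
    }
}

// Collects for roughly [millis] ms, then cancels and returns what was received.
suspend fun collectFor(millis: Long): List<Int> = coroutineScope {
    val received = mutableListOf<Int>()
    val job = launch {
        tickingFlow().collect { received += it }
    }
    delay(millis)
    job.cancelAndJoin() // Cancel the collector and wait for it to finish
    received
}

fun main() = runBlocking {
    println("Received before cancellation: ${collectFor(350)}")
}
```

Because a Flow runs inside the collecting coroutine, cancelling that coroutine is all that is needed; there is no separate "dispose" step. The exact number of values received depends on timing.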
2. Handling Exceptions
flow {
    emit(1)
    throw RuntimeException("Error!")
}.catch { e -> println("Caught: ${e.message}") }
    .collect { println(it) }
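Besides logging, the catch operator can also emit a replacement value so that downstream code keeps working. Here is a minimal sketch; `withFallback` and the fallback value -1 are assumptions chosen for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

suspend fun withFallback(): List<Int> =
    flow<Int> {
        emit(1)
        throw RuntimeException("Error!")
    }
        .catch { e ->
            println("Caught: ${e.message}")
            emit(-1) // Emit a fallback value downstream
        }
        .toList()

fun main() = runBlocking {
    println(withFallback()) // [1, -1]
}
```

Note that catch only handles exceptions thrown upstream of it. An exception thrown inside the collect block itself is not caught by a catch operator placed before it.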
StateFlow & SharedFlow
StateFlow — Holds a State
StateFlow is similar to LiveData and is used to store a single state that updates over time.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val stateFlow = MutableStateFlow(0)
    val job = launch {
        stateFlow.collect { value ->
            println("Collector received: $value")
        }
    }
    delay(500)
    stateFlow.value = 1
    delay(500)
    stateFlow.value = 2
    delay(500)
    job.cancel()
}

// Output
Collector received: 0
Collector received: 1
Collector received: 2
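StateFlow always holds a current value and delivers it to each new collector immediately upon subscription, much like LiveData. Updates are conflated: a slow collector may skip intermediate values, and assigning a value equal to the current one does not trigger a new emission.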
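A common way to use StateFlow as a state holder is to keep the mutable version private and expose a read-only view. The sketch below illustrates that pattern with a hypothetical `CounterState` class; the class and its members are assumptions for this example, not part of the library.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

class CounterState {
    // Only this class can change the state.
    private val _count = MutableStateFlow(0)

    // Read-only view for observers.
    val count: StateFlow<Int> = _count.asStateFlow()

    // update applies the change atomically, which is safe under concurrency.
    fun increment() = _count.update { it + 1 }
}

fun main() {
    val counter = CounterState()
    counter.increment()
    counter.increment()
    println(counter.count.value) // 2
}
```

The current state can be read synchronously through value, while UI code would typically collect count to react to changes.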
SharedFlow — Broadcasts Data to Multiple Collectors
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Create a MutableSharedFlow with a buffer capacity of 2
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 2
    )
    val job1 = launch {
        sharedFlow.collect { value ->
            println("Collector 1 received: $value")
        }
    }
    val job2 = launch {
        sharedFlow.collect { value ->
            println("Collector 2 received: $value")
        }
    }
    // Delay to ensure collectors are active before emitting
    delay(100)
    sharedFlow.emit(10)
    sharedFlow.emit(20)
    delay(500)
    job1.cancel()
    job2.cancel()
}

// Output
Collector 1 received: 10
Collector 2 received: 10
Collector 1 received: 20
Collector 2 received: 20
By default, MutableSharedFlow has replay = 0 and no extra buffer. A value emitted while no collector is subscribed is simply dropped, and when collectors are subscribed, emit() suspends until every one of them has received the value.
- We set extraBufferCapacity = 2, so emit() can buffer up to two values for slow collectors instead of suspending.
- We add delay(100) before emitting, ensuring both collectors are already subscribed.
This way, both collectors reliably receive all values.
SharedFlow broadcasts emitted values to all active collectors. It’s great for one-time events (like navigation, snackbars, etc.), and multiple collectors will receive the same emissions.
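The replay parameter decides what a late collector sees. The sketch below emits two values before anyone is collecting and then inspects the replay cache; `replayAfterEmits` is a hypothetical helper written for this illustration.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.runBlocking

// Emits 10 and 20 with no collectors, then returns what a late collector would get.
suspend fun replayAfterEmits(replay: Int): List<Int> {
    val events = MutableSharedFlow<Int>(replay = replay)
    events.emit(10) // No collectors yet, so emit returns immediately
    events.emit(20)
    return events.replayCache
}

fun main() = runBlocking {
    println("replay = 0: ${replayAfterEmits(0)}") // [] (both values were dropped)
    println("replay = 1: ${replayAfterEmits(1)}") // [20] (last value is kept)
}
```

For one-time events such as navigation, replay = 0 avoids re-delivering an old event to a new collector, at the cost of losing events emitted while nobody is listening.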
When to Use Kotlin Flow?
- Fetching API data periodically
- Streaming real-time UI updates
- Handling user input events
- Data transformation pipelines
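As an illustration of the first use case, periodic fetching can be sketched as a flow that loops with a delay. The function `fetchStatus` below is a hypothetical stand-in for a real network call, and `pollStatus` and the interval are likewise assumptions for the example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a network request.
suspend fun fetchStatus(attempt: Int): String {
    delay(10) // Simulated latency
    return "status #$attempt"
}

// Emits a fresh result every [intervalMs] ms until the collector is cancelled.
fun pollStatus(intervalMs: Long): Flow<String> = flow {
    var attempt = 1
    while (true) {
        emit(fetchStatus(attempt++))
        delay(intervalMs)
    }
}

fun main() = runBlocking {
    pollStatus(intervalMs = 100)
        .take(3) // Stop after three results; this cancels the loop upstream
        .collect { println(it) }
}
```

Because the flow is cold, polling starts only when something collects it and stops as soon as the collector goes away, for example when a screen is closed.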
Conclusion
Kotlin Flow is a modern, efficient, and coroutine-friendly way to handle asynchronous data streams. By using Flow builders, operators, exception handling, and StateFlow/SharedFlow, developers can build scalable, efficient applications.