Kotlin Flow: A Deep Dive into Reactive Streams in Kotlin

Table of Contents

In modern Android and Kotlin development, handling asynchronous data streams efficiently is crucial. Whether you’re working with API responses, user input events, or real-time updates, Kotlin Flow provides a powerful, coroutine-based solution. In this guide, we’ll explore Kotlin Flow in-depth, covering its concepts, operators, builders, real-world use cases, and best practices.

What is Kotlin Flow?

Kotlin Flow is a cold, asynchronous, and reactive stream API introduced as part of Kotlin Coroutines. It is designed to handle sequential data streams efficiently, making it a great alternative to RxJava and LiveData.

Key Characteristics of Flow

  • Cold Stream — A Flow only starts emitting values when it is collected.
  • Sequential Execution — Emissions, transformations, and collections occur one after another.
  • Backpressure Support — Unlike RxJava, Flow automatically handles backpressure.
  • Coroutine Friendly — Works seamlessly with Kotlin Coroutines.

Basic Example of Flow

Kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        emit(i)  // Emitting values one by one
    }
}

fun main() = runBlocking {
    simpleFlow().collect { value -> println(value) }
}

Output:

Kotlin
1
2
3
4
5

Use Flow when dealing with data streams that require coroutine support and backpressure handling.

Flow Builders

1. flow {} – Custom Flow Creation

Kotlin
val customFlow: Flow<Int> = flow {
    for (i in 1..3) {
        emit(i)  // Emit values
    }
}

2. flowOf() – Flow from Fixed Values

Kotlin
val numbersFlow = flowOf(1, 2, 3, 4, 5)

3. asFlow() – Convert Collections to Flow

Kotlin
val listFlow = listOf(1, 2, 3).asFlow()

Transforming Flow Data with Operators

1. map {} – Transform Each Element

Kotlin
simpleFlow().map { it * 2 }.collect { println(it) }

Output: 2, 4, 6, 8, 10

2. filter {} – Filter Elements

Kotlin
simpleFlow().filter { it % 2 == 0 }.collect { println(it) }

Output: 2, 4

3. collectLatest {} – Cancel Previous Collection

Kotlin
simpleFlow().collectLatest { value ->
    println("Processing $value")
}

4. flatMapConcat {} – Sequential Mapping

Kotlin
val flow1 = flowOf(1, 2, 3)
flow1.flatMapConcat { value -> flowOf(value * 10) }
    .collect { println(it) }

Output: 10, 20, 30

Handling Flow Cancellation & Exceptions

1. Cancelling a Flow

Kotlin
val job = launch {
    simpleFlow().collect { println(it) }
}
delay(2000)
job.cancel()  // Cancels the Flow

2. Handling Exceptions

Kotlin
flow {
    emit(1)
    throw RuntimeException("Error!")
}.catch { e -> println("Caught: ${e.message}") }
 .collect { println(it) }

StateFlow & SharedFlow

StateFlow — Holds a State

StateFlow is similar to LiveData and is used to store a single state that updates over time.

Kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val stateFlow = MutableStateFlow(0)

    val job = launch {
        stateFlow.collect { value ->
            println("Collector received: $value")
        }
    }

    delay(500)
    stateFlow.value = 1
    delay(500)
    stateFlow.value = 2
    delay(500)
    job.cancel()
}


//Output

Collector received: 0
Collector received: 1
Collector received: 2

StateFlow always emits the latest value to collectors. It behaves like LiveData, and collectors receive the current state immediately upon subscription.

SharedFlow — Broadcasts Data to Multiple Collectors

Kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Create a MutableSharedFlow with a buffer capacity of 2
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 2
    )

    val job1 = launch {
        sharedFlow.collect { value ->
            println("Collector 1 received: $value")
        }
    }

    val job2 = launch {
        sharedFlow.collect { value ->
            println("Collector 2 received: $value")
        }
    }

    // Delay to ensure collectors are active before emitting
    delay(100)

    sharedFlow.emit(10)
    sharedFlow.emit(20)

    delay(500)

    job1.cancel()
    job2.cancel()
}

// Output

Collector 1 received: 10
Collector 2 received: 10
Collector 1 received: 20
Collector 2 received: 20

By default, MutableSharedFlow has no buffer and replay = 0, meaning it won’t emit any value unless a collector is ready at the moment of emission.

  • We set extraBufferCapacity = 2, allowing SharedFlow to buffer a couple of values while the collectors start.
  • We add delay(100) before emitting, ensuring both collectors are already collecting.

This way, both collectors reliably receive all values.

SharedFlow broadcasts emitted values to all active collectors. It’s great for one-time events (like navigation, snackbars, etc.), and multiple collectors will receive the same emissions.

When to Use Kotlin Flow?

  • Fetching API data periodically
  • Streaming real-time UI updates
  • Handling user input events
  • Data transformation pipelines

Conclusion

Kotlin Flow is a modern, efficient, and coroutine-friendly way to handle asynchronous data streams. By using Flow builders, operators, exception handling, and StateFlow/SharedFlow, developers can build scalable, efficient applications.

Skill Up: Software & AI Updates!

Receive our latest insights and updates directly to your inbox

Related Posts

error: Content is protected !!