Mastering Kotlin Flow Cancellation: 13 Essential Techniques Explained
This comprehensive guide explores why cancelling Kotlin Flow is crucial for resource management, user experience, network efficiency, and battery life, and walks through thirteen practical cancellation methods—including Job cancellation, structured concurrency, time‑outs, boolean flags, operators like cancellable, take, first, single, and advanced post‑processing techniques—complete with runnable code examples and outputs.
Why Flow Cancellation Matters
In Kotlin Flow, cancelling a long‑running stream is essential for resource management, user experience, network efficiency, and battery life.
Resource Management: Prevent memory leaks and unnecessary CPU usage.
User Experience: Stop outdated operations when the user leaves a screen.
Network Efficiency: Cancel unnecessary network requests.
Battery Life: Reduce background processing on mobile devices.
Flow cancellation is more than just calling cancel(); various techniques have specific use‑cases and nuances.
Method 1: Job Cancellation – Basic Usage
The simplest way to cancel a Flow is by cancelling its Job. When a Job is cancelled, all Flows running in that coroutine scope are cancelled.
suspend fun main() {
val job = CoroutineScope(Dispatchers.Default).launch {
createNumberFlow().collect { value ->
println("Received: $value")
}
}
delay(3.seconds)
println("Cancelling job...")
job.cancel()
delay(1.seconds)
println("Program finished")
}
fun createNumberFlow() = flow {
repeat(10) { i ->
println("Emitting: $i")
emit(i)
delay(1.seconds)
}
}Output:
Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Cancelling job...
Program finishedUnderlying principle: Calling job.cancel() sends a cancellation signal to the coroutine. The flow { … } builder checks cancellation at suspension points such as delay() and emit(). Once cancelled, the Flow stops emitting and the collector stops receiving values.
If a Flow has no suspension points, it will not respond to cancellation. Use ensureActive() to make it cancellable:
fun cancellableFlow() = flow {
repeat(1_000_000) { i ->
ensureActive()
emit(i)
}
}Method 2: Scope Cancellation – Structured Concurrency
Canceling a CoroutineScope stops all coroutines launched in that scope, demonstrating the advantage of structured concurrency.
class DataRepository {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
fun fetchDataStream(): Flow<String> = flow {
repeat(Int.MAX_VALUE) { i ->
emit("Data item $i")
delay(500.milliseconds)
}
}.flowOn(Dispatchers.IO)
fun startFetching(): Job {
return scope.launch {
fetchDataStream()
.catch { e -> println("Error: ${e.message}") }
.collect { data -> println("Processing: $data") }
}
}
fun cleanup() {
scope.cancel("Repository is being cleaned up")
}
}
suspend fun main() {
val repository = DataRepository()
val fetchJob = repository.startFetching()
delay(3.seconds)
println("Cleaning up repository...")
repository.cleanup()
delay(1.seconds)
println("Done")
}Output:
Processing: Data item 0
Processing: Data item 1
Processing: Data item 2
Processing: Data item 3
Processing: Data item 4
Processing: Data item 5
Cleaning up repository...
DoneMethod 3: withTimeout – Time‑Based Cancellation
Use withTimeout to cancel a Flow that runs longer than a specified duration.
suspend fun main() {
try {
withTimeout(5.seconds) {
slowDataFlow().collect { value ->
println("Received: $value")
}
}
} catch (e: TimeoutCancellationException) {
println("Operation timed out: ${e.message}")
}
}
fun slowDataFlow() = flow {
repeat(10) { i ->
println("Emitting: $i")
emit(i)
delay(1.seconds)
}
}Output:
Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Emitting: 3
Received: 3
Emitting: 4
Received: 4
Operation timed out: Timed out waiting for 5000 msMethod 4: withTimeoutOrNull – Graceful Timeout
withTimeoutOrNullreturns null on timeout instead of throwing.
suspend fun main() {
val result = withTimeoutOrNull(5.seconds) {
slowDataFlow().collect { value -> println("Received: $value") }
"Done"
}
if (result == null) {
println("Operation timed out")
} else {
println("Operation finished: $result")
}
}Output:
Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Emitting: 3
Received: 3
Emitting: 4
Received: 4
Operation timed outMethod 5: Boolean Flag – Manual Cancellation
class ManualCancellableFlow {
private val shouldStop = AtomicBoolean(false)
fun createFlow() = flow {
var i = 0
while (!shouldStop.get()) {
emit("Item: $i")
i++
delay(1.seconds)
}
}
fun stop() { shouldStop.set(true) }
}
suspend fun main() {
val manualFlow = ManualCancellableFlow()
val job = CoroutineScope(Dispatchers.Default).launch {
manualFlow.createFlow().collect { println(it) }
}
delay(3.5.seconds)
manualFlow.stop()
println("Stopped manually")
job.join()
}Output:
Item: 0
Item: 1
Item: 2
Stopped manuallyMethod 6: cancellable() Operator
The cancellable() operator forces a Flow to check ensureActive() before emitting each element, making otherwise non‑cancellable Flows responsive.
suspend fun main() = coroutineScope {
println("--- Without cancellable() ---")
val job1 = launch {
(1..5).asFlow()
.onEach { delay(100) }
.collect { value -> println("Collecting $value") }
}
delay(250)
job1.cancelAndJoin()
println("Job 1 cancelled")
println("
--- With cancellable() ---")
val job2 = launch {
(1..5).asFlow()
.cancellable()
.onEach { delay(100) }
.collect { value -> println("Collecting $value") }
}
delay(250)
job2.cancelAndJoin()
println("Job 2 cancelled")
}Output:
--- Without cancellable() ---
Collecting 1
Collecting 2
Collecting 3
Collecting 4
Collecting 5
Job 1 cancelled
--- With cancellable() ---
Collecting 1
Collecting 2
Job 2 cancelledMethod 7: take and takeWhile
takelimits the number of emitted items; takeWhile continues while a predicate holds.
suspend fun main() {
createNumberFlow()
.take(3)
.collect { value -> println("Received: $value") }
}Output:
Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2 suspend fun main() {
createNumberFlow()
.takeWhile { it < 3 }
.collect { value -> println("Received: $value") }
}Output:
Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Emitting: 3Method 8: first() – First Element Terminal Operator
suspend fun main() {
val numberFlow = flow {
println("Flow started")
emit(1)
println("This will not be printed")
emit(2)
}
val firstNumber = numberFlow.first()
println("First number is: $firstNumber")
}Output:
Flow started
First number is: 1Method 9: single() – Expect Exactly One Element
suspend fun main() {
// Success case
val singleElementFlow = flowOf(42)
println("Single element: ${singleElementFlow.single()}")
// Multiple elements – throws IllegalStateException
try {
val multipleElementsFlow = flowOf(1, 2, 3)
multipleElementsFlow.single()
} catch (e: IllegalStateException) {
println("Caught expected exception: ${e.message}")
}
// Empty flow – throws NoSuchElementException
try {
val emptyFlow = emptyFlow<Int>()
emptyFlow.single()
} catch (e: NoSuchElementException) {
println("Caught expected exception: ${e.message}")
}
}Output:
Single element: 42
Caught expected exception: Flow has more than one element
Caught expected exception: Flow is emptyMethod 10: Boolean Terminal Operators – any() , all() , none()
suspend fun main() {
val numberFlow = flow {
emit(1); println("Checking 1")
emit(2); println("Checking 2")
emit(3); println("This will not be printed")
emit(4)
}
val hasNumberGreaterThanTwo = numberFlow.any { it > 2 }
println("Has a number greater than two: $hasNumberGreaterThanTwo")
}Output:
Checking 1
Checking 2
Has a number greater than two: trueMethod 11: transformWhile – Advanced Conditional Transformation
import kotlinx.coroutines.flow.*
suspend fun main() {
(1..10).asFlow()
.transformWhile { value ->
if (value % 2 == 0) {
false // stop on first even number
} else {
emit(value * value) // emit square of odd numbers
true
}
}
.collect { println(it) }
}Output:
1
9Method 12: collectLatest – Keep Only the Latest Value
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main() = coroutineScope {
val queryFlow = flow {
emit("A"); delay(100)
emit("AB"); delay(100)
emit("ABC"); delay(300)
}
queryFlow.collectLatest { query ->
println("Searching for '$query'")
delay(200) // simulate network request
println("Finished search for '$query'")
}
}Output:
Searching for 'A'
Searching for 'AB'
Searching for 'ABC'
Finished search for 'ABC'Method 13: SharedFlow and StateFlow – Custom Cancellation
Hot flows like SharedFlow and StateFlow continue emitting regardless of collectors. The producer coroutine must monitor isActive to stop emission.
class DataProducer(private val scope: CoroutineScope) {
private val _sharedFlow = MutableSharedFlow<Int>()
val sharedFlow: SharedFlow<Int> = _sharedFlow.asSharedFlow()
private var producerJob: Job? = null
fun start() {
producerJob = scope.launch(Dispatchers.Default) {
var i = 0
while (isActive) {
println("Producing $i")
_sharedFlow.emit(i++)
delay(1000)
}
}
}
fun stop() { producerJob?.cancel() }
}
suspend fun main() = coroutineScope {
val producer = DataProducer(this)
val collectorJob = launch {
producer.sharedFlow.collect { println("Collector 1 received: $it") }
}
producer.start()
delay(3500)
println("Stopping producer...")
producer.stop()
delay(1000)
collectorJob.cancel()
println("Done")
}Output:
Producing 0
Collector 1 received: 0
Producing 1
Collector 1 received: 1
Producing 2
Collector 1 received: 2
Producing 3
Collector 1 received: 3
Stopping producer...
DonePost‑Processing 1: catch – Exception Handling
Use catch to handle upstream exceptions, but re‑throw CancellationException to preserve cancellation semantics.
suspend fun main() = coroutineScope {
flow {
emit(1); delay(100)
emit(2); throw IOException("Something went wrong")
}
.onEach { println("onEach: $it") }
.catch { e ->
println("Caught exception: ${e.javaClass.simpleName}")
if (e is CancellationException) throw e
}
.onCompletion { cause -> println("onCompletion with cause: $cause") }
.collect { println("Collected $it") }
}Output:
onEach: 1
Collected 1
onEach: 2
Caught exception: IOException
onCompletion with cause: nullPost‑Processing 2: onCompletion – Resource Cleanup
suspend fun main() = coroutineScope {
val job = launch {
(1..5).asFlow()
.onEach { delay(100); println("Emitting $it") }
.onCompletion { cause ->
if (cause != null) {
println("Flow was cancelled with ${cause.javaClass.simpleName}")
} else {
println("Flow completed normally")
}
}
.collect { println("Collecting $it") }
}
delay(250)
job.cancelAndJoin()
println("Job cancelled")
}Output:
Emitting 1
Collecting 1
Emitting 2
Collecting 2
Flow was cancelled with CancellationException
Job cancelledKey Takeaways
Job Cancellation: Flow cancellation is cooperative and relies on suspension points or explicit ensureActive() checks.
Structured Concurrency: Launching Flows within a CoroutineScope ties their lifecycle to the scope, enabling automatic cancellation.
Timeouts: withTimeout and withTimeoutOrNull provide reliable time‑based cancellation.
Manual Flags: Use an AtomicBoolean or similar flag when standard mechanisms are insufficient.
Operators: take, takeWhile, first, single, and boolean operators ( any, all, none) automatically complete and cancel Flows based on element count or predicates.
cancellable : Makes otherwise non‑cancellable Flows responsive to cancellation.
Cleanup: Use onCompletion for resource release and catch for exception handling, remembering to re‑throw CancellationException when appropriate.
AndroidPub
Senior Android Developer & Interviewer, regularly sharing original tech articles, learning resources, and practical interview guides. Welcome to follow and contribute!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
