Escaping Callback Hell: From Blocking Calls to Reactive Streams in Kotlin

This article explains the code types involved in synchronous remote calls, demonstrates how blocking IO leads to callback hell, and shows how to simplify the programming model using Java CompletableFuture, AsyncHttpClient, and Project Reactor’s reactive streams to efficiently retrieve city data.

Programmer DD
Programmer DD
Programmer DD
Escaping Callback Hell: From Blocking Calls to Reactive Streams in Kotlin

This article briefly introduces the code types required for synchronous remote calls, then demonstrates how layered non‑blocking I/O can use resources efficiently, highlights the complexity of callback hell, and shows how reactive‑stream approaches simplify the programming model.

1. Target Service

The client calls a service exposing two endpoints. A request to /cityids returns a list of city IDs, e.g.:

[
    1,
    2,
    3,
    4,
    5,
    6,
    7
]

A request to /cities/{id} returns the details of a city, for example:

{
    "country": "USA",
    "id": 1,
    "name": "Portland",
    "pop": 1600000
}

The client must first obtain the list of city IDs and then fetch each city's details to assemble a complete list.

2. Synchronous Calls

Using Spring Framework’s RestTemplate, the Kotlin functions for fetching city IDs and city details are:

private fun getCityIds(): List<String> {
    val cityIdsEntity = restTemplate.exchange(
        "http://localhost:$localServerPort/cityids",
        HttpMethod.GET,
        null,
        object : ParameterizedTypeReference<List<String>>() {}
    )
    return cityIdsEntity.body!!
}

private fun getCityForId(id: String): City {
    return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!!
}

These functions can be combined to return the full city list:

val cityIds = getCityIds()
val cities = cityIds.stream()
    .map { cityId -> getCityForId(cityId) }
    .collect(Collectors.toList())
cities.forEach { city -> LOGGER.info(city.toString()) }

This straightforward code hides eight blocking calls (one to obtain the IDs and one per city), each executed on a separate thread.

3. Non‑Blocking I/O with Callbacks

Using AsyncHttpClient, a remote call returns a ListenableFuture. A callback can be attached to process the response when it becomes available:

val responseFuture: ListenableFuture<Response> = asyncHttpClient
    .prepareGet("http://localhost:$localServerPort/cityids")
    .execute()

responseFuture.addListener(Runnable {
    val response = responseFuture.get()
    val body = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue(body, object : TypeReference<List<Long>>() {})
    // further processing …
}, executor)

To fetch each city’s details, additional callbacks are nested, leading to the classic “callback hell” situation.

4. Using Java CompletableFuture

Replacing ListenableFuture with CompletableFuture provides a richer set of operators. For example, obtaining city IDs:

private fun getCityIds(): CompletableFuture<List<Long>> {
    return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cityids")
        .execute()
        .toCompletableFuture()
        .thenApply { response ->
            val s = response.responseBody
            objectMapper.readValue(s, object : TypeReference<List<Long>>() {})
        }
}

Fetching a single city’s detail:

private fun getCityDetail(cityId: Long): CompletableFuture<City> {
    return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId")
        .execute()
        .toCompletableFuture()
        .thenApply { response ->
            val s = response.responseBody
            objectMapper.readValue(s, City::class.java)
        }
}

Combining the futures:

val cityIdsFuture = getCityIds()
val citiesFuture = cityIdsFuture.thenCompose { ids ->
    val cityFutures = ids.map { id -> getCityDetail(id) }
    CompletableFuture.allOf(*cityFutures.toTypedArray())
        .thenApply { cityFutures.map { it.join() } }
}

5. Using Project Reactor

Project Reactor implements the Reactive Streams specification. The service can return a Flux<Long> for city IDs:

private fun getCityIds(): Flux<Long> {
    return webClient.get()
        .uri("/cityids")
        .exchange()
        .flatMapMany { response ->
            LOGGER.info("Received city IDs")
            response.bodyToFlux<Long>()
        }
}

And a Mono<City> for a single city’s detail:

private fun getCityDetail(cityId: Long): Mono<City> {
    return webClient.get()
        .uri("/cities/{id}", cityId)
        .exchange()
        .flatMap { response ->
            LOGGER.info("Received city detail")
            response.bodyToMono()
        }
}

Combining them yields a concise reactive pipeline:

val cityIdsFlux = getCityIds()
val citiesFlux = cityIdsFlux.flatMap { id -> getCityDetail(id) }
return citiesFlux

This approach is far more expressive than the callback‑based version and demonstrates the advantage of reactive streams for asynchronous remote calls.

6. Conclusion

Reactive‑stream based solutions, especially with Project Reactor, clean up the tangled callbacks of traditional asynchronous code, offering a natural way to transform and combine data streams while crossing asynchronous boundaries such as remote service calls.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

springKotlinreactive-programmingCompletableFutureProject ReactorAsyncHttpClient
Programmer DD
Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.