Escaping Callback Hell: From Blocking Calls to Reactive Streams in Kotlin
This article walks through the code needed to call a remote service synchronously, shows how non-blocking I/O uses threads more efficiently but pushes callback-based code toward "callback hell", and then simplifies the programming model with Java's CompletableFuture and Project Reactor's reactive streams, using the retrieval of city data as the running example.
1. Target Service
The client calls a service exposing two endpoints. A request to /cityids returns a list of city IDs, e.g.:
[
1,
2,
3,
4,
5,
6,
7
]
A request to /cities/{id} returns the details of a city, for example:
{
"country": "USA",
"id": 1,
"name": "Portland",
"pop": 1600000
}
The client must first obtain the list of city IDs and then fetch each city's details to assemble a complete list.
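The later snippets refer to a City type that the article never shows. A minimal sketch, assuming a plain Kotlin data class whose property names mirror the JSON fields above (the class shape and the sampleCity helper are assumptions for illustration, not taken from the original source):

```kotlin
// Hypothetical model for the /cities/{id} payload; property names mirror the JSON keys.
data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long
)

// Builds the sample city shown above by hand, without any JSON library.
fun sampleCity(): City = City(id = 1, name = "Portland", country = "USA", pop = 1_600_000)
```

Because the property names match the JSON keys, a mapper such as Jackson with its Kotlin module could likely bind the response to this class without extra annotations, though that depends on how the mapper is configured.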
2. Synchronous Calls
Using Spring Framework’s RestTemplate, the Kotlin functions for fetching city IDs and city details are:
private fun getCityIds(): List<String> {
val cityIdsEntity = restTemplate.exchange(
"http://localhost:$localServerPort/cityids",
HttpMethod.GET,
null,
object : ParameterizedTypeReference<List<String>>() {}
)
return cityIdsEntity.body!!
}
private fun getCityForId(id: String): City {
return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!!
}
These functions can be combined to return the full city list:
val cityIds = getCityIds()
val cities = cityIds.stream()
.map { cityId -> getCityForId(cityId) }
.collect(Collectors.toList())
cities.forEach { city -> LOGGER.info(city.toString()) }
This straightforward code hides eight blocking calls (one to obtain the IDs and one per city). They run one after another on the calling thread, which sits idle while it waits for each response.
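To make the cost of the synchronous version concrete, the sketch below replaces the HTTP calls with a hypothetical fakeBlockingCall that simply sleeps for a fixed latency. Both function names and the latency value are assumptions for illustration; only the call pattern (one call for the IDs, then one per city) comes from the code above.

```kotlin
// Stand-in for a blocking HTTP call; returns the name of the thread it ran on.
fun fakeBlockingCall(latencyMs: Long): String {
    Thread.sleep(latencyMs)
    return Thread.currentThread().name
}

// Issues one "IDs" call plus one call per city, in the same order as the code above.
// Returns the elapsed milliseconds and the set of threads that did the work.
fun runSequentially(cityCount: Int, latencyMs: Long): Pair<Long, Set<String>> {
    val start = System.nanoTime()
    val threads = mutableSetOf(fakeBlockingCall(latencyMs))
    repeat(cityCount) { threads.add(fakeBlockingCall(latencyMs)) }
    val elapsedMs = (System.nanoTime() - start) / 1_000_000
    return elapsedMs to threads
}
```

Calling runSequentially(7, 20) should report a single thread and an elapsed time of roughly eight times the latency, since each call has to finish before the next one starts.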
3. Non‑Blocking I/O with Callbacks
Using AsyncHttpClient, a remote call returns a ListenableFuture. A callback can be attached to process the response when it becomes available:
val responseFuture: ListenableFuture<Response> = asyncHttpClient
.prepareGet("http://localhost:$localServerPort/cityids")
.execute()
responseFuture.addListener(Runnable {
val response = responseFuture.get()
val body = response.responseBody
val cityIds: List<Long> = objectMapper.readValue(body, object : TypeReference<List<Long>>() {})
// further processing …
}, executor)
To fetch each city’s details, additional callbacks are nested, leading to the classic “callback hell” situation.
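The nesting is easier to see in a complete example. The sketch below does not use AsyncHttpClient; it assumes a hypothetical FakeAsyncClient that runs a call on a thread pool and passes the result to a callback, which is enough to show the shape of the problem. All names and the canned data here are illustrative.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical callback-style client: runs `call` on the pool, then invokes `callback` with the result.
class FakeAsyncClient {
    private val pool = Executors.newFixedThreadPool(4)
    fun <T> get(call: () -> T, callback: (T) -> Unit) { pool.execute { callback(call()) } }
    fun shutdown() = pool.shutdown()
}

fun fetchAllCityNames(client: FakeAsyncClient, onDone: (List<String>) -> Unit) {
    client.get({ listOf(1L, 2L, 3L) }) { ids ->              // level 1: the IDs arrived
        val names = CopyOnWriteArrayList<String>()
        val remaining = AtomicInteger(ids.size)
        ids.forEach { id ->
            client.get({ "city-$id" }) { name ->             // level 2: one city arrived
                names.add(name)
                if (remaining.decrementAndGet() == 0) {      // level 3: hand-written join
                    onDone(names.sorted())
                }
            }
        }
    }
}

// Test helper: blocks until the callback chain has delivered its result.
fun fetchAllCityNamesBlocking(): List<String> {
    val client = FakeAsyncClient()
    val latch = CountDownLatch(1)
    var result: List<String> = emptyList()
    fetchAllCityNames(client) { names -> result = names; latch.countDown() }
    latch.await(5, TimeUnit.SECONDS)
    client.shutdown()
    return result
}
```

Even this small version needs a thread-safe list and a counter just to know when every city has arrived, and it has no error path yet; each level would need its own failure handling.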
4. Using Java CompletableFuture
Replacing ListenableFuture with CompletableFuture provides a richer set of operators. For example, obtaining city IDs:
private fun getCityIds(): CompletableFuture<List<Long>> {
return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cityids")
.execute()
.toCompletableFuture()
.thenApply { response ->
val s = response.responseBody
objectMapper.readValue(s, object : TypeReference<List<Long>>() {})
}
}
Fetching a single city’s detail:
private fun getCityDetail(cityId: Long): CompletableFuture<City> {
return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId")
.execute()
.toCompletableFuture()
.thenApply { response ->
val s = response.responseBody
objectMapper.readValue(s, City::class.java)
}
}
Combining the futures:
val cityIdsFuture = getCityIds()
val citiesFuture = cityIdsFuture.thenCompose { ids ->
val cityFutures = ids.map { id -> getCityDetail(id) }
CompletableFuture.allOf(*cityFutures.toTypedArray())
.thenApply { cityFutures.map { it.join() } }
}
5. Using Project Reactor
Project Reactor implements the Reactive Streams specification. With Spring's WebClient, the client function that fetches the city IDs can return a Flux<Long>:
private fun getCityIds(): Flux<Long> {
return webClient.get()
.uri("/cityids")
.exchange()
.flatMapMany { response ->
LOGGER.info("Received city IDs")
response.bodyToFlux<Long>()
}
}
And a Mono<City> for a single city’s detail:
private fun getCityDetail(cityId: Long): Mono<City> {
return webClient.get()
.uri("/cities/{id}", cityId)
.exchange()
.flatMap { response ->
LOGGER.info("Received city detail")
response.bodyToMono<City>()
}
}
Combining them yields a concise reactive pipeline:
val cityIdsFlux = getCityIds()
val citiesFlux = cityIdsFlux.flatMap { id -> getCityDetail(id) }
return citiesFlux
This pipeline is far more compact than the callback-based version: a single flatMap replaces the nested callbacks. Note that flatMap emits results as they arrive, so the cities are not guaranteed to come back in ID order.
6. Conclusion
Reactive‑stream based solutions, especially with Project Reactor, clean up the tangled callbacks of traditional asynchronous code, offering a natural way to transform and combine data streams while crossing asynchronous boundaries such as remote service calls.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
