Spring Boot + Kafka: Real‑Time Tracking of Long‑Running Tasks
This article demonstrates how to use Spring Boot 3.5.0 together with Kafka as a high‑throughput message bridge to asynchronously execute long‑running tasks, publish their progress to a Kafka topic, and let clients poll a REST endpoint for real‑time status updates.
Environment : Spring Boot 3.5.0.
Long‑running tasks consume significant server resources and must run asynchronously to avoid blocking the client. The client polls a provided URL for progress. Kafka is used as the transport for task‑progress messages, enabling real‑time, reliable tracking.
2.1 Docker setup for Kafka
docker run -d \
--name kafka \
--ulimit nofile=65536:65536 \
-e TZ=Asia/Shanghai \
-e KAFKA_ENABLE_KRAFT=yes \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-p 9092:9092 \
--memory=512m \
--cpus="1.0" \
bitnami/kafka
Kafka‑UI
docker run -d \
--name kafka-ui \
-e KAFKA_CLUSTERS_0_NAME=kraft-kafka \
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=localhost:9092 \
-e DYNAMIC_CONFIG_ENABLED=true \
-p 8080:8080 \
provectuslabs/kafka-ui
2.3 Application configuration
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    consumer:
      group-id: task-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        '[spring.json.trusted.packages]': '*'
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        '[spring.json.trusted.packages]': '*'
2.4 Data model
/**
 * Immutable snapshot of a task's progress, used as the value of the Kafka status messages.
 *
 * @param taskId             identifier of the task
 * @param taskName           name of the task
 * @param percentageComplete progress value reported for the task
 * @param status             lifecycle stage the task is in
 * @param resultUrl          URL of the task result; may be {@code null}
 */
public record TaskStatus(
        String taskId,
        String taskName,
        float percentageComplete,
        Status status,
        String resultUrl) {

    /** Lifecycle stages a task can report. */
    public enum Status {
        SUBMITTED,
        STARTED,
        RUNNING,
        FINISHED,
        TERMINATED
    }
}
public record TaskRequest(String name) { }
2.5 Producer service
@Service
public class KafkaProducerService {
private final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);
private final KafkaTemplate<String, TaskStatus> kafkaTemplate;
/**
 * Creates the service around the template used to publish {@link TaskStatus} messages.
 *
 * @param kafkaTemplate template that {@code send} delegates to
 */
public KafkaProducerService(KafkaTemplate<String, TaskStatus> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
 * Publishes a task status to Kafka and logs the outcome once the send completes.
 *
 * <p>A failed send is logged as an error; it is not propagated to the caller.
 *
 * @param topicName topic to publish to
 * @param key       record key
 * @param value     task status used as the record value
 */
public void send(String topicName, String key, TaskStatus value) {
    kafkaTemplate.send(topicName, key, value).whenComplete((sendResult, exception) -> {
        // The future is already completed when this callback runs, so there is nothing
        // left to complete here; only the real outcome needs to be reported.
        if (exception != null) {
            logger.error("Failed to send task status to Kafka topic [{}]: {}", topicName, value, exception);
        } else {
            logger.info("Task status sent to Kafka topic: {}", value);
        }
    });
}
}
Consumer service
@Service
public class KafkaConsumerService {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
private final KafkaConsumer<String, TaskStatus> kafkaConsumer;
/**
 * Creates the service around the consumer that is polled for task status records.
 *
 * @param kafkaConsumer consumer polled by {@code getLatestTaskStatus}
 */
public KafkaConsumerService(KafkaConsumer<String, TaskStatus> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
}
/**
 * Polls Kafka once and returns the most recent status received for the given task.
 *
 * @param taskId task identifier, used here as the name of the topic to read records from
 * @return the value of the last record this poll returned for that topic, or {@code null}
 *         when the poll returned no record for it
 */
public TaskStatus getLatestTaskStatus(String taskId) {
    // KafkaConsumer is not thread‑safe; concurrent calls may cause ConcurrentModificationException
    ConsumerRecords<String, TaskStatus> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(200));
    ConsumerRecord<String, TaskStatus> latestUpdate = null;
    // records(topic) yields only the records of that topic; keep the last one seen.
    for (ConsumerRecord<String, TaskStatus> update : consumerRecords.records(taskId)) {
        latestUpdate = update;
    }
    if (latestUpdate == null) {
        // The batch can be non-empty yet hold nothing for this topic, so guard before
        // dereferencing to avoid a NullPointerException.
        return null;
    }
    logger.info("Task [{}] execution status: {}", taskId, latestUpdate.value());
    return latestUpdate.value();
}
}
2.6 Asynchronous task service
@Service
public class TaskService {
private final KafkaConsumer<String, TaskStatus> kafkaConsumer;
private final KafkaProducerService kafkaProducerService;
private final KafkaAdmin kafkaAdmin;
/**
 * Creates the task service.
 *
 * @param kafkaConsumer        consumer that gets subscribed to each task's topic
 * @param kafkaProducerService service used to publish task status updates
 * @param kafkaAdmin           source of the configuration properties for the admin client
 */
public TaskService(KafkaConsumer<String, TaskStatus> kafkaConsumer,
KafkaProducerService kafkaProducerService,
KafkaAdmin kafkaAdmin) {
this.kafkaConsumer = kafkaConsumer;
this.kafkaProducerService = kafkaProducerService;
this.kafkaAdmin = kafkaAdmin;
}
/**
 * Simulates a long‑running task and publishes its progress as it goes.
 *
 * <p>Status sequence: SUBMITTED (0%), STARTED (10%), ten RUNNING updates with random
 * increments, then FINISHED (100%) carrying the result URL. If the thread is interrupted
 * or topic creation fails, a TERMINATED status is published and the failure is rethrown.
 *
 * @param taskId      task identifier; also used as the Kafka topic name and record key
 * @param taskRequest request supplying the task name
 * @param b           builder used to derive the result URL
 * @throws RuntimeException wrapping the {@link InterruptedException} or
 *                          {@link ExecutionException} that ended the task
 */
@Async
public void process(String taskId, TaskRequest taskRequest, UriComponentsBuilder b) {
    try {
        createNewTopic(taskId);
        updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), 0.0f, Status.SUBMITTED, null));
        Thread.sleep(2000L);
        updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), 10.0f, Status.STARTED, null));
        Thread.sleep(5000L);
        // One generator for the whole run instead of a fresh instance per call.
        Random random = new Random();
        float progress = 10.0f;
        for (int i = 0; i < 10; i++) {
            TimeUnit.MILLISECONDS.sleep(random.nextLong(4000));
            progress += random.nextInt(9);
            updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), progress, Status.RUNNING, null));
        }
        UriComponents resultURL = b.path("/tasks/{id}/result").buildAndExpand(taskId);
        updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), 100.0f, Status.FINISHED, resultURL.toUriString()));
    } catch (InterruptedException | ExecutionException e) {
        updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), 100.0f, Status.TERMINATED, null));
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so the executor can observe the interruption. This is
            // done after the final status is handed to the producer so that the send is not
            // attempted on an already-interrupted thread.
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException(e);
    }
}
/**
 * Creates the topic for a task and adds it to the consumer's subscription.
 *
 * <p>The topic is created with one partition, replication factor 1 and 24‑hour retention.
 *
 * @param topicName name of the topic to create and subscribe to
 * @throws ExecutionException   if the topic creation request fails
 * @throws InterruptedException if the thread is interrupted while waiting for the creation
 */
private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {
    Map<String, String> topicConfig = new HashMap<>();
    // retain 24 hours
    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000));
    NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);
    try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
        adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
    }
    // subscribe() replaces the current subscription instead of extending it, so re-subscribe
    // with the topics already tracked plus the new one; otherwise only the most recently
    // created task could be polled.
    // NOTE(review): the consumer is shared with request threads that poll it, and
    // KafkaConsumer is not thread-safe — access is not coordinated here.
    var topics = new java.util.HashSet<>(kafkaConsumer.subscription());
    topics.add(topicName);
    kafkaConsumer.subscribe(topics);
}
/**
 * Publishes a status update for a task.
 *
 * @param taskStatus status to publish; its task id serves as both topic name and record key
 */
private void updateTaskExecutionProgess(TaskStatus taskStatus) {
    final String topicAndKey = taskStatus.taskId();
    kafkaProducerService.send(topicAndKey, topicAndKey, taskStatus);
}
}
Kafka consumer bean definition
@Configuration
public class KafkaConfig {
/**
 * Exposes a Kafka consumer created by the given factory as a bean.
 *
 * @param consumerFactory factory used to create the consumer
 * @return the created consumer, narrowed to {@code KafkaConsumer}
 */
@Bean
public KafkaConsumer<String, TaskStatus> kafkaConsumer(ConsumerFactory<String, TaskStatus> consumerFactory) {
    var created = consumerFactory.createConsumer();
    // NOTE(review): the downcast fails with ClassCastException if the factory ever returns
    // a different implementation — confirm this holds for the configured factory.
    return (KafkaConsumer<String, TaskStatus>) created;
}
}
2.7 Controller API
@RestController
@RequestMapping("/tasks")
public class TaskController {
private final TaskService taskService;
private final KafkaConsumerService kafkaConsumerService;
/**
 * Creates the controller.
 *
 * @param taskService          service that runs submitted tasks
 * @param kafkaConsumerService service queried for the latest task status
 */
public TaskController(TaskService taskService, KafkaConsumerService kafkaConsumerService) {
this.taskService = taskService;
this.kafkaConsumerService = kafkaConsumerService;
}
/**
 * Starts a new task and replies with the URL under which its progress can be polled.
 *
 * @param task request describing the task
 * @param b    builder used to derive the progress and result URLs
 * @return 200 response whose body contains the progress URL
 */
@GetMapping
public ResponseEntity<String> processAsync(TaskRequest task, UriComponentsBuilder b) {
    final String id = UUID.randomUUID().toString().replace("-", "");
    // Take the copy before path() mutates the builder, so the task service starts from a
    // builder without the progress path appended.
    final UriComponentsBuilder resultBuilder = b.cloneBuilder();
    final String progressLink = b.path("/tasks/{id}/progress")
            .buildAndExpand(id)
            .toUriString();
    taskService.process(id, task, resultBuilder);
    return ResponseEntity.ok("Task progress URL: " + progressLink);
}
/**
 * Returns the latest known status of a task.
 *
 * @param taskId identifier of the task
 * @return 200 response carrying the status, or an empty string body when none is available
 */
@GetMapping("{taskId}/progress")
public ResponseEntity<?> getProgress(@PathVariable String taskId) {
    final TaskStatus latest = kafkaConsumerService.getLatestTaskStatus(taskId);
    if (latest != null) {
        return ResponseEntity.ok().body(latest);
    }
    // Nothing to report for this poll: reply with an empty body.
    return ResponseEntity.ok("");
}
}
2.8 Testing
Creating a task via the /tasks endpoint returns a progress URL. Polling /tasks/{id}/progress shows the task status evolving from SUBMITTED → STARTED → RUNNING → FINISHED. Screenshots illustrate the UI at each stage (task creation, running, completed, and the raw Kafka topic messages).
Note : After a service restart the in‑memory consumer loses its position, so progress information cannot be retrieved unless additional persistence is added.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
