Spring Boot + Kafka: Real‑Time Tracking of Long‑Running Tasks

This article demonstrates how to use Spring Boot 3.5.0 together with Kafka as a high‑throughput message bridge to asynchronously execute long‑running tasks, publish their progress to a Kafka topic, and let clients poll a REST endpoint for real‑time status updates.

Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Spring Boot + Kafka: Real‑Time Tracking of Long‑Running Tasks

Environment : Spring Boot 3.5.0.

Long‑running tasks consume significant server resources and must run asynchronously to avoid blocking the client. The client polls a provided URL for progress. Kafka is used as the transport for task‑progress messages, enabling real‑time, reliable tracking.

2.1 Docker setup for Kafka

docker run -d \
  --name kafka \
  --ulimit nofile=65536:65536 \
  -e TZ=Asia/Shanghai \
  -e KAFKA_ENABLE_KRAFT=yes \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e [email protected]:9093 \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -p 9092:9092 \
  --memory=512m \
  --cpus="1.0" \
  bitnami/kafka

Kafka‑UI

docker run -d \
  --name kafka-ui \
  -e KAFKA_CLUSTERS_0_NAME=kraft-kafka \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=localhost:9092 \
  -e DYNAMIC_CONFIG_ENABLED=true \
  -p 8080:8080 \
  provectuslabs/kafka-ui

2.3 Application configuration

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    consumer:
      group-id: task-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        '[spring.json.trusted.packages]': '*'
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        '[spring.json.trusted.packages]': '*'

2.4 Data model

public record TaskStatus(String taskId, String taskName, float percentageComplete, Status status, String resultUrl) {
  public enum Status { SUBMITTED, STARTED, RUNNING, FINISHED, TERMINATED }
}

public record TaskRequest(String name) { }

2.5 Producer service

@Service
public class KafkaProducerService {
  private final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);
  private final KafkaTemplate<String, TaskStatus> kafkaTemplate;

  public KafkaProducerService(KafkaTemplate<String, TaskStatus> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  public void send(String topicName, String key, TaskStatus value) {
    var future = kafkaTemplate.send(topicName, key, value);
    future.whenComplete((sendResult, exception) -> {
      if (exception != null) {
        future.completeExceptionally(exception);
      } else {
        future.complete(sendResult);
      }
      logger.info("Task status sent to Kafka topic: " + value);
    });
  }
}

Consumer service

@Service
public class KafkaConsumerService {
  private final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
  private final KafkaConsumer<String, TaskStatus> kafkaConsumer;

  public KafkaConsumerService(KafkaConsumer<String, TaskStatus> kafkaConsumer) {
    this.kafkaConsumer = kafkaConsumer;
  }

  public TaskStatus getLatestTaskStatus(String taskId) {
    ConsumerRecord<String, TaskStatus> latestUpdate = null;
    // KafkaConsumer is not thread‑safe; concurrent calls may cause ConcurrentModificationException
    ConsumerRecords<String, TaskStatus> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(200));
    if (!consumerRecords.isEmpty()) {
      Iterator<ConsumerRecord<String, TaskStatus>> it = consumerRecords.records(taskId).iterator();
      while (it.hasNext()) {
        latestUpdate = it.next();
      }
      logger.info("Task [{}] execution status: {}", taskId, latestUpdate.value());
    }
    return latestUpdate != null ? latestUpdate.value() : null;
  }
}

2.6 Asynchronous task service

@Service
public class TaskService {
  private final KafkaConsumer<String, TaskStatus> kafkaConsumer;
  private final KafkaProducerService kafkaProducerService;
  private final KafkaAdmin kafkaAdmin;

  public TaskService(KafkaConsumer<String, TaskStatus> kafkaConsumer,
                     KafkaProducerService kafkaProducerService,
                     KafkaAdmin kafkaAdmin) {
    this.kafkaConsumer = kafkaConsumer;
    this.kafkaProducerService = kafkaProducerService;
    this.kafkaAdmin = kafkaAdmin;
  }

  @Async
  public void process(String taskId, TaskRequest taskRequest, UriComponentsBuilder b) {
    try {
      createNewTopic(taskId);
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), 0.0f, Status.SUBMITTED, null));
      Thread.sleep(2000L);
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), 10.0f, Status.STARTED, null));
      Thread.sleep(5000L);
      float progress = 10.0f;
      for (int i = 0; i < 10; i++) {
        TimeUnit.MILLISECONDS.sleep(new Random().nextLong(4000));
        progress += new Random().nextInt(9);
        updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), progress, Status.RUNNING, null));
      }
      UriComponents resultURL = b.path("/tasks/{id}/result").buildAndExpand(taskId);
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), 100.0f, Status.FINISHED, resultURL.toUriString()));
    } catch (InterruptedException | ExecutionException e) {
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.name(), 100.0f, Status.TERMINATED, null));
      throw new RuntimeException(e);
    }
  }

  private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {
    Map<String, String> topicConfig = new HashMap<>();
    // retain 24 hours
    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000));
    NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);
    try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
      adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
    }
    kafkaConsumer.subscribe(Collections.singletonList(topicName));
  }

  private void updateTaskExecutionProgess(TaskStatus taskStatus) {
    kafkaProducerService.send(taskStatus.taskId(), taskStatus.taskId(), taskStatus);
  }
}

Kafka consumer bean definition

@Configuration
public class KafkaConfig {
  @Bean
  public KafkaConsumer<String, TaskStatus> kafkaConsumer(ConsumerFactory<String, TaskStatus> consumerFactory) {
    return (KafkaConsumer<String, TaskStatus>) consumerFactory.createConsumer();
  }
}

2.5 Controller API

@RestController
@RequestMapping("/tasks")
public class TaskController {
  private final TaskService taskService;
  private final KafkaConsumerService kafkaConsumerService;

  public TaskController(TaskService taskService, KafkaConsumerService kafkaConsumerService) {
    this.taskService = taskService;
    this.kafkaConsumerService = kafkaConsumerService;
  }

  @GetMapping
  public ResponseEntity<String> processAsync(TaskRequest task, UriComponentsBuilder b) {
    String taskId = UUID.randomUUID().toString().replace("-", "");
    UriComponentsBuilder cloneBuilder = b.cloneBuilder();
    UriComponents progressURL = b.path("/tasks/{id}/progress").buildAndExpand(taskId);
    taskService.process(taskId, task, cloneBuilder);
    return ResponseEntity.ok("Task progress URL: " + progressURL.toUriString());
  }

  @GetMapping("{taskId}/progress")
  public ResponseEntity<?> getProgress(@PathVariable String taskId) {
    TaskStatus taskStatus = kafkaConsumerService.getLatestTaskStatus(taskId);
    if (taskStatus == null) {
      return ResponseEntity.ok("");
    }
    return ResponseEntity.ok().body(taskStatus);
  }
}

2.6 Testing

Creating a task via the /tasks endpoint returns a progress URL. Polling /tasks/{id}/progress shows the task status evolving from SUBMITTEDSTARTEDRUNNINGFINISHED. Screenshots illustrate the UI at each stage (task creation, running, completed, and the raw Kafka topic messages).

Create task UI
Create task UI
Running task UI
Running task UI
Task completed UI
Task completed UI

Note : After a service restart the in‑memory consumer loses its position, so progress information cannot be retrieved unless additional persistence is added.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaDockerKafkaSpring BootAsync ProcessingTask Tracking
Spring Full-Stack Practical Cases
Written by

Spring Full-Stack Practical Cases

Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.