Ensuring Ordered Consumption Across Related Kafka Topics Using Fine‑Grained Locks

This article explains how to guarantee that insert and update messages belonging to the same data entity are consumed in the correct order across separate Kafka topics by applying fine‑grained (or distributed) locks and a cache‑based detection mechanism, complete with Java/Spring code examples.

IT Architects Alliance

The article addresses the problem of ordered consumption when different Kafka topics (e.g., TOPIC_INSERT and TOPIC_UPDATE) contain related data and the processing order must be preserved (insert before update).

Kafka guarantees ordering only within the same partition of a single topic; across topics there is no built‑in ordering. To handle cross‑topic ordering, the author proposes using a fine‑grained lock per data identifier (id) so that only one thread processes messages for the same id at a time.
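A per-id lock of this kind can be sketched in plain Java. The `KeyedLock` class below is a hypothetical stand-in written for illustration, not the article's `WeakRefHashLock` (whose implementation is not shown in this summary); it assumes a single JVM and never evicts locks, so a multi-instance deployment would need a distributed lock instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical per-id lock registry; a stand-in for the article's WeakRefHashLock. */
class KeyedLock {
    // One lock per id. Entries are never evicted in this sketch, so the map
    // grows with the number of distinct ids seen.
    private final ConcurrentHashMap<String, Lock> locks = new ConcurrentHashMap<>();

    /** Returns the lock for the given id, creating it atomically on first use. */
    Lock lock(String id) {
        return locks.computeIfAbsent(id, k -> new ReentrantLock());
    }
}

class KeyedLockDemo {
    /** Runs several workers that each increment a shared counter under the lock for one id. */
    static int countUnderLock(KeyedLock keyedLock, String id, int threads, int perThread) {
        int[] counter = {0}; // deliberately unsynchronized; only the lock protects it
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    Lock lock = keyedLock.lock(id);
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join also makes the final counter value visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        KeyedLock keyedLock = new KeyedLock();
        // The same id always maps to the same lock instance.
        System.out.println(keyedLock.lock("1") == keyedLock.lock("1"));
        // No increments are lost because all workers serialize on the lock for "1".
        System.out.println(countUnderLock(keyedLock, "1", 4, 1000));
    }
}
```

Locking per id rather than globally means messages for different ids can still be processed in parallel; only work on the same entity is serialized.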

After acquiring the lock, the insert consumer performs the insert and then checks a cache to see whether an update for the same id has already arrived; if so, it applies that cached update before releasing the lock. Conversely, the update consumer checks whether the corresponding insert has been processed; if not, it stores the update in a cache keyed by id instead of applying it. When the insert arrives later, it finds the cached update and applies it as the last step of its own processing, so the effective order is always insert first, then update.
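The hand-off between the two consumers can be modeled without Kafka. The sketch below is a simplified illustration under stated assumptions: `OrderGuard` and its method names are hypothetical, a single monitor stands in for the per-id lock, and "processing" is reduced to recording an event.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory model of the insert/update hand-off; not the article's listener. */
class OrderGuard {
    private final Set<String> inserted = new HashSet<>();       // ids whose insert has run
    private final Set<String> pendingUpdates = new HashSet<>(); // updates that arrived early
    private final List<String> events = new ArrayList<>();      // what happened, in order

    // A single monitor replaces the per-id lock to keep the sketch short.
    synchronized void onInsert(String id) {
        events.add("insert " + id);
        inserted.add(id);
        if (pendingUpdates.remove(id)) { // an update overtook this insert
            events.add("update " + id);
        }
    }

    synchronized void onUpdate(String id) {
        if (inserted.contains(id)) {
            events.add("update " + id);  // normal order: apply immediately
        } else {
            pendingUpdates.add(id);      // out of order: park it for the insert
            events.add("cached " + id);
        }
    }

    synchronized List<String> events() {
        return new ArrayList<>(events);
    }
}

class OrderGuardDemo {
    public static void main(String[] args) {
        OrderGuard guard = new OrderGuard();
        guard.onUpdate("1"); // update arrives first
        guard.onInsert("1"); // insert arrives late and drains the cached update
        System.out.println(guard.events()); // prints [cached 1, insert 1, update 1]
    }
}
```

Whichever message arrives first, the update is only ever applied after the insert for the same id has been recorded.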

Implementation details include:

Message sending example:

kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");

Listener code (Java, Spring Kafka) demonstrates the lock acquisition, simulated processing delays, cache checks, and acknowledgment handling. The key parts are:

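import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Note: the Acknowledgment parameter requires the listener container to use a manual ack mode.
@Component
@Slf4j
public class KafkaListenerDemo {
    // Updates that arrived before their insert, keyed by id.
    private final Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
    // Ids whose insert has already been processed.
    private final Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
    // The article's per-id lock helper (its implementation is not shown here).
    private final WeakRefHashLock weakRefHashLock;

    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
        this.weakRefHashLock = weakRefHashLock;
    }

    @KafkaListener(topics = "TOPIC_INSERT")
    public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException {
        Thread.sleep(1000); // simulate out‑of‑order scenario: the insert is picked up late
        String id = record.value();
        log.info("Received insert :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            log.info("Processing insert for {}", id);
            Thread.sleep(1000); // simulate business logic
            DATA_MAP.put(id, id); // record that the insert is done so later updates apply directly
            // Apply (and clear) an update that arrived before this insert.
            if (UPDATE_DATA_MAP.remove(id) != null) {
                doUpdate(id);
            }
            log.info("Insert processing for {} finished", id);
        } finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    @KafkaListener(topics = "TOPIC_UPDATE")
    public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException {
        String id = record.value();
        log.info("Received update :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            if (!DATA_MAP.containsKey(id)) {
                // The insert has not been processed yet: park the update for the insert consumer.
                log.info("Order anomaly detected, caching update {}", id);
                UPDATE_DATA_MAP.put(id, id);
            } else {
                doUpdate(id);
            }
        } finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    void doUpdate(String id) throws InterruptedException {
        log.info("Starting update :: {}", id);
        Thread.sleep(1000); // simulate business logic
        log.info("Update :: {} finished", id);
    }
}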

Sample log output shows the update arriving first, being cached, then the insert processing the cached update, confirming that the approach resolves the cross‑topic ordering issue.


Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
