Ensuring Ordered Consumption Across Related Kafka Topics Using Fine‑Grained Locks

This article explains how to guarantee that insert messages are processed before update messages for the same data identifier across different Kafka topics, using fine‑grained per‑id locks and a caching strategy to detect and correct out‑of‑order consumption.

Top Architect

In Kafka, ordering is guaranteed only within a single partition of a single topic; messages sent to different topics have no relative ordering guarantee, regardless of the order in which the producer sent them. This creates challenges when related data (e.g., an insert and a subsequent update for the same record) must be consumed in sequence.
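To see why the guarantee stops at the partition boundary, consider how records are routed: Kafka's default partitioner hashes the record key, so all records carrying the same key land on one partition and are consumed in send order. The sketch below illustrates the idea with a simplified stand-in hash (Kafka actually uses murmur2):

```java
// Simplified illustration of key-based partition routing, NOT Kafka's actual
// partitioner code: same key -> same hash -> same partition -> send order is
// preserved for that key within one topic.
public class KeyPartitioning {
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is a valid partition index
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Two records keyed by the same id always land on one partition...
        System.out.println(partitionFor("order-1", 6) == partitionFor("order-1", 6)); // prints true
        // ...but an insert on TOPIC_INSERT and an update on TOPIC_UPDATE live in
        // partitions of different topics, so no shared ordering exists between them.
    }
}
```

This is exactly why the cross-topic insert/update scenario needs an application-level mechanism: partition ordering cannot span two topics.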

The proposed solution introduces two topics, TOPIC_INSERT and TOPIC_UPDATE, each carrying the same data identifier id. A fine‑grained lock is applied per id so that only one thread processes messages for a given identifier at a time, avoiding the performance penalty of a global synchronized block.
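The WeakRefHashLock class used below is not shown in the original source. A minimal sketch of such a per-key lock holder, assuming its contract is simply that lock(key) returns a Lock instance shared by every caller passing the same key (with weak references letting idle entries be garbage-collected), might look like:

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a per-key lock holder; the article's actual
// WeakRefHashLock implementation is not shown and may differ.
public class WeakRefHashLock {
    // One weakly referenced lock per key; an entry becomes collectible once
    // no thread holds a strong reference to its lock anymore.
    private final ConcurrentHashMap<String, WeakReference<Lock>> locks = new ConcurrentHashMap<>();

    public Lock lock(String key) {
        while (true) {
            WeakReference<Lock> ref =
                locks.computeIfAbsent(key, k -> new WeakReference<>(new ReentrantLock()));
            Lock lock = ref.get();
            if (lock != null) {
                return lock; // all callers with the same key share this instance
            }
            locks.remove(key, ref); // GC cleared the reference; retry with a fresh lock
        }
    }

    public static void main(String[] args) {
        WeakRefHashLock holder = new WeakRefHashLock();
        Lock a1 = holder.lock("1");
        Lock a2 = holder.lock("1");
        Lock b = holder.lock("2");
        System.out.println(a1 == a2); // prints true: same id, same lock
        System.out.println(a1 == b);  // prints false: different ids contend on different locks
    }
}
```

A plain ConcurrentHashMap&lt;String, Lock&gt; would work too but would grow without bound; the weak references let the map shed locks for ids that no thread is currently processing.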

After locking, the insert consumer checks a cache for any pending update messages; if found, it processes the update first. Conversely, the update consumer verifies whether the corresponding insert data already exists; if not, it stores the update in a cache keyed by id for later handling.

Message sending example:

kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");

Listener implementation (simplified):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaListenerDemo {
    // Updates that arrived before their insert, keyed by id
    private final Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
    // Ids whose insert has already been processed
    private final Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
    private final WeakRefHashLock weakRefHashLock;

    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
        this.weakRefHashLock = weakRefHashLock;
    }

    @KafkaListener(topics = "TOPIC_INSERT")
    public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException {
        Thread.sleep(1000); // simulate the insert arriving after the update
        String id = record.value();
        log.info("Received insert :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            log.info("Start processing insert for {}", id);
            Thread.sleep(1000); // simulate insert work
            DATA_MAP.put(id, id); // mark the insert as done so later updates proceed directly
            if (UPDATE_DATA_MAP.remove(id) != null) {
                doUpdate(id); // replay the update that was cached out of order
            }
            log.info("Insert processing for {} finished", id);
        } finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    @KafkaListener(topics = "TOPIC_UPDATE")
    public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException {
        String id = record.value();
        log.info("Received update :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            if (!DATA_MAP.containsKey(id)) {
                log.info("Order anomaly detected, caching update {}", id);
                UPDATE_DATA_MAP.put(id, id);
            } else {
                doUpdate(id);
            }
        } finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    void doUpdate(String id) throws InterruptedException {
        log.info("Start processing update {}", id);
        Thread.sleep(1000); // simulate update work
        log.info("Update {} finished", id);
    }
}

Sample log output demonstrates the detection and correction of out‑of‑order consumption:

Received update :: 1
Order anomaly detected, caching update 1
Received insert :: 1
Start processing insert for 1
Start processing update 1
Update 1 finished
Insert processing for 1 finished

The observed logs confirm that the approach successfully handles ordering issues when related data spans multiple Kafka topics.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Backend Development, Kafka, Fine-Grained Lock, Ordered Consumption, Spring Kafka
Written by Top Architect

Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.
