How to Clear a 10‑Million‑Message Queue in 5 Hours: A Five‑Step Rescue Plan
When a flash sale leaves 10 million messages backlogged and each consumer processes only 200 messages per second, this guide shows how to restore throughput and prevent a system collapse within 5 hours using five steps: horizontal scaling, message downgrade, flow control, temporary dump, and parallel blasting.
Scenario
During a flash sale, the order message queue accumulated 10 million pending messages while each consumer instance processed only 200 messages per second.
Root Causes
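Insufficient consumption capacity: 200 msg/s per instance × 50 instances = 10k msg/s in aggregate, too little to drain a 10 million backlog while the flash sale keeps adding new orders.
Mixed message types: non-critical log messages shared the queue with core order messages and held them up.
Pull-strategy flaw: pulling very large batches blocked consumer threads.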
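Comparing a rate (msg/s) with a backlog (messages) says little on its own; the number that matters is drain time, the backlog divided by the net rate (consumption minus new arrivals). A minimal sketch; the inflow figures in the usage lines are hypothetical, since the flash-sale arrival rate is not given.

```python
import math

def drain_seconds(backlog: int, consume_rate: float, inflow_rate: float = 0.0) -> float:
    """Seconds to clear `backlog` when consumers process `consume_rate` msg/s
    while producers keep adding `inflow_rate` msg/s. Returns inf if it never drains."""
    net_rate = consume_rate - inflow_rate
    if net_rate <= 0:
        return math.inf  # the backlog stays flat or keeps growing
    return backlog / net_rate

BACKLOG = 10_000_000
aggregate = 200 * 50  # 200 msg/s per instance x 50 instances = 10,000 msg/s

print(drain_seconds(BACKLOG, aggregate))                             # 1000.0 (no inflow)
print(round(drain_seconds(BACKLOG, aggregate, 9_500) / 3600, 2))     # 5.56 hours (hypothetical inflow)
print(drain_seconds(BACKLOG, aggregate, 12_000))                     # inf (inflow above capacity)
```

At 10k msg/s with nothing new arriving, 10 million messages would clear in about 17 minutes; if the backlog persists at that rate, incoming traffic must be absorbing most of the capacity. That is why the plan both raises consumption (scaling, batching) and cuts what must be consumed (downgrade).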
Five‑Step Emergency Plan (Clear Backlog in 5 Hours)
Step 1 – Horizontal Scaling (30 min)
Increase Kafka partitions and consumer replicas.
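Partitions and replicas are raised together because, within one consumer group, Kafka assigns each partition to at most one consumer; replicas beyond the partition count receive nothing. A toy round-robin model of that rule (not Kafka's actual assignor):

```python
def assign_partitions(num_partitions: int, num_consumers: int) -> dict[int, list[int]]:
    """Round-robin partitions over the consumers of one group.
    Each partition goes to exactly one consumer."""
    assignment = {c: [] for c in range(num_consumers)}
    for p in range(num_partitions):
        assignment[p % num_consumers].append(p)
    return assignment

def idle_consumers(num_partitions: int, num_consumers: int) -> int:
    """Number of consumers that end up with no partition at all."""
    return sum(1 for parts in assign_partitions(num_partitions, num_consumers).values() if not parts)

print(idle_consumers(200, 100))  # 0: each of the 100 pods owns 2 partitions
print(idle_consumers(50, 100))   # 50: half the pods would sit idle
```

One caveat worth checking before running the commands: for keyed messages, adding partitions changes which partition a key maps to, so per-key ordering is not preserved across the change.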
# Expand Kafka partitions
kafka-topics.sh --alter --topic orders \
--partitions 200 --bootstrap-server kafka1:9092
# Scale consumer pods (K8s)
kubectl scale deployment order-consumer --replicas=100
Step 2 – Message Downgrade (Emergency Bleed)
Filter out low‑priority messages at the consumer level.
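Acknowledging a skipped message is what makes the downgrade work, and also what makes it lossy: the committed offset moves past it, so it will not be redelivered. A library-free sketch of that bookkeeping; `Msg`, `PRIORITY_HIGH` and the numeric priorities are hypothetical stand-ins for the article's `Message` type, and offsets within a batch are assumed contiguous and sorted.

```python
from dataclasses import dataclass
from typing import Optional

PRIORITY_HIGH = 5  # hypothetical threshold standing in for Message.PRIORITY_HIGH

@dataclass
class Msg:  # hypothetical stand-in for the application's Message type
    offset: int
    priority: int
    body: str

def filter_batch(batch: list[Msg]) -> tuple[list[Msg], Optional[int]]:
    """Split a polled batch into core messages to process and the offset to commit.
    Low-priority messages are skipped but still covered by the commit,
    so they will not be redelivered."""
    core = [m for m in batch if m.priority >= PRIORITY_HIGH]
    commit_offset = batch[-1].offset + 1 if batch else None  # next offset to read
    return core, commit_offset

batch = [Msg(0, 9, "order"), Msg(1, 1, "log"), Msg(2, 7, "order")]
core, commit_at = filter_batch(batch)
print([m.offset for m in core], commit_at)  # [0, 2] 3
```

If skipped messages must be recoverable later, they have to be persisted before that commit, which is what the temporary dump in Step 4 is for.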
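// Consumer filters non-core messages.
// Message, getPriority(), PRIORITY_HIGH and acknowledge() belong to the
// application's own message wrapper (hypothetical, not a Kafka/RocketMQ API).
public void handleMessage(Message msg) {
    if (msg.getPriority() < Message.PRIORITY_HIGH) {
        // Acknowledge so the broker does not redeliver it, then skip.
        // The message is effectively dropped unless it was dumped first.
        msg.acknowledge();
        return;
    }
    processOrder(msg);
}
Step 3 – Flow Control (Prevent Avalanche)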
Configure RocketMQ and Kafka pull limits.
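The RocketMQ threshold applies to the client's local cache of pulled-but-unconsumed messages for a queue: once that cache exceeds the threshold, the next pull is deferred. A tick-based toy model of the idea (not RocketMQ's code; all numbers are illustrative) shows why it bounds memory: the cache peaks at about threshold + batch size instead of growing with every pull.

```python
def simulate(total: int, pull_batch: int, threshold: int,
             consume_per_tick: int, ticks: int) -> int:
    """Toy model of threshold-based pull flow control; returns the peak cache size."""
    cached = 0          # pulled but not yet consumed (client side)
    remaining = total   # messages still waiting on the broker
    peak = 0
    for _ in range(ticks):
        # Issue a pull only while the local cache is at or below the threshold
        if cached <= threshold and remaining > 0:
            n = min(pull_batch, remaining)
            cached += n
            remaining -= n
        peak = max(peak, cached)
        # Consumer threads drain a fixed number of messages per tick
        cached -= min(consume_per_tick, cached)
    return peak

print(simulate(10_000, pull_batch=20, threshold=100, consume_per_tick=5, ticks=100))    # 120
print(simulate(10_000, pull_batch=20, threshold=10**9, consume_per_tick=5, ticks=100))  # 1505
```

With the threshold in place the cache settles at 120 messages; without it, the cache keeps growing for as long as pulls outpace processing.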
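// RocketMQ (DefaultMQPushConsumer) flow control
// Defer pulling from a queue once more than 100 pulled-but-unconsumed
// messages are cached locally for it.
consumer.setPullThresholdForQueue(100);
// Pull at most 20 messages per pull request.
consumer.setPullBatchSize(20);
# Kafka consumer properties (.properties files treat # as a comment only
# at the start of a line, so comments go on their own lines)
# At most 50 records returned by each poll()
max.poll.records=50
# At most 1 MB per fetch response
fetch.max.bytes=1048576
# At most 1 MB per partition within a fetch response
max.partition.fetch.bytes=1048576
Step 4 – Temporary Dump (Asynchronous Recovery)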
Persist incoming messages to disk while processing core messages.
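A dump only buys time if it can be replayed later. A stdlib-only sketch of one possible file format, length-prefixed records appended per batch, swapped in here for the msgpack batches; the layout is an assumption, not a standard.

```python
import os
import struct
import tempfile

def append_dump(path: str, payloads: list[bytes]) -> None:
    """Append payloads as length-prefixed records (4-byte big-endian length + bytes)."""
    with open(path, "ab") as f:
        for p in payloads:
            f.write(struct.pack(">I", len(p)))
            f.write(p)

def replay_dump(path: str):
    """Yield payloads back in the order they were written."""
    with open(path, "rb") as f:
        while True:
            header = f.read(4)
            if len(header) < 4:
                return  # end of file (or a truncated trailing record)
            (size,) = struct.unpack(">I", header)
            yield f.read(size)

path = os.path.join(tempfile.mkdtemp(), "msgs.bin")
append_dump(path, [b"order-1", b"log-7"])
append_dump(path, [b"order-2"])
print(list(replay_dump(path)))  # [b'order-1', b'log-7', b'order-2']
```

Appending, rather than reopening in `'wb'` mode, keeps earlier batches if the dump job restarts; a replay job can later read the file and re-publish each payload once the backlog has cleared.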
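import msgpack  # third-party: pip install msgpack

def dump_messages(consumer, process_messages, priority_of):
    # consumer: a kafka-python KafkaConsumer.
    # priority_of(record) -> int is an application-specific helper (assumed).
    # Append mode keeps earlier batches if the dump job restarts.
    with open('/data/backup/msgs.bin', 'ab') as f:
        while True:
            # poll() takes a timeout in ms; max_records caps the batch at 1000
            batches = consumer.poll(timeout_ms=1000, max_records=1000)
            records = [r for recs in batches.values() for r in recs]
            if not records:
                break  # nothing arrived within the timeout
            # Binary batch write of the raw payloads, before any filtering
            f.write(msgpack.packb([r.value for r in records]))
            # Process core messages now; the rest survive only in the dump
            filtered = [r for r in records if priority_of(r) > 1]
            process_messages(filtered)
Step 5 – Ultimate Solution – Parallel Blast (Core Operation)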
Create a new high‑throughput topic and launch massive parallel consumers.
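Handing each poll to a thread pool and polling again right away lets the pool's queue grow without bound and, with auto-commit on, lets offsets be committed before processing finishes. A stdlib sketch of a safer shape: one task per partition (so records within a partition stay in order), then a barrier before the commit. Partition numbers, string records, and `process_partition` (standing in for `batchProcess`) are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def process_partition(records: list[str]) -> list[str]:
    # Hypothetical stand-in for batchProcess(): handle one partition's records in order
    return [r.upper() for r in records]

def process_poll(batch: dict[int, list[str]], pool: ThreadPoolExecutor) -> dict[int, list[str]]:
    """Run one task per partition in parallel and wait for all of them,
    so offsets can be committed only after the whole poll is processed."""
    futures = {p: pool.submit(process_partition, recs) for p, recs in batch.items()}
    return {p: f.result() for p, f in futures.items()}  # result() re-raises task errors

with ThreadPoolExecutor(max_workers=32) as pool:
    polled = {0: ["a1", "a2"], 1: ["b1"], 2: ["c1", "c2", "c3"]}
    done = process_poll(polled, pool)
    # Every task has finished here: this is the safe point to commit offsets
print(done)  # {0: ['A1', 'A2'], 1: ['B1'], 2: ['C1', 'C2', 'C3']}
```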
# Create new topic
kafka-topics.sh --create --topic orders_emerg \
--partitions 300 --replication-factor 3 --bootstrap-server kafka1:9092
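// Forward backlogged messages to the new topic (Kafka Java client, String keys/values assumed)
while (true) {
    ConsumerRecords<String, String> batch = originalConsumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> r : batch) {
        emergencyProducer.send(new ProducerRecord<>("orders_emerg", r.key(), r.value()));
    }
    emergencyProducer.flush();     // wait until the batch is written to the new topic
    originalConsumer.commitSync(); // then commit: a crash may duplicate, but not lose, messages
}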
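// Batch consumer configuration
props.put("max.poll.records", 1000);
props.put("fetch.max.bytes", 5242880);  // 5 MB per fetch request
props.put("enable.auto.commit", false); // commit only after processing
ExecutorService pool = Executors.newFixedThreadPool(32);
consumer.subscribe(Collections.singletonList("orders_emerg"));
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    List<Future<?>> tasks = new ArrayList<>();
    for (TopicPartition tp : records.partitions()) {
        // One task per partition keeps records within a partition in order
        List<ConsumerRecord<String, String>> part = records.records(tp);
        tasks.add(pool.submit(() -> batchProcess(part)));
    }
    for (Future<?> t : tasks) {
        t.get(); // wait for the whole poll (enclosing method handles checked exceptions)
    }
    consumer.commitSync(); // commit only after every task has finished
}
Rescue Timeline (5‑Hour Countdown)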
Key Principles (Architect’s Emergency Handbook)
Divide and Conquer: split the backlog, use multiple topics, and consume in parallel.
Discard Non‑Core: drop low-priority messages directly.
Space‑for‑Time: dump messages to disk to buy processing time.
Over‑Provision Burst: temporarily allocate extra resources for peak load.
Final Thought Exercise
When resources can only be scaled up three‑fold, how would you combine downgrade, scaling, and batch processing to clear the backlog within five hours?
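One way to frame the exercise numerically without answering it: the three levers multiply. Downgrade shrinks the work (the core fraction kept), while scaling and batching multiply the drain rate, so the question is whether their product reaches the required speedup. A sketch; all inputs are hypothetical, and the net drain rate is assumed to already account for messages still arriving.

```python
def required_speedup(backlog: int, net_rate: float, hours: float,
                     core_fraction: float = 1.0) -> float:
    """Multiplier (scaling x batching gain) needed so the core part of the
    backlog drains within `hours` at the current net drain rate (msg/s)."""
    return backlog * core_fraction / (net_rate * hours * 3600)

def plan_fits(backlog: int, net_rate: float, hours: float, core_fraction: float,
              scale_cap: float, batch_speedup: float) -> bool:
    """True if capped scaling combined with the batching gain is enough."""
    return scale_cap * batch_speedup >= required_speedup(backlog, net_rate, hours, core_fraction)

# Hypothetical inputs: 10M backlog, 200 msg/s current net drain rate, 5-hour target
print(round(required_speedup(10_000_000, 200, 5), 2))       # 2.78 with no downgrade
print(round(required_speedup(10_000_000, 200, 5, 0.6), 2))  # 1.67 if 40% is dropped
print(plan_fits(10_000_000, 200, 5, 1.0, scale_cap=3, batch_speedup=1.0))  # True
```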
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.
