Detect Hot Items in High‑Concurrency Sales Using OpenResty, Kafka & Elasticsearch
During flash‑sale events like JD or Taobao, a tiny fraction of products generate the majority of traffic, causing server overload; this guide explains how to identify and collect hotspot product data in real time using an OpenResty‑Lua front‑end, Kafka for messaging, and Elasticsearch for storage and analysis.
In high‑concurrency scenarios such as JD or Taobao flash‑sale events, thousands of users simultaneously try to purchase items, but 90% of traffic concentrates on a few hot products, which can crash servers.
This article shows how to recognize hot product data in real time using an OpenResty + Lua + Kafka + Elasticsearch solution.
1. Implementation Flow
(1) User accesses a URL (e.g., http://www.longxia.com/web/item/index.html) which is routed to OpenResty.
(2) Nginx location intercepts the request, strips a specific part of the path (e.g., “/web”), and forwards the request to the real service.
(3) Lua assembles product information and sends a message to Kafka; Kafka forwards the data to Logstash, which stores it in Elasticsearch.
2. Core Implementation Details
(1) Configure lua‑resty‑kafka (https://github.com/doujiang24/lua-resty-kafka) as an Nginx module.
(2) Write Lua script to collect request data and produce Kafka messages.
<code>-- dependencies
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
-- Kafka broker list
local broker_list = {
{ host = "192.168.203.237", port = 9092 }
}
-- create producer
local bp = producer:new(broker_list, { producer_type = "async" })
-- get request headers and IP
local headers = ngx.req.get_headers()
local ip = headers["X-REAL_IP"] or headers["X_FORWARDED_FCR"] or ngx.var.remote_addr or "0.0.0.0"
-- build message
local message = {}
message["uri"] = ngx.var.uri
message["ip"] = ip
message["token"] = "987654"
message["actime"] = os.date("%y-%M-%d %H:%m:%S")
-- send message
local ok, err = bp:send("collection_hot_item", nil, cjson.encode(message))
-- strip "/web" from URI and forward
local uri = ngx.var.uri
uri = string.gsub(uri, "/web", "")
ngx.exec(uri)
</code>(3) Nginx configuration to intercept page requests.
<code># intercept /web/item requests
location /web/item/ {
content_by_lua_file /usr/local/openresty/nginx/lua/item-access.lua;
}
location /item/ {
root /usr/local/web;
}
</code>(4) Logstash configuration to consume Kafka messages and push them to Elasticsearch.
<code>input {
kafka {
bootstrap_servers => "192.168.203.237:9092"
topics => ["collection_hot_item"]
group_id => "logstash_group"
consumer_threads => 3
codec => json
}
}
filter {
}
output {
elasticsearch {
hosts => ["http://192.168.203.238:9200"]
index => "hot_item"
}
}
</code>Summary: Operators can use Kibana to query hot‑item data stored in Elasticsearch and take targeted mitigation actions; alternatively, Druid can be used for richer analytics.
Lobster Programming
Sharing insights on technical analysis and exchange, making life better through technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.