High-Performance Data Collection and Persistence Using OpenResty, Lua, and Kafka
The article describes a high‑throughput, fault‑tolerant data‑collection pipeline built with OpenResty, Lua, and Kafka. It writes incoming records to disk using a cross‑worker mutex and cached file handles, rotates files via an Nginx reload, and achieves roughly 13.5× the throughput of typical approaches found online.
Unified data collection is the entry point for the data product team, handling massive traffic (tens of thousands of QPS). High performance and high availability are critical because even a one‑second outage could lose thousands of records.
The technology selection principle emphasizes simplicity and reliability over cutting‑edge features. The final stack chosen is OpenResty with Lua for data reception, Filebeat for transport, and Kafka for storage.
Why not write directly to Kafka from OpenResty? Data must travel over the network to reach Kafka, and downstream failures (network issues, Kafka outages) are possible. Relying solely on in‑memory buffering would risk data loss when downstream problems persist, because worker memory is limited. Therefore the design first persists data to disk, then asynchronously forwards it to Kafka, decoupling reception from transmission.
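Because Filebeat tails the on‑disk files and ships them to Kafka asynchronously, the transport leg of this design reduces to configuration. A minimal sketch of such a filebeat.yml is shown below; the paths, topic name, and broker addresses are illustrative assumptions, not values from the original article:

```yaml
# Hypothetical Filebeat configuration for the disk-to-Kafka leg.
# Paths, topic, and hosts are placeholders for this sketch.
filebeat.inputs:
  - type: log
    paths:
      - /data/*.log          # the files OpenResty appends to

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092"]
  topic: "collected-data"
  required_acks: 1           # leader acknowledgement is enough here
```

If Kafka is unreachable, Filebeat simply stops advancing its file offsets and retries, which is exactly the decoupling the disk‑first design is after.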
OpenResty does not provide a native data‑to‑file API; the only file‑related function is ngx.log, which is unsuitable for data persistence. A naive approach that calls Lua's io.open on every request is inefficient. To achieve high‑performance file writes under high concurrency, three challenges must be solved:
Avoid data corruption when multiple workers write to the same file.
Maintain efficient write throughput under heavy load.
Perform seamless file rotation without affecting business.
The solution uses a lua_shared_dict as a cross‑worker mutex, allowing exclusive access to a file during a write operation.
In nginx.conf, add: "lua_shared_dict file_locks 1m;"
In the business Lua file, use it as follows:
............
local resty_lock = require "resty.lock"  -- lua-resty-lock, built on the shared dict
local lock, err = resty_lock:new("file_locks")
if not lock then
    ngx.log(ngx.WARN, "failed to create lock: ", err)
    return
end
............
local elapsed, err = lock:lock(topic)
if not elapsed then
    ngx.log(ngx.WARN, "failed to lock: ", err)
    return
end
-- business logic (the actual file write) goes here
local ok, err = lock:unlock()
if not ok then
    ngx.log(ngx.WARN, "failed to unlock: ", err)
end
............
Additionally, init_worker_by_lua_block creates a global table fileHandleList that caches opened file handles, preventing repeated open/close cycles and improving efficiency.
In nginx.conf, add: "init_worker_by_lua_block {fileHandleList = {}; }"
In the business Lua file, use it as follows:
............
local filePath = "/data/" .. filename .. ".log"
if io.type(fileHandleList[filePath]) ~= 'file' then
fileHandleList[filePath] = assert(io.open(filePath, "a"))
end
local fileHandle = fileHandleList[filePath]
............
fileHandle:write(message, "\n")
fileHandle:flush()
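The caching pattern above can be exercised outside OpenResty in plain Lua. This standalone sketch (the temp-file path is generated for illustration) shows the key property of the io.type guard: a live handle is reused, and a closed or missing one is transparently reopened:

```lua
-- Standalone sketch of the file-handle cache; runs under plain Lua/LuaJIT.
local fileHandleList = {}

local function get_handle(filePath)
    -- io.type returns "file" for an open handle, "closed file" after close,
    -- and nil for a non-handle, so we reopen only when necessary.
    if io.type(fileHandleList[filePath]) ~= "file" then
        fileHandleList[filePath] = assert(io.open(filePath, "a"))
    end
    return fileHandleList[filePath]
end

local path = os.tmpname()
local h1 = get_handle(path)
local h2 = get_handle(path)
assert(h1 == h2)               -- live handle is reused, not reopened

h1:close()
local h3 = get_handle(path)
assert(io.type(h3) == "file")  -- closed handle detected and reopened
h3:close()
os.remove(path)
```

In the real worker the table lives for the worker's lifetime, so each file is opened at most once per worker between reloads.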
............
For file rotation, a HUP reload (nginx -s reload) is triggered after moving completed files to an archive directory. The reload replaces the worker processes, so the stale cached handles that still point at the moved files are dropped, and the next write reopens fresh files at the original paths. This mirrors standard log‑rotation practice but applies to the custom data files, so the business continues without interruption.
data_path="/openresty/nginx/data"
olddata_path="/openresty/nginx/olddata"
mkdir -p ${olddata_path}/$(date -d "yesterday" +"%Y%m%d")/
data_name=`ls ${data_path} |grep '\.json$' |awk -F'.json$' '{print $1}'|sort|uniq|xargs`
for loop in ${data_name}
do
mv ${data_path}/${loop}.json ${olddata_path}/$(date -d "yesterday" +"%Y%m%d")/${loop}.json
done
/openresty/nginx/sbin/nginx -s reload

Benchmarking shows this solution outperforms common online approaches by a factor of 13.5, demonstrating the effectiveness of the lock‑based concurrency control, persistent file‑handle caching, and graceful rotation strategy.
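The reason the mv-then-reload sequence is safe can be demonstrated with a small standalone shell experiment (paths here are temporary and illustrative): a process holding an open handle keeps writing to the renamed file until it reopens the path, so no writes are lost during the move.

```shell
#!/bin/sh
# Demonstrates rename semantics behind the rotation scheme:
# an already-open handle follows the file across a mv.
tmpdir=$(mktemp -d)

exec 3>>"$tmpdir/data.json"        # open an append handle, like a cached handle
echo "before move" >&3

mv "$tmpdir/data.json" "$tmpdir/archived.json"   # "rotate" the file

echo "after move" >&3              # still lands in archived.json (same inode)
exec 3>&-                          # close; a later reopen of data.json starts fresh

grep -c "" "$tmpdir/archived.json" # both lines ended up in the archived file
```

This is why the workers can keep writing while the script moves files, and why the reload (which makes workers reopen their handles) is the only coordination needed.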
37 Interactive Technology Team
37 Interactive Technology Center