
Java Log Replay System Using Disruptor for High‑Performance Load Testing

This article describes a Java‑based log replay solution that re‑implements Goreplay's functionality on top of Disruptor. It covers the design, log extraction, implementation steps, performance metrics, and known risks, and includes full source code for a scalable load‑testing engine.

FunTester

The current load‑testing setup relies on a customized Goreplay solution, which brings compatibility and maintenance problems in a Java tech stack. To address them, a Java implementation of log replay was developed after studying Goreplay's source code and design.

Design Overview

The overall architecture employs a high‑performance Disruptor queue to asynchronously process HTTP requests reconstructed from logged URLs and tokens. An illustration of the design is provided (image omitted).

Log Retrieval and Parsing

Logs are fetched from the gateway via SQL and minimally parsed to extract the useful fields (currently only the URL). The extracted data is stored in OSS, and once traffic recording succeeds, the OSS link is saved in a database.

Example log format:

/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
...
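
As a rough illustration of the parsing step, the sketch below splits one line of the format shown above and keeps the URL and token. The class name `LogLineParser`, the `Entry` holder, and the column positions (0 for the URL, 3 for the token) are assumptions inferred from the sample line, not taken from the original project.

```java
import java.util.Optional;

/** Hypothetical helper: parses one comma-separated gateway log line. */
final class LogLineParser {

    /** Minimal holder for the two fields the replay needs. */
    static final class Entry {
        final String url;
        final String token;

        Entry(String url, String token) {
            this.url = url;
            this.token = token;
        }
    }

    // Assumed column positions, inferred from the sample line.
    private static final int URL_INDEX = 0;
    private static final int TOKEN_INDEX = 3;

    /** Returns the parsed entry, or empty for blank, non-URL, or too-short lines. */
    static Optional<Entry> parse(String line) {
        if (line == null || line.trim().isEmpty() || !line.startsWith("/")) {
            return Optional.empty();
        }
        // A limit of -1 keeps trailing empty fields instead of discarding them.
        String[] fields = line.split(",", -1);
        if (fields.length <= TOKEN_INDEX) {
            return Optional.empty();
        }
        return Optional.of(new Entry(fields[URL_INDEX], fields[TOKEN_INDEX]));
    }

    public static void main(String[] args) {
        parse("/v1/level,funtester.com,-,token,-,1622611469,-")
                .ifPresent(e -> System.out.println(e.url + " " + e.token)); // prints "/v1/level token"
    }
}
```

Lines that do not start with `/` (such as the `...` placeholder) are skipped, which mirrors the filter used in the demo class later in the article.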

Implementation Steps

Extract useful information (URL and token) from logs into memory.

Configure the host, then combine each URL with the request headers (token, load‑test markers, common headers, mock identifiers) to assemble HTTP requests.

Create a Disruptor instance and launch asynchronous producers.

Consumers retrieve the HTTP request objects from the queue and execute them, achieving log‑driven traffic replay.
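
The steps above can be sketched with JDK classes alone. Note that this sketch deliberately swaps the Disruptor ring buffer for a plain `ArrayBlockingQueue` so that it runs without extra dependencies. It shows only the shape of the hand-off (round-robin request assembly, a bounded queue, several consumers) and not the Disruptor API. `ReplayPipelineSketch`, `ReplayRequest`, and `replay` are hypothetical names, and the consumer merely counts requests where a real one would send them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class ReplayPipelineSketch {

    /** Hypothetical request holder standing in for an HTTP request object. */
    static final class ReplayRequest {
        final String url;
        final String token;

        ReplayRequest(String url, String token) {
            this.url = url;
            this.token = token;
        }
    }

    /** Publishes `total` requests and drains them with `consumers` threads; returns how many were consumed. */
    static int replay(String host, List<String> urls, List<String> tokens, int total, int consumers) {
        BlockingQueue<ReplayRequest> queue = new ArrayBlockingQueue<>(1024);
        ReplayRequest poison = new ReplayRequest("", ""); // end-of-stream marker
        AtomicInteger executed = new AtomicInteger();

        List<Thread> workers = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        ReplayRequest request = queue.take();
                        if (request == poison) {
                            break;
                        }
                        // A real consumer would send the HTTP request here.
                        executed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        try {
            for (int i = 0; i < total; i++) {
                // Cycle through the recorded URLs and tokens, as the producer demo does.
                String url = host + urls.get(i % urls.size());
                String token = tokens.get(i % tokens.size());
                queue.put(new ReplayRequest(url, token)); // blocks while the queue is full
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(poison); // one marker per consumer
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("replay interrupted", e);
        }
        return executed.get();
    }

    public static void main(String[] args) {
        int done = replay("http://localhost:12345",
                Arrays.asList("/v1/level", "/v1/user"), Arrays.asList("tokenA"), 1000, 4);
        System.out.println("replayed " + done + " requests"); // prints "replayed 1000 requests"
    }
}
```

Because `put` blocks when the queue is full, this variant applies backpressure instead of dropping requests. That is a different trade-off from the non-blocking buffering in the article's consumer.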

Performance Metrics

On a 6‑core, 16 GB machine, reading 10 million URLs takes roughly 9–13 seconds with negligible memory impact.

Streaming reads can achieve over 800 k lines per second.
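
A streaming read hands each line to a callback instead of collecting everything into a list first. The sketch below is a minimal illustration of that idea and makes no claim about the throughput figure above. `StreamingLogReader` and `streamUrls` are hypothetical names, and the 1 MB buffer size is an arbitrary choice.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

final class StreamingLogReader {

    /**
     * Reads the source line by line and passes the first comma-separated
     * field of every line starting with "/" to the sink.
     * Returns the number of URLs passed on.
     */
    static long streamUrls(Reader source, Consumer<String> sink) {
        long count = 0;
        try (BufferedReader reader = new BufferedReader(source, 1 << 20)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.startsWith("/")) {
                    continue; // skip blank lines and anything that is not a URL path
                }
                int comma = line.indexOf(',');
                sink.accept(comma < 0 ? line : line.substring(0, comma));
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        String sample = "/v1/level,funtester.com,-,token,-,1622611469,-\n...\n";
        long n = streamUrls(new StringReader(sample), url -> System.out.println(url));
        System.out.println(n + " URL(s) streamed");
    }
}
```

To read a real log file, pass `Files.newBufferedReader(Paths.get(path))` as the source. Only one line is held in memory at a time, whatever the file size.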

Single producer throughput reaches 250 k QPS.

Overall single‑machine test peaks at 88 k QPS, fully utilizing CPU.

Risks

Asynchronous consumer buffering may drop messages if production outpaces consumption.

Consumer count is fixed at startup; improper configuration can cause bottlenecks and prevent dynamic scaling.

These risks are slated for incremental resolution.
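
One way to make the first risk visible while it remains unresolved is to count the drops. In the demo code, the handler skips an event once its local queue holds 100,000 requests, so dropped requests leave no trace. The sketch below is a hypothetical wrapper, not part of the original project: `DropCountingBuffer` uses a non-blocking `offer` on a bounded JDK queue and increments a counter whenever an item is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical bounded buffer that records how many items it had to reject. */
final class DropCountingBuffer<T> {

    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    DropCountingBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking hand-off: returns false and counts a drop when the buffer is full. */
    boolean offer(T item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Returns the next buffered item, or null when the buffer is empty. */
    T poll() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        DropCountingBuffer<String> buffer = new DropCountingBuffer<>(2);
        for (int i = 0; i < 5; i++) {
            buffer.offer("/v1/level?i=" + i);
        }
        System.out.println("dropped " + buffer.droppedCount()); // prints "dropped 3"
    }
}
```

Reporting `droppedCount()` next to the achieved QPS would show how much of the produced traffic never reached the target service.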

Code Implementation

Producer Demo

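def ft = {
    output("Creating thread")
    fun { // run the producer loop asynchronously
        int i = 0
        while (key) { // key is set to false when the test finishes
            // cycle through the recorded URLs and tokens
            def url = logs.get(i % logs.size())
            def get = getHttpGet(HOST + url)
            get.addHeader("token", tokens.get(i % tokens.size()))
            get.addHeader(HttpClientConstant.USER_AGENT)
            // publish the assembled request to the ring buffer
            ringBuffer.publishEvent {e, s ->
                e.setRequest(get)
            }
            i++
        }
    }
}
ft()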

File Reading Utility

/**
 * Reads a very large file line by line, using the supplied function to keep only part of each line.
 *
 * @param filePath path of the file to read
 * @param function maps each line to the value to keep; blank or null results are skipped
 * @return the non-blank values produced by the function
 */
public static List<String> readByLine(String filePath, Function<String, String> function) {
    if (StringUtils.isEmpty(filePath) || !new File(filePath).exists() || new File(filePath).isDirectory())
        ParamException.fail("Invalid file: " + filePath);
    logger.debug("Reading file: {}", filePath);
    List<String> lines = new ArrayList<>();
    File file = new File(filePath);
    if (file.isFile() && file.exists()) { // make sure the file exists
        try (FileInputStream fileInputStream = new FileInputStream(file);
             InputStreamReader read = new InputStreamReader(fileInputStream, DEFAULT_CHARSET);
             BufferedReader bufferedReader = new BufferedReader(read, 3 * 1024 * 1024);) {
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                String apply = function.apply(line);
                if (StringUtils.isNotBlank(apply)) lines.add(apply);
            }
        } catch (Exception e) {
            logger.warn("Failed to read file content", e);
        }
    } else {
        logger.warn("File not found: {}", filePath);
    }
    return lines;
}

Demo Main Class

package com.funtest.groovytest;

import com.funtester.base.constaint.FixedThread;
import com.funtester.config.HttpClientConstant;
import com.funtester.frame.execute.Concurrent;
import com.funtester.frame.execute.ThreadPoolUtil;
import com.funtester.httpclient.ClientManage;
import com.funtester.httpclient.FunLibrary;
import com.funtester.utils.ArgsUtil;
import com.funtester.utils.RWUtil;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.junit.platform.commons.util.StringUtils;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;

class ReplayTest extends FunLibrary {

    static String url = "http://localhost:12345/test";
    static HttpGet httpGet = getHttpGet(url);
    static def HOST = "http://localhost:12345";
    static def key = true;
    static Disruptor<RequestEvent> disruptor;

    public static void main(String[] args) {
        def logfile = "/Users/oker/Desktop/log.csv";
        def tokenfile = "/Users/oker/Desktop/token.csv";
        List<String> logs = RWUtil.readByLine(logfile, new Function<String, String>() {
            @Override
            String apply(String s) {
                return StringUtils.isNotBlank(s) && s.startsWith("/") ? s.split(COMMA)[0] : null;
            }
        });
        List<String> tokens = RWUtil.readByLine(tokenfile, new Function<String, String>() {
            @Override
            String apply(String s) {
                return StringUtils.isNotBlank(s) ? s.split(COMMA)[4] : null;
            }
        });
        output("Total: ${formatLong(logs.size())} log entries");
        disruptor = new Disruptor<>(
                RequestEvent::new,
                512 * 512,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        RingBuffer<RequestEvent> ringBuffer = disruptor.getRingBuffer();
        def ft = {
            output("Creating thread");
            fun {
                int i = 0;
                while (key) {
                    def url = logs.get(i % logs.size());
                    def get = getHttpGet(HOST + url);
                    get.addHeader("token", tokens.get(i % tokens.size()));
                    get.addHeader(HttpClientConstant.USER_AGENT);
                    ringBuffer.publishEvent {e, s -> e.setRequest(get)};
                    i++;
                }
            }
        };
        ft();
        disruptor.handleEventsWith(new FunTester(10));
        ClientManage.init(10, 5, 0, "", 0);
        def util = new ArgsUtil(args);
        def thread = util.getIntOrdefault(0, 20);
        def times = util.getIntOrdefault(1, 60000);
        RUNUP_TIME = util.getIntOrdefault(2, 0);
        def tasks = [];
        thread.times {
            def tester = new FunTester(times);
            disruptor.handleEventsWith(tester);
            tasks << tester;
        }
        disruptor.start();
        new Concurrent(tasks, "Ten-million-scale log replay demo").start();
    }

    private static class FunTester extends FixedThread implements EventHandler<RequestEvent>, WorkHandler<RequestEvent> {
        LinkedBlockingDeque<HttpRequestBase> reqs = new LinkedBlockingDeque<>();
        FunTester(int limit) { super(null, limit, true); }
        @Override
        protected void doing() throws Exception { FunLibrary.executeOnly(reqs.take()); }
        @Override
        FixedThread clone() { return new FunTester(limit); }
        @Override
        protected void after() { super.after(); key = false; disruptor.shutdown(); }
        @Override
        void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception { if (reqs.size() < 100000) reqs.add(event.getRequest()); }
        @Override
        void onEvent(RequestEvent event) throws Exception { if (reqs.size() < 100000) reqs.add(event.getRequest()); }
    }

    private static class RequestEvent {
        HttpRequestBase request;
        public HttpRequestBase getRequest() { return request; }
        public void setRequest(HttpRequestBase request) { this.request = request; }
    }
}

Multiple groups are used as indicated in the design documentation.

Have Fun ~ Tester!
