Understanding Netty Futures and Building a Simple Callback-Based Asynchronous Task Framework
This article explains the limitations of Java's Future, demonstrates Netty's non-blocking Future with listeners, and walks through building a simple callback-based asynchronous task framework in Java, including timeout handling, using CompletableFuture and custom Worker/Listener abstractions.
Netty is a classic network framework that uses non-blocking I/O (NIO) to support massive numbers of concurrent connections with only a few threads. It relies heavily on a non-blocking callback model implemented via Future/Promise.
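To make the Future/Promise split concrete without pulling in Netty, the sketch below uses the JDK's CompletableFuture as a stand-in: the producer completes it (the promise side) and the consumer registers a callback on it (the future side). This is only an analogy, not Netty's API; the class name PromiseSketch and the helper fetchGreeting are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;

public class PromiseSketch {
    // Hypothetical helper: starts the work on another thread and returns immediately.
    public static CompletableFuture<String> fetchGreeting(String name) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100);                  // simulate slow I/O
                promise.complete("hello " + name);  // promise side: write the result once
            } catch (InterruptedException e) {
                promise.completeExceptionally(e);   // promise side: report a failure
            }
        }).start();
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = fetchGreeting("netty")
                // future side: register a callback instead of blocking for the value
                .thenAccept(result -> System.out.println(
                        Thread.currentThread().getName() + " got: " + result));
        System.out.println("main thread keeps going");
        done.join(); // only so the demo waits for the callback before exiting
    }
}
```

The caller never waits for the value itself; it hands over a callback and moves on, which is the behavior the rest of the article builds toward.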
Java's standard java.util.concurrent.Future has a major drawback: future.get() blocks the calling thread until the result is ready, and the interface offers no way to register a callback. That makes it unsuitable for truly asynchronous callbacks, as the simple executor example below illustrates.
import java.util.concurrent.*;

/**
 * @author wuweifeng wrote on 2019-12-10
 * @version 1.0
 */
public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create a thread pool
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<Integer> future = executor.submit(new Task());
        // get() blocks the current thread until the task has finished
        System.out.println(future.get());
        executor.shutdown();
    }

    private static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("Worker thread is computing");
            Thread.sleep(2000);
            return 1;
        }
    }
}

Netty's own Future lets you attach a listener that is invoked when the asynchronous operation completes, so there is no need to block on get(). The Netty client example below adds a ChannelFutureListener to the connect operation to handle its completion.
EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class)
            .remoteAddress(host, port)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new StringDecoder())
                            .addLast(new StringEncoder())
                            .addLast(new IdleStateHandler(10, 0, 0), new NettyClientHandler());
                }
            });
    // connect() returns immediately; the listener runs once the connection attempt completes.
    // (Calling sync() before addListener would block here and defeat the purpose of the callback.)
    ChannelFuture channelFuture = bootstrap.connect()
            .addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // do your job
                }
            });
    // Keep the event loop alive until the channel is closed
    channelFuture.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully();
}

To obtain similar callback behavior without Netty, the article defines a simple framework consisting of a Worker interface representing a task, a Listener interface for callbacks, and a Wrapper that holds a worker, its parameter, and a listener.
public interface Worker {
    // Runs the task with the given parameter and returns its result
    String action(Object object);
}

public interface Listener {
    // Called with the result once the task has finished
    void result(Object result);
}

public class Wrapper {
    private Object param;
    private Worker worker;
    private Listener listener;

    public Object getParam() { return param; }
    public void setParam(Object param) { this.param = param; }
    public Worker getWorker() { return worker; }
    public void setWorker(Worker worker) { this.worker = worker; }
    public Listener getListener() { return listener; }
    public void addListener(Listener listener) { this.listener = listener; }
}

The main demo creates a worker, wraps it, and starts a new thread that executes the worker's action and then invokes the listener's result method, demonstrating non-blocking execution and callback handling. The main thread prints its own name right away, while the callback prints the worker thread's name about a second later.
public class Bootstrap {
    public static void main(String[] args) {
        Bootstrap bootstrap = new Bootstrap();
        Worker worker = bootstrap.newWorker();
        Wrapper wrapper = new Wrapper();
        wrapper.setWorker(worker);
        wrapper.setParam("hello");
        // Note: the listener is attached after the worker thread has already started.
        // This is safe here only because the worker sleeps for a second; real code
        // should register the listener before starting the task.
        bootstrap.doWork(wrapper).addListener(new Listener() {
            @Override
            public void result(Object result) {
                // Runs on the worker thread
                System.out.println(Thread.currentThread().getName());
                System.out.println(result);
            }
        });
        // Runs on the main thread, without waiting for the worker
        System.out.println(Thread.currentThread().getName());
    }

    private Wrapper doWork(Wrapper wrapper) {
        new Thread(() -> {
            Worker worker = wrapper.getWorker();
            String result = worker.action(wrapper.getParam());
            wrapper.getListener().result(result);
        }).start();
        return wrapper;
    }

    private Worker newWorker() {
        return new Worker() {
            @Override
            public String action(Object object) {
                try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
                return object + " world";
            }
        };
    }
}

A variant adds timeout handling with CompletableFuture: the task runs via supplyAsync, and if it does not complete within the specified period (800 ms here, against a task that takes 1000 ms), the listener receives a timeout message.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BootstrapNew {
    public static void main(String[] args) {
        BootstrapNew bootstrap = new BootstrapNew();
        Worker worker = bootstrap.newWorker();
        Wrapper wrapper = new Wrapper();
        wrapper.setWorker(worker);
        wrapper.setParam("hello");
        wrapper.addListener(new Listener() {
            @Override
            public void result(Object result) {
                System.out.println(result);
            }
        });
        CompletableFuture<Wrapper> future = CompletableFuture.supplyAsync(() -> bootstrap.doWork(wrapper));
        try {
            // Waits at most 800 ms for the task to finish
            future.get(800, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | TimeoutException | ExecutionException e) {
            // Timed out (or failed): notify the listener.
            // The task itself is not cancelled, so in a long-running process the
            // listener could still be called a second time with the real result.
            wrapper.getListener().result("time out exception");
        }
    }

    private Wrapper doWork(Wrapper wrapper) {
        Worker worker = wrapper.getWorker();
        String result = worker.action(wrapper.getParam());
        wrapper.getListener().result(result);
        return wrapper;
    }

    private Worker newWorker() {
        return new Worker() {
            @Override
            public String action(Object object) {
                try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
                return object + " world";
            }
        };
    }
}

More complex scenarios bring further requirements: mixing serial and parallel execution, handling per-task success, failure, timeout, and exception callbacks, sharing or isolating thread pools, and defining strong or weak dependencies between tasks. The article outlines these requirements and hints at using CompletableFuture.allOf and anyOf to coordinate groups of tasks.
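As a rough, self-contained illustration of that kind of coordination, the sketch below runs three tasks on a shared pool, uses anyOf to react to the first one that finishes and allOf to wait for all of them. The class GroupSketch, the helper slowTask, and the delays are made up for this example and are not part of the article's framework.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class GroupSketch {
    // Hypothetical task: sleeps for the given time on the pool, then returns its name.
    public static CompletableFuture<String> slowTask(String name, long millis, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return name;
        }, pool);
    }

    // Waits for every task (allOf), then collects the results in submission order.
    public static List<String> runAll(List<CompletableFuture<String>> tasks) {
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        return tasks.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // Returns the result of whichever task completes first (anyOf).
    public static Object firstOf(List<CompletableFuture<String>> tasks) {
        return CompletableFuture.anyOf(tasks.toArray(new CompletableFuture[0])).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<CompletableFuture<String>> tasks = Arrays.asList(
                    slowTask("a", 300, pool),
                    slowTask("b", 100, pool),
                    slowTask("c", 200, pool));
            System.out.println("first done: " + firstOf(tasks));
            System.out.println("all done: " + runAll(tasks));
        } finally {
            pool.shutdown();
        }
    }
}
```

In this sketch allOf plays the role of a strong dependency (nothing proceeds until every task is done), while anyOf is closer to a weak one (the first result is enough to move on).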
// Fragment from inside the framework's WorkerWrapper; nextWrappers, poolExecutor,
// remainTime and costTime come from the surrounding code (not shown here).
// Start every dependent wrapper in parallel on the shared pool...
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
for (int i = 0; i < nextWrappers.size(); i++) {
    int finalI = i; // effectively final copy for use inside the lambda
    futures[i] = CompletableFuture.runAsync(() ->
            nextWrappers.get(finalI).work(poolExecutor, WorkerWrapper.this, remainTime - costTime), poolExecutor);
}
try {
    // ...then block until all of them have finished
    CompletableFuture.allOf(futures).get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

The article concludes that a fully featured asynchronous framework can be built on top of thread pools and CompletableFuture, providing callbacks, timeout control, and flexible task composition for real-world use cases such as microservice orchestration, data pipelines, and web crawling.
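As one possible way to combine callbacks with timeout control without blocking the caller, the sketch below races the task against a scheduled timeout and delivers exactly one outcome to the callback. It sticks to JDK 8 APIs (JDK 9 and later also offer CompletableFuture.completeOnTimeout for the same purpose). The names TimeoutSketch, withTimeout, and slowHello are hypothetical and do not come from the article's framework.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class TimeoutSketch {
    // Runs the task asynchronously; if it has not finished after timeoutMillis,
    // the returned future is completed with the fallback value instead.
    public static CompletableFuture<String> withTimeout(Supplier<String> task, long timeoutMillis,
                                                        String fallback, ScheduledExecutorService scheduler) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(task);
        // complete() does nothing if the future is already completed, so only one outcome wins.
        scheduler.schedule(() -> { future.complete(fallback); }, timeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    // Hypothetical slow worker, similar in spirit to the article's Worker.action().
    public static String slowHello(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "hello world";
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // The worker needs 1000 ms but only 800 ms are allowed: the callback gets the fallback.
            CompletableFuture<Void> slow =
                    withTimeout(() -> slowHello(1000), 800, "time out exception", scheduler)
                            .thenAccept(System.out::println);
            // The worker needs 100 ms: the callback gets the real result.
            CompletableFuture<Void> fast =
                    withTimeout(() -> slowHello(100), 800, "time out exception", scheduler)
                            .thenAccept(System.out::println);
            CompletableFuture.allOf(slow, fast).join(); // only so the demo waits before exiting
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Because the timeout and the task both try to complete the same future, the callback registered with thenAccept fires once, with whichever value arrived first. The underlying task is still not cancelled in this sketch; it simply loses the race.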
JD Retail Technology
