How Redis Implements Multithreading: A Deep Dive into Its I/O Thread Model
This article explains Redis's single‑threaded architecture, its limitations, and how Redis 6.0+ adds configurable I/O threads to parallelize read/write handling, detailing the server initialization, event loop, task queues, and the interaction between the main thread and worker threads.
Redis is a high‑performance server that uses a single‑threaded event loop with epoll to achieve tens of thousands of QPS. While this design is fast, it cannot exploit multi‑core CPUs and a slow request can block all other clients, so long‑running commands such as KEYS * are discouraged.
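The blocking effect can be made concrete with a small model. This is an illustrative sketch, not Redis code: the function name and the microsecond costs are hypothetical, and it only assumes that one thread serves requests strictly in arrival order.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model: one thread serves requests in arrival order.
 * cost_us[i] is the service time of request i; done_us[i] receives the
 * time at which request i completes, measured from the start. */
void single_thread_completion(const long *cost_us, long *done_us, size_t n) {
    assert(n == 0 || (cost_us != NULL && done_us != NULL));
    long clock_us = 0;
    for (size_t i = 0; i < n; i++) {
        clock_us += cost_us[i];   /* nothing else runs while this request executes */
        done_us[i] = clock_us;
    }
}
```

With costs of 10, 50000, and 10 microseconds, the third request finishes only after the slow second one, even though it needs just 10 microseconds of work itself.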
1. Multithreaded Redis Service Startup
First clone a multithreaded Redis source tree:
# git clone https://github.com/redis/redis
# cd redis
# git checkout -b 6.2.0 6.2.0
Multithreading is disabled by default. To enable it, edit redis.conf and set the io-threads and io-threads-do-reads options, for example:
vi /usr/local/soft/redis6/conf/redis.conf
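# number of I/O threads (the main thread counts as one of them)
io-threads 4
# also use I/O threads for read operations
io-threads-do-reads yes
Comments go on their own lines because redis.conf does not accept trailing comments after a directive. With these options enabled, the main entry point in src/server.c creates the I/O threads after the main server initialization: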
// file: src/server.c
int main(int argc, char **argv) {
    initServer();        // 1.1 main thread initialization
    InitServerLast();    // 1.2 start I/O threads (calls initThreadedIO)
    aeMain(server.el);   // enter event loop
}
1.1 Main Thread Initialization
The initServer function performs four key steps:
Initialize read and write task queues.
Create an epoll object.
Listen on the configured port.
Register the listen socket with epoll.
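Steps 2 to 4 can be sketched outside Redis with the raw Linux calls. This is a minimal illustration under stated assumptions, not how Redis does it: Redis goes through its own ae and anet wrappers, and the helper name setup_listener, the loopback address, and the backlog of 128 are all hypothetical choices made for the example.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper (Linux only): open a TCP listen socket on
 * 127.0.0.1:port (port 0 lets the kernel choose) and register it with a
 * new epoll instance. Returns the epoll fd and stores the listen fd in
 * *listen_fd_out, or returns -1 on failure. */
int setup_listener(unsigned short port, int *listen_fd_out) {
    assert(listen_fd_out != NULL);
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    if (lfd < 0) return -1;

    int yes = 1;
    setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);
    if (bind(lfd, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
        listen(lfd, 128) < 0) {
        close(lfd);
        return -1;
    }

    int epfd = epoll_create1(0);              /* create the epoll object */
    if (epfd < 0) { close(lfd); return -1; }

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;                      /* readable means a connection is waiting */
    ev.data.fd = lfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev) < 0) {
        close(epfd);
        close(lfd);
        return -1;
    }
    *listen_fd_out = lfd;
    return epfd;
}
```

A caller would then loop on epoll_wait(epfd, ...) and call accept on the listen fd whenever it is reported readable, which is the role acceptTcpHandler plays in Redis.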
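The relevant excerpt from the Redis source, abridged:
// file: src/server.c
void initServer(void) {
    int j;
    // 1. read and write task queues
    server.clients_pending_write = listCreate();
    server.clients_pending_read = listCreate();
    // 2. event loop (backed by an epoll instance on Linux)
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    // 3. listen on the configured port
    listenToPort(server.port, server.ipfd, &server.ipfd_count);
    // 4. register each listen socket for readable events
    for (j = 0; j < server.ipfd_count; j++) {
        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL);
    }
}
1.2 I/O Thread Startup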
The function initThreadedIO spawns the I/O threads with pthread_create and registers IOThreadMain as their entry point. Slot 0 is reserved for the main thread, so io-threads 4 yields three additional threads:
// file: src/networking.c
void initThreadedIO(void) {
    if (server.io_threads_num == 1) return; // multithreading disabled
    for (int i = 0; i < server.io_threads_num; i++) {
        io_threads_list[i] = listCreate();  // every slot gets a task queue
        if (i == 0) continue;               // slot 0 is the main thread
        pthread_t tid;
        pthread_create(&tid, NULL, IOThreadMain, (void*)(long)i);
        io_threads[i] = tid;
    }
}
Each I/O thread runs an infinite loop, waiting for tasks placed in its private queue io_threads_list[id] and processing them according to the global io_threads_op flag (read or write):
// file: src/networking.c
void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;
    listIter li;
    listNode *ln;
    while (1) {
        // wait until the main thread publishes a non-zero pending count (omitted)
        listRewind(io_threads_list[id], &li);
        while ((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c, 0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0); // tell the main thread this slice is done
    }
}
2. Main Thread Event Loop
The core loop lives in aeMain, which repeatedly calls aeProcessEvents to poll epoll and dispatch registered callbacks:
// file: src/ae.c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
    }
}
aeProcessEvents performs three steps: invoke a pre‑sleep hook, call epoll_wait via aeApiPoll, and then invoke the appropriate read or write callbacks stored in eventLoop->events:
// file: src/ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    // declarations and timer handling omitted
    if (eventLoop->beforesleep && (flags & AE_CALL_BEFORE_SLEEP))
        eventLoop->beforesleep(eventLoop);
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
        int fd = eventLoop->fired[j].fd;
        int mask = eventLoop->fired[j].mask;
        aeFileEvent *fe = &eventLoop->events[fd];
        if (fe->mask & mask & AE_READABLE) fe->rfileProc(eventLoop, fd, fe->clientData, mask);
        if (fe->mask & mask & AE_WRITABLE) fe->wfileProc(eventLoop, fd, fe->clientData, mask);
    }
    return 0;
}
2.1 New Connection Handling
When a listen socket becomes readable, acceptTcpHandler accepts the connection, creates a client object, registers the new socket with epoll, and sets its read handler to readQueryFromClient:
// file: src/networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    // cip and cport receive the peer address and port (declarations omitted)
    int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    acceptCommonHandler(connCreateAcceptedSocket(cfd), 0, cip);
}
2.2 Command Request Handling
Each client’s read callback is readQueryFromClient. It first calls postponeClientRead; if I/O threads are active and enabled for reads, the client is placed into server.clients_pending_read and the function returns without reading from the socket:
// file: src/networking.c
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    if (postponeClientRead(c)) return; // queued for I/O thread
    // normal read path (omitted for brevity)
}
postponeClientRead adds the client to the pending‑read list when the conditions for threaded I/O are met:
// file: src/networking.c
int postponeClientRead(client *c) {
    if (server.io_threads_active && server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read, c);
        return 1;
    }
    return 0;
}
2.3 Before‑Sleep Processing
Before each epoll_wait, the beforeSleep hook processes the pending read and write queues. It distributes pending reads among the I/O threads, wakes them, and then lets the main thread handle the 0‑th queue:
// file: src/server.c
void beforeSleep(aeEventLoop *eventLoop) {
    handleClientsWithPendingReadsUsingThreads();
    handleClientsWithPendingWritesUsingThreads();
    // other housekeeping …
}
The read‑handling routine assigns clients to threads round‑robin (item_id % server.io_threads_num) rather than by hashing, publishes each worker's pending count, and finally processes the 0‑th queue on the main thread:
// file: src/networking.c
int handleClientsWithPendingReadsUsingThreads(void) {
    listIter li;
    listNode *ln;
    int item_id = 0;
    // split pending reads among threads, round-robin
    listRewind(server.clients_pending_read, &li);
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id], c);
        item_id++;
    }
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++)
        setIOPendingCount(j, listLength(io_threads_list[j]));
    // main thread processes its own slice
    listRewind(io_threads_list[0], &li);
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    // wait for workers to finish …
}
3. Write Path
After a command is processed, its reply is placed into the client’s output buffer via addReply. If the client has no pending writes, prepareClientToWrite adds the client to server.clients_pending_write:
// file: src/networking.c
int prepareClientToWrite(client *c) {
    // other checks omitted
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
        clientInstallWriteHandler(c);
    return C_OK;
}
void clientInstallWriteHandler(client *c) {
    c->flags |= CLIENT_PENDING_WRITE;
    listAddNodeHead(server.clients_pending_write, c);
}
The beforeSleep hook then calls handleClientsWithPendingWritesUsingThreads, which distributes clients to I/O threads in the same round‑robin fashion and signals them to execute writeToClient:
// file: src/networking.c
int handleClientsWithPendingWritesUsingThreads(void) {
    listIter li;
    listNode *ln;
    // split pending writes among threads, round-robin
    listRewind(server.clients_pending_write, &li);
    int item_id = 0;
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id], c);
        item_id++;
    }
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++)
        setIOPendingCount(j, listLength(io_threads_list[j]));
    // main thread processes its slice
    listRewind(io_threads_list[0], &li);
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c, 0);
    }
    // wait for workers …
}
The actual write routine sends data from the fixed buffer first and then from the reply list:
// file: src/networking.c
int writeToClient(client *c, int handler_installed) {
    ssize_t nwritten = 0;
    while (clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            // fixed-size reply buffer
            nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
            if (nwritten <= 0) break;
            // update sentlen …
        } else {
            // linked list of larger reply blocks
            clientReplyBlock *o = listNodeValue(listFirst(c->reply));
            size_t objlen = o->used;
            nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            // update sentlen …
        }
    }
    return 0;
}
4. Summary and Limitations
The multithreaded Redis model introduces configurable I/O threads that parallelize the expensive socket read and write stages while keeping command execution in the main thread. The main thread still coordinates task distribution and must wait for all I/O workers to finish each batch before proceeding. Because commands are still executed one at a time on the main thread, a single slow command can still block the entire server.
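The distribution rule is worth a closer look, because it balances the number of clients per thread, not the amount of work. The sketch below is a hypothetical helper (round_robin_counts is not a Redis function) that applies the same item_id % server.io_threads_num expression used in the excerpts and reports how many clients each slot receives.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper: count how many of n_clients land in each of
 * n_threads slots when client i is assigned to slot i % n_threads.
 * counts must have room for n_threads entries. */
void round_robin_counts(size_t n_clients, size_t n_threads, size_t *counts) {
    assert(n_threads > 0 && counts != NULL);
    for (size_t t = 0; t < n_threads; t++) counts[t] = 0;
    for (size_t item_id = 0; item_id < n_clients; item_id++)
        counts[item_id % n_threads]++;
}
```

Ten pending clients over four slots split as 3, 3, 2, 2. The split says nothing about payload size, so a slot holding one client with a very large reply can take far longer than the others.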
Because the main thread blocks while waiting for worker threads, the theoretical concurrency gain is limited: a batch finishes only when the slowest I/O thread does, so one slow read or write on a single I/O thread delays the wake‑up of the main loop and, with it, every other client. This design trade‑off explains why the author feels the current multithreaded implementation is not optimal.
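The fan‑out‑and‑wait pattern described in this article can be reproduced in a few dozen lines. The following is a simplified sketch, not the Redis implementation: run_batch, process_slice, worker_main, and N_THREADS are hypothetical names, doubling an integer stands in for a socket read or write, and the workers are created and joined per batch only to keep the example self‑contained. What it shares with the excerpts is the shape: per‑worker pending counters, round‑robin slices, slot 0 handled by the main thread, and a blocking wait until every counter returns to zero.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>

#define N_THREADS 4   /* hypothetical: slot 0 is the main thread */

static atomic_int pending[N_THREADS]; /* items each worker still has to process */
static atomic_int stop_workers;       /* set once the batch is finished */
static const int *batch_in;           /* shared batch, written before pending[] is published */
static int *batch_out;
static int batch_len;

/* Process every item whose round-robin slot equals id. */
static void process_slice(int id) {
    for (int i = id; i < batch_len; i += N_THREADS)
        batch_out[i] = batch_in[i] * 2;   /* stand-in for a socket read or write */
}

static void *worker_main(void *arg) {
    int id = (int)(long)arg;
    for (;;) {
        while (atomic_load(&pending[id]) == 0) {      /* wait for work */
            if (atomic_load(&stop_workers)) return NULL;
            sched_yield();
        }
        process_slice(id);
        atomic_store(&pending[id], 0);                /* signal completion */
    }
}

/* Double every element of in[] into out[], splitting the work across the
 * main thread and N_THREADS - 1 workers. Returns 0 on success, -1 on error. */
int run_batch(const int *in, int *out, int n) {
    pthread_t tid[N_THREADS];
    assert(in != NULL && out != NULL && n >= 0);
    atomic_store(&stop_workers, 0);
    for (int j = 1; j < N_THREADS; j++) {
        atomic_store(&pending[j], 0);
        if (pthread_create(&tid[j], NULL, worker_main, (void *)(long)j) != 0) {
            atomic_store(&stop_workers, 1);           /* release threads already started */
            while (--j >= 1) pthread_join(tid[j], NULL);
            return -1;
        }
    }
    batch_in = in;
    batch_out = out;
    batch_len = n;
    for (int j = 1; j < N_THREADS; j++) {
        int count = 0;
        for (int i = j; i < n; i += N_THREADS) count++;
        atomic_store(&pending[j], count);             /* publish the slice */
    }
    process_slice(0);                                 /* main thread handles slot 0 */
    for (int j = 1; j < N_THREADS; j++)               /* block until every worker drains */
        while (atomic_load(&pending[j]) != 0) sched_yield();
    atomic_store(&stop_workers, 1);
    for (int j = 1; j < N_THREADS; j++) pthread_join(tid[j], NULL);
    return 0;
}
```

The wait loop near the end is the limitation in miniature: run_batch cannot return until the slowest slice is done, no matter how quickly the other slices finish.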
