How libtask Implements Coroutines for High‑Performance Servers in C

libtask, a lightweight coroutine library written by Russ Cox, demonstrates how to build a cooperative multitasking scheduler in C, detailing task creation, context switching, epoll‑based I/O handling, and a non‑preemptive FIFO scheduling model that enables asynchronous I/O to appear synchronous for server development.

Tencent IMWeb Frontend Team
Tencent IMWeb Frontend Team
Tencent IMWeb Frontend Team
How libtask Implements Coroutines for High‑Performance Servers in C
本文在公司内网有不错的反响,但不同于传统的前端技术文章,所以阅读起来可能有点晦涩。假设读者已经了解了协程的概念、实现协程的底层技术支持,基于底层基础,我们来看看如何实现协程以及协程的应用。

什么是 libtask

libtask 是 Google 大佬 Russ Cox(Go 的核心开发者)所写。libtask 非常有意思,为数不多的代码就可以让人了解和理解协程的具体应用,很值得学习,我感兴趣的点在于如何在服务器中使用协程,传统的服务器,基本都是多进程、多线程、池化、单线程/多线程多路复用等等,而 libtask 使用少量的代码就让我看到了如何使用协程写一个服务器,非常赞(源码分析)。

libtask 源码解析

我们从 libtask 的 main 函数开始,这个 main 函数就是我们在 C 语言中使用的入口函数,libtask 本身实现了 main,用户使用 libtask 时,要实现的是 taskmain 函数。taskmain 和 main 的函数声明是一样的。下面我们看一下 main 函数。

int main(int argc, char **argv) {
    struct sigaction sa, osa;
    // 注册SIGQUIT信号处理函数
    memset(&sa, 0, sizeof sa);
    sa.sa_handler = taskinfo;
    sa.sa_flags = SA_RESTART;
    sigaction(SIGQUIT, &sa, &osa);

    // 保存命令行参数
    argv0 = argv[0];
    taskargc = argc;
    taskargv = argv;

    if(mainstacksize == 0)
        mainstacksize = 256*1024;
    // 创建第一个协程
    taskcreate(taskmainstart, nil, mainstacksize);
    // 开始调度
    taskscheduler();
    fprint(2, "taskscheduler returned in main!
");
    abort();
    return 0;
}

main 函数主要的两个逻辑是 taskcreate 和 taskscheduler 函数。我们先来看 taskcreate。

int taskcreate(void (*fn)(void*), void *arg, uint stack) {
    int id;
    Task *t;

    t = taskalloc(fn, arg, stack);
    taskcount++;
    id = t->id;
    // 记录位置
    t->alltaskslot = nalltask;
    // 保存到 alltask 中
    alltask[nalltask++] = t;
    // 修改状态为就绪,可以被调度,并且加入到就绪队列
    taskready(t);
    return id;
}

taskcreate 首先调用 taskalloc 分配一个表示协程的结构体 Task。我们看看这个结构体的定义。

struct Task {
    char name[256]; // offset known to acid
    char state[256];
    // 前后指针
    Task *next;
    Task *prev;
    Task *allnext;
    Task *allprev;
    // 执行上下文
    Context context;
    // 睡眠时间
    uvlong alarmtime;
    uint id;
    // 栈信息
    uchar *stk;
    uint stksize;
    // 是否退出了
    int exiting;
    // 在 alltask 的索引
    int alltaskslot;
    // 是否是系统协程
    int system;
    // 是否就绪状态
    int ready;
    // 入口函数
    void (*startfn)(void*);
    // 入口参数
    void *startarg;
    // 自定义数据
    void *udata;
};

接着看看 taskalloc 的实现。

static Task* taskalloc(void (*fn)(void*), void *arg, uint stack) {
    Task *t;
    sigset_t zero;
    uint x, y;
    ulong z;

    /* allocate the task and stack together */
    t = malloc(sizeof *t + stack);
    memset(t, 0, sizeof *t);
    // 栈的内存位置
    t->stk = (uchar*)(t+1);
    // 栈大小
    t->stksize = stack;
    // 协程 id
    t->id = ++taskidgen;
    // 协程工作函数和参数
    t->startfn = fn;
    t->startarg = arg;

    /* do a reasonable initialization */
    memset(&t->context.uc, 0, sizeof t->context.uc);
    sigemptyset(&zero);
    // 初始化 uc_sigmask 字段为空,即不阻塞信号
    sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);

    /* must initialize with current context */
    getcontext(&t->context.uc);
    // 设置协程执行时的栈位置和大小
    t->context.uc.uc_stack.ss_sp = t->stk + 8;
    t->context.uc.uc_stack.ss_size = t->stksize - 64;
    z = (ulong)t;
    y = z;
    z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */
    x = z >> 16;
    // 保存信息到 uc 字段
    makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);

    return t;
}

taskalloc 函数代码看起来很多,但是逻辑不算复杂,就是申请 Task 结构体所需的内存和执行时栈的内存,然后初始化各个字段。这样,一个协程就诞生了。接着执行 taskready 把协程加入就绪队列。

void taskready(Task *t) {
    t->ready = 1;
    addtask(&taskrunqueue, t);
}

void addtask(Tasklist *l, Task *t) {
    if(l->tail) {
        l->tail->next = t;
        t->prev = l->tail;
    } else {
        l->head = t;
        t->prev = nil;
    }
    l->tail = t;
    t->next = nil;
}

taskrunqueue 记录了所有就绪的协程。创建了协程并加入队列后,协程还没有开始执行,就像操作系统的进程和线程一样,需要有一个调度器来调度执行。下面我们看看调度器的实现。

static void taskscheduler(void) {
    int i;
    Task *t;
    for(;;) {
        // 没有用户协程了,则退出
        if(taskcount == 0)
            exit(taskexitval);
        // 从就绪队列拿出一个协程
        t = taskrunqueue.head;
        if(t == nil) {
            fprint(2, "no runnable tasks! %d tasks stalled
", taskcount);
            exit(1);
        }
        // 从就绪队列删除该协程
        deltask(&taskrunqueue, t);
        t->ready = 0;
        // 保存正在执行的协程
        taskrunning = t;
        // 切换次数加一
        tasknswitch++;
        // 切换到 t 执行,并且保存当前上下文到 taskschedcontext(即下面要执行的代码)
        contextswitch(&taskschedcontext, &t->context);
        // 执行到这说明没有协程在执行(t 切换回来的),置空
        taskrunning = nil;
        // 刚才执行的协程 t 退出了
        if(t->exiting) {
            // 不是系统协程,则个数减一
            if(!t->system)
                taskcount--;
            // 当前协程在 alltask 的索引
            i = t->alltaskslot;
            // 把最后一个协程换到当前协程的位置,因为它要退出了
            alltask[i] = alltask[--nalltask];
            // 更新被置换协程的索引
            alltask[i]->alltaskslot = i;
            // 释放堆内存
            free(t);
        }
    }
}

从就绪队列中拿出一个协程 t,并把 t 移出就绪队列

通过 contextswitch 切换到协程 t 中执行

协程 t 切换回调度中心,如果 t 已经退出,则修改数据结构,然后回收它占据的内存。继续调度其他协程执行。

至此,协程就开始跑起来了。调度机制是先进先出、非抢占式,协程自行调用 taskyield 让出 CPU。

int taskyield(void) {
    int n = tasknswitch;
    // 插入就绪队列,等待后续调度
    taskready(taskrunning);
    taskstate("yield");
    // 切换协程
    taskswitch();
    // 返回让出次数
    return tasknswitch - n - 1;
}

void taskswitch(void) {
    needstack(0);
    contextswitch(&taskrunning->context, &taskschedcontext);
}

static void contextswitch(Context *from, Context *to) {
    if(swapcontext(&from->uc, &to->uc) < 0) {
        fprint(2, "swapcontext failed: %r
");
        assert(0);
    }
}

下面我们通过一个例子展示如何基于协程实现一个 TCP 服务器。

void taskmain(int argc, char **argv) {
    // 启动一个 tcp 服务器
    if((fd = netannounce(TCP, 0, atoi(argv[1]))) < 0) {
        // ...
    }
    // 改为非阻塞模式
    fdnoblock(fd);
    // accept 成功后创建一个客户端协程
    while((cfd = netaccept(fd, remote, &rport)) >= 0) {
        taskcreate(proxytask, (void*)cfd, STACK);
    }
}

netaccept 通过 accept 逐个处理 TCP 连接,在 accept 之前会调用 fdwait 将 fd 注册到 epoll 并挂起当前协程。

int netaccept(int fd, char *server, int *port) {
    int cfd, one;
    struct sockaddr_in sa;
    socklen_t len;
    // 注册事件到 epoll,等待事件触发
    fdwait(fd, 'r');
    len = sizeof sa;
    if((cfd = accept(fd, (void*)&sa, &len)) < 0) {
        return -1;
    }
    // 把客户端 fd 改成非阻塞模式
    fdnoblock(cfd);
    one = 1;
    setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof one);
    return cfd;
}

fdwait 首先把 fd 注册到 epoll 中,然后把协程切换到下一个待执行的协程。epoll 事件触发后会把对应协程重新加入就绪队列。

void fdwait(int fd, int rw) {
    if(!startedfdtask) {
        startedfdtask = 1;
        epfd = epoll_create(1);
        // 创建一个协程做 I/O 管理
        taskcreate(fdtask, 0, 32768);
    }
    struct epoll_event ev = {0};
    ev.data.ptr = taskrunning;
    switch(rw) {
        case 'r': ev.events |= EPOLLIN | EPOLLPRI; break;
        case 'w': ev.events |= EPOLLOUT; break;
    }
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
    // 切换到其他协程,等待被唤醒
    taskswitch();
    // 唤醒后删除事件
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev);
}

fdtask 是负责 epoll 轮询的系统协程,它在没有其他就绪协程时进入 epoll_wait,并在事件到来时把相应的用户协程加入就绪队列。

void fdtask(void *v) {
    int i, ms;
    Task *t;
    uvlong now;
    tasksystem(); // 标记为系统协程
    struct epoll_event events[1000];
    for(;;) {
        // 让其他就绪协程先跑
        while(taskyield() > 0) ;
        // 没有其他协程时才阻塞等待 I/O
        errno = 0;
        if((t = sleeping.head) == nil)
            ms = -1;
        else {
            now = nsec();
            if(now >= t->alarmtime)
                ms = 0;
            else if(now + 5*1000*1000*1000L >= t->alarmtime)
                ms = (t->alarmtime - now) / 1000000;
            else
                ms = 5000;
        }
        int nevents = epoll_wait(epfd, events, 1000, ms);
        if(nevents < 0) {
            if(errno == EINTR) continue;
            fprint(2, "epoll: %s
", strerror(errno));
            taskexitall(0);
        }
        for(i = 0; i < nevents; i++) {
            taskready((Task *)events[i].data.ptr);
        }
        now = nsec();
        while((t = sleeping.head) && now >= t->alarmtime) {
            deltask(&sleeping, t);
            if(!t->system && --sleepingcounted == 0)
                taskcount--;
            taskready(t);
        }
    }
}

libtask 还提供了把阻塞系统调用转为协程友好 API 的例子,例如 fdread:如果 read 返回 EAGAIN,则调用 fdwait 等待可读事件,再继续读取。

int fdread(int fd, void *buf, int n) {
    int m;
    while((m = read(fd, buf, n)) < 0 && errno == EAGAIN)
        fdwait(fd, 'r');
    return m;
}

通过上述实现,libtask 把异步 I/O 转成同步风格,让业务代码保持简洁,同时提供了一个轻量级的协程调度框架,适合在高性能服务器中使用。

libtask illustration
libtask illustration
C++epollcoroutineasynchronous I/Olibtask
Tencent IMWeb Frontend Team
Written by

Tencent IMWeb Frontend Team

IMWeb Frontend Community gathering frontend development enthusiasts. Follow us for refined live courses by top experts, cutting‑edge technical posts, and to sharpen your frontend skills.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.