高性能 IO 的设计

一个内存数据库的设计与实现(一)

Posted by John Mactavish on April 1, 2021

这是我看了《Redis 设计与实现》后一时兴起决定做的项目。它会帮助我熟悉高性能 IO、持久化机制甚至分布式算法。

高性能数据库少不了高性能的 I/O 模块。 高性能的 I/O 牵扯到特定的系统调用,而一般我们的服务器都使用 Linux 系统,所以下面也只讨论 Linux。

I/O 按其是否阻塞调用者可分为阻塞 IO(Blocking I/O,简称 BIO)与非阻塞 IO(NonBlocking I/O,简称 NIO)。 阻塞 I/O 在无法从缓冲区获得数据时会持续等待,而非阻塞 I/O 则始终立刻返回。下面的例子来自《Unix 环境高级编程(第三版)》 的第十四章:

#include "apue.h"
#include <errno.h>
#include <fcntl.h>

char buf[5000];

int main(void) {
  int ntowrite, nwrite;
  char *ptr;

  ntowrite = read(STDIN_FILENO, buf, sizeof(buf)); // 阻塞读,等待终端输入
  fprintf(stderr, "read %d bytes\n", ntowrite);


  set_fl(STDOUT_FILENO, O_NONBLOCK); // 设置非阻塞写

  ptr = buf;
  while (ntowrite > 0) {
    errno = 0;
    nwrite = write(STDOUT_FILENO, ptr, ntowrite); // write 会立即返回
    fprintf(stderr, "nwrite = %d, errno = %d\n", nwrite, errno); // 未读到数据时,errno 不为 0

    if (nwrite > 0) { // 读到了 nwrite 个字节的数据
      ptr += nwrite;
      ntowrite -= nwrite;
    }
  }
  clr_fl(STDOUT_FILENO, O_NONBLOCK);
  exit(0);
}

对于一个给定的描述符,有两种为其指定 NIO 的方法。

  • 在用 open 系统调用打开时,指定 O_NONBLOCK 标志
  • 对于已打开的描述符,调用 fcntl 打开 O_NONBLOCK 标志

注意阻塞 I/O 并不会导致忙等待而浪费 CPU 时间,操作系统会挂起阻塞的线程,调度其他线程执行。 那除了编程风格的不同,它们还有什么区别吗?当然有。我们程序的处理流程一般是读 I/O、计算、写 I/O。 为了提高程序并发性,同时服务更多的客户,我们需要同时进行大量的这个相同的流程。

一种直觉的思路是引入多线程, 所有的线程执行相同的流程。但是多线程不是银弹(Silver Bullet)。当线程数量与 CPU 核心数相当时,多线程可以 被调度到不同的核心上并行进行;而线程数远大于核心数时,会出现频繁的线程调度,线程数越多,上下文切换所花时间占 CPU 总时间 的比例越高(即效率越低)。参考陈硕的经验公式,设密集计算所占的时间比重为 P(0<P≤1),而系统一共有 C 个 CPU, 为了让这 C 个 CPU 跑满而又不过载,线程池大小的经验公式 T=C/P。考虑到 P 值的估计不是很准确,T 的最佳值可以上下浮动 50%。 那么对于完全的计算密集型程序,线程数应等于核心数,更多的线程反而会降低效率,拖慢程序;考虑我们上面的处理流程, 最匹配的线程数也不会太大,距离动辄几万的并发数相距遥远。一种改进的方法是让线程变得更轻量化,这样一来,我们就能低开销地创建大量 线程了,参考 Go 的协程;但是这不是这篇文章的重点。

那么在有限的线程数下要如何提高并发数呢。现在非阻塞 I/O 可以派上用场了。我们可以用一个线程来专门处理 I/O,其他线程则只处理计算。 I/O 线程可以同时轮询多个 I/O,把就绪的 I/O 加入计算线程池的处理队列中。

// 创建TCP套接字并绑定端口8888,进行服务监听
listenfd = serverSocket(8888,"tcp");
clientFdSet = empty_set();
while(true){ // 开启事件监听循环
    // accept同步非阻塞调用,判断是否接收了新的连接
    newfd = acceptNonBlock(listenfd);

    if(newfd != EMPTY){
        // 如果存在新连接将其加入监听连接集合
        clientFdSet.add(newfd);
    }
    // 申请一个1024字节的缓冲区
    buffer = new buffer(1024);
    for(clientfd in clientFdSet){
        // 非阻塞read读
        num = readNonBlock(clientfd,buffer);
        if(num > 0){
            // 读缓冲区存在数据
            data = buffer;
            ... dosomething
            if(needClose(data)){
                // 关闭连接时,移除当前监听的连接
                clientFdSet.remove(clientfd)
            }
        }
        ... dosomething
        // 清空buffer
        buffer.clear();
    }
}

但是注意每个文件描述符对应的 I/O 状态查询,都必须通过一次 NIO 系统调用(readNonBlock)才能完成。 在循环中反复进行系统调用限制了这种方案的效率。

为此,Linux 又提供了 I/O 多路复用(I/O Multiplexing)的系统调用以与 NIO 系统调用配套使用。 I/O 多路复用允许一次传递许多个文件描述符进行批量的 I/O 状态查询,一次系统调用就能得到所有文件描述符的 I/O 状态。 Linux 的 I/O 多路复用有三种系统调用:select、poll、epoll。

select 系统调用本身是同步、阻塞的,当所传递的文件描述符集合中都没有就绪的 I/O 事件时,执行 select 系统调用的线程将会进入阻塞态, 直到至少一个文件描述符对应的 I/O 事件就绪,则唤醒被 select 阻塞的线程。唤醒后获得 CPU 的线程在 select 系统调用返回后可以遍历所传入的文件描述符集合,处理完成了 I/O 事件的文件描述符。

# select.select(rlist, wlist, xlist[, timeout])

# 前三个参数是由“可等待对象”组成的序列:

# rlist:待读取的文件描述符列表
# wlist:待写入的文件描述符列表
# xlist:可能产生异常的文件描述符列表

# 可选的 timeout 参数以一个浮点数表示超时秒数。当省略 timeout 参数时该函数将阻塞直到至少有一个文件描述符准备就绪。超时值为零表示执行轮询且永不阻塞。

# 返回值是三个列表,包含已就绪对象,返回的三个列表是前三个参数的子集。当超时时间已到且没有文件描述符就绪时,返回三个空列表。

import select, socket, sys, Queue
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0) # 设置非阻塞读
server.bind(('localhost', 50000))
server.listen(5)
inputs = [server]
# 线程间共享的生产者-消费者队列,IO 线程填入就绪的文件描述符,计算线程取出描述符立即读出数据并计算
global_queues = Queue.Queue()

while True:
    readable, writable, exceptional = select.select(inputs, [], inputs)
    for s in readable:
        if s is server: # 新的连接建立
            connection, client_address = s.accept()
            connection.setblocking(0) # 设置非阻塞读
            inputs.append(connection) # 加入监听的“可读文件描述符”列表 
        else: # 已有的连接可读
            global_queues.put(connection) # 不在当前线程读,以免耽误事件循环的响应速度

    for s in exceptional:
        inputs.remove(s)
        s.close()
        global_queues.remove(s)

上面的 Python 代码仅用于说明使用方法,实际上的系统调用签名如下:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 参数 nfds:一共需要监听的 readfds、writefds、exceptfds 中文件描述符个数 + 1
// 参数 readfds/writefds/exceptfds:需要监听读、写、异常事件的文件描述符集合
// 参数 timeout:select 是同步阻塞的,当 timeout 时间内都没有任何 I/O 事件就绪,则调用线程被唤醒并返回 (ret=0)
//               timeout 为 null 代表永久阻塞
// 返回值 ret:
//     1.返回大于 0 的整数,代表传入的 readfds/writefds/exceptfds 中共有 ret 个被激活(需要应用程序自己遍历)
//     2.返回 0,在阻塞超时前没有任何 I/O 事件就绪
//     3.返回 -1,出现错误

select 将系统调用的参数与返回值混合到了一起,readfds、writefds 与 exceptfds 在调用时是用户构造的参数, 同时调用返回后,它们的内容已被修改,用户需要遍历它们用 FD_ISSET 宏检查每一个文件描述符是否就绪。 即使只有一个就绪,也总是需要遍历所有文件描述符才能找到它,这无疑很降低效率。同时,因为内核为了安全不能相信用户空间的数据, 所以系统调用时,不能只拷贝 readfds 等的地址指针到内核堆栈完事,而是要完全拷贝指针所指的整个数组,这同样会降低性能。

#define MAXBUF 256
char buffer[MAXBUF];
int fds[5]; // 实际存放文件描述符的数组
fd_set rset; 
// 准备工作
// ......
while(1) {
	FD_ZERO(&rset);
  	for (i = 0; i < 5; i++) {
  		FD_SET(fds[i], &rset); // 在 rset 上设置要监控的文件描述符
  	}
 
	select(max+1, &rset, NULL, NULL, NULL);
 
	for(i = 0; i < 5; i++) { // 总是需要遍历所有文件描述符,即使只有一个就绪
		if (FD_ISSET(fds[i], &rset)) { // 这个就绪了
			memset(buffer, 0, MAXBUF);
			read(fds[i], buffer, MAXBUF);
			puts(buffer);
		}
	}	
  }

而且出于性能的考量,内核设置了 select 所监听文件描述符集合元素的最大数量(一般为 1024,可在内核启动时指定), 使得单次 select 所能监听的连接数受到了限制。

在此基础上,poll 系统调用解决了 select 系统调用受限于内核配置参数的限制问题,可以同时监听更多文件描述符的 I/O 状态 (但不能超过内核对当前进程所能拥有的最大文件描述符数目的限制);同时优化了接口设计,将参数与返回值进行了分离;但是 其他问题照旧存在。

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// 参数 fds,要监听的 pollfd 数组集合
// 参数 nfds,需要监听的元素个数,即 fds 数组的大小
// 参数 timeout,阻塞的超时时间(传入 -1 代表永久阻塞)

// 参数 events 和返回值 revents 分开了
struct pollfd {
    int   fd;         // file descriptor 对应的文件描述符 
    short events;     // requested events 需要监听的事件
    short revents;    // returned events 返回时,就绪的事件
};
// events/revents是位图表示的

// revents & POLLIN == 1 存在就绪的读事件
// revents & POLLOUT == 1 存在就绪的写事件
// revents & POLLHUP == 1 存在对端断开连接或是通信完成事件

因为前两种系统调用的缺点,实际中用的更多的是 epoll。 epoll 直接在内核中分配内存空间来缓存被监听的文件描述符集合。通过系统调用 epoll_create,在内核中创建了一个 epoll 结构, 而在应用程序中只需要保留 epoll 结构的句柄就可对其进行访问(也是一个文件描述符)。可以动态地在内核空间的 epoll 结构中 增加、删除或更新所要监听的文件描述符以及不同的监听事件(epoll_ctl),而不必每次都全量地传递需要监听的文件描述符集合。 同时,epoll 系统调用返回时,只会将真正活跃的、完成了 I/O 事件的文件描述符返回,避免了全量的遍历。 在并发的连接数很大,但闲置连接占比很高时,epoll 的性能大大优于 select/poll 这两种 I/O 多路复用器。 epoll 的时间复杂度为 O(M),即处理的开销不随着并发连接数 N 的增加而增加,而是仅仅和监控的活跃连接数 M 相关。

而当活跃连接数占比一直超过 90% 甚至更高时,epoll 由于内部结构比较复杂,其性能比 select/poll 还要低一点

// epoll 使用时大致依赖三个系统调用

// 1. epoll_create 创建一个 epoll 结构,返回对应 epoll 的文件描述符,自 Linux 2.6.8 开始,参数 size 将被忽略,但必须大于 0 以保持兼容
    int epoll_create(int size);
// 2. epoll_ctl 控制某一 epoll 结构(epfd),向其增加/删除/更新(op)某一其它连接(fd),监控其 I/O事件(event)
//     op 有三种合法值:EPOLL_CTL_ADD 代表新增、EPOLL_CTL_MOD 代表更新、EPOLL_CTL_DEL 代表删除
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 3. epoll_wait 令某一 epoll (epfd)同步阻塞地开始监听感兴趣的 I/O 事件(events),同时指定所监听 fd 的最大个数(maxevents)与阻塞超时时间(timeout) 
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
#define MAX_EVENTS 5
#define READ_SIZE 10
#include <stdio.h>     // for printf()
#include <unistd.h>    // for close(), read()
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
 
int main()
{
  char read_buffer[READ_SIZE + 1];
  int epoll_fd = epoll_create(1);
 
  struct epoll_event event;
  event.events |= EPOLLIN; // epoll 读事件
  event.events |= EPOLLET; // 设置边缘触发,下面会讲到
  event.data.fd = 0; // stdin
  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, 0, &event); // 添加一个要监控的描述符
 
  while(1)
  {
    struct epoll_event events[MAX_EVENTS]; // 这个是返回值
    int event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, 30000); // 阻塞等待 I/O 就绪
    printf("%d ready events\n", event_count);
    for(int i = 0; i < event_count; i++)
    {
      printf("Reading file descriptor '%d' -- ", events[i].data.fd);
      read(events[i].data.fd, read_buffer, READ_SIZE); // 这些文件描述符必能立刻读出数据
    }
  }
 
  close(epoll_fd)
  return 0;
}

注意 epoll 的返回还有两种触发机制:

  • 电平触发(level-trggered): 只要文件描述符关联的读(写)缓冲区非空(不满),有数据可以读取(有空间可以写入),就一直进行通知
  • 边缘触发(edge-triggered): 只有当文件描述符关联的读(写)缓冲区由空转化为非空(由满转化为不满)的时候,发出通知

参考资料:

“小熊餐馆” 的文章

《Unix 环境高级编程(第三版)》第十四章

《Linux 高性能服务器编程》

《Linux 多线程服务端编程》

如果你喜欢我的文章,请我吃根冰棒吧 (o゜▽゜)o ☆

contribution

最后附上 GitHub:https://github.com/gonearewe