这是我看了《Redis 设计与实现》后一时兴起决定做的项目。它会帮助我熟悉高性能 IO、持久化机制甚至分布式算法。
高性能数据库少不了高性能的 I/O 模块。 高性能的 I/O 牵扯到特定的系统调用,而一般我们的服务器都使用 Linux 系统,所以下面也只讨论 Linux。
I/O 按其是否阻塞调用者可分为阻塞 IO(Blocking I/O,简称 BIO)与非阻塞 IO(NonBlocking I/O,简称 NIO)。 阻塞 I/O 在无法从缓冲区获得数据时会持续等待,而非阻塞 I/O 则始终立刻返回。下面的例子来自《Unix 环境高级编程(第三版)》 的第十四章:
#include "apue.h"
#include <errno.h>
#include <fcntl.h>
char buf[5000];
int main(void) {
int ntowrite, nwrite;
char *ptr;
ntowrite = read(STDIN_FILENO, buf, sizeof(buf)); // 阻塞读,等待终端输入
fprintf(stderr, "read %d bytes\n", ntowrite);
set_fl(STDOUT_FILENO, O_NONBLOCK); // 设置非阻塞写
ptr = buf;
while (ntowrite > 0) {
errno = 0;
nwrite = write(STDOUT_FILENO, ptr, ntowrite); // write 会立即返回
fprintf(stderr, "nwrite = %d, errno = %d\n", nwrite, errno); // 未读到数据时,errno 不为 0
if (nwrite > 0) { // 读到了 nwrite 个字节的数据
ptr += nwrite;
ntowrite -= nwrite;
}
}
clr_fl(STDOUT_FILENO, O_NONBLOCK);
exit(0);
}
对于一个给定的描述符,有两种为其指定 NIO 的方法。
- 在用 open 系统调用打开时,指定 O_NONBLOCK 标志
- 对于已打开的描述符,调用 fcntl 打开 O_NONBLOCK 标志
注意阻塞 I/O 并不会导致忙等待而浪费 CPU 时间,操作系统会挂起阻塞的线程,调度其他线程执行。 那除了编程风格的不同,它们还有什么区别吗?当然有。我们程序的处理流程一般是读 I/O、计算、写 I/O。 为了提高程序并发性,同时服务更多的客户,我们需要同时进行大量的这个相同的流程。
一种直觉的思路是引入多线程,
所有的线程执行相同的流程。但是多线程不是银弹(Silver Bullet)。当线程数量与 CPU 核心数相当时,多线程可以
被调度到不同的核心上并行进行;而线程数远大于核心数时,会出现频繁的线程调度,线程数越多,上下文切换所花时间占 CPU 总时间
的比例越高(即效率越低)。参考陈硕的经验公式,设密集计算所占的时间比重为 P(0<P≤1)
,而系统一共有 C 个 CPU,
为了让这 C 个 CPU 跑满而又不过载,线程池大小的经验公式 T=C/P
。考虑到 P 值的估计不是很准确,T 的最佳值可以上下浮动 50%。
那么对于完全的计算密集型程序,线程数应等于核心数,更多的线程反而会降低效率,拖慢程序;考虑我们上面的处理流程,
最匹配的线程数也不会太大,距离动辄几万的并发数相距遥远。一种改进的方法是让线程变得更轻量化,这样一来,我们就能低开销地创建大量
线程了,参考 Go 的协程;但是这不是这篇文章的重点。
那么在有限的线程数下要如何提高并发数呢。现在非阻塞 I/O 可以派上用场了。我们可以用一个线程来专门处理 I/O,其他线程则只处理计算。 I/O 线程可以同时轮询多个 I/O,把就绪的 I/O 加入计算线程池的处理队列中。
// 创建TCP套接字并绑定端口8888,进行服务监听
listenfd = serverSocket(8888,"tcp");
clientFdSet = empty_set();
while(true){ // 开启事件监听循环
// accept同步非阻塞调用,判断是否接收了新的连接
newfd = acceptNonBlock(listenfd);
if(newfd != EMPTY){
// 如果存在新连接将其加入监听连接集合
clientFdSet.add(newfd);
}
// 申请一个1024字节的缓冲区
buffer = new buffer(1024);
for(clientfd in clientFdSet){
// 非阻塞read读
num = readNonBlock(clientfd,buffer);
if(num > 0){
// 读缓冲区存在数据
data = buffer;
... dosomething
if(needClose(data)){
// 关闭连接时,移除当前监听的连接
clientFdSet.remove(clientfd);
}
}
... dosomething
// 清空buffer
buffer.clear();
}
}
但是注意每个文件描述符对应的 I/O 状态查询,都必须通过一次 NIO 系统调用(readNonBlock)才能完成。 在循环中反复进行系统调用限制了这种方案的效率。
为此,Linux 又提供了 I/O 多路复用(I/O Multiplexing)的系统调用以与 NIO 系统调用配套使用。 I/O 多路复用允许一次传递许多个文件描述符进行批量的 I/O 状态查询,一次系统调用就能得到所有文件描述符的 I/O 状态。 Linux 的 I/O 多路复用有三种系统调用:select、poll、epoll。
select 系统调用本身是同步、阻塞的,当所传递的文件描述符集合中都没有就绪的 I/O 事件时,执行 select 系统调用的线程将会进入阻塞态, 直到至少一个文件描述符对应的 I/O 事件就绪,则唤醒被 select 阻塞的线程。唤醒后获得 CPU 的线程在 select 系统调用返回后可以遍历所传入的文件描述符集合,处理完成了 I/O 事件的文件描述符。
# select.select(rlist, wlist, xlist[, timeout])
# 前三个参数是由“可等待对象”组成的序列:
# rlist:待读取的文件描述符列表
# wlist:待写入的文件描述符列表
# xlist:可能产生异常的文件描述符列表
# 可选的 timeout 参数以一个浮点数表示超时秒数。当省略 timeout 参数时该函数将阻塞直到至少有一个文件描述符准备就绪。超时值为零表示执行轮询且永不阻塞。
# 返回值是三个列表,包含已就绪对象,返回的三个列表是前三个参数的子集。当超时时间已到且没有文件描述符就绪时,返回三个空列表。
import select, socket, sys, Queue
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0) # 设置非阻塞读
server.bind(('localhost', 50000))
server.listen(5)
inputs = [server]
# 线程间共享的生产者-消费者队列,IO 线程填入就绪的文件描述符,计算线程取出描述符立即读出数据并计算
global_queues = Queue.Queue()
while True:
readable, writable, exceptional = select.select(inputs, [], inputs)
for s in readable:
if s is server: # 新的连接建立
connection, client_address = s.accept()
connection.setblocking(0) # 设置非阻塞读
inputs.append(connection) # 加入监听的“可读文件描述符”列表
else: # 已有的连接可读
global_queues.put(connection) # 不在当前线程读,以免耽误事件循环的响应速度
for s in exceptional:
inputs.remove(s)
s.close()
global_queues.remove(s)
上面的 Python 代码仅用于说明使用方法,实际上的系统调用签名如下:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
// 参数 nfds:一共需要监听的 readfds、writefds、exceptfds 中文件描述符个数 + 1
// 参数 readfds/writefds/exceptfds:需要监听读、写、异常事件的文件描述符集合
// 参数 timeout:select 是同步阻塞的,当 timeout 时间内都没有任何 I/O 事件就绪,则调用线程被唤醒并返回 (ret=0)
// timeout 为 null 代表永久阻塞
// 返回值 ret:
// 1.返回大于 0 的整数,代表传入的 readfds/writefds/exceptfds 中共有 ret 个被激活(需要应用程序自己遍历)
// 2.返回 0,在阻塞超时前没有任何 I/O 事件就绪
// 3.返回 -1,出现错误
select 将系统调用的参数与返回值混合到了一起,readfds、writefds 与 exceptfds 在调用时是用户构造的参数, 同时调用返回后,它们的内容已被修改,用户需要遍历它们用 FD_ISSET 宏检查每一个文件描述符是否就绪。 即使只有一个就绪,也总是需要遍历所有文件描述符才能找到它,这无疑很降低效率。同时,因为内核为了安全不能相信用户空间的数据, 所以系统调用时,不能只拷贝 readfds 等的地址指针到内核堆栈完事,而是要完全拷贝指针所指的整个数组,这同样会降低性能。
#define MAXBUF 256
char buffer[MAXBUF];
int fds[5]; // 实际存放文件描述符的数组
fd_set rset;
// 准备工作
// ......
while(1) {
FD_ZERO(&rset);
for (i = 0; i < 5; i++) {
FD_SET(fds[i], &rset); // 在 rset 上设置要监控的文件描述符
}
select(max+1, &rset, NULL, NULL, NULL);
for(i = 0; i < 5; i++) { // 总是需要遍历所有文件描述符,即使只有一个就绪
if (FD_ISSET(fds[i], &rset)) { // 这个就绪了
memset(buffer, 0, MAXBUF);
read(fds[i], buffer, MAXBUF);
puts(buffer);
}
}
}
而且出于性能的考量,内核设置了 select 所监听文件描述符集合元素的最大数量(一般为 1024,可在内核启动时指定), 使得单次 select 所能监听的连接数受到了限制。
在此基础上,poll 系统调用解决了 select 系统调用受限于内核配置参数的限制问题,可以同时监听更多文件描述符的 I/O 状态 (但不能超过内核对当前进程所能拥有的最大文件描述符数目的限制);同时优化了接口设计,将参数与返回值进行了分离;但是 其他问题照旧存在。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// 参数 fds,要监听的 pollfd 数组集合
// 参数 nfds,需要监听的元素个数,即 fds 数组的大小
// 参数 timeout,阻塞的超时时间(传入 -1 代表永久阻塞)
// 参数 events 和返回值 revents 分开了
struct pollfd {
int fd; // file descriptor 对应的文件描述符
short events; // requested events 需要监听的事件
short revents; // returned events 返回时,就绪的事件
};
// events/revents是位图表示的
// revents & POLLIN == 1 存在就绪的读事件
// revents & POLLOUT == 1 存在就绪的写事件
// revents & POLLHUP == 1 存在对端断开连接或是通信完成事件
因为前两种系统调用的缺点,实际中用的更多的是 epoll。
epoll 直接在内核中分配内存空间来缓存被监听的文件描述符集合。通过系统调用 epoll_create,在内核中创建了一个 epoll 结构,
而在应用程序中只需要保留 epoll 结构的句柄就可对其进行访问(也是一个文件描述符)。可以动态地在内核空间的 epoll 结构中
增加、删除或更新所要监听的文件描述符以及不同的监听事件(epoll_ctl),而不必每次都全量地传递需要监听的文件描述符集合。
同时,epoll 系统调用返回时,只会将真正活跃的、完成了 I/O 事件的文件描述符返回,避免了全量的遍历。
在并发的连接数很大,但闲置连接占比很高时,epoll 的性能大大优于 select/poll 这两种 I/O 多路复用器。
epoll 的时间复杂度为 O(M)
,即处理的开销不随着并发连接数 N
的增加而增加,而是仅仅和监控的活跃连接数 M
相关。
而当活跃连接数占比一直超过 90% 甚至更高时,epoll 由于内部结构比较复杂,其性能比 select/poll 还要低一点
// epoll 使用时大致依赖三个系统调用
// 1. epoll_create 创建一个 epoll 结构,返回对应 epoll 的文件描述符,自 Linux 2.6.8 开始,参数 size 将被忽略,但必须大于 0 以保持兼容
int epoll_create(int size);
// 2. epoll_ctl 控制某一 epoll 结构(epfd),向其增加/删除/更新(op)某一其它连接(fd),监控其 I/O事件(event)
// op 有三种合法值:EPOLL_CTL_ADD 代表新增、EPOLL_CTL_MOD 代表更新、EPOLL_CTL_DEL 代表删除
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 3. epoll_wait 令某一 epoll (epfd)同步阻塞地开始监听感兴趣的 I/O 事件(events),同时指定所监听 fd 的最大个数(maxevents)与阻塞超时时间(timeout)
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
#define MAX_EVENTS 5
#define READ_SIZE 10
#include <stdio.h> // for printf()
#include <unistd.h> // for close(), read()
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
int main()
{
char read_buffer[READ_SIZE + 1];
int epoll_fd = epoll_create(1);
struct epoll_event event;
event.events |= EPOLLIN; // epoll 读事件
event.events |= EPOLLET; // 设置边缘触发,下面会讲到
event.data.fd = 0; // stdin
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, 0, &event); // 添加一个要监控的描述符
while(1)
{
struct epoll_event events[MAX_EVENTS]; // 这个是返回值
int event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, 30000); // 阻塞等待 I/O 就绪
printf("%d ready events\n", event_count);
for(int i = 0; i < event_count; i++)
{
printf("Reading file descriptor '%d' -- ", events[i].data.fd);
read(events[i].data.fd, read_buffer, READ_SIZE); // 这些文件描述符必能立刻读出数据
}
}
close(epoll_fd)
return 0;
}
注意 epoll 的返回还有两种触发机制:
- 电平触发(level-trggered): 只要文件描述符关联的读(写)缓冲区非空(不满),有数据可以读取(有空间可以写入),就一直进行通知
- 边缘触发(edge-triggered): 只有当文件描述符关联的读(写)缓冲区由空转化为非空(由满转化为不满)的时候,发出通知
参考资料:
《Unix 环境高级编程(第三版)》第十四章
《Linux 高性能服务器编程》
《Linux 多线程服务端编程》
如果你喜欢我的文章,请我吃根冰棒吧 (o゜▽゜)o ☆
最后附上 GitHub:https://github.com/gonearewe