Nginx源码分析之--网络IO

操作系统提供了recv, send等接口。网络IO时,Nginx源码是如何封装使用的呢?

网络IO操作方法集合

ngx_os.h中定义了如下struct,表示网络IO操作方法集合类型。代码如下:

typedef struct {
    ngx_recv_pt        recv;  //接收字节数据函数指针
    ngx_recv_chain_pt  recv_chain;
    ngx_recv_pt        udp_recv;
    ngx_send_pt        send;  //发送字节数据函数指针
    ngx_send_pt        udp_send;
    ngx_send_chain_pt  udp_send_chain;
    ngx_send_chain_pt  send_chain;
    ngx_uint_t         flags;
} ngx_os_io_t;   //网络IO操作方法集合结构体定义,不同的platform实现不同

本质是一组函数指针集合,由具体platform相关的代码自行实现函数定义。

ngx_recv_pt类型

//从connection读取字节数据
typedef ssize_t (*ngx_recv_pt)(ngx_connection_t *c, u_char *buf, size_t size); 

ngx_recv_chain_pt类型

//从connection读取ngx_chain_t(buf链表),即批量读取
typedef ssize_t (*ngx_recv_chain_pt)(ngx_connection_t *c, ngx_chain_t *in, off_t limit);

ngx_send_pt类型

//向connection发送字节数据
typedef ssize_t (*ngx_send_pt)(ngx_connection_t *c, u_char *buf, size_t size);

ngx_send_chain_pt类型

//向connection批量发送字节数据
typedef ngx_chain_t *(*ngx_send_chain_pt)(ngx_connection_t *c, ngx_chain_t *in, off_t limit);

不同的platform实现

Linux系统定义ngx_os_io_t实例

static ngx_os_io_t ngx_linux_io = {
    ngx_unix_recv,
    ngx_readv_chain,
    ngx_udp_unix_recv,
    ngx_unix_send,
    ngx_udp_unix_send,
    ngx_udp_unix_sendmsg_chain,
#if (NGX_HAVE_SENDFILE)
    ngx_linux_sendfile_chain,
    NGX_IO_SENDFILE
#else
    ngx_writev_chain,
    0
#endif
};

ngx_linux_init.c文件初始化了一个static变量ngx_linux_io,ngx_linux_io在同一文件签名为ngx_os_specific_init的函数中会被赋值给程序全局变量ngx_os_io.

ngx_unix_recv实现ngx_recv_pt类型

ngx_recv.c文件中实现了连接的接收字节数据方法,内部调用了系统recv API。加了少量注释的代码如下:

ssize_t
ngx_unix_recv(ngx_connection_t *c, u_char *buf, size_t size)
{
    ssize_t       n;
    ngx_err_t     err;
    ngx_event_t  *rev;

    rev = c->read;  //连接上的可读事件

#if (NGX_HAVE_KQUEUE)

    if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
                       "recv: eof:%d, avail:%d, err:%d",
                       rev->pending_eof, rev->available, rev->kq_errno);

        if (rev->available == 0) {
            if (rev->pending_eof) {
                rev->ready = 0;
                rev->eof = 1;

                if (rev->kq_errno) {
                    rev->error = 1;
                    ngx_set_socket_errno(rev->kq_errno);

                    return ngx_connection_error(c, rev->kq_errno,
                               "kevent() reported about an closed connection");
                }

                return 0;

            } else {
                rev->ready = 0;
                return NGX_AGAIN;
            }
        }
    }

#endif

#if (NGX_HAVE_EPOLLRDHUP)

    if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                       "recv: eof:%d, avail:%d",
                       rev->pending_eof, rev->available);

        if (!rev->available && !rev->pending_eof) {
            rev->ready = 0;
            return NGX_AGAIN;
        }
    }

#endif

    do {
        n = recv(c->fd, buf, size, 0);  //尝试接收size大小的字节数据

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
                       "recv: fd:%d %z of %uz", c->fd, n, size);

        if (n == 0) { //接收到的字节数为0,表示EOF,直接返回读取到的字节数为0
            rev->ready = 0;
            rev->eof = 1;

#if (NGX_HAVE_KQUEUE)

            /*
             * on FreeBSD recv() may return 0 on closed socket
             * even if kqueue reported about available data
             */

            if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
                rev->available = 0;
            }

#endif

            return 0;
        }

        if (n > 0) { // 读取到的字节数不为0,返回读取到的字节数n

#if (NGX_HAVE_KQUEUE)

            if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
                rev->available -= n;

                /*
                 * rev->available may be negative here because some additional
                 * bytes may be received between kevent() and recv()
                 */

                if (rev->available <= 0) {
                    if (!rev->pending_eof) {
                        rev->ready = 0;
                    }

                    rev->available = 0;
                }

                return n;
            }

#endif

#if (NGX_HAVE_EPOLLRDHUP)

            if ((ngx_event_flags & NGX_USE_EPOLL_EVENT)
                && ngx_use_epoll_rdhup)
            {
                if ((size_t) n < size) {
                    if (!rev->pending_eof) {
                        rev->ready = 0;
                    }

                    rev->available = 0;
                }

                return n;
            }

#endif

            if ((size_t) n < size
                && !(ngx_event_flags & NGX_USE_GREEDY_EVENT))
            {
                rev->ready = 0;
            }

            return n;
        }

        //接收字节数失败(即n等于-1)的处理,对err赋值,检测具体的err
        err = ngx_socket_errno;

        if (err == NGX_EAGAIN || err == NGX_EINTR) {
            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
                           "recv() not ready");
            n = NGX_AGAIN; //err为EAGAIN时,函数返回-2,由调用者控制重试逻辑(一般超时前再次加入事件监听)

        } else {
            n = ngx_connection_error(c, err, "recv() failed"); //连接没有被reset的话,n被赋值为NGX_ERROR
            break;
        }

    } while (err == NGX_EINTR); //EINTR: 由于信号中断未能读到任何数据,则继续下一次循环读取

    rev->ready = 0;

    if (n == NGX_ERROR) {
        rev->error = 1;
    }

    return n;
}

ngx_unix_send实现ngx_send_pt类型

ngx_send.c文件中实现了连接的接收字节数据方法,内部调用了系统send API。加了少量注释的代码如下:

ssize_t
ngx_unix_send(ngx_connection_t *c, u_char *buf, size_t size)
{
    ssize_t       n;
    ngx_err_t     err;
    ngx_event_t  *wev;

    wev = c->write; //连接上的可写事件

#if (NGX_HAVE_KQUEUE)

    if ((ngx_event_flags & NGX_USE_KQUEUE_EVENT) && wev->pending_eof) {
        (void) ngx_connection_error(c, wev->kq_errno,
                               "kevent() reported about an closed connection");
        wev->error = 1;
        return NGX_ERROR;
    }

#endif

    for ( ;; ) {
        n = send(c->fd, buf, size, 0); //尝试向连接写入size个字节数据

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
                       "send: fd:%d %z of %uz", c->fd, n, size);

        if (n > 0) { //成功写入n个字节数据,函数返回n
            if (n < (ssize_t) size) {
                wev->ready = 0; //实际写入字节数比预期的少,置ready为0(fd内核缓冲区满)
            }

            c->sent += n;

            return n;
        }

        err = ngx_socket_errno;

        if (n == 0) { //写入字节数为0
            ngx_log_error(NGX_LOG_ALERT, c->log, err, "send() returned zero");
            wev->ready = 0;
            return n;
        }

        // 以下为send返回-1时的处理
        if (err == NGX_EAGAIN || err == NGX_EINTR) {
            wev->ready = 0;

            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
                           "send() not ready");

            if (err == NGX_EAGAIN) {
                return NGX_AGAIN; //函数返回,交给调用者决定重试
            }
            // 如果err为NGX_EINTR(信号中断导致send返回-1),继续下一次循环尝试写入
        } else {
            wev->error = 1;
            (void) ngx_connection_error(c, err, "send() failed");
            return NGX_ERROR; //除了EGAGIN、EINTR信号外,其他返回失败-1
        }
    }
}

其他函数指针类型实例

省略

其他系统

darwin,freebsd,solaris等类Unix平台,像Linux系统一样,都定义了ngx_os_io实例,各自也在
ngx_os_specific_init函数中将ngx_os_io实例值赋值给了全局变量ngx_os_io.

全局变量ngx_os_io

ngx_os_io表示平台无关的IO操作集合,不同的系统有不同的实现(类似工厂模式),即都实现了ngx_os_specific_init方法。那么具体使用哪种实现的ngx_os_specific_init方法呢?
这是在./configure中auto/sources,auto/os/conf, auto/os/linux等config脚本判定的。简单说,就是根据platform决定采用LINUX_SRCS源码文件还是其他SRCS来编译。

全局变量ngx_io

event事件模块中,ngx_os_io被赋值给ngx_io. 存疑:目前没看到源码ngx_io被赋值为其他值,那么为啥需要再分配一个变量ngx_io呢?

使用ngx_io网络IO

宏定义

#define ngx_recv             ngx_io.recv //连接接收字节数据方法
#define ngx_recv_chain       ngx_io.recv_chain
#define ngx_udp_recv         ngx_io.udp_recv
#define ngx_send             ngx_io.send //连接发送字节数据方法
#define ngx_send_chain       ngx_io.send_chain
#define ngx_udp_send         ngx_io.udp_send
#define ngx_udp_send_chain   ngx_io.udp_send_chain

在ngx_event.h中定义了如上一组宏。通过前面的分析可以知道:ngx_recv宏的即是ngx_unix_recv了(类Unixx系统)。

连接IO

struct ngx_connection_s {
    ...
    ngx_socket_t   fd;
    ngx_recv_pt    recv;
    ngx_send_pt    send;
    ...
};

如上,通过对connection的recv字段赋值为ngx_recv宏,就相当于调用ngx_unix_recv了。当然根据连接类型,可能需要改写conncetion的recv字段, 如:udp连接读数据会被赋值为ngx_udp_recv, ssl业务连接会被赋值为ngx_ssl_recv。

总结

如上分析了Nginx封装、使用网络连接IO的代码。可以看出具有跨平台、高内聚的特点,代码非常nice,值得学习。