原创分享 从源代码角度看 epoll 在 Go 中的使用(一)

nobodynoname1943 · 2020年02月22日 · 最后由 astaxie 回复于 2020年02月23日 · 1367 次阅读
本帖已被设为精华帖!

Go 提供了功能完备的标准网络库:net包,net包的实现相当之全面,http\tcp\udp 均有实现且对用户提供了简单友好的使用接口。在 Linux 系统上 Go 使用了 epoll 来实现 net 包的核心部分,本文从用户接口层入手,分析 Go 在 Linux 平台上的 epoll 使用,文中若有不当之处请指出。

对于服务端程序而言,主要流程是 Listen->Accept->Send/Write,客户端主要流程 Connect->Send/Write,本文以这两个流程深入分析net包在 Go 中是如何实现的。

Listen

监听方法是在 ListenConfig 结构中的 Listen 方法实现的 (net/dial.go):

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
    // ...
    switch la := la.(type) {
    case *TCPAddr:
        l, err = sl.listenTCP(ctx, la)
    case *UnixAddr:
        l, err = sl.listenUnix(ctx, la)
    }

    return l, nil
}

Listen函数实现中,两个关键流程是DefaultResolver.resolveAddrListlistenTCP

  • DefaultResolver.resolveAddrList是根据协议名称和地址取得 Internet 协议族地址列表,由于resolveAddrList的代码比较固定,在此不做详细解释,感兴趣的童鞋可以去翻阅。
  • listenTCPlistenUnix从地址列表中取得满足条件的地址进行实际监听操作, 具体根据传入的协议族来确定。

接下来看看listenTCP的代码 (net/tcpsock_posix.go):

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

创建监听 socket fd 是在 internetSocket 中进行的,而 socket fs 是通过 socket 函数创建的 (net/sock_posix.go):

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // 调用各平台对应的socket api创建socket
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    // 设置socket选项
    if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    // 创建fd
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }

    // 监听
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            // TCP
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            // UDP
            if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        }
    }
    // 发起连接,非listen socket会走到这里来
    if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
        fd.Close()
        return nil, err
    }
    return fd, nil
}

socket函数主要流程:新建 socket-->设置 socket option-->创建 fd-->进入监听逻辑。sysSocket根据平台有不同实现,windows 实现在socket_windows.go中,linux 实现则在sock_cloexec.go中,本文重点分析在 linux 平台上的实现 (net/sock_cloexec.go):

func sysSocket(family, sotype, proto int) (int, error) {
    // 系统socket函数
    s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
    switch err {
    case nil:
        return s, nil
    default:
        return -1, os.NewSyscallError("socket", err)
    case syscall.EPROTONOSUPPORT, syscall.EINVAL:
    }

    // linux内核版本低于2.6.27时,代码会走到这里,下面的内容主要是防止在fork时候导致描述符泄露
    // 实际上手动实现简易版SOCK_CLOEXEC
    syscall.ForkLock.RLock()
    // do other things...
}

socketFunc创建了 socket,通知将 socket 设置非阻塞(SOCK_NONBLOCK)以及 fork 时关闭(SOCK_CLOEXEC),这两个标志是在 linux 内核版本 2.6.27 之后添加,在此之前的版本代码将会走到syscall.ForkLock.RLock(),主要是为了防止在 fork 时导致文件描述符泄露。

当 socket 创建之后进入新建 fd 流程,在 Go 的包装层面,fd 均以netFD结构表示,该接口描述原始 socket 的地址信息、协议类型、协议族以及 option,netFD在整个包装结构中居于用户接口的下一层。最后进入监听逻辑,逻辑走向区分 TCP 和 UDP,而监听逻辑比较简单,即调用系统 bind 和 listen 接口 (net/sock_posix.go):

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    // ...

    if ctrlFn != nil {
        c, err := newRawConn(fd)
        if err != nil {
            return err
        }
        if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
            return err
        }
    }
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

listenStream主要做了以下几件事:

  1. 检查未完成连接和已完成连接两个队列是否超出系统预设。
  2. 调用 socket bind 接口。
  3. 调用 socket listen 接口。
  4. 初始化 fd。
  5. 调用 socket getsockname 接口。

以上流程和日常写 socket 代码流程并无太大差异,唯有第 4 流程不同,第 4 流程是与底层的netpoll交互。

Linux 平台上,系统提供了五种 IO 模型:阻塞 IO、非阻塞 IO、IO 多路复用、信号驱动 IO 和异步 IO,对应到内核层面提供的用户接口即 select、poll 和 epoll。Go net 包是基于 epoll 进行封装的,基本模型结合了 epoll 和 Go 语言的优势:epoll+goroutine,这样达到异步且高并发。

回到源代码上,fd.init()完成网络轮询器初始化操作,开始与更底层的封装打交道,最底层的封装是 epoll 调用 (runtime/netpoll_epoll.go):

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
        return
    }
    epfd = epollcreate(1024)
    if epfd >= 0 {
        closeonexec(epfd)
        return
    }
    println("runtime: epollcreate failed with", -epfd)
    throw("runtime: netpollinit failed")
}

epollcreate创建了 epoll handle 并设置为CLOEXEC属性,此处是 epoll handle 的创建,netpollinit之后调用 (runtime/netpoll_epoll.go):

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    // 可读,可写,对端断开,边缘触发
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    // 存放user data,后面读写均会用到pollDesc
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    // 注册epoll事件
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

由上可见,调用epoll_ctl完成 epoll 监听事件注册,_EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET为所关心事件,具体含义可以查看 epoll 手册。

Accept

Listen成功之后返回TCPListener对象,手动调用Accept进入监听状态,最终会走到与 epoll 交互流程:
TCPListener.Accept-->TCPListener.accept-->netFD.accept-->FD.Accept

从这里开始,进入到与pollDesc交互的地方 (internal/poll/fd_unix.go):

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()

    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    for {
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            continue
        }
        return -1, nil, errcall, err
    }
}

fd.pd.prepareRead检查当前 fd 是否允许 accept,实际上是检查更底层的pollDesc是否可读。检查完毕之后,尝试调用accept获取已连接的 socket,注意此代码在 for 循环内,说明Accept是阻塞的,直到有连接进来;当遇到EAGINECONNABORTED错误会重试,其他错误都抛给更上层。

fd.pd.waitRead阻塞等待fd是否可读,即是否有新连接进来,最终进入到runtime.poll_runtime_pollWait里 (runtime/netpoll.go),在解释poll_runtime_pollWait代码之前,先来看看最重要的结构:

type pollDesc struct {
    // ...
    rg      uintptr
    wg      uintptr
    // ...
}

const (
    pdReady uintptr = 1
    pdWait  uintptr = 2
)

pollDesc是与 epoll 交互最重要的结构之一,可以理解为与 epoll 之间的桥梁,其中rgwg为状态信号量,可能的值为pdReadypdWait、等待文件描述符可读或者可写的 goroutine 地址以及nil(0)。

可能出现的情况:

  • 当值为 pdRead 时,代表网络 IO 就绪,处理完之后应该设置为 nil。
  • 当值为 pdWait 时,即等待被挂起(现在并未被挂起)。后面可能出现的情况是:
    • goroutine 被挂起并设置为 goroutine 的地址;
    • 收到了 IO 通知就绪;
    • 超时或者被关闭设置为 nil。

接下来看看poll_runtime_pollWait代码:

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // ...

    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        // (1)
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // (2)
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // (3)
    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }

    // (4)
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

poll_runtime_pollWait等待 fd 可读,这里最重要的逻辑在netpollblock函数里完成(根据代码中注释序号依次解释):
(1) 根据mode获取对应的信号量地址 gpp,判断当前是否pdReady
(2) 这段代码的逻辑是当gpp的值如果等于 0 时,将gpp的值更替为pdWait,该操作属于原子操作且内部实现了自旋锁。
(3) 当值为pdWait之后,防止此时可能会有其他的并发操作修改 pd 里的内容,所以需要再次检查错误状态。gopark将当前 goroutine 置于等待状态并等待下一次的调度,但gopark仍有可能因为超时或者关闭会立即返回,由于gopark涉及到 goroutine 调度,在此不做赘述。
(4) 通过原子操作将gpp的值设置为 0,返回修改前的值并判断是否pdReady

至此,FD.Accept结束返回,之后的操作与前面Listen类似,创建 netFD、初始化 netFD、创建 TCPConn 对象。

小结

通过上面流程化的跟踪,发现netFDFDpollDesc在这个过程中占据非常重要的位置,小结内容将会着重分析这几个结构,目的在于解构封装层次。

netFD

netFD包含在conn结构中,而conn又包含在TCPConn结构中,由此可见netFD处于用户接口层下面。

type netFD struct {
    pfd poll.FD

    family      int
    sotype      int
    isConnected bool
    net         string
    laddr       Addr
    raddr       Addr
}

netFD比较简单,只有一些基本的 socket 信息,pfd是其下一层,用户层接口的调用会进入到pfd中。

FD

type FD struct {
    // 读写锁
    fdmu fdMutex
    // 系统文件描述符
    Sysfd int
    // I/O poller.
    pd pollDesc
    // 用于在一次函数调用中读、写多个非连续缓冲区,这里主要是写
    iovecs *[]syscall.Iovec
    // 关闭文件时的信号量
    csema uint32
    // 如果此文件已设置为阻止模式,则为非零值
    isBlocking uint32
    // TCP或UDP
    IsStream bool
    // 读取到0字节时是否为错误,对于基于消息的基础socket而言为false
    ZeroReadIsEOF bool
    // 是否系统中真实文件还是socket连接
    isFile bool
}

FD是 Go 中通用的文件描述符类型,net 包和 os 包用FD来表示网络连接或者文件,FD提供了用户接口层到 runtime 之间逻辑处理。此处的 pollDesc 是poll.pollDesc而非runtime.pollDesc, poll.pollDescinternal/poll/fd_poll_runtime.go中实现了与 runtime 交互的接口。

runtime.pollDesc

type pollDesc struct {
    // 存放pollDesc,全局
    link *pollDesc

    // lock保护pollOpen, pollSetDeadline, pollUnblock and deadlineimpl等并发操作
    // 以上操作包括了seq、rt和wt变量,fd在pollDesc生命周期内恒定,其他变量均以无锁方式
    lock    mutex
    fd      uintptr
    // 关闭标记,一般主动关闭或者超时
    closing bool
    // 是否标记事件扫描错误
    everr   bool    
    user    uint32 
    // fd被重用或者读计时器被重置
    rseq    uintptr 
    // 读信号量,值可能为pdRead、pdWait、goroutine地址或nil(0)
    rg      uintptr 
    // 读的等待过期时间
    rt      timer   
    rd      int64   
    // fd被重用或者写计时器被重置
    wseq    uintptr
    /// 写信号量,值可能为pdRead、pdWait、goroutine地址或nil(0)
    wg      uintptr
    // 写的等待过期时间
    wt      timer  
    wd      int64   
}

pollDesc是抽象实现,它将 epoll、kqueue、iocp 等方式抽象统一,规定了各个平台实现的接口规范,即netpoll

现在我们大致清楚了 epoll 在 Go 中的封装结构:netFD将接口逻辑转发到FDFD提供了用户接口层到 runtime 之间的逻辑处理,且FD是通用抽象逻辑,适用于文件和网络连接;poll.pollDesc抽象了与 runtime 交互的接口和逻辑,而 epoll 的逻辑则被拆分到runtime/netpoll.goruntime/netpoll_epoll.go中,整个结构渐渐式,分工责任明确,大致层次结构如下:

有了上面的层次结构图基础,ReadWrite的流程就比较简单了,Read的调用链:conn.Read-->netFD.Read-->FD.ReadWrite调用链类似,ReadWrite的底层实现在internal/poll/fd_unix.go文件中,有兴趣可以翻阅。

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
astaxie 将本帖设为了精华贴 02月23日 10:48
lwhile GoCN 每日新闻 (2020-02-23) 中提及了此贴 02月23日 12:30

👍,这个源码可以做成一个系列呢,非常赞

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册