小小西 發表於 2025-12-23 11:13:46

Golang中NetPoll机制的实现

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">Linux 网络IO</a></li><li><a href="#_label1">Linux 非阻塞IO Select、Poll、Epoll</a></li><li><a href="#_label2">Golang 中Epoll 应用</a></li><ul class="second_class_ul"><li><a href="#_lab2_2_0">一个简单的网络IO</a></li><li><a href="#_lab2_2_1">NetPoll的调度</a></li></ul></ul></div><p class="maodian"><a name="_label0"></a></p><h2>Linux 网络IO</h2>
<p>Linux 的阻塞网络 I/O (输入/输出) 是指在进行网络操作(如 read() 或 write())时,如果操作无法立即完成,调用线程将被操作系统&ldquo;阻塞&rdquo;,直到操作成功或失败才返回。它属于同步 I/O 模型的一种,与之相对的是非阻塞 I/O。</p>
<div class="jb51code"><pre class="brush:go;">int listenfd = socket(AF_INET, SOCK_STREAM, 0);

struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定到所有可用接口
server_addr.sin_port = htons(8080); // 绑定到 8080 端口
bind(listenfd, (struct sockaddr *)&amp;server_addr, sizeof(server_addr));

listen(listenfd, 5); // 允许 5 个待处理的连接

// 阻塞等待
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int connectfd = accept(listenfd, (struct sockaddr *)&amp;client_addr, &amp;client_addr_len);
</pre></div>
<p class="maodian"><a name="_label1"></a></p><h2>Linux 非阻塞IO Select、Poll、Epoll</h2>
<p><strong>核心机制</strong>:这些系统调用让用户进程在调用前,能<strong>预先指定一系列要监控的文件描述符</strong>。当一个或多个文件描述符准备好进行读写操作时,系统会通知用户进程。</p>
<p><strong>轮询与事件驱动</strong>:<br />select 和 poll: 它们采用轮询方式,每次都检查所有传入的文件描述符集合来判断哪些是活跃的。</p>
<p><strong>(1)select==&gt;时间复杂度O(n)</strong></p>
<p>它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。</p>
<p><strong>(2)poll==&gt;时间复杂度O(n)</strong></p>
<p>poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的.</p>
<p><strong>(3)epoll==&gt;时间复杂度O(1)</strong></p>
<p>epoll: 它采用事件驱动模型,通过内核的回调机制,只在需要时通知用户进程,效率更高,尤其是在处理大量并发连接时。<strong>epoll 机制通过在内核中维护一个就绪列表,当数据到达时,将对应的节点加入就绪列表,然后唤醒等待的用户进程,从而避免了不必要的轮询</strong>。</p>
<p class="maodian"><a name="_label2"></a></p><h2>Golang 中Epoll 应用</h2>
<p class="maodian"><a name="_lab2_2_0"></a></p><h3>一个简单的网络IO</h3>
<div class="jb51code"><pre class="brush:go;">// 启动 tcp server 代码示例
func main() {
        /*
                - 创建 tcp 端口监听器
                  - 创建 socket fd,bind、accept
                        - 创建 epoll 事件表(epoll_create)
                        - socket fd 注册到 epoll 事件表(epoll_ctl:add)

        */
        l, _ := net.Listen("tcp", ":8080")
        // eventloop reactor 模型
        for {
                /*
                        - 等待 tcp 连接到达
                          - loop + 非阻塞模式调用 accept
                                - 若未就绪,则通过 gopark 进行阻塞
                                - 等待 netpoller 轮询唤醒
                                     - 检查是否有 io 事件就绪(epoll_wait——nonblock)
                                       - 若发现事件就绪 通过 goready 唤醒 g
                                - accept 获取 conn fd 后注册到 epoll 事件表(epoll_ctl:add)
                                - 返回 conn
                */
                conn, _ := l.Accept()
                // goroutine per conn
                go serve(conn)
        }
}

// 处理一笔到来的 tcp 连接
func serve(conn net.Conn) {
        /*
                - 关闭 conn
                   - 从 epoll 事件表中移除该 fd(epoll_ctl:remove)
                   - 销毁该 fd
        */
        defer conn.Close()
        var buf []byte
        /*
                - 读取连接中的数据
                   - loop + 非阻塞模式调用 recv (read)
                   - 若未就绪,则通过 gopark 进行阻塞
                   - 等待 netpoller 轮询唤醒
                          - 检查是否有 io 事件就绪(epoll_wait——nonblock)
                                - 若发现事件就绪 通过 goready 唤醒 g
        */
        _, _ = conn.Read(buf)
        /*
                - 向连接中写入数据
                   - loop + 非阻塞模式调用 writev (write)
                   - 若未就绪,则通过 gopark 进行阻塞
                   - 等待 netpoller 轮询唤醒
                          - 检查是否有 io 事件就绪(epoll_wait:nonblock)
                                - 若发现事件就绪 通过 goready 唤醒 g
        */
        _, _ = conn.Write(buf)
}
</pre></div>
<p>1、调用Listen方法,通过epoll_create 初始化事件表, 创建 socket fd, 通过epoll_ctl 将socket fd注册到Epoll事件表, 监听就绪事件,等待远端连接。如果有远端连接,则会在内核空间执行回调函数,将socket fd放入就绪列表中</p>
<div class="jb51code"><pre class="brush:go;"> // 在 epoll 事件表中注册监听 r 的读就绪事件
        ev := epollevent{
                events: _EPOLLIN,
        }
</pre></div>
<p>2、调用Accept方法,非阻塞调用Epoll Wait,获取当前监听的fd是否有就绪的socket fd<br />2.1、如果没有则将当前的goroutine阻塞,并挂载在socket fd 对应的polldesc的rg等待链表中,等待环境<br />2.2、如果有,则获取conn fd返回给用户程序</p>
<div class="jb51code"><pre class="brush:go;">// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
      // ...
      for {
                // 以nonblock 模式发起一次 syscall accept 尝试接收到来的 conn
                s, rsa, errcall, err := accept(fd.Sysfd)
                // 接收conn成功,直接返回结果
                if err == nil {
                        return s, rsa, "", err
                }
                switch err {
                // 中断类错误直接忽略
                case syscall.EINTR:
                        continue
                // 当前未有到达的conn
                case syscall.EAGAIN:
       // 走入 poll_wait 流程,并标识关心的是 socket fd 的读就绪事件
       // (当conn 到达时,表现为 socket fd 可读)
                        if fd.pd.pollable() {
         // 倘若读操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒
                                if err = fd.pd.waitRead(fd.isFile); err == nil {
                                        continue
                                }
                        }
                        // ...
                }
                // ...
        }
}

// 指定 mode 为 r 标识等待的是读就绪事件,然后走入更底层的 poll_wait 流程
func (pd *pollDesc) waitRead(isFile bool) error {
        return pd.wait('r', isFile)
}
</pre></div>
<p>3、调用Read、Write方法。非阻塞调用Epoll Wait,关心conn fd上是否有读就绪事件或者写就绪事件<br />3.1、如果 conn fd 下读操作尚未就绪(尚无数据到达),则会执行 poll wait 将当前 g 阻塞并挂载到 conn fd 对应 pollDesc 的 rg 中<br />3.2、如果 conn fd 下写操作尚未就绪(缓冲区空间不足),则会执行 poll wait 将当前 g 阻塞并挂载到 conn fd 对应 pollDesc 的wg中</p>
<div class="jb51code"><pre class="brush:go;">// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
      // ...
        for {
                // 以非阻塞模式执行一次syscall read 操作
                n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
                if err != nil {
                        n = 0
                        // 走入 poll_wait 流程,并标识关心的是该 fd 的读就绪事件
                        if err == syscall.EAGAIN &amp;&amp; fd.pd.pollable() {
                              // 倘若读操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒
                                if err = fd.pd.waitRead(fd.isFile); err == nil {
                                        continue
                                }
                        }
                }
                err = fd.eofError(n, err)
                return n, err
        }
}
</pre></div>
<p>4、小结<br />可以看到,一个连接的数据读写,如果条件不满足,goroutine都会挂起并挂载到pollDesc的rg、wg中,但此时用户程序主线程不会阻塞,而是等待底层数据到达后,再进行处理。</p>
<p class="maodian"><a name="_lab2_2_1"></a></p><h3>NetPoll的调度</h3>
<p>由上可知,当 g 发现关心的 io 事件未就绪时,会通过 gopark 操作将自身陷入阻塞,并且将 g 挂载在 pollDesc 的 rg/wg 中, 而本小节介绍的 net_poll 流程就负责<strong>轮询</strong>获取已就绪 pollDesc 对应的 g,将其返回给上游的 gmp 调度系统,对其进行唤醒和调度.</p>
<p><strong>Golang GMP模型触发netpoll时机</strong><br />1、M在寻找可用的Goroutine时,在本地和全局队列上没有找到,会调用netpoll处理网络IO</p>
<div class="jb51code"><pre class="brush:go;">// gmp 核心调度流程:g0 为当前 p 找到下一个调度的g
    /*
      pick g 的核心逻辑:
             1)每调度 61 次,需要专门尝试处理一次全局队列(防止饥饿)
             2)尝试从本地队列中获取 g
             3)尝试从全局队列中获取 g
             4)以【非阻塞模式】调度 netpoll 流程,获取所有需要唤醒的 g 进行唤醒,并获取其中的首个g
             5)从其他 p 中窃取一半的 g 填充到本地队列
             6)仍找不到合适的 g,则协助 gc
             7)以【阻塞或者超时】模式,调度netpoll 流程(全局仅有一个 p 能走入此分支)
             8)当前m 添加到全局队列的空闲队列中,停止当前 m
    */

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    // ..
    /*
      同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
      - epoll事件表初始化过
      - 有 g 在等待io 就绪事件
      - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
    */
    if netpollinited() &amp;&amp; atomic.Load(&amp;netpollWaiters) &gt; 0 &amp;&amp; atomic.Load64(&amp;sched.lastpoll) != 0 {
                // 以非阻塞模式发起一轮 netpoll,如果有 g 需要唤醒,一一唤醒之,并返回首个 g 给上层进行调度
                if list := netpoll(0); !list.empty() { // non-blocking
                // 获取就绪 g 队列中的首个 g
                        gp := list.pop()
                        // 将就绪 g 队列中其余 g 一一置为就绪态,并添加到全局队列
                        injectglist(&amp;list)
                        // 把首个g 也置为就绪态
                        casgstatus(gp, _Gwaiting, _Grunnable)
                         // ...
                         //返回 g 给当前 p进行调度
                        return gp, false, false
                }
        }

    // ...
    /*
      同时满足下述三个条件,发起一次【阻塞或超时模式】的 netpoll 流程:
            - epoll事件表初始化过
            - 有 g 在等待io 就绪事件
            - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
    */       
        if netpollinited() &amp;&amp; (atomic.Load(&amp;netpollWaiters) &gt; 0 || pollUntil != 0) &amp;&amp; atomic.Xchg64(&amp;sched.lastpoll, 0) != 0 {
                // 默认为阻塞模式
                delay := int64(-1)
                // 存在定时时间,则设为超时模式
                if pollUntil != 0 {
                        delay = pollUntil - now
                        // ...
                }
                // 以【阻塞或超时模式】发起一轮 netpoll
                list := netpoll(delay) // block until new work is available
        }
      // ...
}
</pre></div>
<p>2、Sysmon 定时会调用netpoll处理网络IO</p>
<div class="jb51code"><pre class="brush:go;">// The main goroutine.
func main() {
      // ...
      // 新建一个 m,直接运行 sysmon 函数
        systemstack(func() {
                newm(sysmon, nil, -1)
        })

         // ...
}

// 全局唯一监控线程的执行函数
func sysmon() {
      // ...
        for {
            // ...
      /*
      同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
            - epoll事件表初始化过
            - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
            - 距离上一次发起 netpoll 流程的时间间隔已超过 10 ms
      */
                lastpoll := int64(atomic.Load64(&amp;sched.lastpoll))
                if netpollinited() &amp;&amp; lastpoll != 0 &amp;&amp; lastpoll+10*1000*1000 &lt; now {
                        // 以非阻塞模式发起 netpoll
                        list := netpoll(0) // non-blocking - returns list of goroutines
                        // 获取到的g 置为就绪态并添加到全局队列中
                        if !list.empty() {
                              // ...
                                injectglist(&amp;list)
                              // ...
                        }
                }
                // ...
        }
}
</pre></div>
<p>3、gc并发标记的流程</p>
<div class="jb51code"><pre class="brush:go;">func startTheWorldWithSema(emitTraceEvent bool) int64 {
      // 断言世界已停止
        assertWorldStopped()
      // ...
      // 如果 epoll 事件表初始化过,则以非阻塞模式执行一次 netpoll
        if netpollinited() {
                // 所有取得的 g 置为就绪态并添加到全局队列
                list := netpoll(0) // non-blocking
                injectglist(&amp;list)
        }
      // ...
}
</pre></div>
<p>当上述条件成立时,Netpoll执行 epoll_wait 操作,获取就绪的 io 事件 list. 一轮最多获取 128 个,根据就绪事件类型,将 mode 分为 w(写就绪事件)和 r(读就绪事件)。获取 pollDesc 实例中 rg或者wg中的 g 实例,一并返回GMP进行调度</p>
<p style="text-align:center"><img alt="" src="https://img.jbzj.com/file_images/article/202512/2025122311141815.png" /></p>
<div class="jb51code"><pre class="brush:go;">// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
/*
    - netpoll 流程用于轮询检查是否有就绪的 io 事件
    - 如果有就绪 io 事件,还需要检查是否有 pollDesc 中的 g 关心该事件
    - 找到所有关心该就绪 io 事件的 g,添加到 list 中返回给上游进行 goready 唤醒
*/
func netpoll(delay int64) gList {
   /*
      根据传入的 delay 参数,决定调用 epoll_wait 的模式
            - delay &lt; 0:设为 -1 阻塞模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,会通过该模式,使得 thread 陷入阻塞态,但该情况全局最多仅有一例)
            - delay = 0:设为 0 非阻塞模式(通常情况下为此模式,包括 gmp 常规调度流程、gc 以及全局监控线程 sysmon 都是以此模式触发的 netpoll 流程)
            - delay &gt; 0:设为超时模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,并且通过 timer 启动了定时任务时,会令 thread 以超时模式执行 epoll_wait 操作)
    */
   var waitms int32
   if delay &lt; 0 {
                waitms = -1
   } else if delay == 0 {
                waitms = 0
   // 针对 delay 时长取整
   } else if delay &lt; 1e6 {
                waitms = 1
   } else if delay &lt; 1e15 {
                waitms = int32(delay / 1e6)
   } else {
   // 1e9 ms == ~11.5 days.
                waitms = 1e9
   }
   // 一次最多接收 128 个 io 就绪事件       
   var events epollevent
retry:
   // 以指定模式,调用 epoll_wait 指令
        n := epollwait(epfd, &amp;events, int32(len(events)), waitms)
   // ...

   // 遍历就绪的每个 io 事件       
   var toRun gList
   for i := int32(0); i &lt; n; i++ {
           ev := &amp;events
         if ev.events == 0 {
                continue
         }

         // pipe 接收端的信号量处理
         if *(**uintptr)(unsafe.Pointer(&amp;ev.data)) == &amp;netpollBreakRd {
         // ...
            }

            /*
             根据 io 事件类型,标识出 mode:
               - EPOLL_IN -&gt; r;
               - EPOLL_OUT -&gt; w;
               - 错误或者中断事件 -&gt; r &amp; w;
             */
            var mode int32
            if ev.events&amp;(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
                        mode += 'r'
            }
            if ev.events&amp;(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
                        mode += 'w'
            }
            // 根据 epollevent.data 获取到监听了该事件的 pollDesc 实例
            if mode != 0 {
                        pd := *(**pollDesc)(unsafe.Pointer(&amp;ev.data))
                        // ...                       
                        // 尝试针对对应 pollDesc 进行唤醒操作
                        netpollready(&amp;toRun, pd, mode)
            }
      }
      return toRun
}




/*
    epollwait 操作:
      - epfd:epoll 事件表 fd 句柄
      - ev:用于承载就绪 epoll event 的容器
      - nev:ev 的容量
      - timeout:
            - -1:阻塞模式
            - 0:非阻塞模式:
            - &gt;0:超时模式. 单位 ms
      - 返回值 int32:就绪的 event 数量
*/
func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32





// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
/*
根据 pd 以及 mode 标识的 io 就绪事件,获取需要进行 ready 唤醒的 g list
对应 g 会存储到 toRun 这个 list 容器当中
*/
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
        var rg, wg *g
        if mode == 'r' || mode == 'r'+'w' {
      // 倘若到达事件包含读就绪,尝试获取需要 ready 唤醒的 g
                rg = netpollunblock(pd, 'r', true)
        }
        if mode == 'w' || mode == 'r'+'w' {
// 倘若到达事件包含写就绪,尝试获取需要 ready 唤醒的 g
                wg = netpollunblock(pd, 'w', true)
        }
// 找到需要唤醒的 g,添加到 glist 中返回给上层
        if rg != nil {
                toRun.push(rg)
        }
        if wg != nil {
                toRun.push(wg)
        }
}




/*
根据指定的就绪io 事件类型以及 pollDesc,判断是否有 g 需要被唤醒. 若返回结果非空,则为需要唤醒的 g
*/
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
      // 根据 io 事件类型,获取 pollDesc 中对应的状态标识器
      gpp := &amp;pd.rg
        if mode == 'w' {
                gpp = &amp;pd.wg
        }

        for {
                // 从 gpp 中取出值,此时该值应该为调用过 park 操作的 g
                old := gpp.Load()
               // ...
                if ioready {
                        new = pdReady
                }
                // 通过 cas 操作,将 gpp 值由 g 置换成 pdReady
                if gpp.CompareAndSwap(old, new) {
                        // 返回需要唤醒的 g
                        return (*g)(unsafe.Pointer(old))
                }
        }
}
</pre></div>
頁: [1]
查看完整版本: Golang中NetPoll机制的实现