Skip to content

Netpoll 基础

概述

Netpoll 是 CloudWeGo 团队开发的高性能网络库,专为 Go 语言设计。它基于 epoll/kqueue 实现,提供了事件驱动的网络 I/O 模型,能够显著提升网络应用的性能和吞吐量。Netpoll 在字节跳动内部大规模使用,支撑了海量高并发场景。

核心内容

什么是 Netpoll

Netpoll 是一个基于事件驱动的高性能网络库,主要特点包括:

  • 零拷贝设计:通过引用计数和 buffer 复用,减少内存分配和拷贝开销
  • 事件驱动模型:基于 epoll/kqueue 实现高效的事件循环
  • 连接复用:支持连接池和长连接管理
  • 无锁设计:采用 CAS 和原子操作,减少锁竞争

核心概念

EventLoop

EventLoop 是 Netpoll 的核心组件,负责管理事件循环和分发 I/O 事件:

go
package main

import (
    "context"
    "log"
    
    "github.com/cloudwego/netpoll"
)

func main() {
    network, address := "tcp", ":8080"
    
    listener, err := netpoll.CreateListener(network, address)
    if err != nil {
        log.Fatalf("create listener failed: %v", err)
    }
    
    eventLoop, err := netpoll.NewEventLoop(
        handler,
        netpoll.WithOnPrepare(prepare),
        netpoll.WithOnConnect(connect),
    )
    if err != nil {
        log.Fatalf("create event loop failed: %v", err)
    }
    
    err = eventLoop.Serve(listener)
    if err != nil {
        log.Fatalf("serve failed: %v", err)
    }
}

func prepare(connection netpoll.Connection) context.Context {
    log.Printf("prepare connection: %s", connection.RemoteAddr())
    return context.Background()
}

func connect(connection netpoll.Connection) context.Context {
    log.Printf("new connection: %s", connection.RemoteAddr())
    return context.Background()
}

func handler(ctx context.Context, connection netpoll.Connection) error {
    reader := connection.Reader()
    writer := connection.Writer()
    
    buf, err := reader.ReadBinary(reader.Len())
    if err != nil {
        return err
    }
    
    _, err = writer.WriteBinary(buf)
    if err != nil {
        return err
    }
    
    return writer.Flush()
}

Connection

Connection 表示一个网络连接,提供了读写接口:

go
type Connection interface {
    Reader() Reader
    Writer() Writer
    RemoteAddr() net.Addr
    LocalAddr() net.Addr
    Close() error
    IsActive() bool
}

Reader 和 Writer

Netpoll 提供了高性能的 Reader 和 Writer 接口:

go
package main

import (
    "context"
    "encoding/binary"
    "log"
    
    "github.com/cloudwego/netpoll"
)

func handleMessage(ctx context.Context, conn netpoll.Connection) error {
    reader := conn.Reader()
    writer := conn.Writer()
    
    length, err := reader.ReadInt32(binary.BigEndian)
    if err != nil {
        return err
    }
    
    data, err := reader.ReadBinary(int(length))
    if err != nil {
        return err
    }
    
    processedData := processBusinessLogic(data)
    
    err = writer.WriteInt32(int32(len(processedData)), binary.BigEndian)
    if err != nil {
        return err
    }
    
    _, err = writer.WriteBinary(processedData)
    if err != nil {
        return err
    }
    
    return writer.Flush()
}

func processBusinessLogic(data []byte) []byte {
    return data
}

零拷贝机制

Netpoll 的零拷贝设计是其高性能的关键:

go
package main

import (
    "context"
    "log"
    
    "github.com/cloudwego/netpoll"
)

func zeroCopyExample(ctx context.Context, conn netpoll.Connection) error {
    reader := conn.Reader()
    writer := conn.Writer()
    
    buf := reader.Slice(reader.Len())
    defer buf.Release()
    
    _, err := writer.WriteBinary(buf.Bytes())
    if err != nil {
        return err
    }
    
    return writer.Flush()
}

func main() {
    listener, err := netpoll.CreateListener("tcp", ":9090")
    if err != nil {
        log.Fatal(err)
    }
    
    eventLoop, err := netpoll.NewEventLoop(
        func(ctx context.Context, conn netpoll.Connection) error {
            return zeroCopyExample(ctx, conn)
        },
    )
    if err != nil {
        log.Fatal(err)
    }
    
    eventLoop.Serve(listener)
}

配置选项

Netpoll 提供了丰富的配置选项:

go
package main

import (
    "log"
    "time"
    
    "github.com/cloudwego/netpoll"
)

func main() {
    listener, err := netpoll.CreateListener("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    
    eventLoop, err := netpoll.NewEventLoop(
        handler,
        netpoll.WithOnPrepare(onPrepare),
        netpoll.WithOnConnect(onConnect),
        netpoll.WithOnDisconnect(onDisconnect),
        netpoll.WithReadTimeout(30*time.Second),
        netpoll.WithWriteTimeout(30*time.Second),
        netpoll.WithIdleTimeout(60*time.Second),
        netpoll.WithOnRequest(onRequest),
    )
    if err != nil {
        log.Fatal(err)
    }
    
    eventLoop.Serve(listener)
}

func onPrepare(conn netpoll.Connection) context.Context {
    return context.Background()
}

func onConnect(conn netpoll.Connection) context.Context {
    return context.Background()
}

func onDisconnect(conn netpoll.Connection) context.Context {
    return context.Background()
}

func onRequest(ctx context.Context, conn netpoll.Connection) error {
    return nil
}

func handler(ctx context.Context, conn netpoll.Connection) error {
    return nil
}

小结

本章介绍了 Netpoll 的基础概念和核心组件:

  1. EventLoop:事件循环的核心,负责事件分发和连接管理
  2. Connection:网络连接抽象,提供读写接口
  3. 零拷贝机制:通过 buffer 复用减少内存开销
  4. 配置选项:灵活的超时和回调配置

Netpoll 通过事件驱动和零拷贝设计,能够显著提升网络应用的性能。在下一章中,我们将深入探讨连接管理的最佳实践。