Skip to content

连接管理

概述

在高性能网络应用中,连接管理是至关重要的环节。Netpoll 提供了完善的连接生命周期管理机制,包括连接建立、数据读写、超时处理、异常断开等。本章将深入探讨如何使用 Netpoll 进行高效的连接管理。

核心内容

连接生命周期

Netpoll 的连接生命周期包括以下阶段:

  1. OnPrepare:连接准备阶段,初始化连接上下文(返回的 context 会传递给后续回调)
  2. OnConnect:连接建立阶段,处理新连接
  3. OnRequest:请求处理阶段,处理业务逻辑
  4. OnDisconnect:连接断开阶段,清理资源
go
package main

import (
    "context"
    "log"
    "strconv"
    "sync"
    "sync/atomic"
    "time"

    "github.com/cloudwego/netpoll"
)

// Session is the per-connection record kept in SessionManager.
type Session struct {
    // ID is the key under which the session is stored in SessionManager.
    ID        string
    // Conn is the netpoll connection this session was created for.
    Conn      netpoll.Connection
    // CreatedAt is the time the session was created in onPrepare.
    CreatedAt time.Time
    // mu is not locked by any code in this example.
    // NOTE(review): presumably reserved for guarding future mutable fields — confirm.
    mu        sync.RWMutex
}

// SessionManager is a registry of sessions keyed by session ID.
// All storage is delegated to a sync.Map.
type SessionManager struct {
    sessions sync.Map
}

// NewSessionManager returns an empty, ready-to-use SessionManager.
func NewSessionManager() *SessionManager {
    return new(SessionManager)
}

// Add stores session under id, replacing any session already stored there.
func (sm *SessionManager) Add(id string, session *Session) {
    sm.sessions.Store(id, session)
}

// Get looks up the session stored under id. The boolean result reports
// whether an entry was found; when it is false the session is nil.
func (sm *SessionManager) Get(id string) (*Session, bool) {
    if stored, found := sm.sessions.Load(id); found {
        return stored.(*Session), true
    }
    return nil, false
}

// Remove deletes the session stored under id, if any.
func (sm *SessionManager) Remove(id string) {
    sm.sessions.Delete(id)
}

// sessionManager is the process-wide registry used by the connection callbacks.
var sessionManager = NewSessionManager()

// main starts an echo server on :8080 with the full set of connection
// lifecycle callbacks installed and a 120-second idle timeout.
func main() {
    listener, err := netpoll.CreateListener("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }

    eventLoop, err := netpoll.NewEventLoop(
        handler,
        netpoll.WithOnPrepare(onPrepare),
        netpoll.WithOnConnect(onConnect),
        netpoll.WithOnDisconnect(onDisconnect),
        netpoll.WithIdleTimeout(120*time.Second),
    )
    if err != nil {
        log.Fatal(err)
    }

    log.Println("server started on :8080")
    // Serve blocks until the event loop stops; report the reason instead of
    // exiting silently with status 0.
    if err := eventLoop.Serve(listener); err != nil {
        log.Fatal(err)
    }
}

// sessionCtxKey is the context key under which onPrepare stores the session
// ID. An unexported struct type is used instead of a plain string so the key
// cannot collide with keys set by other packages.
type sessionCtxKey struct{}

// onPrepare is the OnPrepare callback. It creates a Session for conn,
// registers it in sessionManager, and returns a context carrying the session
// ID under sessionCtxKey.
func onPrepare(conn netpoll.Connection) context.Context {
    sessionID := generateSessionID()
    sessionManager.Add(sessionID, &Session{
        ID:        sessionID,
        Conn:      conn,
        CreatedAt: time.Now(),
    })

    return context.WithValue(context.Background(), sessionCtxKey{}, sessionID)
}

// onConnect is the OnConnect callback. It logs the peer address and returns
// the incoming context unchanged so the session ID stored by onPrepare stays
// available to later callbacks.
func onConnect(ctx context.Context, conn netpoll.Connection) context.Context {
    log.Printf("connection established: %s", conn.RemoteAddr())
    return ctx
}

// onDisconnect is the OnDisconnect callback. It logs the peer address and
// removes the connection's session from sessionManager so closed connections
// do not accumulate there.
func onDisconnect(ctx context.Context, conn netpoll.Connection) {
    log.Printf("connection closed: %s", conn.RemoteAddr())

    // The type assertion fails harmlessly if the context carries no session ID.
    if sessionID, ok := ctx.Value(sessionCtxKey{}).(string); ok {
        sessionManager.Remove(sessionID)
    }
}

// handler is the OnRequest callback. It reads every byte currently buffered
// on conn and writes the same bytes back to the peer.
//
// It returns the first error from reading, writing or flushing.
func handler(ctx context.Context, conn netpoll.Connection) error {
    reader := conn.Reader()
    // Hand the consumed read buffers back to netpoll when the request is done;
    // they are only recycled on an explicit Release.
    defer reader.Release()

    data, err := reader.ReadBinary(reader.Len())
    if err != nil {
        return err
    }

    writer := conn.Writer()
    if _, err := writer.WriteBinary(data); err != nil {
        return err
    }
    return writer.Flush()
}

// sessionSeq is a process-wide counter that makes session IDs unique even
// when several connections are accepted within the same second.
var sessionSeq uint64

// generateSessionID returns an ID of the form "<yyyymmddhhmmss>-<n>", where n
// is a counter that increases by one on every call. The timestamp alone has
// one-second resolution, so the counter is what guarantees uniqueness within
// the process. It is safe for concurrent use.
func generateSessionID() string {
    seq := atomic.AddUint64(&sessionSeq, 1)
    return time.Now().Format("20060102150405") + "-" + strconv.FormatUint(seq, 10)
}

连接状态监控

实时监控连接状态对于生产环境至关重要:

go
package main

import (
    "context"
    "expvar"
    "log"
    "net/http"
    _ "net/http/pprof"
    "sync/atomic"
    "time"
    
    "github.com/cloudwego/netpoll"
)

// ConnectionMetrics holds the server's connection and traffic counters.
// The fields are updated with sync/atomic by the methods below.
type ConnectionMetrics struct {
    TotalConnections  int64
    ActiveConnections int64
    ClosedConnections int64
    TotalBytesRead    int64
    TotalBytesWritten int64
}

// metrics is the process-wide counter set updated by the connection callbacks.
var metrics = new(ConnectionMetrics)

// addCounter atomically adds delta to *counter.
func addCounter(counter *int64, delta int64) {
    atomic.AddInt64(counter, delta)
}

// IncrTotal counts one more accepted connection.
func (m *ConnectionMetrics) IncrTotal() {
    addCounter(&m.TotalConnections, 1)
}

// IncrActive counts one more currently open connection.
func (m *ConnectionMetrics) IncrActive() {
    addCounter(&m.ActiveConnections, 1)
}

// DecrActive counts one fewer currently open connection.
func (m *ConnectionMetrics) DecrActive() {
    addCounter(&m.ActiveConnections, -1)
}

// IncrClosed counts one more closed connection.
func (m *ConnectionMetrics) IncrClosed() {
    addCounter(&m.ClosedConnections, 1)
}

// AddBytesRead adds n to the running total of bytes read.
func (m *ConnectionMetrics) AddBytesRead(n int64) {
    addCounter(&m.TotalBytesRead, n)
}

// AddBytesWritten adds n to the running total of bytes written.
func (m *ConnectionMetrics) AddBytesWritten(n int64) {
    addCounter(&m.TotalBytesWritten, n)
}

// init publishes the connection counters through expvar as
// total_connections, active_connections and closed_connections. Each
// published variable loads its counter atomically whenever it is read.
func init() {
    published := []struct {
        name    string
        counter *int64
    }{
        {"total_connections", &metrics.TotalConnections},
        {"active_connections", &metrics.ActiveConnections},
        {"closed_connections", &metrics.ClosedConnections},
    }

    for _, entry := range published {
        counter := entry.counter // per-iteration copy captured by the closure
        expvar.Publish(entry.name, expvar.Func(func() interface{} {
            return atomic.LoadInt64(counter)
        }))
    }
}

// main starts the monitored echo server on :8080 and, in the background, an
// HTTP server on :6060 using the default mux (which serves the expvar and
// pprof handlers registered by this file's imports).
func main() {
    go func() {
        // ListenAndServe only returns on failure; log why the debug
        // endpoint is unavailable without stopping the main server.
        log.Println(http.ListenAndServe(":6060", nil))
    }()

    listener, err := netpoll.CreateListener("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }

    eventLoop, err := netpoll.NewEventLoop(
        monitoredHandler,
        netpoll.WithOnPrepare(monitoredPrepare),
        netpoll.WithOnDisconnect(monitoredDisconnect),
    )
    if err != nil {
        log.Fatal(err)
    }

    // Serve blocks until the event loop stops; surface its error instead of
    // dropping it.
    if err := eventLoop.Serve(listener); err != nil {
        log.Fatal(err)
    }
}

// monitoredPrepare is the OnPrepare callback. It counts the new connection in
// both the total and the active counters and returns a background context.
func monitoredPrepare(conn netpoll.Connection) context.Context {
    metrics.IncrTotal()
    metrics.IncrActive()
    return context.Background()
}

// monitoredDisconnect is the OnDisconnect callback. It moves the connection
// from the active counter to the closed counter. The signature follows
// netpoll's OnDisconnect type, which receives the connection's context and
// returns nothing.
func monitoredDisconnect(ctx context.Context, conn netpoll.Connection) {
    metrics.DecrActive()
    metrics.IncrClosed()
}

// monitoredHandler is the OnRequest callback. It echoes every byte currently
// buffered on conn back to the peer and records the traffic in metrics.
//
// It returns the first error from reading, writing or flushing. Bytes are
// only added to the written total after the flush has succeeded.
func monitoredHandler(ctx context.Context, conn netpoll.Connection) error {
    reader := conn.Reader()
    // Hand the consumed read buffers back to netpoll when the request is done;
    // they are only recycled on an explicit Release.
    defer reader.Release()

    data, err := reader.ReadBinary(reader.Len())
    if err != nil {
        return err
    }
    metrics.AddBytesRead(int64(len(data)))

    writer := conn.Writer()
    written, err := writer.WriteBinary(data)
    if err != nil {
        return err
    }
    if err := writer.Flush(); err != nil {
        return err
    }

    metrics.AddBytesWritten(int64(written))
    return nil
}

超时与心跳管理

合理的超时设置可以防止资源泄漏:

go
package main

import (
    "context"
    "encoding/binary"
    "log"
    "time"
    
    "github.com/cloudwego/netpoll"
)

// Timing parameters for the heartbeat server.
const (
    // HeartbeatInterval is the period of the ticker that drives sendHeartbeat.
    HeartbeatInterval = 30 * time.Second
    // ReadTimeout is the value passed to netpoll.WithReadTimeout in main.
    ReadTimeout       = 60 * time.Second
    // WriteTimeout is the value passed to netpoll.WithWriteTimeout in main.
    WriteTimeout      = 60 * time.Second
)

// HeartbeatHandler holds the time of the most recent heartbeat.
// NOTE(review): no function in this example creates or reads this type;
// presumably a placeholder for per-connection heartbeat state — confirm.
type HeartbeatHandler struct {
    lastHeartbeat time.Time
}

// main starts the heartbeat-aware server on :8080 with read, write and idle
// timeouts configured on the event loop.
func main() {
    listener, err := netpoll.CreateListener("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }

    eventLoop, err := netpoll.NewEventLoop(
        heartbeatHandler,
        netpoll.WithOnPrepare(heartbeatPrepare),
        netpoll.WithReadTimeout(ReadTimeout),
        netpoll.WithWriteTimeout(WriteTimeout),
        netpoll.WithIdleTimeout(120*time.Second),
    )
    if err != nil {
        log.Fatal(err)
    }

    log.Println("server started with heartbeat support")
    // Serve blocks until the event loop stops; surface its error instead of
    // dropping it.
    if err := eventLoop.Serve(listener); err != nil {
        log.Fatal(err)
    }
}

// lastHeartbeatKey is the context key under which heartbeatPrepare stores the
// connection's start time. An unexported struct type avoids collisions with
// context keys set elsewhere.
type lastHeartbeatKey struct{}

// heartbeatPrepare is the OnPrepare callback. It returns a context carrying
// the current time under lastHeartbeatKey and starts a goroutine that calls
// sendHeartbeat every HeartbeatInterval.
//
// The returned context is cancelled when the connection closes, which stops
// the goroutine immediately instead of at the next tick.
func heartbeatPrepare(conn netpoll.Connection) context.Context {
    ctx, cancel := context.WithCancel(context.Background())
    ctx = context.WithValue(ctx, lastHeartbeatKey{}, time.Now())

    // Tie the context's lifetime to the connection. If registration fails the
    // IsActive check below still ends the goroutine, just one tick later.
    if err := conn.AddCloseCallback(func(netpoll.Connection) error {
        cancel()
        return nil
    }); err != nil {
        log.Printf("register close callback failed: %v", err)
    }

    go func() {
        ticker := time.NewTicker(HeartbeatInterval)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                if !conn.IsActive() {
                    cancel()
                    return
                }
                // NOTE(review): this writes from a separate goroutine while
                // the request handler may also be writing; netpoll's Writer is
                // presumably not safe for concurrent use — confirm and
                // serialize writes if so.
                sendHeartbeat(conn)
            }
        }
    }()

    return ctx
}

// heartbeatHandler is the OnRequest callback. It reads a 4-byte big-endian
// message type and dispatches on it: type 1 goes to handleHeartbeat, type 2
// to handleDataMessage. Any other type is logged and ignored.
//
// It returns the error from reading the header or from the chosen handler.
func heartbeatHandler(ctx context.Context, conn netpoll.Connection) error {
    const (
        headerSize       = 4
        msgTypeHeartbeat = 1
        msgTypeData      = 2
    )

    reader := conn.Reader()
    // Hand the consumed read buffers back to netpoll when the request is done.
    // The header slice below is only used before this runs.
    defer reader.Release()

    // Ask for the whole header rather than returning early when fewer than
    // four bytes are buffered: an early return would leave those bytes unread.
    header, err := reader.Next(headerSize)
    if err != nil {
        return err
    }
    msgType := int32(binary.BigEndian.Uint32(header))

    switch msgType {
    case msgTypeHeartbeat:
        return handleHeartbeat(ctx, conn, reader)
    case msgTypeData:
        return handleDataMessage(ctx, conn, reader)
    default:
        log.Printf("unknown message type: %d", msgType)
        return nil
    }
}

// handleHeartbeat answers a heartbeat by writing message type 1 as a 4-byte
// big-endian integer and flushing it to the peer. reader is not used.
//
// It returns the error from reserving the write buffer or from the flush.
func handleHeartbeat(ctx context.Context, conn netpoll.Connection, reader netpoll.Reader) error {
    writer := conn.Writer()

    // Reserve the frame in the write buffer and encode into it in place.
    frame, err := writer.Malloc(4)
    if err != nil {
        return err
    }
    binary.BigEndian.PutUint32(frame, 1)

    return writer.Flush()
}

// handleDataMessage echoes a data message: it reads every byte still buffered
// in reader and writes the same bytes back on conn.
//
// It returns the first error from reading, writing or flushing.
func handleDataMessage(ctx context.Context, conn netpoll.Connection, reader netpoll.Reader) error {
    payload, readErr := reader.ReadBinary(reader.Len())
    if readErr != nil {
        return readErr
    }

    out := conn.Writer()
    if _, writeErr := out.WriteBinary(payload); writeErr != nil {
        return writeErr
    }
    return out.Flush()
}

// sendHeartbeat writes message type 1 as a 4-byte big-endian integer to conn
// and flushes it. Failures are logged rather than returned, because the
// caller is a background ticker with nobody to report to.
func sendHeartbeat(conn netpoll.Connection) {
    writer := conn.Writer()

    // Reserve the frame in the write buffer and encode into it in place.
    frame, err := writer.Malloc(4)
    if err != nil {
        log.Printf("send heartbeat failed: %v", err)
        return
    }
    binary.BigEndian.PutUint32(frame, 1)

    if err := writer.Flush(); err != nil {
        log.Printf("send heartbeat failed: %v", err)
    }
}

优雅关闭

优雅关闭确保正在处理的请求能够完成:

go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    "github.com/cloudwego/netpoll"
)

// GracefulServer wraps a netpoll event loop and its listener, and shuts the
// loop down when the process receives SIGINT or SIGTERM.
type GracefulServer struct {
    // listener is the socket created by NewGracefulServer and served by Start.
    listener  netpoll.Listener
    eventLoop netpoll.EventLoop
    // shutdown is closed once the signal-triggered shutdown has finished.
    shutdown  chan struct{}
}

// NewGracefulServer creates a TCP listener on addr and an event loop wired to
// handler, onPrepare and onDisconnect. It does not start serving; call Start.
//
// It returns the error from creating the listener or the event loop. The
// listener is closed again if the event loop cannot be created.
func NewGracefulServer(addr string) (*GracefulServer, error) {
    listener, err := netpoll.CreateListener("tcp", addr)
    if err != nil {
        return nil, err
    }

    eventLoop, err := netpoll.NewEventLoop(
        handler,
        netpoll.WithOnPrepare(onPrepare),
        netpoll.WithOnDisconnect(onDisconnect),
    )
    if err != nil {
        // Best effort: the construction error is the one worth reporting.
        _ = listener.Close()
        return nil, err
    }

    return &GracefulServer{
        listener:  listener,
        eventLoop: eventLoop,
        shutdown:  make(chan struct{}),
    }, nil
}

// Start serves the listener and blocks until the event loop stops, returning
// the error from Serve.
//
// Before serving it installs a handler for SIGINT and SIGTERM. On the first
// such signal the event loop is shut down with a 30-second deadline, and the
// channel that Wait blocks on is closed afterwards.
func (s *GracefulServer) Start() error {
    // Register before serving so a signal that arrives early is not missed.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        defer close(s.shutdown)
        defer signal.Stop(sigCh)

        <-sigCh
        log.Println("received shutdown signal, gracefully shutting down...")

        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()

        if err := s.eventLoop.Shutdown(ctx); err != nil {
            log.Printf("shutdown error: %v", err)
        }
    }()

    return s.eventLoop.Serve(s.listener)
}

// Wait blocks until the shutdown channel is closed, then logs that the
// shutdown is complete.
func (s *GracefulServer) Wait() {
    defer log.Println("server shutdown complete")
    <-s.shutdown
}

// main builds the graceful server on :8080, serves it in the background and
// blocks until a signal-triggered shutdown has completed.
func main() {
    server, err := NewGracefulServer(":8080")
    if err != nil {
        log.Fatal(err)
    }

    log.Println("server started on :8080")

    go func() {
        // A serve failure would otherwise be lost and leave Wait blocked
        // forever, so exit with the reason instead.
        if err := server.Start(); err != nil {
            log.Fatalf("serve failed: %v", err)
        }
    }()
    server.Wait()
}

// onPrepare is the OnPrepare callback. It attaches no per-connection state
// and returns a background context.
func onPrepare(conn netpoll.Connection) context.Context {
    return context.Background()
}

// onDisconnect is the OnDisconnect callback. It does nothing. The signature
// follows netpoll's OnDisconnect type, which receives the connection's
// context and returns nothing.
func onDisconnect(ctx context.Context, conn netpoll.Connection) {
}

// handler is the OnRequest callback. It reads every byte currently buffered
// on conn, pauses for 100 milliseconds, and writes the same bytes back.
//
// It returns the first error from reading, writing or flushing.
func handler(ctx context.Context, conn netpoll.Connection) error {
    reader := conn.Reader()
    // Hand the consumed read buffers back to netpoll when the request is done;
    // they are only recycled on an explicit Release.
    defer reader.Release()

    data, err := reader.ReadBinary(reader.Len())
    if err != nil {
        return err
    }

    // Stand-in for request processing time, so a shutdown has in-flight work
    // to wait for.
    time.Sleep(100 * time.Millisecond)

    writer := conn.Writer()
    if _, err := writer.WriteBinary(data); err != nil {
        return err
    }
    return writer.Flush()
}

连接池管理

对于客户端连接,使用连接池可以复用已建立的连接,避免每次请求都重新拨号,从而提高性能:

go
package main

import (
    "context"
    "log"
    "sync"
    "time"
    
    "github.com/cloudwego/netpoll"
)

// poolTimeout bounds both dialing a new connection and waiting for an idle
// one in Get.
const poolTimeout = 5 * time.Second

// poolClosedError is the concrete type behind ErrPoolClosed. It is declared
// here so the example needs no imports beyond the ones it already has.
type poolClosedError struct{}

// Error implements the error interface.
func (poolClosedError) Error() string { return "connection pool closed" }

// ErrPoolClosed is returned by Get once Close has been called.
var ErrPoolClosed error = poolClosedError{}

// ConnectionPool is a bounded pool of client connections to one address.
// All methods are safe for concurrent use.
type ConnectionPool struct {
    address string
    // pool holds idle connections. It is never closed; the closed flag and
    // the done channel signal shutdown instead, so a late Put cannot panic.
    pool chan netpoll.Connection
    // mu guards activeConn and closed, and serializes sends on pool with
    // Close.
    mu      sync.Mutex
    maxConn int
    // activeConn counts connections that exist, whether idle or checked out.
    activeConn int
    closed     bool
    // done is closed by Close to wake goroutines blocked in Get.
    done chan struct{}
}

// NewConnectionPool returns a pool that opens at most maxConn connections to
// address. Connections are dialed lazily by Get.
func NewConnectionPool(address string, maxConn int) *ConnectionPool {
    return &ConnectionPool{
        address: address,
        pool:    make(chan netpoll.Connection, maxConn),
        maxConn: maxConn,
        done:    make(chan struct{}),
    }
}

// discard closes conn and gives its slot back to the pool's capacity.
func (p *ConnectionPool) discard(conn netpoll.Connection) {
    conn.Close()
    p.mu.Lock()
    p.activeConn--
    p.mu.Unlock()
}

// Get returns an active connection. It reuses an idle one when available,
// dials a new one while fewer than maxConn exist, and otherwise waits for a
// connection to be returned.
//
// It returns the dial error if dialing fails, ErrPoolClosed if the pool has
// been closed, and context.DeadlineExceeded if no connection becomes
// available within poolTimeout. Idle connections found to be inactive are
// discarded and their slot is freed.
func (p *ConnectionPool) Get() (netpoll.Connection, error) {
    // One timer for the whole call, so discarding dead connections cannot
    // extend the wait indefinitely.
    wait := time.NewTimer(poolTimeout)
    defer wait.Stop()

    for {
        // Fast path: take an idle connection without blocking.
        select {
        case conn := <-p.pool:
            if conn.IsActive() {
                return conn, nil
            }
            p.discard(conn)
            continue
        default:
        }

        p.mu.Lock()
        switch {
        case p.closed:
            p.mu.Unlock()
            return nil, ErrPoolClosed
        case p.activeConn < p.maxConn:
            // Reserve the slot before dialing so concurrent callers cannot
            // exceed maxConn.
            p.activeConn++
            p.mu.Unlock()

            conn, err := netpoll.DialConnection("tcp", p.address, poolTimeout)
            if err != nil {
                p.mu.Lock()
                p.activeConn--
                p.mu.Unlock()
                return nil, err
            }
            return conn, nil
        }
        p.mu.Unlock()

        // At capacity: wait for a connection to be returned.
        select {
        case conn := <-p.pool:
            if conn.IsActive() {
                return conn, nil
            }
            p.discard(conn)
        case <-p.done:
            return nil, ErrPoolClosed
        case <-wait.C:
            return nil, context.DeadlineExceeded
        }
    }
}

// Put returns conn to the pool. The connection is closed and its slot freed
// instead when it is no longer active, when the pool is full, or when the
// pool has been closed. A nil conn is ignored.
func (p *ConnectionPool) Put(conn netpoll.Connection) {
    if conn == nil {
        return
    }

    if conn.IsActive() {
        p.mu.Lock()
        if !p.closed {
            // The send is non-blocking, so holding mu across it is safe; the
            // lock guarantees Close is not draining at the same time.
            select {
            case p.pool <- conn:
                p.mu.Unlock()
                return
            default:
            }
        }
        p.mu.Unlock()
    }

    p.discard(conn)
}

// Close marks the pool closed, wakes goroutines waiting in Get, and closes
// every idle connection. Connections that are checked out are closed when
// they are returned through Put. Calling Close more than once is a no-op.
func (p *ConnectionPool) Close() {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.closed {
        return
    }
    p.closed = true
    close(p.done)

    // Put only sends while holding mu and while closed is false, so once this
    // loop sees the channel empty it stays empty.
    for {
        select {
        case conn := <-p.pool:
            conn.Close()
            p.activeConn--
        default:
            return
        }
    }
}

// main runs five workers that each borrow a connection from the pool, send
// "hello" and log the echoed reply. It waits for all workers to finish before
// closing the pool.
func main() {
    pool := NewConnectionPool("localhost:8080", 10)
    defer pool.Close()

    const payload = "hello"

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            conn, err := pool.Get()
            if err != nil {
                log.Printf("worker %d: get connection failed: %v", id, err)
                return
            }
            // Runs last: a connection closed on an error path below is
            // inactive by then, so Put discards it instead of pooling it.
            defer pool.Put(conn)

            // Bound the wait for the reply so a silent server cannot block
            // the worker forever.
            if err := conn.SetReadTimeout(2 * time.Second); err != nil {
                log.Printf("worker %d: set read timeout failed: %v", id, err)
                conn.Close()
                return
            }

            writer := conn.Writer()
            reader := conn.Reader()
            // Hand the consumed read buffers back to netpoll before the
            // connection is returned to the pool.
            defer reader.Release()

            if _, err := writer.WriteBinary([]byte(payload)); err != nil {
                log.Printf("worker %d: write failed: %v", id, err)
                conn.Close()
                return
            }

            if err := writer.Flush(); err != nil {
                log.Printf("worker %d: flush failed: %v", id, err)
                conn.Close()
                return
            }

            // Ask for the full echoed payload. Reading reader.Len() bytes
            // right after the flush would normally read nothing, because the
            // reply has not arrived yet.
            data, err := reader.ReadBinary(len(payload))
            if err != nil {
                log.Printf("worker %d: read failed: %v", id, err)
                conn.Close()
                return
            }

            log.Printf("worker %d: received %s", id, string(data))
        }(i)
    }

    // Wait for the workers rather than sleeping for a fixed time, so the pool
    // is not closed while connections are still in use.
    wg.Wait()
}

小结

本章深入探讨了 Netpoll 的连接管理机制:

  1. 连接生命周期:理解 OnPrepare、OnConnect、OnRequest、OnDisconnect 等回调的作用
  2. 状态监控:使用原子操作和 expvar 监控连接状态
  3. 超时与心跳:合理设置超时,实现心跳机制保持连接活跃
  4. 优雅关闭:确保服务关闭时不丢失正在处理的请求
  5. 连接池:客户端使用连接池提高性能

良好的连接管理是构建高可用网络服务的基础。在下一章中,我们将探讨 Netpoll 的高性能实践技巧。