连接管理
概述
在高性能网络应用中,连接管理是至关重要的环节。Netpoll 提供了完善的连接生命周期管理机制,包括连接建立、数据读写、超时处理、异常断开等。本章将深入探讨如何使用 Netpoll 进行高效的连接管理。
核心内容
连接生命周期
Netpoll 的连接生命周期包括以下阶段:
- OnPrepare:连接准备阶段,初始化连接上下文
- OnConnect:连接建立阶段,处理新连接
- OnRequest:请求处理阶段,处理业务逻辑
- OnDisconnect:连接断开阶段,清理资源
go
package main
import (
"context"
"log"
"sync"
"time"
"github.com/cloudwego/netpoll"
)
type Session struct {
ID string
Conn netpoll.Connection
CreatedAt time.Time
mu sync.RWMutex
}
type SessionManager struct {
sessions sync.Map
}
func NewSessionManager() *SessionManager {
return &SessionManager{}
}
func (sm *SessionManager) Add(id string, session *Session) {
sm.sessions.Store(id, session)
}
func (sm *SessionManager) Get(id string) (*Session, bool) {
value, ok := sm.sessions.Load(id)
if !ok {
return nil, false
}
return value.(*Session), true
}
func (sm *SessionManager) Remove(id string) {
sm.sessions.Delete(id)
}
var sessionManager = NewSessionManager()
func main() {
listener, err := netpoll.CreateListener("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
eventLoop, err := netpoll.NewEventLoop(
handler,
netpoll.WithOnPrepare(onPrepare),
netpoll.WithOnConnect(onConnect),
netpoll.WithOnDisconnect(onDisconnect),
netpoll.WithIdleTimeout(120*time.Second),
)
if err != nil {
log.Fatal(err)
}
log.Println("server started on :8080")
eventLoop.Serve(listener)
}
func onPrepare(conn netpoll.Connection) context.Context {
sessionID := generateSessionID()
session := &Session{
ID: sessionID,
Conn: conn,
CreatedAt: time.Now(),
}
sessionManager.Add(sessionID, session)
ctx := context.WithValue(context.Background(), "session_id", sessionID)
return ctx
}
func onConnect(conn netpoll.Connection) context.Context {
log.Printf("connection established: %s", conn.RemoteAddr())
return context.Background()
}
func onDisconnect(conn netpoll.Connection) context.Context {
log.Printf("connection closed: %s", conn.RemoteAddr())
return context.Background()
}
func handler(ctx context.Context, conn netpoll.Connection) error {
reader := conn.Reader()
writer := conn.Writer()
data, err := reader.ReadBinary(reader.Len())
if err != nil {
return err
}
_, err = writer.WriteBinary(data)
if err != nil {
return err
}
return writer.Flush()
}
func generateSessionID() string {
return time.Now().Format("20060102150405")
}连接状态监控
实时监控连接状态对于生产环境至关重要:
go
package main
import (
"context"
"expvar"
"log"
"net/http"
_ "net/http/pprof"
"sync/atomic"
"time"
"github.com/cloudwego/netpoll"
)
type ConnectionMetrics struct {
TotalConnections int64
ActiveConnections int64
ClosedConnections int64
TotalBytesRead int64
TotalBytesWritten int64
}
var metrics = &ConnectionMetrics{}
func (m *ConnectionMetrics) IncrTotal() {
atomic.AddInt64(&m.TotalConnections, 1)
}
func (m *ConnectionMetrics) IncrActive() {
atomic.AddInt64(&m.ActiveConnections, 1)
}
func (m *ConnectionMetrics) DecrActive() {
atomic.AddInt64(&m.ActiveConnections, -1)
}
func (m *ConnectionMetrics) IncrClosed() {
atomic.AddInt64(&m.ClosedConnections, 1)
}
func (m *ConnectionMetrics) AddBytesRead(n int64) {
atomic.AddInt64(&m.TotalBytesRead, n)
}
func (m *ConnectionMetrics) AddBytesWritten(n int64) {
atomic.AddInt64(&m.TotalBytesWritten, n)
}
func init() {
expvar.Publish("total_connections", expvar.Func(func() interface{} {
return atomic.LoadInt64(&metrics.TotalConnections)
}))
expvar.Publish("active_connections", expvar.Func(func() interface{} {
return atomic.LoadInt64(&metrics.ActiveConnections)
}))
expvar.Publish("closed_connections", expvar.Func(func() interface{} {
return atomic.LoadInt64(&metrics.ClosedConnections)
}))
}
func main() {
go func() {
log.Println(http.ListenAndServe(":6060", nil))
}()
listener, err := netpoll.CreateListener("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
eventLoop, err := netpoll.NewEventLoop(
monitoredHandler,
netpoll.WithOnPrepare(monitoredPrepare),
netpoll.WithOnDisconnect(monitoredDisconnect),
)
if err != nil {
log.Fatal(err)
}
eventLoop.Serve(listener)
}
func monitoredPrepare(conn netpoll.Connection) context.Context {
metrics.IncrTotal()
metrics.IncrActive()
return context.Background()
}
func monitoredDisconnect(conn netpoll.Connection) context.Context {
metrics.DecrActive()
metrics.IncrClosed()
return context.Background()
}
func monitoredHandler(ctx context.Context, conn netpoll.Connection) error {
reader := conn.Reader()
writer := conn.Writer()
data, err := reader.ReadBinary(reader.Len())
if err != nil {
return err
}
metrics.AddBytesRead(int64(len(data)))
_, err = writer.WriteBinary(data)
if err != nil {
return err
}
metrics.AddBytesWritten(int64(len(data)))
return writer.Flush()
}超时与心跳管理
合理的超时设置可以防止资源泄漏:
go
package main
import (
"context"
"encoding/binary"
"log"
"time"
"github.com/cloudwego/netpoll"
)
const (
HeartbeatInterval = 30 * time.Second
ReadTimeout = 60 * time.Second
WriteTimeout = 60 * time.Second
)
type HeartbeatHandler struct {
lastHeartbeat time.Time
}
func main() {
listener, err := netpoll.CreateListener("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
eventLoop, err := netpoll.NewEventLoop(
heartbeatHandler,
netpoll.WithOnPrepare(heartbeatPrepare),
netpoll.WithReadTimeout(ReadTimeout),
netpoll.WithWriteTimeout(WriteTimeout),
netpoll.WithIdleTimeout(120*time.Second),
)
if err != nil {
log.Fatal(err)
}
log.Println("server started with heartbeat support")
eventLoop.Serve(listener)
}
func heartbeatPrepare(conn netpoll.Connection) context.Context {
ctx := context.WithValue(context.Background(), "last_heartbeat", time.Now())
go func() {
ticker := time.NewTicker(HeartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !conn.IsActive() {
return
}
sendHeartbeat(conn)
case <-ctx.Done():
return
}
}
}()
return ctx
}
func heartbeatHandler(ctx context.Context, conn netpoll.Connection) error {
reader := conn.Reader()
if reader.Len() < 4 {
return nil
}
msgType, err := reader.ReadInt32(binary.BigEndian)
if err != nil {
return err
}
switch msgType {
case 1:
return handleHeartbeat(ctx, conn, reader)
case 2:
return handleDataMessage(ctx, conn, reader)
default:
log.Printf("unknown message type: %d", msgType)
}
return nil
}
func handleHeartbeat(ctx context.Context, conn netpoll.Connection, reader netpoll.Reader) error {
writer := conn.Writer()
err := writer.WriteInt32(1, binary.BigEndian)
if err != nil {
return err
}
return writer.Flush()
}
func handleDataMessage(ctx context.Context, conn netpoll.Connection, reader netpoll.Reader) error {
data, err := reader.ReadBinary(reader.Len())
if err != nil {
return err
}
writer := conn.Writer()
_, err = writer.WriteBinary(data)
if err != nil {
return err
}
return writer.Flush()
}
func sendHeartbeat(conn netpoll.Connection) {
writer := conn.Writer()
err := writer.WriteInt32(1, binary.BigEndian)
if err != nil {
log.Printf("send heartbeat failed: %v", err)
return
}
writer.Flush()
}优雅关闭
优雅关闭确保正在处理的请求能够完成:
go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/cloudwego/netpoll"
)
type GracefulServer struct {
eventLoop netpoll.EventLoop
shutdown chan struct{}
}
func NewGracefulServer(addr string) (*GracefulServer, error) {
listener, err := netpoll.CreateListener("tcp", addr)
if err != nil {
return nil, err
}
eventLoop, err := netpoll.NewEventLoop(
handler,
netpoll.WithOnPrepare(onPrepare),
netpoll.WithOnDisconnect(onDisconnect),
)
if err != nil {
return nil, err
}
return &GracefulServer{
eventLoop: eventLoop,
shutdown: make(chan struct{}),
}, nil
}
func (s *GracefulServer) Start() error {
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("received shutdown signal, gracefully shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := s.eventLoop.Shutdown(ctx)
if err != nil {
log.Printf("shutdown error: %v", err)
}
close(s.shutdown)
}()
return s.eventLoop.Serve(nil)
}
func (s *GracefulServer) Wait() {
<-s.shutdown
log.Println("server shutdown complete")
}
func main() {
server, err := NewGracefulServer(":8080")
if err != nil {
log.Fatal(err)
}
log.Println("server started on :8080")
go server.Start()
server.Wait()
}
func onPrepare(conn netpoll.Connection) context.Context {
return context.Background()
}
func onDisconnect(conn netpoll.Connection) context.Context {
return context.Background()
}
func handler(ctx context.Context, conn netpoll.Connection) error {
reader := conn.Reader()
writer := conn.Writer()
data, err := reader.ReadBinary(reader.Len())
if err != nil {
return err
}
time.Sleep(100 * time.Millisecond)
_, err = writer.WriteBinary(data)
if err != nil {
return err
}
return writer.Flush()
}连接池管理
对于客户端连接,使用连接池可以提高性能:
go
package main
import (
"context"
"log"
"sync"
"time"
"github.com/cloudwego/netpoll"
)
type ConnectionPool struct {
address string
pool chan netpoll.Connection
mu sync.Mutex
maxConn int
activeConn int
}
func NewConnectionPool(address string, maxConn int) *ConnectionPool {
return &ConnectionPool{
address: address,
pool: make(chan netpoll.Connection, maxConn),
maxConn: maxConn,
}
}
func (p *ConnectionPool) Get() (netpoll.Connection, error) {
select {
case conn := <-p.pool:
if conn.IsActive() {
return conn, nil
}
conn.Close()
default:
}
p.mu.Lock()
if p.activeConn < p.maxConn {
p.activeConn++
p.mu.Unlock()
conn, err := netpoll.DialConnection("tcp", p.address, time.Second*5)
if err != nil {
p.mu.Lock()
p.activeConn--
p.mu.Unlock()
return nil, err
}
return conn, nil
}
p.mu.Unlock()
select {
case conn := <-p.pool:
if conn.IsActive() {
return conn, nil
}
conn.Close()
return p.Get()
case <-time.After(time.Second * 5):
return nil, context.DeadlineExceeded
}
}
func (p *ConnectionPool) Put(conn netpoll.Connection) {
if !conn.IsActive() {
conn.Close()
p.mu.Lock()
p.activeConn--
p.mu.Unlock()
return
}
select {
case p.pool <- conn:
default:
conn.Close()
p.mu.Lock()
p.activeConn--
p.mu.Unlock()
}
}
func (p *ConnectionPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
close(p.pool)
for conn := range p.pool {
conn.Close()
p.activeConn--
}
}
func main() {
pool := NewConnectionPool("localhost:8080", 10)
defer pool.Close()
for i := 0; i < 5; i++ {
go func(id int) {
conn, err := pool.Get()
if err != nil {
log.Printf("worker %d: get connection failed: %v", id, err)
return
}
defer pool.Put(conn)
writer := conn.Writer()
reader := conn.Reader()
_, err = writer.WriteBinary([]byte("hello"))
if err != nil {
log.Printf("worker %d: write failed: %v", id, err)
return
}
err = writer.Flush()
if err != nil {
log.Printf("worker %d: flush failed: %v", id, err)
return
}
data, err := reader.ReadBinary(reader.Len())
if err != nil {
log.Printf("worker %d: read failed: %v", id, err)
return
}
log.Printf("worker %d: received %s", id, string(data))
}(i)
}
time.Sleep(time.Second * 2)
}小结
本章深入探讨了 Netpoll 的连接管理机制:
- 连接生命周期:理解 OnPrepare、OnConnect、OnDisconnect 等回调的作用
- 状态监控:使用原子操作和 expvar 监控连接状态
- 超时与心跳:合理设置超时,实现心跳机制保持连接活跃
- 优雅关闭:确保服务关闭时不丢失正在处理的请求
- 连接池:客户端使用连接池提高性能
良好的连接管理是构建高可用网络服务的基础。在下一章中,我们将探讨 Netpoll 的高性能实践技巧。