Skip to content

Kitex 流式处理

概述

流式处理是 Kitex 的重要特性,特别适用于大模型应用、实时数据传输等场景。本章将深入介绍 Kitex 的流式处理能力,包括 gRPC Streaming 和 TTHeader Streaming。

核心内容

流式处理场景

场景描述
大模型应用一问多答,流式输出
实时数据推送股票行情、日志流
大文件传输分块传输,减少内存占用
双向通信聊天应用、实时协作

流式类型

┌─────────────┐                    ┌─────────────┐
│   Client    │                    │   Server    │
└─────────────┘                    └─────────────┘

1. 服务端流式 (Server Streaming)
   Client ───── Request ─────▶ Server
   Client ◀──── Stream ◀────── Server

2. 客户端流式 (Client Streaming)
   Client ───── Stream ──────▶ Server
   Client ◀──── Response ◀──── Server

3. 双向流式 (Bidirectional Streaming)
   Client ───── Stream ──────▶ Server
   Client ◀──── Stream ◀────── Server

IDL 定义

thrift
// hello.thrift
namespace go hello

struct Request {
    1: string name
}

struct Response {
    1: string message
    2: i64 timestamp
}

service HelloService {
    // 服务端流式
    Response ServerStream(1: Request req) (streaming)
    
    // 客户端流式
    Response ClientStream(1: stream Request req)
    
    // 双向流式
    stream Response BidiStream(1: stream Request req)
}

服务端流式

服务端实现

go
package main

import (
    "context"
    "time"
    
    "example/kitex_gen/hello"
)

type HelloHandler struct{}

// 服务端流式:发送多个响应
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
    for i := 0; i < 5; i++ {
        resp := &hello.Response{
            Message:   "Hello stream message " + string(rune(i+1)),
            Timestamp: time.Now().Unix(),
        }
        
        if err := stream.Send(resp); err != nil {
            return err
        }
        
        time.Sleep(time.Second)
    }
    return nil
}

客户端调用

go
package main

import (
    "context"
    "log"
    
    "example/kitex_gen/hello/helloservice"
)

func main() {
    cli := helloservice.MustNewClient("hello",
        client.WithHostPorts("127.0.0.1:8888"),
    )
    
    // 创建流式请求
    stream, err := cli.ServerStream(context.Background(), &hello.Request{
        Name: "World",
    })
    if err != nil {
        log.Fatal(err)
    }
    
    // 接收流式响应
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            log.Println("stream finished")
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        
        log.Printf("received: %s", resp.Message)
    }
}

客户端流式

服务端实现

go
// 客户端流式:接收多个请求
func (h *HelloHandler) ClientStream(stream hello.HelloService_ClientStreamServer) error {
    var messages []string
    
    // 接收所有请求
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        messages = append(messages, req.Name)
    }
    
    // 返回汇总响应
    return stream.SendAndClose(&hello.Response{
        Message:   fmt.Sprintf("Received %d messages", len(messages)),
        Timestamp: time.Now().Unix(),
    })
}

客户端调用

go
// 创建流式客户端
stream, err := cli.ClientStream(context.Background())
if err != nil {
    log.Fatal(err)
}

// 发送多个请求
for i := 0; i < 5; i++ {
    if err := stream.Send(&hello.Request{
        Name: fmt.Sprintf("Message %d", i+1),
    }); err != nil {
        log.Fatal(err)
    }
    time.Sleep(time.Second)
}

// 关闭流并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
    log.Fatal(err)
}

log.Printf("response: %s", resp.Message)

双向流式

服务端实现

go
// 双向流式:同时收发
func (h *HelloHandler) BidiStream(stream hello.HelloService_BidiStreamServer) error {
    for {
        // 接收请求
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        
        // 发送响应
        resp := &hello.Response{
            Message:   "Echo: " + req.Name,
            Timestamp: time.Now().Unix(),
        }
        
        if err := stream.Send(resp); err != nil {
            return err
        }
    }
}

客户端调用

go
// 创建双向流
stream, err := cli.BidiStream(context.Background())
if err != nil {
    log.Fatal(err)
}

// 启动接收协程
go func() {
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            return
        }
        if err != nil {
            log.Printf("recv error: %v", err)
            return
        }
        log.Printf("received: %s", resp.Message)
    }
}()

// 发送请求
for i := 0; i < 5; i++ {
    if err := stream.Send(&hello.Request{
        Name: fmt.Sprintf("Message %d", i+1),
    }); err != nil {
        log.Fatal(err)
    }
    time.Sleep(time.Second)
}

// 关闭发送端
if err := stream.CloseSend(); err != nil {
    log.Fatal(err)
}

流式生命周期管理

Context 取消

go
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    
    // 创建流式请求
    stream, err := cli.ServerStream(ctx, &hello.Request{Name: "World"})
    if err != nil {
        log.Fatal(err)
    }
    
    // 启动接收协程
    go func() {
        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                log.Println("stream finished")
                return
            }
            if err != nil {
                log.Printf("recv error: %v", err)
                return
            }
            log.Printf("received: %s", resp.Message)
        }
    }()
    
    // 用户取消
    go func() {
        time.Sleep(3 * time.Second)
        cancel() // 取消流式请求
    }()
}

服务端处理取消

go
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
    ctx := stream.Context()
    
    for i := 0; i < 100; i++ {
        select {
        case <-ctx.Done():
            // 客户端取消
            klog.Infof("stream cancelled: %v", ctx.Err())
            return ctx.Err()
        default:
            resp := &hello.Response{
                Message:   fmt.Sprintf("Message %d", i+1),
                Timestamp: time.Now().Unix(),
            }
            
            if err := stream.Send(resp); err != nil {
                return err
            }
            
            time.Sleep(time.Second)
        }
    }
    
    return nil
}

SSE 与 Hertz 集成

go
// Hertz SSE 网关
func main() {
    h := server.Default()
    
    h.GET("/stream/:name", func(ctx context.Context, c *app.RequestContext) {
        name := c.Param("name")
        
        // 设置 SSE 响应头
        c.Response.Header.Set("Content-Type", "text/event-stream")
        c.Response.Header.Set("Cache-Control", "no-cache")
        c.Response.Header.Set("Connection", "keep-alive")
        
        // 调用 Kitex 流式服务
        stream, err := kitexClient.ServerStream(ctx, &hello.Request{Name: name})
        if err != nil {
            c.String(500, err.Error())
            return
        }
        
        // 转发流式响应
        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                break
            }
            
            // 发送 SSE 事件
            c.Response.BodyWriter().Write([]byte(
                fmt.Sprintf("data: %s\n\n", resp.Message),
            ))
            c.Response.BodyWriter().Flush()
        }
    })
    
    h.Spin()
}

流式错误处理

go
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
    ctx := stream.Context()
    
    for i := 0; ; i++ {
        select {
        case <-ctx.Done():
            // 区分错误类型
            if errors.Is(ctx.Err(), context.Canceled) {
                // 用户主动取消
                klog.Infof("client cancelled")
            } else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
                // 超时
                klog.Infof("deadline exceeded")
            }
            return ctx.Err()
        default:
            // 业务处理
            if err := processItem(ctx, i, stream); err != nil {
                // 流级别错误
                if isRecoverableError(err) {
                    klog.Warnf("recoverable error: %v", err)
                    continue
                }
                // 连接级别错误
                return err
            }
        }
    }
}

流式处理最佳实践

1. 内存管理

go
// 预分配缓冲区
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
    // 预分配响应对象,避免每次创建新对象
    resp := &hello.Response{}
    
    for i := 0; i < 100; i++ {
        // 复用响应对象
        resp.Message = fmt.Sprintf("Message %d", i+1)
        resp.Timestamp = time.Now().Unix()
        
        if err := stream.Send(resp); err != nil {
            return err
        }
    }
    return nil
}

2. 流量控制

go
// 实现简单的流量控制
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
    ctx := stream.Context()
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for i := 0; ; i++ {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            resp := &hello.Response{
                Message:   fmt.Sprintf("Message %d", i+1),
                Timestamp: time.Now().Unix(),
            }
            
            if err := stream.Send(resp); err != nil {
                return err
            }
        }
    }
}

3. 批量处理

go
// 客户端批量发送
func batchSend(stream hello.HelloService_ClientStreamServer, messages []string) error {
    for _, msg := range messages {
        if err := stream.Send(&hello.Request{Name: msg}); err != nil {
            return err
        }
    }
    return nil
}

大模型应用案例

1. 大模型推理服务

IDL 定义

thrift
namespace go llm

struct CompletionRequest {
    1: string prompt
    2: i32 max_tokens
    3: float temperature
}

struct CompletionResponse {
    1: string text
    2: bool finish_reason
}

service LLMService {
    // 流式生成
    stream CompletionResponse StreamCompletion(1: CompletionRequest req) (streaming)
}

服务端实现

go
func (h *LLMHandler) StreamCompletion(req *llm.CompletionRequest, stream llm.LLMService_StreamCompletionServer) error {
    ctx := stream.Context()
    
    // 调用大模型
    model := initModel()
    
    // 流式生成
    for chunk := range model.StreamGenerate(ctx, req.Prompt, req.MaxTokens, req.Temperature) {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            resp := &llm.CompletionResponse{
                Text:          chunk,
                FinishReason:  false,
            }
            
            if err := stream.Send(resp); err != nil {
                return err
            }
        }
    }
    
    // 发送结束标记
    finalResp := &llm.CompletionResponse{
        Text:          "",
        FinishReason:  true,
    }
    return stream.Send(finalResp)
}

客户端调用

go
func callLLM() {
    cli := llmservice.MustNewClient("llm",
        client.WithHostPorts("127.0.0.1:8888"),
    )
    
    req := &llm.CompletionRequest{
        Prompt:      "Write a poem about cloud computing",
        MaxTokens:   100,
        Temperature: 0.7,
    }
    
    stream, err := cli.StreamCompletion(context.Background(), req)
    if err != nil {
        log.Fatal(err)
    }
    
    var fullResponse string
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        
        fullResponse += resp.Text
        fmt.Print(resp.Text) // 实时输出
        
        if resp.FinishReason {
            break
        }
    }
    
    fmt.Println("\nComplete response:", fullResponse)
}

2. SSE 集成大模型

Hertz 网关

go
func main() {
    h := server.Default()
    
    h.POST("/api/chat", func(ctx context.Context, c *app.RequestContext) {
        var req llm.CompletionRequest
        if err := c.BindAndValidate(&req); err != nil {
            c.String(400, err.Error())
            return
        }
        
        // 设置 SSE 响应头
        c.Response.Header.Set("Content-Type", "text/event-stream")
        c.Response.Header.Set("Cache-Control", "no-cache")
        c.Response.Header.Set("Connection", "keep-alive")
        
        // 调用 Kitex 流式服务
        stream, err := llmClient.StreamCompletion(ctx, &req)
        if err != nil {
            c.String(500, err.Error())
            return
        }
        
        // 转发流式响应
        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                break
            }
            
            // 发送 SSE 事件
            event := fmt.Sprintf("data: %s\n\n", resp.Text)
            if _, err := c.Response.BodyWriter().Write([]byte(event)); err != nil {
                break
            }
            c.Response.BodyWriter().Flush()
            
            if resp.FinishReason {
                break
            }
        }
    })
    
    h.Spin()
}

性能优化

1. 网络优化

go
// 优化网络配置
cli := helloservice.MustNewClient("hello",
    client.WithHostPorts("127.0.0.1:8888"),
    client.WithLongConnectionPoolConfig(&pool.LongPoolConfig{
        MaxIdlePerAddress: 100,
        MaxIdleGlobal:     1000,
        KeepAlive:         true,
    }),
    client.WithReadWriteTimeout(30*time.Second),
)

2. 并发优化

go
// 并发处理多个流式请求
func processMultipleStreams() {
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            stream, err := cli.ServerStream(context.Background(), &hello.Request{
                Name: fmt.Sprintf("Stream %d", idx),
            })
            if err != nil {
                log.Printf("stream %d error: %v", idx, err)
                return
            }
            
            // 处理流式响应
            for {
                resp, err := stream.Recv()
                if err == io.EOF {
                    break
                }
                if err != nil {
                    break
                }
                log.Printf("stream %d: %s", idx, resp.Message)
            }
        }(i)
    }
    
    wg.Wait()
}

3. 内存优化

go
// 内存池优化
var respPool = sync.Pool{
    New: func() interface{} {
        return &hello.Response{}
    },
}

func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
    for i := 0; i < 100; i++ {
        resp := respPool.Get().(*hello.Response)
        resp.Message = fmt.Sprintf("Message %d", i+1)
        resp.Timestamp = time.Now().Unix()
        
        if err := stream.Send(resp); err != nil {
            respPool.Put(resp)
            return err
        }
        
        respPool.Put(resp)
    }
    return nil
}

常见问题解决

1. 流超时问题

问题:流式处理超时 解决方法

  • 增加超时时间
  • 定期发送心跳
  • 实现断点续传
go
// 增加超时时间
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

// 定期发送心跳
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
    ctx := stream.Context()
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for i := 0; ; i++ {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            // 发送心跳
            if err := stream.Send(&hello.Response{
                Message:   "heartbeat",
                Timestamp: time.Now().Unix(),
            }); err != nil {
                return err
            }
        default:
            // 正常业务处理
        }
    }
}

2. 内存占用过高

问题:流式处理导致内存占用过高 解决方法

  • 使用对象池
  • 限制并发流数量
  • 批量处理
go
// 限制并发流数量
var semaphore = make(chan struct{}, 100) // 最多100个并发流

func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
    // 获取信号量
    semaphore <- struct{}{}
    defer func() { <-semaphore }()
    
    // 处理流
    // ...
}

3. 网络断开重连

问题:网络断开后流中断 解决方法

  • 实现重连机制
  • 断点续传
  • 错误重试
go
// 实现重连
func retryStream(ctx context.Context, cli helloservice.Client, req *hello.Request) error {
    maxRetries := 3
    
    for i := 0; i < maxRetries; i++ {
        stream, err := cli.ServerStream(ctx, req)
        if err != nil {
            log.Printf("retry %d: %v", i+1, err)
            time.Sleep(time.Duration(i*100) * time.Millisecond)
            continue
        }
        
        // 处理流
        if err := processStream(stream); err == nil {
            return nil
        }
    }
    
    return fmt.Errorf("failed after %d retries", maxRetries)
}

小结

本章介绍了 Kitex 流式处理的完整内容:

  1. 流式类型

    • 服务端流式:客户端发送一次请求,服务端返回多个响应
    • 客户端流式:客户端发送多个请求,服务端返回一次响应
    • 双向流式:客户端和服务端同时收发数据
  2. IDL 定义

    • 使用 streaming 关键字定义流式接口
    • 支持不同类型的流式模式
  3. 实现方式

    • 服务端:实现流处理接口,使用 SendRecv 方法
    • 客户端:创建流,收发数据,关闭流
    • Context 管理:处理取消和超时
  4. 最佳实践

    • 内存管理:预分配缓冲区,复用对象
    • 流量控制:使用定时器控制发送速率
    • 批量处理:批量发送和接收数据
  5. 大模型应用案例

    • 大模型推理服务:流式生成文本
    • SSE 集成:通过 Hertz 提供 SSE 接口
    • 实时交互:类似 ChatGPT 的对话体验
  6. 性能优化

    • 网络优化:长连接池、适当的超时设置
    • 并发优化:并发处理多个流
    • 内存优化:使用对象池减少内存分配
  7. 常见问题解决

    • 流超时:增加超时时间、发送心跳
    • 内存占用:使用对象池、限制并发
    • 网络断开:实现重连机制、断点续传
  8. SSE 集成

    • 与 Hertz 框架配合实现 SSE 响应
    • 支持浏览器端的流式接收

通过本章的学习,你应该掌握了 Kitex 流式处理的核心概念和实践技巧,特别是在大模型应用、实时数据传输等场景中的应用。流式处理是 CloudWeGo 生态中的重要特性,为构建高性能、实时的微服务提供了有力支持。

在下一章中,我们将学习 Hertz HTTP 框架,了解如何构建高性能的 HTTP 服务。