Kitex 流式处理
概述
流式处理是 Kitex 的重要特性,特别适用于大模型应用、实时数据传输等场景。本章将深入介绍 Kitex 的流式处理能力,包括 gRPC Streaming 和 TTHeader Streaming。
核心内容
流式处理场景
| 场景 | 描述 |
|---|---|
| 大模型应用 | 一问多答,流式输出 |
| 实时数据推送 | 股票行情、日志流 |
| 大文件传输 | 分块传输,减少内存占用 |
| 双向通信 | 聊天应用、实时协作 |
流式类型
┌─────────────┐ ┌─────────────┐
│ Client │ │ Server │
└─────────────┘ └─────────────┘
1. 服务端流式 (Server Streaming)
Client ───── Request ─────▶ Server
Client ◀──── Stream ◀────── Server
2. 客户端流式 (Client Streaming)
Client ───── Stream ──────▶ Server
Client ◀──── Response ◀──── Server
3. 双向流式 (Bidirectional Streaming)
Client ───── Stream ──────▶ Server
Client ◀──── Stream ◀────── ServerIDL 定义
thrift
// hello.thrift
namespace go hello
struct Request {
1: string name
}
struct Response {
1: string message
2: i64 timestamp
}
service HelloService {
// 服务端流式
Response ServerStream(1: Request req) (streaming)
// 客户端流式
Response ClientStream(1: stream Request req)
// 双向流式
stream Response BidiStream(1: stream Request req)
}服务端流式
服务端实现
go
package main
import (
"context"
"time"
"example/kitex_gen/hello"
)
type HelloHandler struct{}
// 服务端流式:发送多个响应
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
for i := 0; i < 5; i++ {
resp := &hello.Response{
Message: "Hello stream message " + string(rune(i+1)),
Timestamp: time.Now().Unix(),
}
if err := stream.Send(resp); err != nil {
return err
}
time.Sleep(time.Second)
}
return nil
}客户端调用
go
package main
import (
"context"
"log"
"example/kitex_gen/hello/helloservice"
)
func main() {
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
)
// 创建流式请求
stream, err := cli.ServerStream(context.Background(), &hello.Request{
Name: "World",
})
if err != nil {
log.Fatal(err)
}
// 接收流式响应
for {
resp, err := stream.Recv()
if err == io.EOF {
log.Println("stream finished")
break
}
if err != nil {
log.Fatal(err)
}
log.Printf("received: %s", resp.Message)
}
}客户端流式
服务端实现
go
// 客户端流式:接收多个请求
func (h *HelloHandler) ClientStream(stream hello.HelloService_ClientStreamServer) error {
var messages []string
// 接收所有请求
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
messages = append(messages, req.Name)
}
// 返回汇总响应
return stream.SendAndClose(&hello.Response{
Message: fmt.Sprintf("Received %d messages", len(messages)),
Timestamp: time.Now().Unix(),
})
}客户端调用
go
// 创建流式客户端
stream, err := cli.ClientStream(context.Background())
if err != nil {
log.Fatal(err)
}
// 发送多个请求
for i := 0; i < 5; i++ {
if err := stream.Send(&hello.Request{
Name: fmt.Sprintf("Message %d", i+1),
}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
// 关闭流并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
log.Printf("response: %s", resp.Message)双向流式
服务端实现
go
// 双向流式:同时收发
func (h *HelloHandler) BidiStream(stream hello.HelloService_BidiStreamServer) error {
for {
// 接收请求
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// 发送响应
resp := &hello.Response{
Message: "Echo: " + req.Name,
Timestamp: time.Now().Unix(),
}
if err := stream.Send(resp); err != nil {
return err
}
}
}客户端调用
go
// 创建双向流
stream, err := cli.BidiStream(context.Background())
if err != nil {
log.Fatal(err)
}
// 启动接收协程
go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("recv error: %v", err)
return
}
log.Printf("received: %s", resp.Message)
}
}()
// 发送请求
for i := 0; i < 5; i++ {
if err := stream.Send(&hello.Request{
Name: fmt.Sprintf("Message %d", i+1),
}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
// 关闭发送端
if err := stream.CloseSend(); err != nil {
log.Fatal(err)
}流式生命周期管理
Context 取消
go
func main() {
ctx, cancel := context.WithCancel(context.Background())
// 创建流式请求
stream, err := cli.ServerStream(ctx, &hello.Request{Name: "World"})
if err != nil {
log.Fatal(err)
}
// 启动接收协程
go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
log.Println("stream finished")
return
}
if err != nil {
log.Printf("recv error: %v", err)
return
}
log.Printf("received: %s", resp.Message)
}
}()
// 用户取消
go func() {
time.Sleep(3 * time.Second)
cancel() // 取消流式请求
}()
}服务端处理取消
go
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
ctx := stream.Context()
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
// 客户端取消
klog.Infof("stream cancelled: %v", ctx.Err())
return ctx.Err()
default:
resp := &hello.Response{
Message: fmt.Sprintf("Message %d", i+1),
Timestamp: time.Now().Unix(),
}
if err := stream.Send(resp); err != nil {
return err
}
time.Sleep(time.Second)
}
}
return nil
}SSE 与 Hertz 集成
go
// Hertz SSE 网关
func main() {
h := server.Default()
h.GET("/stream/:name", func(ctx context.Context, c *app.RequestContext) {
name := c.Param("name")
// 设置 SSE 响应头
c.Response.Header.Set("Content-Type", "text/event-stream")
c.Response.Header.Set("Cache-Control", "no-cache")
c.Response.Header.Set("Connection", "keep-alive")
// 调用 Kitex 流式服务
stream, err := kitexClient.ServerStream(ctx, &hello.Request{Name: name})
if err != nil {
c.String(500, err.Error())
return
}
// 转发流式响应
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
break
}
// 发送 SSE 事件
c.Response.BodyWriter().Write([]byte(
fmt.Sprintf("data: %s\n\n", resp.Message),
))
c.Response.BodyWriter().Flush()
}
})
h.Spin()
}流式错误处理
go
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
ctx := stream.Context()
for i := 0; ; i++ {
select {
case <-ctx.Done():
// 区分错误类型
if errors.Is(ctx.Err(), context.Canceled) {
// 用户主动取消
klog.Infof("client cancelled")
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
// 超时
klog.Infof("deadline exceeded")
}
return ctx.Err()
default:
// 业务处理
if err := processItem(ctx, i, stream); err != nil {
// 流级别错误
if isRecoverableError(err) {
klog.Warnf("recoverable error: %v", err)
continue
}
// 连接级别错误
return err
}
}
}
}流式处理最佳实践
1. 内存管理
go
// 预分配缓冲区
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
// 预分配响应对象,避免每次创建新对象
resp := &hello.Response{}
for i := 0; i < 100; i++ {
// 复用响应对象
resp.Message = fmt.Sprintf("Message %d", i+1)
resp.Timestamp = time.Now().Unix()
if err := stream.Send(resp); err != nil {
return err
}
}
return nil
}2. 流量控制
go
// 实现简单的流量控制
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
ctx := stream.Context()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for i := 0; ; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
resp := &hello.Response{
Message: fmt.Sprintf("Message %d", i+1),
Timestamp: time.Now().Unix(),
}
if err := stream.Send(resp); err != nil {
return err
}
}
}
}3. 批量处理
go
// 客户端批量发送
func batchSend(stream hello.HelloService_ClientStreamServer, messages []string) error {
for _, msg := range messages {
if err := stream.Send(&hello.Request{Name: msg}); err != nil {
return err
}
}
return nil
}大模型应用案例
1. 大模型推理服务
IDL 定义:
thrift
namespace go llm
struct CompletionRequest {
1: string prompt
2: i32 max_tokens
3: float temperature
}
struct CompletionResponse {
1: string text
2: bool finish_reason
}
service LLMService {
// 流式生成
stream CompletionResponse StreamCompletion(1: CompletionRequest req) (streaming)
}服务端实现:
go
func (h *LLMHandler) StreamCompletion(req *llm.CompletionRequest, stream llm.LLMService_StreamCompletionServer) error {
ctx := stream.Context()
// 调用大模型
model := initModel()
// 流式生成
for chunk := range model.StreamGenerate(ctx, req.Prompt, req.MaxTokens, req.Temperature) {
select {
case <-ctx.Done():
return ctx.Err()
default:
resp := &llm.CompletionResponse{
Text: chunk,
FinishReason: false,
}
if err := stream.Send(resp); err != nil {
return err
}
}
}
// 发送结束标记
finalResp := &llm.CompletionResponse{
Text: "",
FinishReason: true,
}
return stream.Send(finalResp)
}客户端调用:
go
func callLLM() {
cli := llmservice.MustNewClient("llm",
client.WithHostPorts("127.0.0.1:8888"),
)
req := &llm.CompletionRequest{
Prompt: "Write a poem about cloud computing",
MaxTokens: 100,
Temperature: 0.7,
}
stream, err := cli.StreamCompletion(context.Background(), req)
if err != nil {
log.Fatal(err)
}
var fullResponse string
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fullResponse += resp.Text
fmt.Print(resp.Text) // 实时输出
if resp.FinishReason {
break
}
}
fmt.Println("\nComplete response:", fullResponse)
}2. SSE 集成大模型
Hertz 网关:
go
func main() {
h := server.Default()
h.POST("/api/chat", func(ctx context.Context, c *app.RequestContext) {
var req llm.CompletionRequest
if err := c.BindAndValidate(&req); err != nil {
c.String(400, err.Error())
return
}
// 设置 SSE 响应头
c.Response.Header.Set("Content-Type", "text/event-stream")
c.Response.Header.Set("Cache-Control", "no-cache")
c.Response.Header.Set("Connection", "keep-alive")
// 调用 Kitex 流式服务
stream, err := llmClient.StreamCompletion(ctx, &req)
if err != nil {
c.String(500, err.Error())
return
}
// 转发流式响应
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
break
}
// 发送 SSE 事件
event := fmt.Sprintf("data: %s\n\n", resp.Text)
if _, err := c.Response.BodyWriter().Write([]byte(event)); err != nil {
break
}
c.Response.BodyWriter().Flush()
if resp.FinishReason {
break
}
}
})
h.Spin()
}性能优化
1. 网络优化
go
// 优化网络配置
cli := helloservice.MustNewClient("hello",
client.WithHostPorts("127.0.0.1:8888"),
client.WithLongConnectionPoolConfig(&pool.LongPoolConfig{
MaxIdlePerAddress: 100,
MaxIdleGlobal: 1000,
KeepAlive: true,
}),
client.WithReadWriteTimeout(30*time.Second),
)2. 并发优化
go
// 并发处理多个流式请求
func processMultipleStreams() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
stream, err := cli.ServerStream(context.Background(), &hello.Request{
Name: fmt.Sprintf("Stream %d", idx),
})
if err != nil {
log.Printf("stream %d error: %v", idx, err)
return
}
// 处理流式响应
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
break
}
log.Printf("stream %d: %s", idx, resp.Message)
}
}(i)
}
wg.Wait()
}3. 内存优化
go
// 内存池优化
var respPool = sync.Pool{
New: func() interface{} {
return &hello.Response{}
},
}
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
for i := 0; i < 100; i++ {
resp := respPool.Get().(*hello.Response)
resp.Message = fmt.Sprintf("Message %d", i+1)
resp.Timestamp = time.Now().Unix()
if err := stream.Send(resp); err != nil {
respPool.Put(resp)
return err
}
respPool.Put(resp)
}
return nil
}常见问题解决
1. 流超时问题
问题:流式处理超时 解决方法:
- 增加超时时间
- 定期发送心跳
- 实现断点续传
go
// 增加超时时间
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// 定期发送心跳
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
ctx := stream.Context()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for i := 0; ; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
// 发送心跳
if err := stream.Send(&hello.Response{
Message: "heartbeat",
Timestamp: time.Now().Unix(),
}); err != nil {
return err
}
default:
// 正常业务处理
}
}
}2. 内存占用过高
问题:流式处理导致内存占用过高 解决方法:
- 使用对象池
- 限制并发流数量
- 批量处理
go
// 限制并发流数量
var semaphore = make(chan struct{}, 100) // 最多100个并发流
func (h *HelloHandler) ServerStream(req *hello.Request, stream hello.HelloService_ServerStreamServer) error {
// 获取信号量
semaphore <- struct{}{}
defer func() { <-semaphore }()
// 处理流
// ...
}3. 网络断开重连
问题:网络断开后流中断 解决方法:
- 实现重连机制
- 断点续传
- 错误重试
go
// 实现重连
func retryStream(ctx context.Context, cli helloservice.Client, req *hello.Request) error {
maxRetries := 3
for i := 0; i < maxRetries; i++ {
stream, err := cli.ServerStream(ctx, req)
if err != nil {
log.Printf("retry %d: %v", i+1, err)
time.Sleep(time.Duration(i*100) * time.Millisecond)
continue
}
// 处理流
if err := processStream(stream); err == nil {
return nil
}
}
return fmt.Errorf("failed after %d retries", maxRetries)
}小结
本章介绍了 Kitex 流式处理的完整内容:
流式类型:
- 服务端流式:客户端发送一次请求,服务端返回多个响应
- 客户端流式:客户端发送多个请求,服务端返回一次响应
- 双向流式:客户端和服务端同时收发数据
IDL 定义:
- 使用
streaming关键字定义流式接口 - 支持不同类型的流式模式
- 使用
实现方式:
- 服务端:实现流处理接口,使用
Send、Recv方法 - 客户端:创建流,收发数据,关闭流
- Context 管理:处理取消和超时
- 服务端:实现流处理接口,使用
最佳实践:
- 内存管理:预分配缓冲区,复用对象
- 流量控制:使用定时器控制发送速率
- 批量处理:批量发送和接收数据
大模型应用案例:
- 大模型推理服务:流式生成文本
- SSE 集成:通过 Hertz 提供 SSE 接口
- 实时交互:类似 ChatGPT 的对话体验
性能优化:
- 网络优化:长连接池、适当的超时设置
- 并发优化:并发处理多个流
- 内存优化:使用对象池减少内存分配
常见问题解决:
- 流超时:增加超时时间、发送心跳
- 内存占用:使用对象池、限制并发
- 网络断开:实现重连机制、断点续传
SSE 集成:
- 与 Hertz 框架配合实现 SSE 响应
- 支持浏览器端的流式接收
通过本章的学习,你应该掌握了 Kitex 流式处理的核心概念和实践技巧,特别是在大模型应用、实时数据传输等场景中的应用。流式处理是 CloudWeGo 生态中的重要特性,为构建高性能、实时的微服务提供了有力支持。
在下一章中,我们将学习 Hertz HTTP 框架,了解如何构建高性能的 HTTP 服务。