Skip to content

并发编程

Go语言的并发模型基于CSP(通信顺序进程),使用goroutine和channel实现。

Goroutine

基本使用

Goroutine是轻量级线程,由Go运行时管理:

go
package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 3; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s, ":", i)
    }
}

func main() {
    // 普通函数调用(同步)
    // say("同步")
    
    // 启动goroutine(异步)
    go say("异步1")
    go say("异步2")
    
    // 等待一段时间让goroutine执行完
    time.Sleep(time.Second)
    fmt.Println("主程序结束")
}

匿名函数goroutine

go
func main() {
    // 启动匿名函数goroutine
    go func() {
        fmt.Println("匿名goroutine")
    }()
    
    // 带参数的匿名goroutine
    go func(msg string) {
        fmt.Println(msg)
    }("带参数")
    
    time.Sleep(time.Second)
}

Goroutine特性

  • 初始栈大小很小(2KB),可动态增长
  • 由Go运行时调度,非操作系统线程
  • 创建成本极低,可以创建成千上万个

Channel

Channel是goroutine之间的通信管道。

创建Channel

go
// 无缓冲channel
ch := make(chan int)

// 有缓冲channel
ch := make(chan int, 10)

基本操作

go
func main() {
    ch := make(chan string)
    
    // 发送数据
    go func() {
        ch <- "hello"  // 发送
    }()
    
    // 接收数据
    msg := <-ch  // 接收
    fmt.Println(msg)
}

Channel示例

go
package main

import "fmt"

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum  // 发送sum到channel c
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}
    
    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    
    x := <-c  // 从channel c接收
    y := <-c  // 从channel c接收
    fmt.Println(x, y, x+y)
}

缓冲Channel

go
func main() {
    // 创建缓冲大小为2的channel
    ch := make(chan int, 2)
    
    // 发送数据(不会阻塞,直到缓冲满)
    ch <- 1
    ch <- 2
    // ch <- 3  // 这会阻塞,因为缓冲已满
    
    // 接收数据
    fmt.Println(<-ch)  // 1
    fmt.Println(<-ch)  // 2
}

关闭Channel

go
func main() {
    ch := make(chan int, 5)
    
    // 发送数据
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i
        }
        close(ch)  // 关闭channel
    }()
    
    // 接收数据直到channel关闭
    for num := range ch {
        fmt.Println(num)
    }
    
    // 检查channel是否关闭
    num, ok := <-ch
    fmt.Println(num, ok)  // 0 false
}

Select

Select用于处理多个channel操作:

go
package main

import (
    "fmt"
    "time"
)

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("退出")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    
    fibonacci(c, quit)
}

超时处理

go
func main() {
    ch := make(chan int, 1)
    
    select {
    case res := <-ch:
        fmt.Println("收到结果:", res)
    case <-time.After(time.Second):
        fmt.Println("超时")
    }
}

非阻塞操作

go
func main() {
    ch := make(chan int, 1)
    ch <- 1
    
    select {
    case msg := <-ch:
        fmt.Println("收到:", msg)
    default:
        fmt.Println("没有数据")
    }
}

sync包

WaitGroup

等待一组goroutine完成:

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()  // 完成时通知
	
    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)  // 增加计数
        go worker(i, &wg)
    }
    
    wg.Wait()  // 等待所有goroutine完成
    fmt.Println("所有工作完成")
}

Mutex

互斥锁,保护共享资源:

go
package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &Counter{}
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    wg.Wait()
    fmt.Println("计数器:", counter.Value())  // 1000
}

RWMutex

读写锁,允许多读单写:

go
type SafeMap struct {
    mu sync.RWMutex
    m  map[string]int
}

func (sm *SafeMap) Get(key string) (int, bool) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    val, ok := sm.m[key]
    return val, ok
}

func (sm *SafeMap) Set(key string, value int) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    sm.m[key] = value
}

Once

确保操作只执行一次:

go
var once sync.Once

func initialize() {
    fmt.Println("初始化")
}

func main() {
    for i := 0; i < 5; i++ {
        once.Do(initialize)  // 只会执行一次
    }
}

并发模式

Worker Pool

go
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d 处理任务 %d\n", id, j)
        results <- j * 2
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3
    
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    
    // 启动workers
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, results)
    }
    
    // 发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 收集结果
    for r := 1; r <= numJobs; r++ {
        fmt.Println("结果:", <-results)
    }
}

Pipeline

go
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // 构建pipeline
    c := generate(1, 2, 3, 4, 5)
    out := square(c)
    
    // 消费结果
    for result := range out {
        fmt.Println(result)
    }
}

Context

用于控制goroutine的生命周期:

go
func main() {
    // 创建带超时的context
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    go func() {
        time.Sleep(3 * time.Second)
        fmt.Println("工作完成")
    }()
    
    select {
    case <-ctx.Done():
        fmt.Println("超时:", ctx.Err())
    }
}