并发编程
Go语言的并发模型基于CSP(通信顺序进程),使用goroutine和channel实现。
Goroutine
基本使用
Goroutine是轻量级线程,由Go运行时管理:
go
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s, ":", i)
}
}
func main() {
// 普通函数调用(同步)
// say("同步")
// 启动goroutine(异步)
go say("异步1")
go say("异步2")
// 等待一段时间让goroutine执行完
time.Sleep(time.Second)
fmt.Println("主程序结束")
}匿名函数goroutine
go
func main() {
// 启动匿名函数goroutine
go func() {
fmt.Println("匿名goroutine")
}()
// 带参数的匿名goroutine
go func(msg string) {
fmt.Println(msg)
}("带参数")
time.Sleep(time.Second)
}Goroutine特性
- 初始栈大小很小(2KB),可动态增长
- 由Go运行时调度,非操作系统线程
- 创建成本极低,可以创建成千上万个
Channel
Channel是goroutine之间的通信管道。
创建Channel
go
// 无缓冲channel
ch := make(chan int)
// 有缓冲channel
ch := make(chan int, 10)基本操作
go
func main() {
ch := make(chan string)
// 发送数据
go func() {
ch <- "hello" // 发送
}()
// 接收数据
msg := <-ch // 接收
fmt.Println(msg)
}Channel示例
go
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // 发送sum到channel c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x := <-c // 从channel c接收
y := <-c // 从channel c接收
fmt.Println(x, y, x+y)
}缓冲Channel
go
func main() {
// 创建缓冲大小为2的channel
ch := make(chan int, 2)
// 发送数据(不会阻塞,直到缓冲满)
ch <- 1
ch <- 2
// ch <- 3 // 这会阻塞,因为缓冲已满
// 接收数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
}关闭Channel
go
func main() {
ch := make(chan int, 5)
// 发送数据
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 关闭channel
}()
// 接收数据直到channel关闭
for num := range ch {
fmt.Println(num)
}
// 检查channel是否关闭
num, ok := <-ch
fmt.Println(num, ok) // 0 false
}Select
Select用于处理多个channel操作:
go
package main
import (
"fmt"
"time"
)
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("退出")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}超时处理
go
func main() {
ch := make(chan int, 1)
select {
case res := <-ch:
fmt.Println("收到结果:", res)
case <-time.After(time.Second):
fmt.Println("超时")
}
}非阻塞操作
go
func main() {
ch := make(chan int, 1)
ch <- 1
select {
case msg := <-ch:
fmt.Println("收到:", msg)
default:
fmt.Println("没有数据")
}
}sync包
WaitGroup
等待一组goroutine完成:
go
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 完成时通知
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // 增加计数
go worker(i, &wg)
}
wg.Wait() // 等待所有goroutine完成
fmt.Println("所有工作完成")
}Mutex
互斥锁,保护共享资源:
go
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &Counter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("计数器:", counter.Value()) // 1000
}RWMutex
读写锁,允许多读单写:
go
type SafeMap struct {
mu sync.RWMutex
m map[string]int
}
func (sm *SafeMap) Get(key string) (int, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
val, ok := sm.m[key]
return val, ok
}
func (sm *SafeMap) Set(key string, value int) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.m[key] = value
}Once
确保操作只执行一次:
go
var once sync.Once
func initialize() {
fmt.Println("初始化")
}
func main() {
for i := 0; i < 5; i++ {
once.Do(initialize) // 只会执行一次
}
}并发模式
Worker Pool
go
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, j)
results <- j * 2
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动workers
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for r := 1; r <= numJobs; r++ {
fmt.Println("结果:", <-results)
}
}Pipeline
go
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 构建pipeline
c := generate(1, 2, 3, 4, 5)
out := square(c)
// 消费结果
for result := range out {
fmt.Println(result)
}
}Context
用于控制goroutine的生命周期:
go
func main() {
// 创建带超时的context
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go func() {
time.Sleep(3 * time.Second)
fmt.Println("工作完成")
}()
select {
case <-ctx.Done():
fmt.Println("超时:", ctx.Err())
}
}