Go语言同步原语Mutex、WaitGroup、Once深度解析

张开发
2026/5/1 6:20:45 15 分钟阅读

分享文章

Go语言同步原语Mutex、WaitGroup、Once深度解析
前言Go语言的sync包提供了丰富的同步原语用于Goroutine之间的协调和数据保护。本文详细介绍Mutex、RWMutex、WaitGroup、Once、Cond、Pool等同步原语的原理和使用场景帮助读者掌握Go并发编程的核心工具。一、Mutex互斥锁1.1 Mutex的基本使用import sync ​ type Counter struct { mu sync.Mutex count int } ​ func (c *Counter) Inc() { c.mu.Lock() defer c.mu.Unlock() c.count } ​ func (c *Counter) Get() int { c.mu.Lock() defer c.mu.Unlock() return c.count } ​ func main() { var c Counter var wg sync.WaitGroup for i : 0; i 1000; i { wg.Add(1) go func() { defer wg.Done() c.Inc() }() } wg.Wait() fmt.Printf(计数结果: %d\n, c.Get()) // 应该是1000 }1.2 Mutex的两种模式Go的Mutex有两种模式正常模式非公平锁等待者按FIFO顺序但刚解锁的Goroutine可能有优势饥饿模式为保证公平等待超过1ms的Goroutine优先获取锁type Mutex struct { state int32 // 锁状态 sema uint32 // 信号量 } ​ const ( mutexLocked 1 iota // 1: 锁已被持有 mutexWoken // 2: 有等待者被唤醒 mutexStarving // 4: 处于饥饿模式 mutexWaiterShift iota // 等待者数量偏移 )1.3 错误示例与修正// 错误锁粒度太大 type BadCounter struct { mu sync.Mutex count int } ​ func (c *BadCounter) Inc() { c.mu.Lock() // 模拟一些耗时操作 time.Sleep(1 * time.Millisecond) c.count c.mu.Unlock() } ​ // 修正减小锁粒度 type GoodCounter struct { mu sync.Mutex count int } ​ func (c *GoodCounter) Inc() { c.mu.Lock() c.count // 只在必要时持有锁 c.mu.Unlock() // 耗时操作放在锁外 time.Sleep(1 * time.Millisecond) }二、RWMutex读写锁2.1 RWMutex vs Muteximport sync ​ type Cache struct { mu sync.RWMutex data map[string]string } ​ func (c *Cache) Get(key string) string { // 读操作使用RLock c.mu.RLock() defer c.mu.RUnlock() return c.data[key] } ​ func (c *Cache) Set(key, value string) { // 写操作使用Lock c.mu.Lock() defer c.mu.Unlock() c.data[key] value } ​ func (c *Cache) GetOrSet(key, defaultValue string) string { // 先尝试读锁 c.mu.RLock() if v, ok : c.data[key]; ok { c.mu.RUnlock() return v } c.mu.RUnlock() // 读不到升级为写锁 c.mu.Lock() defer c.mu.Unlock() // 再次检查可能有其他Goroutine已经写入 if v, ok : c.data[key]; ok { return v } c.data[key] defaultValue return defaultValue }2.2 读写锁的性能优势import ( sync testing ) ​ type Data struct { mu sync.Mutex m map[string]int } ​ func (d *Data) MutexGet(key string) int { d.mu.Lock() defer d.mu.Unlock() return d.m[key] } ​ func (d *Data) MutexSet(key string, val int) { d.mu.Lock() defer d.mu.Unlock() d.m[key] val } ​ type RWData struct { mu sync.RWMutex m map[string]int } ​ func (d *RWData) RWGet(key string) int { d.mu.RLock() defer d.mu.RUnlock() return d.m[key] } ​ func (d *RWData) RWSet(key string, val int) { d.mu.Lock() defer d.mu.Unlock() d.m[key] val } ​ // 读多写少场景RWMutex性能更好 // 写多读少场景普通Mutex更简单三、WaitGroup3.1 WaitGroup的基本使用import sync ​ func main() { var wg sync.WaitGroup // 添加待等待的Goroutine数量 wg.Add(3) go func() { defer wg.Done() // 完成后通知 fmt.Println(任务1完成) }() go func() { defer wg.Done() fmt.Println(任务2完成) }() go func() { defer wg.Done() fmt.Println(任务3完成) }() // 等待所有Goroutine完成 wg.Wait() fmt.Println(所有任务完成) }3.2 WaitGroup的错误用法// 错误1在Goroutine内部调用Add func wrong1() { var wg sync.WaitGroup for i : 0; i 3; i { go func() { wg.Add(1) // 错误Add应该在启动Goroutine之前 defer wg.Done() // 工作... }() } wg.Wait() } ​ // 错误2Add和Wait的时序问题 func wrong2() { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() // 工作... }() wg.Wait() // 如果Goroutine还没启动就Wait会永久阻塞 } ​ // 正确先Add再启动Goroutine func correct() { var wg sync.WaitGroup for i : 0; i 3; i { wg.Add(1) // 先Add go func() { defer wg.Done() // 工作... }() } wg.Wait() }3.3 WaitGroup实践func downloadFiles(urls []string) error { var wg sync.WaitGroup var errMu sync.Mutex var errs []error for _, url : range urls { wg.Add(1) go func(u string) { defer wg.Done() if err : download(u); err ! nil { errMu.Lock() errs append(errs, err) errMu.Unlock() } }(url) } wg.Wait() if len(errs) 0 { return fmt.Errorf(下载失败: %v, errs) } return nil } ​ func download(url string) error { // 模拟下载 return nil }四、Once4.1 Once保证只执行一次import sync ​ func main() { var once sync.Once for i : 0; i 5; i { go func() { once.Do(func() { fmt.Println(这段代码只执行一次) }) }() } time.Sleep(time.Second) }输出这段代码只执行一次4.2 单例模式实现import sync ​ type Singleton struct { data string } ​ var ( instance *Singleton once sync.Once ) ​ func GetInstance() *Singleton { once.Do(func() { instance Singleton{data: 单例数据} fmt.Println(单例初始化) }) return instance } ​ func main() { // 多次调用只会初始化一次 for i : 0; i 3; i { go func() { s : GetInstance() fmt.Printf(实例地址: %p\n, s) }() } time.Sleep(time.Second) }4.3 Once的变体初始化检查type Manager struct { mu sync.Mutex once sync.Once inited bool } ​ func (m *Manager) Init() { m.mu.Lock() defer m.mu.Unlock() if !m.inited { m.once.Do(func() { fmt.Println(执行初始化) m.inited true }) } }五、Cond条件变量5.1 Cond的使用场景import sync ​ func main() { var mu sync.Mutex cond : sync.NewCond(mu) queue : make([]int, 0) // 消费者等待数据 consumer : func() { mu.Lock() for len(queue) 0 { cond.Wait() // 等待时会释放锁 } fmt.Printf(消费: %d\n, queue[0]) queue queue[1:] mu.Unlock() } // 生产者添加数据 producer : func(v int) { mu.Lock() queue append(queue, v) cond.Signal() // 通知一个等待者 mu.Unlock() } // 启动消费者 go consumer() // 生产数据 time.Sleep(100 * time.Millisecond) producer(42) time.Sleep(time.Second) }5.2 Broadcast广播func main() { var mu sync.Mutex cond : sync.NewCond(mu) var ready bool // 等待所有Goroutine就绪 for i : 0; i 3; i { go func(id int) { mu.Lock() for !ready { cond.Wait() } mu.Unlock() fmt.Printf(Goroutine %d 开始执行\n, id) }(i) } // 广播通知所有等待者 mu.Lock() ready true cond.Broadcast() // 通知所有等待者 mu.Unlock() time.Sleep(time.Second) }六、Pool对象池6.1 sync.Pool用途sync.Pool用于缓存临时对象减少GC压力import sync ​ var pool sync.Pool{ New: func() interface{} { fmt.Println(创建新对象) return bytes.Buffer{} }, } ​ func main() { // 获取对象 buf : pool.Get().(*bytes.Buffer) buf.Reset() // 重置后使用 buf.WriteString(hello) fmt.Println(buf.String()) // 归还对象 pool.Put(buf) // 再次获取可能复用之前的 buf2 : pool.Get().(*bytes.Buffer) fmt.Printf(获取到: %p\n, buf2) }6.2 Pool的注意事项// 1. Get返回的对象不能假设任何初始状态 buf : pool.Get().(*bytes.Buffer) buf.Reset() // 必须重置 ​ // 2. Pool中的对象可能被GC回收 // 3. 适合存储无状态的临时对象 // 4. 不适合用于连接池用sql.DB或redis.Pool6.3 典型应用fmt.Printf的缓冲区池// fmt包内部使用sync.Pool来复用临时缓冲区 // 这是sync.Pool的典型应用场景 ​ // 自定义使用Pool type BufferPool struct { pool sync.Pool } ​ func NewBufferPool() *BufferPool { return BufferPool{ pool: sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, }, } } ​ func (p *BufferPool) Get() *bytes.Buffer { return p.pool.Get().(*bytes.Buffer) } ​ func (p *BufferBox) Put(buf *bytes.Buffer) { buf.Reset() p.pool.Put(buf) }七、综合实践7.1 线程安全的配置管理器import sync ​ type Config struct { mu sync.RWMutex data map[string]string } ​ func NewConfig() *Config { return Config{ data: make(map[string]string), } } ​ func (c *Config) Set(key, value string) { c.mu.Lock() defer c.mu.Unlock() c.data[key] value } ​ func (c *Config) Get(key string) string { c.mu.RLock() defer c.mu.RUnlock() return c.data[key] } ​ func (c *Config) Update(fn func(map[string]string)) { c.mu.Lock() defer c.mu.Unlock() fn(c.data) }7.2 并发安全的计数器import sync ​ type ConcurrentCounter struct { mu sync.Mutex count int } ​ // 方式1普通Mutex func (c *ConcurrentCounter) Inc() { c.mu.Lock() c.count c.mu.Unlock() } ​ // 方式2使用atomic更高效 import sync/atomic ​ type AtomicCounter struct { count int64 } ​ func (c *AtomicCounter) Inc() { atomic.AddInt64(c.count, 1) } ​ func (c *AtomicCounter) Get() int64 { return atomic.LoadInt64(c.count) } ​ // 方式3分段锁高并发场景 type ShardedCounter struct { shards []int64 numShards int mu []sync.Mutex } ​ func NewShardedCounter(numShards int) *ShardedCounter { c : ShardedCounter{ shards: make([]int64, numShards), numShards: numShards, mu: make([]sync.Mutex, numShards), } return c } ​ func (c *ShardedCounter) Inc() { idx : rand.Intn(c.numShards) c.mu[idx].Lock() c.shards[idx] c.mu[idx].Unlock() } ​ func (c *ShardedCounter) Get() int64 { var total int64 for i : 0; i c.numShards; i { atomic.AddInt64(c.shards[i], 0) // 读取 } return total }总结Mutex互斥锁保护临界区RWMutex读写锁读多写少场景WaitGroup等待一组Goroutine完成Once保证只执行一次Cond条件变量线程间等待/通知Pool对象池复用临时对象最佳实践锁的粒度要适当既不能太大也不能太小读多写少用RWMutexWaitGroup的Add在启动前Done在Goroutine内Once用于单例初始化sync.Pool用于无状态的临时对象 后续会继续更新更多Go语言知识点的系列文章

更多文章