Skip to content

Latest commit

 

History

History
256 lines (180 loc) · 8.22 KB

27b92a32ea3922079b13f63faf5e46d3.md

File metadata and controls

256 lines (180 loc) · 8.22 KB

sync包的实际应用

这里我们介绍sync包中的同步原语在应用层的使用方法。

sync.Mutex

sync.Mutex可能是sync包中使用最广泛的原语,它允许在共享资源上互斥访问(不能同时访问)

mutex := &sync.Mutex{}

mutex.Lock()
//Update共享变量(比如切片,结构体指针等)
mutex.Unlock()

必须指出的是,在第一次被使用后,不能再对sync.Mutex进行复制。(sync包的所有原语都一样)。如果结构体具有同步原语字段,则必须通过指针传递它。

sync.RWMutex

sync.RWMutex是一个读写互斥锁,它提供了我们上面的刚刚看到的sync.Mutex的Lock和Unlock方法(因为这两个结构都实现了sync.Locker接口)。但是,它还允许使用RLock和RUnlock方法进行并发读取:

mutex := &sync.RWMutex{}

mutex.Lock()
//Update 共享变量
mutex.Unlock()

mutex.RLock()
//Read 共享变量
mutex.RUnlock()

sync.RWMutex允许至少一个读锁或一个写锁存在,而sync.Mutex允许一个读锁或一个写锁存在。

通过基准测试来比较这几个方法的性能:

BenchmarkMutexLock-4       83497579         17.7 ns/op
BenchmarkRWMutexLock-4     35286374         44.3 ns/op
BenchmarkRWMutexRLock-4    89403342         15.3 ns/op

可以看到锁定/解锁sync.RWMutex读锁的速度比锁定/解锁sync.Mutex更快,另一方面,在sync.RWMutex上调用Lock()/ Unlock()是最慢的操作。

因此,只有在频繁读取和不频繁写入的场景里,才应该使用sync.RWMutex。

sync.WaitGroup

sync.WaitGroup也是一个经常会用到的同步原语,它的使用场景是在一个goroutine中等待一组goroutine执行完成。

sync.WaitGroup拥有一个内部计数器。当计数器等于0时,则Wait()方法回立即返回。否则他将阻塞执行Wait()方法的goroutine直到计数器等于0为止。

要增加计数器,我们必须使用Add(int)方法。要减少它,我们可以使用Done()(将计数器减1),也可以传递负数给Add方法把计数器减少指定大小,Done()方法底层就是通过Add(-1)实现的。

在以下的示例中,启动了八个goroutine,并等待他们完成

wg := &sync.WaitGroup{}

for i := 0; i < 8; i++ {
  wg.Add(1)
  go func() {
    // Do something
    wg.Done()
  }()
}

wg.Wait()
// 继续往下执行...

每次创建goroutine时,我们都会使用wg.Add(1)来增加wg的内部计数器。我们也可以在for循环之前调用wg.Add(8)。

与此同时,每个goroutine完成时,都会使用wg.Done()减少wg的内部计数器。

main goroutine会在八个goroutine都执行wg.Done()将计数器变为0后才能继续往下执行。

sync.Once

sync.Once是一个简单而强大的原语,可以确保一个函数仅执行一次。在下面的示例中,只有一个goroutine会显示输出消息:

once := &sync.Once{}
for i := 0; i < 4; i++ {
    i := i
    go func() {
        once.Do(func() {
            fmt.Printf("first %d\n", i)
        })
    }()
}

我们使用了Do(func ())方法来指定只能被调用一次的部分。

sync.Cond

sync.Cond可能时sync包提供的同步原语中最不常用的一个,它用于发出信号(一对一)或广播信号(一对多)到goroutine。让我们考虑一个场景,我们必须向一个goroutine指示共享切片的第一个元素已更新。创建sync.Cond需要sync.Locker对象(sync.Mutex或sync.RWMutex):

    cond := sync.NewCond(&sync.Mutex{})

然后,我们编写负责显示切片的第一个元素的函数:

func printFirstElement(s []int, cond *sync.Cond) {
    cond.L.Lock()
    cond.Wait()
    fmt.Printf("%d\n", s[0])
    cond.L.Unlock()
}

我们可以使用cond.L访问内部的互斥锁。一旦获得了锁,我们将调用cond.Wait(),这会让当前goroutine在收到信号前一直处于阻塞状态。

让我们回到main goroutine。我们通过传递共享切片和先前创建的sync.Cond来创建printFirstElement池。然后我们调用get()函数,将结果存储在s[0]中并发出信号:

s := make([]int, 1)
for i := 0; i < runtime.NumCPU(); i++ {
    go printFirstElement(s, cond)
}   

i := get()
cond.L.Lock()
s[0] = i
cond.Signal()
cond.L.Unlock()

这个信号会解除一个goroutine的阻塞状态,解除阻塞的goroutine将会显示s[0]中存储的值。

但是,有的人可能会争辩说我们的代码破坏了Go的最基本原则之一:

不要通过共享内存进行通信;而是通过通信共享内存。

确实,在这个示例中,最好使用channel来传递get()返回的值。但是我们也提到了sync.Cond也可以用于广播信号。我们修改一下上面的示例,把Signal()调用改为调用Broadcast()。

i := get()
cond.L.Lock()
s[0] = i
cond.Broadcast()
cond.L.Unlock()

在这种情况下,所有goroutine都将被触发。 众所周知,channel里的元素只会由一个goroutine接收到。通过channel模拟广播的唯一方法是关闭channel。

当一个channel被关闭后,channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。

但是这种方式只能广播一次。因此,尽管存在很大争议,但这无疑是sync.Cond的一个有趣的功能。

Sync.Map

sync.Map是一个并发版本的Go语言的map,我们可以:

  • 使用Store(interface{}, interface{}) 添加元素

  • 使用Load(interface{}) interface{} 检索元素

  • 使用Delete(interface{}) 删除元素

  • 使用LoadOrStore(interface{}, interface{}) (interface{}, bool) 检索或添加之前不存在的元素。如果键在map中存在,则返回的bool值为true

  • 使用Range遍历元素

m := &sync.Map{}

// 添加元素
m.Store(1, "one")
m.Store(2, "two")

// 获取元素1
value, contains := m.Load(1)
if contains {
  fmt.Printf("%s\n", value.(string))
}

// 返回已存value,否则把指定的键值存储到map中
value, loaded := m.LoadOrStore(3, "three")
if !loaded {
  fmt.Printf("%s\n", value.(string))
}

m.Delete(3)

// 迭代所有元素
m.Range(func(key, value interface{}) bool {
  fmt.Printf("%d: %s\n", key.(int), value.(string))
  return true
})

//输出
one
three
1: one
2: two

如你所见,Range方法接收一个类型为func(key,value interface {})bool的函数参数。如果函数返回了false,则停止迭代。有趣的事实是,即使我们在恒定时间后返回false,最坏情况下的时间复杂度仍为O(n)。

我们应该在什么时候使用sync.Map而不是在普通的map上使用sync.Mutex?

  • 当我们对map有频繁的读取和不频繁的写入时。

  • 当多个goroutine读取,写入和覆盖不相交的键时。具体是什么意思呢?例如,如果我们有一个分片实现,其中包含一组4个goroutine,每个goroutine负责25%的键(每个负责的键不冲突)。在这种情况下,sync.Map是首选。

sync.Pool

sync.Pool是一个并发池,负责安全的保存一组对象,它有两个导出方法:

  • Get() interface{} 用来从并发池中取出元素

  • Put(interface{}) 将一个对象加入并发池

pool := &sync.Pool{}

pool.Put(NewConnection(1))
pool.Put(NewConnection(2))
pool.Put(NewConnection(3))

connection := pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)

//输出
1
3
2

需要注意的是Get()方法会从并发池中随机取出对象,无法保证以固定的顺序获取并发池中存储的对象。

还可以为sync.Pool指定一个创建者方法:

pool := &sync.Pool{
  New: func() interface{} {
    return NewConnection()
  },
}

connection := pool.Get().(*Connection)

这样每次调用Get()时,当池中不存在对象时,将使用pool.New中指定的函数创建一个对象(在本例中为指针)。

那么什么时候使用sync.Pool?有两个用例:

由于临时对象池会定期被GC回收,所以使用上不建议用于数据库连接池的实现,因为GC触发后达不到连接复用节约开销的效果。

  • 第一个是当我们必须重用共享的和长期存在的对象时,比如gin中的context对象池。

  • 第二个是用于优化内存分配,比如go内置的fmt包。