服务器之家

服务器之家 > 正文

Go:有了 Sync 为什么还有 Atomic?

时间:2021-12-06 23:43     来源/作者:幽鬼

Go:有了 Sync 为什么还有 Atomic?

Go 是一种擅长并发的语言,启动新的 goroutine 就像输入 “go” 一样简单。随着你发现自己构建的系统越来越复杂,正确保护对共享资源的访问以防止竞争条件变得极其重要。此类资源可能包括可即时更新的配置(例如功能标志)、内部状态(例如断路器状态)等。

01 什么是竞态条件?

对于大多数读者来说,这可能是基础知识,但由于本文的其余部分取决于对竞态条件的理解,因此有必要进行简短的复习。竞态条件是一种情况,在这种情况下,程序的行为取决于其他不可控事件的顺序或时间。在大多数情况下,这种情况是一个错误,因为可能会发生不希望的结果。

举个具体的例子或许更容易理解:

  1. // race_condition_test.go
  2. package main
  3. import (
  4. "fmt"
  5. "sort"
  6. "sync"
  7. "testing"
  8. )
  9. func Test_RaceCondition(t *testing.T) {
  10. var s = make([]int, 0)
  11. wg := sync.WaitGroup{}
  12. // spawn 10 goroutines to modify the slice in parallel
  13. for i := 0; i < 10; i++ {
  14. wg.Add(1)
  15. go func(i int) {
  16. defer wg.Done()
  17. s = append(s, i) //add a new item to the slice
  18. }(i)
  19. }
  20. wg.Wait()
  21. sort.Ints(s) //sort the response to have comparable results
  22. fmt.Println(s)
  23. }

执行一:

  1. $ go test -v race_condition_test.go
  2. === RUN Test_RaceCondition
  3. [0 1 2 3 4 5 6 7 8 9]
  4. --- PASS: Test_RaceCondition (0.00s)

这里看起来一切都很好。这是我们预期的输出。该程序迭代了 10 次,并在每次迭代时将索引添加到切片中。

执行二:

  1. === RUN Test_RaceCondition
  2. [0 3]
  3. --- PASS: Test_RaceCondition (0.00s)

等等,这里发生了什么?这次我们的响应切片中只有两个元素。这是因为切片的内容 s 在加载和修改之间发生了变化,导致程序覆盖了一些结果。这种特殊的竞态条件是由数据竞争引起的,在这种情况下,多个 goroutine 尝试同时访问特定的共享变量,并且这些 goroutine 中的至少一个尝试修改它。(注意,以上结果并非一定如此,每次运行结果可能都不相同)

如果你使用 -race 标志执行测试,go 甚至会告诉你存在数据竞争并帮助你准确定位:

  1. $ go test race_condition_test.go -race
  2. ==================
  3. WARNING: DATA RACE
  4. Read at 0x00c000132048 by goroutine 9:
  5. command-line-arguments.Test_RaceCondition.func1()
  6. /home/sfinlay/go/src/benchmarks/race_condition_test.go:20 +0xb4
  7. command-line-arguments.Test_RaceCondition·dwrap·1()
  8. /home/sfinlay/go/src/benchmarks/race_condition_test.go:21 +0x47
  9. Previous write at 0x00c000132048 by goroutine 8:
  10. command-line-arguments.Test_RaceCondition.func1()
  11. /home/sfinlay/go/src/benchmarks/race_condition_test.go:20 +0x136
  12. command-line-arguments.Test_RaceCondition·dwrap·1()
  13. /home/sfinlay/go/src/benchmarks/race_condition_test.go:21 +0x47
  14. Goroutine 9 (running) created at:
  15. command-line-arguments.Test_RaceCondition()
  16. /home/sfinlay/go/src/benchmarks/race_condition_test.go:18 +0xc5
  17. testing.tRunner()
  18. /usr/local/go/src/testing/testing.go:1259 +0x22f
  19. testing.(*T).Run·dwrap·21()
  20. /usr/local/go/src/testing/testing.go:1306 +0x47
  21. Goroutine 8 (finished) created at:
  22. command-line-arguments.Test_RaceCondition()
  23. /home/sfinlay/go/src/benchmarks/race_condition_test.go:18 +0xc5
  24. testing.tRunner()
  25. /usr/local/go/src/testing/testing.go:1259 +0x22f
  26. testing.(*T).Run·dwrap·21()
  27. /usr/local/go/src/testing/testing.go:1306 +0x47
  28. ==================

02 并发控制

保护对这些共享资源的访问通常涉及常见的内存同步机制,例如通道或互斥锁。

这是将竞态条件调整为使用互斥锁的相同测试用例:

  1. func Test_NoRaceCondition(t *testing.T) {
  2. var s = make([]int, 0)
  3. m := sync.Mutex{}
  4. wg := sync.WaitGroup{}
  5. // spawn 10 goroutines to modify the slice in parallel
  6. for i := 0; i < 10; i++ {
  7. wg.Add(1)
  8. go func(i int) {
  9. m.Lock()
  10. defer wg.Done()
  11. defer m.Unlock()
  12. s = append(s, i)
  13. }(i)
  14. }
  15. wg.Wait()
  16. sort.Ints(s) //sort the response to have comparable results
  17. fmt.Println(s)
  18. }

这次它始终返回所有 10 个整数,因为它确保每个 goroutine 仅在没有其他人执行时才读写切片。如果第二个 goroutine 同时尝试获取锁,它必须等到前一个 goroutine 完成(即直到它解锁)。

然而,对于高吞吐量系统,性能变得非常重要,因此减少锁争用(即一个进程或线程试图获取另一个进程或线程持有的锁的情况)变得更加重要。执行此操作的最基本方法之一是使用读写锁 ( sync.RWMutex) 而不是标准 sync.Mutex,但是 Go 还提供了一些原子内存原语即 atomic 包。

03 原子

Go 的 atomic 包提供了用于实现同步算法的低级原子内存原语。这听起来像是我们需要的东西,所以让我们尝试用 atomic 重写该测试:

  1. import "sync/atomic"
  2. func Test_RaceCondition_Atomic(t *testing.T) {
  3. var s = atomic.Value{}
  4. s.Store([]int{}) // store empty slice as the base
  5. wg := sync.WaitGroup{}
  6. // spawn 10 goroutines to modify the slice in parallel
  7. for i := 0; i < 10; i++ {
  8. wg.Add(1)
  9. go func(i int) {
  10. defer wg.Done()
  11. s1 := s.Load().([]int)
  12. s.Store(append(s1, i)) //replace the slice with a new one containing the new item
  13. }(i)
  14. }
  15. wg.Wait()
  16. s1 := s.Load().([]int)
  17. sort.Ints(s1) //sort the response to have comparable results
  18. fmt.Println(s1)
  19. }

执行结果:

  1. === RUN Test_RaceCondition_Atomic
  2. [1 3]
  3. --- PASS: Test_RaceCondition_Atomic (0.00s)

什么?这和我们之前遇到的问题完全一样,那么这个包有什么好处呢?

04 读取-复制-更新

atomic 不是灵丹妙药,它显然不能替代互斥锁,但是当涉及到可以使用读取-复制-更新[1]模式管理的共享资源时,它非常出色。在这种技术中,我们通过引用获取当前值,当我们想要更新它时,我们不修改原始值,而是替换指针(因此没有人访问另一个线程可能访问的相同资源)。前面的示例无法使用此模式实现,因为它应该随着时间的推移扩展现有资源而不是完全替换其内容,但在许多情况下,读取-复制-更新是完美的。

这是一个基本示例,我们可以在其中获取和存储布尔值(例如,对于功能标志很有用)。在这个例子中,我们正在执行一个并行基准测试,比较原子和读写互斥:

  1. package main
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. )
  7. type AtomicValue struct{
  8. value atomic.Value
  9. }
  10. func (b *AtomicValue) Get() bool {
  11. return b.value.Load().(bool)
  12. }
  13. func (b *AtomicValue) Set(value bool) {
  14. b.value.Store(value)
  15. }
  16. func BenchmarkAtomicValue_Get(b *testing.B) {
  17. atomB := AtomicValue{}
  18. atomB.value.Store(false)
  19. b.RunParallel(func(pb *testing.PB) {
  20. for pb.Next() {
  21. atomB.Get()
  22. }
  23. })
  24. }
  25. /************/
  26. type MutexBool struct {
  27. mutex sync.RWMutex
  28. flag bool
  29. }
  30. func (mb *MutexBool) Get() bool {
  31. mb.mutex.RLock()
  32. defer mb.mutex.RUnlock()
  33. return mb.flag
  34. }
  35. func BenchmarkMutexBool_Get(b *testing.B) {
  36. mb := MutexBool{flag: true}
  37. b.RunParallel(func(pb *testing.PB) {
  38. for pb.Next() {
  39. mb.Get()
  40. }
  41. })
  42. }

结果:

  1. cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
  2. BenchmarkAtomicValue_Get
  3. BenchmarkAtomicValue_Get-8 1000000000 0.5472 ns/op
  4. BenchmarkMutexBool_Get
  5. BenchmarkMutexBool_Get-8 24966127 48.80 ns/op

结果很清楚。atomic 的速度提高了 89 倍以上。并且可以通过使用更原始的类型来进一步改进:

  1. type AtomicBool struct{ flag int32 }
  2. func (b *AtomicBool) Get() bool {
  3. return atomic.LoadInt32(&(b.flag)) != 0
  4. }
  5. func (b *AtomicBool) Set(value bool) {
  6. var i int32 = 0
  7. if value {
  8. i = 1
  9. }
  10. atomic.StoreInt32(&(b.flag), int32(i))
  11. }
  12. func BenchmarkAtomicBool_Get(b *testing.B) {
  13. atomB := AtomicBool{flag: 1}
  14. b.RunParallel(func(pb *testing.PB) {
  15. for pb.Next() {
  16. atomB.Get()
  17. }
  18. })
  19. }
  20. cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
  21. BenchmarkAtomicBool_Get
  22. BenchmarkAtomicBool_Get-8 1000000000 0.3161 ns/op

此版本比互斥锁版本快 154 倍以上。

写操作也显示出明显的差异(尽管规模并不那么令人印象深刻):

  1. func BenchmarkAtomicBool_Set(b *testing.B) {
  2. atomB := AtomicBool{flag: 1}
  3. b.RunParallel(func(pb *testing.PB) {
  4. for pb.Next() {
  5. atomB.Set(true)
  6. }
  7. })
  8. }
  9. /************/
  10. func BenchmarkAtomicValue_Set(b *testing.B) {
  11. atomB := AtomicValue{}
  12. atomB.value.Store(false)
  13. b.RunParallel(func(pb *testing.PB) {
  14. for pb.Next() {
  15. atomB.Set(true)
  16. }
  17. })
  18. }
  19. /************/
  20. func BenchmarkMutexBool_Set(b *testing.B) {
  21. mb := MutexBool{flag: true}
  22. b.RunParallel(func(pb *testing.PB) {
  23. for pb.Next() {
  24. mb.Set(true)
  25. }
  26. })
  27. }

结果:

  1. cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
  2. BenchmarkAtomicBool_Set
  3. BenchmarkAtomicBool_Set-8 64624705 16.79 ns/op
  4. BenchmarkAtomicValue_Set
  5. BenchmarkAtomicValue_Set-8 47654121 26.43 ns/op
  6. BenchmarkMutexBool_Set
  7. BenchmarkMutexBool_Set-8 20124637 66.50 ns/op

在这里我们可以看到 atomic 在写入时比在读取时慢得多,但仍然比互斥锁快得多。有趣的是,我们可以看到互斥锁读取和写入之间的差异不是很明显(慢 30%)。尽管如此, atomic 仍然表现得更好(比互斥锁快 2-4 倍)。

05 为什么 atomic 这么快?

简而言之,原子操作很快,因为它们依赖于原子 CPU 指令而不是依赖外部锁。使用互斥锁时,每次获得锁时,goroutine 都会短暂暂停或中断,这种阻塞占使用互斥锁所花费时间的很大一部分。原子操作可以在没有任何中断的情况下执行。

06 atomic 总是答案吗?

正如我们在一个早期示例中已经证明的那样,atomic 无法解决所有问题,某些操作只能使用互斥锁来解决。

考虑以下示例,该示例演示了我们使用 map 作为内存缓存的常见模式:

  1. package main
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. )
  7. //Don't use this implementation!
  8. type AtomicCacheMap struct {
  9. value atomic.Value //map[int]int
  10. }
  11. func (b *AtomicCacheMap) Get(key int) int {
  12. return b.value.Load().(map[int]int)[key]
  13. }
  14. func (b *AtomicCacheMap) Set(key, value int) {
  15. oldMap := b.value.Load().(map[int]int)
  16. newMap := make(map[int]int, len(oldMap)+1)
  17. for k, v := range oldMap {
  18. newMap[k] = v
  19. }
  20. newMap[key] = value
  21. b.value.Store(newMap)
  22. }
  23. func BenchmarkAtomicCacheMap_Get(b *testing.B) {
  24. atomM := AtomicCacheMap{}
  25. atomM.value.Store(testMap)
  26. b.RunParallel(func(pb *testing.PB) {
  27. for pb.Next() {
  28. atomM.Get(0)
  29. }
  30. })
  31. }
  32. func BenchmarkAtomicCacheMap_Set(b *testing.B) {
  33. atomM := AtomicCacheMap{}
  34. atomM.value.Store(testMap)
  35. var i = 0
  36. b.RunParallel(func(pb *testing.PB) {
  37. for pb.Next() {
  38. atomM.Set(i, i)
  39. i++
  40. }
  41. })
  42. }
  43. /************/
  44. type MutexCacheMap struct {
  45. mutex sync.RWMutex
  46. value map[int]int
  47. }
  48. func (mm *MutexCacheMap) Get(key int) int {
  49. mm.mutex.RLock()
  50. defer mm.mutex.RUnlock()
  51. return mm.value[key]
  52. }
  53. func (mm *MutexCacheMap) Set(key, value int) {
  54. mm.mutex.Lock()
  55. defer mm.mutex.Unlock()
  56. mm.value[key] = value
  57. }
  58. func BenchmarkMutexCacheMap_Get(b *testing.B) {
  59. mb := MutexCacheMap{value: testMap}
  60. b.RunParallel(func(pb *testing.PB) {
  61. for pb.Next() {
  62. mb.Get(0)
  63. }
  64. })
  65. }
  66. func BenchmarkMutexCacheMap_Set(b *testing.B) {
  67. mb := MutexCacheMap{value: testMap}
  68. var i = 0
  69. b.RunParallel(func(pb *testing.PB) {
  70. for pb.Next() {
  71. mb.Set(i, i)
  72. i++
  73. }
  74. })
  75. }

结果:

  1. cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
  2. BenchmarkAtomicCacheMap_Get
  3. BenchmarkAtomicCacheMap_Get-8 301664540 4.194 ns/op
  4. BenchmarkAtomicCacheMap_Set
  5. BenchmarkAtomicCacheMap_Set-8 87637 95889 ns/op
  6. BenchmarkMutexCacheMap_Get
  7. BenchmarkMutexCacheMap_Get-8 20000959 54.63 ns/op
  8. BenchmarkMutexCacheMap_Set
  9. BenchmarkMutexCacheMap_Set-8 5012434 267.2 ns/op

哎呀,这种表现是痛苦的。这意味着,当必须复制大型结构时,atomic 的性能非常差。不仅如此,此代码还包含竞态条件。就像本文开头的切片案例一样,原子缓存示例具有竞态条件,其中可能会在复制 map 和存储 map 的时间之间添加新的缓存条目,在这种情况下,新条目将丢失。在这种情况下,该 -race 标志不会检测到任何数据竞争,因为没有对同一 map 的并发访问。

07 注意事项

Go 的文档[2]警告了 atomic 包的潜在误用:

这些函数需要非常小心才能正确使用。除了特殊的低级应用程序,同步最好使用通道或 sync 包的工具来完成。通过通信共享内存;不要通过共享内存进行通信。

开始使用 atomic 包时,你可能会遇到的第一个问题是:

  1. panic: sync/atomic: store of inconsistently typed value into Value

使用 atomic.Store,确保每次调用方法时都存储完全相同的类型很重要。这听起来很容易,但通常并不像听起来那么简单:

  1. package main
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. )
  6. //Our own custom error type which implements the error interface
  7. type CustomError struct {
  8. Code int
  9. Message string
  10. }
  11. func (e CustomError) Error() string {
  12. return fmt.Sprintf("%d: %s", e.Code, e.Message)
  13. }
  14. func InternalServerError(msg string) error {
  15. return CustomError{Code: 500, Message: msg}
  16. }
  17. func main() {
  18. var (
  19. err1 error = fmt.Errorf("error happened")
  20. err2 error = InternalServerError("another error happened")
  21. )
  22. errVal := atomic.Value{}
  23. errVal.Store(err1)
  24. errVal.Store(err2) //panics here
  25. }

两个值都是 error 类型是不够的,因为它们只是实现了错误接口。它们的具体类型仍然不同,因此 atomic 不喜欢它。

08 总结

竞态条件很糟糕,应该保护对共享资源的访问。互斥体很酷,但由于锁争用而趋于缓慢。对于某些读取-复制-更新模式有意义的情况(这往往是动态配置之类的东西,例如特性标志、日志级别或 map 或结构体,一次填充例如通过 JSON 解析等),尤其是当读取次数比写入次数多时。atomic 通常不应用于其他用例(例如,随时间增长的变量,如缓存),并且该特性的使用需要非常小心。

可能最重要的方法是将锁保持在最低限度,如果你在在考虑原子等替代方案,请务必在投入生产之前对其进行广泛的测试和试验。

原文链接:https://www.sixt.tech/golangs-atomic

参考资料

[1]读取-复制-更新: https://en.wikipedia.org/wiki/Read-copy-update

[2]文档: https://pkg.go.dev/sync/atomic

原文链接:https://mp.weixin.qq.com/s/n95eMdSW_Xrs9isnNbRnGA

标签:

相关文章

热门资讯

yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
2021年耽改剧名单 2021要播出的59部耽改剧列表
2021年耽改剧名单 2021要播出的59部耽改剧列表 2021-03-05
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总 2020-11-13
返回顶部

828
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40