[Go] Concurrency Patterns

程式語言:Go
Package:
sync
context
簡介:各種 concurrency patterns
程式碼參考自 Visualizing Concurrency in Go

基本上只要 goroutine 內有用到 channel,就應該用 select 搭配 done channel 或 context,避免永久阻塞
並且要留下一個關閉的機制,才不會造成 goroutine leak

Concurrency vs. Parallelism

  • 程式
    • Concurrency 指的是程式的架構
    • Parallelism 指的是程式運行時的狀態
  • 任務
    • Concurrency 與核心數無關,意指任務的切換與合作
      像是爬蟲,發出 request 1 時,需等待回應
      此時收到 request 2 的回應,則切到 request 2 做處理
      等 request 1 有回應時,再切回去 
    • Parallelism 需要多核心,意指任務真正同時進行
      同時處理 request 1 & request 2 的發送與回應,但互不干擾
  • 生活
    • Concurrency 一個人輪流吃三個饅頭,每次只吃其中一個的一部分
    • Parallelism 三個人同時各自吃自己的饅頭
  • 工作
    • Concurrency 同一個團隊,大家一起完成工作目標,互相合作
    • Parallelism 不同團隊,各自完成工作目標,互不影響

圖解說明

畫圖基本步驟:
  1. 先畫出 channel,如左方
  2. 將 channel 的相關 Sender & Receiver 畫出,如中間
    通常都是一對的,箭頭表示傳送與接收
  3. 找出第一筆傳送的 data,一定會有的,也就是第一推動力
  4. 若有多個,像是 workers 可用右方的 Multi 表示
  5. Select 則會將 Sender & Receiver 塗上相同的顏色,表示任選一個通行,如右下


Hello World

Playground
  package main

  // main passes one value from a helper goroutine to the main goroutine over
  // an unbuffered channel. The receive blocks until the send happens, so main
  // cannot return before the helper goroutine has run.
  func main() {
  	values := make(chan int)

  	// Sender: blocks until main is ready to receive.
  	go func() { values <- 123 }()

  	// Receiver: wait for the value; it is discarded.
  	<-values
  }

計時器

Playground
  package main

  import (
  	"fmt"
  	"time"
  )

  // timer returns a channel that receives the value 1 once d has elapsed.
  //
  // The channel has a one-slot buffer so the sending goroutine can always
  // complete its send and exit, even if the caller never reads the result.
  // With an unbuffered channel an abandoned timer would leak its goroutine,
  // blocked on the send forever.
  func timer(d time.Duration) <-chan int {
  	c := make(chan int, 1)
  	go func() {
  		time.Sleep(d)
  		c <- 1
  	}()
  	return c
  }

  // main fires 24 one-second timers back to back, printing each result.
  func main() {
  	for i := 0; i < 24; i++ {
  		c := timer(1 * time.Second)
  		fmt.Println(<-c)
  	}
  }


Ping-pong

Playground
重點在於,等待同一個 channel 的多個 goroutine 會依 FIFO 順序接收
  package main

  import (
  	"fmt"
  	"time"
  )

  // main starts five players sharing one table channel and serves a single
  // ball. It lets them rally for one second, then takes the ball back and
  // closes the table so every player goroutine returns instead of leaking.
  func main() {
  	var ball int
  	table := make(chan int)
  	for i := 0; i < 5; i++ {
  		go player(i, table)
  	}

  	table <- ball
  	time.Sleep(1 * time.Second)
  	<-table

  	// main now holds the only ball, so no player can be in the middle of a
  	// send; closing is safe and wakes every player blocked on its receive.
  	close(table)
  }

  // player repeatedly takes the ball from table and reports it. After 100ms it
  // hands the ball back. It returns once table is closed.
  func player(id int, table chan int) {
  	for ball := range table {
  		fmt.Printf("Player %d Get Ball\n", id)
  		time.Sleep(100 * time.Millisecond)
  		table <- ball
  	}
  }

  // The current Go runtime wakes goroutines blocked receiving on the same
  // channel in FIFO order (an implementation detail, not a language
  // guarantee). The players therefore take turns in a fixed cycle, e.g.
  // 0 1 4 3 2 0 1 4 3 2 ...; the initial order depends on scheduling.


Fan-In

Playground
重點在於,以多合一,再提供給別人
  package main

  import (
  	"fmt"
  	"time"
  )

  // producer emits 0, 1, 2, ... on the returned channel, pausing d after each
  // send. Its goroutine runs forever; the demo never stops it.
  func producer(d time.Duration) <-chan int {
  	ch := make(chan int)
  	go func() {
  		for n := 0; ; n++ {
  			ch <- n
  			time.Sleep(d)
  		}
  	}()
  	return ch
  }

  // fanIn merges two channels into one: whichever input has a value ready is
  // forwarded to the returned channel. The merging goroutine never exits and
  // the returned channel is never closed.
  func fanIn(input1, input2 <-chan int) <-chan int {
  	merged := make(chan int)
  	go func() {
  		for {
  			var v int
  			select {
  			case v = <-input1:
  			case v = <-input2:
  			}
  			merged <- v
  		}
  	}()
  	return merged
  }

  // main merges a fast and a slow producer and prints everything it receives.
  // It runs forever: neither producer nor fanIn ever closes its channel.
  func main() {
  	out := fanIn(
  		producer(100*time.Millisecond),
  		producer(250*time.Millisecond),
  	)
  	for x := range out {
  		fmt.Println(x)
  	}
  }

Fan-Out

Playground
重點在於,以一傳多,以及 close 的處理:利用 sync.WaitGroup 等待所有 goroutine 結束,避免 goroutine leak
  package main

  import (
  	"fmt"
  	"runtime/debug"
  	"sync"
  	"time"
  )

  const (
  	WORKERS    = 5  // goroutines consuming top-level tasks
  	SUBWORKERS = 3  // subworker goroutines started per task
  	TASKS      = 5  // top-level tasks sent by main
  	SUBTASKS   = 10 // subtasks sent per task
  )

  // subworker sleeps task milliseconds for every value received on subtasks
  // and then prints it. It returns, marking wg done, once subtasks is closed
  // and drained.
  func subworker(subtasks chan int, wg *sync.WaitGroup) {
  	defer wg.Done()
  	for task := range subtasks {
  		time.Sleep(time.Duration(task) * time.Millisecond)
  		fmt.Println(task)
  	}
  }

  // worker handles tasks until the channel is closed, fanning each one out
  // to a fresh pool of subworkers before taking the next.
  func worker(tasks <-chan int, wg *sync.WaitGroup) {
  	defer wg.Done()
  	for task := range tasks {
  		runSubtasks(task)
  	}
  }

  // runSubtasks starts SUBWORKERS subworkers, feeds them SUBTASKS subtasks
  // derived from task, and blocks until all of them have exited.
  func runSubtasks(task int) {
  	var subwg sync.WaitGroup
  	subtasks := make(chan int)

  	subwg.Add(SUBWORKERS)
  	for n := 0; n < SUBWORKERS; n++ {
  		go subworker(subtasks, &subwg)
  	}
  	for n := 0; n < SUBTASKS; n++ {
  		// NOTE(review): every subtask is task+100 regardless of n; confirm
  		// this is intended.
  		subtasks <- task + 100
  	}
  	// close only signals the subworkers; they may still be finishing their
  	// current subtask, hence the Wait.
  	close(subtasks)
  	subwg.Wait()
  }

  // main distributes TASKS tasks across WORKERS workers, waits for them, and
  // then deliberately panics to dump every remaining goroutine.
  func main() {
  	var wg sync.WaitGroup
  	tasks := make(chan int)

  	wg.Add(WORKERS)
  	for n := 0; n < WORKERS; n++ {
  		go worker(tasks, &wg)
  	}
  	for task := 0; task < TASKS; task++ {
  		tasks <- task
  	}
  	close(tasks)
  	wg.Wait()

  	// Leak check: with traceback level "all" the panic lists every live
  	// goroutine, so any goroutine that failed to exit shows up in the dump.
  	debug.SetTraceback("all")
  	panic(1)
  }

Queue Schedule

Playground
重點在於,select 如何處理 queue 的部分,並且可延伸至 context 的處理,才不會造成 goroutine leak
main.go
  1. package main
  2.  
  3. import (
  4. "context"
  5. "fmt"
  6. "runtime/debug"
  7. "time"
  8. )
  9.  
  10. func main() {
  11. ctx, cancel := context.WithCancel(context.Background())
  12. e := ConcurrentEngine{
  13. Scheduler: &QueueScheduler{ctx: ctx},
  14. WorkerCount: 10,
  15. ctx: ctx,
  16. }
  17. dataChan := e.Run(0, 100)
  18. for data := range dataChan {
  19. fmt.Println(data)
  20.  
  21. if data == 10 {
  22. break
  23. }
  24. }
  25. cancel()
  26.  
  27. // 偵測是否有未關閉的 goroutine
  28. time.Sleep(time.Second)
  29. debug.SetTraceback("all")
  30. panic(1)
  31. }

type.go
  package main

  type (
  	// Request is a unit of work handed to a worker; in this demo it is a
  	// plain integer.
  	Request int

  	// ParseResult is what a worker produces for one Request: an item to
  	// emit and follow-up requests to schedule.
  	ParseResult struct {
  		Item     int
  		Requests []Request
  	}

  	// Scheduler distributes submitted Requests to workers that have
  	// announced themselves ready.
  	Scheduler interface {
  		// Submit enqueues a request for dispatch.
  		Submit(Request)
  		// WorkerReady registers a worker's input channel as able to take a
  		// request.
  		WorkerReady(chan Request)
  		// Run starts the scheduler.
  		Run()
  	}
  )

engine.go
  1. package main
  2.  
  3. import "context"
  4.  
  5. type ConcurrentEngine struct {
  6. Scheduler Scheduler
  7. WorkerCount int
  8. ctx context.Context
  9. }
  10.  
  11. func (e *ConcurrentEngine) Run(seeds ...Request) chan int {
  12. out := make(chan ParseResult)
  13. dataChan := make(chan int)
  14. e.Scheduler.Run()
  15.  
  16. for i := 0; i < e.WorkerCount; i++ {
  17. e.createWorker(out, e.Scheduler)
  18. }
  19.  
  20. for _, r := range seeds {
  21. e.Scheduler.Submit(r)
  22. }
  23.  
  24. go func() {
  25. var dataQ []int
  26. for {
  27. var activeData int
  28. var activeDataChan chan<- int
  29. if len(dataQ) > 0 {
  30. activeData = dataQ[0]
  31. activeDataChan = dataChan
  32. }
  33. select {
  34. case activeDataChan <- activeData:
  35. dataQ = dataQ[1:]
  36. case result := <-out:
  37. dataQ = append(dataQ, result.Item)
  38.  
  39. // 處理額外的 requests
  40. for _, request := range result.Requests {
  41. e.Scheduler.Submit(request)
  42. }
  43. case <-e.ctx.Done():
  44. return
  45. }
  46. }
  47. }()
  48.  
  49. return dataChan
  50. }
  51.  
  52. func (e *ConcurrentEngine) createWorker(out chan<- ParseResult, s Scheduler) {
  53. in := make(chan Request)
  54. go func() {
  55. var parseResultQ []ParseResult
  56. s.WorkerReady(in)
  57. for {
  58. var activeResult ParseResult
  59. var activeResultChan chan<- ParseResult
  60. if len(parseResultQ) > 0 {
  61. activeResult = parseResultQ[0]
  62. activeResultChan = out
  63. }
  64.  
  65. select {
  66. case request := <-in:
  67. result := worker(request)
  68. parseResultQ = append(parseResultQ, result)
  69. s.WorkerReady(in)
  70. case activeResultChan <- activeResult:
  71. parseResultQ = parseResultQ[1:]
  72. case <-e.ctx.Done():
  73. return
  74. }
  75. }
  76. }()
  77. }
  78.  
  79. func worker(request Request) ParseResult {
  80. return ParseResult{
  81. Item: int(request),
  82. Requests: []Request{Request(int(request) + 1)},
  83. }
  84. }

scheduler.go
  1. package main
  2.  
  3. import "context"
  4.  
  5. type QueueScheduler struct {
  6. workerChan chan chan Request
  7. requestChan chan Request
  8. ctx context.Context
  9. }
  10.  
  11. func (s *QueueScheduler) Submit(r Request) {
  12. select {
  13. case s.requestChan <- r:
  14. case <-s.ctx.Done():
  15. }
  16. }
  17.  
  18. func (s *QueueScheduler) WorkerReady(w chan Request) {
  19. select {
  20. case s.workerChan <- w:
  21. case <-s.ctx.Done():
  22. }
  23. }
  24.  
  25. func (s *QueueScheduler) Run() {
  26. s.requestChan = make(chan Request)
  27. s.workerChan = make(chan chan Request)
  28. go func() {
  29. var requestQ []Request
  30. var workerQ []chan Request
  31. for {
  32. var activeRequest Request
  33. var activeWorker chan Request
  34. if len(requestQ) > 0 && len(workerQ) > 0 {
  35. activeRequest = requestQ[0]
  36. activeWorker = workerQ[0]
  37. }
  38.  
  39. select {
  40. case r := <-s.requestChan:
  41. requestQ = append(requestQ, r)
  42. case w := <-s.workerChan:
  43. workerQ = append(workerQ, w)
  44. case activeWorker <- activeRequest:
  45. requestQ = requestQ[1:]
  46. workerQ = workerQ[1:]
  47. case <-s.ctx.Done():
  48. return
  49. }
  50. }
  51. }()
  52. }


參考

Visualizing Concurrency in Go
Google I/O 2012 - Go Concurrency Patterns 投影片
Google I/O 2012 - Go Concurrency Patterns 影片
Go Concurrency Patterns: Pipelines and cancellation
Advanced Go Concurrency Patterns
Concurrency 與 Parallelism 的不同之處
還在疑惑並發和並行?
Concurrency is not parallelism
golang 学习
joizhang/learn-golang
Scheduling In Go : Part I - OS Scheduler

留言