[Go] Concurrency Patterns

程式語言:Go
Package:
sync
context
簡介:各種 concurrency patterns
程式碼參考自 Visualizing Concurrency in Go

基本上只要 goroutine 內有用到 channel 就必須用 select 防止卡住
並且要留下一個關閉的機制,才不會造成 goroutine leakage

Concurrency vs. Parallelism

  • 程式
    • Concurrency 指的是程式的架構
    • Parallelism 指的是程式運行時的狀態
  • 任務
    • Concurrency 與核心數無關,意指任務的切換與合作
      像是爬蟲,發出 request 1 時,需等待回應
      此時收到 request 2 的回應,則切到 request 2 做處理
      等 request 1 有回應時,再切回去 
    • Parallelism 需是多核心,意指任務的同時進行
      同時處理 request 1 & request 2 的發送與回應,但互不干擾
  • 生活
    • Concurrency 一個人同時吃三個饅頭,每次只吃饅頭的一部分
    • Parallelism 三個人各別吃自己的饅頭
  • 工作
    • Concurrency 同一個團隊,大家一起完成工作目標,互相合作
    • Parallelism 不同團隊,各自完成工作目標,互不影響

圖解說明

畫圖基本步驟:
  1. 先畫出 channel,如左方
  2. 將 channel 的相關 Sender & Receiver 畫出,如中間
    通常都是一對的,箭頭表示傳送與接收
  3. 找出第一筆傳送的 data,一定會有的,也就是第一推動力
  4. 若有多個,像是 workers 可用右方的 Multi 表示
  5. Select 則會將 Sender & Receiver 塗上相同的顏色,表示任選一個通行,如右下


Hello World

Playground
package main

// main is the smallest channel example: one goroutine sends a value and
// main blocks until it has received it.
func main() {
    // Unbuffered, so the send below completes only once main is receiving.
    messages := make(chan int)

    // The send runs in its own goroutine; done inline on an unbuffered
    // channel it would block main before it ever reached the receive.
    go func(out chan<- int) {
        out <- 123
    }(messages)

    // Wait for the value and discard it.
    <-messages
}

計時器

Playground
package main

import (
    "fmt"
    "time"
)

// timer returns a channel that delivers the value 1 once, after d has
// elapsed.
//
// The channel has a buffer of one so the background goroutine can always
// complete its send and exit, even when the caller never reads the result.
// With an unbuffered channel an abandoned timer would leave that goroutine
// blocked in the send forever.
func timer(d time.Duration) <-chan int {
    c := make(chan int, 1)
    go func() {
        time.Sleep(d)
        c <- 1
    }()
    return c
}

// main waits on 24 one-second timers, one after another, and prints the
// value each of them delivers.
func main() {
    const rounds = 24
    for remaining := rounds; remaining > 0; remaining-- {
        tick := timer(time.Second)
        fmt.Println(<-tick)
    }
}


Ping-pong

Playground
重點在於,goroutine 的 FIFO
package main

import (
    "fmt"
    "time"
)

// main starts five players on a shared table channel, serves a single
// ball, lets the rally run for one second, then takes the ball back.
func main() {
    table := make(chan int)
    for id := 0; id < 5; id++ {
        go player(id, table)
    }

    var ball int
    table <- ball // serve: hand the ball to whichever player receives it
    time.Sleep(1 * time.Second)
    <-table // take the ball off the table again
}

// player loops forever: it takes the ball from table, announces that it
// got it, holds it for 100ms and then puts it back on the table.
// There is no exit path; the goroutine lives until the process ends.
func player(id int, table chan int) {
    const hold = 100 * time.Millisecond
    for {
        received := <-table
        fmt.Printf("Player %d Get Ball\n", id)
        time.Sleep(hold)
        table <- received
    }
}

// Go 運行時, receivers 為 FIFO 隊列,故會看到依序接收的情況
// 像是 0 1 4 3 2 0 1 4 3 2 ... 每 0 1 4 3 2 一個循環


Fan-In

Playground
重點在於,以多合一,再提供給別人
package main

import (
    "fmt"
    "time"
)

// producer starts a goroutine that emits 0, 1, 2, ... on the returned
// channel, pausing for d after every send. The goroutine has no stop
// signal: it blocks in the send whenever nobody is receiving and otherwise
// keeps counting for the life of the process.
func producer(d time.Duration) <-chan int {
    ch := make(chan int)
    go func() {
        for next := 0; ; next++ {
            ch <- next
            time.Sleep(d)
        }
    }()
    return ch
}

// fanIn merges input1 and input2 into a single output channel.
//
// Values are forwarded in whatever order they become available. When an
// input is closed it is dropped from the merge; once both inputs are closed
// the output channel is closed too, so a consumer ranging over it
// terminates. Inputs that are never closed behave exactly as before: the
// merge simply keeps running.
func fanIn(input1, input2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        // A nil channel is never ready in a select, so setting a drained
        // input to nil removes its case without restructuring the loop.
        for input1 != nil || input2 != nil {
            select {
            case s, ok := <-input1:
                if !ok {
                    input1 = nil
                    continue
                }
                out <- s
            case s, ok := <-input2:
                if !ok {
                    input2 = nil
                    continue
                }
                out <- s
            }
        }
    }()
    return out
}

func main() {
    input1 := producer(100 * time.Millisecond)
    input2 := producer(250 * time.Millisecond)
    out := fanIn(input1, input2)

    for x := range out {
        fmt.Println(x)
    }
}

Fan-Out

Playground
重點在於,以一傳多,並且 close 的處理,利用 sync 避免 goroutine leak
package main

import (
    "fmt"
    "runtime/debug"
    "sync"
    "time"
)

// Sizing of the fan-out demo.
const (
    // WORKERS is the number of worker goroutines main starts.
    WORKERS    = 5
    // SUBWORKERS is the number of subworker goroutines a worker starts for
    // each task it receives.
    SUBWORKERS = 3
    // TASKS is the number of tasks main sends to the workers.
    TASKS      = 5
    // SUBTASKS is the number of subtasks a worker sends for each task.
    SUBTASKS   = 10
)

// subworker consumes subtasks until the channel is closed. For each value
// it sleeps that many milliseconds and then prints the value. wg.Done is
// called when the function returns.
func subworker(subtasks chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range subtasks {
        pause := time.Duration(task) * time.Millisecond
        time.Sleep(pause)
        fmt.Println(task)
    }
}

// worker consumes tasks until the channel is closed. For each task it
// starts SUBWORKERS subworkers on a fresh channel, sends them SUBTASKS
// subtasks, closes the channel and waits for all of them to return before
// taking the next task. wg.Done is called when the function returns.
func worker(tasks <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range tasks {
        subtasks := make(chan int)
        var pool sync.WaitGroup

        pool.Add(SUBWORKERS)
        for n := 0; n < SUBWORKERS; n++ {
            go subworker(subtasks, &pool)
        }

        // NOTE(review): every subtask of a task carries the same value,
        // task + 100 - confirm that this is intended.
        payload := task + 100
        for sent := 0; sent < SUBTASKS; sent++ {
            subtasks <- payload
        }

        // close only signals the end of input; the subworkers may still be
        // busy, so wait for them before moving on.
        close(subtasks)
        pool.Wait()
    }
}

// main starts WORKERS workers, feeds them TASKS tasks, closes the task
// channel and waits for every worker to return. It then panics on purpose
// with tracebacks for all goroutines enabled, so that any goroutine still
// alive at that point shows up in the dump.
func main() {
    tasks := make(chan int)
    var wg sync.WaitGroup

    wg.Add(WORKERS)
    for n := 0; n < WORKERS; n++ {
        go worker(tasks, &wg)
    }

    for id := 0; id < TASKS; id++ {
        tasks <- id
    }
    close(tasks)
    wg.Wait()

    // Leak check: dump every goroutine that is still running.
    debug.SetTraceback("all")
    panic(1)
}

Queue Schedule

Playground
重點在於,select 如何處理 queue 的部分,並且可延伸至 context 的處理,才不會造成 goroutine leak
main.go
package main

import (
    "context"
    "fmt"
    "runtime/debug"
    "time"
)

// main runs the engine with ten workers and the seed requests 0 and 100,
// prints each item it receives and stops reading once it sees the item 10.
// It then cancels the shared context, waits a second, and panics on purpose
// with tracebacks for all goroutines enabled so that any goroutine still
// alive shows up in the dump.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    engine := &ConcurrentEngine{
        Scheduler:   &QueueScheduler{ctx: ctx},
        WorkerCount: 10,
        ctx:         ctx,
    }

    for item := range engine.Run(0, 100) {
        fmt.Println(item)
        if item == 10 {
            break
        }
    }
    cancel()

    // Leak check: give the goroutines time to observe the cancellation,
    // then dump whatever is still running.
    time.Sleep(time.Second)
    debug.SetTraceback("all")
    panic(1)
}

type.go
package main

// Request is a unit of work submitted to a Scheduler and processed by a
// worker; in this demo it is just an int.
type Request int
// ParseResult is what a worker produces for one Request.
type ParseResult struct {
    // Item is the value the engine queues for delivery on its data channel.
    Item     int
    // Requests are follow-up requests the engine submits to the scheduler.
    Requests []Request
}

// Scheduler matches submitted requests with workers that have reported
// themselves ready.
type Scheduler interface {
    // Submit hands a request to the scheduler.
    Submit(Request)
    // WorkerReady offers a worker's input channel to the scheduler so that a
    // request can be sent on it.
    WorkerReady(chan Request)
    // Run starts the scheduler. ConcurrentEngine.Run calls it before it
    // creates workers or submits any request.
    Run()
}

engine.go
package main

import "context"

// ConcurrentEngine runs a fixed number of workers against a Scheduler and
// streams the items they produce.
type ConcurrentEngine struct {
    // Scheduler receives the submitted requests and the ready workers.
    Scheduler   Scheduler
    // WorkerCount is the number of worker goroutines Run creates.
    WorkerCount int
    // ctx stops the engine's goroutines when it is cancelled.
    ctx         context.Context
}

// Run starts the scheduler, creates WorkerCount workers, submits the seed
// requests and returns a channel on which the items produced by the workers
// are delivered.
//
// The returned channel is never closed by this function; the collecting
// goroutine stops when e.ctx is cancelled.
func (e *ConcurrentEngine) Run(seeds ...Request) chan int {
    out := make(chan ParseResult)
    dataChan := make(chan int)
    e.Scheduler.Run()

    for i := 0; i < e.WorkerCount; i++ {
        e.createWorker(out, e.Scheduler)
    }

    for _, r := range seeds {
        e.Scheduler.Submit(r)
    }

    go func() {
        // dataQ buffers items that have not been delivered on dataChan yet.
        var dataQ []int
        for {
            // activeDataChan stays nil while dataQ is empty. A send on a nil
            // channel is never ready, so the send case below is only
            // selectable when there is something to deliver.
            var activeData int
            var activeDataChan chan<- int
            if len(dataQ) > 0 {
                activeData = dataQ[0]
                activeDataChan = dataChan
            }
            select {
            case activeDataChan <- activeData:
                // The head of the queue was delivered; drop it.
                dataQ = dataQ[1:]
            case result := <-out:
                dataQ = append(dataQ, result.Item)

                // Submit the follow-up requests carried by the result.
                for _, request := range result.Requests {
                    e.Scheduler.Submit(request)
                }
            case <-e.ctx.Done():
                return
            }
        }
    }()

    return dataChan
}

// createWorker starts one worker goroutine. The worker offers its private
// input channel to the scheduler, turns every request it receives into a
// ParseResult via worker, and forwards the results on out. Results are kept
// in a local backlog so the worker can accept a new request while an
// earlier result is still waiting to be sent. The goroutine returns when
// e.ctx is cancelled.
func (e *ConcurrentEngine) createWorker(out chan<- ParseResult, s Scheduler) {
    in := make(chan Request)
    go func() {
        var backlog []ParseResult
        s.WorkerReady(in)
        for {
            // sink stays nil while the backlog is empty; a send on a nil
            // channel is never ready, which disables the send case.
            var (
                head ParseResult
                sink chan<- ParseResult
            )
            if len(backlog) != 0 {
                head, sink = backlog[0], out
            }

            select {
            case <-e.ctx.Done():
                return
            case sink <- head:
                backlog = backlog[1:]
            case req := <-in:
                backlog = append(backlog, worker(req))
                // Report ready again so the scheduler can send another request.
                s.WorkerReady(in)
            }
        }
    }()
}

// worker turns a request into its result: the item is the request's own
// value and the single follow-up request is that value plus one.
func worker(request Request) ParseResult {
    next := request + 1
    return ParseResult{
        Item:     int(request),
        Requests: []Request{next},
    }
}

scheduler.go
package main

import "context"

// QueueScheduler is a Scheduler that queues both pending requests and ready
// workers and pairs them up inside the goroutine started by Run.
type QueueScheduler struct {
    // workerChan carries the input channels of workers that report ready.
    // It is created by Run.
    workerChan  chan chan Request
    // requestChan carries submitted requests. It is created by Run.
    requestChan chan Request
    // ctx unblocks Submit and WorkerReady and stops the goroutine started by
    // Run when it is cancelled.
    ctx         context.Context
}

// Submit passes r to the scheduler, blocking until the request has been
// taken or the scheduler's context is cancelled, whichever happens first.
func (s *QueueScheduler) Submit(r Request) {
    select {
    case <-s.ctx.Done():
        // Cancelled: give up without handing the request over.
        return
    case s.requestChan <- r:
        return
    }
}

// WorkerReady offers the worker input channel w to the scheduler, blocking
// until it has been taken or the scheduler's context is cancelled,
// whichever happens first.
func (s *QueueScheduler) WorkerReady(w chan Request) {
    select {
    case <-s.ctx.Done():
        // Cancelled: give up without registering the worker.
        return
    case s.workerChan <- w:
        return
    }
}

// Run allocates the scheduler's channels and starts the goroutine that
// pairs requests with workers. Incoming requests and ready workers are each
// kept in a queue; whenever both queues are non-empty, the oldest request is
// offered to the oldest worker. The goroutine returns when s.ctx is
// cancelled.
func (s *QueueScheduler) Run() {
    s.workerChan = make(chan chan Request)
    s.requestChan = make(chan Request)
    go func() {
        var (
            pending []Request
            idle    []chan Request
        )
        for {
            // target stays nil unless there is both a request and a worker;
            // a send on a nil channel is never ready, which disables the
            // dispatch case.
            var (
                next   Request
                target chan Request
            )
            if len(pending) != 0 && len(idle) != 0 {
                next, target = pending[0], idle[0]
            }

            select {
            case <-s.ctx.Done():
                return
            case target <- next:
                // Dispatched: drop the head of both queues.
                pending, idle = pending[1:], idle[1:]
            case w := <-s.workerChan:
                idle = append(idle, w)
            case r := <-s.requestChan:
                pending = append(pending, r)
            }
        }
    }()
}


參考

Visualizing Concurrency in Go
Google I/O 2012 - Go Concurrency Patterns 投影片
Google I/O 2012 - Go Concurrency Patterns 影片
Go Concurrency Patterns: Pipelines and cancellation
Advanced Go Concurrency Patterns
Concurrency 與 Parallelism 的不同之處
還在疑惑並發和並行?
Concurrency is not parallelism
golang 学习
joizhang/learn-golang
Scheduling In Go : Part I - OS Scheduler

留言