Programming language: Go
The code examples are adapted from Visualizing Concurrency in Go.
The basic rule: whenever a goroutine works with a channel, use select so that the goroutine cannot get stuck, and always leave it a shutdown mechanism so that it does not become a goroutine leak.
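A minimal sketch of that idea (my own illustration, not code from the referenced post): the worker goroutine selects on both its data channel and a done channel, so the caller can always shut it down.

```go
package main

import "fmt"

func main() {
    ch := make(chan int)
    done := make(chan struct{})

    // The goroutine selects on both its work channel and a done channel,
    // so the caller always has a way to shut it down.
    go func() {
        for {
            select {
            case v := <-ch:
                fmt.Println("got", v)
            case <-done:
                return // shutdown path: the goroutine does not leak
            }
        }
    }()

    ch <- 1
    close(done) // signal the goroutine to exit
}
```

Closing done is the shutdown mechanism; without that case the goroutine would block on the channel receive forever once the sender stops.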
- Packages:
  - sync
  - context
Concurrency vs. Parallelism
- Program
  - Concurrency describes how the program is structured.
  - Parallelism describes the state of the program while it runs.
- Tasks
  - Concurrency is unrelated to the number of cores; it is about switching between tasks and letting them cooperate.
    For example, in a crawler: after sending request 1 we must wait for its response; if the response to request 2 arrives in the meantime, we switch over and handle request 2, then switch back once request 1 responds (see the sketch after this list).
  - Parallelism requires multiple cores; it means tasks genuinely run at the same time.
    Request 1 and request 2 are sent and handled simultaneously, without interfering with each other.
- Daily life
  - Concurrency: one person "eats three steamed buns at once" by taking a bite of each in turn.
  - Parallelism: three people each eat their own steamed bun.
- Work
  - Concurrency: one team works together on the same goal, cooperating with each other.
  - Parallelism: different teams each complete their own goals, without affecting one another.
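To make the program/task distinction concrete, here is a small sketch (my own illustration, not from the original post): the concurrent structure, two goroutines handling two "requests", stays the same, while runtime.GOMAXPROCS decides whether the runtime is allowed to execute them in parallel.

```go
package main

import (
    "fmt"
    "runtime"
)

func main() {
    // The concurrent structure is fixed: two goroutines handle two "requests".
    // Whether they may run in parallel is a runtime property; GOMAXPROCS(1)
    // means at most one goroutine executes at any instant (no parallelism),
    // even though the program is still concurrent.
    runtime.GOMAXPROCS(1)

    done := make(chan struct{})
    handle := func(name string) {
        for i := 0; i < 3; i++ {
            fmt.Println(name, "step", i)
        }
        done <- struct{}{}
    }

    go handle("request 1")
    go handle("request 2")

    <-done
    <-done
}
```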
Diagram walkthrough
Basic steps for drawing the diagrams:
- First draw the channel, as on the left.
- Draw the channel's related Senders & Receivers, as in the middle.
  They usually come in pairs; the arrows indicate sending and receiving.
- Find the first piece of data that gets sent; there always is one, and it is the initial driving force.
- If there are many of the same kind, such as workers, they can be drawn with the Multi notation on the right.
- For Select, the Senders & Receivers are painted the same color, meaning any one of them may proceed, as in the lower right.
Playground
package main

func main() {
    // create a channel
    ch := make(chan int)

    // start a goroutine
    go func() {
        // send a value into the channel
        ch <- 123
    }()

    // receive the value from the channel
    <-ch
}
Playground
package main

import (
    "fmt"
    "time"
)

func timer(d time.Duration) <-chan int {
    c := make(chan int)
    go func() {
        time.Sleep(d)
        c <- 1
    }()
    return c
}

func main() {
    for i := 0; i < 24; i++ {
        c := timer(1 * time.Second)
        fmt.Println(<-c)
    }
}
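For reference, the standard library's time.After has the same shape as this hand-rolled timer: it returns a channel that delivers one value after the duration has elapsed, so the loop above could also be written like this.

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    for i := 0; i < 3; i++ {
        // time.After returns a <-chan time.Time that fires once after 1s,
        // which is essentially what the timer function above builds by hand.
        <-time.After(1 * time.Second)
        fmt.Println("tick", i)
    }
}
```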
Playground
The key point here is the FIFO ordering of the goroutines waiting on the channel.
package main

import (
    "fmt"
    "time"
)

func main() {
    var Ball int
    table := make(chan int)
    for i := 0; i < 5; i++ {
        go player(i, table)
    }

    table <- Ball
    time.Sleep(1 * time.Second)
    <-table
}

func player(id int, table chan int) {
    for {
        ball := <-table
        fmt.Printf("Player %d Get Ball\n", id)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

// In the Go runtime, waiting receivers form a FIFO queue, so the players
// receive the ball in a fixed order, e.g. 0 1 4 3 2 0 1 4 3 2 ...,
// with 0 1 4 3 2 repeating as one cycle.
Playground
The key point here is fan-in: merging several channels into one and handing that single channel to the consumer.
package main

import (
    "fmt"
    "time"
)

func producer(d time.Duration) <-chan int {
    ch := make(chan int)
    go func() {
        var i int
        for {
            ch <- i
            i++
            time.Sleep(d)
        }
    }()
    return ch
}

func fanIn(input1, input2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for {
            select {
            case s := <-input1:
                out <- s
            case s := <-input2:
                out <- s
            }
        }
    }()
    return out
}

func main() {
    input1 := producer(100 * time.Millisecond)
    input2 := producer(250 * time.Millisecond)

    out := fanIn(input1, input2)
    for x := range out {
        fmt.Println(x)
    }
}
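As written, the producers and the fanIn goroutine never stop, which is exactly the goroutine leak the note at the top warns about. Below is a sketch of the same fan-in with a shutdown mechanism (my own variation, not from the referenced post): every blocking send or receive also watches a done channel.

```go
package main

import (
    "fmt"
    "time"
)

func producer(done <-chan struct{}, d time.Duration) <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ {
            select {
            case ch <- i:
                time.Sleep(d)
            case <-done:
                return // stop producing once done is closed
            }
        }
    }()
    return ch
}

func fanIn(done <-chan struct{}, input1, input2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for {
            var v int
            select {
            case v = <-input1:
            case v = <-input2:
            case <-done:
                return
            }
            select {
            case out <- v:
            case <-done:
                return // do not block forever if the consumer has gone away
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    out := fanIn(done,
        producer(done, 100*time.Millisecond),
        producer(done, 250*time.Millisecond))

    for i := 0; i < 10; i++ {
        fmt.Println(<-out)
    }
    close(done) // shut down the producers and the fan-in goroutine
}
```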
Playground
The key point here is fan-out (one channel feeding many workers), handling close correctly, and using sync to avoid goroutine leaks.
package main

import (
    "fmt"
    "runtime/debug"
    "sync"
    "time"
)

const (
    WORKERS    = 5
    SUBWORKERS = 3
    TASKS      = 5
    SUBTASKS   = 10
)

func subworker(subtasks chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-subtasks
        if !ok {
            return
        }
        time.Sleep(time.Duration(task) * time.Millisecond)
        fmt.Println(task)
    }
}

func worker(tasks <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasks
        if !ok {
            return
        }

        var subwg sync.WaitGroup
        subtasks := make(chan int)
        for i := 0; i < SUBWORKERS; i++ {
            subwg.Add(1)
            go subworker(subtasks, &subwg)
        }
        for i := 0; i < SUBTASKS; i++ {
            subtask := task + 100
            subtasks <- subtask
        }
        // close only sends the shutdown signal; the subworkers may not exit immediately
        close(subtasks)
        subwg.Wait()
    }
}

func main() {
    var wg sync.WaitGroup
    tasks := make(chan int)
    for i := 0; i < WORKERS; i++ {
        wg.Add(1)
        go worker(tasks, &wg)
    }

    for i := 0; i < TASKS; i++ {
        tasks <- i
    }
    close(tasks)
    wg.Wait()

    // dump all goroutine stacks to check for goroutines that never exited
    debug.SetTraceback("all")
    panic(1)
}
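A small side note: the receive-and-check-ok loops in worker and subworker can also be written with range over the channel, which ends automatically once the channel is closed and drained. A minimal sketch:

```go
package main

import (
    "fmt"
    "sync"
)

func main() {
    tasks := make(chan int)
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        // range keeps receiving until tasks is closed and empty,
        // replacing the explicit "task, ok := <-tasks; if !ok" pattern.
        for task := range tasks {
            fmt.Println("task", task)
        }
    }()

    for i := 0; i < 5; i++ {
        tasks <- i
    }
    close(tasks)
    wg.Wait()
}
```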
Playground
The key point here is how select manages the internal queues, and how this extends to context handling so that no goroutine leaks. (A short isolated sketch of the nil-channel select trick the engine relies on follows after scheduler.go below.)
main.go
package main

import (
    "context"
    "fmt"
    "runtime/debug"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    e := ConcurrentEngine{
        Scheduler:   &QueueScheduler{ctx: ctx},
        WorkerCount: 10,
        ctx:         ctx,
    }

    dataChan := e.Run(0, 100)
    for data := range dataChan {
        fmt.Println(data)
        if data == 10 {
            break
        }
    }
    cancel()

    // dump all goroutine stacks to check for goroutines that never exited
    time.Sleep(time.Second)
    debug.SetTraceback("all")
    panic(1)
}
type.go
package main

type Request int

type ParseResult struct {
    Item     int
    Requests []Request
}

type Scheduler interface {
    Submit(Request)
    WorkerReady(chan Request)
    Run()
}
engine.go
package main

import "context"

type ConcurrentEngine struct {
    Scheduler   Scheduler
    WorkerCount int
    ctx         context.Context
}

func (e *ConcurrentEngine) Run(seeds ...Request) chan int {
    out := make(chan ParseResult)
    dataChan := make(chan int)

    e.Scheduler.Run()

    for i := 0; i < e.WorkerCount; i++ {
        e.createWorker(out, e.Scheduler)
    }

    for _, r := range seeds {
        e.Scheduler.Submit(r)
    }

    go func() {
        var dataQ []int
        for {
            var activeData int
            var activeDataChan chan<- int
            if len(dataQ) > 0 {
                activeData = dataQ[0]
                activeDataChan = dataChan
            }

            select {
            case activeDataChan <- activeData:
                dataQ = dataQ[1:]
            case result := <-out:
                dataQ = append(dataQ, result.Item)
                // submit the additional requests produced by this result
                for _, request := range result.Requests {
                    e.Scheduler.Submit(request)
                }
            case <-e.ctx.Done():
                return
            }
        }
    }()

    return dataChan
}

func (e *ConcurrentEngine) createWorker(out chan<- ParseResult, s Scheduler) {
    in := make(chan Request)
    go func() {
        var parseResultQ []ParseResult
        s.WorkerReady(in)
        for {
            var activeResult ParseResult
            var activeResultChan chan<- ParseResult
            if len(parseResultQ) > 0 {
                activeResult = parseResultQ[0]
                activeResultChan = out
            }

            select {
            case request := <-in:
                result := worker(request)
                parseResultQ = append(parseResultQ, result)
                s.WorkerReady(in)
            case activeResultChan <- activeResult:
                parseResultQ = parseResultQ[1:]
            case <-e.ctx.Done():
                return
            }
        }
    }()
}

func worker(request Request) ParseResult {
    return ParseResult{
        Item:     int(request),
        Requests: []Request{Request(int(request) + 1)},
    }
}
scheduler.go
package main

import "context"

type QueueScheduler struct {
    workerChan  chan chan Request
    requestChan chan Request
    ctx         context.Context
}

func (s *QueueScheduler) Submit(r Request) {
    select {
    case s.requestChan <- r:
    case <-s.ctx.Done():
    }
}

func (s *QueueScheduler) WorkerReady(w chan Request) {
    select {
    case s.workerChan <- w:
    case <-s.ctx.Done():
    }
}

func (s *QueueScheduler) Run() {
    s.requestChan = make(chan Request)
    s.workerChan = make(chan chan Request)
    go func() {
        var requestQ []Request
        var workerQ []chan Request
        for {
            var activeRequest Request
            var activeWorker chan Request
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeRequest = requestQ[0]
                activeWorker = workerQ[0]
            }

            select {
            case r := <-s.requestChan:
                requestQ = append(requestQ, r)
            case w := <-s.workerChan:
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest:
                requestQ = requestQ[1:]
                workerQ = workerQ[1:]
            case <-s.ctx.Done():
                return
            }
        }
    }()
}
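The engine and scheduler above both rely on the same trick: sending on a nil channel blocks forever, so a select case whose channel is nil is effectively disabled. activeDataChan, activeResultChan, and activeWorker are only set to a real channel when their queue has something to deliver. A minimal isolated sketch of that trick (my own illustration, not from the referenced post):

```go
package main

import "fmt"

func main() {
    in := make(chan int)
    out := make(chan int)
    done := make(chan struct{})

    go func() {
        var queue []int
        for {
            // activeOut stays nil while the queue is empty, which disables
            // the send case below; select then only waits for input or done.
            var activeOut chan int
            var next int
            if len(queue) > 0 {
                activeOut = out
                next = queue[0]
            }
            select {
            case v := <-in:
                queue = append(queue, v)
            case activeOut <- next:
                queue = queue[1:]
            case <-done:
                return
            }
        }
    }()

    for i := 0; i < 3; i++ {
        in <- i
    }
    for i := 0; i < 3; i++ {
        fmt.Println(<-out)
    }
    close(done)
}
```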
References
Visualizing Concurrency in Go
Google I/O 2012 - Go Concurrency Patterns (slides)
Google I/O 2012 - Go Concurrency Patterns (video)
Go Concurrency Patterns: Pipelines and cancellation
Advanced Go Concurrency Patterns
Concurrency 與 Parallelism 的不同之處 (The difference between Concurrency and Parallelism)
還在疑惑並發和並行? (Still confused about concurrency and parallelism?)
Concurrency is not parallelism
golang 学习 (golang learning notes)
joizhang/learn-golang
Scheduling In Go : Part I - OS Scheduler