- 取得連結
- X
- 以電子郵件傳送
- 其他應用程式
程式語言:Go
程式碼參考自 Visualizing Concurrency in Go
基本上只要 goroutine 內有用到 channel 就必須用 select 防止卡住
並且要留下一個關閉的機制,才不會造成 goroutine leakage
Google I/O 2012 - Go Concurrency Patterns 投影片
Google I/O 2012 - Go Concurrency Patterns 影片
Go Concurrency Patterns: Pipelines and cancellation
Advanced Go Concurrency Patterns
Concurrency 與 Parallelism 的不同之處
還在疑惑並發和並行?
Concurrency is not parallelism
golang 学习
joizhang/learn-golang
Scheduling In Go : Part I - OS Scheduler
- Package:
- sync
- context
程式碼參考自 Visualizing Concurrency in Go
基本上只要 goroutine 內有用到 channel 就必須用 select 防止卡住
並且要留下一個關閉的機制,才不會造成 goroutine leakage
Concurrency vs. Parallelism
- 程式
- Concurrency 指的是程式的架構
- Parallelism 指的是程式運行時的狀態
- 任務
- Concurrency 與核心數無關,意指任務的切換與合作
像是爬蟲,發出 request 1 時,需等待回應
此時收到 request 2 的回應,則切到 request 2 做處理
等 request 1 有回應時,再切回去 - Parallelism 需是多核心,意指任務的同時進行
同時處理 request 1 & request 2 的發送與回應,但互不干擾 - 生活
- Concurrency 一個人同時吃三個饅頭,每次只吃饅頭的一部分
- Parallelism 三個人各別吃自己的饅頭
- 工作
- Concurrency 同一個團隊,大家一起完成工作目標,互相合作
- Parallelism 不同團隊,各自完成工作目標,互不影響
圖解說明
畫圖基本步驟:
- 先畫出 channel,如左方
- 將 channel 的相關 Sender & Receiver 畫出,如中間
通常都是一對的,箭頭表示傳送與接收 - 找出第一筆傳送的 data,一定會有的,也就是第一推動力
- 若有多個,像是 workers 可用右方的 Multi 表示
- Select 則會將 Sender & Receiver 塗上相同的顏色,表示任選一個通行,如右下
Playground
- package main
- func main() {
- // 建立 channel
- ch := make(chan int)
- // 開始 goroutine
- go func() {
- // 傳送值給 channel
- ch <- 123
- }()
- // 從 channel 讀值
- <-ch
- }
Playground
- package main
- import (
- "fmt"
- "time"
- )
- func timer(d time.Duration) <-chan int {
- c := make(chan int)
- go func() {
- time.Sleep(d)
- c <- 1
- }()
- return c
- }
- func main() {
- for i := 0; i < 24; i++ {
- c := timer(1 * time.Second)
- fmt.Println(<-c)
- }
- }
Playground
重點在於,goroutine 的 FIFO
重點在於,goroutine 的 FIFO
- package main
- import (
- "fmt"
- "time"
- )
- func main() {
- var Ball int
- table := make(chan int)
- for i := 0; i < 5; i++ {
- go player(i, table)
- }
- table <- Ball
- time.Sleep(1 * time.Second)
- <-table
- }
- func player(id int, table chan int) {
- for {
- ball := <-table
- fmt.Printf("Player %d Get Ball\n", id)
- time.Sleep(100 * time.Millisecond)
- table <- ball
- }
- }
- // Go 運行時, receivers 為 FIFO 隊列,故會看到依序接收的情況
- // 像是 0 1 4 3 2 0 1 4 3 2 ... 每 0 1 4 3 2 一個循環
Playground
重點在於,以多合一,再提供給別人
重點在於,以多合一,再提供給別人
- package main
- import (
- "fmt"
- "time"
- )
- func producer(d time.Duration) <-chan int {
- ch := make(chan int)
- go func() {
- var i int
- for {
- ch <- i
- i++
- time.Sleep(d)
- }
- }()
- return ch
- }
- func fanIn(input1, input2 <-chan int) <-chan int {
- out := make(chan int)
- go func() {
- for {
- select {
- case s := <-input1:
- out <- s
- case s := <-input2:
- out <- s
- }
- }
- }()
- return out
- }
- func main() {
- input1 := producer(100 * time.Millisecond)
- input2 := producer(250 * time.Millisecond)
- out := fanIn(input1, input2)
- for x := range out {
- fmt.Println(x)
- }
- }
Playground
重點在於,以一傳多,並且 close 的處理,利用 sync 避免 goroutine leak
重點在於,以一傳多,並且 close 的處理,利用 sync 避免 goroutine leak
- package main
- import (
- "fmt"
- "runtime/debug"
- "sync"
- "time"
- )
- const (
- WORKERS = 5
- SUBWORKERS = 3
- TASKS = 5
- SUBTASKS = 10
- )
- func subworker(subtasks chan int, wg *sync.WaitGroup) {
- defer wg.Done()
- for {
- task, ok := <-subtasks
- if !ok {
- return
- }
- time.Sleep(time.Duration(task) * time.Millisecond)
- fmt.Println(task)
- }
- }
- func worker(tasks <-chan int, wg *sync.WaitGroup) {
- defer wg.Done()
- for {
- task, ok := <-tasks
- if !ok {
- return
- }
- var subwg sync.WaitGroup
- subtasks := make(chan int)
- for i := 0; i < SUBWORKERS; i++ {
- subwg.Add(1)
- go subworker(subtasks, &subwg)
- }
- for i := 0; i < SUBTASKS; i++ {
- subtask := task + 100
- subtasks <- subtask
- }
- // 只是送出關閉訊號,subworker 不見得馬上關閉
- close(subtasks)
- subwg.Wait()
- }
- }
- func main() {
- var wg sync.WaitGroup
- tasks := make(chan int)
- for i := 0; i < WORKERS; i++ {
- wg.Add(1)
- go worker(tasks, &wg)
- }
- for i := 0; i < TASKS; i++ {
- tasks <- i
- }
- close(tasks)
- wg.Wait()
- // 偵測是否有未關閉的 goroutine
- debug.SetTraceback("all")
- panic(1)
- }
Playground
重點在於,select 如何處理 queue 的部分,並且可延伸至 context 的處理,才不會造成 goroutine leak
main.go
type.go
engine.go
scheduler.go
重點在於,select 如何處理 queue 的部分,並且可延伸至 context 的處理,才不會造成 goroutine leak
main.go
- package main
- import (
- "context"
- "fmt"
- "runtime/debug"
- "time"
- )
- func main() {
- ctx, cancel := context.WithCancel(context.Background())
- e := ConcurrentEngine{
- Scheduler: &QueueScheduler{ctx: ctx},
- WorkerCount: 10,
- ctx: ctx,
- }
- dataChan := e.Run(0, 100)
- for data := range dataChan {
- fmt.Println(data)
- if data == 10 {
- break
- }
- }
- cancel()
- // 偵測是否有未關閉的 goroutine
- time.Sleep(time.Second)
- debug.SetTraceback("all")
- panic(1)
- }
type.go
- package main
- type Request int
- type ParseResult struct {
- Item int
- Requests []Request
- }
- type Scheduler interface {
- Submit(Request)
- WorkerReady(chan Request)
- Run()
- }
engine.go
- package main
- import "context"
- type ConcurrentEngine struct {
- Scheduler Scheduler
- WorkerCount int
- ctx context.Context
- }
- func (e *ConcurrentEngine) Run(seeds ...Request) chan int {
- out := make(chan ParseResult)
- dataChan := make(chan int)
- e.Scheduler.Run()
- for i := 0; i < e.WorkerCount; i++ {
- e.createWorker(out, e.Scheduler)
- }
- for _, r := range seeds {
- e.Scheduler.Submit(r)
- }
- go func() {
- var dataQ []int
- for {
- var activeData int
- var activeDataChan chan<- int
- if len(dataQ) > 0 {
- activeData = dataQ[0]
- activeDataChan = dataChan
- }
- select {
- case activeDataChan <- activeData:
- dataQ = dataQ[1:]
- case result := <-out:
- dataQ = append(dataQ, result.Item)
- // 處理額外的 requests
- for _, request := range result.Requests {
- e.Scheduler.Submit(request)
- }
- case <-e.ctx.Done():
- return
- }
- }
- }()
- return dataChan
- }
- func (e *ConcurrentEngine) createWorker(out chan<- ParseResult, s Scheduler) {
- in := make(chan Request)
- go func() {
- var parseResultQ []ParseResult
- s.WorkerReady(in)
- for {
- var activeResult ParseResult
- var activeResultChan chan<- ParseResult
- if len(parseResultQ) > 0 {
- activeResult = parseResultQ[0]
- activeResultChan = out
- }
- select {
- case request := <-in:
- result := worker(request)
- parseResultQ = append(parseResultQ, result)
- s.WorkerReady(in)
- case activeResultChan <- activeResult:
- parseResultQ = parseResultQ[1:]
- case <-e.ctx.Done():
- return
- }
- }
- }()
- }
- func worker(request Request) ParseResult {
- return ParseResult{
- Item: int(request),
- Requests: []Request{Request(int(request) + 1)},
- }
- }
scheduler.go
- package main
- import "context"
- type QueueScheduler struct {
- workerChan chan chan Request
- requestChan chan Request
- ctx context.Context
- }
- func (s *QueueScheduler) Submit(r Request) {
- select {
- case s.requestChan <- r:
- case <-s.ctx.Done():
- }
- }
- func (s *QueueScheduler) WorkerReady(w chan Request) {
- select {
- case s.workerChan <- w:
- case <-s.ctx.Done():
- }
- }
- func (s *QueueScheduler) Run() {
- s.requestChan = make(chan Request)
- s.workerChan = make(chan chan Request)
- go func() {
- var requestQ []Request
- var workerQ []chan Request
- for {
- var activeRequest Request
- var activeWorker chan Request
- if len(requestQ) > 0 && len(workerQ) > 0 {
- activeRequest = requestQ[0]
- activeWorker = workerQ[0]
- }
- select {
- case r := <-s.requestChan:
- requestQ = append(requestQ, r)
- case w := <-s.workerChan:
- workerQ = append(workerQ, w)
- case activeWorker <- activeRequest:
- requestQ = requestQ[1:]
- workerQ = workerQ[1:]
- case <-s.ctx.Done():
- return
- }
- }
- }()
- }
參考
Visualizing Concurrency in GoGoogle I/O 2012 - Go Concurrency Patterns 投影片
Google I/O 2012 - Go Concurrency Patterns 影片
Go Concurrency Patterns: Pipelines and cancellation
Advanced Go Concurrency Patterns
Concurrency 與 Parallelism 的不同之處
還在疑惑並發和並行?
Concurrency is not parallelism
golang 学习
joizhang/learn-golang
Scheduling In Go : Part I - OS Scheduler
留言
張貼留言