- 取得連結
- X
- 以電子郵件傳送
- 其他應用程式
程式語言:Go
程式碼參考自 Visualizing Concurrency in Go
基本上只要 goroutine 內有用到 channel 就必須用 select 防止卡住
並且要留下一個關閉的機制,才不會造成 goroutine leakage
Google I/O 2012 - Go Concurrency Patterns 投影片
Google I/O 2012 - Go Concurrency Patterns 影片
Go Concurrency Patterns: Pipelines and cancellation
Advanced Go Concurrency Patterns
Concurrency 與 Parallelism 的不同之處
還在疑惑並發和並行?
Concurrency is not parallelism
golang 学习
joizhang/learn-golang
Scheduling In Go : Part I - OS Scheduler
- Package:
- sync
- context
程式碼參考自 Visualizing Concurrency in Go
基本上只要 goroutine 內有用到 channel 就必須用 select 防止卡住
並且要留下一個關閉的機制,才不會造成 goroutine leakage
Concurrency vs. Parallelism
- 程式
- Concurrency 指的是程式的架構
- Parallelism 指的是程式運行時的狀態
- 任務
- Concurrency 與核心數無關,意指任務的切換與合作
像是爬蟲,發出 request 1 時,需等待回應
此時收到 request 2 的回應,則切到 request 2 做處理
等 request 1 有回應時,再切回去 - Parallelism 需是多核心,意指任務的同時進行
同時處理 request 1 & request 2 的發送與回應,但互不干擾 - 生活
- Concurrency 一個人同時吃三個饅頭,每次只吃饅頭的一部分
- Parallelism 三個人各別吃自己的饅頭
- 工作
- Concurrency 同一個團隊,大家一起完成工作目標,互相合作
- Parallelism 不同團隊,各自完成工作目標,互不影響
圖解說明
畫圖基本步驟:
- 先畫出 channel,如左方
- 將 channel 的相關 Sender & Receiver 畫出,如中間
通常都是一對的,箭頭表示傳送與接收 - 找出第一筆傳送的 data,一定會有的,也就是第一推動力
- 若有多個,像是 workers 可用右方的 Multi 表示
- Select 則會將 Sender & Receiver 塗上相同的顏色,表示任選一個通行,如右下
Playground
package main
func main() {
// 建立 channel
ch := make(chan int)
// 開始 goroutine
go func() {
// 傳送值給 channel
ch <- 123
}()
// 從 channel 讀值
<-ch
}
Playground
package main
import (
"fmt"
"time"
)
func timer(d time.Duration) <-chan int {
c := make(chan int)
go func() {
time.Sleep(d)
c <- 1
}()
return c
}
func main() {
for i := 0; i < 24; i++ {
c := timer(1 * time.Second)
fmt.Println(<-c)
}
}
Playground
重點在於,goroutine 的 FIFO
重點在於,goroutine 的 FIFO
package main
import (
"fmt"
"time"
)
func main() {
var Ball int
table := make(chan int)
for i := 0; i < 5; i++ {
go player(i, table)
}
table <- Ball
time.Sleep(1 * time.Second)
<-table
}
func player(id int, table chan int) {
for {
ball := <-table
fmt.Printf("Player %d Get Ball\n", id)
time.Sleep(100 * time.Millisecond)
table <- ball
}
}
// Go 運行時, receivers 為 FIFO 隊列,故會看到依序接收的情況
// 像是 0 1 4 3 2 0 1 4 3 2 ... 每 0 1 4 3 2 一個循環
Playground
重點在於,以多合一,再提供給別人
重點在於,以多合一,再提供給別人
package main
import (
"fmt"
"time"
)
func producer(d time.Duration) <-chan int {
ch := make(chan int)
go func() {
var i int
for {
ch <- i
i++
time.Sleep(d)
}
}()
return ch
}
func fanIn(input1, input2 <-chan int) <-chan int {
out := make(chan int)
go func() {
for {
select {
case s := <-input1:
out <- s
case s := <-input2:
out <- s
}
}
}()
return out
}
func main() {
input1 := producer(100 * time.Millisecond)
input2 := producer(250 * time.Millisecond)
out := fanIn(input1, input2)
for x := range out {
fmt.Println(x)
}
}
Playground
重點在於,以一傳多,並且 close 的處理,利用 sync 避免 goroutine leak
重點在於,以一傳多,並且 close 的處理,利用 sync 避免 goroutine leak
package main
import (
"fmt"
"runtime/debug"
"sync"
"time"
)
const (
WORKERS = 5
SUBWORKERS = 3
TASKS = 5
SUBTASKS = 10
)
func subworker(subtasks chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-subtasks
if !ok {
return
}
time.Sleep(time.Duration(task) * time.Millisecond)
fmt.Println(task)
}
}
func worker(tasks <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-tasks
if !ok {
return
}
var subwg sync.WaitGroup
subtasks := make(chan int)
for i := 0; i < SUBWORKERS; i++ {
subwg.Add(1)
go subworker(subtasks, &subwg)
}
for i := 0; i < SUBTASKS; i++ {
subtask := task + 100
subtasks <- subtask
}
// 只是送出關閉訊號,subworker 不見得馬上關閉
close(subtasks)
subwg.Wait()
}
}
func main() {
var wg sync.WaitGroup
tasks := make(chan int)
for i := 0; i < WORKERS; i++ {
wg.Add(1)
go worker(tasks, &wg)
}
for i := 0; i < TASKS; i++ {
tasks <- i
}
close(tasks)
wg.Wait()
// 偵測是否有未關閉的 goroutine
debug.SetTraceback("all")
panic(1)
}
Playground
重點在於,select 如何處理 queue 的部分,並且可延伸至 context 的處理,才不會造成 goroutine leak
main.go
type.go
engine.go
scheduler.go
重點在於,select 如何處理 queue 的部分,並且可延伸至 context 的處理,才不會造成 goroutine leak
main.go
package main
import (
"context"
"fmt"
"runtime/debug"
"time"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
e := ConcurrentEngine{
Scheduler: &QueueScheduler{ctx: ctx},
WorkerCount: 10,
ctx: ctx,
}
dataChan := e.Run(0, 100)
for data := range dataChan {
fmt.Println(data)
if data == 10 {
break
}
}
cancel()
// 偵測是否有未關閉的 goroutine
time.Sleep(time.Second)
debug.SetTraceback("all")
panic(1)
}
type.go
package main
type Request int
type ParseResult struct {
Item int
Requests []Request
}
type Scheduler interface {
Submit(Request)
WorkerReady(chan Request)
Run()
}
engine.go
package main
import "context"
type ConcurrentEngine struct {
Scheduler Scheduler
WorkerCount int
ctx context.Context
}
func (e *ConcurrentEngine) Run(seeds ...Request) chan int {
out := make(chan ParseResult)
dataChan := make(chan int)
e.Scheduler.Run()
for i := 0; i < e.WorkerCount; i++ {
e.createWorker(out, e.Scheduler)
}
for _, r := range seeds {
e.Scheduler.Submit(r)
}
go func() {
var dataQ []int
for {
var activeData int
var activeDataChan chan<- int
if len(dataQ) > 0 {
activeData = dataQ[0]
activeDataChan = dataChan
}
select {
case activeDataChan <- activeData:
dataQ = dataQ[1:]
case result := <-out:
dataQ = append(dataQ, result.Item)
// 處理額外的 requests
for _, request := range result.Requests {
e.Scheduler.Submit(request)
}
case <-e.ctx.Done():
return
}
}
}()
return dataChan
}
func (e *ConcurrentEngine) createWorker(out chan<- ParseResult, s Scheduler) {
in := make(chan Request)
go func() {
var parseResultQ []ParseResult
s.WorkerReady(in)
for {
var activeResult ParseResult
var activeResultChan chan<- ParseResult
if len(parseResultQ) > 0 {
activeResult = parseResultQ[0]
activeResultChan = out
}
select {
case request := <-in:
result := worker(request)
parseResultQ = append(parseResultQ, result)
s.WorkerReady(in)
case activeResultChan <- activeResult:
parseResultQ = parseResultQ[1:]
case <-e.ctx.Done():
return
}
}
}()
}
func worker(request Request) ParseResult {
return ParseResult{
Item: int(request),
Requests: []Request{Request(int(request) + 1)},
}
}
scheduler.go
package main
import "context"
type QueueScheduler struct {
workerChan chan chan Request
requestChan chan Request
ctx context.Context
}
func (s *QueueScheduler) Submit(r Request) {
select {
case s.requestChan <- r:
case <-s.ctx.Done():
}
}
func (s *QueueScheduler) WorkerReady(w chan Request) {
select {
case s.workerChan <- w:
case <-s.ctx.Done():
}
}
func (s *QueueScheduler) Run() {
s.requestChan = make(chan Request)
s.workerChan = make(chan chan Request)
go func() {
var requestQ []Request
var workerQ []chan Request
for {
var activeRequest Request
var activeWorker chan Request
if len(requestQ) > 0 && len(workerQ) > 0 {
activeRequest = requestQ[0]
activeWorker = workerQ[0]
}
select {
case r := <-s.requestChan:
requestQ = append(requestQ, r)
case w := <-s.workerChan:
workerQ = append(workerQ, w)
case activeWorker <- activeRequest:
requestQ = requestQ[1:]
workerQ = workerQ[1:]
case <-s.ctx.Done():
return
}
}
}()
}
參考
Visualizing Concurrency in GoGoogle I/O 2012 - Go Concurrency Patterns 投影片
Google I/O 2012 - Go Concurrency Patterns 影片
Go Concurrency Patterns: Pipelines and cancellation
Advanced Go Concurrency Patterns
Concurrency 與 Parallelism 的不同之處
還在疑惑並發和並行?
Concurrency is not parallelism
golang 学习
joizhang/learn-golang
Scheduling In Go : Part I - OS Scheduler







留言
張貼留言