程式語言:Python
Package:multiprocessing
官方文件
功能:並行處理
因 GIL (CPython) 緣故,multithread 需用 multiprocess 取代,可參考以下文章
Python 最難的問題
Python 的 GIL 是什么鬼,多线程性能究竟如何
注意事項
-
window 程式需在 if __name__ == '__main__': 之內運行
不然會有 RuntimeError,甚至莫名的錯誤
因 window 沒有 fork,所以執行時是採用 spawn,使用 runpy 實現
所以每個 child process 會在一開始 import 原先的 module 再執行對應的 function
參考 multiprocessing - Windows
指定產生 process 的方式- Contexts and start methods
- from multiprocessing import Pool
- import sys
- import os
-
-
- def f(x):
- return x
-
-
- if __name__ == '__main__':
- # 依 CPU 數量建立 child process
- pool = Pool()
- pool.map(f, range(10))
-
- # 因不在 '__main__' 中,所以 import 時每個 child process 都會執行
- try:
- print('pid:{:4d} stdout:{:8d}, __name__:{:10s}, importedBy:{}'.format(os.getpid(), id(sys.stdout), __name__, sys._getframe(1).f_globals.get('__name__')))
- except ValueError:
- print('pid:{:4d} stdout:{:8d}, __name__:{:10s}'.format(os.getpid(), id(sys.stdout), __name__))
-
-
- # 輸出結果
- # pid:5908 stdout: 4703408, __name__:__mp_main__, importedBy:runpy
- # pid:2296 stdout: 6866096, __name__:__mp_main__, importedBy:runpy
- # pid:2152 stdout:21546160, __name__:__main__
-
print 視需求加上 flush=True 強制刷新
不然最後程式結束強制 flush 只是殘存在任一 sys.stdout 的東西
- from multiprocessing import Pool
- import time
- import sys
-
-
- def f(x):
- time.sleep(0.1)
- # 無換行,所以會先暫存在 sys.stdout,等待 flush
- print(x, end=" ")
- return x
-
-
- def g(x):
- if x >=0 :
- print(":g", id(sys.stdout), x)
- return x
-
-
- if __name__ == '__main__':
- # 可加入 maxtasksperchild 限定,因 process 結束時也會 flush sys.stdout
- pool = Pool()
- print(pool.map(f, range(10)))
-
- # 加上 close() & join() 可清空所有 sys.stdout
- # pool.close()
- # pool.join()
-
- # 此行可強制輸出 sys.stdout 內的東西
- # 因換行代表強制 flush
- # print(pool.map(g, range(6)))
-
- # 每個 process 可能有不同的 sys.stdout
- # 因不在 '__main__' 中,所以 import 時每個 child process 都會執行
- print("stdout:", id(sys.stdout))
-
- # 輸出結果
- # stdout: 7324848
- # stdout: 4768944
- # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- # stdout: 4900016
- # 0 1 2 3 6 7
建立 Process
Process
- import time
- from multiprocessing import Process, Lock, freeze_support, set_start_method
-
- def f(lock, i):
- print('{:2d} Stop'.format(i))
- # 暫停 1s,這邊是為了看出其他 child process Stop 的效果,才暫停的
- time.sleep(1)
- # block 直到上一個 child process 解鎖
- lock.acquire()
- try:
- print('{:2d} Start'.format(i))
- finally:
- # 必做解鎖,不然會影響到下一個 child process
- lock.release()
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- # 在 window 中,像 PyInstaller 為了產生執行檔
- # multiprocessing 會被禁止,導致無法執行,出現 RuntimeError
- # 需在 start 前加入此 function,只在 window 有效,其餘無作用
- # 且在 __name__ == '__main__' 之後
- freeze_support()
- # 指定產生 process 的方式,讓不同平台的表現一致
- set_start_method('spawn')
- lock = Lock()
-
- for num in range(10):
- # 建立 child process
- p = Process(target=f, args=(lock, num))
- # 開始執行 child process
- p.start()
- # blocking parent process 直到 child process 返回
- # 等待 0.1s,可為無參數,則表示永久等待
- p.join(timeout=0.1)
-
- # 輸出結果,每次結果都不一樣
- # 可以看到並不是連續的,這是因為 child process 建立有快有慢
- # 且搶到 key 的,也不一定是按 stop 的順序,需注意
- # 0 Stop
- # 1 Stop
- # 3 Stop
- # 2 Stop
- # 5 Stop
- # 4 Stop
- # 0 Start
- # 1 Start
- # 7 Stop
- # 2 Start
- # 3 Start
- # 9 Stop
- # 8 Stop
- # 6 Stop
- # 5 Start
- # 4 Start
- # 7 Start
- # 9 Start
- # 8 Start
- # 6 Start
Pool
多個 child process 建議用法
- import os
- import time
- import traceback
- from multiprocessing import Pool
-
-
- def handle_error(e):
- '''處理 child process 的錯誤,不然 code 寫錯時,不會回報任何錯誤'''
- traceback.print_exception(type(e), e, e.__traceback__)
-
-
- def long_time_task(name):
- print('任務 {} ({}) 開始'.format(name, os.getpid()))
- start = time.time()
- time.sleep(3)
- end = time.time()
- print('任務 {} 執行 {:0.2f} seconds.'.format(name, (end - start)))
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__=='__main__':
- print('Parent process {}.'.format(os.getpid()))
- # 指定建立 child process 的數量,若無指定,預設為 cpu 數量
- with Pool(processes=3) as p:
- for i in range(5):
- # 建立 child process 並執行,error_callback 必須指定,不然很難 debug
- p.apply_async(long_time_task, args=(i,), error_callback=handle_error)
- print('等待所有 child processes 完成')
- # 關掉 pool 不再加入任何 child process
- p.close()
- # 調用 join() 之前必須先調用close()
- p.join()
- print('所有 child processes 完成')
-
- # 執行結果
- # Parent process 6072.
- # 等待所有 child processes 完成
- # 任務 0 (5192) 開始
- # 任務 1 (6040) 開始
- # 任務 2 (4576) 開始
- # 任務 0 執行 3.01 seconds.
- # 任務 3 (5192) 開始
- # 任務 1 執行 3.01 seconds.
- # 任務 4 (6040) 開始
- # 任務 2 執行 3.01 seconds.
- # 任務 3 執行 3.00 seconds.
- # 任務 4 執行 3.00 seconds.
- # 所有 child processes 完成
回傳資料
Pool
- from multiprocessing import Pool, TimeoutError
- import time
-
-
- def f(x):
- return x*x
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- # start 4 worker processes
- with Pool(processes=4) as pool:
-
- # 建立 child process 並執行
- resA = pool.apply_async(f, (20,))
-
- # get timeout
- resB = pool.apply_async(time.sleep, (10,))
- try:
- print(resB.get(timeout=1))
- except TimeoutError:
- print("得 resB 值超時")
-
- print("pool 此時仍可使用")
-
- # 已跳出 with,故 pool 已被 close
- print("pool 已不可使用")
- # 得到回傳值
- print('resA', resA.get(timeout=1)) # resA 400
資料交換
資料不會被更改,只是單純的再複製一份
Queues
- from multiprocessing import Process, Queue
-
-
- def f(q):
- q.put([42, None, 'hello'])
- q.put(2)
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- # FIFO
- q = Queue()
- p = Process(target=f, args=(q,))
- p.start()
- print(q.get()) # [42, None, 'hello']
- print(q.get()) # 2
- p.join()
Pipes
- from multiprocessing import Process, Pipe
-
-
- def f(conn):
- print('In f')
- # 從 a 接收資料
- print(conn.recv()) # 2
- # 送出資料給 a
- conn.send([42, None, 'hello'])
- # 關閉 child process 的 b 端口,但不影響 parenet process 的 b 端口
- conn.close()
- print('b_conn close', conn.closed)
- print('Out f')
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- # 會回傳兩個端點,如同水管的兩端,可互相溝通
- a_conn, b_conn = Pipe()
- # 送出資料給 b
- a_conn.send(2)
- # 建立 child process
- p = Process(target=f, args=(b_conn,))
- # 執行 child process
- p.start()
- # 從 b 接收資料
- print(a_conn.recv()) # [42, None, 'hello']
- # 等待直到 child process 完成
- p.join()
- # 在 child process 關閉,並不影響 parent process 的端口
- print('b_conn close', b_conn.closed)
-
-
- # 輸出結果
- # In f
- # 2
- # b_conn close True
- # Out f
- # [42, None, 'hello']
- # b_conn close False
資料分享
資料會被更改
Shared memory
- from multiprocessing import Process, Value, Array
-
-
- def f(n, a):
- n.value = 3.1415927
- for i, x in enumerate(a):
- a[i] = -x
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- num = Value('d', 0.0) # 0.0
- arr = Array('i', range(10)) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- print(num.value)
- print(arr[:])
-
- # 建立 child process
- p = Process(target=f, args=(num, arr))
- # 執行 child process
- p.start()
- # 等待直到 child process 完成
- p.join()
-
- print(num.value) # 3.1415927
- print(arr[:]) # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Server process
速度較慢,但可通過網路分享
- from multiprocessing import Process, Manager
-
-
- def f(d, l):
- d[1] = '1'
- d['2'] = 2
- d[0.25] = None
- l.reverse()
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- with Manager() as manager:
- d = manager.dict()
- l = manager.list(range(10))
- print(d) # {}
- print(l) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
-
- # 建立 child process
- p = Process(target=f, args=(d, l))
- # 執行 child process
- p.start()
- # 等待直到 child process 完成
- p.join()
-
- print(d) # {0.25: None, 1: '1', '2': 2}
- print(l) # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
class multiprocessing.Process
lass multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- 參數
- group
- target
- name
- args
- kwargs
- daemon
- True
- parent process 結束時,同時終止所有 child process
- 屬性
- name
- child process 名字,只單純用來給人看
- 預設為 ‘Process-N1:N2:...:Nk‘ ,k 代表第幾個 child process
- daemon
- 當結束時,是否隨 parent process 結束
- 需在 start() 前設定
- pid
- exitcode
- 終止的 code
- child 未結束時仍是 None
- authkey
- parent process 的 身份驗證 key
- sentinel
- 方法
- run()
- start()
- join([timeout])
- 持續等待直到 child process 完成
- process 不該呼叫自已的 join,會產生 deadlock
- timeout (秒)
- is_alive()
- terminate()
- import multiprocessing, time, signal
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- # 建立
- p = multiprocessing.Process(target=time.sleep, args=(1000,))
- print(p, p.is_alive()) # Process(Process-1, initial)> False
-
- # 啟動
- p.start()
- print(p, p.is_alive()) # Process(Process-1, started)> True
-
- # 終止
- p.terminate()
- # 等一下,以確定終止完成
- time.sleep(0.1)
- print(p, p.is_alive()) # Process(Process-1, stopped[SIGTERM])> False
- print(p.exitcode == -signal.SIGTERM) # True
-
- # 輸出結果
- # <Process(Process-1, initial)> False
- # <Process(Process-1, started)> True
- # <Process(Process-1, stopped[SIGTERM])> False
- # True
class multiprocessing.Queue([maxsize])
- 多 process 溝通的建議首選
- 參數
- maxsize
- queue 的大小
- 設為 5 表示可放入 5 個 objects
- 方法
- 因為 multiprocessing 的關係,跟數量有關係的都只是預估值,並不可靠
- qsize()
- empty()
- full()
- put(obj[, block=True[, timeout=None]])
- put_nowait(obj)
- get([block=True[, timeout=None]])
- get_nowait()
- close()
- join_thread()
- 確認所有的 data 都已被清除,再退出 process
- 必須使用在 close() 後面
- cancel_join_thread()
- 不清除 data 直接退出 process
- 必須使用在 close() 後面
- from multiprocessing import Queue
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- # 大小設定為 5 個 objects
- q = Queue(5)
- print(q.empty()) # True
- print(q.qsize()) # 0
-
- q.put([1,2,3], False)
- q.put(1, False)
- q.put('123', False)
- q.put({1,2,3}, False)
- q.put({'list':[1,2,3], 'dict':{'1':1, '2':2}}, False)
-
- print(q.qsize()) # 5
- print(q.full()) # True
-
- # 關閉,並等到清除資料後才退出 process
- q.close()
- q.join_thread()
multiprocessing.Pipe([duplex=True])
- 回傳 Connection objects (conn1, conn2)
- 參數
class multiprocessing.Connection
- 方法
- fileno()
- 回傳 file descriptor or handle used by the connection.
- close()
- poll([timeout])
- send_bytes(buffer[, offset[, size]])
- 傳送 bytes
- buffer
- offset
- size
- recv_bytes([maxlength])
- 持續等待直到接收到 bytes
- maxlength
- recv_bytes_into(buffer[, offset])
- 持續等待直到接收到 bytes 並放入 buffer
- from multiprocessing import Pipe
-
-
- a, b = Pipe()
- a.send([1, 'hello', None])
- print(b.poll()) # True
- print(b.recv()) #[1, 'hello', None]
- b.send_bytes(b'thank you')
- print(a.recv_bytes()) # b'thank you'
-
- import array
- # 建立 array,格式為 byte
- arr1 = array.array('i', range(5))
- arr2 = array.array('i', [0] * 10)
- a.send_bytes(arr1)
- # 從第三個開始填入
- count = b.recv_bytes_into(arr2, 3*arr2.itemsize)
- # 確認收到的 bytes 一致
- assert count == len(arr1) * arr1.itemsize
- print(arr2) # array('b', [0, 0, 0, 0, 1, 2, 3, 4, 0, 0])
class multiprocessing.pool.Pool
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- 參數
- porcesses
- 執行 process 的上限
- 預設為 cpu 個數
- initializer
- 初始函數
- 每個 process 啟動前呼叫 initializer(*initargs) 初始化
- initargs
- maxtasksperchild
- 完成多少數量的任務後需重啟 process
- 避免運行時間很長的工作進程消耗太多的系統資源
- from multiprocessing import Pool
- import os
-
-
- def f():
- print("PID: %d" % os.getpid())
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- pool = Pool(1, maxtasksperchild=2)
-
- for _ in range(5):
- pool.apply_async(f)
-
- # 關閉 pool
- pool.close()
- # 需關閉後才能 join()
- # 不加此行,會看不到結果,因為會隨 parent process 關閉
- pool.join()
-
- # 輸出結果
- # PID: 5028
- # PID: 5028
- # PID: 4284
- # PID: 4284
- # PID: 5712
context
方法
- 回傳的物件
- blocking
- 即是目標函數所回傳的物件
內部程式碼為 return multiprocessing.pool.AsyncResult.get()
因為 get() 故為 blocking
- nonblocking
- 即是 multiprocessing.pool.AsyncResult
內部程式碼為 return multiprocessing.pool.AsyncResult
- apply(func[, args[, kwds]])
- 建立 child process 並執行
- blocking,需等到完成才會進行下一步
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 建立 child process 並執行
- nonblocking,直接進行下一步
- callback
- error_callback
- from multiprocessing import Pool
- import traceback
-
-
- def f(x):
- return x*x
-
-
- def show(x):
- print(x+1, 'In show')
-
-
- def handle_error(e):
- traceback.print_exception(type(e), e, e.__traceback__)
-
-
- # 因 window spawn 的緣故
- # 必須在 __name__ == '__main__' 之內執行
- if __name__ == '__main__':
- with Pool() as pool:
- result = pool.apply_async(f, (10,), callback=show, error_callback=handle_error)
- print(result.get(timeout=1))
-
-
- # 輸出結果
- # 101 In show
- # 100
map(func, iterable[, chunksize])
- 同內建的 map,但只支援單一參數的 function
- blocking,直到完成才會進行下一步
- chunksize
- from multiprocessing import Pool
- import time
-
-
- def f(x):
- # 稍微暫停才看得出差異
- time.sleep(0.1)
- print(x)
-
-
- if __name__ == '__main__':
- pool = Pool(2)
- # chunksize 若改為 20,則會按順序輸出,因同時處理 20筆
- pool.map(f, range(20), chunksize=10)
-
-
- # 輸出結果,因個別交錯輸出
- # 0
- # 10
- # 1
- # 11
- # 2
- # 12
- # 3
- # 13
- # 4
- # 14
- # 5
- # 15
- # 6
- # 16
- # 7
- # 17
- # 8
- # 18
- # 9
- # 19
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- 參數同 apply_async & map
- nonblocking
imap(func, iterable[, chunksize])
imap_unordered(func, iterable[, chunksize])
starmap(func, iterable[, chunksize])
- 類似 map,但傳入的參數視為 *x
- blocking,直到完成才會進行下一步
- from multiprocessing import Pool
- import time
-
-
- def f(x, y):
- print(x, y)
-
-
- if __name__ == '__main__':
- pool = Pool()
- pool.starmap(f, zip(range(20), range(20,40)), chunksize=1)
-
-
- # 輸出結果
- # 0 20
- # 1 21
- # 2 22
- # 3 23
- # 4 24
- # 5 25
- # 6 26
- # 7 27
- # 8 28
- # 10 30
- # 11 31
- # 9 29
- # 12 32
- # 13 33
- # 15 35
- # 14 34
- # 16 36
- # 17 37
- # 18 38
- # 19 39
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
- 參數同 apply_async & map
- nonblocking,直接執行下一步
close()
- 等到所有任務完成,process 將退出,並關閉 Pool
terminate()
join()
- 等待 child process 結束
- 需先呼叫 close() 或 terminate()
class multiprocessing.pool.AsyncResult
- 得 nonblocking 的回傳結果,例:Pool.apply_async() 和 Pool.map_async()
- 方法
- get([timeout])
- wait([timeout])
- ready()
- successful()
- 同 ready(),但未完成會 raise AssertionError
Logging
- debug 使用
- multiprocessing.get_logger()
- multiprocessing.log_to_stderr()
- 呼叫 get_logger() 並回傳 logger
- 同時加入處理程序,將訊息發送到 sys.stderr
格式 '[%(levelname)s /%(processName)s] %(message)s'
- import multiprocessing, logging
-
-
- logger = multiprocessing.log_to_stderr()
- logger.setLevel(logging.INFO)
-
- # 因 window spawn 的緣故
- if __name__ == '__main__':
- m = multiprocessing.Manager()
- del m
-
- # 輸出結果
- # [INFO/SyncManager-1] child process calling self.run()
- # [INFO/SyncManager-1] child process calling self.run()
- # [INFO/SyncManager-1] manager serving at '\\\\.\\pipe\\pyc-5984-0-ekn9ajo6'
- # [INFO/MainProcess] sending shutdown message to manager
- # [INFO/SyncManager-1] manager serving at '\\\\.\\pipe\\pyc-5984-0-ekn9ajo6'
- # [INFO/SyncManager-1] process shutting down
- # [INFO/SyncManager-1] process shutting down
- # [INFO/SyncManager-1] process exiting with exitcode 0
- # [INFO/SyncManager-1] process exiting with exitcode 0
- # [INFO/MainProcess] process shutting down
參考
Python 標準庫10 多進程初步 (multiprocessing包)
python 中的 Queue 與多進程(multiprocessing)
正确使用 Multiprocessing 的姿势
Python 多进程编程
multiprocessing--多線程
留言
張貼留言