程式語言:Python
Package:multiprocessing
官方文件
功能:並行處理
因 GIL (CPython) 緣故,multithread 需用 multiprocess 取代,可參考以下文章
Python 最難的問題
Python 的 GIL 是什么鬼,多线程性能究竟如何
注意事項
-
window 程式需在 if __name__ == '__main__': 之內運行
不然會有 RuntimeError,甚至莫名的錯誤
因 window 沒有 fork,所以執行時是採用 spawn,使用 runpy 實現
所以每個 child process 會在一開始 import 原先的 module 再執行對應的 function
參考 multiprocessing - Windows
指定產生 process 的方式- Contexts and start methods
from multiprocessing import Pool
import sys
import os
def f(x):
return x
if __name__ == '__main__':
# 依 CPU 數量建立 child process
pool = Pool()
pool.map(f, range(10))
# 因不在 '__main__' 中,所以 import 時每個 child process 都會執行
try:
print('pid:{:4d} stdout:{:8d}, __name__:{:10s}, importedBy:{}'.format(os.getpid(), id(sys.stdout), __name__, sys._getframe(1).f_globals.get('__name__')))
except ValueError:
print('pid:{:4d} stdout:{:8d}, __name__:{:10s}'.format(os.getpid(), id(sys.stdout), __name__))
# 輸出結果
# pid:5908 stdout: 4703408, __name__:__mp_main__, importedBy:runpy
# pid:2296 stdout: 6866096, __name__:__mp_main__, importedBy:runpy
# pid:2152 stdout:21546160, __name__:__main__
-
print 視需求加上 flush=True 強制刷新
不然最後程式結束強制 flush 只是殘存在任一 sys.stdout 的東西
from multiprocessing import Pool
import time
import sys
def f(x):
time.sleep(0.1)
# 無換行,所以會先暫存在 sys.stdout,等待 flush
print(x, end=" ")
return x
def g(x):
if x >=0 :
print(":g", id(sys.stdout), x)
return x
if __name__ == '__main__':
# 可加入 maxtasksperchild 限定,因 process 結束時也會 flush sys.stdout
pool = Pool()
print(pool.map(f, range(10)))
# 加上 close() & join() 可清空所有 sys.stdout
# pool.close()
# pool.join()
# 此行可強制輸出 sys.stdout 內的東西
# 因換行代表強制 flush
# print(pool.map(g, range(6)))
# 每個 process 可能有不同的 sys.stdout
# 因不在 '__main__' 中,所以 import 時每個 child process 都會執行
print("stdout:", id(sys.stdout))
# 輸出結果
# stdout: 7324848
# stdout: 4768944
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# stdout: 4900016
# 0 1 2 3 6 7
建立 Process
Process
import time
from multiprocessing import Process, Lock, freeze_support, set_start_method
def f(lock, i):
print('{:2d} Stop'.format(i))
# 暫停 1s,這邊是為了看出其他 child process Stop 的效果,才暫停的
time.sleep(1)
# block 直到上一個 child process 解鎖
lock.acquire()
try:
print('{:2d} Start'.format(i))
finally:
# 必做解鎖,不然會影響到下一個 child process
lock.release()
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
# 在 window 中,像 PyInstaller 為了產生執行檔
# multiprocessing 會被禁止,導致無法執行,出現 RuntimeError
# 需在 start 前加入此 function,只在 window 有效,其餘無作用
# 且在 __name__ == '__main__' 之後
freeze_support()
# 指定產生 process 的方式,讓不同平台的表現一致
set_start_method('spawn')
lock = Lock()
for num in range(10):
# 建立 child process
p = Process(target=f, args=(lock, num))
# 開始執行 child process
p.start()
# blocking parent process 直到 child process 返回
# 等待 0.1s,可為無參數,則表示永久等待
p.join(timeout=0.1)
# 輸出結果,每次結果都不一樣
# 可以看到並不是連續的,這是因為 child process 建立有快有慢
# 且搶到 key 的,也不一定是按 stop 的順序,需注意
# 0 Stop
# 1 Stop
# 3 Stop
# 2 Stop
# 5 Stop
# 4 Stop
# 0 Start
# 1 Start
# 7 Stop
# 2 Start
# 3 Start
# 9 Stop
# 8 Stop
# 6 Stop
# 5 Start
# 4 Start
# 7 Start
# 9 Start
# 8 Start
# 6 Start
Pool
多個 child process 建議用法
import os
import time
import traceback
from multiprocessing import Pool
def handle_error(e):
'''處理 child process 的錯誤,不然 code 寫錯時,不會回報任何錯誤'''
traceback.print_exception(type(e), e, e.__traceback__)
def long_time_task(name):
print('任務 {} ({}) 開始'.format(name, os.getpid()))
start = time.time()
time.sleep(3)
end = time.time()
print('任務 {} 執行 {:0.2f} seconds.'.format(name, (end - start)))
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__=='__main__':
print('Parent process {}.'.format(os.getpid()))
# 指定建立 child process 的數量,若無指定,預設為 cpu 數量
with Pool(processes=3) as p:
for i in range(5):
# 建立 child process 並執行,error_callback 必須指定,不然很難 debug
p.apply_async(long_time_task, args=(i,), error_callback=handle_error)
print('等待所有 child processes 完成')
# 關掉 pool 不再加入任何 child process
p.close()
# 調用 join() 之前必須先調用close()
p.join()
print('所有 child processes 完成')
# 執行結果
# Parent process 6072.
# 等待所有 child processes 完成
# 任務 0 (5192) 開始
# 任務 1 (6040) 開始
# 任務 2 (4576) 開始
# 任務 0 執行 3.01 seconds.
# 任務 3 (5192) 開始
# 任務 1 執行 3.01 seconds.
# 任務 4 (6040) 開始
# 任務 2 執行 3.01 seconds.
# 任務 3 執行 3.00 seconds.
# 任務 4 執行 3.00 seconds.
# 所有 child processes 完成
回傳資料
Pool
from multiprocessing import Pool, TimeoutError
import time
def f(x):
return x*x
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# 建立 child process 並執行
resA = pool.apply_async(f, (20,))
# get timeout
resB = pool.apply_async(time.sleep, (10,))
try:
print(resB.get(timeout=1))
except TimeoutError:
print("得 resB 值超時")
print("pool 此時仍可使用")
# 已跳出 with,故 pool 已被 close
print("pool 已不可使用")
# 得到回傳值
print('resA', resA.get(timeout=1)) # resA 400
資料交換
資料不會被更改,只是單純的再複製一份
Queues
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
q.put(2)
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
# FIFO
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # [42, None, 'hello']
print(q.get()) # 2
p.join()
Pipes
from multiprocessing import Process, Pipe
def f(conn):
print('In f')
# 從 a 接收資料
print(conn.recv()) # 2
# 送出資料給 a
conn.send([42, None, 'hello'])
# 關閉 child process 的 b 端口,但不影響 parenet process 的 b 端口
conn.close()
print('b_conn close', conn.closed)
print('Out f')
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
# 會回傳兩個端點,如同水管的兩端,可互相溝通
a_conn, b_conn = Pipe()
# 送出資料給 b
a_conn.send(2)
# 建立 child process
p = Process(target=f, args=(b_conn,))
# 執行 child process
p.start()
# 從 b 接收資料
print(a_conn.recv()) # [42, None, 'hello']
# 等待直到 child process 完成
p.join()
# 在 child process 關閉,並不影響 parent process 的端口
print('b_conn close', b_conn.closed)
# 輸出結果
# In f
# 2
# b_conn close True
# Out f
# [42, None, 'hello']
# b_conn close False
資料分享
資料會被更改
Shared memory
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i, x in enumerate(a):
a[i] = -x
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
num = Value('d', 0.0) # 0.0
arr = Array('i', range(10)) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(num.value)
print(arr[:])
# 建立 child process
p = Process(target=f, args=(num, arr))
# 執行 child process
p.start()
# 等待直到 child process 完成
p.join()
print(num.value) # 3.1415927
print(arr[:]) # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Server process
速度較慢,但可通過網路分享
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
print(d) # {}
print(l) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 建立 child process
p = Process(target=f, args=(d, l))
# 執行 child process
p.start()
# 等待直到 child process 完成
p.join()
print(d) # {0.25: None, 1: '1', '2': 2}
print(l) # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
class multiprocessing.Process
lass multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- 參數
- group
- target
- name
- args
- kwargs
- daemon
- True
- parent process 結束時,同時終止所有 child process
- 屬性
- name
- child process 名字,只單純用來給人看
- 預設為 ‘Process-N1:N2:...:Nk‘ ,k 代表第幾個 child process
- daemon
- 當結束時,是否隨 parent process 結束
- 需在 start() 前設定
- pid
- exitcode
- 終止的 code
- child 未結束時仍是 None
- authkey
- parent process 的 身份驗證 key
- sentinel
- 方法
- run()
- start()
- join([timeout])
- 持續等待直到 child process 完成
- process 不該呼叫自已的 join,會產生 deadlock
- timeout (秒)
- is_alive()
- terminate()
import multiprocessing, time, signal
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
# 建立
p = multiprocessing.Process(target=time.sleep, args=(1000,))
print(p, p.is_alive()) # Process(Process-1, initial)> False
# 啟動
p.start()
print(p, p.is_alive()) # Process(Process-1, started)> True
# 終止
p.terminate()
# 等一下,以確定終止完成
time.sleep(0.1)
print(p, p.is_alive()) # Process(Process-1, stopped[SIGTERM])> False
print(p.exitcode == -signal.SIGTERM) # True
# 輸出結果
# <Process(Process-1, initial)> False
# <Process(Process-1, started)> True
# <Process(Process-1, stopped[SIGTERM])> False
# True
class multiprocessing.Queue([maxsize])
- 多 process 溝通的建議首選
- 參數
- maxsize
- queue 的大小
- 設為 5 表示可放入 5 個 objects
- 方法
- 因為 multiprocessing 的關係,跟數量有關係的都只是預估值,並不可靠
- qsize()
- empty()
- full()
- put(obj[, block=True[, timeout=None]])
- put_nowait(obj)
- get([block=True[, timeout=None]])
- get_nowait()
- close()
- join_thread()
- 確認所有的 data 都已被清除,再退出 process
- 必須使用在 close() 後面
- cancel_join_thread()
- 不清除 data 直接退出 process
- 必須使用在 close() 後面
from multiprocessing import Queue
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
# 大小設定為 5 個 objects
q = Queue(5)
print(q.empty()) # True
print(q.qsize()) # 0
q.put([1,2,3], False)
q.put(1, False)
q.put('123', False)
q.put({1,2,3}, False)
q.put({'list':[1,2,3], 'dict':{'1':1, '2':2}}, False)
print(q.qsize()) # 5
print(q.full()) # True
# 關閉,並等到清除資料後才退出 process
q.close()
q.join_thread()
multiprocessing.Pipe([duplex=True])
- 回傳 Connection objects (conn1, conn2)
- 參數
class multiprocessing.Connection
- 方法
- fileno()
- 回傳 file descriptor or handle used by the connection.
- close()
- poll([timeout])
- send_bytes(buffer[, offset[, size]])
- 傳送 bytes
- buffer
- offset
- size
- recv_bytes([maxlength])
- 持續等待直到接收到 bytes
- maxlength
- recv_bytes_into(buffer[, offset])
- 持續等待直到接收到 bytes 並放入 buffer
from multiprocessing import Pipe
a, b = Pipe()
a.send([1, 'hello', None])
print(b.poll()) # True
print(b.recv()) #[1, 'hello', None]
b.send_bytes(b'thank you')
print(a.recv_bytes()) # b'thank you'
import array
# 建立 array,格式為 byte
arr1 = array.array('i', range(5))
arr2 = array.array('i', [0] * 10)
a.send_bytes(arr1)
# 從第三個開始填入
count = b.recv_bytes_into(arr2, 3*arr2.itemsize)
# 確認收到的 bytes 一致
assert count == len(arr1) * arr1.itemsize
print(arr2) # array('b', [0, 0, 0, 0, 1, 2, 3, 4, 0, 0])
class multiprocessing.pool.Pool
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- 參數
- porcesses
- 執行 process 的上限
- 預設為 cpu 個數
- initializer
- 初始函數
- 每個 process 啟動前呼叫 initializer(*initargs) 初始化
- initargs
- maxtasksperchild
- 完成多少數量的任務後需重啟 process
- 避免運行時間很長的工作進程消耗太多的系統資源
from multiprocessing import Pool
import os
def f():
print("PID: %d" % os.getpid())
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
pool = Pool(1, maxtasksperchild=2)
for _ in range(5):
pool.apply_async(f)
# 關閉 pool
pool.close()
# 需關閉後才能 join()
# 不加此行,會看不到結果,因為會隨 parent process 關閉
pool.join()
# 輸出結果
# PID: 5028
# PID: 5028
# PID: 4284
# PID: 4284
# PID: 5712
context
方法
- 回傳的物件
- blocking
- 即是目標函數所回傳的物件
內部程式碼為 return multiprocessing.pool.AsyncResult.get()
因為 get() 故為 blocking
- nonblocking
- 即是 multiprocessing.pool.AsyncResult
內部程式碼為 return multiprocessing.pool.AsyncResult
- apply(func[, args[, kwds]])
- 建立 child process 並執行
- blocking,需等到完成才會進行下一步
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 建立 child process 並執行
- nonblocking,直接進行下一步
- callback
- error_callback
from multiprocessing import Pool
import traceback
def f(x):
return x*x
def show(x):
print(x+1, 'In show')
def handle_error(e):
traceback.print_exception(type(e), e, e.__traceback__)
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
with Pool() as pool:
result = pool.apply_async(f, (10,), callback=show, error_callback=handle_error)
print(result.get(timeout=1))
# 輸出結果
# 101 In show
# 100
map(func, iterable[, chunksize])
- 同內建的 map,但只支援單一參數的 function
- blocking,直到完成才會進行下一步
- chunksize
from multiprocessing import Pool
import time
def f(x):
# 稍微暫停才看得出差異
time.sleep(0.1)
print(x)
if __name__ == '__main__':
pool = Pool(2)
# chunksize 若改為 20,則會按順序輸出,因同時處理 20筆
pool.map(f, range(20), chunksize=10)
# 輸出結果,因個別交錯輸出
# 0
# 10
# 1
# 11
# 2
# 12
# 3
# 13
# 4
# 14
# 5
# 15
# 6
# 16
# 7
# 17
# 8
# 18
# 9
# 19
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- 參數同 apply_async & map
- nonblocking
imap(func, iterable[, chunksize])
imap_unordered(func, iterable[, chunksize])
starmap(func, iterable[, chunksize])
- 類似 map,但傳入的參數視為 *x
- blocking,直到完成才會進行下一步
from multiprocessing import Pool
import time
def f(x, y):
print(x, y)
if __name__ == '__main__':
pool = Pool()
pool.starmap(f, zip(range(20), range(20,40)), chunksize=1)
# 輸出結果
# 0 20
# 1 21
# 2 22
# 3 23
# 4 24
# 5 25
# 6 26
# 7 27
# 8 28
# 10 30
# 11 31
# 9 29
# 12 32
# 13 33
# 15 35
# 14 34
# 16 36
# 17 37
# 18 38
# 19 39
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
- 參數同 apply_async & map
- nonblocking,直接執行下一步
close()
- 等到所有任務完成,process 將退出,並關閉 Pool
terminate()
join()
- 等待 child process 結束
- 需先呼叫 close() 或 terminate()
class multiprocessing.pool.AsyncResult
- 得 nonblocking 的回傳結果,例:Pool.apply_async() 和 Pool.map_async()
- 方法
- get([timeout])
- wait([timeout])
- ready()
- successful()
- 同 ready(),但未完成會 raise AssertionError
Logging
- debug 使用
- multiprocessing.get_logger()
- multiprocessing.log_to_stderr()
- 呼叫 get_logger() 並回傳 logger
- 同時加入處理程序,將訊息發送到 sys.stderr
格式 '[%(levelname)s /%(processName)s] %(message)s'
import multiprocessing, logging
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)
# 因 window spawn 的緣故
if __name__ == '__main__':
m = multiprocessing.Manager()
del m
# 輸出結果
# [INFO/SyncManager-1] child process calling self.run()
# [INFO/SyncManager-1] child process calling self.run()
# [INFO/SyncManager-1] manager serving at '\\\\.\\pipe\\pyc-5984-0-ekn9ajo6'
# [INFO/MainProcess] sending shutdown message to manager
# [INFO/SyncManager-1] manager serving at '\\\\.\\pipe\\pyc-5984-0-ekn9ajo6'
# [INFO/SyncManager-1] process shutting down
# [INFO/SyncManager-1] process shutting down
# [INFO/SyncManager-1] process exiting with exitcode 0
# [INFO/SyncManager-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down
參考
Python 標準庫10 多進程初步 (multiprocessing包)
python 中的 Queue 與多進程(multiprocessing)
正确使用 Multiprocessing 的姿势
Python 多进程编程
multiprocessing--多線程
留言
張貼留言