[Python] multiprocessing 基本教學

程式語言:Python
Package:multiprocessing
官方文件

功能:並行處理
因 GIL (CPython) 緣故,multithread 需用 multiprocess 取代,可參考以下文章
Python 最難的問題
Python 的 GIL 是什么鬼,多线程性能究竟如何

注意事項
  • window 程式需在 if __name__ == '__main__': 之內運行
    不然會有 RuntimeError,甚至莫名的錯誤
    因 window 沒有 fork,所以執行時是採用 spawn,使用 runpy 實現
    所以每個 child process 會在一開始 import 原先的 module 再執行對應的 function
    參考 multiprocessing - Windows
    指定產生 process 的方式- Contexts and start methods
    from multiprocessing import Pool
    import sys
    import os
    
    
    def f(x):
        return x
    
    
    if __name__ == '__main__':
        # 依 CPU 數量建立 child process
        pool = Pool()
        pool.map(f, range(10))
    
    # 因不在 '__main__' 中,所以 import 時每個 child process 都會執行
    try:
        print('pid:{:4d} stdout:{:8d}, __name__:{:10s}, importedBy:{}'.format(os.getpid(), id(sys.stdout), __name__, sys._getframe(1).f_globals.get('__name__')))
    except ValueError:
        print('pid:{:4d} stdout:{:8d}, __name__:{:10s}'.format(os.getpid(), id(sys.stdout), __name__))
        
        
    # 輸出結果
    # pid:5908 stdout: 4703408, __name__:__mp_main__, importedBy:runpy
    # pid:2296 stdout: 6866096, __name__:__mp_main__, importedBy:runpy
    # pid:2152 stdout:21546160, __name__:__main__
    
  • print 視需求加上 flush=True 強制刷新
    不然最後程式結束強制 flush 只是殘存在任一 sys.stdout 的東西
  • from multiprocessing import Pool
    import time
    import sys
    
    
    def f(x):
        time.sleep(0.1)
        # 無換行,所以會先暫存在 sys.stdout,等待 flush
        print(x, end=" ")
        return x
    
    
    def g(x):
        if x >=0 :
            print(":g", id(sys.stdout), x)
        return x
    
        
    if __name__ == '__main__':
        # 可加入 maxtasksperchild 限定,因 process 結束時也會 flush sys.stdout
        pool = Pool()
        print(pool.map(f, range(10)))
        
        # 加上 close() & join() 可清空所有 sys.stdout
        # pool.close()
        # pool.join()    
        
        # 此行可強制輸出 sys.stdout 內的東西
        # 因換行代表強制 flush
        # print(pool.map(g, range(6)))
        
    # 每個 process 可能有不同的 sys.stdout
    # 因不在 '__main__' 中,所以 import 時每個 child process 都會執行
    print("stdout:", id(sys.stdout))
    
    # 輸出結果
    # stdout: 7324848
    # stdout: 4768944
    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    # stdout: 4900016
    # 0 1 2 3 6 7
    

建立 Process

Process
import time
from multiprocessing import Process, Lock, freeze_support, set_start_method

def f(lock, i):
    print('{:2d} Stop'.format(i))
    # 暫停 1s,這邊是為了看出其他 child process Stop 的效果,才暫停的
    time.sleep(1)    
    # block 直到上一個 child process 解鎖
    lock.acquire()
    try:
        print('{:2d} Start'.format(i))
    finally:
        # 必做解鎖,不然會影響到下一個 child process
        lock.release()

        
# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
    # 在 window 中,像 PyInstaller 為了產生執行檔
    # multiprocessing 會被禁止,導致無法執行,出現 RuntimeError
    # 需在 start 前加入此 function,只在 window 有效,其餘無作用
    # 且在 __name__ == '__main__' 之後
    freeze_support()
    # 指定產生 process 的方式,讓不同平台的表現一致
    set_start_method('spawn')
    lock = Lock()

    for num in range(10):
        # 建立 child process
        p = Process(target=f, args=(lock, num))        
        # 開始執行 child process
        p.start()
        # blocking parent process 直到 child process 返回
        # 等待 0.1s,可為無參數,則表示永久等待
        p.join(timeout=0.1)
        
# 輸出結果,每次結果都不一樣
# 可以看到並不是連續的,這是因為 child process 建立有快有慢
# 且搶到 key 的,也不一定是按 stop 的順序,需注意
# 0 Stop  
# 1 Stop  
# 3 Stop  
# 2 Stop  
# 5 Stop  
# 4 Stop  
# 0 Start 
# 1 Start 
# 7 Stop  
# 2 Start 
# 3 Start 
# 9 Stop  
# 8 Stop  
# 6 Stop  
# 5 Start 
# 4 Start 
# 7 Start 
# 9 Start 
# 8 Start 
# 6 Start 

Pool
多個 child process 建議用法
import os
import time
import traceback
from multiprocessing import Pool


def handle_error(e):
    '''處理 child process 的錯誤,不然 code 寫錯時,不會回報任何錯誤'''
    traceback.print_exception(type(e), e, e.__traceback__)
    
    
def long_time_task(name):
    print('任務 {} ({}) 開始'.format(name, os.getpid()))
    start = time.time()
    time.sleep(3)
    end = time.time()
    print('任務 {} 執行 {:0.2f} seconds.'.format(name, (end - start)))


# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__=='__main__':
    print('Parent process {}.'.format(os.getpid()))
    # 指定建立 child process 的數量,若無指定,預設為 cpu 數量
    with Pool(processes=3) as p:
        for i in range(5):
            # 建立 child process 並執行,error_callback 必須指定,不然很難 debug
            p.apply_async(long_time_task, args=(i,), error_callback=handle_error)
        print('等待所有 child processes 完成')
        # 關掉 pool 不再加入任何 child process
        p.close()
        # 調用 join() 之前必須先調用close()
        p.join()
    print('所有 child processes 完成')

# 執行結果    
# Parent process 6072.      
# 等待所有 child processes 完成   
# 任務 0 (5192) 開始            
# 任務 1 (6040) 開始            
# 任務 2 (4576) 開始            
# 任務 0 執行 3.01 seconds.     
# 任務 3 (5192) 開始            
# 任務 1 執行 3.01 seconds.     
# 任務 4 (6040) 開始            
# 任務 2 執行 3.01 seconds.     
# 任務 3 執行 3.00 seconds.     
# 任務 4 執行 3.00 seconds.     
# 所有 child processes 完成   

回傳資料

Pool
from multiprocessing import Pool, TimeoutError
import time


def f(x):
    return x*x


# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行    
if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # 建立 child process 並執行
        resA = pool.apply_async(f, (20,))

        # get timeout
        resB = pool.apply_async(time.sleep, (10,))
        try:
            print(resB.get(timeout=1))
        except TimeoutError:
            print("得 resB 值超時")
            
        print("pool 此時仍可使用")
        
    # 已跳出 with,故 pool 已被 close
    print("pool 已不可使用")
    # 得到回傳值
    print('resA', resA.get(timeout=1)) # resA 400

資料交換

資料不會被更改,只是單純的再複製一份
Queues
from multiprocessing import Process, Queue


def f(q):
    q.put([42, None, 'hello'])
    q.put(2)


# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
    # FIFO
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # [42, None, 'hello']
    print(q.get())    # 2
    p.join()

Pipes
from multiprocessing import Process, Pipe


def f(conn):
    print('In f')
    # 從 a 接收資料
    print(conn.recv())   # 2
    # 送出資料給 a
    conn.send([42, None, 'hello'])
    # 關閉 child process 的 b 端口,但不影響 parenet process 的 b 端口
    conn.close()
    print('b_conn close', conn.closed)
    print('Out f')


# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行    
if __name__ == '__main__':
    # 會回傳兩個端點,如同水管的兩端,可互相溝通
    a_conn, b_conn = Pipe()
    # 送出資料給 b
    a_conn.send(2)
    # 建立 child process
    p = Process(target=f, args=(b_conn,))
    # 執行 child process
    p.start()
    # 從 b 接收資料
    print(a_conn.recv())   # [42, None, 'hello']
    # 等待直到 child process 完成
    p.join()
    # 在 child process 關閉,並不影響 parent process 的端口
    print('b_conn close', b_conn.closed)
    

# 輸出結果
# In f
# 2
# b_conn close True
# Out f
# [42, None, 'hello']
# b_conn close False

資料分享

資料會被更改
Shared memory
from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927
    for i, x in enumerate(a):
        a[i] = -x


# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行        
if __name__ == '__main__':
    num = Value('d', 0.0) # 0.0
    arr = Array('i', range(10)) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    print(num.value)
    print(arr[:])
    
    # 建立 child process
    p = Process(target=f, args=(num, arr))
    # 執行 child process
    p.start()
    # 等待直到 child process 完成
    p.join()

    print(num.value) # 3.1415927
    print(arr[:]) # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Server process
速度較慢,但可通過網路分享
from multiprocessing import Process, Manager


def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()


# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        print(d) # {}
        print(l) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

        # 建立 child process
        p = Process(target=f, args=(d, l))
        # 執行 child process
        p.start()
        # 等待直到 child process 完成
        p.join()

        print(d) # {0.25: None, 1: '1', '2': 2}
        print(l) # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

class multiprocessing.Process

lass multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • 參數
    • group
      • 必須為 None
    • target
      • 目標函數
    • name
      • process 的名字
    • args
      • 傳入的參數
    • kwargs
      • 傳入的 dict 參數
    • daemon
      • True
        • parent process 結束時,同時終止所有 child process
  • 屬性
    • name
      • child process 名字,只單純用來給人看
      • 預設為 ‘Process-N1:N2:...:Nk‘ ,k 代表第幾個 child process
    • daemon
      • 當結束時,是否隨 parent process 結束
      • 需在 start() 前設定
    • pid
      • process ID
    • exitcode
      • 終止的 code
      • child 未結束時仍是 None
    • authkey
      • parent process 的 身份驗證 key
    • sentinel
      • 數字表示的句柄。可以使用在系統等待函數裡
  • 方法
    • run()
      • 運行的主程式
    • start()
      • 啟動,並呼叫 run()
    • join([timeout])
      • 持續等待直到 child process 完成
      • process 不該呼叫自已的 join,會產生 deadlock
      • timeout (秒)
    • is_alive()
      • process 是否存活
    • terminate()
      • 終止 process
      • 不準在以下情況終止
        • 正在操作資料
        • 未解鎖就終止
import multiprocessing, time, signal


# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
    # 建立
    p = multiprocessing.Process(target=time.sleep, args=(1000,))
    print(p, p.is_alive()) # Process(Process-1, initial)> False

    # 啟動
    p.start()
    print(p, p.is_alive()) # Process(Process-1, started)> True

    # 終止
    p.terminate()
    # 等一下,以確定終止完成
    time.sleep(0.1)
    print(p, p.is_alive()) # Process(Process-1, stopped[SIGTERM])> False
    print(p.exitcode == -signal.SIGTERM) # True

# 輸出結果
# <Process(Process-1, initial)> False
# <Process(Process-1, started)> True
# <Process(Process-1, stopped[SIGTERM])> False
# True

class multiprocessing.Queue([maxsize])

  • 多 process 溝通的建議首選
  • 參數
    • maxsize
      • queue 的大小
      • 設為 5 表示可放入 5 個 objects
  • 方法
    • 因為 multiprocessing 的關係,跟數量有關係的都只是預估值,並不可靠
    • qsize()
      • 目前的 objects 數量
    • empty()
      • 是否為空
    • full()
      • 是否已滿
    • put(obj[, block=True[, timeout=None]])
      • obj
        • 放入的物件
      • block
        • 持續等待直到物件被取走
      • Timeout
        • 等待時間(秒)
    • put_nowait(obj)
      • 等同 put(obj, False)
    • get([block=True[, timeout=None]])
      • block
        • 持續等待直到物件可領取
      • Timeout
        • 等待時間(秒)
    • get_nowait()
      • 等同 get(False)
    • close()
      • 關閉 queue
    • join_thread()
      • 確認所有的 data 都已被清除,再退出 process
      • 必須使用在 close() 後面
    • cancel_join_thread()
      • 不清除 data 直接退出 process
      • 必須使用在 close() 後面
from multiprocessing import Queue


# 因 window spawn 的緣故
# 必須在 __name__ == '__main__' 之內執行
if __name__ == '__main__':
    # 大小設定為 5 個 objects
    q = Queue(5)
    print(q.empty()) # True
    print(q.qsize()) # 0
    
    q.put([1,2,3], False)
    q.put(1, False)
    q.put('123', False)
    q.put({1,2,3}, False)
    q.put({'list':[1,2,3], 'dict':{'1':1, '2':2}}, False)
    
    print(q.qsize()) # 5
    print(q.full()) # True
    
    # 關閉,並等到清除資料後才退出 process
    q.close()
    q.join_thread()

multiprocessing.Pipe([duplex=True])

  • 回傳 Connection objects (conn1, conn2)
  • 參數
    • duplex
      • 是否為雙向
      • False
        • conn1 只能接收
        • conn2 只能傳送

class multiprocessing.Connection

  • 方法
    • recv()
      • 持續等待直到接收到 object
    • fileno()
      • 回傳 file descriptor or handle used by the connection.
    • close()
      • 關閉連接
    • poll([timeout])
      • 是否有任何資料可讀取
      • timeout (秒)
    • send_bytes(buffer[, offset[, size]])
      • 傳送 bytes
      • buffer
        • bytes-like object
      • offset
        • 從第幾個 byte 開始傳送
      • size
        • 限制傳送的 size
    • recv_bytes([maxlength])
      • 持續等待直到接收到 bytes
      • maxlength
        • 最大長度,需大於接受的長度
    • recv_bytes_into(buffer[, offset])
      • 持續等待直到接收到 bytes 並放入 buffer
      • buffer
        • 放入的 buffer
      • offset
        • 從第幾個 byte 開始填入
from multiprocessing import Pipe


a, b = Pipe()
a.send([1, 'hello', None])
print(b.poll()) # True
print(b.recv()) #[1, 'hello', None]
b.send_bytes(b'thank you')
print(a.recv_bytes()) # b'thank you'

import array
# 建立 array,格式為 byte
arr1 = array.array('i', range(5))
arr2 = array.array('i', [0] * 10)
a.send_bytes(arr1)
# 從第三個開始填入
count = b.recv_bytes_into(arr2, 3*arr2.itemsize)
# 確認收到的 bytes 一致
assert count == len(arr1) * arr1.itemsize
print(arr2) # array('b', [0, 0, 0, 0, 1, 2, 3, 4, 0, 0])

class multiprocessing.pool.Pool

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • 參數
    • porcesses
      • 執行 process 的上限
      • 預設為 cpu 個數
    • initializer
      • 初始函數
      • 每個 process 啟動前呼叫 initializer(*initargs) 初始化
    • initargs
      • 初始函數的參數
    • maxtasksperchild
      • 完成多少數量的任務後需重啟 process
      • 避免運行時間很長的工作進程消耗太多的系統資源
      from multiprocessing import Pool
      import os
      
      
      def f():
          print("PID: %d" % os.getpid())
          
      
      # 因 window spawn 的緣故
      # 必須在 __name__ == '__main__' 之內執行
      if __name__ == '__main__':
          pool = Pool(1, maxtasksperchild=2)
          
          for _ in range(5):
              pool.apply_async(f)
          
          # 關閉 pool
          pool.close()
          # 需關閉後才能 join()
          # 不加此行,會看不到結果,因為會隨 parent process 關閉
          pool.join()
      
      # 輸出結果
      # PID: 5028
      # PID: 5028
      # PID: 4284
      # PID: 4284
      # PID: 5712
      
    • context
      • 可用來定義 context
  • 方法
    • 回傳的物件
      • blocking
        • 即是目標函數所回傳的物件
          內部程式碼為 return multiprocessing.pool.AsyncResult.get()
          因為 get() 故為 blocking
      • nonblocking
        • 即是 multiprocessing.pool.AsyncResult
          內部程式碼為 return multiprocessing.pool.AsyncResult
    • apply(func[, args[, kwds]])
      • 建立 child process 並執行
      • blocking,需等到完成才會進行下一步
    • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
      • 建立 child process 並執行
      • nonblocking,直接進行下一步
      • callback
        • 處理回傳資料的函數
        • 必須夠快,不然會影響運作
      • error_callback
        • 處理錯誤的函數
      from multiprocessing import Pool
      import traceback
      
      
      def f(x):
          return x*x
      
          
      def show(x):
          print(x+1, 'In show')
      
          
      def handle_error(e):
          traceback.print_exception(type(e), e, e.__traceback__)
          
      
      # 因 window spawn 的緣故
      # 必須在 __name__ == '__main__' 之內執行    
      if __name__ == '__main__':
          with Pool() as pool:
              result = pool.apply_async(f, (10,), callback=show, error_callback=handle_error)
              print(result.get(timeout=1))
      
              
      # 輸出結果
      # 101 In show
      # 100
      
    • map(func, iterable[, chunksize])
      • 同內建的 map,但只支援單一參數的 function
      • blocking,直到完成才會進行下一步
      • chunksize
        • 一次處理的數目
      from multiprocessing import Pool
      import time
      
      
      def f(x):
          # 稍微暫停才看得出差異
          time.sleep(0.1)
          print(x)
      
          
      if __name__ == '__main__':
          pool = Pool(2)
          # chunksize 若改為 20,則會按順序輸出,因同時處理 20筆
          pool.map(f, range(20), chunksize=10)
          
      
      # 輸出結果,因個別交錯輸出
      # 0
      # 10
      # 1
      # 11
      # 2
      # 12
      # 3
      # 13
      # 4
      # 14
      # 5
      # 15
      # 6
      # 16
      # 7
      # 17
      # 8
      # 18
      # 9
      # 19
      
    • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
      • 參數同 apply_async & map
      • nonblocking
    • imap(func, iterable[, chunksize])
      • iterator 版的 map
      • blocking
    • imap_unordered(func, iterable[, chunksize])
      • 無序的 imap
      • blocking
    • starmap(func, iterable[, chunksize])
      • 類似 map,但傳入的參數視為 *x
      • blocking,直到完成才會進行下一步
      from multiprocessing import Pool
      import time
      
      
      def f(x, y):
          print(x, y)
      
          
      if __name__ == '__main__':
          pool = Pool()
          pool.starmap(f, zip(range(20), range(20,40)), chunksize=1)
      
          
      # 輸出結果
      # 0 20 
      # 1 21 
      # 2 22 
      # 3 23 
      # 4 24 
      # 5 25 
      # 6 26 
      # 7 27 
      # 8 28 
      # 10 30
      # 11 31
      # 9 29 
      # 12 32
      # 13 33
      # 15 35
      # 14 34
      # 16 36
      # 17 37
      # 18 38
      # 19 39
      
    • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
      • 參數同 apply_async & map
      • nonblocking,直接執行下一步
    • close()
      • 等到所有任務完成,process 將退出,並關閉 Pool
    • terminate()
      • 立刻終止,無需等待任務完成
    • join()
      • 等待 child process 結束
      • 需先呼叫 close() 或 terminate()

class multiprocessing.pool.AsyncResult

  • 得 nonblocking 的回傳結果,例:Pool.apply_async() 和 Pool.map_async()
  • 方法
    • get([timeout])
      • 持續等待直到得到回傳的值
      • timeout (秒)
    • wait([timeout])
      • 持續等待直到回傳的值可用
    • ready()
      • child process 是否已完成
    • successful()
      • 同 ready(),但未完成會 raise AssertionError

Logging

  • debug 使用
  • multiprocessing.get_logger()
    • 回傳 logger
  • multiprocessing.log_to_stderr()
    • 呼叫 get_logger() 並回傳 logger
    • 同時加入處理程序,將訊息發送到 sys.stderr
      格式 '[%(levelname)s /%(processName)s] %(message)s'
import multiprocessing, logging


logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)

# 因 window spawn 的緣故
if __name__ == '__main__':
    m = multiprocessing.Manager()
    del m
    
# 輸出結果
# [INFO/SyncManager-1] child process calling self.run()
# [INFO/SyncManager-1] child process calling self.run()
# [INFO/SyncManager-1] manager serving at '\\\\.\\pipe\\pyc-5984-0-ekn9ajo6'
# [INFO/MainProcess] sending shutdown message to manager
# [INFO/SyncManager-1] manager serving at '\\\\.\\pipe\\pyc-5984-0-ekn9ajo6'
# [INFO/SyncManager-1] process shutting down
# [INFO/SyncManager-1] process shutting down
# [INFO/SyncManager-1] process exiting with exitcode 0
# [INFO/SyncManager-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down

參考

Python 標準庫10 多進程初步 (multiprocessing包)
python 中的 Queue 與多進程(multiprocessing)
正确使用 Multiprocessing 的姿势
Python 多进程编程
multiprocessing--多線程

留言