[Python] multiprocessing Pool() 用法

由於有想把sqlite資料庫切割開來,同時存取多個資料庫的想法,
所以就開始來研究multiprocessing的用法

我主要參考的是這篇,裡面提到的非常多,搜尋multiprocessing就好,
即便是multiprocessing也有很多用法,我有試過Pool()跟Process(),

但由於我需要抓回傳值,我也沒有需要晚點執行process的需求,
所以Pool()的方法比較適合我,所以下面都只談Pool(),

首先是Pool().map()的用法,我這邊只是單純轉一手,
用法就是把參數變成一個list,就可以同時執行同個function多個參數,
function我都擺在taiwan_xbrl_common.py裡面,

function的內容:

def pool_map(function_name, arguments_list, processes = multiprocessing.cpu_count()):
    #
    # Description:
    #   透過 multiprocessing.Pool().map 去同時跑多個 process
    #   Pool(processes = cpu_count())
    #   沒有給初始值預設是給 cpu 數量的執行緒個數
    #
    #   平常使用上要用 [] 框起來 return_values 才能抓到所有的值
    #   不然會只有存到最後一個值 但函數的回傳值本來就是 list 形式
    #   return_values = [Pool().map(function_name, arguments_list)]
    #
    # Arguments:
    #   function_name: 要同時執行多個執行緒的 function 名稱
    #                  只接收只有一個參數的 function
    #   arguments_list: 要丟進 function 裡面的唯一參數組合而成的 list
    #
    # Returns:
    #   回傳每個執行緒的 return value list 集合
    #
    return multiprocessing.Pool(processes).map(function_name, arguments_list)
使用function的方式:

import taiwan_xbrl_common as common

arguments_list1 = ['2009Q1', '2009Q2', '2009Q3', '2009Q4', \
                  '2010Q1', '2010Q2', '2010Q3', '2010Q4', \
                  '2011Q1', '2011Q2', '2011Q3', '2011Q4', \
                  '2012Q1', '2012Q2', '2012Q3', '2012Q4', \
                  '2013Q1', '2013Q2', '2013Q3', '2013Q4', \
                  '2014Q1', '2014Q2', '2014Q3', '2014Q4', \
                  '2015Q1', '2015Q2', '2015Q3', '2015Q4', \
                  '2016Q1', '2016Q2', '2016Q3', '2016Q4',]

def test1(arg1):
    print(current_process().name)
    return arg1 + "test"

result1 = common.pool_map(test1, arguments_list1)
print(result1)
執行結果:

SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-2
SpawnPoolWorker-3
SpawnPoolWorker-2
SpawnPoolWorker-3
['2009Q1test', '2009Q2test', '2009Q3test', '2009Q4test', '2010Q1test', '2010Q2te
st', '2010Q3test', '2010Q4test', '2011Q1test', '2011Q2test', '2011Q3test', '2011
Q4test', '2012Q1test', '2012Q2test', '2012Q3test', '2012Q4test', '2013Q1test', '
2013Q2test', '2013Q3test', '2013Q4test', '2014Q1test', '2014Q2test', '2014Q3test
', '2014Q4test', '2015Q1test', '2015Q2test', '2015Q3test', '2015Q4test', '2016Q1
test', '2016Q2test', '2016Q3test', '2016Q4test']

由於程式碼很簡單,所以也不太會用到不同的process去執行,
接下來就看看Pool().apply_async()怎麼實現不定數量參數去執行多process,
而且還要把所有resault全接回來,Pool(),map()只能傳一個參數,
Process()又還要透過Pipe()傳接參數,實在不是很單純,
我也不想改要跑的function還要在裡面傳接,
所以似乎就只有Pool().apply_async()比較實用了,

下面就直接先來看看function內容:

def pool_apply_async(function_name, *arguments_lists, processes = multiprocessing.cpu_count()):
    #
    # Description:
    #   透過 multiprocessing.Pool().apply_async 去同時跑多個 process
    #   Pool() 沒有給初始值預設是給 cpu 數量的 process 個數
    #   參數透過迴圈加進 Tuple 傳給 args
    #   process 回傳的 Queue list 個別提取出來再轉成 list 回傳
    #
    # Arguments:
    #   function_name: 要同時執行多個執行緒的 function 名稱
    #   *arguments_lists: 不定個數的參數 可同時丟入多個參數 list
    #                     每個參數 list 的內容數量要相同
    #
    # Returns:
    #   回傳每個執行緒的 return value list 集合
    #   假如有產生錯誤就回傳 None
    #
    argument_count = len(arguments_lists) # 參數個數
    each_argument_count = len(arguments_lists[0]) # 每個參數 list 的數量
    queue_list = [] # 建立一個空白參數 list
    for argument_number in range(argument_count):
        if each_argument_count != len(arguments_lists[argument_number]):# 每個參數的數量應該都要相同
            print("Every argument list length not sycn!")
            return None
        queue_list += 'None' # 先丟個隨意的字串進 list 佔位置 因為 Queue元素不能直接串接產生
        queue_list[argument_number] = multiprocessing.Queue() # 把 Queue 元件丟進 list
        for argument in arguments_lists[argument_number]: # 把所有參數丟進 Queue 裡面
            queue_list[argument_number].put(argument)
    # 產生一個 Pool() 物件 並指定產生的 process 個數 預設為 CPU 個數
    pool = multiprocessing.Pool(processes)
    # 同一個 Pool() 物件在一個時間同時處理設定個數的 process 其餘在 Queue 中等待
    # 假如宣告多個 Pool() 物件會造成同一時間執行過多 process 反而造成效率低落
    # 參數是透過迴圈來把多個參數 Queue 個別 get 出來變成 Tuple 傳進 args
    # 回傳值則是透過 [] 把所有回傳的 Queue list 起來
    values = [pool.apply_async(function_name, args = ([queue_list[argument_number].get() for argument_number in range(argument_count)])) for index in range(each_argument_count)]
    # 回傳的是 Queue list 需要個別 get 出再用 [] list 起來
    return [value.get() for value in values]
使用function的方式:

import taiwan_xbrl_common as common

arguments_list1 = ['2009Q1', '2009Q2', '2009Q3', '2009Q4', \
                  '2010Q1', '2010Q2', '2010Q3', '2010Q4', \
                  '2011Q1', '2011Q2', '2011Q3', '2011Q4', \
                  '2012Q1', '2012Q2', '2012Q3', '2012Q4', \
                  '2013Q1', '2013Q2', '2013Q3', '2013Q4', \
                  '2014Q1', '2014Q2', '2014Q3', '2014Q4', \
                  '2015Q1', '2015Q2', '2015Q3', '2015Q4', \
                  '2016Q1', '2016Q2', '2016Q3', '2016Q4',]

arguments_list2 = ['2009Q1', '2009Q2', '2009Q3', '2009Q4', \
                  '2010Q1', '2010Q2', '2010Q3', '2010Q4', \
                  '2011Q1', '2011Q2', '2011Q3', '2011Q4', \
                  '2012Q1', '2012Q2', '2012Q3', '2012Q4', \
                  '2013Q1', '2013Q2', '2013Q3', '2013Q4', \
                  '2014Q1', '2014Q2', '2014Q3', '2014Q4', \
                  '2015Q1', '2015Q2', '2015Q3', '2015Q4', \
                  '2016Q1', '2016Q2', '2016Q3', '2016Q4',]

arguments_list3 = ['2009Q1', '2009Q2', '2009Q3', '2009Q4', \
                  '2010Q1', '2010Q2', '2010Q3', '2010Q4', \
                  '2011Q1', '2011Q2', '2011Q3', '2011Q4', \
                  '2012Q1', '2012Q2', '2012Q3', '2012Q4', \
                  '2013Q1', '2013Q2', '2013Q3', '2013Q4', \
                  '2014Q1', '2014Q2', '2014Q3', '2014Q4', \
                  '2015Q1', '2015Q2', '2015Q3', '2015Q4', \
                  '2016Q1', '2016Q2', '2016Q3', '2016Q4',]

arguments_list4 = ['2009Q1', '2009Q2', '2009Q3', '2009Q4', \
                  '2010Q1', '2010Q2', '2010Q3', '2010Q4', \
                  '2011Q1', '2011Q2', '2011Q3', '2011Q4', \
                  '2012Q1', '2012Q2', '2012Q3', '2012Q4', \
                  '2013Q1', '2013Q2', '2013Q3', '2013Q4', \
                  '2014Q1', '2014Q2', '2014Q3', '2014Q4', \
                  '2015Q1', '2015Q2', '2015Q3', '2015Q4', \
                  '2016Q1', '2016Q2', '2016Q3', '2016Q4',]

def test2(arg1, arg2):
    print(current_process().name)
    return arg1 + arg2

def test3(arg1, arg2, arg3):
    print(current_process().name)
    return arg1 + arg2 + arg3

def test4(arg1, arg2, arg3, arg4):
    print(current_process().name)
    return arg1 + arg2 + arg3 + arg4

result2 = common.pool_apply_async(test2, arguments_list1, arguments_list2)
print(result2)
result3 = common.pool_apply_async(test3, arguments_list1, arguments_list2, arguments_list3)
print(result3)
result4 = common.pool_apply_async(test4, arguments_list1, arguments_list2, arguments_list3, arguments_list4)
print(result4)
執行結果:

SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
SpawnPoolWorker-1
['2009Q12009Q1', '2009Q22009Q2', '2009Q32009Q3', '2009Q42009Q4', '2010Q12010Q1'
 '2010Q22010Q2', '2010Q32010Q3', '2010Q42010Q4', '2011Q12011Q1', '2011Q22011Q2'
 '2011Q32011Q3', '2011Q42011Q4', '2012Q12012Q1', '2012Q22012Q2', '2012Q32012Q3'
 '2012Q42012Q4', '2013Q12013Q1', '2013Q22013Q2', '2013Q32013Q3', '2013Q42013Q4'
 '2014Q12014Q1', '2014Q22014Q2', '2014Q32014Q3', '2014Q42014Q4', '2015Q12015Q1'
 '2015Q22015Q2', '2015Q32015Q3', '2015Q42015Q4', '2016Q12016Q1', '2016Q22016Q2'
 '2016Q32016Q3', '2016Q42016Q4']
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
SpawnPoolWorker-5
['2009Q12009Q12009Q1', '2009Q22009Q22009Q2', '2009Q32009Q32009Q3', '2009Q42009Q
2009Q4', '2010Q12010Q12010Q1', '2010Q22010Q22010Q2', '2010Q32010Q32010Q3', '201
Q42010Q42010Q4', '2011Q12011Q12011Q1', '2011Q22011Q22011Q2', '2011Q32011Q32011Q
', '2011Q42011Q42011Q4', '2012Q12012Q12012Q1', '2012Q22012Q22012Q2', '2012Q3201
Q32012Q3', '2012Q42012Q42012Q4', '2013Q12013Q12013Q1', '2013Q22013Q22013Q2', '2
13Q32013Q32013Q3', '2013Q42013Q42013Q4', '2014Q12014Q12014Q1', '2014Q22014Q2201
Q2', '2014Q32014Q32014Q3', '2014Q42014Q42014Q4', '2015Q12015Q12015Q1', '2015Q22
15Q22015Q2', '2015Q32015Q32015Q3', '2015Q42015Q42015Q4', '2016Q12016Q12016Q1',
2016Q22016Q22016Q2', '2016Q32016Q32016Q3', '2016Q42016Q42016Q4']
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
SpawnPoolWorker-12
['2009Q12009Q12009Q12009Q1', '2009Q22009Q22009Q22009Q2', '2009Q32009Q32009Q3200
Q3', '2009Q42009Q42009Q42009Q4', '2010Q12010Q12010Q12010Q1', '2010Q22010Q22010Q
2010Q2', '2010Q32010Q32010Q32010Q3', '2010Q42010Q42010Q42010Q4', '2011Q12011Q12
11Q12011Q1', '2011Q22011Q22011Q22011Q2', '2011Q32011Q32011Q32011Q3', '2011Q4201
Q42011Q42011Q4', '2012Q12012Q12012Q12012Q1', '2012Q22012Q22012Q22012Q2', '2012Q
2012Q32012Q32012Q3', '2012Q42012Q42012Q42012Q4', '2013Q12013Q12013Q12013Q1', '2
13Q22013Q22013Q22013Q2', '2013Q32013Q32013Q32013Q3', '2013Q42013Q42013Q42013Q4'
 '2014Q12014Q12014Q12014Q1', '2014Q22014Q22014Q22014Q2', '2014Q32014Q32014Q3201
Q3', '2014Q42014Q42014Q42014Q4', '2015Q12015Q12015Q12015Q1', '2015Q22015Q22015Q
2015Q2', '2015Q32015Q32015Q32015Q3', '2015Q42015Q42015Q42015Q4', '2016Q12016Q12
16Q12016Q1', '2016Q22016Q22016Q22016Q2', '2016Q32016Q32016Q32016Q3', '2016Q4201
Q42016Q42016Q4']

看起來似乎好像沒有同時跑到太多的process,但其實是因為這執行太快,
根本不需要用到多個process處理,假如是同時處理多個資料庫,
因為時間比較久,可以看得出來同一個時間是處理cpu數量的process,
數量是可以指定的,只是太多process真的反而比較慢,

implement過程中遇到幾個問題,第一個是不固定數量參數,可參考這篇
再來就是每個參數都是list,偏偏apply_async需要透過Queue處理參數,
本來想用Queue存Queue,但是Queue沒辦法[index]處理,
所以才想說用List來存,但Queue又沒辦法直接串接,
所以只好先隨便存個str進去,然後再用Queue的位置丟進list,
就有了Queue list的功能,

再來就是apply_async的return值沒辦法全抓到,就可參考這篇
但其實裡面範例跟我要的功能還有差距,所以只好再研究,
重點就是用[]在裡面跑迴圈去apply_async,最後就可以把所有return值list起來
而最終的return list是Queue,還要透過get()去把所有值再list起來

而Pool()的宣告要在[]迴圈之外,
因為在迴圈之內就會產生跟迴圈數量一樣多的process,
同時執行太多process,光是彼此sync就受不了了,
最終會產生跟list內容數量一樣多的process,
會慢很多,假如需要同時有非常多process,
應該只需要在Pool()中設定數量就好,也不需要那麼多個物件,

最終就可以實現同時執行多個多參數function的功能了