ブログ

割とコンピュータよりの情報をお届けします。

2019/3/2

Pythonのmultiprocessingの例2

Pythonではmultithreading よりもmultiprocessingの方がCPUのコアを有効に使用できるが,前の例だとCtrl+Cで停止したときにプロセスが残る.
WinPython 3.7では少し改善しているので対策コードが追加できる.

メインのプロセスから,terminate()を実行するととりあえず子プロセスは消える.
実際には残るらしい.
その場合にはProcessのコンストラクタにdaemon=Falseを指定する
最初に考えたコードは次のようなもの... しかし,実際にはうまくいっていない感がある

import numpy as np
from multiprocessing import Process
import multiprocessing as mp

def calc(queue, i_range, arg1, arg2, arg3):
    print('Start: {}\r\n'.format(mp.current_process().name))
    a = arg1[i_range] + arg2[i_range];
    b = arg2[i_range] + arg3[i_range];
    c = arg3[i_range] + arg1[i_range];
    queue.put([a, b, c])

if __name__ == '__main__':
    

    x = np.linspace(0, 10000, 10000);
    y = np.linspace(10000, 0, 10000);
    z = np.linspace(0, 10000, 10000) - 5000;

    mp_size = 4
    ps = [];
    queue = mp.Queue()
    results = dict()
    for i in range(mp_size):
        ps.append(mp.Process(target=calc, args=(queue, range(x.shape[0]*i//mp_size, x.shape[0]*(i+1)//mp_size), x, y, z), name="{}".format(i+1), daemon=False))
        
    for p in ps:
        p.start()

    try:
        for i in range(mp_size):
            results[i] = queue.get();
        
    except KeyboardInterrupt:
        for p in ps:
            p.terminate()

    a = 0;
    b = 0;
    c = 0;
    for i in range(mp_size):
        a += np.asscalar(np.sum(results[i][0]));
        b += np.asscalar(np.sum(results[i][1]));
        c += np.asscalar(np.sum(results[i][2]));
    
    print('{},{},{}'.format(a, b, c))

無理やり止めるコードを追加しなおしたのが以下のコードである.
無理やり止めているのでidleでデバッグしていると例外などが発生したって確認できなくなる.ただしエンドユーザには関係なくなる.

import numpy as np
from multiprocessing import Process
import multiprocessing as mp
import signal
import sys

ps = [];

def calc(queue, i_range, arg1, arg2, arg3):
    print('Start: {}\r\n'.format(mp.current_process().name))
    a = arg1[i_range] + arg2[i_range];
    b = arg2[i_range] + arg3[i_range];
    c = arg3[i_range] + arg1[i_range];
    queue.put([a, b, c])

def handler(signum, frame):
    global ps;
    for p in ps:
        if p.is_alive():
            try:
                p.terminate()
            except AssertionError:
                pass
            print('terminate {}'.format(p.pid));
    sys.exit(0);

if __name__ == '__main__':
    
    signal.signal(signal.SIGINT, handler);
    
    x = np.linspace(0, 10000, 10000);
    y = np.linspace(10000, 0, 10000);
    z = np.linspace(0, 10000, 10000) - 5000;

    mp_size = 4
    
    queue = mp.Queue()
    results = dict()
    for i in range(mp_size):
        ps.append(mp.Process(target=calc, args=(queue, range(x.shape[0]*i//mp_size, x.shape[0]*(i+1)//mp_size), x, y, z), name="{}".format(i+1), daemon=False))
       
    for p in ps:
        p.start()
        
    for i in range(len(ps)):
        results[i] = queue.get();
        
    a = 0;
    b = 0;
    c = 0;
    for i in range(mp_size):
        a += np.asscalar(np.sum(results[i][0]));
        b += np.asscalar(np.sum(results[i][1]));
        c += np.asscalar(np.sum(results[i][2]));
    
    print('{},{},{}'.format(a, b, c))
    

実はこれでも対策は不十分でどうにもならないことがある.子プロセスの準備中などに止められると異常終了して意図通りに止まらないことがあるのだ.(is_alive()のせいかも)

≫ Read More

2019/03/02 コンピュータ   TakeMe
Tag:Python