Pythonのmultiprocessingの例2
Pythonではmultithreading よりもmultiprocessingの方がCPUのコアを有効に使用できるが,前の例だとCtrl+Cで停止したときにプロセスが残る.
WinPython 3.7では少し改善しているので対策コードが追加できる.
メインのプロセスから,terminate()を実行するととりあえず子プロセスは消える.
実際には残るらしい.
その場合にはProcessのコンストラクタにdaemon=Falseを指定する
最初に考えたコードは次のようなもの... しかし,実際にはうまくいっていない感がある
import numpy as np from multiprocessing import Process import multiprocessing as mp def calc(queue, i_range, arg1, arg2, arg3): print('Start: {}\r\n'.format(mp.current_process().name)) a = arg1[i_range] + arg2[i_range]; b = arg2[i_range] + arg3[i_range]; c = arg3[i_range] + arg1[i_range]; queue.put([a, b, c]) if __name__ == '__main__': x = np.linspace(0, 10000, 10000); y = np.linspace(10000, 0, 10000); z = np.linspace(0, 10000, 10000) - 5000; mp_size = 4 ps = []; queue = mp.Queue() results = dict() for i in range(mp_size): ps.append(mp.Process(target=calc, args=(queue, range(x.shape[0]*i//mp_size, x.shape[0]*(i+1)//mp_size), x, y, z), name="{}".format(i+1), daemon=False)) for p in ps: p.start() try: for i in range(mp_size): results[i] = queue.get(); except KeyboardInterrupt: for p in ps: p.terminate() a = 0; b = 0; c = 0; for i in range(mp_size): a += np.asscalar(np.sum(results[i][0])); b += np.asscalar(np.sum(results[i][1])); c += np.asscalar(np.sum(results[i][2])); print('{},{},{}'.format(a, b, c))
無理やり止めるコードを追加しなおしたのが以下のコードである.
無理やり止めているのでidleでデバッグしていると例外などが発生したって確認できなくなる.ただしエンドユーザには関係なくなる.
import numpy as np from multiprocessing import Process import multiprocessing as mp import signal import sys ps = []; def calc(queue, i_range, arg1, arg2, arg3): print('Start: {}\r\n'.format(mp.current_process().name)) a = arg1[i_range] + arg2[i_range]; b = arg2[i_range] + arg3[i_range]; c = arg3[i_range] + arg1[i_range]; queue.put([a, b, c]) def handler(signum, frame): global ps; for p in ps: if p.is_alive(): try: p.terminate() except AssertionError: pass print('terminate {}'.format(p.pid)); sys.exit(0); if __name__ == '__main__': signal.signal(signal.SIGINT, handler); x = np.linspace(0, 10000, 10000); y = np.linspace(10000, 0, 10000); z = np.linspace(0, 10000, 10000) - 5000; mp_size = 4 queue = mp.Queue() results = dict() for i in range(mp_size): ps.append(mp.Process(target=calc, args=(queue, range(x.shape[0]*i//mp_size, x.shape[0]*(i+1)//mp_size), x, y, z), name="{}".format(i+1), daemon=False)) for p in ps: p.start() for i in range(len(ps)): results[i] = queue.get(); a = 0; b = 0; c = 0; for i in range(mp_size): a += np.asscalar(np.sum(results[i][0])); b += np.asscalar(np.sum(results[i][1])); c += np.asscalar(np.sum(results[i][2])); print('{},{},{}'.format(a, b, c))
実はこれでも対策は不十分でどうにもならないことがある.子プロセスの準備中などに止められると異常終了して意図通りに止まらないことがあるのだ.(is_alive()のせいかも)
タグ:Python