-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultiprocess.py
77 lines (52 loc) · 1.62 KB
/
multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#########################################
##### MULTIPROCESS.PY
import multiprocessing
import traceback
import os
import re
import sys
import utils
###########################################################
# Multiprocessing
###########################################################
def worker(inq, work_func, args:list):
process = multiprocessing.current_process()
worker_id = process._identity
print("Starting worker %s [%s]" % (process.name, worker_id))
while True:
try:
# retrieve work item
req = inq.get(block=True, timeout=5)
idx, work_item = req
# end of queue
if not work_item:
return
# perform actual work
ret = work_func(work_item, args, worker_id)
except Exception as e:
raise Exception("Worker %s [%s] failed to do work: %s" % \
(process.name, process._identity, str(e)))
def start_parallel_workers(work_item_list:list, work_func, args:list=None):
try:
# register signal handler
signal = utils.Signal()
signal.install([utils.Signal.SIGINT, utils.Signal.SIGTERM])
# multiprocessing
manager = multiprocessing.Manager()
inq = manager.Queue()
work_args = (inq, work_func, args)
cpu_count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(cpu_count, worker, work_args)
# work with each item
for idx, work_item in enumerate(work_item_list):
# check for interruption
if signal.caught():
break
inq.put((idx, work_item))
for i in range(cpu_count):
inq.put((None, None))
# wait for them to exit
pool.close()
pool.join()
except Exception as e:
raise Exception("Failed to start workers: %s" % (str(e)))