-
Notifications
You must be signed in to change notification settings - Fork 0
/
memory_usage.py
58 lines (52 loc) · 1.53 KB
/
memory_usage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import subprocess
import psutil
import sys
import time
import threading
import os
def full_memory(p):
    """Return the total USS of process *p* and all of its descendants.

    Args:
        p: a ``psutil.Process`` (or compatible object) to measure.

    Returns:
        The sum of ``memory_full_info().uss`` over *p* and every descendant
        that could be measured.

    Raises:
        psutil.Error: if *p* itself cannot be inspected (for example because
            it has exited) or a descendant is inaccessible.  Descendants that
            merely exit while being measured are skipped.
    """
    total = p.memory_full_info().uss
    # One recursive listing replaces the manual recursion over children().
    for child in p.children(recursive=True):
        try:
            total += child.memory_full_info().uss
        except psutil.NoSuchProcess:
            # Short-lived children can vanish between being listed and being
            # measured; that must not throw away the whole sample.
            continue
    return total
class MyClass(threading.Thread):
    """Background thread that records the peak USS of a process tree.

    The thread repeatedly samples the process identified by ``pid`` through
    ``full_memory`` and keeps the largest value seen in ``max_uss``.  When
    ``max_allowed`` is positive and the peak rises above it, the process and
    its descendants are killed and sampling stops.  Sampling also stops once
    the process can no longer be inspected.
    """

    def __init__(self, pid, max_allowed):
        """Prepare a monitor for *pid*.

        Args:
            pid: id of the process to watch.
            max_allowed: USS limit, in the same unit ``full_memory`` returns;
                ``0`` or a negative value disables the limit.
        """
        self.pid = pid
        self.max_uss = 0
        self.max_allowed = max_allowed
        threading.Thread.__init__(self)

    def _limit_exceeded(self):
        """Return True when a limit is set and the recorded peak is above it."""
        # Use the instance's own limit, not a same-named module global.
        return self.max_allowed > 0 and self.max_uss > self.max_allowed

    @staticmethod
    def _kill_tree(proc):
        """Kill *proc* and all of its descendants, ignoring ones already gone."""
        # List descendants first: once the parent dies they are re-parented
        # and can no longer be found through it.
        victims = proc.children(recursive=True)
        victims.append(proc)
        for victim in victims:
            try:
                # psutil's kill() takes no signal argument.
                victim.kill()
            except psutil.NoSuchProcess:
                pass

    def run(self):
        """Sample memory until the process is gone or the limit is exceeded."""
        while True:
            try:
                p = psutil.Process(self.pid)
                if p.status() == psutil.STATUS_ZOMBIE:
                    # Exited but not yet waited on by its parent: there is
                    # nothing left to measure.
                    break
                self.max_uss = max(self.max_uss, full_memory(p))
                if self._limit_exceeded():
                    self._kill_tree(p)
                    break
            except psutil.Error:
                # NoSuchProcess (the process ended), AccessDenied, ...: no
                # further samples can be taken.  Anything else is a real bug
                # and is allowed to surface.
                break
# Command-line entry point: memory_usage.py MAX_ALLOWED COMMAND [ARG...]
# Runs COMMAND, prints the peak USS of its process tree and exits with
# status 1 when a positive MAX_ALLOWED was exceeded.
print(sys.argv)
if len(sys.argv) < 3:
    sys.stderr.write(
        "usage: {} MAX_ALLOWED COMMAND [ARG...]\n".format(sys.argv[0]))
    sys.exit(2)
cmd = sys.argv[2:]
max_allowed = int(sys.argv[1])
print(cmd)
print(os.environ)
# With shell=True on POSIX only the first list item is the command line; the
# rest become the shell's own positional parameters, so "prog arg" would
# silently lose "arg".  Hand the shell a single joined string instead.
command_line = " ".join(cmd) if os.name == "posix" else cmd
# The child's output was never read; sending it to the null device discards
# it just the same without letting a full pipe block the child forever.
with open(os.devnull, "wb") as sink:
    p = psutil.Popen(command_line, shell=True, stdout=sink,
                     stderr=subprocess.STDOUT, env=os.environ)
myclass = MyClass(p.pid, max_allowed)
myclass.start()
myclass.join()
limit_exceeded = (myclass.max_allowed > 0
                  and myclass.max_uss > myclass.max_allowed)
if limit_exceeded:
    # Make sure the offender is gone before waiting on it.
    try:
        p.kill()
    except psutil.NoSuchProcess:
        pass
# Reap the child so it does not linger as a zombie.
p.wait()
if limit_exceeded:
    sys.stderr.write(
        "process has used {} > {} allowed\n".format(
            myclass.max_uss, myclass.max_allowed))
    sys.exit(1)
print(myclass.max_uss)
# while p.is_running():
# try:
# peak_uss = max(peak_uss, full_memory(p))
# except:
# print(sys.exc_info()[0])
# break
# time.sleep(0.00001)
# print("completed {} {}".format(p.returncode, peak_uss))
# p = psutil.Popen(["/usr/bin/python", "-c", "print('hello')"], stdout=PIPE)