-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
executable file
·70 lines (56 loc) · 2.25 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import sys
import glob
import re
import json
from collections import defaultdict
from slugify import slugify
from analysis.plot import plot_all_stages
from log_parser.parser import LogParser
def parse_application_log(file_path, all_apps, save=True, analyse=False):
    """Parse one application log and optionally save its report and DAG plots.

    Args:
        file_path: Path of the log file handed to ``LogParser``.
        all_apps: Two-level mapping that is filled in when ``analyse`` is
            true, as ``all_apps[query_number][data_scale] = log_parser``.
            The first level must create missing entries on access (for
            example a ``defaultdict(dict)``).
        save: When true, write the generated report and the stage/RDD DAG
            plots under the ``parser_output`` directory (created on demand).
        analyse: When true, register the parser in ``all_apps`` if the
            application name contains ``query<N>_cluster_<M>G``.

    Returns:
        None. A log that fails to parse with ``KeyError`` or
        ``json.JSONDecodeError`` is reported on stdout and skipped.
    """
    log_parser = LogParser(file_path)
    try:
        log_parser.process()
    except (KeyError, json.decoder.JSONDecodeError) as e:
        # Best effort: recover only the application name so the message is
        # more helpful. A failure here must not hide the original error, and
        # must not swallow KeyboardInterrupt/SystemExit either.
        try:
            name = log_parser.process_name_only()
        except Exception:
            name = None
        label = f"({name}) " if name else ""
        print(f"error on parse {label}{file_path}, {e}")
        return

    name = log_parser.get_app_name()
    app_id = log_parser.get_app_id()
    # Fall back to the log's own file name when slugify leaves nothing usable.
    safe_name = slugify(name + "_" + app_id) or os.path.basename(file_path)

    if save:
        os.makedirs("parser_output", exist_ok=True)
        report = log_parser.generate_report()
        with open(
            f"parser_output/{safe_name}_report", "w", encoding="utf-8"
        ) as report_file:
            report_file.write(report)
        log_parser.save_plot_of_stages_dag(f"parser_output/{safe_name}_stages_dag")
        log_parser.save_plot_of_rdds_dag(f"parser_output/{safe_name}_RDDs_dag")

    print(f"Log processing of application '{safe_name}' completed.")

    if analyse:
        # Require at least one digit per group: with `*` a name such as
        # "query_cluster_G" matched with empty groups and int("") raised.
        match = re.search(r"query([0-9]+)_cluster_([0-9]+)G", name)
        if match:
            query_number, data_scale = (int(group) for group in match.groups())
            all_apps[query_number][data_scale] = log_parser
def parse_application_log_from_directory(directory, *, save=True, analyse=False):
    """Parse every regular file found recursively under ``directory``.

    Args:
        directory: Root directory that is searched recursively for log files.
        save: Forwarded to ``parse_application_log``; defaults to ``True``.
        analyse: Forwarded to ``parse_application_log``; defaults to
            ``False``. The returned mapping is only populated when this is
            true.

    Returns:
        A ``defaultdict(dict)`` that ``parse_application_log`` fills as
        ``apps[query_number][data_scale] = log_parser`` when ``analyse`` is
        true; empty otherwise.
    """
    pattern = os.path.join(directory, "**")
    # Materialise (and sort) the file list before parsing anything: parsing
    # may write new files, which a lazy walk could otherwise pick up, and a
    # fixed order makes the outcome reproducible when two logs map to the
    # same (query, scale) slot.
    log_files = sorted(
        path for path in glob.glob(pattern, recursive=True) if os.path.isfile(path)
    )

    apps = defaultdict(dict)
    for log_file in log_files:
        parse_application_log(log_file, apps, save=save, analyse=analyse)
    return apps
if __name__ == "__main__":
    # sys.argv[0] is the script itself, so the log directory is only present
    # when argv holds at least two entries.
    if len(sys.argv) < 2:
        print("Usage: python3 <main.py> <log_dir>")
        sys.exit(1)

    parse_application_log_from_directory(sys.argv[1])

    # Example analysis run, left disabled. NOTE: the returned mapping is only
    # filled when parse_application_log is called with analyse=True.
    # apps = parse_application_log_from_directory(os.path.join(sys.argv[1]))
    # plot_all_stages(apps[26], "query_26")
    # plot_all_stages(apps[52], "query_52")