-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path09_pipeline.py
90 lines (80 loc) · 2.73 KB
/
09_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# -----------------------------------------------
# 9. Data pipeline:
#
# Write a program that automates the
# sequential execution of previously created
# script files, ensuring that each script
# runs to completion before the next begins.
# This program aims to streamline the
# generation of outputs from all your
# previous files, consolidating the
# results into one sequence.
# -----------------------------------------------
import subprocess
import sys
import os
import logging
from datetime import datetime
# Configure the root logger once at import time: records at INFO level and
# above go to pipeline.log, each prefixed with a timestamp and its severity.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='pipeline.log',
)
def run_script(script_name):
    """
    Run a Python script in a child interpreter and wait for it to finish.

    The script is launched with the interpreter that is running this program
    (``sys.executable``). Its stdout and stderr are captured as text; stdout
    is printed after the script has exited successfully.

    Args:
        script_name: Path of the script to execute.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero
            status. Its captured stderr (and stdout, if any) are logged
            before the exception is re-raised.
        Exception: Any other error raised while launching the script is
            logged and re-raised unchanged.
    """
    logging.info(f"Starting execution of {script_name}")
    try:
        # check=True turns a non-zero exit status into CalledProcessError.
        result = subprocess.run(
            [sys.executable, script_name],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        logging.error(f"Error in {script_name}: {e}")
        # With capture_output=True the child's error text is in e.stderr;
        # e.output only holds what the child wrote to stdout.
        logging.error(f"Error output: {e.stderr}")
        if e.stdout:
            logging.error(f"Standard output before failure: {e.stdout}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error in {script_name}: {e}")
        raise

    logging.info(f"Successfully completed {script_name}")
    if result.stderr:
        # A script can succeed and still emit warnings; keep them in the log
        # instead of discarding the captured stream.
        logging.warning(f"{script_name} wrote to stderr: {result.stderr}")
    print(f"Output from {script_name}:")
    print(result.stdout)
def check_file_exists(filename):
    """
    Report whether a path is present on disk and log the outcome.

    Args:
        filename: Path to look up.

    Returns:
        True if the path exists (logged at INFO level), otherwise False
        (logged at WARNING level).
    """
    # Handle the missing case first so the success path reads straight through.
    if not os.path.exists(filename):
        logging.warning(f"File {filename} does not exist.")
        return False

    logging.info(f"File {filename} exists.")
    return True
def main():
    """
    Run the numbered pipeline scripts one after another.

    Each script is checked for existence and then executed; a script must
    finish before the next one starts. The run stops at the first script that
    is missing or fails. Progress, failures and the total duration are written
    to the log, and a short summary is printed.

    Returns:
        0 if every script ran to completion, 1 if the pipeline stopped early
        because a script was missing or raised an error.
    """
    start_time = datetime.now()
    logging.info(f"Starting pipeline execution at {start_time}")

    scripts = [
        "01_pull.py",
        "02_combine.py",
        "03_parse.py",
        "04_clean.py",
        "05_extract.py",
        "06_frequency.py",
        "07_visualization.py",
        "08_export.py",
    ]

    try:
        for script in scripts:
            if not check_file_exists(script):
                logging.error(f"Script {script} not found. Stopping pipeline.")
                # Return here rather than falling through, so an aborted run
                # is never reported as "completed".
                print(f"Pipeline stopped: {script} not found. Check pipeline.log for details.")
                return 1
            run_script(script)
    except Exception as e:
        # Top-level boundary: record the failure and report it to the caller
        # through the return value instead of a traceback.
        logging.error(f"Pipeline execution failed: {e}")
        print("Pipeline execution failed. Check pipeline.log for details.")
        return 1

    end_time = datetime.now()
    duration = end_time - start_time
    logging.info(f"Pipeline execution completed at {end_time}")
    logging.info(f"Total execution time: {duration}")
    print(f"Pipeline execution completed. Total time: {duration}")
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit
    # status; sys.exit treats None as success.
    sys.exit(main())