-
Notifications
You must be signed in to change notification settings - Fork 11
/
notifier.py
128 lines (96 loc) · 3.78 KB
/
notifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import datetime
import json
import mimetypes
import os
from typing import List
from google.cloud import storage
from google.oauth2 import service_account
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from blocks import return_slack_blocks
def send_slack_message(report_file: str):
    """
    Post the notification message to the hard-coded Slack channel.

    The bot token is read from the ``SLACK_KEY`` environment variable and the
    message blocks are produced by ``return_slack_blocks(report_file)``.
    A ``SlackApiError`` is caught and its error code is printed; nothing is
    returned either way.

    :param report_file: str, value forwarded to ``return_slack_blocks``
    """
    print("Trying to send Slack Message...")
    slack = WebClient(token=os.getenv("SLACK_KEY"))
    try:
        # Built inside the try so that block construction stays covered by
        # the same handler as the API call itself.
        payload = {
            "channel": "C07AHPJ525V",
            # Slack requires a text fallback; its content is free-form
            # when blocks are supplied.
            "text": "Important update from ACTIONS...",
            "blocks": return_slack_blocks(report_file),
        }
        slack.chat_postMessage(**payload)
    except SlackApiError as err:
        reason = err.response["error"]
        print(f"Error sending message: {reason}")
def get_content_type(filename):
    """
    Guess the MIME type of a file from its name.

    :param filename: file name or path whose extension is inspected
    :return: str, the guessed MIME type, or "text/plain" when the extension
             is missing or not recognised
    """
    guessed = mimetypes.guess_type(filename)[0]
    if guessed:
        return guessed
    # Unknown or absent extension: fall back to plain text.
    return "text/plain"
def get_current_timestamp():
    """
    Return the current local date and time as 'YYYY-mm-dd-HHMM_SS'
    (for example '2024-06-30-1530_45').

    :return: str, formatted timestamp
    """
    print("Getting the current date...")
    pattern = "%Y-%m-%d-%H%M_%S"
    return datetime.datetime.now().strftime(pattern)
def list_and_write(source_directory: str, links: List[str]):
    """
    Upload every file under ``source_directory`` to the bucket and record
    the URL of each uploaded object.

    The directory is walked recursively. Each file is written to the
    module-level ``bucket`` under ``<time_now>/<path as walked>`` with a
    content type guessed from its name. After the upload has finished, the
    object's URL (``root_url`` + object name) is appended to ``links``.

    Files are always copied in binary mode, so the stored object is
    byte-for-byte identical to the local file regardless of its type
    (text-mode copying breaks on non-UTF-8 data and rewrites line endings).

    :param source_directory: str, directory to traverse; ``os.walk`` yields
                             nothing if it does not exist
    :param links: list extended in place with one URL per uploaded file
    """
    print(f"Trying to traverse {source_directory} and write files...")
    for root, _dirs, files in os.walk(source_directory):
        for file in files:
            fullpath = os.path.join(root, file)
            # Object names (and the URLs built from them) use "/" no matter
            # which separator the local platform uses.
            target_path = os.path.join(f"{time_now}", fullpath).replace(os.sep, "/")
            content_type = get_content_type(file)
            blob = bucket.blob(target_path)
            blob.content_type = content_type
            with (
                open(fullpath, "rb") as infile,
                blob.open("wb", content_type=content_type) as outfile,
            ):
                outfile.write(infile.read())
            # Record the link only once the upload has completed, so a
            # failed upload never leaves a dangling URL in the report.
            links.append(f"{root_url}{target_path}")
def compile_link_file(links_win: List[str], links_mac: List[str]):
    """
    Write the artifact links, sorted, to ``full_report.txt`` in the bucket.

    The report is stored as ``<time_now>/full_report.txt`` in the bucket named
    by the module-level ``bucket_name`` (via ``storage_client``). It contains
    a "Windows Artifacts" section followed by a "Mac Artifacts" section, one
    URL per line. The input lists are not modified.

    :param links_win: list of URLs for the Windows artifacts
    :param links_mac: list of URLs for the Mac artifacts
    :return: str, URL of the uploaded report (``root_url`` + object name)
    """
    print("Compiling the current artifact link file...")
    # Sort copies so the caller's lists keep their original order.
    mac_sorted = sorted(links_mac)
    win_sorted = sorted(links_win)

    # One object name, always "/"-separated, shared by the upload and the
    # returned URL so the two can never disagree.
    report_name = f"{time_now}/full_report.txt"
    content_type = get_content_type(report_name)
    report_blob = storage_client.bucket(bucket_name).blob(report_name)

    with report_blob.open("w", content_type=content_type) as f:
        f.write("Windows Artifacts:\n\n")
        for url in win_sorted:
            f.write(url + "\n")
        f.write("\nMac Artifacts:\n\n")
        for url in mac_sorted:
            f.write(url + "\n")

    return f"{root_url}{report_name}"
# ---------------------------------------------------------------------------
# Script body: upload the artifacts, publish the link report, notify Slack.
# The names below are module-level on purpose: list_and_write and
# compile_link_file read time_now, root_url, bucket, storage_client and
# bucket_name as globals.
# ---------------------------------------------------------------------------
time_now = get_current_timestamp()
links_win = []
links_mac = []
root_url = "https://storage.googleapis.com/notifier-artifact-bucket/"
report_file = ""
# Tracks whether any best-effort step failed, so the final status line is
# accurate instead of always claiming success.
had_errors = False

try:
    credential_string = os.getenv("GCP_CREDENTIAL")
    if not credential_string:
        # Without this check json.loads(None) fails with an opaque TypeError.
        raise RuntimeError("the GCP_CREDENTIAL environment variable is not set")
    credentials_dict = json.loads(credential_string)
    credentials = service_account.Credentials.from_service_account_info(
        credentials_dict
    )
    storage_client = storage.Client(credentials=credentials)
    bucket_name = "notifier-artifact-bucket"
    bucket = storage_client.bucket(bucket_name)
    list_and_write("artifacts-mac", links_mac)
    list_and_write("artifacts-win", links_win)
except Exception as e:
    # Best effort: report the problem and still try to notify.
    had_errors = True
    print("The artifact upload process ran into some issues: ", e)

try:
    report_file = compile_link_file(links_win, links_mac)
except Exception as e:
    had_errors = True
    print(f"The link compilation had some issues: {e}")

# The notification is sent even after failures (report_file may be empty);
# send_slack_message prints its own Slack API errors.
send_slack_message(report_file)
if had_errors:
    print("Slack notification attempted, but the script finished with errors (see above).")
else:
    print("Message sent! Script finished successfully.")