-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsynthetics_functions.py
114 lines (88 loc) · 3.31 KB
/
synthetics_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""_summary_
Returns:
_type_: _description_
"""
import requests
import base64
import json
def get_urls(url_file):
with open(url_file, 'r') as f:
urls = f.readlines()
return urls
def create_domain_check_payload(js_file, url):
with open(js_file, 'r') as f:
script = f.readlines()
script[0] = "var request = require('request');\n"
script[2] = f"var domain = '{url}';\n"
joined_script = ''.join(script)
return base64.b64encode(joined_script.encode('utf-8')).decode('utf-8')
def get_entities(url, query, headers):
entities = []
next_cursor = 'null'
while True:
mutation = {
f"""query: {{
actor {{
entitySearch( query: \"{query}\") {{
results (cursor: {next_cursor or ''}) {{
nextCursor entities {{
... on SyntheticMonitorEntityOutline {{
name guid monitorId }}}}}}}}}}}}"""
}
response = requests.request("POST", url, headers=headers, json=mutation)
if response.status_code != 200:
raise Exception(f"Query failed: {response.text}")
data = response.json()
entities += data['data']['actor']['entitySearch']['results']['entities']
next_cursor = f'"{data['data']['actor']['entitySearch']['results']['nextCursor']}"'
if next_cursor == '"None"':
break
return entities
def get_create_script_monitor_mutation(url, account_id):
return "{\"query\":\"mutation SyntheticsCreateScriptApiMonitor {\
syntheticsCreateScriptApiMonitor(\
accountId: " + account_id + "\
monitor: {\
name: \\\"Domain Expiry - " + url + "\\\"\
period: EVERY_DAY\
runtime: {\
runtimeType: \\\"NODE_API\\\"\
runtimeTypeVersion: \\\"16.10\\\"\
scriptLanguage: \\\"JAVASCRIPT\\\"\
}\
script: \\\"\\\"\
status: ENABLED\
locations: { public: \\\"AWS_US_EAST_1\\\" }\
}\
) {\
errors {\
description\
type\
}\
}\
}\
\",\"variables\":{}}"
def get_delete_monitor_mutation(guid):
return "{\"query\":\"mutation SyntheticsDeleteMonitor {\
syntheticsDeleteMonitor(guid: \\\"" + guid + "\\\") {\
deletedGuid\
}\
}\
\",\"variables\":{}}"
def get_ids(i, url, monitors):
try:
if url in monitors[i]["name"]:
monitors.append(monitors.pop(monitors.index(monitors[i])))
return monitors[-1]["monitorId"], monitors[-1]["guid"]
except IndexError:
raise IndexError
except:
return False
def make_graph_post_request(url, headers, mutation):
response = requests.request("POST", url, headers=headers, data=mutation)
return response
def make_rest_put_request(endpoint, headers, data):
response = requests.request("PUT", endpoint, headers=headers, data=json.dumps(data))
return response