forked from awslabs/open-data-registry
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathext.py
74 lines (56 loc) · 2.34 KB
/
ext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import yaml
import requests
from urllib3.exceptions import InsecureRequestWarning
# Suppress the warning on Verify=False requests
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
tags = yaml.safe_load(open('tags.yaml'))
tags.append('aws-pds')
resources = yaml.safe_load(open('resources.yaml'))
services = yaml.safe_load(open('services.yaml'))
# Check if provided tags are in tags.yaml
def ext_tags(value, rule_obj, path):
if value not in tags:
print('Invalid tag!', value)
return False
# If we're here, all tags were ok
return True
# Check if provided resources are in resources.yaml
def ext_resources(value, rule_obj, path):
if value not in resources:
print('Invalid resource!', value)
return False
# If we're here, all resources were ok
return True
# Check if provided services are in services.yaml
def ext_services(value, rule_obj, path):
if value not in services:
print('Invalid service!', value)
return False
# If we're here, all services were ok
return True
def ext_valid_bucket_regions(value, rule_obj, path):
# Make sure this is a dict, and a bucket, then validate the region
if isinstance(value, dict) and 'Type' in value and value['Type'] == 'S3 Bucket':
bucket = value['ARN']
parts = bucket.split(':::')
if not parts[0] == 'arn:aws:s3':
# This is probably not on public aws so we can't check
return True
bucket = parts[1]
parts = bucket.split('/')
bucket = parts[0]
url = "https://{}.s3.amazonaws.com".format(bucket)
# Get the headers for this bucket.
# Verify=False because the wildcard matching doesn't work for buckets with '.'
r = requests.head(url, verify=False)
if r.status_code == requests.codes.not_found:
print("Bucket {} doesn't exist or there was a momentary glitch".format(bucket))
return False
if not 'x-amz-bucket-region' in r.headers:
print("Bucket region missing from request header?")
return False
region = r.headers['x-amz-bucket-region']
if not value['Region'].lower() == region.lower():
print('The region for bucket {} is listed as {} but is actually {}'.format(bucket, value['Region'], region))
return False
return True