-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathprotocol_formatter.py
118 lines (86 loc) · 3.54 KB
/
protocol_formatter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import json
import datetime
import re
from urllib.parse import urlencode
# TODO: If we are using temp creds, need to include X-Amz-Security-Token header
# Otherwise, don't include
ALL_DEFAULT_HEADERS = {
'Host': '',
'X-Amz-Date': '',
}
QUERY_DEFAULT_HEADERS = {
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
}
JSON_DEFAULT_HEADERS = {
'Content-Type': 'application/x-amz-json-'
}
# json version fills in the rest of the content type
# Intentionally left blank
REST_JSON_DEFAULT_HEADERS = {}
def query_protocol_formatter(host, token, name, version, kwargs, input_format):
headers = {}
gathered_headers = _resolve_headers(headers, QUERY_DEFAULT_HEADERS)
complete_headers = _complete_headers(gathered_headers, host, token)
to_return = { 'headers' : complete_headers }
if "content_type" in kwargs.keys():
to_return['headers']['Content-Type'] = kwargs['content_type']
body_map = { 'Action': name }
body_map['Version'] = version
body_map.update(input_format)
to_return['body'] = urlencode(body_map)
return to_return
def json_protocol_formatter(host, token, json_version, amz_target, kwargs, input_format):
# Need to fill in the Content-Type first
headers = {}
to_return = {}
headers['Content-Type'] = JSON_DEFAULT_HEADERS['Content-Type'] + json_version
# TODO: Make the kwargs header modifications better
if "content_type" in kwargs.keys():
headers['Content-Type'] = kwargs['content_type']
headers['X-Amz-Target'] = amz_target
gathered_headers = _resolve_headers(headers, JSON_DEFAULT_HEADERS)
complete_headers = _complete_headers(gathered_headers, host, token)
to_return['headers'] = complete_headers
to_return['body'] = json.dumps(input_format)
return to_return
def rest_json_protocol_formatter(host, token, json_version, request_uri, kwargs, input_format):
# Need to fill in the Content-Type first
headers = {}
to_return = {}
if "content_type" in kwargs.keys():
headers['Content-Type'] = kwargs['content_type']
gathered_headers = _resolve_headers(headers, REST_JSON_DEFAULT_HEADERS)
complete_headers = _complete_headers(gathered_headers, host, token)
to_return['headers'] = complete_headers
# Need to act on the parameters in the URI
# TODO replace params with type appropriate ones
to_return['request_uri'] = re.sub("\{(.*?)\}", "", request_uri)
to_return['body'] = json.dumps(input_format)
return to_return
def _resolve_headers(custom_headers, default_headers):
if custom_headers == '':
default_headers.update(ALL_DEFAULT_HEADERS)
return default_headers
else:
return _apply_custom_headers(custom_headers, default_headers)
def _apply_custom_headers(custom_headers, default_headers):
""" A user may apply a custom header for their request. These will be
appended to the default headers. """
to_return = default_headers.copy()
to_return.update(custom_headers)
to_return.update(ALL_DEFAULT_HEADERS)
return to_return
def _complete_headers(headers, host, token):
""" We need to fill in those headers with the info we have """
for header in headers.keys():
if header == 'X-Amz-Date':
headers[header] = _get_date_string()
if header == 'Host':
headers[header] = host.split("/")[0]
if header == 'X-Amz-Security-Token':
headers[header] = token
return headers
def _get_date_string():
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
return amz_date