forked from sivel/flask-lambda
-
Notifications
You must be signed in to change notification settings - Fork 3
/
flask_lambda.py
229 lines (179 loc) · 6.99 KB
/
flask_lambda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# -*- coding: utf-8 -*-
# Copyright 2016 Matt Martz
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import datetime
from decimal import Decimal
try:
from urllib import urlencode
except ImportError:
from urllib.parse import urlencode
from flask import Flask
try:
from cStringIO import StringIO
except ImportError:
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
import six
from werkzeug.wrappers import BaseRequest, BaseResponse
# Version string of this module.
__version__ = '0.0.4'
class Promise(object):
    """Marker base class used to recognise lazily evaluated values.

    The class defines no behaviour of its own. Its only purpose is to be
    checked with ``isinstance``: ``smart_text`` in this module hands such
    objects back unchanged instead of forcing them to text.
    """
class _UnicodeDecodeError(UnicodeDecodeError):
    """A ``UnicodeDecodeError`` that also remembers the offending object.

    The first positional argument is the object whose conversion failed;
    every remaining argument is forwarded unchanged to
    ``UnicodeDecodeError``.
    """

    def __init__(self, obj, *args):
        # Keep a handle on the object so __str__ can report it.
        self.obj = obj
        super(_UnicodeDecodeError, self).__init__(*args)

    def __str__(self):
        """Return the standard message followed by the object and its type."""
        base_message = super(_UnicodeDecodeError, self).__str__()
        offender = self.obj
        return '%s. You passed in %r (%s)' % (
            base_message, offender, type(offender))
# Types that force_text(strings_only=True) returns untouched: the integer
# types reported by six, None, floats, decimals and date/time values.
_PROTECTED_TYPES = six.integer_types + (
    type(None),
    float,
    Decimal,
    datetime.datetime,
    datetime.date,
    datetime.time,
)


def is_protected_type(obj):
    """Tell whether ``obj`` is an instance of one of the protected types.

    Protected values are the ones ``force_text(strings_only=True)`` passes
    through as-is rather than converting to text.
    """
    protected = isinstance(obj, _PROTECTED_TYPES)
    return protected
def smart_text(s, encoding='utf-8', strings_only=False, errors='strict'):
    """Convert ``s`` to text, leaving ``Promise`` instances untouched.

    The result is unicode on Python 2 and str on Python 3; bytestrings are
    decoded with ``encoding``. When ``strings_only`` is true, (some)
    non-string-like objects are returned without conversion.
    """
    if not isinstance(s, Promise):
        return force_text(s, encoding, strings_only, errors)
    # Lazy values are handed back as they are rather than being resolved.
    return s
def force_text(s, encoding='utf-8', strings_only=False, errors='strict'):
    """Return ``s`` converted to a text object.

    Similar to ``smart_text``, except that ``Promise`` instances get no
    special treatment here: they are converted like any other object.

    Args:
        s: The object to convert.
        encoding: Codec used to decode bytestrings.
        strings_only: If true, objects for which ``is_protected_type``
            returns true are handed back unchanged.
        errors: Error-handling scheme passed to the decoder.

    Returns:
        ``s`` itself when it is already text (or protected and
        ``strings_only`` is set), otherwise its text representation.

    Raises:
        _UnicodeDecodeError: If decoding fails and ``s`` is not an
            exception instance.
    """
    # Handle the common case first for performance reasons.
    if issubclass(type(s), six.text_type):
        return s
    if strings_only and is_protected_type(s):
        return s
    try:
        if not issubclass(type(s), six.string_types):
            if six.PY3:
                if isinstance(s, bytes):
                    s = six.text_type(s, encoding, errors)
                else:
                    s = six.text_type(s)
            elif hasattr(s, '__unicode__'):
                s = six.text_type(s)
            else:
                s = six.text_type(bytes(s), encoding, errors)
        else:
            # Use .decode() rather than six.text_type(s, encoding, errors)
            # so that a byte-string subclass overriding decode() decides
            # the type of the resulting text object.
            s = s.decode(encoding, errors)
    except UnicodeDecodeError as e:
        if not isinstance(s, Exception):
            raise _UnicodeDecodeError(s, *e.args)
        # The caller passed an exception whose own text conversion failed.
        # Avoid raising a further exception by converting its arguments one
        # by one. Iterate over ``s.args`` explicitly: exception objects are
        # only iterable on Python 2, so looping over ``s`` itself raises
        # TypeError on Python 3.
        s = ' '.join(force_text(arg, encoding, strings_only, errors)
                     for arg in s.args)
    return s
class HealthCheckMiddleware(object):
    """WSGI middleware that answers ``/_health_check`` itself.

    Requests for any other path are delegated to the wrapped application.
    """

    def __init__(self, app):
        # The WSGI callable that receives everything but the health check.
        self.app = app

    def __call__(self, environ, start_response):
        """Serve the health check or forward the request to ``self.app``."""
        if environ['PATH_INFO'] != '/_health_check':
            return self.app(environ, start_response)
        health_response = BaseResponse('ok', status=200)
        return health_response(environ, start_response)
def make_environ(event):
    """Translate a Lambda proxy ``event`` into a WSGI ``environ`` dict.

    Args:
        event: Mapping that provides ``headers``, ``queryStringParameters``,
            ``httpMethod``, ``path`` and
            ``requestContext.identity.sourceIp``; ``body`` is optional.
            The headers must include ``Host``, ``X-Forwarded-Port`` and
            ``X-Forwarded-Proto`` (matched case-insensitively).

    Returns:
        dict: The populated WSGI environment.

    Raises:
        KeyError: If a required event key or header is missing.
    """
    environ = {}

    for hdr_name, hdr_value in event['headers'].items():
        hdr_name = hdr_name.replace('-', '_').upper()
        # Content-Type/Content-Length are stored without the HTTP_ prefix.
        if hdr_name in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
            environ[hdr_name] = hdr_value
            continue

        http_hdr_name = 'HTTP_%s' % hdr_name
        environ[http_hdr_name] = hdr_value

    qs = event['queryStringParameters']

    environ['REQUEST_METHOD'] = event['httpMethod']
    environ['PATH_INFO'] = event['path']
    environ['QUERY_STRING'] = urlencode(qs) if qs else ''
    environ['REMOTE_ADDR'] = event['requestContext']['identity']['sourceIp']
    environ['HOST'] = '%(HTTP_HOST)s:%(HTTP_X_FORWARDED_PORT)s' % environ
    environ['SCRIPT_NAME'] = ''

    environ['SERVER_PORT'] = environ['HTTP_X_FORWARDED_PORT']
    environ['SERVER_PROTOCOL'] = 'HTTP/1.1'

    # A missing or null body must become an empty body. Passing None through
    # smart_text() would turn it into the four-character text 'None', which
    # the application would then read as the request payload.
    body = event.get('body')
    body = '' if body is None else smart_text(body)

    # Overrides any Content-Length header copied above; empty when no body.
    environ['CONTENT_LENGTH'] = str(
        len(body) if body else ''
    )

    environ['wsgi.url_scheme'] = environ['HTTP_X_FORWARDED_PROTO']
    environ['wsgi.input'] = StringIO(body or '')
    environ['wsgi.version'] = (1, 0)
    environ['wsgi.errors'] = sys.stderr
    environ['wsgi.multithread'] = False
    environ['wsgi.run_once'] = True
    environ['wsgi.multiprocess'] = False

    # The request object is built and discarded.
    # NOTE(review): presumably kept for a side effect of the constructor on
    # ``environ`` in the targeted Werkzeug version -- confirm before removing.
    BaseRequest(environ)

    return environ
class LambdaResponse(object):
    """Records what a WSGI application passes to ``start_response``."""

    def __init__(self):
        # Both attributes stay None until start_response() has been called.
        self.status = None
        self.response_headers = None

    def start_response(self, status, response_headers, exc_info=None):
        """Store the numeric status code and the headers as a dict.

        ``status`` is a status line such as ``'200 OK'``; only its first
        three characters are parsed. ``exc_info`` is accepted but not used.
        """
        numeric_code = int(status[:3])
        self.status = numeric_code
        self.response_headers = dict(response_headers)
def _call(self, event, context):
    """Dispatch either a Lambda proxy event or a plain WSGI call.

    Args:
        self: Object exposing a ``wsgi_app`` attribute (a ``FlaskLambda``
            or ``LambdaMiddleware`` instance).
        event: The Lambda event, or the WSGI ``environ`` when the object is
            being served by a regular WSGI server.
        context: The Lambda context, or the WSGI ``start_response``
            callable in the plain WSGI case.

    Returns:
        For Lambda events, a dict with ``statusCode``, ``headers`` and
        ``body``; otherwise whatever the WSGI application returns.
    """
    # Install the health-check middleware only once. Wrapping on every
    # invocation would nest one more layer per request in a long-lived
    # process and eventually exhaust the recursion limit.
    if not isinstance(self.wsgi_app, HealthCheckMiddleware):
        self.wsgi_app = HealthCheckMiddleware(self.wsgi_app)

    if 'httpMethod' not in event:
        # In this "context" `event` is `environ` and
        # `context` is `start_response`, meaning the request didn't
        # occur via API Gateway and Lambda
        return self.wsgi_app(event, context)

    response = LambdaResponse()

    app_iter = self.wsgi_app(
        make_environ(event),
        response.start_response
    )
    try:
        # Consume the whole iterable: a response may consist of several
        # chunks, or of none at all (where next() would raise StopIteration).
        chunks = list(app_iter)
    finally:
        # WSGI requires close() to be called on the iterable when present.
        close = getattr(app_iter, 'close', None)
        if close is not None:
            close()

    if all(isinstance(chunk, bytes) for chunk in chunks):
        # Join before decoding so multi-byte characters split across chunk
        # boundaries are decoded correctly.
        body = smart_text(b''.join(chunks))
    else:
        # Tolerate applications that yield text instead of bytes.
        body = smart_text('').join(smart_text(chunk) for chunk in chunks)

    return {
        'statusCode': response.status,
        'headers': response.response_headers,
        'body': body
    }
class FlaskLambda(Flask):
    """``Flask`` application whose instances can be called as a handler.

    Calling the instance delegates to ``_call``, which accepts either a
    Lambda ``(event, context)`` pair or a WSGI
    ``(environ, start_response)`` pair.
    """

    def __call__(self, event, context):
        """Hand the invocation over to the shared ``_call`` dispatcher."""
        outcome = _call(self, event, context)
        return outcome
class LambdaMiddleware(object):
    """Adapter that makes an existing WSGI application callable as a handler.

    The wrapped application is stored as ``wsgi_app`` and every call is
    routed through ``_call``.
    """

    def __init__(self, app):
        # ``_call`` reads (and rebinds) this attribute.
        self.wsgi_app = app

    def __call__(self, event, context):
        """Hand the invocation over to the shared ``_call`` dispatcher."""
        outcome = _call(self, event, context)
        return outcome