-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathapp.py
71 lines (55 loc) · 1.81 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import os
import boto3
from chalice import Chalice, BadRequestError
from chalicelib.storage import Config, S3Storage, MaxSizeError, CachingStorage
from chalicelib.storage import SemiDBMCache
from chalicelib.schema import SavedQuery
# Directory handed to SemiDBMCache; before_request creates it on first use.
CACHE_DIR = '/tmp/appcache'
app = Chalice(app_name='jmespath-playground')
# NOTE(review): debug mode is switched on unconditionally here -- confirm
# this is intended for every deployed stage.
app.debug = True
# Ad-hoc attribute used as a shared state dict: before_request stores the
# storage object under the 'storage' key and the route handlers read it back.
app.context = {}
def before_request(app):
    """Lazily build the storage backend and stash it on ``app.context``.

    The first call creates a boto3 S3 client, reads the bucket/prefix
    settings from the environment, makes sure ``CACHE_DIR`` exists and
    stores a ``CachingStorage`` (an ``S3Storage`` paired with a
    ``SemiDBMCache``) under ``app.context['storage']``.  Later calls
    return immediately because the key is already present.

    :param app: Object exposing a ``context`` dict (the Chalice app).
    :raises KeyError: If the ``APP_S3_BUCKET`` environment variable is
        not set.
    :raises OSError: If ``CACHE_DIR`` does not exist and cannot be created.
    """
    if 'storage' in app.context:
        return
    s3 = boto3.client('s3')
    config = Config(
        bucket=os.environ['APP_S3_BUCKET'],
        prefix=os.environ.get('APP_S3_PREFIX', ''),
    )
    # Attempt the creation and inspect the failure instead of checking
    # first: a check-then-create sequence can raise if the directory shows
    # up between the two calls.  This form also avoids ``exist_ok``, which
    # is not available on Python 2.
    try:
        os.makedirs(CACHE_DIR)
    except OSError:
        # An already-existing directory is fine; anything else (a plain
        # file in the way, permission problems, ...) is a real error.
        if not os.path.isdir(CACHE_DIR):
            raise
    cache = SemiDBMCache(CACHE_DIR)
    storage = S3Storage(client=s3,
                        config=config)
    app.context['storage'] = CachingStorage(storage, cache)
@app.route('/anon', methods=['POST'], cors=True)
def new_anonymous_query():
    """Store the posted JSON document and return its identifier.

    The request body is parsed as JSON, checked by ``_validate_body`` and
    handed to the storage object's ``put``.  The value ``put`` returns is
    sent back as ``{'uuid': ...}``.

    :raises BadRequestError: If reading the JSON body raises
        ``ValueError``, if validation fails, or if the storage layer
        raises ``MaxSizeError``.
    """
    before_request(app)
    try:
        payload = app.current_request.json_body
    except ValueError as err:
        raise BadRequestError("Invalid JSON: %s" % err)
    _validate_body(payload)
    store = app.context['storage']
    try:
        new_id = store.put(payload)
    except MaxSizeError as err:
        # Report the storage layer's size complaint as a client error.
        raise BadRequestError(str(err))
    else:
        return {'uuid': new_id}
def _validate_body(body):
    """Reject a missing body or one that fails ``SavedQuery`` loading.

    :param body: Parsed JSON request body, or ``None`` when absent.
    :raises BadRequestError: If ``body`` is ``None``, or if loading it
        through ``SavedQuery`` reports errors (the errors object is
        passed along as the exception argument).
    """
    if body is None:
        raise BadRequestError("Request body cannot be empty.")
    errors = SavedQuery().load(body).errors
    if errors:
        raise BadRequestError(errors)
@app.route('/anon/{uuid}', methods=['GET'], cors=True)
def get_anonymous_query(uuid):
    """Return whatever the storage object's ``get`` yields for ``uuid``.

    :param uuid: Identifier taken from the URL path.
    """
    before_request(app)
    return app.context['storage'].get(uuid)
# This is just used as a sanity check to make sure
# we can hit our API. Could also be used for monitoring.
@app.route('/ping', methods=['GET'], cors=True)
def ping():
    """Return a fixed payload; no storage or request data is touched."""
    payload = {'ping': 11}
    return payload