Skip to content

Commit 20ed82d

Browse files
committed
Lambda search endpoints using a warp adapter
1 parent d82b649 commit 20ed82d

File tree

17 files changed

+571
-284
lines changed

17 files changed

+571
-284
lines changed

distribution/lambda/Makefile

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,13 @@ bench-search-histogram:
133133
bench-search:
134134
for run in {1..30}
135135
do
136-
export QW_LAMBDA_DISABLE_SEARCH_CACHE=true
137-
$(MAKE) bench-search-term
138-
$(MAKE) bench-search-histogram
139-
export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
140136
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=0
141137
$(MAKE) bench-search-term
142138
$(MAKE) bench-search-histogram
143-
export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
144139
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=64MB
145140
$(MAKE) bench-search-term
146141
$(MAKE) bench-search-histogram
147142
done
143+
144+
test-mock-data-endpoints:
145+
python -c 'from cdk import cli; cli.test_mock_data_endpoints()'

distribution/lambda/cdk/cli.py

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from dataclasses import dataclass
1515
from functools import cache
1616
from io import BytesIO
17+
from urllib.parse import urlparse
1718

1819
import boto3
1920
import botocore.config
@@ -29,6 +30,8 @@
2930
retries={"max_attempts": 0}, read_timeout=60 * 15
3031
)
3132
session = boto3.Session(region_name=region)
33+
mock_sales_index_id = "mock-sales"
34+
hdfs_logs_index_id = "hdfs-logs"
3235

3336

3437
@cache
@@ -39,19 +42,27 @@ def _get_cloudformation_output_value(stack_name: str, export_name: str) -> str:
3942
print(f"Stack {stack_name} not identified uniquely, found {stacks}")
4043
outputs = stacks[0]["Outputs"]
4144
for output in outputs:
42-
if output["ExportName"] == export_name:
45+
if "ExportName" in output and output["ExportName"] == export_name:
4346
return output["OutputValue"]
4447
else:
4548
print(f"Export name {export_name} not found in stack {stack_name}")
4649
exit(1)
4750

4851

52+
def _decompress_if_gzip(payload: bytes, headers: dict) -> str:
53+
if headers.get("content-encoding", "") == "gzip":
54+
return gzip.GzipFile(mode="rb", fileobj=BytesIO(payload)).read().decode()
55+
else:
56+
return payload.decode()
57+
58+
4959
@dataclass
5060
class LambdaResult:
5161
function_error: str
5262
log_tail: str
5363
payload: str
5464
raw_size_bytes: int
65+
status_code: int
5566

5667
@staticmethod
5768
def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
@@ -61,28 +72,28 @@ def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
6172
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
6273
payload=payload,
6374
raw_size_bytes=len(payload),
75+
status_code=200,
6476
)
6577

6678
@staticmethod
6779
def from_lambda_gateway_response(lambda_resp: dict) -> "LambdaResult":
6880
gw_str = lambda_resp["Payload"].read().decode()
6981
gw_obj = json.loads(gw_str)
70-
payload = gw_obj["body"]
71-
if gw_obj["isBase64Encoded"]:
82+
if "body" in gw_obj:
83+
payload = gw_obj["body"]
84+
status_code = gw_obj["statusCode"]
85+
else:
86+
payload = gw_str
87+
status_code = -1
88+
if gw_obj.get("isBase64Encoded", False):
7289
dec_payload = base64.b64decode(payload)
73-
if gw_obj.get("headers", {}).get("content-encoding", "") == "gzip":
74-
payload = (
75-
gzip.GzipFile(mode="rb", fileobj=BytesIO(dec_payload))
76-
.read()
77-
.decode()
78-
)
79-
else:
80-
payload = dec_payload.decode()
90+
payload = _decompress_if_gzip(dec_payload, gw_obj.get("headers", {}))
8191
return LambdaResult(
8292
function_error=lambda_resp.get("FunctionError", ""),
8393
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
8494
payload=payload,
8595
raw_size_bytes=len(gw_str),
96+
status_code=status_code,
8697
)
8798

8899
def extract_report(self) -> str:
@@ -111,9 +122,12 @@ def _format_lambda_output(
111122
print("\n## LOG TAIL:")
112123
print(lambda_result.log_tail)
113124
print("\n## RAW RESPONSE SIZE (BYTES):")
114-
ratio = lambda_result.raw_size_bytes / len(lambda_result.payload)
115-
print(f"{lambda_result.raw_size_bytes} ({ratio:.1f}x the final payload)")
116-
print("\n## RESPONSE:")
125+
if len(lambda_result.payload) == 0:
126+
ratio = "empty payload"
127+
else:
128+
ratio = f"{(lambda_result.raw_size_bytes / len(lambda_result.payload)):.1f}x the final payload"
129+
print(f"{lambda_result.raw_size_bytes} ({ratio})")
130+
print(f"\n## RESPONSE [{lambda_result.status_code}]:")
117131
payload_size = len(lambda_result.payload)
118132
print(lambda_result.payload[:max_resp_size])
119133
if payload_size > max_resp_size:
@@ -184,6 +198,7 @@ def invoke_hdfs_indexer() -> LambdaResult:
184198

185199
def _invoke_searcher(
186200
stack_name: str,
201+
index_id: str,
187202
function_export_name: str,
188203
payload: str,
189204
download_logs: bool,
@@ -198,9 +213,12 @@ def _invoke_searcher(
198213
LogType="Tail",
199214
Payload=json.dumps(
200215
{
201-
"headers": {"Content-Type": "application/json"},
202-
"requestContext": {
203-
"http": {"method": "POST"},
216+
"path": f"/api/v1/{index_id}/search",
217+
"resource": f"/api/v1/{index_id}/search",
218+
"httpMethod": "POST",
219+
"headers": {
220+
"Content-Type": "application/json",
221+
"Content-Length": f"{len(payload)}",
204222
},
205223
"body": payload,
206224
"isBase64Encoded": False,
@@ -218,6 +236,7 @@ def _invoke_searcher(
218236
def invoke_hdfs_searcher(payload: str, download_logs: bool = True) -> LambdaResult:
219237
return _invoke_searcher(
220238
app.HDFS_STACK_NAME,
239+
hdfs_logs_index_id,
221240
hdfs_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
222241
payload,
223242
download_logs,
@@ -284,6 +303,7 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo
284303
def invoke_mock_data_searcher():
285304
_invoke_searcher(
286305
app.MOCK_DATA_STACK_NAME,
306+
mock_sales_index_id,
287307
mock_data_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
288308
"""{"query": "id:1", "sort_by": "ts", "max_hits": 10}""",
289309
True,
@@ -321,7 +341,9 @@ def print_mock_data_metastore():
321341
app.MOCK_DATA_STACK_NAME, mock_data_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
322342
)
323343
s3 = session.client("s3")
324-
response = s3.get_object(Bucket=bucket_name, Key="index/mock-sales/metastore.json")
344+
response = s3.get_object(
345+
Bucket=bucket_name, Key=f"index/{mock_sales_index_id}/metastore.json"
346+
)
325347
print(response["Body"].read().decode())
326348

327349

@@ -387,3 +409,41 @@ def benchmark_hdfs_search(payload: str):
387409
with open(f"lambda-bench.log", "a+") as f:
388410
f.write(json.dumps(bench_result))
389411
f.write("\n")
412+
413+
414+
def test_mock_data_endpoints():
    """Smoke-test the mock-data search endpoints through the API Gateway.

    The gateway URL is read from the mock-data stack's CloudFormation export
    and the API key from the ``SEARCHER_API_KEY`` environment variable. Each
    request prints its method and path followed by the status and the first
    100 characters of the JSON response. The process exits with status 1 on
    the first non-200 response or when the API key is not set.
    """
    apigw_url = _get_cloudformation_output_value(
        app.MOCK_DATA_STACK_NAME, mock_data_stack.API_GATEWAY_EXPORT_NAME
    )
    api_key = os.getenv("SEARCHER_API_KEY")
    if not api_key:
        # http.client cannot send a None header value; fail with a clear
        # message instead of an opaque error from inside the request.
        print("SEARCHER_API_KEY environment variable must be set")
        exit(1)
    netloc = urlparse(apigw_url).netloc

    def req(method, path, body=None):
        conn = http.client.HTTPSConnection(netloc)
        try:
            conn.request(method, path, body, headers={"x-api-key": api_key})
            response = conn.getresponse()
            print(f"{method} {path}")
            if response.status != 200:
                print(f"[{response.status}] => {response.read().decode()}")
                exit(1)
            # Lower-case the header names: the gzip helper looks up
            # "content-encoding" and servers are free to use any casing.
            headers = {k.lower(): v for (k, v) in response.getheaders()}
            resp_body = _decompress_if_gzip(response.read(), headers)
            print(f"[{response.status}] => {json.dumps(json.loads(resp_body))[0:100]}")
        finally:
            # Always release the socket, including on the exit(1) path.
            conn.close()

    req("GET", f"/api/v1/{mock_sales_index_id}/search?query=animal")
    req(
        "POST",
        f"/api/v1/{mock_sales_index_id}/search",
        '{"query":"quantity:>5", "max_hits": 10}',
    )
    req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_search?q=animal")
    req(
        "POST",
        f"/api/v1/_elastic/{mock_sales_index_id}/_search",
        '{"query":{"bool":{"must":[{"range":{"quantity":{"gt":5}}}]}},"size":10}',
    )
    req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_field_caps?fields=quantity")

distribution/lambda/cdk/stacks/examples/mock_data_stack.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name"
1818
INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name"
1919
SOURCE_BUCKET_NAME_EXPORT_NAME = "mock-data-source-bucket-name"
20+
API_GATEWAY_EXPORT_NAME = "mock-data-api-gateway-url"
2021

2122

2223
class Source(Construct):
@@ -98,11 +99,13 @@ def __init__(
9899
searcher_integration = aws_apigateway.LambdaIntegration(
99100
qw_svc.searcher.lambda_function
100101
)
101-
search_resource = (
102-
api.root.add_resource("v1").add_resource(index_id).add_resource("search")
103-
)
102+
search_resource = api.root.add_resource("v1").add_resource("{proxy+}")
104103
search_resource.add_method("POST", searcher_integration, api_key_required=True)
105-
api_deployment = aws_apigateway.Deployment(self, "api-deployment", api=api)
104+
search_resource.add_method("GET", searcher_integration, api_key_required=True)
105+
search_resource.add_method("PUT", searcher_integration, api_key_required=True)
106+
# Change the deployment id (api-deployment-x) each time the API changes,
107+
# otherwise changes are not deployed.
108+
api_deployment = aws_apigateway.Deployment(self, "api-deployment-1", api=api)
106109
api_stage = aws_apigateway.Stage(
107110
self, "api", deployment=api_deployment, stage_name="api"
108111
)
@@ -122,7 +125,10 @@ def __init__(
122125
api.deployment_stage = api_stage
123126

124127
aws_cdk.CfnOutput(
125-
self, "search-api-url", value=api.url.rstrip("/") + search_resource.path
128+
self,
129+
"search-api-url",
130+
value=api.url.rstrip("/") + search_resource.path,
131+
export_name=API_GATEWAY_EXPORT_NAME,
126132
)
127133

128134

distribution/lambda/resources/hdfs-logs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# Index config file for hdfs-logs dataset.
33
#
44

5-
version: 0.6
5+
version: 0.7
66

77
index_id: hdfs-logs
88

distribution/lambda/resources/mock-sales.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# Index config file for mock-sales data generator.
33
#
44

5-
version: 0.6
5+
version: 0.7
66

77
index_id: mock-sales
88

0 commit comments

Comments
 (0)