Skip to content

Commit 24dc8d5

Browse files
authored
Lambda search endpoints using a warp adapter (#4805)
* Lambda search endpoints using a warp adapter * Fix request id span * Make logging 30% more compact * Minor improvements * Propagete poll_ready to warp svc * Upgrade lambda_runtime
1 parent d097326 commit 24dc8d5

File tree

19 files changed

+613
-322
lines changed

19 files changed

+613
-322
lines changed

distribution/lambda/Makefile

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ bench-index:
111111
done
112112

113113
bench-search-term:
114-
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
114+
export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true
115115
mem_sizes=( 1024 2048 4096 8192 )
116116
for mem_size in "$${mem_sizes[@]}"
117117
do
@@ -121,7 +121,7 @@ bench-search-term:
121121
done
122122

123123
bench-search-histogram:
124-
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
124+
export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true
125125
mem_sizes=( 1024 2048 4096 8192 )
126126
for mem_size in "$${mem_sizes[@]}"
127127
do
@@ -133,15 +133,13 @@ bench-search-histogram:
133133
bench-search:
134134
for run in {1..30}
135135
do
136-
export QW_LAMBDA_DISABLE_SEARCH_CACHE=true
137-
$(MAKE) bench-search-term
138-
$(MAKE) bench-search-histogram
139-
export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
140136
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=0
141137
$(MAKE) bench-search-term
142138
$(MAKE) bench-search-histogram
143-
export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
144139
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=64MB
145140
$(MAKE) bench-search-term
146141
$(MAKE) bench-search-histogram
147142
done
143+
144+
test-mock-data-endpoints:
145+
python -c 'from cdk import cli; cli.test_mock_data_endpoints()'

distribution/lambda/cdk/cli.py

Lines changed: 87 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from dataclasses import dataclass
1515
from functools import cache
1616
from io import BytesIO
17+
from urllib.parse import urlparse
1718

1819
import boto3
1920
import botocore.config
@@ -29,6 +30,8 @@
2930
retries={"max_attempts": 0}, read_timeout=60 * 15
3031
)
3132
session = boto3.Session(region_name=region)
33+
mock_sales_index_id = "mock-sales"
34+
hdfs_logs_index_id = "hdfs-logs"
3235

3336

3437
@cache
@@ -39,19 +42,27 @@ def _get_cloudformation_output_value(stack_name: str, export_name: str) -> str:
3942
print(f"Stack {stack_name} not identified uniquely, found {stacks}")
4043
outputs = stacks[0]["Outputs"]
4144
for output in outputs:
42-
if output["ExportName"] == export_name:
45+
if "ExportName" in output and output["ExportName"] == export_name:
4346
return output["OutputValue"]
4447
else:
4548
print(f"Export name {export_name} not found in stack {stack_name}")
4649
exit(1)
4750

4851

52+
def _decompress_if_gzip(payload: bytes, headers: dict) -> str:
53+
if headers.get("content-encoding", "") == "gzip":
54+
return gzip.GzipFile(mode="rb", fileobj=BytesIO(payload)).read().decode()
55+
else:
56+
return payload.decode()
57+
58+
4959
@dataclass
5060
class LambdaResult:
5161
function_error: str
5262
log_tail: str
5363
payload: str
5464
raw_size_bytes: int
65+
status_code: int
5566

5667
@staticmethod
5768
def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
@@ -61,28 +72,28 @@ def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
6172
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
6273
payload=payload,
6374
raw_size_bytes=len(payload),
75+
status_code=0,
6476
)
6577

6678
@staticmethod
6779
def from_lambda_gateway_response(lambda_resp: dict) -> "LambdaResult":
6880
gw_str = lambda_resp["Payload"].read().decode()
6981
gw_obj = json.loads(gw_str)
70-
payload = gw_obj["body"]
71-
if gw_obj["isBase64Encoded"]:
82+
if "body" in gw_obj:
83+
payload = gw_obj["body"]
84+
status_code = gw_obj["statusCode"]
85+
else:
86+
payload = gw_str
87+
status_code = -1
88+
if gw_obj.get("isBase64Encoded", False):
7289
dec_payload = base64.b64decode(payload)
73-
if gw_obj.get("headers", {}).get("content-encoding", "") == "gzip":
74-
payload = (
75-
gzip.GzipFile(mode="rb", fileobj=BytesIO(dec_payload))
76-
.read()
77-
.decode()
78-
)
79-
else:
80-
payload = dec_payload.decode()
90+
payload = _decompress_if_gzip(dec_payload, gw_obj.get("headers", {}))
8191
return LambdaResult(
8292
function_error=lambda_resp.get("FunctionError", ""),
8393
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
8494
payload=payload,
8595
raw_size_bytes=len(gw_str),
96+
status_code=status_code,
8697
)
8798

8899
def extract_report(self) -> str:
@@ -108,12 +119,13 @@ def _format_lambda_output(
108119
if lambda_result.function_error != "":
109120
print("\n## FUNCTION ERROR:")
110121
print(lambda_result.function_error)
111-
print("\n## LOG TAIL:")
112-
print(lambda_result.log_tail)
113122
print("\n## RAW RESPONSE SIZE (BYTES):")
114-
ratio = lambda_result.raw_size_bytes / len(lambda_result.payload)
115-
print(f"{lambda_result.raw_size_bytes} ({ratio:.1f}x the final payload)")
116-
print("\n## RESPONSE:")
123+
if len(lambda_result.payload) == 0:
124+
ratio = "empty payload"
125+
else:
126+
ratio = f"{(lambda_result.raw_size_bytes / len(lambda_result.payload)):.1f}x the final payload"
127+
print(f"{lambda_result.raw_size_bytes} ({ratio})")
128+
print(f"\n## RESPONSE [{lambda_result.status_code}]:")
117129
payload_size = len(lambda_result.payload)
118130
print(lambda_result.payload[:max_resp_size])
119131
if payload_size > max_resp_size:
@@ -184,6 +196,7 @@ def invoke_hdfs_indexer() -> LambdaResult:
184196

185197
def _invoke_searcher(
186198
stack_name: str,
199+
index_id: str,
187200
function_export_name: str,
188201
payload: str,
189202
download_logs: bool,
@@ -198,9 +211,14 @@ def _invoke_searcher(
198211
LogType="Tail",
199212
Payload=json.dumps(
200213
{
201-
"headers": {"Content-Type": "application/json"},
214+
"resource": f"/api/v1/{index_id}/search",
215+
"path": f"/api/v1/{index_id}/search",
216+
"httpMethod": "POST",
217+
"headers": {
218+
"Content-Type": "application/json",
219+
},
202220
"requestContext": {
203-
"http": {"method": "POST"},
221+
"httpMethod": "POST",
204222
},
205223
"body": payload,
206224
"isBase64Encoded": False,
@@ -218,6 +236,7 @@ def _invoke_searcher(
218236
def invoke_hdfs_searcher(payload: str, download_logs: bool = True) -> LambdaResult:
219237
return _invoke_searcher(
220238
app.HDFS_STACK_NAME,
239+
hdfs_logs_index_id,
221240
hdfs_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
222241
payload,
223242
download_logs,
@@ -249,7 +268,6 @@ def get_logs(
249268
last_event_id = event["eventId"]
250269
yield event["message"]
251270
if event["message"].startswith("REPORT"):
252-
print(event["message"])
253271
lower_time_bound = int(event["timestamp"])
254272
last_event_id = "REPORT"
255273
break
@@ -277,13 +295,15 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo
277295
int(invoke_start * 1000),
278296
):
279297
f.write(log)
298+
print(f"Logs written to lambda.{request_id}.log")
280299
except Exception as e:
281300
print(f"Failed to download logs: {e}")
282301

283302

284303
def invoke_mock_data_searcher():
285304
_invoke_searcher(
286305
app.MOCK_DATA_STACK_NAME,
306+
mock_sales_index_id,
287307
mock_data_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
288308
"""{"query": "id:1", "sort_by": "ts", "max_hits": 10}""",
289309
True,
@@ -321,7 +341,9 @@ def print_mock_data_metastore():
321341
app.MOCK_DATA_STACK_NAME, mock_data_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
322342
)
323343
s3 = session.client("s3")
324-
response = s3.get_object(Bucket=bucket_name, Key="index/mock-sales/metastore.json")
344+
response = s3.get_object(
345+
Bucket=bucket_name, Key=f"index/{mock_sales_index_id}/metastore.json"
346+
)
325347
print(response["Body"].read().decode())
326348

327349

@@ -387,3 +409,48 @@ def benchmark_hdfs_search(payload: str):
387409
with open(f"lambda-bench.log", "a+") as f:
388410
f.write(json.dumps(bench_result))
389411
f.write("\n")
412+
413+
414+
def test_mock_data_endpoints():
415+
apigw_url = _get_cloudformation_output_value(
416+
app.MOCK_DATA_STACK_NAME, mock_data_stack.API_GATEWAY_EXPORT_NAME
417+
)
418+
419+
def req(method, path, body=None, expected_status=200):
420+
conn = http.client.HTTPSConnection(urlparse(apigw_url).netloc)
421+
conn.request(
422+
method,
423+
path,
424+
body,
425+
headers={"x-api-key": os.getenv("SEARCHER_API_KEY")},
426+
)
427+
response = conn.getresponse()
428+
print(f"{method} {path}")
429+
headers = {k: v for (k, v) in response.getheaders()}
430+
body = _decompress_if_gzip(response.read(), headers)
431+
if response.status != expected_status:
432+
print(f"[{response.status}] => {body}")
433+
exit(1)
434+
else:
435+
print(f"[{response.status}] => {json.dumps(json.loads(body))[0:100]}")
436+
437+
req("GET", f"/api/v1/{mock_sales_index_id}/search?query=animal")
438+
req(
439+
"POST",
440+
f"/api/v1/{mock_sales_index_id}/search",
441+
'{"query":"quantity:>5", "max_hits": 10}',
442+
)
443+
req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_search?q=animal")
444+
req(
445+
"POST",
446+
f"/api/v1/_elastic/{mock_sales_index_id}/_search",
447+
'{"query":{"bool":{"must":[{"range":{"quantity":{"gt":5}}}]}},"size":10}',
448+
)
449+
req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_field_caps?fields=quantity")
450+
# expected errors
451+
req(
452+
"GET",
453+
f"/api/v1/_elastic/{mock_sales_index_id}/_search?query=animal",
454+
expected_status=400,
455+
)
456+
req("GET", f"/api/v1/_elastic/_search?q=animal", expected_status=501)

distribution/lambda/cdk/stacks/examples/mock_data_stack.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name"
1818
INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name"
1919
SOURCE_BUCKET_NAME_EXPORT_NAME = "mock-data-source-bucket-name"
20+
API_GATEWAY_EXPORT_NAME = "mock-data-api-gateway-url"
2021

2122

2223
class Source(Construct):
@@ -98,11 +99,12 @@ def __init__(
9899
searcher_integration = aws_apigateway.LambdaIntegration(
99100
qw_svc.searcher.lambda_function
100101
)
101-
search_resource = (
102-
api.root.add_resource("v1").add_resource(index_id).add_resource("search")
103-
)
102+
search_resource = api.root.add_resource("v1").add_resource("{proxy+}")
104103
search_resource.add_method("POST", searcher_integration, api_key_required=True)
105-
api_deployment = aws_apigateway.Deployment(self, "api-deployment", api=api)
104+
search_resource.add_method("GET", searcher_integration, api_key_required=True)
105+
# Change the deployment id (api-deployment-x) each time the API changes,
106+
# otherwise changes are not deployed.
107+
api_deployment = aws_apigateway.Deployment(self, "api-deployment-1", api=api)
106108
api_stage = aws_apigateway.Stage(
107109
self, "api", deployment=api_deployment, stage_name="api"
108110
)
@@ -122,7 +124,10 @@ def __init__(
122124
api.deployment_stage = api_stage
123125

124126
aws_cdk.CfnOutput(
125-
self, "search-api-url", value=api.url.rstrip("/") + search_resource.path
127+
self,
128+
"search-api-url",
129+
value=api.url.rstrip("/") + search_resource.path,
130+
export_name=API_GATEWAY_EXPORT_NAME,
126131
)
127132

128133

distribution/lambda/resources/hdfs-logs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# Index config file for hdfs-logs dataset.
33
#
44

5-
version: 0.6
5+
version: 0.7
66

77
index_id: hdfs-logs
88

distribution/lambda/resources/mock-sales.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# Index config file for mock-sales data generator.
33
#
44

5-
version: 0.6
5+
version: 0.7
66

77
index_id: mock-sales
88

0 commit comments

Comments
 (0)