Skip to content

Commit

Permalink
Fix: fix memory leak in endpoint buffer (#808)
Browse files Browse the repository at this point in the history
NOTE: Still need to reproduce this locally before merging
  • Loading branch information
luke-lombardi authored Dec 21, 2024
1 parent 8aba971 commit b36b742
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 12 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,7 @@ build-test:
poetry config virtualenvs.in-project true
poetry install -C sdk
poetry shell -C sdk
cd build_tests && python app.py $(MODE)
cd e2e/build_tests && python app.py $(MODE)

load-test:
cd e2e/load_tests && k6 run --env URL=$(URL) --env TOKEN=$(TOKEN) throughput.js
File renamed without changes.
File renamed without changes.
30 changes: 30 additions & 0 deletions e2e/load_tests/throughput.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import http from "k6/http";
import { check, sleep } from "k6";

const url = __ENV.URL;
const headers = {
Connection: "keep-alive",
"Content-Type": "application/json",
Authorization: `Bearer ${__ENV.TOKEN || "default_token"}`,
};

const payload = JSON.stringify({
data: "x".repeat(1024 * 1024 * 2), // 2MB payload
});

export let options = {
stages: [
{ duration: "30s", target: 100 }, // Ramp-up to 100 VUs in 30 seconds
{ duration: "1m", target: 100 }, // Stay at 100 VUs for 1 minute
{ duration: "30s", target: 0 }, // Ramp-down to 0 VUs in 30 seconds
],
};

export default function () {
const res = http.post(url, payload, { headers });
check(res, {
"status is 200": (r) => r.status === 200,
});

sleep(0.01); // Adjust this to control request rate
}
20 changes: 12 additions & 8 deletions pkg/abstractions/endpoint/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,13 @@ func (rb *RequestBuffer) handleHeartbeatEvents() {
}
}

func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask) error {
func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask, payload *types.TaskPayload) error {
done := make(chan bool)
req := &request{
ctx: ctx,
done: done,
payload: &types.TaskPayload{
Args: task.msg.Args,
Kwargs: task.msg.Kwargs,
},
task: task,
ctx: ctx,
done: done,
payload: payload,
task: task,
}
rb.buffer.Push(req, false)

Expand Down Expand Up @@ -176,6 +173,12 @@ func (rb *RequestBuffer) processRequests() {
continue
}

if req.ctx.Request().Context().Err() != nil {
rb.cancelInFlightTask(req.task)
req.payload = nil
continue
}

go rb.handleRequest(req)
}
}
Expand Down Expand Up @@ -534,6 +537,7 @@ func (rb *RequestBuffer) heartBeat(req *request, containerId string) {
func (rb *RequestBuffer) afterRequest(req *request, containerId string) {
defer func() {
req.done <- true
req.payload = nil
}()

defer rb.releaseRequestToken(containerId, req.task.msg.TaskId)
Expand Down
4 changes: 2 additions & 2 deletions pkg/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (es *HttpEndpointService) forwardRequest(
ttl = DefaultEndpointRequestTTL
}

task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), authInfo, stubId, payload, types.TaskPolicy{
task, err := es.taskDispatcher.Send(ctx.Request().Context(), string(types.ExecutorEndpoint), authInfo, stubId, &types.TaskPayload{}, types.TaskPolicy{
MaxRetries: 0,
Timeout: instance.StubConfig.TaskPolicy.Timeout,
Expires: time.Now().Add(time.Duration(ttl) * time.Second),
Expand All @@ -209,7 +209,7 @@ func (es *HttpEndpointService) forwardRequest(
return err
}

return task.Execute(ctx.Request().Context(), ctx)
return task.Execute(ctx.Request().Context(), ctx, payload)
}

func (es *HttpEndpointService) InstanceFactory(stubId string, options ...func(abstractions.IAutoscaledInstance)) (abstractions.IAutoscaledInstance, error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/abstractions/endpoint/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type EndpointTask struct {
func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) error {
var err error = nil
echoCtx := options[0].(echo.Context)
payload := options[1].(*types.TaskPayload)

instance, err := t.es.getOrCreateEndpointInstance(ctx, t.msg.StubId)
if err != nil {
return err
Expand All @@ -30,7 +32,7 @@ func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) erro
return err
}

return instance.buffer.ForwardRequest(echoCtx, t)
return instance.buffer.ForwardRequest(echoCtx, t, payload)
}

func (t *EndpointTask) Retry(ctx context.Context) error {
Expand Down

0 comments on commit b36b742

Please sign in to comment.