From 7f1ba0971690e638ab7f960a794e752492f4ec98 Mon Sep 17 00:00:00 2001 From: Luke Lombardi Date: Sun, 14 Jan 2024 11:04:19 -0400 Subject: [PATCH] wip --- internal/abstractions/function/function.go | 7 ++++++- internal/abstractions/function/function.proto | 2 +- internal/repository/base.go | 1 + internal/repository/container_redis.go | 10 ++++++++++ internal/worker/image.go | 6 ++---- proto/function.pb.go | 6 +++--- sdk/src/beam/clients/function.py | 2 +- 7 files changed, 24 insertions(+), 10 deletions(-) diff --git a/internal/abstractions/function/function.go b/internal/abstractions/function/function.go index 45962dcca..f7e628b05 100644 --- a/internal/abstractions/function/function.go +++ b/internal/abstractions/function/function.go @@ -207,8 +207,13 @@ _stream: break _stream } case <-keyEventChan: + exitCode, err := fs.containerRepo.GetContainerExitCode(containerId) + if err != nil { + return err + } + result, _ := fs.rdb.Get(stream.Context(), Keys.FunctionResult(workspaceName, taskId)).Bytes() - if err := stream.Send(&pb.FunctionInvokeResponse{TaskId: taskId, Done: true, Result: result, ExitCode: 0}); err != nil { + if err := stream.Send(&pb.FunctionInvokeResponse{TaskId: taskId, Done: true, Result: result, ExitCode: int32(exitCode)}); err != nil { break } case <-ctx.Done(): diff --git a/internal/abstractions/function/function.proto b/internal/abstractions/function/function.proto index 36e4534cd..723fbaca2 100644 --- a/internal/abstractions/function/function.proto +++ b/internal/abstractions/function/function.proto @@ -22,7 +22,7 @@ message FunctionInvokeResponse { string task_id = 1; string output = 2; bool done = 3; - uint32 exit_code = 4; + int32 exit_code = 4; bytes result = 5; } diff --git a/internal/repository/base.go b/internal/repository/base.go index 9a709fe66..389bc2f6f 100644 --- a/internal/repository/base.go +++ b/internal/repository/base.go @@ -29,6 +29,7 @@ type ContainerRepository interface { GetContainerState(string) (*types.ContainerState, error) SetContainerState(string, *types.ContainerState) error SetContainerExitCode(string, int) error + GetContainerExitCode(string) (int, error) SetContainerAddress(containerId string, addr string) error UpdateContainerStatus(string, types.ContainerStatus, time.Duration) error DeleteContainerState(*types.ContainerRequest) error diff --git a/internal/repository/container_redis.go b/internal/repository/container_redis.go index 5ce02a86d..93cf852eb 100644 --- a/internal/repository/container_redis.go +++ b/internal/repository/container_redis.go @@ -85,6 +85,16 @@ func (cr *ContainerRedisRepository) SetContainerExitCode(containerId string, exi return nil } +func (cr *ContainerRedisRepository) GetContainerExitCode(containerId string) (int, error) { + exitCodeKey := common.RedisKeys.SchedulerContainerExitCode(containerId) + exitCode, err := cr.rdb.Get(context.TODO(), exitCodeKey).Int() + if err != nil { + return -1, err + } + + return exitCode, nil +} + func (cr *ContainerRedisRepository) UpdateContainerStatus(containerId string, status types.ContainerStatus, expiry time.Duration) error { switch status { case types.ContainerStatusPending, types.ContainerStatusRunning, types.ContainerStatusStopping: diff --git a/internal/worker/image.go b/internal/worker/image.go index 59e7493b5..a74bca347 100644 --- a/internal/worker/image.go +++ b/internal/worker/image.go @@ -308,7 +308,6 @@ func (i *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st if err != nil { log.Printf("Unable to create archive: %v\n", err) - // outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Unable to archive image."} return err } log.Printf("Container <%v> archive took %v\n", imageId, time.Since(startTime)) @@ -317,11 +316,10 @@ func (i *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st startTime = time.Now() err = i.registry.Push(ctx, archivePath, imageId) if err != nil { - log.Printf("Failed to push image for image <%v>: %v\n", imageId, err) - // outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Unable to push image."} + log.Printf("Failed to push image <%v>: %v\n", imageId, err) return err } - log.Printf("Container <%v> push took %v\n", imageId, time.Since(startTime)) + log.Printf("Image <%v> push took %v\n", imageId, time.Since(startTime)) return nil } diff --git a/proto/function.pb.go b/proto/function.pb.go index 4e9444844..cb3a5addd 100644 --- a/proto/function.pb.go +++ b/proto/function.pb.go @@ -83,7 +83,7 @@ type FunctionInvokeResponse struct { TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` Output string `protobuf:"bytes,2,opt,name=output,proto3" json:"output,omitempty"` Done bool `protobuf:"varint,3,opt,name=done,proto3" json:"done,omitempty"` - ExitCode uint32 `protobuf:"varint,4,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"` + ExitCode int32 `protobuf:"varint,4,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"` Result []byte `protobuf:"bytes,5,opt,name=result,proto3" json:"result,omitempty"` } @@ -140,7 +140,7 @@ func (x *FunctionInvokeResponse) GetDone() bool { return false } -func (x *FunctionInvokeResponse) GetExitCode() uint32 { +func (x *FunctionInvokeResponse) GetExitCode() int32 { if x != nil { return x.ExitCode } @@ -374,7 +374,7 @@ var file_function_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x6f, 0x6e, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x04, 0x64, 0x6f, 0x6e, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x0d, 0x52, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, + 0x01, 0x28, 0x05, 0x52, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x31, 0x0a, 0x16, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x47, 0x65, 0x74, 0x41, 0x72, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, diff --git a/sdk/src/beam/clients/function.py b/sdk/src/beam/clients/function.py index f5187b43c..2e988dd64 100644 --- a/sdk/src/beam/clients/function.py +++ b/sdk/src/beam/clients/function.py @@ -19,7 +19,7 @@ class FunctionInvokeResponse(betterproto.Message): task_id: str = betterproto.string_field(1) output: str = betterproto.string_field(2) done: bool = betterproto.bool_field(3) - exit_code: int = betterproto.uint32_field(4) + exit_code: int = betterproto.int32_field(4) result: bytes = betterproto.bytes_field(5)