Skip to content

Commit

Permalink
fix: shutdown workers on SIGTERM or SIGINT (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
jyecusch authored Oct 14, 2024
1 parent 73bcbce commit 9290bb2
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 211 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: 1.21
go-version: 1.23
- name: Check License Headers
run: make license-header-check
- name: Run Tests
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -220,5 +220,4 @@ require (
go-simpler.org/musttag v0.12.2 // indirect
go-simpler.org/sloglint v0.7.2 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
go.uber.org/goleak v1.3.0 // indirect
)
12 changes: 4 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/firefart/nonamedreturns v1.0.5 h1:tM+Me2ZaXs8tfdDw3X6DOX++wMCOqzYUho6tUTYIdRA=
github.com/firefart/nonamedreturns v1.0.5/go.mod h1:gHJjDqhGM4WyPt639SOZs+G89Ko7QKH5R5BhnO6xJhw=
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
Expand Down Expand Up @@ -424,8 +424,6 @@ github.com/nishanths/exhaustive v0.12.0 h1:vIY9sALmw6T/yxiASewa4TQcFsVYZQQRUQJhK
github.com/nishanths/exhaustive v0.12.0/go.mod h1:mEZ95wPIZW+x8kC4TgC+9YCUgiST7ecevsVDTgc2obs=
github.com/nishanths/predeclared v0.2.2 h1:V2EPdZPliZymNAn79T8RkNApBjMmVKh5XRpLm/w98Vk=
github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c=
github.com/nitrictech/nitric/core v0.0.0-20240913000004-5d21c28b00ba h1:ZIPl9waqhbqw3xB2+zpUI2T1kEHyMkOnZZMt6tdrEUM=
github.com/nitrictech/nitric/core v0.0.0-20240913000004-5d21c28b00ba/go.mod h1:4LQH9hea9rX+0A+8G47NRk5nZuXCDqiwci1aZsHAkU8=
github.com/nitrictech/nitric/core v0.0.0-20241003062412-76ea6275fb0b h1:ImQFk66gRM3v9A6qmPImOiV3HJMDAX93X5rplMKn6ok=
github.com/nitrictech/nitric/core v0.0.0-20241003062412-76ea6275fb0b/go.mod h1:9bQnYPqLzq8CcPk5MHT3phg19CWJhDlFOfdIv27lwwM=
github.com/nitrictech/protoutils v0.0.0-20220321044654-02667a814cdf h1:8MB8W8ylM8sCM2COGfiO39/tB6BTdiawLszaUGCNL5w=
Expand Down Expand Up @@ -481,8 +479,7 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
Expand Down Expand Up @@ -643,8 +640,7 @@ go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
Expand Down
1 change: 1 addition & 0 deletions licenses.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
_ "github.com/nitrictech/go-sdk/nitric"
_ "github.com/nitrictech/go-sdk/nitric/apis"
_ "github.com/nitrictech/go-sdk/nitric/batch"
_ "github.com/nitrictech/go-sdk/nitric/errors"
_ "github.com/nitrictech/go-sdk/nitric/keyvalue"
_ "github.com/nitrictech/go-sdk/nitric/queues"
Expand Down
59 changes: 26 additions & 33 deletions nitric/apis/workers.go → nitric/apis/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package apis
import (
"context"
errorsstd "errors"
"io"

grpcx "github.com/nitrictech/go-sdk/internal/grpc"
"github.com/nitrictech/go-sdk/internal/handlers"
Expand All @@ -40,54 +39,48 @@ type apiWorkerOpts struct {

var _ workers.StreamWorker = (*apiWorker)(nil)

// Start implements Worker.
// Start runs the API worker, creating a stream to the Nitric server
func (a *apiWorker) Start(ctx context.Context) error {
initReq := &v1.ClientMessage{
Content: &v1.ClientMessage_RegistrationRequest{
RegistrationRequest: a.registrationRequest,
},
}

stream, err := a.client.Serve(ctx)
if err != nil {
return err
}

err = stream.Send(initReq)
if err != nil {
return err
createStream := func(ctx context.Context) (workers.Stream[v1.ClientMessage, v1.RegistrationResponse, *v1.ServerMessage], error) {
return a.client.Serve(ctx)
}

for {
var ctx *Ctx

resp, err := stream.Recv()

if errorsstd.Is(err, io.EOF) {
err = stream.CloseSend()
if err != nil {
return err
}
handlerSrvMsg := func(msg *v1.ServerMessage) (*v1.ClientMessage, error) {
if msg.GetRegistrationResponse() != nil {
// No need to respond to the registration response
return nil, nil
}

return nil
} else if err == nil && resp.GetRegistrationResponse() != nil {
// There is no need to respond to the registration response
} else if err == nil && resp.GetHttpRequest() != nil {
ctx = NewCtx(resp)
if msg.GetHttpRequest() != nil {
handlerCtx := NewCtx(msg)

err = a.Handler(ctx)
err := a.Handler(handlerCtx)
if err != nil {
ctx.WithError(err)
handlerCtx.WithError(err)
}

err = stream.Send(ctx.ToClientMessage())
if err != nil {
return err
}
} else {
return err
return handlerCtx.ToClientMessage(), nil
}

return nil, errors.NewWithCause(
codes.Internal,
"ApiWorker: Unhandled server message",
errorsstd.New("unhandled server message"),
)
}

return workers.HandleStream(
ctx,
createStream,
initReq,
handlerSrvMsg,
)
}

func newApiWorker(opts *apiWorkerOpts) *apiWorker {
Expand Down
51 changes: 18 additions & 33 deletions nitric/batch/batch_workers.go → nitric/batch/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package batch

import (
"context"
"io"

"google.golang.org/grpc"

Expand All @@ -25,6 +24,7 @@ import (
"github.com/nitrictech/go-sdk/constants"
"github.com/nitrictech/go-sdk/nitric/errors"
"github.com/nitrictech/go-sdk/nitric/errors/codes"
"github.com/nitrictech/go-sdk/nitric/workers"
v1 "github.com/nitrictech/nitric/core/pkg/proto/batch/v1"
)

Expand All @@ -38,53 +38,38 @@ type jobWorkerOpts struct {
Handler Handler
}

// Start implements Worker.
// Start runs the Job worker, creating a stream to the Nitric server
func (s *jobWorker) Start(ctx context.Context) error {
initReq := &v1.ClientMessage{
Content: &v1.ClientMessage_RegistrationRequest{
RegistrationRequest: s.registrationRequest,
},
}

// Create the request stream and send the initial request
stream, err := s.client.HandleJob(ctx)
if err != nil {
return err
}

err = stream.Send(initReq)
if err != nil {
return err
createStream := func(ctx context.Context) (workers.Stream[v1.ClientMessage, v1.RegistrationResponse, *v1.ServerMessage], error) {
return s.client.HandleJob(ctx)
}
for {
var ctx *Ctx

resp, err := stream.Recv()
handleSrvMsg := func(msg *v1.ServerMessage) (*v1.ClientMessage, error) {
if msg.GetJobRequest() != nil {
handlerCtx := NewCtx(msg)

if errorsstd.Is(err, io.EOF) {
err = stream.CloseSend()
err := s.handler(handlerCtx)
if err != nil {
return err
handlerCtx.WithError(err)
}

return nil
} else if err == nil && resp.GetRegistrationResponse() != nil {
// Do nothing
} else if err == nil && resp.GetJobRequest() != nil {
ctx = NewCtx(resp)
err = s.handler(ctx)
if err != nil {
ctx.WithError(err)
}

err = stream.Send(ctx.ToClientMessage())
if err != nil {
return err
}
} else {
return err
return handlerCtx.ToClientMessage(), nil
}

return nil, errors.NewWithCause(
codes.Internal,
"JobWorker: Unhandled server message",
errorsstd.New("unhandled server message"),
)
}

return workers.HandleStream(ctx, createStream, initReq, handleSrvMsg)
}

func newJobWorker(opts *jobWorkerOpts) *jobWorker {
Expand Down
20 changes: 19 additions & 1 deletion nitric/nitric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
package nitric

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/nitrictech/go-sdk/nitric/apis"
"github.com/nitrictech/go-sdk/nitric/batch"
"github.com/nitrictech/go-sdk/nitric/keyvalue"
Expand Down Expand Up @@ -42,7 +48,19 @@ var (
)

func Run() {
err := workers.GetDefaultManager().Run()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-sigChan
fmt.Printf("Received signal, shutting down...\n")
cancel()
}()

err := workers.GetDefaultManager().Run(ctx)
if err != nil {
panic(err)
}
Expand Down
55 changes: 22 additions & 33 deletions nitric/schedules/schedule_workers.go → nitric/schedules/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package schedules
import (
"context"
errorsstd "errors"
"io"

grpcx "github.com/nitrictech/go-sdk/internal/grpc"
"github.com/nitrictech/go-sdk/internal/handlers"
"github.com/nitrictech/go-sdk/nitric/errors"
"github.com/nitrictech/go-sdk/nitric/errors/codes"
"github.com/nitrictech/go-sdk/nitric/workers"
v1 "github.com/nitrictech/nitric/core/pkg/proto/schedules/v1"
)

Expand All @@ -36,53 +36,42 @@ type scheduleWorkerOpts struct {
Handler handlers.Handler[Ctx]
}

// Start implements Worker.
// Start runs the Schedule worker, creating a stream to the Nitric server
func (i *scheduleWorker) Start(ctx context.Context) error {
initReq := &v1.ClientMessage{
Content: &v1.ClientMessage_RegistrationRequest{
RegistrationRequest: i.registrationRequest,
},
}

// Create the request stream and send the initial request
stream, err := i.client.Schedule(ctx)
if err != nil {
return err
createStream := func(ctx context.Context) (workers.Stream[v1.ClientMessage, v1.RegistrationResponse, *v1.ServerMessage], error) {
return i.client.Schedule(ctx)
}

err = stream.Send(initReq)
if err != nil {
return err
}
for {
var ctx *Ctx

resp, err := stream.Recv()

if errorsstd.Is(err, io.EOF) {
err = stream.CloseSend()
if err != nil {
return err
}
handlerSrvMsg := func(msg *v1.ServerMessage) (*v1.ClientMessage, error) {
if msg.GetIntervalRequest() != nil {
handlerCtx := NewCtx(msg)

return nil
} else if err == nil && resp.GetRegistrationResponse() != nil {
// There is no need to respond to the registration response
} else if err == nil && resp.GetIntervalRequest() != nil {
ctx = NewCtx(resp)
err = i.handler(ctx)
err := i.handler(handlerCtx)
if err != nil {
ctx.WithError(err)
handlerCtx.WithError(err)
}

err = stream.Send(ctx.ToClientMessage())
if err != nil {
return err
}
} else {
return err
return handlerCtx.ToClientMessage(), nil
}
return nil, errors.NewWithCause(
codes.Internal,
"ScheduleWorker: Unhandled server message",
errorsstd.New("unhandled server message"),
)
}

return workers.HandleStream(
ctx,
createStream,
initReq,
handlerSrvMsg,
)
}

func newScheduleWorker(opts *scheduleWorkerOpts) *scheduleWorker {
Expand Down
Loading

0 comments on commit 9290bb2

Please sign in to comment.