Skip to content

Commit

Permalink
feat: graceful bridge shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Artsiom Koltun <[email protected]>
  • Loading branch information
artek-koltun committed Oct 17, 2023
1 parent ef4f782 commit 1841eca
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 25 deletions.
52 changes: 27 additions & 25 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log"
"net"
"net/http"
"os"
"strings"
"time"

Expand Down Expand Up @@ -71,29 +72,33 @@ func main() {

flag.Parse()

sh := utils.NewShutdownHandler(2 * time.Second)

// Create KV store for persistence
options := gomap.DefaultOptions
options.Codec = utils.ProtoCodec{}
// TODO: we can change to redis or badger at any given time
store := gomap.NewStore(options)
defer func(store gokv.Store) {
err := store.Close()
if err != nil {
log.Panic(err)
}
}(store)
sh.AddGokvStore(store)

go runGatewayServer(grpcPort, httpPort)
runGrpcServer(grpcPort, useKvm, store, spdkAddress, qmpAddress, ctrlrDir, busesStr, tlsFiles)
runGrpcServer(grpcPort, useKvm, store, spdkAddress, qmpAddress, ctrlrDir, busesStr, tlsFiles, sh)
runGatewayServer(grpcPort, httpPort, sh)

if err := sh.RunAndWait(); err != nil {
log.Printf("Bridge error: %v", err)
os.Exit(-1)
}
log.Print("Bridge successfully stopped")
}

func runGrpcServer(grpcPort int, useKvm bool, store gokv.Store, spdkAddress, qmpAddress, ctrlrDir, busesStr, tlsFiles string) {
func runGrpcServer(
grpcPort int,
useKvm bool,
store gokv.Store,
spdkAddress, qmpAddress, ctrlrDir, busesStr, tlsFiles string,
sh *utils.ShutdownHandler) {
tp := utils.InitTracerProvider("opi-spdk-bridge")
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Panicf("Tracer Provider Shutdown: %v", err)
}
}()
sh.AddTraceProvider(tp)

buses := splitBusesBySeparator(busesStr)

Expand Down Expand Up @@ -171,16 +176,17 @@ func runGrpcServer(grpcPort int, useKvm bool, store gokv.Store, spdkAddress, qmp

reflection.Register(s)

log.Printf("gRPC server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Panicf("failed to serve: %v", err)
}
sh.AddGrpcServer(s, lis)
}

func runGatewayServer(grpcPort int, httpPort int) {
func runGatewayServer(grpcPort int, httpPort int, sh *utils.ShutdownHandler) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sh.AddShutdown(func(_ context.Context) error {
log.Println("Canceling context to close HTTP gateway endpoint to gRPC server")
cancel()
return nil
})

// Register gRPC server endpoint
// Note: Make sure the gRPC server is running properly and accessible
Expand All @@ -192,15 +198,11 @@ func runGatewayServer(grpcPort int, httpPort int) {
}

// Start HTTP server (and proxy calls to gRPC server endpoint)
log.Printf("HTTP Server listening at %v", httpPort)
server := &http.Server{
Addr: fmt.Sprintf(":%d", httpPort),
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
}
err = server.ListenAndServe()
if err != nil {
log.Panic("cannot start HTTP gateway server")
}
sh.AddHTTPServer(server)
}
191 changes: 191 additions & 0 deletions pkg/utils/shutdown.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (C) 2023 Intel Corporation

// Package utils contains utility functions
package utils

import (
"context"
"errors"
"log"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/philippgille/gokv"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)

type (
	// ServeFunc is the blocking entry point of a service. It returns when
	// the service stops, reporting the error that made it stop, if any.
	ServeFunc func() error

	// ShutdownFunc stops a service or releases a resource. The supplied
	// context bounds how long the shutdown is allowed to take.
	ShutdownFunc func(ctx context.Context) error
)

// ShutdownHandler runs the registered services and performs the registered
// shutdown procedures when a service fails or a termination signal arrives.
type ShutdownHandler struct {
// waitSignal receives the OS signals subscribed to in RunAndWait.
waitSignal chan os.Signal
// timeoutPerShutdown is the deadline applied to each individual shutdown call.
timeoutPerShutdown time.Duration

// mu guards serves and shutdowns.
mu sync.Mutex
// serves holds the service functions started by RunAndWait.
serves []ServeFunc
// shutdowns holds the shutdown procedures; RunAndWait executes them
// from the last registered to the first.
shutdowns []ShutdownFunc

// eg runs the serve functions and the goroutine that drives the shutdown.
eg *errgroup.Group
// egCtx is the context derived from eg; RunAndWait treats its
// cancellation as a service failure and starts the shutdown.
egCtx context.Context
}

// NewShutdownHandler returns a ShutdownHandler with no registered services.
// timeoutPerShutdown is the deadline given to every single shutdown
// procedure once the shutdown sequence starts.
func NewShutdownHandler(timeoutPerShutdown time.Duration) *ShutdownHandler {
	group, groupCtx := errgroup.WithContext(context.Background())

	handler := &ShutdownHandler{
		timeoutPerShutdown: timeoutPerShutdown,
		eg:                 group,
		egCtx:              groupCtx,
	}
	// One slot of buffering lets the runtime deliver a signal even when the
	// receiving goroutine is not parked on the channel yet.
	handler.waitSignal = make(chan os.Signal, 1)
	handler.serves = make([]ServeFunc, 0)
	handler.shutdowns = make([]ShutdownFunc, 0)

	return handler
}

// AddServe registers a service to run together with the procedure that
// shuts it down. Both are appended to their lists under the handler mutex.
func (s *ShutdownHandler) AddServe(serve ServeFunc, shutdown ShutdownFunc) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.serves, s.shutdowns = append(s.serves, serve), append(s.shutdowns, shutdown)
}

// AddShutdown add a shutdown procedure to execute.
// Shutdowns are executed in backward order
func (s *ShutdownHandler) AddShutdown(shutdown ShutdownFunc) {
s.mu.Lock()
defer s.mu.Unlock()
s.shutdowns = append(s.shutdowns, shutdown)

Check warning on line 77 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L74-L77

Added lines #L74 - L77 were not covered by tests
}

// AddGrpcServer adds serve and shutdown procedures for provided gRPC server
func (s *ShutdownHandler) AddGrpcServer(server *grpc.Server, lis net.Listener) {
s.AddServe(
func() error {
log.Printf("gRPC Server listening at %v", lis.Addr())
return server.Serve(lis)
},
func(ctx context.Context) error {
log.Println("Stopping gRPC Server")
return s.runWithCtx(ctx, func() error {
server.GracefulStop()
return nil
})

Check warning on line 92 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L81-L92

Added lines #L81 - L92 were not covered by tests
},
)
}

// AddHTTPServer adds serve and shutdown procedures for provided HTTP server
func (s *ShutdownHandler) AddHTTPServer(server *http.Server) {
s.AddServe(
func() error {
log.Printf("HTTP Server listening at %v", server.Addr)
err := server.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
return nil
}

Check warning on line 105 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L98-L105

Added lines #L98 - L105 were not covered by tests

return err

Check warning on line 107 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L107

Added line #L107 was not covered by tests
},
func(ctx context.Context) error {
log.Println("Stopping HTTP Server")
err := server.Shutdown(ctx)
if err != nil {
cerr := server.Close()
log.Println("HTTP server close error:", cerr)
}
return err

Check warning on line 116 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L109-L116

Added lines #L109 - L116 were not covered by tests
},
)
}

// AddGokvStore adds gokv shutdown procedure
func (s *ShutdownHandler) AddGokvStore(store gokv.Store) {
s.AddShutdown(func(ctx context.Context) error {
log.Println("Stopping gokv storage")
return s.runWithCtx(ctx, func() error {
return store.Close()
})

Check warning on line 127 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L122-L127

Added lines #L122 - L127 were not covered by tests
})
}

// AddTraceProvider adds trace provider shutdown procedure
func (s *ShutdownHandler) AddTraceProvider(tp *sdktrace.TracerProvider) {
s.AddShutdown(func(ctx context.Context) error {
log.Println("Stopping tracer")
return tp.Shutdown(ctx)
})

Check warning on line 136 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L132-L136

Added lines #L132 - L136 were not covered by tests
}

// RunAndWait starts every registered service and blocks until all of them
// and the shutdown sequence have finished.
//
// The shutdown sequence starts when SIGINT or SIGTERM is received, or when
// any service returns an error. Shutdown procedures run in reverse order of
// registration, each with its own timeoutPerShutdown deadline. The returned
// error is the result of the underlying errgroup: the first non-nil error
// returned by a service or by the shutdown sequence (whose individual errors
// are joined), or nil.
func (s *ShutdownHandler) RunAndWait() error {
	// Subscribe before any service starts so that an early signal is queued
	// instead of terminating the process, and unsubscribe on the way out.
	signal.Notify(s.waitSignal, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(s.waitSignal)

	// Snapshot under the lock: serves is guarded by mu.
	s.mu.Lock()
	serves := append([]ServeFunc(nil), s.serves...)
	s.mu.Unlock()

	for _, serve := range serves {
		s.eg.Go(serve)
	}

	s.eg.Go(func() error {
		select {
		case sig := <-s.waitSignal:
			log.Printf("Got signal: %v", sig)
		case <-s.egCtx.Done():
			// can be reached if any Serve returned an error. Thus, initiating shutdown
			log.Println("A process from errgroup exited with error:", s.egCtx.Err())
		}
		log.Printf("Start graceful shutdown with timeout per shutdown call: %v", s.timeoutPerShutdown)

		s.mu.Lock()
		defer s.mu.Unlock()

		var err error
		for i := len(s.shutdowns) - 1; i >= 0; i-- {
			shutdown := s.shutdowns[i]
			// The closure scopes the deferred cancel to a single shutdown
			// call, so each timeout context is released as soon as its
			// shutdown returns rather than when the whole loop is done.
			err = errors.Join(err, func() error {
				timeoutCtx, cancel := context.WithTimeout(context.Background(), s.timeoutPerShutdown)
				defer cancel()
				return shutdown(timeoutCtx)
			}())
		}

		return err
	})

	return s.eg.Wait()
}

func (s *ShutdownHandler) runWithCtx(ctx context.Context, fn func() error) error {
var err error

stopped := make(chan struct{}, 1)
func() {
err = fn()
stopped <- struct{}{}
}()

Check warning on line 182 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L175-L182

Added lines #L175 - L182 were not covered by tests

select {
case <-ctx.Done():
err = ctx.Err()
case <-stopped:

Check warning on line 187 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L184-L187

Added lines #L184 - L187 were not covered by tests
}

return err

Check warning on line 190 in pkg/utils/shutdown.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/shutdown.go#L190

Added line #L190 was not covered by tests
}
Loading

0 comments on commit 1841eca

Please sign in to comment.