-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use flow-go Components for composing #682
base: main
Are you sure you want to change the base?
Changes from all commits
ade23d5
78ccaf3
86aa27c
f076067
91199a1
a333610
2e04cb5
4984428
e9e537f
3e04bc3
6813473
fc2f653
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -19,6 +19,10 @@ import ( | |||||||||||||||||||
"time" | ||||||||||||||||||||
|
||||||||||||||||||||
"github.com/onflow/go-ethereum/core" | ||||||||||||||||||||
|
||||||||||||||||||||
"github.com/onflow/flow-go/module/component" | ||||||||||||||||||||
"github.com/onflow/flow-go/module/irrecoverable" | ||||||||||||||||||||
|
||||||||||||||||||||
gethVM "github.com/onflow/go-ethereum/core/vm" | ||||||||||||||||||||
gethLog "github.com/onflow/go-ethereum/log" | ||||||||||||||||||||
"github.com/onflow/go-ethereum/rpc" | ||||||||||||||||||||
|
@@ -57,8 +61,12 @@ type Server struct { | |||||||||||||||||||
|
||||||||||||||||||||
config config.Config | ||||||||||||||||||||
collector metrics.Collector | ||||||||||||||||||||
|
||||||||||||||||||||
startupCompleted chan struct{} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
var _ component.Component = (*Server)(nil) | ||||||||||||||||||||
|
||||||||||||||||||||
const ( | ||||||||||||||||||||
shutdownTimeout = 5 * time.Second | ||||||||||||||||||||
batchRequestLimit = 50 | ||||||||||||||||||||
|
@@ -79,10 +87,11 @@ func NewServer( | |||||||||||||||||||
gethLog.SetDefault(gethLog.NewLogger(zeroSlog)) | ||||||||||||||||||||
|
||||||||||||||||||||
return &Server{ | ||||||||||||||||||||
logger: logger, | ||||||||||||||||||||
timeouts: rpc.DefaultHTTPTimeouts, | ||||||||||||||||||||
config: cfg, | ||||||||||||||||||||
collector: collector, | ||||||||||||||||||||
logger: logger, | ||||||||||||||||||||
timeouts: rpc.DefaultHTTPTimeouts, | ||||||||||||||||||||
config: cfg, | ||||||||||||||||||||
collector: collector, | ||||||||||||||||||||
startupCompleted: make(chan struct{}), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
|
@@ -179,9 +188,10 @@ func (h *Server) disableWS() bool { | |||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Start starts the HTTP server if it is enabled and not already running. | ||||||||||||||||||||
func (h *Server) Start() error { | ||||||||||||||||||||
func (h *Server) Start(ctx irrecoverable.SignalerContext) { | ||||||||||||||||||||
defer close(h.startupCompleted) | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be called after the check on line 193? |
||||||||||||||||||||
if h.endpoint == "" || h.listener != nil { | ||||||||||||||||||||
return nil // already running or not configured | ||||||||||||||||||||
return // already running or not configured | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Initialize the server. | ||||||||||||||||||||
|
@@ -192,16 +202,21 @@ func (h *Server) Start() error { | |||||||||||||||||||
h.server.ReadHeaderTimeout = h.timeouts.ReadHeaderTimeout | ||||||||||||||||||||
h.server.WriteTimeout = h.timeouts.WriteTimeout | ||||||||||||||||||||
h.server.IdleTimeout = h.timeouts.IdleTimeout | ||||||||||||||||||||
h.server.BaseContext = func(_ net.Listener) context.Context { | ||||||||||||||||||||
return ctx | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
listenConfig := net.ListenConfig{} | ||||||||||||||||||||
// Start the server. | ||||||||||||||||||||
listener, err := net.Listen("tcp", h.endpoint) | ||||||||||||||||||||
listener, err := listenConfig.Listen(ctx, "tcp", h.endpoint) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
// If the server fails to start, we need to clear out the RPC and WS | ||||||||||||||||||||
// configurations so they can be configured another time. | ||||||||||||||||||||
h.disableRPC() | ||||||||||||||||||||
h.disableWS() | ||||||||||||||||||||
return err | ||||||||||||||||||||
ctx.Throw(err) | ||||||||||||||||||||
return | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
h.listener = listener | ||||||||||||||||||||
|
@@ -213,7 +228,7 @@ func (h *Server) Start() error { | |||||||||||||||||||
return | ||||||||||||||||||||
} | ||||||||||||||||||||
h.logger.Err(err).Msg("failed to start API server") | ||||||||||||||||||||
panic(err) | ||||||||||||||||||||
ctx.Throw(err) | ||||||||||||||||||||
} | ||||||||||||||||||||
}() | ||||||||||||||||||||
|
||||||||||||||||||||
|
@@ -225,8 +240,17 @@ func (h *Server) Start() error { | |||||||||||||||||||
url := fmt.Sprintf("ws://%v", listener.Addr()) | ||||||||||||||||||||
h.logger.Info().Msgf("JSON-RPC over WebSocket enabled: %s", url) | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
return nil | ||||||||||||||||||||
func (h *Server) Ready() <-chan struct{} { | ||||||||||||||||||||
ready := make(chan struct{}) | ||||||||||||||||||||
|
||||||||||||||||||||
go func() { | ||||||||||||||||||||
<-h.startupCompleted | ||||||||||||||||||||
close(ready) | ||||||||||||||||||||
}() | ||||||||||||||||||||
|
||||||||||||||||||||
return ready | ||||||||||||||||||||
Comment on lines
+246
to
+253
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need the second channel?
Suggested change
|
||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// disableRPC stops the JSON-RPC over HTTP handler. | ||||||||||||||||||||
|
@@ -296,41 +320,50 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |||||||||||||||||||
w.WriteHeader(http.StatusNotFound) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Stop shuts down the HTTP server. | ||||||||||||||||||||
func (h *Server) Stop() { | ||||||||||||||||||||
if h.listener == nil { | ||||||||||||||||||||
return // not running | ||||||||||||||||||||
} | ||||||||||||||||||||
// Done shuts down the HTTP server. | ||||||||||||||||||||
func (h *Server) Done() <-chan struct{} { | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is non-idempotent and breaks from the component interface because shutdown is triggered by calling I think we should use the component manager, or a similar pattern where |
||||||||||||||||||||
done := make(chan struct{}) | ||||||||||||||||||||
|
||||||||||||||||||||
// Shut down the server. | ||||||||||||||||||||
httpHandler := h.httpHandler | ||||||||||||||||||||
if httpHandler != nil { | ||||||||||||||||||||
httpHandler.server.Stop() | ||||||||||||||||||||
h.httpHandler = nil | ||||||||||||||||||||
} | ||||||||||||||||||||
go func() { | ||||||||||||||||||||
defer close(done) | ||||||||||||||||||||
|
||||||||||||||||||||
wsHandler := h.wsHandler | ||||||||||||||||||||
if wsHandler != nil { | ||||||||||||||||||||
wsHandler.server.Stop() | ||||||||||||||||||||
h.wsHandler = nil | ||||||||||||||||||||
} | ||||||||||||||||||||
if h.listener == nil { | ||||||||||||||||||||
return // not running | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) | ||||||||||||||||||||
defer cancel() | ||||||||||||||||||||
err := h.server.Shutdown(ctx) | ||||||||||||||||||||
if err != nil && err == ctx.Err() { | ||||||||||||||||||||
h.logger.Warn().Msg("HTTP server graceful shutdown timed out") | ||||||||||||||||||||
h.server.Close() | ||||||||||||||||||||
} | ||||||||||||||||||||
// Shut down the server. | ||||||||||||||||||||
httpHandler := h.httpHandler | ||||||||||||||||||||
if httpHandler != nil { | ||||||||||||||||||||
httpHandler.server.Stop() | ||||||||||||||||||||
h.httpHandler = nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
wsHandler := h.wsHandler | ||||||||||||||||||||
if wsHandler != nil { | ||||||||||||||||||||
wsHandler.server.Stop() | ||||||||||||||||||||
h.wsHandler = nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) | ||||||||||||||||||||
defer cancel() | ||||||||||||||||||||
err := h.server.Shutdown(ctx) | ||||||||||||||||||||
if err != nil && err == ctx.Err() { | ||||||||||||||||||||
h.logger.Warn().Msg("HTTP server graceful shutdown timed out") | ||||||||||||||||||||
h.server.Close() | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
h.listener.Close() | ||||||||||||||||||||
h.logger.Info().Msgf( | ||||||||||||||||||||
"HTTP server stopped, endpoint: %s", h.listener.Addr(), | ||||||||||||||||||||
) | ||||||||||||||||||||
h.listener.Close() | ||||||||||||||||||||
h.logger.Info().Msgf( | ||||||||||||||||||||
"HTTP server stopped, endpoint: %s", h.listener.Addr(), | ||||||||||||||||||||
) | ||||||||||||||||||||
|
||||||||||||||||||||
// Clear out everything to allow re-configuring it later. | ||||||||||||||||||||
h.host, h.port, h.endpoint = "", 0, "" | ||||||||||||||||||||
h.server, h.listener = nil, nil | ||||||||||||||||||||
|
||||||||||||||||||||
}() | ||||||||||||||||||||
|
||||||||||||||||||||
// Clear out everything to allow re-configuring it later. | ||||||||||||||||||||
h.host, h.port, h.endpoint = "", 0, "" | ||||||||||||||||||||
h.server, h.listener = nil, nil | ||||||||||||||||||||
return done | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// CheckTimeouts ensures that timeout values are meaningful | ||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -178,7 +178,7 @@ func newSubscription[T any]( | |||||||||||||||||||||||
|
||||||||||||||||||||||||
rpcSub := notifier.CreateSubscription() | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
subs := models.NewSubscription(logger, callback(notifier, rpcSub)) | ||||||||||||||||||||||||
subs := models.NewSubscription(callback(notifier, rpcSub)) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
l := logger.With(). | ||||||||||||||||||||||||
Str("gateway-subscription-id", fmt.Sprintf("%p", subs)). | ||||||||||||||||||||||||
|
@@ -190,16 +190,8 @@ func newSubscription[T any]( | |||||||||||||||||||||||
go func() { | ||||||||||||||||||||||||
defer publisher.Unsubscribe(subs) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
for { | ||||||||||||||||||||||||
select { | ||||||||||||||||||||||||
case err := <-subs.Error(): | ||||||||||||||||||||||||
l.Debug().Err(err).Msg("subscription returned error") | ||||||||||||||||||||||||
return | ||||||||||||||||||||||||
case err := <-rpcSub.Err(): | ||||||||||||||||||||||||
l.Debug().Err(err).Msg("client unsubscribed") | ||||||||||||||||||||||||
return | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
err := <-rpcSub.Err() | ||||||||||||||||||||||||
l.Debug().Err(err).Msg("client unsubscribed") | ||||||||||||||||||||||||
Comment on lines
+193
to
+194
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle errors from the subscription to prevent unhandled errors Currently, the code only listens for errors from Apply this diff: go func() {
defer publisher.Unsubscribe(subs)
- err := <-rpcSub.Err()
- l.Debug().Err(err).Msg("client unsubscribed")
+ select {
+ case err := <-rpcSub.Err():
+ l.Debug().Err(err).Msg("client unsubscribed")
+ case err := <-subs.Err():
+ l.Error().Err(err).Msg("subscription error")
+ }
}() 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||
}() | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
l.Info().Msg("new heads subscription created") | ||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason not to use a component manager here?