Skip to content

Commit

Permalink
rollup-client: Increase call timeouts in CI (ethereum-optimism#12561)
Browse files Browse the repository at this point in the history
Refactors the op-service/client package a fair bit to ensure config options are more consistently applied.
  • Loading branch information
ajsutton authored Oct 22, 2024
1 parent b93daad commit 2d08d19
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 76 deletions.
2 changes: 1 addition & 1 deletion op-chain-ops/cmd/check-canyon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func main() {
// Parse the command-line arguments
flag.Parse()

l2RPC, err := client.NewRPC(context.Background(), logger, rpcURL, client.WithDialBackoff(10))
l2RPC, err := client.NewRPC(context.Background(), logger, rpcURL, client.WithDialAttempts(10))
if err != nil {
log.Crit("Error creating RPC", "err", err)
}
Expand Down
5 changes: 4 additions & 1 deletion op-e2e/system/e2esys/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,10 @@ func (sys *System) RollupClient(name string) *sources.RollupClient {
require.NoError(sys.t, err, "failed to dial rollup instance %s", name)
return cl
})
rollupClient = sources.NewRollupClient(client.NewBaseRPCClient(rpcClient))
rollupClient = sources.NewRollupClient(client.NewBaseRPCClient(rpcClient,
// Increase timeouts because CI servers can be under a lot of load
client.WithCallTimeout(30*time.Second),
client.WithBatchCallTimeout(30*time.Second)))
sys.rollupClients[name] = rollupClient
return rollupClient
}
Expand Down
4 changes: 2 additions & 2 deletions op-node/node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.L2EngineJWTSecret))
opts := []client.RPCOption{
client.WithGethRPCOptions(auth),
client.WithDialBackoff(10),
client.WithDialAttempts(10),
}
l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, opts...)
if err != nil {
Expand Down Expand Up @@ -140,7 +140,7 @@ func (cfg *L1EndpointConfig) Check() error {
func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.L1ClientConfig, error) {
opts := []client.RPCOption{
client.WithHttpPollInterval(cfg.HttpPollInterval),
client.WithDialBackoff(10),
client.WithDialAttempts(10),
}
if cfg.RateLimit != 0 {
opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize))
Expand Down
8 changes: 4 additions & 4 deletions op-node/node/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestOutputAtBlock(t *testing.T) {
require.NoError(t, server.Stop(context.Background()))
}()

client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialAttempts(3))
require.NoError(t, err)

var out *eth.OutputResponse
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestVersion(t *testing.T) {
require.NoError(t, server.Stop(context.Background()))
}()

client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialAttempts(3))
assert.NoError(t, err)

var out string
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestSyncStatus(t *testing.T) {
require.NoError(t, server.Stop(context.Background()))
}()

client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialAttempts(3))
assert.NoError(t, err)

var out *eth.SyncStatus
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestSafeHeadAtL1Block(t *testing.T) {
require.NoError(t, server.Stop(context.Background()))
}()

client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3))
client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialAttempts(3))
require.NoError(t, err)

var out *eth.SafeHeadResponse
Expand Down
4 changes: 2 additions & 2 deletions op-program/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ func makeDefaultPrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV
return nil, nil
}
logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL, client.WithDialBackoff(10))
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL, client.WithDialAttempts(10))
if err != nil {
return nil, fmt.Errorf("failed to setup L1 RPC: %w", err)
}

logger.Info("Connecting to L2 node", "l2", cfg.L2URL)
l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL, client.WithDialBackoff(10))
l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL, client.WithDialAttempts(10))
if err != nil {
return nil, fmt.Errorf("failed to setup L2 RPC: %w", err)
}
Expand Down
28 changes: 14 additions & 14 deletions op-service/client/lazy_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,33 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

// LazyRPC defers connection attempts to the usage of the RPC.
// lazyRPC defers connection attempts to the usage of the RPC.
// This allows a websocket connection to be established lazily.
// The underlying RPC should handle reconnects.
type LazyRPC struct {
type lazyRPC struct {
// mutex to prevent more than one active dial attempt at a time.
mu sync.Mutex
// inner is the actual RPC client.
// It is initialized once. The underlying RPC handles reconnections.
inner RPC
// options to initialize `inner` with.
opts []rpc.ClientOption
cfg rpcConfig
endpoint string
// If we have not initialized `inner` yet,
// do not try to do so after closing the client.
closed bool
}

var _ RPC = (*LazyRPC)(nil)
var _ RPC = (*lazyRPC)(nil)

func NewLazyRPC(endpoint string, opts ...rpc.ClientOption) *LazyRPC {
return &LazyRPC{
opts: opts,
func newLazyRPC(endpoint string, cfg rpcConfig) *lazyRPC {
return &lazyRPC{
cfg: cfg,
endpoint: endpoint,
}
}

func (l *LazyRPC) dial(ctx context.Context) error {
func (l *lazyRPC) dial(ctx context.Context) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.inner != nil {
Expand All @@ -45,15 +45,15 @@ func (l *LazyRPC) dial(ctx context.Context) error {
if l.closed {
return errors.New("cannot dial RPC, client was already closed")
}
underlying, err := rpc.DialOptions(ctx, l.endpoint, l.opts...)
underlying, err := rpc.DialOptions(ctx, l.endpoint, l.cfg.gethRPCOptions...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
l.inner = NewBaseRPCClient(underlying)
l.inner = wrapClient(underlying, l.cfg)
return nil
}

func (l *LazyRPC) Close() {
func (l *lazyRPC) Close() {
l.mu.Lock()
defer l.mu.Unlock()
if l.inner != nil {
Expand All @@ -62,21 +62,21 @@ func (l *LazyRPC) Close() {
l.closed = true
}

func (l *LazyRPC) CallContext(ctx context.Context, result any, method string, args ...any) error {
func (l *lazyRPC) CallContext(ctx context.Context, result any, method string, args ...any) error {
if err := l.dial(ctx); err != nil {
return err
}
return l.inner.CallContext(ctx, result, method, args...)
}

func (l *LazyRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
func (l *lazyRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
if err := l.dial(ctx); err != nil {
return err
}
return l.inner.BatchCallContext(ctx, b)
}

func (l *LazyRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
func (l *lazyRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
if err := l.dial(ctx); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion op-service/client/lazy_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestLazyRPC(t *testing.T) {

addr := listener.Addr().String()

cl := NewLazyRPC("ws://" + addr)
cl := newLazyRPC("ws://"+addr, applyOptions(nil))
defer cl.Close()

// At this point the connection is online, but the RPC is not.
Expand Down
108 changes: 60 additions & 48 deletions op-service/client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
"regexp"
"time"

"golang.org/x/time/rate"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -38,77 +37,92 @@ type rpcConfig struct {
lazy bool
callTimeout time.Duration
batchCallTimeout time.Duration
fixedDialBackoff time.Duration
}

type RPCOption func(cfg *rpcConfig) error
type RPCOption func(cfg *rpcConfig)

func WithCallTimeout(d time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
return func(cfg *rpcConfig) {
cfg.callTimeout = d
return nil
}
}

func WithBatchCallTimeout(d time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
return func(cfg *rpcConfig) {
cfg.batchCallTimeout = d
return nil
}
}

// WithDialBackoff configures the number of attempts for the initial dial to the RPC,
// attempts are executed with an exponential backoff strategy.
func WithDialBackoff(attempts int) RPCOption {
return func(cfg *rpcConfig) error {
// WithDialAttempts configures the number of attempts for the initial dial to the RPC,
// attempts are executed with an exponential backoff strategy by default.
func WithDialAttempts(attempts int) RPCOption {
return func(cfg *rpcConfig) {
cfg.backoffAttempts = attempts
return nil
}
}

// WithFixedDialBackoff makes the RPC client use a fixed delay between dial attempts of 2 seconds instead of exponential
func WithFixedDialBackoff(d time.Duration) RPCOption {
return func(cfg *rpcConfig) {
cfg.fixedDialBackoff = d
}
}

// WithHttpPollInterval configures the RPC to poll at the given rate, in case RPC subscriptions are not available.
func WithHttpPollInterval(duration time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
return func(cfg *rpcConfig) {
cfg.httpPollInterval = duration
return nil
}
}

// WithGethRPCOptions passes the list of go-ethereum RPC options to the internal RPC instance.
func WithGethRPCOptions(gethRPCOptions ...rpc.ClientOption) RPCOption {
return func(cfg *rpcConfig) error {
return func(cfg *rpcConfig) {
cfg.gethRPCOptions = append(cfg.gethRPCOptions, gethRPCOptions...)
return nil
}
}

// WithRateLimit configures the RPC to target the given rate limit (in requests / second).
// See NewRateLimitingClient for more details.
func WithRateLimit(rateLimit float64, burst int) RPCOption {
return func(cfg *rpcConfig) error {
return func(cfg *rpcConfig) {
cfg.limit = rateLimit
cfg.burst = burst
return nil
}
}

// WithLazyDial makes the RPC client initialization defer the initial connection attempt,
// and defer to later RPC requests upon subsequent dial errors.
// Any dial-backoff option will be ignored if this option is used.
// This is implemented by wrapping the inner RPC client with a LazyRPC.
func WithLazyDial() RPCOption {
return func(cfg *rpcConfig) error {
return func(cfg *rpcConfig) {
cfg.lazy = true
return nil
}
}

// NewRPC returns the correct client.RPC instance for a given RPC url.
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error) {
var cfg rpcConfig
for i, opt := range opts {
if err := opt(&cfg); err != nil {
return nil, fmt.Errorf("rpc option %d failed to apply to RPC config: %w", i, err)
cfg := applyOptions(opts)

var wrapped RPC
if cfg.lazy {
wrapped = newLazyRPC(addr, cfg)
} else {
underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg)
if err != nil {
return nil, err
}
wrapped = wrapClient(underlying, cfg)
}

return NewRPCWithClient(ctx, lgr, addr, wrapped, cfg.httpPollInterval)
}

func applyOptions(opts []RPCOption) rpcConfig {
var cfg rpcConfig
for _, opt := range opts {
opt(&cfg)
}

if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial.
Expand All @@ -120,23 +134,7 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
if cfg.batchCallTimeout == 0 {
cfg.batchCallTimeout = 20 * time.Second
}

var wrapped RPC
if cfg.lazy {
wrapped = NewLazyRPC(addr, cfg.gethRPCOptions...)
} else {
underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg.backoffAttempts, cfg.gethRPCOptions...)
if err != nil {
return nil, err
}
wrapped = &BaseRPCClient{c: underlying, callTimeout: cfg.callTimeout, batchCallTimeout: cfg.batchCallTimeout}
}

if cfg.limit != 0 {
wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst)
}

return NewRPCWithClient(ctx, lgr, addr, wrapped, cfg.httpPollInterval)
return cfg
}

// NewRPCWithClient builds a new polling client with the given underlying RPC client.
Expand All @@ -148,14 +146,17 @@ func NewRPCWithClient(ctx context.Context, lgr log.Logger, addr string, underlyi
}

// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) {
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, cfg rpcConfig) (*rpc.Client, error) {
bOff := retry.Exponential()
return retry.Do(ctx, attempts, bOff, func() (*rpc.Client, error) {
if cfg.fixedDialBackoff != 0 {
bOff = retry.Fixed(cfg.fixedDialBackoff)
}
return retry.Do(ctx, cfg.backoffAttempts, bOff, func() (*rpc.Client, error) {
if !IsURLAvailable(ctx, addr) {
log.Warn("failed to dial address, but may connect later", "addr", addr)
return nil, fmt.Errorf("address unavailable (%s)", addr)
}
client, err := rpc.DialOptions(ctx, addr, opts...)
client, err := rpc.DialOptions(ctx, addr, cfg.gethRPCOptions...)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
Expand Down Expand Up @@ -191,15 +192,26 @@ func IsURLAvailable(ctx context.Context, address string) bool {

// BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant
// with the client.RPC interface.
// It sets a timeout of 10s on CallContext & 20s on BatchCallContext made through it.
// It sets a default timeout of 10s on CallContext & 20s on BatchCallContext made through it.
type BaseRPCClient struct {
c *rpc.Client
batchCallTimeout time.Duration
callTimeout time.Duration
}

func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient {
return &BaseRPCClient{c: c, callTimeout: 10 * time.Second, batchCallTimeout: 20 * time.Second}
func NewBaseRPCClient(c *rpc.Client, opts ...RPCOption) RPC {
cfg := applyOptions(opts)
return wrapClient(c, cfg)
}

func wrapClient(c *rpc.Client, cfg rpcConfig) RPC {
var wrapped RPC
wrapped = &BaseRPCClient{c: c, callTimeout: cfg.callTimeout, batchCallTimeout: cfg.batchCallTimeout}

if cfg.limit != 0 {
wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst)
}
return wrapped
}

func (b *BaseRPCClient) Close() {
Expand Down
Loading

0 comments on commit 2d08d19

Please sign in to comment.