Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry HTTP2 INTERNAL_ERRORs #278

Merged
merged 6 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/gofrs/flock v0.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
golang.org/x/net v0.20.0
golang.org/x/sync v0.6.0
)

Expand All @@ -16,6 +17,7 @@ require (
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
26 changes: 16 additions & 10 deletions pkg/geoipupdate/database/local_file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewLocalFileWriter(
}

// Write writes the result struct returned by a Reader to a database file.
func (w *LocalFileWriter) Write(result *ReadResult) error {
func (w *LocalFileWriter) Write(result *ReadResult) (err error) {
// exit early if we've got the latest database version.
if strings.EqualFold(result.OldHash, result.NewHash) {
if w.verbose {
Expand All @@ -56,8 +56,11 @@ func (w *LocalFileWriter) Write(result *ReadResult) error {
}

defer func() {
if err := result.reader.Close(); err != nil {
log.Printf("closing reader for %s: %+v", result.EditionID, err)
if closeErr := result.reader.Close(); closeErr != nil {
err = errors.Join(
err,
fmt.Errorf("closing reader for %s: %w", result.EditionID, closeErr),
)
}
}()

Expand All @@ -69,34 +72,37 @@ func (w *LocalFileWriter) Write(result *ReadResult) error {
return fmt.Errorf("setting up database writer for %s: %w", result.EditionID, err)
}
defer func() {
if err := fw.close(); err != nil {
log.Printf("closing file writer: %+v", err)
if closeErr := fw.close(); closeErr != nil {
err = errors.Join(
err,
fmt.Errorf("closing file writer: %w", closeErr),
)
}
}()

if err := fw.write(result.reader); err != nil {
if err = fw.write(result.reader); err != nil {
return fmt.Errorf("writing to the temp file for %s: %w", result.EditionID, err)
}

// make sure the hash of the temp file matches the expected hash.
if err := fw.validateHash(result.NewHash); err != nil {
if err = fw.validateHash(result.NewHash); err != nil {
return fmt.Errorf("validating hash for %s: %w", result.EditionID, err)
}

// move the temoporary database file into it's final location and
// sync the directory.
if err := fw.syncAndRename(databaseFilePath); err != nil {
if err = fw.syncAndRename(databaseFilePath); err != nil {
return fmt.Errorf("renaming temp file: %w", err)
}

// sync database directory.
if err := syncDir(filepath.Dir(databaseFilePath)); err != nil {
if err = syncDir(filepath.Dir(databaseFilePath)); err != nil {
return fmt.Errorf("syncing database directory: %w", err)
}

// check if we need to set the file's modified at time
if w.preserveFileTime {
if err := setModifiedAtTime(databaseFilePath, result.ModifiedAt); err != nil {
if err = setModifiedAtTime(databaseFilePath, result.ModifiedAt); err != nil {
return err
}
}
Expand Down
70 changes: 60 additions & 10 deletions pkg/geoipupdate/geoip_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ package geoipupdate
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"golang.org/x/net/http2"

"github.com/maxmind/geoipupdate/v6/pkg/geoipupdate/database"
"github.com/maxmind/geoipupdate/v6/pkg/geoipupdate/internal"
)
Expand Down Expand Up @@ -85,20 +89,11 @@ func (c *Client) Run(ctx context.Context) error {
for _, editionID := range c.config.EditionIDs {
editionID := editionID
processFunc := func(ctx context.Context) error {
editionHash, err := writer.GetHash(editionID)
edition, err := c.downloadEdition(ctx, editionID, reader, writer)
if err != nil {
return err
}

edition, err := reader.Read(ctx, editionID, editionHash)
if err != nil {
return err
}

if err := writer.Write(edition); err != nil {
return err
}

edition.CheckedAt = time.Now().In(time.UTC)

mu.Lock()
Expand Down Expand Up @@ -126,3 +121,58 @@ func (c *Client) Run(ctx context.Context) error {

return nil
}

// downloadEdition downloads the file with retries on HTTP2 INTERNAL_ERRORs.
func (c *Client) downloadEdition(
ctx context.Context,
editionID string,
r database.Reader,
w database.Writer,
) (*database.ReadResult, error) {
editionHash, err := w.GetHash(editionID)
if err != nil {
return nil, err
}

// RetryFor value of 0 means that no retries should be performed.
// Max zero retries has to be set to achieve that
// because the backoff never stops if MaxElapsedTime is zero.
exp := backoff.NewExponentialBackOff()
exp.MaxElapsedTime = c.config.RetryFor
b := backoff.BackOff(exp)
if exp.MaxElapsedTime == 0 {
b = backoff.WithMaxRetries(exp, 0)
}

var edition *database.ReadResult
err = backoff.RetryNotify(
func() error {
edition, err = r.Read(ctx, editionID, editionHash)
marselester marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return backoff.Permanent(err)
}

if err = w.Write(edition); err != nil {
marselester marked this conversation as resolved.
Show resolved Hide resolved
streamErr := http2.StreamError{}
if errors.As(err, &streamErr) && streamErr.Code == http2.ErrCodeInternal {
return err
}

return backoff.Permanent(err)
}

return nil
},
b,
func(err error, d time.Duration) {
if c.config.Verbose {
log.Printf("Couldn't download %s, retrying in %v: %v", editionID, d, err)
}
},
)
if err != nil {
return nil, err
}

return edition, nil
}
33 changes: 29 additions & 4 deletions pkg/geoipupdate/geoip_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"testing"
"time"

"github.com/maxmind/geoipupdate/v6/pkg/geoipupdate/database"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"

"github.com/maxmind/geoipupdate/v6/pkg/geoipupdate/database"
)

// TestClientOutput makes sure that the client outputs the result of it's
Expand Down Expand Up @@ -77,6 +79,21 @@ func TestClientOutput(t *testing.T) {
t.Errorf("database %s was not updated", outputDatabases[i].EditionID)
}
}

streamErr := http2.StreamError{
Code: http2.ErrCodeInternal,
}
c.getWriter = func() (database.Writer, error) {
w := mockWriter{
WriteFunc: func(_ *database.ReadResult) error {
return streamErr
},
}

return &w, nil
}
err = c.Run(context.Background())
require.ErrorIs(t, err, streamErr)
}

type mockReader struct {
Expand All @@ -93,10 +110,18 @@ func (mr *mockReader) Read(_ context.Context, _, _ string) (*database.ReadResult
return &res, nil
}

type mockWriter struct{}
type mockWriter struct {
WriteFunc func(*database.ReadResult) error
}

func (w *mockWriter) Write(r *database.ReadResult) error {
if w.WriteFunc != nil {
return w.WriteFunc(r)
}

func (w *mockWriter) Write(_ *database.ReadResult) error { return nil }
func (w mockWriter) GetHash(_ string) (string, error) { return "", nil }
return nil
}
func (w mockWriter) GetHash(_ string) (string, error) { return "", nil }

func afterOrEqual(t1, t2 time.Time) bool {
return t1.After(t2) || t1.Equal(t2)
Expand Down
Loading