Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blobfixture: tolerate non-found errors #141839

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions pkg/cloud/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,15 @@ func (s *azureStorage) Writer(ctx context.Context, basename string) (io.WriteClo
}), nil
}

// isNotFoundErr checks if the error indicates a blob not found condition.
func isNotFoundErr(err error) bool {
if err == nil {
return false
}
var azerr *azcore.ResponseError
return errors.As(err, &azerr) && azerr.ErrorCode == "BlobNotFound"
}

func (s *azureStorage) ReadFile(
ctx context.Context, basename string, opts cloud.ReadOptions,
) (_ ioctx.ReadCloserCtx, fileSize int64, _ error) {
Expand All @@ -326,15 +335,8 @@ func (s *azureStorage) ReadFile(
resp, err := s.getBlob(basename).DownloadStream(ctx, &azblob.DownloadStreamOptions{Range: azblob.
HTTPRange{Offset: opts.Offset}})
if err != nil {
if azerr := (*azcore.ResponseError)(nil); errors.As(err, &azerr) {
if azerr.ErrorCode == "BlobNotFound" {
// nolint:errwrap
return nil, 0, errors.Wrapf(
errors.Wrap(cloud.ErrFileDoesNotExist, "azure blob does not exist"),
"%v",
err.Error(),
)
}
if isNotFoundErr(err) {
return nil, 0, cloud.WrapErrFileDoesNotExist(err, "azure blob does not exist")
}
return nil, 0, errors.Wrapf(err, "failed to create azure reader")
}
Expand Down Expand Up @@ -385,6 +387,9 @@ func (s *azureStorage) Delete(ctx context.Context, basename string) error {
err := timeutil.RunWithTimeout(ctx, "delete azure file", cloud.Timeout.Get(&s.settings.SV),
func(ctx context.Context) error {
_, err := s.getBlob(basename).Delete(ctx, nil)
if isNotFoundErr(err) {
return nil
}
return err
})
return errors.Wrap(err, "delete file")
Expand Down
7 changes: 7 additions & 0 deletions pkg/cloud/cloudtestutils/cloud_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,13 @@ func checkExportStore(t *testing.T, info StoreInfo, skipSingleFile bool) {
require.True(t, errors.Is(err, cloud.ErrFileDoesNotExist), "Expected a file does not exist error but returned %s")

require.NoError(t, s.Delete(ctx, testingFilename))
// Deleting a file that does not exist is okay. This behavior is somewhat
// forced by the behavior of S3. In non-versioned S3 buckets, S3 does not
// return an error or metadata indicating the object did not exist before
// the delete. So if we want consistent behavior across all cloud.Storage
// interfaces, and we don't want to read before we delete an S3 object, we
// need to treat this as a non-error.
require.NoError(t, s.Delete(ctx, testingFilename))
})
}

Expand Down
11 changes: 9 additions & 2 deletions pkg/cloud/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ type ExternalStorage interface {
// passed to the callback is undefined.
List(ctx context.Context, prefix, delimiter string, fn ListingFn) error

// Delete removes the named file from the store.
// Delete removes the named file from the store. If the file does not exist,
// Delete returns nil.
Delete(ctx context.Context, basename string) error

// Size returns the length of the named file in bytes.
Expand Down Expand Up @@ -128,7 +129,13 @@ type SQLConnI interface {
// ErrFileDoesNotExist is a sentinel error for indicating that a specified
// bucket/object/key/file (depending on storage terminology) does not exist.
// This error is raised by the ReadFile method.
var ErrFileDoesNotExist = errors.New("external_storage: file doesn't exist")
var ErrFileDoesNotExist = errors.New("external_storage: file does not exist")

// WrapErrFileDoesNotExist wraps an error with ErrFileDoesNotExist.
func WrapErrFileDoesNotExist(err error, msg string) error {
//nolint:errwrap
return errors.Wrapf(ErrFileDoesNotExist, "%s: %s", msg, err.Error())
}

// ErrListingUnsupported is a marker for indicating listing is unsupported.
var ErrListingUnsupported = errors.New("listing is not supported")
Expand Down
25 changes: 12 additions & 13 deletions pkg/cloud/gcp/gcs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ func (g *gcsStorage) Writer(ctx context.Context, basename string) (io.WriteClose
return w, nil
}

// isNotExistErr checks if the error indicates a file does not exist
func isNotExistErr(err error) bool {
return errors.Is(err, gcs.ErrObjectNotExist)
}

func (g *gcsStorage) ReadFile(
ctx context.Context, basename string, opts cloud.ReadOptions,
) (ioctx.ReadCloserCtx, int64, error) {
Expand Down Expand Up @@ -317,17 +322,8 @@ func (g *gcsStorage) ReadFile(
)

if err := r.Open(ctx); err != nil {
if errors.Is(err, gcs.ErrObjectNotExist) {
// Callers of this method sometimes look at the returned error to determine
// if file does not exist. Regardless why we couldn't open the stream
// (whether its invalid bucket or file doesn't exist),
// return our internal ErrFileDoesNotExist.
// nolint:errwrap
err = errors.Wrapf(
errors.Wrapf(cloud.ErrFileDoesNotExist, "gcs object %q does not exist", object),
"%v",
err.Error(),
)
if isNotExistErr(err) {
return nil, 0, cloud.WrapErrFileDoesNotExist(err, "gcs object does not exist")
}
return nil, 0, err
}
Expand Down Expand Up @@ -360,11 +356,14 @@ func (g *gcsStorage) List(ctx context.Context, prefix, delim string, fn cloud.Li
}

func (g *gcsStorage) Delete(ctx context.Context, basename string) error {
return timeutil.RunWithTimeout(ctx, "delete gcs file",
cloud.Timeout.Get(&g.settings.SV),
err := timeutil.RunWithTimeout(ctx, "delete gcs file", cloud.Timeout.Get(&g.settings.SV),
func(ctx context.Context) error {
return g.bucket.Object(path.Join(g.prefix, basename)).Delete(ctx)
})
if isNotExistErr(err) {
return nil
}
return err
}

func (g *gcsStorage) Size(ctx context.Context, basename string) (int64, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/httpsink/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_test(
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/retry",
"@com_github_cockroachdb_errors//oserror",
"@com_github_stretchr_testify//require",
],
)
20 changes: 11 additions & 9 deletions pkg/cloud/httpsink/http_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ func (h *httpStorage) Delete(ctx context.Context, basename string) error {
return timeutil.RunWithTimeout(ctx, redact.Sprintf("DELETE %s", basename),
cloud.Timeout.Get(&h.settings.SV), func(ctx context.Context) error {
_, err := h.reqNoBody(ctx, "DELETE", basename, nil)
if errors.Is(err, cloud.ErrFileDoesNotExist) {
return nil
}
return err
})
}
Expand Down Expand Up @@ -213,6 +216,10 @@ func (h *httpStorage) reqNoBody(
return resp, err
}

func isNotFoundErr(resp *http.Response) bool {
return resp != nil && resp.StatusCode == http.StatusNotFound
}

func (h *httpStorage) req(
ctx context.Context, method, file string, body io.Reader, headers map[string]string,
) (*http.Response, error) {
Expand Down Expand Up @@ -251,22 +258,17 @@ func (h *httpStorage) req(

switch resp.StatusCode {
case 200, 201, 204, 206:
// Pass.
// Pass.
return resp, nil
default:
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
err := errors.Errorf("error response from server: %s %q", resp.Status, body)
if err != nil && resp.StatusCode == 404 {
// nolint:errwrap
err = errors.Wrapf(
errors.Wrap(cloud.ErrFileDoesNotExist, "http storage file does not exist"),
"%v",
err.Error(),
)
if isNotFoundErr(resp) {
return nil, cloud.WrapErrFileDoesNotExist(err, "http storage file does not exist")
}
return nil, err
}
return resp, nil
}

func init() {
Expand Down
8 changes: 7 additions & 1 deletion pkg/cloud/httpsink/http_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors/oserror"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -75,7 +76,12 @@ func TestPutHttp(t *testing.T) {
}
http.ServeFile(w, r, localfile)
case "DELETE":
if err := os.Remove(localfile); err != nil {
err := os.Remove(localfile)
if oserror.IsNotExist(err) {
http.Error(w, err.Error(), 404)
return
}
if err != nil {
http.Error(w, err.Error(), 500)
return
}
Expand Down
27 changes: 14 additions & 13 deletions pkg/cloud/nodelocal/nodelocal_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func joinRelativePath(filePath string, file string) string {
return path.Join(".", filePath, file)
}

// isNotFoundErr checks if the error indicates a file not found condition,
// handling both local and remote nodelocal store cases.
func isNotFoundErr(err error) bool {
return oserror.IsNotExist(err) || status.Code(err) == codes.NotFound
}

func (l *localFileStorage) Writer(ctx context.Context, basename string) (io.WriteCloser, error) {
return l.blobClient.Writer(ctx, joinRelativePath(l.base, basename))
}
Expand All @@ -154,19 +160,10 @@ func (l *localFileStorage) ReadFile(
ctx context.Context, basename string, opts cloud.ReadOptions,
) (ioctx.ReadCloserCtx, int64, error) {
reader, size, err := l.blobClient.ReadFile(ctx, joinRelativePath(l.base, basename), opts.Offset)
if err != nil && isNotFoundErr(err) {
return nil, 0, cloud.WrapErrFileDoesNotExist(err, "nodelocal storage file does not exist")
}
if err != nil {
// The format of the error returned by the above ReadFile call differs based
// on whether we are reading from a local or remote nodelocal store.
// The local store returns a golang native ErrNotFound, whereas the remote
// store returns a gRPC native NotFound error.
if oserror.IsNotExist(err) || status.Code(err) == codes.NotFound {
// nolint:errwrap
return nil, 0, errors.WithMessagef(
errors.Wrap(cloud.ErrFileDoesNotExist, "nodelocal storage file does not exist"),
"%s",
err.Error(),
)
}
return nil, 0, err
}
return reader, size, nil
Expand Down Expand Up @@ -204,7 +201,11 @@ func (l *localFileStorage) List(
}

func (l *localFileStorage) Delete(ctx context.Context, basename string) error {
return l.blobClient.Delete(ctx, joinRelativePath(l.base, basename))
err := l.blobClient.Delete(ctx, joinRelativePath(l.base, basename))
if isNotFoundErr(err) {
return nil
}
return err
}

func (l *localFileStorage) Size(ctx context.Context, basename string) (int64, error) {
Expand Down
17 changes: 12 additions & 5 deletions pkg/cloud/userfile/file_table_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ func checkBaseAndJoinFilePath(prefix, basename string) (string, error) {
return path.Join(prefix, basename), nil
}

// isNotExistErr checks if the error indicates a file does not exist
func isNotExistErr(err error) bool {
return oserror.IsNotExist(err)
}

// ReadFile implements the ExternalStorage interface and returns the contents of
// the file stored in the user scoped FileToTableSystem.
func (f *fileTableStorage) ReadFile(
Expand All @@ -218,11 +223,9 @@ func (f *fileTableStorage) ReadFile(
return nil, 0, err
}
reader, size, err := f.fs.ReadFile(ctx, filepath, opts.Offset)
if oserror.IsNotExist(err) {
return nil, 0, errors.Wrapf(cloud.ErrFileDoesNotExist,
"file %s does not exist in the UserFileTableSystem", filepath)
if err != nil && isNotExistErr(err) {
return nil, 0, cloud.WrapErrFileDoesNotExist(err, "file does not exist in the UserFileTableSystem")
}

return reader, size, err
}

Expand Down Expand Up @@ -276,7 +279,11 @@ func (f *fileTableStorage) Delete(ctx context.Context, basename string) error {
if err != nil {
return err
}
return f.fs.DeleteFile(ctx, filepath)
err = f.fs.DeleteFile(ctx, filepath)
if isNotExistErr(err) {
return nil
}
return err
}

// Size implements the ExternalStorage interface and returns the size of the
Expand Down
42 changes: 32 additions & 10 deletions pkg/roachprod/blobfixture/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,45 @@ func (r *Registry) URI(path string) url.URL {
return copy
}

// maybeReadFile attempts to read a file and returns its contents. Returns nil bytes
// if the file does not exist. This is useful for handling cases where files may have
// been concurrently deleted by GC.
func (r *Registry) maybeReadFile(ctx context.Context, filename string) ([]byte, error) {
bytes, err := func() ([]byte, error) {
file, _, err := r.storage.ReadFile(ctx, filename, cloud.ReadOptions{})
if err != nil {
return nil, err
}
defer func() { _ = file.Close(ctx) }()

bytes, err := ioctx.ReadAll(ctx, file)
if err != nil {
return nil, err
}
return bytes, nil
}()
if errors.Is(err, cloud.ErrFileDoesNotExist) {
return nil, nil
}
return bytes, err
}

// listFixtures lists all fixtures of the given kind.
func (r *Registry) listFixtures(
ctx context.Context, kindPrefix string, l *logger.Logger,
) ([]FixtureMetadata, error) {
if l != nil {
l.Printf("listing fixtures: %s", kindPrefix)
}
result := []FixtureMetadata{}
var result []FixtureMetadata

err := r.storage.List(ctx, kindPrefix /*delimiter*/, "", func(found string) error {
file, _, err := r.storage.ReadFile(ctx, path.Join(kindPrefix, found), cloud.ReadOptions{})
json, err := r.maybeReadFile(ctx, path.Join(kindPrefix, found))
if err != nil {
return err
}
defer func() { _ = file.Close(ctx) }()

json, err := ioctx.ReadAll(ctx, file)
if err != nil {
return err
if json == nil {
return nil // Skip files that don't exist (may have been GC'd)
}

metadata := FixtureMetadata{}
Expand All @@ -204,7 +226,6 @@ func (r *Registry) listFixtures(
}

result = append(result, metadata)

return nil
})
if err != nil {
Expand Down Expand Up @@ -234,13 +255,14 @@ func (r *Registry) upsertMetadata(metadata FixtureMetadata) error {
}

func (r *Registry) deleteMetadata(metadata FixtureMetadata) error {
return errors.Wrap(r.storage.Delete(context.Background(), metadata.MetadataPath), "unable to delete fixture metadata")
return errors.Wrap(r.storage.Delete(context.Background(), metadata.MetadataPath), "failed to delete metadata")
}

func (r *Registry) deleteBlobsMatchingPrefix(prefix string) error {
return r.storage.List(context.Background(), prefix, "", func(path string) error {
err := r.storage.List(context.Background(), prefix, "", func(path string) error {
return r.storage.Delete(context.Background(), prefix+path)
})
return errors.Wrapf(err, "failed to delete blobs matching prefix %q", prefix)
}

// ScratchHandle is returned by Registry.Create and is used to mark a fixture
Expand Down
Loading
Loading