From 44b2ed58b465492871d2fd5674ca3e8b822eeed3 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 10:52:07 -0700 Subject: [PATCH 01/33] avoiding signed urls requires auth header support --- broker/client/reader.go | 55 ++++++++++++++------- broker/fragment/store_gcs.go | 27 ++++++---- broker/fragment/stores.go | 4 +- mainboilerplate/runconsumer/run_consumer.go | 10 ++-- 4 files changed, 65 insertions(+), 31 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index fc1e5583..a5958f1d 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -8,9 +8,11 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "github.com/prometheus/client_golang/prometheus" "go.gazette.dev/core/broker/codecs" + "go.gazette.dev/core/broker/fragment" pb "go.gazette.dev/core/broker/protocol" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -24,9 +26,9 @@ import ( // seek to the requested offset, and read its content. // // Reader returns EOF if: -// * The broker closes the RPC, eg because its assignment has change or it's shutting down. -// * The requested EndOffset has been read through. -// * A Fragment being read by the Reader reaches EOF. +// - The broker closes the RPC, eg because its assignment has change or it's shutting down. +// - The requested EndOffset has been read through. +// - A Fragment being read by the Reader reaches EOF. // // If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable // is returned upon reaching the journal write head. @@ -58,6 +60,10 @@ func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRe return r } +// Arize logic to avoid use of signed URLs on GCS +var TransformSignedURLs = false +var gcs = fragment.GcsBackend{} + // Read from the journal. If this is the first Read of the Reader, a Read RPC is started. func (r *Reader) Read(p []byte) (n int, err error) { // If we have an open direct reader of a persisted fragment, delegate to it. 
@@ -164,9 +170,24 @@ func (r *Reader) Read(p []byte) (n int, err error) { // If the frame preceding EOF provided a fragment URL, open it directly. if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" { - if r.direct, err = OpenFragmentURL(r.ctx, *r.Response.Fragment, - r.Request.Offset, r.Response.FragmentUrl); err == nil { - n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|. + if TransformSignedURLs { + var url *url.URL + if url, err = url.Parse(r.Response.FragmentUrl); err != nil { + return 0, err + + } + if url.Scheme != "gs" { + return 0, fmt.Error("TransformSignedURLs is only supported for GCS") + } + if r.direct, err = gcs.Open(r.ctx, r.Response.FragmentUrl, + *r.Response.Fragment, r.Request.Offset); err == nil { + n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|. + } + } else { + if r.direct, err = OpenFragmentURL(r.ctx, *r.Response.Fragment, + r.Request.Offset, r.Response.FragmentUrl); err == nil { + n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|. + } } return } @@ -197,9 +218,10 @@ func (r *Reader) AdjustedOffset(br *bufio.Reader) int64 { } // Seek provides a limited form of seeking support. Specifically, if: -// * A Fragment URL is being directly read, and -// * The Seek offset is ahead of the current Reader offset, and -// * The Fragment also covers the desired Seek offset +// - A Fragment URL is being directly read, and +// - The Seek offset is ahead of the current Reader offset, and +// - The Fragment also covers the desired Seek offset +// // Then a seek is performed by reading and discarding to the seeked offset. // Seek will otherwise return ErrSeekRequiresNewReader. func (r *Reader) Seek(offset int64, whence int) (int64, error) { @@ -349,15 +371,14 @@ func (fr *FragmentReader) Close() error { // fragments are persisted, and to which this client also has access. 
The // returned cleanup function removes the handler and restores the prior http.Client. // -// const root = "/mnt/shared-nas-array/path/to/fragment-root" -// defer client.InstallFileTransport(root)() -// -// var rr = NewRetryReader(ctx, client, protocol.ReadRequest{ -// Journal: "a/journal/with/nas/fragment/store", -// DoNotProxy: true, -// }) -// // rr.Read will read Fragments directly from NAS. +// const root = "/mnt/shared-nas-array/path/to/fragment-root" +// defer client.InstallFileTransport(root)() // +// var rr = NewRetryReader(ctx, client, protocol.ReadRequest{ +// Journal: "a/journal/with/nas/fragment/store", +// DoNotProxy: true, +// }) +// // rr.Read will read Fragments directly from NAS. func InstallFileTransport(root string) (remove func()) { var transport = http.DefaultTransport.(*http.Transport).Clone() transport.RegisterProtocol("file", http.NewFileTransport(http.Dir(root))) diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index 159c5b8b..8e8156ee 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -27,17 +27,17 @@ type GSStoreConfig struct { RewriterConfig } -type gcsBackend struct { +type GcsBackend struct { client *storage.Client signedURLOptions storage.SignedURLOptions clientMu sync.Mutex } -func (s *gcsBackend) Provider() string { +func (s *GcsBackend) Provider() string { return "gcs" } -func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) (string, error) { +func (s *GcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) (string, error) { cfg, client, opts, err := s.gcsClient(ep) if err != nil { return "", err @@ -59,7 +59,7 @@ func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) } } -func (s *gcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (exists bool, err error) { +func (s *GcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (exists bool, err error) { cfg, client, _, 
err := s.gcsClient(ep) if err != nil { return false, err @@ -73,7 +73,7 @@ func (s *gcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragme return exists, err } -func (s *gcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) { +func (s *GcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) { cfg, client, _, err := s.gcsClient(ep) if err != nil { return nil, err @@ -81,7 +81,16 @@ func (s *gcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx) } -func (s *gcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) error { +// Arize Open routine with offset for use with consumers and signed URLs. +func (s *GcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { + cfg, client, _, err := s.gcsClient(ep) + if err != nil { + return nil, err + } + return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewRangeReader(ctx, offset, -1) +} + +func (s *GcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) error { cfg, client, _, err := s.gcsClient(ep) if err != nil { return err @@ -105,7 +114,7 @@ func (s *gcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) erro return err } -func (s *gcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.URL, journal pb.Journal, callback func(pb.Fragment)) error { +func (s *GcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.URL, journal pb.Journal, callback func(pb.Fragment)) error { var cfg, client, _, err = s.gcsClient(ep) if err != nil { return err @@ -136,7 +145,7 @@ func (s *gcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.U return err } -func (s *gcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error { +func (s 
*GcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error { cfg, client, _, err := s.gcsClient(fragment.BackingStore.URL()) if err != nil { return err @@ -144,7 +153,7 @@ func (s *gcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error { return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).Delete(ctx) } -func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, opts storage.SignedURLOptions, err error) { +func (s *GcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, opts storage.SignedURLOptions, err error) { var conf *jwt.Config if err = parseStoreArgs(ep, &cfg); err != nil { diff --git a/broker/fragment/stores.go b/broker/fragment/stores.go index c857d73b..4fe9eedb 100644 --- a/broker/fragment/stores.go +++ b/broker/fragment/stores.go @@ -36,12 +36,12 @@ type backend interface { var sharedStores = struct { s3 *s3Backend - gcs *gcsBackend + gcs *GcsBackend azure *azureBackend fs *fsBackend }{ s3: newS3Backend(), - gcs: &gcsBackend{}, + gcs: &GcsBackend{}, azure: &azureBackend{ pipelines: make(map[string]pipeline.Pipeline), clients: make(map[string]*service.Client), diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index 66b8c102..34b61f06 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -76,9 +76,10 @@ type Config interface { type BaseConfig struct { Consumer struct { mbp.ServiceConfig - Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"` - MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."` - WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched 
Etcd events. Larger values amortize the processing of fast-changing Etcd keys."` + Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"` + MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."` + WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."` + TransformSignedURLs bool `long:"transform-signed-urls" env:"TRANSFORM_SIGNED_URLS" description:"When a signed URL is received, transform it into an unsigned URL. This is useful when clients do not require the signing."` } `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"` Broker struct { @@ -124,6 +125,9 @@ func (sc Cmd) Execute(args []string) error { var srv, err = server.New("", bc.Consumer.Port) mbp.Must(err, "building Server instance") + // Arize avoidance of using signed URLs. 
+ client.TransformSignedURLs = bc.Consumer.TransformSignedURLs + if bc.Broker.Cache.Size <= 0 { log.Warn("--broker.cache.size is disabled; consider setting > 0") } From b73ffdcb309f1f2195617e93e346ff8e77224e1c Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 11:12:26 -0700 Subject: [PATCH 02/33] avoid signature overloading --- broker/client/reader.go | 2 +- broker/fragment/store_gcs.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index a5958f1d..5c197195 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -179,7 +179,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { if url.Scheme != "gs" { return 0, fmt.Error("TransformSignedURLs is only supported for GCS") } - if r.direct, err = gcs.Open(r.ctx, r.Response.FragmentUrl, + if r.direct, err = gcs.OpenWithOffset(r.ctx, r.Response.FragmentUrl, *r.Response.Fragment, r.Request.Offset); err == nil { n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|. } diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index 8e8156ee..52bdda44 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -82,7 +82,7 @@ func (s *GcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment } // Arize Open routine with offset for use with consumers and signed URLs. 
-func (s *GcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { +func (s *GcsBackend) OpenWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { cfg, client, _, err := s.gcsClient(ep) if err != nil { return nil, err From f6ff84181e9858a0f28a03d9b75faf8b6258d939 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 11:24:43 -0700 Subject: [PATCH 03/33] name cleanup --- broker/client/reader.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 5c197195..7e9231e5 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -171,15 +171,15 @@ func (r *Reader) Read(p []byte) (n int, err error) { // If the frame preceding EOF provided a fragment URL, open it directly. if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" { if TransformSignedURLs { - var url *url.URL - if url, err = url.Parse(r.Response.FragmentUrl); err != nil { + var fragURL *url.URL + if fragURL, err = url.Parse(r.Response.FragmentUrl); err != nil { return 0, err } - if url.Scheme != "gs" { + if fragURL.Scheme != "gs" { return 0, fmt.Error("TransformSignedURLs is only supported for GCS") } - if r.direct, err = gcs.OpenWithOffset(r.ctx, r.Response.FragmentUrl, + if r.direct, err = gcs.OpenWithOffset(r.ctx, fragURL, *r.Response.Fragment, r.Request.Offset); err == nil { n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|. 
} From d5c771813257eedba396850188147531fa489ac7 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 11:35:49 -0700 Subject: [PATCH 04/33] fix typo --- broker/client/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 7e9231e5..73b8bac9 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -177,7 +177,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { } if fragURL.Scheme != "gs" { - return 0, fmt.Error("TransformSignedURLs is only supported for GCS") + return 0, fmt.Errorf("TransformSignedURLs is only supported for GCS") } if r.direct, err = gcs.OpenWithOffset(r.ctx, fragURL, *r.Response.Fragment, r.Request.Offset); err == nil { From 3f45f002ce370cc4e3b0e79bf12dd99f457f0edb Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 13:33:41 -0700 Subject: [PATCH 05/33] work around import cycle caused by stores_test.go --- broker/client/reader.go | 91 +++++++++++++++++++++++++++++++++--- broker/fragment/store_gcs.go | 90 ++++------------------------------- broker/fragment/stores.go | 4 +- 3 files changed, 95 insertions(+), 90 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 73b8bac9..6850aeed 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -9,11 +9,15 @@ import ( "io/ioutil" "net/http" "net/url" + "sync" + "cloud.google.com/go/storage" "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" "go.gazette.dev/core/broker/codecs" - "go.gazette.dev/core/broker/fragment" pb "go.gazette.dev/core/broker/protocol" + "golang.org/x/oauth2/google" + "golang.org/x/oauth2/jwt" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -60,10 +64,6 @@ func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRe return r } -// Arize logic to avoid use of signed URLs on GCS -var TransformSignedURLs = false -var gcs = fragment.GcsBackend{} - // Read 
from the journal. If this is the first Read of the Reader, a Read RPC is started. func (r *Reader) Read(p []byte) (n int, err error) { // If we have an open direct reader of a persisted fragment, delegate to it. @@ -179,7 +179,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { if fragURL.Scheme != "gs" { return 0, fmt.Errorf("TransformSignedURLs is only supported for GCS") } - if r.direct, err = gcs.OpenWithOffset(r.ctx, fragURL, + if r.direct, err = gcs.openWithOffset(r.ctx, fragURL, *r.Response.Fragment, r.Request.Offset); err == nil { n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|. } @@ -435,3 +435,82 @@ var ( // httpClient is the http.Client used by OpenFragmentURL httpClient = http.DefaultClient ) + +// stores_test.go, which is in broker/fragment, imports broker/client so we cannot import broker/fragment here +// to avoid a cycle. Instead we will repeat a subset of store_gcs.go. + +var TransformSignedURLs = false +var gcs = &gcsBackend{} + +type gcsBackend struct { + client *storage.Client + clientMu sync.Mutex +} + +// Arize Open routine with offset for use with consumers and signed URLs. +func openWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { + cfg, gClient, _, err := s.gcsClient(ep) + if err != nil { + return nil, err + } + return gClient.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewRangeReader(ctx, offset, -1) +} + +func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, err error) { + var conf *jwt.Config + + if err = parseStoreArgs(ep, &cfg); err != nil { + return + } + // Omit leading slash from bucket prefix. Note that FragmentStore already + // enforces that URL Paths end in '/'. 
+ cfg.bucket, cfg.prefix = ep.Host, ep.Path[1:] + + s.clientMu.Lock() + defer s.clientMu.Unlock() + + if s.client != nil { + client = s.client + return + } + var ctx = context.Background() + + creds, err := google.FindDefaultCredentials(ctx, storage.ScopeFullControl) + if err != nil { + return + } else if creds.JSON != nil { + conf, err = google.JWTConfigFromJSON(creds.JSON, storage.ScopeFullControl) + if err != nil { + return + } + client, err = storage.NewClient(ctx, option.WithTokenSource(conf.TokenSource(ctx))) + if err != nil { + return + } + s.client = client + + log.WithFields(log.Fields{ + "ProjectID": creds.ProjectID, + "GoogleAccessID": conf.Email, + "PrivateKeyID": conf.PrivateKeyID, + "Subject": conf.Subject, + "Scopes": conf.Scopes, + }).Info("reader constructed new GCS client") + } else { + // Possible to use GCS without a service account (e.g. with a GCE instance and workload identity). + client, err = storage.NewClient(ctx, option.WithTokenSource(creds.TokenSource)) + if err != nil { + return + } + + // workload identity approach which SignGet() method accepts if you have + // "iam.serviceAccounts.signBlob" permissions against your service account. 
+ s.client = client + + log.WithFields(log.Fields{ + "ProjectID": creds.ProjectID, + }).Info("reader constructed new GCS client without JWT") + } + + return +} diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index 52bdda44..a29ed35f 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -27,17 +27,17 @@ type GSStoreConfig struct { RewriterConfig } -type GcsBackend struct { +type gcsBackend struct { client *storage.Client signedURLOptions storage.SignedURLOptions clientMu sync.Mutex } -func (s *GcsBackend) Provider() string { +func (s *gcsBackend) Provider() string { return "gcs" } -func (s *GcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) (string, error) { +func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) (string, error) { cfg, client, opts, err := s.gcsClient(ep) if err != nil { return "", err @@ -59,7 +59,7 @@ func (s *GcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) } } -func (s *GcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (exists bool, err error) { +func (s *gcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (exists bool, err error) { cfg, client, _, err := s.gcsClient(ep) if err != nil { return false, err @@ -73,7 +73,7 @@ func (s *GcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragme return exists, err } -func (s *GcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) { +func (s *gcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) { cfg, client, _, err := s.gcsClient(ep) if err != nil { return nil, err @@ -81,16 +81,7 @@ func (s *GcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx) } -// Arize Open routine with offset for use with consumers and signed 
URLs. -func (s *GcsBackend) OpenWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { - cfg, client, _, err := s.gcsClient(ep) - if err != nil { - return nil, err - } - return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewRangeReader(ctx, offset, -1) -} - -func (s *GcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) error { +func (s *gcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) error { cfg, client, _, err := s.gcsClient(ep) if err != nil { return err @@ -114,7 +105,7 @@ func (s *GcsBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) erro return err } -func (s *GcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.URL, journal pb.Journal, callback func(pb.Fragment)) error { +func (s *gcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.URL, journal pb.Journal, callback func(pb.Fragment)) error { var cfg, client, _, err = s.gcsClient(ep) if err != nil { return err @@ -145,75 +136,10 @@ func (s *GcsBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.U return err } -func (s *GcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error { +func (s *gcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error { cfg, client, _, err := s.gcsClient(fragment.BackingStore.URL()) if err != nil { return err } return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).Delete(ctx) } - -func (s *GcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, opts storage.SignedURLOptions, err error) { - var conf *jwt.Config - - if err = parseStoreArgs(ep, &cfg); err != nil { - return - } - // Omit leading slash from bucket prefix. Note that FragmentStore already - // enforces that URL Paths end in '/'. 
- cfg.bucket, cfg.prefix = ep.Host, ep.Path[1:] - - s.clientMu.Lock() - defer s.clientMu.Unlock() - - if s.client != nil { - client = s.client - opts = s.signedURLOptions - return - } - var ctx = context.Background() - - creds, err := google.FindDefaultCredentials(ctx, storage.ScopeFullControl) - if err != nil { - return - } else if creds.JSON != nil { - conf, err = google.JWTConfigFromJSON(creds.JSON, storage.ScopeFullControl) - if err != nil { - return - } - client, err = storage.NewClient(ctx, option.WithTokenSource(conf.TokenSource(ctx))) - if err != nil { - return - } - opts = storage.SignedURLOptions{ - GoogleAccessID: conf.Email, - PrivateKey: conf.PrivateKey, - } - s.client, s.signedURLOptions = client, opts - - log.WithFields(log.Fields{ - "ProjectID": creds.ProjectID, - "GoogleAccessID": conf.Email, - "PrivateKeyID": conf.PrivateKeyID, - "Subject": conf.Subject, - "Scopes": conf.Scopes, - }).Info("constructed new GCS client") - } else { - // Possible to use GCS without a service account (e.g. with a GCE instance and workload identity). - client, err = storage.NewClient(ctx, option.WithTokenSource(creds.TokenSource)) - if err != nil { - return - } - - // workload identity approach which SignGet() method accepts if you have - // "iam.serviceAccounts.signBlob" permissions against your service account. 
- opts = storage.SignedURLOptions{} - s.client, s.signedURLOptions = client, opts - - log.WithFields(log.Fields{ - "ProjectID": creds.ProjectID, - }).Info("constructed new GCS client without JWT") - } - - return -} diff --git a/broker/fragment/stores.go b/broker/fragment/stores.go index 4fe9eedb..c857d73b 100644 --- a/broker/fragment/stores.go +++ b/broker/fragment/stores.go @@ -36,12 +36,12 @@ type backend interface { var sharedStores = struct { s3 *s3Backend - gcs *GcsBackend + gcs *gcsBackend azure *azureBackend fs *fsBackend }{ s3: newS3Backend(), - gcs: &GcsBackend{}, + gcs: &gcsBackend{}, azure: &azureBackend{ pipelines: make(map[string]pipeline.Pipeline), clients: make(map[string]*service.Client), From fee88beb2e6728cb747349175b964e9ab770e191 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 13:48:29 -0700 Subject: [PATCH 06/33] fix method --- broker/client/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 6850aeed..ee76e8ed 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -448,7 +448,7 @@ type gcsBackend struct { } // Arize Open routine with offset for use with consumers and signed URLs. 
-func openWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { +func (s *gcsBackend) openWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { cfg, gClient, _, err := s.gcsClient(ep) if err != nil { return nil, err From a469eda1feb000b1996ce8e9903289323caaf61d Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 13:52:40 -0700 Subject: [PATCH 07/33] put back gcsClient method --- broker/fragment/store_gcs.go | 65 ++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index a29ed35f..159c5b8b 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -143,3 +143,68 @@ func (s *gcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error { } return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).Delete(ctx) } + +func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, opts storage.SignedURLOptions, err error) { + var conf *jwt.Config + + if err = parseStoreArgs(ep, &cfg); err != nil { + return + } + // Omit leading slash from bucket prefix. Note that FragmentStore already + // enforces that URL Paths end in '/'. 
+ cfg.bucket, cfg.prefix = ep.Host, ep.Path[1:] + + s.clientMu.Lock() + defer s.clientMu.Unlock() + + if s.client != nil { + client = s.client + opts = s.signedURLOptions + return + } + var ctx = context.Background() + + creds, err := google.FindDefaultCredentials(ctx, storage.ScopeFullControl) + if err != nil { + return + } else if creds.JSON != nil { + conf, err = google.JWTConfigFromJSON(creds.JSON, storage.ScopeFullControl) + if err != nil { + return + } + client, err = storage.NewClient(ctx, option.WithTokenSource(conf.TokenSource(ctx))) + if err != nil { + return + } + opts = storage.SignedURLOptions{ + GoogleAccessID: conf.Email, + PrivateKey: conf.PrivateKey, + } + s.client, s.signedURLOptions = client, opts + + log.WithFields(log.Fields{ + "ProjectID": creds.ProjectID, + "GoogleAccessID": conf.Email, + "PrivateKeyID": conf.PrivateKeyID, + "Subject": conf.Subject, + "Scopes": conf.Scopes, + }).Info("constructed new GCS client") + } else { + // Possible to use GCS without a service account (e.g. with a GCE instance and workload identity). + client, err = storage.NewClient(ctx, option.WithTokenSource(creds.TokenSource)) + if err != nil { + return + } + + // workload identity approach which SignGet() method accepts if you have + // "iam.serviceAccounts.signBlob" permissions against your service account. 
+ opts = storage.SignedURLOptions{} + s.client, s.signedURLOptions = client, opts + + log.WithFields(log.Fields{ + "ProjectID": creds.ProjectID, + }).Info("constructed new GCS client without JWT") + } + + return +} From fcca94cc4cd1f8ed07302b65ede4cf8a5d2955da Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 14:04:45 -0700 Subject: [PATCH 08/33] fix return --- broker/client/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index ee76e8ed..ec25a99c 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -449,7 +449,7 @@ type gcsBackend struct { // Arize Open routine with offset for use with consumers and signed URLs. func (s *gcsBackend) openWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { - cfg, gClient, _, err := s.gcsClient(ep) + cfg, gClient, err := s.gcsClient(ep) if err != nil { return nil, err } From 3ceebdcc4af99e8b5f2e18b0c4de0369bc53d41f Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 14:20:57 -0700 Subject: [PATCH 09/33] copy more missing code --- broker/client/reader.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/broker/client/reader.go b/broker/client/reader.go index ec25a99c..fe0f39fd 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -12,12 +12,14 @@ import ( "sync" "cloud.google.com/go/storage" + "github.com/gorilla/schema" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "go.gazette.dev/core/broker/codecs" pb "go.gazette.dev/core/broker/protocol" "golang.org/x/oauth2/google" "golang.org/x/oauth2/jwt" + "google.golang.org/api/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -446,6 +448,12 @@ type gcsBackend struct { client *storage.Client clientMu sync.Mutex } +type GSStoreConfig struct { + bucket string + prefix string + + RewriterConfig +} // Arize Open routine 
with offset for use with consumers and signed URLs. func (s *gcsBackend) openWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { @@ -514,3 +522,15 @@ func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage. return } + +func parseStoreArgs(ep *url.URL, args interface{}) error { + var decoder = schema.NewDecoder() + decoder.IgnoreUnknownKeys(false) + + if q, err := url.ParseQuery(ep.RawQuery); err != nil { + return err + } else if err = decoder.Decode(args, q); err != nil { + return fmt.Errorf("parsing store URL arguments: %s", err) + } + return nil +} From dd7569c59d013ab5565b112f1d4004b2bac887f5 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 14:34:49 -0700 Subject: [PATCH 10/33] port over more code --- broker/client/reader.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index fe0f39fd..29e0a6b9 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "net/http" "net/url" + "strings" "sync" "cloud.google.com/go/storage" @@ -448,12 +449,6 @@ type gcsBackend struct { client *storage.Client clientMu sync.Mutex } -type GSStoreConfig struct { - bucket string - prefix string - - RewriterConfig -} // Arize Open routine with offset for use with consumers and signed URLs. func (s *gcsBackend) openWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { @@ -523,6 +518,27 @@ func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage. return } +type GSStoreConfig struct { + bucket string + prefix string + + RewriterConfig +} + +type RewriterConfig struct { + // Find is the string to replace in the unmodified journal name. + Find string + // Replace is the string with which Find is replaced in the constructed store path. 
+ Replace string +} + +func (cfg RewriterConfig) rewritePath(s, j string) string { + if cfg.Find == "" { + return s + j + } + return s + strings.Replace(j, cfg.Find, cfg.Replace, 1) +} + func parseStoreArgs(ep *url.URL, args interface{}) error { var decoder = schema.NewDecoder() decoder.IgnoreUnknownKeys(false) From c18011e56bafe628171fcfb4edb665f0d557bea4 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 16:42:08 -0700 Subject: [PATCH 11/33] debug url variations --- broker/client/reader.go | 3 +++ broker/fragment/store_gcs.go | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/broker/client/reader.go b/broker/client/reader.go index 29e0a6b9..80cacc9a 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -173,6 +173,9 @@ func (r *Reader) Read(p []byte) (n int, err error) { // If the frame preceding EOF provided a fragment URL, open it directly. if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" { + log.WithFields(log.Fields{ + "FragmentUrl": r.Response.FragmentUrl, + }).Info("reader handle FragmentUrl") if TransformSignedURLs { var fragURL *url.URL if fragURL, err = url.Parse(r.Response.FragmentUrl); err != nil { diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index 159c5b8b..d9e3ba21 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -44,6 +44,10 @@ func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) } if DisableSignedUrls { + log.WithFields(log.Fields{ + "ep": fmt.Sprintf("%v", *ep), + "fragment": fmt.Sprintf("%v", fragment), + }).Info("signGet disable signed urls") u := &url.URL{ Path: fmt.Sprintf("/%s/%s", cfg.bucket, cfg.rewritePath(cfg.prefix, fragment.ContentPath())), } From 1ce9c0e401090b8e75812a4801a4a54fe5f88c6b Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 18:12:32 -0700 Subject: [PATCH 12/33] bump up log level --- broker/client/reader.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 80cacc9a..d38e3598 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -175,7 +175,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" { log.WithFields(log.Fields{ "FragmentUrl": r.Response.FragmentUrl, - }).Info("reader handle FragmentUrl") + }).Warn("reader handle FragmentUrl") if TransformSignedURLs { var fragURL *url.URL if fragURL, err = url.Parse(r.Response.FragmentUrl); err != nil { From 5dea541988444c171f51cabf399972392e7187bd Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 18:18:55 -0700 Subject: [PATCH 13/33] better logging --- broker/client/reader.go | 4 ++++ broker/fragment/store_gcs.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index d38e3598..72f4d0a3 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -182,6 +182,10 @@ func (r *Reader) Read(p []byte) (n int, err error) { return 0, err } + log.WithFields(log.Fields{ + "url": fmt.Sprintf("%+v", fragURL), + "fragment": fmt.Sprintf("%+v", *r.Response.Fragment), + }).Warn("reader handle url") if fragURL.Scheme != "gs" { return 0, fmt.Errorf("TransformSignedURLs is only supported for GCS") } diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index d9e3ba21..a1ce4d6f 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -45,8 +45,8 @@ func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) if DisableSignedUrls { log.WithFields(log.Fields{ - "ep": fmt.Sprintf("%v", *ep), - "fragment": fmt.Sprintf("%v", fragment), + "ep": fmt.Sprintf("%+v", *ep), + "fragment": fmt.Sprintf("%+v", fragment), }).Info("signGet disable signed urls") u := &url.URL{ Path: fmt.Sprintf("/%s/%s", cfg.bucket, cfg.rewritePath(cfg.prefix, 
fragment.ContentPath())), From f74798b174bebe96e2a6e0299e95399ca033dbe0 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 19:33:46 -0700 Subject: [PATCH 14/33] more logs --- broker/fragment/store_gcs.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index a1ce4d6f..80820e72 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -82,6 +82,10 @@ func (s *gcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment if err != nil { return nil, err } + log.WithFields(log.Fields{ + "ep": fmt.Sprintf("%+v", *ep), + "fragment": fmt.Sprintf("%+v", fragment), + }).Info("Open fragment") return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx) } From c24f9ec72fdac840e41227db30a453aca55c7e3f Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 20:51:09 -0700 Subject: [PATCH 15/33] more logs --- broker/fragment/store_gcs.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index 80820e72..6d7b8542 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -68,6 +68,10 @@ func (s *gcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragme if err != nil { return false, err } + log.WithFields(log.Fields{ + "ep": fmt.Sprintf("%+v", *ep), + "fragment": fmt.Sprintf("%+v", fragment), + }).Info("Exists fragment") _, err = client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).Attrs(ctx) if err == nil { exists = true From 57bd4c8324197eae55673d3714b0781a921e03f0 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 21:39:06 -0700 Subject: [PATCH 16/33] modify URL --- broker/client/reader.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 72f4d0a3..c442b21e 100644 --- 
a/broker/client/reader.go +++ b/broker/client/reader.go @@ -177,17 +177,13 @@ func (r *Reader) Read(p []byte) (n int, err error) { "FragmentUrl": r.Response.FragmentUrl, }).Warn("reader handle FragmentUrl") if TransformSignedURLs { - var fragURL *url.URL - if fragURL, err = url.Parse(r.Response.FragmentUrl); err != nil { - return 0, err - - } + fragURL := r.Response.Fragment.BackingStore.URL() log.WithFields(log.Fields{ "url": fmt.Sprintf("%+v", fragURL), "fragment": fmt.Sprintf("%+v", *r.Response.Fragment), }).Warn("reader handle url") if fragURL.Scheme != "gs" { - return 0, fmt.Errorf("TransformSignedURLs is only supported for GCS") + return 0, fmt.Errorf("TransformSignedURL unsupported scheme: %s", fragURL.Scheme) } if r.direct, err = gcs.openWithOffset(r.ctx, fragURL, *r.Response.Fragment, r.Request.Offset); err == nil { From 620b3f54b2c792a667c3268e8b723e124db65a5c Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 22:56:07 -0700 Subject: [PATCH 17/33] adjust offset --- broker/client/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index c442b21e..bc37650f 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -186,7 +186,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { return 0, fmt.Errorf("TransformSignedURL unsupported scheme: %s", fragURL.Scheme) } if r.direct, err = gcs.openWithOffset(r.ctx, fragURL, - *r.Response.Fragment, r.Request.Offset); err == nil { + *r.Response.Fragment, r.Request.Offset-r.Response.Fragment.Begin); err == nil { n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|. 
} } else { From c9e98f0402e839a4dd6c50884974ca56fe039730 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 5 Jun 2024 22:57:33 -0700 Subject: [PATCH 18/33] better log --- broker/client/reader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/broker/client/reader.go b/broker/client/reader.go index bc37650f..28b3f1fd 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -181,6 +181,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { log.WithFields(log.Fields{ "url": fmt.Sprintf("%+v", fragURL), "fragment": fmt.Sprintf("%+v", *r.Response.Fragment), + "offset": r.Request.Offset, }).Warn("reader handle url") if fragURL.Scheme != "gs" { return 0, fmt.Errorf("TransformSignedURL unsupported scheme: %s", fragURL.Scheme) From 98e2f8f2d60c470c60e2eec997b48113316d9329 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Thu, 6 Jun 2024 12:22:25 -0700 Subject: [PATCH 19/33] adjust to compressed returned data --- broker/client/reader.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 28b3f1fd..afe5e013 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -186,8 +186,8 @@ func (r *Reader) Read(p []byte) (n int, err error) { if fragURL.Scheme != "gs" { return 0, fmt.Errorf("TransformSignedURL unsupported scheme: %s", fragURL.Scheme) } - if r.direct, err = gcs.openWithOffset(r.ctx, fragURL, - *r.Response.Fragment, r.Request.Offset-r.Response.Fragment.Begin); err == nil { + if r.direct, err = OpenUnsignedFragmentURL(r.ctx, *r.Response.Fragment, + r.Request.Offset, fragURL); err == nil { n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|. 
} } else { @@ -301,6 +301,19 @@ func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, ur return NewFragmentReader(resp.Body, fragment, offset) } +func OpenUnsignedFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url *url.URL) (*FragmentReader, error) { + if rdr, err = gcs.open(r.ctx, fragURL, *r.Response.Fragment); err != nil { + return nil, err + } + + // Record metrics related to opening the fragment. + var labels = fragmentLabels(fragment) + fragmentOpen.With(labels).Inc() + fragmentOpenBytes.With(labels).Add(float64(fragment.End - fragment.Begin)) + + return NewFragmentReader(rdr, fragment, offset) +} + // NewFragmentReader wraps a io.ReadCloser of raw Fragment bytes with a // returned *FragmentReader which has been pre-seeked to the given offset. func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error) { @@ -443,6 +456,8 @@ var ( httpClient = http.DefaultClient ) +// ARIZE specific code to end of file. +// // stores_test.go, which is in broker/fragment, imports broker/client so we cannot import broker/fragment here // to avoid a cycle. Instead we will repeat a subset of store_gcs.go. @@ -454,13 +469,13 @@ type gcsBackend struct { clientMu sync.Mutex } -// Arize Open routine with offset for use with consumers and signed URLs. -func (s *gcsBackend) openWithOffset(ctx context.Context, ep *url.URL, fragment pb.Fragment, offset int64) (io.ReadCloser, error) { +// Arize Open routine for use with consumers and signed URLs. 
+func (s *gcsBackend) open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) { cfg, gClient, err := s.gcsClient(ep) if err != nil { return nil, err } - return gClient.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewRangeReader(ctx, offset, -1) + return gClient.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx) } func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, err error) { From 791498fad7546570cf138be923e2295c450d71aa Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Thu, 6 Jun 2024 12:28:45 -0700 Subject: [PATCH 20/33] fixes --- broker/client/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index afe5e013..e464a651 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -302,7 +302,7 @@ func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, ur } func OpenUnsignedFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url *url.URL) (*FragmentReader, error) { - if rdr, err = gcs.open(r.ctx, fragURL, *r.Response.Fragment); err != nil { + if rdr, err := gcs.open(ctx, url, fragment); err != nil { return nil, err } From 29f8aea1ce1b491a088f4c6a2d042cd5ef0571f5 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Thu, 6 Jun 2024 12:35:11 -0700 Subject: [PATCH 21/33] another fix --- broker/client/reader.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index e464a651..235b0b2b 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -302,7 +302,12 @@ func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, ur } func OpenUnsignedFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url *url.URL) (*FragmentReader, error) { - if rdr, err := gcs.open(ctx, url, fragment); err != nil { + 
var ( + rdr io.ReadCloser + err error + ) + + if rdr, err = gcs.open(ctx, url, fragment); err != nil { return nil, err } From 10f184698a48e3b9fc20cb19493e0c686063c708 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Thu, 6 Jun 2024 13:56:19 -0700 Subject: [PATCH 22/33] clean up debug logs --- broker/client/reader.go | 14 +++----------- broker/fragment/store_gcs.go | 12 ------------ mainboilerplate/runconsumer/run_consumer.go | 10 +++++----- 3 files changed, 8 insertions(+), 28 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 235b0b2b..74cd267f 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -173,18 +173,10 @@ func (r *Reader) Read(p []byte) (n int, err error) { // If the frame preceding EOF provided a fragment URL, open it directly. if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" { - log.WithFields(log.Fields{ - "FragmentUrl": r.Response.FragmentUrl, - }).Warn("reader handle FragmentUrl") - if TransformSignedURLs { + if SkipSignedURLs { fragURL := r.Response.Fragment.BackingStore.URL() - log.WithFields(log.Fields{ - "url": fmt.Sprintf("%+v", fragURL), - "fragment": fmt.Sprintf("%+v", *r.Response.Fragment), - "offset": r.Request.Offset, - }).Warn("reader handle url") if fragURL.Scheme != "gs" { - return 0, fmt.Errorf("TransformSignedURL unsupported scheme: %s", fragURL.Scheme) + return 0, fmt.Errorf("SkipSignedURL unsupported scheme: %s", fragURL.Scheme) } if r.direct, err = OpenUnsignedFragmentURL(r.ctx, *r.Response.Fragment, r.Request.Offset, fragURL); err == nil { @@ -466,7 +458,7 @@ var ( // stores_test.go, which is in broker/fragment, imports broker/client so we cannot import broker/fragment here // to avoid a cycle. Instead we will repeat a subset of store_gcs.go. 
-var TransformSignedURLs = false +var SkipSignedURLs = false var gcs = &gcsBackend{} type gcsBackend struct { diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index 6d7b8542..159c5b8b 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -44,10 +44,6 @@ func (s *gcsBackend) SignGet(ep *url.URL, fragment pb.Fragment, d time.Duration) } if DisableSignedUrls { - log.WithFields(log.Fields{ - "ep": fmt.Sprintf("%+v", *ep), - "fragment": fmt.Sprintf("%+v", fragment), - }).Info("signGet disable signed urls") u := &url.URL{ Path: fmt.Sprintf("/%s/%s", cfg.bucket, cfg.rewritePath(cfg.prefix, fragment.ContentPath())), } @@ -68,10 +64,6 @@ func (s *gcsBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragme if err != nil { return false, err } - log.WithFields(log.Fields{ - "ep": fmt.Sprintf("%+v", *ep), - "fragment": fmt.Sprintf("%+v", fragment), - }).Info("Exists fragment") _, err = client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).Attrs(ctx) if err == nil { exists = true @@ -86,10 +78,6 @@ func (s *gcsBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment if err != nil { return nil, err } - log.WithFields(log.Fields{ - "ep": fmt.Sprintf("%+v", *ep), - "fragment": fmt.Sprintf("%+v", fragment), - }).Info("Open fragment") return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx) } diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index 34b61f06..10da412c 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -76,10 +76,10 @@ type Config interface { type BaseConfig struct { Consumer struct { mbp.ServiceConfig - Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"` - MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" 
default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."` - WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."` - TransformSignedURLs bool `long:"transform-signed-urls" env:"TRANSFORM_SIGNED_URLS" description:"When a signed URL is received, transform it into an unsigned URL. This is useful when clients do not require the signing."` + Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"` + MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."` + WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."` + SkipSignedURLs bool `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."` } `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"` Broker struct { @@ -126,7 +126,7 @@ func (sc Cmd) Execute(args []string) error { mbp.Must(err, "building Server instance") // Arize avoidance of using signed URLs. 
- client.TransformSignedURLs = bc.Consumer.TransformSignedURLs + client.SkipSignedURLs = bc.Consumer.SkipSignedURLs if bc.Broker.Cache.Size <= 0 { log.Warn("--broker.cache.size is disabled; consider setting > 0") From 909bff013c99278e115e648ed08f6f4d7b13feea Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Fri, 13 Sep 2024 18:16:31 -0700 Subject: [PATCH 23/33] update deprecated github action --- .github/workflows/ci-workflow.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-workflow.yaml b/.github/workflows/ci-workflow.yaml index 0ee1ac26..7219349c 100644 --- a/.github/workflows/ci-workflow.yaml +++ b/.github/workflows/ci-workflow.yaml @@ -145,7 +145,7 @@ jobs: # We upload this to the artifacts that are attached to the action just to make it easy for # someone to pull down a build from another branch. - name: "Upload Binaries" - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: "gazette-x86_64-linux-gnu.zip" path: ".build-ci/gazette-x86_64-linux-gnu.zip" From 20d68a2cba5312151c48ea4cd358bba0bc9b30d1 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Fri, 13 Sep 2024 18:22:06 -0700 Subject: [PATCH 24/33] change flag style to help on-prem substitutions --- broker/client/reader.go | 4 ++-- mainboilerplate/runconsumer/run_consumer.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index b2044f56..ea912d8d 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -175,7 +175,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { // If the frame preceding EOF provided a fragment URL, open it directly. 
if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" { - if SkipSignedURLs { + if SkipSignedURLs > 0 { fragURL := r.Response.Fragment.BackingStore.URL() if fragURL.Scheme != "gs" { return 0, fmt.Errorf("SkipSignedURL unsupported scheme: %s", fragURL.Scheme) @@ -483,7 +483,7 @@ var ( // stores_test.go, which is in broker/fragment, imports broker/client so we cannot import broker/fragment here // to avoid a cycle. Instead we will repeat a subset of store_gcs.go. -var SkipSignedURLs = false +var SkipSignedURLs = uint32(0) var gcs = &gcsBackend{} type gcsBackend struct { diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index 10da412c..c53f2e1a 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -79,7 +79,7 @@ type BaseConfig struct { Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"` MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."` WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."` - SkipSignedURLs bool `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."` + SkipSignedURLs uint32 `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" default:"1" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. 
This is useful when clients do not wish/require the signing."` } `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"` Broker struct { From c28028068f2d1eb3b81558c9426578b5e583169d Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Fri, 13 Sep 2024 20:08:07 -0700 Subject: [PATCH 25/33] combine the support for external_account --- broker/fragment/store_gcs.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/broker/fragment/store_gcs.go b/broker/fragment/store_gcs.go index 159c5b8b..92e48292 100644 --- a/broker/fragment/store_gcs.go +++ b/broker/fragment/store_gcs.go @@ -2,6 +2,7 @@ package fragment import ( "context" + "encoding/json" "fmt" "io" "net/url" @@ -144,6 +145,11 @@ func (s *gcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error { return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).Delete(ctx) } +// to help identify when JSON credentials are an external account used by workload identity +type credentialsFile struct { + Type string `json:"type"` +} + func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, opts storage.SignedURLOptions, err error) { var conf *jwt.Config @@ -167,7 +173,18 @@ func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage. 
creds, err := google.FindDefaultCredentials(ctx, storage.ScopeFullControl) if err != nil { return - } else if creds.JSON != nil { + } + + // best effort to determine if JWT credentials are for external account + externalAccount := false + if creds.JSON != nil { + var f credentialsFile + if err := json.Unmarshal(creds.JSON, &f); err == nil { + externalAccount = f.Type == "external_account" + } + } + + if creds.JSON != nil && !externalAccount { conf, err = google.JWTConfigFromJSON(creds.JSON, storage.ScopeFullControl) if err != nil { return From 564fc533c2db3a28fedf1710af50a3d04925d0b1 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Tue, 17 Sep 2024 09:43:01 -0700 Subject: [PATCH 26/33] update the release version to match upstream --- .github/workflows/ci-workflow.yaml | 4 ++-- mainboilerplate/runconsumer/run_consumer.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-workflow.yaml b/.github/workflows/ci-workflow.yaml index 7219349c..f5e5ff8c 100644 --- a/.github/workflows/ci-workflow.yaml +++ b/.github/workflows/ci-workflow.yaml @@ -77,11 +77,11 @@ jobs: docker_tag="${{ secrets.REGISTRY_PATH }}/gazette/broker:latest" push_images='true' elif [[ '${{ github.ref }}' == 'refs/heads/arize' ]]; then - version="0.89.1-arize-${sha:0:7}" + version="0.99.0-arize-${sha:0:7}" docker_tag="${{ secrets.REGISTRY_PATH }}/gazette/broker:arize-${sha:0:7}" push_images='true' elif [[ '${{ github.ref }}' == *'arize'* ]]; then - version="0.89.1-dev-${sha:0:7}" + version="0.99.0-dev-${sha:0:7}" docker_tag="${{ secrets.REGISTRY_PATH }}/gazette/broker:dev-${sha:0:7}" push_images='true' else diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index c53f2e1a..562d25bd 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -205,6 +205,7 @@ func Main(app Application) { var cfg = app.NewConfig() var parser = flags.NewParser(cfg, flags.Default) + 
log.Info("Starting consumer...") _, _ = parser.AddCommand("serve", "Serve as Gazette consumer", ` serve a Gazette consumer with the provided configuration, until signaled to exit (via SIGTERM). Upon receiving a signal, the consumer will seek to discharge From 6a6de9d6054158f3a7700587bb71e8c7af25f27a Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Tue, 17 Sep 2024 11:30:41 -0700 Subject: [PATCH 27/33] add support for external_account in another location --- broker/client/reader.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index ea912d8d..f31ccbbc 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -3,6 +3,7 @@ package client import ( "bufio" "context" + "encoding/json" "crypto/tls" "errors" "fmt" @@ -500,6 +501,11 @@ func (s *gcsBackend) open(ctx context.Context, ep *url.URL, fragment pb.Fragment return gClient.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx) } +/ to help identify when JSON credentials are an external account used by workload identity + type credentialsFile struct { + Type string `json:"type"` + } + func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, err error) { var conf *jwt.Config @@ -522,7 +528,18 @@ func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage. 
creds, err := google.FindDefaultCredentials(ctx, storage.ScopeFullControl) if err != nil { return - } else if creds.JSON != nil { + } + + // best effort to determine if JWT credentials are for external account + externalAccount := false + if creds.JSON != nil { + var f credentialsFile + if err := json.Unmarshal(creds.JSON, &f); err == nil { + externalAccount = f.Type == "external_account" + } + } + + if creds.JSON != nil && !externalAccount { conf, err = google.JWTConfigFromJSON(creds.JSON, storage.ScopeFullControl) if err != nil { return From 66a5465d5a7041e87b108580dbc24672f043fff2 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Tue, 17 Sep 2024 11:44:21 -0700 Subject: [PATCH 28/33] fix typo --- broker/client/reader.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index f31ccbbc..72188cca 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -3,8 +3,8 @@ package client import ( "bufio" "context" - "encoding/json" "crypto/tls" + "encoding/json" "errors" "fmt" "io" @@ -501,10 +501,10 @@ func (s *gcsBackend) open(ctx context.Context, ep *url.URL, fragment pb.Fragment return gClient.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx) } -/ to help identify when JSON credentials are an external account used by workload identity - type credentialsFile struct { - Type string `json:"type"` - } +// to help identify when JSON credentials are an external account used by workload identity +type credentialsFile struct { + Type string `json:"type"` +} func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, err error) { var conf *jwt.Config From 3c7f3d37d7d73e4c33069c9b6e8a900f6edd73b5 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 18 Sep 2024 08:45:43 -0700 Subject: [PATCH 29/33] by default do not skip signing URLs --- mainboilerplate/runconsumer/run_consumer.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index 562d25bd..b31663fa 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -79,7 +79,7 @@ type BaseConfig struct { Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"` MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."` WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."` - SkipSignedURLs uint32 `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" default:"1" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."` + SkipSignedURLs uint32 `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" default:"0" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. 
This is useful when clients do not wish/require the signing."` } `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"` Broker struct { From b136b0f2f36825b0ab7e3a49be55f161fcbd9cd8 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 18 Sep 2024 09:51:42 -0700 Subject: [PATCH 30/33] allow boolean command line arguments to have a value --- broker/client/reader.go | 4 ++-- cmd/gazette/main.go | 2 +- mainboilerplate/runconsumer/run_consumer.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 72188cca..2237c837 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -176,7 +176,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { // If the frame preceding EOF provided a fragment URL, open it directly. if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" { - if SkipSignedURLs > 0 { + if SkipSignedURLs { fragURL := r.Response.Fragment.BackingStore.URL() if fragURL.Scheme != "gs" { return 0, fmt.Errorf("SkipSignedURL unsupported scheme: %s", fragURL.Scheme) @@ -484,7 +484,7 @@ var ( // stores_test.go, which is in broker/fragment, imports broker/client so we cannot import broker/fragment here // to avoid a cycle. Instead we will repeat a subset of store_gcs.go. 
-var SkipSignedURLs = uint32(0) +var SkipSignedURLs = false var gcs = &gcsBackend{} type gcsBackend struct { diff --git a/cmd/gazette/main.go b/cmd/gazette/main.go index d25cd574..35d975d3 100644 --- a/cmd/gazette/main.go +++ b/cmd/gazette/main.go @@ -131,7 +131,7 @@ func (cmdServe) Execute(args []string) error { } func main() { - var parser = flags.NewParser(Config, flags.Default) + var parser = flags.NewParser(Config, flags.Default|flags.AllowBoolValues) _, _ = parser.AddCommand("serve", "Serve as Gazette broker", ` Serve a Gazette broker with the provided configuration, until signaled to diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index b31663fa..b0072f55 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -79,7 +79,7 @@ type BaseConfig struct { Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"` MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."` WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."` - SkipSignedURLs uint32 `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" default:"0" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."` + SkipSignedURLs bool `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" default:"false" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. 
This is useful when clients do not wish/require the signing."` } `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"` Broker struct { @@ -204,7 +204,7 @@ func (sc Cmd) Execute(args []string) error { func Main(app Application) { var cfg = app.NewConfig() - var parser = flags.NewParser(cfg, flags.Default) + var parser = flags.NewParser(cfg, flags.Default|flags.AllowBoolValues) log.Info("Starting consumer...") _, _ = parser.AddCommand("serve", "Serve as Gazette consumer", ` serve a Gazette consumer with the provided configuration, until signaled to From 8d6aba39094871e4af0b376331c82bb5ffd237d3 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 18 Sep 2024 10:04:35 -0700 Subject: [PATCH 31/33] update go-flags package to allow boolean value options --- go.mod | 2 +- go.sum | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index e6aacd12..5dea68b9 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/gorilla/schema v1.4.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/jessevdk/go-flags v1.5.0 + github.com/jessevdk/go-flags v1.6.1 github.com/jgraettinger/cockroach-encoding v1.1.0 github.com/jgraettinger/gorocksdb v0.0.0-20240221161858-8f4873ee26e0 github.com/jgraettinger/urkel v0.1.2 diff --git a/go.sum b/go.sum index 3b448b6a..52166977 100644 --- a/go.sum +++ b/go.sum @@ -271,8 +271,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/jessevdk/go-flags v1.4.1-0.20181221193153-c0795c8afcf4/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= -github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= 
+github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4= +github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPGBnNLMooyc= github.com/jgraettinger/cockroach-encoding v1.1.0 h1:TRSWxnWdjgT1hcfn3aFa3dwhzNa8VHifsKj3gj8cD6E= github.com/jgraettinger/cockroach-encoding v1.1.0/go.mod h1:gcht+UqiTiDC6NEF7DLHUXQStSlWjogvhlkAttXVtmE= github.com/jgraettinger/gorocksdb v0.0.0-20240221161858-8f4873ee26e0 h1:sjH3pqBnS8QFYvtJi6cE+t8B4GVK2b8HoA8zMGIHvcA= @@ -604,7 +604,6 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= From a580627cc64c1ef8079b2bcf0f62fd440eedc1ab Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 18 Sep 2024 11:02:56 -0700 Subject: [PATCH 32/33] bool args cannot have default values --- mainboilerplate/runconsumer/run_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index b0072f55..b11cfa6c 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -79,7 +79,7 @@ type BaseConfig struct { Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of 
Shards this consumer process will allocate"` MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."` WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."` - SkipSignedURLs bool `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" default:"false" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."` + SkipSignedURLs bool `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."` } `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"` Broker struct { From bfb1bcf1cf2ea9cffa413131cb1838963f0b00e0 Mon Sep 17 00:00:00 2001 From: Darren Dowker Date: Wed, 25 Sep 2024 09:46:42 -0700 Subject: [PATCH 33/33] improve the comment about unsigned urls support --- broker/client/reader.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/broker/client/reader.go b/broker/client/reader.go index 2237c837..a77602fe 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -481,8 +481,12 @@ var ( // ARIZE specific code to end of file. // +// To support unsigned URLs we need to be able to deal with buckets directly as a consumer and not via the signed URL. +// In OpenUnsignedFragmentURL we need to be able to open the fragment directly from the bucket. It would have +// been nice to use the backend interface in stores.go which the broker uses to access buckets. 
Unfortunately // stores_test.go, which is in broker/fragment, imports broker/client so we cannot import broker/fragment here -// to avoid a cycle. Instead we will repeat a subset of store_gcs.go. +// to avoid a cycle. Instead we will repeat a subset of store_gcs.go. This makes the support for unsigned URLs +// very GCS-specific. var SkipSignedURLs = false var gcs = &gcsBackend{}