Skip to content

Commit

Permalink
Merge pull request #9 from Arize-ai/arize-dev/signed-urls2
Browse files Browse the repository at this point in the history
Allow not using signed URLs on consumers
  • Loading branch information
ddowker authored Oct 11, 2024
2 parents 72c2838 + bfb1bcf commit 3770af5
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 19 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ jobs:
docker_tag="${{ secrets.REGISTRY_PATH }}/gazette/broker:latest"
push_images='true'
elif [[ '${{ github.ref }}' == 'refs/heads/arize' ]]; then
version="0.89.1-arize-${sha:0:7}"
version="0.99.0-arize-${sha:0:7}"
docker_tag="${{ secrets.REGISTRY_PATH }}/gazette/broker:arize-${sha:0:7}"
push_images='true'
elif [[ '${{ github.ref }}' == *'arize'* ]]; then
version="0.89.1-dev-${sha:0:7}"
version="0.99.0-dev-${sha:0:7}"
docker_tag="${{ secrets.REGISTRY_PATH }}/gazette/broker:dev-${sha:0:7}"
push_images='true'
else
Expand Down Expand Up @@ -145,7 +145,7 @@ jobs:
# We upload this to the artifacts that are attached to the action just to make it easy for
# someone to pull down a build from another branch.
- name: "Upload Binaries"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: "gazette-x86_64-linux-gnu.zip"
path: ".build-ci/gazette-x86_64-linux-gnu.zip"
Expand Down
192 changes: 183 additions & 9 deletions broker/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,26 @@ import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"time"

"cloud.google.com/go/storage"
"github.com/gorilla/schema"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.gazette.dev/core/broker/codecs"
pb "go.gazette.dev/core/broker/protocol"
"golang.org/x/oauth2/google"
"golang.org/x/oauth2/jwt"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -26,9 +36,9 @@ import (
// seek to the requested offset, and read its content.
//
// Reader returns EOF if:
// * The broker closes the RPC, eg because its assignment has change or it's shutting down.
// * The requested EndOffset has been read through.
// * A Fragment being read by the Reader reaches EOF.
// - The broker closes the RPC, eg because its assignment has change or it's shutting down.
// - The requested EndOffset has been read through.
// - A Fragment being read by the Reader reaches EOF.
//
// If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable
// is returned upon reaching the journal write head.
Expand Down Expand Up @@ -166,9 +176,20 @@ func (r *Reader) Read(p []byte) (n int, err error) {

// If the frame preceding EOF provided a fragment URL, open it directly.
if !r.Request.MetadataOnly && r.Response.Status == pb.Status_OK && r.Response.FragmentUrl != "" {
if r.direct, err = OpenFragmentURL(r.ctx, *r.Response.Fragment,
r.Request.Offset, r.Response.FragmentUrl); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|.
if SkipSignedURLs {
fragURL := r.Response.Fragment.BackingStore.URL()
if fragURL.Scheme != "gs" {
return 0, fmt.Errorf("SkipSignedURL unsupported scheme: %s", fragURL.Scheme)
}
if r.direct, err = OpenUnsignedFragmentURL(r.ctx, *r.Response.Fragment,
r.Request.Offset, fragURL); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|.
}
} else {
if r.direct, err = OpenFragmentURL(r.ctx, *r.Response.Fragment,
r.Request.Offset, r.Response.FragmentUrl); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.direct|.
}
}
return
}
Expand Down Expand Up @@ -199,9 +220,10 @@ func (r *Reader) AdjustedOffset(br *bufio.Reader) int64 {
}

// Seek provides a limited form of seeking support. Specifically, if:
// * A Fragment URL is being directly read, and
// * The Seek offset is ahead of the current Reader offset, and
// * The Fragment also covers the desired Seek offset
// - A Fragment URL is being directly read, and
// - The Seek offset is ahead of the current Reader offset, and
// - The Fragment also covers the desired Seek offset
//
// Then a seek is performed by reading and discarding to the seeked offset.
// Seek will otherwise return ErrSeekRequiresNewReader.
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
Expand Down Expand Up @@ -274,6 +296,24 @@ func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, ur
return NewFragmentReader(resp.Body, fragment, offset)
}

func OpenUnsignedFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url *url.URL) (*FragmentReader, error) {
var (
rdr io.ReadCloser
err error
)

if rdr, err = gcs.open(ctx, url, fragment); err != nil {
return nil, err
}

// Record metrics related to opening the fragment.
var labels = fragmentLabels(fragment)
fragmentOpen.With(labels).Inc()
fragmentOpenBytes.With(labels).Add(float64(fragment.End - fragment.Begin))

return NewFragmentReader(rdr, fragment, offset)
}

// NewFragmentReader wraps a io.ReadCloser of raw Fragment bytes with a
// returned *FragmentReader which has been pre-seeked to the given offset.
func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error) {
Expand Down Expand Up @@ -438,3 +478,137 @@ var (
// httpClient is the http.Client used by OpenFragmentURL
httpClient = newHttpClient()
)

// ARIZE specific code to end of file.
//
// To support unsigned URLs we need to be able to deal with buckets directly as a consumer and not via the signed URL.
// In OpenUnsignedFragmentURL we need to be able to open the fragment directly from the bucket. It would have
// been nice to use the backend interface in stores.go which the broker uses to access buckets. Unfortunately
// stores_test.go, which is in broker/fragment, imports broker/client so we cannot import broker/fragment here
// to avoid a cycle. Instead we will repeat a subset of store_gcs.go. This makes the use support of unsigned URLs
// very gcs specific.

var SkipSignedURLs = false
var gcs = &gcsBackend{}

type gcsBackend struct {
client *storage.Client
clientMu sync.Mutex
}

// Arize Open routine for use with consumers and signed URLs.
func (s *gcsBackend) open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) {
cfg, gClient, err := s.gcsClient(ep)
if err != nil {
return nil, err
}
return gClient.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).NewReader(ctx)
}

// to help identify when JSON credentials are an external account used by workload identity
type credentialsFile struct {
Type string `json:"type"`
}

func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, err error) {
var conf *jwt.Config

if err = parseStoreArgs(ep, &cfg); err != nil {
return
}
// Omit leading slash from bucket prefix. Note that FragmentStore already
// enforces that URL Paths end in '/'.
cfg.bucket, cfg.prefix = ep.Host, ep.Path[1:]

s.clientMu.Lock()
defer s.clientMu.Unlock()

if s.client != nil {
client = s.client
return
}
var ctx = context.Background()

creds, err := google.FindDefaultCredentials(ctx, storage.ScopeFullControl)
if err != nil {
return
}

// best effort to determine if JWT credentials are for external account
externalAccount := false
if creds.JSON != nil {
var f credentialsFile
if err := json.Unmarshal(creds.JSON, &f); err == nil {
externalAccount = f.Type == "external_account"
}
}

if creds.JSON != nil && !externalAccount {
conf, err = google.JWTConfigFromJSON(creds.JSON, storage.ScopeFullControl)
if err != nil {
return
}
client, err = storage.NewClient(ctx, option.WithTokenSource(conf.TokenSource(ctx)))
if err != nil {
return
}
s.client = client

log.WithFields(log.Fields{
"ProjectID": creds.ProjectID,
"GoogleAccessID": conf.Email,
"PrivateKeyID": conf.PrivateKeyID,
"Subject": conf.Subject,
"Scopes": conf.Scopes,
}).Info("reader constructed new GCS client")
} else {
// Possible to use GCS without a service account (e.g. with a GCE instance and workload identity).
client, err = storage.NewClient(ctx, option.WithTokenSource(creds.TokenSource))
if err != nil {
return
}

// workload identity approach which SignGet() method accepts if you have
// "iam.serviceAccounts.signBlob" permissions against your service account.
s.client = client

log.WithFields(log.Fields{
"ProjectID": creds.ProjectID,
}).Info("reader constructed new GCS client without JWT")
}

return
}

type GSStoreConfig struct {
bucket string
prefix string

RewriterConfig
}

type RewriterConfig struct {
// Find is the string to replace in the unmodified journal name.
Find string
// Replace is the string with which Find is replaced in the constructed store path.
Replace string
}

func (cfg RewriterConfig) rewritePath(s, j string) string {
if cfg.Find == "" {
return s + j
}
return s + strings.Replace(j, cfg.Find, cfg.Replace, 1)
}

func parseStoreArgs(ep *url.URL, args interface{}) error {
var decoder = schema.NewDecoder()
decoder.IgnoreUnknownKeys(false)

if q, err := url.ParseQuery(ep.RawQuery); err != nil {
return err
} else if err = decoder.Decode(args, q); err != nil {
return fmt.Errorf("parsing store URL arguments: %s", err)
}
return nil
}
19 changes: 18 additions & 1 deletion broker/fragment/store_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fragment

import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
Expand Down Expand Up @@ -144,6 +145,11 @@ func (s *gcsBackend) Remove(ctx context.Context, fragment pb.Fragment) error {
return client.Bucket(cfg.bucket).Object(cfg.rewritePath(cfg.prefix, fragment.ContentPath())).Delete(ctx)
}

// to help identify when JSON credentials are an external account used by workload identity
type credentialsFile struct {
Type string `json:"type"`
}

func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.Client, opts storage.SignedURLOptions, err error) {
var conf *jwt.Config

Expand All @@ -167,7 +173,18 @@ func (s *gcsBackend) gcsClient(ep *url.URL) (cfg GSStoreConfig, client *storage.
creds, err := google.FindDefaultCredentials(ctx, storage.ScopeFullControl)
if err != nil {
return
} else if creds.JSON != nil {
}

// best effort to determine if JWT credentials are for external account
externalAccount := false
if creds.JSON != nil {
var f credentialsFile
if err := json.Unmarshal(creds.JSON, &f); err == nil {
externalAccount = f.Type == "external_account"
}
}

if creds.JSON != nil && !externalAccount {
conf, err = google.JWTConfigFromJSON(creds.JSON, storage.ScopeFullControl)
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion cmd/gazette/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (cmdServe) Execute(args []string) error {
}

func main() {
var parser = flags.NewParser(Config, flags.Default)
var parser = flags.NewParser(Config, flags.Default|flags.AllowBoolValues)

_, _ = parser.AddCommand("serve", "Serve as Gazette broker", `
Serve a Gazette broker with the provided configuration, until signaled to
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/gorilla/schema v1.4.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.4
github.com/jessevdk/go-flags v1.5.0
github.com/jessevdk/go-flags v1.6.1
github.com/jgraettinger/cockroach-encoding v1.1.0
github.com/jgraettinger/gorocksdb v0.0.0-20240221161858-8f4873ee26e0
github.com/jgraettinger/urkel v0.1.2
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/jessevdk/go-flags v1.4.1-0.20181221193153-c0795c8afcf4/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4=
github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPGBnNLMooyc=
github.com/jgraettinger/cockroach-encoding v1.1.0 h1:TRSWxnWdjgT1hcfn3aFa3dwhzNa8VHifsKj3gj8cD6E=
github.com/jgraettinger/cockroach-encoding v1.1.0/go.mod h1:gcht+UqiTiDC6NEF7DLHUXQStSlWjogvhlkAttXVtmE=
github.com/jgraettinger/gorocksdb v0.0.0-20240221161858-8f4873ee26e0 h1:sjH3pqBnS8QFYvtJi6cE+t8B4GVK2b8HoA8zMGIHvcA=
Expand Down Expand Up @@ -604,7 +604,6 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
7 changes: 6 additions & 1 deletion mainboilerplate/runconsumer/run_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type BaseConfig struct {
Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"`
MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."`
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
SkipSignedURLs bool `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."`
} `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"`

Broker struct {
Expand Down Expand Up @@ -124,6 +125,9 @@ func (sc Cmd) Execute(args []string) error {
var srv, err = server.New("", bc.Consumer.Port)
mbp.Must(err, "building Server instance")

// Arize avoidance of using signed URLs.
client.SkipSignedURLs = bc.Consumer.SkipSignedURLs

if bc.Broker.Cache.Size <= 0 {
log.Warn("--broker.cache.size is disabled; consider setting > 0")
}
Expand Down Expand Up @@ -200,7 +204,8 @@ func (sc Cmd) Execute(args []string) error {
func Main(app Application) {
var cfg = app.NewConfig()

var parser = flags.NewParser(cfg, flags.Default)
var parser = flags.NewParser(cfg, flags.Default|flags.AllowBoolValues)
log.Info("Starting consumer...")
_, _ = parser.AddCommand("serve", "Serve as Gazette consumer", `
serve a Gazette consumer with the provided configuration, until signaled to
exit (via SIGTERM). Upon receiving a signal, the consumer will seek to discharge
Expand Down

0 comments on commit 3770af5

Please sign in to comment.