merge in latest from arize branch
ddowker committed Sep 13, 2024
2 parents 10f1846 + 72c2838 commit be19305
Showing 30 changed files with 1,178 additions and 515 deletions.
23 changes: 20 additions & 3 deletions broker/append_fsm.go
@@ -412,9 +412,15 @@ func (b *appendFSM) onValidatePreconditions() {
}
}

// It's possible a peer might have a larger end offset which is not
// reflected in our index, if a commit wasn't accepted by all peers.
// Such writes are reported as failed to the client and are retried
// (this failure mode is what makes journals at-least-once).
var indexMin, indexMax = b.resolved.replica.index.OffsetRange()

var maxOffset = b.pln.spool.End
if eo := b.resolved.replica.index.EndOffset(); eo > maxOffset {
maxOffset = eo
if indexMax > maxOffset {
maxOffset = indexMax
}

if b.req.CheckRegisters != nil &&
@@ -434,6 +440,17 @@ func (b *appendFSM) onValidatePreconditions() {
// Re-sync the pipeline at the explicitly requested |maxOffset|.
b.rollToOffset = maxOffset
b.state = stateSendPipelineSync
} else if b.pln.spool.Begin == indexMin {
// The spool holds the journal's first known write and should be rolled.
// This has the effect of "dirtying" the remote fragment index,
// and protects against data loss if N > R consistency is lost (eg, Etcd fails).
// When the remote index is dirty, recovering brokers are clued in that writes
// against this journal have already occurred (and `gazctl reset-head`
// must be run to recover). If the index were instead pristine,
// recovering brokers cannot distinguish this case from a newly-created
// journal, which risks double-writes to journal offsets.
b.rollToOffset = b.pln.spool.End
b.state = stateStreamContent
} else {
b.state = stateStreamContent
}
@@ -457,7 +474,7 @@ func (b *appendFSM) onStreamContent(req *pb.AppendRequest, err error) {
// Potentially roll the Fragment forward ahead of this append. Our
// pipeline is synchronized, so we expect this will always succeed
// and don't ask for an acknowledgement.
var proposal = maybeRollFragment(b.pln.spool, 0, b.resolved.journalSpec.Fragment)
var proposal = maybeRollFragment(b.pln.spool, b.rollToOffset, b.resolved.journalSpec.Fragment)

if b.pln.spool.Fragment.Fragment != proposal {
b.pln.scatter(&pb.ReplicateRequest{
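Read together, these two hunks change both when the append pipeline rolls its spool and where it rolls to. Below is a minimal sketch of the decision logic with simplified stand-ins for the broker's real types; the guard on the explicit-offset branch is elided from the hunk, so it is abstracted into a boolean here.

```go
package main

import "fmt"

// spool is a simplified stand-in for the pipeline's fragment.Spool.
type spool struct{ Begin, End int64 }

// chooseRollOffset mirrors the branches added to onValidatePreconditions.
// It returns the offset to roll the pipeline to; zero means "don't roll".
func chooseRollOffset(s spool, indexMin, indexMax int64, explicitOffset bool) int64 {
	// A peer or the fragment index may know of a larger end offset than
	// our spool does, if a commit wasn't accepted by all peers.
	var maxOffset = s.End
	if indexMax > maxOffset {
		maxOffset = indexMax
	}

	if explicitOffset {
		// Re-sync the pipeline at the explicitly requested |maxOffset|.
		return maxOffset
	} else if s.Begin == indexMin {
		// The spool holds the journal's first known write: roll it forward
		// to "dirty" the remote index and protect against double-writes.
		return s.End
	}
	return 0 // Stream content with the current spool.
}

func main() {
	// First write of a journal: the spool covers [0, 2054) and the index
	// also begins at offset 0, so the spool is rolled forward to 2054.
	fmt.Println(chooseRollOffset(spool{Begin: 0, End: 2054}, 0, 2054, false))
}
```

The companion change in onStreamContent then threads b.rollToOffset through to maybeRollFragment in place of the former literal 0, so the fragment proposal reflects whichever roll was decided above.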
16 changes: 16 additions & 0 deletions broker/append_fsm_test.go
@@ -579,6 +579,22 @@ func TestFSMStreamAndReadAcknowledgements(t *testing.T) {
fsm.onStreamContent(&pb.AppendRequest{}, nil) // Intent to commit.
fsm.onStreamContent(nil, io.EOF) // Client EOF.

// We previously completed a first write of this journal,
// which causes the _next_ write to roll the spool forward.
expect = pb.ReplicateRequest{
Proposal: &pb.Fragment{
Journal: "a/journal",
Begin: 2054,
End: 2054,
CompressionCodec: pb.CompressionCodec_GZIP,
},
Registers: boxLabels("after", ""), // Union/subtract applied.
Acknowledge: false, // In-sync pipeline isn't acknowledged again.
}
peerRecv(expect)

// Now the Append validation error causes a rollback.
expect.Acknowledge = true
peerRecv(expect) // Rollback.
peerSend(pb.ReplicateResponse{Status: pb.Status_OK}) // Send & read ACK.
fsm.onReadAcknowledgements()
29 changes: 27 additions & 2 deletions broker/client/reader.go
@@ -3,6 +3,7 @@ package client
import (
"bufio"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
@@ -11,6 +12,7 @@ import (
"net/url"
"strings"
"sync"
"time"

"cloud.google.com/go/storage"
"github.com/gorilla/schema"
@@ -397,7 +399,7 @@ func (fr *FragmentReader) Close() error {
// })
// // rr.Read will read Fragments directly from NAS.
func InstallFileTransport(root string) (remove func()) {
var transport = http.DefaultTransport.(*http.Transport).Clone()
var transport = httpClient.Transport.(*http.Transport).Clone()
transport.RegisterProtocol("file", http.NewFileTransport(http.Dir(root)))

var prevClient = httpClient
@@ -427,6 +429,29 @@ func fragmentLabels(fragment pb.Fragment) prometheus.Labels {
}
}

// newHttpClient returns an http client for readers to use for fetching fragments.
// It disables http2 because we've observed some rather horrific behavior from http2
// lately. When there's an error with the underlying transport, the http2 client can still
// use it for additional streams, creating the potential for connection failures to cause
// more widespread errors for other requests to the same host. Falling back to http 1.1
// is intended to be a short term workaround.
func newHttpClient() *http.Client {
// The Go docs on disabling http2 are wrong. See: https://github.com/golang/go/issues/39302
return &http.Client{
Transport: &http.Transport{
TLSNextProto: make(map[string]func(authority string, c *tls.Conn) http.RoundTripper),
TLSClientConfig: &tls.Config{},
ForceAttemptHTTP2: false,
// Defaults below are taken from http.DefaultTransport
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
}

var (
// Map common broker error statuses into named errors.
ErrInsufficientJournalBrokers = errors.New(pb.Status_INSUFFICIENT_JOURNAL_BROKERS.String())
@@ -450,7 +475,7 @@ var (
ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End")

// httpClient is the http.Client used by OpenFragmentURL
httpClient = http.DefaultClient
httpClient = newHttpClient()
)

// ARIZE specific code to end of file.
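As a sanity check on the approach above: a non-nil but empty TLSNextProto map is the established way to keep a Go transport from negotiating HTTP/2. A self-contained sketch (example.com is a placeholder host):

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// An empty, non-nil TLSNextProto map prevents the transport from
	// upgrading TLS connections to HTTP/2, forcing HTTP/1.1.
	var transport = &http.Transport{
		TLSNextProto:    make(map[string]func(string, *tls.Conn) http.RoundTripper),
		TLSClientConfig: &tls.Config{},
		IdleConnTimeout: 90 * time.Second,
	}
	var client = &http.Client{Transport: transport}

	resp, err := client.Get("https://example.com/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Proto) // Expect "HTTP/1.1", never "HTTP/2.0".
}
```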
33 changes: 17 additions & 16 deletions broker/fragment/index.go
@@ -54,19 +54,20 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
var condCh = fi.condCh
var err error

// If the requested offset isn't covered by the index, but we do have a
// Fragment covering a *greater* offset, where that Fragment is also older
// than a large time.Duration, then: skip forward the request offset to
// the Fragment offset. This case allows us to recover from "holes" or
// deletions in the offset space of a Journal, while not impacting races
// which can occur between delayed persistence to the Fragment store
// vs hand-off of Journals to new brokers (eg, a new broker which isn't
// yet aware of a Fragment currently being uploaded, should block a read
// of an offset covered by that Fragment until it becomes available).
if !found && ind != len(fi.set) &&
fi.set[ind].ModTime != 0 &&
fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix() {

// If the requested offset isn't covered by the index, but we do have
// a persisted fragment with a *greater* offset...
if !found && ind != len(fi.set) && fi.set[ind].ModTime != 0 &&
// AND the client is reading from the very beginning of the journal,
// OR the next available fragment was persisted quite a while ago.
(req.Offset == 0 || (fi.set[ind].ModTime < timeNow().Add(-offsetJumpAgeThreshold).Unix())) {

// Then skip the read forward to the first or next available offset.
// This case allows us to recover from "holes" or deletions in the
// offset space of a Journal, while not impacting races which can occur
// between delayed persistence to the Fragment store vs hand-off of
// Journals to new brokers (eg, a new broker which isn't yet aware of
// a Fragment currently being uploaded, should block a read
// of an offset covered by that Fragment until it becomes available).
resp.Offset = fi.set[ind].Begin
found = true
}
@@ -124,12 +125,12 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
}
}

// EndOffset returns the last (largest) End offset in the index.
func (fi *Index) EndOffset() int64 {
// OffsetRange returns the [Begin, End) offset range of all Fragments in the index.
func (fi *Index) OffsetRange() (int64, int64) {
defer fi.mu.RUnlock()
fi.mu.RLock()

return fi.set.EndOffset()
return fi.set.BeginOffset(), fi.set.EndOffset()
}

// SpoolCommit adds local Spool Fragment |frag| to the index.
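The reworked guard in Query is easier to read in isolation. A sketch with a simplified fragment type standing in for pb.Fragment (an assumption; the real code consults the index's sorted fragment set):

```go
package main

import (
	"fmt"
	"time"
)

// frag is a simplified stand-in for pb.Fragment.
type frag struct {
	Begin, End int64
	ModTime    int64 // Unix seconds; zero means not yet persisted remotely.
}

// shouldJumpForward mirrors the condition added to Query: the requested
// offset isn't covered, a persisted fragment exists at a greater offset,
// AND either the client reads from byte zero or that fragment was
// persisted longer ago than the age threshold.
func shouldJumpForward(found bool, set []frag, ind int, reqOffset, nowUnix, ageThreshold int64) bool {
	return !found && ind != len(set) && set[ind].ModTime != 0 &&
		(reqOffset == 0 || set[ind].ModTime < nowUnix-ageThreshold)
}

func main() {
	var now = time.Now().Unix()
	var set = []frag{{Begin: 100, End: 200, ModTime: now}}
	// A read at offset 0 now jumps forward to Begin: 100, even though
	// the covering fragment was persisted only moments ago.
	fmt.Println(shouldJumpForward(false, set, 0, 0, now, 3600)) // true
}
```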
33 changes: 31 additions & 2 deletions broker/fragment/index_test.go
@@ -98,6 +98,31 @@ func (s *IndexSuite) TestQueryAtHead(c *gc.C) {
c.Check(err, gc.IsNil)
}

func (s *IndexSuite) TestQueryAtMissingByteZero(c *gc.C) {
var ind = NewIndex(context.Background())
var now = time.Now().Unix()

// Establish local fragment fixtures.
var set = buildSet(c, 100, 200, 200, 300)
ind.SpoolCommit(set[0])

// A read from byte zero cannot proceed.
var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false})
c.Check(resp.Status, gc.Equals, pb.Status_OFFSET_NOT_YET_AVAILABLE)

// Set ModTime, marking the fragment as very recently persisted.
// A read from byte zero now skips forward.
set[0].ModTime = now
ind.ReplaceRemote(set)

resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0, Block: false})
c.Check(resp, gc.DeepEquals, &pb.ReadResponse{
Offset: 100,
WriteHead: 300,
Fragment: &pb.Fragment{Begin: 100, End: 200, ModTime: now},
})
}

func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) {
var ind = NewIndex(context.Background())
var baseTime = time.Unix(1500000000, 0)
@@ -263,7 +288,9 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
<-ind.FirstRefreshCh()

c.Check(ind.set, gc.HasLen, 3)
c.Check(ind.EndOffset(), gc.Equals, int64(0x255))
var bo, eo = ind.OffsetRange()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x255))

// Expect root/one provides Fragment 222-255.
var resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0x223})
@@ -279,7 +306,9 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
ind.ReplaceRemote(set)

c.Check(ind.set, gc.HasLen, 4) // Combined Fragments are reflected.
c.Check(ind.EndOffset(), gc.Equals, int64(0x555))
bo, eo = ind.OffsetRange()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x555))

// Expect root/two now provides Fragment 222-333.
resp, _, _ = ind.Query(context.Background(), &pb.ReadRequest{Offset: 0x223})
47 changes: 36 additions & 11 deletions broker/fragment/store_azure.go
@@ -29,14 +29,15 @@ type azureStoreConfig struct {
accountTenantID string // The tenant ID that owns the storage account that we're writing into
// NOTE: This is not the tenant ID that owns the service principal
storageAccountName string // Storage accounts in Azure are the equivalent of a "bucket" in S3
blobDomain string // base storage domain for azure cloud (e.g. "blob.core.windows.net")
containerName string // In azure, blobs are stored inside of containers, which live inside accounts
prefix string // This is the path prefix for the blobs inside the container

RewriterConfig
}

func (cfg *azureStoreConfig) serviceUrl() string {
return fmt.Sprintf("https://%s.blob.core.windows.net", cfg.storageAccountName)
return fmt.Sprintf("https://%s.%s", cfg.storageAccountName, cfg.blobDomain)
}

func (cfg *azureStoreConfig) containerURL() string {
@@ -122,6 +123,7 @@ func (a *azureBackend) SignGet(endpoint *url.URL, fragment pb.Fragment, d time.D
log.WithFields(log.Fields{
"tenantId": cfg.accountTenantID,
"storageAccountName": cfg.storageAccountName,
"blobDomain": cfg.blobDomain,
"containerName": cfg.containerName,
"blobName": blobName,
"expiryTime": sasQueryParams.ExpiryTime(),
@@ -216,12 +218,14 @@ func (a *azureBackend) List(ctx context.Context, store pb.FragmentStore, ep *url
} else if frag, err := pb.ParseFragmentFromRelativePath(journal, blob.Name[len(*segmentList.Prefix):]); err != nil {
log.WithFields(log.Fields{
"storageAccountName": cfg.storageAccountName,
"blobDomain": cfg.blobDomain,
"name": blob.Name,
"err": err,
}).Warning("parsing fragment")
} else if *(blob.Properties.ContentLength) == 0 && frag.ContentLength() > 0 {
log.WithFields(log.Fields{
"storageAccountName": cfg.storageAccountName,
"blobDomain": cfg.blobDomain,
"name": blob.Name,
}).Warning("zero-length fragment")
} else {
@@ -253,17 +257,31 @@ func (a *azureBackend) Remove(ctx context.Context, fragment pb.Fragment) error {
// credentials that can be used with `azblob` Pipelines.
func getAzureStorageCredential(coreCredential azcore.TokenCredential, tenant string) (azblob.TokenCredential, error) {
var tokenRefresher = func(credential azblob.TokenCredential) time.Duration {
accessToken, err := coreCredential.GetToken(context.Background(), policy.TokenRequestOptions{TenantID: tenant, Scopes: []string{"https://storage.azure.com/.default"}})
if err != nil {
panic(err)
}
credential.SetToken(accessToken.Token)
var backoff_duration = time.Second
for {
accessToken, err := coreCredential.GetToken(context.Background(), policy.TokenRequestOptions{TenantID: tenant, Scopes: []string{"https://storage.azure.com/.default"}})
if err != nil {
log.WithFields(log.Fields{
"tenant": tenant,
"backoff_duration": backoff_duration,
}).Errorf("Error refreshing credential, will retry: %v", err)
time.Sleep(backoff_duration)
if backoff_duration*2 > (time.Minute * 5) {
backoff_duration = time.Minute * 5
} else {
backoff_duration = backoff_duration * 2
}
continue
} else {
credential.SetToken(accessToken.Token)

// Give 60s of padding in order to make sure we always have a non-expired token.
// If we didn't do this, we would *begin* the refresh process as the token expires,
// potentially leaving any consumer with an expired token while we fetch a new one.
exp := accessToken.ExpiresOn.Sub(time.Now().Add(time.Minute))
return exp
// Give 60s of padding in order to make sure we always have a non-expired token.
// If we didn't do this, we would *begin* the refresh process as the token expires,
// potentially leaving any consumer with an expired token while we fetch a new one.
exp := accessToken.ExpiresOn.Sub(time.Now().Add(time.Minute))
return exp
}
}
}

credential := azblob.NewTokenCredential("", tokenRefresher)
@@ -279,6 +297,12 @@ func parseAzureEndpoint(endpoint *url.URL) (cfg azureStoreConfig, err error) {
// enforces that URL Paths end in '/'.
var splitPath = strings.Split(endpoint.Path[1:], "/")

// ARIZE change to support the Azure China cloud.
cfg.blobDomain = os.Getenv("AZURE_BLOB_DOMAIN")
if cfg.blobDomain == "" {
cfg.blobDomain = "blob.core.windows.net"
}

if endpoint.Scheme == "azure" {
// Since only one non-ad "Shared Key" credential can be injected via
// environment variables, we should only keep around one client for
@@ -430,6 +454,7 @@ func (a *azureBackend) getAzurePipeline(ep *url.URL) (cfg azureStoreConfig, clie
log.WithFields(log.Fields{
"tenant": cfg.accountTenantID,
"storageAccountName": cfg.storageAccountName,
"blobDomain": cfg.blobDomain,
"storageContainerName": cfg.containerName,
"pathPrefix": cfg.prefix,
}).Info("constructed new Azure Storage pipeline client")
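The retry loop added to the token refresher is a capped exponential backoff: wait one second, double on each failure, and never wait more than five minutes. Extracted as a standalone sketch (function names are ours, not the package's):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryForever runs op until it succeeds, doubling the wait between
// attempts up to a five-minute cap, matching the refresher's backoff.
func retryForever(op func() error) {
	var backoff = time.Second
	for {
		err := op()
		if err == nil {
			return
		}
		fmt.Printf("error, will retry in %s: %v\n", backoff, err)
		time.Sleep(backoff)

		if backoff*2 > 5*time.Minute {
			backoff = 5 * time.Minute
		} else {
			backoff *= 2
		}
	}
}

func main() {
	var attempts int
	retryForever(func() error {
		if attempts++; attempts < 4 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("succeeded after", attempts, "attempts")
}
```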
13 changes: 10 additions & 3 deletions broker/fragment/store_s3.go
@@ -44,16 +44,19 @@ type S3StoreConfig struct {
// SSEKMSKeyId specifies the ID for the AWS KMS symmetric customer managed key
// By default, not used.
SSEKMSKeyId string
// Region is the region for the bucket. If empty, the region is determined
// from `Profile` or the default credentials.
Region string
}

type s3Backend struct {
clients map[[2]string]*s3.S3
clients map[[3]string]*s3.S3
clientsMu sync.Mutex
}

func newS3Backend() *s3Backend {
return &s3Backend{
clients: make(map[[2]string]*s3.S3),
clients: make(map[[3]string]*s3.S3),
}
}

@@ -198,14 +201,18 @@ func (s *s3Backend) s3Client(ep *url.URL) (cfg S3StoreConfig, client *s3.S3, err
defer s.clientsMu.Unlock()
s.clientsMu.Lock()

var key = [2]string{cfg.Endpoint, cfg.Profile}
var key = [3]string{cfg.Endpoint, cfg.Profile, cfg.Region}
if client = s.clients[key]; client != nil {
return
}

var awsConfig = aws.NewConfig()
awsConfig.WithCredentialsChainVerboseErrors(true)

if cfg.Region != "" {
awsConfig.WithRegion(cfg.Region)
}

if cfg.Endpoint != "" {
awsConfig.WithEndpoint(cfg.Endpoint)
// We must force path style because bucket-named virtual hosts
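With the new Region field, a caller can pin the SDK's signing region rather than relying on profile or default-credential resolution. A minimal sketch against the aws-sdk-go v1 API (the region value is a placeholder):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	var awsConfig = aws.NewConfig()
	awsConfig.WithCredentialsChainVerboseErrors(true)

	// Mirrors the conditional above: apply the region only when set.
	if region := "us-west-2"; region != "" {
		awsConfig.WithRegion(region)
	}

	var sess = session.Must(session.NewSession(awsConfig))
	var svc = s3.New(sess)
	fmt.Println(svc.ClientInfo.SigningRegion) // "us-west-2"
}
```

In the fragment-store URL form exercised by the test below, the region surfaces as a `region=` query argument alongside `endpoint` and the SSE settings.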
3 changes: 2 additions & 1 deletion broker/fragment/stores_test.go
@@ -126,14 +126,15 @@ func TestStoreInteractions(t *testing.T) {
}

func TestParseStoreArgsS3(t *testing.T) {
storeURL, _ := url.Parse("s3://bucket/prefix/?endpoint=https://s3.region.amazonaws.com&SSE=kms&SSEKMSKeyId=123")
storeURL, _ := url.Parse("s3://bucket/prefix/?endpoint=https://s3.region.amazonaws.com&SSE=kms&SSEKMSKeyId=123&region=some-region")
var s3Cfg S3StoreConfig
parseStoreArgs(storeURL, &s3Cfg)
require.Equal(t, "bucket", storeURL.Host)
require.Equal(t, "prefix/", storeURL.Path[1:])
require.Equal(t, "https://s3.region.amazonaws.com", s3Cfg.Endpoint)
require.Equal(t, "kms", s3Cfg.SSE)
require.Equal(t, "123", s3Cfg.SSEKMSKeyId)
require.Equal(t, "some-region", s3Cfg.Region)
}

func readFrag(t *testing.T, f pb.Fragment) string {