Skip to content

Commit

Permalink
Merge pull request #1004 from Permify/refactor/switch-to-gobreaker-ad…
Browse files Browse the repository at this point in the history
…d-singleflight

feat: gobreaker add singleflight
  • Loading branch information
tolgaOzen authored Jan 12, 2024
2 parents e1d127d + 2b65179 commit f291920
Show file tree
Hide file tree
Showing 18 changed files with 479 additions and 773 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ require (
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
Expand All @@ -151,4 +152,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
resenje.org/singleflight v0.4.1 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.10.0 h1:EaGW2JJh15aKOejeuJ+wpFSHnbd7GE6Wvp3TsNhb6LY=
Expand Down Expand Up @@ -952,6 +954,8 @@ modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
resenje.org/singleflight v0.4.1 h1:ryGHRaOBwhnZLyf34LMDf4AsTSHrs4hdGPdG/I4Hmac=
resenje.org/singleflight v0.4.1/go.mod h1:lAgQK7VfjG6/pgredbQfmV0RvG/uVhKo6vSuZ0vCWfk=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
47 changes: 0 additions & 47 deletions internal/storage/decorators/bundleReaderWithCircuitBreaker.go

This file was deleted.

71 changes: 0 additions & 71 deletions internal/storage/decorators/bundleWriterWithCircuitBreaker.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package decorators
package cache

import (
"context"
Expand All @@ -11,27 +11,27 @@ import (
base "github.com/Permify/permify/pkg/pb/base/v1"
)

// SchemaReaderWithCache - Add cache behaviour to schema reader
type SchemaReaderWithCache struct {
// SchemaReader - Add cache behaviour to schema reader
type SchemaReader struct {
delegate storage.SchemaReader
cache cache.Cache
}

// NewSchemaReaderWithCache new instance of SchemaReaderWithCache
func NewSchemaReaderWithCache(delegate storage.SchemaReader, cache cache.Cache) *SchemaReaderWithCache {
return &SchemaReaderWithCache{
// NewSchemaReader new instance of SchemaReader
func NewSchemaReader(delegate storage.SchemaReader, cache cache.Cache) *SchemaReader {
return &SchemaReader{
delegate: delegate,
cache: cache,
}
}

// ReadSchema - Read schema from the repository
func (r *SchemaReaderWithCache) ReadSchema(ctx context.Context, tenantID, version string) (schema *base.SchemaDefinition, err error) {
func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) (schema *base.SchemaDefinition, err error) {
return r.delegate.ReadSchema(ctx, tenantID, version)
}

// ReadEntityDefinition - Read entity definition from the repository
func (r *SchemaReaderWithCache) ReadEntityDefinition(ctx context.Context, tenantID, entityName, version string) (definition *base.EntityDefinition, v string, err error) {
func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, entityName, version string) (definition *base.EntityDefinition, v string, err error) {
var s interface{}
found := false
if version != "" {
Expand All @@ -54,7 +54,7 @@ func (r *SchemaReaderWithCache) ReadEntityDefinition(ctx context.Context, tenant
}

// ReadRuleDefinition - Read rule definition from the repository
func (r *SchemaReaderWithCache) ReadRuleDefinition(ctx context.Context, tenantID, ruleName, version string) (definition *base.RuleDefinition, v string, err error) {
func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, ruleName, version string) (definition *base.RuleDefinition, v string, err error) {
var s interface{}
found := false
if version != "" {
Expand All @@ -77,6 +77,6 @@ func (r *SchemaReaderWithCache) ReadRuleDefinition(ctx context.Context, tenantID
}

// HeadVersion - Finds the latest version of the schema.
func (r *SchemaReaderWithCache) HeadVersion(ctx context.Context, tenantID string) (version string, err error) {
func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (version string, err error) {
return r.delegate.HeadVersion(ctx, tenantID)
}
32 changes: 32 additions & 0 deletions internal/storage/decorators/circuitBreaker/bundleReader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package circuitBreaker

import (
"context"

"github.com/sony/gobreaker"

"github.com/Permify/permify/internal/storage"
base "github.com/Permify/permify/pkg/pb/base/v1"
)

// BundleReader - Add circuit breaker behaviour to bundle reader
type BundleReader struct {
delegate storage.BundleReader
cb *gobreaker.CircuitBreaker
}

// NewBundleReader - Add circuit breaker behaviour to new bundle reader
func NewBundleReader(delegate storage.BundleReader, cb *gobreaker.CircuitBreaker) *BundleReader {
return &BundleReader{delegate: delegate, cb: cb}
}

// Read - Reads bundles from the repository
func (r *BundleReader) Read(ctx context.Context, tenantID, name string) (bundle *base.DataBundle, err error) {
response, err := r.cb.Execute(func() (interface{}, error) {
return r.delegate.Read(ctx, tenantID, name)
})
if err != nil {
return nil, err
}
return response.(*base.DataBundle), nil
}
151 changes: 151 additions & 0 deletions internal/storage/decorators/circuitBreaker/dataReader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package circuitBreaker

import (
"context"

"github.com/sony/gobreaker"

"github.com/Permify/permify/internal/storage"
"github.com/Permify/permify/pkg/database"
base "github.com/Permify/permify/pkg/pb/base/v1"
"github.com/Permify/permify/pkg/token"
)

// DataReader - Add circuit breaker behaviour to data reader
type DataReader struct {
delegate storage.DataReader
cb *gobreaker.CircuitBreaker
}

// NewDataReader - Add circuit breaker behaviour to new data reader
func NewDataReader(delegate storage.DataReader, cb *gobreaker.CircuitBreaker) *DataReader {
return &DataReader{delegate: delegate, cb: cb}
}

// QueryRelationships - Reads relation tuples from the repository
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, token string) (*database.TupleIterator, error) {
response, err := r.cb.Execute(func() (interface{}, error) {
return r.delegate.QueryRelationships(ctx, tenantID, filter, token)
})
if err != nil {
return nil, err
}
return response.(*database.TupleIterator), nil
}

// ReadRelationships - Reads relation tuples from the repository with different options.
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, token string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
type circuitBreakerResponse struct {
Collection *database.TupleCollection
ContinuousToken database.EncodedContinuousToken
}

response, err := r.cb.Execute(func() (interface{}, error) {
var err error
var resp circuitBreakerResponse
resp.Collection, resp.ContinuousToken, err = r.delegate.ReadRelationships(ctx, tenantID, filter, token, pagination)
return resp, err
})
if err != nil {
return nil, nil, err
}

resp := response.(circuitBreakerResponse)
return resp.Collection, resp.ContinuousToken, nil
}

// QuerySingleAttribute - Reads a single attribute from the repository.
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, token string) (*base.Attribute, error) {
response, err := r.cb.Execute(func() (interface{}, error) {
return r.delegate.QuerySingleAttribute(ctx, tenantID, filter, token)
})
if err != nil {
return nil, err
}
return response.(*base.Attribute), nil
}

// QueryAttributes - Reads multiple attributes from the repository.
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, token string) (*database.AttributeIterator, error) {
response, err := r.cb.Execute(func() (interface{}, error) {
return r.delegate.QueryAttributes(ctx, tenantID, filter, token)
})
if err != nil {
return nil, err
}
return response.(*database.AttributeIterator), nil
}

// ReadAttributes - Reads multiple attributes from the repository with different options.
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, token string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
type circuitBreakerResponse struct {
Collection *database.AttributeCollection
ContinuousToken database.EncodedContinuousToken
}

response, err := r.cb.Execute(func() (interface{}, error) {
var err error
var resp circuitBreakerResponse
resp.Collection, resp.ContinuousToken, err = r.delegate.ReadAttributes(ctx, tenantID, filter, token, pagination)
return resp, err
})
if err != nil {
return nil, nil, err
}

resp := response.(circuitBreakerResponse)
return resp.Collection, resp.ContinuousToken, nil
}

// QueryUniqueEntities - Reads unique entities from the repository with different options.
func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, token string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
type circuitBreakerResponse struct {
IDs []string
ContinuousToken database.EncodedContinuousToken
}

response, err := r.cb.Execute(func() (interface{}, error) {
var err error
var resp circuitBreakerResponse
resp.IDs, resp.ContinuousToken, err = r.delegate.QueryUniqueEntities(ctx, tenantID, name, token, pagination)
return resp, err
})
if err != nil {
return nil, nil, err
}

resp := response.(circuitBreakerResponse)
return resp.IDs, resp.ContinuousToken, nil
}

// QueryUniqueSubjectReferences - Reads unique subject references from the repository with different options.
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, token string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
type circuitBreakerResponse struct {
IDs []string
ContinuousToken database.EncodedContinuousToken
}

response, err := r.cb.Execute(func() (interface{}, error) {
var err error
var resp circuitBreakerResponse
resp.IDs, resp.ContinuousToken, err = r.delegate.QueryUniqueSubjectReferences(ctx, tenantID, subjectReference, token, pagination)
return resp, err
})
if err != nil {
return nil, nil, err
}

resp := response.(circuitBreakerResponse)
return resp.IDs, resp.ContinuousToken, nil
}

// HeadSnapshot - Reads the latest version of the snapshot from the repository.
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
response, err := r.cb.Execute(func() (interface{}, error) {
return r.delegate.HeadSnapshot(ctx, tenantID)
})
if err != nil {
return nil, err
}
return response.(token.SnapToken), nil
}
Loading

0 comments on commit f291920

Please sign in to comment.