Skip to content

Commit

Permalink
Merge pull request #1743 from Permify/ufuk/monitoring
Browse files Browse the repository at this point in the history
feat: new global tracer and meter
  • Loading branch information
tolgaOzen authored Nov 4, 2024
2 parents 461e36d + f011909 commit 5d40010
Show file tree
Hide file tree
Showing 25 changed files with 92 additions and 115 deletions.
13 changes: 4 additions & 9 deletions internal/engines/cache/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"encoding/hex"
"time"

"go.opentelemetry.io/otel"
api "go.opentelemetry.io/otel/metric"

"github.com/cespare/xxhash/v2"

"github.com/Permify/permify/internal"
"github.com/Permify/permify/internal/engines"
"github.com/Permify/permify/internal/invoke"
"github.com/Permify/permify/internal/storage"
Expand All @@ -18,11 +18,6 @@ import (
"github.com/Permify/permify/pkg/telemetry"
)

var (
tracer = otel.Tracer("check-cache")
meter = otel.Meter("check-cache")
)

// CheckEngineWithCache is a struct that holds an instance of a cache.Cache for managing engine cache.
type CheckEngineWithCache struct {
// schemaReader is responsible for reading schema information
Expand All @@ -46,8 +41,8 @@ func NewCheckEngineWithCache(
schemaReader: schemaReader,
checker: checker,
cache: cache,
cacheCounter: telemetry.NewCounter(meter, "cache_check_count", "Number of permission cached checks performed"),
cacheHitDurationHistogram: telemetry.NewHistogram(meter, "cache_hit_duration", "microseconds", "Duration of cache hits in microseconds"),
cacheCounter: telemetry.NewCounter(internal.Meter, "cache_check_count", "Number of permission cached checks performed"),
cacheHitDurationHistogram: telemetry.NewHistogram(internal.Meter, "cache_hit_duration", "microseconds", "Duration of cache hits in microseconds"),
}
}

Expand All @@ -72,7 +67,7 @@ func (c *CheckEngineWithCache) Check(ctx context.Context, request *base.Permissi

// If a cached result is found, handle exclusion and return the result.
if found {
ctx, span := tracer.Start(ctx, "hit")
ctx, span := internal.Tracer.Start(ctx, "hit")
defer span.End()
start := time.Now()

Expand Down
2 changes: 0 additions & 2 deletions internal/engines/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ func (engine *CheckEngine) checkDirectRelation(request *base.PermissionCheckRequ
// TupleFilter helps in filtering out the relationships for a specific entity and a permission.
var rit *database.TupleIterator
rit, err = engine.dataReader.QueryRelationships(ctx, request.GetTenantId(), filter, request.GetMetadata().GetSnapToken(), database.NewCursorPagination())

// If there's an error in querying, return a denied permission response along with the error.
if err != nil {
return denied(emptyResponseMetadata()), err
Expand Down Expand Up @@ -466,7 +465,6 @@ func (engine *CheckEngine) checkDirectAttribute(
// storageContext.NewContextualAttributes creates a new instance of ContextualAttributes based on the attributes
// retrieved from the request context.
val, err = storageContext.NewContextualAttributes(request.GetContext().GetAttributes()...).QuerySingleAttribute(filter)

// An error occurred while querying the single attribute, so we return a denied response with empty metadata
// and the error.
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion internal/engines/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ func (engine *ExpandEngine) expandDirectAttribute(

// Attempt to get the attribute using the defined filter.
val, err = storageContext.NewContextualAttributes(request.GetContext().GetAttributes()...).QuerySingleAttribute(filter)

// If there's an error in getting the attribute, send a failure response through the channel and return from the function.
if err != nil {
expandChan <- expandFailResponse(err)
Expand Down
1 change: 0 additions & 1 deletion internal/engines/subjectFilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func (engine *SubjectFilter) subjectFilterDirectAttribute(
// storageContext.NewContextualAttributes creates a new instance of ContextualAttributes based on the attributes
// retrieved from the request context.
val, err = storageContext.NewContextualAttributes(request.GetContext().GetAttributes()...).QuerySingleAttribute(filter)

// An error occurred while querying the single attribute, so we return a denied response with empty metadata
// and the error.
if err != nil {
Expand Down
35 changes: 15 additions & 20 deletions internal/invoke/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,19 @@ import (
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelCodes "go.opentelemetry.io/otel/codes"
api "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

"github.com/Permify/permify/internal"
"github.com/Permify/permify/internal/storage"
base "github.com/Permify/permify/pkg/pb/base/v1"
"github.com/Permify/permify/pkg/telemetry"
"github.com/Permify/permify/pkg/token"
"github.com/Permify/permify/pkg/tuple"
)

var (
tracer = otel.Tracer("invoke")
meter = otel.Meter("invoke")
)

// Invoker is an interface that groups multiple permission-related interfaces.
// It is used to define a common contract for invoking various permission operations.
type Invoker interface {
Expand Down Expand Up @@ -103,22 +98,22 @@ func NewDirectInvoker(
ec: ec,
lo: lo,
sp: sp,
checkCounter: telemetry.NewCounter(meter, "check_count", "Number of permission checks performed"),
lookupEntityCounter: telemetry.NewCounter(meter, "lookup_entity_count", "Number of permission lookup entity performed"),
lookupSubjectCounter: telemetry.NewCounter(meter, "lookup_subject_count", "Number of permission lookup subject performed"),
subjectPermissionCounter: telemetry.NewCounter(meter, "subject_permission_count", "Number of subject permission performed"),
checkDurationHistogram: telemetry.NewHistogram(meter, "check_duration", "microseconds", "Duration of checks in microseconds"),
lookupEntityDurationHistogram: telemetry.NewHistogram(meter, "lookup_entity_duration", "microseconds", "Duration of lookup entity duration in microseconds"),
lookupSubjectDurationHistogram: telemetry.NewHistogram(meter, "lookup_subject_duration", "microseconds", "Duration of lookup subject duration in microseconds"),
subjectPermissionDurationHistogram: telemetry.NewHistogram(meter, "subject_permission_duration", "microseconds", "Duration of subject permission duration in microseconds"),
checkCounter: telemetry.NewCounter(internal.Meter, "check_count", "Number of permission checks performed"),
lookupEntityCounter: telemetry.NewCounter(internal.Meter, "lookup_entity_count", "Number of permission lookup entity performed"),
lookupSubjectCounter: telemetry.NewCounter(internal.Meter, "lookup_subject_count", "Number of permission lookup subject performed"),
subjectPermissionCounter: telemetry.NewCounter(internal.Meter, "subject_permission_count", "Number of subject permission performed"),
checkDurationHistogram: telemetry.NewHistogram(internal.Meter, "check_duration", "microseconds", "Duration of checks in microseconds"),
lookupEntityDurationHistogram: telemetry.NewHistogram(internal.Meter, "lookup_entity_duration", "microseconds", "Duration of lookup entity duration in microseconds"),
lookupSubjectDurationHistogram: telemetry.NewHistogram(internal.Meter, "lookup_subject_duration", "microseconds", "Duration of lookup subject duration in microseconds"),
subjectPermissionDurationHistogram: telemetry.NewHistogram(internal.Meter, "subject_permission_duration", "microseconds", "Duration of subject permission duration in microseconds"),
}
}

// Check is a method that implements the Check interface.
// It calls the Run method of the CheckEngine with the provided context and PermissionCheckRequest,
// and returns a PermissionCheckResponse and an error if any.
func (invoker *DirectInvoker) Check(ctx context.Context, request *base.PermissionCheckRequest) (response *base.PermissionCheckResponse, err error) {
ctx, span := tracer.Start(ctx, "check", trace.WithAttributes(
ctx, span := internal.Tracer.Start(ctx, "check", trace.WithAttributes(
attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
Expand Down Expand Up @@ -208,7 +203,7 @@ func (invoker *DirectInvoker) Check(ctx context.Context, request *base.Permissio
// It calls the Run method of the ExpandEngine with the provided context and PermissionExpandRequest,
// and returns a PermissionExpandResponse and an error if any.
func (invoker *DirectInvoker) Expand(ctx context.Context, request *base.PermissionExpandRequest) (response *base.PermissionExpandResponse, err error) {
ctx, span := tracer.Start(ctx, "expand", trace.WithAttributes(
ctx, span := internal.Tracer.Start(ctx, "expand", trace.WithAttributes(
attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
Expand Down Expand Up @@ -242,7 +237,7 @@ func (invoker *DirectInvoker) Expand(ctx context.Context, request *base.Permissi
// It calls the Run method of the LookupEntityEngine with the provided context and PermissionLookupEntityRequest,
// and returns a PermissionLookupEntityResponse and an error if any.
func (invoker *DirectInvoker) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
ctx, span := tracer.Start(ctx, "lookup-entity", trace.WithAttributes(
ctx, span := internal.Tracer.Start(ctx, "lookup-entity", trace.WithAttributes(
attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
Expand Down Expand Up @@ -289,7 +284,7 @@ func (invoker *DirectInvoker) LookupEntity(ctx context.Context, request *base.Pe
// It calls the Stream method of the LookupEntityEngine with the provided context, PermissionLookupEntityRequest, and Permission_LookupEntityStreamServer,
// and returns an error if any.
func (invoker *DirectInvoker) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
ctx, span := tracer.Start(ctx, "lookup-entity-stream", trace.WithAttributes(
ctx, span := internal.Tracer.Start(ctx, "lookup-entity-stream", trace.WithAttributes(
attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
Expand Down Expand Up @@ -335,7 +330,7 @@ func (invoker *DirectInvoker) LookupEntityStream(ctx context.Context, request *b
// LookupSubject is a method of the DirectInvoker structure. It handles the task of looking up subjects
// and returning the results in a response.
func (invoker *DirectInvoker) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
ctx, span := tracer.Start(ctx, "lookup-subject", trace.WithAttributes(
ctx, span := internal.Tracer.Start(ctx, "lookup-subject", trace.WithAttributes(
attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
Expand Down Expand Up @@ -389,7 +384,7 @@ func (invoker *DirectInvoker) LookupSubject(ctx context.Context, request *base.P
// SubjectPermission is a method of the DirectInvoker structure. It determines a subject's permissions
// and returns the results in a response.
func (invoker *DirectInvoker) SubjectPermission(ctx context.Context, request *base.PermissionSubjectPermissionRequest) (response *base.PermissionSubjectPermissionResponse, err error) {
ctx, span := tracer.Start(ctx, "subject-permission", trace.WithAttributes(
ctx, span := internal.Tracer.Start(ctx, "subject-permission", trace.WithAttributes(
attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
Expand Down
4 changes: 2 additions & 2 deletions internal/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ package internal
import "go.opentelemetry.io/otel"

var (
tracer = otel.Tracer("permify")
meter = otel.Meter("permify")
Tracer = otel.Tracer("permify")
Meter = otel.Meter("permify")
)
7 changes: 4 additions & 3 deletions internal/servers/bundleServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
otelCodes "go.opentelemetry.io/otel/codes"
"google.golang.org/grpc/status"

"github.com/Permify/permify/internal"
"github.com/Permify/permify/internal/storage"
"github.com/Permify/permify/internal/validation"
v1 "github.com/Permify/permify/pkg/pb/base/v1"
Expand All @@ -32,7 +33,7 @@ func NewBundleServer(

// Write handles the writing of bundles.
func (r *BundleServer) Write(ctx context.Context, request *v1.BundleWriteRequest) (*v1.BundleWriteResponse, error) {
ctx, span := tracer.Start(ctx, "bundle.write")
ctx, span := internal.Tracer.Start(ctx, "bundle.write")
defer span.End()

v := request.Validate()
Expand Down Expand Up @@ -73,7 +74,7 @@ func (r *BundleServer) Write(ctx context.Context, request *v1.BundleWriteRequest

// Read handles the reading of bundles.
func (r *BundleServer) Read(ctx context.Context, request *v1.BundleReadRequest) (*v1.BundleReadResponse, error) {
ctx, span := tracer.Start(ctx, "bundle.read")
ctx, span := internal.Tracer.Start(ctx, "bundle.read")
defer span.End()

v := request.Validate()
Expand All @@ -96,7 +97,7 @@ func (r *BundleServer) Read(ctx context.Context, request *v1.BundleReadRequest)

// Delete handles the deletion of bundles.
func (r *BundleServer) Delete(ctx context.Context, request *v1.BundleDeleteRequest) (*v1.BundleDeleteResponse, error) {
ctx, span := tracer.Start(ctx, "bundle.delete")
ctx, span := internal.Tracer.Start(ctx, "bundle.delete")
defer span.End()

v := request.Validate()
Expand Down
29 changes: 15 additions & 14 deletions internal/servers/dataServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/status"

"github.com/Permify/permify/internal"
"github.com/Permify/permify/internal/storage"
"github.com/Permify/permify/internal/validation"
"github.com/Permify/permify/pkg/attribute"
Expand Down Expand Up @@ -47,19 +48,19 @@ func NewDataServer(
dw: dw,
br: br,
sr: sr,
writeDataHistogram: telemetry.NewHistogram(meter, "write_data", "microseconds", "Duration of writing data in microseconds"),
deleteDataHistogram: telemetry.NewHistogram(meter, "delete_data", "microseconds", "Duration of deleting data in microseconds"),
readAttributesHistogram: telemetry.NewHistogram(meter, "read_attributes", "microseconds", "Duration of reading attributes in microseconds"),
readRelationshipsHistogram: telemetry.NewHistogram(meter, "read_relationships", "microseconds", "Duration of reading relationships in microseconds"),
writeRelationshipsHistogram: telemetry.NewHistogram(meter, "write_relationships", "microseconds", "Duration of writing relationships in microseconds"),
deleteRelationshipsHistogram: telemetry.NewHistogram(meter, "delete_relationships", "microseconds", "Duration of deleting relationships in microseconds"),
runBundleHistogram: telemetry.NewHistogram(meter, "delete_relationships", "run_bundle", "Duration of running bunble in microseconds"),
writeDataHistogram: telemetry.NewHistogram(internal.Meter, "write_data", "microseconds", "Duration of writing data in microseconds"),
deleteDataHistogram: telemetry.NewHistogram(internal.Meter, "delete_data", "microseconds", "Duration of deleting data in microseconds"),
readAttributesHistogram: telemetry.NewHistogram(internal.Meter, "read_attributes", "microseconds", "Duration of reading attributes in microseconds"),
readRelationshipsHistogram: telemetry.NewHistogram(internal.Meter, "read_relationships", "microseconds", "Duration of reading relationships in microseconds"),
writeRelationshipsHistogram: telemetry.NewHistogram(internal.Meter, "write_relationships", "microseconds", "Duration of writing relationships in microseconds"),
deleteRelationshipsHistogram: telemetry.NewHistogram(internal.Meter, "delete_relationships", "microseconds", "Duration of deleting relationships in microseconds"),
runBundleHistogram: telemetry.NewHistogram(internal.Meter, "delete_relationships", "run_bundle", "Duration of running bunble in microseconds"),
}
}

// ReadRelationships - Allows querying the stored engine data directly to display and filter stored relation tuples
func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.RelationshipReadRequest) (*v1.RelationshipReadResponse, error) {
ctx, span := tracer.Start(ctx, "data.read.relationships")
ctx, span := internal.Tracer.Start(ctx, "data.read.relationships")
defer span.End()
start := time.Now()

Expand Down Expand Up @@ -110,7 +111,7 @@ func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.Relation

// ReadAttributes - Allows querying the stored engine data directly to display and filter stored attribute tuples
func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeReadRequest) (*v1.AttributeReadResponse, error) {
ctx, span := tracer.Start(ctx, "data.read.attributes")
ctx, span := internal.Tracer.Start(ctx, "data.read.attributes")
defer span.End()
start := time.Now()

Expand Down Expand Up @@ -161,7 +162,7 @@ func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeRe

// Write - Write relationships and attributes to writeDB
func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*v1.DataWriteResponse, error) {
ctx, span := tracer.Start(ctx, "data.write")
ctx, span := internal.Tracer.Start(ctx, "data.write")
defer span.End()
start := time.Now()

Expand Down Expand Up @@ -260,7 +261,7 @@ func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*

// WriteRelationships - Write relation tuples to writeDB
func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.RelationshipWriteRequest) (*v1.RelationshipWriteResponse, error) {
ctx, span := tracer.Start(ctx, "relationships.write")
ctx, span := internal.Tracer.Start(ctx, "relationships.write")
defer span.End()
start := time.Now()

Expand Down Expand Up @@ -329,7 +330,7 @@ func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.Relatio

// Delete - Delete relationships and attributes from writeDB
func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) (*v1.DataDeleteResponse, error) {
ctx, span := tracer.Start(ctx, "data.delete")
ctx, span := internal.Tracer.Start(ctx, "data.delete")
defer span.End()
start := time.Now()

Expand Down Expand Up @@ -361,7 +362,7 @@ func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest)

// DeleteRelationships - Delete relationships from writeDB
func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.RelationshipDeleteRequest) (*v1.RelationshipDeleteResponse, error) {
ctx, span := tracer.Start(ctx, "relationships.delete")
ctx, span := internal.Tracer.Start(ctx, "relationships.delete")
defer span.End()
start := time.Now()

Expand Down Expand Up @@ -393,7 +394,7 @@ func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.Relati

// RunBundle executes a bundle and returns its snapshot token.
func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest) (*v1.BundleRunResponse, error) {
ctx, span := tracer.Start(ctx, "bundle.run")
ctx, span := internal.Tracer.Start(ctx, "bundle.run")
defer span.End()
start := time.Now()

Expand Down
Loading

0 comments on commit 5d40010

Please sign in to comment.