diff --git a/internal/config/config.go b/internal/config/config.go index d3756667b..63171d61c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -260,7 +260,7 @@ func DefaultConfig() *Config { Enabled: false, }, }, - RateLimit: 100, + RateLimit: 10_000, }, Profiler: Profiler{ Enabled: false, diff --git a/internal/storage/postgres/bundleReader.go b/internal/storage/postgres/bundleReader.go index e26c83560..1a90d759e 100644 --- a/internal/storage/postgres/bundleReader.go +++ b/internal/storage/postgres/bundleReader.go @@ -41,7 +41,7 @@ func (b *BundleReader) Read(ctx context.Context, tenantID, name string) (bundle query, args, err = builder.ToSql() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args)) @@ -55,7 +55,7 @@ func (b *BundleReader) Read(ctx context.Context, tenantID, name string) (bundle if errors.Is(err, sql.ErrNoRows) { return nil, errors.New(base.ErrorCode_ERROR_CODE_BUNDLE_NOT_FOUND.String()) } - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } m := jsonpb.Unmarshaler{} diff --git a/internal/storage/postgres/bundleWriter.go b/internal/storage/postgres/bundleWriter.go index e0758418c..a9ff22daf 100644 --- a/internal/storage/postgres/bundleWriter.go +++ b/internal/storage/postgres/bundleWriter.go @@ -44,7 +44,7 @@ func (b *BundleWriter) Write(ctx context.Context, bundles []storage.Bundle) (nam m := jsonpb.Marshaler{} jsonStr, err := m.MarshalToString(bundle.DataBundle) if err != nil { - return names, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_ARGUMENT) + return names, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_ARGUMENT) } insertBuilder = insertBuilder.Values(bundle.Name, jsonStr, bundle.TenantID) @@ -55,14 +55,14 @@ func (b *BundleWriter) Write(ctx context.Context, bundles []storage.Bundle) (nam query, args, err = insertBuilder.ToSql() if err != nil { - return names, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return names, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("executing sql insert query", slog.Any("query", query), slog.Any("arguments", args)) _, err = b.database.DB.ExecContext(ctx, query, args...) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully wrote bundles to the database", slog.Any("number_of_bundles", len(bundles))) @@ -83,12 +83,12 @@ func (b *BundleWriter) Delete(ctx context.Context, tenantID, name string) (err e query, args, err = deleteBuilder.ToSql() if err != nil { - return utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } _, err = b.database.DB.ExecContext(ctx, query, args...) if err != nil { - return utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("bundle successfully deleted") diff --git a/internal/storage/postgres/dataReader.go b/internal/storage/postgres/dataReader.go index d15277f04..0d2b0e0ca 100644 --- a/internal/storage/postgres/dataReader.go +++ b/internal/storage/postgres/dataReader.go @@ -51,14 +51,14 @@ func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, fi var st token.SnapToken st, err = snapshot.EncodedToken{Value: snap}.Decode() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Begin a new read-only transaction with the specified isolation level. var tx *sql.Tx tx, err = r.database.DB.BeginTx(ctx, &r.txOptions) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } defer func() { @@ -75,7 +75,7 @@ func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, fi var query string query, args, err = builder.ToSql() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) @@ -84,7 +84,7 @@ func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, fi var rows *sql.Rows rows, err = tx.QueryContext(ctx, query, args...) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } defer rows.Close() @@ -94,18 +94,18 @@ func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, fi rt := storage.RelationTuple{} err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } collection.Add(rt.ToTuple()) } if err = rows.Err(); err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } // Commit the transaction. err = tx.Commit() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully retrieved relation tuples from the database") @@ -126,14 +126,14 @@ func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, fil var st token.SnapToken st, err = snapshot.EncodedToken{Value: snap}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Begin a new read-only transaction with the specified isolation level. var tx *sql.Tx tx, err = r.database.DB.BeginTx(ctx, &r.txOptions) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } defer func() { @@ -150,12 +150,12 @@ func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, fil var t database.ContinuousToken t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } var v uint64 v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) } builder = builder.Where(squirrel.GtOrEq{"id": v}) } @@ -167,7 +167,7 @@ func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, fil var args []interface{} query, args, err = builder.ToSql() if err != nil { - return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) @@ -176,7 +176,7 @@ func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, fil var rows *sql.Rows rows, err = tx.QueryContext(ctx, query, args...) if err != nil { - return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } defer rows.Close() @@ -188,20 +188,20 @@ func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, fil rt := storage.RelationTuple{} err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } lastID = rt.ID tuples = append(tuples, rt.ToTuple()) } // Check for any errors during iteration. if err = rows.Err(); err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } // Commit the transaction. err = tx.Commit() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully read relation tuples from database") @@ -226,14 +226,14 @@ func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, var st token.SnapToken st, err = snapshot.EncodedToken{Value: snap}.Decode() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Begin a new read-only transaction with the specified isolation level. var tx *sql.Tx tx, err = r.database.DB.BeginTx(ctx, &r.txOptions) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } defer func() { @@ -250,7 +250,7 @@ func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, var query string query, args, err = builder.ToSql() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) @@ -268,7 +268,7 @@ func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, if errors.Is(err, sql.ErrNoRows) { return nil, nil } else { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } } @@ -277,13 +277,13 @@ func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, unmarshaler := &jsonpb.Unmarshaler{} err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Commit the transaction. err = tx.Commit() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully retrieved Single attribute from the database") @@ -303,14 +303,14 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte var st token.SnapToken st, err = snapshot.EncodedToken{Value: snap}.Decode() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Begin a new read-only transaction with the specified isolation level. var tx *sql.Tx tx, err = r.database.DB.BeginTx(ctx, &r.txOptions) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } defer func() { @@ -327,7 +327,7 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte var query string query, args, err = builder.ToSql() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) @@ -336,7 +336,7 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte var rows *sql.Rows rows, err = tx.QueryContext(ctx, query, args...) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } defer rows.Close() @@ -351,7 +351,7 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte // Scan the row from the database into the fields of `rt` and `valueStr`. err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } // Unmarshal the JSON data from `valueStr` into `rt.Value`. @@ -359,19 +359,19 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte unmarshaler := &jsonpb.Unmarshaler{} err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } collection.Add(rt.ToAttribute()) } if err = rows.Err(); err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } // Commit the transaction. err = tx.Commit() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully retrieved attributes tuples from the database") @@ -392,14 +392,14 @@ func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter var st token.SnapToken st, err = snapshot.EncodedToken{Value: snap}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Begin a new read-only transaction with the specified isolation level. var tx *sql.Tx tx, err = r.database.DB.BeginTx(ctx, &r.txOptions) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } defer func() { @@ -416,12 +416,12 @@ func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter var t database.ContinuousToken t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } var v uint64 v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) } builder = builder.Where(squirrel.GtOrEq{"id": v}) } @@ -433,7 +433,7 @@ func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter var args []interface{} query, args, err = builder.ToSql() if err != nil { - return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) @@ -442,7 +442,7 @@ func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter var rows *sql.Rows rows, err = tx.QueryContext(ctx, query, args...) if err != nil { - return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } defer rows.Close() @@ -459,7 +459,7 @@ func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter // Scan the row from the database into the fields of `rt` and `valueStr`. err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } lastID = rt.ID @@ -468,20 +468,20 @@ func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter unmarshaler := &jsonpb.Unmarshaler{} err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } attributes = append(attributes, rt.ToAttribute()) } // Check for any errors during iteration. if err = rows.Err(); err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } // Commit the transaction. err = tx.Commit() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully read attributes from the database") @@ -506,14 +506,14 @@ func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, sn var st token.SnapToken st, err = snapshot.EncodedToken{Value: snap}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Begin a new read-only transaction with the specified isolation level. var tx *sql.Tx tx, err = r.database.DB.BeginTx(ctx, &r.txOptions) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } defer func() { @@ -527,12 +527,12 @@ func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, sn var t database.ContinuousToken t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } var v uint64 v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) } query = fmt.Sprintf("%s WHERE id >= %s", query, strconv.FormatUint(v, 10)) @@ -547,7 +547,7 @@ func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, sn var rows *sql.Rows rows, err = tx.QueryContext(ctx, query) if err != nil { - return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } defer rows.Close() @@ -559,7 +559,7 @@ func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, sn var entityId string err = rows.Scan(&lastID, &entityId) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } entityIDs = append(entityIDs, entityId) @@ -567,13 +567,13 @@ func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, sn // Check for any errors during iteration. if err = rows.Err(); err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Commit the transaction. err = tx.Commit() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully retrieved unique entities from the database") @@ -598,14 +598,14 @@ func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID var st token.SnapToken st, err = snapshot.EncodedToken{Value: snap}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } // Begin a new read-only transaction with the specified isolation level. var tx *sql.Tx tx, err = r.database.DB.BeginTx(ctx, &r.txOptions) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } defer func() { @@ -626,12 +626,12 @@ func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID var t database.ContinuousToken t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } var v uint64 v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN) } builder = builder.Where(squirrel.GtOrEq{"id": v}) } @@ -643,7 +643,7 @@ func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID var args []interface{} query, args, err = builder.ToSql() if err != nil { - return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args)) @@ -652,7 +652,7 @@ func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID var rows *sql.Rows rows, err = tx.QueryContext(ctx, query, args...) if err != nil { - return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } defer rows.Close() @@ -664,19 +664,19 @@ func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID var subjectID string err = rows.Scan(&lastID, &subjectID) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } subjectIDs = append(subjectIDs, subjectID) } // Check for any errors during iteration. if err = rows.Err(); err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } // Commit the transaction. err = tx.Commit() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully retrieved unique subject references from the database") @@ -703,7 +703,7 @@ func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.S builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1) query, args, err := builder.ToSql() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } // Execute the query and retrieve the highest transaction ID. @@ -713,7 +713,7 @@ func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.S if errors.Is(err, sql.ErrNoRows) { return snapshot.Token{Value: types.XID8{Uint: 0}}, nil } - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } slog.Debug("successfully retrieved latest snapshot token") diff --git a/internal/storage/postgres/dataWriter.go b/internal/storage/postgres/dataWriter.go index e36e2c53e..07755ceca 100644 --- a/internal/storage/postgres/dataWriter.go +++ b/internal/storage/postgres/dataWriter.go @@ -5,11 +5,10 @@ import ( "database/sql" "errors" "log/slog" - "strings" - - "github.com/golang/protobuf/jsonpb" "github.com/Masterminds/squirrel" + "github.com/golang/protobuf/jsonpb" + "github.com/jackc/pgx/v5/pgconn" "github.com/Permify/permify/internal/storage/postgres/snapshot" "github.com/Permify/permify/internal/storage/postgres/types" @@ -67,15 +66,16 @@ func (w *DataWriter) Write( tkn, err := w.write(ctx, tenantID, tupleCollection, attributeCollection) if err != nil { // Check if the error is due to serialization, and if so, retry. - if strings.Contains(err.Error(), "could not serialize") { + if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) { slog.Warn("serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i)) + utils.WaitWithBackoff(ctx, tenantID, i) continue // Retry the operation. } // If the error is not serialization-related, handle it and return. - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_DATASTORE) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE) } // If to write is successful, return the token. - return tkn, err + return tkn, nil } // Log an error if the operation failed after reaching the maximum number of retries. @@ -106,15 +106,16 @@ func (w *DataWriter) Delete( tkn, err := w.delete(ctx, tenantID, tupleFilter, attributeFilter) if err != nil { // Check if the error is due to serialization, and if so, retry. - if strings.Contains(err.Error(), "could not serialize") { + if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) { slog.Warn("serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i)) + utils.WaitWithBackoff(ctx, tenantID, i) continue // Retry the operation. } // If the error is not serialization-related, handle it and return. - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_DATASTORE) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE) } // If the delete operation is successful, return the token. - return tkn, err + return tkn, nil } // Log an error if the operation failed after reaching the maximum number of retries. @@ -145,15 +146,16 @@ func (w *DataWriter) RunBundle( tkn, err := w.runBundle(ctx, tenantID, arguments, b) if err != nil { // Check if the error is due to serialization, and if so, retry. - if strings.Contains(err.Error(), "could not serialize") { - slog.Warn("Serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i)) + if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) { + slog.Warn("serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i)) + utils.WaitWithBackoff(ctx, tenantID, i) continue // Retry the operation. } // If the error is not serialization-related, handle it and return. - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_DATASTORE) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE) } // If the operation is successful, return the token. - return tkn, err + return tkn, nil } // Log an error if the operation failed after reaching the maximum number of retries. diff --git a/internal/storage/postgres/schemaReader.go b/internal/storage/postgres/schemaReader.go index 44c8c25c5..b148fd674 100644 --- a/internal/storage/postgres/schemaReader.go +++ b/internal/storage/postgres/schemaReader.go @@ -44,7 +44,7 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) query, args, err = builder.ToSql() if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args)) @@ -52,7 +52,7 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) var rows *sql.Rows rows, err = r.database.DB.QueryContext(ctx, query, args...) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } defer rows.Close() @@ -61,19 +61,19 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) sd := storage.SchemaDefinition{} err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } definitions = append(definitions, sd.Serialized()) } if err = rows.Err(); err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } slog.Debug("successfully retrieved", slog.Any("schema definitions", len(definitions))) sch, err = schema.NewSchemaFromStringDefinitions(false, definitions...) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } slog.Debug("successfully created schema") @@ -95,7 +95,7 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, query, args, err = builder.ToSql() if err != nil { - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args)) @@ -103,20 +103,20 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, var def storage.SchemaDefinition row := r.database.DB.QueryRowContext(ctx, query, args...) if err = row.Err(); err != nil { - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil { if errors.Is(err, sql.ErrNoRows) { - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND) } - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } var sch *base.SchemaDefinition sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized()) if err != nil { - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } definition, err = schema.GetEntityByName(sch, name) @@ -140,7 +140,7 @@ func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, v query, args, err = builder.ToSql() if err != nil { - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args)) @@ -148,14 +148,14 @@ func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, v var def storage.SchemaDefinition row := r.database.DB.QueryRowContext(ctx, query, args...) if err = row.Err(); err != nil { - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil { if errors.Is(err, sql.ErrNoRows) { - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND) } - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } slog.Debug("successfully retrieved rule definition for", slog.Any("name", name)) @@ -163,7 +163,7 @@ func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, v var sch *base.SchemaDefinition sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized()) if err != nil { - return nil, "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } definition, err = schema.GetRuleByName(sch, name) @@ -186,7 +186,7 @@ func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (versio Select("version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("version DESC").Limit(1). ToSql() if err != nil { - return "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args)) @@ -195,9 +195,9 @@ func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (versio err = row.Scan(&version) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND) + return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND) } - return "", utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } slog.Debug("successfully found the latest schema version", slog.Any("version", version)) diff --git a/internal/storage/postgres/schemaWriter.go b/internal/storage/postgres/schemaWriter.go index 58bf25b26..09f6b73b0 100644 --- a/internal/storage/postgres/schemaWriter.go +++ b/internal/storage/postgres/schemaWriter.go @@ -44,14 +44,14 @@ func (w *SchemaWriter) WriteSchema(ctx context.Context, schemas []storage.Schema query, args, err = insertBuilder.ToSql() if err != nil { - return utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("executing sql insert query", slog.Any("query", query), slog.Any("arguments", args)) _, err = w.database.DB.ExecContext(ctx, query, args...) if err != nil { - return utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully wrote schemas to the database", slog.Any("number_of_schemas", len(schemas))) diff --git a/internal/storage/postgres/tenantReader.go b/internal/storage/postgres/tenantReader.go index 9b190d0f9..27c5abecc 100644 --- a/internal/storage/postgres/tenantReader.go +++ b/internal/storage/postgres/tenantReader.go @@ -40,7 +40,7 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi var t database.ContinuousToken t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } builder = builder.Where(squirrel.GtOrEq{"id": t.(utils.ContinuousToken).Value}) } @@ -52,7 +52,7 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi query, args, err = builder.ToSql() if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER) } slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args)) @@ -60,7 +60,7 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi var rows *sql.Rows rows, err = r.database.DB.QueryContext(ctx, query, args...) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } defer rows.Close() @@ -70,13 +70,13 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi sd := storage.Tenant{} err = rows.Scan(&sd.ID, &sd.Name, &sd.CreatedAt) if err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN) } lastID = sd.ID tenants = append(tenants, sd.ToTenant()) } if err = rows.Err(); err != nil { - return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL) + return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL) } slog.Debug("successfully listed tenants", slog.Any("number_of_tenants", len(tenants))) diff --git a/internal/storage/postgres/tenantWriter.go b/internal/storage/postgres/tenantWriter.go index b76fa7a54..410e47cee 100644 --- a/internal/storage/postgres/tenantWriter.go +++ b/internal/storage/postgres/tenantWriter.go @@ -3,10 +3,13 @@ package postgres import ( "context" "database/sql" + "errors" "log/slog" "strings" "time" + "go.opentelemetry.io/otel/codes" + "github.com/Masterminds/squirrel" "google.golang.org/protobuf/types/known/timestamppb" @@ -44,12 +47,15 @@ func (w *TenantWriter) CreateTenant(ctx context.Context, id, name string) (resul err = query.QueryRowContext(ctx).Scan(&createdAt) if err != nil { if strings.Contains(err.Error(), "duplicate key value") { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_UNIQUE_CONSTRAINT) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + slog.Error("error encountered", slog.Any("error", err)) + return nil, errors.New(base.ErrorCode_ERROR_CODE_UNIQUE_CONSTRAINT.String()) } - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } - slog.Debug("successfully created Tenant", slog.Any("id", id), slog.Any("name", name), slog.Any("createdAt", createdAt)) + slog.Debug("successfully created Tenant", slog.Any("id", id), slog.Any("name", name), slog.Any("created_at", createdAt)) return &base.Tenant{ Id: id, @@ -71,7 +77,7 @@ func (w *TenantWriter) DeleteTenant(ctx context.Context, tenantID string) (resul query := w.database.Builder.Delete(TenantsTable).Where(squirrel.Eq{"id": tenantID}).Suffix("RETURNING name, created_at").RunWith(w.database.DB) err = query.QueryRowContext(ctx).Scan(&name, &createdAt) if err != nil { - return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION) + return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION) } slog.Debug("successfully deleted tenant") diff --git a/internal/storage/postgres/utils/common.go b/internal/storage/postgres/utils/common.go index ba704f6ba..a8291d78c 100644 --- a/internal/storage/postgres/utils/common.go +++ b/internal/storage/postgres/utils/common.go @@ -2,13 +2,17 @@ package utils import ( "context" + "errors" "fmt" "log/slog" + "math" + "strings" + "time" "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" + "golang.org/x/exp/rand" - "github.com/pkg/errors" + "go.opentelemetry.io/otel/trace" "github.com/Masterminds/squirrel" @@ -118,20 +122,21 @@ func GenerateGCQuery(table string, value uint64) squirrel.DeleteBuilder { // HandleError records an error in the given span, logs the error, and returns a standardized error. // This function is used for consistent error handling across different parts of the application. -func HandleError(span trace.Span, err error, errorCode base.ErrorCode) error { +func HandleError(ctx context.Context, span trace.Span, err error, errorCode base.ErrorCode) error { // Record the error on the span span.RecordError(err) - // Set the status of the span - span.SetStatus(codes.Error, err.Error()) - // Check if the error is context-related - if IsContextRelatedError(err) { - // Use debug level logging for context-related errors - slog.Debug("context-related error encountered", slog.Any("error", err), slog.Any("errorCode", errorCode)) + if IsContextRelatedError(ctx, err) || IsSerializationRelatedError(err) { + // Set the status of the span + span.SetStatus(codes.Unset, err.Error()) + // Use debug level logging for context or serialization-related errors + slog.Debug("an error related to context or serialization was encountered during the operation", slog.String("error", err.Error())) } else { + // Set the status of the span + span.SetStatus(codes.Error, err.Error()) // Use error level logging for all other errors - slog.Error("error encountered", slog.Any("error", err), slog.Any("errorCode", errorCode)) + slog.Error("error encountered", slog.Any("error", err)) } // Return a new standardized error with the provided error code @@ -139,8 +144,36 @@ func HandleError(span trace.Span, err error, errorCode base.ErrorCode) error { } // IsContextRelatedError checks if the error is due to context cancellation, deadline exceedance, or closed connection -func IsContextRelatedError(err error) bool { - return errors.Is(err, context.Canceled) || +func IsContextRelatedError(ctx context.Context, err error) bool { + if errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded) { + return true + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || - err.Error() == "conn closed" + strings.Contains(err.Error(), "conn closed") { + return true + } + return false +} + +// IsSerializationRelatedError checks if the error is a serialization failure, typically in database transactions. +func IsSerializationRelatedError(err error) bool { + if strings.Contains(err.Error(), "could not serialize") || + strings.Contains(err.Error(), "duplicate key value") { + return true + } + return false +} + +// WaitWithBackoff implements an exponential backoff strategy with jitter for retries. +// It waits for a calculated duration or until the context is cancelled, whichever comes first. +func WaitWithBackoff(ctx context.Context, tenantID string, retries int) { + backoff := time.Duration(math.Min(float64(20*time.Millisecond)*math.Pow(2, float64(retries)), float64(1*time.Second))) + jitter := time.Duration(rand.Float64() * float64(backoff) * 0.5) + nextBackoff := backoff + jitter + slog.Warn("waiting before retry", slog.String("tenant_id", tenantID), slog.Int64("backoff_duration", nextBackoff.Milliseconds())) + select { + case <-time.After(nextBackoff): + case <-ctx.Done(): + } }