Skip to content

Commit

Permalink
refactor(datawriter): enhance modularity and readability of RunBundle…
Browse files Browse the repository at this point in the history
… function
  • Loading branch information
tolgaOzen committed Dec 5, 2023
1 parent 39bc67f commit 67dc81e
Show file tree
Hide file tree
Showing 6 changed files with 562 additions and 515 deletions.
44 changes: 25 additions & 19 deletions internal/storage/postgres/dataReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, fi
return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
}

// Rollback the transaction in case of any error.
defer utils.Rollback(tx)
defer func() {
_ = tx.Rollback()
}()

// Build the relationships query based on the provided filter and snapshot value.
var args []interface{}
Expand Down Expand Up @@ -107,7 +108,7 @@ func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, fi
return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
}

slog.Info("Successfully retrieved relationship tuples from the database.")
slog.Info("Successfully retrieved relation tuples from the database.")

// Return a TupleIterator created from the TupleCollection.
return collection.CreateTupleIterator(), nil
Expand Down Expand Up @@ -135,8 +136,9 @@ func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, fil
return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
}

// Rollback the transaction in case of any error.
defer utils.Rollback(tx)
defer func() {
_ = tx.Rollback()
}()

// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
Expand Down Expand Up @@ -202,7 +204,7 @@ func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, fil
return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
}

slog.Info("Successfully read relationships from database.")
slog.Info("Successfully read relation tuples from database.")

// Return the results and encoded continuous token for pagination.
if len(tuples) > int(pagination.PageSize()) {
Expand Down Expand Up @@ -233,8 +235,9 @@ func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string,
return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
}

// Rollback the transaction in case of any error.
defer utils.Rollback(tx)
defer func() {
_ = tx.Rollback()
}()

// Build the relationships query based on the provided filter and snapshot value.
var args []interface{}
Expand Down Expand Up @@ -309,8 +312,9 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte
return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
}

// Rollback the transaction in case of any error.
defer utils.Rollback(tx)
defer func() {
_ = tx.Rollback()
}()

// Build the relationships query based on the provided filter and snapshot value.
var args []interface{}
Expand Down Expand Up @@ -397,8 +401,9 @@ func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter
return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
}

// Rollback the transaction in case of any error.
defer utils.Rollback(tx)
defer func() {
_ = tx.Rollback()
}()

// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
Expand Down Expand Up @@ -508,8 +513,9 @@ func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, sn
return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
}

// Rollback the transaction in case of any error.
defer utils.Rollback(tx)
defer func() {
_ = tx.Rollback()
}()

query := utils.BulkEntityFilterQuery(tenantID, name, st.(snapshot.Token).Value.Uint)

Expand Down Expand Up @@ -595,8 +601,9 @@ func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID
return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
}

// Rollback the transaction in case of any error.
defer utils.Rollback(tx)
defer func() {
_ = tx.Rollback()
}()

// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
builder := r.database.Builder.
Expand Down Expand Up @@ -693,14 +700,13 @@ func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.S
}

// Execute the query and retrieve the highest transaction ID.
row := r.database.DB.QueryRowContext(ctx, query, args...)
err = row.Scan(&xid)
err = r.database.DB.QueryRowContext(ctx, query, args...).Scan(&xid)
if err != nil {
// If no rows are found, return a snapshot token with a value of 0.
if errors.Is(err, sql.ErrNoRows) {
return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
}
return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
}

slog.Info("Successfully retrieved latest snapshot token")
Expand Down
Loading

0 comments on commit 67dc81e

Please sign in to comment.