Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add index after adding data without vector schema for vector predicate #9214

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions chunker/json_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ type Person struct {
type Product struct {
Uid string `json:"uid,omitempty"`
Name string `json:"name"`
Discription string `json:"discription"`
Discription_v string `json:"discription_v"`
Description string `json:"description"`
Description_v string `json:"description_v"`
}

func Parse(b []byte, op int) ([]*api.NQuad, error) {
Expand Down Expand Up @@ -1406,7 +1406,7 @@ func TestNquadsJsonEmptyStringVectorPred(t *testing.T) {
p := Product{
Uid: "1",
Name: "",
Discription_v: "",
Description_v: "",
}

b, err := json.Marshal([]Product{p})
Expand All @@ -1420,16 +1420,16 @@ func TestNquadsJsonEmptyStringVectorPred(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(fastNQ))

// predicate Name should be empty and edge for Discription_v should not be there
// predicate Name should be empty and edge for Description_v should not be there
// we do not create edge for "" in float32vector.
exp := &Experiment{
t: t,
nqs: nq,
schema: `name: string @index(exact) .
discription_v: float32vector .`,
description_v: float32vector .`,
query: `{product(func: uid(1)) {
name
discription_v
description_v
}}`,
expected: `{"product":[{
"name":""}]}`,
Expand All @@ -1443,7 +1443,7 @@ func TestNquadsJsonEmptyStringVectorPred(t *testing.T) {
func TestNquadsJsonEmptySquareBracketVectorPred(t *testing.T) {
p := Product{
Name: "ipad",
Discription_v: "[]",
Description_v: "[]",
}

b, err := json.Marshal(p)
Expand All @@ -1457,16 +1457,16 @@ func TestNquadsJsonEmptySquareBracketVectorPred(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(fastNQ))

// predicate Name should have value "ipad" and edge for Discription_v should not be there
// predicate Name should have value "ipad" and edge for Description_v should not be there
// we do not create edge for [] in float32vector.
exp := &Experiment{
t: t,
nqs: nq,
schema: `name: string @index(exact) .
discription_v: float32vector .`,
description_v: float32vector .`,
query: `{product(func: eq(name, "ipad")) {
name
discription_v
description_v
}}`,
expected: `{"product":[{
"name":"ipad"}]}`,
Expand All @@ -1480,7 +1480,7 @@ func TestNquadsJsonEmptySquareBracketVectorPred(t *testing.T) {
func TestNquadsJsonValidVector(t *testing.T) {
p := Product{
Name: "ipad",
Discription_v: "[1.1, 2.2, 3.3]",
Description_v: "[1.1, 2.2, 3.3]",
}

b, err := json.Marshal(p)
Expand All @@ -1498,14 +1498,14 @@ func TestNquadsJsonValidVector(t *testing.T) {
t: t,
nqs: nq,
schema: `name: string @index(exact) .
discription_v: float32vector .`,
description_v: float32vector .`,
query: `{product(func: eq(name, "ipad")) {
name
discription_v
description_v
}}`,
expected: `{"product":[{
"name":"ipad",
"discription_v":[1.1, 2.2, 3.3]}]}`,
"description_v":[1.1, 2.2, 3.3]}]}`,
}
exp.verify()

Expand Down
2 changes: 1 addition & 1 deletion dgraphapi/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func UnmarshalVectorResp(resp *api.Response) ([][]float32, error) {
type Data struct {
Vector []struct {
UID string `json:"uid"`
ProjectDescriptionV []float32 `json:"project_discription_v"`
ProjectDescriptionV []float32 `json:"project_description_v"`
} `json:"vector"`
}
var data Data
Expand Down
79 changes: 66 additions & 13 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,39 +1387,92 @@ func rebuildTokIndex(ctx context.Context, rb *IndexRebuild) error {
builder.fn = func(uid uint64, pl *List, txn *Txn) ([]*pb.DirectedEdge, error) {
edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid}
edges := []*pb.DirectedEdge{}
err := pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
// Add index entries based on p.
val := types.Val{
Value: p.Value,
Tid: types.TypeID(p.ValType),
}
edge.Lang = string(p.LangTag)

processAddIndexMutation := func(edge *pb.DirectedEdge, val types.Val) ([]*pb.DirectedEdge, error) {
for {
newEdges, err := txn.addIndexMutations(ctx, &indexMutationInfo{
tokenizers: tokenizers,
factorySpecs: factorySpecs,
edge: &edge,
edge: edge,
val: val,
op: pb.DirectedEdge_SET,
})
switch err {
case ErrRetry:
time.Sleep(10 * time.Millisecond)
default:
if !runForVectors {
edges = append(edges, newEdges...)
}
return err
return newEdges, err
}
}
}

// There are two cases to consider here:
// 1. This can be a schema mutation where the user adds an index on existing vectors.
// 2. This can be a vector mutation where the user adds vectors to the DB on a
// predicate that is already indexed.
if runForVectors {
val, err := pl.Value(txn.StartTs)
if err != nil {
return []*pb.DirectedEdge{}, err
}

// In the first case, val.Tid is default, so we need to convert the
// vector into the vfloat type and re-add it to the DB.
if val.Tid != types.VFloatID {
// Here, we convert the defaultID type vector into vfloat.
sv, err := types.Convert(val, types.VFloatID)
if err != nil {
return []*pb.DirectedEdge{}, err
}
b := types.ValueForType(types.BinaryID)
if err = types.Marshal(sv, &b); err != nil {
return []*pb.DirectedEdge{}, err
}
edge.Value = b.Value.([]byte)
edge.ValueType = types.VFloatID.Enum()

inKey := x.DataKey(edge.Attr, uid)
p, err := txn.Get(inKey)
if err != nil {
return []*pb.DirectedEdge{}, err
}

if err := p.addMutation(ctx, txn, &edge); err != nil {
return []*pb.DirectedEdge{}, err
}
}
// In the second case, we don't need to convert the vector as it is already
// in the vfloat type. We just need to process it further.
_, err = processAddIndexMutation(&edge, val)
if err != nil {
return []*pb.DirectedEdge{}, err
}

return edges, nil
}

err := pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
shivaji-kharse marked this conversation as resolved.
Show resolved Hide resolved
// Add index entries based on p.
val := types.Val{
Value: p.Value,
Tid: types.TypeID(p.ValType),
}
edge.Lang = string(p.LangTag)

newEdges, err := processAddIndexMutation(&edge, val)
if err != nil {
return err
}
edges = append(edges, newEdges...)
return nil
})
if err != nil {
return []*pb.DirectedEdge{}, err
}
return edges, err
}
if len(factorySpecs) != 0 {

if runForVectors {
return builder.RunWithoutTemp(ctx)
}
return builder.Run(ctx)
Expand Down
12 changes: 6 additions & 6 deletions systest/vector/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestVectorIncrBackupRestore(t *testing.T) {
require.NoError(t, gc.SetupSchema(testSchema))

numVectors := 500
pred := "project_discription_v"
pred := "project_description_v"
allVectors := make([][][]float32, 0, 5)
allRdfs := make([]string, 0, 5)
for i := 1; i <= 5; i++ {
Expand All @@ -78,7 +78,7 @@ func TestVectorIncrBackupRestore(t *testing.T) {
require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", incrFrom, i))
require.NoError(t, dgraphapi.WaitForRestore(c))
query := `{
vector(func: has(project_discription_v)) {
vector(func: has(project_description_v)) {
count(uid)
}
}`
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestVectorBackupRestore(t *testing.T) {
require.NoError(t, gc.SetupSchema(testSchema))

numVectors := 1000
pred := "project_discription_v"
pred := "project_description_v"
rdfs, vectors := dgraphapi.GenerateRandomVectors(0, numVectors, 10, pred)

mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestVectorBackupRestoreDropIndex(t *testing.T) {
require.NoError(t, gc.SetupSchema(testSchema))
// add data to the vector predicate
numVectors := 3
pred := "project_discription_v"
pred := "project_description_v"
rdfs, vectors := dgraphapi.GenerateRandomVectors(0, numVectors, 1, pred)
mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
_, err = gc.Mutate(mu)
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestVectorBackupRestoreDropIndex(t *testing.T) {
require.NoError(t, dgraphapi.WaitForRestore(c))

query := ` {
vectors(func: has(project_discription_v)) {
vectors(func: has(project_description_v)) {
count(uid)
}
}`
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestVectorBackupRestoreReIndexing(t *testing.T) {
require.NoError(t, gc.SetupSchema(testSchema))

numVectors := 1000
pred := "project_discription_v"
pred := "project_description_v"
rdfs, vectors := dgraphapi.GenerateRandomVectors(0, numVectors, 10, pred)

mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
Expand Down
4 changes: 2 additions & 2 deletions systest/vector/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func testExportAndLiveLoad(t *testing.T, c *dgraphtest.LocalCluster, exportForma
require.NoError(t, gc.SetupSchema(testSchema))

numVectors := 100
pred := "project_discription_v"
pred := "project_description_v"
rdfs, vectors := dgraphapi.GenerateRandomVectors(0, numVectors, 10, pred)

mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
Expand All @@ -76,7 +76,7 @@ func testExportAndLiveLoad(t *testing.T, c *dgraphtest.LocalCluster, exportForma
require.NoError(t, gc.DropAll())

query := `{
vector(func: has(project_discription_v)) {
vector(func: has(project_description_v)) {
count(uid)
}
}`
Expand Down
Loading
Loading