diff --git a/client.go b/client.go
index 8a617949..f0b1c6b2 100644
--- a/client.go
+++ b/client.go
@@ -1024,10 +1024,6 @@ func parseIndexErrorCode(response string) types.ResultCode {
 // This method is only supported by Aerospike 4.9+ servers.
 // If the policy is nil, the default relevant policy will be used.
 func (clnt *Client) QueryPartitions(policy *QueryPolicy, statement *Statement, partitionFilter *PartitionFilter) (*Recordset, Error) {
-	if statement.Filter != nil {
-		return nil, ErrPartitionScanQueryNotSupported.err()
-	}
-
 	policy = clnt.getUsableQueryPolicy(policy)
 	nodes := clnt.cluster.GetNodes()
 	if len(nodes) == 0 {
diff --git a/multi_command.go b/multi_command.go
index 93c2589a..647a0e15 100644
--- a/multi_command.go
+++ b/multi_command.go
@@ -218,7 +218,7 @@ func (cmd *baseMultiCommand) parseKey(fieldCount int, bval *int64) (*Key, Error)
 				return nil, err
 			}
 		case BVAL_ARRAY:
-			*bval = Buffer.LittleBytesToInt64(cmd.dataBuffer, cmd.dataOffset)
+			*bval = Buffer.LittleBytesToInt64(cmd.dataBuffer, 1)
 		}
 	}

diff --git a/query_executor.go b/query_executor.go
index 35930503..0f666592 100644
--- a/query_executor.go
+++ b/query_executor.go
@@ -43,14 +43,13 @@ func (clnt *Client) queryPartitions(policy *QueryPolicy, tracker *partitionTrack
 		cmd := newQueryPartitionCommand(policy, tracker, nodePartition, statement, recordset)
 		weg.execute(cmd)
 	}
-	// no need to manage the errors; they are send back via the recordset
-	weg.wait()
+	errs = chainErrors(weg.wait(), errs)

 	done, err := tracker.isComplete(clnt.Cluster(), &policy.BasePolicy)
 	if done || err != nil {
+		errs = chainErrors(err, errs)
 		// Query is complete.
-		if err != nil {
-			errs = chainErrors(err, errs)
+		if errs != nil {
 			recordset.sendError(errs)
 		}
 		return
diff --git a/query_test.go b/query_test.go
index 3b145d20..8a376846 100644
--- a/query_test.go
+++ b/query_test.go
@@ -58,9 +58,11 @@ var _ = gg.Describe("Query operations", func() {
 	bin4 := as.NewBin("Aerospike4", "constValue")
 	bin5 := as.NewBin("Aerospike5", -1)
 	bin6 := as.NewBin("Aerospike6", 1)
+	bin7 := as.NewBin("Aerospike7", nil)
 	var keys map[string]*as.Key
 	var indexName string
 	var indexName2 string
+	var indexName3 string

 	// read all records from the channel and make sure all of them are returned
 	var checkResults = func(recordset *as.Recordset, cancelCnt int) {
@@ -118,7 +120,8 @@
 			keys[string(key.Digest())] = key

 			bin3 = as.NewBin("Aerospike3", rand.Intn(math.MaxInt16))
-			err = client.PutBins(wpolicy, key, bin1, bin2, bin3, bin4, bin5, bin6)
+			bin7 = as.NewBin("Aerospike7", i%3)
+			err = client.PutBins(wpolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7)
 			gm.Expect(err).ToNot(gm.HaveOccurred())
 		}

@@ -129,6 +132,10 @@
 		// queries only work on indices
 		indexName2 = set + bin6.Name
 		createIndex(wpolicy, ns, set, indexName2, bin6.Name, as.NUMERIC)
+
+		// queries only work on indices
+		indexName3 = set + bin7.Name
+		createIndex(wpolicy, ns, set, indexName3, bin7.Name, as.NUMERIC)
 	})

 	gg.AfterEach(func() {
@@ -137,6 +144,9 @@

 		indexName = set + bin6.Name
 		gm.Expect(client.DropIndex(nil, ns, set, indexName)).ToNot(gm.HaveOccurred())
+
+		indexName = set + bin7.Name
+		gm.Expect(client.DropIndex(nil, ns, set, indexName)).ToNot(gm.HaveOccurred())
 	})

 	var queryPolicy = as.NewQueryPolicy()
@@ -170,7 +180,7 @@
 		gm.Expect(counter).To(gm.Equal(keyCount))
 	})

-	gg.It("must Query and get all partition records back for a specified key", func() {
+	gg.It("must Scan and get all partition records back for a specified key", func() {
 		gm.Expect(len(keys)).To(gm.Equal(keyCount))

 		counter := 0
@@ -203,6 +213,77 @@
 		gm.Expect(counter).To(gm.BeNumerically("<", keyCount))
 	})

+	gg.It("must Query per key partition and get all partition records back for a specified key and filter", func() {
+		gm.Expect(len(keys)).To(gm.Equal(keyCount))
+
+		counter := 0
+
+		var rkey *as.Key
+		for _, k := range keys {
+			rkey = k
+
+			pf := as.NewPartitionFilterByKey(rkey)
+			stm := as.NewStatement(ns, set)
+			stm.SetFilter(as.NewRangeFilter(bin7.Name, 1, 2))
+			recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
+			gm.Expect(err).ToNot(gm.HaveOccurred())
+
+			for res := range recordset.Results() {
+				gm.Expect(res.Err).NotTo(gm.HaveOccurred())
+				gm.Expect(res.Record.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject()))
+				gm.Expect(res.Record.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject()))
+
+				delete(keys, string(res.Record.Key.Digest()))
+
+				counter++
+			}
+		}
+
+		gm.Expect(len(keys)).To(gm.Equal(334))
+		// This depends on how many keys end up in the same partition.
+		// Since keys are statistically distributed randomly and uniformly,
+		// we expect that there aren't many partitions that share more than one key.
+		gm.Expect(counter).To(gm.BeNumerically("~", keyCount - 334, 50))
+	})
+
+	gg.It("must Query and get all partition records back for a specified key and filter", func() {
+		gm.Expect(len(keys)).To(gm.Equal(keyCount))
+
+		counter := 0
+
+		pf := as.NewPartitionFilterAll()
+		stm := as.NewStatement(ns, set)
+		stm.SetFilter(as.NewRangeFilter(bin7.Name, 1, 2))
+		recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
+		gm.Expect(err).ToNot(gm.HaveOccurred())
+
+		for res := range recordset.Results() {
+			gm.Expect(res.Err).NotTo(gm.HaveOccurred())
+			gm.Expect(res.Record.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject()))
+			gm.Expect(res.Record.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject()))
+
+			delete(keys, string(res.Record.Key.Digest()))
+
+			counter++
+		}
+
+		gm.Expect(len(keys)).To(gm.Equal(334))
+		gm.Expect(counter).To(gm.Equal(keyCount - 334))
+	})
+
+	gg.It("must return error on a Query when index is not found", func() {
+		pf := as.NewPartitionFilterAll()
+		stm := as.NewStatement(ns, set)
+		stm.SetFilter(as.NewRangeFilter(randString(10), 1, 2))
+		recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
+		gm.Expect(err).ToNot(gm.HaveOccurred())
+
+		for res := range recordset.Results() {
+			gm.Expect(res.Err).To(gm.HaveOccurred())
+			gm.Expect(res.Err.Matches(ast.INDEX_NOTFOUND)).To(gm.BeTrue())
+		}
+	})
+
 	gg.It("must Query and get all partition records back for a specified partition range", func() {
 		gm.Expect(len(keys)).To(gm.Equal(keyCount))

diff --git a/scan_executor.go b/scan_executor.go
index b292f64f..2e92702b 100644
--- a/scan_executor.go
+++ b/scan_executor.go
@@ -43,13 +43,12 @@ func (clnt *Client) scanPartitions(policy *ScanPolicy, tracker *partitionTracker
 		cmd := newScanPartitionCommand(policy, tracker, nodePartition, namespace, setName, binNames, recordset)
 		weg.execute(cmd)
 	}
-	// no need to manage the errors; they are send back via the recordset
-	weg.wait()
+	errs = chainErrors(weg.wait(), errs)

 	if done, err := tracker.isComplete(clnt.Cluster(), &policy.BasePolicy); done || err != nil {
+		errs = chainErrors(err, errs)
 		// Scan is complete.
-		if err != nil {
-			errs = chainErrors(err, errs)
+		if errs != nil {
 			recordset.sendError(errs)
 		}
 		return
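
Usage sketch, illustrative only: with the statement.Filter restriction removed from QueryPartitions, a partition filter and a secondary-index filter can now be combined in one query, and executor errors are chained onto the recordset instead of being dropped. The snippet below is a minimal example of the call pattern; the host/port, namespace "test", set "testset", bin name "binName" (which must carry a numeric secondary index), and the v6 module path are all placeholder assumptions to adapt to the actual environment.

package main

import (
	"fmt"
	"log"

	as "github.com/aerospike/aerospike-client-go/v6" // adjust to the module version in use
)

func main() {
	// Placeholder host/port; assumes a reachable Aerospike server.
	client, err := as.NewClient("127.0.0.1", 3000)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A statement carrying a secondary-index filter. Before this change,
	// QueryPartitions rejected such statements with ErrPartitionScanQueryNotSupported.
	stm := as.NewStatement("test", "testset")         // placeholder namespace/set
	stm.SetFilter(as.NewRangeFilter("binName", 1, 2)) // placeholder bin; index must exist

	// Query all partitions; NewPartitionFilterByKey (as in the tests above) narrows the scope instead.
	pf := as.NewPartitionFilterAll()

	// A nil policy falls back to the default QueryPolicy, as documented on QueryPartitions.
	recordset, err := client.QueryPartitions(nil, stm, pf)
	if err != nil {
		log.Fatal(err)
	}

	for res := range recordset.Results() {
		if res.Err != nil {
			// Executor-side errors (e.g. INDEX_NOTFOUND) now surface here via the recordset.
			log.Fatal(res.Err)
		}
		fmt.Println(res.Record.Bins)
	}
}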