Skip to content

Commit

Permalink
Fix Query to Scan translation Logic; Clean up Query Record Command
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed May 30, 2015
1 parent d5d827c commit f18731b
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 152 deletions.
178 changes: 178 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,184 @@ func (cmd *baseCommand) setScan(policy *ScanPolicy, namespace *string, setName *
return nil
}

func (cmd *baseCommand) setQuery(policy *QueryPolicy, statement *Statement, write bool) (err error) {
var functionArgBuffer []byte

fieldCount := 0
filterSize := 0
binNameSize := 0

cmd.begin()

if statement.Namespace != "" {
cmd.dataOffset += len(statement.Namespace) + int(_FIELD_HEADER_SIZE)
fieldCount++
}

if statement.IndexName != "" {
cmd.dataOffset += len(statement.IndexName) + int(_FIELD_HEADER_SIZE)
fieldCount++
}

if statement.SetName != "" {
cmd.dataOffset += len(statement.SetName) + int(_FIELD_HEADER_SIZE)
fieldCount++
}

// Allocate space for TaskId field.
cmd.dataOffset += 8 + int(_FIELD_HEADER_SIZE)
fieldCount++

if len(statement.Filters) > 0 {
cmd.dataOffset += int(_FIELD_HEADER_SIZE)
filterSize++ // num filters

for _, filter := range statement.Filters {
sz, err := filter.estimateSize()
if err != nil {
return err
}
filterSize += sz
}
cmd.dataOffset += filterSize
fieldCount++

// Query bin names are specified as a field (Scan bin names are specified later as operations)
if len(statement.BinNames) > 0 {
cmd.dataOffset += int(_FIELD_HEADER_SIZE)
binNameSize++ // num bin names

for _, binName := range statement.BinNames {
binNameSize += len(binName) + 1
}
cmd.dataOffset += binNameSize
fieldCount++
}
} else {
// Calling query with no filters is more efficiently handled by a primary index scan.
// Estimate scan options size.
cmd.dataOffset += (2 + int(_FIELD_HEADER_SIZE))
fieldCount++
}

if statement.functionName != "" {
cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 1 // udf type
cmd.dataOffset += len(statement.packageName) + int(_FIELD_HEADER_SIZE)
cmd.dataOffset += len(statement.functionName) + int(_FIELD_HEADER_SIZE)

if len(statement.functionArgs) > 0 {
functionArgBuffer, err = packValueArray(statement.functionArgs)
if err != nil {
return err
}
} else {
functionArgBuffer = []byte{}
}
cmd.dataOffset += int(_FIELD_HEADER_SIZE) + len(functionArgBuffer)
fieldCount += 4
}

if len(statement.Filters) == 0 {
if len(statement.BinNames) > 0 {
for _, binName := range statement.BinNames {
cmd.estimateOperationSizeForBinName(binName)
}
}
}

if err := cmd.sizeBuffer(); err != nil {
return nil
}

operationCount := 0
if len(statement.Filters) == 0 && len(statement.BinNames) > 0 {
operationCount = len(statement.BinNames)
}

if write {
cmd.writeHeader(policy.BasePolicy, _INFO1_READ, _INFO2_WRITE, fieldCount, operationCount)
} else {
cmd.writeHeader(policy.BasePolicy, _INFO1_READ, 0, fieldCount, operationCount)
}

if statement.Namespace != "" {
cmd.writeFieldString(statement.Namespace, NAMESPACE)
}

if statement.IndexName != "" {
cmd.writeFieldString(statement.IndexName, INDEX_NAME)
}

if statement.SetName != "" {
cmd.writeFieldString(statement.SetName, TABLE)
}

cmd.writeFieldHeader(8, TRAN_ID)
Buffer.Int64ToBytes(int64(statement.TaskId), cmd.dataBuffer, cmd.dataOffset)
cmd.dataOffset += 8

if len(statement.Filters) > 0 {
cmd.writeFieldHeader(filterSize, INDEX_RANGE)
cmd.dataBuffer[cmd.dataOffset] = byte(len(statement.Filters))
cmd.dataOffset++

for _, filter := range statement.Filters {
cmd.dataOffset, err = filter.write(cmd.dataBuffer, cmd.dataOffset)
if err != nil {
return err
}
}

if len(statement.BinNames) > 0 {
cmd.writeFieldHeader(binNameSize, QUERY_BINLIST)
cmd.dataBuffer[cmd.dataOffset] = byte(len(statement.BinNames))
cmd.dataOffset++

for _, binName := range statement.BinNames {
len := copy(cmd.dataBuffer[cmd.dataOffset+1:], binName)
cmd.dataBuffer[cmd.dataOffset] = byte(len)
cmd.dataOffset += len + 1
}
}
} else {
// Calling query with no filters is more efficiently handled by a primary index scan.
cmd.writeFieldHeader(2, SCAN_OPTIONS)
priority := byte(policy.Priority)
priority <<= 4
cmd.dataBuffer[cmd.dataOffset] = priority
cmd.dataOffset++
cmd.dataBuffer[cmd.dataOffset] = byte(100)
cmd.dataOffset++
}

if statement.functionName != "" {
cmd.writeFieldHeader(1, UDF_OP)
if statement.returnData {
cmd.dataBuffer[cmd.dataOffset] = byte(1)
} else {
cmd.dataBuffer[cmd.dataOffset] = byte(2)
}
cmd.dataOffset++

cmd.writeFieldString(statement.packageName, UDF_PACKAGE_NAME)
cmd.writeFieldString(statement.functionName, UDF_FUNCTION)
cmd.writeFieldBytes(functionArgBuffer, UDF_ARGLIST)
}

// scan binNames come last
if len(statement.Filters) == 0 {
if len(statement.BinNames) > 0 {
for _, binName := range statement.BinNames {
cmd.writeOperationForBinName(binName, READ)
}
}
}

cmd.end()

return nil
}

func (cmd *baseCommand) estimateKeySize(key *Key, sendKey bool) int {
fieldCount := 0

Expand Down
153 changes: 1 addition & 152 deletions query_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package aerospike

import Buffer "github.com/aerospike/aerospike-client-go/utils/buffer"

type queryCommand struct {
*baseMultiCommand

Expand All @@ -36,156 +34,7 @@ func (cmd *queryCommand) getPolicy(ifc command) Policy {
}

func (cmd *queryCommand) writeBuffer(ifc command) (err error) {
var functionArgBuffer []byte

fieldCount := 0
filterSize := 0
binNameSize := 0

cmd.begin()

if cmd.statement.Namespace != "" {
cmd.dataOffset += len(cmd.statement.Namespace) + int(_FIELD_HEADER_SIZE)
fieldCount++
}

if cmd.statement.IndexName != "" {
cmd.dataOffset += len(cmd.statement.IndexName) + int(_FIELD_HEADER_SIZE)
fieldCount++
}

if cmd.statement.SetName != "" {
cmd.dataOffset += len(cmd.statement.SetName) + int(_FIELD_HEADER_SIZE)
fieldCount++
}

if len(cmd.statement.Filters) > 0 {
cmd.dataOffset += int(_FIELD_HEADER_SIZE)
filterSize++ // num filters

for _, filter := range cmd.statement.Filters {
sz, err := filter.estimateSize()
if err != nil {
return err
}
filterSize += sz
}
cmd.dataOffset += filterSize
fieldCount++
} else {
// Calling query with no filters is more efficiently handled by a primary index scan.
// Estimate scan options size.
cmd.dataOffset += (2 + int(_FIELD_HEADER_SIZE))
fieldCount++
}

if len(cmd.statement.BinNames) > 0 {
cmd.dataOffset += int(_FIELD_HEADER_SIZE)
binNameSize++ // num bin names

for _, binName := range cmd.statement.BinNames {
binNameSize += len(binName) + 1
}
cmd.dataOffset += binNameSize
fieldCount++
}

// make sure taskId is a non-zero random 64bit number
cmd.statement.setTaskId()

cmd.dataOffset += 8 + int(_FIELD_HEADER_SIZE)
fieldCount++

if cmd.statement.functionName != "" {
cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 1 // udf type
cmd.dataOffset += len(cmd.statement.packageName) + int(_FIELD_HEADER_SIZE)
cmd.dataOffset += len(cmd.statement.functionName) + int(_FIELD_HEADER_SIZE)

if len(cmd.statement.functionArgs) > 0 {
functionArgBuffer, err = packValueArray(cmd.statement.functionArgs)
if err != nil {
return err
}
} else {
functionArgBuffer = []byte{}
}
cmd.dataOffset += int(_FIELD_HEADER_SIZE) + len(functionArgBuffer)
fieldCount += 4
}
if err := cmd.sizeBuffer(); err != nil {
return nil
}

readAttr := _INFO1_READ
cmd.writeHeader(cmd.policy.GetBasePolicy(), readAttr, 0, fieldCount, 0)

if cmd.statement.Namespace != "" {
cmd.writeFieldString(cmd.statement.Namespace, NAMESPACE)
}

if cmd.statement.IndexName != "" {
cmd.writeFieldString(cmd.statement.IndexName, INDEX_NAME)
}

if cmd.statement.SetName != "" {
cmd.writeFieldString(cmd.statement.SetName, TABLE)
}

if len(cmd.statement.Filters) > 0 {
cmd.writeFieldHeader(filterSize, INDEX_RANGE)
cmd.dataBuffer[cmd.dataOffset] = byte(len(cmd.statement.Filters))
cmd.dataOffset++

for _, filter := range cmd.statement.Filters {
cmd.dataOffset, err = filter.write(cmd.dataBuffer, cmd.dataOffset)
if err != nil {
return err
}
}
} else {
// Calling query with no filters is more efficiently handled by a primary index scan.
cmd.writeFieldHeader(2, SCAN_OPTIONS)
priority := byte(cmd.policy.Priority)
priority <<= 4
cmd.dataBuffer[cmd.dataOffset] = priority
cmd.dataOffset++
cmd.dataBuffer[cmd.dataOffset] = byte(100)
cmd.dataOffset++
}

if len(cmd.statement.BinNames) > 0 {
cmd.writeFieldHeader(binNameSize, QUERY_BINLIST)
cmd.dataBuffer[cmd.dataOffset] = byte(len(cmd.statement.BinNames))
cmd.dataOffset++

for _, binName := range cmd.statement.BinNames {
len := copy(cmd.dataBuffer[cmd.dataOffset+1:], binName)
cmd.dataBuffer[cmd.dataOffset] = byte(len)
cmd.dataOffset += len + 1
}
}

cmd.writeFieldHeader(8, TRAN_ID)
Buffer.Int64ToBytes(int64(cmd.statement.TaskId), cmd.dataBuffer, cmd.dataOffset)
cmd.dataOffset += 8

if cmd.statement.functionName != "" {
cmd.writeFieldHeader(1, UDF_OP)
if cmd.statement.returnData {
cmd.dataBuffer[cmd.dataOffset] = byte(1)
cmd.dataOffset++
} else {
cmd.dataBuffer[cmd.dataOffset] = byte(2)
cmd.dataOffset++
}

cmd.writeFieldString(cmd.statement.packageName, UDF_PACKAGE_NAME)
cmd.writeFieldString(cmd.statement.functionName, UDF_FUNCTION)
cmd.writeFieldBytes(functionArgBuffer, UDF_ARGLIST)
}
cmd.end()

return nil
return cmd.setQuery(cmd.policy, cmd.statement, false)
}

func (cmd *queryCommand) parseResult(ifc command, conn *Connection) error {
Expand Down

0 comments on commit f18731b

Please sign in to comment.