Skip to content

Commit

Permalink
Improve and Fix Batch commands
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Apr 13, 2015
1 parent 7e82d22 commit 842474b
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 136 deletions.
20 changes: 11 additions & 9 deletions batch_command_exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package aerospike

import (
"bytes"

. "github.com/aerospike/aerospike-client-go/logger"
. "github.com/aerospike/aerospike-client-go/types"
Buffer "github.com/aerospike/aerospike-client-go/utils/buffer"
Expand All @@ -25,22 +27,23 @@ type batchCommandExists struct {

batchNamespace *batchNamespace
policy *BasePolicy
keyMap map[string]*batchItem
keys []*Key
existsArray []bool
index int
}

func newBatchCommandExists(
node *Node,
batchNamespace *batchNamespace,
policy *BasePolicy,
keyMap map[string]*batchItem,
keys []*Key,
existsArray []bool,
) *batchCommandExists {
return &batchCommandExists{
baseMultiCommand: newMultiCommand(node, nil),
batchNamespace: batchNamespace,
policy: policy,
keyMap: keyMap,
keys: keys,
existsArray: existsArray,
}
}
Expand All @@ -50,7 +53,7 @@ func (cmd *batchCommandExists) getPolicy(ifc command) Policy {
}

func (cmd *batchCommandExists) writeBuffer(ifc command) error {
return cmd.setBatchExists(cmd.policy, cmd.batchNamespace)
return cmd.setBatchExists(cmd.policy, cmd.keys, cmd.batchNamespace)
}

// Parse all results in the batch. Add records to shared list.
Expand Down Expand Up @@ -91,14 +94,13 @@ func (cmd *batchCommandExists) parseRecordResults(ifc command, receiveSize int)
return false, err
}

item := cmd.keyMap[string(key.digest)]

if item != nil {
index := item.GetIndex()
offset := cmd.batchNamespace.offsets[cmd.index]
cmd.index++

if bytes.Equal(key.digest, cmd.keys[offset].digest) {
// only set the results to true; as a result, no synchronization is needed
if resultCode == 0 {
cmd.existsArray[index] = true
cmd.existsArray[offset] = true
}
} else {
Logger.Debug("Unexpected batch key returned: " + key.namespace + "," + Buffer.BytesToHexString(key.digest))
Expand Down
22 changes: 13 additions & 9 deletions batch_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package aerospike

import (
"bytes"

. "github.com/aerospike/aerospike-client-go/logger"
. "github.com/aerospike/aerospike-client-go/types"
Buffer "github.com/aerospike/aerospike-client-go/utils/buffer"
Expand All @@ -25,17 +27,18 @@ type batchCommandGet struct {

batchNamespace *batchNamespace
policy Policy
keyMap map[string]*batchItem
keys []*Key
binNames map[string]struct{}
records []*Record
readAttr int
index int
}

func newBatchCommandGet(
node *Node,
batchNamespace *batchNamespace,
policy Policy,
keyMap map[string]*batchItem,
keys []*Key,
binNames map[string]struct{},
records []*Record,
readAttr int,
Expand All @@ -44,7 +47,7 @@ func newBatchCommandGet(
baseMultiCommand: newMultiCommand(node, nil),
batchNamespace: batchNamespace,
policy: policy,
keyMap: keyMap,
keys: keys,
binNames: binNames,
records: records,
readAttr: readAttr,
Expand All @@ -56,7 +59,7 @@ func (cmd *batchCommandGet) getPolicy(ifc command) Policy {
}

func (cmd *batchCommandGet) writeBuffer(ifc command) error {
return cmd.setBatchGet(cmd.policy, cmd.batchNamespace, cmd.binNames, cmd.readAttr)
return cmd.setBatchGet(cmd.policy, cmd.keys, cmd.batchNamespace, cmd.binNames, cmd.readAttr)
}

// Parse all results in the batch. Add records to shared list.
Expand Down Expand Up @@ -92,12 +95,13 @@ func (cmd *batchCommandGet) parseRecordResults(ifc command, receiveSize int) (bo
if err != nil {
return false, err
}
item := cmd.keyMap[string(key.digest)]

if item != nil {
offset := cmd.batchNamespace.offsets[cmd.index] //cmd.keyMap[string(key.digest)]
cmd.index++

if bytes.Equal(key.digest, cmd.keys[offset].digest) {
if resultCode == 0 {
index := item.GetIndex()
if cmd.records[index], err = cmd.parseRecord(key, opCount, generation, expiration); err != nil {
if cmd.records[offset], err = cmd.parseRecord(key, opCount, generation, expiration); err != nil {
return false, err
}
}
Expand Down Expand Up @@ -143,7 +147,7 @@ func (cmd *batchCommandGet) parseRecord(key *Key, opCount int, generation int, e
// Currently, the batch command returns all the bins even if a subset of
// the bins are requested. We have to filter it on the client side.
// TODO: Filter batch bins on server!
if cmd.binNames == nil || contains(cmd.binNames, name) {
if len(cmd.binNames) == 0 || contains(cmd.binNames, name) {
if bins == nil {
bins = map[string]interface{}{}
}
Expand Down
61 changes: 0 additions & 61 deletions batch_item.go

This file was deleted.

51 changes: 33 additions & 18 deletions batch_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,37 @@ func newBatchNodeList(cluster *Cluster, keys []*Key) ([]*batchNode, error) {
// Split keys by server node.
batchNodes := make([]*batchNode, 0, nodeCount+1)

for _, key := range keys {
for i, key := range keys {
partition := NewPartitionByKey(key)

// error not required
node, _ := cluster.GetNode(partition)
batchNode := findBatchNode(batchNodes, node)

if batchNode == nil {
batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, key))
batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, key.Namespace(), i))
} else {
batchNode.AddKey(key)
batchNode.AddKey(key.Namespace(), i)
}
}
return batchNodes, nil
}

func newBatchNode(node *Node, keyCapacity int, key *Key) *batchNode {
func newBatchNode(node *Node, keyCapacity int, namespace string, offset int) *batchNode {
return &batchNode{
Node: node,
KeyCapacity: keyCapacity,
BatchNamespaces: []*batchNamespace{newBatchNamespace(&key.namespace, keyCapacity, key)},
BatchNamespaces: []*batchNamespace{newBatchNamespace(&namespace, keyCapacity, offset)},
}
}

func (bn *batchNode) AddKey(key *Key) {
batchNamespace := bn.findNamespace(&key.namespace)
func (bn *batchNode) AddKey(namespace string, offset int) {
batchNamespace := bn.findNamespace(&namespace)

if batchNamespace == nil {
bn.BatchNamespaces = append(bn.BatchNamespaces, newBatchNamespace(&key.namespace, bn.KeyCapacity, key))
bn.BatchNamespaces = append(bn.BatchNamespaces, newBatchNamespace(&namespace, bn.KeyCapacity, offset))
} else {
batchNamespace.keys = append(batchNamespace.keys, key)
batchNamespace.add(offset)
}
}

Expand All @@ -82,23 +82,38 @@ func (bn *batchNode) findNamespace(ns *string) *batchNamespace {
}

func findBatchNode(nodes []*batchNode, node *Node) *batchNode {
for _, batchNode := range nodes {
for i := range nodes {
// Note: using pointer equality for performance.
if batchNode.Node == node {
return batchNode
if nodes[i].Node == node {
return nodes[i]
}
}
return nil
}

type batchNamespace struct {
namespace *string
keys []*Key
namespace *string
offsets []int
offsetSize int
}

func newBatchNamespace(namespace *string, capacity int, key *Key) *batchNamespace {
return &batchNamespace{
namespace: namespace,
keys: []*Key{key},
func newBatchNamespace(namespace *string, capacity, offset int) *batchNamespace {
res := &batchNamespace{
namespace: namespace,
offsets: make([]int, capacity),
offsetSize: 1,
}
res.offsets[0] = offset

return res
}

func (bn *batchNamespace) add(offset int) {
if bn.offsetSize >= len(bn.offsets) {
cpy := make([]int, bn.offsetSize*2)
copy(cpy, bn.offsets)
bn.offsets = cpy
}
bn.offsets[bn.offsetSize] = offset
bn.offsetSize++
}
23 changes: 12 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,8 @@ func (clnt *Client) BatchExists(policy *BasePolicy, keys []*Key) ([]bool, error)
// when a key exists, the corresponding index will be marked true
existsArray := make([]bool, len(keys))

keyMap := newBatchItemList(keys)

if err := clnt.batchExecute(keys, func(node *Node, bns *batchNamespace) command {
return newBatchCommandExists(node, bns, policy, keyMap, existsArray)
return newBatchCommandExists(node, bns, policy, keys, existsArray)
}); err != nil {
return nil, err
}
Expand Down Expand Up @@ -347,14 +345,13 @@ func (clnt *Client) BatchGet(policy *BasePolicy, keys []*Key, binNames ...string
// when a key exists, the corresponding index will be set to record
records := make([]*Record, len(keys))

keyMap := newBatchItemList(keys)
binSet := map[string]struct{}{}
for idx := range binNames {
binSet[binNames[idx]] = struct{}{}
}

err := clnt.batchExecute(keys, func(node *Node, bns *batchNamespace) command {
return newBatchCommandGet(node, bns, policy, keyMap, binSet, records, _INFO1_READ)
return newBatchCommandGet(node, bns, policy, keys, binSet, records, _INFO1_READ)
})
if err != nil {
return nil, err
Expand All @@ -375,9 +372,8 @@ func (clnt *Client) BatchGetHeader(policy *BasePolicy, keys []*Key) ([]*Record,
// when a key exists, the corresponding index will be set to record
records := make([]*Record, len(keys))

keyMap := newBatchItemList(keys)
err := clnt.batchExecute(keys, func(node *Node, bns *batchNamespace) command {
return newBatchCommandGet(node, bns, policy, keyMap, nil, records, _INFO1_READ|_INFO1_NOBINDATA)
return newBatchCommandGet(node, bns, policy, keys, nil, records, _INFO1_READ|_INFO1_NOBINDATA)
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -587,7 +583,7 @@ func (clnt *Client) RegisterUDF(policy *WritePolicy, udfBody []byte, serverPath
return nil, err
}

conn, err := node.GetConnection(clnt.cluster.connectionTimeout)
conn, err := node.GetConnection(clnt.cluster.clientPolicy.Timeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -647,7 +643,7 @@ func (clnt *Client) RemoveUDF(policy *WritePolicy, udfName string) (*RemoveTask,
return nil, err
}

conn, err := node.GetConnection(clnt.cluster.connectionTimeout)
conn, err := node.GetConnection(clnt.cluster.clientPolicy.Timeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -688,7 +684,7 @@ func (clnt *Client) ListUDF(policy *BasePolicy) ([]*UDF, error) {
return nil, err
}

conn, err := node.GetConnection(clnt.cluster.connectionTimeout)
conn, err := node.GetConnection(clnt.cluster.clientPolicy.Timeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1151,6 +1147,8 @@ func (clnt *Client) batchExecute(keys []*Key, cmdGen func(node *Node, bns *batch

// Use a goroutine per namespace per node
errs := []error{}
errm := new(sync.Mutex)

wg.Add(len(batchNodes))
for _, batchNode := range batchNodes {
// copy to avoid race condition
Expand All @@ -1160,7 +1158,9 @@ func (clnt *Client) batchExecute(keys []*Key, cmdGen func(node *Node, bns *batch
defer wg.Done()
command := cmdGen(bn, bns)
if err := command.Execute(); err != nil {
errm.Lock()
errs = append(errs, err)
errm.Unlock()
}
}(bn.Node, bns)
}
Expand Down Expand Up @@ -1236,7 +1236,8 @@ func mergeErrors(errs []error) error {
}
var msg bytes.Buffer
for _, err := range errs {
msg.WriteString(err.Error() + "\n")
msg.WriteString(err.Error())
msg.WriteString("\n")
}
return errors.New(msg.String())
}
Loading

0 comments on commit 842474b

Please sign in to comment.