Skip to content

Commit

Permalink
added consistency level to BasePolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Dec 22, 2014
1 parent d4f330e commit bc0e4d8
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 49 deletions.
2 changes: 1 addition & 1 deletion batch_command_exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (cmd *batchCommandExists) getPolicy(ifc command) Policy {
}

func (cmd *batchCommandExists) writeBuffer(ifc command) error {
return cmd.setBatchExists(cmd.batchNamespace)
return cmd.setBatchExists(cmd.policy, cmd.batchNamespace)
}

// Parse all results in the batch. Add records to shared list.
Expand Down
2 changes: 1 addition & 1 deletion batch_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (cmd *batchCommandGet) getPolicy(ifc command) Policy {
}

func (cmd *batchCommandGet) writeBuffer(ifc command) error {
return cmd.setBatchGet(cmd.batchNamespace, cmd.binNames, cmd.readAttr)
return cmd.setBatchGet(cmd.policy, cmd.batchNamespace, cmd.binNames, cmd.readAttr)
}

// Parse all results in the batch. Add records to shared list.
Expand Down
44 changes: 26 additions & 18 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
// Do not read the bins
_INFO1_NOBINDATA int = (1 << 5)

// Involve all replicas in read operation.
_INFO1_CONSISTENCY_ALL = (1 << 6)

// Create or update record
_INFO2_WRITE int = (1 << 0)
// Fling a record into the belly of Moloch.
Expand Down Expand Up @@ -166,35 +169,35 @@ func (cmd *baseCommand) setTouch(policy *WritePolicy, key *Key) error {
}

// Writes the command for exist operations
func (cmd *baseCommand) setExists(key *Key) error {
func (cmd *baseCommand) setExists(policy *BasePolicy, key *Key) error {
cmd.begin()
fieldCount := cmd.estimateKeySize(key)
if err := cmd.sizeBuffer(); err != nil {
return nil
}
cmd.writeHeader(_INFO1_READ|_INFO1_NOBINDATA, 0, fieldCount, 0)
cmd.writeHeader(policy.GetBasePolicy(), _INFO1_READ|_INFO1_NOBINDATA, 0, fieldCount, 0)
cmd.writeKey(key)
cmd.end()
return nil

}

// Writes the command for get operations (all bins)
func (cmd *baseCommand) setReadForKeyOnly(key *Key) error {
func (cmd *baseCommand) setReadForKeyOnly(policy *BasePolicy, key *Key) error {
cmd.begin()
fieldCount := cmd.estimateKeySize(key)
if err := cmd.sizeBuffer(); err != nil {
return nil
}
cmd.writeHeader(_INFO1_READ|_INFO1_GET_ALL, 0, fieldCount, 0)
cmd.writeHeader(policy, _INFO1_READ|_INFO1_GET_ALL, 0, fieldCount, 0)
cmd.writeKey(key)
cmd.end()
return nil

}

// Writes the command for get operations (specified bins)
func (cmd *baseCommand) setRead(key *Key, binNames []string) (err error) {
func (cmd *baseCommand) setRead(policy *BasePolicy, key *Key, binNames []string) (err error) {
if binNames != nil && len(binNames) > 0 {
cmd.begin()
fieldCount := cmd.estimateKeySize(key)
Expand All @@ -205,22 +208,22 @@ func (cmd *baseCommand) setRead(key *Key, binNames []string) (err error) {
if err = cmd.sizeBuffer(); err != nil {
return nil
}
cmd.writeHeader(_INFO1_READ, 0, fieldCount, len(binNames))
cmd.writeHeader(policy.GetBasePolicy(), _INFO1_READ, 0, fieldCount, len(binNames))
cmd.writeKey(key)

for i := range binNames {
cmd.writeOperationForBinName(binNames[i], READ)
}
cmd.end()
} else {
err = cmd.setReadForKeyOnly(key)
err = cmd.setReadForKeyOnly(policy, key)
}

return err
}

// Writes the command for getting metadata operations
func (cmd *baseCommand) setReadHeader(key *Key) error {
func (cmd *baseCommand) setReadHeader(policy *BasePolicy, key *Key) error {
cmd.begin()
fieldCount := cmd.estimateKeySize(key)
cmd.estimateOperationSizeForBinName("")
Expand All @@ -232,7 +235,7 @@ func (cmd *baseCommand) setReadHeader(key *Key) error {
// The workaround is to request a non-existent bin.
// TODO: Fix this on server.
//command.setRead(_INFO1_READ | _INFO1_NOBINDATA);
cmd.writeHeader(_INFO1_READ, 0, fieldCount, 1)
cmd.writeHeader(policy.GetBasePolicy(), _INFO1_READ, 0, fieldCount, 1)

cmd.writeKey(key)
cmd.writeOperationForBinName("", READ)
Expand Down Expand Up @@ -286,7 +289,7 @@ func (cmd *baseCommand) setOperate(policy *WritePolicy, key *Key, operations []*
if writeAttr != 0 {
cmd.writeHeaderWithPolicy(policy, readAttr, writeAttr, fieldCount, len(operations))
} else {
cmd.writeHeader(readAttr, writeAttr, fieldCount, len(operations))
cmd.writeHeader(policy.GetBasePolicy(), readAttr, writeAttr, fieldCount, len(operations))
}
cmd.writeKey(key)

Expand All @@ -310,7 +313,7 @@ func (cmd *baseCommand) setOperate(policy *WritePolicy, key *Key, operations []*
return nil
}

func (cmd *baseCommand) setUdf(key *Key, packageName string, functionName string, args []Value) error {
func (cmd *baseCommand) setUdf(policy Policy, key *Key, packageName string, functionName string, args []Value) error {
cmd.begin()
fieldCount := cmd.estimateKeySize(key)
argBytes, err := packValueArray(args)
Expand All @@ -322,7 +325,7 @@ func (cmd *baseCommand) setUdf(key *Key, packageName string, functionName string
if err := cmd.sizeBuffer(); err != nil {
return nil
}
cmd.writeHeader(0, _INFO2_WRITE, fieldCount, 0)
cmd.writeHeader(policy.GetBasePolicy(), 0, _INFO2_WRITE, fieldCount, 0)
cmd.writeKey(key)
cmd.writeFieldString(packageName, UDF_PACKAGE_NAME)
cmd.writeFieldString(functionName, UDF_FUNCTION)
Expand All @@ -332,7 +335,7 @@ func (cmd *baseCommand) setUdf(key *Key, packageName string, functionName string
return nil
}

func (cmd *baseCommand) setBatchExists(batchNamespace *batchNamespace) error {
func (cmd *baseCommand) setBatchExists(policy *BasePolicy, batchNamespace *batchNamespace) error {
// Estimate buffer size
cmd.begin()
keys := batchNamespace.keys
Expand All @@ -344,7 +347,7 @@ func (cmd *baseCommand) setBatchExists(batchNamespace *batchNamespace) error {
return nil
}

cmd.writeHeader(_INFO1_READ|_INFO1_NOBINDATA, 0, 2, 0)
cmd.writeHeader(policy, _INFO1_READ|_INFO1_NOBINDATA, 0, 2, 0)
cmd.writeFieldString(*batchNamespace.namespace, NAMESPACE)
cmd.writeFieldHeader(byteSize, DIGEST_RIPE_ARRAY)

Expand All @@ -357,7 +360,7 @@ func (cmd *baseCommand) setBatchExists(batchNamespace *batchNamespace) error {
return nil
}

func (cmd *baseCommand) setBatchGet(batchNamespace *batchNamespace, binNames map[string]struct{}, readAttr int) error {
func (cmd *baseCommand) setBatchGet(policy Policy, batchNamespace *batchNamespace, binNames map[string]struct{}, readAttr int) error {
// Estimate buffer size
cmd.begin()
keys := batchNamespace.keys
Expand All @@ -379,7 +382,7 @@ func (cmd *baseCommand) setBatchGet(batchNamespace *batchNamespace, binNames map
if binNames != nil {
operationCount = len(binNames)
}
cmd.writeHeader(readAttr, 0, 2, operationCount)
cmd.writeHeader(policy.GetBasePolicy(), readAttr, 0, 2, operationCount)
cmd.writeFieldString(*batchNamespace.namespace, NAMESPACE)
cmd.writeFieldHeader(byteSize, DIGEST_RIPE_ARRAY)

Expand Down Expand Up @@ -434,7 +437,7 @@ func (cmd *baseCommand) setScan(policy *ScanPolicy, namespace *string, setName *
if binNames != nil {
operationCount = len(binNames)
}
cmd.writeHeader(readAttr, 0, fieldCount, operationCount)
cmd.writeHeader(policy.GetBasePolicy(), readAttr, 0, fieldCount, operationCount)

if namespace != nil {
cmd.writeFieldString(*namespace, NAMESPACE)
Expand Down Expand Up @@ -518,7 +521,12 @@ func (cmd *baseCommand) estimateOperationSize() {
}

// Generic header write.
func (cmd *baseCommand) writeHeader(readAttr int, writeAttr int, fieldCount int, operationCount int) {
func (cmd *baseCommand) writeHeader(policy *BasePolicy, readAttr int, writeAttr int, fieldCount int, operationCount int) {

if policy.ConsistencyLevel == CONSISTENCY_ALL {
readAttr |= _INFO1_CONSISTENCY_ALL
}

// Write all header data except total size which must be written last.
cmd.dataBuffer[8] = _MSG_REMAINING_HEADER_SIZE // Message header length.
cmd.dataBuffer[9] = byte(readAttr)
Expand Down
30 changes: 30 additions & 0 deletions consistency_level.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2012-2014 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package aerospike

// How replicas should be consulted in a read operation to provide the desired
// consistency guarantee.
type ConsistencyLevel int

const (
// Involve a single replica in the operation.
CONSISTENCY_ONE = iota

// Involve all replicas in the operation.
CONSISTENCY_ALL
)
2 changes: 1 addition & 1 deletion execute_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func newExecuteCommand(
}

func (cmd *executeCommand) writeBuffer(ifc command) error {
return cmd.setUdf(cmd.key, cmd.packageName, cmd.functionName, cmd.args)
return cmd.setUdf(cmd.policy, cmd.key, cmd.packageName, cmd.functionName, cmd.args)
}

func (cmd *executeCommand) Execute() error {
Expand Down
17 changes: 5 additions & 12 deletions exists_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,23 @@ var _ command = &existsCommand{}
type existsCommand struct {
*singleCommand

policy Policy
policy *BasePolicy
exists bool
}

func newExistsCommand(cluster *Cluster, policy Policy, key *Key) *existsCommand {
newExistsCmd := &existsCommand{
func newExistsCommand(cluster *Cluster, policy *BasePolicy, key *Key) *existsCommand {
return &existsCommand{
singleCommand: newSingleCommand(cluster, key),
policy: policy,
}

if policy == nil {
newExistsCmd.policy = NewWritePolicy(0, 0)
} else {
newExistsCmd.policy = policy
}

return newExistsCmd
}

func (cmd *existsCommand) getPolicy(ifc command) Policy {
return cmd.policy.GetBasePolicy()
}

func (cmd *existsCommand) writeBuffer(ifc command) error {
return cmd.setExists(cmd.key)
return cmd.setExists(cmd.policy.GetBasePolicy(), cmd.key)
}

func (cmd *existsCommand) parseResult(ifc command, conn *Connection) error {
Expand Down
6 changes: 6 additions & 0 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type BasePolicy struct {
// Currently, only used for scans.
Priority Priority //= Priority.DEFAULT;

// How replicas should be consulted in a read operation to provide the desired
// consistency guarantee. Default to allowing one replica to be used in the
// read operation.
ConsistencyLevel ConsistencyLevel //= CONSISTENCY_ONE

// Timeout specifies transaction timeout.
// This timeout is used to set the socket timeout and is also sent to the
// server along with the transaction in the wire protocol.
Expand All @@ -54,6 +59,7 @@ type BasePolicy struct {
func NewPolicy() *BasePolicy {
return &BasePolicy{
Priority: DEFAULT,
ConsistencyLevel: CONSISTENCY_ONE,
Timeout: 0 * time.Millisecond,
MaxRetries: 2,
SleepBetweenRetries: 500 * time.Millisecond,
Expand Down
6 changes: 1 addition & 5 deletions query_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ type queryCommand struct {

policy *QueryPolicy
statement *Statement

// RecordSet recordSet;
// Records chan *Record
// Errors chan error
}

func newQueryCommand(node *Node, policy *QueryPolicy, statement *Statement, recChan chan *Record, errChan chan error) *queryCommand {
Expand Down Expand Up @@ -136,7 +132,7 @@ func (cmd *queryCommand) writeBuffer(ifc command) (err error) {
}

readAttr := _INFO1_READ
cmd.writeHeader(readAttr, 0, fieldCount, 0)
cmd.writeHeader(cmd.policy.GetBasePolicy(), readAttr, 0, fieldCount, 0)

if cmd.statement.Namespace != "" {
cmd.writeFieldString(cmd.statement.Namespace, NAMESPACE)
Expand Down
13 changes: 3 additions & 10 deletions read_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,19 @@ type readCommand struct {
}

func newReadCommand(cluster *Cluster, policy Policy, key *Key, binNames []string) *readCommand {
newReadCmd := &readCommand{
return &readCommand{
singleCommand: newSingleCommand(cluster, key),
binNames: binNames,
policy: policy,
}

if policy != nil {
newReadCmd.policy = policy
} else {
newReadCmd.policy = NewPolicy()
}

return newReadCmd
}

func (cmd *readCommand) getPolicy(ifc command) Policy {
return cmd.policy
}

func (cmd *readCommand) writeBuffer(ifc command) error {
return cmd.setRead(cmd.key, cmd.binNames)
return cmd.setRead(cmd.policy.GetBasePolicy(), cmd.key, cmd.binNames)
}

func (cmd *readCommand) parseResult(ifc command, conn *Connection) error {
Expand Down
2 changes: 1 addition & 1 deletion read_header_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (cmd *readHeaderCommand) getPolicy(ifc command) Policy {
}

func (cmd *readHeaderCommand) writeBuffer(ifc command) error {
return cmd.setReadHeader(cmd.key)
return cmd.setReadHeader(cmd.policy.GetBasePolicy(), cmd.key)
}

func (cmd *readHeaderCommand) parseResult(ifc command, conn *Connection) error {
Expand Down

0 comments on commit bc0e4d8

Please sign in to comment.