From bc0e4d8b6b434e5dc61366532666faff68abfe0c Mon Sep 17 00:00:00 2001 From: Khosrow Afroozeh Date: Mon, 22 Dec 2014 12:48:50 +0100 Subject: [PATCH] added consistency level to BasePolicy --- batch_command_exists.go | 2 +- batch_command_get.go | 2 +- command.go | 44 ++++++++++++++++++++++++----------------- consistency_level.go | 30 ++++++++++++++++++++++++++++ execute_command.go | 2 +- exists_command.go | 17 +++++----------- policy.go | 6 ++++++ query_command.go | 6 +----- read_command.go | 13 +++--------- read_header_command.go | 2 +- 10 files changed, 75 insertions(+), 49 deletions(-) create mode 100644 consistency_level.go diff --git a/batch_command_exists.go b/batch_command_exists.go index 447ae306..607e99d7 100644 --- a/batch_command_exists.go +++ b/batch_command_exists.go @@ -50,7 +50,7 @@ func (cmd *batchCommandExists) getPolicy(ifc command) Policy { } func (cmd *batchCommandExists) writeBuffer(ifc command) error { - return cmd.setBatchExists(cmd.batchNamespace) + return cmd.setBatchExists(cmd.policy, cmd.batchNamespace) } // Parse all results in the batch. Add records to shared list. diff --git a/batch_command_get.go b/batch_command_get.go index c2b36551..bb11d60d 100644 --- a/batch_command_get.go +++ b/batch_command_get.go @@ -55,7 +55,7 @@ func (cmd *batchCommandGet) getPolicy(ifc command) Policy { } func (cmd *batchCommandGet) writeBuffer(ifc command) error { - return cmd.setBatchGet(cmd.batchNamespace, cmd.binNames, cmd.readAttr) + return cmd.setBatchGet(cmd.policy, cmd.batchNamespace, cmd.binNames, cmd.readAttr) } // Parse all results in the batch. Add records to shared list. diff --git a/command.go b/command.go index 08243516..175767fc 100644 --- a/command.go +++ b/command.go @@ -35,6 +35,9 @@ const ( // Do not read the bins _INFO1_NOBINDATA int = (1 << 5) + // Involve all replicas in read operation. + _INFO1_CONSISTENCY_ALL = (1 << 6) + // Create or update record _INFO2_WRITE int = (1 << 0) // Fling a record into the belly of Moloch. @@ -166,13 +169,13 @@ func (cmd *baseCommand) setTouch(policy *WritePolicy, key *Key) error { } // Writes the command for exist operations -func (cmd *baseCommand) setExists(key *Key) error { +func (cmd *baseCommand) setExists(policy *BasePolicy, key *Key) error { cmd.begin() fieldCount := cmd.estimateKeySize(key) if err := cmd.sizeBuffer(); err != nil { return nil } - cmd.writeHeader(_INFO1_READ|_INFO1_NOBINDATA, 0, fieldCount, 0) + cmd.writeHeader(policy.GetBasePolicy(), _INFO1_READ|_INFO1_NOBINDATA, 0, fieldCount, 0) cmd.writeKey(key) cmd.end() return nil @@ -180,13 +183,13 @@ func (cmd *baseCommand) setExists(key *Key) error { } // Writes the command for get operations (all bins) -func (cmd *baseCommand) setReadForKeyOnly(key *Key) error { +func (cmd *baseCommand) setReadForKeyOnly(policy *BasePolicy, key *Key) error { cmd.begin() fieldCount := cmd.estimateKeySize(key) if err := cmd.sizeBuffer(); err != nil { return nil } - cmd.writeHeader(_INFO1_READ|_INFO1_GET_ALL, 0, fieldCount, 0) + cmd.writeHeader(policy, _INFO1_READ|_INFO1_GET_ALL, 0, fieldCount, 0) cmd.writeKey(key) cmd.end() return nil @@ -194,7 +197,7 @@ func (cmd *baseCommand) setReadForKeyOnly(key *Key) error { } // Writes the command for get operations (specified bins) -func (cmd *baseCommand) setRead(key *Key, binNames []string) (err error) { +func (cmd *baseCommand) setRead(policy *BasePolicy, key *Key, binNames []string) (err error) { if binNames != nil && len(binNames) > 0 { cmd.begin() fieldCount := cmd.estimateKeySize(key) @@ -205,7 +208,7 @@ func (cmd *baseCommand) setRead(key *Key, binNames []string) (err error) { if err = cmd.sizeBuffer(); err != nil { return nil } - cmd.writeHeader(_INFO1_READ, 0, fieldCount, len(binNames)) + cmd.writeHeader(policy.GetBasePolicy(), _INFO1_READ, 0, fieldCount, len(binNames)) cmd.writeKey(key) for i := range binNames { @@ -213,14 +216,14 @@ func (cmd *baseCommand) setRead(key *Key, binNames []string) (err error) { } cmd.end() } else { - err = cmd.setReadForKeyOnly(key) + err = cmd.setReadForKeyOnly(policy, key) } return err } // Writes the command for getting metadata operations -func (cmd *baseCommand) setReadHeader(key *Key) error { +func (cmd *baseCommand) setReadHeader(policy *BasePolicy, key *Key) error { cmd.begin() fieldCount := cmd.estimateKeySize(key) cmd.estimateOperationSizeForBinName("") @@ -232,7 +235,7 @@ func (cmd *baseCommand) setReadHeader(key *Key) error { // The workaround is to request a non-existent bin. // TODO: Fix this on server. //command.setRead(_INFO1_READ | _INFO1_NOBINDATA); - cmd.writeHeader(_INFO1_READ, 0, fieldCount, 1) + cmd.writeHeader(policy.GetBasePolicy(), _INFO1_READ, 0, fieldCount, 1) cmd.writeKey(key) cmd.writeOperationForBinName("", READ) @@ -286,7 +289,7 @@ func (cmd *baseCommand) setOperate(policy *WritePolicy, key *Key, operations []* if writeAttr != 0 { cmd.writeHeaderWithPolicy(policy, readAttr, writeAttr, fieldCount, len(operations)) } else { - cmd.writeHeader(readAttr, writeAttr, fieldCount, len(operations)) + cmd.writeHeader(policy.GetBasePolicy(), readAttr, writeAttr, fieldCount, len(operations)) } cmd.writeKey(key) @@ -310,7 +313,7 @@ func (cmd *baseCommand) setOperate(policy *WritePolicy, key *Key, operations []* return nil } -func (cmd *baseCommand) setUdf(key *Key, packageName string, functionName string, args []Value) error { +func (cmd *baseCommand) setUdf(policy Policy, key *Key, packageName string, functionName string, args []Value) error { cmd.begin() fieldCount := cmd.estimateKeySize(key) argBytes, err := packValueArray(args) @@ -322,7 +325,7 @@ func (cmd *baseCommand) setUdf(key *Key, packageName string, functionName string if err := cmd.sizeBuffer(); err != nil { return nil } - cmd.writeHeader(0, _INFO2_WRITE, fieldCount, 0) + cmd.writeHeader(policy.GetBasePolicy(), 0, _INFO2_WRITE, fieldCount, 0) cmd.writeKey(key) cmd.writeFieldString(packageName, UDF_PACKAGE_NAME) cmd.writeFieldString(functionName, UDF_FUNCTION) @@ -332,7 +335,7 @@ func (cmd *baseCommand) setUdf(key *Key, packageName string, functionName string return nil } -func (cmd *baseCommand) setBatchExists(batchNamespace *batchNamespace) error { +func (cmd *baseCommand) setBatchExists(policy *BasePolicy, batchNamespace *batchNamespace) error { // Estimate buffer size cmd.begin() keys := batchNamespace.keys @@ -344,7 +347,7 @@ func (cmd *baseCommand) setBatchExists(batchNamespace *batchNamespace) error { return nil } - cmd.writeHeader(_INFO1_READ|_INFO1_NOBINDATA, 0, 2, 0) + cmd.writeHeader(policy, _INFO1_READ|_INFO1_NOBINDATA, 0, 2, 0) cmd.writeFieldString(*batchNamespace.namespace, NAMESPACE) cmd.writeFieldHeader(byteSize, DIGEST_RIPE_ARRAY) @@ -357,7 +360,7 @@ func (cmd *baseCommand) setBatchExists(batchNamespace *batchNamespace) error { return nil } -func (cmd *baseCommand) setBatchGet(batchNamespace *batchNamespace, binNames map[string]struct{}, readAttr int) error { +func (cmd *baseCommand) setBatchGet(policy Policy, batchNamespace *batchNamespace, binNames map[string]struct{}, readAttr int) error { // Estimate buffer size cmd.begin() keys := batchNamespace.keys @@ -379,7 +382,7 @@ func (cmd *baseCommand) setBatchGet(batchNamespace *batchNamespace, binNames map if binNames != nil { operationCount = len(binNames) } - cmd.writeHeader(readAttr, 0, 2, operationCount) + cmd.writeHeader(policy.GetBasePolicy(), readAttr, 0, 2, operationCount) cmd.writeFieldString(*batchNamespace.namespace, NAMESPACE) cmd.writeFieldHeader(byteSize, DIGEST_RIPE_ARRAY) @@ -434,7 +437,7 @@ func (cmd *baseCommand) setScan(policy *ScanPolicy, namespace *string, setName * if binNames != nil { operationCount = len(binNames) } - cmd.writeHeader(readAttr, 0, fieldCount, operationCount) + cmd.writeHeader(policy.GetBasePolicy(), readAttr, 0, fieldCount, operationCount) if namespace != nil { cmd.writeFieldString(*namespace, NAMESPACE) @@ -518,7 +521,12 @@ func (cmd *baseCommand) estimateOperationSize() { } // Generic header write. -func (cmd *baseCommand) writeHeader(readAttr int, writeAttr int, fieldCount int, operationCount int) { +func (cmd *baseCommand) writeHeader(policy *BasePolicy, readAttr int, writeAttr int, fieldCount int, operationCount int) { + + if policy.ConsistencyLevel == CONSISTENCY_ALL { + readAttr |= _INFO1_CONSISTENCY_ALL + } + // Write all header data except total size which must be written last. cmd.dataBuffer[8] = _MSG_REMAINING_HEADER_SIZE // Message header length. cmd.dataBuffer[9] = byte(readAttr) diff --git a/consistency_level.go b/consistency_level.go new file mode 100644 index 00000000..530a7ce2 --- /dev/null +++ b/consistency_level.go @@ -0,0 +1,30 @@ +/* + * Copyright 2012-2014 Aerospike, Inc. + * + * Portions may be licensed to Aerospike, Inc. under one or more contributor + * license agreements. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package aerospike + +// How replicas should be consulted in a read operation to provide the desired +// consistency guarantee. +type ConsistencyLevel int + +const ( + // Involve a single replica in the operation. + CONSISTENCY_ONE = iota + + // Involve all replicas in the operation. + CONSISTENCY_ALL +) diff --git a/execute_command.go b/execute_command.go index fcbbdf21..6fd42dfc 100644 --- a/execute_command.go +++ b/execute_command.go @@ -39,7 +39,7 @@ func newExecuteCommand( } func (cmd *executeCommand) writeBuffer(ifc command) error { - return cmd.setUdf(cmd.key, cmd.packageName, cmd.functionName, cmd.args) + return cmd.setUdf(cmd.policy, cmd.key, cmd.packageName, cmd.functionName, cmd.args) } func (cmd *executeCommand) Execute() error { diff --git a/exists_command.go b/exists_command.go index e53c41d3..72e0fe30 100644 --- a/exists_command.go +++ b/exists_command.go @@ -24,22 +24,15 @@ var _ command = &existsCommand{} type existsCommand struct { *singleCommand - policy Policy + policy *BasePolicy exists bool } -func newExistsCommand(cluster *Cluster, policy Policy, key *Key) *existsCommand { - newExistsCmd := &existsCommand{ +func newExistsCommand(cluster *Cluster, policy *BasePolicy, key *Key) *existsCommand { + return &existsCommand{ singleCommand: newSingleCommand(cluster, key), + policy: policy, } - - if policy == nil { - newExistsCmd.policy = NewWritePolicy(0, 0) - } else { - newExistsCmd.policy = policy - } - - return newExistsCmd } func (cmd *existsCommand) getPolicy(ifc command) Policy { @@ -47,7 +40,7 @@ func (cmd *existsCommand) getPolicy(ifc command) Policy { } func (cmd *existsCommand) writeBuffer(ifc command) error { - return cmd.setExists(cmd.key) + return cmd.setExists(cmd.policy.GetBasePolicy(), cmd.key) } func (cmd *existsCommand) parseResult(ifc command, conn *Connection) error { diff --git a/policy.go b/policy.go index 5557cc33..2c950fe9 100644 --- a/policy.go +++ b/policy.go @@ -33,6 +33,11 @@ type BasePolicy struct { // Currently, only used for scans. Priority Priority //= Priority.DEFAULT; + // How replicas should be consulted in a read operation to provide the desired + // consistency guarantee. Default to allowing one replica to be used in the + // read operation. + ConsistencyLevel ConsistencyLevel //= CONSISTENCY_ONE + // Timeout specifies transaction timeout. // This timeout is used to set the socket timeout and is also sent to the // server along with the transaction in the wire protocol. @@ -54,6 +59,7 @@ type BasePolicy struct { func NewPolicy() *BasePolicy { return &BasePolicy{ Priority: DEFAULT, + ConsistencyLevel: CONSISTENCY_ONE, Timeout: 0 * time.Millisecond, MaxRetries: 2, SleepBetweenRetries: 500 * time.Millisecond, diff --git a/query_command.go b/query_command.go index 33c47922..5ac05711 100644 --- a/query_command.go +++ b/query_command.go @@ -25,10 +25,6 @@ type queryCommand struct { policy *QueryPolicy statement *Statement - - // RecordSet recordSet; - // Records chan *Record - // Errors chan error } func newQueryCommand(node *Node, policy *QueryPolicy, statement *Statement, recChan chan *Record, errChan chan error) *queryCommand { @@ -136,7 +132,7 @@ func (cmd *queryCommand) writeBuffer(ifc command) (err error) { } readAttr := _INFO1_READ - cmd.writeHeader(readAttr, 0, fieldCount, 0) + cmd.writeHeader(cmd.policy.GetBasePolicy(), readAttr, 0, fieldCount, 0) if cmd.statement.Namespace != "" { cmd.writeFieldString(cmd.statement.Namespace, NAMESPACE) diff --git a/read_command.go b/read_command.go index 3b0b43cc..f9ed79de 100644 --- a/read_command.go +++ b/read_command.go @@ -30,18 +30,11 @@ type readCommand struct { } func newReadCommand(cluster *Cluster, policy Policy, key *Key, binNames []string) *readCommand { - newReadCmd := &readCommand{ + return &readCommand{ singleCommand: newSingleCommand(cluster, key), binNames: binNames, + policy: policy, } - - if policy != nil { - newReadCmd.policy = policy - } else { - newReadCmd.policy = NewPolicy() - } - - return newReadCmd } func (cmd *readCommand) getPolicy(ifc command) Policy { @@ -49,7 +42,7 @@ func (cmd *readCommand) getPolicy(ifc command) Policy { } func (cmd *readCommand) writeBuffer(ifc command) error { - return cmd.setRead(cmd.key, cmd.binNames) + return cmd.setRead(cmd.policy.GetBasePolicy(), cmd.key, cmd.binNames) } func (cmd *readCommand) parseResult(ifc command, conn *Connection) error { diff --git a/read_header_command.go b/read_header_command.go index 7f8117e6..3aaea710 100644 --- a/read_header_command.go +++ b/read_header_command.go @@ -45,7 +45,7 @@ func (cmd *readHeaderCommand) getPolicy(ifc command) Policy { } func (cmd *readHeaderCommand) writeBuffer(ifc command) error { - return cmd.setReadHeader(cmd.key) + return cmd.setReadHeader(cmd.policy.GetBasePolicy(), cmd.key) } func (cmd *readHeaderCommand) parseResult(ifc command, conn *Connection) error {