diff --git a/batch_attr.go b/batch_attr.go index 9e3b127c..16ebd5f6 100644 --- a/batch_attr.go +++ b/batch_attr.go @@ -115,6 +115,7 @@ func (ba *batchAttr) setRead(rp *BatchPolicy) { case ReadModeSCAllowUnavailable: ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX } + ba.txnAttr = 0 ba.expiration = uint32(rp.ReadTouchTTLPercent) ba.generation = 0 ba.hasWrite = false @@ -142,6 +143,7 @@ func (ba *batchAttr) setBatchRead(rp *BatchReadPolicy) { case ReadModeSCAllowUnavailable: ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX } + ba.txnAttr = 0 ba.expiration = uint32(rp.ReadTouchTTLPercent) ba.generation = 0 ba.hasWrite = false @@ -174,6 +176,7 @@ func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) { ba.readAttr = 0 ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS ba.infoAttr = 0 + ba.txnAttr = 0 ba.expiration = wp.Expiration ba.hasWrite = true ba.sendKey = wp.SendKey @@ -209,6 +212,10 @@ func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) { ba.writeAttr |= _INFO2_DURABLE_DELETE } + if wp.OnLockingOnly { + ba.txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY + } + if wp.CommitLevel == COMMIT_MASTER { ba.infoAttr |= _INFO3_COMMIT_MASTER } @@ -251,6 +258,7 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) { ba.readAttr = 0 ba.writeAttr = _INFO2_WRITE ba.infoAttr = 0 + ba.txnAttr = 0 ba.expiration = up.Expiration ba.generation = 0 ba.hasWrite = true @@ -260,6 +268,10 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) { ba.writeAttr |= _INFO2_DURABLE_DELETE } + if up.OnLockingOnly { + ba.txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY + } + if up.CommitLevel == COMMIT_MASTER { ba.infoAttr |= _INFO3_COMMIT_MASTER } @@ -270,6 +282,7 @@ func (ba *batchAttr) setBatchDelete(dp *BatchDeletePolicy) { ba.readAttr = 0 ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS | _INFO2_DELETE ba.infoAttr = 0 + ba.txnAttr = 0 ba.expiration = 0 ba.hasWrite = true ba.sendKey = dp.SendKey diff --git a/batch_udf_policy.go b/batch_udf_policy.go index e43ba9a3..59b38026 100644 --- a/batch_udf_policy.go +++ b/batch_udf_policy.go @@ -43,6 +43,16 @@ type BatchUDFPolicy struct { // Valid for Aerospike Server Enterprise Edition 3.10+ only. DurableDelete bool + // Execute the write command only if the record is not already locked by this transaction. + // If this field is true and the record is already locked by this transaction, the command + // will return an error with the [types.MRT_ALREADY_LOCKED] error code. + // + // This field is useful for safely retrying non-idempotent writes as an alternative to simply + // aborting the transaction. + // + // Default: false + OnLockingOnly bool + // SendKey determines to whether send user defined key in addition to hash digest on both reads and writes. // If true and the UDF writes a record, the key will be stored with the record on the server. // The default is to not send the user defined key. diff --git a/batch_write_policy.go b/batch_write_policy.go index 5574ad0a..83ea260b 100644 --- a/batch_write_policy.go +++ b/batch_write_policy.go @@ -67,6 +67,16 @@ type BatchWritePolicy struct { // Valid for Aerospike Server Enterprise Edition 3.10+ only. DurableDelete bool + // Execute the write command only if the record is not already locked by this transaction. + // If this field is true and the record is already locked by this transaction, the command + // will return an error with the [types.MRT_ALREADY_LOCKED] error code. + // + // This field is useful for safely retrying non-idempotent writes as an alternative to simply + // aborting the transaction. + // + // Default: false + OnLockingOnly bool + // SendKey determines to whether send user defined key in addition to hash digest on both reads and writes. // If the key is sent on a write, the key will be stored with the record on // the server. diff --git a/command.go b/command.go index 57aec92f..3ff8de89 100644 --- a/command.go +++ b/command.go @@ -92,6 +92,8 @@ const ( _INFO4_MRT_ROLL_FORWARD = (1 << 1) // Roll back MRT. _INFO4_MRT_ROLL_BACK = (1 << 2) + // Must be able to lock record in transaction. + _INFO4_MRT_ON_LOCKING_ONLY = (1 << 4) // Interpret SC_READ bits in info3. // @@ -2892,6 +2894,7 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo generation := uint32(0) readAttr := 0 infoAttr := 0 + txnAttr := 0 switch policy.RecordExistsAction { case UPDATE: @@ -2923,6 +2926,10 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo writeAttr |= _INFO2_DURABLE_DELETE } + if policy.OnLockingOnly { + txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY + } + // if (policy.Xdr) { // readAttr |= _INFO1_XDR; // } @@ -2932,7 +2939,7 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo cmd.dataBuffer[9] = byte(readAttr) cmd.dataBuffer[10] = byte(writeAttr) cmd.dataBuffer[11] = byte(infoAttr) - cmd.dataBuffer[12] = 0 // unused + cmd.dataBuffer[12] = byte(txnAttr) cmd.dataBuffer[13] = 0 // clear the result code cmd.dataOffset = 14 cmd.WriteUint32(generation) @@ -2954,6 +2961,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA readAttr := args.readAttr writeAttr := args.writeAttr infoAttr := 0 + txnAttr := 0 operationCount := len(args.operations) switch policy.RecordExistsAction { @@ -2986,6 +2994,10 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA writeAttr |= _INFO2_DURABLE_DELETE } + if policy.OnLockingOnly { + txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY + } + // if (policy.xdr) { // readAttr |= _INFO1_XDR; // } @@ -3013,7 +3025,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA cmd.dataBuffer[9] = byte(readAttr) cmd.dataBuffer[10] = byte(writeAttr) cmd.dataBuffer[11] = byte(infoAttr) - cmd.dataBuffer[12] = 0 // unused + cmd.dataBuffer[12] = byte(txnAttr) cmd.dataBuffer[13] = 0 // clear the result code cmd.dataOffset = 14 cmd.WriteUint32(generation) @@ -3113,7 +3125,7 @@ func (cmd *baseCommand) writeKeyAttr( cmd.WriteByte(byte(attr.readAttr)) cmd.WriteByte(byte(attr.writeAttr)) cmd.WriteByte(byte(attr.infoAttr)) - cmd.WriteByte(0) // unused + cmd.WriteByte(byte(attr.txnAttr)) cmd.WriteByte(0) // clear the result code cmd.WriteUint32(attr.generation) cmd.WriteUint32(attr.expiration) diff --git a/write_policy.go b/write_policy.go index 32a96cf5..7eea7481 100644 --- a/write_policy.go +++ b/write_policy.go @@ -74,6 +74,16 @@ type WritePolicy struct { // This prevents deleted records from reappearing after node failures. // Valid for Aerospike Server Enterprise Edition 3.10+ only. DurableDelete bool + + // Execute the write command only if the record is not already locked by this transaction. + // If this field is true and the record is already locked by this transaction, the command + // will return an error with the [types.MRT_ALREADY_LOCKED] error code. + // + // This field is useful for safely retrying non-idempotent writes as an alternative to simply + // aborting the transaction. + // + // Default: false + OnLockingOnly bool } // NewWritePolicy initializes a new WritePolicy instance with default parameters.