Skip to content

Commit

Permalink
[CLIENT-3261] New OnLockingOnly attribute for various write policies …
Browse files Browse the repository at this point in the history
…to write only if provisional does not exist
  • Loading branch information
khaf committed Jan 15, 2025
1 parent fe03b7d commit 6f45202
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 3 deletions.
13 changes: 13 additions & 0 deletions batch_attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (ba *batchAttr) setRead(rp *BatchPolicy) {
case ReadModeSCAllowUnavailable:
ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
}
ba.txnAttr = 0
ba.expiration = uint32(rp.ReadTouchTTLPercent)
ba.generation = 0
ba.hasWrite = false
Expand Down Expand Up @@ -142,6 +143,7 @@ func (ba *batchAttr) setBatchRead(rp *BatchReadPolicy) {
case ReadModeSCAllowUnavailable:
ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
}
ba.txnAttr = 0
ba.expiration = uint32(rp.ReadTouchTTLPercent)
ba.generation = 0
ba.hasWrite = false
Expand Down Expand Up @@ -174,6 +176,7 @@ func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) {
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS
ba.infoAttr = 0
ba.txnAttr = 0
ba.expiration = wp.Expiration
ba.hasWrite = true
ba.sendKey = wp.SendKey
Expand Down Expand Up @@ -209,6 +212,10 @@ func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) {
ba.writeAttr |= _INFO2_DURABLE_DELETE
}

if wp.OnLockingOnly {
ba.txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY
}

if wp.CommitLevel == COMMIT_MASTER {
ba.infoAttr |= _INFO3_COMMIT_MASTER
}
Expand Down Expand Up @@ -251,6 +258,7 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) {
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE
ba.infoAttr = 0
ba.txnAttr = 0
ba.expiration = up.Expiration
ba.generation = 0
ba.hasWrite = true
Expand All @@ -260,6 +268,10 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) {
ba.writeAttr |= _INFO2_DURABLE_DELETE
}

if up.OnLockingOnly {
ba.txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY
}

if up.CommitLevel == COMMIT_MASTER {
ba.infoAttr |= _INFO3_COMMIT_MASTER
}
Expand All @@ -270,6 +282,7 @@ func (ba *batchAttr) setBatchDelete(dp *BatchDeletePolicy) {
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS | _INFO2_DELETE
ba.infoAttr = 0
ba.txnAttr = 0
ba.expiration = 0
ba.hasWrite = true
ba.sendKey = dp.SendKey
Expand Down
10 changes: 10 additions & 0 deletions batch_udf_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ type BatchUDFPolicy struct {
// Valid for Aerospike Server Enterprise Edition 3.10+ only.
DurableDelete bool

// Execute the write command only if the record is not already locked by this transaction.
// If this field is true and the record is already locked by this transaction, the command
// will return an error with the [types.MRT_ALREADY_LOCKED] error code.
//
// This field is useful for safely retrying non-idempotent writes as an alternative to simply
// aborting the transaction.
//
// Default: false
OnLockingOnly bool

// SendKey determines to whether send user defined key in addition to hash digest on both reads and writes.
// If true and the UDF writes a record, the key will be stored with the record on the server.
// The default is to not send the user defined key.
Expand Down
10 changes: 10 additions & 0 deletions batch_write_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ type BatchWritePolicy struct {
// Valid for Aerospike Server Enterprise Edition 3.10+ only.
DurableDelete bool

// Execute the write command only if the record is not already locked by this transaction.
// If this field is true and the record is already locked by this transaction, the command
// will return an error with the [types.MRT_ALREADY_LOCKED] error code.
//
// This field is useful for safely retrying non-idempotent writes as an alternative to simply
// aborting the transaction.
//
// Default: false
OnLockingOnly bool

// SendKey determines to whether send user defined key in addition to hash digest on both reads and writes.
// If the key is sent on a write, the key will be stored with the record on
// the server.
Expand Down
18 changes: 15 additions & 3 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ const (
_INFO4_MRT_ROLL_FORWARD = (1 << 1)
// Roll back MRT.
_INFO4_MRT_ROLL_BACK = (1 << 2)
// Must be able to lock record in transaction.
_INFO4_MRT_ON_LOCKING_ONLY = (1 << 4)

// Interpret SC_READ bits in info3.
//
Expand Down Expand Up @@ -2892,6 +2894,7 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo
generation := uint32(0)
readAttr := 0
infoAttr := 0
txnAttr := 0

switch policy.RecordExistsAction {
case UPDATE:
Expand Down Expand Up @@ -2923,6 +2926,10 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo
writeAttr |= _INFO2_DURABLE_DELETE
}

if policy.OnLockingOnly {
txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY
}

// if (policy.Xdr) {
// readAttr |= _INFO1_XDR;
// }
Expand All @@ -2932,7 +2939,7 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo
cmd.dataBuffer[9] = byte(readAttr)
cmd.dataBuffer[10] = byte(writeAttr)
cmd.dataBuffer[11] = byte(infoAttr)
cmd.dataBuffer[12] = 0 // unused
cmd.dataBuffer[12] = byte(txnAttr)
cmd.dataBuffer[13] = 0 // clear the result code
cmd.dataOffset = 14
cmd.WriteUint32(generation)
Expand All @@ -2954,6 +2961,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA
readAttr := args.readAttr
writeAttr := args.writeAttr
infoAttr := 0
txnAttr := 0
operationCount := len(args.operations)

switch policy.RecordExistsAction {
Expand Down Expand Up @@ -2986,6 +2994,10 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA
writeAttr |= _INFO2_DURABLE_DELETE
}

if policy.OnLockingOnly {
txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY
}

// if (policy.xdr) {
// readAttr |= _INFO1_XDR;
// }
Expand Down Expand Up @@ -3013,7 +3025,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA
cmd.dataBuffer[9] = byte(readAttr)
cmd.dataBuffer[10] = byte(writeAttr)
cmd.dataBuffer[11] = byte(infoAttr)
cmd.dataBuffer[12] = 0 // unused
cmd.dataBuffer[12] = byte(txnAttr)
cmd.dataBuffer[13] = 0 // clear the result code
cmd.dataOffset = 14
cmd.WriteUint32(generation)
Expand Down Expand Up @@ -3113,7 +3125,7 @@ func (cmd *baseCommand) writeKeyAttr(
cmd.WriteByte(byte(attr.readAttr))
cmd.WriteByte(byte(attr.writeAttr))
cmd.WriteByte(byte(attr.infoAttr))
cmd.WriteByte(0) // unused
cmd.WriteByte(byte(attr.txnAttr))
cmd.WriteByte(0) // clear the result code
cmd.WriteUint32(attr.generation)
cmd.WriteUint32(attr.expiration)
Expand Down
10 changes: 10 additions & 0 deletions write_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ type WritePolicy struct {
// This prevents deleted records from reappearing after node failures.
// Valid for Aerospike Server Enterprise Edition 3.10+ only.
DurableDelete bool

// Execute the write command only if the record is not already locked by this transaction.
// If this field is true and the record is already locked by this transaction, the command
// will return an error with the [types.MRT_ALREADY_LOCKED] error code.
//
// This field is useful for safely retrying non-idempotent writes as an alternative to simply
// aborting the transaction.
//
// Default: false
OnLockingOnly bool
}

// NewWritePolicy initializes a new WritePolicy instance with default parameters.
Expand Down

0 comments on commit 6f45202

Please sign in to comment.