Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v8.0.0-beta.3 #462

Open
wants to merge 9 commits into
base: v8
Choose a base branch
from
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Change History

## January 17 2025: v8.0.0-beta.3

- **New Features**
- [CLIENT-3257] Implement iterator support for `Recordset`.

- **Fixes**
- [CLIENT-3275] Return an error when MRT commit called, but transaction was already aborted.
- [CLIENT-3295] Duplicate parsing of fields in RecordParser.ParseRecord.
- [CLIENT-3261] New OnLockingOnly attribute for various write policies to write only if provisional does not exist.
- [CLIENT-3227] Clarify that BasePolicy.UseCompression requires Enterprise Edition.
- [CLIENT-3260] Add MRT_ALREADY_LOCKED and MRT_MONITOR_EXISTS error codes.
- [CLIENT-3292][CLIENT-3293] Update dependencies due to Snyk CVE reports.
- [CLIENT-3274] Fix MRT related client exception inconsistency between clients.

## December 20 2024: v8.0.0-beta.2

- **New Features**
Expand Down
13 changes: 13 additions & 0 deletions batch_attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (ba *batchAttr) setRead(rp *BatchPolicy) {
case ReadModeSCAllowUnavailable:
ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
}
ba.txnAttr = 0
ba.expiration = uint32(rp.ReadTouchTTLPercent)
ba.generation = 0
ba.hasWrite = false
Expand Down Expand Up @@ -142,6 +143,7 @@ func (ba *batchAttr) setBatchRead(rp *BatchReadPolicy) {
case ReadModeSCAllowUnavailable:
ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
}
ba.txnAttr = 0
ba.expiration = uint32(rp.ReadTouchTTLPercent)
ba.generation = 0
ba.hasWrite = false
Expand Down Expand Up @@ -174,6 +176,7 @@ func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) {
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS
ba.infoAttr = 0
ba.txnAttr = 0
ba.expiration = wp.Expiration
ba.hasWrite = true
ba.sendKey = wp.SendKey
Expand Down Expand Up @@ -209,6 +212,10 @@ func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) {
ba.writeAttr |= _INFO2_DURABLE_DELETE
}

if wp.OnLockingOnly {
ba.txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY
}

if wp.CommitLevel == COMMIT_MASTER {
ba.infoAttr |= _INFO3_COMMIT_MASTER
}
Expand Down Expand Up @@ -251,6 +258,7 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) {
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE
ba.infoAttr = 0
ba.txnAttr = 0
ba.expiration = up.Expiration
ba.generation = 0
ba.hasWrite = true
Expand All @@ -260,6 +268,10 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) {
ba.writeAttr |= _INFO2_DURABLE_DELETE
}

if up.OnLockingOnly {
ba.txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY
}

if up.CommitLevel == COMMIT_MASTER {
ba.infoAttr |= _INFO3_COMMIT_MASTER
}
Expand All @@ -270,6 +282,7 @@ func (ba *batchAttr) setBatchDelete(dp *BatchDeletePolicy) {
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS | _INFO2_DELETE
ba.infoAttr = 0
ba.txnAttr = 0
ba.expiration = 0
ba.hasWrite = true
ba.sendKey = dp.SendKey
Expand Down
2 changes: 1 addition & 1 deletion batch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (cmd *batchCommand) retryBatch(ifc batcher, cluster *Cluster, deadline time

cmd.splitRetry = true

// Run batch requests sequentially in same thread.
// Run batch requests sequentially in same goroutine.
var ferr Error
for _, batchNode := range batchNodes {
command := ifc.cloneBatchCommand(batchNode)
Expand Down
10 changes: 10 additions & 0 deletions batch_udf_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ type BatchUDFPolicy struct {
// Valid for Aerospike Server Enterprise Edition 3.10+ only.
DurableDelete bool

// Execute the write command only if the record is not already locked by this transaction.
// If this field is true and the record is already locked by this transaction, the command
// will return an error with the [types.MRT_ALREADY_LOCKED] error code.
//
// This field is useful for safely retrying non-idempotent writes as an alternative to simply
// aborting the transaction.
//
// Default: false
OnLockingOnly bool

// SendKey determines to whether send user defined key in addition to hash digest on both reads and writes.
// If true and the UDF writes a record, the key will be stored with the record on the server.
// The default is to not send the user defined key.
Expand Down
10 changes: 10 additions & 0 deletions batch_write_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ type BatchWritePolicy struct {
// Valid for Aerospike Server Enterprise Edition 3.10+ only.
DurableDelete bool

// Execute the write command only if the record is not already locked by this transaction.
// If this field is true and the record is already locked by this transaction, the command
// will return an error with the [types.MRT_ALREADY_LOCKED] error code.
//
// This field is useful for safely retrying non-idempotent writes as an alternative to simply
// aborting the transaction.
//
// Default: false
OnLockingOnly bool

// SendKey determines to whether send user defined key in addition to hash digest on both reads and writes.
// If the key is sent on a write, the key will be stored with the record on
// the server.
Expand Down
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ func (clnt *Client) Commit(txn *Txn) (CommitStatus, Error) {
case TxnStateCommitted:
return CommitStatusAlreadyCommitted, nil
case TxnStateAborted:
return CommitStatusAlreadyAborted, nil
return CommitStatusAlreadyAborted, newError(types.TXN_ALREADY_ABORTED, "Transaction already aborted")
}
}

Expand All @@ -1491,7 +1491,7 @@ func (clnt *Client) Abort(txn *Txn) (AbortStatus, Error) {
case TxnStateVerified:
return tr.Abort(&clnt.GetDefaultTxnRollPolicy().BatchPolicy)
case TxnStateCommitted:
return AbortStatusAlreadyCommitted, nil
return AbortStatusAlreadyCommitted, newError(types.TXN_ALREADY_COMMITTED, "Transaction already committed")
case TxnStateAborted:
return AbortStatusAlreadyAborted, nil
}
Expand Down
18 changes: 15 additions & 3 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ const (
_INFO4_MRT_ROLL_FORWARD = (1 << 1)
// Roll back MRT.
_INFO4_MRT_ROLL_BACK = (1 << 2)
// Must be able to lock record in transaction.
_INFO4_MRT_ON_LOCKING_ONLY = (1 << 4)

// Interpret SC_READ bits in info3.
//
Expand Down Expand Up @@ -2892,6 +2894,7 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo
generation := uint32(0)
readAttr := 0
infoAttr := 0
txnAttr := 0

switch policy.RecordExistsAction {
case UPDATE:
Expand Down Expand Up @@ -2923,6 +2926,10 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo
writeAttr |= _INFO2_DURABLE_DELETE
}

if policy.OnLockingOnly {
txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY
}

// if (policy.Xdr) {
// readAttr |= _INFO1_XDR;
// }
Expand All @@ -2932,7 +2939,7 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo
cmd.dataBuffer[9] = byte(readAttr)
cmd.dataBuffer[10] = byte(writeAttr)
cmd.dataBuffer[11] = byte(infoAttr)
cmd.dataBuffer[12] = 0 // unused
cmd.dataBuffer[12] = byte(txnAttr)
cmd.dataBuffer[13] = 0 // clear the result code
cmd.dataOffset = 14
cmd.WriteUint32(generation)
Expand All @@ -2954,6 +2961,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA
readAttr := args.readAttr
writeAttr := args.writeAttr
infoAttr := 0
txnAttr := 0
operationCount := len(args.operations)

switch policy.RecordExistsAction {
Expand Down Expand Up @@ -2986,6 +2994,10 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA
writeAttr |= _INFO2_DURABLE_DELETE
}

if policy.OnLockingOnly {
txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY
}

// if (policy.xdr) {
// readAttr |= _INFO1_XDR;
// }
Expand Down Expand Up @@ -3013,7 +3025,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA
cmd.dataBuffer[9] = byte(readAttr)
cmd.dataBuffer[10] = byte(writeAttr)
cmd.dataBuffer[11] = byte(infoAttr)
cmd.dataBuffer[12] = 0 // unused
cmd.dataBuffer[12] = byte(txnAttr)
cmd.dataBuffer[13] = 0 // clear the result code
cmd.dataOffset = 14
cmd.WriteUint32(generation)
Expand Down Expand Up @@ -3113,7 +3125,7 @@ func (cmd *baseCommand) writeKeyAttr(
cmd.WriteByte(byte(attr.readAttr))
cmd.WriteByte(byte(attr.writeAttr))
cmd.WriteByte(byte(attr.infoAttr))
cmd.WriteByte(0) // unused
cmd.WriteByte(byte(attr.txnAttr))
cmd.WriteByte(0) // clear the result code
cmd.WriteUint32(attr.generation)
cmd.WriteUint32(attr.expiration)
Expand Down
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/aerospike/aerospike-client-go/v8
go 1.23

require (
github.com/onsi/ginkgo/v2 v2.22.0
github.com/onsi/gomega v1.36.1
github.com/onsi/ginkgo/v2 v2.22.2
github.com/onsi/gomega v1.36.2
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad
github.com/yuin/gopher-lua v1.1.1
golang.org/x/sync v0.10.0
)
Expand All @@ -16,12 +17,10 @@ require (
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.28.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,23 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg=
github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw=
github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog=
github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU=
github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
Expand All @@ -39,8 +41,8 @@ golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
2 changes: 2 additions & 0 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ type BasePolicy struct {
// This option will increase cpu and memory usage (for extra compressed buffers),but
// decrease the size of data sent over the network.
//
// Valid for Aerospike Server Enterprise Edition only.
//
// Default: false
UseCompression bool // = false

Expand Down
19 changes: 5 additions & 14 deletions record_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,14 @@ func (rp *recordParser) parseTranDeadline(txn *Txn) {
}
}
func (rp *recordParser) parseRecord(key *Key, isOperation bool) (*Record, Error) {
var bins BinMap
receiveOffset := rp.cmd.dataOffset

// There can be fields in the response (setname etc).
// But for now, ignore them. Expose them to the API if needed in the future.
if rp.fieldCount > 0 {
// Just skip over all the fields
for i := 0; i < rp.fieldCount; i++ {
fieldSize := int(Buffer.BytesToUint32(rp.cmd.dataBuffer, receiveOffset))
receiveOffset += (4 + fieldSize)
}
if rp.opCount == 0 {
// Bin data was not returned.
return newRecord(rp.cmd.node, key, nil, rp.generation, rp.expiration), nil
}

if rp.opCount > 0 {
bins = make(BinMap, rp.opCount)
}
receiveOffset := rp.cmd.dataOffset

bins := make(BinMap, rp.opCount)
for i := 0; i < rp.opCount; i++ {
opSize := int(Buffer.BytesToUint32(rp.cmd.dataBuffer, receiveOffset))
particleType := int(rp.cmd.dataBuffer[receiveOffset+5])
Expand Down
12 changes: 12 additions & 0 deletions recordset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package aerospike

import (
"fmt"
"iter"
"math/rand"
"reflect"
"runtime"
Expand Down Expand Up @@ -207,3 +208,14 @@ func (rcs *Recordset) sendError(err Error) {
}
}
}

// Records implements an iterator over the Recordset.
func (rcs *Recordset) Records() iter.Seq2[*Record, Error] {
return func(yield func(*Record, Error) bool) {
for res := range rcs.records {
if !yield(res.Record, res.Err) {
return
}
}
}
}
Loading
Loading