diff --git a/CHANGELOG.md b/CHANGELOG.md index e992a3ad..6e8b29f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Change History +## January 17 2025: v8.0.0-beta.3 + +- **New Features** + - [CLIENT-3257] Implement iterator support for `Recordset`. + +- **Fixes** + - [CLIENT-3275] Return an error when MRT commit called, but transaction was already aborted. + - [CLIENT-3295] Duplicate parsing of fields in RecordParser.ParseRecord. + - [CLIENT-3261] New OnLockingOnly attribute for various write policies to write only if provisional does not exist. + - [CLIENT-3227] Clarify that BasePolicy.UseCompression requires Enterprise Edition. + - [CLIENT-3260] Add MRT_ALREADY_LOCKED and MRT_MONITOR_EXISTS error codes. + - [CLIENT-3292][CLIENT-3293] Update dependencies due to Snyk CVE reports. + - [CLIENT-3274] Fix MRT related client exception inconsistency between clients. + ## December 20 2024: v8.0.0-beta.2 - **New Features** diff --git a/batch_attr.go b/batch_attr.go index 9e3b127c..16ebd5f6 100644 --- a/batch_attr.go +++ b/batch_attr.go @@ -115,6 +115,7 @@ func (ba *batchAttr) setRead(rp *BatchPolicy) { case ReadModeSCAllowUnavailable: ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX } + ba.txnAttr = 0 ba.expiration = uint32(rp.ReadTouchTTLPercent) ba.generation = 0 ba.hasWrite = false @@ -142,6 +143,7 @@ func (ba *batchAttr) setBatchRead(rp *BatchReadPolicy) { case ReadModeSCAllowUnavailable: ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX } + ba.txnAttr = 0 ba.expiration = uint32(rp.ReadTouchTTLPercent) ba.generation = 0 ba.hasWrite = false @@ -174,6 +176,7 @@ func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) { ba.readAttr = 0 ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS ba.infoAttr = 0 + ba.txnAttr = 0 ba.expiration = wp.Expiration ba.hasWrite = true ba.sendKey = wp.SendKey @@ -209,6 +212,10 @@ func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) { ba.writeAttr |= _INFO2_DURABLE_DELETE } + if wp.OnLockingOnly { + ba.txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY + } + if wp.CommitLevel == COMMIT_MASTER { ba.infoAttr |= _INFO3_COMMIT_MASTER } @@ -251,6 +258,7 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) { ba.readAttr = 0 ba.writeAttr = _INFO2_WRITE ba.infoAttr = 0 + ba.txnAttr = 0 ba.expiration = up.Expiration ba.generation = 0 ba.hasWrite = true @@ -260,6 +268,10 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) { ba.writeAttr |= _INFO2_DURABLE_DELETE } + if up.OnLockingOnly { + ba.txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY + } + if up.CommitLevel == COMMIT_MASTER { ba.infoAttr |= _INFO3_COMMIT_MASTER } @@ -270,6 +282,7 @@ func (ba *batchAttr) setBatchDelete(dp *BatchDeletePolicy) { ba.readAttr = 0 ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS | _INFO2_DELETE ba.infoAttr = 0 + ba.txnAttr = 0 ba.expiration = 0 ba.hasWrite = true ba.sendKey = dp.SendKey diff --git a/batch_command.go b/batch_command.go index 3ad301dd..69e0ab76 100644 --- a/batch_command.go +++ b/batch_command.go @@ -90,7 +90,7 @@ func (cmd *batchCommand) retryBatch(ifc batcher, cluster *Cluster, deadline time cmd.splitRetry = true - // Run batch requests sequentially in same thread. + // Run batch requests sequentially in same goroutine. var ferr Error for _, batchNode := range batchNodes { command := ifc.cloneBatchCommand(batchNode) diff --git a/batch_udf_policy.go b/batch_udf_policy.go index e43ba9a3..59b38026 100644 --- a/batch_udf_policy.go +++ b/batch_udf_policy.go @@ -43,6 +43,16 @@ type BatchUDFPolicy struct { // Valid for Aerospike Server Enterprise Edition 3.10+ only. DurableDelete bool + // Execute the write command only if the record is not already locked by this transaction. + // If this field is true and the record is already locked by this transaction, the command + // will return an error with the [types.MRT_ALREADY_LOCKED] error code. + // + // This field is useful for safely retrying non-idempotent writes as an alternative to simply + // aborting the transaction. + // + // Default: false + OnLockingOnly bool + // SendKey determines to whether send user defined key in addition to hash digest on both reads and writes. // If true and the UDF writes a record, the key will be stored with the record on the server. // The default is to not send the user defined key. diff --git a/batch_write_policy.go b/batch_write_policy.go index 5574ad0a..83ea260b 100644 --- a/batch_write_policy.go +++ b/batch_write_policy.go @@ -67,6 +67,16 @@ type BatchWritePolicy struct { // Valid for Aerospike Server Enterprise Edition 3.10+ only. DurableDelete bool + // Execute the write command only if the record is not already locked by this transaction. + // If this field is true and the record is already locked by this transaction, the command + // will return an error with the [types.MRT_ALREADY_LOCKED] error code. + // + // This field is useful for safely retrying non-idempotent writes as an alternative to simply + // aborting the transaction. + // + // Default: false + OnLockingOnly bool + // SendKey determines to whether send user defined key in addition to hash digest on both reads and writes. // If the key is sent on a write, the key will be stored with the record on // the server. diff --git a/client.go b/client.go index ee94be51..55c6fc75 100644 --- a/client.go +++ b/client.go @@ -1474,7 +1474,7 @@ func (clnt *Client) Commit(txn *Txn) (CommitStatus, Error) { case TxnStateCommitted: return CommitStatusAlreadyCommitted, nil case TxnStateAborted: - return CommitStatusAlreadyAborted, nil + return CommitStatusAlreadyAborted, newError(types.TXN_ALREADY_ABORTED, "Transaction already aborted") } } @@ -1491,7 +1491,7 @@ func (clnt *Client) Abort(txn *Txn) (AbortStatus, Error) { case TxnStateVerified: return tr.Abort(&clnt.GetDefaultTxnRollPolicy().BatchPolicy) case TxnStateCommitted: - return AbortStatusAlreadyCommitted, nil + return AbortStatusAlreadyCommitted, newError(types.TXN_ALREADY_COMMITTED, "Transaction already committed") case TxnStateAborted: return AbortStatusAlreadyAborted, nil } diff --git a/command.go b/command.go index 57aec92f..3ff8de89 100644 --- a/command.go +++ b/command.go @@ -92,6 +92,8 @@ const ( _INFO4_MRT_ROLL_FORWARD = (1 << 1) // Roll back MRT. _INFO4_MRT_ROLL_BACK = (1 << 2) + // Must be able to lock record in transaction. + _INFO4_MRT_ON_LOCKING_ONLY = (1 << 4) // Interpret SC_READ bits in info3. // @@ -2892,6 +2894,7 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo generation := uint32(0) readAttr := 0 infoAttr := 0 + txnAttr := 0 switch policy.RecordExistsAction { case UPDATE: @@ -2923,6 +2926,10 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo writeAttr |= _INFO2_DURABLE_DELETE } + if policy.OnLockingOnly { + txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY + } + // if (policy.Xdr) { // readAttr |= _INFO1_XDR; // } @@ -2932,7 +2939,7 @@ func (cmd *baseCommand) writeHeaderWrite(policy *WritePolicy, writeAttr, fieldCo cmd.dataBuffer[9] = byte(readAttr) cmd.dataBuffer[10] = byte(writeAttr) cmd.dataBuffer[11] = byte(infoAttr) - cmd.dataBuffer[12] = 0 // unused + cmd.dataBuffer[12] = byte(txnAttr) cmd.dataBuffer[13] = 0 // clear the result code cmd.dataOffset = 14 cmd.WriteUint32(generation) @@ -2954,6 +2961,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA readAttr := args.readAttr writeAttr := args.writeAttr infoAttr := 0 + txnAttr := 0 operationCount := len(args.operations) switch policy.RecordExistsAction { @@ -2986,6 +2994,10 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA writeAttr |= _INFO2_DURABLE_DELETE } + if policy.OnLockingOnly { + txnAttr |= _INFO4_MRT_ON_LOCKING_ONLY + } + // if (policy.xdr) { // readAttr |= _INFO1_XDR; // } @@ -3013,7 +3025,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA cmd.dataBuffer[9] = byte(readAttr) cmd.dataBuffer[10] = byte(writeAttr) cmd.dataBuffer[11] = byte(infoAttr) - cmd.dataBuffer[12] = 0 // unused + cmd.dataBuffer[12] = byte(txnAttr) cmd.dataBuffer[13] = 0 // clear the result code cmd.dataOffset = 14 cmd.WriteUint32(generation) @@ -3113,7 +3125,7 @@ func (cmd *baseCommand) writeKeyAttr( cmd.WriteByte(byte(attr.readAttr)) cmd.WriteByte(byte(attr.writeAttr)) cmd.WriteByte(byte(attr.infoAttr)) - cmd.WriteByte(0) // unused + cmd.WriteByte(byte(attr.txnAttr)) cmd.WriteByte(0) // clear the result code cmd.WriteUint32(attr.generation) cmd.WriteUint32(attr.expiration) diff --git a/go.mod b/go.mod index 4c9efd62..adbd27f4 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module github.com/aerospike/aerospike-client-go/v8 go 1.23 require ( - github.com/onsi/ginkgo/v2 v2.22.0 - github.com/onsi/gomega v1.36.1 + github.com/onsi/ginkgo/v2 v2.22.2 + github.com/onsi/gomega v1.36.2 + github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad github.com/yuin/gopher-lua v1.1.1 golang.org/x/sync v0.10.0 ) @@ -16,12 +17,10 @@ require ( github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect github.com/kr/pretty v0.3.1 // indirect github.com/stretchr/testify v1.10.0 // indirect - github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad // indirect - golang.org/x/net v0.32.0 // indirect + golang.org/x/net v0.33.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.28.0 // indirect - google.golang.org/protobuf v1.35.2 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 882d2ea1..65742b70 100644 --- a/go.sum +++ b/go.sum @@ -16,10 +16,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= -github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= -github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= -github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU= +github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -27,10 +27,12 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0= +github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= -golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= -golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= @@ -39,8 +41,8 @@ golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= -google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= -google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/policy.go b/policy.go index b19833d1..be084c1a 100644 --- a/policy.go +++ b/policy.go @@ -159,6 +159,8 @@ type BasePolicy struct { // This option will increase cpu and memory usage (for extra compressed buffers),but // decrease the size of data sent over the network. // + // Valid for Aerospike Server Enterprise Edition only. + // // Default: false UseCompression bool // = false diff --git a/record_parser.go b/record_parser.go index 2b953ff1..54d19a9e 100644 --- a/record_parser.go +++ b/record_parser.go @@ -158,23 +158,14 @@ func (rp *recordParser) parseTranDeadline(txn *Txn) { } } func (rp *recordParser) parseRecord(key *Key, isOperation bool) (*Record, Error) { - var bins BinMap - receiveOffset := rp.cmd.dataOffset - - // There can be fields in the response (setname etc). - // But for now, ignore them. Expose them to the API if needed in the future. - if rp.fieldCount > 0 { - // Just skip over all the fields - for i := 0; i < rp.fieldCount; i++ { - fieldSize := int(Buffer.BytesToUint32(rp.cmd.dataBuffer, receiveOffset)) - receiveOffset += (4 + fieldSize) - } + if rp.opCount == 0 { + // Bin data was not returned. + return newRecord(rp.cmd.node, key, nil, rp.generation, rp.expiration), nil } - if rp.opCount > 0 { - bins = make(BinMap, rp.opCount) - } + receiveOffset := rp.cmd.dataOffset + bins := make(BinMap, rp.opCount) for i := 0; i < rp.opCount; i++ { opSize := int(Buffer.BytesToUint32(rp.cmd.dataBuffer, receiveOffset)) particleType := int(rp.cmd.dataBuffer[receiveOffset+5]) diff --git a/recordset.go b/recordset.go index dcf2c88e..f9648411 100644 --- a/recordset.go +++ b/recordset.go @@ -16,6 +16,7 @@ package aerospike import ( "fmt" + "iter" "math/rand" "reflect" "runtime" @@ -207,3 +208,14 @@ func (rcs *Recordset) sendError(err Error) { } } } + +// Records implements an iterator over the Recordset. +func (rcs *Recordset) Records() iter.Seq2[*Record, Error] { + return func(yield func(*Record, Error) bool) { + for res := range rcs.records { + if !yield(res.Record, res.Err) { + return + } + } + } +} diff --git a/scan_test.go b/scan_test.go index c4d7aead..f71cc2ad 100644 --- a/scan_test.go +++ b/scan_test.go @@ -86,6 +86,47 @@ var _ = gg.Describe("Scan operations", func() { return counter } + // read all records from the channel and make sure all of them are returned + // if cancelCnt is set, it will cancel the scan after specified record count + var checkResultsIter = func(recordset *as.Recordset, cancelCnt int, rawCDT bool) int { + counter := 0 + for rec, err := range recordset.Records() { + gm.Expect(err).ToNot(gm.HaveOccurred()) + key, exists := keys[string(rec.Key.Digest())] + + gm.Expect(exists).To(gm.Equal(true)) + gm.Expect(key.Value().GetObject()).To(gm.Equal(rec.Key.Value().GetObject())) + + gm.Expect(rec.Bins[bin3.Name]).NotTo(gm.BeNil()) + gm.Expect(rec.Bins[bin4.Name]).NotTo(gm.BeNil()) + if rawCDT { + gm.Expect(rec.Bins[bin3.Name].(*as.RawBlobValue).ParticleType).To(gm.Equal(particleType.MAP)) + gm.Expect(rec.Bins[bin4.Name].(*as.RawBlobValue).ParticleType).To(gm.Equal(particleType.LIST)) + + // rewrite the record to the database to see if the values are correctly written + err := client.Put(nil, rec.Key, rec.Bins) + gm.Expect(err).ToNot(gm.HaveOccurred()) + } else { + gm.Expect(rec.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject())) + gm.Expect(rec.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject())) + + gm.Expect(rec.Bins[bin3.Name]).To(gm.Equal(map[interface{}]interface{}{"1": 1, "2": 2})) + gm.Expect(rec.Bins[bin4.Name]).To(gm.Equal([]interface{}{1, 2, 3})) + + delete(keys, string(rec.Key.Digest())) + } + + counter++ + // cancel scan abruptly + if cancelCnt != 0 && counter == cancelCnt { + recordset.Close() + } + } + + gm.Expect(counter).To(gm.BeNumerically(">", 0)) + return counter + } + gg.BeforeEach(func() { keys = make(map[string]*as.Key, keyCount) set = randString(50) @@ -124,6 +165,29 @@ var _ = gg.Describe("Scan operations", func() { gm.Expect(len(keys)).To(gm.Equal(0)) }) + gg.It("must Scan and paginate to get all records back from all partitions concurrently using Records() iterator", func() { + gm.Expect(len(keys)).To(gm.Equal(keyCount)) + + pf := as.NewPartitionFilterAll() + spolicy := as.NewScanPolicy() + spolicy.MaxRecords = 30 + + times := 0 + received := 0 + for received < keyCount { + times++ + recordset, err := client.ScanPartitions(spolicy, pf, ns, set) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + recs := checkResultsIter(recordset, 0, false) + gm.Expect(recs).To(gm.BeNumerically("<=", int(spolicy.MaxRecords))) + received += recs + } + + gm.Expect(times).To(gm.BeNumerically(">=", keyCount/spolicy.MaxRecords)) + gm.Expect(len(keys)).To(gm.Equal(0)) + }) + gg.It("must Scan and paginate to get all records back from all partitions concurrently, ONE BY ONE", func() { gm.Expect(len(keys)).To(gm.Equal(keyCount)) diff --git a/tools.go b/tools.go new file mode 100644 index 00000000..636354ce --- /dev/null +++ b/tools.go @@ -0,0 +1,23 @@ +//go:build tools +// +build tools + +// Copyright 2014-2022 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This package imports things required by build scripts, to force `go mod` to see them as dependencies +package aerospike + +import ( + _ "github.com/wadey/gocovmerge" +) diff --git a/txn.go b/txn.go index 0d986454..6368b7a8 100644 --- a/txn.go +++ b/txn.go @@ -15,7 +15,6 @@ package aerospike import ( - "fmt" "sync/atomic" "time" @@ -213,7 +212,7 @@ func (txn *Txn) prepareReadForBatchRecordsIfc(records []BatchRecordIfc) Error { // Verify that the MRT state allows future commands. func (txn *Txn) VerifyCommand() Error { if txn.state != TxnStateOpen { - return newError(types.FAIL_FORBIDDEN, fmt.Sprintf("Command not allowed in current MRT state: %#v", txn.state)) + return newError(types.COMMON_ERROR, "Issuing commands to this transaction is forbidden because it has been ended by a commit or abort") } return nil } diff --git a/txn_error.go b/txn_error.go index f6d65f4f..99b1b586 100644 --- a/txn_error.go +++ b/txn_error.go @@ -24,32 +24,31 @@ type TxnError struct { CommitError CommitError // Verify result for each read key in the MRT. May be nil if failure occurred before verify. - VerifyRecords []BatchRecordIfc + VerifyRecords []*BatchRecord // Roll forward/backward result for each write key in the MRT. May be nil if failure occurred before // roll forward/backward. - RollRecords []BatchRecordIfc + RollRecords []*BatchRecord } var _ error = &TxnError{} var _ Error = &TxnError{} -// func NewTxnCommitError(err CommitError, verifyRecords, rollRecords []BatchRecordIfc, cause Error) Error { -func NewTxnCommitError(err CommitError, cause Error) Error { +func NewTxnCommitError(err CommitError, verifyRecords, rollRecords []*BatchRecord, cause Error) Error { if cause == nil { res := newError(types.TXN_FAILED, string(err)) return &TxnError{ AerospikeError: *(res.(*AerospikeError)), CommitError: err, - // VerifyRecords: verifyRecords, - // RollRecords: rollRecords, + VerifyRecords: verifyRecords, + RollRecords: rollRecords, } } return &TxnError{ AerospikeError: *(cause.(*AerospikeError)), CommitError: err, - // VerifyRecords: verifyRecords, - // RollRecords: rollRecords, + VerifyRecords: verifyRecords, + RollRecords: rollRecords, } } diff --git a/txn_roll.go b/txn_roll.go index a5585471..028b14cf 100644 --- a/txn_roll.go +++ b/txn_roll.go @@ -37,7 +37,7 @@ func (txr *TxnRoll) Verify(verifyPolicy, rollPolicy *BatchPolicy) Error { txr.txn.SetState(TxnStateAborted) if err := txr.Roll(rollPolicy, _INFO4_MRT_ROLL_BACK); err != nil { - return NewTxnCommitError(CommitErrorVerifyFailAbortAbandoned, err) + return NewTxnCommitError(CommitErrorVerifyFailAbortAbandoned, txr.verifyRecords, txr.rollRecords, err) } @@ -47,11 +47,11 @@ func (txr *TxnRoll) Verify(verifyPolicy, rollPolicy *BatchPolicy) Error { txnKey := getTxnMonitorKey(txr.txn) if err := txr.Close(writePolicy, txnKey); err != nil { - return NewTxnCommitError(CommitErrorVerifyFailAbortAbandoned, err) + return NewTxnCommitError(CommitErrorVerifyFailAbortAbandoned, txr.verifyRecords, txr.rollRecords, err) } } - return NewTxnCommitError(CommitErrorVerifyFail, err) + return NewTxnCommitError(CommitErrorVerifyFail, txr.verifyRecords, txr.rollRecords, err) } txr.txn.SetState(TxnStateVerified) return nil @@ -65,7 +65,7 @@ func (txr *TxnRoll) Commit(rollPolicy *BatchPolicy) (CommitStatus, Error) { if txr.txn.MonitorExists() { if err := txr.MarkRollForward(writePolicy, txnKey); err != nil { - aec := NewTxnCommitError(CommitErrorVerifyFailAbortAbandoned, err) + aec := NewTxnCommitError(CommitErrorVerifyFailAbortAbandoned, txr.verifyRecords, txr.rollRecords, err) if err.resultCode() == types.MRT_ABORTED { aec.markInDoubt(false) diff --git a/types/result_code.go b/types/result_code.go index 49bea226..d580334a 100644 --- a/types/result_code.go +++ b/types/result_code.go @@ -21,6 +21,13 @@ import "fmt" type ResultCode int const ( + // Multi-record transaction failed + // Multi-record transaction commit called, but the transaction was already aborted. + TXN_ALREADY_ABORTED ResultCode = -24 + + // Multi-record transaction abort called, but the transaction was already committed. + TXN_ALREADY_COMMITTED ResultCode = -23 + // Multi-record transaction failed. TXN_FAILED ResultCode = -22 @@ -272,6 +279,12 @@ const ( // MRT was already aborted. MRT_ABORTED ResultCode = 125 + // This record has been locked by a previous update in this transaction. + MRT_ALREADY_LOCKED ResultCode = 126 + + // This transaction has already started. Writing to the same transaction with independent goroutines is unsafe. + MRT_MONITOR_EXISTS ResultCode = 127 + // BATCH_DISABLED defines batch functionality has been disabled. BATCH_DISABLED ResultCode = 150 @@ -333,6 +346,13 @@ const ( // ResultCodeToString returns a human readable errors message based on the result code. func ResultCodeToString(resultCode ResultCode) string { switch ResultCode(resultCode) { + + case TXN_ALREADY_ABORTED: + return "Multi-record transaction commit called, but the transaction was already aborted" + + case TXN_ALREADY_COMMITTED: + return "Multi-record transaction abort called, but the transaction was already committed" + case TXN_FAILED: return "Multi-record transaction failed" @@ -578,6 +598,12 @@ func ResultCodeToString(resultCode ResultCode) string { case MRT_ABORTED: return "MRT was already aborted" + case MRT_ALREADY_LOCKED: + return "This record has been locked by a previous update in this transaction" + + case MRT_MONITOR_EXISTS: + return "This transaction has already started. Writing to the same transaction with independent goroutines is unsafe" + case BATCH_DISABLED: return "Batch functionality has been disabled" @@ -642,6 +668,10 @@ func ResultCodeToString(resultCode ResultCode) string { func (rc ResultCode) String() string { switch rc { + case TXN_ALREADY_ABORTED: + return "TXN_ALREADY_ABORTED" + case TXN_ALREADY_COMMITTED: + return "TXN_ALREADY_COMMITTED" case TXN_FAILED: return "TXN_FAILED" case GRPC_ERROR: @@ -806,6 +836,10 @@ func (rc ResultCode) String() string { return "MRT_COMMITTED" case MRT_ABORTED: return "MRT_ABORTED" + case MRT_ALREADY_LOCKED: + return "MRT_ALREADY_LOCKED" + case MRT_MONITOR_EXISTS: + return "MRT_MONITOR_EXISTS" case BATCH_DISABLED: return "BATCH_DISABLED" case BATCH_MAX_REQUESTS_EXCEEDED: diff --git a/write_policy.go b/write_policy.go index 32a96cf5..7eea7481 100644 --- a/write_policy.go +++ b/write_policy.go @@ -74,6 +74,16 @@ type WritePolicy struct { // This prevents deleted records from reappearing after node failures. // Valid for Aerospike Server Enterprise Edition 3.10+ only. DurableDelete bool + + // Execute the write command only if the record is not already locked by this transaction. + // If this field is true and the record is already locked by this transaction, the command + // will return an error with the [types.MRT_ALREADY_LOCKED] error code. + // + // This field is useful for safely retrying non-idempotent writes as an alternative to simply + // aborting the transaction. + // + // Default: false + OnLockingOnly bool } // NewWritePolicy initializes a new WritePolicy instance with default parameters.