Skip to content

Commit

Permalink
Merge pull request #463 from aerospike/stage
Browse files Browse the repository at this point in the history
Go Client v8.0.1
  • Loading branch information
khaf authored Feb 5, 2025
2 parents 310ed26 + b1a7440 commit 1d3178b
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 22 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Change History

## February 5 2025: v8.0.1

- **Fixes**
- [CLIENT-3305] New key struct causes incorrect generation increment in MRT.
- [CLIENT-3326] Fix UDF can read a record with an expired transaction.

## January 22 2025: v8.0.0

- **New Features**
Expand Down
122 changes: 115 additions & 7 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package aerospike

import (
"sync"
"sync/atomic"
"time"

sm "github.com/aerospike/aerospike-client-go/v8/internal/atomic/map"
"github.com/aerospike/aerospike-client-go/v8/types"
)

Expand All @@ -41,8 +41,8 @@ func init() {
// Transaction. Each command in the Transaction must use the same namespace.
type Txn struct {
id int64
reads sm.Map[*Key, *uint64]
writes sm.Map[*Key, struct{}]
reads keyMap[*uint64]
writes keyMap[struct{}]
state TxnState
namespace *string
timeout int
Expand All @@ -58,8 +58,8 @@ type Txn struct {
func NewTxn() *Txn {
return &Txn{
id: createTxnId(),
reads: *sm.New[*Key, *uint64](16),
writes: *sm.New[*Key, struct{}](16),
reads: *newKeyMap[*uint64](16),
writes: *newKeyMap[struct{}](16),
timeout: 0,
state: TxnStateOpen,
}
Expand All @@ -80,8 +80,8 @@ func NewTxnWithCapacity(readsCapacity, writesCapacity int) *Txn {

return &Txn{
id: createTxnId(),
reads: *sm.New[*Key, *uint64](readsCapacity),
writes: *sm.New[*Key, struct{}](writesCapacity),
reads: *newKeyMap[*uint64](readsCapacity),
writes: *newKeyMap[struct{}](writesCapacity),
timeout: 0,
state: TxnStateOpen,
}
Expand Down Expand Up @@ -306,3 +306,111 @@ func (txn *Txn) Clear() {
txn.reads.Clear()
txn.writes.Clear()
}

////////////////////////////////////////////////////////////////////////////
//
// Specialized internal data type to simplify key bookkeeping
//
////////////////////////////////////////////////////////////////////////////

type keyTupple[V any] struct {
k *Key
v V
}

// keyMap implements a keyMap with atomic semantics.
type keyMap[V any] struct {
m map[[20]byte]*keyTupple[V]
mutex sync.RWMutex
}

// New generates a new Map instance.
func newKeyMap[V any](length int) *keyMap[V] {
return &keyMap[V]{
m: make(map[[20]byte]*keyTupple[V], length),
}
}

// Exists atomically checks if a key exists in the map
func (m *keyMap[V]) Exists(k *Key) bool {
if k != nil {
m.mutex.RLock()
_, ok := m.m[k.digest]
m.mutex.RUnlock()
return ok
}
return false
}

// Get atomically retrieves an element from the Map.
func (m *keyMap[V]) Get(k *Key) V {
if k != nil {
m.mutex.RLock()
res, found := m.m[k.digest]
m.mutex.RUnlock()
if found {
return res.v
}
}

var zero V
return zero
}

// Set atomically sets an element in the Map.
// If idx is out of range, it will return an error.
func (m *keyMap[V]) Set(k *Key, v V) {
if k != nil {
m.mutex.Lock()
m.m[k.digest] = &keyTupple[V]{k: k, v: v}
m.mutex.Unlock()
}
}

// Clone copies the map and returns the copy.
func (m *keyMap[V]) Clone() map[*Key]V {
m.mutex.RLock()
res := make(map[*Key]V, len(m.m))
for _, v := range m.m {
res[v.k] = v.v
}
m.mutex.RUnlock()

return res
}

// Returns the keys from the map.
func (m *keyMap[V]) Keys() []*Key {
m.mutex.RLock()
res := make([]*Key, 0, len(m.m))
for _, v := range m.m {
res = append(res, v.k)
}
m.mutex.RUnlock()

return res
}

// Clear will remove all entries.
func (m *keyMap[V]) Clear() {
m.mutex.Lock()
m.m = make(map[[20]byte]*keyTupple[V], len(m.m))
m.mutex.Unlock()
}

// Delete will remove the key and return its value.
func (m *keyMap[V]) Delete(k *Key) V {
if k != nil {
m.mutex.Lock()
res, ok := m.m[k.digest]
delete(m.m, k.digest)
m.mutex.Unlock()

if ok {
return res.v
}
}

var zero V
return zero
}
54 changes: 40 additions & 14 deletions txn_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,60 +28,86 @@ const binNameDigests = "keyds"

func (tm *TxnMonitor) addKey(cluster *Cluster, policy *WritePolicy, cmdKey *Key) Error {
txn := policy.Txn
if err := txn.VerifyCommand(); err != nil {
return err
}

if txn.WriteExistsForKey(cmdKey) {
// Transaction monitor already contains this key.
return nil
}

ops := tm.getTranOps(txn, cmdKey)
ops, err := tm.getTranOps(txn, cmdKey)
if err != nil {
return err
}
return tm.addWriteKeys(cluster, policy.GetBasePolicy(), ops)
}

func (tm *TxnMonitor) addKeys(cluster *Cluster, policy *BatchPolicy, keys []*Key) Error {
ops := tm.getTranOpsFromKeys(policy.Txn, keys)
ops, err := tm.getTranOpsFromKeys(policy.Txn, keys)
if err != nil {
return err
}
return tm.addWriteKeys(cluster, policy.GetBasePolicy(), ops)
}

func (tm *TxnMonitor) addKeysFromRecords(cluster *Cluster, policy *BatchPolicy, records []BatchRecordIfc) Error {
ops := tm.getTranOpsFromBatchRecords(policy.Txn, records)
ops, err := tm.getTranOpsFromBatchRecords(policy.Txn, records)
if err != nil {
return err
}

if len(ops) > 0 {
return tm.addWriteKeys(cluster, policy.GetBasePolicy(), ops)
}
return nil
}

func (tm *TxnMonitor) getTranOps(txn *Txn, cmdKey *Key) []*Operation {
txn.SetNamespace(cmdKey.namespace)
func (tm *TxnMonitor) getTranOps(txn *Txn, cmdKey *Key) ([]*Operation, Error) {
if err := txn.SetNamespace(cmdKey.namespace); err != nil {
return nil, err
}

if txn.MonitorExists() {
return []*Operation{
ListAppendWithPolicyOp(txnOrderedListPolicy, binNameDigests, cmdKey.Digest()),
}
}, nil
} else {
return []*Operation{
PutOp(NewBin(binNameId, txn.Id())),
ListAppendWithPolicyOp(txnOrderedListPolicy, binNameDigests, cmdKey.Digest()),
}
}, nil
}
}

func (tm *TxnMonitor) getTranOpsFromKeys(txn *Txn, keys []*Key) []*Operation {
func (tm *TxnMonitor) getTranOpsFromKeys(txn *Txn, keys []*Key) ([]*Operation, Error) {
if err := txn.VerifyCommand(); err != nil {
return nil, err
}

list := make([]interface{}, 0, len(keys))

for _, key := range keys {
txn.SetNamespace(key.namespace)
if err := txn.SetNamespace(key.namespace); err != nil {
return nil, err
}
list = append(list, NewBytesValue(key.Digest()))
}
return tm.getTranOpsFromValueList(txn, list)
return tm.getTranOpsFromValueList(txn, list), nil
}

func (tm *TxnMonitor) getTranOpsFromBatchRecords(txn *Txn, records []BatchRecordIfc) []*Operation {
func (tm *TxnMonitor) getTranOpsFromBatchRecords(txn *Txn, records []BatchRecordIfc) ([]*Operation, Error) {
if err := txn.VerifyCommand(); err != nil {
return nil, err
}

list := make([]interface{}, 0, len(records))

for _, br := range records {
txn.SetNamespace(br.key().namespace)
if err := txn.SetNamespace(br.key().namespace); err != nil {
return nil, err
}

if br.BatchRec().hasWrite {
list = append(list, br.key().Digest())
Expand All @@ -90,9 +116,9 @@ func (tm *TxnMonitor) getTranOpsFromBatchRecords(txn *Txn, records []BatchRecord

if len(list) == 0 {
// Readonly batch does not need to add key digests.
return nil
return nil, nil
}
return tm.getTranOpsFromValueList(txn, list)
return tm.getTranOpsFromValueList(txn, list), nil
}

func (tm *TxnMonitor) getTranOpsFromValueList(txn *Txn, list []interface{}) []*Operation {
Expand Down
Loading

0 comments on commit 1d3178b

Please sign in to comment.