From 77429c839e1ea7cde6b0da37a6d9c878b44c446a Mon Sep 17 00:00:00 2001 From: Khosrow Afroozeh Date: Sat, 24 Jan 2015 10:02:49 +0100 Subject: [PATCH 1/3] marshalling support for KVS operations major code cleanup: * removed lock in Scan and Query operations for Cancellation check * uses binPool for binMaps in various operations --- CHANGELOG.md | 43 +++ aerospike_bench_test.go | 84 +++++- batch_command.go | 10 +- batch_command_exists.go | 4 - batch_command_get.go | 4 - bin.go | 41 ++- client.go | 505 ++++++++++++++--------------------- client_object_test.go | 335 +++++++++++++++++++++++ delete_command.go | 7 +- key.go | 4 +- marshal.go | 217 +++++++++++++++ operate_command.go | 3 - packer.go | 27 +- query_record_command.go | 19 +- query_test.go | 2 +- read_command.go | 333 ++++++++++++++++++++++- read_header_command.go | 7 +- recordset.go | 58 +++- scan_command.go | 18 +- scan_test.go | 36 ++- server_command.go | 4 - tools/benchmark/benchmark.go | 101 ++++++- touch_command.go | 7 +- types/pool.go | 9 +- types/result_code.go | 14 +- udf_test.go | 10 +- value.go | 29 +- write_command.go | 7 +- 28 files changed, 1474 insertions(+), 464 deletions(-) create mode 100644 client_object_test.go create mode 100644 marshal.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 405f75b1..2503cc1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,48 @@ # Change history +## Feb 13 2015 : v1.4.0 + + This is a major release, and makes using the client much easier to develop web applications. + + * **New Features** + + * Added Marshalling Support. + * Added `Recordset.Results()`. Consumers of a recordset do not have to implement a select anymore. Instead of: + ```go + recordset, err := client.ScanAll(...) + L: + for { + select { + case r := <-recordset.Record: + if r == nil { + break L + } + // process record here + case e := <-recordset.Errors: + // handle error here + } + } + ``` + + one should only range on `recordset.Results()`: + + ```go + recordset, err := client.ScanAll(...) + for res := range recordset.Results() { + if res.Err != nil { + // handle error here + } else { + // process record here + fmt.Println(res.Record.Bins) + } + } + ``` + Use of the old pattern is discouraged and deprecated, and direct access to recordset.Records and recordset.Errors will be removed in a future release. + + * **Improvements** + + * Custom Types are now allowed as bin values. + ## Jan 14 2015 : v1.3.0 * **Breaking Changes** diff --git a/aerospike_bench_test.go b/aerospike_bench_test.go index e0a5ec9d..3644ae59 100644 --- a/aerospike_bench_test.go +++ b/aerospike_bench_test.go @@ -15,6 +15,9 @@ package aerospike_test import ( + "bytes" + "runtime" + . "github.com/aerospike/aerospike-client-go" "testing" @@ -23,18 +26,39 @@ import ( var r *Record var err error -func benchGet(times int, client *Client, key *Key) { +type OBJECT struct { + Price int + DBName string + Blob []byte +} + +func benchGet(times int, client *Client, key *Key, obj interface{}) { for i := 0; i < times; i++ { - if r, err = client.Get(nil, key); err != nil { - panic(err) + if obj == nil { + if r, err = client.Get(nil, key); err != nil { + panic(err) + } + } else { + if err = client.GetObject(nil, key, obj); err != nil { + panic(err) + } } } } -func benchPut(times int, client *Client, key *Key, bins []*Bin, wp *WritePolicy) { +func benchPut(times int, client *Client, key *Key, wp *WritePolicy, obj interface{}) { for i := 0; i < times; i++ { - if err = client.PutBins(wp, key, bins...); err != nil { - panic(err) + if obj == nil { + dbName := NewBin("dbname", "CouchDB") + price := NewBin("price", 0) + keywords := NewBin("keywords", []string{"concurrent", "fast"}) + if err = client.PutBins(wp, key, dbName, price, keywords); err != nil { + panic(err) + } + } else { + if err = client.PutObject(wp, key, obj); err != nil { + panic(err) + } } } } @@ -46,13 +70,34 @@ func Benchmark_Get(b *testing.B) { } key, _ := NewKey("test", "databases", "Aerospike") + obj := &OBJECT{198, "Jack Shaftoe and Company", []byte(bytes.Repeat([]byte{32}, 1000))} + client.PutObject(nil, key, obj) + b.N = 100000 + runtime.GC() b.ResetTimer() - benchGet(b.N, client, key) + benchGet(b.N, client, key, nil) +} + +func Benchmark_GetObject(b *testing.B) { + client, err := NewClientWithPolicy(clientPolicy, *host, *port) + if err != nil { + b.Fail() + } + + key, _ := NewKey("test", "databases", "Aerospike") + + obj := &OBJECT{198, "Jack Shaftoe and Company", []byte(bytes.Repeat([]byte{32}, 1000))} + client.PutObject(nil, key, obj) + + b.N = 100000 + runtime.GC() + b.ResetTimer() + benchGet(b.N, client, key, obj) } func Benchmark_Put(b *testing.B) { - client, err := NewClient("localhost", 3000) + client, err := NewClient(*host, *port) if err != nil { b.Fail() } @@ -60,11 +105,24 @@ func Benchmark_Put(b *testing.B) { key, _ := NewKey("test", "databases", "Aerospike") writepolicy := NewWritePolicy(0, 0) - dbName := NewBin("dbname", "CouchDB") - price := NewBin("price", 0) - // keywords := NewBin("keywords", []string{"concurrent", "fast"}) - // speeds := NewBin("keywords", []int{18, 251}) + b.N = 100000 + runtime.GC() + b.ResetTimer() + benchPut(b.N, client, key, writepolicy, nil) +} + +func Benchmark_PutObject(b *testing.B) { + client, err := NewClient(*host, *port) + if err != nil { + b.Fail() + } + + obj := &OBJECT{198, "Jack Shaftoe and Company", []byte(bytes.Repeat([]byte{32}, 1000))} + key, _ := NewKey("test", "databases", "Aerospike") + writepolicy := NewWritePolicy(0, 0) + b.N = 100000 + runtime.GC() b.ResetTimer() - benchPut(b.N, client, key, []*Bin{dbName, price}, writepolicy) + benchPut(b.N, client, key, writepolicy, obj) } diff --git a/batch_command.go b/batch_command.go index 37239d27..63431a1c 100644 --- a/batch_command.go +++ b/batch_command.go @@ -18,7 +18,7 @@ import ( "fmt" . "github.com/aerospike/aerospike-client-go/types" - . "github.com/aerospike/aerospike-client-go/types/atomic" + // . "github.com/aerospike/aerospike-client-go/types/atomic" Buffer "github.com/aerospike/aerospike-client-go/utils/buffer" ) @@ -28,22 +28,18 @@ const ( type multiCommand interface { Stop() - IsValid() bool } type baseMultiCommand struct { *baseCommand recordset *Recordset - - valid *AtomicBool } func newMultiCommand(node *Node, recordset *Recordset) *baseMultiCommand { return &baseMultiCommand{ baseCommand: &baseCommand{node: node}, recordset: recordset, - valid: NewAtomicBool(true), } } @@ -132,7 +128,3 @@ func (cmd *baseMultiCommand) readBytes(length int) error { cmd.dataOffset += length return nil } - -func (cmd *baseMultiCommand) IsValid() bool { - return cmd.valid.Get() -} diff --git a/batch_command_exists.go b/batch_command_exists.go index 3d492f85..74f6c7d8 100644 --- a/batch_command_exists.go +++ b/batch_command_exists.go @@ -60,10 +60,6 @@ func (cmd *batchCommandExists) parseRecordResults(ifc command, receiveSize int) cmd.dataOffset = 0 for cmd.dataOffset < receiveSize { - if !cmd.IsValid() { - return false, NewAerospikeError(QUERY_TERMINATED) - } - if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil { return false, err } diff --git a/batch_command_get.go b/batch_command_get.go index 9bcd64e0..f642b204 100644 --- a/batch_command_get.go +++ b/batch_command_get.go @@ -118,10 +118,6 @@ func (cmd *batchCommandGet) parseRecord(key *Key, opCount int, generation int, e var bins map[string]interface{} for i := 0; i < opCount; i++ { - if !cmd.IsValid() { - return nil, NewAerospikeError(QUERY_TERMINATED) - } - if err := cmd.readBytes(8); err != nil { return nil, err } diff --git a/bin.go b/bin.go index 4bb56f83..dfde729f 100644 --- a/bin.go +++ b/bin.go @@ -14,6 +14,10 @@ package aerospike +import ( + . "github.com/aerospike/aerospike-client-go/types" +) + // BinMap is used to define a map of bin names to values. type BinMap map[string]interface{} @@ -35,15 +39,36 @@ func NewBin(name string, value interface{}) *Bin { } } -func binMapToBins(bins BinMap) []*Bin { - binList := make([]*Bin, 0, len(bins)) - for k, v := range bins { - binList = append(binList, NewBin(k, v)) - } - return binList -} - // String implements Stringer interface. func (bn *Bin) String() string { return bn.Name + ":" + bn.Value.String() } + +func binMapToBins(bins []*Bin, binMap BinMap) []*Bin { + i := 0 + for k, v := range binMap { + bins[i].Name = k + bins[i].Value = NewValue(v) + i++ + } + + return bins +} + +// pool Bins so that we won't have to allocate them everytime +var binPool = NewPool(512) + +func init() { + binPool.New = func(params ...interface{}) interface{} { + size := params[0].(int) + bins := make([]*Bin, size, size) + for i := range bins { + bins[i] = &Bin{} + } + return bins + } + + binPool.IsUsable = func(obj interface{}, params ...interface{}) bool { + return len(obj.([]*Bin)) >= params[0].(int) + } +} diff --git a/client.go b/client.go index 8b0cae41..360d7513 100644 --- a/client.go +++ b/client.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io/ioutil" + "reflect" "strconv" "strings" "sync" @@ -36,7 +37,7 @@ type Client struct { DefaultPolicy *BasePolicy // DefaultWritePolicy is used for all write commands without a specific policy. DefaultWritePolicy *WritePolicy - // DefaultScanPolicy is used for all quey commands without a specific policy. + // DefaultScanPolicy is used for all query commands without a specific policy. DefaultScanPolicy *ScanPolicy // DefaultQueryPolicy is used for all scan commands without a specific policy. DefaultQueryPolicy *QueryPolicy @@ -54,14 +55,14 @@ func NewClient(hostname string, port int) (*Client, error) { } // NewClientWithPolicy generates a new Client using the specified ClientPolicy. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func NewClientWithPolicy(policy *ClientPolicy, hostname string, port int) (*Client, error) { return NewClientWithPolicyAndHost(policy, NewHost(hostname, port)) } // NewClientWithPolicyAndHost generates a new Client the specified ClientPolicy and // sets up the cluster using the provided hosts. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func NewClientWithPolicyAndHost(policy *ClientPolicy, hosts ...*Host) (*Client, error) { if policy == nil { policy = NewClientPolicy() @@ -119,28 +120,40 @@ func (clnt *Client) GetNodeNames() []string { // Put writes record bin(s) to the server. // The policy specifies the transaction timeout, record expiration and how the transaction is // handled when the record already exists. -// If the policy is nil, a default policy will be generated. -func (clnt *Client) Put(policy *WritePolicy, key *Key, bins BinMap) error { - return clnt.PutBins(policy, key, binMapToBins(bins)...) +// If the policy is nil, the default relevant policy will be used. +func (clnt *Client) Put(policy *WritePolicy, key *Key, binMap BinMap) error { + // get a slice of pre-allocated and pooled bins + bins := binPool.Get(len(binMap)).([]*Bin) + res := clnt.PutBins(policy, key, binMapToBins(bins[:len(binMap)], binMap)...) + binPool.Put(bins) + return res } // PutBins writes record bin(s) to the server. // The policy specifies the transaction timeout, record expiration and how the transaction is // handled when the record already exists. // This method avoids using the BinMap allocation and iteration and is lighter on GC. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) PutBins(policy *WritePolicy, key *Key, bins ...*Bin) error { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) command := newWriteCommand(clnt.cluster, policy, key, bins, WRITE) return command.Execute() } +// PutObject writes record bin(s) to the server. +// The policy specifies the transaction timeout, record expiration and how the transaction is +// handled when the record already exists. +// If the policy is nil, the default relevant policy will be used. +func (clnt *Client) PutObject(policy *WritePolicy, key *Key, obj interface{}) (err error) { + policy = clnt.getUsableWritePolicy(policy) + + bins := marshal(obj) + command := newWriteCommand(clnt.cluster, policy, key, bins, WRITE) + res := command.Execute() + binPool.Put(bins) + return res +} + //------------------------------------------------------- // Operations string //------------------------------------------------------- @@ -149,20 +162,18 @@ func (clnt *Client) PutBins(policy *WritePolicy, key *Key, bins ...*Bin) error { // The policy specifies the transaction timeout, record expiration and how the transaction is // handled when the record already exists. // This call only works for string and []byte values. -// If the policy is nil, a default policy will be generated. -func (clnt *Client) Append(policy *WritePolicy, key *Key, bins BinMap) error { - return clnt.AppendBins(policy, key, binMapToBins(bins)...) +// If the policy is nil, the default relevant policy will be used. +func (clnt *Client) Append(policy *WritePolicy, key *Key, binMap BinMap) error { + // get a slice of pre-allocated and pooled bins + bins := binPool.Get(len(binMap)).([]*Bin) + res := clnt.AppendBins(policy, key, binMapToBins(bins[:len(binMap)], binMap)...) + binPool.Put(bins) + return res } // AppendBins works the same as Append, but avoids BinMap allocation and iteration. func (clnt *Client) AppendBins(policy *WritePolicy, key *Key, bins ...*Bin) error { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) command := newWriteCommand(clnt.cluster, policy, key, bins, APPEND) return command.Execute() } @@ -171,20 +182,17 @@ func (clnt *Client) AppendBins(policy *WritePolicy, key *Key, bins ...*Bin) erro // The policy specifies the transaction timeout, record expiration and how the transaction is // handled when the record already exists. // This call works only for string and []byte values. -// If the policy is nil, a default policy will be generated. -func (clnt *Client) Prepend(policy *WritePolicy, key *Key, bins BinMap) error { - return clnt.PrependBins(policy, key, binMapToBins(bins)...) +// If the policy is nil, the default relevant policy will be used. +func (clnt *Client) Prepend(policy *WritePolicy, key *Key, binMap BinMap) error { + bins := binPool.Get(len(binMap)).([]*Bin) + res := clnt.PrependBins(policy, key, binMapToBins(bins[:len(binMap)], binMap)...) + binPool.Put(bins) + return res } // PrependBins works the same as Prepend, but avoids BinMap allocation and iteration. func (clnt *Client) PrependBins(policy *WritePolicy, key *Key, bins ...*Bin) error { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) command := newWriteCommand(clnt.cluster, policy, key, bins, PREPEND) return command.Execute() } @@ -197,20 +205,18 @@ func (clnt *Client) PrependBins(policy *WritePolicy, key *Key, bins ...*Bin) err // The policy specifies the transaction timeout, record expiration and how the transaction is // handled when the record already exists. // This call only works for integer values. -// If the policy is nil, a default policy will be generated. -func (clnt *Client) Add(policy *WritePolicy, key *Key, bins BinMap) error { - return clnt.AddBins(policy, key, binMapToBins(bins)...) +// If the policy is nil, the default relevant policy will be used. +func (clnt *Client) Add(policy *WritePolicy, key *Key, binMap BinMap) error { + // get a slice of pre-allocated and pooled bins + bins := binPool.Get(len(binMap)).([]*Bin) + res := clnt.AddBins(policy, key, binMapToBins(bins[:len(binMap)], binMap)...) + binPool.Put(bins) + return res } // AddBins works the same as Add, but avoids BinMap allocation and iteration. func (clnt *Client) AddBins(policy *WritePolicy, key *Key, bins ...*Bin) error { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) command := newWriteCommand(clnt.cluster, policy, key, bins, ADD) return command.Execute() } @@ -221,15 +227,9 @@ func (clnt *Client) AddBins(policy *WritePolicy, key *Key, bins ...*Bin) error { // Delete deletes a record for specified key. // The policy specifies the transaction timeout. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) Delete(policy *WritePolicy, key *Key) (bool, error) { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) command := newDeleteCommand(clnt.cluster, policy, key) err := command.Execute() return command.Existed(), err @@ -244,13 +244,7 @@ func (clnt *Client) Delete(policy *WritePolicy, key *Key) (bool, error) { // policy's expiration. // If the record doesn't exist, it will return an error. func (clnt *Client) Touch(policy *WritePolicy, key *Key) error { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) command := newTouchCommand(clnt.cluster, policy, key) return command.Execute() } @@ -261,34 +255,22 @@ func (clnt *Client) Touch(policy *WritePolicy, key *Key) error { // Exists determine if a record key exists. // The policy can be used to specify timeouts. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) Exists(policy *BasePolicy, key *Key) (bool, error) { - if policy == nil { - if clnt.DefaultPolicy != nil { - policy = clnt.DefaultPolicy - } else { - policy = NewPolicy() - } - } + policy = clnt.getUsablePolicy(policy) command := newExistsCommand(clnt.cluster, policy, key) err := command.Execute() return command.Exists(), err } // BatchExists determines if multiple record keys exist in one batch request. -// The returned array bool is in positional order with the original key array order. +// The returned boolean array is in positional order with the original key array order. // The policy can be used to specify timeouts. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) BatchExists(policy *BasePolicy, keys []*Key) ([]bool, error) { - if policy == nil { - if clnt.DefaultPolicy != nil { - policy = clnt.DefaultPolicy - } else { - policy = NewPolicy() - } - } + policy = clnt.getUsablePolicy(policy) - // same array can be used without sychronization; + // same array can be used without synchronization; // when a key exists, the corresponding index will be marked true existsArray := make([]bool, len(keys)) @@ -309,15 +291,10 @@ func (clnt *Client) BatchExists(policy *BasePolicy, keys []*Key) ([]bool, error) // Get reads a record header and bins for specified key. // The policy can be used to specify timeouts. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) Get(policy *BasePolicy, key *Key, binNames ...string) (*Record, error) { - if policy == nil { - if clnt.DefaultPolicy != nil { - policy = clnt.DefaultPolicy - } else { - policy = NewPolicy() - } - } + policy = clnt.getUsablePolicy(policy) + command := newReadCommand(clnt.cluster, policy, key, binNames) if err := command.Execute(); err != nil { return nil, err @@ -325,18 +302,28 @@ func (clnt *Client) Get(policy *BasePolicy, key *Key, binNames ...string) (*Reco return command.GetRecord(), nil } +// GetObject reads a record for specified key and puts the result into the provided object. +// The policy can be used to specify timeouts. +// If the policy is nil, the default relevant policy will be used. +func (clnt *Client) GetObject(policy *BasePolicy, key *Key, obj interface{}) error { + policy = clnt.getUsablePolicy(policy) + + binNames := objectMappings.getFields(reflect.ValueOf(obj).Type().Elem().Name()) + command := newReadCommand(clnt.cluster, policy, key, binNames) + command.object = obj + if err := command.Execute(); err != nil { + return err + } + return nil +} + // GetHeader reads a record generation and expiration only for specified key. // Bins are not read. // The policy can be used to specify timeouts. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) GetHeader(policy *BasePolicy, key *Key) (*Record, error) { - if policy == nil { - if clnt.DefaultPolicy != nil { - policy = clnt.DefaultPolicy - } else { - policy = NewPolicy() - } - } + policy = clnt.getUsablePolicy(policy) + command := newReadHeaderCommand(clnt.cluster, policy, key) if err := command.Execute(); err != nil { return nil, err @@ -352,17 +339,11 @@ func (clnt *Client) GetHeader(policy *BasePolicy, key *Key) (*Record, error) { // The returned records are in positional order with the original key array order. // If a key is not found, the positional record will be nil. // The policy can be used to specify timeouts. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) BatchGet(policy *BasePolicy, keys []*Key, binNames ...string) ([]*Record, error) { - if policy == nil { - if clnt.DefaultPolicy != nil { - policy = clnt.DefaultPolicy - } else { - policy = NewPolicy() - } - } + policy = clnt.getUsablePolicy(policy) - // same array can be used without sychronization; + // same array can be used without synchronization; // when a key exists, the corresponding index will be set to record records := make([]*Record, len(keys)) @@ -386,17 +367,11 @@ func (clnt *Client) BatchGet(policy *BasePolicy, keys []*Key, binNames ...string // The returned records are in positional order with the original key array order. // If a key is not found, the positional record will be nil. // The policy can be used to specify timeouts. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) BatchGetHeader(policy *BasePolicy, keys []*Key) ([]*Record, error) { - if policy == nil { - if clnt.DefaultPolicy != nil { - policy = clnt.DefaultPolicy - } else { - policy = NewPolicy() - } - } + policy = clnt.getUsablePolicy(policy) - // same array can be used without sychronization; + // same array can be used without synchronization; // when a key exists, the corresponding index will be set to record records := make([]*Record, len(keys)) @@ -421,15 +396,9 @@ func (clnt *Client) BatchGetHeader(policy *BasePolicy, keys []*Key) ([]*Record, // // Write operations are always performed first, regardless of operation order // relative to read operations. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) Operate(policy *WritePolicy, key *Key, operations ...*Operation) (*Record, error) { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) command := newOperateCommand(clnt.cluster, policy, key, operations) if err := command.Execute(); err != nil { return nil, err @@ -444,7 +413,7 @@ func (clnt *Client) Operate(policy *WritePolicy, key *Key, operations ...*Operat // ScanAll reads all records in specified namespace and set from all nodes. // If the policy's concurrentNodes is specified, each server node will be read in // parallel. Otherwise, server nodes are read sequentially. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) ScanAll(apolicy *ScanPolicy, namespace string, setName string, binNames ...string) (*Recordset, error) { policy := *clnt.getUsableScanPolicy(apolicy) @@ -492,18 +461,19 @@ func (clnt *Client) ScanAll(apolicy *ScanPolicy, namespace string, setName strin } // ScanNode reads all records in specified namespace and set for one node only. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) ScanNode(apolicy *ScanPolicy, node *Node, namespace string, setName string, binNames ...string) (*Recordset, error) { policy := *clnt.getUsableScanPolicy(apolicy) // results channel must be async for performance res := newRecordset(policy.RecordQueueSize, 1) - return res, clnt.scanNode(&policy, node, res, namespace, setName, binNames...) + go clnt.scanNode(&policy, node, res, namespace, setName, binNames...) + return res, nil } // ScanNode reads all records in specified namespace and set for one node only. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) scanNode(policy *ScanPolicy, node *Node, recordset *Recordset, namespace string, setName string, binNames ...string) error { if policy.WaitUntilMigrationsAreOver { // wait until migrations on node are finished @@ -526,15 +496,9 @@ func (clnt *Client) scanNode(policy *ScanPolicy, node *Node, recordset *Recordse // within a single bin. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) GetLargeList(policy *WritePolicy, key *Key, binName string, userModule string) *LargeList { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) return NewLargeList(clnt, policy, key, binName, userModule) } @@ -543,15 +507,9 @@ func (clnt *Client) GetLargeList(policy *WritePolicy, key *Key, binName string, // within a single bin. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) GetLargeMap(policy *WritePolicy, key *Key, binName string, userModule string) *LargeMap { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) return NewLargeMap(clnt, policy, key, binName, userModule) } @@ -560,15 +518,9 @@ func (clnt *Client) GetLargeMap(policy *WritePolicy, key *Key, binName string, u // within a single bin. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) GetLargeSet(policy *WritePolicy, key *Key, binName string, userModule string) *LargeSet { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) return NewLargeSet(clnt, policy, key, binName, userModule) } @@ -577,15 +529,9 @@ func (clnt *Client) GetLargeSet(policy *WritePolicy, key *Key, binName string, u // within a single bin. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) GetLargeStack(policy *WritePolicy, key *Key, binName string, userModule string) *LargeStack { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) return NewLargeStack(clnt, policy, key, binName, userModule) } @@ -600,15 +546,9 @@ func (clnt *Client) GetLargeStack(policy *WritePolicy, key *Key, binName string, // RegisterTask instance. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) RegisterUDFFromFile(policy *WritePolicy, clientPath string, serverPath string, language Language) (*RegisterTask, error) { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) udfBody, err := ioutil.ReadFile(clientPath) if err != nil { return nil, err @@ -623,15 +563,9 @@ func (clnt *Client) RegisterUDFFromFile(policy *WritePolicy, clientPath string, // RegisterTask instance. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) RegisterUDF(policy *WritePolicy, udfBody []byte, serverPath string, language Language) (*RegisterTask, error) { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) content := base64.StdEncoding.EncodeToString(udfBody) var strCmd bytes.Buffer @@ -697,15 +631,9 @@ func (clnt *Client) RegisterUDF(policy *WritePolicy, udfBody []byte, serverPath // RemoveTask instance. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) RemoveUDF(policy *WritePolicy, udfName string) (*RemoveTask, error) { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) var strCmd bytes.Buffer // errors are to remove errcheck warnings // they will always be nil as stated in golang docs @@ -745,15 +673,10 @@ func (clnt *Client) RemoveUDF(policy *WritePolicy, udfName string) (*RemoveTask, // ListUDF lists all packages containing user defined functions in the server. // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) ListUDF(policy *BasePolicy) ([]*UDF, error) { - if policy == nil { - if clnt.DefaultPolicy != nil { - policy = clnt.DefaultPolicy - } else { - policy = NewPolicy() - } - } + policy = clnt.getUsablePolicy(policy) + var strCmd bytes.Buffer // errors are to remove errcheck warnings // they will always be nil as stated in golang docs @@ -821,15 +744,9 @@ func (clnt *Client) ListUDF(policy *BasePolicy) ([]*UDF, error) { // udf file = /.lua // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) Execute(policy *WritePolicy, key *Key, packageName string, functionName string, args ...Value) (interface{}, error) { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) command := newExecuteCommand(clnt.cluster, policy, key, packageName, functionName, args) if err := command.Execute(); err != nil { return nil, err @@ -875,20 +792,14 @@ func mapContainsKeyPartial(theMap map[string]interface{}, key string) (bool, int // ExecuteTask instance. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) ExecuteUDF(policy *QueryPolicy, statement *Statement, packageName string, functionName string, functionArgs ...Value, ) (*ExecuteTask, error) { - if policy == nil { - if clnt.DefaultQueryPolicy != nil { - policy = clnt.DefaultQueryPolicy - } else { - policy = NewQueryPolicy() - } - } + policy = clnt.getUsableQueryPolicy(policy) nodes := clnt.cluster.GetNodes() if len(nodes) == 0 { @@ -923,15 +834,9 @@ func (clnt *Client) ExecuteUDF(policy *QueryPolicy, // Recordset.Records channel. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) Query(policy *QueryPolicy, statement *Statement) (*Recordset, error) { - if policy == nil { - if clnt.DefaultQueryPolicy != nil { - policy = clnt.DefaultQueryPolicy - } else { - policy = NewQueryPolicy() - } - } + policy = clnt.getUsableQueryPolicy(policy) nodes := clnt.cluster.GetNodes() if len(nodes) == 0 { @@ -964,15 +869,9 @@ func (clnt *Client) Query(policy *QueryPolicy, statement *Statement) (*Recordset // record channel. // // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) QueryNode(policy *QueryPolicy, node *Node, statement *Statement) (*Recordset, error) { - if policy == nil { - if clnt.DefaultQueryPolicy != nil { - policy = clnt.DefaultQueryPolicy - } else { - policy = NewQueryPolicy() - } - } + policy = clnt.getUsableQueryPolicy(policy) if policy.WaitUntilMigrationsAreOver { // wait until all migrations are finished @@ -1017,7 +916,7 @@ func (clnt *Client) QueryNode(policy *QueryPolicy, node *Node, statement *Statem // The user can optionally wait for command completion by using the returned // IndexTask instance. // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) CreateIndex( policy *WritePolicy, namespace string, @@ -1026,13 +925,7 @@ func (clnt *Client) CreateIndex( binName string, indexType IndexType, ) (*IndexTask, error) { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) var strCmd bytes.Buffer _, err := strCmd.WriteString("sindex-create:ns=") @@ -1078,20 +971,14 @@ func (clnt *Client) CreateIndex( // DropIndex deletes a secondary index. // This method is only supported by Aerospike 3 servers. -// If the policy is nil, a default policy will be generated. +// If the policy is nil, the default relevant policy will be used. func (clnt *Client) DropIndex( policy *WritePolicy, namespace string, setName string, indexName string, ) error { - if policy == nil { - if clnt.DefaultWritePolicy != nil { - policy = clnt.DefaultWritePolicy - } else { - policy = NewWritePolicy(0, 0) - } - } + policy = clnt.getUsableWritePolicy(policy) var strCmd bytes.Buffer _, err := strCmd.WriteString("sindex-delete:ns=") _, err = strCmd.WriteString(namespace) @@ -1133,13 +1020,8 @@ func (clnt *Client) DropIndex( // Create user with password and roles. Clear-text password will be hashed using bcrypt // before sending to server. func (clnt *Client) CreateUser(policy *AdminPolicy, user string, password string, roles []string) error { - if policy == nil { - if clnt.DefaultAdminPolicy != nil { - policy = clnt.DefaultAdminPolicy - } else { - policy = NewAdminPolicy() - } - } + policy = clnt.getUsableAdminPolicy(policy) + hash, err := hashPassword(password) if err != nil { return err @@ -1150,26 +1032,16 @@ func (clnt *Client) CreateUser(policy *AdminPolicy, user string, password string // Remove user from cluster. func (clnt *Client) DropUser(policy *AdminPolicy, user string) error { - if policy == nil { - if clnt.DefaultAdminPolicy != nil { - policy = clnt.DefaultAdminPolicy - } else { - policy = NewAdminPolicy() - } - } + policy = clnt.getUsableAdminPolicy(policy) + command := newAdminCommand() return command.dropUser(clnt.cluster, policy, user) } // Change user's password. Clear-text password will be hashed using bcrypt before sending to server. func (clnt *Client) ChangePassword(policy *AdminPolicy, user string, password string) error { - if policy == nil { - if clnt.DefaultAdminPolicy != nil { - policy = clnt.DefaultAdminPolicy - } else { - policy = NewAdminPolicy() - } - } + policy = clnt.getUsableAdminPolicy(policy) + if clnt.cluster.user == "" { return NewAerospikeError(INVALID_USER) } @@ -1198,65 +1070,40 @@ func (clnt *Client) ChangePassword(policy *AdminPolicy, user string, password st // Add roles to user's list of roles. func (clnt *Client) GrantRoles(policy *AdminPolicy, user string, roles []string) error { - if policy == nil { - if clnt.DefaultAdminPolicy != nil { - policy = clnt.DefaultAdminPolicy - } else { - policy = NewAdminPolicy() - } - } + policy = clnt.getUsableAdminPolicy(policy) + command := newAdminCommand() return command.grantRoles(clnt.cluster, policy, user, roles) } // Remove roles from user's list of roles. func (clnt *Client) RevokeRoles(policy *AdminPolicy, user string, roles []string) error { - if policy == nil { - if clnt.DefaultAdminPolicy != nil { - policy = clnt.DefaultAdminPolicy - } else { - policy = NewAdminPolicy() - } - } + policy = clnt.getUsableAdminPolicy(policy) + command := newAdminCommand() return command.revokeRoles(clnt.cluster, policy, user, roles) } // Replace user's list of roles. func (clnt *Client) ReplaceRoles(policy *AdminPolicy, user string, roles []string) error { - if policy == nil { - if clnt.DefaultAdminPolicy != nil { - policy = clnt.DefaultAdminPolicy - } else { - policy = NewAdminPolicy() - } - } + policy = clnt.getUsableAdminPolicy(policy) + command := newAdminCommand() return command.replaceRoles(clnt.cluster, policy, user, roles) } // Retrieve roles for a given user. func (clnt *Client) QueryUser(policy *AdminPolicy, user string) (*UserRoles, error) { - if policy == nil { - if clnt.DefaultAdminPolicy != nil { - policy = clnt.DefaultAdminPolicy - } else { - policy = NewAdminPolicy() - } - } + policy = clnt.getUsableAdminPolicy(policy) + command := newAdminCommand() return command.queryUser(clnt.cluster, policy, user) } // Retrieve all users and their roles. func (clnt *Client) QueryUsers(policy *AdminPolicy) ([]*UserRoles, error) { - if policy == nil { - if clnt.DefaultAdminPolicy != nil { - policy = clnt.DefaultAdminPolicy - } else { - policy = NewAdminPolicy() - } - } + policy = clnt.getUsableAdminPolicy(policy) + command := newAdminCommand() return command.queryUsers(clnt.cluster, policy) } @@ -1291,22 +1138,6 @@ func (clnt *Client) sendInfoCommand(policy *WritePolicy, command string) (map[st return results, nil } -//------------------------------------------------------- -// Utility Functions -//------------------------------------------------------- - -// mergeErrors merges several errors into one -func mergeErrors(errs []error) error { - if errs == nil || len(errs) == 0 { - return nil - } - var msg bytes.Buffer - for _, err := range errs { - msg.WriteString(err.Error() + "\n") - } - return errors.New(msg.String()) -} - // batchExecute Uses sync.WaitGroup to run commands using multiple goroutines, // and waits for their return func (clnt *Client) batchExecute(keys []*Key, cmdGen func(node *Node, bns *batchNamespace) command) error { @@ -1339,6 +1170,28 @@ func (clnt *Client) batchExecute(keys []*Key, cmdGen func(node *Node, bns *batch return mergeErrors(errs) } +func (clnt *Client) getUsablePolicy(policy *BasePolicy) *BasePolicy { + if policy == nil { + if clnt.DefaultPolicy != nil { + return clnt.DefaultPolicy + } else { + return NewPolicy() + } + } + return policy +} + +func (clnt *Client) getUsableWritePolicy(policy *WritePolicy) *WritePolicy { + if policy == nil { + if clnt.DefaultWritePolicy != nil { + return clnt.DefaultWritePolicy + } else { + return NewWritePolicy(0, 0) + } + } + return policy +} + func (clnt *Client) getUsableScanPolicy(policy *ScanPolicy) *ScanPolicy { if policy == nil { if clnt.DefaultScanPolicy != nil { @@ -1349,3 +1202,41 @@ func (clnt *Client) getUsableScanPolicy(policy *ScanPolicy) *ScanPolicy { } return policy } + +func (clnt *Client) getUsableQueryPolicy(policy *QueryPolicy) *QueryPolicy { + if policy == nil { + if clnt.DefaultQueryPolicy != nil { + policy = clnt.DefaultQueryPolicy + } else { + policy = NewQueryPolicy() + } + } + return policy +} + +func (clnt *Client) getUsableAdminPolicy(policy *AdminPolicy) *AdminPolicy { + if policy == nil { + if clnt.DefaultAdminPolicy != nil { + policy = clnt.DefaultAdminPolicy + } else { + policy = NewAdminPolicy() + } + } + return policy +} + +//------------------------------------------------------- +// Utility Functions +//------------------------------------------------------- + +// mergeErrors merges several errors into one +func mergeErrors(errs []error) error { + if errs == nil || len(errs) == 0 { + return nil + } + var msg bytes.Buffer + for _, err := range errs { + msg.WriteString(err.Error() + "\n") + } + return errors.New(msg.String()) +} diff --git a/client_object_test.go b/client_object_test.go new file mode 100644 index 00000000..eb46e6b5 --- /dev/null +++ b/client_object_test.go @@ -0,0 +1,335 @@ +// Copyright 2013-2014 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aerospike_test + +import ( + "math" + "time" + + . "github.com/aerospike/aerospike-client-go" + // . "github.com/aerospike/aerospike-client-go/utils/buffer" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +// ALL tests are isolated by SetName and Key, which are 50 random charachters +var _ = Describe("Aerospike", func() { + initTestVars() + + Describe("Data operations on objects", func() { + // connection data + var err error + var ns = "test" + var set = randString(50) + var key *Key + var client *Client + + BeforeEach(func() { + // use the same client for all + client, err = NewClientWithPolicy(clientPolicy, *host, *port) + Expect(err).ToNot(HaveOccurred()) + + key, err = NewKey(ns, set, randString(50)) + Expect(err).ToNot(HaveOccurred()) + }) + + // type SomeBool bool TODO: FIXIT + type SomeBool bool + type SomeByte byte + type SomeInt int + type SomeUint uint + type SomeInt8 int8 + type SomeUint8 uint8 + type SomeInt16 int16 + type SomeUint16 uint16 + type SomeInt32 int32 + type SomeUint32 uint32 + type SomeInt64 int64 + type SomeUint64 uint64 + type SomeFloat32 float32 + type SomeFloat64 float64 + type SomeString string + + type SomeStruct struct { + A int + Self *SomeStruct + } + + type testObject struct { + Nil interface{} + NilP *int + + Bool bool + BoolP *bool + + Byte byte + ByteP *byte + + Int int + Intp *int + + Int8 int8 + Int8P *int8 + UInt8 uint8 + UInt8P *uint8 + + Int16 int16 + Int16P *int16 + UInt16 uint16 + UInt16P *uint16 + + Int32 int32 + Int32P *int32 + UInt32 uint32 + UInt32P *uint32 + + Int64 int64 + Int64P *int64 + UInt64 uint64 + UInt64P *uint64 + + F32 float32 + F32P *float32 + + F64 float64 + F64P *float64 + + String string + StringP *string + + Interface interface{} + InterfaceP interface{} + InterfacePP *interface{} + + Array [3]interface{} + Slice []interface{} + Map map[interface{}]interface{} + + CustomBool SomeBool + CustomBoolP *SomeBool + CustomByte SomeByte + CustomByteP *SomeByte + CustomInt SomeInt + CustomIntP *SomeInt + CustomUint SomeUint + CustomUintP *SomeUint + CustomInt8 SomeInt8 + CustomInt8P *SomeInt8 + CustomUint8 SomeUint8 + CustomUint8P *SomeUint8 + CustomInt16 SomeInt16 + CustomInt16P *SomeInt16 + CustomUint16 SomeUint16 + CustomUint16P *SomeUint16 + CustomInt32 SomeInt32 + CustomInt32P *SomeInt32 + CustomUint32 SomeUint32 + CustomUint32P *SomeUint32 + CustomInt64 SomeInt64 + CustomInt64P *SomeInt64 + CustomUint64 SomeUint64 + CustomUint64P *SomeUint64 + + CustomFloat32 SomeFloat32 + CustomFloat32P *SomeFloat32 + CustomFloat64 SomeFloat64 + CustomFloat64P *SomeFloat64 + + CustomString SomeString + CustomStringP *SomeString + + NestedObj SomeStruct + NestedObjP *testObject + EmptyNestedObjP *testObject + + // std lib type + Tm time.Time + TmP *time.Time + } + + makeTestObject := func() *testObject { + bl := true + b := byte(0) + ip := 11 + p8 := int8(4) + up8 := uint8(6) + p16 := int16(8) + up16 := uint16(10) + p32 := int32(12) + up32 := uint32(14) + p64 := int64(16) + up64 := uint64(math.MaxUint64) + f32p := float32(math.MaxFloat32) + f64p := math.MaxFloat64 + str := "pointer to a string" + iface := interface{}("a string") + + ctbl := SomeBool(true) + ctb := SomeByte(100) + cti := SomeInt(math.MinInt64) + ctui := SomeUint(math.MaxInt64) + cti8 := SomeInt8(103) + ctui8 := SomeUint8(math.MaxUint8) + cti16 := SomeInt16(math.MinInt16) + ctui16 := SomeUint16(math.MaxUint16) + cti32 := SomeInt32(math.MinInt32) + ctui32 := SomeUint32(math.MaxUint32) + cti64 := SomeInt64(math.MinInt64) + ctui64 := SomeUint64(math.MaxUint64) + cf32 := SomeFloat32(math.SmallestNonzeroFloat32) + cf64 := SomeFloat64(math.SmallestNonzeroFloat64) + ctstr := SomeString("Some string") + + now := time.Now() + + return &testObject{ + Bool: true, + BoolP: &bl, + + Nil: nil, + NilP: nil, + + Byte: byte(0), + ByteP: &b, + + Int: 1, + Intp: &ip, + + Int8: 3, + Int8P: &p8, + UInt8: 5, + UInt8P: &up8, + + Int16: 7, + Int16P: &p16, + UInt16: 9, + UInt16P: &up16, + + Int32: 11, + Int32P: &p32, + UInt32: 13, + UInt32P: &up32, + + Int64: math.MaxInt64, + Int64P: &p64, + UInt64: math.MaxUint64, + UInt64P: &up64, + + F32: 1.87132794, + F32P: &f32p, + F64: 59285092891.502818573, + F64P: &f64p, + + String: "string", + StringP: &str, + + Interface: iface, + // InterfaceP: ifaceP, // NOTICE: NOT SUPPORTED + InterfacePP: &iface, + + Array: [3]interface{}{1, "string", nil}, + Slice: []interface{}{1, "string", []byte{1, 11, 111}, nil, true}, + Map: map[interface{}]interface{}{1: "string", "string": nil, nil: map[interface{}]interface{}{"1": ip}, true: false}, + + CustomBool: true, + CustomBoolP: &ctbl, + CustomByte: 100, + CustomByteP: &ctb, + CustomInt: 100, + CustomIntP: &cti, + CustomUint: 100, + CustomUintP: &ctui, + CustomInt8: 100, + CustomInt8P: &cti8, + CustomUint8: 100, + CustomUint8P: &ctui8, + CustomInt16: 100, + CustomInt16P: &cti16, + CustomUint16: 100, + CustomUint16P: &ctui16, + CustomInt32: 100, + CustomInt32P: &cti32, + CustomUint32: 100, + CustomUint32P: &ctui32, + CustomInt64: 100, + CustomInt64P: &cti64, + CustomUint64: 100, + CustomUint64P: &ctui64, + + CustomFloat32: cf32, + CustomFloat32P: &cf32, + CustomFloat64: cf64, + CustomFloat64P: &cf64, + + CustomString: ctstr, + CustomStringP: &ctstr, + + NestedObj: SomeStruct{A: 1, Self: &SomeStruct{A: 999}}, + NestedObjP: &testObject{Int: 1, Intp: &ip, Tm: now}, + + Tm: now, + TmP: &now, + } + } + + Context("PutObject operations", func() { + + It("must save an object with the most complex structure possible", func() { + + testObj := makeTestObject() + err := client.PutObject(nil, key, &testObj) + Expect(err).ToNot(HaveOccurred()) + + resObj := &testObject{} + err = client.GetObject(nil, key, resObj) + Expect(err).ToNot(HaveOccurred()) + Expect(resObj).To(Equal(testObj)) + + }) + + It("must save an object and read it back respecting the tags", func() { + + type InnerStruct struct { + PersistNot int `as:"-"` + PersistAsInner1 int `as:"inner1"` + } + + type TaggedStruct struct { + DontPersist int `as:"-"` + PersistAsFld1 int `as:"fld1"` + + IStruct InnerStruct + } + + testObj := TaggedStruct{DontPersist: 1, PersistAsFld1: 2, IStruct: InnerStruct{PersistNot: 10, PersistAsInner1: 11}} + err := client.PutObject(nil, key, &testObj) + Expect(err).ToNot(HaveOccurred()) + + resObj := &TaggedStruct{} + err = client.GetObject(nil, key, resObj) + Expect(err).ToNot(HaveOccurred()) + + Expect(resObj.DontPersist).To(Equal(0)) + Expect(resObj.PersistAsFld1).To(Equal(2)) + Expect(resObj.IStruct.PersistNot).To(Equal(0)) + Expect(resObj.IStruct.PersistAsInner1).To(Equal(11)) + + }) + + }) // GetHeader context + + }) +}) diff --git a/delete_command.go b/delete_command.go index 2031b3c6..c93d2e19 100644 --- a/delete_command.go +++ b/delete_command.go @@ -31,12 +31,7 @@ type deleteCommand struct { func newDeleteCommand(cluster *Cluster, policy *WritePolicy, key *Key) *deleteCommand { newDeleteCmd := &deleteCommand{ singleCommand: newSingleCommand(cluster, key), - } - - if policy == nil { - newDeleteCmd.policy = NewWritePolicy(0, 0) - } else { - newDeleteCmd.policy = policy + policy: policy, } return newDeleteCmd diff --git a/key.go b/key.go index bb88d8e0..0208ee79 100644 --- a/key.go +++ b/key.go @@ -160,12 +160,12 @@ var keyBufPool *Pool func init() { hashPool = NewPool(512) - hashPool.New = func() interface{} { + hashPool.New = func(params ...interface{}) interface{} { return ripemd160.New() } keyBufPool = NewPool(512) - keyBufPool.New = func() interface{} { + keyBufPool.New = func(params ...interface{}) interface{} { return new(bytes.Buffer) } } diff --git a/marshal.go b/marshal.go new file mode 100644 index 00000000..d30d7054 --- /dev/null +++ b/marshal.go @@ -0,0 +1,217 @@ +// Copyright 2013-2014 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aerospike + +import ( + "math" + "reflect" + "strings" + "sync" + "time" +) + +const ( + aerospikeTag = "as" + keyTag = "key" +) + +func valueToInterface(f reflect.Value) interface{} { + // get to the core value + for f.Kind() == reflect.Ptr { + if f.IsNil() { + return nil + } + f = reflect.Indirect(f) + } + + switch f.Kind() { + case reflect.Uint64: + return int64(f.Uint()) + case reflect.Float64, reflect.Float32: + return int(math.Float64bits(f.Float())) + case reflect.Struct: + if f.Type().PkgPath() == "time" && f.Type().Name() == "Time" { + return f.Interface().(time.Time).UTC().UnixNano() + } else { + return structToMap(f) + } + case reflect.Bool: + if f.Bool() == true { + return int64(1) + } + return int64(0) + case reflect.Map, reflect.Slice, reflect.Interface: + if f.IsNil() { + return nil + } + return f.Interface() + default: + return f.Interface() + } +} + +func fieldAlias(f reflect.StructField) string { + alias := f.Tag.Get(aerospikeTag) + if alias != "" { + alias = strings.Trim(alias, " ") + + // if tag is -, the field should not be persisted + if alias == "-" { + return "" + } + return alias + } else { + return f.Name + } +} + +func structToMap(s reflect.Value) map[string]interface{} { + if !s.IsValid() { + return nil + } + + // map tags + cacheObjectTags(s) + + typeOfT := s.Type() + numFields := s.NumField() + + var binMap map[string]interface{} + for i := 0; i < numFields; i++ { + // skip unexported fields + if typeOfT.Field(i).PkgPath != "" { + continue + } + + binValue := valueToInterface(s.Field(i)) + + if binValue != nil { + if binMap == nil { + binMap = make(map[string]interface{}, numFields) + } + + alias := fieldAlias(typeOfT.Field(i)) + if alias == "" { + continue + } + + binMap[alias] = binValue + } + } + + return binMap +} + +func marshal(v interface{}) []*Bin { + s := reflect.Indirect(reflect.ValueOf(v).Elem()) + typeOfT := s.Type() + + // map tags + cacheObjectTags(s) + + numFields := s.NumField() + bins := binPool.Get(numFields).([]*Bin) + + binCount := 0 + for i := 0; i < numFields; i++ { + // skip unexported fields + if typeOfT.Field(i).PkgPath != "" { + continue + } + + binValue := valueToInterface(s.Field(i)) + + if binValue != nil { + alias := fieldAlias(typeOfT.Field(i)) + if alias == "" { + continue + } + + bins[binCount].Name = alias + bins[binCount].Value = NewValue(binValue) + binCount++ + } + } + + return bins[:binCount] +} + +type SyncMap struct { + objectMappings map[string]map[string]string + objectFields map[string][]string + mutex sync.RWMutex +} + +func (sm *SyncMap) setMapping(objType string, mapping map[string]string, fields []string) { + sm.mutex.Lock() + sm.objectMappings[objType] = mapping + sm.mutex.Unlock() +} + +func (sm *SyncMap) mappingExists(objType string) bool { + sm.mutex.RLock() + _, exists := sm.objectMappings[objType] + sm.mutex.RUnlock() + return exists +} + +func (sm *SyncMap) getMapping(objType string) map[string]string { + sm.mutex.RLock() + mapping := sm.objectMappings[objType] + sm.mutex.RUnlock() + return mapping +} + +func (sm *SyncMap) getFields(objType string) []string { + sm.mutex.RLock() + fields := sm.objectFields[objType] + sm.mutex.RUnlock() + return fields +} + +var objectMappings = &SyncMap{objectMappings: map[string]map[string]string{}, objectFields: map[string][]string{}} + +func cacheObjectTags(obj reflect.Value) { + objType := obj.Type().Name() + // exit if already processed + if objectMappings.mappingExists(objType) { + return + } + + mapping := map[string]string{} + fields := []string{} + + typeOfT := obj.Type() + numFields := obj.NumField() + for i := 0; i < numFields; i++ { + f := typeOfT.Field(i) + // skip unexported fields + if f.PkgPath != "" { + continue + } + + tag := strings.Trim(f.Tag.Get(aerospikeTag), " ") + if tag != "-" { + if tag != "" { + mapping[tag] = f.Name + fields = append(fields, tag) + } else { + fields = append(fields, f.Name) + } + } + } + + objectMappings.setMapping(objType, mapping, fields) +} diff --git a/operate_command.go b/operate_command.go index 055b48e0..886aed32 100644 --- a/operate_command.go +++ b/operate_command.go @@ -22,9 +22,6 @@ type operateCommand struct { } func newOperateCommand(cluster *Cluster, policy *WritePolicy, key *Key, operations []*Operation) *operateCommand { - if policy == nil { - policy = NewWritePolicy(0, 0) - } return &operateCommand{ readCommand: newReadCommand(cluster, policy, key, nil), policy: policy, diff --git a/packer.go b/packer.go index 72d6fa09..6f45d41c 100644 --- a/packer.go +++ b/packer.go @@ -19,6 +19,7 @@ import ( "fmt" "math" "reflect" + "time" ParticleType "github.com/aerospike/aerospike-client-go/types/particle_type" Buffer "github.com/aerospike/aerospike-client-go/utils/buffer" @@ -179,6 +180,9 @@ func (pckr *packer) PackObject(obj interface{}) error { case uint64: pckr.PackAULong(v) return nil + case time.Time: + pckr.PackALong(v.UnixNano()) + return nil case nil: pckr.PackNil() return nil @@ -198,23 +202,32 @@ func (pckr *packer) PackObject(obj interface{}) error { } // check for array and map + rv := reflect.ValueOf(obj) switch reflect.TypeOf(obj).Kind() { case reflect.Array, reflect.Slice: - s := reflect.ValueOf(obj) - l := s.Len() + l := rv.Len() arr := make([]interface{}, l) for i := 0; i < l; i++ { - arr[i] = s.Index(i).Interface() + arr[i] = rv.Index(i).Interface() } return pckr.PackList(arr) case reflect.Map: - s := reflect.ValueOf(obj) - l := s.Len() + l := rv.Len() amap := make(map[interface{}]interface{}, l) - for _, i := range s.MapKeys() { - amap[i.Interface()] = s.MapIndex(i).Interface() + for _, i := range rv.MapKeys() { + amap[i.Interface()] = rv.MapIndex(i).Interface() } return pckr.PackMap(amap) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return pckr.PackObject(rv.Int()) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return pckr.PackObject(rv.Uint()) + case reflect.Bool: + return pckr.PackObject(rv.Bool()) + case reflect.String: + return pckr.PackObject(rv.String()) + case reflect.Float32, reflect.Float64: + return pckr.PackObject(rv.Float()) } panic(fmt.Sprintf("Type `%v` not supported to pack.", reflect.TypeOf(obj))) diff --git a/query_record_command.go b/query_record_command.go index c6db4eab..1135cbbb 100644 --- a/query_record_command.go +++ b/query_record_command.go @@ -106,23 +106,14 @@ func (cmd *queryRecordCommand) parseRecordResults(ifc command, receiveSize int) bins[name] = value } - if !cmd.IsValid() { - return false, NewAerospikeError(QUERY_TERMINATED) - } - // If the channel is full and it blocks, we don't want this command to // block forever, or panic in case the channel is closed in the meantime. - L: - for { - select { - // send back the result on the async channel - case cmd.recordset.Records <- newRecord(cmd.node, key, bins, generation, expiration): - break L - case <-cmd.recordset.cancelled: - return false, NewAerospikeError(SCAN_TERMINATED) - } + select { + // send back the result on the async channel + case cmd.recordset.Records <- newRecord(cmd.node, key, bins, generation, expiration): + case <-cmd.recordset.cancelled: + return false, NewAerospikeError(SCAN_TERMINATED) } - } return true, nil diff --git a/query_test.go b/query_test.go index 3b85c8f6..e3484299 100644 --- a/query_test.go +++ b/query_test.go @@ -49,7 +49,7 @@ var _ = Describe("Query operations", func() { var wpolicy = NewWritePolicy(0, 0) wpolicy.SendKey = true - const keyCount = 100 + const keyCount = 10000 bin1 := NewBin("Aerospike1", rand.Intn(math.MaxInt16)) bin2 := NewBin("Aerospike2", randString(100)) bin3 := NewBin("Aerospike3", rand.Intn(math.MaxInt16)) diff --git a/read_command.go b/read_command.go index 4e49ec23..113de8e8 100644 --- a/read_command.go +++ b/read_command.go @@ -15,6 +15,11 @@ package aerospike import ( + "math" + "reflect" + "strings" + "time" + . "github.com/aerospike/aerospike-client-go/logger" . "github.com/aerospike/aerospike-client-go/types" @@ -27,6 +32,11 @@ type readCommand struct { policy Policy binNames []string record *Record + + // pointer to the object that's going to be unmarshalled + object interface{} + // mapping of aliases for the object fields + objectMappings map[string]map[string]string } func newReadCommand(cluster *Cluster, policy Policy, key *Key, binNames []string) *readCommand { @@ -47,7 +57,6 @@ func (cmd *readCommand) writeBuffer(ifc command) error { func (cmd *readCommand) parseResult(ifc command, conn *Connection) error { // Read header. - // Logger.Debug("readCommand Parse Result: trying to read %d bytes from the connection...", int(_MSG_TOTAL_HEADER_SIZE)) _, err := conn.Read(cmd.dataBuffer, int(_MSG_TOTAL_HEADER_SIZE)) if err != nil { Logger.Warn("parse result error: " + err.Error()) @@ -65,8 +74,6 @@ func (cmd *readCommand) parseResult(ifc command, conn *Connection) error { opCount := int(uint16(Buffer.BytesToInt16(cmd.dataBuffer, 28))) receiveSize := int((sz & 0xFFFFFFFFFFFF) - int64(headerLength)) - // Logger.Debug("readCommand Parse Result: resultCode: %d, headerLength: %d, generation: %d, expiration: %d, fieldCount: %d, opCount: %d, receiveSize: %d", resultCode, headerLength, generation, expiration, fieldCount, opCount, receiveSize) - // Read remaining message bytes. if receiveSize > 0 { if err = cmd.sizeBufferSz(receiveSize); err != nil { @@ -95,15 +102,19 @@ func (cmd *readCommand) parseResult(ifc command, conn *Connection) error { return NewAerospikeError(resultCode) } - if opCount == 0 { - // data Bin was not returned. - cmd.record = newRecord(cmd.node, cmd.key, nil, generation, expiration) - return nil - } + if cmd.object == nil { + if opCount == 0 { + // data Bin was not returned. + cmd.record = newRecord(cmd.node, cmd.key, nil, generation, expiration) + return nil + } - cmd.record, err = cmd.parseRecord(opCount, fieldCount, generation, expiration) - if err != nil { - return err + cmd.record, err = cmd.parseRecord(opCount, fieldCount, generation, expiration) + if err != nil { + return err + } + } else { + cmd.parseObject(opCount, fieldCount, generation, expiration) } return nil @@ -157,6 +168,56 @@ func (cmd *readCommand) parseRecord( return newRecord(cmd.node, cmd.key, bins, generation, expiration), nil } +func (cmd *readCommand) parseObject( + opCount int, + fieldCount int, + generation int, + expiration int, +) error { + receiveOffset := 0 + + // There can be fields in the response (setname etc). + // But for now, ignore them. Expose them to the API if needed in the future. + // Logger.Debug("field count: %d, databuffer: %v", fieldCount, cmd.dataBuffer) + if fieldCount > 0 { + // Just skip over all the fields + for i := 0; i < fieldCount; i++ { + // Logger.Debug("%d", receiveOffset) + fieldSize := int(uint32(Buffer.BytesToInt32(cmd.dataBuffer, receiveOffset))) + receiveOffset += (4 + fieldSize) + } + } + + var rv reflect.Value + if opCount > 0 { + rv = reflect.ValueOf(cmd.object).Elem() + + // map tags + cacheObjectTags(rv) + + cmd.objectMappings = objectMappings.objectMappings //getMapping(rv.Type().Name()) + } + + for i := 0; i < opCount; i++ { + opSize := int(uint32(Buffer.BytesToInt32(cmd.dataBuffer, receiveOffset))) + particleType := int(cmd.dataBuffer[receiveOffset+5]) + nameSize := int(cmd.dataBuffer[receiveOffset+7]) + name := string(cmd.dataBuffer[receiveOffset+8 : receiveOffset+8+nameSize]) + receiveOffset += 4 + 4 + nameSize + + particleBytesSize := int(opSize - (4 + nameSize)) + value, _ := bytesToParticle(particleType, cmd.dataBuffer, receiveOffset, particleBytesSize) + // if _, err := cmd.setObjectField(cmd.object, name, value); err != nil { + if _, err := cmd.setObjectField(rv, name, value); err != nil { + return err + } + + receiveOffset += particleBytesSize + } + + return nil +} + func (cmd *readCommand) GetRecord() *Record { return cmd.record } @@ -164,3 +225,253 @@ func (cmd *readCommand) GetRecord() *Record { func (cmd *readCommand) Execute() error { return cmd.execute(cmd) } + +func (cmd *readCommand) setObjectField(obj reflect.Value, fieldName string, value interface{}) (interface{}, error) { + // TODO: This part has potential to be improved + // try to find the field by name + + // find the name based on tag mapping + iobj := reflect.Indirect(obj) + if name, exists := cmd.objectMappings[iobj.Type().Name()][fieldName]; exists { + fieldName = name + } + f := iobj.FieldByName(fieldName) + + if f.CanSet() { + switch f.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + f.SetInt(int64(value.(int))) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if v, ok := value.(int); ok { + f.SetUint(uint64(v)) + } else { + f.SetUint(value.(uint64)) + } + case reflect.Float64, reflect.Float32: + f.SetFloat(float64(math.Float64frombits(uint64(value.(int))))) + case reflect.String: + rv := reflect.ValueOf(value.(string)) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Bool: + f.SetBool(value.(int) == 1) + case reflect.Interface: + f.Set(reflect.ValueOf(value)) + case reflect.Ptr: + switch f.Type().Elem().Kind() { + case reflect.Int: + tempV := int(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Uint: + tempV := uint(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.String: + tempV := string(value.(string)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Int8: + tempV := int8(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Uint8: + tempV := uint8(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Int16: + tempV := int16(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Uint16: + tempV := uint16(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Int32: + tempV := int32(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Uint32: + tempV := uint32(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Int64: + tempV := int64(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Uint64: + tempV := uint64(value.(int)) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Float64: + tempV := math.Float64frombits(uint64(value.(int))) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Bool: + tempV := bool(value.(int) == 1) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Float32: + tempV := float32(math.Float64frombits(uint64(value.(int)))) + rv := reflect.ValueOf(&tempV) + if rv.Type() != f.Type() { + rv = rv.Convert(f.Type()) + } + f.Set(rv) + case reflect.Interface: + f.Set(reflect.ValueOf(&value)) + case reflect.Struct: + // support time.Time + if f.Type().Elem().PkgPath() == "time" && f.Type().Elem().Name() == "Time" { + tm := time.Unix(0, int64(value.(int))) + f.Set(reflect.ValueOf(&tm)) + break + } else { + valMap := value.(map[interface{}]interface{}) + // iteraste over struct fields and recursively fill them up + if valMap != nil { + newObjPtr := f + if f.IsNil() { + newObjPtr = reflect.New(f.Type().Elem()) + } + theStruct := newObjPtr.Elem().Type() + numFields := newObjPtr.Elem().NumField() + for i := 0; i < numFields; i++ { + // skip unexported fields + if theStruct.Field(i).PkgPath != "" { + continue + } + + alias := theStruct.Field(i).Name + tag := strings.Trim(theStruct.Field(i).Tag.Get(aerospikeTag), " ") + if tag != "" { + alias = tag + } + + if valMap[alias] != nil { + cmd.setObjectField(newObjPtr, alias, valMap[alias]) + } + } + + // set the field + f.Set(newObjPtr) + } + } + } + case reflect.Slice, reflect.Array: + // BLOBs come back as []byte + vlen := 0 + if theArray, ok := value.([]interface{}); ok { + vlen = len(theArray) + } else if theArray, ok := value.([]byte); ok { + vlen = len(theArray) + } + // theArray := value.([]interface{}) + if f.Kind() == reflect.Slice { + if f.IsNil() { + newArray := reflect.MakeSlice(reflect.SliceOf(f.Type().Elem()), vlen, vlen) + reflect.Copy(newArray, reflect.ValueOf(value)) + } + f.Set(reflect.ValueOf(value)) + } else { + reflect.Copy(f, reflect.ValueOf(value)) + } + case reflect.Map: + theMap := value.(map[interface{}]interface{}) + if theMap != nil { + newMap := reflect.MakeMap(f.Type()) + var newKey, newVal reflect.Value + for key, elem := range theMap { + if key != nil { + newKey = reflect.ValueOf(key) + } else { + newKey = reflect.Zero(f.Type().Key()) + } + + if elem != nil { + newVal = reflect.ValueOf(elem) + } else { + newVal = reflect.Zero(f.Type().Elem()) + } + + newMap.SetMapIndex(newKey, newVal) + } + } + + f.Set(reflect.ValueOf(theMap)) + case reflect.Struct: + // support time.Time + if f.Type().PkgPath() == "time" && f.Type().Name() == "Time" { + f.Set(reflect.ValueOf(time.Unix(0, int64(value.(int))))) + break + } + + valMap := value.(map[interface{}]interface{}) + // iteraste over struct fields and recursively fill them up + typeOfT := f.Type() + numFields := f.NumField() + for i := 0; i < numFields; i++ { + // skip unexported fields + if typeOfT.Field(i).PkgPath != "" { + continue + } + + alias := typeOfT.Field(i).Name + tag := strings.Trim(typeOfT.Field(i).Tag.Get(aerospikeTag), " ") + if tag != "" { + alias = tag + } + + if valMap[alias] != nil { + cmd.setObjectField(f, alias, valMap[alias]) + } + } + + // set the field + f.Set(f) + } + } + + return nil, nil +} diff --git a/read_header_command.go b/read_header_command.go index 32d1539f..518fb764 100644 --- a/read_header_command.go +++ b/read_header_command.go @@ -29,12 +29,7 @@ type readHeaderCommand struct { func newReadHeaderCommand(cluster *Cluster, policy Policy, key *Key) *readHeaderCommand { newReadHeaderCmd := &readHeaderCommand{ singleCommand: newSingleCommand(cluster, key), - } - - if policy != nil { - newReadHeaderCmd.policy = policy - } else { - newReadHeaderCmd.policy = NewPolicy() + policy: policy, } return newReadHeaderCmd diff --git a/recordset.go b/recordset.go index cb70f942..43fcb30a 100644 --- a/recordset.go +++ b/recordset.go @@ -20,11 +20,20 @@ import ( . "github.com/aerospike/aerospike-client-go/types/atomic" ) +type result struct { + Record *Record + Err error +} + // Recordset encapsulates the result of Scan and Query commands. type Recordset struct { // Records is a channel on which the resulting records will be sent back. + // NOTE: Do not use Records directly. Range on channel returned by Results() instead. + // Will be unexported in the future Records chan *Record // Errors is a channel on which all errors will be sent back. + // NOTE: Do not use Records directly. Range on channel returned by Results() instead. + // Will be unexported in the future Errors chan error wgGoroutines sync.WaitGroup @@ -53,7 +62,50 @@ func (rcs *Recordset) IsActive() bool { return rcs.active.Get() } -// Close all streams to different nodes. +// Results returns a new receive-only channel with the results of the Scan/Query +// This is a more idiomatic approach to the iterator pattern in getting the +// results back from the recordset, and doesn't require the user to write the +// ugly select in their code. +// Result embeds A Record and an error reference. +// +// Example: +// +// recordset, err := client.ScanAll(nil, namespace, set) +// handleError(err) +// for res := range recordset.Results() { +// if res.Err != nil { +// // handle error here +// } else { +// // process record here +// fmt.Println(res.Record.Bins) +// } +// } +func (rcs *Recordset) Results() <-chan *result { + res := make(chan *result, len(rcs.Records)) + + go func() { + L: + for { + select { + case r := <-rcs.Records: + if r != nil { + res <- &result{Record: r, Err: nil} + } else { + close(res) + break L + } + case e := <-rcs.Errors: + if e != nil { + res <- &result{Record: nil, Err: e} + } + } + } + }() + + return (<-chan *result)(res) +} + +// Close all streams from different nodes. func (rcs *Recordset) Close() { // do it only once if rcs.active.CompareAndToggle(true) { @@ -70,9 +122,7 @@ func (rcs *Recordset) Close() { func (rcs *Recordset) signalEnd() { rcs.wgGoroutines.Done() - - cnt := rcs.goroutines.DecrementAndGet() - if cnt == 0 { + if rcs.goroutines.DecrementAndGet() == 0 { rcs.Close() } } diff --git a/scan_command.go b/scan_command.go index 88e26988..fbff103b 100644 --- a/scan_command.go +++ b/scan_command.go @@ -128,20 +128,14 @@ func (cmd *scanCommand) parseRecordResults(ifc command, receiveSize int) (bool, bins[name] = value } - if !cmd.IsValid() { + // If the channel is full and it blocks, we don't want this command to + // block forever, or panic in case the channel is closed in the meantime. + select { + // send back the result on the async channel + case cmd.recordset.Records <- newRecord(cmd.node, key, bins, generation, expiration): + case <-cmd.recordset.cancelled: return false, NewAerospikeError(SCAN_TERMINATED) } - - L: - for { - select { - // send back the result on the async channel - case cmd.recordset.Records <- newRecord(cmd.node, key, bins, generation, expiration): - break L - case <-cmd.recordset.cancelled: - return false, NewAerospikeError(SCAN_TERMINATED) - } - } } return true, nil diff --git a/scan_test.go b/scan_test.go index fd30eceb..7951f562 100644 --- a/scan_test.go +++ b/scan_test.go @@ -34,7 +34,7 @@ var _ = Describe("Scan operations", func() { var wpolicy = NewWritePolicy(0, 0) wpolicy.SendKey = true - const keyCount = 100 + const keyCount = 10000 bin1 := NewBin("Aerospike1", rand.Intn(math.MaxInt16)) bin2 := NewBin("Aerospike2", randString(100)) var keys map[string]*Key @@ -93,7 +93,35 @@ var _ = Describe("Scan operations", func() { } }) + It("must Scan and get all records back for a specified node using Results() channel", func() { + Expect(len(keys)).To(Equal(keyCount)) + + for _, node := range client.GetNodes() { + recordset, err := client.ScanNode(nil, node, ns, set) + Expect(err).ToNot(HaveOccurred()) + + counter := 0 + for res := range recordset.Results() { + Expect(res.Err).NotTo(HaveOccurred()) + key, exists := keys[string(res.Record.Key.Digest())] + + Expect(exists).To(Equal(true)) + Expect(key.Value().GetObject()).To(Equal(res.Record.Key.Value().GetObject())) + Expect(res.Record.Bins[bin1.Name]).To(Equal(bin1.Value.GetObject())) + Expect(res.Record.Bins[bin2.Name]).To(Equal(bin2.Value.GetObject())) + + delete(keys, string(res.Record.Key.Digest())) + + counter++ + } + } + + Expect(len(keys)).To(Equal(0)) + }) + It("must Scan and get all records back for a specified node", func() { + Expect(len(keys)).To(Equal(keyCount)) + for _, node := range client.GetNodes() { recordset, err := client.ScanNode(nil, node, ns, set) Expect(err).ToNot(HaveOccurred()) @@ -105,6 +133,8 @@ var _ = Describe("Scan operations", func() { }) It("must Scan and get all records back from all nodes concurrently", func() { + Expect(len(keys)).To(Equal(keyCount)) + recordset, err := client.ScanAll(nil, ns, set) Expect(err).ToNot(HaveOccurred()) @@ -114,6 +144,8 @@ var _ = Describe("Scan operations", func() { }) It("must Scan and get all records back from all nodes sequnetially", func() { + Expect(len(keys)).To(Equal(keyCount)) + scanPolicy := NewScanPolicy() scanPolicy.ConcurrentNodes = false @@ -126,6 +158,8 @@ var _ = Describe("Scan operations", func() { }) It("must Cancel Scan", func() { + Expect(len(keys)).To(Equal(keyCount)) + recordset, err := client.ScanAll(nil, ns, set) Expect(err).ToNot(HaveOccurred()) diff --git a/server_command.go b/server_command.go index 5089fc75..6e75eb89 100644 --- a/server_command.go +++ b/server_command.go @@ -81,10 +81,6 @@ func (cmd *serverCommand) parseRecordResults(ifc command, receiveSize int) (bool return false, err } } - - if !cmd.IsValid() { - return false, NewAerospikeError(QUERY_TERMINATED) - } } return true, nil } diff --git a/tools/benchmark/benchmark.go b/tools/benchmark/benchmark.go index 96a14d4d..7b586339 100644 --- a/tools/benchmark/benchmark.go +++ b/tools/benchmark/benchmark.go @@ -65,6 +65,7 @@ var maxRetries = flag.Int("maxRetries", 2, "Maximum number of retries before abo var connQueueSize = flag.Int("queueSize", 4096, "Maximum number of connections to pool.") var randBinData = flag.Bool("R", false, "Use dynamically generated random bin values instead of default static fixed bin values.") +var useMarshalling = flag.Bool("M", false, "Use marshaling a struct instead of simple key/value operations") var debugMode = flag.Bool("d", false, "Run benchmarks in debug mode.") var profileMode = flag.Bool("profile", false, "Run benchmarks with profiler active on port 6060.") var showUsage = flag.Bool("u", false, "Show usage information.") @@ -83,6 +84,13 @@ var wg sync.WaitGroup var currThroughput int64 var lastReport int64 +// Underscores are there so that the field name is the same as key/value mode +type dataStruct struct { + Int________ int64 + String_____ string + Bytes______ []byte +} + func main() { log.SetOutput(os.Stdout) @@ -235,21 +243,71 @@ func readFlags() { } } +// new random bin generator based on benchmark specs +func getRandValue() Value { + switch binDataType { + case "B": + return NewBytesValue(randBytes(binDataSize)) + case "S": + return NewStringValue(string(randBytes(binDataSize))) + default: + return NewLongValue(xr.Int64()) + } +} + // new random bin generator based on benchmark specs func getBin() *Bin { var bin *Bin switch binDataType { case "B": - bin = &Bin{Name: "information", Value: NewBytesValue(randBytes(binDataSize))} + bin = &Bin{Name: "Bytes______", Value: getRandValue()} case "S": - bin = &Bin{Name: "information", Value: NewStringValue(string(randBytes(binDataSize)))} + bin = &Bin{Name: "String_____", Value: getRandValue()} default: - bin = &Bin{Name: "information", Value: NewLongValue(xr.Int64())} + bin = &Bin{Name: "Int________", Value: getRandValue()} } return bin } +func setBin(bin *Bin) { + switch binDataType { + case "B": + bin.Value = getRandValue() + case "S": + bin.Value = getRandValue() + default: + bin.Value = getRandValue() + } +} + +// new random bin generator based on benchmark specs +func getDataStruct() *dataStruct { + var ds *dataStruct + switch binDataType { + case "B": + ds = &dataStruct{Bytes______: randBytes(binDataSize)} + case "S": + ds = &dataStruct{String_____: string(randBytes(binDataSize))} + default: + ds = &dataStruct{Int________: xr.Int64()} + } + + return ds +} + +// new random bin generator based on benchmark specs +func setDataStruct(ds *dataStruct) { + switch binDataType { + case "B": + ds.Bytes______ = randBytes(binDataSize) + case "S": + ds.String_____ = string(randBytes(binDataSize)) + default: + ds.Int________ = xr.Int64() + } +} + var r *Record const random_alpha_num = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" @@ -284,6 +342,7 @@ func runBench(client *Client, ident int, times int) { readpolicy := writepolicy.GetBasePolicy() defaultBin := getBin() + defaultObj := getDataStruct() t := time.Now() var WCount, RCount int @@ -300,18 +359,30 @@ func runBench(client *Client, ident int, times int) { rLatList := make([]int64, latCols+1) bin := defaultBin + obj := defaultObj for i := 1; workloadType == "RU" || i <= times; i++ { rLat, wLat = 0, 0 key, _ := NewKey(*namespace, *set, ident*times+(i%times)) if workloadType == "I" || int(xr.Uint64()%100) >= workloadPercent { WCount++ - // if randomBin data has been requested - if *randBinData { - bin = getBin() - } - tm = time.Now() - if err = client.PutBins(writepolicy, key, bin); err != nil { - incOnError(&writeErr, &writeTOErr, err) + if !*useMarshalling { + // if randomBin data has been requested + if *randBinData { + setBin(bin) + } + tm = time.Now() + if err = client.PutBins(writepolicy, key, bin); err != nil { + incOnError(&writeErr, &writeTOErr, err) + } + } else { + // if randomBin data has been requested + if *randBinData { + setDataStruct(obj) + } + tm = time.Now() + if err = client.PutObject(writepolicy, key, obj); err != nil { + incOnError(&writeErr, &writeTOErr, err) + } } wLat = int64(time.Now().Sub(tm) / time.Millisecond) wLatTotal += wLat @@ -332,8 +403,14 @@ func runBench(client *Client, ident int, times int) { } else { RCount++ tm = time.Now() - if r, err = client.Get(readpolicy, key, bin.Name); err != nil { - incOnError(&readErr, &readTOErr, err) + if !*useMarshalling { + if r, err = client.Get(readpolicy, key, bin.Name); err != nil { + incOnError(&readErr, &readTOErr, err) + } + } else { + if err = client.GetObject(readpolicy, key, obj); err != nil { + incOnError(&readErr, &readTOErr, err) + } } rLat = int64(time.Now().Sub(tm) / time.Millisecond) rLatTotal += rLat diff --git a/touch_command.go b/touch_command.go index d417e9d8..dcd1f739 100644 --- a/touch_command.go +++ b/touch_command.go @@ -30,12 +30,7 @@ type touchCommand struct { func newTouchCommand(cluster *Cluster, policy *WritePolicy, key *Key) *touchCommand { newTouchCmd := &touchCommand{ singleCommand: *newSingleCommand(cluster, key), - } - - if policy == nil { - newTouchCmd.policy = NewWritePolicy(0, 0) - } else { - newTouchCmd.policy = policy + policy: policy, } return newTouchCmd diff --git a/types/pool.go b/types/pool.go index 846b79de..b58b1a74 100644 --- a/types/pool.go +++ b/types/pool.go @@ -23,7 +23,8 @@ type Pool struct { pool *AtomicQueue poolSize int - New func() interface{} + New func(params ...interface{}) interface{} + IsUsable func(obj interface{}, params ...interface{}) bool } // NewPool creates a new fixed size pool. @@ -36,10 +37,10 @@ func NewPool(poolSize int) *Pool { // Get returns an element from the pool. If pool is empty, and a New function is defined, // the result of the New function will be returned -func (bp *Pool) Get() interface{} { +func (bp *Pool) Get(params ...interface{}) interface{} { res := bp.pool.Poll() - if res == nil && bp.New != nil { - res = bp.New() + if (res == nil || (bp.IsUsable != nil && !bp.IsUsable(res, params...))) && bp.New != nil { + res = bp.New(params...) } return res diff --git a/types/result_code.go b/types/result_code.go index 66f09817..22dc6c7b 100644 --- a/types/result_code.go +++ b/types/result_code.go @@ -137,7 +137,13 @@ const ( INVALID_PASSWORD ResultCode = 62 // Security credential is invalid. - INVALID_CREDENTIAL ResultCode = 63 + EXPIRED_PASSWORD ResultCode = 63 + + // Forbidden password (e.g. recently used) + FORBIDDEN_PASSWORD ResultCode = 64 + + // Security credential is invalid. + INVALID_CREDENTIAL ResultCode = 65 // Role name is invalid. INVALID_ROLE ResultCode = 70 @@ -336,6 +342,12 @@ func ResultCodeToString(resultCode ResultCode) string { case INVALID_PASSWORD: return "Invalid password" + case EXPIRED_PASSWORD: + return "Expired password" + + case FORBIDDEN_PASSWORD: + return "Forbidden password" + case INVALID_CREDENTIAL: return "Invalid credential" diff --git a/udf_test.go b/udf_test.go index 50b4d0ea..bbc3a5f0 100644 --- a/udf_test.go +++ b/udf_test.go @@ -132,6 +132,7 @@ var _ = Describe("UDF/Query tests", func() { Context("must run the UDF on all records", func() { BeforeEach(func() { + set = randString(50) for i := 0; i < keyCount; i++ { key, err = NewKey(ns, set, randString(50)) Expect(err).ToNot(HaveOccurred()) @@ -149,13 +150,8 @@ var _ = Describe("UDF/Query tests", func() { Expect(err).ToNot(HaveOccurred()) // wait until UDF is run on all records - for { - if err := <-exTask.OnComplete(); err == nil { - break - } else { - panic(err) - } - } + err = <-exTask.OnComplete() + Expect(err).ToNot(HaveOccurred()) // read all data and make sure it is consistent recordset, err := client.ScanAll(nil, ns, set) diff --git a/value.go b/value.go index 4404e86a..d0aa907d 100644 --- a/value.go +++ b/value.go @@ -76,7 +76,9 @@ func NewValue(v interface{}) Value { case nil: return &NullValue{} case int: - return NewIntegerValue(int(val)) + return NewIntegerValue(val) + case int64: + return NewLongValue(val) case string: return NewStringValue(val) case []Value: @@ -94,13 +96,11 @@ func NewValue(v interface{}) Value { case uint16: return NewIntegerValue(int(val)) case uint32: - return NewLongValue(int64(val)) + return NewIntegerValue(int(val)) case uint: if !Buffer.Arch64Bits || (val <= math.MaxInt64) { return NewLongValue(int64(val)) } - case int64: - return NewLongValue(int64(val)) case []interface{}: return NewListValue(val) case map[interface{}]interface{}: @@ -112,25 +112,30 @@ func NewValue(v interface{}) Value { } // check for array and map - switch reflect.TypeOf(v).Kind() { + rv := reflect.ValueOf(v) + switch rv.Kind() { case reflect.Array, reflect.Slice: - s := reflect.ValueOf(v) - l := s.Len() + l := rv.Len() arr := make([]interface{}, l) for i := 0; i < l; i++ { - arr[i] = s.Index(i).Interface() + arr[i] = rv.Index(i).Interface() } return NewListValue(arr) case reflect.Map: - s := reflect.ValueOf(v) - l := s.Len() + l := rv.Len() amap := make(map[interface{}]interface{}, l) - for _, i := range s.MapKeys() { - amap[i.Interface()] = s.MapIndex(i).Interface() + for _, i := range rv.MapKeys() { + amap[i.Interface()] = rv.MapIndex(i).Interface() } return NewMapValue(amap) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return NewLongValue(reflect.ValueOf(v).Int()) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32: + return NewLongValue(int64(reflect.ValueOf(v).Uint())) + case reflect.String: + return NewStringValue(rv.String()) } // panic for anything that is not supported. diff --git a/write_command.go b/write_command.go index e44f165f..4b4b9d79 100644 --- a/write_command.go +++ b/write_command.go @@ -35,16 +35,11 @@ func newWriteCommand(cluster *Cluster, newWriteCmd := &writeCommand{ singleCommand: *newSingleCommand(cluster, key), + policy: policy, bins: bins, operation: operation, } - if policy == nil { - newWriteCmd.policy = NewWritePolicy(0, 0) - } else { - newWriteCmd.policy = policy - } - return newWriteCmd } From e1ee50e3ee4c424df82a0888f30825acdb2a3572 Mon Sep 17 00:00:00 2001 From: Khosrow Afroozeh Date: Tue, 17 Feb 2015 18:26:35 +0100 Subject: [PATCH 2/3] Added new server error codes --- types/result_code.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/types/result_code.go b/types/result_code.go index 22dc6c7b..a7c73479 100644 --- a/types/result_code.go +++ b/types/result_code.go @@ -19,9 +19,6 @@ package types type ResultCode int const ( - // End of Recordset in Query or Scan. - END_OF_RECORDSET ResultCode = -8 - // Asynchronous max concurrent database commands have been exceeded and therefore rejected. TYPE_NOT_SUPPORTED ResultCode = -7 @@ -112,6 +109,9 @@ const ( // Bin name length greater than 14 characters. BIN_NAME_TOO_LONG ResultCode = 21 + // Operation not allowed at this time. + FAIL_FORBIDDEN ResultCode = 22 + // There are no more records left for query. QUERY_END ResultCode = 50 @@ -148,7 +148,11 @@ const ( // Role name is invalid. INVALID_ROLE ResultCode = 70 - INVALID_PRIVILEGE ResultCode = 71 + // Role already exists. + ROLE_ALREADY_EXISTS ResultCode = 71 + + // Privilege is invalid. + INVALID_PRIVILEGE ResultCode = 72 // User must be authentication before performing database operations. NOT_AUTHENTICATED ResultCode = 80 @@ -222,9 +226,6 @@ func KeepConnection(resultCode int) bool { // Return result code as a string. func ResultCodeToString(resultCode ResultCode) string { switch ResultCode(resultCode) { - case END_OF_RECORDSET: - return "End of recordset." - case TYPE_NOT_SUPPORTED: return "Type cannot be converted to Value Type." @@ -312,6 +313,9 @@ func ResultCodeToString(resultCode ResultCode) string { case BIN_NAME_TOO_LONG: return "Bin name length greater than 14 characters" + case FAIL_FORBIDDEN: + return "Operation not allowed at this time" + case QUERY_END: return "Query end" @@ -354,6 +358,9 @@ func ResultCodeToString(resultCode ResultCode) string { case INVALID_ROLE: return "Invalid role" + case ROLE_ALREADY_EXISTS: + return "Role already exists" + case INVALID_PRIVILEGE: return "Invalid privilege" From 5c1cb887cd5bdd4b1b8e1ced7236976d270807e9 Mon Sep 17 00:00:00 2001 From: Khosrow Afroozeh Date: Tue, 17 Feb 2015 18:39:19 +0100 Subject: [PATCH 3/3] Updated Changelog --- CHANGELOG.md | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2503cc1c..1c5c5c1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,12 +1,38 @@ # Change history -## Feb 13 2015 : v1.4.0 +## Feb 17 2015 : v1.4.0 - This is a major release, and makes using the client much easier to develop web applications. + This is a major release, and makes using the client much easier to develop applications. * **New Features** - * Added Marshalling Support. + * Added Marshalling Support for Put and Get operations. Refer to [Marshalling Test](client_object_test.go) to see how to take advantage. + Same functionality for other APIs will follow soon. + Example: + ```go + type SomeStruct struct { + A int `as:"a"` // alias the field to myself + Self *SomeStruct `as:"-"` // will not persist the field + } + + type OtherStruct struct { + i interface{} + OtherObject *OtherStruct + } + + obj := &OtherStruct { + i: 15, + OtherObject: OtherStruct {A: 18}, + } + + key, _ := as.NewKey("ns", "set", value) + err := client.PutObject(nil, key, obj) + // handle error here + + rObj := &OtherStruct{} + err = client.GetObject(nil, key, rObj) + ``` + * Added `Recordset.Results()`. Consumers of a recordset do not have to implement a select anymore. Instead of: ```go recordset, err := client.ScanAll(...) @@ -37,6 +63,7 @@ } } ``` + Use of the old pattern is discouraged and deprecated, and direct access to recordset.Records and recordset.Errors will be removed in a future release. * **Improvements**