Skip to content

Commit

Permalink
Merge branch 'v3'
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Jun 8, 2020
2 parents e2ccbfa + 910557e commit 1679be2
Show file tree
Hide file tree
Showing 208 changed files with 1,359 additions and 750 deletions.
2 changes: 1 addition & 1 deletion .travis/wait_for_node.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
################################################################################
# Copyright 2013-2016 Aerospike, Inc.
# Copyright 2013-2020 Aerospike, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Change History

## June 8 2020: v3.0.0

Major feature release. There are a few minor breaking API changes. See `ClientPolicy`.

Note: There has been significant changes to clustering code. We recommend extensive testing before using in production.

* **New Features**

- Adds support for Relaxed Strong Consistency mode.
- Adds support for whitelists in Roles.

## May 28 2020: v2.12.0

Minor feature release.
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright 2014-2016 Aerospike, Inc.
Copyright 2014-2020 Aerospike, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
90 changes: 85 additions & 5 deletions admin_command.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2019 Aerospike, Inc.
// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use acmd file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,7 @@ const (
_DROP_ROLE byte = 11
_GRANT_PRIVILEGES byte = 12
_REVOKE_PRIVILEGES byte = 13
_SET_WHITELIST byte = 14
_QUERY_ROLES byte = 16
_LOGIN byte = 20

Expand All @@ -53,6 +54,7 @@ const (
_ROLES byte = 10
_ROLE byte = 11
_PRIVILEGES byte = 12
_WHITELIST byte = 13

// Misc
_MSG_VERSION int64 = 2
Expand Down Expand Up @@ -122,12 +124,27 @@ func (acmd *adminCommand) revokeRoles(cluster *Cluster, policy *AdminPolicy, use
return acmd.executeCommand(cluster, policy)
}

func (acmd *adminCommand) createRole(cluster *Cluster, policy *AdminPolicy, roleName string, privileges []Privilege) error {
acmd.writeHeader(_CREATE_ROLE, 2)
func (acmd *adminCommand) createRole(cluster *Cluster, policy *AdminPolicy, roleName string, privileges []Privilege, whitelist []string) error {
fieldcount := 1
if len(privileges) > 1 {
fieldcount++
}
if len(whitelist) > 1 {
fieldcount++
}
acmd.writeHeader(_CREATE_ROLE, fieldcount)
acmd.writeFieldStr(_ROLE, roleName)
if err := acmd.writePrivileges(privileges); err != nil {
return err

if len(privileges) > 0 {
if err := acmd.writePrivileges(privileges); err != nil {
return err
}
}

if len(whitelist) > 0 {
acmd.writeWhitelist(whitelist)
}

return acmd.executeCommand(cluster, policy)
}

Expand Down Expand Up @@ -155,6 +172,19 @@ func (acmd *adminCommand) revokePrivileges(cluster *Cluster, policy *AdminPolicy
return acmd.executeCommand(cluster, policy)
}

func (acmd *adminCommand) setWhitelist(cluster *Cluster, policy *AdminPolicy, roleName string, whitelist []string) error {
fieldCount := 1
if len(whitelist) > 0 {
fieldCount++
}
acmd.writeHeader(_REVOKE_PRIVILEGES, fieldCount)
acmd.writeFieldStr(_ROLE, roleName)
if len(whitelist) > 0 {
acmd.writeWhitelist(whitelist)
}
return acmd.executeCommand(cluster, policy)
}

func (acmd *adminCommand) queryUser(cluster *Cluster, policy *AdminPolicy, user string) (*UserRoles, error) {
acmd.writeHeader(_QUERY_USERS, 1)
acmd.writeFieldStr(_USER, user)
Expand Down Expand Up @@ -258,6 +288,26 @@ func (acmd *adminCommand) writePrivileges(privileges []Privilege) error {

return nil
}

func (acmd *adminCommand) writeWhitelist(whitelist []string) {
offset := acmd.dataOffset + int(_FIELD_HEADER_SIZE)

comma := false
for _, address := range whitelist {
if comma {
acmd.dataBuffer[acmd.dataOffset] = ','
acmd.dataOffset++
} else {
comma = true
}

offset += copy(acmd.dataBuffer[offset:], address)
}

size := offset - acmd.dataOffset - int(_FIELD_HEADER_SIZE)
acmd.writeFieldHeader(_WHITELIST, size)
acmd.dataOffset = offset
}
func (acmd *adminCommand) writeSize() {
// Write total size of message which is the current offset.
var size = int64(acmd.dataOffset-8) | (_MSG_VERSION << 56) | (_MSG_TYPE << 48)
Expand Down Expand Up @@ -565,6 +615,8 @@ func (acmd *adminCommand) parseRolesFull(receiveSize int) (int, []*Role, error)
acmd.dataOffset += len
} else if id == _PRIVILEGES {
acmd.parsePrivileges(role)
} else if id == _WHITELIST {
acmd.parseWhitelist(len)
} else {
acmd.dataOffset += len
}
Expand Down Expand Up @@ -606,3 +658,31 @@ func (acmd *adminCommand) parsePrivileges(role *Role) {
role.Privileges = append(role.Privileges, priv)
}
}

func (acmd *adminCommand) parseWhitelist(length int) []string {
list := []string{}
begin := acmd.dataOffset
max := begin + length

for acmd.dataOffset < max {
if acmd.dataBuffer[acmd.dataOffset] == ',' {
l := acmd.dataOffset - begin
if l > 0 {
s := string(acmd.dataBuffer[begin : begin+l])
list = append(list, s)
}
acmd.dataOffset++
begin = acmd.dataOffset
} else {
acmd.dataOffset++
}
}

l := acmd.dataOffset - begin
if l > 0 {
s := string(acmd.dataBuffer[begin : begin+l])
list = append(list, s)
}

return list
}
2 changes: 1 addition & 1 deletion admin_policy.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2019 Aerospike, Inc.
// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
15 changes: 15 additions & 0 deletions aerospike.go
Original file line number Diff line number Diff line change
@@ -1 +1,16 @@
// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package aerospike provides a client to connect and interact with an Aerospike cluster.
package aerospike
2 changes: 1 addition & 1 deletion aerospike_bench_reflect_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !as_performance

// Copyright 2013-2019 Aerospike, Inc.
// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion aerospike_bench_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2019 Aerospike, Inc.
// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
38 changes: 27 additions & 11 deletions aerospike_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike_test

import (
Expand All @@ -17,17 +31,19 @@ import (
asl "github.com/aerospike/aerospike-client-go/logger"
)

var host = flag.String("h", "127.0.0.1", "Aerospike server seed hostnames or IP addresses")
var port = flag.Int("p", 3000, "Aerospike server seed hostname or IP address port number.")
var user = flag.String("U", "", "Username.")
var password = flag.String("P", "", "Password.")
var authMode = flag.String("A", "internal", "Authentication mode: internal | external")
var clientPolicy *as.ClientPolicy
var client *as.Client
var useReplicas = flag.Bool("use-replicas", false, "Aerospike will use replicas as well as master partitions.")
var debug = flag.Bool("debug", false, "Will set the logging level to DEBUG.")

var namespace = flag.String("n", "test", "Namespace")
var (
host = flag.String("h", "127.0.0.1", "Aerospike server seed hostnames or IP addresses")
port = flag.Int("p", 3000, "Aerospike server seed hostname or IP address port number.")
user = flag.String("U", "", "Username.")
password = flag.String("P", "", "Password.")
authMode = flag.String("A", "internal", "Authentication mode: internal | external")
useReplicas = flag.Bool("use-replicas", false, "Aerospike will use replicas as well as master partitions.")
debug = flag.Bool("debug", false, "Will set the logging level to DEBUG.")
namespace = flag.String("n", "test", "Namespace")

clientPolicy *as.ClientPolicy
client *as.Client
)

func initTestVars() {
var buf bytes.Buffer
Expand Down
2 changes: 1 addition & 1 deletion auth_mode.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2019 Aerospike, Inc.
// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
106 changes: 106 additions & 0 deletions batch_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

import "time"

type batcher interface {
command

cloneBatchCommand(batch *batchNode) batcher
filteredOut() int

retryBatch(ifc batcher, cluster *Cluster, deadline time.Time, iteration, commandSentCounter int) (bool, error)
generateBatchNodes(*Cluster) ([]*batchNode, error)
setSequence(int, int)
}

type batchCommand struct {
baseMultiCommand

batch *batchNode
policy *BatchPolicy
sequenceAP int
sequenceSC int

filteredOutCnt int
}

func (cmd *batchCommand) prepareRetry(ifc command, isTimeout bool) bool {
if !(cmd.policy.ReplicaPolicy == SEQUENCE || cmd.policy.ReplicaPolicy == PREFER_RACK) {
// Perform regular retry to same node.
return true
}

cmd.sequenceAP++

if !isTimeout || cmd.policy.ReadModeSC != ReadModeSCLinearize {
cmd.sequenceSC++
}
return false
}

func (cmd *batchCommand) retryBatch(ifc batcher, cluster *Cluster, deadline time.Time, iteration, commandSentCounter int) (bool, error) {
// Retry requires keys for this node to be split among other nodes.
// This is both recursive and exponential.
batchNodes, err := ifc.generateBatchNodes(cluster)
if err != nil {
return false, err
}

if len(batchNodes) == 1 && batchNodes[0].Node == cmd.batch.Node {
// Batch node is the same. Go through normal retry.
return false, nil
}

// Run batch requests sequentially in same thread.
for _, batchNode := range batchNodes {
command := ifc.cloneBatchCommand(batchNode)
command.setSequence(cmd.sequenceAP, cmd.sequenceSC)
if err := command.executeAt(command, cmd.policy.GetBasePolicy(), true, deadline, iteration, commandSentCounter); err != nil {
return false, err
}
}

return true, nil
}

func (cmd *batchCommand) setSequence(ap, sc int) {
cmd.sequenceAP, cmd.sequenceSC = ap, sc
}

func (cmd *batchCommand) getPolicy(ifc command) Policy {
return cmd.policy
}

func (cmd *batchCommand) Execute() error {
return cmd.execute(cmd, true)
}

func (cmd *batchCommand) filteredOut() int {
return cmd.filteredOutCnt
}

func (cmd *batchCommand) generateBatchNodes(cluster *Cluster) ([]*batchNode, error) {
panic("Unreachable")
}

func (cmd *batchCommand) cloneBatchCommand(batch *batchNode) batcher {
panic("Unreachable")
}

func (cmd *batchCommand) writeBuffer(ifc command) error {
panic("Unreachable")
}
Loading

0 comments on commit 1679be2

Please sign in to comment.