Skip to content

Commit

Permalink
Merge pull request #471 from aerospike/v7_stage
Browse files Browse the repository at this point in the history
Go Client v7.9.0
  • Loading branch information
khaf authored Feb 26, 2025
2 parents aa8b342 + 5327cca commit f573d58
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 60 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Change History


## February 26 2025: v7.9.0

- **New Features**
- [CLIENT-3334] Support old PHP7 client encoded boolean and null values.

- **Fixes**
- [CLIENT-3348] Parsing error during node rack update.

## December 19 2024: v7.8.0

Minor fix release.
Expand Down
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ var _ = gg.Describe("Aerospike", func() {
c, err := as.NewClientWithPolicyAndHost(&cpolicy, dbHost)
gm.Expect(err).NotTo(gm.HaveOccurred())

info := info(c, "racks:")
info := info(c, "rack-ids")
if strings.HasPrefix(strings.ToUpper(info), "ERROR") {
gg.Skip("Skipping RackAware test since it is not supported on this cluster...")
}
Expand Down
66 changes: 19 additions & 47 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package aerospike

import (
"bufio"
"errors"
"io"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -143,7 +141,7 @@ func (nd *Node) Refresh(peers *peers) Error {
var infoMap map[string]string
commands := []string{"node", "peers-generation", "partition-generation"}
if nd.cluster.clientPolicy.RackAware {
commands = append(commands, "racks:")
commands = append(commands, "rack-ids")
}

infoMap, err := nd.RequestInfo(&nd.cluster.infoPolicy, commands...)
Expand Down Expand Up @@ -174,7 +172,7 @@ func (nd *Node) Refresh(peers *peers) Error {
return err
}
// Should not fail in other cases
logger.Logger.Warn("Updating node rack info failed with error: %s (racks: `%s`)", err, infoMap["racks:"])
logger.Logger.Warn("Updating node rack info failed with error: %s (rack-ids: `%s`)", err, infoMap["rack-ids"])
}

nd.failures.Set(0)
Expand Down Expand Up @@ -228,57 +226,31 @@ func (nd *Node) updateRackInfo(infoMap map[string]string) Error {
return nil
}

// Do not raise an error if the server does not support rackaware
if strings.HasPrefix(strings.ToUpper(infoMap["racks:"]), "ERROR") {
// Receive format: <ns1>:<rack1>;<ns2>:<rack2>...
rackIds := infoMap["rack-ids"]

// Do not panic if the server does not support rackaware
if len(rackIds) == 0 || strings.HasPrefix(strings.ToUpper(rackIds), "ERROR") {
return newError(types.UNSUPPORTED_FEATURE, "You have set the ClientPolicy.RackAware = true, but the server does not support this feature.")
}

ss := strings.Split(infoMap["racks:"], ";")
racks := map[string]int{}
for _, s := range ss {
in := bufio.NewReader(strings.NewReader(s))
_, err := in.ReadString('=')
if err != nil {
return newErrorAndWrap(err, types.PARSE_ERROR)
rackPairs := strings.Split(rackIds, ";")
racks := make(map[string]int)
for i := range rackPairs {
// split at the last occurrence of `:` to ensure rack will be an integer
lastIdx := strings.LastIndex(rackPairs[i], ":")
if lastIdx < 0 {
return newError(types.PARSE_ERROR, "invalid rack value `%s`", rackPairs[i])
}

ns, err := in.ReadString(':')
ns := rackPairs[i][:lastIdx]
rack := rackPairs[i][lastIdx+1:]

rackNo, err := strconv.Atoi(rack)
if err != nil {
return newErrorAndWrap(err, types.PARSE_ERROR)
}

for {
_, err = in.ReadString('_')
if err != nil {
return newErrorAndWrap(err, types.PARSE_ERROR)
}

rackStr, err := in.ReadString('=')
if err != nil {
return newErrorAndWrap(err, types.PARSE_ERROR)
}

rack, err := strconv.Atoi(rackStr[:len(rackStr)-1])
if err != nil {
return newErrorAndWrap(err, types.PARSE_ERROR)
}

nodesList, err := in.ReadString(':')
if err != nil && err != io.EOF {
return newErrorAndWrap(err, types.PARSE_ERROR)
}

nodes := strings.Split(strings.Trim(nodesList, ":"), ",")
for i := range nodes {
if nodes[i] == nd.name {
racks[ns[:len(ns)-1]] = rack
}
}

if err == io.EOF {
break
}
}
racks[ns] = rackNo
}

nd.racks.Set(racks)
Expand Down
25 changes: 13 additions & 12 deletions types/particle_type/particle_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ package particleType
// Server particle types. Unsupported types are commented out.
const (
//revive:disable
NULL = 0
INTEGER = 1
FLOAT = 2
STRING = 3
BLOB = 4
DIGEST = 6
BOOL = 17
HLL = 18
MAP = 19
LIST = 20
LDT = 21
GEOJSON = 23
NULL = 0
INTEGER = 1
FLOAT = 2
STRING = 3
BLOB = 4
DIGEST = 6
PHP_BLOB = 11 // Had to reintroduce to support the old PHP7 client
BOOL = 17
HLL = 18
MAP = 19
LIST = 20
LDT = 21
GEOJSON = 23
//revive:enable
)
24 changes: 24 additions & 0 deletions unpacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package aerospike

import (
"bytes"
"fmt"
"math"
"reflect"
Expand Down Expand Up @@ -232,6 +233,29 @@ func (upckr *unpacker) unpackBlob(count int, isMapKey bool) (interface{}, Error)
case ParticleType.GEOJSON:
val = NewGeoJSONValue(string(upckr.buffer[upckr.offset : upckr.offset+count]))

case ParticleType.PHP_BLOB:
PHP_BLOB_PARSER:
switch count {
case 4:
if bytes.Equal(upckr.buffer[upckr.offset:upckr.offset+count], []byte{0x62, 0x3A, 0x31, 0x3B}) {
val = true
break PHP_BLOB_PARSER
} else if bytes.Equal(upckr.buffer[upckr.offset:upckr.offset+count], []byte{0x62, 0x3A, 0x30, 0x3B}) {
val = false
break PHP_BLOB_PARSER
}
fallthrough
case 2:
if bytes.Equal(upckr.buffer[upckr.offset:upckr.offset+count], []byte{0x4E, 0x3B}) {
val = nil
break PHP_BLOB_PARSER
}
fallthrough
default:
newObj := make([]byte, count)
copy(newObj, upckr.buffer[upckr.offset:upckr.offset+count])
val = newObj
}
default:
return nil, newError(types.PARSE_ERROR, fmt.Sprintf("Error while unpacking BLOB. Type-header with code `%d` not recognized.", theType))
}
Expand Down
17 changes: 17 additions & 0 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package aerospike

import (
"bytes"
"fmt"
"reflect"
"strconv"
Expand Down Expand Up @@ -1247,6 +1248,22 @@ func bytesToParticle(ptype int, buf []byte, offset int, length int) (interface{}
case ParticleType.LDT:
return newUnpacker(buf, offset, length).unpackObjects()

case ParticleType.PHP_BLOB:
if length == 4 {
if bytes.Equal(buf[offset:offset+length], []byte{0x62, 0x3A, 0x31, 0x3B}) {
return true, nil
} else if bytes.Equal(buf[offset:offset+length], []byte{0x62, 0x3A, 0x30, 0x3B}) {
return false, nil
}
} else if length == 2 {
if bytes.Equal(buf[offset:offset+length], []byte{0x4E, 0x3B}) {
return nil, nil
}
}
// generic PHP_BLOB
newObj := make([]byte, length)
copy(newObj, buf[offset:offset+length])
return newObj, nil
}
return nil, nil
}
Expand Down

0 comments on commit f573d58

Please sign in to comment.