Skip to content

Commit

Permalink
Fixed issue #58 regarding race condition accessing Cluster.password
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed May 15, 2015
1 parent e95115c commit 2e749f2
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 26 deletions.
12 changes: 9 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

## May 15 2015 : v1.5.1

Maintenance release.
Minor Improvements.

NOTICE: All LDTs on server other than LLIST have been deprecated, and will be removed in the future. As Such, all API regarding those features are considered deprecated and will be removed in the future.

* **Improvements**

* Introduces `ClientPolicy.IdleTimeout` to close stale connections to the server. Thanks to Mário Freitas (@imkira). PR #57

* Use type alias instead of struct for NullValue.

* Removed workaround regarding filtering bin names on the client for `BatchGet`.
* Removed workaround regarding filtering bin names on the client for `BatchGet`. Issue #60

* **Fixes**

* Fixed #58 regarding race condition accessing `Cluster.password`.

* Fixed minor bugs regarding handling of nulls in structs for `GetObj()` and `PutObj()`.

* Fixed a bug regarding setting TaskIds on the client.
Expand All @@ -20,7 +26,7 @@

* Removed deprecated `ReplaceRoles()` method.

* Removed deprecated `SetCapacity()` and `GetCapacity()` methods.
* Removed deprecated `SetCapacity()` and `GetCapacity()` methods for LDTs.

## April 13 2015 : v1.5.0

Expand Down
2 changes: 1 addition & 1 deletion admin_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (acmd *AdminCommand) setPassword(cluster *Cluster, policy *AdminPolicy, use
func (acmd *AdminCommand) changePassword(cluster *Cluster, policy *AdminPolicy, user string, password []byte) error {
acmd.writeHeader(_CHANGE_PASSWORD, 3)
acmd.writeFieldStr(_USER, user)
acmd.writeFieldBytes(_OLD_PASSWORD, cluster.password)
acmd.writeFieldBytes(_OLD_PASSWORD, cluster.Password())
acmd.writeFieldBytes(_PASSWORD, password)
return acmd.executeCommand(cluster, policy)
}
Expand Down
3 changes: 2 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,8 @@ func (clnt *Client) ChangePassword(policy *AdminPolicy, user string, password st
return err
}
}
clnt.cluster.changePassword(user, hash)

clnt.cluster.changePassword(user, password, hash)

return nil
}
Expand Down
23 changes: 21 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,28 @@ func (clstr *Cluster) WaitUntillMigrationIsFinished(timeout time.Duration) (err
}
}

func (clstr *Cluster) changePassword(user string, password []byte) {
func (clstr *Cluster) Password() (res []byte) {
clstr.mutex.RLock()
res = clstr.password
clstr.mutex.RUnlock()

return res
}

func (clstr *Cluster) changePassword(user string, password string, hash []byte) {
// change password ONLY if the user is the same
if clstr.user == user {
clstr.password = password
clstr.mutex.Lock()
clstr.clientPolicy.Password = password
clstr.password = hash
clstr.mutex.Unlock()
}
}

func (clstr *Cluster) ClientPolicy() (res ClientPolicy) {
clstr.mutex.RLock()
res = clstr.clientPolicy
clstr.mutex.RUnlock()

return res
}
25 changes: 25 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ func NewConnection(address string, timeout time.Duration) (*Connection, error) {
return newConn, nil
}

// NewConnectionWithPolicy creates a connection on the network and returns the pointer
// A minimum timeout of 2 seconds will always be applied.
// If the connection is not established in the specified timeout,
// an error will be returned.
func NewConnectionWithPolicy(address string, policy *ClientPolicy) (*Connection, error) {
newConn, err := NewConnection(address, policy.Timeout)
if err != nil {
return nil, err
}

password, err := hashPassword(policy.Password)
if err != nil {
return nil, err
}

if policy.User != "" {
newConn.Authenticate(policy.User, password)
}

newConn.setIdleTimeout(policy.IdleTimeout)
newConn.refresh()

return newConn, nil
}

// Write writes the slice to the connection buffer.
func (ctn *Connection) Write(buf []byte) (total int, err error) {
// make sure all bytes are written
Expand Down
20 changes: 2 additions & 18 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,27 +226,11 @@ L:
break L
}

if conn, err = NewConnection(nd.address, nd.cluster.clientPolicy.Timeout); err != nil {
cp := nd.cluster.ClientPolicy()
if conn, err = NewConnectionWithPolicy(nd.address, &cp); err != nil {
return nil, err
}

// need to authenticate
if err = conn.Authenticate(nd.cluster.user, nd.cluster.password); err != nil {
// Socket not authenticated. Do not put back into pool.
conn.Close()

return nil, err
}

if err = conn.SetTimeout(timeout); err != nil {
// Socket not authenticated. Do not put back into pool.
conn.Close()
return nil, err
}

conn.setIdleTimeout(nd.cluster.clientPolicy.IdleTimeout)
conn.refresh()

nd.connectionCount.IncrementAndGet()
return conn, nil
}
Expand Down
2 changes: 1 addition & 1 deletion node_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (ndv *nodeValidator) setAddress(timeout time.Duration) error {
defer conn.Close()

// need to authenticate
if err := conn.Authenticate(ndv.cluster.user, ndv.cluster.password); err != nil {
if err := conn.Authenticate(ndv.cluster.user, ndv.cluster.Password()); err != nil {
// Socket not authenticated. Do not put back into pool.
conn.Close()

Expand Down

0 comments on commit 2e749f2

Please sign in to comment.