Skip to content

Commit

Permalink
Fixed invalid connection handling on node connections
Browse files Browse the repository at this point in the history
  • Loading branch information
rndive committed May 17, 2015
1 parent 604afc9 commit f846d7a
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 23 deletions.
7 changes: 4 additions & 3 deletions admin_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,12 @@ func (acmd *AdminCommand) executeCommand(cluster *Cluster, policy *AdminPolicy)
}

if _, err := conn.Write(acmd.dataBuffer[:acmd.dataOffset]); err != nil {
conn.Close()
node.InvalidateConnection(conn)
return err
}

if _, err := conn.Read(acmd.dataBuffer, _HEADER_SIZE); err != nil {
conn.Close()
node.InvalidateConnection(conn)
return err
}

Expand Down Expand Up @@ -280,12 +280,13 @@ func (acmd *AdminCommand) readUsers(cluster *Cluster, policy *AdminPolicy) ([]*U
}

if _, err := conn.Write(acmd.dataBuffer[:acmd.dataOffset]); err != nil {
conn.Close()
node.InvalidateConnection(conn)
return nil, err
}

status, list, err := acmd.readUserBlocks(conn)
if err != nil {
node.InvalidateConnection(conn)
return nil, err
}
node.PutConnection(conn)
Expand Down
15 changes: 7 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,9 +596,10 @@ func (clnt *Client) RegisterUDF(policy *WritePolicy, udfBody []byte, serverPath

responseMap, err := RequestInfo(conn, strCmd.String())
if err != nil {
conn.Close()
node.InvalidateConnection(conn)
return nil, err
}
node.PutConnection(conn)

var response string
for _, v := range responseMap {
Expand All @@ -622,8 +623,6 @@ func (clnt *Client) RegisterUDF(policy *WritePolicy, udfBody []byte, serverPath
return nil, NewAerospikeError(COMMAND_REJECTED, fmt.Sprintf("Registration failed: %s\nFile: %s\nLine: %s\nMessage: %s",
res["error"], res["file"], res["line"], res["message"]))
}

node.PutConnection(conn)
return NewRegisterTask(clnt.cluster, serverPath), nil
}

Expand Down Expand Up @@ -656,9 +655,10 @@ func (clnt *Client) RemoveUDF(policy *WritePolicy, udfName string) (*RemoveTask,

responseMap, err := RequestInfo(conn, strCmd.String())
if err != nil {
conn.Close()
node.InvalidateConnection(conn)
return nil, err
}
node.PutConnection(conn)

var response string
for _, v := range responseMap {
Expand Down Expand Up @@ -697,9 +697,10 @@ func (clnt *Client) ListUDF(policy *BasePolicy) ([]*UDF, error) {

responseMap, err := RequestInfo(conn, strCmd.String())
if err != nil {
conn.Close()
node.InvalidateConnection(conn)
return nil, err
}
node.PutConnection(conn)

var response string
for _, v := range responseMap {
Expand Down Expand Up @@ -734,8 +735,6 @@ func (clnt *Client) ListUDF(policy *BasePolicy) ([]*UDF, error) {
res = append(res, udf)
}

node.PutConnection(conn)

return res, nil
}

Expand Down Expand Up @@ -1120,7 +1119,7 @@ func (clnt *Client) sendInfoCommand(policy *WritePolicy, command string) (map[st

info, err := newInfo(conn, command)
if err != nil {
conn.Close()
node.InvalidateConnection(conn)
return nil, err
}
node.PutConnection(conn)
Expand Down
6 changes: 3 additions & 3 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ func (cmd *baseCommand) execute(ifc command) (err error) {
if err != nil {
// All runtime exceptions are considered fatal. Do not retry.
// Close socket to flush out possible garbage. Do not put back in pool.
cmd.conn.Close()
node.InvalidateConnection(cmd.conn)
return err
}

Expand All @@ -798,7 +798,7 @@ func (cmd *baseCommand) execute(ifc command) (err error) {
if err != nil {
// IO errors are considered temporary anomalies. Retry.
// Close socket to flush out possible garbage. Do not put back in pool.
cmd.conn.Close()
node.InvalidateConnection(cmd.conn)

Logger.Warn("Node " + node.String() + ": " + err.Error())
// IO error means connection to server node is unhealthy.
Expand All @@ -814,7 +814,7 @@ func (cmd *baseCommand) execute(ifc command) (err error) {
// cancelling/closing the batch/multi commands will return an error, which will
// close the connection to throw away its data and signal the server about the
// situation. We will not put back the connection in the buffer.
cmd.conn.Close()
node.InvalidateConnection(cmd.conn)
return err
}

Expand Down
1 change: 1 addition & 0 deletions execute_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (etsk *ExecuteTask) IsDone() (bool, error) {
}
responseMap, err := RequestInfo(conn, command)
if err != nil {
node.InvalidateConnection(conn)
return false, err
}

Expand Down
2 changes: 1 addition & 1 deletion info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func RequestNodeInfo(node *Node, name ...string) (map[string]string, error) {

response, err := RequestInfo(conn, name...)
if err != nil {
conn.Close()
node.InvalidateConnection(conn)
return nil, err
}
node.PutConnection(conn)
Expand Down
16 changes: 12 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,25 @@ func (nd *Node) Refresh() ([]*Host, error) {

infoMap, err := RequestInfo(conn, "node", "partition-generation", "services")
if err != nil {
conn.Close()
nd.InvalidateConnection(conn)
nd.DecreaseHealth()
return nil, err
}

if err := nd.verifyNodeName(infoMap); err != nil {
nd.PutConnection(conn)
return nil, err
}
nd.RestoreHealth()
nd.responded.Set(true)

if friends, err = nd.addFriends(infoMap); err != nil {
nd.PutConnection(conn)
return nil, err
}

if err := nd.updatePartitions(conn, infoMap); err != nil {
nd.InvalidateConnection(conn)
return nil, err
}

Expand Down Expand Up @@ -212,7 +215,7 @@ L:
return conn, nil
}
}
conn.Close()
nd.InvalidateConnection(conn)
}

// if connection count is limited and enough connections are already created, don't create a new one
Expand Down Expand Up @@ -260,11 +263,16 @@ L:
func (nd *Node) PutConnection(conn *Connection) {
conn.refresh()
if !nd.active.Get() || !nd.connections.Offer(conn) {
nd.connectionCount.DecrementAndGet()
conn.Close()
nd.InvalidateConnection(conn)
}
}

// InvalidateConnection closes and discards a connection from the pool.
func (nd *Node) InvalidateConnection(conn *Connection) {
nd.connectionCount.DecrementAndGet()
conn.Close()
}

// RestoreHealth marks the node as healthy.
func (nd *Node) RestoreHealth() {
// There can be cases where health is full, but active is false.
Expand Down
6 changes: 3 additions & 3 deletions node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var _ = Describe("Aerospike", func() {
Expect(c).NotTo(BeNil())
Expect(c.IsConnected()).To(BeTrue())

c.Close()
node.InvalidateConnection(c)
}

})
Expand Down Expand Up @@ -104,7 +104,7 @@ var _ = Describe("Aerospike", func() {
Expect(c).NotTo(BeNil())
Expect(c.IsConnected()).To(BeTrue())

c.Close()
node.InvalidateConnection(c)
}

for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -260,7 +260,7 @@ var _ = Describe("Aerospike", func() {
c3, err := node.GetConnection(0)
Expect(err).NotTo(HaveOccurred())
Expect(c3).NotTo(BeNil())
defer c3.Close()
defer node.InvalidateConnection(c3)
Expect(c3).ToNot(Equal(c))
Expect(c3.IsConnected()).To(BeTrue())

Expand Down
2 changes: 1 addition & 1 deletion security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ var _ = Describe("Security tests", func() {
for i := 0; i < client_policy.ConnectionQueueSize; i++ {
conn, err := node.GetConnection(time.Second)
Expect(err).ToNot(HaveOccurred())
conn.Close()
node.InvalidateConnection(conn)
}
}

Expand Down
2 changes: 2 additions & 0 deletions tools/asinfo/asinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ func main() {
log.Fatalln(err.Error())
} else {
if infoMap, err := as.RequestInfo(conn, strings.Trim(*value, " ")); err != nil {
node.InvalidateConnection(conn)
log.Fatalln(err.Error())
} else {
node.PutConnection(conn)
cnt := 1
for k, v := range infoMap {
log.Printf("%d : %s\n %s\n\n", cnt, k, v)
Expand Down

0 comments on commit f846d7a

Please sign in to comment.