Skip to content

Commit

Permalink
Avoid race condition in chaining prefined errors
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed May 27, 2021
1 parent abbe778 commit 4e69b10
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 15 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
* **Fixes**

- Handle lack of key digests in `BatchExists` command.
- Allow and handle nil arguments for `chainError`
- Allow and handle nil arguments for `chainError`.
- Avoid race condition in chaining prefined errors.

## May 10 2021: v5.0.0
This is a major feature release. It is also a major breaking release. We have adopted Go's module system as recommended by the Go authors, so the new release moves the active branch to `v5`.
Expand Down
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (clnt *Client) BatchExists(policy *BatchPolicy, keys []*Key) ([]bool, Error
cmd := newBatchCommandExists(nil, nil, policy, keys, existsArray)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if filteredOut > 0 {
err = chainErrors(ErrFilteredOut, err)
err = chainErrors(ErrFilteredOut.err(), err)
}

if err != nil {
Expand Down Expand Up @@ -398,7 +398,7 @@ func (clnt *Client) BatchGet(policy *BatchPolicy, keys []*Key, binNames ...strin
}

if filteredOut > 0 {
err = chainErrors(ErrFilteredOut, err)
err = chainErrors(ErrFilteredOut.err(), err)
}

return records, err
Expand Down Expand Up @@ -426,7 +426,7 @@ func (clnt *Client) BatchGetComplex(policy *BatchPolicy, records []*BatchRead) E
}

if filteredOut > 0 {
err = chainErrors(ErrFilteredOut, err)
err = chainErrors(ErrFilteredOut.err(), err)
}

return err
Expand Down Expand Up @@ -456,7 +456,7 @@ func (clnt *Client) BatchGetHeader(policy *BatchPolicy, keys []*Key) ([]*Record,
}

if filteredOut > 0 {
err = chainErrors(ErrFilteredOut, err)
err = chainErrors(ErrFilteredOut.err(), err)
}

return records, err
Expand Down
4 changes: 2 additions & 2 deletions client_reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (clnt *Client) BatchGetObjects(policy *BatchPolicy, keys []*Key, objects []
}

if filteredOut > 0 {
err = chainErrors(ErrFilteredOut, err)
err = chainErrors(ErrFilteredOut.err(), err)
}

return objectsFound, err
Expand Down Expand Up @@ -207,7 +207,7 @@ func (clnt *Client) scanNodeObjects(policy *ScanPolicy, node *Node, recordset *R
// If the policy is nil, the default relevant policy will be used.
func (clnt *Client) QueryPartitionObjects(policy *QueryPolicy, statement *Statement, objChan interface{}, partitionFilter *PartitionFilter) (*Recordset, Error) {
if statement.Filter != nil {
return nil, ErrPartitionScanQueryNotSupported
return nil, ErrPartitionScanQueryNotSupported.err()
}

policy = clnt.getUsableQueryPolicy(policy)
Expand Down
4 changes: 2 additions & 2 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1986,7 +1986,7 @@ func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, isRead bool,

// too many retries
if (policy.MaxRetries <= 0 && iterations > 0) || (policy.MaxRetries > 0 && iterations > policy.MaxRetries) {
return chainErrors(ErrMaxRetriesExceeded, errChain).iter(iterations).setInDoubt(isRead, commandSentCounter)
return chainErrors(ErrMaxRetriesExceeded.err(), errChain).iter(iterations).setInDoubt(isRead, commandSentCounter)
}

// Sleep before trying again, after the first iteration
Expand Down Expand Up @@ -2176,7 +2176,7 @@ func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, isRead bool,
}

// execution timeout
errChain = chainErrors(ErrTimeout, errChain).iter(iterations)
errChain = chainErrors(ErrTimeout.err(), errChain).iter(iterations)
return errChain
}

Expand Down
2 changes: 0 additions & 2 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ func (ase *AerospikeError) wrap(err error) Error {
return ase
}

// chain wraps an error inside a new error. The new error cannot be nil.
// if the old error is nil, the new error will be returned.
func (ase *AerospikeError) iter(i int) Error {
if ase == nil {
return nil
Expand Down
8 changes: 4 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,22 +425,22 @@ func (nd *Node) getConnection(deadline time.Time, timeout time.Duration) (conn *
// This is more or less a copy of the logic in the beginning of newConnection function.
func (nd *Node) newConnectionAllowed() Error {
if !nd.active.Get() {
return ErrServerNotAvailable
return ErrServerNotAvailable.err()
}

// if connection count is limited and enough connections are already created, don't create a new one
cc := nd.connectionCount.IncrementAndGet()
defer nd.connectionCount.DecrementAndGet()
if nd.cluster.clientPolicy.LimitConnectionsToQueueSize && cc > nd.cluster.clientPolicy.ConnectionQueueSize {
return ErrTooManyConnectionsForNode
return ErrTooManyConnectionsForNode.err()
}

// Check for opening connection threshold
if nd.cluster.clientPolicy.OpeningConnectionThreshold > 0 {
ct := nd.cluster.connectionThreshold.IncrementAndGet()
defer nd.cluster.connectionThreshold.DecrementAndGet()
if ct > nd.cluster.clientPolicy.OpeningConnectionThreshold {
return ErrTooManyOpeningConnections
return ErrTooManyOpeningConnections.err()
}
}

Expand Down Expand Up @@ -537,7 +537,7 @@ func (nd *Node) getConnectionWithHint(deadline time.Time, timeout time.Duration,
if err = nd.newConnectionAllowed(); err == nil {
go nd.makeConnectionForPool(hint)
}
return nil, ErrConnectionPoolEmpty
return nil, ErrConnectionPoolEmpty.err()
}

if err = conn.SetTimeout(deadline, timeout); err != nil {
Expand Down

0 comments on commit 4e69b10

Please sign in to comment.