Skip to content

Commit

Permalink
intermediate statistic feature fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Kurilov committed Aug 22, 2016
1 parent 781baaa commit ddef41c
Showing 1 changed file with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public abstract class LoadExecutorBase<T extends Item>
counterSubm = new AtomicLong(0),
countRej = new AtomicLong(0),
counterResults = new AtomicLong(0);
private final AtomicInteger currConcurrencyLevel = new AtomicInteger(0);
private final AtomicInteger currConcurrency = new AtomicInteger(0);
protected T lastItem;
protected final Object state = new Object();
protected final ItemBuffer<T> itemOutBuff;
Expand Down Expand Up @@ -360,13 +360,12 @@ protected IoStats createIntermediateStats() {
//
@Override
public boolean isFullThrottleEntered() {
return totalThreadCount - currConcurrencyLevel.get() < totalThreadCount * fullLoadThreshold;
return currConcurrency.get() > totalThreadCount * fullLoadThreshold;
}
//
@Override
public boolean isFullThrottleExited() {
return isShutdown.get() &&
totalThreadCount - currConcurrencyLevel.get() > totalThreadCount * fullLoadThreshold;
return isShutdown.get() && currConcurrency.get() < totalThreadCount * fullLoadThreshold;
}
////////////////////////////////////////////////////////////////////////////////////////////////
protected LoadExecutorBase(
Expand Down Expand Up @@ -813,7 +812,7 @@ protected void logTrace(
}
//
protected final void incrementBusyThreadCount() {
currConcurrencyLevel.incrementAndGet();
currConcurrency.incrementAndGet();
}
//
@Override
Expand Down Expand Up @@ -843,7 +842,7 @@ public void ioTaskCompleted(final IoTask<T> ioTask)
if(nodeBalancer != null) {
nodeBalancer.markTaskFinish(nodeAddr);
}
currConcurrencyLevel.decrementAndGet();
currConcurrency.decrementAndGet();
//
if(IoTask.Status.SUCC == status) {
// update the metrics with success
Expand Down Expand Up @@ -894,7 +893,7 @@ public int ioTaskCompletedBatch(
//
final int n = to - from;
if(n > 0) {
currConcurrencyLevel.addAndGet(-n);
currConcurrency.addAndGet(-n);
if(storageNodeAddrs != null) {
final String nodeAddr = ioTasks.get(from).getNodeAddr();
nodeBalancer.markTasksFinish(nodeAddr, n);
Expand Down Expand Up @@ -970,7 +969,7 @@ public int ioTaskCompletedBatch(
//
protected void ioTaskCancelled(final int n) {
LOG.debug(Markers.MSG, "{}: I/O task canceled", hashCode());
currConcurrencyLevel.decrementAndGet();
currConcurrency.decrementAndGet();
ioStats.markFail(n);
if(medIoStats != null && medIoStats.isStarted()) {
medIoStats.markFail(n);
Expand All @@ -979,7 +978,7 @@ protected void ioTaskCancelled(final int n) {
}
//
protected void ioTaskFailed(final int n, final Throwable e) {
currConcurrencyLevel.decrementAndGet();
currConcurrency.decrementAndGet();
ioStats.markFail(n);
if(medIoStats != null && medIoStats.isStarted()) {
medIoStats.markFail();
Expand Down

0 comments on commit ddef41c

Please sign in to comment.