Skip to content

QueuedThreadPool will not reserve a thread with jobs waiting #13208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: jetty-12.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -259,7 +260,7 @@ public void onDataAvailable(Stream stream)

// Handler.handle() should have returned, make sure we block that thread.
long delaySeconds = 10;
await().atMost(5, SECONDS).until(() -> serverThreads.getIdleThreads() == 1);
await().atMost(5, SECONDS).until(serverThreads::getIdleThreads, Matchers.equalTo(1));
CountDownLatch serverBlockLatch = new CountDownLatch(1);
serverThreads.execute(() ->
{
Expand All @@ -276,7 +277,7 @@ public void onDataAvailable(Stream stream)
if (serverThreads.getCurrentReservedThreads() != 1)
{
assertFalse(serverThreads.tryExecute(() -> {}));
await().atMost(5, SECONDS).until(() -> serverThreads.getCurrentReservedThreads() == 1);
await().atMost(5, SECONDS).until(serverThreads::getCurrentReservedThreads, Matchers.equalTo(1));
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
CountDownLatch reservedBlockLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,8 @@ public void onHeaders(Stream stream, HeadersFrame frame)
@Test
public void testServerIdleTimeoutIsEnforcedForQueuedRequest() throws Exception
{
// This test is fragile with regards to reserved thread strategies.

long idleTimeout = 750;
// Use a small thread pool to cause request queueing.
QueuedThreadPool serverExecutor = new QueuedThreadPool(5);
Expand Down Expand Up @@ -755,9 +757,9 @@ public void onHeaders(Stream stream, HeadersFrame frame)

// Wait for the server to finish serving requests.
await().atMost(5, TimeUnit.SECONDS).until(handled::get, is(0));
assertThat(requests.get(), is(count - 1));
assertThat(requests.get(), is(count));

// The first 2 requests are handled normally and responded with 200, the last 2 are
// The first 3 requests are handled normally and responded with 200, the last one is
// not handled due to timeout while queued, but they are responded anyway with a 500.
await().atMost(5, TimeUnit.SECONDS).until(responses::get, is(count + 1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void testDump() throws Exception
String dump = _server.dump();
assertThat(dump, containsString("oejs.Server@"));
assertThat(dump, containsString("oejut.QueuedThreadPool"));
assertThat(dump, containsString("+= oejut.ReservedThreadExecutor@"));
assertThat(dump, containsString("+= oejut.QueuedThreadPool$ReservedThreadExecutor@"));
assertThat(dump, containsString("oeji.ArrayByteBufferPool@"));
assertThat(dump, containsString("+- System Properties size="));
assertThat(dump, containsString("+> java.home: "));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ protected void doStart() throws Exception
}
else
{
ReservedThreadExecutor rte = new ReservedThreadExecutor(this, _reservedThreads);
ReservedThreadExecutor rte = newReservedThreadExecutor();
rte.setIdleTimeout(_idleTimeout, TimeUnit.MILLISECONDS);
_tryExecutor = rte;
}
Expand All @@ -244,6 +244,11 @@ protected void doStart() throws Exception
ensureThreads();
}

protected ReservedThreadExecutor newReservedThreadExecutor()
{
return new ReservedThreadExecutor();
}

@Override
protected void doStop() throws Exception
{
Expand All @@ -265,8 +270,10 @@ protected void doStop() throws Exception
{
// Fill the job queue with noop jobs to wakeup idle threads.
for (int i = 0; i < threads; ++i)
{
if (!jobs.offer(NOOP))
break;
}

// try to let jobs complete naturally for half our stop time
joinThreads(NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2);
Expand Down Expand Up @@ -679,7 +686,8 @@ public int getReadyThreads()
@ManagedAttribute("number of threads leased for use by jetty components")
public int getLeasedThreads()
{
return getMaxLeasedThreads() - getMaxReservedThreads();
ThreadPoolBudget budget = _budget;
return budget == null ? 0 : budget.getLeasedThreads();
}

/**
Expand All @@ -694,8 +702,7 @@ public int getLeasedThreads()
@ManagedAttribute("maximum number of threads leased to internal components")
public int getMaxLeasedThreads()
{
ThreadPoolBudget budget = _budget;
return budget == null ? 0 : budget.getLeasedThreads();
return getLeasedThreads();
}

/**
Expand Down Expand Up @@ -723,7 +730,7 @@ public int getIdleThreads()
@ManagedAttribute("number of threads executing internal and unleased jobs")
public int getBusyThreads()
{
return getThreads() - getReadyThreads();
return Math.max(0, getThreads() - getIdleThreads());
}

/**
Expand All @@ -735,7 +742,7 @@ public int getBusyThreads()
@ManagedAttribute("number of threads executing unleased jobs")
public int getUtilizedThreads()
{
return getThreads() - getLeasedThreads() - getReadyThreads();
return Math.max(0, getThreads() - getIdleThreads() - getLeasedThreads() + getReservedThreads() - getCurrentReservedThreads());
}

/**
Expand All @@ -745,8 +752,7 @@ public int getUtilizedThreads()
* @deprecated The term available can be applied to either idle or reserved threads, but not both.
* Use {@link #getCurrentReservedThreads()} or {@link #getIdleThreads()} instead.
*/
@ManagedAttribute("maximum number of threads available to run unleased jobs (deprecated")
@Deprecated(forRemoval = true, since = "12.1.0")
@ManagedAttribute("maximum number of threads available to run unleased jobs")
public int getMaxAvailableThreads()
{
return getMaxThreads() - getLeasedThreads();
Expand All @@ -764,7 +770,10 @@ public int getMaxAvailableThreads()
@ManagedAttribute("utilization rate of threads executing unleased jobs")
public double getUtilizationRate()
{
return (double)getUtilizedThreads() / (getMaxThreads() - getLeasedThreads());
int maxAvailableThreads = getMaxAvailableThreads();
if (maxAvailableThreads <= 0)
return 0.0D;
return (double)getUtilizedThreads() / maxAvailableThreads;
}

/**
Expand Down Expand Up @@ -1135,7 +1144,7 @@ public String toString()
int idle = Math.max(0, AtomicBiInteger.getLo(count));
int queue = getQueueSize();

return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,t=%dms,q=%d}[%s]",
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d/%d,t=%dms,q=%d}[%s]",
TypeUtil.toShortName(getClass()),
_name,
hashCode(),
Expand All @@ -1144,6 +1153,7 @@ public String toString()
threads,
getMaxThreads(),
idle,
getCurrentReservedThreads(),
getReservedThreads(),
NanoTime.millisUntil(_evictThreshold.get()),
queue,
Expand Down Expand Up @@ -1239,4 +1249,43 @@ private void doRunJob(Runnable job)
}
}
}

/**
* A ReservedThreadExecutor that will not reserve a thread if the QTP has jobs waiting
*/
protected class ReservedThreadExecutor extends org.eclipse.jetty.util.thread.ReservedThreadExecutor
{
public ReservedThreadExecutor()
{
super(QueuedThreadPool.this, QueuedThreadPool.this._reservedThreads);
}

public ReservedThreadExecutor(Executor executor, int capacity, int minSize, int maxPending)
{
super(executor, capacity, minSize, maxPending);
}

@Override
protected void startReservedThread()
{
// If we have more jobs queued than the number of pending reserved threads,
// then we have some real jobs in the queue.
// So we will not start a reserved thread as it may stop a real job from being executed.
if (isTaskWaiting())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am borderline -1 on this change.

This fundamentally changes Jetty's behavior from EXECUTE_PRODUCE_CONSUME to PRODUCE_EXECUTE_CONSUME when it's under enough load that tasks start getting queued.

In my understanding, that is supposed to help lower max latency at the expense of higher average latency. This sounds reasonable, but how much effect does that really have? And is that a good thing? Wouldn't that have an impact on throughput which could lead to a snowball effect?

Plus, this is fundamentally racy if the number of threads has been tuned to be just enough to handle the load: in such case, spawning a new reserved thread becomes totally random and you still have the issue of a reserved thread idling out while there is a job waiting in the queue depending on the exact timing of this test vs the queuing of the job.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lorban I understand your reluctance, so I've moved the javadoc changes to a different PR for 12.1.x and we can take our time on this one.

So here is my thinking:

  • Reserving a thread is always an optional optimisation. We can run with maxReservedThreads (as it should be called) of 0.
  • When running in a constrained environment of limited threads, then reserving a thread is less important than getting real work done.

Perhaps this should only be conditional on us reaching maxThreads. Prior to that it is better to start a thread to reserve it, than to avoid doing so. However, I doubt we will have threads queued (at least not for long) if we are at max threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I'm aligned with your view of the importance this optimization could have.

My fear is that a system that needs all its threads to serve a load with the optimized path might not be able to serve that same load with the unoptimized path, and would enter a spiral of death by switching to a slower code path when it reaches a certain load.

Maybe I'm over-worrying, but I think I won't be able to find peace of mind without seeing the results of some limit tests showing how QTP+AdaptiveExecutionStrategy behave when they have to handle a load too big for the thread pool for a short duration: do they temporarily degrade performance or do they collapse? Can they recover once the load goes down again or did they go to the sorry place, never coming back?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for more perf/stress testing.... if only we knew somebody to do that :)

I was interesting that one of the tests that needed to be adjusted was checking how timeouts worked if jobs were queued in the QTP. The test failed because with this change, we could handle one more request than without it, as instead of being reserved a thread took one more job off the queue to execute. That felt like a good change to me.

return;
super.startReservedThread();
}

@Override
protected boolean isTaskWaiting()
{
long counts = _counts.get();
int threads = Math.max(0, AtomicBiInteger.getHi(counts));
if (threads < getMaxThreads())
return false;
int queueSize = Math.max(0, -AtomicBiInteger.getLo(counts));
int pending = getPending();
return pending >= 0 && queueSize > pending;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,10 @@ public int getAvailable()
return _threads.size();
}

@ManagedAttribute(value = "pending reserved threads (deprecated)", readonly = true)
@Deprecated
@ManagedAttribute(value = "pending reserved threads", readonly = true)
public int getPending()
{
return 0;
return _pending.getOpaque();
}

@ManagedAttribute(value = "idle timeout in ms", readonly = true)
Expand Down Expand Up @@ -225,9 +224,9 @@ public boolean tryExecute(Runnable task)
return false;
}

private void startReservedThread()
protected void startReservedThread()
{
if (_maxPending > 0 && _pending.incrementAndGet() >= _maxPending)
if (_pending.incrementAndGet() > _maxPending && _maxPending > 0)
{
_pending.decrementAndGet();
return;
Expand Down Expand Up @@ -273,7 +272,8 @@ public void run()
_thread = Thread.currentThread();
try
{
_pending.decrementAndGet();
if (_pending.decrementAndGet() < 0)
System.err.println("XXXXXXXXXXXXXXXXX " + this);

while (true)
{
Expand Down Expand Up @@ -321,7 +321,10 @@ public void run()
// clear interrupted status between reserved thread iterations
if (Thread.interrupted() && LOG.isDebugEnabled())
LOG.debug("{} was interrupted", _thread);

}
if (isTaskWaiting())
return;
}
}
catch (Throwable t)
Expand Down Expand Up @@ -386,4 +389,14 @@ public String toString()
_thread);
}
}

/**
* Test if there is a better task for a thread to do other than to become a reserved thread?
*
* @return true if a thread could be used to do something better than be reserved.
*/
protected boolean isTaskWaiting()
{
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,10 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
return String.format("%s@%x{capacity=%d}",
return String.format("%s@%x{size=%d/%d}",
TypeUtil.toShortName(getClass()),
hashCode(),
size(),
capacity());
}
}
Loading