Skip to content

Commit

Permalink
Refactor HttpClient synchronized sections for virtual threads (#476)
Browse files Browse the repository at this point in the history
- Replaced `synchronized` blocks with `ReentrantLock` in `LeaseRequest` to better support virtual threads introduced in JDK 21.
- Ensured each `LeaseRequest` instance has its own unique lock for maintaining original synchronization semantics.
- Addressed potential performance and deadlock issues with virtual threads by using explicit lock primitives from `java.util.concurrent.locks`.
  • Loading branch information
arturobernalg authored Aug 27, 2023
1 parent b80426c commit 5eea636
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
Expand All @@ -53,9 +54,12 @@ public class BasicHttpCacheStorage implements HttpCacheStorage {

private final CacheMap entries;

private final ReentrantLock lock;

public BasicHttpCacheStorage(final CacheConfig config) {
super();
this.entries = new CacheMap(config.getMaxCacheEntries());
this.lock = new ReentrantLock();
}

/**
Expand All @@ -67,9 +71,14 @@ public BasicHttpCacheStorage(final CacheConfig config) {
* HttpCacheEntry to place in the cache
*/
@Override
public synchronized void putEntry(
public void putEntry(
final String url, final HttpCacheEntry entry) throws ResourceIOException {
entries.put(url, entry);
lock.lock();
try {
entries.put(url, entry);
} finally {
lock.unlock();
}
}

/**
Expand All @@ -80,8 +89,13 @@ public synchronized void putEntry(
* @return HttpCacheEntry if one exists, or null for cache miss
*/
@Override
public synchronized HttpCacheEntry getEntry(final String url) throws ResourceIOException {
return entries.get(url);
public HttpCacheEntry getEntry(final String url) throws ResourceIOException {
lock.lock();
try {
return entries.get(url);
} finally {
lock.unlock();
}
}

/**
Expand All @@ -91,15 +105,26 @@ public synchronized HttpCacheEntry getEntry(final String url) throws ResourceIOE
* Url that is the cache key
*/
@Override
public synchronized void removeEntry(final String url) throws ResourceIOException {
entries.remove(url);
public void removeEntry(final String url) throws ResourceIOException {
lock.lock();
try {
entries.remove(url);
} finally {
lock.unlock();
}

}

@Override
public synchronized void updateEntry(
public void updateEntry(
final String url, final HttpCacheCASOperation casOperation) throws ResourceIOException {
final HttpCacheEntry existingEntry = entries.get(url);
entries.put(url, casOperation.execute(existingEntry));
lock.lock();
try {
final HttpCacheEntry existingEntry = entries.get(url);
entries.put(url, casOperation.execute(existingEntry));
} finally {
lock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.security.SecureRandom;
import java.util.Formatter;
import java.util.Locale;
import java.util.concurrent.locks.ReentrantLock;

/**
* Should produce reasonably unique tokens.
Expand All @@ -43,6 +44,8 @@ class BasicIdGenerator {

private long count;

private final ReentrantLock lock;

public BasicIdGenerator() {
super();
String hostname;
Expand All @@ -58,18 +61,24 @@ public BasicIdGenerator() {
throw new Error(ex);
}
this.rnd.setSeed(System.currentTimeMillis());
this.lock = new ReentrantLock();
}

public synchronized void generate(final StringBuilder buffer) {
this.count++;
final int rndnum = this.rnd.nextInt();
buffer.append(System.currentTimeMillis());
buffer.append('.');
try (Formatter formatter = new Formatter(buffer, Locale.ROOT)) {
formatter.format("%1$016x-%2$08x", this.count, rndnum);
public void generate(final StringBuilder buffer) {
lock.lock();
try {
this.count++;
final int rndnum = this.rnd.nextInt();
buffer.append(System.currentTimeMillis());
buffer.append('.');
try (Formatter formatter = new Formatter(buffer, Locale.ROOT)) {
formatter.format("%1$016x-%2$08x", this.count, rndnum);
}
buffer.append('.');
buffer.append(this.hostname);
} finally {
lock.unlock();
}
buffer.append('.');
buffer.append(this.hostname);
}

public String generate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
Expand All @@ -49,6 +50,8 @@
*/
class CacheRevalidatorBase implements Closeable {

private final ReentrantLock lock;

interface ScheduledExecutor {

Future<?> schedule(Runnable command, TimeValue timeValue) throws RejectedExecutionException;
Expand Down Expand Up @@ -103,6 +106,7 @@ public CacheRevalidatorBase(
this.schedulingStrategy = schedulingStrategy;
this.pendingRequest = new HashSet<>();
this.failureCache = new ConcurrentCountMap<>();
this.lock = new ReentrantLock();
}

/**
Expand All @@ -119,7 +123,8 @@ public CacheRevalidatorBase(
* Schedules an asynchronous re-validation
*/
void scheduleRevalidation(final String cacheKey, final Runnable command) {
synchronized (pendingRequest) {
lock.lock();
try {
if (!pendingRequest.contains(cacheKey)) {
final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
final TimeValue executionTime = schedulingStrategy.schedule(consecutiveFailedAttempts);
Expand All @@ -130,6 +135,8 @@ void scheduleRevalidation(final String cacheKey, final Runnable command) {
LOG.debug("Revalidation of cache entry with key {} could not be scheduled", cacheKey, ex);
}
}
} finally {
lock.unlock();
}
}

Expand All @@ -145,21 +152,30 @@ public void awaitTermination(final Timeout timeout) throws InterruptedException

void jobSuccessful(final String identifier) {
failureCache.resetCount(identifier);
synchronized (pendingRequest) {
lock.lock();
try {
pendingRequest.remove(identifier);
} finally {
lock.unlock();
}
}

void jobFailed(final String identifier) {
failureCache.increaseCount(identifier);
synchronized (pendingRequest) {
lock.lock();
try {
pendingRequest.remove(identifier);
} finally {
lock.unlock();
}
}

Set<String> getScheduledIdentifiers() {
synchronized (pendingRequest) {
lock.lock();
try {
return new HashSet<>(pendingRequest);
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
Expand Down Expand Up @@ -82,12 +83,15 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
private final Set<ResourceReference> resources;
private final AtomicBoolean active;

private final ReentrantLock lock;

public ManagedHttpCacheStorage(final CacheConfig config) {
super();
this.entries = new CacheMap(config.getMaxCacheEntries());
this.morque = new ReferenceQueue<>();
this.resources = new HashSet<>();
this.active = new AtomicBoolean(true);
this.lock = new ReentrantLock();
}

private void ensureValidState() {
Expand All @@ -110,29 +114,38 @@ public void putEntry(final String url, final HttpCacheEntry entry) throws Resour
Args.notNull(url, "URL");
Args.notNull(entry, "Cache entry");
ensureValidState();
synchronized (this) {
lock.lock();
try {
this.entries.put(url, entry);
keepResourceReference(entry);
} finally {
lock.unlock();
}
}

@Override
public HttpCacheEntry getEntry(final String url) throws ResourceIOException {
Args.notNull(url, "URL");
ensureValidState();
synchronized (this) {
lock.lock();
try {
return this.entries.get(url);
} finally {
lock.unlock();
}
}

@Override
public void removeEntry(final String url) throws ResourceIOException {
Args.notNull(url, "URL");
ensureValidState();
synchronized (this) {
lock.lock();
try {
// Cannot deallocate the associated resources immediately as the
// cache entry may still be in use
this.entries.remove(url);
} finally {
lock.unlock();
}
}

Expand All @@ -143,13 +156,16 @@ public void updateEntry(
Args.notNull(url, "URL");
Args.notNull(casOperation, "CAS operation");
ensureValidState();
synchronized (this) {
lock.lock();
try {
final HttpCacheEntry existing = this.entries.get(url);
final HttpCacheEntry updated = casOperation.execute(existing);
this.entries.put(url, updated);
if (existing != updated) {
keepResourceReference(updated);
}
} finally {
lock.unlock();
}
}

Expand All @@ -170,8 +186,11 @@ public void cleanResources() {
if (isActive()) {
ResourceReference ref;
while ((ref = (ResourceReference) this.morque.poll()) != null) {
synchronized (this) {
lock.lock();
try {
this.resources.remove(ref);
} finally {
lock.unlock();
}
ref.getResource().dispose();
}
Expand All @@ -180,27 +199,33 @@ public void cleanResources() {

public void shutdown() {
if (compareAndSet()) {
synchronized (this) {
lock.lock();
try {
this.entries.clear();
for (final ResourceReference ref: this.resources) {
ref.getResource().dispose();
}
this.resources.clear();
while (this.morque.poll() != null) {
}
} finally {
lock.unlock();
}
}
}

@Override
public void close() {
if (compareAndSet()) {
synchronized (this) {
lock.lock();
try {
ResourceReference ref;
while ((ref = (ResourceReference) this.morque.poll()) != null) {
this.resources.remove(ref);
ref.getResource().dispose();
}
} finally {
lock.unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

package org.apache.hc.client5.http.impl.cookie;

import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.client5.http.cookie.CookieSpec;
import org.apache.hc.client5.http.cookie.CookieSpecFactory;
import org.apache.hc.core5.annotation.Contract;
Expand All @@ -43,17 +45,23 @@ public class IgnoreCookieSpecFactory implements CookieSpecFactory {

private volatile CookieSpec cookieSpec;

private final ReentrantLock lock;

public IgnoreCookieSpecFactory() {
super();
this.lock = new ReentrantLock();
}

@Override
public CookieSpec create(final HttpContext context) {
if (cookieSpec == null) {
synchronized (this) {
lock.lock();
try {
if (cookieSpec == null) {
this.cookieSpec = IgnoreSpecSpec.INSTANCE;
}
} finally {
lock.unlock();
}
}
return this.cookieSpec;
Expand Down
Loading

0 comments on commit 5eea636

Please sign in to comment.