diff --git a/pom.xml b/pom.xml index 8e0bf9e..e66ac87 100644 --- a/pom.xml +++ b/pom.xml @@ -42,7 +42,7 @@ 3.2.1 14.0.25.Final 2.1.4.Final - 1.0.5.Final + 1.0.6.Final 10.1.19 diff --git a/session/core/src/main/java/org/wildfly/clustering/spring/session/DistributableSession.java b/session/core/src/main/java/org/wildfly/clustering/spring/session/DistributableSession.java index a27a957..650f170 100644 --- a/session/core/src/main/java/org/wildfly/clustering/spring/session/DistributableSession.java +++ b/session/core/src/main/java/org/wildfly/clustering/spring/session/DistributableSession.java @@ -14,7 +14,6 @@ import org.wildfly.clustering.cache.batch.Batch; import org.wildfly.clustering.cache.batch.BatchContext; -import org.wildfly.clustering.session.OOBSession; import org.wildfly.clustering.session.Session; import org.wildfly.clustering.session.SessionManager; import org.wildfly.clustering.session.user.User; @@ -57,9 +56,6 @@ public String changeSessionId() { oldSession.invalidate(); this.session = newSession; } catch (IllegalStateException e) { - if (!oldSession.isValid()) { - oldSession.close(); - } newSession.invalidate(); throw e; } @@ -89,11 +85,6 @@ public T getAttribute(String name) { Session session = this.session; try (BatchContext context = this.resumeBatch()) { return (T) session.getAttributes().getAttribute(name); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -102,11 +93,6 @@ public Set getAttributeNames() { Session session = this.session; try (BatchContext context = this.resumeBatch()) { return session.getAttributes().getAttributeNames(); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -115,11 +101,6 @@ public Instant getCreationTime() { Session session = this.session; try (BatchContext context = this.resumeBatch()) { return session.getMetaData().getCreationTime(); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -133,11 +114,6 @@ public Instant getLastAccessedTime() { Session session = this.session; try (BatchContext context = this.resumeBatch()) { return session.getMetaData().getLastAccessStartTime(); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -146,11 +122,6 @@ public Duration getMaxInactiveInterval() { Session session = this.session; try (BatchContext context = this.resumeBatch()) { return session.getMetaData().getTimeout(); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -159,11 +130,6 @@ public boolean isExpired() { Session session = this.session; try (BatchContext context = this.resumeBatch()) { return session.getMetaData().isExpired(); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -179,11 +145,6 @@ public void setAttribute(String name, Object value) { Session session = this.session; try (BatchContext context = this.resumeBatch()) { session.getAttributes().setAttribute(name, value); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } // N.B. org.springframework.session.web.http.HttpSessionAdapter already triggers HttpSessionBindingListener events @@ -228,11 +189,6 @@ public void setMaxInactiveInterval(Duration duration) { Session session = this.session; try (BatchContext context = this.resumeBatch()) { session.getMetaData().setTimeout(duration); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -241,11 +197,6 @@ public boolean isNew() { Session session = this.session; try (BatchContext context = this.resumeBatch()) { return session.getMetaData().isNew(); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -270,9 +221,6 @@ public void close() { } } } - } finally { - // Switch to OOB session, in case this session is referenced outside the scope of this request - this.session = new OOBSession<>(this.manager, requestSession.getId(), null); } } } diff --git a/session/core/src/test/java/org/wildfly/clustering/spring/session/servlet/SessionServlet.java b/session/core/src/test/java/org/wildfly/clustering/spring/session/servlet/SessionServlet.java index ad9877b..8cec444 100644 --- a/session/core/src/test/java/org/wildfly/clustering/spring/session/servlet/SessionServlet.java +++ b/session/core/src/test/java/org/wildfly/clustering/spring/session/servlet/SessionServlet.java @@ -35,6 +35,10 @@ public void doHead(HttpServletRequest request, HttpServletResponse response) thr HttpSession session = request.getSession(false); if (session != null) { response.setHeader(SmokeITParameters.SESSION_ID, session.getId()); + AtomicInteger value = (AtomicInteger) session.getAttribute(SmokeITParameters.VALUE); + if (value != null) { + response.setIntHeader(SmokeITParameters.VALUE, value.get()); + } } } @@ -44,22 +48,18 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro response.setHeader(SmokeITParameters.SESSION_ID, session.getId()); AtomicInteger value = (AtomicInteger) session.getAttribute(SmokeITParameters.VALUE); - int result = 0; if (value == null) { - value = new AtomicInteger(result); + value = new AtomicInteger(0); session.setAttribute(SmokeITParameters.VALUE, value); - } else { - result = value.incrementAndGet(); } - response.setIntHeader(SmokeITParameters.VALUE, result); + response.setIntHeader(SmokeITParameters.VALUE, value.incrementAndGet()); } @Override public void doDelete(HttpServletRequest request, HttpServletResponse response) throws IOException { HttpSession session = request.getSession(false); if (session != null) { - response.setHeader(SmokeITParameters.SESSION_ID, session.getId()); session.invalidate(); } } diff --git a/web/core/src/main/java/org/wildfly/clustering/spring/web/DistributableWebSession.java b/web/core/src/main/java/org/wildfly/clustering/spring/web/DistributableWebSession.java index 8d049cb..1efec02 100644 --- a/web/core/src/main/java/org/wildfly/clustering/spring/web/DistributableWebSession.java +++ b/web/core/src/main/java/org/wildfly/clustering/spring/web/DistributableWebSession.java @@ -16,7 +16,6 @@ import org.wildfly.clustering.cache.batch.Batch; import org.wildfly.clustering.cache.batch.BatchContext; -import org.wildfly.clustering.session.OOBSession; import org.wildfly.clustering.session.Session; import org.wildfly.clustering.session.SessionAttributes; import org.wildfly.clustering.session.SessionManager; @@ -91,9 +90,6 @@ public Mono changeSessionId() { oldSession.invalidate(); this.session = newSession; } catch (IllegalStateException e) { - if (!oldSession.isValid()) { - oldSession.close(); - } newSession.invalidate(); throw e; } @@ -103,7 +99,7 @@ public Mono changeSessionId() { @Override public Mono invalidate() { - return Mono.fromRunnable(this::invalidateSync).publishOn(Schedulers.boundedElastic()); + return Mono.fromRunnable(this::invalidateSync).subscribeOn(Schedulers.boundedElastic()); } private void invalidateSync() { @@ -113,11 +109,6 @@ private void invalidateSync() { if (this.batch != null) { this.batch.close(); } - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } @@ -141,13 +132,11 @@ public void close() { } } } - } finally { - // Switch to OOB session, in case this session is referenced outside the scope of this request - this.session = new OOBSession<>(this.manager, requestSession.getId(), null); } } else if (requestSession.isValid()) { // Invalidate if session was never "started". this.invalidateSync(); + requestSession.close(); } } @@ -262,11 +251,6 @@ private R apply(Function, R> function) { Session session = this.session; try (BatchContext context = this.resumeBatch()) { return function.apply(session); - } catch (IllegalStateException e) { - if (!session.isValid()) { - session.close(); - } - throw e; } } diff --git a/web/core/src/main/java/org/wildfly/clustering/spring/web/DistributableWebSessionManager.java b/web/core/src/main/java/org/wildfly/clustering/spring/web/DistributableWebSessionManager.java index 46dfc9d..069af0d 100644 --- a/web/core/src/main/java/org/wildfly/clustering/spring/web/DistributableWebSessionManager.java +++ b/web/core/src/main/java/org/wildfly/clustering/spring/web/DistributableWebSessionManager.java @@ -6,7 +6,6 @@ package org.wildfly.clustering.spring.web; import java.util.Iterator; -import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; import java.util.function.Function; @@ -54,17 +53,15 @@ private Mono getSession(BiFunction, St B batch = batcher.createBatch(); try { Mono> result = Mono.fromCompletionStage(function.apply(this.manager, id)); -// CompletionStage> future = function.apply(this.manager, id); B suspendedBatch = batcher.suspendBatch(); Supplier>> creator = () -> { try (BatchContext context = this.manager.getBatcher().resumeBatch(suspendedBatch)) { - return Mono.fromCompletionStage(this.manager.createSessionAsync(this.manager.getIdentifierFactory().get())).publishOn(Schedulers.boundedElastic()); + return Mono.fromCompletionStage(this.manager.createSessionAsync(this.manager.getIdentifierFactory().get())).subscribeOn(Schedulers.boundedElastic()); } }; - return result.flatMap(session -> Optional.ofNullable(session).map(Mono::just).orElseGet(creator)) - .flatMap(session -> Mono.just(new DistributableWebSession<>(this.manager, session, suspendedBatch))); -// return Mono.fromCompletionStage(future.thenCompose(session -> Optional.ofNullable(session).map(CompletableFuture::completedStage).orElseGet(creator)) -// .thenApply(session -> new DistributableWebSession<>(this.manager, session, suspendedBatch))); + return result.switchIfEmpty(Mono.defer(creator)).map(session -> new DistributableWebSession<>(this.manager, session, suspendedBatch)); +// return result.flatMap(session -> Optional.ofNullable(session).map(Mono::just).orElseGet(creator)) +// .map(session -> new DistributableWebSession<>(this.manager, session, suspendedBatch)); } catch (RuntimeException | Error e) { try (BatchContext context = batcher.resumeBatch(batch)) { batch.discard(); @@ -82,7 +79,7 @@ private Mono close(ServerWebExchange exchange, SpringWebSession session) { this.identifierResolver.setSessionId(exchange, session.getId()); } - return Mono.fromRunnable(session::close).publishOn(Schedulers.boundedElastic()); + return Mono.fromRunnable(session::close).subscribeOn(Schedulers.boundedElastic()); } private String requestedSessionId(ServerWebExchange exchange) { diff --git a/web/core/src/test/java/org/wildfly/clustering/spring/web/AbstractSmokeITCase.java b/web/core/src/test/java/org/wildfly/clustering/spring/web/AbstractSmokeITCase.java index 3c2c8d1..46d3dd8 100644 --- a/web/core/src/test/java/org/wildfly/clustering/spring/web/AbstractSmokeITCase.java +++ b/web/core/src/test/java/org/wildfly/clustering/spring/web/AbstractSmokeITCase.java @@ -4,11 +4,9 @@ */ package org.wildfly.clustering.spring.web; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.*; import java.net.CookieManager; -import java.net.CookiePolicy; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -18,6 +16,8 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -26,6 +26,7 @@ import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.wildfly.common.function.ExceptionBiConsumer; @@ -38,6 +39,9 @@ public class AbstractSmokeITCase implements ExceptionBiConsumer testClass) { return ShrinkWrap.create(WebArchive.class, testClass.getSimpleName() + ".war") .addClasses(PropertiesAsset.class, SmokeITParameters.class) @@ -45,6 +49,7 @@ protected static WebArchive deployment(Class test } private final boolean transactional; + private final ExecutorService executor; private final HttpClient client; protected AbstractSmokeITCase(HttpClient.Builder builder) { @@ -53,71 +58,88 @@ protected AbstractSmokeITCase(HttpClient.Builder builder) { protected AbstractSmokeITCase(boolean transactional, HttpClient.Builder builder) { this.transactional = transactional; - CookieManager manager = new CookieManager(); - manager.setCookiePolicy(CookiePolicy.ACCEPT_ALL); - this.client = builder.cookieHandler(manager).build(); + this.executor = Executors.newFixedThreadPool(CONCURRENCY); + this.client = builder.cookieHandler(new CookieManager()).executor(this.executor).build(); + } + + @AfterEach + public void destroy() { + this.executor.shutdown(); } @Override public void accept(URI baseURI1, URI baseURI2) throws Exception { URI uri1 = baseURI1.resolve(SmokeITParameters.ENDPOINT_NAME); URI uri2 = baseURI2.resolve(SmokeITParameters.ENDPOINT_NAME); - int iterations = 4; - int concurrentRequests = 20; + for (URI uri : Arrays.asList(uri1, uri2)) { + // Verify a request that never starts its session + this.client.sendAsync(HttpRequest.newBuilder(uri).method("HEAD", BodyPublishers.noBody()).build(), BodyHandlers.discarding()).thenAccept(response -> { + assertEquals(HttpServletResponse.SC_OK, response.statusCode()); + assertNull(response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse(null)); + assertFalse(response.headers().firstValueAsLong(SmokeITParameters.VALUE).isPresent()); + }).join(); + } AtomicReference sessionId = new AtomicReference<>(); AtomicLong expected = new AtomicLong(0); - for (int i = 0; i < iterations; i++) { + for (int i = 0; i < ITERATIONS; i++) { for (URI uri : Arrays.asList(uri1, uri2)) { int count = i; long value = this.client.sendAsync(HttpRequest.newBuilder(uri).build(), BodyHandlers.discarding()).thenApply(response -> { assertEquals(HttpServletResponse.SC_OK, response.statusCode(), Integer.toString(count)); - String requestSessionId = response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse("none"); + String requestSessionId = response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse(null); + assertNotNull(requestSessionId); // Validate propagation of session ID if (!sessionId.compareAndSet(null, requestSessionId)) { assertEquals(sessionId.get(), requestSessionId); } return response.headers().firstValueAsLong(SmokeITParameters.VALUE).orElse(0); }).join(); - Assertions.assertEquals(expected.getAndIncrement(), value); - // Validate session propagation + Assertions.assertEquals(expected.incrementAndGet(), value); + // Validate session is still "started" this.client.sendAsync(HttpRequest.newBuilder(uri).method("HEAD", BodyPublishers.noBody()).build(), BodyHandlers.discarding()).thenAccept(response -> { assertEquals(HttpServletResponse.SC_OK, response.statusCode()); - assertEquals(sessionId.get(), response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse("none")); + assertEquals(sessionId.get(), response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse(null)); }).join(); - // Perform a bunch of concurrent requests incrementing the session attribute - List> futures = new ArrayList<>(concurrentRequests); - for (int j = 0; j < concurrentRequests; j++) { - futures.add(this.client.sendAsync(HttpRequest.newBuilder(uri).build(), BodyHandlers.discarding()).thenApply(response -> { + // Perform a number of concurrent requests incrementing the session attribute + List> futures = new ArrayList<>(CONCURRENCY); + for (int j = 0; j < CONCURRENCY; j++) { + CompletableFuture future = this.client.sendAsync(HttpRequest.newBuilder(uri).build(), BodyHandlers.discarding()).thenApply(response -> { assertEquals(HttpServletResponse.SC_OK, response.statusCode()); - assertEquals(sessionId.get(), response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse("none")); + assertEquals(sessionId.get(), response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse(null)); return response.headers().firstValueAsLong(SmokeITParameters.VALUE).orElse(0); - })); - expected.incrementAndGet(); - } - for (CompletableFuture future : futures) { - future.join(); + }); + futures.add(future); } + expected.addAndGet(CONCURRENCY); + // Verify the correct number of unique results + assertEquals(CONCURRENCY, futures.stream().map(CompletableFuture::join).distinct().count()); // Verify expected session attribute value following concurrent updates value = this.client.sendAsync(HttpRequest.newBuilder(uri).build(), BodyHandlers.discarding()).thenApply(response -> { assertEquals(HttpServletResponse.SC_OK, response.statusCode()); assertEquals(sessionId.get(), response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse("none")); return response.headers().firstValueAsLong(SmokeITParameters.VALUE).orElse(0); }).join(); - Assertions.assertEquals(expected.getAndIncrement(), value); + Assertions.assertEquals(expected.incrementAndGet(), value); if (!this.transactional) { // Grace time between fail-over requests TimeUnit.SECONDS.sleep(1); } } } + // Invalidate session this.client.sendAsync(HttpRequest.newBuilder(uri1).DELETE().build(), BodyHandlers.discarding()).thenAccept(response -> { assertEquals(HttpServletResponse.SC_OK, response.statusCode()); - assertEquals(sessionId.get(), response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse("none")); - }).join(); - Thread.sleep(500); - this.client.sendAsync(HttpRequest.newBuilder(uri2).method("HEAD", BodyPublishers.noBody()).build(), BodyHandlers.discarding()).thenAccept(response -> { - assertEquals(HttpServletResponse.SC_OK, response.statusCode()); - assertNotEquals(sessionId.get(), response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse("none")); + assertNull(response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse(null)); }).join(); + List> futures = new ArrayList<>(2); + for (URI uri : Arrays.asList(uri1, uri2)) { + // Verify session was truly invalidated + futures.add(this.client.sendAsync(HttpRequest.newBuilder(uri).method("HEAD", BodyPublishers.noBody()).build(), BodyHandlers.discarding()).thenAccept(response -> { + assertEquals(HttpServletResponse.SC_OK, response.statusCode()); + assertNull(response.headers().firstValue(SmokeITParameters.SESSION_ID).orElse(null)); + assertFalse(response.headers().firstValueAsLong(SmokeITParameters.VALUE).isPresent()); + })); + } + futures.stream().forEach(CompletableFuture::join); } } diff --git a/web/core/src/test/java/org/wildfly/clustering/spring/web/context/SessionHandler.java b/web/core/src/test/java/org/wildfly/clustering/spring/web/context/SessionHandler.java index e532cb3..e0272a6 100644 --- a/web/core/src/test/java/org/wildfly/clustering/spring/web/context/SessionHandler.java +++ b/web/core/src/test/java/org/wildfly/clustering/spring/web/context/SessionHandler.java @@ -5,7 +5,9 @@ package org.wildfly.clustering.spring.web.context; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import jakarta.servlet.http.HttpServletResponse; @@ -23,31 +25,41 @@ /** * @author Paul Ferraro */ -public class SessionHandler implements WebHandler { +public class SessionHandler implements WebHandler, Function> { + private static final Set SUPPORTED_METHODS = Set.of(HttpMethod.GET, HttpMethod.HEAD, HttpMethod.DELETE); @Override public Mono handle(ServerWebExchange exchange) { + ServerHttpResponse response = exchange.getResponse(); + ServerHttpRequest request = exchange.getRequest(); + HttpMethod method = request.getMethod(); + if (!SUPPORTED_METHODS.contains(method)) { + response.setStatusCode(HttpStatusCode.valueOf(HttpServletResponse.SC_METHOD_NOT_ALLOWED)); + return Mono.empty(); + } + Mono sessionPublisher = this.apply(exchange); + if (method.equals(HttpMethod.DELETE)) { + return sessionPublisher.flatMap(WebSession::invalidate); + } + return sessionPublisher.doOnNext(session -> { + if (session.isStarted()) { + response.getHeaders().set(SmokeITParameters.SESSION_ID, session.getId()); + } + }).then(); + } + + @Override + public Mono apply(ServerWebExchange exchange) { ServerHttpRequest request = exchange.getRequest(); HttpMethod method = request.getMethod(); ServerHttpResponse response = exchange.getResponse(); - Mono publisher = exchange.getSession().doOnNext(session -> response.getHeaders().set(SmokeITParameters.SESSION_ID, session.getId())); + Mono sessionPublisher = exchange.getSession(); if (method.equals(HttpMethod.GET)) { - publisher = publisher.doOnNext(session -> { - AtomicInteger value = session.getAttribute(SmokeITParameters.VALUE); - int result = 0; - if (value == null) { - value = new AtomicInteger(result); - session.getAttributes().put(SmokeITParameters.VALUE, value); - } else { - result = value.incrementAndGet(); - } - response.getHeaders().set(SmokeITParameters.VALUE, Integer.toString(result)); + return sessionPublisher.doOnNext(session -> { + AtomicInteger value = (AtomicInteger) session.getAttributes().computeIfAbsent(SmokeITParameters.VALUE, key -> new AtomicInteger(0)); + response.getHeaders().set(SmokeITParameters.VALUE, Integer.toString(value.incrementAndGet())); }); - } else if (method.equals(HttpMethod.DELETE)) { - return publisher.flatMap(WebSession::invalidate); - } else if (!method.equals(HttpMethod.HEAD)) { - response.setStatusCode(HttpStatusCode.valueOf(HttpServletResponse.SC_METHOD_NOT_ALLOWED)); } - return Mono.when(publisher); + return sessionPublisher; } }