Skip to content

Commit

Permalink
Async support for HazelcastSessionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
emre committed May 22, 2017
1 parent d0bcff3 commit b5bebde
Show file tree
Hide file tree
Showing 13 changed files with 505 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.hazelcast.session;

import com.hazelcast.core.Hazelcast;
import org.apache.http.client.CookieStore;
import org.apache.http.impl.client.BasicCookieStore;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.junit.Assert.assertEquals;

public abstract class AbstractAsyncServletTest extends AbstractHazelcastSessionsTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Before
public void setUp() throws Exception {
Hazelcast.newHazelcastInstance();

instance1 = getWebContainerConfigurator();
instance1.port(SERVER_PORT_1).sticky(true).clientOnly(true).sessionTimeout(10).start();
instance2 = getWebContainerConfigurator();
instance2.port(SERVER_PORT_2).sticky(true).clientOnly(true).sessionTimeout(10).start();
}

@Override
protected WebContainerConfigurator<?> getWebContainerConfigurator() {
return getAsyncWebContainerConfigurator();
}

protected abstract WebContainerConfigurator<?> getAsyncWebContainerConfigurator();

@Test
public void testReadWriteRead() throws Exception {
CookieStore cookieStore = new BasicCookieStore();
String value = executeRequest("", SERVER_PORT_1, cookieStore);
assertEquals(value, "OK");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,13 @@ public void mapCleared(MapEvent event) {
private void configureValves() {
if (isSticky()) {
HazelcastSessionChangeValve hazelcastSessionChangeValve = new HazelcastSessionChangeValve(this);
hazelcastSessionChangeValve.setAsyncSupported(true);
getContainer().getPipeline().addValve(hazelcastSessionChangeValve);
}

if (isDeferredEnabled()) {
HazelcastSessionCommitValve hazelcastSessionCommitValve = new HazelcastSessionCommitValve(this);
hazelcastSessionCommitValve.setAsyncSupported(true);
getContainer().getPipeline().addValve(hazelcastSessionCommitValve);
}
}
Expand Down
47 changes: 47 additions & 0 deletions tomcat7/src/test/java/com/hazelcast/session/AsyncServlet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.hazelcast.session;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@WebServlet(asyncSupported = true, value = "/AsyncServlet")
public class AsyncServlet extends HttpServlet {
private static final BlockingQueue<AsyncContext> queue = new ArrayBlockingQueue<AsyncContext>(20000);

public AsyncServlet() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
AsyncServlet.newEvent();
}
}, 0, 5, TimeUnit.SECONDS);
}

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
queue.add(request.startAsync());
}

private static void newEvent() {
ArrayList<AsyncContext> clients = new ArrayList<AsyncContext>(queue.size());
queue.drainTo(clients);
for (AsyncContext ac : clients) {
try {
ac.getResponse().getWriter().write("OK");
} catch (IOException e) {
e.printStackTrace();
}
ac.complete();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.hazelcast.session;

public class AsyncServletTest extends AbstractAsyncServletTest {
@Override
protected WebContainerConfigurator<?> getAsyncWebContainerConfigurator() {
return new Tomcat7AsyncConfigurator(temporaryFolder.getRoot().getAbsolutePath());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.hazelcast.session;

import org.apache.catalina.Context;
import org.apache.catalina.Wrapper;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.startup.Tomcat;

public class Tomcat7AsyncConfigurator extends WebContainerConfigurator<Tomcat> {

private final String baseDir;

private Tomcat tomcat;
private SessionManager manager;

public Tomcat7AsyncConfigurator(String baseDir) {
this.baseDir = baseDir;
}

@Override
public Tomcat configure() throws Exception {
Tomcat tomcat = new Tomcat();
if (!clientOnly) {
P2PLifecycleListener listener = new P2PLifecycleListener();
listener.setConfigLocation(configLocation);
tomcat.getServer().addLifecycleListener(listener);
} else {
tomcat.getServer().addLifecycleListener(new ClientServerLifecycleListener());
}
tomcat.getEngine().setJvmRoute("tomcat-" + port);
tomcat.getEngine().setName("engine-" + port);

final Connector connector = tomcat.getConnector();
connector.setPort(port);
connector.setProperty("bindOnInit", "false");

tomcat.addUser("someuser", "somepass");
tomcat.addRole("someuser", "role1");

Context context;
try {
context = tomcat.addWebapp("", baseDir);

Wrapper asyncServlet = context.createWrapper();
asyncServlet.setName("asyncServlet");
asyncServlet.setServletClass(AsyncServlet.class.getName());
asyncServlet.setAsyncSupported(true);

context.addChild(asyncServlet);
context.addServletMapping("/*", "asyncServlet");

} catch (final Exception e) {
throw new IllegalStateException(e);
}

this.manager = new HazelcastSessionManager();
context.setManager((HazelcastSessionManager) manager);
updateManager((HazelcastSessionManager) manager);
context.setCookies(true);
context.setBackgroundProcessorDelay(1);
context.setReloadable(true);

return tomcat;
}

@Override
public void start() throws Exception {
tomcat = configure();
tomcat.start();
}

@Override
public void stop() throws Exception {
if (tomcat.getServer().getState().isAvailable()) {
tomcat.stop();
}
}

@Override
public void reload() {
Context context = (Context) tomcat.getHost().findChild("/");
if (context == null) {
context = (Context) tomcat.getHost().findChild("");
}
context.reload();
}

@Override
public SessionManager getManager() {
return manager;
}

private void updateManager(HazelcastSessionManager manager) {
manager.setSticky(sticky);
manager.setClientOnly(clientOnly);
manager.setMapName(mapName);
manager.setDeferredWrite(deferredWrite);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,13 @@ public void mapCleared(MapEvent event) {
private void configureValves() {
if (isSticky()) {
HazelcastSessionChangeValve hazelcastSessionChangeValve = new HazelcastSessionChangeValve(this);
hazelcastSessionChangeValve.setAsyncSupported(true);
getContext().getPipeline().addValve(hazelcastSessionChangeValve);
}

if (isDeferredEnabled()) {
HazelcastSessionCommitValve hazelcastSessionCommitValve = new HazelcastSessionCommitValve(this);
hazelcastSessionCommitValve.setAsyncSupported(true);
getContext().getPipeline().addValve(hazelcastSessionCommitValve);
}
}
Expand Down
47 changes: 47 additions & 0 deletions tomcat8/src/test/java/com/hazelcast/session/AsyncServlet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.hazelcast.session;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@WebServlet(asyncSupported = true, value = "/AsyncServlet")
public class AsyncServlet extends HttpServlet {
private static final BlockingQueue<AsyncContext> queue = new ArrayBlockingQueue<AsyncContext>(20000);

public AsyncServlet() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
AsyncServlet.newEvent();
}
}, 0, 5, TimeUnit.SECONDS);
}

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
queue.add(request.startAsync());
}

private static void newEvent() {
ArrayList<AsyncContext> clients = new ArrayList<AsyncContext>(queue.size());
queue.drainTo(clients);
for (AsyncContext ac : clients) {
try {
ac.getResponse().getWriter().write("OK");
} catch (IOException e) {
e.printStackTrace();
}
ac.complete();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.hazelcast.session;

public class AsyncServletTest extends AbstractAsyncServletTest {
@Override
protected WebContainerConfigurator<?> getAsyncWebContainerConfigurator() {
return new Tomcat8AsyncConfigurator(temporaryFolder.getRoot().getAbsolutePath());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.hazelcast.session;

import org.apache.catalina.Context;
import org.apache.catalina.Wrapper;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.startup.Tomcat;

public class Tomcat8AsyncConfigurator extends WebContainerConfigurator<Tomcat> {

private final String baseDir;

private Tomcat tomcat;
private SessionManager manager;

public Tomcat8AsyncConfigurator(String baseDir) {
this.baseDir = baseDir;
}

@Override
public Tomcat configure() throws Exception {
Tomcat tomcat = new Tomcat();
if (!clientOnly) {
P2PLifecycleListener listener = new P2PLifecycleListener();
listener.setConfigLocation(configLocation);
tomcat.getServer().addLifecycleListener(listener);
} else {
tomcat.getServer().addLifecycleListener(new ClientServerLifecycleListener());
}
tomcat.getEngine().setJvmRoute("tomcat-" + port);
tomcat.getEngine().setName("engine-" + port);

final Connector connector = tomcat.getConnector();
connector.setPort(port);
connector.setProperty("bindOnInit", "false");

tomcat.addUser("someuser", "somepass");
tomcat.addRole("someuser", "role1");

Context context;
try {
context = tomcat.addWebapp("", baseDir);

Wrapper asyncServlet = context.createWrapper();
asyncServlet.setName("asyncServlet");
asyncServlet.setServletClass(AsyncServlet.class.getName());
asyncServlet.setAsyncSupported(true);

context.addChild(asyncServlet);
context.addServletMapping("/*", "asyncServlet");

} catch (final Exception e) {
throw new IllegalStateException(e);
}

this.manager = new HazelcastSessionManager();
context.setManager((HazelcastSessionManager) manager);
updateManager((HazelcastSessionManager) manager);
context.setCookies(true);
context.setBackgroundProcessorDelay(1);
context.setReloadable(true);

return tomcat;
}

@Override
public void start() throws Exception {
tomcat = configure();
tomcat.start();
}

@Override
public void stop() throws Exception {
if (tomcat.getServer().getState().isAvailable()) {
tomcat.stop();
}
}

@Override
public void reload() {
Context context = (Context) tomcat.getHost().findChild("/");
if (context == null) {
context = (Context) tomcat.getHost().findChild("");
}
context.reload();
}

@Override
public SessionManager getManager() {
return manager;
}

private void updateManager(HazelcastSessionManager manager) {
manager.setSticky(sticky);
manager.setClientOnly(clientOnly);
manager.setMapName(mapName);
manager.setDeferredWrite(deferredWrite);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ public void mapCleared(MapEvent event) {
private void configureValves() {
if (isSticky()) {
HazelcastSessionChangeValve hazelcastSessionChangeValve = new HazelcastSessionChangeValve(this);
hazelcastSessionChangeValve.setAsyncSupported(true);
getContext().getPipeline().addValve(hazelcastSessionChangeValve);
}

if (isDeferredEnabled()) {
HazelcastSessionCommitValve hazelcastSessionCommitValve = new HazelcastSessionCommitValve(this);
hazelcastSessionCommitValve.setAsyncSupported(true);
getContext().getPipeline().addValve(hazelcastSessionCommitValve);
}
}
Expand Down
Loading

0 comments on commit b5bebde

Please sign in to comment.