diff --git a/src/io/socket/IOConnection.java b/src/io/socket/IOConnection.java index 0667114..124c841 100644 --- a/src/io/socket/IOConnection.java +++ b/src/io/socket/IOConnection.java @@ -263,9 +263,11 @@ private void connectTransport() { transport = WebsocketTransport.create(url, this); else if (protocols.contains(XhrTransport.TRANSPORT_NAME)) transport = XhrTransport.create(url, this); - else + else { error(new SocketIOException( "Server supports no available transports. You should reconfigure the server to support a available transport")); + return; + } transport.connect(); } @@ -828,15 +830,27 @@ public boolean isConnected() { return getState() == STATE_READY; } - public int getState() { - synchronized (this) { - return state; - } + /** + * Gets the current state of this IOConnection + * @return current state + */ + private synchronized int getState() { + return state; } - public void setState(int state) { - synchronized (this) { - this.state = state; - } + /** + * Sets the current state of this IOConnection + * @param new state + */ + private synchronized void setState(int state) { + this.state = state; + } + + /** + * gets the currently used transport + * @return currently used transport + */ + public IOTransport getTransport() { + return transport; } } diff --git a/src/io/socket/IOTransport.java b/src/io/socket/IOTransport.java index e9b50cf..19461a3 100644 --- a/src/io/socket/IOTransport.java +++ b/src/io/socket/IOTransport.java @@ -60,4 +60,6 @@ interface IOTransport { * {@link IOConnection} */ void invalidate(); + + String getName(); } diff --git a/src/io/socket/SocketIO.java b/src/io/socket/SocketIO.java index 2f7d3b2..e3f60d7 100644 --- a/src/io/socket/SocketIO.java +++ b/src/io/socket/SocketIO.java @@ -302,4 +302,14 @@ public void reconnect() { public boolean isConnected() { return this.connection.isConnected(); } + + /** + * Returns the name of the used transport + * + * @return the name of the currently used transport + */ + public String getTransport() { + IOTransport transport = this.connection.getTransport(); + return transport != null ? transport.getName() : null; + } } diff --git a/src/io/socket/WebsocketTransport.java b/src/io/socket/WebsocketTransport.java index bf55e06..4f3dc20 100644 --- a/src/io/socket/WebsocketTransport.java +++ b/src/io/socket/WebsocketTransport.java @@ -136,4 +136,9 @@ public void connect() { public void send(String text) throws Exception { websocket.send(text); } + + @Override + public String getName() { + return TRANSPORT_NAME; + } } diff --git a/src/io/socket/XhrTransport.java b/src/io/socket/XhrTransport.java index 7dfb087..84c4ef3 100644 --- a/src/io/socket/XhrTransport.java +++ b/src/io/socket/XhrTransport.java @@ -11,45 +11,49 @@ import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; -import java.net.URLConnection; +import java.util.Arrays; +import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; - /** * The Class XhrTransport. */ class XhrTransport implements IOTransport { - + /** The String to identify this Transport. */ public static final String TRANSPORT_NAME = "xhr-polling"; - + /** The connection. */ private IOConnection connection; - + /** The url. */ private URL url; - + /** The queue holding elements to send. */ ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); /** background thread for managing the server connection. */ PollThread pollThread = null; - + /** Indicates whether the {@link IOConnection} wants us to be connected. */ private boolean connect; - + /** Indicates whether {@link PollThread} is blocked. */ private boolean blocked; - + + HttpURLConnection urlConnection; + /** * The Class ReceiverThread. */ private class PollThread extends Thread { - + /** * Instantiates a new receiver thread. */ @@ -57,7 +61,9 @@ public PollThread() { super(TRANSPORT_NAME); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see java.lang.Thread#run() */ @Override @@ -66,26 +72,40 @@ public void run() { while (isConnect()) { try { String line; - URLConnection urlConnection = url.openConnection(); + URL url = new URL(XhrTransport.this.url.toString() + "?t=" + System.currentTimeMillis()); + urlConnection = (HttpURLConnection) url.openConnection(); if (!queue.isEmpty()) { urlConnection.setDoOutput(true); + urlConnection.setRequestMethod("POST") ; BufferedWriter output = new BufferedWriter( new OutputStreamWriter( urlConnection.getOutputStream())); - while((line = queue.peek()) != null) { - output.write("�7�" + line); - queue.remove(); + if (queue.size() == 1) { + line = queue.poll(); + output.write(line); + System.out.println(line); + } else { + Iterator iter = queue.iterator(); + while (iter.hasNext()) { + line = iter.next(); + output.write("\ufffd" + line.length() + + "\ufffd" + line); + System.out.println("\ufffd" + line.length() + + "\ufffd" + line); + iter.remove(); + } } output.close(); - } - else { + } else { setBlocked(true); - BufferedReader input = new BufferedReader( - new InputStreamReader( - urlConnection.getInputStream())); - while ((line = input.readLine()) != null) { - if (connection != null) - connection.transportMessage(line); + InputStream plainInput = urlConnection.getInputStream(); + if (plainInput != null) { + BufferedReader input = new BufferedReader( + new InputStreamReader(plainInput)); + while ((line = input.readLine()) != null) { + if (connection != null) + connection.transportMessage(line); + } } setBlocked(false); } @@ -103,9 +123,11 @@ public void run() { /** * Creates a new Transport for the given url an {@link IOConnection}. - * - * @param url the url - * @param connection the connection + * + * @param url + * the url + * @param connection + * the connection * @return the iO transport */ public static IOTransport create(URL url, IOConnection connection) { @@ -123,16 +145,20 @@ public static IOTransport create(URL url, IOConnection connection) { /** * Instantiates a new xhr transport. - * - * @param url the url - * @param connection the connection + * + * @param url + * the url + * @param connection + * the connection */ public XhrTransport(URL url, IOConnection connection) { this.connection = connection; this.url = url; } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see io.socket.IOTransport#connect() */ @Override @@ -142,7 +168,9 @@ public void connect() { pollThread.start(); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see io.socket.IOTransport#disconnect() */ @Override @@ -151,7 +179,9 @@ public void disconnect() { pollThread.interrupt(); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see io.socket.IOTransport#send(java.lang.String) */ @Override @@ -159,7 +189,9 @@ public void send(String text) throws IOException { sendBulk(new String[] { text }); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see io.socket.IOTransport#canSendBulk() */ @Override @@ -167,19 +199,23 @@ public boolean canSendBulk() { return true; } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see io.socket.IOTransport#sendBulk(java.lang.String[]) */ @Override public void sendBulk(String[] texts) throws IOException { - for (String text : texts) { - queue.add(text); - } - if(isBlocked()) + queue.addAll(Arrays.asList(texts)); + if (isBlocked()) { pollThread.interrupt(); + urlConnection.disconnect(); + } } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see io.socket.IOTransport#invalidate() */ @Override @@ -189,7 +225,7 @@ public void invalidate() { /** * Checks if is connect. - * + * * @return true, if is connect */ private synchronized boolean isConnect() { @@ -198,8 +234,9 @@ private synchronized boolean isConnect() { /** * Sets the connect. - * - * @param connect the new connect + * + * @param connect + * the new connect */ private synchronized void setConnect(boolean connect) { this.connect = connect; @@ -207,7 +244,7 @@ private synchronized void setConnect(boolean connect) { /** * Checks if is blocked. - * + * * @return true, if is blocked */ private synchronized boolean isBlocked() { @@ -216,10 +253,16 @@ private synchronized boolean isBlocked() { /** * Sets the blocked. - * - * @param blocked the new blocked + * + * @param blocked + * the new blocked */ private synchronized void setBlocked(boolean blocked) { this.blocked = blocked; } + + @Override + public String getName() { + return TRANSPORT_NAME; + } } diff --git a/tests/io/socket/TestSocketIO.java b/tests/io/socket/AbstractTestSocketIO.java similarity index 82% rename from tests/io/socket/TestSocketIO.java rename to tests/io/socket/AbstractTestSocketIO.java index 0f75c27..05d4a4f 100644 --- a/tests/io/socket/TestSocketIO.java +++ b/tests/io/socket/AbstractTestSocketIO.java @@ -13,15 +13,14 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @RunWith(io.socket.RandomBlockJUnit4ClassRunner.class) -public class TestSocketIO implements IOCallback { - private final static String NODE = "C:\\Program Files (x86)\\nodejs\\node.exe"; - private static final int PORT = 10214; - private static final int TIMEOUT = 2; +public abstract class AbstractTestSocketIO implements IOCallback { + private final static String NODE = "/opt/local/bin/node"; + private int port = -1; + private static final int TIMEOUT = 1000; LinkedBlockingQueue events; LinkedBlockingQueue outputs; LinkedBlockingQueue args; @@ -30,10 +29,7 @@ public class TestSocketIO implements IOCallback { Thread stderrThread; private Process node; private SocketIO socket; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } + static protected String transport = null; @AfterClass public static void tearDownAfterClass() throws Exception { @@ -41,11 +37,14 @@ public static void tearDownAfterClass() throws Exception { @Before public void setUp() throws Exception { + assertNotNull("Transport is set correctly", transport); events = new LinkedBlockingQueue(); outputs = new LinkedBlockingQueue(); args = new LinkedBlockingQueue(); + System.out.println("Connect with " + transport); node = Runtime.getRuntime().exec( - new String[] { NODE, "./tests/io/socket/socketio.js", "" + PORT }); + new String[] { NODE, "./tests/io/socket/socketio.js", + "" + getPort(), transport }); stdoutThread = new Thread("stdoutThread") { public void run() { @@ -89,6 +88,16 @@ public void run() { }; stderrThread.start(); stdoutThread.start(); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + node.destroy(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); assertEquals("OK", takeLine()); } @@ -112,8 +121,9 @@ public void tearDown() throws Exception { void doConnect() throws Exception { // Setting up socket connection - socket = new SocketIO("http://127.0.0.1:" + PORT, this); + socket = new SocketIO("http://127.0.0.1:" + getPort(), this); assertEquals("onConnect", takeEvent()); + assertEquals(transport, socket.getTransport()); } void doClose() throws Exception { @@ -133,7 +143,7 @@ void doClose() throws Exception { // BEGIN TESTS - @Test + @Test(timeout=TIMEOUT) public void send() throws Exception { doConnect(); String str = "TESTSTRING"; @@ -142,7 +152,7 @@ public void send() throws Exception { doClose(); } - @Test + @Test(timeout=TIMEOUT) public void emitAndOn() throws Exception { doConnect(); @@ -159,7 +169,7 @@ public void emitAndOn() throws Exception { doClose(); } - @Test + @Test(timeout=TIMEOUT) public void emitAndMessage() throws Exception { doConnect(); String str = "TESTSTRING"; @@ -177,9 +187,9 @@ public void emitAndMessage() throws Exception { doClose(); } - @Test + @Test(timeout=TIMEOUT) public void namespaces() throws Exception { - SocketIO ns1 = new SocketIO("http://127.0.0.1:" + PORT + "/ns1", this); + SocketIO ns1 = new SocketIO("http://127.0.0.1:" + getPort() + "/ns1", this); assertEquals("onConnect", takeEvent()); // In some very rare cases, it is possible to receive data on an socket @@ -192,18 +202,18 @@ public void namespaces() throws Exception { ns1.disconnect(); assertEquals("onDisconnect", takeEvent()); - SocketIO ns2 = new SocketIO("http://127.0.0.1:" + PORT + "/ns2", this); + SocketIO ns2 = new SocketIO("http://127.0.0.1:" + getPort() + "/ns2", this); assertEquals("onConnect", takeEvent()); assertEquals("onMessage_string", takeEvent()); assertEquals("ns2", takeArg()); - - SocketIO ns2_2 = new SocketIO("http://127.0.0.1:" + PORT + "/ns2", this); + + SocketIO ns2_2 = new SocketIO("http://127.0.0.1:" + getPort() + "/ns2", this); assertEquals("onConnect", takeEvent()); assertEquals("onMessage_string", takeEvent()); assertEquals("ns2", takeArg()); - + ns2_2.disconnect(); ns2.disconnect(); assertEquals("onDisconnect", takeEvent()); @@ -211,23 +221,22 @@ public void namespaces() throws Exception { doClose(); } - @Test + @Test(timeout=TIMEOUT) public void error() throws Exception { doConnect(); - new SocketIO( - "http://127.0.0.1:" + (PORT + 1) + "/ns1", this); + new SocketIO("http://127.0.0.1:1024/", this); assertEquals("onError", takeEvent()); doClose(); } - - @Test + + @Test(timeout=TIMEOUT) public void acknowledge() throws Exception { doConnect(); socket.emit("echoAck", new IOAcknowledge() { @Override public void ack(Object... args) { events.add("ack"); - TestSocketIO.this.args.addAll(Arrays.asList(args)); + AbstractTestSocketIO.this.args.addAll(Arrays.asList(args)); } }, "TESTSTRING"); assertEquals("ack", takeEvent()); @@ -256,7 +265,7 @@ String takeLine() throws InterruptedException { } Object takeArg() throws InterruptedException { - Object arg = args.poll(TIMEOUT, TimeUnit.SECONDS); + Object arg = args.poll(TIMEOUT, TimeUnit.MILLISECONDS); if (arg == null) { fail("takeArg Timeout!"); } @@ -297,4 +306,10 @@ public void onError(SocketIOException socketIOException) { events.add("onError"); } + public int getPort() { + if(port == -1) + port = 2048 + (int)(Math.random() * 10000); + return port; + } + } diff --git a/tests/io/socket/AllTests.java b/tests/io/socket/AllTests.java new file mode 100644 index 0000000..e2504ca --- /dev/null +++ b/tests/io/socket/AllTests.java @@ -0,0 +1,14 @@ +package io.socket; + +import org.junit.experimental.ParallelComputer; +import org.junit.runner.JUnitCore; +import org.junit.runner.Result; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.junit.runners.Suite.SuiteClasses; + +@RunWith(Suite.class) +@SuiteClasses({ WebsocketTestSocketIO.class, XHRTestSocketIO.class }) +public class AllTests { + +} diff --git a/tests/io/socket/RandomBlockJUnit4ClassRunner.java b/tests/io/socket/RandomBlockJUnit4ClassRunner.java index 01adb4e..8a69a18 100644 --- a/tests/io/socket/RandomBlockJUnit4ClassRunner.java +++ b/tests/io/socket/RandomBlockJUnit4ClassRunner.java @@ -14,7 +14,8 @@ public RandomBlockJUnit4ClassRunner(Class klass) throws InitializationError { super(klass); } - + + protected java.util.List computeTestMethods() { java.util.List methods = super.computeTestMethods(); Collections.shuffle(methods); diff --git a/tests/io/socket/WebsocketTestSocketIO.java b/tests/io/socket/WebsocketTestSocketIO.java new file mode 100644 index 0000000..940bbac --- /dev/null +++ b/tests/io/socket/WebsocketTestSocketIO.java @@ -0,0 +1,11 @@ +package io.socket; + +import org.junit.BeforeClass; + +public class WebsocketTestSocketIO extends AbstractTestSocketIO { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + transport = "websocket"; + } +} diff --git a/tests/io/socket/XHRTestSocketIO.java b/tests/io/socket/XHRTestSocketIO.java new file mode 100644 index 0000000..28de29e --- /dev/null +++ b/tests/io/socket/XHRTestSocketIO.java @@ -0,0 +1,11 @@ +package io.socket; + +import org.junit.BeforeClass; + +public class XHRTestSocketIO extends AbstractTestSocketIO { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + transport = "xhr-polling"; + } +} diff --git a/tests/io/socket/socketio.js b/tests/io/socket/socketio.js index 853b00b..c7e182e 100644 --- a/tests/io/socket/socketio.js +++ b/tests/io/socket/socketio.js @@ -1,33 +1,32 @@ -var port = parseInt(process.argv[process.argv.length - 1]); +var port = parseInt(process.argv[2]); var io = require('socket.io').listen(port); var stdin = process.openStdin(); stdin.setEncoding('utf8'); -stdin.on('data', function (chunk) { - process.stderr.write(chunk); +stdin.on('data', function(chunk) { + process.stderr.write(chunk); }); - +io.set('transports', [ process.argv[3] ]); var main = io.sockets.on('connection', function(socket) { socket.on('echo', function(data) { socket.emit('echo', data); - }); - socket.on('echoSend', function(data) { - if(typeof data == 'object') { + }); + socket.on('echoSend', function(data) { + if (typeof data == 'object') { socket.send(JSON.parse(JSON.stringify(data))); - } - else { - socket.send(data); + } else { + socket.send(data); } }); socket.on('echoAck', function(data, ack) { ack(data); }); socket.on('message', function(m) { - process.stdout.write("__:MESSAGE:"+m+"\n"); + process.stdout.write("__:MESSAGE:" + m + "\n"); }); });