Skip to content

Commit

Permalink
Port to Java 7
Browse files Browse the repository at this point in the history
  • Loading branch information
vincepri committed Apr 12, 2016
1 parent ab9841d commit 6e0cee5
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 167 deletions.
77 changes: 5 additions & 72 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.brainlag</groupId>
<groupId>com.segment.nsq</groupId>
<artifactId>nsq-client</artifactId>
<version>1.0.0.RC2</version>
<version>1.0.0</version>

<name>JavaNSQClient</name>
<description>Fast Java client for NSQ.</description>
Expand All @@ -17,12 +17,6 @@
<url>http://www.opensource.org/licenses/mit-license.php</url>
</license>
</licenses>
<developers>
<developer>
<name>Peter Nimmervoll</name>
<email>[email protected]</email>
</developer>
</developers>
<scm>
<connection>scm:git:[email protected]:brainlag/JavaNSQClient.git</connection>
<developerConnection>scm:git:[email protected]:brainlag/JavaNSQClient.git</developerConnection>
Expand All @@ -40,81 +34,20 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.2.3.Final</version>
<version>1.4.0.Final</version>
</extension>
</extensions>
</build>

<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.5</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/com/github/brainlag/nsq/NSQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,20 @@ public void setEventLoopGroup(final EventLoopGroup eventLoopGroup) {
@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("{\"client_id\":\"" + clientId + "\", ");
buffer.append("\"hostname\":\"" + hostname + "\", ");
buffer.append("{\"client_id\":\"").append(clientId).append("\", ");
buffer.append("\"hostname\":\"").append(hostname).append("\", ");
buffer.append("\"feature_negotiation\": true, ");
if (getHeartbeatInterval() != null) {
buffer.append("\"heartbeat_interval\":" + getHeartbeatInterval().toString() + ", ");
buffer.append("\"heartbeat_interval\":").append(getHeartbeatInterval().toString()).append(", ");
}
if (getOutputBufferSize() != null) {
buffer.append("\"output_buffer_size\":" + getOutputBufferSize().toString() + ", ");
buffer.append("\"output_buffer_size\":").append(getOutputBufferSize().toString()).append(", ");
}
if (getOutputBufferTimeout() != null) {
buffer.append("\"output_buffer_timeout\":" + getOutputBufferTimeout().toString() + ", ");
buffer.append("\"output_buffer_timeout\":").append(getOutputBufferTimeout().toString()).append(", ");
}
if (isTlsV1()) {
buffer.append("\"tls_v1\":" + isTlsV1() + ", ");
buffer.append("\"tls_v1\":").append(isTlsV1()).append(", ");
}
if (getCompression() == Compression.SNAPPY) {
buffer.append("\"snappy\": true, ");
Expand All @@ -162,15 +162,15 @@ public String toString() {
buffer.append("\"deflate\": true, ");
}
if (getDeflateLevel() != null) {
buffer.append("\"deflate_level\":" + getDeflateLevel().toString() + ", ");
buffer.append("\"deflate_level\":").append(getDeflateLevel().toString()).append(", ");
}
if (getSampleRate() != null) {
buffer.append("\"sample_rate\":" + getSampleRate().toString() + ", ");
buffer.append("\"sample_rate\":").append(getSampleRate().toString()).append(", ");
}
if (getMsgTimeout() != null) {
buffer.append("\"msg_timeout\":" + getMsgTimeout().toString() + ", ");
buffer.append("\"msg_timeout\":").append(getMsgTimeout().toString()).append(", ");
}
buffer.append("\"user_agent\": \"" + userAgent + "\"}");
buffer.append("\"user_agent\": \"").append(userAgent).append("\"}");

return buffer.toString();
}
Expand Down
33 changes: 19 additions & 14 deletions src/main/java/com/github/brainlag/nsq/NSQConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.github.brainlag.nsq.frames.ErrorFrame;
import com.github.brainlag.nsq.frames.NSQFrame;
import com.github.brainlag.nsq.lookup.NSQLookup;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
Expand All @@ -15,15 +16,8 @@
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class NSQConsumer implements Closeable {
Expand All @@ -43,7 +37,7 @@ public class NSQConsumer implements Closeable {
private long lookupPeriod = 60 * 1000; // how often to recheck for new nodes (and clean up non responsive nodes)
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newCachedThreadPool();
private Optional<ScheduledFuture<?>> timeout = Optional.empty();
private Optional<? extends ScheduledFuture<?>> timeout = Optional.absent();

public NSQConsumer(final NSQLookup lookup, final String topic, final String channel, final NSQMessageCallback callback) {
this(lookup, topic, channel, callback, new NSQConfig());
Expand All @@ -70,8 +64,11 @@ public NSQConsumer start() {
started = true;
//connect once otherwise we might have to wait one lookupPeriod
connect();
scheduler.scheduleAtFixedRate(() -> {
connect();
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NSQConsumer.this.connect();
}
}, lookupPeriod, lookupPeriod, TimeUnit.MILLISECONDS);
}
return this;
Expand All @@ -98,7 +95,12 @@ protected void processMessage(final NSQMessage message) {
LogManager.getLogger(this).warn("NO Callback, dropping message: " + message);
} else {
try {
executor.execute(() -> callback.message(message));
executor.execute(new Runnable() {
@Override
public void run() {
callback.message(message);
}
});
if (nextTimeout > 0) {
updateTimeout(message, -500);
}
Expand All @@ -124,8 +126,11 @@ private void updateTimeout(final NSQMessage message, long change) {
}
Date newTimeout = calculateTimeoutDate(change);
if (newTimeout != null) {
timeout = Optional.of(scheduler.schedule(() -> {
rdy(message, 1); // test the waters
timeout = Optional.of(scheduler.schedule(new Runnable() {
@Override
public void run() {
NSQConsumer.this.rdy(message, 1); // test the waters
}
}, 0, TimeUnit.MILLISECONDS));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import com.github.brainlag.nsq.exceptions.NSQException;

@FunctionalInterface
public interface NSQErrorCallback {

void error(NSQException x);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import com.github.brainlag.nsq.NSQMessage;

@FunctionalInterface
public interface NSQMessageCallback {

public void message(NSQMessage message);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.brainlag.nsq.exceptions;

public class BadMessageException extends NSQException {

public BadMessageException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.brainlag.nsq.exceptions;

public class BadTopicException extends NSQException {

public BadTopicException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.brainlag.nsq.exceptions;

public class DisconnectedException extends NSQException {

public DisconnectedException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.github.brainlag.nsq.exceptions;

public class NoConnectionsException extends NSQException {

public NoConnectionsException(String message) {
super(message);
}
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/github/brainlag/nsq/netty/NSQHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, NSQFrame msg) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, final NSQFrame msg) throws Exception {
final Connection con = ctx.channel().attr(Connection.STATE).get();
if (con != null) {
ctx.channel().eventLoop().execute(() -> con.incoming(msg));
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
con.incoming(msg);
}
});
} else {
LogManager.getLogger(this).warn("No connection set for : " + ctx.channel());
}
Expand Down
Loading

0 comments on commit 6e0cee5

Please sign in to comment.