Skip to content

Commit

Permalink
add builder pattern to consumer and producer
Browse files Browse the repository at this point in the history
update readme
  • Loading branch information
brainlag committed Apr 5, 2015
1 parent 5f57ef6 commit 52c0cfa
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 82 deletions.
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
The MIT License (MIT)

Copyright (c) 2013 Dustin Norlander
Copyright (c) 2015 Peter Nimmervoll

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
68 changes: 7 additions & 61 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
## JavaNSQClient

A fast netty-based java client for [NSQ][nsq]

## Notes:

X connection per Consumer and X connections per Producer where X is the nummer of how many nsqds the user
wants to connect.
A fast netty-based Java8 client for [NSQ][nsq]
heavily forked of TrendrrNSQClient

## TODO:
auth
ssl
snappy
....

Expand All @@ -17,77 +14,26 @@ snappy
Example usage:

```
NSQLookup lookup = new NSQLookupDynMapImpl();
NSQLookup lookup = new NSQLookup();
lookup.addAddr("localhost", 4161);
NSQConsumer consumer = new NSQConsumer(lookup, "speedtest", "dustin", new NSQMessageCallback() {
@Override
public void message(NSQMessage message) {
NSQConsumer consumer = new NSQConsumer(lookup, "speedtest", "dustin", (message) -> {
System.out.println("received: " + message);
//now mark the message as finished.
message.finished();
//or you could requeue it, which indicates a failure and puts it back on the queue.
//message.requeue();
}
@Override
public void error(Exception x) {
//handle errors
log.warn("Caught", x);
}
});
consumer.start();
```


## Producer

Example usage:

```
NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1);
producer.start();
for (int i=0; i < 50000; i++) {
producer.produce("speedtest", ("this is a message" + i).getBytes());
}
NSQProducer producer = new NSQProducer().addAddress("localhost", 4150).start();
producer.produce("TestTopic", ("this is a message").getBytes());
```

The producer also has a Batch collector that will collect messages until some threshold is reached (currently maxbytes or maxmessages) then send as a MPUB request. This gives much greater throughput then producing messages one at a time.

```
producer.configureBatch("speedtest",
new BatchCallback() {
@Override
public void batchSuccess(String topic, int num) {
}
@Override
public void batchError(Exception ex, String topic, List<byte[]> messages) {
ex.printStackTrace();
}
},
batchsize,
null, //use default maxbytes
null //use default max seconds
);
producer.start();
for (int i=0; i < iterations; i++) {
producer.produceBatch("speedtest", ("this is a message" + i).getBytes());
}
```


## Dependancies

* [netty][netty]
* [slf4j][slf4j]
* [trendrr-oss][trendrr-oss]

Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.NSQLookup interface using a different json parser


[nsq]: https://github.com/bitly/nsq
[netty]: http://netty.io/
[slf4j]: http://www.slf4j.org/
[trendrr-oss]: https://github.com/trendrr/java-oss-lib
16 changes: 11 additions & 5 deletions src/main/java/io/nsq/NSQConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public NSQConsumer(NSQLookup lookup, String topic, String channel, NSQMessageCal
this.errorCallback = errCallback;
}

public void start() {
public NSQConsumer start() {
if (!started) {
started = true;
//connect once otherwise we might have to wait one lookupPeriod
Expand All @@ -62,6 +62,7 @@ public void run() {
}
}, lookupPeriod, lookupPeriod);
}
return this;
}

private Connection createConnection(ServerAddress serverAddress) {
Expand Down Expand Up @@ -102,14 +103,18 @@ private void cleanClose() {
}
}

public void setMessagesPerBatch(int messagesPerBatch) {
this.messagesPerBatch = messagesPerBatch;
public NSQConsumer setMessagesPerBatch(int messagesPerBatch) {
if (!started) {
this.messagesPerBatch = messagesPerBatch;
}
return this;
}

public void setLookupPeriod(long periodMillis) {
public NSQConsumer setLookupPeriod(long periodMillis) {
if (!started) {
this.lookupPeriod = periodMillis;
}
return this;
}


Expand Down Expand Up @@ -140,10 +145,11 @@ private void connect() {
*
* @param executor
*/
public void setExecutor(ExecutorService executor) {
public NSQConsumer setExecutor(ExecutorService executor) {
if (!started) {
this.executor = executor;
}
return this;
}

private Set<ServerAddress> lookupAddresses() {
Expand Down
46 changes: 30 additions & 16 deletions src/main/java/io/nsq/NSQProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public class NSQProducer {
*/
private int connectionRetries = 5;

public void start() {
public NSQProducer start() {
if (!started) {
createPool();
started = true;
}
return this;
}

private void createPool() {
Expand Down Expand Up @@ -70,6 +71,9 @@ protected Connection getConnection() throws NoConnectionsException {
* produce multiple messages.
*/
public void produceMulti(String topic, List<byte[]> messages) throws TimeoutException, NSQException {
if (!started) {
throw new IllegalStateException("Producer must be started before producing messages!");
}
if (messages == null || messages.isEmpty()) {
return;
}
Expand All @@ -81,24 +85,30 @@ public void produceMulti(String topic, List<byte[]> messages) throws TimeoutExce
}

Connection c = this.getConnection();

NSQCommand command = NSQCommand.instance("MPUB " + topic);
command.setData(messages);
try {
NSQCommand command = NSQCommand.instance("MPUB " + topic);
command.setData(messages);


NSQFrame frame = c.commandAndWait(command);
if (frame instanceof ErrorFrame) {
String err = ((ErrorFrame) frame).getErrorMessage();
if (err.startsWith("E_BAD_TOPIC")) {
throw new BadTopicException(err);
}
if (err.startsWith("E_BAD_MESSAGE")) {
throw new BadMessageException(err);
NSQFrame frame = c.commandAndWait(command);
if (frame instanceof ErrorFrame) {
String err = ((ErrorFrame) frame).getErrorMessage();
if (err.startsWith("E_BAD_TOPIC")) {
throw new BadTopicException(err);
}
if (err.startsWith("E_BAD_MESSAGE")) {
throw new BadMessageException(err);
}
}
} finally {
pool.returnObject(c.getServerAddress(), c);
}
}

public void produce(String topic, byte[] message) throws NSQException, TimeoutException {
if (!started) {
throw new IllegalStateException("Producer must be started before producing messages!");
}
Connection c = getConnection();
try {
NSQCommand command = NSQCommand.instance("PUB " + topic, message);
Expand All @@ -117,18 +127,21 @@ public void produce(String topic, byte[] message) throws NSQException, TimeoutEx
}
}

public void addAddress(String host, int port) {
public NSQProducer addAddress(String host, int port) {
addresses.add(new ServerAddress(host, port));
return this;
}

public void removeAddress(String host, int port) {
public NSQProducer removeAddress(String host, int port) {
addresses.remove(new ServerAddress(host, port));
return this;
}

public void setPoolConfig(GenericKeyedObjectPoolConfig poolConfig) {
public NSQProducer setPoolConfig(GenericKeyedObjectPoolConfig poolConfig) {
if (!started) {
this.poolConfig = poolConfig;
}
return this;
}

/**
Expand All @@ -138,10 +151,11 @@ public void setPoolConfig(GenericKeyedObjectPoolConfig poolConfig) {
*
* @param executor
*/
public void setExecutor(ExecutorService executor) {
public NSQProducer setExecutor(ExecutorService executor) {
if (!started) {
this.executor = executor;
}
return this;
}

protected ExecutorService getExecutor() {
Expand Down

0 comments on commit 52c0cfa

Please sign in to comment.