Skip to content

Commit

Permalink
Use more appropriate naming
Browse files Browse the repository at this point in the history
  • Loading branch information
eleventigers committed Jan 18, 2016
1 parent 29342ad commit 4b7eaac
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 81 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

import com.obviousengine.rxbus.Bus;
import com.obviousengine.rxbus.Queue;
Expand All @@ -24,19 +24,19 @@
import rx.Subscriber;
import rx.Subscription;

final class DefaultBusStation implements BusStation {
final class DefaultDispatcher implements Dispatcher {

private final Map<Sink<?>, Subscription> subscriptions = new ConcurrentHashMap<>(8);
private final Map<Station<?>, Subscription> subscriptions = new ConcurrentHashMap<>(8);
private final Map<Class<?>, Queue<?>> queues = new ConcurrentHashMap<>(4);

private final Bus bus;
private final Scheduler busScheduler;
private final Flusher flusher;
private final ErrorListener errorListener;

public static BusStation create(Bus bus, Scheduler scheduler, Flusher flusher,
public static Dispatcher create(Bus bus, Scheduler scheduler, Flusher flusher,
ErrorListener errorListener) {
return new DefaultBusStation(bus, scheduler, flusher, errorListener);
return new DefaultDispatcher(bus, scheduler, flusher, errorListener);
}

@Override
Expand All @@ -45,19 +45,19 @@ public <T> void publish(T event) {
}

@Override
public <T> void register(Class<T> eventClass, Sink<T> sink) {
public <T> void register(Class<T> eventClass, Station<T> station) {
synchronized (subscriptions) {
if (!subscriptions.containsKey(sink)) {
if (!subscriptions.containsKey(station)) {
Queue<T> queue = queue(eventClass);
subscriptions.put(sink, bus.subscribe(
queue, new SinkSubscriber<>(sink, flusher, errorListener), busScheduler));
subscriptions.put(station, bus.subscribe(
queue, new SinkSubscriber<>(station, flusher, errorListener), busScheduler));
}
}
}

@Override
public <T> void unregister(Sink<T> sink) {
Subscription subscription = subscriptions.remove(sink);
public <T> void unregister(Station<T> station) {
Subscription subscription = subscriptions.remove(station);
if (subscription != null) {
subscription.unsubscribe();
}
Expand All @@ -82,12 +82,12 @@ private <T> Queue<T> queue(Class<?> eventClass) {

private static class SinkSubscriber<T> extends Subscriber<T> {

private final Sink<T> sink;
private final Station<T> station;
private final Flusher flusher;
private final ErrorListener errorListener;

SinkSubscriber(Sink<T> sink, Flusher flusher, ErrorListener errorListener) {
this.sink = sink;
SinkSubscriber(Station<T> station, Flusher flusher, ErrorListener errorListener) {
this.station = station;
this.flusher = flusher;
this.errorListener = errorListener;
}
Expand All @@ -105,18 +105,18 @@ public void onError(Throwable e) {
@Override
public void onNext(T t) {
try {
sink.receive(t);
station.receive(t);
} catch (Throwable throwable) {
// We want to continue using this subscriber even if the receiver throws
// so we just pass the error to a dedicated handler
errorListener.onError(throwable);
} finally {
flusher.schedule(sink);
flusher.schedule(station);
}
}
}

private DefaultBusStation(Bus bus, Scheduler busScheduler, Flusher flusher,
private DefaultDispatcher(Bus bus, Scheduler busScheduler, Flusher flusher,
ErrorListener errorListener) {
this.bus = bus;
this.busScheduler = busScheduler;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

public interface BusStation {
public interface Dispatcher {

<T> void publish(T event);

<T> void register(Class<T> eventClass, Sink<T> sink);
<T> void register(Class<T> eventClass, Station<T> station);

<T> void unregister(Sink<T> sink);
<T> void unregister(Station<T> station);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

public interface ErrorListener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

public interface Flushable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

public interface Flusher {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

import com.obviousengine.rxbus.Bus;
import com.obviousengine.rxbus.RxBus;
import java.util.concurrent.TimeUnit;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public final class RxBusStation {
public final class RxBusDispatcher {

public static Builder builder() {
return new Builder();
Expand Down Expand Up @@ -83,7 +83,7 @@ public Builder errorListener(ErrorListener errorListener) {
return this;
}

public BusStation build() {
public Dispatcher build() {
if (bus == null) {
bus = RxBus.create();
}
Expand All @@ -105,14 +105,14 @@ public BusStation build() {
errorListener = ErrorListener.NOOP;
}

return DefaultBusStation.create(bus, busScheduler,
return DefaultDispatcher.create(bus, busScheduler,
DefaultFlusher.create(flushScheduler, flushDelay,
flushDelayTimeUnit),
errorListener);
}
}

private RxBusStation() {
private RxBusDispatcher() {
throw new AssertionError("No instances");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

public interface Sink<T> extends Flushable {
public interface Station<T> extends Flushable {

void receive(T event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -34,98 +34,98 @@
import rx.schedulers.TestScheduler;

@SuppressWarnings("unchecked")
public final class DefaultBusStationTest {
public final class DefaultDispatcherTest {

private final EventA eventA = new EventA();
private final Bus bus = RxBus.create();
private final Sink<EventA> sinkA1 = mock(Sink.class);
private final Sink<EventA> sinkA2 = mock(Sink.class);
private final Station<EventA> stationA1 = mock(Station.class);
private final Station<EventA> stationA2 = mock(Station.class);
private final Flusher flusher = mock(Flusher.class);
private final ErrorListener errorListener = mock(ErrorListener.class);
private final TestScheduler scheduler = Schedulers.test();
private final BusStation busStation = DefaultBusStation.create(
private final Dispatcher dispatcher = DefaultDispatcher.create(
bus, scheduler, flusher, errorListener);

@Before
public void setUp() {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Sink sink = (Sink) invocation.getArguments()[0];
sink.flush();
Station station = (Station) invocation.getArguments()[0];
station.flush();
return null;
}
}).when(flusher).schedule(any(Sink.class));
}).when(flusher).schedule(any(Station.class));
}

@Test
public void singleSinkSingleEvent() {
busStation.register(EventA.class, sinkA1);
busStation.publish(eventA);
public void singleStationSingleEvent() {
dispatcher.register(EventA.class, stationA1);
dispatcher.publish(eventA);
scheduler.triggerActions();

verify(sinkA1).receive(eventA);
verify(sinkA1).flush();
verify(stationA1).receive(eventA);
verify(stationA1).flush();

verifyNoMoreInteractions(sinkA1);
verifyNoMoreInteractions(stationA1);
}

@Test
public void multipleSinkSingleEvent() {
busStation.register(EventA.class, sinkA1);
busStation.register(EventA.class, sinkA2);
busStation.publish(eventA);
public void multipleStationSingleEvent() {
dispatcher.register(EventA.class, stationA1);
dispatcher.register(EventA.class, stationA2);
dispatcher.publish(eventA);
scheduler.triggerActions();

verify(sinkA1).receive(eventA);
verify(sinkA1).flush();
verify(sinkA2).receive(eventA);
verify(sinkA2).flush();
verify(stationA1).receive(eventA);
verify(stationA1).flush();
verify(stationA2).receive(eventA);
verify(stationA2).flush();

verifyNoMoreInteractions(sinkA1, sinkA2);
verifyNoMoreInteractions(stationA1, stationA2);

}

@Test
public void singleSinkRegisterUnregister() {
busStation.register(EventA.class, sinkA1);
busStation.publish(eventA);
public void singleStationRegisterUnregister() {
dispatcher.register(EventA.class, stationA1);
dispatcher.publish(eventA);
scheduler.triggerActions();
busStation.unregister(sinkA1);
dispatcher.unregister(stationA1);

verify(sinkA1).receive(eventA);
verify(sinkA1).flush();
verify(stationA1).receive(eventA);
verify(stationA1).flush();

busStation.publish(eventA);
dispatcher.publish(eventA);
scheduler.triggerActions();

verifyNoMoreInteractions(sinkA1);
verifyNoMoreInteractions(stationA1);
}

@Test
public void singleSinkFlushWhenReceiveThrows() {
public void singleStationFlushWhenReceiveThrows() {
final Throwable error = new RuntimeException();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
throw error;
}
}).when(sinkA1).receive(eventA);
}).when(stationA1).receive(eventA);

busStation.register(EventA.class, sinkA1);
busStation.publish(eventA);
dispatcher.register(EventA.class, stationA1);
dispatcher.publish(eventA);
scheduler.triggerActions();

verify(sinkA1).receive(eventA);
verify(sinkA1).flush();
verify(stationA1).receive(eventA);
verify(stationA1).flush();
verify(errorListener).onError(error);

verifyNoMoreInteractions(sinkA1);
verifyNoMoreInteractions(stationA1);
verifyNoMoreInteractions(errorListener);
}

@Test
public void singleSinkReceiveAfterReceiveThrows() {
public void singleStationReceiveAfterReceiveThrows() {
final Throwable error = new RuntimeException();
final AtomicBoolean thrown = new AtomicBoolean();
doAnswer(new Answer() {
Expand All @@ -137,18 +137,18 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
return null;
}
}
}).when(sinkA1).receive(eventA);
}).when(stationA1).receive(eventA);

busStation.register(EventA.class, sinkA1);
busStation.publish(eventA);
busStation.publish(eventA);
dispatcher.register(EventA.class, stationA1);
dispatcher.publish(eventA);
dispatcher.publish(eventA);
scheduler.triggerActions();

verify(sinkA1, times(2)).receive(eventA);
verify(sinkA1, times(2)).flush();
verify(stationA1, times(2)).receive(eventA);
verify(stationA1, times(2)).flush();
verify(errorListener).onError(error);

verifyNoMoreInteractions(sinkA1);
verifyNoMoreInteractions(stationA1);
verifyNoMoreInteractions(errorListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
* limitations under the License.
*/

package com.obviousengine.rxbus.station;
package com.obviousengine.rxbus.dispatcher;

class EventA { }
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
rootProject.name = 'rxbus-root'

include ':rxbus', ':rxbus-android', 'rxbus-station'
include ':rxbus', ':rxbus-android', 'rxbus-dispatcher'

0 comments on commit 4b7eaac

Please sign in to comment.