Skip to content

Commit

Permalink
User error listener and setup a builder for BusStation
Browse files Browse the repository at this point in the history
  • Loading branch information
eleventigers committed Jan 18, 2016
1 parent 3124efb commit 29342ad
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@

final class DefaultBusStation implements BusStation {

private final ConcurrentHashMap<Sink<?>, Subscription> subscriptions =
new ConcurrentHashMap<>(8);
private final Map<Sink<?>, Subscription> subscriptions = new ConcurrentHashMap<>(8);
private final Map<Class<?>, Queue<?>> queues = new ConcurrentHashMap<>(4);

private final Bus bus;
private final Scheduler busScheduler;
private final Flusher flusher;
private final ErrorListener errorListener;

public static BusStation create(Bus mainBus, Scheduler scheduler, Flusher flusher) {
return new DefaultBusStation(mainBus, scheduler, flusher);
public static BusStation create(Bus bus, Scheduler scheduler, Flusher flusher,
ErrorListener errorListener) {
return new DefaultBusStation(bus, scheduler, flusher, errorListener);
}

@Override
Expand All @@ -49,7 +50,7 @@ public <T> void register(Class<T> eventClass, Sink<T> sink) {
if (!subscriptions.containsKey(sink)) {
Queue<T> queue = queue(eventClass);
subscriptions.put(sink, bus.subscribe(
queue, new SinkSubscriber<>(sink, flusher), busScheduler));
queue, new SinkSubscriber<>(sink, flusher, errorListener), busScheduler));
}
}
}
Expand Down Expand Up @@ -83,10 +84,12 @@ private static class SinkSubscriber<T> extends Subscriber<T> {

private final Sink<T> sink;
private final Flusher flusher;
private final ErrorListener errorListener;

SinkSubscriber(Sink<T> sink, Flusher flusher) {
SinkSubscriber(Sink<T> sink, Flusher flusher, ErrorListener errorListener) {
this.sink = sink;
this.flusher = flusher;
this.errorListener = errorListener;
}

@Override
Expand All @@ -103,18 +106,21 @@ public void onError(Throwable e) {
public void onNext(T t) {
try {
sink.receive(t);
} catch (Throwable e) {
//TODO(eleventigers): handle error
throw e;
} catch (Throwable throwable) {
// We want to continue using this subscriber even if the receiver throws
// so we just pass the error to a dedicated handler
errorListener.onError(throwable);
} finally {
flusher.schedule(sink);
}

flusher.schedule(sink);
}
}

private DefaultBusStation(Bus bus, Scheduler busScheduler, Flusher flusher) {
private DefaultBusStation(Bus bus, Scheduler busScheduler, Flusher flusher,
ErrorListener errorListener) {
this.bus = bus;
this.busScheduler = busScheduler;
this.flusher = flusher;
this.errorListener = errorListener;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2016 Obvious Engineering.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.obviousengine.rxbus.station;

public interface ErrorListener {

void onError(Throwable throwable);

ErrorListener NOOP = new ErrorListener() {
@Override
public void onError(Throwable throwable) {
// no-op
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,102 @@

package com.obviousengine.rxbus.station;

import com.obviousengine.rxbus.Bus;
import com.obviousengine.rxbus.RxBus;
import java.util.concurrent.TimeUnit;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public final class RxBusStation {

public static Builder builder() {
return new Builder();
}

public static final class Builder {

private static final long DEFAULT_FLUSH_DELAY_MIN = 2;

private Bus bus;
private Scheduler busScheduler;
private Scheduler flushScheduler;
private long flushDelay;
private TimeUnit flushDelayTimeUnit;
private ErrorListener errorListener;

Builder() {
}

public Builder bus(Bus bus) {
if (bus == null) {
throw new NullPointerException("bus == null");
}
this.bus = bus;
return this;
}

public Builder busScheduler(Scheduler scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
this.busScheduler = scheduler;
return this;
}

public Builder flushScheduler(Scheduler scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
this.flushScheduler = scheduler;
return this;
}

public Builder flushDelay(long flushDelay, TimeUnit timeUnit) {
if (timeUnit == null) {
throw new NullPointerException("timeUnit == null");
}
this.flushDelay = flushDelay;
this.flushDelayTimeUnit = timeUnit;
return this;
}

public Builder errorListener(ErrorListener errorListener) {
if (errorListener == null) {
throw new NullPointerException("errorListener == null");
}
this.errorListener = errorListener;
return this;
}

public BusStation build() {
if (bus == null) {
bus = RxBus.create();
}

if (busScheduler == null) {
busScheduler = Schedulers.immediate();
}

if (flushScheduler == null) {
flushScheduler = Schedulers.io();
}

if (flushDelayTimeUnit == null) {
flushDelay = DEFAULT_FLUSH_DELAY_MIN;
flushDelayTimeUnit = TimeUnit.MINUTES;
}

if (errorListener == null) {
errorListener = ErrorListener.NOOP;
}

return DefaultBusStation.create(bus, busScheduler,
DefaultFlusher.create(flushScheduler, flushDelay,
flushDelayTimeUnit),
errorListener);
}
}

private RxBusStation() {
throw new AssertionError("No instances");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.obviousengine.rxbus.Bus;
import com.obviousengine.rxbus.RxBus;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -39,8 +41,10 @@ public final class DefaultBusStationTest {
private final Sink<EventA> sinkA1 = mock(Sink.class);
private final Sink<EventA> sinkA2 = mock(Sink.class);
private final Flusher flusher = mock(Flusher.class);
private final ErrorListener errorListener = mock(ErrorListener.class);
private final TestScheduler scheduler = Schedulers.test();
private final BusStation busStation = DefaultBusStation.create(bus, scheduler, flusher);
private final BusStation busStation = DefaultBusStation.create(
bus, scheduler, flusher, errorListener);

@Before
public void setUp() {
Expand Down Expand Up @@ -98,4 +102,54 @@ public void singleSinkRegisterUnregister() {
verifyNoMoreInteractions(sinkA1);
}

@Test
public void singleSinkFlushWhenReceiveThrows() {
final Throwable error = new RuntimeException();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
throw error;
}
}).when(sinkA1).receive(eventA);

busStation.register(EventA.class, sinkA1);
busStation.publish(eventA);
scheduler.triggerActions();

verify(sinkA1).receive(eventA);
verify(sinkA1).flush();
verify(errorListener).onError(error);

verifyNoMoreInteractions(sinkA1);
verifyNoMoreInteractions(errorListener);
}

@Test
public void singleSinkReceiveAfterReceiveThrows() {
final Throwable error = new RuntimeException();
final AtomicBoolean thrown = new AtomicBoolean();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
if (thrown.compareAndSet(false, true)) {
throw error;
} else {
return null;
}
}
}).when(sinkA1).receive(eventA);

busStation.register(EventA.class, sinkA1);
busStation.publish(eventA);
busStation.publish(eventA);
scheduler.triggerActions();

verify(sinkA1, times(2)).receive(eventA);
verify(sinkA1, times(2)).flush();
verify(errorListener).onError(error);

verifyNoMoreInteractions(sinkA1);
verifyNoMoreInteractions(errorListener);
}

}

0 comments on commit 29342ad

Please sign in to comment.