diff --git a/rxbus-station/src/main/java/com/obviousengine/rxbus/station/DefaultBusStation.java b/rxbus-station/src/main/java/com/obviousengine/rxbus/station/DefaultBusStation.java index db0d9c3..36a100d 100644 --- a/rxbus-station/src/main/java/com/obviousengine/rxbus/station/DefaultBusStation.java +++ b/rxbus-station/src/main/java/com/obviousengine/rxbus/station/DefaultBusStation.java @@ -26,16 +26,17 @@ final class DefaultBusStation implements BusStation { - private final ConcurrentHashMap, Subscription> subscriptions = - new ConcurrentHashMap<>(8); + private final Map, Subscription> subscriptions = new ConcurrentHashMap<>(8); private final Map, Queue> queues = new ConcurrentHashMap<>(4); private final Bus bus; private final Scheduler busScheduler; private final Flusher flusher; + private final ErrorListener errorListener; - public static BusStation create(Bus mainBus, Scheduler scheduler, Flusher flusher) { - return new DefaultBusStation(mainBus, scheduler, flusher); + public static BusStation create(Bus bus, Scheduler scheduler, Flusher flusher, + ErrorListener errorListener) { + return new DefaultBusStation(bus, scheduler, flusher, errorListener); } @Override @@ -49,7 +50,7 @@ public void register(Class eventClass, Sink sink) { if (!subscriptions.containsKey(sink)) { Queue queue = queue(eventClass); subscriptions.put(sink, bus.subscribe( - queue, new SinkSubscriber<>(sink, flusher), busScheduler)); + queue, new SinkSubscriber<>(sink, flusher, errorListener), busScheduler)); } } } @@ -83,10 +84,12 @@ private static class SinkSubscriber extends Subscriber { private final Sink sink; private final Flusher flusher; + private final ErrorListener errorListener; - SinkSubscriber(Sink sink, Flusher flusher) { + SinkSubscriber(Sink sink, Flusher flusher, ErrorListener errorListener) { this.sink = sink; this.flusher = flusher; + this.errorListener = errorListener; } @Override @@ -103,18 +106,21 @@ public void onError(Throwable e) { public void onNext(T t) { try { sink.receive(t); - } catch (Throwable e) { - //TODO(eleventigers): handle error - throw e; + } catch (Throwable throwable) { + // We want to continue using this subscriber even if the receiver throws + // so we just pass the error to a dedicated handler + errorListener.onError(throwable); + } finally { + flusher.schedule(sink); } - - flusher.schedule(sink); } } - private DefaultBusStation(Bus bus, Scheduler busScheduler, Flusher flusher) { + private DefaultBusStation(Bus bus, Scheduler busScheduler, Flusher flusher, + ErrorListener errorListener) { this.bus = bus; this.busScheduler = busScheduler; this.flusher = flusher; + this.errorListener = errorListener; } } diff --git a/rxbus-station/src/main/java/com/obviousengine/rxbus/station/ErrorListener.java b/rxbus-station/src/main/java/com/obviousengine/rxbus/station/ErrorListener.java new file mode 100644 index 0000000..c11b499 --- /dev/null +++ b/rxbus-station/src/main/java/com/obviousengine/rxbus/station/ErrorListener.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2016 Obvious Engineering. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.obviousengine.rxbus.station; + +public interface ErrorListener { + + void onError(Throwable throwable); + + ErrorListener NOOP = new ErrorListener() { + @Override + public void onError(Throwable throwable) { + // no-op + } + }; +} diff --git a/rxbus-station/src/main/java/com/obviousengine/rxbus/station/RxBusStation.java b/rxbus-station/src/main/java/com/obviousengine/rxbus/station/RxBusStation.java index 380cc57..1c9e8f8 100644 --- a/rxbus-station/src/main/java/com/obviousengine/rxbus/station/RxBusStation.java +++ b/rxbus-station/src/main/java/com/obviousengine/rxbus/station/RxBusStation.java @@ -16,8 +16,102 @@ package com.obviousengine.rxbus.station; +import com.obviousengine.rxbus.Bus; +import com.obviousengine.rxbus.RxBus; +import java.util.concurrent.TimeUnit; +import rx.Scheduler; +import rx.schedulers.Schedulers; + public final class RxBusStation { + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private static final long DEFAULT_FLUSH_DELAY_MIN = 2; + + private Bus bus; + private Scheduler busScheduler; + private Scheduler flushScheduler; + private long flushDelay; + private TimeUnit flushDelayTimeUnit; + private ErrorListener errorListener; + + Builder() { + } + + public Builder bus(Bus bus) { + if (bus == null) { + throw new NullPointerException("bus == null"); + } + this.bus = bus; + return this; + } + + public Builder busScheduler(Scheduler scheduler) { + if (scheduler == null) { + throw new NullPointerException("scheduler == null"); + } + this.busScheduler = scheduler; + return this; + } + + public Builder flushScheduler(Scheduler scheduler) { + if (scheduler == null) { + throw new NullPointerException("scheduler == null"); + } + this.flushScheduler = scheduler; + return this; + } + + public Builder flushDelay(long flushDelay, TimeUnit timeUnit) { + if (timeUnit == null) { + throw new NullPointerException("timeUnit == null"); + } + this.flushDelay = flushDelay; + this.flushDelayTimeUnit = timeUnit; + return this; + } + + public Builder errorListener(ErrorListener errorListener) { + if (errorListener == null) { + throw new NullPointerException("errorListener == null"); + } + this.errorListener = errorListener; + return this; + } + + public BusStation build() { + if (bus == null) { + bus = RxBus.create(); + } + + if (busScheduler == null) { + busScheduler = Schedulers.immediate(); + } + + if (flushScheduler == null) { + flushScheduler = Schedulers.io(); + } + + if (flushDelayTimeUnit == null) { + flushDelay = DEFAULT_FLUSH_DELAY_MIN; + flushDelayTimeUnit = TimeUnit.MINUTES; + } + + if (errorListener == null) { + errorListener = ErrorListener.NOOP; + } + + return DefaultBusStation.create(bus, busScheduler, + DefaultFlusher.create(flushScheduler, flushDelay, + flushDelayTimeUnit), + errorListener); + } + } + private RxBusStation() { throw new AssertionError("No instances"); } diff --git a/rxbus-station/src/test/java/com/obviousengine/rxbus/station/DefaultBusStationTest.java b/rxbus-station/src/test/java/com/obviousengine/rxbus/station/DefaultBusStationTest.java index f35cf49..b0ac166 100644 --- a/rxbus-station/src/test/java/com/obviousengine/rxbus/station/DefaultBusStationTest.java +++ b/rxbus-station/src/test/java/com/obviousengine/rxbus/station/DefaultBusStationTest.java @@ -19,11 +19,13 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import com.obviousengine.rxbus.Bus; import com.obviousengine.rxbus.RxBus; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -39,8 +41,10 @@ public final class DefaultBusStationTest { private final Sink sinkA1 = mock(Sink.class); private final Sink sinkA2 = mock(Sink.class); private final Flusher flusher = mock(Flusher.class); + private final ErrorListener errorListener = mock(ErrorListener.class); private final TestScheduler scheduler = Schedulers.test(); - private final BusStation busStation = DefaultBusStation.create(bus, scheduler, flusher); + private final BusStation busStation = DefaultBusStation.create( + bus, scheduler, flusher, errorListener); @Before public void setUp() { @@ -98,4 +102,54 @@ public void singleSinkRegisterUnregister() { verifyNoMoreInteractions(sinkA1); } + @Test + public void singleSinkFlushWhenReceiveThrows() { + final Throwable error = new RuntimeException(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + throw error; + } + }).when(sinkA1).receive(eventA); + + busStation.register(EventA.class, sinkA1); + busStation.publish(eventA); + scheduler.triggerActions(); + + verify(sinkA1).receive(eventA); + verify(sinkA1).flush(); + verify(errorListener).onError(error); + + verifyNoMoreInteractions(sinkA1); + verifyNoMoreInteractions(errorListener); + } + + @Test + public void singleSinkReceiveAfterReceiveThrows() { + final Throwable error = new RuntimeException(); + final AtomicBoolean thrown = new AtomicBoolean(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (thrown.compareAndSet(false, true)) { + throw error; + } else { + return null; + } + } + }).when(sinkA1).receive(eventA); + + busStation.register(EventA.class, sinkA1); + busStation.publish(eventA); + busStation.publish(eventA); + scheduler.triggerActions(); + + verify(sinkA1, times(2)).receive(eventA); + verify(sinkA1, times(2)).flush(); + verify(errorListener).onError(error); + + verifyNoMoreInteractions(sinkA1); + verifyNoMoreInteractions(errorListener); + } + }