Skip to content

Commit

Permalink
[RxBus] Replace Subject with Jake Wharton's RxRelay
Browse files Browse the repository at this point in the history
  • Loading branch information
eleventigers committed Dec 4, 2016
1 parent 469d677 commit 9ba6e7b
Show file tree
Hide file tree
Showing 22 changed files with 326 additions and 100 deletions.
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ ext {
androidPlugin = 'com.android.tools.build:gradle:1.3.1'
supportAnnotations = 'com.android.support:support-annotations:23.1.0'

rxJava = 'io.reactivex:rxjava:1.1.0'
rxAndroid = 'io.reactivex:rxandroid:1.1.0'
rxJava = 'io.reactivex:rxjava:1.1.6'
rxAndroid = 'io.reactivex:rxandroid:1.2.1'
rxRelay = 'com.jakewharton.rxrelay:rxrelay:1.2.0'
junit = 'junit:junit:4.12'
truth = 'com.google.truth:truth:0.25'
mockito = 'org.mockito:mockito-core:1.10.8'
Expand Down
2 changes: 2 additions & 0 deletions rxbus-android/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ dependencies {
compile project(':rxbus')
compile rootProject.ext.rxAndroid
compile rootProject.ext.supportAnnotations
testCompile rootProject.ext.junit
testCompile rootProject.ext.mockito
}

android {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
import android.util.Log;
import android.util.SparseArray;

import com.jakewharton.rxrelay.Relay;

import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.subjects.Subject;

/**
* Simple {@link Bus} implementation with {@link Queue} caching and logging specific
* to the Android platform. By default subscriptions are observed on the main thread.
*/
@SuppressWarnings("WeakerAccess")
public final class AndroidRxBus implements Bus {

public static AndroidRxBus create() {
Expand Down Expand Up @@ -66,7 +68,7 @@ public <T> void publish(Queue<T> queue, T event) {
}

@Override
public <T> Subject<T, T> queue(Queue<T> queue) {
public <T> Relay<T, T> queue(Queue<T> queue) {
return bus.queue(queue);
}

Expand All @@ -82,16 +84,16 @@ public <T> Subscription subscribe(Queue<T> queue, Observer<T> observer, Schedule

private static final class AndroidQueueCache implements RxBus.QueueCache {

private final SparseArray<Subject<?, ?>> sparseArray = new SparseArray<>();
private final SparseArray<Relay<?, ?>> sparseArray = new SparseArray<>();

@Override
@SuppressWarnings("unchecked")
public <T> Subject<T, T> get(Queue<T> queue) {
return (Subject<T, T>) sparseArray.get(queue.getId());
public <T> Relay<T, T> get(Queue<T> queue) {
return (Relay<T, T>) sparseArray.get(queue.getId());
}

@Override
public <T> void put(Queue<T> queue, Subject<T, T> subject) {
public <T> void put(Queue<T> queue, Relay<T, T> subject) {
sparseArray.put(queue.getId(), subject);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright (c) 2016 Jokubas Dargis.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.jokubasdargis.rxbus;

import com.jakewharton.rxrelay.Relay;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observer;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subscriptions.CompositeSubscription;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

public final class AndroidRxBusTest {

private final static RxBus.Logger SYSTEM_LOGGER = new RxBus.Logger() {
@Override
public void log(String message) {
System.out.println(message);
}
};

private final Queue<Event> events = Queue.of(Event.class).name("TestQueue").build();
private final RxBus.QueueCache queueCache = new RxBus.DefaultQueueCache();
private final TestScheduler testScheduler = Schedulers.test();

@Test
public void create() {
Bus bus = AndroidRxBus.create();
assertNotNull(bus);
}

@Test
public void createWithLogger() {
Bus bus = AndroidRxBus.create(SYSTEM_LOGGER);
assertNotNull(bus);
}

@Test
public void queueAsRelay() {
Bus bus = AndroidRxBus.create(queueCache, SYSTEM_LOGGER);
Relay<Event, Event> subject = bus.queue(events);
assertNotNull(subject);
}

@Test
public void publishEvent() {
Bus bus = AndroidRxBus.create(queueCache, SYSTEM_LOGGER);
Observer<Event> observer = spy(new PrintingObserver<Event>());
Subscription subscription = bus.subscribe(events, observer, testScheduler);
Event event = Event.create();
bus.publish(events, event);
testScheduler.triggerActions();
subscription.unsubscribe();
verify(observer).onNext(event);
verify(observer, never()).onCompleted();
}

@Test
public void oneQueueMultipleObservers() {
Bus bus = AndroidRxBus.create(queueCache, SYSTEM_LOGGER);
CompositeSubscription subscriptions = new CompositeSubscription();
List<Observer<Event>> observers = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Observer<Event> observer = spy(new NoopObserver<Event>());
observers.add(observer);
subscriptions.add(bus.subscribe(events, observer, testScheduler));
}
Event event = Event.create();
bus.publish(events, event);
testScheduler.triggerActions();
subscriptions.unsubscribe();
for (Observer<Event> observer : observers) {
verify(observer).onNext(event);
}
}

@Test
public void allEventsGoThrough() {
final int numEvents = 100;
Bus bus = AndroidRxBus.create(queueCache, SYSTEM_LOGGER);
CountingObserver observer = new CountingObserver();
Subscription subscription = bus.queue(events)
.onBackpressureBuffer()
.observeOn(testScheduler)
.subscribe(observer);
for (int i = 0; i < numEvents; i++) {
bus.publish(events, Event.create());
}
testScheduler.triggerActions();
subscription.unsubscribe();
assertEquals(numEvents, observer.getCount());
}

private static class PrintingObserver<V> implements Observer<V> {

@Override
public void onCompleted() {
System.out.println("onComplete() called");
}

@Override
public void onError(Throwable e) {
System.out.println("onError() called with " + e);
}

@Override
public void onNext(V v) {
System.out.println("onNext() called with " + v);
}
}

private static class CountingObserver implements Observer<Event> {

private final AtomicInteger count = new AtomicInteger();

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}

@Override
public void onNext(Event event) {
count.incrementAndGet();
}

int getCount() {
return count.get();
}
}

private static class NoopObserver<V> implements Observer<V> {

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(V v) {
}
}
}
47 changes: 47 additions & 0 deletions rxbus-android/src/test/java/net/jokubasdargis/rxbus/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2016 Jokubas Dargis.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.jokubasdargis.rxbus;

import java.util.concurrent.TimeUnit;

final class Event {

static Event create() {
return new Event(System.nanoTime());
}

static Event create(long timestamp) {
return new Event(timestamp);
}

private final long timestampNanos;

private Event(long timestamp) {
this.timestampNanos = timestamp;
}

public long getTimestamp(TimeUnit unit) {
return unit.convert(timestampNanos, TimeUnit.NANOSECONDS);
}

@Override
public String toString() {
return "Event{"
+ "timestamp=" + timestampNanos
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ final class DefaultDispatcher implements Dispatcher {
private final Flusher flusher;
private final ErrorListener errorListener;

public static Dispatcher create(Bus bus, Scheduler scheduler, Flusher flusher,
static Dispatcher create(Bus bus, Scheduler scheduler, Flusher flusher,
ErrorListener errorListener) {
return new DefaultDispatcher(bus, scheduler, flusher, errorListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ final class DefaultFlusher implements Flusher {
private final FlushAction flushAction = new FlushAction();
private final SerialSubscription subscription = new SerialSubscription();

public static Flusher create(Scheduler scheduler, long flushDelay, TimeUnit flushDelayUnit) {
static Flusher create(Scheduler scheduler, long flushDelay, TimeUnit flushDelayUnit) {
return new DefaultFlusher(scheduler, flushDelay, flushDelayUnit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package net.jokubasdargis.rxbus;

@SuppressWarnings("WeakerAccess")
public interface Dispatcher {

<T> void publish(Queue<T> queue, T event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package net.jokubasdargis.rxbus;

@SuppressWarnings("WeakerAccess")
public interface ErrorListener {

void onError(Throwable throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package net.jokubasdargis.rxbus;

@SuppressWarnings("WeakerAccess")
public interface Flushable {

void flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package net.jokubasdargis.rxbus;

@SuppressWarnings("WeakerAccess")
public interface Flusher {

void schedule(Flushable flushable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import rx.Scheduler;
import rx.schedulers.Schedulers;

@SuppressWarnings("WeakerAccess")
public final class RxBusDispatcher {

public static Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package net.jokubasdargis.rxbus;

@SuppressWarnings("WeakerAccess")
public interface Station<T> extends Flushable {

void receive(T event);
Expand Down
1 change: 1 addition & 0 deletions rxbus/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ apply from: rootProject.file('gradle/checkstyle.gradle')

dependencies {
compile rootProject.ext.rxJava
compile rootProject.ext.rxRelay
testCompile rootProject.ext.junit
testCompile rootProject.ext.mockito
}
Expand Down
8 changes: 5 additions & 3 deletions rxbus/src/main/java/net/jokubasdargis/rxbus/Bus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package net.jokubasdargis.rxbus;

import com.jakewharton.rxrelay.Relay;

import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subjects.Subject;

/**
* Event notification system which enforces use of dedicated queues to perform type safe pub/sub.
*/
@SuppressWarnings("WeakerAccess")
public interface Bus {

/**
Expand All @@ -42,8 +44,8 @@ public interface Bus {
<T> void publish(Queue<T> queue, T event);

/**
* Converts the given {@link Queue} to a {@link Subject} to be used
* Converts the given {@link Queue} to a {@link Relay} to be used
* outside the bus context (when combining Rx streams).
*/
<T> Subject<T, T> queue(Queue<T> queue);
<T> Relay<T, T> queue(Queue<T> queue);
}
Loading

0 comments on commit 9ba6e7b

Please sign in to comment.