Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

[Sprint: 49] Implementation for an RxJava top tags module #17

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions rxjava-top-tags/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
Spring XD Reactor Stream Example
================================

This is an example of a custom module that uses RxJava's Observable API.

## Requirements

In order to install the module run it in your Spring XD installation, you will need to have installed:

* Spring XD version 1.1.x ([Instructions](http://docs.spring.io/spring-xd/docs/current/reference/html/#getting-started)). You'll need to build Spring XD with Java 8+ to use this sample (which uses lambda expressions).

## Code Tour

The heart of the sample is the processing module named [TopTags.java](src/main/java/com/acme/TopTags.java).
This uses the Observable API to calculate the most referenced tags in a given time window. The[Tuple](http://docs.spring.io/spring-xd/docs/current/reference/html/#tuples) data type is used as a generic container for keyed data.


## Building

$ mvn package

## Using the Custom Module

The uber-jar will be in `target/rxjava-top-tags-1.0.0.BUILD-SNAPSHOT.jar`. To install and register the module to your Spring XD distribution, use the `module upload` Spring XD shell command. Start Spring XD and the shell:

```
_____ __ _______
/ ___| (-) \ \ / / _ \
\ `--. _ __ _ __ _ _ __ __ _ \ V /| | | |
`--. \ '_ \| '__| | '_ \ / _` | / ^ \| | | |
/\__/ / |_) | | | | | | | (_| | / / \ \ |/ /
\____/| .__/|_| |_|_| |_|\__, | \/ \/___/
| | __/ |
|_| |___/
eXtreme Data
1.1.0.BUILD-SNAPSHOT | Admin Server Target: http://localhost:9393
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>module upload --file [path-to]/spring-xd-samples/rxjava-top-tags/target/rxjava-top-tags-1.0.0.BUILD-SNAPSHOT.jar --name rxjava-top-tags --type processor
Successfully uploaded module 'processor:reactor-top-tags'
xd:>
```

Now create an deploy a stream:

```
xd:>stream create reactor --definition "tweetstream | rxjava-top-tags | log" --deploy
```

The `rxjava-top-tags` processor also supports the `timeWindow` and `topN` parameters for customizing the processor's
behavior.

You should see the stream output in the Spring XD log, indicating the top N tags for the given interval:

```
2015-02-15 20:13:49,077 1.1.0.RELEASE INFO RxComputationThreadPool-3 sink.top-tags - {"id":"8df84f9b-40ee-23c3-7473-fa611c43a19d","timestamp":1424049229077,"topTags":{"SNL40":18,"NBAAllStarNYC":4,"SpringXD":4}}

```
61 changes: 61 additions & 0 deletions rxjava-top-tags/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.acme</groupId>
<artifactId>rxjava-top-tags</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<parent>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-module-parent</artifactId>
<version>1.1.0.RELEASE</version>
</parent>

<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.springframework.xd.</groupId>
<artifactId>spring-xd-rxjava</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.0</version>
</dependency>

</dependencies>

</project>
84 changes: 84 additions & 0 deletions rxjava-top-tags/src/main/java/com/acme/TopTags.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.acme;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.springframework.xd.tuple.TupleBuilder.tuple;

import java.util.LinkedHashMap;
import java.util.stream.Collectors;

import com.gs.collections.api.tuple.Pair;
import com.gs.collections.impl.tuple.Tuples;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import rx.Observable;

import org.springframework.xd.rxjava.Processor;
import org.springframework.xd.tuple.Tuple;

/**
* @author Marius Bogoevici
*/
public class TopTags implements Processor<String, Tuple> {

private int timeWindow;

private int timeShift;

private int topN;

public TopTags(int timeWindow, int timeShift, int topN) {
this.timeWindow = timeWindow;
this.timeShift = timeShift;
this.topN = topN;
}

private static Log logger = LogFactory.getLog(TopTags.class);

@Override
public Observable<Tuple> process(Observable<String> inputStream) {
return inputStream.flatMap(tweet -> {
JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text");
return Observable.from(array.toArray(new String[array.size()]));
})
// create (tag,1) tuple for each incoming tag
.map(tag -> Tuples.pair(tag, 1))
// batch all tags in the time window
.window(timeWindow, timeShift, SECONDS)
// with each time window stream
.flatMap(windowBuffer ->
windowBuffer
// reduce by tag, counting all entries with the same tag
.groupBy(Pair::getOne)
.flatMap(
groupedStream ->
groupedStream.reduce((acc, v) -> Tuples.pair(acc.getOne(), acc.getTwo() + v.getTwo()))
)
// sort the results
.toSortedList((a, b) -> -a.getTwo().compareTo(b.getTwo()))
// convert the output to a friendlier format
.map(l -> tuple().of("topTags",
l.subList(0, Math.min(topN, l.size()))
.stream().collect(Collectors.toMap(Pair::getOne, Pair::getTwo, (v1, v2) -> v1, LinkedHashMap::new)
)
)
)
);
}
}
60 changes: 60 additions & 0 deletions rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.acme;

import org.springframework.xd.module.options.spi.ModuleOption;

/**
* Holds options for the TopTags module
*
* @author Mark Pollack
* @author Marius Bogoevici
*/
public class TopTagsOptionsMetadata {

private int timeWindow = 1;

private int timeShift = 1;

private int topN = 10;

public int getTopN() {
return topN;
}

@ModuleOption("The number of entires to include in the top N listing")
public void setTopN(int topN) {
this.topN = topN;
}

public int getTimeWindow() {
return timeWindow;
}

@ModuleOption("The length in seconds of the time window over which the top N tags are calculated")
public void setTimeWindow(int timeWindow) {
this.timeWindow = timeWindow;
}

public int getTimeShift() {
return timeShift;
}

@ModuleOption("The frequency in seconds with which the top N tags are calculated")
public void setTimeShift(int timeShift) {
this.timeShift = timeShift;
}
}
30 changes: 30 additions & 0 deletions rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


<bean id="processor" class="com.acme.TopTags">
<constructor-arg index="0" value="${timeWindow}"/>
<constructor-arg index="1" value="${timeShift}"/>
<constructor-arg index="2" value="${topN}"/>
</bean>


<!-- The rest is boilerplate that XD 1.1 RC1 will avoid you having to provide -->

<int:channel id="input"/>

<bean name="messageHandler" class="org.springframework.xd.rxjava.SubjectMessageHandler">
<constructor-arg ref="processor"/>
</bean>


<int:service-activator input-channel="input" ref="messageHandler"
output-channel="output"/>

<int:channel id="output"/>

</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
options_class = com.acme.TopTagsOptionsMetadata