Executing commands
Every micro service listens on two RabbitMQ queues: one for data ingestion and one for command execution.
You could ask why we need commands at all. Because more than one instance of an individual micro service may be running, and because we might want to control the flow of the data (for example batch processing with "flush" commands), we need to make sure that only one micro service executes a given command.
In order to execute your desired command, simply send a valid JSON command to the micro service's RabbitMQ commands queue, as specified in your configuration file.
As an example, take a look at the Billing micro service's invoice generation commands.
If you decide to implement your own command, first select which micro service should execute it. Let's say you are designing an event processing command for your UDR micro service. Proceed by creating a new class that inherits from the `Command` parent class, has a name that corresponds to the `_class` JSON field, and whose Java class file is located in:

```
/core/udr/src/main/java/ch/icclab/cyclops/consume/command/model/
```

Now you can start developing your command. If you need to pass some arguments, simply register them in the Java class the same way you would in a POJO class; those object variables will be filled based on their corresponding JSON fields, or left as null/defaults.
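The pieces above can be sketched as a minimal command class (the `FlushData` name and `batchSize` argument are hypothetical, used only for illustration):

```java
import ch.icclab.cyclops.consume.command.Command;

// Hypothetical command, triggered by a JSON message such as:
// {"_class": "FlushData", "batchSize": 100}
public class FlushData extends Command {
    // filled from the corresponding JSON field, or left as null
    private Integer batchSize;

    @Override
    protected void execute() {
        // process pending events, batchSize at a time
    }
}
```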
We provide a couple of tools for you, one of which is a standardised interface for time series data operations.
To access time series data you will need to create a query builder with the measurement's name:

```java
QueryBuilder query = new QueryBuilder(name);
```
Because a builder pattern is utilised, you can chain further statements:

```java
select(String ... fields);
where(String key, Object value);
where(String key, String delimiter, Object value);
and(String key, Object value);
and(String key, String delimiter, Object value);
timeFrom(Long time, TimeUnit unit);
afterTime(Long time, TimeUnit unit);
timeTo(Long time, TimeUnit unit);
beforeTime(Long time, TimeUnit unit);
groupBy(String ... keys);
orderDesc();
limit(Integer lmt);
offset(Integer off);
count(String field);
sum(String field);
```
It is not necessary to specify `select(*)`, as all fields are returned by default; use the `select` method only to slice data. The same applies to ordering: results come back in ascending order unless you call `orderDesc()`.
This is just one of the possible examples:

```java
QueryBuilder query = new QueryBuilder("CloudStackIPUsageData");
query.timeFrom(1451650521, TimeUnit.SECONDS);
query.timeTo(1467375357, TimeUnit.SECONDS);
query.where("account", "martin");
query.and("usage", ">", 15);
```
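Assuming the builder maps its clauses one-to-one onto InfluxQL (an assumption about the implementation, not verified here), the query above would correspond roughly to:

```sql
SELECT * FROM CloudStackIPUsageData
WHERE time >= 1451650521s AND time <= 1467375357s
AND account = 'martin' AND usage > 15
```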
The next logical step is to execute the prepared query and parse its result. Here you have two options for how to proceed: getting back either `List<Map>` or `List<YourClass>`:

```java
InfluxDBClient client = InfluxDBClient.getInstance();
List<Map> maps = client.executeQuery(query);
List<Own> own = client.executeQueryAndMapItToClass(query, Own.class);
```
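For the class mapping to work, the target class should be a plain POJO whose field names match the measurement's columns. A hypothetical `Own` class for the query above (the field names are assumed, not taken from a real measurement schema) might look like:

```java
// Hypothetical POJO target for executeQueryAndMapItToClass;
// field names are assumed to match the measurement's columns.
public class Own {
    private String account;
    private Double usage;

    public Own() { }  // no-arg constructor, typically required for mapping

    public Own(String account, Double usage) {
        this.account = account;
        this.usage = usage;
    }

    public String getAccount() { return account; }
    public Double getUsage() { return usage; }
}
```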
If you decide to persist data sets programmatically, start with database creation:

```java
createDatabases(String ... names);
```

There are two ways to persist data points into the underlying database: either point by point

```java
persistSinglePoint(Point.Builder builder);
```

or by using batch processing

```java
persistContainer(BatchPointsContainer container);
```

where adding points to the container looks like this:

```java
addPoint(Point.Builder builder);
```

Make sure tags and fields are never null.
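Put together, persisting a point might look like the following sketch (assuming the influxdb-java `Point.Builder` API underneath and the `InfluxDBClient`/`BatchPointsContainer` methods listed above; the database name, measurement and values are made up):

```java
import org.influxdb.dto.Point;
import java.util.concurrent.TimeUnit;

InfluxDBClient client = InfluxDBClient.getInstance();
client.createDatabases("cyclops_udr");

Point.Builder point = Point.measurement("CloudStackIPUsageData")
        .time(1451650521, TimeUnit.SECONDS)
        .tag("account", "martin")    // tags must never be null
        .addField("usage", 15.5);    // fields must never be null

// point by point
client.persistSinglePoint(point);

// or batched
BatchPointsContainer container = new BatchPointsContainer();
container.addPoint(point);
client.persistContainer(container);
```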
In order to send messages to the micro service's exchanges programmatically, use the `Messenger` class, where you have these two methods available:

```java
broadcast(Object content);
publish(Object content, String routing);
```

The first one is connected to the `fanout` exchange and the second one to the `direct` exchange. When passing an object as an argument, pass it in raw format and don't transform it to JSON yourself, as that will be done automatically for you.
However, if you are executing commands over the RESTful API rather than over asynchronous RabbitMQ, you also have the option to return a JSON representation of a specified object:

```java
restful(Object content);
```
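For example, inside a command's `execute()` method (the payload and routing key below are purely illustrative):

```java
import ch.icclab.cyclops.publish.Messenger;
import java.util.HashMap;
import java.util.Map;

Map<String, Object> payload = new HashMap<>();
payload.put("status", "processed");

Messenger messenger = Messenger.getInstance();
messenger.broadcast(payload);              // fanout exchange
messenger.publish(payload, "udr.events");  // direct exchange, hypothetical routing key
messenger.restful(payload);                // JSON response when invoked over REST
```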
As a complete example, consider a command that queries the database based on provided parameters and forwards the response to the micro service's broadcast exchange. Reacting to the following JSON message on the [commands queue](https://github.com/icclab/cyclops/wiki/Configuration#rabbitmq)

```json
{
  "_class": "BroadcastUsersData",
  "measurement": "CloudStackIPUsageData",
  "account": "martin"
}
```

it would be placed in `consume/command/model/` and would look like this:
```java
import ch.icclab.cyclops.consume.command.Command;
import ch.icclab.cyclops.timeseries.InfluxDBClient;
import ch.icclab.cyclops.timeseries.QueryBuilder;
import ch.icclab.cyclops.publish.Messenger;

import java.util.List;
import java.util.Map;

public class BroadcastUsersData extends Command {
    private String measurement;
    private String account;

    @Override
    protected void execute() {
        QueryBuilder query = new QueryBuilder(measurement).where("account", account);
        List<Map> list = InfluxDBClient.getInstance().executeQuery(query);
        Messenger.getInstance().broadcast(list);
    }
}
```