Skip to content

Commit

Permalink
fixed fields order
Browse files Browse the repository at this point in the history
EinsamHauer committed Oct 15, 2015
1 parent b9d72eb commit a83903f
Showing 1 changed file with 1 addition and 114 deletions.
115 changes: 1 addition & 114 deletions src/main/java/net/iponweb/disthene/dumper/Dumper.java
Original file line number Diff line number Diff line change
@@ -100,119 +100,6 @@ private void dumpTenant(File folder, final String tenant) throws IOException, Ex
);

ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(parameters.getThreads()));
/*
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(paths.size());;
final AtomicInteger counter = new AtomicInteger(0);
for (final String path : paths) {
Function<ResultSet, Void> function = new Function<ResultSet, Void>() {
@Override
public Void apply(ResultSet result) {
StringBuilder sb = new StringBuilder();
for(Row row : result) {
sb.append(path).append(" ").append(row.getLong("time")).append(" ");
sb.append(path.startsWith("sum") ? ListUtils.sum(row.getList("data", Double.class)) : ListUtils.average(row.getList("data", Double.class))).append(" ");
sb.append(tenant).append("\n");
}
pwMetrics.print(sb.toString());
int cc = counter.addAndGet(1);
if (cc % 100000 == 0) {
logger.info("Processed: " + cc * 100 / paths.size() + "%");
pwMetrics.flush();
}
return null;
}
};
futures.add(
Futures.transform(
session.executeAsync(longRollupStatement.bind(path)),
function,
executor
));
}
futures = Futures.inCompletionOrder(futures);
int totalCount = 0;
for (ListenableFuture<Void> future : futures) {
future.get();
totalCount ++;
}
logger.info("Processed " + totalCount + " paths");*/

/*
for (int i = 0; i < paths.size(); i += 100000) {
List<ListenableFuture<String>> futures = Lists.newArrayListWithExpectedSize(100000);
for (int j = i; j < i + 100000; j++) {
Function<ResultSet, String> serializeFunction =
new Function<ResultSet, String>() {
public String apply(ResultSet resultSet) {
StringBuffer sb = new StringBuffer();
for(Row row : resultSet) {
sb.append(path).append(" ").append(row.getLong("time")).append(" ");
sb.append(path.startsWith("sum") ? ListUtils.sum(row.getList("data", Double.class)) : ListUtils.average(row.getList("data", Double.class))).append(" ");
sb.append(tenant).append("\n");
}
return sb.toString();
}
};
futures.add(
Futures.transform(
session.executeAsync(longRollupStatement.bind(path)),
serializeFunction,
executor
)
}
}
*/

/*
List<ListenableFuture<String>> futures = Lists.newArrayListWithExpectedSize(paths.size());;
for (final String path : paths) {
Function<ResultSet, String> serializeFunction =
new Function<ResultSet, String>() {
public String apply(ResultSet resultSet) {
StringBuffer sb = new StringBuffer();
for(Row row : resultSet) {
sb.append(path).append(" ").append(row.getLong("time")).append(" ");
sb.append(path.startsWith("sum") ? ListUtils.sum(row.getList("data", Double.class)) : ListUtils.average(row.getList("data", Double.class))).append(" ");
sb.append(tenant).append("\n");
}
return sb.toString();
}
};
futures.add(
Futures.transform(
session.executeAsync(longRollupStatement.bind(path)),
serializeFunction,
executor
)
);
}
futures = Futures.inCompletionOrder(futures);
int counter = 0;
for (ListenableFuture<String> future : futures) {
pwMetrics.print(future.get());
counter++;
if (counter % 100000 == 0) {
logger.info("Processed: " + counter * 100 / paths.size() + "%");
pwMetrics.flush();
}
}
*/


final AtomicInteger counter = new AtomicInteger(0);

for (String path : paths) {
@@ -352,7 +239,7 @@ public Metric(String path, Long time, Double data, String tenant) {

@Override
public String toString() {
return path + " " + time + " " + data + " " + tenant;
return path + " " + data + " " + time + " " + tenant;
}
}

0 comments on commit a83903f

Please sign in to comment.