Skip to content

Commit

Permalink
refactoring + tests for put byte[] from console
Browse files Browse the repository at this point in the history
  • Loading branch information
LesTR committed Feb 16, 2019
1 parent d39df73 commit a5b11f0
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class IngestServiceTest {
@Before
public void setup() throws InterruptedException {
server = new IngestServer(ConfigFactory.load()
.withFallback(ConfigFactory.load("test-reference.conf"))
.withFallback(ConfigFactory.load("proto-reference.conf"))
.resolve());
ingest = new IngestService(server.repo, server.direct, server.scheduler);
server.startConsumerThreads();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class RetrieveServiceTest {

@Before
public void setup() throws InterruptedException {
server = new IngestServer(ConfigFactory.load("test-reference.conf")
server = new IngestServer(ConfigFactory.load("proto-reference.conf")
.withFallback(ConfigFactory.load())
.resolve());
retrieve = new RetrieveService(server.repo, server.direct);
Expand Down
16 changes: 16 additions & 0 deletions tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,22 @@
<scope>test</scope>
</dependency>


<dependency>
<groupId>cz.o2.proxima</groupId>
<artifactId>proxima-scheme-proto</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>cz.o2.proxima</groupId>
<artifactId>proxima-scheme-proto</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void run() throws Exception {
throw new IllegalArgumentException(
"Unable to find config file " + f + ". Check your configuration.");
}
return ConfigFactory.parseFile(new File(f));
return ConfigFactory.parseFile(c);
})
.reduce(
ConfigFactory.empty(),
Expand Down
134 changes: 55 additions & 79 deletions tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.Position;
import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.euphoria.source.BatchSource;
import cz.o2.proxima.direct.euphoria.source.BoundedStreamSource;
import cz.o2.proxima.direct.euphoria.source.UnboundedStreamSource;
import cz.o2.proxima.functional.TriFunction;
import cz.o2.proxima.proto.service.RetrieveServiceGrpc;
import cz.o2.proxima.proto.service.RetrieveServiceGrpc.RetrieveServiceBlockingStub;
import cz.o2.proxima.proto.service.Rpc;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.scheme.ValueSerializer;
import cz.o2.proxima.scheme.ValueSerializerFactory;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.Position;
import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.euphoria.source.BatchSource;
import cz.o2.proxima.direct.euphoria.source.BoundedStreamSource;
import cz.o2.proxima.direct.euphoria.source.UnboundedStreamSource;
import cz.o2.proxima.tools.io.ConsoleRandomReader;
import cz.o2.proxima.tools.io.TypedStreamElement;
import cz.o2.proxima.util.Classpath;
import cz.o2.proxima.util.Optionals;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand All @@ -65,6 +66,11 @@
import groovy.lang.GroovyObject;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.groovy.tools.shell.Groovysh;
import org.codehaus.groovy.tools.shell.IO;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -83,10 +89,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.groovy.tools.shell.Groovysh;
import org.codehaus.groovy.tools.shell.IO;

/**
* This is the groovysh based console.
Expand Down Expand Up @@ -283,7 +285,7 @@ public Stream<StreamElement> getUnionStream(
if (direct.isPresent()) {
return Arrays.stream(descriptors)
.map(desc ->
direct.get().getFamiliesForAttribute(desc.desc())
getDirectDataOperator().getFamiliesForAttribute(desc.desc())
.stream()
.filter(af -> af.getDesc().getAccess().canReadCommitLog())
// sort primary families on top
Expand Down Expand Up @@ -369,7 +371,7 @@ public <T> WindowedStream<TypedStreamElement<T>, GlobalWindowing> getBatchSnapsh
ds = reduceUpdatesToSnapshot(attrDesc, fromStamp, toStamp);
} else {
Dataset<StreamElement> raw = flow.get().createInput(BatchSource.of(
family.getBatchObservable().get(),
Optionals.get(family.getBatchObservable()),
family.getDesc(),
fromStamp,
toStamp));
Expand Down Expand Up @@ -406,7 +408,7 @@ private Dataset<StreamElement> reduceUpdatesToSnapshot(
.filter(af -> af.getDesc().getAccess().isStateCommitLog())
.sorted((l, r) -> Integer.compare(
l.getDesc().getType().ordinal(), r.getDesc().getType().ordinal()))
.map(af -> af.getCommitLogReader().get())
.map(af -> Optionals.get(af.getCommitLogReader()))
.findFirst()
.orElseThrow(() -> new IllegalStateException(
"Cannot create batch snapshot, missing random access family "
Expand Down Expand Up @@ -472,7 +474,7 @@ public WindowedStream<StreamElement, GlobalWindowing> getBatchUpdates(
.distinct()
.map(family ->
flow.get().createInput(BatchSource.of(
family.getBatchObservable().get(),
Optionals.get(family.getBatchObservable()),
family.getAttributes(),
startStamp, endStamp)))
.reduce((left, right) -> Union.of(left, right).output())
Expand Down Expand Up @@ -525,68 +527,40 @@ public ConsoleRandomReader getRandomAccessReader(String entity) {
}


public void put(
public <T> void put(
EntityDescriptor entityDesc,
AttributeDescriptor attrDesc,
String key, String attribute, String textFormat)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException,
ClassNotFoundException, InvalidProtocolBufferException, InterruptedException,
TextFormat.ParseException {

put(entityDesc, attrDesc, key, attribute,
System.currentTimeMillis(), textFormat);
}

public void put(
EntityDescriptor entityDesc,
AttributeDescriptor attrDesc,
String key, String attribute, long stamp, String textFormat)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException,
ClassNotFoundException, InvalidProtocolBufferException, InterruptedException,
TextFormat.ParseException {
AttributeDescriptor<T> attrDesc,
String key, String attribute, long stamp, T value
) throws InterruptedException {
ValueSerializerFactory serializerFactory = repo.getValueSerializerFactory(
attrDesc.getSchemeUri().getScheme())
.orElseThrow(() -> new IllegalStateException(
"Unable to get ValueSerializerFactory for attribute " + attrDesc.getName()
+ " with scheme " + attrDesc.getSchemeUri().toString() + ".")
);

ValueSerializer<T> serializer = serializerFactory
.getValueSerializer(attrDesc.getSchemeUri());

OnlineAttributeWriter writer = getDirectDataOperator().getWriter(attrDesc)
.orElseThrow(() -> new IllegalArgumentException(
"Missing online writer for " + attrDesc));

if (!direct.isPresent()) {
throw new IllegalStateException(
"Can write with direct operator only. Add runtime dependecncy");
}
if (attrDesc.getSchemeUri().getScheme().equals("proto")) {
ValueSerializerFactory factory = repo.getValueSerializerFactory(
attrDesc.getSchemeUri().getScheme())
.orElseThrow(() -> new IllegalStateException(
"Unable to get ValueSerializerFactory for attribute " + attrDesc.getName()
+ " with scheme " + attrDesc.getSchemeUri().toString() + ".")
);

String protoClass = factory.getClassName(attrDesc.getSchemeUri());
Class<AbstractMessage> cls = Classpath.findClass(protoClass, AbstractMessage.class);
byte[] payload = null;
if (textFormat != null) {
Method newBuilder = cls.getDeclaredMethod("newBuilder");
Builder builder = (Builder) newBuilder.invoke(null);
TextFormat.merge(textFormat, builder);
payload = builder.build().toByteArray();
}
OnlineAttributeWriter writer = direct.get().getWriter(attrDesc)
.orElseThrow(() -> new IllegalArgumentException(
"Missing writer for " + attrDesc));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> exc = new AtomicReference<>();
writer.write(StreamElement.update(
entityDesc, attrDesc, UUID.randomUUID().toString(),
key, attribute, stamp, payload), (success, ex) -> {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> exc = new AtomicReference<>();
byte[] payload = serializer.serialize(value);
writer.write(StreamElement.update(
entityDesc, attrDesc, UUID.randomUUID().toString(), key, attribute,
stamp, payload), ((success, error) -> {
if (!success) {
exc.set(ex);
exc.set(error);
}
latch.countDown();
});
latch.await();
if (exc.get() != null) {
throw new RuntimeException(exc.get());
}
} else {
throw new IllegalArgumentException(
"Don't know how to make builder for "
+ attrDesc.getSchemeUri());
})
);
latch.await();
if (exc.get() != null) {
throw new RuntimeException(exc.get());
}

}
Expand All @@ -602,11 +576,7 @@ public void delete(
EntityDescriptor entityDesc, AttributeDescriptor<?> attrDesc,
String key, String attribute, long stamp) throws InterruptedException {

if (!direct.isPresent()) {
throw new IllegalStateException(
"Can write with direct operator only. Add runtime dependecncy");
}
OnlineAttributeWriter writer = direct.get().getWriter(attrDesc)
OnlineAttributeWriter writer = getDirectDataOperator().getWriter(attrDesc)
.orElseThrow(() -> new IllegalArgumentException(
"Missing writer for " + attrDesc));
CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -742,4 +712,10 @@ private TriFunction<Repository, Config, Boolean, Executor> getExecutorFactory(
return (repository, cfg, eventTime) -> Console.createLocalExecutor(eventTime);
}

DirectDataOperator getDirectDataOperator() {
return direct.orElseThrow( () ->
new IllegalStateException(
"Unable to get Direct data operator. Add runtime dependency.")
);
}
}
17 changes: 9 additions & 8 deletions tools/src/main/resources/class-entitydesc.ftlh
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ class Environment implements RepositoryProvider {
def List<KeyValue<${attribute.type}>> list(String key, String start) {
return ${entity.classname}Descriptor.this.reader.list(key, name, start)
}
def void put(String key, String attribute, String textFormat) {
console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, textFormat)
def void put(String key, String attribute, ${attribute.type} value) {
console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, System.currentTimeMillis(), value)
}
def void put(String key, String attribute, long stamp, String textFormat) {
console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, textFormat)
def void put(String key, String attribute, long stamp, ${attribute.type} value) {
console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, value)
}
def void delete(String key, String attribute) {
console.delete(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute)
Expand All @@ -145,11 +145,11 @@ class Environment implements RepositoryProvider {
def ${attribute.type} get(String key) {
return ${entity.classname}Descriptor.this.reader.get(key, name)?.getValue()
}
def void put(String key, String textFormat) {
console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), textFormat)
def void put(String key, ${attribute.type} value) {
console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), System.currentTimeMillis(), value)
}
def void put(String key, long stamp, String textFormat) {
console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, textFormat)
def void put(String key, long stamp, ${attribute.type} value) {
console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, value)
}
def void delete(String key) {
console.delete(${entity.classname}Descriptor.this.desc, desc, key, desc.getName())
Expand Down Expand Up @@ -182,6 +182,7 @@ class Environment implements RepositoryProvider {

</#list>


def Repository getRepo() {
return console.getRepo()
}
Expand Down
8 changes: 8 additions & 0 deletions tools/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Root logger option
log4j.rootLogger=INFO, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public class GroovyEnvTest {
@Before
public void setUp() {
Console console = Console.create(cfg, repo);
direct = console.getDirect().orElseThrow(
() -> new IllegalStateException("Missing direct operator"));
direct = console.getDirectDataOperator();
conf = new Configuration(Configuration.VERSION_2_3_23);
conf.setDefaultEncoding("utf-8");
conf.setClassForTemplateLoading(getClass(), "/");
Expand Down
Loading

0 comments on commit a5b11f0

Please sign in to comment.