diff --git a/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java b/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java
index 613e6da45..09e0176c3 100644
--- a/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java
+++ b/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java
@@ -50,8 +50,8 @@ public class IngestServiceTest {
@Before
public void setup() throws InterruptedException {
- server = new IngestServer(ConfigFactory.load()
- .withFallback(ConfigFactory.load("test-reference.conf"))
+ server = new IngestServer(ConfigFactory.load("proto-reference.conf")
+ .withFallback(ConfigFactory.load())
.resolve());
ingest = new IngestService(server.repo, server.direct, server.scheduler);
server.startConsumerThreads();
diff --git a/direct/ingest-server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java b/direct/ingest-server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java
index 687e0349e..aaf88dbb5 100644
--- a/direct/ingest-server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java
+++ b/direct/ingest-server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java
@@ -41,7 +41,7 @@ public class RetrieveServiceTest {
@Before
public void setup() throws InterruptedException {
- server = new IngestServer(ConfigFactory.load("test-reference.conf")
+ server = new IngestServer(ConfigFactory.load("proto-reference.conf")
.withFallback(ConfigFactory.load())
.resolve());
retrieve = new RetrieveService(server.repo, server.direct);
diff --git a/scheme/proto/src/test/resources/application.conf b/scheme/proto/src/test/resources/proto-reference.conf
similarity index 100%
rename from scheme/proto/src/test/resources/application.conf
rename to scheme/proto/src/test/resources/proto-reference.conf
diff --git a/tools/pom.xml b/tools/pom.xml
index 574bbb749..5c9791e2c 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -178,6 +178,22 @@
test
+
+
+ cz.o2.proxima
+ proxima-scheme-proto
+ ${project.version}
+ test
+
+
+
+ cz.o2.proxima
+ proxima-scheme-proto
+ ${project.version}
+ test-jar
+ test
+
+
diff --git a/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java b/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java
index d02e0755c..a50187743 100644
--- a/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java
+++ b/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java
@@ -80,7 +80,7 @@ public void run() throws Exception {
throw new IllegalArgumentException(
"Unable to find config file " + f + ". Check your configuration.");
}
- return ConfigFactory.parseFile(new File(f));
+ return ConfigFactory.parseFile(c);
})
.reduce(
ConfigFactory.empty(),
diff --git a/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java b/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java
index 9a5c6ea95..819ea5097 100644
--- a/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java
+++ b/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java
@@ -18,10 +18,17 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.AbstractMessage.Builder;
-import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import cz.o2.proxima.direct.commitlog.CommitLogReader;
+import cz.o2.proxima.direct.commitlog.Position;
+import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
+import cz.o2.proxima.direct.core.DirectDataOperator;
+import cz.o2.proxima.direct.core.OnlineAttributeWriter;
+import cz.o2.proxima.direct.euphoria.source.BatchSource;
+import cz.o2.proxima.direct.euphoria.source.BoundedStreamSource;
+import cz.o2.proxima.direct.euphoria.source.UnboundedStreamSource;
import cz.o2.proxima.functional.TriFunction;
import cz.o2.proxima.proto.service.RetrieveServiceGrpc;
import cz.o2.proxima.proto.service.RetrieveServiceGrpc.RetrieveServiceBlockingStub;
@@ -29,19 +36,13 @@
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
-import cz.o2.proxima.direct.core.OnlineAttributeWriter;
+import cz.o2.proxima.scheme.ValueSerializer;
import cz.o2.proxima.scheme.ValueSerializerFactory;
import cz.o2.proxima.storage.StreamElement;
-import cz.o2.proxima.direct.commitlog.CommitLogReader;
-import cz.o2.proxima.direct.commitlog.Position;
-import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
-import cz.o2.proxima.direct.core.DirectDataOperator;
-import cz.o2.proxima.direct.euphoria.source.BatchSource;
-import cz.o2.proxima.direct.euphoria.source.BoundedStreamSource;
-import cz.o2.proxima.direct.euphoria.source.UnboundedStreamSource;
import cz.o2.proxima.tools.io.ConsoleRandomReader;
import cz.o2.proxima.tools.io.TypedStreamElement;
import cz.o2.proxima.util.Classpath;
+import cz.o2.proxima.util.Optionals;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing;
import cz.seznam.euphoria.core.client.flow.Flow;
@@ -65,6 +66,11 @@
import groovy.lang.GroovyObject;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.codehaus.groovy.tools.shell.Groovysh;
+import org.codehaus.groovy.tools.shell.IO;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -83,10 +89,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.codehaus.groovy.tools.shell.Groovysh;
-import org.codehaus.groovy.tools.shell.IO;
/**
* This is the groovysh based console.
@@ -283,7 +285,7 @@ public Stream getUnionStream(
if (direct.isPresent()) {
return Arrays.stream(descriptors)
.map(desc ->
- direct.get().getFamiliesForAttribute(desc.desc())
+ getDirectDataOperator().getFamiliesForAttribute(desc.desc())
.stream()
.filter(af -> af.getDesc().getAccess().canReadCommitLog())
// sort primary families on top
@@ -369,7 +371,7 @@ public WindowedStream, GlobalWindowing> getBatchSnapsh
ds = reduceUpdatesToSnapshot(attrDesc, fromStamp, toStamp);
} else {
Dataset raw = flow.get().createInput(BatchSource.of(
- family.getBatchObservable().get(),
+ Optionals.get(family.getBatchObservable()),
family.getDesc(),
fromStamp,
toStamp));
@@ -406,7 +408,7 @@ private Dataset reduceUpdatesToSnapshot(
.filter(af -> af.getDesc().getAccess().isStateCommitLog())
.sorted((l, r) -> Integer.compare(
l.getDesc().getType().ordinal(), r.getDesc().getType().ordinal()))
- .map(af -> af.getCommitLogReader().get())
+ .map(af -> Optionals.get(af.getCommitLogReader()))
.findFirst()
.orElseThrow(() -> new IllegalStateException(
"Cannot create batch snapshot, missing random access family "
@@ -472,7 +474,7 @@ public WindowedStream getBatchUpdates(
.distinct()
.map(family ->
flow.get().createInput(BatchSource.of(
- family.getBatchObservable().get(),
+ Optionals.get(family.getBatchObservable()),
family.getAttributes(),
startStamp, endStamp)))
.reduce((left, right) -> Union.of(left, right).output())
@@ -525,68 +527,40 @@ public ConsoleRandomReader getRandomAccessReader(String entity) {
}
- public void put(
+ public void put(
EntityDescriptor entityDesc,
- AttributeDescriptor attrDesc,
- String key, String attribute, String textFormat)
- throws NoSuchMethodException, IllegalAccessException, InvocationTargetException,
- ClassNotFoundException, InvalidProtocolBufferException, InterruptedException,
- TextFormat.ParseException {
-
- put(entityDesc, attrDesc, key, attribute,
- System.currentTimeMillis(), textFormat);
- }
-
- public void put(
- EntityDescriptor entityDesc,
- AttributeDescriptor attrDesc,
- String key, String attribute, long stamp, String textFormat)
- throws NoSuchMethodException, IllegalAccessException, InvocationTargetException,
- ClassNotFoundException, InvalidProtocolBufferException, InterruptedException,
- TextFormat.ParseException {
+ AttributeDescriptor attrDesc,
+ String key, String attribute, long stamp, T value
+ ) throws InterruptedException {
+ ValueSerializerFactory serializerFactory = repo.getValueSerializerFactory(
+ attrDesc.getSchemeUri().getScheme())
+ .orElseThrow(() -> new IllegalStateException(
+ "Unable to get ValueSerializerFactory for attribute " + attrDesc.getName()
+ + " with scheme " + attrDesc.getSchemeUri().toString() + ".")
+ );
+
+ ValueSerializer serializer = serializerFactory
+ .getValueSerializer(attrDesc.getSchemeUri());
+
+ OnlineAttributeWriter writer = getDirectDataOperator().getWriter(attrDesc)
+ .orElseThrow(() -> new IllegalArgumentException(
+ "Missing online writer for " + attrDesc));
- if (!direct.isPresent()) {
- throw new IllegalStateException(
- "Can write with direct operator only. Add runtime dependecncy");
- }
- if (attrDesc.getSchemeUri().getScheme().equals("proto")) {
- ValueSerializerFactory factory = repo.getValueSerializerFactory(
- attrDesc.getSchemeUri().getScheme())
- .orElseThrow(() -> new IllegalStateException(
- "Unable to get ValueSerializerFactory for attribute " + attrDesc.getName()
- + " with scheme " + attrDesc.getSchemeUri().toString() + ".")
- );
-
- String protoClass = factory.getClassName(attrDesc.getSchemeUri());
- Class cls = Classpath.findClass(protoClass, AbstractMessage.class);
- byte[] payload = null;
- if (textFormat != null) {
- Method newBuilder = cls.getDeclaredMethod("newBuilder");
- Builder builder = (Builder) newBuilder.invoke(null);
- TextFormat.merge(textFormat, builder);
- payload = builder.build().toByteArray();
- }
- OnlineAttributeWriter writer = direct.get().getWriter(attrDesc)
- .orElseThrow(() -> new IllegalArgumentException(
- "Missing writer for " + attrDesc));
- CountDownLatch latch = new CountDownLatch(1);
- AtomicReference exc = new AtomicReference<>();
- writer.write(StreamElement.update(
- entityDesc, attrDesc, UUID.randomUUID().toString(),
- key, attribute, stamp, payload), (success, ex) -> {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference exc = new AtomicReference<>();
+ byte[] payload = serializer.serialize(value);
+ writer.write(StreamElement.update(
+ entityDesc, attrDesc, UUID.randomUUID().toString(), key, attribute,
+ stamp, payload), ((success, error) -> {
if (!success) {
- exc.set(ex);
+ exc.set(error);
}
latch.countDown();
- });
- latch.await();
- if (exc.get() != null) {
- throw new RuntimeException(exc.get());
- }
- } else {
- throw new IllegalArgumentException(
- "Don't know how to make builder for "
- + attrDesc.getSchemeUri());
+ })
+ );
+ latch.await();
+ if (exc.get() != null) {
+ throw new RuntimeException(exc.get());
}
}
@@ -602,11 +576,7 @@ public void delete(
EntityDescriptor entityDesc, AttributeDescriptor> attrDesc,
String key, String attribute, long stamp) throws InterruptedException {
- if (!direct.isPresent()) {
- throw new IllegalStateException(
- "Can write with direct operator only. Add runtime dependecncy");
- }
- OnlineAttributeWriter writer = direct.get().getWriter(attrDesc)
+ OnlineAttributeWriter writer = getDirectDataOperator().getWriter(attrDesc)
.orElseThrow(() -> new IllegalArgumentException(
"Missing writer for " + attrDesc));
CountDownLatch latch = new CountDownLatch(1);
@@ -742,4 +712,10 @@ private TriFunction getExecutorFactory(
return (repository, cfg, eventTime) -> Console.createLocalExecutor(eventTime);
}
+ DirectDataOperator getDirectDataOperator() {
+ return direct.orElseThrow( () ->
+ new IllegalStateException(
+ "Unable to get Direct data operator. Add runtime dependency.")
+ );
+ }
}
diff --git a/tools/src/main/resources/class-entitydesc.ftlh b/tools/src/main/resources/class-entitydesc.ftlh
index d2dea3c63..fcbebe01c 100644
--- a/tools/src/main/resources/class-entitydesc.ftlh
+++ b/tools/src/main/resources/class-entitydesc.ftlh
@@ -123,11 +123,11 @@ class Environment implements RepositoryProvider {
def List> list(String key, String start) {
return ${entity.classname}Descriptor.this.reader.list(key, name, start)
}
- def void put(String key, String attribute, String textFormat) {
- console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, textFormat)
+ def void put(String key, String attribute, ${attribute.type} value) {
+ console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, System.currentTimeMillis(), value)
}
- def void put(String key, String attribute, long stamp, String textFormat) {
- console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, textFormat)
+ def void put(String key, String attribute, long stamp, ${attribute.type} value) {
+ console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, value)
}
def void delete(String key, String attribute) {
console.delete(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute)
@@ -145,11 +145,11 @@ class Environment implements RepositoryProvider {
def ${attribute.type} get(String key) {
return ${entity.classname}Descriptor.this.reader.get(key, name)?.getValue()
}
- def void put(String key, String textFormat) {
- console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), textFormat)
+ def void put(String key, ${attribute.type} value) {
+ console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), System.currentTimeMillis(), value)
}
- def void put(String key, long stamp, String textFormat) {
- console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, textFormat)
+ def void put(String key, long stamp, ${attribute.type} value) {
+ console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, value)
}
def void delete(String key) {
console.delete(${entity.classname}Descriptor.this.desc, desc, key, desc.getName())
@@ -182,6 +182,7 @@ class Environment implements RepositoryProvider {
#list>
+
def Repository getRepo() {
return console.getRepo()
}
diff --git a/tools/src/main/resources/log4j.properties b/tools/src/main/resources/log4j.properties
new file mode 100644
index 000000000..1da823016
--- /dev/null
+++ b/tools/src/main/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTest.java b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTest.java
index a15961382..c1a3041e7 100644
--- a/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTest.java
+++ b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTest.java
@@ -72,8 +72,7 @@ public class GroovyEnvTest {
@Before
public void setUp() {
Console console = Console.create(cfg, repo);
- direct = console.getDirect().orElseThrow(
- () -> new IllegalStateException("Missing direct operator"));
+ direct = console.getDirectDataOperator();
conf = new Configuration(Configuration.VERSION_2_3_23);
conf.setDefaultEncoding("utf-8");
conf.setClassForTemplateLoading(getClass(), "/");
diff --git a/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTestPutBytes.java b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTestPutBytes.java
new file mode 100644
index 000000000..325c3be1f
--- /dev/null
+++ b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTestPutBytes.java
@@ -0,0 +1,195 @@
+/**
+ * Copyright 2017-2019 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.tools.groovy;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import cz.o2.proxima.direct.commitlog.CommitLogReader;
+import cz.o2.proxima.direct.commitlog.LogObserver;
+import cz.o2.proxima.direct.core.DirectDataOperator;
+import cz.o2.proxima.repository.AttributeDescriptor;
+import cz.o2.proxima.repository.ConfigRepository;
+import cz.o2.proxima.repository.EntityDescriptor;
+import cz.o2.proxima.repository.Repository;
+import cz.o2.proxima.storage.StreamElement;
+import cz.o2.proxima.util.Optionals;
+import freemarker.template.Configuration;
+import freemarker.template.TemplateExceptionHandler;
+import groovy.lang.GroovyClassLoader;
+import groovy.lang.Script;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+@Slf4j
+public class GroovyEnvTestPutBytes {
+ final Config cfg = ConfigFactory.load("test-reference.conf").resolve();
+ final Repository repo = ConfigRepository.of(cfg);
+
+ final EntityDescriptor gateway = Optionals.get(repo.findEntity("gateway"));
+ final AttributeDescriptor users = Optionals.get(gateway.findAttribute("users"));
+ final AttributeDescriptor device = Optionals.get(
+ gateway.findAttribute("device.*"));
+
+ Configuration conf;
+
+ GroovyClassLoader loader;
+
+ DirectDataOperator direct;
+
+ @Before
+ public void setUp() {
+ Console console = Console.create(cfg, repo);
+ direct = console.getDirectDataOperator();
+ conf = new Configuration(Configuration.VERSION_2_3_23);
+ conf.setDefaultEncoding("utf-8");
+ conf.setClassForTemplateLoading(getClass(), "/");
+ conf.setTemplateExceptionHandler(TemplateExceptionHandler.RETHROW_HANDLER);
+ conf.setLogTemplateExceptions(false);
+
+ loader = new GroovyClassLoader(Thread.currentThread().getContextClassLoader());
+ Thread.currentThread().setContextClassLoader(loader);
+ }
+
+ @Test
+ public void testPutBytes() throws Exception {
+ long now = System.currentTimeMillis();
+ executeTest(new AbstractTest() {
+ @Override
+ Script inputScript() throws Exception {
+ return compile("env.gateway.users.put(\"test-key\",\"testValue\".getBytes())");
+ }
+
+ @Override
+ void validate(StreamElement element) {
+ assertTrue(element.getStamp() > now);
+ assertNotNull(element.getValue());
+ assertEquals("testValue", new String(element.getValue()));
+ assertEquals("test-key", element.getKey());
+ assertEquals("users", element.getAttribute());
+ }
+ }, users);
+ }
+
+ @Test
+ public void testPutBytesWithTimestamp() throws Exception {
+ executeTest(new AbstractTest() {
+ @Override
+ Script inputScript() throws Exception {
+ return compile("env.gateway.users.put(\"test-key\",123456789L,"
+ + "\"testValue\".getBytes())");
+ }
+
+ @Override
+ void validate(StreamElement element) {
+ assertEquals(123456789L, element.getStamp());
+ assertNotNull(element.getValue());
+ assertEquals("testValue", new String(element.getValue()));
+ assertEquals("test-key", element.getKey());
+ assertEquals("users", element.getAttribute());
+ }
+ }, users);
+ }
+
+ @Test
+ public void testPutBytesWildcard() throws Exception {
+ final long start = System.currentTimeMillis();
+ executeTest(new AbstractTest() {
+ @Override
+ Script inputScript() throws Exception {
+ return compile("env.gateway.device.put(\"test-key\","
+ + "\"toilet\",\"value\".getBytes())");
+ }
+
+ @Override
+ void validate(StreamElement element) {
+ assertTrue(element.getStamp() > start);
+ assertNotNull(element.getValue());
+ assertEquals("value", new String(element.getValue()));
+ assertEquals("test-key", element.getKey());
+ assertEquals("device.toilet", element.getAttribute());
+ }
+ }, device);
+ }
+
+ @Test
+ public void testPutBytesWildcardWithTimestamp() throws Exception {
+ final long tms = System.currentTimeMillis();
+ executeTest(new AbstractTest() {
+ @Override
+ Script inputScript() throws Exception {
+ return compile("env.gateway.device.put(\"test-key\",\"toilet\","
+ + tms + ",\"value\".getBytes())");
+ }
+
+ @Override
+ void validate(StreamElement element) {
+ assertEquals(tms, element.getStamp());
+ assertNotNull(element.getValue());
+ assertEquals("value", new String(element.getValue()));
+ assertEquals("test-key", element.getKey());
+ assertEquals("device.toilet", element.getAttribute());
+ }
+ }, device);
+
+ }
+
+
+ void executeTest(AbstractTest test, AttributeDescriptor attr) throws Exception {
+ test.execute(
+ Optionals.get(direct.getCommitLogReader(attr)));
+ }
+
+
+ @SuppressWarnings("unchecked")
+ Script compile(String script) throws Exception { // @TODO: refactor
+ String source = GroovyEnv.getSource(conf, repo)
+ + "\n"
+ + "env = cz.o2.proxima.tools.groovy.Console.get().getEnv()"
+ + "\n"
+ + script;
+ Class