From a5b11f0fa2d636a09a6012776d01082e045d08a6 Mon Sep 17 00:00:00 2001 From: Lukas Drbal Date: Sat, 16 Feb 2019 21:06:49 +0100 Subject: [PATCH] refactoring + tests for put byte[] from console --- .../o2/proxima/server/IngestServiceTest.java | 2 +- .../proxima/server/RetrieveServiceTest.java | 2 +- ...{application.conf => proto-reference.conf} | 0 tools/pom.xml | 16 ++ .../cz/o2/proxima/tools/groovy/Compiler.java | 2 +- .../cz/o2/proxima/tools/groovy/Console.java | 134 +++++------- .../src/main/resources/class-entitydesc.ftlh | 17 +- tools/src/main/resources/log4j.properties | 8 + .../proxima/tools/groovy/GroovyEnvTest.java | 3 +- .../tools/groovy/GroovyEnvTestPutBytes.java | 195 ++++++++++++++++++ 10 files changed, 287 insertions(+), 92 deletions(-) rename scheme/proto/src/test/resources/{application.conf => proto-reference.conf} (100%) create mode 100644 tools/src/main/resources/log4j.properties create mode 100644 tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTestPutBytes.java diff --git a/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java b/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java index 613e6da45..f85992eed 100644 --- a/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java +++ b/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java @@ -51,7 +51,7 @@ public class IngestServiceTest { @Before public void setup() throws InterruptedException { server = new IngestServer(ConfigFactory.load() - .withFallback(ConfigFactory.load("test-reference.conf")) + .withFallback(ConfigFactory.load("proto-reference.conf")) .resolve()); ingest = new IngestService(server.repo, server.direct, server.scheduler); server.startConsumerThreads(); diff --git a/direct/ingest-server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java b/direct/ingest-server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java index 687e0349e..aaf88dbb5 100644 --- a/direct/ingest-server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java +++ b/direct/ingest-server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java @@ -41,7 +41,7 @@ public class RetrieveServiceTest { @Before public void setup() throws InterruptedException { - server = new IngestServer(ConfigFactory.load("test-reference.conf") + server = new IngestServer(ConfigFactory.load("proto-reference.conf") .withFallback(ConfigFactory.load()) .resolve()); retrieve = new RetrieveService(server.repo, server.direct); diff --git a/scheme/proto/src/test/resources/application.conf b/scheme/proto/src/test/resources/proto-reference.conf similarity index 100% rename from scheme/proto/src/test/resources/application.conf rename to scheme/proto/src/test/resources/proto-reference.conf diff --git a/tools/pom.xml b/tools/pom.xml index 574bbb749..5c9791e2c 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -178,6 +178,22 @@ test + + + cz.o2.proxima + proxima-scheme-proto + ${project.version} + test + + + + cz.o2.proxima + proxima-scheme-proto + ${project.version} + test-jar + test + + diff --git a/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java b/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java index d02e0755c..a50187743 100644 --- a/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java +++ b/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java @@ -80,7 +80,7 @@ public void run() throws Exception { throw new IllegalArgumentException( "Unable to find config file " + f + ". Check your configuration."); } - return ConfigFactory.parseFile(new File(f)); + return ConfigFactory.parseFile(c); }) .reduce( ConfigFactory.empty(), diff --git a/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java b/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java index 9a5c6ea95..819ea5097 100644 --- a/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java +++ b/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java @@ -18,10 +18,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.AbstractMessage; import com.google.protobuf.AbstractMessage.Builder; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.direct.commitlog.CommitLogReader; +import cz.o2.proxima.direct.commitlog.Position; +import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.direct.core.OnlineAttributeWriter; +import cz.o2.proxima.direct.euphoria.source.BatchSource; +import cz.o2.proxima.direct.euphoria.source.BoundedStreamSource; +import cz.o2.proxima.direct.euphoria.source.UnboundedStreamSource; import cz.o2.proxima.functional.TriFunction; import cz.o2.proxima.proto.service.RetrieveServiceGrpc; import cz.o2.proxima.proto.service.RetrieveServiceGrpc.RetrieveServiceBlockingStub; @@ -29,19 +36,13 @@ import cz.o2.proxima.repository.AttributeDescriptor; import cz.o2.proxima.repository.EntityDescriptor; import cz.o2.proxima.repository.Repository; -import cz.o2.proxima.direct.core.OnlineAttributeWriter; +import cz.o2.proxima.scheme.ValueSerializer; import cz.o2.proxima.scheme.ValueSerializerFactory; import cz.o2.proxima.storage.StreamElement; -import cz.o2.proxima.direct.commitlog.CommitLogReader; -import cz.o2.proxima.direct.commitlog.Position; -import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor; -import cz.o2.proxima.direct.core.DirectDataOperator; -import cz.o2.proxima.direct.euphoria.source.BatchSource; -import cz.o2.proxima.direct.euphoria.source.BoundedStreamSource; -import cz.o2.proxima.direct.euphoria.source.UnboundedStreamSource; import cz.o2.proxima.tools.io.ConsoleRandomReader; import cz.o2.proxima.tools.io.TypedStreamElement; import cz.o2.proxima.util.Classpath; +import cz.o2.proxima.util.Optionals; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing; import cz.seznam.euphoria.core.client.flow.Flow; @@ -65,6 +66,11 @@ import groovy.lang.GroovyObject; import io.grpc.Channel; import io.grpc.ManagedChannelBuilder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.codehaus.groovy.tools.shell.Groovysh; +import org.codehaus.groovy.tools.shell.IO; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -83,10 +89,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.codehaus.groovy.tools.shell.Groovysh; -import org.codehaus.groovy.tools.shell.IO; /** * This is the groovysh based console. @@ -283,7 +285,7 @@ public Stream getUnionStream( if (direct.isPresent()) { return Arrays.stream(descriptors) .map(desc -> - direct.get().getFamiliesForAttribute(desc.desc()) + getDirectDataOperator().getFamiliesForAttribute(desc.desc()) .stream() .filter(af -> af.getDesc().getAccess().canReadCommitLog()) // sort primary families on top @@ -369,7 +371,7 @@ public WindowedStream, GlobalWindowing> getBatchSnapsh ds = reduceUpdatesToSnapshot(attrDesc, fromStamp, toStamp); } else { Dataset raw = flow.get().createInput(BatchSource.of( - family.getBatchObservable().get(), + Optionals.get(family.getBatchObservable()), family.getDesc(), fromStamp, toStamp)); @@ -406,7 +408,7 @@ private Dataset reduceUpdatesToSnapshot( .filter(af -> af.getDesc().getAccess().isStateCommitLog()) .sorted((l, r) -> Integer.compare( l.getDesc().getType().ordinal(), r.getDesc().getType().ordinal())) - .map(af -> af.getCommitLogReader().get()) + .map(af -> Optionals.get(af.getCommitLogReader())) .findFirst() .orElseThrow(() -> new IllegalStateException( "Cannot create batch snapshot, missing random access family " @@ -472,7 +474,7 @@ public WindowedStream getBatchUpdates( .distinct() .map(family -> flow.get().createInput(BatchSource.of( - family.getBatchObservable().get(), + Optionals.get(family.getBatchObservable()), family.getAttributes(), startStamp, endStamp))) .reduce((left, right) -> Union.of(left, right).output()) @@ -525,68 +527,40 @@ public ConsoleRandomReader getRandomAccessReader(String entity) { } - public void put( + public void put( EntityDescriptor entityDesc, - AttributeDescriptor attrDesc, - String key, String attribute, String textFormat) - throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, - ClassNotFoundException, InvalidProtocolBufferException, InterruptedException, - TextFormat.ParseException { - - put(entityDesc, attrDesc, key, attribute, - System.currentTimeMillis(), textFormat); - } - - public void put( - EntityDescriptor entityDesc, - AttributeDescriptor attrDesc, - String key, String attribute, long stamp, String textFormat) - throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, - ClassNotFoundException, InvalidProtocolBufferException, InterruptedException, - TextFormat.ParseException { + AttributeDescriptor attrDesc, + String key, String attribute, long stamp, T value + ) throws InterruptedException { + ValueSerializerFactory serializerFactory = repo.getValueSerializerFactory( + attrDesc.getSchemeUri().getScheme()) + .orElseThrow(() -> new IllegalStateException( + "Unable to get ValueSerializerFactory for attribute " + attrDesc.getName() + + " with scheme " + attrDesc.getSchemeUri().toString() + ".") + ); + + ValueSerializer serializer = serializerFactory + .getValueSerializer(attrDesc.getSchemeUri()); + + OnlineAttributeWriter writer = getDirectDataOperator().getWriter(attrDesc) + .orElseThrow(() -> new IllegalArgumentException( + "Missing online writer for " + attrDesc)); - if (!direct.isPresent()) { - throw new IllegalStateException( - "Can write with direct operator only. Add runtime dependecncy"); - } - if (attrDesc.getSchemeUri().getScheme().equals("proto")) { - ValueSerializerFactory factory = repo.getValueSerializerFactory( - attrDesc.getSchemeUri().getScheme()) - .orElseThrow(() -> new IllegalStateException( - "Unable to get ValueSerializerFactory for attribute " + attrDesc.getName() - + " with scheme " + attrDesc.getSchemeUri().toString() + ".") - ); - - String protoClass = factory.getClassName(attrDesc.getSchemeUri()); - Class cls = Classpath.findClass(protoClass, AbstractMessage.class); - byte[] payload = null; - if (textFormat != null) { - Method newBuilder = cls.getDeclaredMethod("newBuilder"); - Builder builder = (Builder) newBuilder.invoke(null); - TextFormat.merge(textFormat, builder); - payload = builder.build().toByteArray(); - } - OnlineAttributeWriter writer = direct.get().getWriter(attrDesc) - .orElseThrow(() -> new IllegalArgumentException( - "Missing writer for " + attrDesc)); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exc = new AtomicReference<>(); - writer.write(StreamElement.update( - entityDesc, attrDesc, UUID.randomUUID().toString(), - key, attribute, stamp, payload), (success, ex) -> { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exc = new AtomicReference<>(); + byte[] payload = serializer.serialize(value); + writer.write(StreamElement.update( + entityDesc, attrDesc, UUID.randomUUID().toString(), key, attribute, + stamp, payload), ((success, error) -> { if (!success) { - exc.set(ex); + exc.set(error); } latch.countDown(); - }); - latch.await(); - if (exc.get() != null) { - throw new RuntimeException(exc.get()); - } - } else { - throw new IllegalArgumentException( - "Don't know how to make builder for " - + attrDesc.getSchemeUri()); + }) + ); + latch.await(); + if (exc.get() != null) { + throw new RuntimeException(exc.get()); } } @@ -602,11 +576,7 @@ public void delete( EntityDescriptor entityDesc, AttributeDescriptor attrDesc, String key, String attribute, long stamp) throws InterruptedException { - if (!direct.isPresent()) { - throw new IllegalStateException( - "Can write with direct operator only. Add runtime dependecncy"); - } - OnlineAttributeWriter writer = direct.get().getWriter(attrDesc) + OnlineAttributeWriter writer = getDirectDataOperator().getWriter(attrDesc) .orElseThrow(() -> new IllegalArgumentException( "Missing writer for " + attrDesc)); CountDownLatch latch = new CountDownLatch(1); @@ -742,4 +712,10 @@ private TriFunction getExecutorFactory( return (repository, cfg, eventTime) -> Console.createLocalExecutor(eventTime); } + DirectDataOperator getDirectDataOperator() { + return direct.orElseThrow( () -> + new IllegalStateException( + "Unable to get Direct data operator. Add runtime dependency.") + ); + } } diff --git a/tools/src/main/resources/class-entitydesc.ftlh b/tools/src/main/resources/class-entitydesc.ftlh index d2dea3c63..fcbebe01c 100644 --- a/tools/src/main/resources/class-entitydesc.ftlh +++ b/tools/src/main/resources/class-entitydesc.ftlh @@ -123,11 +123,11 @@ class Environment implements RepositoryProvider { def List> list(String key, String start) { return ${entity.classname}Descriptor.this.reader.list(key, name, start) } - def void put(String key, String attribute, String textFormat) { - console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, textFormat) + def void put(String key, String attribute, ${attribute.type} value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, System.currentTimeMillis(), value) } - def void put(String key, String attribute, long stamp, String textFormat) { - console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, textFormat) + def void put(String key, String attribute, long stamp, ${attribute.type} value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, value) } def void delete(String key, String attribute) { console.delete(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute) @@ -145,11 +145,11 @@ class Environment implements RepositoryProvider { def ${attribute.type} get(String key) { return ${entity.classname}Descriptor.this.reader.get(key, name)?.getValue() } - def void put(String key, String textFormat) { - console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), textFormat) + def void put(String key, ${attribute.type} value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), System.currentTimeMillis(), value) } - def void put(String key, long stamp, String textFormat) { - console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, textFormat) + def void put(String key, long stamp, ${attribute.type} value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, value) } def void delete(String key) { console.delete(${entity.classname}Descriptor.this.desc, desc, key, desc.getName()) @@ -182,6 +182,7 @@ class Environment implements RepositoryProvider { + def Repository getRepo() { return console.getRepo() } diff --git a/tools/src/main/resources/log4j.properties b/tools/src/main/resources/log4j.properties new file mode 100644 index 000000000..1da823016 --- /dev/null +++ b/tools/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTest.java b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTest.java index a15961382..c1a3041e7 100644 --- a/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTest.java +++ b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTest.java @@ -72,8 +72,7 @@ public class GroovyEnvTest { @Before public void setUp() { Console console = Console.create(cfg, repo); - direct = console.getDirect().orElseThrow( - () -> new IllegalStateException("Missing direct operator")); + direct = console.getDirectDataOperator(); conf = new Configuration(Configuration.VERSION_2_3_23); conf.setDefaultEncoding("utf-8"); conf.setClassForTemplateLoading(getClass(), "/"); diff --git a/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTestPutBytes.java b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTestPutBytes.java new file mode 100644 index 000000000..325c3be1f --- /dev/null +++ b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvTestPutBytes.java @@ -0,0 +1,195 @@ +/** + * Copyright 2017-2019 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.tools.groovy; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.direct.commitlog.CommitLogReader; +import cz.o2.proxima.direct.commitlog.LogObserver; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.ConfigRepository; +import cz.o2.proxima.repository.EntityDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.util.Optionals; +import freemarker.template.Configuration; +import freemarker.template.TemplateExceptionHandler; +import groovy.lang.GroovyClassLoader; +import groovy.lang.Script; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +@Slf4j +public class GroovyEnvTestPutBytes { + final Config cfg = ConfigFactory.load("test-reference.conf").resolve(); + final Repository repo = ConfigRepository.of(cfg); + + final EntityDescriptor gateway = Optionals.get(repo.findEntity("gateway")); + final AttributeDescriptor users = Optionals.get(gateway.findAttribute("users")); + final AttributeDescriptor device = Optionals.get( + gateway.findAttribute("device.*")); + + Configuration conf; + + GroovyClassLoader loader; + + DirectDataOperator direct; + + @Before + public void setUp() { + Console console = Console.create(cfg, repo); + direct = console.getDirectDataOperator(); + conf = new Configuration(Configuration.VERSION_2_3_23); + conf.setDefaultEncoding("utf-8"); + conf.setClassForTemplateLoading(getClass(), "/"); + conf.setTemplateExceptionHandler(TemplateExceptionHandler.RETHROW_HANDLER); + conf.setLogTemplateExceptions(false); + + loader = new GroovyClassLoader(Thread.currentThread().getContextClassLoader()); + Thread.currentThread().setContextClassLoader(loader); + } + + @Test + public void testPutBytes() throws Exception { + long now = System.currentTimeMillis(); + executeTest(new AbstractTest() { + @Override + Script inputScript() throws Exception { + return compile("env.gateway.users.put(\"test-key\",\"testValue\".getBytes())"); + } + + @Override + void validate(StreamElement element) { + assertTrue(element.getStamp() > now); + assertNotNull(element.getValue()); + assertEquals("testValue", new String(element.getValue())); + assertEquals("test-key", element.getKey()); + assertEquals("users", element.getAttribute()); + } + }, users); + } + + @Test + public void testPutBytesWithTimestamp() throws Exception { + executeTest(new AbstractTest() { + @Override + Script inputScript() throws Exception { + return compile("env.gateway.users.put(\"test-key\",123456789L," + + "\"testValue\".getBytes())"); + } + + @Override + void validate(StreamElement element) { + assertEquals(123456789L, element.getStamp()); + assertNotNull(element.getValue()); + assertEquals("testValue", new String(element.getValue())); + assertEquals("test-key", element.getKey()); + assertEquals("users", element.getAttribute()); + } + }, users); + } + + @Test + public void testPutBytesWildcard() throws Exception { + final long start = System.currentTimeMillis(); + executeTest(new AbstractTest() { + @Override + Script inputScript() throws Exception { + return compile("env.gateway.device.put(\"test-key\"," + + "\"toilet\",\"value\".getBytes())"); + } + + @Override + void validate(StreamElement element) { + assertTrue(element.getStamp() > start); + assertNotNull(element.getValue()); + assertEquals("value", new String(element.getValue())); + assertEquals("test-key", element.getKey()); + assertEquals("device.toilet", element.getAttribute()); + } + }, device); + } + + @Test + public void testPutBytesWildcardWithTimestamp() throws Exception { + final long tms = System.currentTimeMillis(); + executeTest(new AbstractTest() { + @Override + Script inputScript() throws Exception { + return compile("env.gateway.device.put(\"test-key\",\"toilet\"," + + tms + ",\"value\".getBytes())"); + } + + @Override + void validate(StreamElement element) { + assertEquals(tms, element.getStamp()); + assertNotNull(element.getValue()); + assertEquals("value", new String(element.getValue())); + assertEquals("test-key", element.getKey()); + assertEquals("device.toilet", element.getAttribute()); + } + }, device); + + } + + + void executeTest(AbstractTest test, AttributeDescriptor attr) throws Exception { + test.execute( + Optionals.get(direct.getCommitLogReader(attr))); + } + + + @SuppressWarnings("unchecked") + Script compile(String script) throws Exception { // @TODO: refactor + String source = GroovyEnv.getSource(conf, repo) + + "\n" + + "env = cz.o2.proxima.tools.groovy.Console.get().getEnv()" + + "\n" + + script; + Class