diff --git a/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptor.java b/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptor.java index 390bba9e0..532fd7c1d 100644 --- a/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptor.java +++ b/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptor.java @@ -54,7 +54,6 @@ private Builder(Repository repo) { @Setter private boolean replica = false; - @SuppressWarnings("unchecked") public AttributeDescriptorImpl build() { Objects.requireNonNull(name, "Please specify name"); Objects.requireNonNull(entity, "Please specify entity"); @@ -66,8 +65,7 @@ public AttributeDescriptorImpl build() { return new AttributeDescriptorImpl<>( name, entity, schemeUri, - factory.map(f -> (ValueSerializer)f.getValueSerializer(schemeUri)) - .orElse(null), + factory.map(f -> f.getValueSerializer(schemeUri)).orElse(null), replica); } } diff --git a/core/src/main/java/cz/o2/proxima/util/Optionals.java b/core/src/main/java/cz/o2/proxima/util/Optionals.java index 1c538cfbe..7b0df96e5 100644 --- a/core/src/main/java/cz/o2/proxima/util/Optionals.java +++ b/core/src/main/java/cz/o2/proxima/util/Optionals.java @@ -17,8 +17,18 @@ import java.util.Optional; +/** + * Utility class for manipulation with {@link Optional} + */ public class Optionals { - + /** + * Get value from Optional or throw IllegalArgumentException + * + * @param optional Optional object + * @param Generic type Optional + * @return T value from Optional + * @throws IllegalArgumentException in case of empty value + */ public static T get(Optional optional) { return optional.orElseThrow(() -> new IllegalArgumentException("Provided optional is empty.")); diff --git a/core/src/test/java/cz/o2/proxima/util/OptionalsTest.java b/core/src/test/java/cz/o2/proxima/util/OptionalsTest.java new file mode 100644 index 000000000..ac4089565 --- /dev/null +++ b/core/src/test/java/cz/o2/proxima/util/OptionalsTest.java @@ -0,0 +1,39 @@ +/** + * Copyright 2017-2019 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.util; + +import org.junit.Test; + +import java.util.Optional; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for utility class {@link Optionals} + */ +public class OptionalsTest { + + @Test + public void testGet() { + String foo = "bar"; + assertEquals(foo, Optionals.get(Optional.of(foo))); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetFromEmpty() { + Optionals.get(Optional.empty()); + } +} \ No newline at end of file diff --git a/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java b/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java index 613e6da45..9a4df50e5 100644 --- a/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java +++ b/direct/ingest-server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java @@ -49,7 +49,7 @@ public class IngestServiceTest { CountDownLatch latch; @Before - public void setup() throws InterruptedException { + public void setup() { server = new IngestServer(ConfigFactory.load() .withFallback(ConfigFactory.load("test-reference.conf")) .resolve()); @@ -63,9 +63,7 @@ public void setup() throws InterruptedException { @Override public void onNext(Rpc.StatusBulk status) { - for (Rpc.Status s : status.getStatusList()) { - responses.add(s); - } + responses.addAll(status.getStatusList()); } @Override diff --git a/example/tools/pom.xml b/example/tools/pom.xml index 862beea60..ed6eb402e 100644 --- a/example/tools/pom.xml +++ b/example/tools/pom.xml @@ -114,7 +114,7 @@ cz.o2.proxima.tools.groovy.Compiler - -o ${project.basedir}/target/generated-sources/groovy/cz/o2/proxima/example/Environment.groovy ${project.basedir}/model/src/main/resources/reference.conf + -o ${project.build.directory}/generated-sources/groovy/cz/o2/proxima/example/Environment.groovy ${project.parent.basedir}/model/src/main/resources/reference.conf diff --git a/scheme/proto/src/test/resources/application.conf b/scheme/proto/src/test/resources/reference.conf similarity index 100% rename from scheme/proto/src/test/resources/application.conf rename to scheme/proto/src/test/resources/reference.conf diff --git a/tools/pom.xml b/tools/pom.xml index 574bbb749..5c9791e2c 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -178,6 +178,22 @@ test + + + cz.o2.proxima + proxima-scheme-proto + ${project.version} + test + + + + cz.o2.proxima + proxima-scheme-proto + ${project.version} + test-jar + test + + diff --git a/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java b/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java index c89b15444..a50187743 100644 --- a/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java +++ b/tools/src/main/java/cz/o2/proxima/tools/groovy/Compiler.java @@ -26,6 +26,8 @@ import java.io.StringReader; import java.nio.charset.StandardCharsets; import java.util.List; + +import lombok.extern.slf4j.Slf4j; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; @@ -36,6 +38,7 @@ /** * A compiler of conf files to groovy object. */ +@Slf4j public class Compiler { private final Configuration conf = new Configuration(Configuration.VERSION_2_3_23); @@ -56,7 +59,11 @@ private Compiler(String[] args) throws ParseException { if (!parsed.hasOption("o")) { throw new IllegalStateException("Missing config option 'o' for output"); } - output = parsed.getOptionValue("o"); + output = parsed.getOptionValue("o").trim(); + if (output.length() == 0) { + throw new IllegalArgumentException("Empty option 'o' value for output."); + } + log.debug("Configured output file: {}", output); configs = parsed.getArgList(); conf.setDefaultEncoding(StandardCharsets.UTF_8.name()); @@ -67,7 +74,14 @@ private Compiler(String[] args) throws ParseException { public void run() throws Exception { Config config = configs.stream() - .map(f -> ConfigFactory.parseFile(new File(f))) + .map(f -> { + File c = new File(f); + if (!c.exists()) { + throw new IllegalArgumentException( + "Unable to find config file " + f + ". Check your configuration."); + } + return ConfigFactory.parseFile(c); + }) .reduce( ConfigFactory.empty(), (l, r) -> l.withFallback(r)) diff --git a/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java b/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java index 9a5c6ea95..33eb48105 100644 --- a/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java +++ b/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java @@ -16,12 +16,21 @@ package cz.o2.proxima.tools.groovy; import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import com.google.protobuf.AbstractMessage; import com.google.protobuf.AbstractMessage.Builder; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.direct.commitlog.CommitLogReader; +import cz.o2.proxima.direct.commitlog.Position; +import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.direct.core.OnlineAttributeWriter; +import cz.o2.proxima.direct.euphoria.source.BatchSource; +import cz.o2.proxima.direct.euphoria.source.BoundedStreamSource; +import cz.o2.proxima.direct.euphoria.source.UnboundedStreamSource; import cz.o2.proxima.functional.TriFunction; import cz.o2.proxima.proto.service.RetrieveServiceGrpc; import cz.o2.proxima.proto.service.RetrieveServiceGrpc.RetrieveServiceBlockingStub; @@ -29,19 +38,13 @@ import cz.o2.proxima.repository.AttributeDescriptor; import cz.o2.proxima.repository.EntityDescriptor; import cz.o2.proxima.repository.Repository; -import cz.o2.proxima.direct.core.OnlineAttributeWriter; +import cz.o2.proxima.scheme.ValueSerializer; import cz.o2.proxima.scheme.ValueSerializerFactory; import cz.o2.proxima.storage.StreamElement; -import cz.o2.proxima.direct.commitlog.CommitLogReader; -import cz.o2.proxima.direct.commitlog.Position; -import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor; -import cz.o2.proxima.direct.core.DirectDataOperator; -import cz.o2.proxima.direct.euphoria.source.BatchSource; -import cz.o2.proxima.direct.euphoria.source.BoundedStreamSource; -import cz.o2.proxima.direct.euphoria.source.UnboundedStreamSource; import cz.o2.proxima.tools.io.ConsoleRandomReader; import cz.o2.proxima.tools.io.TypedStreamElement; import cz.o2.proxima.util.Classpath; +import cz.o2.proxima.util.Optionals; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing; import cz.seznam.euphoria.core.client.flow.Flow; @@ -65,6 +68,11 @@ import groovy.lang.GroovyObject; import io.grpc.Channel; import io.grpc.ManagedChannelBuilder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.codehaus.groovy.tools.shell.Groovysh; +import org.codehaus.groovy.tools.shell.IO; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -83,10 +91,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.codehaus.groovy.tools.shell.Groovysh; -import org.codehaus.groovy.tools.shell.IO; /** * This is the groovysh based console. @@ -283,7 +287,7 @@ public Stream getUnionStream( if (direct.isPresent()) { return Arrays.stream(descriptors) .map(desc -> - direct.get().getFamiliesForAttribute(desc.desc()) + getDirectDataOperator().getFamiliesForAttribute(desc.desc()) .stream() .filter(af -> af.getDesc().getAccess().canReadCommitLog()) // sort primary families on top @@ -369,7 +373,7 @@ public WindowedStream, GlobalWindowing> getBatchSnapsh ds = reduceUpdatesToSnapshot(attrDesc, fromStamp, toStamp); } else { Dataset raw = flow.get().createInput(BatchSource.of( - family.getBatchObservable().get(), + Optionals.get(family.getBatchObservable()), family.getDesc(), fromStamp, toStamp)); @@ -406,7 +410,7 @@ private Dataset reduceUpdatesToSnapshot( .filter(af -> af.getDesc().getAccess().isStateCommitLog()) .sorted((l, r) -> Integer.compare( l.getDesc().getType().ordinal(), r.getDesc().getType().ordinal())) - .map(af -> af.getCommitLogReader().get()) + .map(af -> Optionals.get(af.getCommitLogReader())) .findFirst() .orElseThrow(() -> new IllegalStateException( "Cannot create batch snapshot, missing random access family " @@ -472,7 +476,7 @@ public WindowedStream getBatchUpdates( .distinct() .map(family -> flow.get().createInput(BatchSource.of( - family.getBatchObservable().get(), + Optionals.get(family.getBatchObservable()), family.getAttributes(), startStamp, endStamp))) .reduce((left, right) -> Union.of(left, right).output()) @@ -528,65 +532,52 @@ public ConsoleRandomReader getRandomAccessReader(String entity) { public void put( EntityDescriptor entityDesc, AttributeDescriptor attrDesc, - String key, String attribute, String textFormat) - throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, - ClassNotFoundException, InvalidProtocolBufferException, InterruptedException, - TextFormat.ParseException { + String key, String attribute, long stamp, String value + ) { + Gson gson = new Gson(); + String classname = Optionals.get(repo.getValueSerializerFactory( + attrDesc.getSchemeUri().getScheme())) + .getClassName(attrDesc.getSchemeUri()); + Class clazz = Classpath.findClass(classname,Object.class); + Class x = gson.fromJson(value,clazz.getClass()); - put(entityDesc, attrDesc, key, attribute, - System.currentTimeMillis(), textFormat); - } - public void put( + throw new RuntimeException("tadaa" + x.toString()); + } + public void put( EntityDescriptor entityDesc, - AttributeDescriptor attrDesc, - String key, String attribute, long stamp, String textFormat) - throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, - ClassNotFoundException, InvalidProtocolBufferException, InterruptedException, - TextFormat.ParseException { + AttributeDescriptor attrDesc, + String key, String attribute, long stamp, T value + ) throws InterruptedException { + ValueSerializerFactory serializerFactory = repo.getValueSerializerFactory( + attrDesc.getSchemeUri().getScheme()) + .orElseThrow(() -> new IllegalStateException( + "Unable to get ValueSerializerFactory for attribute " + attrDesc.getName() + + " with scheme " + attrDesc.getSchemeUri().toString() + ".") + ); + + ValueSerializer serializer = serializerFactory + .getValueSerializer(attrDesc.getSchemeUri()); + + OnlineAttributeWriter writer = getDirectDataOperator().getWriter(attrDesc) + .orElseThrow(() -> new IllegalArgumentException( + "Missing online writer for " + attrDesc)); - if (!direct.isPresent()) { - throw new IllegalStateException( - "Can write with direct operator only. Add runtime dependecncy"); - } - if (attrDesc.getSchemeUri().getScheme().equals("proto")) { - ValueSerializerFactory factory = repo.getValueSerializerFactory( - attrDesc.getSchemeUri().getScheme()) - .orElseThrow(() -> new IllegalStateException( - "Unable to get ValueSerializerFactory for attribute " + attrDesc.getName() - + " with scheme " + attrDesc.getSchemeUri().toString() + ".") - ); - - String protoClass = factory.getClassName(attrDesc.getSchemeUri()); - Class cls = Classpath.findClass(protoClass, AbstractMessage.class); - byte[] payload = null; - if (textFormat != null) { - Method newBuilder = cls.getDeclaredMethod("newBuilder"); - Builder builder = (Builder) newBuilder.invoke(null); - TextFormat.merge(textFormat, builder); - payload = builder.build().toByteArray(); - } - OnlineAttributeWriter writer = direct.get().getWriter(attrDesc) - .orElseThrow(() -> new IllegalArgumentException( - "Missing writer for " + attrDesc)); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exc = new AtomicReference<>(); - writer.write(StreamElement.update( - entityDesc, attrDesc, UUID.randomUUID().toString(), - key, attribute, stamp, payload), (success, ex) -> { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exc = new AtomicReference<>(); + byte[] payload = serializer.serialize(value); + writer.write(StreamElement.update( + entityDesc, attrDesc, UUID.randomUUID().toString(), key, attribute, + stamp, payload), ((success, error) -> { if (!success) { - exc.set(ex); + exc.set(error); } latch.countDown(); - }); - latch.await(); - if (exc.get() != null) { - throw new RuntimeException(exc.get()); - } - } else { - throw new IllegalArgumentException( - "Don't know how to make builder for " - + attrDesc.getSchemeUri()); + }) + ); + latch.await(); + if (exc.get() != null) { + throw new RuntimeException(exc.get()); } } @@ -602,11 +593,7 @@ public void delete( EntityDescriptor entityDesc, AttributeDescriptor attrDesc, String key, String attribute, long stamp) throws InterruptedException { - if (!direct.isPresent()) { - throw new IllegalStateException( - "Can write with direct operator only. Add runtime dependecncy"); - } - OnlineAttributeWriter writer = direct.get().getWriter(attrDesc) + OnlineAttributeWriter writer = getDirectDataOperator().getWriter(attrDesc) .orElseThrow(() -> new IllegalArgumentException( "Missing writer for " + attrDesc)); CountDownLatch latch = new CountDownLatch(1); @@ -742,4 +729,10 @@ private TriFunction getExecutorFactory( return (repository, cfg, eventTime) -> Console.createLocalExecutor(eventTime); } + DirectDataOperator getDirectDataOperator() { + return direct.orElseThrow( () -> + new IllegalStateException( + "Unable to get Direct data operator. Add runtime dependency.") + ); + } } diff --git a/tools/src/main/resources/class-entitydesc.ftlh b/tools/src/main/resources/class-entitydesc.ftlh index d2dea3c63..915cca74e 100644 --- a/tools/src/main/resources/class-entitydesc.ftlh +++ b/tools/src/main/resources/class-entitydesc.ftlh @@ -123,11 +123,17 @@ class Environment implements RepositoryProvider { def List> list(String key, String start) { return ${entity.classname}Descriptor.this.reader.list(key, name, start) } - def void put(String key, String attribute, String textFormat) { - console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, textFormat) + def void put(String key, String attribute, ${attribute.type} value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, System.currentTimeMillis(), value) } - def void put(String key, String attribute, long stamp, String textFormat) { - console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, textFormat) + def void put(String key, String attribute, String value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, System.currentTimeMillis(), value) + } + def void put(String key, String attribute, long stamp, ${attribute.type} value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, value) + } + def void put(String key, String attribute, long stamp, String value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute, stamp, value) } def void delete(String key, String attribute) { console.delete(${entity.classname}Descriptor.this.desc, desc, key, desc.toAttributePrefix() + attribute) @@ -145,11 +151,17 @@ class Environment implements RepositoryProvider { def ${attribute.type} get(String key) { return ${entity.classname}Descriptor.this.reader.get(key, name)?.getValue() } - def void put(String key, String textFormat) { - console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), textFormat) + def void put(String key, ${attribute.type} value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), System.currentTimeMillis(), value) + } + def void put(String key, String value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), System.currentTimeMillis(), value) } - def void put(String key, long stamp, String textFormat) { - console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, textFormat) + def void put(String key, long stamp, ${attribute.type} value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, value) + } + def void put(String key, long stamp, String value) { + console.put(${entity.classname}Descriptor.this.desc, desc, key, desc.getName(), stamp, value) } def void delete(String key) { console.delete(${entity.classname}Descriptor.this.desc, desc, key, desc.getName()) @@ -182,6 +194,7 @@ class Environment implements RepositoryProvider { + def Repository getRepo() { return console.getRepo() } diff --git a/tools/src/main/resources/log4j.properties b/tools/src/main/resources/log4j.properties new file mode 100644 index 000000000..1da823016 --- /dev/null +++ b/tools/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvAbstractPutTest.java b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvAbstractPutTest.java new file mode 100644 index 000000000..232e74e15 --- /dev/null +++ b/tools/src/test/java/cz/o2/proxima/tools/groovy/GroovyEnvAbstractPutTest.java @@ -0,0 +1,87 @@ +package cz.o2.proxima.tools.groovy; + +import com.typesafe.config.Config; +import cz.o2.proxima.direct.commitlog.CommitLogReader; +import cz.o2.proxima.direct.commitlog.LogObserver; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.util.Optionals; +import freemarker.template.Configuration; +import freemarker.template.TemplateExceptionHandler; +import groovy.lang.GroovyClassLoader; +import groovy.lang.Script; +import org.junit.Before; + +import static org.junit.Assert.fail; + +public abstract class GroovyEnvAbstractPutTest { + + static Repository repo; + static Console console; + static Config cfg; + + DirectDataOperator direct; + Configuration conf; + GroovyClassLoader loader; + + @Before + public void setUp() { + console = Console.create(cfg, repo); + direct = console.getDirectDataOperator(); + conf = new Configuration(Configuration.VERSION_2_3_23); + conf.setDefaultEncoding("utf-8"); + conf.setClassForTemplateLoading(getClass(), "/"); + conf.setTemplateExceptionHandler(TemplateExceptionHandler.RETHROW_HANDLER); + conf.setLogTemplateExceptions(false); + + loader = new GroovyClassLoader(Thread.currentThread().getContextClassLoader()); + Thread.currentThread().setContextClassLoader(loader); + } + + + void executeTest(AbstractTest test, AttributeDescriptor attr) throws Exception { + test.execute( + Optionals.get(direct.getCommitLogReader(attr))); + } + + + @SuppressWarnings("unchecked") + Script compile(String script) throws Exception { // @TODO: refactor + String source = GroovyEnv.getSource(conf, repo) + + "\n" + + "env = cz.o2.proxima.tools.groovy.Console.get().getEnv()" + + "\n" + + script; + Class