diff --git a/beam/pom.xml b/beam/pom.xml index 72d518510..bb3712a97 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -199,6 +199,7 @@ core core-testing io-pubsub + runner tools diff --git a/beam/runner/license-header-spotless.txt b/beam/runner/license-header-spotless.txt new file mode 120000 index 000000000..077ffe5c5 --- /dev/null +++ b/beam/runner/license-header-spotless.txt @@ -0,0 +1 @@ +../license-header-spotless.txt \ No newline at end of file diff --git a/beam/runner/license-header.txt b/beam/runner/license-header.txt new file mode 120000 index 000000000..9e0b4f073 --- /dev/null +++ b/beam/runner/license-header.txt @@ -0,0 +1 @@ +../license-header.txt \ No newline at end of file diff --git a/beam/runner/pom.xml b/beam/runner/pom.xml new file mode 100644 index 000000000..cdaf2a533 --- /dev/null +++ b/beam/runner/pom.xml @@ -0,0 +1,207 @@ + + + + 4.0.0 + + + cz.o2.proxima + proxima-beam + 0.11-SNAPSHOT + + + proxima-beam-runner + jar + + ${project.groupId}:${project.artifactId} + + + + + kr.motd.maven + os-maven-plugin + ${maven.ext.os.version} + + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + ${maven.protobuf.version} + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + compile + test-compile + compile-custom + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.version} + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + true + + + *:* + + + + + com.google.common. + ${coreShade}.com.google.common. + + + + + + package + + shade + + + + + + + + + + + + cz.o2.proxima + proxima-core + + + + cz.o2.proxima + proxima-direct-core + + + + cz.o2.proxima + proxima-scheme-proto + ${project.version} + + + + ${apache.beam.groupId} + beam-sdks-java-core + + + + org.apache.beam + beam-runners-java-job-service + ${apache.beam.version} + + + + org.projectlombok + lombok + + + + com.google.auto.service + auto-service + + + + com.google.guava + guava + ${guava.version} + provided + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + cz.o2.proxima + proxima-core + ${project.version} + tests + test + + + + cz.o2.proxima + proxima-direct-core + ${project.version} + tests + test + + + + cz.o2.proxima + proxima-direct-io-kafka + ${project.version} + test + + + + cz.o2.proxima + proxima-direct-io-kafka + ${project.version} + tests + test + + + + junit + junit + + + + org.hamcrest + hamcrest-all + + + + org.mockito + mockito-core + + + + + + diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaBeamExecutor.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaBeamExecutor.java new file mode 100644 index 000000000..afce6cb4b --- /dev/null +++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaBeamExecutor.java @@ -0,0 +1,73 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.runner; + +import java.io.IOException; +import org.apache.beam.model.jobmanagement.v1.JobApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.joda.time.Duration; + +public class ProximaBeamExecutor { + + private final Pipeline pipeline; + private final ProximaTranslationContext context; + + ProximaBeamExecutor(Pipeline pipeline, ProximaTranslationContext context) { + this.pipeline = pipeline; + this.context = context; + } + + public PortablePipelineResult execute() { + return asPipelineResult(context); + } + + private PortablePipelineResult asPipelineResult(ProximaTranslationContext context) { + return new PortablePipelineResult() { + @Override + public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + @Override + public @NonNull State getState() { + return context.getState(); + } + + @Override + public @NonNull State cancel() throws IOException { + return context.cancel(); + } + + @Override + public @NonNull State waitUntilFinish(@NonNull Duration duration) { + return context.waitUntilFinish(duration); + } + + @Override + public @NonNull State waitUntilFinish() { + return waitUntilFinish(Duration.ZERO); + } + + @Override + public @NonNull MetricResults metrics() { + return context.getMetricsResult(); + } + }; + } +} diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineOptions.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineOptions.java new file mode 100644 index 000000000..b7ba626f4 --- /dev/null +++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineOptions.java @@ -0,0 +1,52 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.runner; + +import com.google.auto.service.AutoService; +import java.util.Collections; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.options.StreamingOptions; +import org.checkerframework.checker.nullness.qual.NonNull; + +public interface ProximaPipelineOptions extends PipelineOptions, StreamingOptions { + + @AutoService(PipelineOptionsRegistrar.class) + class OptionsFactory implements PipelineOptionsRegistrar { + + @Override + public @NonNull Iterable<@NonNull Class<@NonNull ? extends @NonNull PipelineOptions>> + getPipelineOptions() { + return Collections.singletonList(ProximaPipelineOptions.class); + } + } + + @Default.String("commit-log-family") + String getCommitFamily(); + + void setCommitFamily(String family); + + @Default.String("state-random-access-family") + String getStateFamily(); + + void setStateFamily(String family); + + @Default.String("shuffle-family") + String getShuffleFamily(); + + void setShuffleFamily(String family); +} diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineTranslator.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineTranslator.java new file mode 100644 index 000000000..90599fb07 --- /dev/null +++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineTranslator.java @@ -0,0 +1,117 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.runner; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.sdk.options.PipelineOptions; + +@Slf4j +public class ProximaPipelineTranslator { + + @FunctionalInterface + interface PTransformTranslator { + void translate(String id, Pipeline pipeline, ProximaTranslationContext context); + } + + private final Map urnToTransformTranslator; + + ProximaPipelineTranslator() { + this.urnToTransformTranslator = + ImmutableMap.builder() + .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, this::translateFlatten) + .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, this::translateGroupByKey) + .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, this::translateImpulse) + .put(ExecutableStage.URN, this::translateExecutableStage) + .put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle) + // For testing only + .put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, this::translateTestStream) + .build(); + } + + private void translateTestStream( + String id, Pipeline pipeline, ProximaTranslationContext context) { + + log.debug("Translating TestStream {}", id); + } + + private void translateReshuffle(String id, Pipeline pipeline, ProximaTranslationContext context) { + + log.debug("Translating Reshuffle {}", id); + } + + private void translateExecutableStage( + String id, Pipeline pipeline, ProximaTranslationContext context) { + + log.debug("Translating ExecutableStage {}", id); + } + + private void translateImpulse(String id, Pipeline pipeline, ProximaTranslationContext context) { + log.debug("Translating Impulse {}", id); + } + + private void translateGroupByKey( + String id, Pipeline pipeline, ProximaTranslationContext context) { + + log.debug("Translating GroupByKey {}", id); + } + + private void translateFlatten(String id, Pipeline pipeline, ProximaTranslationContext context) { + log.debug("Translating Flatten {}", id); + } + + public Set knownUrns() { + return urnToTransformTranslator.keySet(); + } + + public ProximaTranslationContext createTranslationContext( + JobInfo jobInfo, PipelineOptions options) { + + return new ProximaTranslationContext(jobInfo, options); + } + + public ProximaBeamExecutor translate(ProximaTranslationContext context, Pipeline pipeline) { + + QueryablePipeline p = + QueryablePipeline.forTransforms( + pipeline.getRootTransformIdsList(), pipeline.getComponents()); + for (PipelineNode.PTransformNode transform : p.getTopologicallyOrderedTransforms()) { + urnToTransformTranslator + .getOrDefault(transform.getTransform().getSpec().getUrn(), this::urnNotFound) + .translate(transform.getId(), pipeline, context); + } + + return new ProximaBeamExecutor(pipeline, context); + } + + private void urnNotFound( + String id, RunnerApi.Pipeline pipeline, ProximaTranslationContext context) { + + throw new IllegalArgumentException( + String.format( + "Unknown type of URN %s for PTransform with id %s.", + pipeline.getComponents().getTransformsOrThrow(id).getSpec().getUrn(), id)); + } +} diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaRunner.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaRunner.java new file mode 100644 index 000000000..73da42547 --- /dev/null +++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaRunner.java @@ -0,0 +1,76 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.runner; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.core.construction.graph.ProtoOverrides; +import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander; +import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.SdkHarnessOptions; +import org.checkerframework.checker.nullness.qual.NonNull; + +public class ProximaRunner implements PortablePipelineRunner { + + private final PipelineOptions options; + + public ProximaRunner(PipelineOptions options) { + this.options = options; + } + + @Override + public @NonNull PortablePipelineResult run(RunnerApi.Pipeline pipeline, @NonNull JobInfo jobInfo) + throws Exception { + + SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class)); + ProximaPipelineTranslator translator = new ProximaPipelineTranslator(); + + // Expand any splittable ParDos within the graph to enable sizing and splitting of bundles. + Pipeline pipelineWithSdfExpanded = + ProtoOverrides.updateTransform( + PTransformTranslation.PAR_DO_TRANSFORM_URN, + pipeline, + SplittableParDoExpander.createSizedReplacement()); + + // Don't let the fuser fuse any subcomponents of native transforms. + Pipeline trimmedPipeline = + TrivialNativeTransformExpander.forKnownUrns( + pipelineWithSdfExpanded, translator.knownUrns()); + + // Fused pipeline proto. + RunnerApi.Pipeline fusedPipeline = + trimmedPipeline + .getComponents() + .getTransformsMap() + .values() + .stream() + .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn())) + ? trimmedPipeline + : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline(); + + ProximaBeamExecutor executor = + translator.translate(translator.createTranslationContext(jobInfo, options), fusedPipeline); + + return executor.execute(); + } +} diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaTranslationContext.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaTranslationContext.java new file mode 100644 index 000000000..561ec2b74 --- /dev/null +++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaTranslationContext.java @@ -0,0 +1,142 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.runner; + +import com.google.common.base.Preconditions; +import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.beam.runner.proto.Messages; +import cz.o2.proxima.beam.runner.proto.Messages.Bundle; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.AttributeFamilyDescriptor; +import cz.o2.proxima.repository.EntityAwareAttributeDescriptor.Regular; +import cz.o2.proxima.repository.EntityAwareAttributeDescriptor.Wildcard; +import cz.o2.proxima.repository.EntityDescriptor; +import cz.o2.proxima.repository.Repository; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.stream.Stream; + +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.joda.time.Duration; + +public class ProximaTranslationContext { + + private final JobInfo jobInfo; + private final ProximaPipelineOptions options; + private final Repository repo; + private final DirectDataOperator direct; + private final AttributeFamilyDescriptor shuffleFamily; + private final AttributeFamilyDescriptor commitFamily; + private final AttributeFamilyDescriptor stateFamily; + private final Regular bundleDesc; + private final Regular commitDesc; + private final Wildcard stateDesc; + + private State state = State.UNKNOWN; + + public ProximaTranslationContext(JobInfo jobInfo, PipelineOptions options) { + this.jobInfo = jobInfo; + this.options = options.as(ProximaPipelineOptions.class); + this.repo = Repository.of(ConfigFactory.load().resolve()); + this.direct = repo.getOrCreateOperator(DirectDataOperator.class); + + this.shuffleFamily = repo.getFamilyByName(this.options.getShuffleFamily()); + this.commitFamily = repo.getFamilyByName(this.options.getCommitFamily()); + this.stateFamily = repo.getFamilyByName(this.options.getStateFamily()); + + this.bundleDesc = regular(repo, "bundle", "shuffle"); + this.commitDesc = regular(repo, "bundle", "commit"); + this.stateDesc = wildcard(repo, "state", "data.*"); + + validateFamilyHasAttribute(shuffleFamily, bundleDesc); + validateFamilyHasAttribute(commitFamily, commitDesc); + validateFamilyHasAttribute(stateFamily, stateDesc); + } + + private void validateFamilyHasAttribute( + AttributeFamilyDescriptor familyDesc, AttributeDescriptor attrDesc) { + Preconditions.checkArgument( + familyDesc.getAttributes().equals(Collections.singletonList(attrDesc)), + "Family %s must contain only single attribute %s", + familyDesc, + attrDesc); + } + + private Wildcard wildcard(Repository repo, String entity, String attribute) { + EntityDescriptor entityDesc = repo.getEntity(entity); + return Wildcard.of(entityDesc, entityDesc.getAttribute(attribute)); + } + + private Regular regular(Repository repo, String entity, String attribute) { + EntityDescriptor entityDesc = repo.getEntity(entity); + return Regular.of(entityDesc, entityDesc.getAttribute(attribute)); + } + + public ExecutorService getExecutor() { + return direct.getContext().getExecutorService(); + } + + public State getState() { + return state; + } + + public State cancel() { + this.state = State.CANCELLED; + return getState(); + } + + public State waitUntilFinish(Duration duration) { + // FIXME + this.state = State.DONE; + return getState(); + } + + public MetricResults getMetricsResult() { + return new MetricResults() { + @Override + public @NonNull MetricQueryResults queryMetrics(@NonNull MetricsFilter filter) { + return new MetricQueryResults() { + @Override + public @NonNull Iterable<@NonNull MetricResult<@NonNull Long>> getCounters() { + return Collections.emptyList(); + } + + @Override + public @NonNull Iterable<@NonNull MetricResult<@NonNull DistributionResult>> getDistributions() { + return Collections.emptyList(); + } + + @Override + public @NonNull Iterable<@NonNull MetricResult<@NonNull GaugeResult>> getGauges() { + return Collections.emptyList(); + } + }; + } + }; + } + + public Stream +} diff --git a/beam/runner/src/main/proto/messages.proto b/beam/runner/src/main/proto/messages.proto new file mode 100644 index 000000000..cc709d10c --- /dev/null +++ b/beam/runner/src/main/proto/messages.proto @@ -0,0 +1,57 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +package cz.o2.proxima.beam.runner.proto; + +message Bundle { + enum PaneInfo { + UNKNOWN = 0; + ON_TIME = 1; + LATE = 2; + } + + message TimestampedValue { + bytes value = 1; + uint64 stamp = 2; + bytes window = 3; + PaneInfo pane_info = 4; + } + // unique ID of this bundle + bytes id = 1; + // unique ID of bundle that generated this bundle + bytes generated_from = 2; + repeated TimestampedValue values = 3; + repeated bytes timers = 4; + // input watermark *after* the bundle is processed + uint64 watermark = 5; +} + +message State { + bytes value = 1; +} + +message Commit { + message StateUpdate { + string id = 1; + bytes value = 2; + } + + repeated StateUpdate state_update = 1; + string bundle_id = 2; + // output watermark after commit + uint64 watermark = 3; +} diff --git a/beam/runner/src/main/resources/reference.conf b/beam/runner/src/main/resources/reference.conf new file mode 100644 index 000000000..538ae983b --- /dev/null +++ b/beam/runner/src/main/resources/reference.conf @@ -0,0 +1,41 @@ +entities { + bundle { + attributes { + shuffle { + scheme: "proto:cz.o2.proxima.beam.runner.proto.Messages.Bundle" + } + commit { + scheme: "proto:cz.o2.proxima.beam.runner.proto.Messages.Commit" + } + } + } + state { + attributes { + "data.*" { + scheme: "proto:cz.o2.proxima.beam.runner.proto.Messages.State" + } + } + } +} + +attributeFamilies { + + commit-log-family { + entity: bundle + attributes: [ commit ] + type: primary + } + + shuffle-family { + entity: bundle + attributes: [ shuffle ] + type: primary + } + + state-random-access-family { + entity: state + attributes: [ "*" ] + type: primary + } + +} diff --git a/beam/runner/src/test/java/cz/o2/proxima/beam/runner/ProximaPipelineRunnerTest.java b/beam/runner/src/test/java/cz/o2/proxima/beam/runner/ProximaPipelineRunnerTest.java new file mode 100644 index 000000000..eb0ff866c --- /dev/null +++ b/beam/runner/src/test/java/cz/o2/proxima/beam/runner/ProximaPipelineRunnerTest.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.runner; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; + +public class ProximaPipelineRunnerTest { + + @Test + public void testPipelineSubmit() throws Exception { + Pipeline pipeline = Pipeline.create(); + pipeline.apply(Impulse.create()); + execute(pipeline); + } + + private void execute(Pipeline pipeline) throws Exception { + RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(pipeline); + ProximaRunner runner = new ProximaRunner(pipeline.getOptions()); + PortablePipelineResult res = runner.run(protoPipeline, asJobInfo(pipeline)); + res.waitUntilFinish(); + } + + private JobInfo asJobInfo(Pipeline pipeline) { + return new JobInfo() { + @Override + public @NonNull String jobId() { + return "fakeId"; + } + + @Override + public @NonNull String jobName() { + return "fakeName"; + } + + @Override + public @NonNull String retrievalToken() { + return "fakeToken"; + } + + @Override + public @NonNull Struct pipelineOptions() { + return Struct.newBuilder().build(); + } + }; + } +} diff --git a/beam/runner/src/test/resources/application.conf b/beam/runner/src/test/resources/application.conf new file mode 100644 index 000000000..088914c0e --- /dev/null +++ b/beam/runner/src/test/resources/application.conf @@ -0,0 +1,18 @@ +attributeFamilies { + + commit-log-family { + storage: "kafka-test://local/commit-log" + access: commit-log + } + + shuffle-family { + storage: "kafka-test://local/shuffle" + access: commit-log + } + + state-random-access-family { + storage: "kafka-test://local/state" + access: [ state-commit-log, cached-view ] + } + +} diff --git a/beam/runner/src/test/resources/log4j2.properties b/beam/runner/src/test/resources/log4j2.properties new file mode 100644 index 000000000..3924d894a --- /dev/null +++ b/beam/runner/src/test/resources/log4j2.properties @@ -0,0 +1,9 @@ +# console appender +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %level %c{1}:%L - %msg%n + +# Root logger option +rootLogger.level = ${sys:LOG_LEVEL:-INFO} +rootLogger.appenderRef.stdout.ref = STDOUT