diff --git a/beam/pom.xml b/beam/pom.xml
index 72d518510..bb3712a97 100644
--- a/beam/pom.xml
+++ b/beam/pom.xml
@@ -199,6 +199,7 @@
core
core-testing
io-pubsub
+ runner
tools
diff --git a/beam/runner/license-header-spotless.txt b/beam/runner/license-header-spotless.txt
new file mode 120000
index 000000000..077ffe5c5
--- /dev/null
+++ b/beam/runner/license-header-spotless.txt
@@ -0,0 +1 @@
+../license-header-spotless.txt
\ No newline at end of file
diff --git a/beam/runner/license-header.txt b/beam/runner/license-header.txt
new file mode 120000
index 000000000..9e0b4f073
--- /dev/null
+++ b/beam/runner/license-header.txt
@@ -0,0 +1 @@
+../license-header.txt
\ No newline at end of file
diff --git a/beam/runner/pom.xml b/beam/runner/pom.xml
new file mode 100644
index 000000000..cdaf2a533
--- /dev/null
+++ b/beam/runner/pom.xml
@@ -0,0 +1,207 @@
+
+
+
+ 4.0.0
+
+
+ cz.o2.proxima
+ proxima-beam
+ 0.11-SNAPSHOT
+
+
+ proxima-beam-runner
+ jar
+
+ ${project.groupId}:${project.artifactId}
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ ${maven.ext.os.version}
+
+
+
+
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ ${maven.protobuf.version}
+
+ com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+
+
+
+
+ compile
+ test-compile
+ compile-custom
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ ${maven.shade.version}
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+ true
+
+
+ *:*
+
+
+
+
+ com.google.common.
+ ${coreShade}.com.google.common.
+
+
+
+
+
+ package
+
+ shade
+
+
+
+
+
+
+
+
+
+
+
+ cz.o2.proxima
+ proxima-core
+
+
+
+ cz.o2.proxima
+ proxima-direct-core
+
+
+
+ cz.o2.proxima
+ proxima-scheme-proto
+ ${project.version}
+
+
+
+ ${apache.beam.groupId}
+ beam-sdks-java-core
+
+
+
+ org.apache.beam
+ beam-runners-java-job-service
+ ${apache.beam.version}
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+ com.google.auto.service
+ auto-service
+
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+ provided
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+
+ cz.o2.proxima
+ proxima-core
+ ${project.version}
+ tests
+ test
+
+
+
+ cz.o2.proxima
+ proxima-direct-core
+ ${project.version}
+ tests
+ test
+
+
+
+ cz.o2.proxima
+ proxima-direct-io-kafka
+ ${project.version}
+ test
+
+
+
+ cz.o2.proxima
+ proxima-direct-io-kafka
+ ${project.version}
+ tests
+ test
+
+
+
+ junit
+ junit
+
+
+
+ org.hamcrest
+ hamcrest-all
+
+
+
+ org.mockito
+ mockito-core
+
+
+
+
+
+
diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaBeamExecutor.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaBeamExecutor.java
new file mode 100644
index 000000000..afce6cb4b
--- /dev/null
+++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaBeamExecutor.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017-2023 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.beam.runner;
+
+import java.io.IOException;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Duration;
+
+public class ProximaBeamExecutor {
+
+ private final Pipeline pipeline;
+ private final ProximaTranslationContext context;
+
+ ProximaBeamExecutor(Pipeline pipeline, ProximaTranslationContext context) {
+ this.pipeline = pipeline;
+ this.context = context;
+ }
+
+ public PortablePipelineResult execute() {
+ return asPipelineResult(context);
+ }
+
+ private PortablePipelineResult asPipelineResult(ProximaTranslationContext context) {
+ return new PortablePipelineResult() {
+ @Override
+ public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public @NonNull State getState() {
+ return context.getState();
+ }
+
+ @Override
+ public @NonNull State cancel() throws IOException {
+ return context.cancel();
+ }
+
+ @Override
+ public @NonNull State waitUntilFinish(@NonNull Duration duration) {
+ return context.waitUntilFinish(duration);
+ }
+
+ @Override
+ public @NonNull State waitUntilFinish() {
+ return waitUntilFinish(Duration.ZERO);
+ }
+
+ @Override
+ public @NonNull MetricResults metrics() {
+ return context.getMetricsResult();
+ }
+ };
+ }
+}
diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineOptions.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineOptions.java
new file mode 100644
index 000000000..b7ba626f4
--- /dev/null
+++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineOptions.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017-2023 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.beam.runner;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public interface ProximaPipelineOptions extends PipelineOptions, StreamingOptions {
+
+ @AutoService(PipelineOptionsRegistrar.class)
+ class OptionsFactory implements PipelineOptionsRegistrar {
+
+ @Override
+ public @NonNull Iterable<@NonNull Class<@NonNull ? extends @NonNull PipelineOptions>>
+ getPipelineOptions() {
+ return Collections.singletonList(ProximaPipelineOptions.class);
+ }
+ }
+
+ @Default.String("commit-log-family")
+ String getCommitFamily();
+
+ void setCommitFamily(String family);
+
+ @Default.String("state-random-access-family")
+ String getStateFamily();
+
+ void setStateFamily(String family);
+
+ @Default.String("shuffle-family")
+ String getShuffleFamily();
+
+ void setShuffleFamily(String family);
+}
diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineTranslator.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineTranslator.java
new file mode 100644
index 000000000..90599fb07
--- /dev/null
+++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaPipelineTranslator.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2017-2023 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.beam.runner;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+@Slf4j
+public class ProximaPipelineTranslator {
+
+ @FunctionalInterface
+ interface PTransformTranslator {
+ void translate(String id, Pipeline pipeline, ProximaTranslationContext context);
+ }
+
+ private final Map urnToTransformTranslator;
+
+ ProximaPipelineTranslator() {
+ this.urnToTransformTranslator =
+ ImmutableMap.builder()
+ .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, this::translateFlatten)
+ .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, this::translateGroupByKey)
+ .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, this::translateImpulse)
+ .put(ExecutableStage.URN, this::translateExecutableStage)
+ .put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle)
+ // For testing only
+ .put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, this::translateTestStream)
+ .build();
+ }
+
+ private void translateTestStream(
+ String id, Pipeline pipeline, ProximaTranslationContext context) {
+
+ log.debug("Translating TestStream {}", id);
+ }
+
+ private void translateReshuffle(String id, Pipeline pipeline, ProximaTranslationContext context) {
+
+ log.debug("Translating Reshuffle {}", id);
+ }
+
+ private void translateExecutableStage(
+ String id, Pipeline pipeline, ProximaTranslationContext context) {
+
+ log.debug("Translating ExecutableStage {}", id);
+ }
+
+ private void translateImpulse(String id, Pipeline pipeline, ProximaTranslationContext context) {
+ log.debug("Translating Impulse {}", id);
+ }
+
+ private void translateGroupByKey(
+ String id, Pipeline pipeline, ProximaTranslationContext context) {
+
+ log.debug("Translating GroupByKey {}", id);
+ }
+
+ private void translateFlatten(String id, Pipeline pipeline, ProximaTranslationContext context) {
+ log.debug("Translating Flatten {}", id);
+ }
+
+ public Set knownUrns() {
+ return urnToTransformTranslator.keySet();
+ }
+
+ public ProximaTranslationContext createTranslationContext(
+ JobInfo jobInfo, PipelineOptions options) {
+
+ return new ProximaTranslationContext(jobInfo, options);
+ }
+
+ public ProximaBeamExecutor translate(ProximaTranslationContext context, Pipeline pipeline) {
+
+ QueryablePipeline p =
+ QueryablePipeline.forTransforms(
+ pipeline.getRootTransformIdsList(), pipeline.getComponents());
+ for (PipelineNode.PTransformNode transform : p.getTopologicallyOrderedTransforms()) {
+ urnToTransformTranslator
+ .getOrDefault(transform.getTransform().getSpec().getUrn(), this::urnNotFound)
+ .translate(transform.getId(), pipeline, context);
+ }
+
+ return new ProximaBeamExecutor(pipeline, context);
+ }
+
+ private void urnNotFound(
+ String id, RunnerApi.Pipeline pipeline, ProximaTranslationContext context) {
+
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown type of URN %s for PTransform with id %s.",
+ pipeline.getComponents().getTransformsOrThrow(id).getSpec().getUrn(), id));
+ }
+}
diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaRunner.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaRunner.java
new file mode 100644
index 000000000..73da42547
--- /dev/null
+++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2017-2023 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.beam.runner;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
+import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
+import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
+import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class ProximaRunner implements PortablePipelineRunner {
+
+ private final PipelineOptions options;
+
+ public ProximaRunner(PipelineOptions options) {
+ this.options = options;
+ }
+
+ @Override
+ public @NonNull PortablePipelineResult run(RunnerApi.Pipeline pipeline, @NonNull JobInfo jobInfo)
+ throws Exception {
+
+ SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
+ ProximaPipelineTranslator translator = new ProximaPipelineTranslator();
+
+ // Expand any splittable ParDos within the graph to enable sizing and splitting of bundles.
+ Pipeline pipelineWithSdfExpanded =
+ ProtoOverrides.updateTransform(
+ PTransformTranslation.PAR_DO_TRANSFORM_URN,
+ pipeline,
+ SplittableParDoExpander.createSizedReplacement());
+
+ // Don't let the fuser fuse any subcomponents of native transforms.
+ Pipeline trimmedPipeline =
+ TrivialNativeTransformExpander.forKnownUrns(
+ pipelineWithSdfExpanded, translator.knownUrns());
+
+ // Fused pipeline proto.
+ RunnerApi.Pipeline fusedPipeline =
+ trimmedPipeline
+ .getComponents()
+ .getTransformsMap()
+ .values()
+ .stream()
+ .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
+ ? trimmedPipeline
+ : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+
+ ProximaBeamExecutor executor =
+ translator.translate(translator.createTranslationContext(jobInfo, options), fusedPipeline);
+
+ return executor.execute();
+ }
+}
diff --git a/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaTranslationContext.java b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaTranslationContext.java
new file mode 100644
index 000000000..561ec2b74
--- /dev/null
+++ b/beam/runner/src/main/java/cz/o2/proxima/beam/runner/ProximaTranslationContext.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2017-2023 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.beam.runner;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.ConfigFactory;
+import cz.o2.proxima.beam.runner.proto.Messages;
+import cz.o2.proxima.beam.runner.proto.Messages.Bundle;
+import cz.o2.proxima.direct.core.DirectDataOperator;
+import cz.o2.proxima.repository.AttributeDescriptor;
+import cz.o2.proxima.repository.AttributeFamilyDescriptor;
+import cz.o2.proxima.repository.EntityAwareAttributeDescriptor.Regular;
+import cz.o2.proxima.repository.EntityAwareAttributeDescriptor.Wildcard;
+import cz.o2.proxima.repository.EntityDescriptor;
+import cz.o2.proxima.repository.Repository;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Stream;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Duration;
+
+public class ProximaTranslationContext {
+
+ private final JobInfo jobInfo;
+ private final ProximaPipelineOptions options;
+ private final Repository repo;
+ private final DirectDataOperator direct;
+ private final AttributeFamilyDescriptor shuffleFamily;
+ private final AttributeFamilyDescriptor commitFamily;
+ private final AttributeFamilyDescriptor stateFamily;
+ private final Regular bundleDesc;
+ private final Regular commitDesc;
+ private final Wildcard stateDesc;
+
+ private State state = State.UNKNOWN;
+
+ public ProximaTranslationContext(JobInfo jobInfo, PipelineOptions options) {
+ this.jobInfo = jobInfo;
+ this.options = options.as(ProximaPipelineOptions.class);
+ this.repo = Repository.of(ConfigFactory.load().resolve());
+ this.direct = repo.getOrCreateOperator(DirectDataOperator.class);
+
+ this.shuffleFamily = repo.getFamilyByName(this.options.getShuffleFamily());
+ this.commitFamily = repo.getFamilyByName(this.options.getCommitFamily());
+ this.stateFamily = repo.getFamilyByName(this.options.getStateFamily());
+
+ this.bundleDesc = regular(repo, "bundle", "shuffle");
+ this.commitDesc = regular(repo, "bundle", "commit");
+ this.stateDesc = wildcard(repo, "state", "data.*");
+
+ validateFamilyHasAttribute(shuffleFamily, bundleDesc);
+ validateFamilyHasAttribute(commitFamily, commitDesc);
+ validateFamilyHasAttribute(stateFamily, stateDesc);
+ }
+
+ private void validateFamilyHasAttribute(
+ AttributeFamilyDescriptor familyDesc, AttributeDescriptor> attrDesc) {
+ Preconditions.checkArgument(
+ familyDesc.getAttributes().equals(Collections.singletonList(attrDesc)),
+ "Family %s must contain only single attribute %s",
+ familyDesc,
+ attrDesc);
+ }
+
+ private Wildcard wildcard(Repository repo, String entity, String attribute) {
+ EntityDescriptor entityDesc = repo.getEntity(entity);
+ return Wildcard.of(entityDesc, entityDesc.getAttribute(attribute));
+ }
+
+ private Regular regular(Repository repo, String entity, String attribute) {
+ EntityDescriptor entityDesc = repo.getEntity(entity);
+ return Regular.of(entityDesc, entityDesc.getAttribute(attribute));
+ }
+
+ public ExecutorService getExecutor() {
+ return direct.getContext().getExecutorService();
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public State cancel() {
+ this.state = State.CANCELLED;
+ return getState();
+ }
+
+ public State waitUntilFinish(Duration duration) {
+ // FIXME
+ this.state = State.DONE;
+ return getState();
+ }
+
+ public MetricResults getMetricsResult() {
+ return new MetricResults() {
+ @Override
+ public @NonNull MetricQueryResults queryMetrics(@NonNull MetricsFilter filter) {
+ return new MetricQueryResults() {
+ @Override
+ public @NonNull Iterable<@NonNull MetricResult<@NonNull Long>> getCounters() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public @NonNull Iterable<@NonNull MetricResult<@NonNull DistributionResult>> getDistributions() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public @NonNull Iterable<@NonNull MetricResult<@NonNull GaugeResult>> getGauges() {
+ return Collections.emptyList();
+ }
+ };
+ }
+ };
+ }
+
+ public Stream
+}
diff --git a/beam/runner/src/main/proto/messages.proto b/beam/runner/src/main/proto/messages.proto
new file mode 100644
index 000000000..cc709d10c
--- /dev/null
+++ b/beam/runner/src/main/proto/messages.proto
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-2023 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package cz.o2.proxima.beam.runner.proto;
+
+message Bundle {
+ enum PaneInfo {
+ UNKNOWN = 0;
+ ON_TIME = 1;
+ LATE = 2;
+ }
+
+ message TimestampedValue {
+ bytes value = 1;
+ uint64 stamp = 2;
+ bytes window = 3;
+ PaneInfo pane_info = 4;
+ }
+ // unique ID of this bundle
+ bytes id = 1;
+ // unique ID of bundle that generated this bundle
+ bytes generated_from = 2;
+ repeated TimestampedValue values = 3;
+ repeated bytes timers = 4;
+ // input watermark *after* the bundle is processed
+ uint64 watermark = 5;
+}
+
+message State {
+ bytes value = 1;
+}
+
+message Commit {
+ message StateUpdate {
+ string id = 1;
+ bytes value = 2;
+ }
+
+ repeated StateUpdate state_update = 1;
+ string bundle_id = 2;
+ // output watermark after commit
+ uint64 watermark = 3;
+}
diff --git a/beam/runner/src/main/resources/reference.conf b/beam/runner/src/main/resources/reference.conf
new file mode 100644
index 000000000..538ae983b
--- /dev/null
+++ b/beam/runner/src/main/resources/reference.conf
@@ -0,0 +1,41 @@
+entities {
+ bundle {
+ attributes {
+ shuffle {
+ scheme: "proto:cz.o2.proxima.beam.runner.proto.Messages.Bundle"
+ }
+ commit {
+ scheme: "proto:cz.o2.proxima.beam.runner.proto.Messages.Commit"
+ }
+ }
+ }
+ state {
+ attributes {
+ "data.*" {
+ scheme: "proto:cz.o2.proxima.beam.runner.proto.Messages.State"
+ }
+ }
+ }
+}
+
+attributeFamilies {
+
+ commit-log-family {
+ entity: bundle
+ attributes: [ commit ]
+ type: primary
+ }
+
+ shuffle-family {
+ entity: bundle
+ attributes: [ shuffle ]
+ type: primary
+ }
+
+ state-random-access-family {
+ entity: state
+ attributes: [ "*" ]
+ type: primary
+ }
+
+}
diff --git a/beam/runner/src/test/java/cz/o2/proxima/beam/runner/ProximaPipelineRunnerTest.java b/beam/runner/src/test/java/cz/o2/proxima/beam/runner/ProximaPipelineRunnerTest.java
new file mode 100644
index 000000000..eb0ff866c
--- /dev/null
+++ b/beam/runner/src/test/java/cz/o2/proxima/beam/runner/ProximaPipelineRunnerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017-2023 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.beam.runner;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.junit.Test;
+
+public class ProximaPipelineRunnerTest {
+
+ @Test
+ public void testPipelineSubmit() throws Exception {
+ Pipeline pipeline = Pipeline.create();
+ pipeline.apply(Impulse.create());
+ execute(pipeline);
+ }
+
+ private void execute(Pipeline pipeline) throws Exception {
+ RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(pipeline);
+ ProximaRunner runner = new ProximaRunner(pipeline.getOptions());
+ PortablePipelineResult res = runner.run(protoPipeline, asJobInfo(pipeline));
+ res.waitUntilFinish();
+ }
+
+ private JobInfo asJobInfo(Pipeline pipeline) {
+ return new JobInfo() {
+ @Override
+ public @NonNull String jobId() {
+ return "fakeId";
+ }
+
+ @Override
+ public @NonNull String jobName() {
+ return "fakeName";
+ }
+
+ @Override
+ public @NonNull String retrievalToken() {
+ return "fakeToken";
+ }
+
+ @Override
+ public @NonNull Struct pipelineOptions() {
+ return Struct.newBuilder().build();
+ }
+ };
+ }
+}
diff --git a/beam/runner/src/test/resources/application.conf b/beam/runner/src/test/resources/application.conf
new file mode 100644
index 000000000..088914c0e
--- /dev/null
+++ b/beam/runner/src/test/resources/application.conf
@@ -0,0 +1,18 @@
+attributeFamilies {
+
+ commit-log-family {
+ storage: "kafka-test://local/commit-log"
+ access: commit-log
+ }
+
+ shuffle-family {
+ storage: "kafka-test://local/shuffle"
+ access: commit-log
+ }
+
+ state-random-access-family {
+ storage: "kafka-test://local/state"
+ access: [ state-commit-log, cached-view ]
+ }
+
+}
diff --git a/beam/runner/src/test/resources/log4j2.properties b/beam/runner/src/test/resources/log4j2.properties
new file mode 100644
index 000000000..3924d894a
--- /dev/null
+++ b/beam/runner/src/test/resources/log4j2.properties
@@ -0,0 +1,9 @@
+# console appender
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %level %c{1}:%L - %msg%n
+
+# Root logger option
+rootLogger.level = ${sys:LOG_LEVEL:-INFO}
+rootLogger.appenderRef.stdout.ref = STDOUT