Skip to content

Commit

Permalink
Merge pull request #411: [beam-core]: Introduction of ProximaIO.write
Browse files Browse the repository at this point in the history
je-ik authored Apr 3, 2024
2 parents 9b52a4a + f4e350c commit 95e6f80
Showing 3 changed files with 256 additions and 0 deletions.
112 changes: 112 additions & 0 deletions beam/core/src/main/java/cz/o2/proxima/beam/io/ProximaIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.beam.io;

import cz.o2.proxima.core.annotations.Experimental;
import cz.o2.proxima.core.repository.RepositoryFactory;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

/** IO connector for Proxima platform. */
@Experimental
@Slf4j
public class ProximaIO {

private ProximaIO() {
// No-op.
}

/**
* Write {@link StreamElement stream elements} into proxima using {@link DirectDataOperator}.
*
* @param repositoryFactory Serializable factory for Proxima repository.
* @return Write transform.
*/
public static Write write(RepositoryFactory repositoryFactory) {
return new Write(repositoryFactory);
}

/**
* Transformation that writes {@link StreamElement stream elements} into proxima using {@link
* DirectDataOperator}.
*/
public static class Write extends PTransform<PCollection<StreamElement>, PDone> {

private final RepositoryFactory repositoryFactory;

private Write(RepositoryFactory repositoryFactory) {
this.repositoryFactory = repositoryFactory;
}

@Override
public PDone expand(PCollection<StreamElement> input) {
input.apply("Write", ParDo.of(new WriteFn(repositoryFactory)));
return PDone.in(input.getPipeline());
}
}

static class WriteFn extends DoFn<StreamElement, Void> {

private final RepositoryFactory repositoryFactory;

private transient DirectDataOperator direct;

WriteFn(RepositoryFactory repositoryFactory) {
this.repositoryFactory = repositoryFactory;
}

@VisibleForTesting
DirectDataOperator getDirect() {
return direct;
}

@Setup
public void setUp() {
direct = repositoryFactory.apply().getOrCreateOperator(DirectDataOperator.class);
}

@ProcessElement
public void processElement(@Element StreamElement element) {
direct
.getWriter(element.getAttributeDescriptor())
.orElseThrow(
() ->
new IllegalArgumentException(
String.format("Missing writer for [%s].", element.getAttributeDescriptor())))
.write(
element,
(succ, error) -> {
if (error != null) {
log.error(String.format("Unable to write element [%s].", element), error);
}
});
}

@Teardown
public void tearDown() {
if (direct != null) {
direct.close();
}
}
}
}
74 changes: 74 additions & 0 deletions beam/core/src/test/java/cz/o2/proxima/beam/io/ProximaIOTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.beam.io;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.util.Optionals;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.randomaccess.KeyValue;
import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader;
import cz.o2.proxima.typesafe.config.ConfigFactory;
import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PDone;
import org.junit.Rule;
import org.junit.Test;

public class ProximaIOTest {

private final Repository repository =
Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve());
private final EntityDescriptor gateway = repository.getEntity("gateway");
private final AttributeDescriptor<byte[]> status = gateway.getAttribute("status");

@Rule public TestPipeline pipeline = TestPipeline.create();

@Test
public void writeTest() {
PDone done =
pipeline
.apply(Create.of(Arrays.asList(write("key1"), write("key2"), write("key3"))))
.apply(ProximaIO.write(repository.asFactory()));
assertNotNull(pipeline.run());
try (DirectDataOperator direct = repository.getOrCreateOperator(DirectDataOperator.class)) {
RandomAccessReader reader = Optionals.get(direct.getRandomAccess(status));
Optional<KeyValue<byte[]>> status1 = reader.get("key1", status);
Optional<KeyValue<byte[]>> status2 = reader.get("key2", status);
assertTrue(status2.isPresent());
assertTrue(status1.isPresent());
}
}

private StreamElement write(String key) {
return StreamElement.upsert(
gateway,
status,
UUID.randomUUID().toString(),
key,
status.getName(),
System.currentTimeMillis(),
new byte[] {1});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.beam.io;

import static org.junit.Assert.assertTrue;

import cz.o2.proxima.beam.io.ProximaIO.WriteFn;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.util.Optionals;
import cz.o2.proxima.direct.core.randomaccess.KeyValue;
import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader;
import cz.o2.proxima.typesafe.config.ConfigFactory;
import java.util.Optional;
import java.util.UUID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ProximaIOWriteFnTest {

private final Repository repository =
Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve());
private final EntityDescriptor gateway = repository.getEntity("gateway");
private final AttributeDescriptor<byte[]> status = gateway.getAttribute("status");
private final WriteFn writeFn = new WriteFn(repository.asFactory());
private RandomAccessReader reader;

@Before
public void setup() {
writeFn.setUp();
reader = Optionals.get(writeFn.getDirect().getRandomAccess(status));
}

@After
public void tearDown() {
writeFn.tearDown();
}

@Test
public void writeSuccessfullyTest() {
long now = System.currentTimeMillis();
writeFn.processElement(
StreamElement.upsert(
gateway,
status,
UUID.randomUUID().toString(),
"key1",
status.getName(),
now,
new byte[] {1}));
Optional<KeyValue<byte[]>> keyValue = reader.get("key1", status, now);
assertTrue(keyValue.isPresent());
}
}

0 comments on commit 95e6f80

Please sign in to comment.