Skip to content

Commit

Permalink
[flink-core] Initial draft of sql support.
Browse files Browse the repository at this point in the history
  • Loading branch information
dmvk committed Jun 4, 2021
1 parent 56fe1f6 commit 20c6bcf
Show file tree
Hide file tree
Showing 17 changed files with 818 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
/**
* Interface for value accessors that allow creating and reading the value of an attribute.
*
* @param <InputT> input type
* @param <OutputT> output type
* @param <InputT> Type of "raw" input element, that we want to convert into normalized form.
* @param <OutputT> Normalized "output" type, e.g. `bytes -> string` for schema type `string`.
*/
@Experimental
public interface AttributeValueAccessor<InputT, OutputT> extends Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ static StructureValue of(Map<String, Object> value) {

public interface StructureValueAccessor<T> extends AttributeValueAccessor<T, StructureValue> {

AttributeValueAccessor<?, ?> getFieldAccessor(String name);

Object getRawValue(String name, T object);

@Override
default Type getType() {
return Type.STRUCTURE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ public void testArrayWithStructureValue() {
private static class TestStructureAccessor
implements StructureValueAccessor<Map<String, Object>> {

/** Test stub: field accessor lookup is not implemented and always throws. */
@Override
public AttributeValueAccessor<?, ?> getFieldAccessor(String name) {
throw new UnsupportedOperationException("Not implemented.");
}

/** Test stub: raw value lookup is not implemented and always throws. */
@Override
public Object getRawValue(String name, Map<String, Object> object) {
throw new UnsupportedOperationException("Not implemented.");
}

@Override
public StructureValue valueOf(Map<String, Object> object) {
return StructureValue.of(object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public final Optional<RandomAccessReader> getRandomAccess(AttributeDescriptor<?>
return getRandomAccess(Arrays.asList(attrs));
}

private Optional<DirectAttributeFamilyDescriptor> getFamilyForAttributes(
public Optional<DirectAttributeFamilyDescriptor> getFamilyForAttributes(
Collection<AttributeDescriptor<?>> attrs,
UnaryFunction<DirectAttributeFamilyDescriptor, Boolean> mask) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public boolean onError(Throwable error) {
}

@Test
public void testOrderedObserverLifycycle() {
public void testOrderedObserverLifecycle() {
StreamElement update =
StreamElement.upsert(
entity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,16 +291,16 @@ void finishAndMarkAsIdle(SourceContext<?> sourceContext) {
sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK));
}
sourceContext.markAsTemporarilyIdle();
while (cancelled.getCount() > 0) {
try {
cancelled.await();
} catch (InterruptedException e) {
if (cancelled.getCount() == 0) {
// Re-interrupt if cancelled.
Thread.currentThread().interrupt();
}
}
}
// while (cancelled.getCount() > 0) {
// try {
// cancelled.await();
// } catch (InterruptedException e) {
// if (cancelled.getCount() == 0) {
// // Re-interrupt if cancelled.
// Thread.currentThread().interrupt();
// }
// }
// }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Copyright 2017-2021 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.flink.core;

import cz.o2.proxima.flink.core.table.LogCatalogTable;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.DataOperator;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.util.Optionals;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Getter;
import lombok.Value;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;

/**
 * {@link DataOperator} that exposes data of a {@link Repository} to Apache Flink, either as a
 * {@link DataStream} of {@link StreamElement}s or as a table registered in the current catalog of
 * the {@link StreamTableEnvironment}.
 */
public class FlinkDataOperator implements DataOperator {

  private final Repository repository;
  @Getter private final StreamExecutionEnvironment executionEnvironment;
  @Getter private final StreamTableEnvironment tableEnvironment;

  /**
   * Create the operator on top of the execution environment returned by {@link
   * StreamExecutionEnvironment#getExecutionEnvironment()}.
   *
   * @param repository repository whose attributes are going to be read
   */
  public FlinkDataOperator(Repository repository) {
    this.repository = repository;
    this.executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    this.tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
  }

  @Override
  public void close() {
    // Nothing to close.
  }

  @Override
  public void reload() {
    // Nothing to reload.
  }

  @Override
  public Repository getRepository() {
    return repository;
  }

  /** Options describing how a log should be consumed. */
  @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
  @Value
  public static class CommitLogOptions {

    /** Position to start reading from, {@link Position#OLDEST} unless overridden. */
    @Builder.Default Position initialPosition = Position.OLDEST;

    /** Flag requesting the read to stop at current data, disabled unless overridden. */
    @Builder.Default boolean stopAtCurrent = false;
  }

  /**
   * Create a stream backed by a batch log, using default {@link CommitLogOptions}.
   *
   * @param attributeDescriptors attributes to read
   * @return stream of elements of the requested attributes
   */
  public DataStream<StreamElement> createBatchLogStream(
      List<AttributeDescriptor<?>> attributeDescriptors) {
    return createBatchLogStream(attributeDescriptors, CommitLogOptions.newBuilder().build());
  }

  /**
   * Create a stream backed by a batch log.
   *
   * <p>NOTE(review): {@code options} is accepted but not yet handed over to the source function;
   * confirm whether that is intentional for this draft.
   *
   * @param attributeDescriptors attributes to read
   * @param options read options (currently unused)
   * @return stream of elements of the requested attributes
   */
  public DataStream<StreamElement> createBatchLogStream(
      List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) {
    return executionEnvironment.addSource(
        new BatchLogSourceFunction<>(
            repository.asFactory(), attributeDescriptors, ResultExtractor.identity()),
        sourceName("BatchLog", attributeDescriptors));
  }

  /**
   * Build a human readable source name in the form {@code "<type>: <name1>,<name2>,..."}.
   *
   * <p>Names are extracted first and sorted as strings, so the result is deterministic and does
   * not depend on {@link AttributeDescriptor} having a natural ordering (sorting the descriptors
   * themselves would fail with {@link ClassCastException} if they are not {@link Comparable}).
   *
   * @param type prefix describing the kind of the source
   * @param attributeDescriptors attributes read by the source
   * @return name of the source
   */
  private static String sourceName(String type, List<AttributeDescriptor<?>> attributeDescriptors) {
    final String attributeNames =
        attributeDescriptors
            .stream()
            .map(AttributeDescriptor::getName)
            .sorted()
            .collect(Collectors.joining(","));
    return String.format("%s: %s", type, attributeNames);
  }

  /**
   * Create a stream backed by a commit log, using default {@link CommitLogOptions}.
   *
   * @param attributeDescriptors attributes to read
   * @return stream of elements of the requested attributes
   */
  public DataStream<StreamElement> createCommitLogStream(
      List<AttributeDescriptor<?>> attributeDescriptors) {
    return createCommitLogStream(attributeDescriptors, CommitLogOptions.newBuilder().build());
  }

  /**
   * Create a stream backed by a commit log.
   *
   * <p>NOTE(review): {@code options} is accepted but not yet handed over to the source function;
   * confirm whether that is intentional for this draft.
   *
   * @param attributeDescriptors attributes to read
   * @param options read options (currently unused)
   * @return stream of elements of the requested attributes
   */
  public DataStream<StreamElement> createCommitLogStream(
      List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) {
    return executionEnvironment.addSource(
        new CommitLogSourceFunction<>(
            repository.asFactory(), attributeDescriptors, ResultExtractor.identity()),
        sourceName("CommitLog", attributeDescriptors));
  }

  /**
   * Register a table backed by a commit log in the default database of the current catalog.
   *
   * <p>NOTE(review): {@code options} is accepted but not used when the table is created; confirm
   * whether that is intentional for this draft.
   *
   * @param table name of the table to register
   * @param attributeDescriptors attributes the table reads
   * @param options read options (currently unused)
   * @throws IllegalStateException when the table can not be registered
   */
  public void registerCommitLogTable(
      String table, List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) {
    final Catalog catalog =
        Optionals.get(tableEnvironment.getCatalog(tableEnvironment.getCurrentCatalog()));
    try {
      catalog.createTable(
          ObjectPath.fromString(String.format("%s.%s", catalog.getDefaultDatabase(), table)),
          new LogCatalogTable(repository, attributeDescriptors),
          // Last argument is Catalog#createTable's "ignoreIfExists" flag.
          true);
    } catch (Exception e) {
      throw new IllegalStateException(
          String.format("Unable to register a new table [%s].", table), e);
    }
  }

  /** @see StreamExecutionEnvironment#execute(String) for more details. */
  public JobExecutionResult execute(String jobName) throws Exception {
    return executionEnvironment.execute(jobName);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright 2017-2021 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.flink.core;

import cz.o2.proxima.repository.DataOperator;
import cz.o2.proxima.repository.DataOperatorFactory;
import cz.o2.proxima.repository.Repository;

/** {@link DataOperatorFactory} producing {@link FlinkDataOperator} instances. */
public class FlinkDataOperatorFactory implements DataOperatorFactory<FlinkDataOperator> {

  /** Name under which the operator is exposed. */
  private static final String OPERATOR_NAME = "flink";

  /** @return name of the operator created by this factory */
  @Override
  public String getOperatorName() {
    return OPERATOR_NAME;
  }

  /**
   * Check whether the operator produced by this factory can be used as the given class.
   *
   * @param cls the requested operator class
   * @return {@code true} when {@link FlinkDataOperator} is assignable to {@code cls}
   */
  @Override
  public boolean isOfType(Class<? extends DataOperator> cls) {
    final Class<FlinkDataOperator> produced = FlinkDataOperator.class;
    return cls.isAssignableFrom(produced);
  }

  /**
   * Create a new operator for the given repository.
   *
   * @param repository repository the operator works with
   * @return newly created operator
   */
  @Override
  public FlinkDataOperator create(Repository repository) {
    final FlinkDataOperator operator = new FlinkDataOperator(repository);
    return operator;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Copyright 2017-2021 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.flink.core.table;

import cz.o2.proxima.flink.core.CommitLogSourceFunction;
import cz.o2.proxima.repository.Repository;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.types.RowKind;

/**
 * {@link ScanTableSource} that reads attributes of a {@link LogCatalogTable} through a {@link
 * CommitLogSourceFunction}.
 */
public class CommitLogDynamicSource implements ScanTableSource {

  private final Repository repository;
  private final LogCatalogTable catalogTable;

  /**
   * @param repository repository the data is read from
   * @param catalogTable table describing the attributes to read
   */
  public CommitLogDynamicSource(Repository repository, LogCatalogTable catalogTable) {
    this.repository = repository;
    this.catalogTable = catalogTable;
  }

  /** @return changelog mode containing {@link RowKind#INSERT} and {@link RowKind#DELETE} rows */
  @Override
  public ChangelogMode getChangelogMode() {
    return ChangelogMode.newBuilder()
        .addContainedKind(RowKind.INSERT)
        .addContainedKind(RowKind.DELETE)
        .build();
  }

  /**
   * Create the runtime provider wrapping a {@link CommitLogSourceFunction} that converts the read
   * elements using {@link RowDataResultExtractor}.
   *
   * @param scanContext planner context (not used)
   * @return provider of the source function
   */
  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    return SourceFunctionProvider.of(
        new CommitLogSourceFunction<>(
            repository.asFactory(),
            catalogTable.getAttributeDescriptors(),
            new RowDataResultExtractor(catalogTable.getAttributeDescriptors())),
        // Second argument is the provider's "isBounded" flag.
        false);
  }

  /**
   * Create a copy of this source, as required by the {@link DynamicTableSource} contract.
   *
   * <p>This class only holds two final references and no mutable state of its own, so a new
   * instance sharing the same repository and catalog table is sufficient.
   *
   * @return new source reading the same table
   */
  @Override
  public DynamicTableSource copy() {
    return new CommitLogDynamicSource(repository, catalogTable);
  }

  @Override
  public String asSummaryString() {
    return "CommitLog table source";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Copyright 2017-2021 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.flink.core.table;

import java.util.Collections;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

/**
 * {@link DynamicTableSourceFactory} creating {@link CommitLogDynamicSource} for tables that were
 * registered as {@link LogCatalogTable}.
 */
public class CommitLogSourceFactory implements DynamicTableSourceFactory {

  /** Identifier this factory reports via {@link #factoryIdentifier()}. */
  static final String FACTORY_IDENTIFIER = "proxima";

  public CommitLogSourceFactory() {}

  /**
   * Create a table source for the table held by the given context.
   *
   * @param context context holding the catalog table the source is created for
   * @return source reading the attributes of the underlying {@link LogCatalogTable}
   * @throws IllegalArgumentException when the table does not originate from a {@link
   *     LogCatalogTable}
   */
  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    final Object origin = context.getCatalogTable().getOrigin();
    // Fail with an explanatory message instead of an opaque ClassCastException (or a later
    // NullPointerException) when the connector is used for a table of a different kind.
    if (!(origin instanceof LogCatalogTable)) {
      throw new IllegalArgumentException(
          String.format(
              "Connector [%s] supports only tables backed by [%s], got [%s].",
              FACTORY_IDENTIFIER,
              LogCatalogTable.class.getName(),
              origin == null ? null : origin.getClass().getName()));
    }
    final LogCatalogTable catalogTable = (LogCatalogTable) origin;
    return new CommitLogDynamicSource(catalogTable.getRepository(), catalogTable);
  }

  @Override
  public String factoryIdentifier() {
    return FACTORY_IDENTIFIER;
  }

  /** @return empty set, this factory declares no required options */
  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.emptySet();
  }

  /** @return empty set, this factory declares no optional options */
  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    return Collections.emptySet();
  }
}
Loading

0 comments on commit 20c6bcf

Please sign in to comment.