WIP: [flink-core] Initial draft of sql support. #557

Open · wants to merge 2 commits into base: field_accessor
@@ -21,8 +21,8 @@
 /**
  * Interface for value accessors that allow creating and getting values of an attribute.
  *
- * @param <InputT> input type
- * @param <OutputT> output type
+ * @param <InputT> Type of the "raw" input element that we want to convert into normalized form.
+ * @param <OutputT> Normalized "output" type, e.g. `bytes -> string` for schema type `string`.
  */
 @Experimental
 public interface AttributeValueAccessor<InputT, OutputT> extends Serializable {
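To make the new contract concrete, here is a minimal sketch of an accessor for the `bytes -> string` case the javadoc mentions. Only `valueOf` and `getType` are visible in this diff; `createFrom` and `Type.STRING` are assumptions, so the real interface may well differ.

import java.nio.charset.StandardCharsets;

// Illustrative only: normalizes a raw byte[] payload into the schema type `string`.
public class StringValueAccessor implements AttributeValueAccessor<byte[], String> {

  @Override
  public String valueOf(byte[] raw) {
    // "Accessing": convert the raw representation into the normalized output type.
    return new String(raw, StandardCharsets.UTF_8);
  }

  @Override
  public byte[] createFrom(String value) {
    // Assumed inverse direction: create the raw element from a normalized value.
    return value.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public Type getType() {
    return Type.STRING; // Assumed constant; this diff only shows Type.STRUCTURE.
  }
}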
@@ -157,6 +157,28 @@ static StructureValue of(Map<String, Object> value) {
 
 public interface StructureValueAccessor<T> extends AttributeValueAccessor<T, StructureValue> {
 
+  /**
+   * Get the accessor for a given field. Please note that accessors only work on "raw values".
+   * See {@link #getRawFieldValue(String, Object)} for more details.
+   *
+   * @param name Name of the field to get the accessor for.
+   * @return Field accessor.
+   */
+  AttributeValueAccessor<?, ?> getFieldAccessor(String name);
+
+  /**
+   * Get the raw value of a given field. In this context, a raw value means the value before a
+   * field accessor is applied to it (for example, a byte representation that would be converted
+   * to a string after "accessing"). This is intended for partial message parsing and is meant
+   * to be used in combination with {@link #getFieldAccessor(String)}.
+   *
+   * @param name Name of the field.
+   * @param structure Structure to get the raw value from.
+   * @param <OutputT> Type of the raw value. This only serves to simplify casting of the returned value.
+   * @return Raw value.
+   */
+  <OutputT> OutputT getRawFieldValue(String name, T structure);
+
   @Override
   default Type getType() {
     return Type.STRUCTURE;
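A hedged sketch of the partial-parsing flow these two methods are designed for: pull a single field out in raw form, then normalize just that field. The `byte[]`/`String` field types and the unchecked cast are illustrative assumptions; `valueOf` is taken from this diff.

import java.util.Map;

class PartialParsingExample {

  // Reads one field from a raw structure without normalizing the whole message.
  static String readStringField(
      StructureValueAccessor<Map<String, Object>> accessor,
      Map<String, Object> structure,
      String field) {
    // 1) Raw value of the field, before any accessor is applied (e.g. its byte representation).
    byte[] raw = accessor.getRawFieldValue(field, structure);
    // 2) Normalize only this field with its dedicated accessor.
    @SuppressWarnings("unchecked")
    AttributeValueAccessor<byte[], String> fieldAccessor =
        (AttributeValueAccessor<byte[], String>) accessor.getFieldAccessor(field);
    return fieldAccessor.valueOf(raw);
  }
}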
@@ -98,6 +98,16 @@ public void testArrayWithStructureValue() {
   private static class TestStructureAccessor
       implements StructureValueAccessor<Map<String, Object>> {
 
+    @Override
+    public AttributeValueAccessor<?, ?> getFieldAccessor(String name) {
+      throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public <OutputT> OutputT getRawFieldValue(String name, Map<String, Object> structure) {
+      throw new UnsupportedOperationException("Not implemented.");
+    }
+
     @Override
     public StructureValue valueOf(Map<String, Object> object) {
       return StructureValue.of(object);
@@ -380,7 +380,7 @@ public boolean onError(Throwable error) {
   }
 
   @Test
-  public void testOrderedObserverLifycycle() {
+  public void testOrderedObserverLifecycle() {
     StreamElement update =
         StreamElement.upsert(
             entity,
@@ -59,6 +59,7 @@
  * <li>Unified metrics for monitoring and alerting.
  * </ul>
  *
+ * @param <OptionsT> Source options.
  * @param <ReaderT> Reader to use, for reading data.
  * @param <ObserverT> Observer implementation.
  * @param <OffsetT> Offset used by the current reader.
@@ -67,6 +68,7 @@
  */
 @Slf4j
 abstract class AbstractLogSourceFunction<
+    OptionsT extends FlinkDataOperator.LogOptions,
     ReaderT,
     ObserverT extends AbstractSourceLogObserver<OffsetT, ContextT, OutputT>,
     OffsetT extends Serializable,
@@ -110,6 +112,7 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
 
   @Getter private final RepositoryFactory repositoryFactory;
   private final List<AttributeDescriptor<?>> attributeDescriptors;
+  @Getter private final OptionsT options;
   private final ResultExtractor<OutputT> resultExtractor;
 
   @Nullable private transient List<OffsetT> restoredOffsets;
@@ -133,9 +136,11 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
   AbstractLogSourceFunction(
       RepositoryFactory repositoryFactory,
       List<AttributeDescriptor<?>> attributeDescriptors,
+      OptionsT options,
       ResultExtractor<OutputT> resultExtractor) {
     this.repositoryFactory = repositoryFactory;
     this.attributeDescriptors = attributeDescriptors;
+    this.options = options;
     this.resultExtractor = resultExtractor;
   }
 
@@ -290,14 +295,16 @@ void finishAndMarkAsIdle(SourceContext<?> sourceContext) {
     synchronized (sourceContext.getCheckpointLock()) {
       sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK));
     }
-    sourceContext.markAsTemporarilyIdle();
-    while (cancelled.getCount() > 0) {
-      try {
-        cancelled.await();
-      } catch (InterruptedException e) {
-        if (cancelled.getCount() == 0) {
-          // Re-interrupt if cancelled.
-          Thread.currentThread().interrupt();
+    if (!options.shutdownFinishedSources()) {
+      sourceContext.markAsTemporarilyIdle();
+      while (cancelled.getCount() > 0) {
+        try {
+          cancelled.await();
+        } catch (InterruptedException e) {
+          if (cancelled.getCount() == 0) {
+            // Re-interrupt if cancelled.
+            Thread.currentThread().interrupt();
+          }
         }
       }
     }
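The net effect of the last hunk: when a bounded source runs out of input, it either returns immediately (letting the task shut down) if shutdownFinishedSources() is set, or, as before, marks itself temporarily idle and blocks until the job is cancelled so that watermarks keep flowing for the other subtasks. For reference, a sketch of the options surface this PR threads through the sources, reconstructed only from the accessors used in this diff (the real definitions live in FlinkDataOperator and may differ in names and types):

import cz.o2.proxima.storage.commitlog.Position;
import java.io.Serializable;

// Reconstructed for illustration; epoch-millis timestamps are an assumption.
interface LogOptions extends Serializable {
  // When true, a source that has consumed all of its input returns instead of idling forever.
  boolean shutdownFinishedSources();
}

interface BatchLogOptions extends LogOptions {
  // Time range used by BatchLogSourceFunction#getPartitions below.
  long startTimestamp();
  long endTimestamp();
}

interface CommitLogOptions extends LogOptions {
  // Starting position for a fresh run (previously hard-coded to Position.OLDEST).
  Position initialPosition();
  // Whether to stop once the current tail of the log has been read.
  boolean stopAtCurrent();
}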
@@ -33,6 +33,7 @@
 @Slf4j
 public class BatchLogSourceFunction<OutputT>
     extends AbstractLogSourceFunction<
+        FlinkDataOperator.BatchLogOptions,
         BatchLogReader,
         BatchLogSourceFunction.LogObserver<OutputT>,
         Offset,
@@ -61,8 +62,9 @@ void markOffsetAsConsumed(BatchLogObserver.OnNextContext context) {
   public BatchLogSourceFunction(
       RepositoryFactory repositoryFactory,
       List<AttributeDescriptor<?>> attributeDescriptors,
+      FlinkDataOperator.BatchLogOptions options,
       ResultExtractor<OutputT> resultExtractor) {
-    super(repositoryFactory, attributeDescriptors, resultExtractor);
+    super(repositoryFactory, attributeDescriptors, options, resultExtractor);
   }
 
   @Override
@@ -82,7 +84,7 @@ BatchLogReader createLogReader(List<AttributeDescriptor<?>> attributeDescriptors
 
   @Override
   List<Partition> getPartitions(BatchLogReader reader) {
-    return reader.getPartitions();
+    return reader.getPartitions(getOptions().startTimestamp(), getOptions().endTimestamp());
   }
 
   @Override
@@ -23,7 +23,6 @@
 import cz.o2.proxima.repository.AttributeDescriptor;
 import cz.o2.proxima.repository.RepositoryFactory;
 import cz.o2.proxima.storage.Partition;
-import cz.o2.proxima.storage.commitlog.Position;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -32,6 +31,7 @@
 @Slf4j
 public class CommitLogSourceFunction<OutputT>
     extends AbstractLogSourceFunction<
+        FlinkDataOperator.CommitLogOptions,
         CommitLogReader,
         CommitLogSourceFunction.LogObserver<OutputT>,
         Offset,
@@ -63,8 +63,9 @@ void markOffsetAsConsumed(CommitLogObserver.OnNextContext context) {
   public CommitLogSourceFunction(
       RepositoryFactory repositoryFactory,
       List<AttributeDescriptor<?>> attributeDescriptors,
+      FlinkDataOperator.CommitLogOptions options,
       ResultExtractor<OutputT> resultExtractor) {
-    super(repositoryFactory, attributeDescriptors, resultExtractor);
+    super(repositoryFactory, attributeDescriptors, options, resultExtractor);
   }
 
   @Override
@@ -109,8 +110,10 @@ UnifiedObserveHandle<Offset> observePartitions(
       List<Partition> partitions,
       List<AttributeDescriptor<?>> attributeDescriptors,
       LogObserver<OutputT> observer) {
+    final FlinkDataOperator.CommitLogOptions options = getOptions();
     final ObserveHandle commitLogHandle =
-        reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);
+        reader.observeBulkPartitions(
+            partitions, options.initialPosition(), options.stopAtCurrent(), observer);
     return new UnifiedObserveHandle<Offset>() {
 
       @Override
@@ -132,7 +135,7 @@ UnifiedObserveHandle<Offset> observeRestoredOffsets(
       List<AttributeDescriptor<?>> attributeDescriptors,
       LogObserver<OutputT> observer) {
     final cz.o2.proxima.direct.commitlog.ObserveHandle delegate =
-        reader.observeBulkOffsets(offsets, false, observer);
+        reader.observeBulkOffsets(offsets, getOptions().stopAtCurrent(), observer);
     return new UnifiedObserveHandle<Offset>() {
 
       @Override
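Putting the two commit-log hunks together: a fresh run now starts from options.initialPosition(), a checkpoint-restored run resumes from the stored offsets, and options.stopAtCurrent() applies to both paths. An illustrative condensation follows; the actual dispatch between the two paths lives in AbstractLogSourceFunction, and the import paths are assumptions.

import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.storage.Partition;
import java.util.List;
import javax.annotation.Nullable;

class ObservePathsSketch {

  // Illustrative only: how the source chooses where to start reading.
  static ObserveHandle start(
      CommitLogReader reader,
      List<Partition> partitions,
      @Nullable List<Offset> restoredOffsets,
      FlinkDataOperator.CommitLogOptions options,
      CommitLogObserver observer) {
    if (restoredOffsets != null) {
      // Restored from a checkpoint: resume exactly where the last run left off.
      return reader.observeBulkOffsets(restoredOffsets, options.stopAtCurrent(), observer);
    }
    // Fresh run: start at the configured position, optionally stopping at the current tail.
    return reader.observeBulkPartitions(
        partitions, options.initialPosition(), options.stopAtCurrent(), observer);
  }
}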