diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/EventStreamIterator.java b/prestodb/src/main/java/io/pravega/connectors/presto/EventStreamIterator.java deleted file mode 100644 index 2b40c7c..0000000 --- a/prestodb/src/main/java/io/pravega/connectors/presto/EventStreamIterator.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.pravega.connectors.presto; - -import com.facebook.airlift.log.Logger; -import io.pravega.connectors.presto.decoder.BytesEvent; -import io.pravega.connectors.presto.decoder.DecodableEvent; -import io.pravega.client.stream.EventRead; -import io.pravega.client.stream.EventStreamReader; -import io.pravega.client.stream.ReaderConfig; -import io.pravega.client.stream.ReaderGroupConfig; -import io.pravega.client.stream.impl.ByteBufferSerializer; - -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.UUID; - -import static io.pravega.connectors.presto.util.PravegaNameUtils.scopedName; - -public class EventStreamIterator - implements Iterator -{ - private static final Logger log = Logger.get(EventStreamIterator.class); - - private final PravegaSegmentManager segmentManager; - private final ReaderArgs readerArgs; - private EventStreamReader reader; - private final long readTimeoutMs; - - private ByteBuffer event; - - public EventStreamIterator(PravegaSegmentManager segmentManager, ReaderArgs readerArgs, PravegaProperties properties) - { - this.segmentManager = segmentManager; - this.readerArgs = readerArgs; - this.readTimeoutMs = properties.getEventReadTimeoutMs(); - } - - private void init() - { - log.info("open iterator for stream " + readerArgs); - String readerGroupName = readerArgs.getReaderGroup(); - if (readerArgs.getReaderGroup() == null) { - readerGroupName = "reader-group-" + UUID.randomUUID().toString(); - ReaderGroupConfig config = - ReaderGroupConfig.builder() - .stream(scopedName(readerArgs.getScope(), readerArgs.getStream()), - readerArgs.getStreamCutRange().getStart(), - readerArgs.getStreamCutRange().getEnd()) - .build(); - log.info("create reader group " + readerGroupName); - segmentManager.readerGroupManager( - readerArgs.getScope()).createReaderGroup(readerGroupName, config); - } - - String readerId = UUID.randomUUID().toString(); - log.info("create reader " + readerId); - reader = segmentManager.getEventStreamClientFactory(readerArgs.getScope()) - .createReader(readerId, - readerGroupName, - new ByteBufferSerializer(), - ReaderConfig.builder().build()); - } - - private boolean _next() - { - if (reader == null) { - init(); - } - - EventRead read; - do { - read = reader.readNextEvent(readTimeoutMs); - } while (read.isCheckpoint()); - event = read.getEvent(); - return event != null; - } - - @Override - public boolean hasNext() - { - return event != null || _next(); - } - - @Override - public DecodableEvent next() - { - BytesEvent bytesEvent = new BytesEvent(event); - event = null; - return bytesEvent; - } -} diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/PravegaProperties.java b/prestodb/src/main/java/io/pravega/connectors/presto/PravegaProperties.java index e5c40fd..cfdcce9 100644 --- a/prestodb/src/main/java/io/pravega/connectors/presto/PravegaProperties.java +++ b/prestodb/src/main/java/io/pravega/connectors/presto/PravegaProperties.java @@ -24,14 +24,8 @@ public class PravegaProperties { - private static final String SESSION_READER_TYPE = "reader_type"; - private static final String SESSION_CURSOR_DELIM_CHAR = "cursor_delim_char"; - private static final String SESSION_GROUPED_EVENT_SPLITS = "grouped_event_splits"; - - private static final String SESSION_EVENT_READ_TIMEOUT_MS = "event_read_timeout_ms"; - private final ConnectorSession session; public PravegaProperties(final ConnectorSession session) @@ -50,27 +44,6 @@ public static List> buildSessionProperties() ",", false)); - propertyMetadataList.add( - PropertyMetadata.stringProperty( - SESSION_READER_TYPE, - "reader type [event|grouped_event|segment_range|segment_range_per_split]", - "segment_range_per_split", - false)); - - propertyMetadataList.add( - PropertyMetadata.integerProperty( - SESSION_GROUPED_EVENT_SPLITS, - "number of splits when using grouped readers", - 63, - false)); - - propertyMetadataList.add( - PropertyMetadata.integerProperty( - SESSION_EVENT_READ_TIMEOUT_MS, - "timeout in ms to readNextEvent()", - 10000, - false)); - return propertyMetadataList; } @@ -78,19 +51,4 @@ public String getCursorDelimChar() { return session.getProperty(SESSION_CURSOR_DELIM_CHAR, String.class); } - - public String getReaderType() - { - return session.getProperty(SESSION_READER_TYPE, String.class); - } - - public int getGroupedEventSplits() - { - return session.getProperty(SESSION_GROUPED_EVENT_SPLITS, Integer.class); - } - - public int getEventReadTimeoutMs() - { - return session.getProperty(SESSION_EVENT_READ_TIMEOUT_MS, Integer.class); - } } diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/PravegaRecordSet.java b/prestodb/src/main/java/io/pravega/connectors/presto/PravegaRecordSet.java index 8130f67..4f85bed 100644 --- a/prestodb/src/main/java/io/pravega/connectors/presto/PravegaRecordSet.java +++ b/prestodb/src/main/java/io/pravega/connectors/presto/PravegaRecordSet.java @@ -81,25 +81,8 @@ public List getColumnTypes() @Override public RecordCursor cursor() { - Iterator eventIterator; - - switch (split.getReaderType()) { - case EVENT_STREAM: - case SINGLE_GROUP_EVENT_STREAM: - eventIterator = new EventStreamIterator(segmentManager, deserialize(split.getReaderArgs(), ReaderArgs.class), properties); - break; - - case SEGMENT_RANGE: - eventIterator = new SegmentRangeIterator(segmentManager, deserialize(split.getReaderArgs(), ReaderArgs.class)); - break; - - case SEGMENT_RANGE_PER_SPLIT: - eventIterator = new SegmentEventIterator(segmentManager, deserialize(split.getReaderArgs(), SegmentRange.class)); - break; - - default: - throw new IllegalArgumentException("readerType " + split.getReaderType()); - } + Iterator eventIterator = + new SegmentEventIterator(segmentManager, deserialize(split.getReaderArgs(), SegmentRange.class)); return new PravegaRecordCursor(eventIterator, columnHandles, eventDecoder, properties, split.getSchema().get(0).getFormat()); } diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/PravegaSegmentManager.java b/prestodb/src/main/java/io/pravega/connectors/presto/PravegaSegmentManager.java index 69c0d90..4d0a004 100644 --- a/prestodb/src/main/java/io/pravega/connectors/presto/PravegaSegmentManager.java +++ b/prestodb/src/main/java/io/pravega/connectors/presto/PravegaSegmentManager.java @@ -137,40 +137,6 @@ KeyValueTableFactory getKeyValueTableFactory(String scope) return factory; } - EventStreamClientFactory getEventStreamClientFactory(String scope) - { - EventStreamClientFactory factory = eventStreamClientFactoryMap.get(scope); - if (factory == null) { - synchronized (this) { - factory = eventStreamClientFactoryMap.get(scope); - if (factory == null) { - factory = EventStreamClientFactory.withScope(scope, clientConfig); - if (eventStreamClientFactoryMap.putIfAbsent(scope, factory) != null) { - throw new RuntimeException("unexpected concurrent create of event stream factory"); - } - } - } - } - return factory; - } - - ReaderGroupManager readerGroupManager(String scope) - { - ReaderGroupManager readerGroupManager = scopedReaderGroupManagerMap.get(scope); - if (readerGroupManager == null) { - synchronized (this) { - readerGroupManager = scopedReaderGroupManagerMap.get(scope); - if (readerGroupManager == null) { - readerGroupManager = ReaderGroupManager.withScope(scope, clientConfig); - if (scopedReaderGroupManagerMap.putIfAbsent(scope, readerGroupManager) != null) { - throw new RuntimeException("unexpected concurrent create of reader group manager"); - } - } - } - } - return readerGroupManager; - } - SerializerConfig serializerConfig(String groupId) { return SerializerConfig.builder() diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/PravegaSplitManager.java b/prestodb/src/main/java/io/pravega/connectors/presto/PravegaSplitManager.java index 4f4a7d0..aa9a3da 100644 --- a/prestodb/src/main/java/io/pravega/connectors/presto/PravegaSplitManager.java +++ b/prestodb/src/main/java/io/pravega/connectors/presto/PravegaSplitManager.java @@ -27,14 +27,12 @@ import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; import io.pravega.client.batch.SegmentRange; -import io.pravega.client.stream.ReaderGroupConfig; import javax.inject.Inject; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -42,7 +40,6 @@ import static io.pravega.connectors.presto.PravegaErrorCode.PRAVEGA_SPLIT_ERROR; import static io.pravega.connectors.presto.PravegaHandleResolver.convertLayout; import static io.pravega.connectors.presto.util.PravegaNameUtils.multiSourceStream; -import static io.pravega.connectors.presto.util.PravegaNameUtils.scopedName; import static io.pravega.connectors.presto.util.PravegaSerializationUtils.serialize; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -89,7 +86,7 @@ public ConnectorSplitSource getSplits( return new FixedSplitSource(splits.build()); } - catch (Exception e) { // Catch all exceptions because Pravega library is written in scala and checked exceptions are not declared in method signature. + catch (Exception e) { if (e instanceof PrestoException) { throw e; } @@ -101,17 +98,7 @@ public ConnectorSplitSource getSplits( private static ReaderType readerType(PravegaProperties properties) { - String type = properties.getReaderType(); - switch (type) { - case "event": - return ReaderType.EVENT_STREAM; - case "grouped_event": - return ReaderType.SINGLE_GROUP_EVENT_STREAM; - case "segment_range_per_split": - return ReaderType.SEGMENT_RANGE_PER_SPLIT; - default: - return ReaderType.SEGMENT_RANGE; - } + return ReaderType.SEGMENT_RANGE_PER_SPLIT; } private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList.Builder splits) @@ -149,25 +136,7 @@ private void buildStreamSplits(final PravegaProperties properties, sourceStreams.forEach(stream -> { StreamCutSupplier streamCutSupplier = new StreamCutSupplier(streamReaderManager, pravegaTableHandle.getSchemaName(), stream); - Supplier splitSupplier; - log.info("get split supplier for " + readerType); - switch (readerType) { - case EVENT_STREAM: - case SEGMENT_RANGE: - splitSupplier = splitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier); - break; - - case SINGLE_GROUP_EVENT_STREAM: - splitSupplier = groupedReaderSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier, properties.getGroupedEventSplits()); - break; - - case SEGMENT_RANGE_PER_SPLIT: - splitSupplier = segmentPerSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier); - break; - - default: - throw new IllegalArgumentException("" + readerType); - } + Supplier splitSupplier = segmentPerSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier); PravegaSplit split = splitSupplier.get(); do { @@ -180,82 +149,6 @@ private void buildStreamSplits(final PravegaProperties properties, log.info("created " + splitCounter.get() + " stream splits of type " + readerType); } - Supplier splitSupplier(final ReaderType readerType, - final PravegaTableHandle tableHandle, - final String stream, - final StreamCutSupplier streamCutSupplier) - { - return () -> { - StreamCutRange range = streamCutSupplier.get(); - if (range == null) { - return null; - } - - log.info(readerType + " split " + range); - - return new PravegaSplit( - connectorId, - ObjectType.STREAM, - Collections.singletonList(tableHandle.getSchema().get(0)), - readerType, - serialize(new ReaderArgs(tableHandle.getSchemaName(), stream, range, null)), - tableHandle.getSchemaRegistryGroupId()); - }; - } - - Supplier groupedReaderSplitSupplier(final ReaderType readerType, - final PravegaTableHandle tableHandle, - final String stream, - final StreamCutSupplier streamCutSupplier, - final int numSplits) - { - StreamCutRange first = streamCutSupplier.get(); - StreamCutRange last = null; - do { - StreamCutRange range = streamCutSupplier.get(); - if (range == null) { - break; - } - last = range; - } while (true); - - if (last == null) { - throw new IllegalStateException("no end split"); - } - StreamCutRange range = new StreamCutRange(first.getStart(), last.getEnd()); - - log.info(readerType + " split " + range); - String readerGroup = UUID.randomUUID().toString(); - - final ReaderArgs readerArgs = - new ReaderArgs(tableHandle.getSchemaName(), stream, range, readerGroup); - ReaderGroupConfig config = - ReaderGroupConfig.builder() - .stream(scopedName(readerArgs.getScope(), readerArgs.getStream()), - readerArgs.getStreamCutRange().getStart(), - readerArgs.getStreamCutRange().getEnd()) - .build(); - - log.info("create reader group " + readerGroup); - streamReaderManager.readerGroupManager( - tableHandle.getSchemaName()).createReaderGroup(readerGroup, config); - - final AtomicInteger splitCounter = new AtomicInteger(); - - return () -> { - if (splitCounter.getAndIncrement() == numSplits) { - return null; - } - return new PravegaSplit( - connectorId, - ObjectType.STREAM, - Collections.singletonList(tableHandle.getSchema().get(0)), - readerType, - serialize(readerArgs), - tableHandle.getSchemaRegistryGroupId()); - }; - } - Supplier segmentPerSplitSupplier(final ReaderType readerType, final PravegaTableHandle tableHandle, final String stream, diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/ReaderArgs.java b/prestodb/src/main/java/io/pravega/connectors/presto/ReaderArgs.java deleted file mode 100644 index 083eae2..0000000 --- a/prestodb/src/main/java/io/pravega/connectors/presto/ReaderArgs.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.pravega.connectors.presto; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.Serializable; - -import static com.google.common.base.MoreObjects.toStringHelper; -import static java.util.Objects.requireNonNull; - -public class ReaderArgs - implements Serializable -{ - private final String scope; - private final String stream; - private final StreamCutRange streamCutRange; - private final String readerGroup; - - public ReaderArgs(@JsonProperty("scope") String scope, - @JsonProperty("stream") String stream, - @JsonProperty("streamCutRange") StreamCutRange streamCutRange, - @JsonProperty("readerGroup") String readerGroup) - { - this.scope = requireNonNull(scope, "scope is null"); - this.stream = requireNonNull(stream, "stream is null"); - this.streamCutRange = requireNonNull(streamCutRange, "streamCutRange is null"); - this.readerGroup = readerGroup; // may be null - } - - @JsonProperty - public String getScope() - { - return scope; - } - - @JsonProperty - public String getStream() - { - return stream; - } - - @JsonProperty - public StreamCutRange getStreamCutRange() - { - return streamCutRange; - } - - @JsonProperty - public String getReaderGroup() - { - return readerGroup; - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("scope", scope) - .add("stream", stream) - .add("streamCutRange", streamCutRange) - .add("readerGroup", readerGroup) - .toString(); - } -} diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/ReaderType.java b/prestodb/src/main/java/io/pravega/connectors/presto/ReaderType.java index dbf4a9e..655824b 100644 --- a/prestodb/src/main/java/io/pravega/connectors/presto/ReaderType.java +++ b/prestodb/src/main/java/io/pravega/connectors/presto/ReaderType.java @@ -18,9 +18,6 @@ public enum ReaderType { - SEGMENT_RANGE /* 1 split handles all segments within a stream cut */, SEGMENT_RANGE_PER_SPLIT /* segments for stream cut are given out to different splits */, - EVENT_STREAM /* stream oriented reading (vs. segments) */, - SINGLE_GROUP_EVENT_STREAM /* stream oriented reading (vs. segments) all readers in same group */, KVT /* key value table */, } diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/SegmentRangeIterator.java b/prestodb/src/main/java/io/pravega/connectors/presto/SegmentRangeIterator.java deleted file mode 100644 index 8120e4c..0000000 --- a/prestodb/src/main/java/io/pravega/connectors/presto/SegmentRangeIterator.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.pravega.connectors.presto; - -import com.facebook.airlift.log.Logger; -import io.pravega.connectors.presto.decoder.BytesEvent; -import io.pravega.connectors.presto.decoder.DecodableEvent; -import io.pravega.client.batch.SegmentIterator; -import io.pravega.client.batch.SegmentRange; -import io.pravega.client.stream.impl.ByteBufferSerializer; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -public class SegmentRangeIterator - implements Iterator -{ - private static final Logger log = Logger.get(SegmentRangeIterator.class); - - private final PravegaSegmentManager segmentManager; - - private final Iterator segmentIterator; - - private SegmentIterator segmentEventIterator; - - private ByteBuffer event; - - private final StreamCutRange streamCutRange; - - private int fullSegments; - - private int emptySegments; - - private int events; - - public SegmentRangeIterator(PravegaSegmentManager segmentManager, ReaderArgs readerArgs) - { - this.segmentManager = segmentManager; - - this.streamCutRange = readerArgs.getStreamCutRange(); - - log.info("open iterator for " + streamCutRange); - this.segmentIterator = - segmentManager.getSegments(readerArgs.getScope(), - readerArgs.getStream(), - readerArgs.getStreamCutRange().getStart(), - readerArgs.getStreamCutRange().getEnd()).getIterator(); - } - - private ByteBuffer _next() - { - if (segmentEventIterator != null && segmentEventIterator.hasNext()) { - events++; - return segmentEventIterator.next(); - } - - do { - if (!segmentIterator.hasNext()) { - log.info("done with " + streamCutRange + "; full: " + fullSegments + ", empty: " + emptySegments + ", events: " + events); - return null; - } - - segmentEventIterator = segmentManager.getSegmentIterator(segmentIterator.next(), new ByteBufferSerializer()); - log.info("next segment " + streamCutRange + " has event? " + segmentEventIterator.hasNext()); - if (segmentEventIterator.hasNext()) { - fullSegments++; - events++; - return segmentEventIterator.next(); - } - emptySegments++; - // maybe segment was empty, continue - } while (true); - } - - @Override - public boolean hasNext() - { - if (event == null) { - event = _next(); - } - return event != null; - } - - @Override - public DecodableEvent next() - { - ByteBuffer toReturn = event; - event = null; - return new BytesEvent(toReturn); - } -} diff --git a/trino/src/main/java/io/trino/plugin/pravega/EventStreamIterator.java b/trino/src/main/java/io/trino/plugin/pravega/EventStreamIterator.java deleted file mode 100644 index cd24afd..0000000 --- a/trino/src/main/java/io/trino/plugin/pravega/EventStreamIterator.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.trino.plugin.pravega; - -import io.airlift.log.Logger; -import io.pravega.client.stream.EventRead; -import io.pravega.client.stream.EventStreamReader; -import io.trino.plugin.pravega.decoder.BytesEvent; -import io.trino.plugin.pravega.decoder.DecodableEvent; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -public class EventStreamIterator - implements Iterator -{ - private static final Logger log = Logger.get(EventStreamIterator.class); - - private final PravegaSegmentManager segmentManager; - private final ReaderArgs readerArgs; - private EventStreamReader reader; - private final long readTimeoutMs; - - private ByteBuffer event; - - public EventStreamIterator(PravegaSegmentManager segmentManager, ReaderArgs readerArgs, PravegaProperties properties) - { - this.segmentManager = segmentManager; - this.readerArgs = readerArgs; - this.readTimeoutMs = properties.getEventReadTimeoutMs(); - } - - private boolean _next() - { - EventRead read; - do { - read = reader.readNextEvent(readTimeoutMs); - } - while (read.isCheckpoint()); - event = read.getEvent(); - return event != null; - } - - @Override - public boolean hasNext() - { - return event != null || _next(); - } - - @Override - public DecodableEvent next() - { - BytesEvent bytesEvent = new BytesEvent(event); - event = null; - return bytesEvent; - } -} diff --git a/trino/src/main/java/io/trino/plugin/pravega/PravegaProperties.java b/trino/src/main/java/io/trino/plugin/pravega/PravegaProperties.java index 97a0e55..53552da 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/PravegaProperties.java +++ b/trino/src/main/java/io/trino/plugin/pravega/PravegaProperties.java @@ -24,14 +24,8 @@ public class PravegaProperties { - private static final String SESSION_READER_TYPE = "reader_type"; - private static final String SESSION_CURSOR_DELIM_CHAR = "cursor_delim_char"; - private static final String SESSION_GROUPED_EVENT_SPLITS = "grouped_event_splits"; - - private static final String SESSION_EVENT_READ_TIMEOUT_MS = "event_read_timeout_ms"; - private final ConnectorSession session; public PravegaProperties(final ConnectorSession session) @@ -50,27 +44,6 @@ public static List> buildSessionProperties() ",", false)); - propertyMetadataList.add( - PropertyMetadata.stringProperty( - SESSION_READER_TYPE, - "reader type [event|grouped_event|segment_range|segment_range_per_split]", - "segment_range_per_split", - false)); - - propertyMetadataList.add( - PropertyMetadata.integerProperty( - SESSION_GROUPED_EVENT_SPLITS, - "number of splits when using grouped readers", - 63, - false)); - - propertyMetadataList.add( - PropertyMetadata.integerProperty( - SESSION_EVENT_READ_TIMEOUT_MS, - "timeout in ms to readNextEvent()", - 10000, - false)); - return propertyMetadataList; } @@ -78,19 +51,4 @@ public String getCursorDelimChar() { return session.getProperty(SESSION_CURSOR_DELIM_CHAR, String.class); } - - public String getReaderType() - { - return session.getProperty(SESSION_READER_TYPE, String.class); - } - - public int getGroupedEventSplits() - { - return session.getProperty(SESSION_GROUPED_EVENT_SPLITS, Integer.class); - } - - public int getEventReadTimeoutMs() - { - return session.getProperty(SESSION_EVENT_READ_TIMEOUT_MS, Integer.class); - } } diff --git a/trino/src/main/java/io/trino/plugin/pravega/PravegaRecordSet.java b/trino/src/main/java/io/trino/plugin/pravega/PravegaRecordSet.java index 811f5b8..fd702bd 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/PravegaRecordSet.java +++ b/trino/src/main/java/io/trino/plugin/pravega/PravegaRecordSet.java @@ -81,25 +81,8 @@ public List getColumnTypes() @Override public RecordCursor cursor() { - Iterator eventIterator; - - switch (split.getReaderType()) { - case EVENT_STREAM: - case SINGLE_GROUP_EVENT_STREAM: - eventIterator = new EventStreamIterator(segmentManager, deserialize(split.getReaderArgs(), ReaderArgs.class), properties); - break; - - case SEGMENT_RANGE: - eventIterator = new SegmentRangeIterator(segmentManager, deserialize(split.getReaderArgs(), ReaderArgs.class)); - break; - - case SEGMENT_RANGE_PER_SPLIT: - eventIterator = new SegmentEventIterator(segmentManager, deserialize(split.getReaderArgs(), SegmentRange.class)); - break; - - default: - throw new IllegalArgumentException("readerType " + split.getReaderType()); - } + Iterator eventIterator = + new SegmentEventIterator(segmentManager, deserialize(split.getReaderArgs(), SegmentRange.class)); return new PravegaRecordCursor(eventIterator, columnHandles, eventDecoder, properties, split.getSchema().get(0).getFormat()); } diff --git a/trino/src/main/java/io/trino/plugin/pravega/PravegaSegmentManager.java b/trino/src/main/java/io/trino/plugin/pravega/PravegaSegmentManager.java index d2704a5..c82db8b 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/PravegaSegmentManager.java +++ b/trino/src/main/java/io/trino/plugin/pravega/PravegaSegmentManager.java @@ -137,40 +137,6 @@ KeyValueTableFactory getKeyValueTableFactory(String scope) return factory; } - EventStreamClientFactory getEventStreamClientFactory(String scope) - { - EventStreamClientFactory factory = eventStreamClientFactoryMap.get(scope); - if (factory == null) { - synchronized (this) { - factory = eventStreamClientFactoryMap.get(scope); - if (factory == null) { - factory = EventStreamClientFactory.withScope(scope, clientConfig); - if (eventStreamClientFactoryMap.putIfAbsent(scope, factory) != null) { - throw new RuntimeException("unexpected concurrent create of event stream factory"); - } - } - } - } - return factory; - } - - ReaderGroupManager readerGroupManager(String scope) - { - ReaderGroupManager readerGroupManager = scopedReaderGroupManagerMap.get(scope); - if (readerGroupManager == null) { - synchronized (this) { - readerGroupManager = scopedReaderGroupManagerMap.get(scope); - if (readerGroupManager == null) { - readerGroupManager = ReaderGroupManager.withScope(scope, clientConfig); - if (scopedReaderGroupManagerMap.putIfAbsent(scope, readerGroupManager) != null) { - throw new RuntimeException("unexpected concurrent create of reader group manager"); - } - } - } - } - return readerGroupManager; - } - SerializerConfig serializerConfig(String groupId) { return SerializerConfig.builder() diff --git a/trino/src/main/java/io/trino/plugin/pravega/PravegaSplitManager.java b/trino/src/main/java/io/trino/plugin/pravega/PravegaSplitManager.java index f3f267e..06c6b3a 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/PravegaSplitManager.java +++ b/trino/src/main/java/io/trino/plugin/pravega/PravegaSplitManager.java @@ -92,7 +92,7 @@ public ConnectorSplitSource getSplits( return new FixedSplitSource(splits.build()); } - catch (Exception e) { // Catch all exceptions because Pravega library is written in scala and checked exceptions are not declared in method signature. + catch (Exception e) { if (e instanceof TrinoException) { throw e; } @@ -104,17 +104,7 @@ public ConnectorSplitSource getSplits( private static ReaderType readerType(PravegaProperties properties) { - String type = properties.getReaderType(); - switch (type) { - case "event": - return ReaderType.EVENT_STREAM; - case "grouped_event": - return ReaderType.SINGLE_GROUP_EVENT_STREAM; - case "segment_range_per_split": - return ReaderType.SEGMENT_RANGE_PER_SPLIT; - default: - return ReaderType.SEGMENT_RANGE; - } + return ReaderType.SEGMENT_RANGE_PER_SPLIT; } private List getNodeAddresses() @@ -161,21 +151,7 @@ private void buildStreamSplits(final PravegaProperties properties, sourceStreams.forEach(stream -> { StreamCutSupplier streamCutSupplier = new StreamCutSupplier(streamReaderManager, pravegaTableHandle.getSchemaName(), stream); - Supplier splitSupplier; - log.info("get split supplier for " + readerType); - switch (readerType) { - case EVENT_STREAM: - case SEGMENT_RANGE: - splitSupplier = splitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier); - break; - - case SEGMENT_RANGE_PER_SPLIT: - splitSupplier = segmentPerSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier); - break; - - default: - throw new IllegalArgumentException("" + readerType); - } + Supplier splitSupplier = segmentPerSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier); PravegaSplit split = splitSupplier.get(); do { @@ -189,30 +165,6 @@ private void buildStreamSplits(final PravegaProperties properties, log.info("created " + splitCounter.get() + " stream splits of type " + readerType); } - Supplier splitSupplier(final ReaderType readerType, - final PravegaTableHandle tableHandle, - final String stream, - final StreamCutSupplier streamCutSupplier) - { - return () -> { - StreamCutRange range = streamCutSupplier.get(); - if (range == null) { - return null; - } - - log.info(readerType + " split " + range); - - return new PravegaSplit( - connectorId, - ObjectType.STREAM, - Collections.singletonList(tableHandle.getSchema().get(0)), - readerType, - serialize(new ReaderArgs(tableHandle.getSchemaName(), stream, range, null)), - tableHandle.getSchemaRegistryGroupId(), - getNodeAddresses()); - }; - } - Supplier segmentPerSplitSupplier(final ReaderType readerType, final PravegaTableHandle tableHandle, final String stream, diff --git a/trino/src/main/java/io/trino/plugin/pravega/ReaderArgs.java b/trino/src/main/java/io/trino/plugin/pravega/ReaderArgs.java deleted file mode 100644 index b8721b0..0000000 --- a/trino/src/main/java/io/trino/plugin/pravega/ReaderArgs.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.trino.plugin.pravega; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.Serializable; - -import static com.google.common.base.MoreObjects.toStringHelper; -import static java.util.Objects.requireNonNull; - -public class ReaderArgs - implements Serializable -{ - private final String scope; - private final String stream; - private final StreamCutRange streamCutRange; - private final String readerGroup; - - public ReaderArgs(@JsonProperty("scope") String scope, - @JsonProperty("stream") String stream, - @JsonProperty("streamCutRange") StreamCutRange streamCutRange, - @JsonProperty("readerGroup") String readerGroup) - { - this.scope = requireNonNull(scope, "scope is null"); - this.stream = requireNonNull(stream, "stream is null"); - this.streamCutRange = requireNonNull(streamCutRange, "streamCutRange is null"); - this.readerGroup = readerGroup; // may be null - } - - @JsonProperty - public String getScope() - { - return scope; - } - - @JsonProperty - public String getStream() - { - return stream; - } - - @JsonProperty - public StreamCutRange getStreamCutRange() - { - return streamCutRange; - } - - @JsonProperty - public String getReaderGroup() - { - return readerGroup; - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("scope", scope) - .add("stream", stream) - .add("streamCutRange", streamCutRange) - .add("readerGroup", readerGroup) - .toString(); - } -} diff --git a/trino/src/main/java/io/trino/plugin/pravega/ReaderType.java b/trino/src/main/java/io/trino/plugin/pravega/ReaderType.java index 17651c6..844ea13 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/ReaderType.java +++ b/trino/src/main/java/io/trino/plugin/pravega/ReaderType.java @@ -18,9 +18,6 @@ public enum ReaderType { - SEGMENT_RANGE /* 1 split handles all segments within a stream cut */, SEGMENT_RANGE_PER_SPLIT /* segments for stream cut are given out to different splits */, - EVENT_STREAM /* stream oriented reading (vs. segments) */, - SINGLE_GROUP_EVENT_STREAM /* stream oriented reading (vs. segments) all readers in same group */, KVT /* key value table */, } diff --git a/trino/src/main/java/io/trino/plugin/pravega/SegmentRangeIterator.java b/trino/src/main/java/io/trino/plugin/pravega/SegmentRangeIterator.java deleted file mode 100644 index 8bab3a0..0000000 --- a/trino/src/main/java/io/trino/plugin/pravega/SegmentRangeIterator.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.trino.plugin.pravega; - -import io.airlift.log.Logger; -import io.pravega.client.batch.SegmentIterator; -import io.pravega.client.batch.SegmentRange; -import io.pravega.client.stream.impl.ByteBufferSerializer; -import io.trino.plugin.pravega.decoder.BytesEvent; -import io.trino.plugin.pravega.decoder.DecodableEvent; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -public class SegmentRangeIterator - implements Iterator -{ - private static final Logger log = Logger.get(SegmentRangeIterator.class); - - private final PravegaSegmentManager segmentManager; - - private final Iterator segmentIterator; - - private SegmentIterator segmentEventIterator; - - private ByteBuffer event; - - private final StreamCutRange streamCutRange; - - private int fullSegments; - - private int emptySegments; - - private int events; - - public SegmentRangeIterator(PravegaSegmentManager segmentManager, ReaderArgs readerArgs) - { - this.segmentManager = segmentManager; - - this.streamCutRange = readerArgs.getStreamCutRange(); - - log.info("open iterator for " + streamCutRange); - this.segmentIterator = - segmentManager.getSegments(readerArgs.getScope(), - readerArgs.getStream(), - readerArgs.getStreamCutRange().getStart(), - readerArgs.getStreamCutRange().getEnd()).getIterator(); - } - - private ByteBuffer _next() - { - if (segmentEventIterator != null && segmentEventIterator.hasNext()) { - events++; - return segmentEventIterator.next(); - } - - do { - if (!segmentIterator.hasNext()) { - log.info("done with " + streamCutRange + "; full: " + fullSegments + ", empty: " + emptySegments + ", events: " + events); - return null; - } - - segmentEventIterator = segmentManager.getSegmentIterator(segmentIterator.next(), new ByteBufferSerializer()); - log.info("next segment " + streamCutRange + " has event? " + segmentEventIterator.hasNext()); - if (segmentEventIterator.hasNext()) { - fullSegments++; - events++; - return segmentEventIterator.next(); - } - emptySegments++; - // maybe segment was empty, continue - } while (true); - } - - @Override - public boolean hasNext() - { - if (event == null) { - event = _next(); - } - return event != null; - } - - @Override - public DecodableEvent next() - { - ByteBuffer toReturn = event; - event = null; - return new BytesEvent(toReturn); - } -}