diff --git a/build.gradle b/build.gradle index 60e7a36..2fa1711 100644 --- a/build.gradle +++ b/build.gradle @@ -29,6 +29,7 @@ dependencies { compile "com.facebook.airlift:bootstrap:${airliftBootstrapVersion}" compile "com.facebook.airlift:json:${airliftJsonVersion}" compile "com.facebook.airlift:log:${airliftLogVersion}" + compile "com.facebook.airlift:concurrent:${airliftConcurrentVersion}" compile "com.facebook.airlift:configuration:${airliftConfigurationVersion}" compile "com.google.guava:guava:${guavaVersion}" compile "com.google.inject:guice:${guiceVersion}" diff --git a/gradle.properties b/gradle.properties index b93efe0..55a4329 100644 --- a/gradle.properties +++ b/gradle.properties @@ -9,6 +9,7 @@ # airliftBootstrapVersion=0.191 +airliftConcurrentVersion=0.191 airliftConfigurationVersion=0.191 airliftJsonVersion=0.191 airliftLogVersion=0.191 @@ -24,7 +25,7 @@ guavaVersion=26.0-jre guiceVersion=4.2.2 jacksonVersion=2.10.0 javaxValidationVersion=1.1.0.Final -pravegaVersion=0.9.0 +pravegaVersion=0.10.0-2833.156e2c5-SNAPSHOT pravegaSchemaRegistryVersion=0.2.0 prestoVersion=0.247 protobufVersion=3.11.4 diff --git a/src/main/java/com/facebook/presto/pravega/EventStreamIterator.java b/src/main/java/com/facebook/presto/pravega/EventStreamIterator.java deleted file mode 100644 index 1645219..0000000 --- a/src/main/java/com/facebook/presto/pravega/EventStreamIterator.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.facebook.presto.pravega; - -import com.facebook.airlift.log.Logger; -import com.facebook.presto.pravega.decoder.BytesEvent; -import com.facebook.presto.pravega.decoder.DecodableEvent; -import io.pravega.client.stream.EventRead; -import io.pravega.client.stream.EventStreamReader; -import io.pravega.client.stream.ReaderConfig; -import io.pravega.client.stream.ReaderGroupConfig; -import io.pravega.client.stream.impl.ByteBufferSerializer; - -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.UUID; - -import static com.facebook.presto.pravega.util.PravegaNameUtils.scopedName; - -public class EventStreamIterator - implements Iterator -{ - private static final Logger log = Logger.get(EventStreamIterator.class); - - private final PravegaSegmentManager segmentManager; - private final ReaderArgs readerArgs; - private EventStreamReader reader; - private final long readTimeoutMs; - - private ByteBuffer event; - - public EventStreamIterator(PravegaSegmentManager segmentManager, ReaderArgs readerArgs, PravegaProperties properties) - { - this.segmentManager = segmentManager; - this.readerArgs = readerArgs; - this.readTimeoutMs = properties.getEventReadTimeoutMs(); - } - - private void init() - { - log.info("open iterator for stream " + readerArgs); - String readerGroupName = readerArgs.getReaderGroup(); - if (readerArgs.getReaderGroup() == null) { - readerGroupName = "reader-group-" + UUID.randomUUID().toString(); - ReaderGroupConfig config = - ReaderGroupConfig.builder() - .stream(scopedName(readerArgs.getScope(), readerArgs.getStream()), - readerArgs.getStreamCutRange().getStart(), - readerArgs.getStreamCutRange().getEnd()) - .build(); - log.info("create reader group " + readerGroupName); - segmentManager.readerGroupManager( - readerArgs.getScope()).createReaderGroup(readerGroupName, config); - } - - String readerId = 
UUID.randomUUID().toString(); - log.info("create reader " + readerId); - reader = segmentManager.getEventStreamClientFactory(readerArgs.getScope()) - .createReader(readerId, - readerGroupName, - new ByteBufferSerializer(), - ReaderConfig.builder().build()); - } - - private boolean _next() - { - if (reader == null) { - init(); - } - - EventRead read; - do { - read = reader.readNextEvent(readTimeoutMs); - } while (read.isCheckpoint()); - event = read.getEvent(); - return event != null; - } - - @Override - public boolean hasNext() - { - return event != null || _next(); - } - - @Override - public DecodableEvent next() - { - BytesEvent bytesEvent = new BytesEvent(event); - event = null; - return bytesEvent; - } -} diff --git a/src/main/java/com/facebook/presto/pravega/PravegaProperties.java b/src/main/java/com/facebook/presto/pravega/PravegaProperties.java index 3ef3cd5..ca26162 100644 --- a/src/main/java/com/facebook/presto/pravega/PravegaProperties.java +++ b/src/main/java/com/facebook/presto/pravega/PravegaProperties.java @@ -24,13 +24,9 @@ public class PravegaProperties { - private static final String SESSION_READER_TYPE = "reader_type"; - private static final String SESSION_CURSOR_DELIM_CHAR = "cursor_delim_char"; - private static final String SESSION_GROUPED_EVENT_SPLITS = "grouped_event_splits"; - - private static final String SESSION_EVENT_READ_TIMEOUT_MS = "event_read_timeout_ms"; + private static final String SEGMENT_RANGE_SPLIT_SIZE_BYTES = "segment_range_split_size_bytes"; private final ConnectorSession session; @@ -47,28 +43,14 @@ public static List> buildSessionProperties() PropertyMetadata.stringProperty( SESSION_CURSOR_DELIM_CHAR, "character used as field separator for delimited formats", - ",", - false)); - - propertyMetadataList.add( - PropertyMetadata.stringProperty( - SESSION_READER_TYPE, - "reader type [event|grouped_event|segment_range|segment_range_per_split]", - "segment_range_per_split", - false)); - - propertyMetadataList.add( - 
PropertyMetadata.integerProperty( - SESSION_GROUPED_EVENT_SPLITS, - "number of splits when using grouped readers", - 63, + "|", false)); propertyMetadataList.add( PropertyMetadata.integerProperty( - SESSION_EVENT_READ_TIMEOUT_MS, - "timeout in ms to readNextEvent()", - 10000, + SEGMENT_RANGE_SPLIT_SIZE_BYTES, + "desired split size for segment range. cannot guarantee the size, it is approximate", + 32 * 1048576, false)); return propertyMetadataList; @@ -79,18 +61,8 @@ public String getCursorDelimChar() return session.getProperty(SESSION_CURSOR_DELIM_CHAR, String.class); } - public String getReaderType() - { - return session.getProperty(SESSION_READER_TYPE, String.class); - } - - public int getGroupedEventSplits() - { - return session.getProperty(SESSION_GROUPED_EVENT_SPLITS, Integer.class); - } - - public int getEventReadTimeoutMs() + public int getSegmentRangeSplitSizeBytes() { - return session.getProperty(SESSION_EVENT_READ_TIMEOUT_MS, Integer.class); + return session.getProperty(SEGMENT_RANGE_SPLIT_SIZE_BYTES, Integer.class); } } diff --git a/src/main/java/com/facebook/presto/pravega/PravegaRecordSet.java b/src/main/java/com/facebook/presto/pravega/PravegaRecordSet.java index f25b739..f797529 100644 --- a/src/main/java/com/facebook/presto/pravega/PravegaRecordSet.java +++ b/src/main/java/com/facebook/presto/pravega/PravegaRecordSet.java @@ -23,6 +23,7 @@ import com.facebook.presto.pravega.decoder.EventDecoder; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.RecordSet; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import io.pravega.client.batch.SegmentRange; @@ -81,26 +82,9 @@ public List getColumnTypes() @Override public RecordCursor cursor() { - Iterator eventIterator; - - switch (split.getReaderType()) { - case EVENT_STREAM: - case SINGLE_GROUP_EVENT_STREAM: - eventIterator = new EventStreamIterator(segmentManager, deserialize(split.getReaderArgs(), ReaderArgs.class), properties); - 
break; - - case SEGMENT_RANGE: - eventIterator = new SegmentRangeIterator(segmentManager, deserialize(split.getReaderArgs(), ReaderArgs.class)); - break; - - case SEGMENT_RANGE_PER_SPLIT: - eventIterator = new SegmentEventIterator(segmentManager, deserialize(split.getReaderArgs(), SegmentRange.class)); - break; - - default: - throw new IllegalArgumentException("readerType " + split.getReaderType()); - } - + Preconditions.checkArgument(split.getReaderType() == ReaderType.SEGMENT_RANGE_PER_SPLIT); + Iterator eventIterator = + new SegmentRangeIterator(segmentManager, deserialize(split.getReaderArgs(), SegmentRange.class)); return new PravegaRecordCursor(eventIterator, columnHandles, eventDecoder, properties, split.getSchema().get(0).getFormat()); } } diff --git a/src/main/java/com/facebook/presto/pravega/PravegaSegmentManager.java b/src/main/java/com/facebook/presto/pravega/PravegaSegmentManager.java index 2faa718..e42cc13 100644 --- a/src/main/java/com/facebook/presto/pravega/PravegaSegmentManager.java +++ b/src/main/java/com/facebook/presto/pravega/PravegaSegmentManager.java @@ -76,7 +76,7 @@ public PravegaSegmentManager(PravegaConnectorConfig connectorConfig) registryClient = SchemaRegistryClientFactory.withDefaultNamespace(registryConfig); } - private BatchClientFactory batchClientFactory(String scope) + BatchClientFactory batchClientFactory(String scope) { BatchClientFactory batchClientFactory = clientFactoryMap.get(scope); if (batchClientFactory == null) { @@ -93,6 +93,11 @@ private BatchClientFactory batchClientFactory(String scope) return batchClientFactory; } + public StreamManager getStreamManager() + { + return StreamManager.create(clientConfig.getControllerURI()); + } + /** * Returns a list of {@link SegmentRange} instances. 
* diff --git a/src/main/java/com/facebook/presto/pravega/PravegaSplitManager.java b/src/main/java/com/facebook/presto/pravega/PravegaSplitManager.java index e47dc41..979dbb1 100644 --- a/src/main/java/com/facebook/presto/pravega/PravegaSplitManager.java +++ b/src/main/java/com/facebook/presto/pravega/PravegaSplitManager.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.facebook.presto.pravega; import com.facebook.airlift.log.Logger; @@ -26,23 +25,15 @@ import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; -import io.pravega.client.batch.SegmentRange; -import io.pravega.client.stream.ReaderGroupConfig; import javax.inject.Inject; import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import static com.facebook.presto.pravega.PravegaErrorCode.PRAVEGA_SPLIT_ERROR; import static com.facebook.presto.pravega.PravegaHandleResolver.convertLayout; import static com.facebook.presto.pravega.util.PravegaNameUtils.multiSourceStream; -import static com.facebook.presto.pravega.util.PravegaNameUtils.scopedName; import static com.facebook.presto.pravega.util.PravegaSerializationUtils.serialize; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -53,7 +44,7 @@ public class PravegaSplitManager implements ConnectorSplitManager { - private static final Logger log = Logger.get(PravegaSegmentManager.class); + private static final Logger log = Logger.get(PravegaSplitManager.class); private final String connectorId; private final PravegaConnectorConfig pravegaConnectorConfig; private final PravegaSegmentManager streamReaderManager; @@ -77,19 +68,13 
@@ public ConnectorSplitSource getSplits( SplitSchedulingContext splitSchedulingContext) { PravegaTableHandle pravegaTableHandle = convertLayout(layout).getTable(); - ImmutableList.Builder splits = ImmutableList.builder(); try { - if (pravegaTableHandle.getObjectType() == ObjectType.KV_TABLE) { - buildKVSplits(pravegaTableHandle, splits); - } - else { - buildStreamSplits(new PravegaProperties(session), pravegaTableHandle, splits); - } - - return new FixedSplitSource(splits.build()); + return pravegaTableHandle.getObjectType() == ObjectType.KV_TABLE + ? buildKVSplits(pravegaTableHandle) + : buildStreamSplits(new PravegaProperties(session), pravegaTableHandle); } - catch (Exception e) { // Catch all exceptions because Pravega library is written in scala and checked exceptions are not declared in method signature. + catch (Exception e) { if (e instanceof PrestoException) { throw e; } @@ -99,26 +84,13 @@ public ConnectorSplitSource getSplits( } } - private static ReaderType readerType(PravegaProperties properties) - { - String type = properties.getReaderType(); - switch (type) { - case "event": - return ReaderType.EVENT_STREAM; - case "grouped_event": - return ReaderType.SINGLE_GROUP_EVENT_STREAM; - case "segment_range_per_split": - return ReaderType.SEGMENT_RANGE_PER_SPLIT; - default: - return ReaderType.SEGMENT_RANGE; - } - } - - private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList.Builder splits) + private ConnectorSplitSource buildKVSplits(PravegaTableHandle pravegaTableHandle) { pravegaTableHandle.getOjectArgs().orElseThrow(() -> new IllegalArgumentException("no KF defined for " + pravegaTableHandle)); + ImmutableList.Builder splits = ImmutableList.builder(); + for (String kf : pravegaTableHandle.getOjectArgs().get()) { PravegaSplit split = new PravegaSplit(connectorId, @@ -131,162 +103,21 @@ private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList. 
} log.info("created " + pravegaTableHandle.getOjectArgs().get().size() + " kv splits"); + + return new FixedSplitSource(splits.build()); } - private void buildStreamSplits(final PravegaProperties properties, - PravegaTableHandle pravegaTableHandle, - ImmutableList.Builder splits) + private ConnectorSplitSource buildStreamSplits(final PravegaProperties properties, + PravegaTableHandle pravegaTableHandle) { - // TODO: Enable begin and end cuts to be configurable: https://github.com/pravega/pravega-sql/issues/24 List sourceStreams = multiSourceStream(pravegaTableHandle) - ? pravegaTableHandle.getOjectArgs().orElseThrow( - () -> new IllegalArgumentException("no args for multi source table found")) + ? pravegaTableHandle.getOjectArgs().orElseThrow(() -> new IllegalArgumentException("no args for multi source table found")) : Collections.singletonList(pravegaTableHandle.getObjectName()); - AtomicInteger splitCounter = new AtomicInteger(0); - ReaderType readerType = readerType(properties); - - sourceStreams.forEach(stream -> { - StreamCutSupplier streamCutSupplier = new StreamCutSupplier(streamReaderManager, pravegaTableHandle.getSchemaName(), stream); - - Supplier splitSupplier; - log.info("get split supplier for " + readerType); - switch (readerType) { - case EVENT_STREAM: - case SEGMENT_RANGE: - splitSupplier = splitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier); - break; - - case SINGLE_GROUP_EVENT_STREAM: - splitSupplier = groupedReaderSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier, properties.getGroupedEventSplits()); - break; - - case SEGMENT_RANGE_PER_SPLIT: - splitSupplier = segmentPerSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier); - break; - - default: - throw new IllegalArgumentException("" + readerType); - } - - PravegaSplit split = splitSupplier.get(); - do { - splits.add(split); - splitCounter.incrementAndGet(); - split = splitSupplier.get(); - } while (split != null); - }); - - 
log.info("created " + splitCounter.get() + " stream splits of type " + readerType); - } - - Supplier splitSupplier(final ReaderType readerType, - final PravegaTableHandle tableHandle, - final String stream, - final StreamCutSupplier streamCutSupplier) - { - return () -> { - StreamCutRange range = streamCutSupplier.get(); - if (range == null) { - return null; - } - - log.info(readerType + " split " + range); - - return new PravegaSplit( - connectorId, - ObjectType.STREAM, - Collections.singletonList(tableHandle.getSchema().get(0)), - readerType, - serialize(new ReaderArgs(tableHandle.getSchemaName(), stream, range, null)), - tableHandle.getSchemaRegistryGroupId()); - }; - } - - Supplier groupedReaderSplitSupplier(final ReaderType readerType, - final PravegaTableHandle tableHandle, - final String stream, - final StreamCutSupplier streamCutSupplier, - final int numSplits) - { - StreamCutRange first = streamCutSupplier.get(); - StreamCutRange last = null; - do { - StreamCutRange range = streamCutSupplier.get(); - if (range == null) { - break; - } - last = range; - } while (true); - - if (last == null) { - throw new IllegalStateException("no end split"); - } - StreamCutRange range = new StreamCutRange(first.getStart(), last.getEnd()); - - log.info(readerType + " split " + range); - String readerGroup = UUID.randomUUID().toString(); - - final ReaderArgs readerArgs = - new ReaderArgs(tableHandle.getSchemaName(), stream, range, readerGroup); - ReaderGroupConfig config = - ReaderGroupConfig.builder() - .stream(scopedName(readerArgs.getScope(), readerArgs.getStream()), - readerArgs.getStreamCutRange().getStart(), - readerArgs.getStreamCutRange().getEnd()) - .build(); - - log.info("create reader group " + readerGroup); - streamReaderManager.readerGroupManager( - tableHandle.getSchemaName()).createReaderGroup(readerGroup, config); - - final AtomicInteger splitCounter = new AtomicInteger(); - - return () -> { - if (splitCounter.getAndIncrement() == numSplits) { - return null; - 
} - return new PravegaSplit( - connectorId, - ObjectType.STREAM, - Collections.singletonList(tableHandle.getSchema().get(0)), - readerType, - serialize(readerArgs), - tableHandle.getSchemaRegistryGroupId()); - }; - } - - Supplier segmentPerSplitSupplier(final ReaderType readerType, - final PravegaTableHandle tableHandle, - final String stream, - final StreamCutSupplier streamCutSupplier) - { - final AtomicReference> iterator = new AtomicReference<>(); - - return () -> { - if (iterator.get() == null || !iterator.get().hasNext()) { - StreamCutRange range = streamCutSupplier.get(); - if (range == null) { - return null; - } - log.info(readerType + " split " + range); - iterator.set(streamReaderManager.getSegments(tableHandle.getSchemaName(), stream, range.getStart(), range.getEnd()).getIterator()); - if (iterator.get() == null || !iterator.get().hasNext()) { - log.info("no more splits"); - return null; - } - } - - SegmentRange segmentRange = iterator.get().next(); - log.info(readerType + " split " + segmentRange); - - return new PravegaSplit( - connectorId, - ObjectType.STREAM, - Collections.singletonList(tableHandle.getSchema().get(0)), - readerType, - serialize(segmentRange), - tableHandle.getSchemaRegistryGroupId()); - }; + return new PravegaSplitSource(connectorId, + pravegaTableHandle, + sourceStreams, + streamReaderManager, + properties); } } diff --git a/src/main/java/com/facebook/presto/pravega/PravegaSplitSource.java b/src/main/java/com/facebook/presto/pravega/PravegaSplitSource.java new file mode 100644 index 0000000..50c0bdb --- /dev/null +++ b/src/main/java/com/facebook/presto/pravega/PravegaSplitSource.java @@ -0,0 +1,170 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.pravega;
+
+import com.facebook.airlift.concurrent.MoreFutures;
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.pravega.client.admin.StreamManager;
+import io.pravega.client.batch.SegmentRange;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static com.facebook.presto.pravega.PravegaErrorCode.PRAVEGA_SPLIT_ERROR;
+import static com.facebook.presto.pravega.util.PravegaSerializationUtils.serialize;
+
+/**
+ * Split source producing one split per Pravega {@link SegmentRange}; a background loader queues the ranges.
+ */
+public class PravegaSplitSource
+        implements ConnectorSplitSource
+{
+    private static final Logger log = Logger.get(PravegaSplitSource.class);
+
+    private final ListeningScheduledExecutorService tp =
+            MoreExecutors.listeningDecorator(new ScheduledThreadPoolExecutor(2));
+    private final BlockingQueue<SegmentRange> queue = new LinkedBlockingQueue<>();
+    private final String connectorId;
+    private final PravegaTableHandle tableHandle;
+    private final List<String> streams;
+    private final PravegaSegmentManager segmentManager;
+    private final PravegaProperties properties;
+    private final ListenableFuture<Boolean> loader;
+
+    // loaderFailure is assigned before allDone, so a reader that sees allDone also sees the failure
+    private volatile boolean allDone;
+    private volatile Throwable loaderFailure;
+
+    private int batchCalls;
+    private int total;
+    private int maxReturned;
+    private int minReturned = Integer.MAX_VALUE;
+
+    /** Background task that pushes the segment ranges of every stream onto the queue. */
+    private class AsyncLoader
+            implements Callable<Boolean>
+    {
+        @Override
+        public Boolean call()
+        {
+            log.info("start asyncLoader, " + streams.size() + " stream(s)");
+            try (StreamManager streamManager = segmentManager.getStreamManager()) {
+                SegmentRangeTable segmentRangeTable =
+                        new SegmentRangeTable(streamManager,
+                                segmentManager.batchClientFactory(tableHandle.getSchemaName()),
+                                tableHandle.getSchemaName());
+                for (String stream : streams) {
+                    log.info("start loading ranges for stream " + stream);
+                    segmentRangeTable.load(stream, queue, properties);
+                }
+                log.info("done loading ranges for all streams");
+                return true;
+            }
+            catch (Throwable t) {
+                // keep the cause so getNextBatch() reports it instead of returning a short result
+                loaderFailure = t;
+                throw t;
+            }
+            finally {
+                // always release waiting getNextBatch() calls; a failed load used to leave them polling forever
+                allDone = true;
+            }
+        }
+    }
+
+    public PravegaSplitSource(String connectorId,
+            PravegaTableHandle tableHandle,
+            List<String> streams,
+            PravegaSegmentManager segmentManager,
+            PravegaProperties properties)
+    {
+        this.connectorId = connectorId;
+        this.tableHandle = tableHandle;
+        this.streams = streams;
+        this.segmentManager = segmentManager;
+        this.properties = properties;
+        this.loader = tp.submit(new AsyncLoader());
+    }
+
+    /** Returns up to {@code maxSize} splits; the future fails with {@link PrestoException} if loading failed. */
+    @Override
+    public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
+    {
+        batchCalls++;
+        ListenableFuture<ConnectorSplitBatch> future = tp.submit(() -> {
+            log.debug("load next " + maxSize + " splits");
+            List<SegmentRange> ranges = new ArrayList<>();
+            while (!allDone) {
+                SegmentRange first = queue.poll(500, TimeUnit.MILLISECONDS);
+                if (first != null) {
+                    ranges.add(first);
+                    break;
+                }
+            }
+            if (loaderFailure != null) {
+                throw new PrestoException(PRAVEGA_SPLIT_ERROR, "failed to load segment ranges for " + streams, loaderFailure);
+            }
+            // top up to maxSize; the budget depends on whether the poll above already produced a range
+            queue.drainTo(ranges, maxSize - ranges.size());
+            log.debug("got " + ranges.size() + " segmentRanges from loader");
+            List<ConnectorSplit> results = new ArrayList<>(ranges.size());
+            for (SegmentRange segmentRange : ranges) {
+                results.add(new PravegaSplit(
+                        connectorId,
+                        tableHandle.getObjectType(),
+                        Collections.singletonList(tableHandle.getSchema().get(0)),
+                        ReaderType.SEGMENT_RANGE_PER_SPLIT,
+                        serialize(segmentRange),
+                        tableHandle.getSchemaRegistryGroupId()));
+            }
+            maxReturned = Math.max(maxReturned, ranges.size());
+            minReturned = Math.min(minReturned, ranges.size());
+            total += ranges.size();
+            return new ConnectorSplitBatch(results, isFinished());
+        });
+        return MoreFutures.toCompletableFuture(future);
+    }
+
+    @Override
+    public void close()
+    {
+        log.info(streams + " [calls: " + batchCalls + " min: " + minReturned + " max: " + maxReturned + " total: " + total + "]");
+        loader.cancel(true);
+        tp.shutdownNow();
+    }
+
+    @Override
+    public boolean isFinished()
+    {
+        // a failed load is never "finished": the error has to surface through getNextBatch()
+        return allDone && loaderFailure == null && queue.isEmpty();
+    }
+}
diff --git a/src/main/java/com/facebook/presto/pravega/ReaderType.java b/src/main/java/com/facebook/presto/pravega/ReaderType.java
index 174035f..c579e1e 100644
--- a/src/main/java/com/facebook/presto/pravega/ReaderType.java
+++ b/src/main/java/com/facebook/presto/pravega/ReaderType.java
@@ -18,9 +18,7 @@
 
 public enum ReaderType
 {
-    SEGMENT_RANGE /* 1 split handles all segments within a stream cut */,
-    SEGMENT_RANGE_PER_SPLIT /* segments for stream cut are given out to different splits */,
-    EVENT_STREAM /* stream oriented reading (vs. segments) */,
-    SINGLE_GROUP_EVENT_STREAM /* stream oriented reading (vs. 
segments) all readers in same group */, + STREAM_CUT_RANGE /* split will handle all segment ranges within a particular stream cut */, + SEGMENT_RANGE_PER_SPLIT /* split will handle a single segment range */, KVT /* key value table */, } diff --git a/src/main/java/com/facebook/presto/pravega/SegmentEventIterator.java b/src/main/java/com/facebook/presto/pravega/SegmentEventIterator.java deleted file mode 100644 index 67876c6..0000000 --- a/src/main/java/com/facebook/presto/pravega/SegmentEventIterator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.facebook.presto.pravega; - -import com.facebook.airlift.log.Logger; -import com.facebook.presto.pravega.decoder.BytesEvent; -import com.facebook.presto.pravega.decoder.DecodableEvent; -import io.pravega.client.batch.SegmentIterator; -import io.pravega.client.batch.SegmentRange; -import io.pravega.client.stream.impl.ByteBufferSerializer; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -public class SegmentEventIterator - implements Iterator -{ - private static final Logger log = Logger.get(SegmentEventIterator.class); - - private final SegmentIterator segmentEventIterator; - - public SegmentEventIterator(PravegaSegmentManager segmentManager, SegmentRange segmentRange) - { - log.info("open iterator for " + segmentRange); - this.segmentEventIterator = segmentManager.getSegmentIterator(segmentRange, new ByteBufferSerializer()); - } - - @Override - public boolean hasNext() - { - return segmentEventIterator.hasNext(); - } - - @Override - public DecodableEvent next() - { - return new BytesEvent(segmentEventIterator.next()); - } -} diff --git a/src/main/java/com/facebook/presto/pravega/SegmentRangeIterator.java b/src/main/java/com/facebook/presto/pravega/SegmentRangeIterator.java index 408eba0..d9165a3 100644 --- a/src/main/java/com/facebook/presto/pravega/SegmentRangeIterator.java +++ b/src/main/java/com/facebook/presto/pravega/SegmentRangeIterator.java @@ -31,75 +31,26 @@ public class SegmentRangeIterator { private static final Logger log = Logger.get(SegmentRangeIterator.class); - private final PravegaSegmentManager segmentManager; + private final SegmentIterator segmentEventIterator; - private final Iterator segmentIterator; + private SegmentRange segmentRange; - private SegmentIterator segmentEventIterator; - - private ByteBuffer event; - - private final StreamCutRange streamCutRange; - - private int fullSegments; - - private int emptySegments; - - private int events; - - public SegmentRangeIterator(PravegaSegmentManager 
segmentManager, ReaderArgs readerArgs) + public SegmentRangeIterator(PravegaSegmentManager segmentManager, SegmentRange segmentRange) { - this.segmentManager = segmentManager; - - this.streamCutRange = readerArgs.getStreamCutRange(); - - log.info("open iterator for " + streamCutRange); - this.segmentIterator = - segmentManager.getSegments(readerArgs.getScope(), - readerArgs.getStream(), - readerArgs.getStreamCutRange().getStart(), - readerArgs.getStreamCutRange().getEnd()).getIterator(); - } - - private ByteBuffer _next() - { - if (segmentEventIterator != null && segmentEventIterator.hasNext()) { - events++; - return segmentEventIterator.next(); - } - - do { - if (!segmentIterator.hasNext()) { - log.info("done with " + streamCutRange + "; full: " + fullSegments + ", empty: " + emptySegments + ", events: " + events); - return null; - } - - segmentEventIterator = segmentManager.getSegmentIterator(segmentIterator.next(), new ByteBufferSerializer()); - log.info("next segment " + streamCutRange + " has event? 
" + segmentEventIterator.hasNext()); - if (segmentEventIterator.hasNext()) { - fullSegments++; - events++; - return segmentEventIterator.next(); - } - emptySegments++; - // maybe segment was empty, continue - } while (true); + log.info("open iterator for " + segmentRange); + this.segmentEventIterator = segmentManager.getSegmentIterator(segmentRange, new ByteBufferSerializer()); + this.segmentRange = segmentRange; } @Override public boolean hasNext() { - if (event == null) { - event = _next(); - } - return event != null; + return segmentEventIterator.hasNext(); } @Override public DecodableEvent next() { - ByteBuffer toReturn = event; - event = null; - return new BytesEvent(toReturn); + return new BytesEvent(segmentEventIterator.next()); } } diff --git a/src/main/java/com/facebook/presto/pravega/SegmentRangeTable.java b/src/main/java/com/facebook/presto/pravega/SegmentRangeTable.java new file mode 100644 index 0000000..69d6510 --- /dev/null +++ b/src/main/java/com/facebook/presto/pravega/SegmentRangeTable.java @@ -0,0 +1,180 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.pravega; + +import com.facebook.airlift.log.Logger; +import io.pravega.client.BatchClientFactory; +import io.pravega.client.admin.StreamInfo; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.batch.SegmentIterator; +import io.pravega.client.batch.SegmentRange; +import io.pravega.client.batch.impl.SegmentRangeImpl; +import io.pravega.client.segment.impl.Segment; +import io.pravega.client.stream.Stream; +import io.pravega.client.stream.StreamCut; +import io.pravega.client.stream.impl.StreamCutImpl; +import io.pravega.client.watermark.WatermarkSerializer; +import io.pravega.shared.NameUtils; +import io.pravega.shared.watermarks.Watermark; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Queue; + +public class SegmentRangeTable +{ + private static final Logger log = Logger.get(SegmentRangeTable.class); + + private final StreamManager streamManager; + + private final BatchClientFactory batchClient; + + private final String scope; + + public SegmentRangeTable(final StreamManager streamManager, final BatchClientFactory batchClient, final String scope) + { + this.streamManager = streamManager; + this.batchClient = batchClient; + this.scope = scope; + } + + public void load(final String streamName, Queue queue, final PravegaProperties properties) + { + Stream stream = Stream.of(scope, streamName); + log.info("load SegmentRanges for " + stream); + + if (!streamManager.checkStreamExists(stream.getScope(), NameUtils.getMarkStreamForStream(stream.getStreamName()))) { + log.info("watermark stream for " + stream + " does not exist, return unbounded segments"); + loadUnboundedSegmentRanges(stream, queue); + return; // early exit!! 
+ } + + Iterator iterator = + batchClient.getSegments(Stream.of(stream.getScope(), + NameUtils.getMarkStreamForStream(stream.getStreamName())), null, null).getIterator(); + + if (!iterator.hasNext()) { + log.info("no segments in watermark stream for " + stream + ", return unbounded segments"); + loadUnboundedSegmentRanges(stream, queue); + return; // early exit!! + } + + SegmentIterator segmentIterator = + batchClient.readSegment(iterator.next(), new WatermarkSerializer()); + + if (iterator.hasNext()) { + // does watermark stream have > 1 segment? + throw new IllegalStateException("> 1 segment not handled"); + } + + if (!segmentIterator.hasNext()) { + log.info("empty watermark stream for " + stream + ", return unbounded segments"); + loadUnboundedSegmentRanges(stream, queue); + return; // early exit!! + } + + // our target split size + int desiredSegmentRangeSizeBytes = properties.getSegmentRangeSplitSizeBytes(); + // so we don't have to create Segment over and over again + final Map lookup = new HashMap<>(); + // 'start' offset in a split/segment + final Map baseline = new HashMap<>(); + + StreamInfo streamInfo = streamManager.getStreamInfo(stream.getScope(), stream.getStreamName()); + + // for each head cut, put to baseline map + streamInfo.getHeadStreamCut().asImpl().getPositions().forEach((segment, offset) -> { + lookup.put(segment.getSegmentId(), segment); + baseline.put(segment.getSegmentId(), new StreamCutImpl(stream, segment, offset)); + }); + + // for each watermark + while (segmentIterator.hasNext()) { + // for each segment,offset within the watermark + // compare offset against our baseline for the segment + // if exceeds split size, that is our SegmentRange. update baseline. 
+ segmentIterator.next().getStreamCut().forEach((swr, offset) -> { + Segment segment = lookup.get(swr.getSegmentId()); + if (segment == null) { + // scale up + // insert first position by looking up starting offset for the segment + segment = lookup.computeIfAbsent(swr.getSegmentId(), + k -> new Segment(stream.getScope(), stream.getStreamName(), swr.getSegmentId())); + baseline.put(segment.getSegmentId(), firstStreamCut(segment)); + } + else if (distance(segment, baseline.get(segment.getSegmentId()), offset) >= desiredSegmentRangeSizeBytes) { + // size exceeded, this is a split + StreamCut sc = new StreamCutImpl(stream, segment, offset); + queue.add(constructRange(segment, baseline.get(segment.getSegmentId()), sc)); + baseline.put(segment.getSegmentId(), sc); + } + }); + } + + // end of watermarks - finish up by creating last splits regardless of size + + // refresh tail stream cuts. it could be the case that stream was being written to + // in which case watermarks would have taken us past our previous tail stream cut view taken at start + streamInfo = streamManager.getStreamInfo(stream.getScope(), stream.getStreamName()); + streamInfo.getTailStreamCut().asImpl().getPositions().forEach((segment, offset) -> + queue.add(constructRange(segment, baseline.remove(segment.getSegmentId()), new StreamCutImpl(stream, segment, offset)))); + + // we removed above while iterating tail stream cuts - so these don't exist in tail + baseline.forEach((sid, sc) -> { + // scale down + queue.add(constructRange(lookup.get(sid), sc, null)); + }); + } + + private StreamCut firstStreamCut(Segment segment) + { + return new StreamCutImpl(segment.getStream(), segment, batchClient.currentSegmentRange(segment).getStartOffset()); + } + + private SegmentRange constructRange(Segment segment, StreamCut start, StreamCut end) + { + if (start == null || end == null) { + SegmentRange range = batchClient.currentSegmentRange(segment); + if (start == null) { + start = new 
StreamCutImpl(segment.getStream(), segment, range.getStartOffset()); + } + if (end == null) { + end = new StreamCutImpl(segment.getStream(), segment, range.getEndOffset()); + } + } + + return SegmentRangeImpl.fromStreamCuts(segment, start, end); + } + + long distance(Segment segment, StreamCut start, long endOffset) + { + Long startOffset = start.asImpl().getPositions().getOrDefault(segment, -1L); + if (startOffset < 0 || endOffset <= startOffset) { + throw new IllegalStateException(start + " " + endOffset + " " + segment); + } + return endOffset - startOffset; + } + + private void loadUnboundedSegmentRanges(Stream stream, Queue queue) + { + Iterator iterator = + batchClient.getSegments(Stream.of(stream.getScope(), stream.getStreamName()), null, null).getIterator(); + while (iterator.hasNext()) { + queue.add(iterator.next()); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/facebook/presto/pravega/StreamCutRangeIterator.java b/src/main/java/com/facebook/presto/pravega/StreamCutRangeIterator.java new file mode 100644 index 0000000..48457fd --- /dev/null +++ b/src/main/java/com/facebook/presto/pravega/StreamCutRangeIterator.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.facebook.presto.pravega; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.pravega.decoder.BytesEvent; +import com.facebook.presto.pravega.decoder.DecodableEvent; +import io.pravega.client.batch.SegmentIterator; +import io.pravega.client.batch.SegmentRange; +import io.pravega.client.stream.impl.ByteBufferSerializer; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +public class StreamCutRangeIterator + implements Iterator +{ + private static final Logger log = Logger.get(SegmentRangeIterator.class); + + private final PravegaSegmentManager segmentManager; + + private final Iterator segmentIterator; + + private SegmentIterator segmentEventIterator; + + private ByteBuffer event; + + private final StreamCutRange streamCutRange; + + private int fullSegments; + + private int emptySegments; + + private int events; + + public StreamCutRangeIterator(PravegaSegmentManager segmentManager, StreamCutRangeReaderArgs streamCutRangeReaderArgs) + { + this.segmentManager = segmentManager; + + this.streamCutRange = streamCutRangeReaderArgs.getStreamCutRange(); + + log.info("open iterator for " + streamCutRange); + this.segmentIterator = + segmentManager.getSegments(streamCutRangeReaderArgs.getScope(), + streamCutRangeReaderArgs.getStream(), + streamCutRangeReaderArgs.getStreamCutRange().getStart(), + streamCutRangeReaderArgs.getStreamCutRange().getEnd()).getIterator(); + } + + private ByteBuffer _next() + { + if (segmentEventIterator != null && segmentEventIterator.hasNext()) { + events++; + return segmentEventIterator.next(); + } + + do { + if (!segmentIterator.hasNext()) { + log.info("done with " + streamCutRange + "; full: " + fullSegments + ", empty: " + emptySegments + ", events: " + events); + return null; + } + + segmentEventIterator = segmentManager.getSegmentIterator(segmentIterator.next(), new ByteBufferSerializer()); + log.info("next segment " + streamCutRange + " has event? 
" + segmentEventIterator.hasNext()); + if (segmentEventIterator.hasNext()) { + fullSegments++; + events++; + return segmentEventIterator.next(); + } + emptySegments++; + // maybe segment was empty, continue + } while (true); + } + + @Override + public boolean hasNext() + { + if (event == null) { + event = _next(); + } + return event != null; + } + + @Override + public DecodableEvent next() + { + ByteBuffer toReturn = event; + event = null; + return new BytesEvent(toReturn); + } +} \ No newline at end of file diff --git a/src/main/java/com/facebook/presto/pravega/ReaderArgs.java b/src/main/java/com/facebook/presto/pravega/StreamCutRangeReaderArgs.java similarity index 76% rename from src/main/java/com/facebook/presto/pravega/ReaderArgs.java rename to src/main/java/com/facebook/presto/pravega/StreamCutRangeReaderArgs.java index fbfbc0b..1bd9a73 100644 --- a/src/main/java/com/facebook/presto/pravega/ReaderArgs.java +++ b/src/main/java/com/facebook/presto/pravega/StreamCutRangeReaderArgs.java @@ -23,23 +23,20 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; -public class ReaderArgs +public class StreamCutRangeReaderArgs implements Serializable { private final String scope; private final String stream; private final StreamCutRange streamCutRange; - private final String readerGroup; - public ReaderArgs(@JsonProperty("scope") String scope, - @JsonProperty("stream") String stream, - @JsonProperty("streamCutRange") StreamCutRange streamCutRange, - @JsonProperty("readerGroup") String readerGroup) + public StreamCutRangeReaderArgs(@JsonProperty("scope") String scope, + @JsonProperty("stream") String stream, + @JsonProperty("streamCutRange") StreamCutRange streamCutRange) { this.scope = requireNonNull(scope, "scope is null"); this.stream = requireNonNull(stream, "stream is null"); this.streamCutRange = requireNonNull(streamCutRange, "streamCutRange is null"); - this.readerGroup = readerGroup; // may be null } 
@JsonProperty @@ -60,12 +57,6 @@ public StreamCutRange getStreamCutRange() return streamCutRange; } - @JsonProperty - public String getReaderGroup() - { - return readerGroup; - } - @Override public String toString() { @@ -73,7 +64,6 @@ public String toString() .add("scope", scope) .add("stream", stream) .add("streamCutRange", streamCutRange) - .add("readerGroup", readerGroup) .toString(); } } diff --git a/src/main/java/com/facebook/presto/pravega/StreamCutSupplier.java b/src/main/java/com/facebook/presto/pravega/StreamCutSupplier.java deleted file mode 100644 index 940b306..0000000 --- a/src/main/java/com/facebook/presto/pravega/StreamCutSupplier.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) Pravega Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.facebook.presto.pravega; - -import io.pravega.client.batch.SegmentIterator; -import io.pravega.client.batch.SegmentRange; -import io.pravega.client.stream.StreamCut; -import io.pravega.client.stream.impl.ByteBufferSerializer; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -import static com.facebook.presto.pravega.util.PravegaNameUtils.streamCutName; - -public class StreamCutSupplier - implements AutoCloseable -{ - private PravegaSegmentManager segmentManager; - - private Iterator rangeIterator; - - private SegmentIterator segmentIterator; - - private StreamCut previous; - - private boolean empty; - - public StreamCutSupplier(PravegaSegmentManager segmentManager, String scope, String stream) - { - if (segmentManager.streamExists(scope, streamCutName(stream))) { - // for now, read stream cuts from internal stream - // https://github.com/pravega/pravega-sql/issues/24 - this.segmentManager = segmentManager; - - this.rangeIterator = segmentManager.getSegments(scope, streamCutName(stream), null, null).getIterator(); - // init fist stream cut - this.previous = nextStreamCut(); - } - - if (this.previous == null) { - // either stream doesn't exist or no stream cuts logged - this.empty = true; - } - } - - private StreamCut nextStreamCut() - { - do { - if (segmentIterator != null && segmentIterator.hasNext()) { - return StreamCut.fromBytes(segmentIterator.next()); - } - - if (!rangeIterator.hasNext()) { - return null; - } - - segmentIterator = segmentManager.getSegmentIterator(rangeIterator.next(), - new ByteBufferSerializer()); - } while (true); - } - - private StreamCutRange next() - { - if (previous == null) { - return null; - } - - StreamCut start = previous; - StreamCut end = nextStreamCut(); - previous = end; - - // looking for explicitly defined start+end stream cuts - // so we return null when we have no end (vs. start->UNBOUNDED) - return previous == null ? 
null : new StreamCutRange(start, end); - } - - public StreamCutRange get() - { - if (empty) { - StreamCutRange range = StreamCutRange.NULL_PAIR; - empty = false; - return range; - } - return next(); - } - - @Override - public void close() - { - } -}