Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 51: remove dead code #52

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,8 @@

public class PravegaProperties
{
private static final String SESSION_READER_TYPE = "reader_type";

private static final String SESSION_CURSOR_DELIM_CHAR = "cursor_delim_char";

private static final String SESSION_GROUPED_EVENT_SPLITS = "grouped_event_splits";

private static final String SESSION_EVENT_READ_TIMEOUT_MS = "event_read_timeout_ms";

private final ConnectorSession session;

public PravegaProperties(final ConnectorSession session)
Expand All @@ -50,47 +44,11 @@ public static List<PropertyMetadata<?>> buildSessionProperties()
",",
false));

propertyMetadataList.add(
PropertyMetadata.stringProperty(
SESSION_READER_TYPE,
"reader type [event|grouped_event|segment_range|segment_range_per_split]",
"segment_range_per_split",
false));

propertyMetadataList.add(
PropertyMetadata.integerProperty(
SESSION_GROUPED_EVENT_SPLITS,
"number of splits when using grouped readers",
63,
false));

propertyMetadataList.add(
PropertyMetadata.integerProperty(
SESSION_EVENT_READ_TIMEOUT_MS,
"timeout in ms to readNextEvent()",
10000,
false));

return propertyMetadataList;
}

public String getCursorDelimChar()
{
return session.getProperty(SESSION_CURSOR_DELIM_CHAR, String.class);
}

public String getReaderType()
{
return session.getProperty(SESSION_READER_TYPE, String.class);
}

public int getGroupedEventSplits()
{
return session.getProperty(SESSION_GROUPED_EVENT_SPLITS, Integer.class);
}

public int getEventReadTimeoutMs()
{
return session.getProperty(SESSION_EVENT_READ_TIMEOUT_MS, Integer.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,25 +81,8 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
Iterator<DecodableEvent> eventIterator;

switch (split.getReaderType()) {
case EVENT_STREAM:
case SINGLE_GROUP_EVENT_STREAM:
eventIterator = new EventStreamIterator(segmentManager, deserialize(split.getReaderArgs(), ReaderArgs.class), properties);
break;

case SEGMENT_RANGE:
eventIterator = new SegmentRangeIterator(segmentManager, deserialize(split.getReaderArgs(), ReaderArgs.class));
break;

case SEGMENT_RANGE_PER_SPLIT:
eventIterator = new SegmentEventIterator(segmentManager, deserialize(split.getReaderArgs(), SegmentRange.class));
break;

default:
throw new IllegalArgumentException("readerType " + split.getReaderType());
}
Iterator<DecodableEvent> eventIterator =
new SegmentEventIterator(segmentManager, deserialize(split.getReaderArgs(), SegmentRange.class));

return new PravegaRecordCursor(eventIterator, columnHandles, eventDecoder, properties, split.getSchema().get(0).getFormat());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,40 +137,6 @@ KeyValueTableFactory getKeyValueTableFactory(String scope)
return factory;
}

EventStreamClientFactory getEventStreamClientFactory(String scope)
{
EventStreamClientFactory factory = eventStreamClientFactoryMap.get(scope);
if (factory == null) {
synchronized (this) {
factory = eventStreamClientFactoryMap.get(scope);
if (factory == null) {
factory = EventStreamClientFactory.withScope(scope, clientConfig);
if (eventStreamClientFactoryMap.putIfAbsent(scope, factory) != null) {
throw new RuntimeException("unexpected concurrent create of event stream factory");
}
}
}
}
return factory;
}

ReaderGroupManager readerGroupManager(String scope)
{
ReaderGroupManager readerGroupManager = scopedReaderGroupManagerMap.get(scope);
if (readerGroupManager == null) {
synchronized (this) {
readerGroupManager = scopedReaderGroupManagerMap.get(scope);
if (readerGroupManager == null) {
readerGroupManager = ReaderGroupManager.withScope(scope, clientConfig);
if (scopedReaderGroupManagerMap.putIfAbsent(scope, readerGroupManager) != null) {
throw new RuntimeException("unexpected concurrent create of reader group manager");
}
}
}
}
return readerGroupManager;
}

SerializerConfig serializerConfig(String groupId)
{
return SerializerConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,19 @@
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import io.pravega.client.batch.SegmentRange;
import io.pravega.client.stream.ReaderGroupConfig;

import javax.inject.Inject;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static io.pravega.connectors.presto.PravegaErrorCode.PRAVEGA_SPLIT_ERROR;
import static io.pravega.connectors.presto.PravegaHandleResolver.convertLayout;
import static io.pravega.connectors.presto.util.PravegaNameUtils.multiSourceStream;
import static io.pravega.connectors.presto.util.PravegaNameUtils.scopedName;
import static io.pravega.connectors.presto.util.PravegaSerializationUtils.serialize;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -89,7 +86,7 @@ public ConnectorSplitSource getSplits(

return new FixedSplitSource(splits.build());
}
catch (Exception e) { // Catch all exceptions because Pravega library is written in scala and checked exceptions are not declared in method signature.
catch (Exception e) {
if (e instanceof PrestoException) {
throw e;
}
Expand All @@ -101,17 +98,7 @@ public ConnectorSplitSource getSplits(

private static ReaderType readerType(PravegaProperties properties)
{
String type = properties.getReaderType();
switch (type) {
case "event":
return ReaderType.EVENT_STREAM;
case "grouped_event":
return ReaderType.SINGLE_GROUP_EVENT_STREAM;
case "segment_range_per_split":
return ReaderType.SEGMENT_RANGE_PER_SPLIT;
default:
return ReaderType.SEGMENT_RANGE;
}
return ReaderType.SEGMENT_RANGE_PER_SPLIT;
}

private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList.Builder<ConnectorSplit> splits)
Expand Down Expand Up @@ -149,25 +136,7 @@ private void buildStreamSplits(final PravegaProperties properties,
sourceStreams.forEach(stream -> {
StreamCutSupplier streamCutSupplier = new StreamCutSupplier(streamReaderManager, pravegaTableHandle.getSchemaName(), stream);

Supplier<PravegaSplit> splitSupplier;
log.info("get split supplier for " + readerType);
switch (readerType) {
case EVENT_STREAM:
case SEGMENT_RANGE:
splitSupplier = splitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier);
break;

case SINGLE_GROUP_EVENT_STREAM:
splitSupplier = groupedReaderSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier, properties.getGroupedEventSplits());
break;

case SEGMENT_RANGE_PER_SPLIT:
splitSupplier = segmentPerSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier);
break;

default:
throw new IllegalArgumentException("" + readerType);
}
Supplier<PravegaSplit> splitSupplier = segmentPerSplitSupplier(readerType, pravegaTableHandle, stream, streamCutSupplier);

PravegaSplit split = splitSupplier.get();
do {
Expand All @@ -180,82 +149,6 @@ private void buildStreamSplits(final PravegaProperties properties,
log.info("created " + splitCounter.get() + " stream splits of type " + readerType);
}

Supplier<PravegaSplit> splitSupplier(final ReaderType readerType,
final PravegaTableHandle tableHandle,
final String stream,
final StreamCutSupplier streamCutSupplier)
{
return () -> {
StreamCutRange range = streamCutSupplier.get();
if (range == null) {
return null;
}

log.info(readerType + " split " + range);

return new PravegaSplit(
connectorId,
ObjectType.STREAM,
Collections.singletonList(tableHandle.getSchema().get(0)),
readerType,
serialize(new ReaderArgs(tableHandle.getSchemaName(), stream, range, null)),
tableHandle.getSchemaRegistryGroupId());
};
}

Supplier<PravegaSplit> groupedReaderSplitSupplier(final ReaderType readerType,
final PravegaTableHandle tableHandle,
final String stream,
final StreamCutSupplier streamCutSupplier,
final int numSplits)
{
StreamCutRange first = streamCutSupplier.get();
StreamCutRange last = null;
do {
StreamCutRange range = streamCutSupplier.get();
if (range == null) {
break;
}
last = range;
} while (true);

if (last == null) {
throw new IllegalStateException("no end split");
}
StreamCutRange range = new StreamCutRange(first.getStart(), last.getEnd());

log.info(readerType + " split " + range);
String readerGroup = UUID.randomUUID().toString();

final ReaderArgs readerArgs =
new ReaderArgs(tableHandle.getSchemaName(), stream, range, readerGroup);
ReaderGroupConfig config =
ReaderGroupConfig.builder()
.stream(scopedName(readerArgs.getScope(), readerArgs.getStream()),
readerArgs.getStreamCutRange().getStart(),
readerArgs.getStreamCutRange().getEnd())
.build();

log.info("create reader group " + readerGroup);
streamReaderManager.readerGroupManager(
tableHandle.getSchemaName()).createReaderGroup(readerGroup, config);

final AtomicInteger splitCounter = new AtomicInteger();

return () -> {
if (splitCounter.getAndIncrement() == numSplits) {
return null;
}
return new PravegaSplit(
connectorId,
ObjectType.STREAM,
Collections.singletonList(tableHandle.getSchema().get(0)),
readerType,
serialize(readerArgs),
tableHandle.getSchemaRegistryGroupId());
};
}

Supplier<PravegaSplit> segmentPerSplitSupplier(final ReaderType readerType,
final PravegaTableHandle tableHandle,
final String stream,
Expand Down
Loading