From c125c3413c19bc6bb69e5f16e4c6f769062ff1cf Mon Sep 17 00:00:00 2001 From: Arjun Mehta Date: Thu, 17 Nov 2016 13:55:24 -0500 Subject: [PATCH] Added nosql and dynamo plugins --- pom.xml | 2 + presto-base-plugin-nosql/pom.xml | 122 +++++++++++ .../presto/baseplugin/BaseColumnHandle.java | 94 ++++++++ .../presto/baseplugin/BaseConfig.java | 104 +++++++++ .../presto/baseplugin/BaseConnector.java | 77 +++++++ .../baseplugin/BaseConnectorFactory.java | 92 ++++++++ .../presto/baseplugin/BaseConnectorInfo.java | 31 +++ .../presto/baseplugin/BaseHandleResolver.java | 43 ++++ .../presto/baseplugin/BaseMetadata.java | 117 ++++++++++ .../presto/baseplugin/BaseModule.java | 91 ++++++++ .../presto/baseplugin/BasePlugin.java | 80 +++++++ .../presto/baseplugin/BaseProvider.java | 118 +++++++++++ .../facebook/presto/baseplugin/BaseQuery.java | 76 +++++++ .../presto/baseplugin/BaseRecordCursor.java | 119 +++++++++++ .../presto/baseplugin/BaseRecordSet.java | 41 ++++ .../baseplugin/BaseRecordSetProvider.java | 34 +++ .../facebook/presto/baseplugin/BaseSplit.java | 79 +++++++ .../presto/baseplugin/BaseSplitManager.java | 140 ++++++++++++ .../presto/baseplugin/BaseTableHandle.java | 59 ++++++ .../baseplugin/BaseTableLayoutHandle.java | 44 ++++ .../baseplugin/BaseTransactionHandle.java | 10 + .../facebook/presto/baseplugin/BaseUtils.java | 75 +++++++ .../presto/baseplugin/cache/BaseRecord.java | 23 ++ .../baseplugin/cache/BaseRecordBatch.java | 35 +++ .../logic/BasePluginLogicDefinition.java | 10 + .../baseplugin/metastore/BaseMetastore.java | 137 ++++++++++++ .../metastore/BaseMetastoreDefinition.java | 24 +++ .../predicate/BaseComparisonOperator.java | 16 ++ .../baseplugin/predicate/BasePredicate.java | 160 ++++++++++++++ presto-dynamo/pom.xml | 101 +++++++++ .../facebook/presto/dynamo/DynamoConfig.java | 97 +++++++++ .../facebook/presto/dynamo/DynamoIndex.java | 91 ++++++++ .../facebook/presto/dynamo/DynamoPlugin.java | 17 ++ .../presto/dynamo/DynamoProvider.java | 200 ++++++++++++++++++ .../facebook/presto/dynamo/DynamoQuery.java | 44 ++++ .../facebook/presto/dynamo/DynamoRecord.java | 51 +++++ .../presto/dynamo/DynamoRecordBatch.java | 24 +++ .../dynamo/DynamoRecordSetProvider.java | 31 +++ .../presto/dynamo/DynamoSplitManager.java | 122 +++++++++++ .../facebook/presto/dynamo/DynamoUtils.java | 71 +++++++ 40 files changed, 2902 insertions(+) create mode 100644 presto-base-plugin-nosql/pom.xml create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseColumnHandle.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConfig.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnector.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorFactory.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorInfo.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseHandleResolver.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseMetadata.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseModule.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BasePlugin.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseProvider.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseQuery.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordCursor.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSet.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSetProvider.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplit.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplitManager.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableHandle.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableLayoutHandle.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTransactionHandle.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseUtils.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/cache/BaseRecord.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/cache/BaseRecordBatch.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/logic/BasePluginLogicDefinition.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/metastore/BaseMetastore.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/metastore/BaseMetastoreDefinition.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/predicate/BaseComparisonOperator.java create mode 100644 presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/predicate/BasePredicate.java create mode 100644 presto-dynamo/pom.xml create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoConfig.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoIndex.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoPlugin.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoProvider.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoQuery.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecord.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecordBatch.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecordSetProvider.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoSplitManager.java create mode 100644 presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoUtils.java diff --git a/pom.xml b/pom.xml index 4608fe8a918f..713f72393ccc 100644 --- a/pom.xml +++ b/pom.xml @@ -116,6 +116,8 @@ presto-testing-server-launcher presto-plugin-toolkit presto-resource-group-managers + presto-base-plugin-nosql + presto-dynamo diff --git a/presto-base-plugin-nosql/pom.xml b/presto-base-plugin-nosql/pom.xml new file mode 100644 index 000000000000..df5a39ce3d62 --- /dev/null +++ b/presto-base-plugin-nosql/pom.xml @@ -0,0 +1,122 @@ + + + + presto-root + com.facebook.presto + 0.156 + + 4.0.0 + + presto-base-plugin + Presto - Base plugin + + + true + true + + false + true + + + + + com.h2database + h2 + + + + io.airlift + bootstrap + + + + io.airlift + log + + + + io.airlift + configuration + + + + com.google.guava + guava + + + + com.google.inject + guice + + + + + com.facebook.presto + presto-spi + provided + + + + io.airlift + slice + provided + + + + javax.inject + javax.inject + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.fasterxml.jackson.core + jackson-core + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + org.weakref + jmxutils + + + + org.apache.commons + commons-lang3 + 3.4 + + + + + com.facebook.presto + presto-main + test + + + + org.testng + testng + test + + + + io.airlift + testing + test + + + + \ No newline at end of file diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseColumnHandle.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseColumnHandle.java new file mode 100644 index 000000000000..dabf0b88c287 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseColumnHandle.java @@ -0,0 +1,94 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.type.Type; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseColumnHandle implements ColumnHandle { + private final String columnName; + private final Type columnType; + private final int ordinalPosition; + + @JsonCreator + public BaseColumnHandle( + @JsonProperty("columnName") String columnName, + @JsonProperty("columnType") Type columnType, + @JsonProperty("ordinalPosition") int ordinalPosition) + { + this.columnName = requireNonNull(columnName, "columnName is null"); + this.columnType = requireNonNull(columnType, "columnType is null"); + this.ordinalPosition = ordinalPosition; + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public Type getColumnType() + { + return columnType; + } + + @JsonProperty + public int getOrdinalPosition() + { + return ordinalPosition; + } + + public ColumnMetadata toColumnMetadata() + { + return new ColumnMetadata(columnName, columnType); + } + + @Override + public boolean equals(Object o) + { + if(o == this){ + return true; + } + if(o instanceof BaseColumnHandle){ + BaseColumnHandle baseColumnHandle = (BaseColumnHandle) o; + return new EqualsBuilder() + .append(columnName, baseColumnHandle.columnName) + .append(columnType, baseColumnHandle.columnType) + .append(ordinalPosition, baseColumnHandle.ordinalPosition) + .isEquals(); + } + return false; + } + + @Override + public int hashCode() + { + return new HashCodeBuilder() + .append(columnName) + .append(columnType) + .append(ordinalPosition) + .toHashCode(); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("columnName", columnName) + .add("columnType", columnType) + .add("ordinalPosition", ordinalPosition) + .toString(); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConfig.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConfig.java new file mode 100644 index 000000000000..a9bcfbc98c8f --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConfig.java @@ -0,0 +1,104 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ConnectorSession; +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; + +import java.util.Optional; +import java.util.Properties; + +/** + * Created by amehta on 6/13/16. + */ +public abstract class BaseConfig { + private final Properties properties; + + public BaseConfig(){ + this.properties = new Properties(); + + setProperty("basePlugin.cacheEnabled", false); + setProperty("basePlugin.metastoreJdbcUrl", "jdbc:h2:~/h2/presto"); + setProperty("basePlugin.metastoreUsername", "sa"); + setProperty("basePlugin.metastorePassword", "sa"); + setProperty("basePlugin.defaultSchemaName", "default_schema"); + } + + protected void setProperty(String propertyName, Object value){ + this.properties.setProperty(propertyName, value.toString()); + } + + protected T getProperty(String propertyName, Class clazz, Optional session){ + //return session.isPresent() ? BaseUtils.getPropertyFromSessionConfig(propertyName, clazz, session.get(), this) : BaseUtils.getPropertyFromMap(propertyName, clazz, properties); + return BaseUtils.getPropertyFromMap(propertyName, clazz, properties); + } + + @Config("basePlugin.cacheEnabled") + @ConfigDescription("Whether the cache is enabled for this plugin") + public void setCacheEnabled(Boolean cacheEnabled){ + setProperty("basePlugin.cacheEnabled", cacheEnabled); + } + + public Boolean getCacheEnabled(Optional session){ + return getProperty("basePlugin.cacheEnabled", Boolean.class, session); + } + + public Boolean getCacheEnabled(){ + return getCacheEnabled(Optional.empty()); + } + + @Config("basePlugin.metastoreJdbcUrl") + @ConfigDescription("the location of the metadata store") + public void setMetastoreJdbcUrl(String metastoreJdbcUrl){ + setProperty("basePlugin.metastoreJdbcUrl", metastoreJdbcUrl); + } + + public String getMetastoreJdbcUrl(Optional session){ + return getProperty("basePlugin.metastoreJdbcUrl", String.class, session); + } + + public String getMetastoreJdbcUrl(){ + return getMetastoreJdbcUrl(Optional.empty()); + } + + @Config("basePlugin.metastoreUsername") + @ConfigDescription("the username for connecting to the metadata store") + public void setMetastoreUsername(String metastoreUsername){ + setProperty("basePlugin.metastoreUsername", metastoreUsername); + } + + public String getMetastoreUsername(Optional session){ + return getProperty("basePlugin.metastoreUsername", String.class, session); + } + + public String getMetastoreUsername(){ + return getMetastoreUsername(Optional.empty()); + } + + @Config("basePlugin.metastorePassword") + @ConfigDescription("the password for connecting to the metadata store") + public void setMetastorePassword(String metastorePassword){ + setProperty("basePlugin.metastorePassword", metastorePassword); + } + + public String getMetastorePassword(Optional session){ + return getProperty("basePlugin.metastorePassword", String.class, session); + } + + public String getMetastorePassword(){ + return getMetastorePassword(Optional.empty()); + } + + @Config("basePlugin.defaultSchemaName") + @ConfigDescription("the name of the default schema for datasources that don't have schemas") + public void setDefaultSchemaName(String defaultSchemaName){ + setProperty("basePlugin.defaultSchemaName", defaultSchemaName); + } + + public String getDefaultSchemaName(Optional session){ + return getProperty("basePlugin.defaultSchemaName", String.class, session); + } + + public String getDefaultSchemaName(){ + return getDefaultSchemaName(Optional.empty()); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnector.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnector.java new file mode 100644 index 000000000000..7a10df163dbb --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnector.java @@ -0,0 +1,77 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.transaction.IsolationLevel; +import io.airlift.bootstrap.LifeCycleManager; +import io.airlift.log.Logger; + +import javax.inject.Inject; + +import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED; +import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports; +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseConnector implements Connector{ + private final Logger log = Logger.get(this.getClass()); + + private final LifeCycleManager lifeCycleManager; + private final BaseMetadata metadata; + private final BaseSplitManager splitManager; + private final BaseRecordSetProvider recordSetProvider; + + @Inject + public BaseConnector( + LifeCycleManager lifeCycleManager, + BaseMetadata metadata, + BaseSplitManager splitManager, + BaseRecordSetProvider recordSetProvider) + { + this.recordSetProvider = recordSetProvider; + this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) + { + checkConnectorSupports(READ_COMMITTED, isolationLevel); + return BaseTransactionHandle.INSTANCE; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) + { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public ConnectorRecordSetProvider getRecordSetProvider() + { + return recordSetProvider; + } + + @Override + public final void shutdown() + { + try { + lifeCycleManager.stop(); + } + catch (Exception e) { + log.error(e, "Error shutting down connector"); + } + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorFactory.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorFactory.java new file mode 100644 index 000000000000..133089b34e48 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorFactory.java @@ -0,0 +1,92 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorContext; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.google.common.base.Throwables; +import com.google.inject.Injector; +import io.airlift.bootstrap.Bootstrap; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseConnectorFactory implements ConnectorFactory{ + private final String name; + + private final Class baseConfigClass; + private final Class baseConnectorClass; + private final Class baseMetadataClass; + private final Class baseSplitManagerClass; + private final Class baseRecordSetProviderClass; + private final Class baseHandleResolverClass; + private final Class baseProviderClass; + + public BaseConnectorFactory( + String name, + Class baseConfigClass, + Class baseConnectorClass, + Class baseMetadataClass, + Class baseSplitManagerClass, + Class baseRecordSetProviderClass, + Class baseHandleResolverClass, + Class baseProviderClass + ) { + this.name = requireNonNull(name, "connectorFactory name is null"); + + this.baseConfigClass = requireNonNull(baseConfigClass, "baseConfig is null"); + this.baseConnectorClass = requireNonNull(baseConnectorClass, "baseConnector is null"); + this.baseMetadataClass = requireNonNull(baseMetadataClass, "baseMetadata is null"); + this.baseSplitManagerClass = requireNonNull(baseSplitManagerClass, "baseSplitManager is null"); + this.baseRecordSetProviderClass = requireNonNull(baseRecordSetProviderClass, "baseRecordSetProvider is null"); + this.baseHandleResolverClass = requireNonNull(baseHandleResolverClass, "baseHandleResolver is null"); + this.baseProviderClass = requireNonNull(baseProviderClass, "baseProvider is null"); + } + + @Override + public String getName() + { + return name; + } + + @Override + public ConnectorHandleResolver getHandleResolver() + { + return new BaseHandleResolver(); + } + + @Override + public Connector create(String connectorId, Map config, ConnectorContext connectorContext) + { + requireNonNull(config, "config is null"); + + try { + Bootstrap app = new Bootstrap( + new BaseModule( + connectorContext.getNodeManager(), + new BaseConnectorInfo(connectorId), + baseConfigClass, + baseConnectorClass, + baseMetadataClass, + baseSplitManagerClass, + baseRecordSetProviderClass, + baseHandleResolverClass, + baseProviderClass)); + + Injector injector = app + .strictConfig() + .doNotInitializeLogging() + .setRequiredConfigurationProperties(config) + .initialize(); + + return injector.getInstance(BaseConnector.class); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorInfo.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorInfo.java new file mode 100644 index 000000000000..9b1e13a9badf --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorInfo.java @@ -0,0 +1,31 @@ +package com.facebook.presto.baseplugin; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/14/16. + */ +public class BaseConnectorInfo { + private final String connectorId; + + @JsonCreator + public BaseConnectorInfo( + @JsonProperty("connectorId") String connectorId + ){ + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + } + + @JsonProperty + public String getConnectorId() { + return connectorId; + } + + @Override + public String toString() { + return toStringHelper(this).add("connectorId", connectorId).toString(); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseHandleResolver.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseHandleResolver.java new file mode 100644 index 000000000000..df95e99a96c4 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseHandleResolver.java @@ -0,0 +1,43 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseHandleResolver implements ConnectorHandleResolver { + @Override + public Class getTableLayoutHandleClass() + { + return BaseTableLayoutHandle.class; + } + + @Override + public Class getTableHandleClass() + { + return BaseTableHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return BaseColumnHandle.class; + } + + @Override + public Class getSplitClass() + { + return BaseSplit.class; + } + + @Override + public Class getTransactionHandleClass() + { + return BaseTransactionHandle.class; + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseMetadata.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseMetadata.java new file mode 100644 index 000000000000..5e64d27a899a --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseMetadata.java @@ -0,0 +1,117 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.baseplugin.metastore.BaseMetastore; +import com.facebook.presto.baseplugin.metastore.BaseMetastoreDefinition; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayout; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.ConnectorTableLayoutResult; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.ConnectorViewDefinition; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseMetadata implements ConnectorMetadata { + private final BaseProvider baseProvider; + private final BaseMetastoreDefinition baseMetastoreDefinition; + + @Inject + public BaseMetadata(BaseProvider baseProvider, BaseConnectorInfo baseConnectorInfo){ + this.baseProvider = requireNonNull(baseProvider, "baseProvider is null"); + this.baseMetastoreDefinition = new BaseMetastore(baseConnectorInfo, baseProvider.getConfig()); + } + + ////////////////////////////////////////USE BASE PROVIDER//////////////////////////////// + + @Override + public List listSchemaNames(ConnectorSession session) { + return baseProvider.listSchemaNames(session); + } + + @Override + public List listTables(ConnectorSession session, String schemaNameOrNull) { + return baseProvider.listTableNames(session, Optional.ofNullable(schemaNameOrNull)); + } + + @Override + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { + return baseProvider.getMappedColumnHandlesForSchemaTable(session, ((BaseTableHandle) tableHandle).getSchemaTableName()); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { + SchemaTableName schemaTableName = ((BaseTableHandle) table).getSchemaTableName(); + return new ConnectorTableMetadata(schemaTableName, baseProvider.listColumnMetadata(session, schemaTableName)); + } + + @Override + public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { + SchemaTableName schemaTableName = new SchemaTableName(Optional.ofNullable(prefix.getSchemaName()).orElse("default_schema"), Optional.ofNullable(prefix.getTableName()).orElse("columns")); + return ImmutableMap.of(schemaTableName, baseProvider.listColumnMetadata(session, schemaTableName)); + } + + ////////////////////////////////GENERICALLY HANDLED//////////////////////////////// + + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { + return new BaseTableHandle(tableName); + } + + @Override + public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns) { + return ImmutableList.of(new ConnectorTableLayoutResult(new ConnectorTableLayout(new BaseTableLayoutHandle((BaseTableHandle) table, constraint.getSummary())), constraint.getSummary())); + } + + @Override + public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { + return new ConnectorTableLayout(handle); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) { + BaseColumnHandle baseColumnHandle = (BaseColumnHandle)columnHandle; + return new ColumnMetadata(baseColumnHandle.getColumnName(), baseColumnHandle.getColumnType()); + } + + //////////////////////////////////////////Views//////////////////////////////////// + + @Override + public void createView(ConnectorSession session, SchemaTableName viewName, String viewData, boolean replace) { + //baseMetastoreDefinition.createView(session, viewName, viewData, replace); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName viewName) { + //baseMetastoreDefinition.dropView(session, viewName); + } + + @Override + public List listViews(ConnectorSession session, String schemaNameOrNull) { + //return baseMetastoreDefinition.listViews(session, Optional.ofNullable(schemaNameOrNull)); + return ImmutableList.of(); + } + + @Override + public Map getViews(ConnectorSession session, SchemaTablePrefix prefix) { + //return baseMetastoreDefinition.getViews(session, prefix); + return ImmutableMap.of(); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseModule.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseModule.java new file mode 100644 index 000000000000..90704968ad3c --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseModule.java @@ -0,0 +1,91 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.NodeManager; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseModule implements Module { + private final NodeManager nodeManager; + private final BaseConnectorInfo baseConnectorInfo; + private final Class baseConfigClass; + private final Class baseConnectorClass; + private final Class baseMetadataClass; + private final Class baseSplitManagerClass; + private final Class baseRecordSetProviderClass; + private final Class baseHandleResolverClass; + private final Class baseProviderClass; + + public BaseModule( + NodeManager nodeManager, + BaseConnectorInfo baseConnectorInfo, + Class baseConfigClass, + Class baseConnectorClass, + Class baseMetadataClass, + Class baseSplitManagerClass, + Class baseRecordSetProviderClass, + Class baseHandleResolverClass, + Class baseProviderClass + ) { + this.nodeManager = requireNonNull(nodeManager); + this.baseConnectorInfo = requireNonNull(baseConnectorInfo, "baseConnectorInfo is null"); + this.baseConfigClass = requireNonNull(baseConfigClass, "baseConfig is null"); + this.baseConnectorClass = requireNonNull(baseConnectorClass, "baseConnector is null"); + this.baseMetadataClass = requireNonNull(baseMetadataClass, "baseMetadata is null"); + this.baseSplitManagerClass = requireNonNull(baseSplitManagerClass, "baseSplitManager is null"); + this.baseRecordSetProviderClass = requireNonNull(baseRecordSetProviderClass, "baseRecordSetProvider is null"); + this.baseHandleResolverClass = requireNonNull(baseHandleResolverClass, "baseHandleResolver is null"); + this.baseProviderClass = requireNonNull(baseProviderClass, "baseProvider is null"); + } + + @Override + public void configure(Binder binder) + { + configBinder(binder).bindConfig(baseConfigClass); + + binder.bind(BaseConnectorInfo.class).toInstance(baseConnectorInfo); + binder.bind(NodeManager.class).toInstance(nodeManager); + + if(baseConnectorClass != BaseConnector.class) { + binder.bind(BaseConnector.class).to(baseConnectorClass).in(Scopes.SINGLETON); + }else{ + binder.bind(BaseConnector.class).in(Scopes.SINGLETON); + } + + if(baseMetadataClass != BaseMetadata.class){ + binder.bind(BaseMetadata.class).to(baseMetadataClass).in(Scopes.SINGLETON); + }else{ + binder.bind(BaseMetadata.class).in(Scopes.SINGLETON); + } + + if(baseSplitManagerClass != BaseSplitManager.class){ + binder.bind(BaseSplitManager.class).to(baseSplitManagerClass).in(Scopes.SINGLETON); + }else{ + binder.bind(BaseSplitManager.class).in(Scopes.SINGLETON); + } + + if(baseRecordSetProviderClass != BaseRecordSetProvider.class){ + binder.bind(BaseRecordSetProvider.class).to(baseRecordSetProviderClass).in(Scopes.SINGLETON); + }else{ + binder.bind(BaseRecordSetProvider.class).in(Scopes.SINGLETON); + } + + if(baseHandleResolverClass != BaseHandleResolver.class){ + binder.bind(BaseHandleResolver.class).to(baseHandleResolverClass).in(Scopes.SINGLETON); + }else{ + binder.bind(BaseHandleResolver.class).in(Scopes.SINGLETON); + } + + if(baseProviderClass != BaseProvider.class){ + binder.bind(BaseProvider.class).to(baseProviderClass).in(Scopes.SINGLETON); + }else{ + binder.bind(BaseProvider.class).in(Scopes.SINGLETON); + } + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BasePlugin.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BasePlugin.java new file mode 100644 index 000000000000..0ae2474741b1 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BasePlugin.java @@ -0,0 +1,80 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.google.common.collect.ImmutableList; + +/** + * Created by amehta on 6/13/16. + */ +public abstract class BasePlugin implements Plugin { + private String name; + + private Class baseConfigClass; + private Class baseConnectorClass; + private Class baseMetadataClass; + private Class baseSplitManagerClass; + private Class baseRecordSetProviderClass; + private Class baseHandleResolverClass; + private Class baseProviderClass; + + public BasePlugin() { + setBaseConfigClass(BaseConfig.class); + setBaseConnectorClass(BaseConnector.class); + setBaseMetadataClass(BaseMetadata.class); + setBaseSplitManagerClass(BaseSplitManager.class); + setBaseRecordSetProviderClass(BaseRecordSetProvider.class); + setBaseHandleResolverClass(BaseHandleResolver.class); + setBaseProviderClass(BaseProvider.class); + + //allow custom init + init(); + } + + public abstract void init(); + + public void setName(String name) { + this.name = name; + } + + public void setBaseConfigClass(Class baseConfigClass) { + this.baseConfigClass = baseConfigClass; + } + + public void setBaseConnectorClass(Class baseConnectorClass) { + this.baseConnectorClass = baseConnectorClass; + } + + public void setBaseMetadataClass(Class baseMetadataClass) { + this.baseMetadataClass = baseMetadataClass; + } + + public void setBaseSplitManagerClass(Class baseSplitManagerClass) { + this.baseSplitManagerClass = baseSplitManagerClass; + } + + public void setBaseRecordSetProviderClass(Class baseRecordSetProviderClass) { + this.baseRecordSetProviderClass = baseRecordSetProviderClass; + } + + public void setBaseHandleResolverClass(Class baseHandleResolverClass) { + this.baseHandleResolverClass = baseHandleResolverClass; + } + + public void setBaseProviderClass(Class baseProviderClass) { + this.baseProviderClass = baseProviderClass; + } + + @Override + public Iterable getConnectorFactories() { + return ImmutableList.of(new BaseConnectorFactory( + name, + baseConfigClass, + baseConnectorClass, + baseMetadataClass, + baseSplitManagerClass, + baseRecordSetProviderClass, + baseHandleResolverClass, + baseProviderClass)); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseProvider.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseProvider.java new file mode 100644 index 000000000000..d6d7e21f7866 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseProvider.java @@ -0,0 +1,118 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.baseplugin.cache.BaseRecordBatch; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public abstract class BaseProvider { + private final BaseConfig config; + private final Map> tableMap; + + private final LoadingCache recordCache; + + public BaseProvider(BaseConfig config) { + this.config = requireNonNull(config, "config is null"); + this.tableMap = new HashMap<>(); + + this.recordCache = CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(2, TimeUnit.MINUTES).build(new CacheLoader() { + @Override + public BaseRecordBatch load(BaseQuery key) throws Exception { + return getRecordBatchForQueryFromSource(key); + } + }); + } + + public BaseConfig getConfig() { + return config; + } + + public LoadingCache getRecordCache() { + return recordCache; + } + + /** + * Gets the schema names for the source + * @param session contains query specific meta-information + * @return a list of schema names for the source + */ + public List listSchemaNames(ConnectorSession session){ + return ImmutableList.of(config.getDefaultSchemaName(Optional.of(session))); + } + + /** + * Gets a list of columns for a table/schema + * @param session contains query specific meta-information + * @param name the schema/table for which to get columns + * @return columns corresponding to a table/schema + */ + public List getTableColumns(ConnectorSession session, SchemaTableName name) { + return tableMap.computeIfAbsent(name, n -> generateTableColumns(session, n)); + } + + /** + * Gets a list of columnMetadata for a schema/table + * @param session contains query specific meta-information + * @param schemaTableName the schema/table for which to get columns + * @return the columnMetadata's corresponding to a schema/table + */ + public List listColumnMetadata(ConnectorSession session, SchemaTableName schemaTableName){ + return getTableColumns(session, schemaTableName).stream().map(x -> new ColumnMetadata(x.getColumnName(), x.getColumnType())).collect(Collectors.toList()); + } + + /** + * Gets a map of columnName -> columnHandles for a schema/table + * @param session contains query specific meta-information + * @param tableName the schema/table for which to get columns + * @return the columnName -> columnHandle corresponding to a schema/table + */ + public Map getMappedColumnHandlesForSchemaTable(ConnectorSession session, SchemaTableName tableName){ + return getTableColumns(session, tableName).stream().collect(Collectors.toMap(BaseColumnHandle::getColumnName, Function.identity())); + } + + /** + * attempts to get a recordBatch from custom logic. If that returns an EMPTY_BATCH, hits source + * @param query the request for data + * @return the recordBatch + */ + public BaseRecordBatch getRecordBatchForQuery(BaseQuery query){ + if(config.getCacheEnabled()){ + try { + return recordCache.get(query); + }catch (ExecutionException e){ + e.printStackTrace(); + } + } + return getRecordBatchForQueryFromSource(query); + } + + /** + * Gets a recordBatch corresponding to the state of a given query + * @param key the query that represents a request for data + * @return the recordBatch corresponding to the state of the given query + */ + public abstract BaseRecordBatch getRecordBatchForQueryFromSource(BaseQuery key); + + public abstract List generateTableColumns(ConnectorSession session, SchemaTableName tableName); + + public abstract List listTableNames(ConnectorSession session, Optional schemaName); +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseQuery.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseQuery.java new file mode 100644 index 000000000000..80ada80b35d3 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseQuery.java @@ -0,0 +1,76 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.google.common.base.MoreObjects; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.util.List; +import java.util.UUID; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 7/20/16. + */ +public class BaseQuery { + private final BaseSplit baseSplit; + private final ConnectorSession connectorSession; + private final List desiredColumns; + private final String id; + + public BaseQuery(BaseSplit baseSplit, ConnectorSession connectorSession, List desiredColumns) { + this.baseSplit = requireNonNull(baseSplit, "baseSplit is null"); + this.connectorSession = requireNonNull(connectorSession, "connectorSession is null"); + this.desiredColumns = requireNonNull(desiredColumns, "desiredColumns is null"); + this.id = UUID.randomUUID().toString(); + } + + public BaseSplit getBaseSplit() { + return baseSplit; + } + + public ConnectorSession getConnectorSession() { + return connectorSession; + } + + public List getDesiredColumns() { + return desiredColumns; + } + + public String getId(){ + return id; + } + + @Override + public boolean equals(Object o) { + if(o == this){ + return true; + } + if(o instanceof BaseQuery){ + BaseQuery baseQuery = (BaseQuery) o; + return new EqualsBuilder() + .append(baseSplit.getTableName(), baseQuery.baseSplit.getTableName()) + .append(baseSplit.getPredicates(), baseQuery.baseSplit.getPredicates()) + .isEquals(); + } + return false; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(baseSplit.getTableName()) + .append(baseSplit.getPredicates()) + .toHashCode(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableName", baseSplit.getTableName()) + .add("predicates", baseSplit.getPredicates()) + .toString(); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordCursor.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordCursor.java new file mode 100644 index 000000000000..ad0d460142c8 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordCursor.java @@ -0,0 +1,119 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.baseplugin.cache.BaseRecord; +import com.facebook.presto.baseplugin.cache.BaseRecordBatch; +import com.facebook.presto.spi.RecordCursor; +import com.facebook.presto.spi.type.Type; +import io.airlift.slice.Slice; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public final class BaseRecordCursor implements RecordCursor { + private final BaseQuery baseQuery; + private final BaseProvider baseProvider; + + private BaseRecordBatch currentBatch; + private BaseRecord currentRecord; + //use index instead of iterator to support concurrency better + private int index; + + private boolean finishedFetching; + + public BaseRecordCursor(BaseQuery baseQuery, BaseProvider baseProvider) { + this.baseQuery = requireNonNull(baseQuery, "baseQuery is null"); + this.baseProvider = requireNonNull(baseProvider, "baseProvider is null"); + this.currentBatch = BaseRecordBatch.EMPTY_BATCH; + this.index = 0; + + this.finishedFetching = currentBatch.isLastBatch(); + } + + @Override + public long getTotalBytes() { + return 0; + } + + @Override + public long getCompletedBytes() { + return 0; + } + + @Override + public long getReadTimeNanos() { + return 0; + } + + @Override + public Type getType(int field) { + return getColumnAtPosition(field).getColumnType(); + } + + @Override + public boolean advanceNextPosition(){ + while(true) { + if (index < currentBatch.getBaseRecords().size()) { + currentRecord = currentBatch.getBaseRecords().get(index); + index++; + return true; + } else { + if (finishedFetching) { + return false; + } else {//this is the fetching piece of the data + try { + currentBatch = baseProvider.getConfig().getCacheEnabled(Optional.of(baseQuery.getConnectorSession())) ? baseProvider.getRecordCache().get(baseQuery) : baseProvider.getRecordBatchForQuery(baseQuery); + }catch (ExecutionException e){ + e.printStackTrace(); + currentBatch = baseProvider.getRecordBatchForQuery(baseQuery); + } + index = 0; + finishedFetching = currentBatch.isLastBatch(); + } + } + } + } + + @Override + public boolean getBoolean(int field){ + return currentRecord.getBoolean(getColumnAtPosition(field), field); + } + + @Override + public long getLong(int field){ + return currentRecord.getLong(getColumnAtPosition(field), field); + } + + @Override + public double getDouble(int field){ + return currentRecord.getDouble(getColumnAtPosition(field), field); + } + + @Override + public Slice getSlice(int field){ + return currentRecord.getSlice(getColumnAtPosition(field), field); + } + + @Override + public Object getObject(int field){ + return currentRecord.getObject(getColumnAtPosition(field), field); + } + + @Override + public boolean isNull(int field){ + return currentRecord.isNull(getColumnAtPosition(field), field); + } + + @Override + public void close() { + + } + + private BaseColumnHandle getColumnAtPosition(int field){ + return ((BaseColumnHandle) baseQuery.getDesiredColumns().get(field)); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSet.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSet.java new file mode 100644 index 000000000000..f9259a8c88b2 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSet.java @@ -0,0 +1,41 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.RecordCursor; +import com.facebook.presto.spi.RecordSet; +import com.facebook.presto.spi.type.Type; + +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseRecordSet implements RecordSet { + private final BaseQuery baseQuery; + private final BaseProvider baseProvider; + + public BaseRecordSet(BaseQuery baseQuery, BaseProvider baseProvider) { + this.baseQuery = requireNonNull(baseQuery, "baseQuery is null"); + this.baseProvider = requireNonNull(baseProvider, "baseProvider is null"); + } + + public BaseQuery getBaseQuery(){ + return baseQuery; + } + + public BaseProvider getBaseProvider() { + return baseProvider; + } + + @Override + public List getColumnTypes() { + return baseQuery.getDesiredColumns().stream().map(x -> ((BaseColumnHandle)x).getColumnType()).collect(Collectors.toList()); + } + + @Override + public RecordCursor cursor() { + return new BaseRecordCursor(baseQuery, baseProvider); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSetProvider.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSetProvider.java new file mode 100644 index 000000000000..f28668a518f2 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSetProvider.java @@ -0,0 +1,34 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.RecordSet; +import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.inject.Inject; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseRecordSetProvider implements ConnectorRecordSetProvider { + private final BaseProvider baseProvider; + + @Inject + public BaseRecordSetProvider(BaseProvider baseProvider){ + this.baseProvider = requireNonNull(baseProvider, "baseProvider is null"); + } + + public BaseProvider getBaseProvider() { + return baseProvider; + } + + @Override + public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns){ + return new BaseRecordSet(new BaseQuery((BaseSplit) split, session, columns), baseProvider); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplit.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplit.java new file mode 100644 index 000000000000..486e24ce23a2 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplit.java @@ -0,0 +1,79 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.baseplugin.predicate.BasePredicate; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseSplit implements ConnectorSplit { + private final List addresses; + private final SchemaTableName tableName; + private List predicates; + + @JsonCreator + public BaseSplit( + @JsonProperty("addresses") List addresses, + @JsonProperty("tableName") SchemaTableName tableName, + @JsonProperty("predicates") List predicates) + { + this.addresses = requireNonNull(addresses, "address is null"); + this.tableName = requireNonNull(tableName, "tableName is null"); + this.predicates = requireNonNull(predicates, "predicates is null"); + } + + @JsonProperty + public SchemaTableName getTableName() + { + return tableName; + } + + @JsonProperty + public List getPredicates() + { + return predicates; + } + + public BaseSplit setPredicates(List predicates) { + this.predicates = predicates; + return this; + } + + @JsonProperty + @Override + public List getAddresses() + { + return addresses; + } + + @Override + public boolean isRemotelyAccessible() + { + return true; + } + + @Override + public Object getInfo() + { + return this; + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("addresses", addresses) + .add("tableName", tableName) + .add("predicates", predicates) + .toString(); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplitManager.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplitManager.java new file mode 100644 index 000000000000..d2b1739c612b --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplitManager.java @@ -0,0 +1,140 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.baseplugin.predicate.BaseComparisonOperator; +import com.facebook.presto.baseplugin.predicate.BasePredicate; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import io.airlift.log.Logger; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseSplitManager implements ConnectorSplitManager { + private final NodeManager nodeManager; + private final BaseConnectorInfo baseConnectorInfo; + private final BaseProvider baseProvider; + + private final Logger logger = Logger.get(BaseSplitManager.class); + + @Inject + public BaseSplitManager(NodeManager nodeManager, BaseConnectorInfo baseConnectorInfo, BaseProvider baseProvider) + { + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.baseConnectorInfo = requireNonNull(baseConnectorInfo, "baseConnectorInfo is null"); + this.baseProvider = requireNonNull(baseProvider, "baseProvider is null"); + } + + /** + * breaks up the predicate across the splits depending on the predicate characteristics + * @param predicate the predicate to break up + * @param splits the fixed list of splits to break up the predicate across + */ + public void allocatePredicateToSplits(BasePredicate predicate, List splits) + { + if(!splits.isEmpty()) { + switch (predicate.getBaseComparisonOperator()) { + case IN: + int partitionSize = (int) Math.ceil(((double) predicate.getValues().size()) / splits.size()); + logger.info("Partition size of %s using %s number of splits for predicate with %s number of values", partitionSize, splits.size(), predicate.getValues().size()); + List predicateList = Lists.partition( + predicate.getValues(), + partitionSize + ).stream().map(x -> new BasePredicate(predicate.getBaseColumnHandle(), BaseComparisonOperator.IN, x)).collect(Collectors.toList()); + logger.info("Broke original predicate into %s number of predicates", predicateList.size()); + //splits.forEach(x -> x.getPredicates().add(predicateIterator.next())); + int splitIndex = 0; + Iterator predicateIterator = predicateList.iterator(); + while (predicateIterator.hasNext()) { + splits.get(splitIndex).getPredicates().add(predicateIterator.next()); + if(splits.size() - 1 > splitIndex) { + splitIndex++; + } + } + break; + case BETWEEN: + Double start = ((Number) predicate.getValues().get(0)).doubleValue(); + Double end = ((Number) predicate.getValues().get(1)).doubleValue(); + Double incrementSize = (end - start) / splits.size(); + for (BaseSplit split : splits) { + split.getPredicates().add(new BasePredicate(predicate.getBaseColumnHandle(), BaseComparisonOperator.BETWEEN, ImmutableList.of(start, start + incrementSize))); + start += incrementSize; + } + break; + default: + if (!splits.isEmpty()) { + splits.forEach(s -> s.getPredicates().add(predicate)); + } + } + } + } + + + @Override + /** + * returns a list of splits for the coordinator to handle + * @return a representation of a source and the splits associated with it + * @param transactionHandle contains information about the SQL transaction + * @param session contains query specific meta-information + * @param layout contains information about the table, predicates, and additional constraint + */ + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) + { + BaseTableLayoutHandle baseTableLayout = (BaseTableLayoutHandle) layout; + + //creates as many splits as there are active nodes in the cluster. EX: 3 nodes -> 3 splits + final List baseSplits = nodeManager.getWorkerNodes().stream() + .map(x -> new BaseSplit(ImmutableList.of(x.getHostAndPort()), baseTableLayout.getTable().getSchemaTableName(), new ArrayList<>())) + .collect(Collectors.toList()); + + //turns the columnDomains from the baseTableLayout into basePredicates + List basePredicates = baseTableLayout.getSummary().getColumnDomains().get().stream() + .map(BasePredicate::fromColumnDomain) + .collect(Collectors.toList()); + + //modify list of predicates going into be split + basePredicates = updateBasePredicates(basePredicates, baseTableLayout.getTable(), baseProvider, session); + + //allocates predicates to splits based on whether predicates have finite bounds or not + basePredicates.forEach(p -> allocatePredicateToSplits(p, baseSplits)); + + //allows updates to predicate-filled splits before they're sent back to coordinator + return new FixedSplitSource(updateBaseSplits(baseSplits, baseProvider, session)); + } + + /** + * Should override this method + * @param basePredicates + * @param tableHandle + * @param baseProvider + * @param session + * @return + */ + public List updateBasePredicates(List basePredicates, BaseTableHandle tableHandle, BaseProvider baseProvider, ConnectorSession session){ + return basePredicates; + } + + /** + * Should override this method + * @param baseSplits + * @param baseProvider + * @return + */ + public List updateBaseSplits(List baseSplits, BaseProvider baseProvider, ConnectorSession session){ + return baseSplits; + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableHandle.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableHandle.java new file mode 100644 index 000000000000..be23565cd4b8 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableHandle.java @@ -0,0 +1,59 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseTableHandle implements ConnectorTableHandle { + private final SchemaTableName schemaTableName; + + @JsonCreator + public BaseTableHandle( + @JsonProperty("schemaTableName") SchemaTableName schemaTableName + ) + { + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BaseTableHandle that = (BaseTableHandle) o; + return Objects.equals(schemaTableName, that.schemaTableName); + } + + @Override + public int hashCode() + { + return Objects.hash(schemaTableName); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("schemaTableName", schemaTableName) + .toString(); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableLayoutHandle.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableLayoutHandle.java new file mode 100644 index 000000000000..8c771a12204e --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableLayoutHandle.java @@ -0,0 +1,44 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseTableLayoutHandle implements ConnectorTableLayoutHandle { + private final BaseTableHandle table; + private final TupleDomain summary; + + @JsonCreator + public BaseTableLayoutHandle( + @JsonProperty("table") BaseTableHandle table, + @JsonProperty("summary") TupleDomain summary + ){ + this.table = requireNonNull(table, "table is null"); + this.summary = requireNonNull(summary, "summary is null"); + } + + @JsonProperty + public BaseTableHandle getTable() + { + return table; + } + + @JsonProperty + public TupleDomain getSummary() + { + return summary; + } + + @Override + public String toString() + { + return table.toString(); + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTransactionHandle.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTransactionHandle.java new file mode 100644 index 000000000000..9957307009ee --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTransactionHandle.java @@ -0,0 +1,10 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +/** + * Created by amehta on 6/13/16. + */ +public enum BaseTransactionHandle implements ConnectorTransactionHandle { + INSTANCE +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseUtils.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseUtils.java new file mode 100644 index 000000000000..b7527d5c7189 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseUtils.java @@ -0,0 +1,75 @@ +package com.facebook.presto.baseplugin; + +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.type.BigintType; +import com.facebook.presto.spi.type.BooleanType; +import com.facebook.presto.spi.type.DoubleType; +import com.facebook.presto.spi.type.IntegerType; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.VarcharType; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Optional; + +/** + * Created by amehta on 6/13/16. + */ +public class BaseUtils { + private BaseUtils() + {} + + public static T getValueAsType(Object value, Class clazz){ + String v = value.toString(); + if (clazz == String.class){ + value = v; + } else if (clazz == Double.class) { + value = Double.parseDouble(v); + } + else if (clazz == Integer.class) { + value = Integer.parseInt(v); + } + else if (clazz == Long.class) { + try { + value = Long.parseLong(v); + }catch (NumberFormatException e){// in the event of a date string + value = LocalDateTime.parse(v, DateTimeFormatter.ISO_OFFSET_DATE_TIME).toEpochSecond(ZoneOffset.UTC); + } + } + else if (clazz == Boolean.class) { + value = Boolean.parseBoolean(v); + } + return clazz.cast(value); + } + + public static T getPropertyFromMap(String propertyName, Class clazz, Map properties){ + return properties.containsKey(propertyName) ? getValueAsType(properties.get(propertyName), clazz) : null; + } + + public static T getPropertyFromSessionConfig(String propertyName, Class clazz, ConnectorSession session, BaseConfig config){ + Optional value = Optional.ofNullable(session.getProperty(propertyName.replace(".", "_"), Object.class)); + return value.isPresent() ? getValueAsType(value.get(), clazz) : config.getProperty(propertyName, clazz, Optional.empty()); + } + + public static Type getTypeForObject(Object o){ + if(o.getClass() == String.class){ + return VarcharType.VARCHAR; + } + else if(o.getClass() == Double.class){ + return DoubleType.DOUBLE; + } + else if(o.getClass() == Integer.class){ + return IntegerType.INTEGER; + } + else if(o.getClass() == Long.class){ + return BigintType.BIGINT; + } + else if(o.getClass() == Boolean.class){ + return BooleanType.BOOLEAN; + } + return null; + } + +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/cache/BaseRecord.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/cache/BaseRecord.java new file mode 100644 index 000000000000..84238cd9247d --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/cache/BaseRecord.java @@ -0,0 +1,23 @@ +package com.facebook.presto.baseplugin.cache; + +import com.facebook.presto.baseplugin.BaseColumnHandle; +import io.airlift.slice.Slice; + +import java.io.Serializable; + +/** + * Created by amehta on 7/19/16. + */ +public interface BaseRecord extends Serializable{ + boolean getBoolean(BaseColumnHandle baseColumnHandle, int field); + + long getLong(BaseColumnHandle baseColumnHandle, int field); + + double getDouble(BaseColumnHandle baseColumnHandle, int field); + + Slice getSlice(BaseColumnHandle baseColumnHandle, int field); + + Object getObject(BaseColumnHandle baseColumnHandle, int field); + + boolean isNull(BaseColumnHandle baseColumnHandle, int field); +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/cache/BaseRecordBatch.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/cache/BaseRecordBatch.java new file mode 100644 index 000000000000..ecadedead824 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/cache/BaseRecordBatch.java @@ -0,0 +1,35 @@ +package com.facebook.presto.baseplugin.cache; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + + +/** + * Created by amehta on 7/20/16. + */ +public class BaseRecordBatch implements Serializable{ + public static final BaseRecordBatch EMPTY_BATCH = new BaseRecordBatch(Collections.emptyList(), false); + public static final BaseRecordBatch LAST_BATCH = new BaseRecordBatch(Collections.emptyList(), true); + + private final List baseRecords; + + private boolean isLastBatch; + + public BaseRecordBatch(List baseRecords, boolean isLastBatch) { + this.baseRecords = baseRecords; + this.isLastBatch = isLastBatch; + } + + public List getBaseRecords() { + return baseRecords; + } + + public boolean isLastBatch() { + return isLastBatch; + } + + public void setLastBatch(boolean isLastBatch){ + this.isLastBatch = isLastBatch; + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/logic/BasePluginLogicDefinition.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/logic/BasePluginLogicDefinition.java new file mode 100644 index 000000000000..8af77c50fc50 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/logic/BasePluginLogicDefinition.java @@ -0,0 +1,10 @@ +package com.facebook.presto.baseplugin.logic; + +import com.facebook.presto.baseplugin.BasePlugin; + +/** + * Created by amehta on 8/15/16. + */ +public interface BasePluginLogicDefinition { + void basePluginCreation(BasePlugin basePlugin); +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/metastore/BaseMetastore.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/metastore/BaseMetastore.java new file mode 100644 index 000000000000..3cec5704ed37 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/metastore/BaseMetastore.java @@ -0,0 +1,137 @@ +package com.facebook.presto.baseplugin.metastore; + +import com.facebook.presto.baseplugin.BaseConfig; +import com.facebook.presto.baseplugin.BaseConnectorInfo; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorViewDefinition; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import org.h2.jdbcx.JdbcConnectionPool; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 8/1/16. + */ +public class BaseMetastore implements BaseMetastoreDefinition { + private final BaseConnectorInfo baseConnectorInfo; + private final BaseConfig baseConfig; + + private JdbcConnectionPool metastoreConnection; + + public BaseMetastore(BaseConnectorInfo baseConnectorInfo, BaseConfig baseConfig){ + this.baseConnectorInfo = requireNonNull(baseConnectorInfo, "baseConnectorInfo is null"); + this.baseConfig = requireNonNull(baseConfig, "baseConfig is null"); + + this.metastoreConnection = JdbcConnectionPool.create(baseConfig.getMetastoreJdbcUrl(Optional.empty())+"/"+baseConnectorInfo.getConnectorId(), baseConfig.getMetastoreUsername(Optional.empty()), baseConfig.getMetastorePassword(Optional.empty())); + createViewsTable(); + } + + private void createViewsTable(){ + try { + Connection connection = metastoreConnection.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement("create table if not exists views (viewName varchar(255) primary key, viewData varchar(400000))"); + + preparedStatement.execute(); + preparedStatement.close(); + connection.close(); + }catch (SQLException e){ + e.printStackTrace(); + } + } + + private String getViewName(SchemaTableName schemaTableName) + { + return baseConnectorInfo.getConnectorId()+"."+schemaTableName.toString(); + } + + @Override + public List listViews(ConnectorSession session, Optional schemaName) { + ImmutableList.Builder viewNames = ImmutableList.builder(); + try{ + Splitter splitter = Splitter.on('.').trimResults().limit(3); + Connection connection = metastoreConnection.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement("select viewName from views where viewName like ?"); + preparedStatement.setString(1, baseConnectorInfo.getConnectorId()+"%"); + + ResultSet rs = preparedStatement.executeQuery(); + while(rs.next()){ + List splits = splitter.splitToList(rs.getString("viewName")); + viewNames.add(new SchemaTableName(splits.get(1), splits.get(2))); + } + + preparedStatement.close(); + connection.close(); + }catch (SQLException e){ + e.printStackTrace(); + } + return viewNames.build(); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName viewName, String viewData, boolean replace) { + try { + Connection connection = metastoreConnection.getConnection(); + PreparedStatement preparedStatement = replace ? connection.prepareStatement("update views set viewData = ? where viewName = ?") : connection.prepareStatement("insert into views (viewData,viewName) values(?,?)"); + + preparedStatement.setString(1, viewData); + preparedStatement.setString(2, getViewName(viewName)); + + preparedStatement.execute(); + preparedStatement.close(); + connection.close(); + }catch (SQLException e){ + e.printStackTrace(); + } + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName viewName) { + try{ + Connection connection = metastoreConnection.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement("delete from views where viewName = ?"); + preparedStatement.setString(1, getViewName(viewName)); + + preparedStatement.execute(); + preparedStatement.close(); + connection.close(); + }catch (SQLException e){ + e.printStackTrace(); + } + } + + @Override + public Map getViews(ConnectorSession session, SchemaTablePrefix schemaTablePrefix) { + SchemaTableName schemaTableName = new SchemaTableName(schemaTablePrefix.getSchemaName(), schemaTablePrefix.getTableName()); + + Map viewMap = new HashMap<>(); + try { + Connection connection = metastoreConnection.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement("select viewData from views where viewName = ?"); + preparedStatement.setString(1, getViewName(schemaTableName)); + + ResultSet rs = preparedStatement.executeQuery(); + while(rs.next()){ + viewMap.put(schemaTableName, new ConnectorViewDefinition(schemaTableName, Optional.of(session.getIdentity().getUser()), rs.getString("viewData"))); + } + + preparedStatement.close(); + connection.close(); + }catch (SQLException e){ + e.printStackTrace(); + } + + return viewMap; + } +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/metastore/BaseMetastoreDefinition.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/metastore/BaseMetastoreDefinition.java new file mode 100644 index 000000000000..a77eece0b67d --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/metastore/BaseMetastoreDefinition.java @@ -0,0 +1,24 @@ +package com.facebook.presto.baseplugin.metastore; + +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorViewDefinition; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Created by amehta on 8/1/16. + */ +public interface BaseMetastoreDefinition { + + List listViews(ConnectorSession session, Optional schemaName); + + void createView(ConnectorSession session, SchemaTableName viewName, String viewData, boolean replace); + + void dropView(ConnectorSession session, SchemaTableName viewName); + + Map getViews(ConnectorSession session, SchemaTablePrefix schemaTablePrefix); +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/predicate/BaseComparisonOperator.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/predicate/BaseComparisonOperator.java new file mode 100644 index 000000000000..869bb7081547 --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/predicate/BaseComparisonOperator.java @@ -0,0 +1,16 @@ +package com.facebook.presto.baseplugin.predicate; + +/** + * Created by amehta on 6/20/16. + */ +public enum BaseComparisonOperator { + EQUAL, + GTE, + GT, + LTE, + LT, + NE, + BETWEEN, + IN, + LIKE +} diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/predicate/BasePredicate.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/predicate/BasePredicate.java new file mode 100644 index 000000000000..6af9b19042cf --- /dev/null +++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/predicate/BasePredicate.java @@ -0,0 +1,160 @@ +package com.facebook.presto.baseplugin.predicate; + +import com.facebook.presto.baseplugin.BaseColumnHandle; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.predicate.Domain; +import com.facebook.presto.spi.predicate.Marker; +import com.facebook.presto.spi.predicate.Range; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.weakref.jmx.internal.guava.collect.ImmutableList; + +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * Created by amehta on 6/20/16. + */ +public class BasePredicate { + private final BaseColumnHandle baseColumnHandle; + private final BaseComparisonOperator baseComparisonOperator; + private final List values; + + @JsonCreator + public BasePredicate( + @JsonProperty("baseColumnHandle") BaseColumnHandle baseColumnHandle, + @JsonProperty("baseComparisonOperator") BaseComparisonOperator baseComparisonOperator, + @JsonProperty("values") List values + ) { + this.baseColumnHandle = baseColumnHandle; + this.baseComparisonOperator = baseComparisonOperator; + this.values = values; + } + + public static BasePredicate fromColumnDomain(TupleDomain.ColumnDomain columnDomain) + { + Builder builder = new Builder() + .setBaseColumnHandle((BaseColumnHandle) columnDomain.getColumn()) + .setBaseComparisonOperator(BaseComparisonOperator.NE); + List ranges = columnDomain.getDomain().getValues().getRanges().getOrderedRanges(); + if (ranges.size() == 1) { //EQ,GT,LT,LTE,GTE,BETWEEN + Range range = ranges.get(0); + if (range.getLow().getBound() == Marker.Bound.ABOVE) { + if (range.getHigh().getBound() == Marker.Bound.BELOW) { + if (range.getLow().getValueBlock().isPresent()) { + if (!range.getHigh().getValueBlock().isPresent()) {//GT + builder.setBaseComparisonOperator(BaseComparisonOperator.GT); + builder.addValues(range.getLow().getValue()); + } + } else if (range.getHigh().getValueBlock().isPresent()){//LT + builder.setBaseComparisonOperator(BaseComparisonOperator.LT); + builder.addValues(range.getHigh().getValue()); + } + } else if (range.getHigh().getValueBlock().isPresent()){//high.bound = EXACTLY, LTE + builder.setBaseComparisonOperator(BaseComparisonOperator.LTE); + builder.addValues(range.getHigh().getValue()); + } + } else { //low.bound = EXACTLY + if (range.getHigh().getBound() == Marker.Bound.EXACTLY){ //EQ + if(range.isSingleValue()) { + builder.setBaseComparisonOperator(BaseComparisonOperator.EQUAL); + builder.addValues(range.getSingleValue()); + } else if(range.getHigh().getValueBlock().isPresent() && range.getLow().getValueBlock().isPresent()){ + builder.setBaseComparisonOperator(BaseComparisonOperator.BETWEEN); + builder.addValues(range.getLow().getValue(), range.getHigh().getValue()); + } + } else if(range.getLow().getValueBlock().isPresent()){//GTE + builder.setBaseComparisonOperator(BaseComparisonOperator.GTE); + builder.addValues(range.getLow().getValue()); + } + } + } else { //IN + builder.setBaseComparisonOperator(BaseComparisonOperator.IN); + ranges.forEach(x -> builder.addValues(x.getSingleValue())); + } + return builder.build(); + } + + public static String predicatesToString(List predicates){ + return predicates.stream().map(p -> p.toString()+"|").sorted().collect(Collectors.joining()); + } + + @JsonProperty + public BaseColumnHandle getBaseColumnHandle() { + return baseColumnHandle; + } + + @JsonProperty + public BaseComparisonOperator getBaseComparisonOperator() { + return baseComparisonOperator; + } + + @JsonProperty + public List getValues() { + return values; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof BasePredicate){ + BasePredicate basePredicate = (BasePredicate) o; + return new EqualsBuilder() + .append(baseColumnHandle, basePredicate.baseColumnHandle) + .append(baseComparisonOperator, basePredicate.baseComparisonOperator) + .append(values, basePredicate.values) + .isEquals(); + } + return false; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(baseColumnHandle) + .append(baseComparisonOperator) + .append(values) + .toHashCode(); + } + + @Override + public String toString() { + return "BasePredicate{"+getBaseColumnHandle().getColumnName()+"."+getBaseComparisonOperator()+"."+getValues()+"}"; + } + + public static class Builder { + private BaseColumnHandle baseColumnHandle; + private BaseComparisonOperator baseComparisonOperator; + private ImmutableList.Builder values; + + public Builder() { + values = ImmutableList.builder(); + } + + public Builder setBaseColumnHandle(BaseColumnHandle baseColumnHandle) { + this.baseColumnHandle = requireNonNull(baseColumnHandle, "baseColumnHandle is null"); + return this; + } + + public Builder setBaseComparisonOperator(BaseComparisonOperator baseComparisonOperator) { + this.baseComparisonOperator = requireNonNull(baseComparisonOperator, "baseComparisonOperator is null"); + return this; + } + + public Builder addValues(Object... values){ + this.values.add(values); + return this; + } + + public BasePredicate build() { + return new BasePredicate(baseColumnHandle, baseComparisonOperator, values.build()); + } + } +} diff --git a/presto-dynamo/pom.xml b/presto-dynamo/pom.xml new file mode 100644 index 000000000000..4818b4828a1b --- /dev/null +++ b/presto-dynamo/pom.xml @@ -0,0 +1,101 @@ + + + + presto-root + com.facebook.presto + 0.156 + + + 4.0.0 + presto-dynamo + Presto - Dynamo plugin + presto-plugin + + + ${project.basedir} + 1.9.40 + + true + true + + false + true + + + + + com.facebook.presto + presto-base-plugin + ${project.version} + + + + com.facebook.presto + presto-spi + provided + + + + com.amazonaws + aws-java-sdk-dynamodb + ${aws.version} + + + + com.amazonaws + aws-java-sdk-core + ${aws.version} + + + + io.airlift + configuration + + + + com.google.inject + guice + + + + com.google.guava + guava + + + + io.airlift + slice + provided + + + + io.airlift + units + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + org.openjdk.jol + jol-core + provided + + + + + + + joda-time + joda-time + 2.9.6 + + + + \ No newline at end of file diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoConfig.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoConfig.java new file mode 100644 index 000000000000..f24689c62ab5 --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoConfig.java @@ -0,0 +1,97 @@ +package com.facebook.presto.dynamo; + +import com.facebook.presto.baseplugin.BaseConfig; +import com.facebook.presto.spi.ConnectorSession; +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; + +import java.util.Optional; + +/** + * Created by amehta on 6/14/16. + */ +public class DynamoConfig extends BaseConfig { + @Config("dynamo.awsKey") + @ConfigDescription("access key for AWS") + public void setAwsKey(String awsKey){ + setProperty("dynamo.awsKey", awsKey); + } + + public String getAwsKey(Optional session){ + return getProperty("dynamo.awsKey", String.class, session); + } + + public String getAwsKey(){ + return getAwsKey(Optional.empty()); + } + + @Config("dynamo.secretKey") + @ConfigDescription("secret key for AWS") + public void setSecretKey(String secretKey){ + setProperty("dynamo.secretKey", secretKey); + } + + public String getSecretKey(Optional session){ + return getProperty("dynamo.secretKey", String.class, session); + } + + public String getSecretKey(){ + return getSecretKey(Optional.empty()); + } + + @Config("dynamo.lookupSuffix") + @ConfigDescription("suffix for lookup tables") + public void setLookupSuffix(String lookupSuffix){ + setProperty("dynamo.lookupSuffix", lookupSuffix); + } + + public String getLookupSuffix(Optional session){ + return getProperty("dynamo.lookupSuffix", String.class, session); + } + + public String getLookupSuffix(){ + return getLookupSuffix(Optional.empty()); + } + + @Config("dynamo.lookupColumnName") + @ConfigDescription("name of attribute that represents column name in lookup table") + public void setLookupColumnName(String lookupColumnName){ + setProperty("dynamo.lookupColumnName", lookupColumnName); + } + + public String getLookupColumnName(Optional session){ + return getProperty("dynamo.lookupColumnName", String.class, session); + } + + public String getLookupColumnName(){ + return getLookupColumnName(Optional.empty()); + } + + @Config("dynamo.lookupColumnType") + @ConfigDescription("name of attribute that represents column type in lookup table") + public void setLookupColumnType(String lookupColumnType){ + setProperty("dynamo.lookupColumnType", lookupColumnType); + } + + public String getLookupColumnType(Optional session){ + return getProperty("dynamo.lookupColumnType", String.class, session); + } + + public String getLookupColumnType(){ + return getLookupColumnType(Optional.empty()); + } + + @Config("dynamo.maxRetries") + @ConfigDescription("max retries for exponential backoff") + public void setMaxRetries(Integer maxRetries){ + setProperty("dynamo.maxRetries", maxRetries); + } + + public Integer getMaxRetries(Optional session){ + return getProperty("dynamo.maxRetries", Integer.class, session); + } + + public Integer getMaxRetries(){ + return getMaxRetries(Optional.empty()); + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoIndex.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoIndex.java new file mode 100644 index 000000000000..2a965ce2ae64 --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoIndex.java @@ -0,0 +1,91 @@ +package com.facebook.presto.dynamo; + +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.facebook.presto.baseplugin.predicate.BasePredicate; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Created by amehta on 7/25/16. + */ +public class DynamoIndex { + private final String indexName; + private final KeySchemaElement hashKey; + private final Optional rangeKey; + + private final List indices; + + public DynamoIndex(Optional rangeKey, KeySchemaElement hashKey, String indexName) { + this.rangeKey = rangeKey; + this.hashKey = hashKey; + this.indexName = indexName; + this.indices = new ArrayList<>(); + } + + public String getIndexName() { + return indexName; + } + + public KeySchemaElement getHashKey() { + return hashKey; + } + + public Optional getRangeKey() { + return rangeKey; + } + + public List getIndices() { + return indices; + } + + public boolean isValidHashKey(BasePredicate basePredicate){ + return hashKey.getAttributeName().equals(basePredicate.getBaseColumnHandle().getColumnName()) + && DynamoUtils.HASH_OPERATORS.contains(basePredicate.getBaseComparisonOperator()); + } + + public boolean isValidRangeKey(BasePredicate basePredicate) { + return rangeKey.isPresent() + && rangeKey.get().getAttributeName().equals(basePredicate.getBaseColumnHandle().getColumnName()) + && DynamoUtils.RANGE_OPERATORS.contains(basePredicate.getBaseComparisonOperator()); + } + + public boolean containsKey(BasePredicate basePredicate, KeyType keyType){ + switch (keyType){ + case HASH: + return isValidHashKey(basePredicate); + case RANGE: + return isValidRangeKey(basePredicate); + default: + return false; + } + } + + public boolean containsKey(BasePredicate basePredicate){ + boolean isContains = isValidHashKey(basePredicate) || isValidRangeKey(basePredicate); + if(!isContains){ + isContains = indices.stream().filter(i -> i.containsKey(basePredicate)).findAny().isPresent(); + } + return isContains; + } + + public Optional getIndexNameForHashPredicate(BasePredicate basePredicate){ + if(containsKey(basePredicate, KeyType.HASH)){ + return Optional.of(indexName); + }else{ + Optional index = indices.stream().filter(i -> i.containsKey(basePredicate, KeyType.HASH)).findFirst(); + return index.isPresent() ? Optional.of(index.get().getIndexName()) : Optional.empty(); + } + } + + public Optional getIndexNameForRangePredicate(BasePredicate basePredicate){ + if(containsKey(basePredicate, KeyType.RANGE)){ + return Optional.of(indexName); + }else{ + Optional index = indices.stream().filter(i -> i.containsKey(basePredicate, KeyType.RANGE)).findFirst(); + return index.isPresent() ? Optional.of(index.get().getIndexName()) : Optional.empty(); + } + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoPlugin.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoPlugin.java new file mode 100644 index 000000000000..74d20e006379 --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoPlugin.java @@ -0,0 +1,17 @@ +package com.facebook.presto.dynamo; + +import com.facebook.presto.baseplugin.BasePlugin; + +/** + * Created by amehta on 6/14/16. + */ +public class DynamoPlugin extends BasePlugin { + @Override + public void init() { + setName("dynamo"); + setBaseConfigClass(DynamoConfig.class); + setBaseProviderClass(DynamoProvider.class); + setBaseSplitManagerClass(DynamoSplitManager.class); + setBaseRecordSetProviderClass(DynamoRecordSetProvider.class); + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoProvider.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoProvider.java new file mode 100644 index 000000000000..2a6039333b7a --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoProvider.java @@ -0,0 +1,200 @@ +package com.facebook.presto.dynamo; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.retry.PredefinedRetryPolicies; +import com.amazonaws.retry.RetryPolicy; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.Condition; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.QueryRequest; +import com.amazonaws.services.dynamodbv2.model.QueryResult; +import com.amazonaws.services.dynamodbv2.model.ScanRequest; +import com.amazonaws.services.dynamodbv2.model.ScanResult; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.facebook.presto.baseplugin.BaseColumnHandle; +import com.facebook.presto.baseplugin.BaseProvider; +import com.facebook.presto.baseplugin.BaseQuery; +import com.facebook.presto.baseplugin.BaseSplit; +import com.facebook.presto.baseplugin.cache.BaseRecordBatch; +import com.facebook.presto.baseplugin.predicate.BasePredicate; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Created by amehta on 6/14/16. + */ +public class DynamoProvider extends BaseProvider { + private final DynamoConfig dynamoConfig; + private final Map clientMap; //key is regionName + private final Map indexMap; //maps tableName to table schema + private final Map schemaMap; + private final RetryPolicy defaultRetryPolicy; + + @Inject + public DynamoProvider(DynamoConfig config) { + super(config); + this.dynamoConfig = config; + this.clientMap = new ConcurrentHashMap<>(); + this.indexMap = new ConcurrentHashMap<>(); + this.defaultRetryPolicy = PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(dynamoConfig.getMaxRetries(Optional.empty())); + + //adds the regions + ImmutableMap.Builder schemaMap = ImmutableMap.builder(); + for (Regions r : Regions.values()) { + Optional regionOpt = Optional.ofNullable(Region.getRegion(r)); + regionOpt.ifPresent(region -> schemaMap.put(region.getName(), region)); + } + //adds current region on ec2 + Optional.ofNullable(Regions.getCurrentRegion()).ifPresent(r -> schemaMap.put(r.getName(), r)); + this.schemaMap = schemaMap.build(); + } + + public DynamoConfig getDynamoConfig() { + return dynamoConfig; + } + + public AmazonDynamoDB getClient(String key, Optional session){// create object as separate variable to allow room for further client configuration later + if (!clientMap.containsKey(key)) { + ClientConfiguration configuration = new ClientConfiguration().withRetryPolicy(defaultRetryPolicy); + AmazonDynamoDB dynamoDB = new AmazonDynamoDBClient(new BasicAWSCredentials(dynamoConfig.getAwsKey(session), dynamoConfig.getSecretKey(session)), configuration); + dynamoDB.setRegion(schemaMap.get(key)); + clientMap.put(key, dynamoDB); + } + return clientMap.get(key); + } + + private DynamoIndex getDynamoIndex(List keys, String indexName){ + Optional rangeKey = keys.stream().filter(k -> k.getKeyType().equals(KeyType.RANGE.name())).findFirst(); + KeySchemaElement hashKey = keys.stream().filter(k -> k.getKeyType().equals(KeyType.HASH.name())).findFirst().get(); + return new DynamoIndex(rangeKey, hashKey, indexName); + } + + public DynamoIndex getDynamoIndex(SchemaTableName schemaTableName, Optional session){ + if(!indexMap.containsKey(schemaTableName.getTableName())){ + TableDescription tableDescription = getClient(schemaTableName.getSchemaName(), session).describeTable(schemaTableName.getTableName()).getTable(); + DynamoIndex tableIndex = getDynamoIndex(tableDescription.getKeySchema(), schemaTableName.getTableName()); + //build gsis + if(tableDescription.getGlobalSecondaryIndexes() != null) { + tableIndex.getIndices().addAll(tableDescription.getGlobalSecondaryIndexes().stream().map(g -> getDynamoIndex(g.getKeySchema(), g.getIndexName())).collect(Collectors.toList())); + } + //build lsis + if(tableDescription.getLocalSecondaryIndexes() != null) { + tableIndex.getIndices().addAll(tableDescription.getLocalSecondaryIndexes().stream().map(g -> getDynamoIndex(g.getKeySchema(), g.getIndexName())).collect(Collectors.toList())); + } + indexMap.put(schemaTableName.getTableName(), tableIndex); + } + return indexMap.get(schemaTableName.getTableName()); + } + + @Override + public List listSchemaNames(ConnectorSession session) { + return ImmutableList.copyOf(schemaMap.keySet()); + } + + @Override + public List generateTableColumns(ConnectorSession session, SchemaTableName tableName) { + Optional sessionOpt = Optional.of(session); + + List> records = getClient(tableName.getSchemaName(), sessionOpt).scan(new ScanRequest(tableName.getTableName()+dynamoConfig.getLookupSuffix(sessionOpt))).getItems(); + ImmutableList.Builder columns = ImmutableList.builder(); + for(int i=0 ; i < records.size() ; i++){ + Map record = records.get(i); + columns.add(new BaseColumnHandle(record.get(dynamoConfig.getLookupColumnName(Optional.of(session))).getS(), DynamoUtils.getTypeForDynamoType(record.get(dynamoConfig.getLookupColumnType(sessionOpt)).getS()), i)); + } + return columns.build(); + } + + @Override + public List listTableNames(ConnectorSession session, Optional schemaName) { + Optional region = Optional.ofNullable(Regions.getCurrentRegion()); + String schema = schemaName.orElse(region.isPresent() ? region.get().getName() : Regions.DEFAULT_REGION.name()); + return getClient(schema, Optional.of(session)).listTables().getTableNames().stream().map(t -> new SchemaTableName(schema, t)).collect(Collectors.toList()); + } + + private void applyPredicatesToQuery(QueryRequest request, BaseSplit baseSplit, Optional session){//applies predicates to query and assigns an index name if needed + List basePredicates = baseSplit.getPredicates(); + ImmutableMap.Builder keyConditions = ImmutableMap.builder(); + Optional indexName = Optional.empty(); + + switch (basePredicates.size()){ + case 2: + BasePredicate rangePredicate = basePredicates.get(1); + Condition rangeCondition = new Condition(); + rangeCondition.setComparisonOperator(DynamoUtils.fromBaseComparisonOperator(rangePredicate.getBaseComparisonOperator())); + rangeCondition.setAttributeValueList(rangePredicate.getValues().stream().map(v -> DynamoUtils.getAttributeValueForObject(v)).collect(Collectors.toList())); + keyConditions.put(rangePredicate.getBaseColumnHandle().getColumnName(),rangeCondition); + case 1: + BasePredicate hashPredicate = basePredicates.get(0); + Condition hashCondition = new Condition(); + hashCondition.setComparisonOperator(DynamoUtils.fromBaseComparisonOperator(hashPredicate.getBaseComparisonOperator())); + hashCondition.setAttributeValueList(hashPredicate.getValues().stream().map(v -> DynamoUtils.getAttributeValueForObject(v)).collect(Collectors.toList())); + keyConditions.put(hashPredicate.getBaseColumnHandle().getColumnName(), hashCondition); + indexName = getDynamoIndex(baseSplit.getTableName(), session).getIndexNameForHashPredicate(hashPredicate); + } + + if(indexName.isPresent() && !indexName.get().equals(baseSplit.getTableName().getTableName())){ + request.setIndexName(indexName.get()); + } + request.setKeyConditions(keyConditions.build()); + } + + private void updateQueryBatch(DynamoQuery query, DynamoRecordBatch dynamoRecordBatch){//updates the limit on the query and determines if last batch + boolean limitUsed = false; + if(query.getLimit().isPresent()){//update limit + Integer limit = query.getLimit().get(); + limit -= dynamoRecordBatch.getBaseRecords().size(); + query.setLimit(limit > 0 ? Optional.of(limit) : Optional.empty()); + limitUsed = limit <= 0; + } + query.setOffset(Optional.ofNullable(dynamoRecordBatch.getLastKeyEvaluated())); + dynamoRecordBatch.setLastBatch(limitUsed || !query.getOffset().isPresent()); + } + + public boolean shouldScan(BaseQuery key){ + return key.getBaseSplit().getPredicates().isEmpty(); + } + + @Override + public BaseRecordBatch getRecordBatchForQueryFromSource(BaseQuery key) {//assume that a list of basePredicates always has only one hashKey and one/zero rangeKeys + DynamoQuery dynamoQuery = (DynamoQuery) key; + BaseSplit baseSplit = key.getBaseSplit(); + Optional session = Optional.of(key.getConnectorSession()); + AmazonDynamoDB dynamo = getClient(baseSplit.getTableName().getSchemaName(), session); + DynamoRecordBatch dynamoRecordBatch; + + if(shouldScan(key)){ + ScanRequest scan = new ScanRequest(baseSplit.getTableName().getTableName()); + + dynamoQuery.getOffset().ifPresent(scan::setExclusiveStartKey); + + ScanResult result = dynamo.scan(scan); + dynamoRecordBatch = new DynamoRecordBatch(result.getItems().stream().map(DynamoRecord::new).collect(Collectors.toList()), false, result.getLastEvaluatedKey()); + }else{ + QueryRequest query = new QueryRequest(baseSplit.getTableName().getTableName()); + dynamoQuery.getLimit().ifPresent(query::setLimit); + dynamoQuery.getOffset().ifPresent(query::setExclusiveStartKey); + applyPredicatesToQuery(query, baseSplit, session); + + QueryResult result = dynamo.query(query); + dynamoRecordBatch = new DynamoRecordBatch(result.getItems().stream().map(DynamoRecord::new).collect(Collectors.toList()), false, result.getLastEvaluatedKey()); + } + + updateQueryBatch(dynamoQuery, dynamoRecordBatch); + return dynamoRecordBatch; + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoQuery.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoQuery.java new file mode 100644 index 000000000000..2628ef9c951b --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoQuery.java @@ -0,0 +1,44 @@ +package com.facebook.presto.dynamo; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.facebook.presto.baseplugin.BaseQuery; +import com.facebook.presto.baseplugin.BaseSplit; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Created by amehta on 7/25/16. + */ +public class DynamoQuery extends BaseQuery { + private Optional limit; + private Optional> offset; + + public DynamoQuery(BaseSplit baseSplit, ConnectorSession connectorSession, List desiredColumns) { + super(baseSplit, connectorSession, desiredColumns); + this.offset = Optional.empty(); + //set this empty until limit pushdown enabled + this.limit = Optional.empty(); + } + + public Optional getLimit() { + return limit; + } + + public DynamoQuery setLimit(Optional limit) { + this.limit = limit; + return this; + } + + public Optional> getOffset() { + return offset; + } + + public DynamoQuery setOffset(Optional> offset) { + this.offset = offset; + return this; + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecord.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecord.java new file mode 100644 index 000000000000..588b3880e7cb --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecord.java @@ -0,0 +1,51 @@ +package com.facebook.presto.dynamo; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.facebook.presto.baseplugin.BaseColumnHandle; +import com.facebook.presto.baseplugin.cache.BaseRecord; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; + +import java.util.Map; + +/** + * Created by amehta on 7/25/16. + */ +public class DynamoRecord implements BaseRecord { + private final Map dynamoRecord; + + public DynamoRecord(Map dynamoRecord) { + this.dynamoRecord = dynamoRecord; + } + + @Override + public boolean getBoolean(BaseColumnHandle baseColumnHandle, int field) { + return dynamoRecord.get(baseColumnHandle.getColumnName()).getBOOL(); + } + + @Override + public long getLong(BaseColumnHandle baseColumnHandle, int field) { + return Long.parseLong(dynamoRecord.get(baseColumnHandle.getColumnName()).getN()); + } + + @Override + public double getDouble(BaseColumnHandle baseColumnHandle, int field) { + return Double.parseDouble(dynamoRecord.get(baseColumnHandle.getColumnName()).getN()); + } + + @Override + public Slice getSlice(BaseColumnHandle baseColumnHandle, int field) { + return Slices.utf8Slice(dynamoRecord.get(baseColumnHandle.getColumnName()).getS()); + } + + @Override + public Object getObject(BaseColumnHandle baseColumnHandle, int field) { + return dynamoRecord.get(baseColumnHandle.getColumnName()); + } + + @Override + public boolean isNull(BaseColumnHandle baseColumnHandle, int field) { + return dynamoRecord.get(baseColumnHandle.getColumnName()) == null || + (dynamoRecord.get(baseColumnHandle.getColumnName()).getNULL() != null && dynamoRecord.get(baseColumnHandle.getColumnName()).getNULL()); + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecordBatch.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecordBatch.java new file mode 100644 index 000000000000..0953b4ff3264 --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecordBatch.java @@ -0,0 +1,24 @@ +package com.facebook.presto.dynamo; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.facebook.presto.baseplugin.cache.BaseRecord; +import com.facebook.presto.baseplugin.cache.BaseRecordBatch; + +import java.util.List; +import java.util.Map; + +/** + * Created by amehta on 7/25/16. + */ +public class DynamoRecordBatch extends BaseRecordBatch { + private final Map lastKeyEvaluated; + + public DynamoRecordBatch(List baseRecords, boolean isLastBatch, Map lastKeyEvaluated) { + super(baseRecords, isLastBatch); + this.lastKeyEvaluated = lastKeyEvaluated; + } + + public Map getLastKeyEvaluated() { + return lastKeyEvaluated; + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecordSetProvider.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecordSetProvider.java new file mode 100644 index 000000000000..707d2bead297 --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoRecordSetProvider.java @@ -0,0 +1,31 @@ +package com.facebook.presto.dynamo; + +import com.facebook.presto.baseplugin.BaseProvider; +import com.facebook.presto.baseplugin.BaseRecordSet; +import com.facebook.presto.baseplugin.BaseRecordSetProvider; +import com.facebook.presto.baseplugin.BaseSplit; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.RecordSet; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.inject.Inject; + +import java.util.List; +import java.util.Optional; + +/** + * Created by amehta on 7/25/16. + */ +public class DynamoRecordSetProvider extends BaseRecordSetProvider { + @Inject + public DynamoRecordSetProvider(BaseProvider baseProvider) { + super(baseProvider); + } + + @Override + public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns) { + DynamoQuery dynamoQuery = new DynamoQuery((BaseSplit) split, session, columns); + return new BaseRecordSet(dynamoQuery, getBaseProvider()); + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoSplitManager.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoSplitManager.java new file mode 100644 index 000000000000..af9591a2f1ce --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoSplitManager.java @@ -0,0 +1,122 @@ +package com.facebook.presto.dynamo; + +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.facebook.presto.baseplugin.BaseConnectorInfo; +import com.facebook.presto.baseplugin.BaseProvider; +import com.facebook.presto.baseplugin.BaseSplit; +import com.facebook.presto.baseplugin.BaseSplitManager; +import com.facebook.presto.baseplugin.BaseTableHandle; +import com.facebook.presto.baseplugin.predicate.BaseComparisonOperator; +import com.facebook.presto.baseplugin.predicate.BasePredicate; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.NodeManager; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.Inject; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Created by amehta on 8/15/16. + */ +public class DynamoSplitManager extends BaseSplitManager { + @Inject + public DynamoSplitManager(NodeManager nodeManager, BaseConnectorInfo baseConnectorInfo, BaseProvider baseProvider) { + super(nodeManager, baseConnectorInfo, baseProvider); + } + + @Override + public List updateBasePredicates(List basePredicates, BaseTableHandle tableHandle, BaseProvider baseProvider, ConnectorSession session) { + //filter out invalid and/or non-key predicates + return basePredicates.stream() + .filter(((DynamoProvider) baseProvider).getDynamoIndex(tableHandle.getSchemaTableName(), Optional.of(session))::containsKey) + .collect(Collectors.toList()); + } + + @Override + public List updateBaseSplits(List baseSplits, BaseProvider baseProvider, ConnectorSession session) {//split predicates contain only key attributes + DynamoProvider dynamoProvider = (DynamoProvider) baseProvider; + return baseSplits.stream().flatMap(b -> splitBaseSplit(b, dynamoProvider, session)).collect(Collectors.toList()); + } + + private Stream splitBaseSplit(BaseSplit baseSplit, DynamoProvider dynamoProvider, ConnectorSession session){//these should be pre-filtered for only key attributes + if(baseSplit.getPredicates().isEmpty()){ + return ImmutableList.of(baseSplit).stream(); + } + DynamoIndex index = dynamoProvider.getDynamoIndex(baseSplit.getTableName(), Optional.of(session)); + List basePredicates = Lists.newArrayList(baseSplit.getPredicates()); + Map> predicateMap = new HashMap<>();//key is index + + Optional hashPredicate = basePredicates.stream().filter(p -> index.containsKey(p, KeyType.HASH)).findFirst(); + Optional rangePredicate = basePredicates.stream().filter(p -> index.containsKey(p, KeyType.RANGE)).findFirst(); + + if(hashPredicate.isPresent()){// main table + predicateMap.put(baseSplit.getTableName().getTableName(), new ArrayList<>()); + predicateMap.get(baseSplit.getTableName().getTableName()).add(hashPredicate.get()); + rangePredicate.ifPresent(r -> predicateMap.get(baseSplit.getTableName().getTableName()).add(r)); + } + + for (BasePredicate basePredicate : basePredicates) {// hash predicates + Optional indexName = index.getIndexNameForHashPredicate(basePredicate); + if(indexName.isPresent()) { + predicateMap.put(indexName.get(), new ArrayList<>()); + predicateMap.get(indexName.get()).add(basePredicate); + } + } + + for (BasePredicate basePredicate : basePredicates) {// range predicates + for (String indexName : predicateMap.keySet()) { + if(index.getIndexName().equals(indexName) && index.isValidRangeKey(basePredicate)){ + predicateMap.get(indexName).add(basePredicate); + }else{ + Optional dynamoIndex = index.getIndices().stream() + .filter(i -> i.getIndexName().equals(indexName)) + .findFirst(); + if(dynamoIndex.isPresent() && dynamoIndex.get().isValidRangeKey(basePredicate)){ + predicateMap.get(indexName).add(basePredicate); + } + } + } + } + + if(predicateMap.isEmpty()){//in the event a valid predicate pair/group was not found + return ImmutableList.of(new BaseSplit(baseSplit.getAddresses(), baseSplit.getTableName(), ImmutableList.of())).stream(); + } + + return predicateMap.entrySet().stream() + .map(e -> new BaseSplit( + baseSplit.getAddresses(), + baseSplit.getTableName(), + e.getValue())) + .flatMap(this::splitBaseSplitOnPredicate); + } + + //splits the given split on the hash/range predicate if the comparison operator is splittable + private Stream splitBaseSplitOnPredicate(BaseSplit baseSplit) {// split should only have one or two predicates + Stream predicateStream; + Optional rangePredicate = Optional.empty(); + switch (baseSplit.getPredicates().size()){ + case 2: + rangePredicate = Optional.of(baseSplit.getPredicates().get(1)); + case 1: + BasePredicate hashPredicate = baseSplit.getPredicates().get(0); + switch (hashPredicate.getBaseComparisonOperator()){ + //break up IN predicate into x number of EQ predicates (one for each value in the IN clause) + case IN: + predicateStream = hashPredicate.getValues().stream().map(v -> new BasePredicate(hashPredicate.getBaseColumnHandle(), BaseComparisonOperator.EQUAL, ImmutableList.of(v))); + if(rangePredicate.isPresent()){ + BasePredicate range = rangePredicate.get(); + return predicateStream.map(p -> new BaseSplit(baseSplit.getAddresses(), baseSplit.getTableName(), ImmutableList.of(p, range))); + } + return predicateStream.map(p -> new BaseSplit(baseSplit.getAddresses(), baseSplit.getTableName(), ImmutableList.of(p))); + } + } + return ImmutableList.of(baseSplit).stream(); + } +} diff --git a/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoUtils.java b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoUtils.java new file mode 100644 index 000000000000..2c589508ff4a --- /dev/null +++ b/presto-dynamo/src/main/java/com/facebook/presto/dynamo/DynamoUtils.java @@ -0,0 +1,71 @@ +package com.facebook.presto.dynamo; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.ComparisonOperator; +import com.facebook.presto.baseplugin.BaseUtils; +import com.facebook.presto.baseplugin.predicate.BaseComparisonOperator; +import com.facebook.presto.spi.type.BigintType; +import com.facebook.presto.spi.type.BooleanType; +import com.facebook.presto.spi.type.DoubleType; +import com.facebook.presto.spi.type.IntegerType; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.VarcharType; + +import java.util.EnumSet; + +/** + * Created by amehta on 7/25/16. + */ +public final class DynamoUtils { + private DynamoUtils(){} + + public static final EnumSet HASH_OPERATORS = EnumSet.of(BaseComparisonOperator.EQUAL, BaseComparisonOperator.IN); + public static final EnumSet RANGE_OPERATORS = EnumSet.of(BaseComparisonOperator.EQUAL, BaseComparisonOperator.BETWEEN, BaseComparisonOperator.GT, BaseComparisonOperator.GTE, BaseComparisonOperator.LT, BaseComparisonOperator.LTE); + + public static Type getTypeForDynamoType(String type){ + switch (type.toLowerCase()){ + case "s": + return VarcharType.VARCHAR; + case "n": + return DoubleType.DOUBLE; + case "long": + return BigintType.BIGINT; + case "b": + return BooleanType.BOOLEAN; + default: + return null; + } + } + + public static AttributeValue getAttributeValueForObject(Object o){ + Type type = BaseUtils.getTypeForObject(o); + AttributeValue value = new AttributeValue(); + if(type == VarcharType.VARCHAR){ + value.withS(o.toString()); + }else if (type == IntegerType.INTEGER || type == DoubleType.DOUBLE || type == BigintType.BIGINT){ + value.withN(o.toString()); + } + return value; + } + + public static ComparisonOperator fromBaseComparisonOperator(BaseComparisonOperator comparisonOperator){ + switch (comparisonOperator){ + case IN: + return ComparisonOperator.IN; + case EQUAL: + return ComparisonOperator.EQ; + case BETWEEN: + return ComparisonOperator.BETWEEN; + case GT: + return ComparisonOperator.GT; + case GTE: + return ComparisonOperator.GE; + case LT: + return ComparisonOperator.LT; + case LTE: + return ComparisonOperator.LE; + default: + return null; + } + } +}