diff --git a/pom.xml b/pom.xml
index 4608fe8a918f..713f72393ccc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,8 @@
presto-testing-server-launcher
presto-plugin-toolkit
presto-resource-group-managers
+ presto-base-plugin-nosql
+ presto-dynamo
diff --git a/presto-base-plugin-nosql/pom.xml b/presto-base-plugin-nosql/pom.xml
new file mode 100644
index 000000000000..df5a39ce3d62
--- /dev/null
+++ b/presto-base-plugin-nosql/pom.xml
@@ -0,0 +1,122 @@
+
+
+
+ presto-root
+ com.facebook.presto
+ 0.156
+
+ 4.0.0
+
+ presto-base-plugin
+ Presto - Base plugin
+
+
+ true
+ true
+
+ false
+ true
+
+
+
+
+ com.h2database
+ h2
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ log
+
+
+
+ io.airlift
+ configuration
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+
+
+
+
+ com.facebook.presto
+ presto-spi
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ javax.inject
+ javax.inject
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ provided
+
+
+
+ org.weakref
+ jmxutils
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.4
+
+
+
+
+ com.facebook.presto
+ presto-main
+ test
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+
\ No newline at end of file
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseColumnHandle.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseColumnHandle.java
new file mode 100644
index 000000000000..dabf0b88c287
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseColumnHandle.java
@@ -0,0 +1,94 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseColumnHandle implements ColumnHandle {
+ private final String columnName;
+ private final Type columnType;
+ private final int ordinalPosition;
+
+ @JsonCreator
+ public BaseColumnHandle(
+ @JsonProperty("columnName") String columnName,
+ @JsonProperty("columnType") Type columnType,
+ @JsonProperty("ordinalPosition") int ordinalPosition)
+ {
+ this.columnName = requireNonNull(columnName, "columnName is null");
+ this.columnType = requireNonNull(columnType, "columnType is null");
+ this.ordinalPosition = ordinalPosition;
+ }
+
+ @JsonProperty
+ public String getColumnName()
+ {
+ return columnName;
+ }
+
+ @JsonProperty
+ public Type getColumnType()
+ {
+ return columnType;
+ }
+
+ @JsonProperty
+ public int getOrdinalPosition()
+ {
+ return ordinalPosition;
+ }
+
+ public ColumnMetadata toColumnMetadata()
+ {
+ return new ColumnMetadata(columnName, columnType);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if(o == this){
+ return true;
+ }
+ if(o instanceof BaseColumnHandle){
+ BaseColumnHandle baseColumnHandle = (BaseColumnHandle) o;
+ return new EqualsBuilder()
+ .append(columnName, baseColumnHandle.columnName)
+ .append(columnType, baseColumnHandle.columnType)
+ .append(ordinalPosition, baseColumnHandle.ordinalPosition)
+ .isEquals();
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return new HashCodeBuilder()
+ .append(columnName)
+ .append(columnType)
+ .append(ordinalPosition)
+ .toHashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("columnName", columnName)
+ .add("columnType", columnType)
+ .add("ordinalPosition", ordinalPosition)
+ .toString();
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConfig.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConfig.java
new file mode 100644
index 000000000000..a9bcfbc98c8f
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConfig.java
@@ -0,0 +1,104 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ConnectorSession;
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public abstract class BaseConfig {
+ private final Properties properties;
+
+ public BaseConfig(){
+ this.properties = new Properties();
+
+ setProperty("basePlugin.cacheEnabled", false);
+ setProperty("basePlugin.metastoreJdbcUrl", "jdbc:h2:~/h2/presto");
+ setProperty("basePlugin.metastoreUsername", "sa");
+ setProperty("basePlugin.metastorePassword", "sa");
+ setProperty("basePlugin.defaultSchemaName", "default_schema");
+ }
+
+ protected void setProperty(String propertyName, Object value){
+ this.properties.setProperty(propertyName, value.toString());
+ }
+
+ protected T getProperty(String propertyName, Class clazz, Optional session){
+ //return session.isPresent() ? BaseUtils.getPropertyFromSessionConfig(propertyName, clazz, session.get(), this) : BaseUtils.getPropertyFromMap(propertyName, clazz, properties);
+ return BaseUtils.getPropertyFromMap(propertyName, clazz, properties);
+ }
+
+ @Config("basePlugin.cacheEnabled")
+ @ConfigDescription("Whether the cache is enabled for this plugin")
+ public void setCacheEnabled(Boolean cacheEnabled){
+ setProperty("basePlugin.cacheEnabled", cacheEnabled);
+ }
+
+ public Boolean getCacheEnabled(Optional session){
+ return getProperty("basePlugin.cacheEnabled", Boolean.class, session);
+ }
+
+ public Boolean getCacheEnabled(){
+ return getCacheEnabled(Optional.empty());
+ }
+
+ @Config("basePlugin.metastoreJdbcUrl")
+ @ConfigDescription("the location of the metadata store")
+ public void setMetastoreJdbcUrl(String metastoreJdbcUrl){
+ setProperty("basePlugin.metastoreJdbcUrl", metastoreJdbcUrl);
+ }
+
+ public String getMetastoreJdbcUrl(Optional session){
+ return getProperty("basePlugin.metastoreJdbcUrl", String.class, session);
+ }
+
+ public String getMetastoreJdbcUrl(){
+ return getMetastoreJdbcUrl(Optional.empty());
+ }
+
+ @Config("basePlugin.metastoreUsername")
+ @ConfigDescription("the username for connecting to the metadata store")
+ public void setMetastoreUsername(String metastoreUsername){
+ setProperty("basePlugin.metastoreUsername", metastoreUsername);
+ }
+
+ public String getMetastoreUsername(Optional session){
+ return getProperty("basePlugin.metastoreUsername", String.class, session);
+ }
+
+ public String getMetastoreUsername(){
+ return getMetastoreUsername(Optional.empty());
+ }
+
+ @Config("basePlugin.metastorePassword")
+ @ConfigDescription("the password for connecting to the metadata store")
+ public void setMetastorePassword(String metastorePassword){
+ setProperty("basePlugin.metastorePassword", metastorePassword);
+ }
+
+ public String getMetastorePassword(Optional session){
+ return getProperty("basePlugin.metastorePassword", String.class, session);
+ }
+
+ public String getMetastorePassword(){
+ return getMetastorePassword(Optional.empty());
+ }
+
+ @Config("basePlugin.defaultSchemaName")
+ @ConfigDescription("the name of the default schema for datasources that don't have schemas")
+ public void setDefaultSchemaName(String defaultSchemaName){
+ setProperty("basePlugin.defaultSchemaName", defaultSchemaName);
+ }
+
+ public String getDefaultSchemaName(Optional session){
+ return getProperty("basePlugin.defaultSchemaName", String.class, session);
+ }
+
+ public String getDefaultSchemaName(){
+ return getDefaultSchemaName(Optional.empty());
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnector.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnector.java
new file mode 100644
index 000000000000..7a10df163dbb
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnector.java
@@ -0,0 +1,77 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseConnector implements Connector{
+ private final Logger log = Logger.get(this.getClass());
+
+ private final LifeCycleManager lifeCycleManager;
+ private final BaseMetadata metadata;
+ private final BaseSplitManager splitManager;
+ private final BaseRecordSetProvider recordSetProvider;
+
+ @Inject
+ public BaseConnector(
+ LifeCycleManager lifeCycleManager,
+ BaseMetadata metadata,
+ BaseSplitManager splitManager,
+ BaseRecordSetProvider recordSetProvider)
+ {
+ this.recordSetProvider = recordSetProvider;
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
+ {
+ checkConnectorSupports(READ_COMMITTED, isolationLevel);
+ return BaseTransactionHandle.INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorRecordSetProvider getRecordSetProvider()
+ {
+ return recordSetProvider;
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ try {
+ lifeCycleManager.stop();
+ }
+ catch (Exception e) {
+ log.error(e, "Error shutting down connector");
+ }
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorFactory.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorFactory.java
new file mode 100644
index 000000000000..133089b34e48
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorFactory.java
@@ -0,0 +1,92 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseConnectorFactory implements ConnectorFactory{
+ private final String name;
+
+ private final Class extends BaseConfig> baseConfigClass;
+ private final Class extends BaseConnector> baseConnectorClass;
+ private final Class extends BaseMetadata> baseMetadataClass;
+ private final Class extends BaseSplitManager> baseSplitManagerClass;
+ private final Class extends BaseRecordSetProvider> baseRecordSetProviderClass;
+ private final Class extends BaseHandleResolver> baseHandleResolverClass;
+ private final Class extends BaseProvider> baseProviderClass;
+
+ public BaseConnectorFactory(
+ String name,
+ Class extends BaseConfig> baseConfigClass,
+ Class extends BaseConnector> baseConnectorClass,
+ Class extends BaseMetadata> baseMetadataClass,
+ Class extends BaseSplitManager> baseSplitManagerClass,
+ Class extends BaseRecordSetProvider> baseRecordSetProviderClass,
+ Class extends BaseHandleResolver> baseHandleResolverClass,
+ Class extends BaseProvider> baseProviderClass
+ ) {
+ this.name = requireNonNull(name, "connectorFactory name is null");
+
+ this.baseConfigClass = requireNonNull(baseConfigClass, "baseConfig is null");
+ this.baseConnectorClass = requireNonNull(baseConnectorClass, "baseConnector is null");
+ this.baseMetadataClass = requireNonNull(baseMetadataClass, "baseMetadata is null");
+ this.baseSplitManagerClass = requireNonNull(baseSplitManagerClass, "baseSplitManager is null");
+ this.baseRecordSetProviderClass = requireNonNull(baseRecordSetProviderClass, "baseRecordSetProvider is null");
+ this.baseHandleResolverClass = requireNonNull(baseHandleResolverClass, "baseHandleResolver is null");
+ this.baseProviderClass = requireNonNull(baseProviderClass, "baseProvider is null");
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver()
+ {
+ return new BaseHandleResolver();
+ }
+
+ @Override
+ public Connector create(String connectorId, Map config, ConnectorContext connectorContext)
+ {
+ requireNonNull(config, "config is null");
+
+ try {
+ Bootstrap app = new Bootstrap(
+ new BaseModule(
+ connectorContext.getNodeManager(),
+ new BaseConnectorInfo(connectorId),
+ baseConfigClass,
+ baseConnectorClass,
+ baseMetadataClass,
+ baseSplitManagerClass,
+ baseRecordSetProviderClass,
+ baseHandleResolverClass,
+ baseProviderClass));
+
+ Injector injector = app
+ .strictConfig()
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(config)
+ .initialize();
+
+ return injector.getInstance(BaseConnector.class);
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorInfo.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorInfo.java
new file mode 100644
index 000000000000..9b1e13a9badf
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseConnectorInfo.java
@@ -0,0 +1,31 @@
+package com.facebook.presto.baseplugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/14/16.
+ */
+public class BaseConnectorInfo {
+ private final String connectorId;
+
+ @JsonCreator
+ public BaseConnectorInfo(
+ @JsonProperty("connectorId") String connectorId
+ ){
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ }
+
+ @JsonProperty
+ public String getConnectorId() {
+ return connectorId;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("connectorId", connectorId).toString();
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseHandleResolver.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseHandleResolver.java
new file mode 100644
index 000000000000..df95e99a96c4
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseHandleResolver.java
@@ -0,0 +1,43 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseHandleResolver implements ConnectorHandleResolver {
+ @Override
+ public Class extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
+ {
+ return BaseTableLayoutHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return BaseTableHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass()
+ {
+ return BaseColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass()
+ {
+ return BaseSplit.class;
+ }
+
+ @Override
+ public Class extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return BaseTransactionHandle.class;
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseMetadata.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseMetadata.java
new file mode 100644
index 000000000000..5e64d27a899a
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseMetadata.java
@@ -0,0 +1,117 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.baseplugin.metastore.BaseMetastore;
+import com.facebook.presto.baseplugin.metastore.BaseMetastoreDefinition;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.ConnectorViewDefinition;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseMetadata implements ConnectorMetadata {
+ private final BaseProvider baseProvider;
+ private final BaseMetastoreDefinition baseMetastoreDefinition;
+
+ @Inject
+ public BaseMetadata(BaseProvider baseProvider, BaseConnectorInfo baseConnectorInfo){
+ this.baseProvider = requireNonNull(baseProvider, "baseProvider is null");
+ this.baseMetastoreDefinition = new BaseMetastore(baseConnectorInfo, baseProvider.getConfig());
+ }
+
+ ////////////////////////////////////////USE BASE PROVIDER////////////////////////////////
+
+ @Override
+ public List listSchemaNames(ConnectorSession session) {
+ return baseProvider.listSchemaNames(session);
+ }
+
+ @Override
+ public List listTables(ConnectorSession session, String schemaNameOrNull) {
+ return baseProvider.listTableNames(session, Optional.ofNullable(schemaNameOrNull));
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) {
+ return baseProvider.getMappedColumnHandlesForSchemaTable(session, ((BaseTableHandle) tableHandle).getSchemaTableName());
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) {
+ SchemaTableName schemaTableName = ((BaseTableHandle) table).getSchemaTableName();
+ return new ConnectorTableMetadata(schemaTableName, baseProvider.listColumnMetadata(session, schemaTableName));
+ }
+
+ @Override
+ public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) {
+ SchemaTableName schemaTableName = new SchemaTableName(Optional.ofNullable(prefix.getSchemaName()).orElse("default_schema"), Optional.ofNullable(prefix.getTableName()).orElse("columns"));
+ return ImmutableMap.of(schemaTableName, baseProvider.listColumnMetadata(session, schemaTableName));
+ }
+
+ ////////////////////////////////GENERICALLY HANDLED////////////////////////////////
+
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+ return new BaseTableHandle(tableName);
+ }
+
+ @Override
+ public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns) {
+ return ImmutableList.of(new ConnectorTableLayoutResult(new ConnectorTableLayout(new BaseTableLayoutHandle((BaseTableHandle) table, constraint.getSummary())), constraint.getSummary()));
+ }
+
+ @Override
+ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) {
+ return new ConnectorTableLayout(handle);
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+ BaseColumnHandle baseColumnHandle = (BaseColumnHandle)columnHandle;
+ return new ColumnMetadata(baseColumnHandle.getColumnName(), baseColumnHandle.getColumnType());
+ }
+
+ //////////////////////////////////////////Views////////////////////////////////////
+
+ @Override
+ public void createView(ConnectorSession session, SchemaTableName viewName, String viewData, boolean replace) {
+ //baseMetastoreDefinition.createView(session, viewName, viewData, replace);
+ }
+
+ @Override
+ public void dropView(ConnectorSession session, SchemaTableName viewName) {
+ //baseMetastoreDefinition.dropView(session, viewName);
+ }
+
+ @Override
+ public List listViews(ConnectorSession session, String schemaNameOrNull) {
+ //return baseMetastoreDefinition.listViews(session, Optional.ofNullable(schemaNameOrNull));
+ return ImmutableList.of();
+ }
+
+ @Override
+ public Map getViews(ConnectorSession session, SchemaTablePrefix prefix) {
+ //return baseMetastoreDefinition.getViews(session, prefix);
+ return ImmutableMap.of();
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseModule.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseModule.java
new file mode 100644
index 000000000000..90704968ad3c
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseModule.java
@@ -0,0 +1,91 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.NodeManager;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseModule implements Module {
+ private final NodeManager nodeManager;
+ private final BaseConnectorInfo baseConnectorInfo;
+ private final Class extends BaseConfig> baseConfigClass;
+ private final Class extends BaseConnector> baseConnectorClass;
+ private final Class extends BaseMetadata> baseMetadataClass;
+ private final Class extends BaseSplitManager> baseSplitManagerClass;
+ private final Class extends BaseRecordSetProvider> baseRecordSetProviderClass;
+ private final Class extends BaseHandleResolver> baseHandleResolverClass;
+ private final Class extends BaseProvider> baseProviderClass;
+
+ public BaseModule(
+ NodeManager nodeManager,
+ BaseConnectorInfo baseConnectorInfo,
+ Class extends BaseConfig> baseConfigClass,
+ Class extends BaseConnector> baseConnectorClass,
+ Class extends BaseMetadata> baseMetadataClass,
+ Class extends BaseSplitManager> baseSplitManagerClass,
+ Class extends BaseRecordSetProvider> baseRecordSetProviderClass,
+ Class extends BaseHandleResolver> baseHandleResolverClass,
+ Class extends BaseProvider> baseProviderClass
+ ) {
+ this.nodeManager = requireNonNull(nodeManager);
+ this.baseConnectorInfo = requireNonNull(baseConnectorInfo, "baseConnectorInfo is null");
+ this.baseConfigClass = requireNonNull(baseConfigClass, "baseConfig is null");
+ this.baseConnectorClass = requireNonNull(baseConnectorClass, "baseConnector is null");
+ this.baseMetadataClass = requireNonNull(baseMetadataClass, "baseMetadata is null");
+ this.baseSplitManagerClass = requireNonNull(baseSplitManagerClass, "baseSplitManager is null");
+ this.baseRecordSetProviderClass = requireNonNull(baseRecordSetProviderClass, "baseRecordSetProvider is null");
+ this.baseHandleResolverClass = requireNonNull(baseHandleResolverClass, "baseHandleResolver is null");
+ this.baseProviderClass = requireNonNull(baseProviderClass, "baseProvider is null");
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ configBinder(binder).bindConfig(baseConfigClass);
+
+ binder.bind(BaseConnectorInfo.class).toInstance(baseConnectorInfo);
+ binder.bind(NodeManager.class).toInstance(nodeManager);
+
+ if(baseConnectorClass != BaseConnector.class) {
+ binder.bind(BaseConnector.class).to(baseConnectorClass).in(Scopes.SINGLETON);
+ }else{
+ binder.bind(BaseConnector.class).in(Scopes.SINGLETON);
+ }
+
+ if(baseMetadataClass != BaseMetadata.class){
+ binder.bind(BaseMetadata.class).to(baseMetadataClass).in(Scopes.SINGLETON);
+ }else{
+ binder.bind(BaseMetadata.class).in(Scopes.SINGLETON);
+ }
+
+ if(baseSplitManagerClass != BaseSplitManager.class){
+ binder.bind(BaseSplitManager.class).to(baseSplitManagerClass).in(Scopes.SINGLETON);
+ }else{
+ binder.bind(BaseSplitManager.class).in(Scopes.SINGLETON);
+ }
+
+ if(baseRecordSetProviderClass != BaseRecordSetProvider.class){
+ binder.bind(BaseRecordSetProvider.class).to(baseRecordSetProviderClass).in(Scopes.SINGLETON);
+ }else{
+ binder.bind(BaseRecordSetProvider.class).in(Scopes.SINGLETON);
+ }
+
+ if(baseHandleResolverClass != BaseHandleResolver.class){
+ binder.bind(BaseHandleResolver.class).to(baseHandleResolverClass).in(Scopes.SINGLETON);
+ }else{
+ binder.bind(BaseHandleResolver.class).in(Scopes.SINGLETON);
+ }
+
+ if(baseProviderClass != BaseProvider.class){
+ binder.bind(BaseProvider.class).to(baseProviderClass).in(Scopes.SINGLETON);
+ }else{
+ binder.bind(BaseProvider.class).in(Scopes.SINGLETON);
+ }
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BasePlugin.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BasePlugin.java
new file mode 100644
index 000000000000..0ae2474741b1
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BasePlugin.java
@@ -0,0 +1,80 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public abstract class BasePlugin implements Plugin {
+ private String name;
+
+ private Class extends BaseConfig> baseConfigClass;
+ private Class extends BaseConnector> baseConnectorClass;
+ private Class extends BaseMetadata> baseMetadataClass;
+ private Class extends BaseSplitManager> baseSplitManagerClass;
+ private Class extends BaseRecordSetProvider> baseRecordSetProviderClass;
+ private Class extends BaseHandleResolver> baseHandleResolverClass;
+ private Class extends BaseProvider> baseProviderClass;
+
+ public BasePlugin() {
+ setBaseConfigClass(BaseConfig.class);
+ setBaseConnectorClass(BaseConnector.class);
+ setBaseMetadataClass(BaseMetadata.class);
+ setBaseSplitManagerClass(BaseSplitManager.class);
+ setBaseRecordSetProviderClass(BaseRecordSetProvider.class);
+ setBaseHandleResolverClass(BaseHandleResolver.class);
+ setBaseProviderClass(BaseProvider.class);
+
+ //allow custom init
+ init();
+ }
+
+ public abstract void init();
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setBaseConfigClass(Class extends BaseConfig> baseConfigClass) {
+ this.baseConfigClass = baseConfigClass;
+ }
+
+ public void setBaseConnectorClass(Class extends BaseConnector> baseConnectorClass) {
+ this.baseConnectorClass = baseConnectorClass;
+ }
+
+ public void setBaseMetadataClass(Class extends BaseMetadata> baseMetadataClass) {
+ this.baseMetadataClass = baseMetadataClass;
+ }
+
+ public void setBaseSplitManagerClass(Class extends BaseSplitManager> baseSplitManagerClass) {
+ this.baseSplitManagerClass = baseSplitManagerClass;
+ }
+
+ public void setBaseRecordSetProviderClass(Class extends BaseRecordSetProvider> baseRecordSetProviderClass) {
+ this.baseRecordSetProviderClass = baseRecordSetProviderClass;
+ }
+
+ public void setBaseHandleResolverClass(Class extends BaseHandleResolver> baseHandleResolverClass) {
+ this.baseHandleResolverClass = baseHandleResolverClass;
+ }
+
+ public void setBaseProviderClass(Class extends BaseProvider> baseProviderClass) {
+ this.baseProviderClass = baseProviderClass;
+ }
+
+ @Override
+ public Iterable getConnectorFactories() {
+ return ImmutableList.of(new BaseConnectorFactory(
+ name,
+ baseConfigClass,
+ baseConnectorClass,
+ baseMetadataClass,
+ baseSplitManagerClass,
+ baseRecordSetProviderClass,
+ baseHandleResolverClass,
+ baseProviderClass));
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseProvider.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseProvider.java
new file mode 100644
index 000000000000..d6d7e21f7866
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseProvider.java
@@ -0,0 +1,118 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.baseplugin.cache.BaseRecordBatch;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.SchemaTableName;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public abstract class BaseProvider {
+ private final BaseConfig config;
+ private final Map> tableMap;
+
+ private final LoadingCache recordCache;
+
+ public BaseProvider(BaseConfig config) {
+ this.config = requireNonNull(config, "config is null");
+ this.tableMap = new HashMap<>();
+
+ this.recordCache = CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(2, TimeUnit.MINUTES).build(new CacheLoader() {
+ @Override
+ public BaseRecordBatch load(BaseQuery key) throws Exception {
+ return getRecordBatchForQueryFromSource(key);
+ }
+ });
+ }
+
+ public BaseConfig getConfig() {
+ return config;
+ }
+
+ public LoadingCache getRecordCache() {
+ return recordCache;
+ }
+
+ /**
+ * Gets the schema names for the source
+ * @param session contains query specific meta-information
+ * @return a list of schema names for the source
+ */
+ public List listSchemaNames(ConnectorSession session){
+ return ImmutableList.of(config.getDefaultSchemaName(Optional.of(session)));
+ }
+
+ /**
+ * Gets a list of columns for a table/schema
+ * @param session contains query specific meta-information
+ * @param name the schema/table for which to get columns
+ * @return columns corresponding to a table/schema
+ */
+ public List getTableColumns(ConnectorSession session, SchemaTableName name) {
+ return tableMap.computeIfAbsent(name, n -> generateTableColumns(session, n));
+ }
+
+ /**
+ * Gets a list of columnMetadata for a schema/table
+ * @param session contains query specific meta-information
+ * @param schemaTableName the schema/table for which to get columns
+ * @return the columnMetadata's corresponding to a schema/table
+ */
+ public List listColumnMetadata(ConnectorSession session, SchemaTableName schemaTableName){
+ return getTableColumns(session, schemaTableName).stream().map(x -> new ColumnMetadata(x.getColumnName(), x.getColumnType())).collect(Collectors.toList());
+ }
+
+ /**
+ * Gets a map of columnName -> columnHandles for a schema/table
+ * @param session contains query specific meta-information
+ * @param tableName the schema/table for which to get columns
+ * @return the columnName -> columnHandle corresponding to a schema/table
+ */
+ public Map getMappedColumnHandlesForSchemaTable(ConnectorSession session, SchemaTableName tableName){
+ return getTableColumns(session, tableName).stream().collect(Collectors.toMap(BaseColumnHandle::getColumnName, Function.identity()));
+ }
+
+ /**
+ * attempts to get a recordBatch from custom logic. If that returns an EMPTY_BATCH, hits source
+ * @param query the request for data
+ * @return the recordBatch
+ */
+ public BaseRecordBatch getRecordBatchForQuery(BaseQuery query){
+ if(config.getCacheEnabled()){
+ try {
+ return recordCache.get(query);
+ }catch (ExecutionException e){
+ e.printStackTrace();
+ }
+ }
+ return getRecordBatchForQueryFromSource(query);
+ }
+
+ /**
+ * Gets a recordBatch corresponding to the state of a given query
+ * @param key the query that represents a request for data
+ * @return the recordBatch corresponding to the state of the given query
+ */
+ public abstract BaseRecordBatch getRecordBatchForQueryFromSource(BaseQuery key);
+
+ public abstract List generateTableColumns(ConnectorSession session, SchemaTableName tableName);
+
+ public abstract List listTableNames(ConnectorSession session, Optional schemaName);
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseQuery.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseQuery.java
new file mode 100644
index 000000000000..80ada80b35d3
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseQuery.java
@@ -0,0 +1,76 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.google.common.base.MoreObjects;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.List;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 7/20/16.
+ */
+public class BaseQuery {
+ private final BaseSplit baseSplit;
+ private final ConnectorSession connectorSession;
+ private final List extends ColumnHandle> desiredColumns;
+ private final String id;
+
+ public BaseQuery(BaseSplit baseSplit, ConnectorSession connectorSession, List extends ColumnHandle> desiredColumns) {
+ this.baseSplit = requireNonNull(baseSplit, "baseSplit is null");
+ this.connectorSession = requireNonNull(connectorSession, "connectorSession is null");
+ this.desiredColumns = requireNonNull(desiredColumns, "desiredColumns is null");
+ this.id = UUID.randomUUID().toString();
+ }
+
+ public BaseSplit getBaseSplit() {
+ return baseSplit;
+ }
+
+ public ConnectorSession getConnectorSession() {
+ return connectorSession;
+ }
+
+ public List extends ColumnHandle> getDesiredColumns() {
+ return desiredColumns;
+ }
+
+ public String getId(){
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(o == this){
+ return true;
+ }
+ if(o instanceof BaseQuery){
+ BaseQuery baseQuery = (BaseQuery) o;
+ return new EqualsBuilder()
+ .append(baseSplit.getTableName(), baseQuery.baseSplit.getTableName())
+ .append(baseSplit.getPredicates(), baseQuery.baseSplit.getPredicates())
+ .isEquals();
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(baseSplit.getTableName())
+ .append(baseSplit.getPredicates())
+ .toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("tableName", baseSplit.getTableName())
+ .add("predicates", baseSplit.getPredicates())
+ .toString();
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordCursor.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordCursor.java
new file mode 100644
index 000000000000..ad0d460142c8
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordCursor.java
@@ -0,0 +1,119 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.baseplugin.cache.BaseRecord;
+import com.facebook.presto.baseplugin.cache.BaseRecordBatch;
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.Type;
+import io.airlift.slice.Slice;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public final class BaseRecordCursor implements RecordCursor {
+ private final BaseQuery baseQuery;
+ private final BaseProvider baseProvider;
+
+ private BaseRecordBatch currentBatch;
+ private BaseRecord currentRecord;
+ //use index instead of iterator to support concurrency better
+ private int index;
+
+ private boolean finishedFetching;
+
+ public BaseRecordCursor(BaseQuery baseQuery, BaseProvider baseProvider) {
+ this.baseQuery = requireNonNull(baseQuery, "baseQuery is null");
+ this.baseProvider = requireNonNull(baseProvider, "baseProvider is null");
+ this.currentBatch = BaseRecordBatch.EMPTY_BATCH;
+ this.index = 0;
+
+ this.finishedFetching = currentBatch.isLastBatch();
+ }
+
+ @Override
+ public long getTotalBytes() {
+ return 0;
+ }
+
+ @Override
+ public long getCompletedBytes() {
+ return 0;
+ }
+
+ @Override
+ public long getReadTimeNanos() {
+ return 0;
+ }
+
+ @Override
+ public Type getType(int field) {
+ return getColumnAtPosition(field).getColumnType();
+ }
+
+ @Override
+ public boolean advanceNextPosition(){
+ while(true) {
+ if (index < currentBatch.getBaseRecords().size()) {
+ currentRecord = currentBatch.getBaseRecords().get(index);
+ index++;
+ return true;
+ } else {
+ if (finishedFetching) {
+ return false;
+ } else {//this is the fetching piece of the data
+ try {
+ currentBatch = baseProvider.getConfig().getCacheEnabled(Optional.of(baseQuery.getConnectorSession())) ? baseProvider.getRecordCache().get(baseQuery) : baseProvider.getRecordBatchForQuery(baseQuery);
+ }catch (ExecutionException e){
+ e.printStackTrace();
+ currentBatch = baseProvider.getRecordBatchForQuery(baseQuery);
+ }
+ index = 0;
+ finishedFetching = currentBatch.isLastBatch();
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean getBoolean(int field){
+ return currentRecord.getBoolean(getColumnAtPosition(field), field);
+ }
+
+ @Override
+ public long getLong(int field){
+ return currentRecord.getLong(getColumnAtPosition(field), field);
+ }
+
+ @Override
+ public double getDouble(int field){
+ return currentRecord.getDouble(getColumnAtPosition(field), field);
+ }
+
+ @Override
+ public Slice getSlice(int field){
+ return currentRecord.getSlice(getColumnAtPosition(field), field);
+ }
+
+ @Override
+ public Object getObject(int field){
+ return currentRecord.getObject(getColumnAtPosition(field), field);
+ }
+
+ @Override
+ public boolean isNull(int field){
+ return currentRecord.isNull(getColumnAtPosition(field), field);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ private BaseColumnHandle getColumnAtPosition(int field){
+ return ((BaseColumnHandle) baseQuery.getDesiredColumns().get(field));
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSet.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSet.java
new file mode 100644
index 000000000000..f9259a8c88b2
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSet.java
@@ -0,0 +1,41 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.type.Type;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseRecordSet implements RecordSet {
+ private final BaseQuery baseQuery;
+ private final BaseProvider baseProvider;
+
+ public BaseRecordSet(BaseQuery baseQuery, BaseProvider baseProvider) {
+ this.baseQuery = requireNonNull(baseQuery, "baseQuery is null");
+ this.baseProvider = requireNonNull(baseProvider, "baseProvider is null");
+ }
+
+ public BaseQuery getBaseQuery(){
+ return baseQuery;
+ }
+
+ public BaseProvider getBaseProvider() {
+ return baseProvider;
+ }
+
+ @Override
+ public List getColumnTypes() {
+ return baseQuery.getDesiredColumns().stream().map(x -> ((BaseColumnHandle)x).getColumnType()).collect(Collectors.toList());
+ }
+
+ @Override
+ public RecordCursor cursor() {
+ return new BaseRecordCursor(baseQuery, baseProvider);
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSetProvider.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSetProvider.java
new file mode 100644
index 000000000000..f28668a518f2
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseRecordSetProvider.java
@@ -0,0 +1,34 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Inject;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseRecordSetProvider implements ConnectorRecordSetProvider {
+ private final BaseProvider baseProvider;
+
+ @Inject
+ public BaseRecordSetProvider(BaseProvider baseProvider){
+ this.baseProvider = requireNonNull(baseProvider, "baseProvider is null");
+ }
+
+ public BaseProvider getBaseProvider() {
+ return baseProvider;
+ }
+
+ @Override
+ public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List extends ColumnHandle> columns){
+ return new BaseRecordSet(new BaseQuery((BaseSplit) split, session, columns), baseProvider);
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplit.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplit.java
new file mode 100644
index 000000000000..486e24ce23a2
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplit.java
@@ -0,0 +1,79 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.baseplugin.predicate.BasePredicate;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseSplit implements ConnectorSplit {
+ private final List addresses;
+ private final SchemaTableName tableName;
+ private List predicates;
+
+ @JsonCreator
+ public BaseSplit(
+ @JsonProperty("addresses") List addresses,
+ @JsonProperty("tableName") SchemaTableName tableName,
+ @JsonProperty("predicates") List predicates)
+ {
+ this.addresses = requireNonNull(addresses, "address is null");
+ this.tableName = requireNonNull(tableName, "tableName is null");
+ this.predicates = requireNonNull(predicates, "predicates is null");
+ }
+
+ @JsonProperty
+ public SchemaTableName getTableName()
+ {
+ return tableName;
+ }
+
+ @JsonProperty
+ public List getPredicates()
+ {
+ return predicates;
+ }
+
+ public BaseSplit setPredicates(List predicates) {
+ this.predicates = predicates;
+ return this;
+ }
+
+ @JsonProperty
+ @Override
+ public List getAddresses()
+ {
+ return addresses;
+ }
+
+ @Override
+ public boolean isRemotelyAccessible()
+ {
+ return true;
+ }
+
+ @Override
+ public Object getInfo()
+ {
+ return this;
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("addresses", addresses)
+ .add("tableName", tableName)
+ .add("predicates", predicates)
+ .toString();
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplitManager.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplitManager.java
new file mode 100644
index 000000000000..d2b1739c612b
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseSplitManager.java
@@ -0,0 +1,140 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.baseplugin.predicate.BaseComparisonOperator;
+import com.facebook.presto.baseplugin.predicate.BasePredicate;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import io.airlift.log.Logger;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseSplitManager implements ConnectorSplitManager {
+ private final NodeManager nodeManager;
+ private final BaseConnectorInfo baseConnectorInfo;
+ private final BaseProvider baseProvider;
+
+ private final Logger logger = Logger.get(BaseSplitManager.class);
+
+ @Inject
+ public BaseSplitManager(NodeManager nodeManager, BaseConnectorInfo baseConnectorInfo, BaseProvider baseProvider)
+ {
+ this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
+ this.baseConnectorInfo = requireNonNull(baseConnectorInfo, "baseConnectorInfo is null");
+ this.baseProvider = requireNonNull(baseProvider, "baseProvider is null");
+ }
+
+ /**
+ * breaks up the predicate across the splits depending on the predicate characteristics
+ * @param predicate the predicate to break up
+ * @param splits the fixed list of splits to break up the predicate across
+ */
+ public void allocatePredicateToSplits(BasePredicate predicate, List extends BaseSplit> splits)
+ {
+ if(!splits.isEmpty()) {
+ switch (predicate.getBaseComparisonOperator()) {
+ case IN:
+ int partitionSize = (int) Math.ceil(((double) predicate.getValues().size()) / splits.size());
+ logger.info("Partition size of %s using %s number of splits for predicate with %s number of values", partitionSize, splits.size(), predicate.getValues().size());
+ List predicateList = Lists.partition(
+ predicate.getValues(),
+ partitionSize
+ ).stream().map(x -> new BasePredicate(predicate.getBaseColumnHandle(), BaseComparisonOperator.IN, x)).collect(Collectors.toList());
+ logger.info("Broke original predicate into %s number of predicates", predicateList.size());
+ //splits.forEach(x -> x.getPredicates().add(predicateIterator.next()));
+ int splitIndex = 0;
+ Iterator predicateIterator = predicateList.iterator();
+ while (predicateIterator.hasNext()) {
+ splits.get(splitIndex).getPredicates().add(predicateIterator.next());
+ if(splits.size() - 1 > splitIndex) {
+ splitIndex++;
+ }
+ }
+ break;
+ case BETWEEN:
+ Double start = ((Number) predicate.getValues().get(0)).doubleValue();
+ Double end = ((Number) predicate.getValues().get(1)).doubleValue();
+ Double incrementSize = (end - start) / splits.size();
+ for (BaseSplit split : splits) {
+ split.getPredicates().add(new BasePredicate(predicate.getBaseColumnHandle(), BaseComparisonOperator.BETWEEN, ImmutableList.of(start, start + incrementSize)));
+ start += incrementSize;
+ }
+ break;
+ default:
+ if (!splits.isEmpty()) {
+ splits.forEach(s -> s.getPredicates().add(predicate));
+ }
+ }
+ }
+ }
+
+
+ @Override
+ /**
+ * returns a list of splits for the coordinator to handle
+ * @return a representation of a source and the splits associated with it
+ * @param transactionHandle contains information about the SQL transaction
+ * @param session contains query specific meta-information
+ * @param layout contains information about the table, predicates, and additional constraint
+ */
+ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
+ {
+ BaseTableLayoutHandle baseTableLayout = (BaseTableLayoutHandle) layout;
+
+ //creates as many splits as there are active nodes in the cluster. EX: 3 nodes -> 3 splits
+ final List baseSplits = nodeManager.getWorkerNodes().stream()
+ .map(x -> new BaseSplit(ImmutableList.of(x.getHostAndPort()), baseTableLayout.getTable().getSchemaTableName(), new ArrayList<>()))
+ .collect(Collectors.toList());
+
+ //turns the columnDomains from the baseTableLayout into basePredicates
+ List basePredicates = baseTableLayout.getSummary().getColumnDomains().get().stream()
+ .map(BasePredicate::fromColumnDomain)
+ .collect(Collectors.toList());
+
+ //modify list of predicates going into be split
+ basePredicates = updateBasePredicates(basePredicates, baseTableLayout.getTable(), baseProvider, session);
+
+ //allocates predicates to splits based on whether predicates have finite bounds or not
+ basePredicates.forEach(p -> allocatePredicateToSplits(p, baseSplits));
+
+ //allows updates to predicate-filled splits before they're sent back to coordinator
+ return new FixedSplitSource(updateBaseSplits(baseSplits, baseProvider, session));
+ }
+
+ /**
+ * Should override this method
+ * @param basePredicates
+ * @param tableHandle
+ * @param baseProvider
+ * @param session
+ * @return
+ */
+ public List updateBasePredicates(List basePredicates, BaseTableHandle tableHandle, BaseProvider baseProvider, ConnectorSession session){
+ return basePredicates;
+ }
+
+ /**
+ * Should override this method
+ * @param baseSplits
+ * @param baseProvider
+ * @return
+ */
+ public List updateBaseSplits(List baseSplits, BaseProvider baseProvider, ConnectorSession session){
+ return baseSplits;
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableHandle.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableHandle.java
new file mode 100644
index 000000000000..be23565cd4b8
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableHandle.java
@@ -0,0 +1,59 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseTableHandle implements ConnectorTableHandle {
+ private final SchemaTableName schemaTableName;
+
+ @JsonCreator
+ public BaseTableHandle(
+ @JsonProperty("schemaTableName") SchemaTableName schemaTableName
+ )
+ {
+ this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
+ }
+
+ @JsonProperty
+ public SchemaTableName getSchemaTableName()
+ {
+ return schemaTableName;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BaseTableHandle that = (BaseTableHandle) o;
+ return Objects.equals(schemaTableName, that.schemaTableName);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(schemaTableName);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("schemaTableName", schemaTableName)
+ .toString();
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableLayoutHandle.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableLayoutHandle.java
new file mode 100644
index 000000000000..8c771a12204e
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTableLayoutHandle.java
@@ -0,0 +1,44 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseTableLayoutHandle implements ConnectorTableLayoutHandle {
+ private final BaseTableHandle table;
+ private final TupleDomain summary;
+
+ @JsonCreator
+ public BaseTableLayoutHandle(
+ @JsonProperty("table") BaseTableHandle table,
+ @JsonProperty("summary") TupleDomain summary
+ ){
+ this.table = requireNonNull(table, "table is null");
+ this.summary = requireNonNull(summary, "summary is null");
+ }
+
+ @JsonProperty
+ public BaseTableHandle getTable()
+ {
+ return table;
+ }
+
+ @JsonProperty
+ public TupleDomain getSummary()
+ {
+ return summary;
+ }
+
+ @Override
+ public String toString()
+ {
+ return table.toString();
+ }
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTransactionHandle.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTransactionHandle.java
new file mode 100644
index 000000000000..9957307009ee
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseTransactionHandle.java
@@ -0,0 +1,10 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public enum BaseTransactionHandle implements ConnectorTransactionHandle {
+ INSTANCE
+}
diff --git a/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseUtils.java b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseUtils.java
new file mode 100644
index 000000000000..b7527d5c7189
--- /dev/null
+++ b/presto-base-plugin-nosql/src/main/java/com/facebook/presto/baseplugin/BaseUtils.java
@@ -0,0 +1,75 @@
+package com.facebook.presto.baseplugin;
+
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.type.BigintType;
+import com.facebook.presto.spi.type.BooleanType;
+import com.facebook.presto.spi.type.DoubleType;
+import com.facebook.presto.spi.type.IntegerType;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.VarcharType;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Created by amehta on 6/13/16.
+ */
+public class BaseUtils {
+ private BaseUtils()
+ {}
+
+ public static T getValueAsType(Object value, Class clazz){
+ String v = value.toString();
+ if (clazz == String.class){
+ value = v;
+ } else if (clazz == Double.class) {
+ value = Double.parseDouble(v);
+ }
+ else if (clazz == Integer.class) {
+ value = Integer.parseInt(v);
+ }
+ else if (clazz == Long.class) {
+ try {
+ value = Long.parseLong(v);
+ }catch (NumberFormatException e){// in the event of a date string
+ value = LocalDateTime.parse(v, DateTimeFormatter.ISO_OFFSET_DATE_TIME).toEpochSecond(ZoneOffset.UTC);
+ }
+ }
+ else if (clazz == Boolean.class) {
+ value = Boolean.parseBoolean(v);
+ }
+ return clazz.cast(value);
+ }
+
+ public static T getPropertyFromMap(String propertyName, Class clazz, Map