diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java index d6d01c0a407..883614020b8 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java @@ -36,8 +36,12 @@ public interface DrillHBaseConstants { MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY); + String SYS_STORE_PROVIDER_HBASE_NAMESPACE = "drill.exec.sys.store.provider.hbase.namespace"; + String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table"; + String SYS_STORE_PROVIDER_HBASE_FAMILY = "drill.exec.sys.store.provider.hbase.family"; + String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config"; String SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG = "drill.exec.sys.store.provider.hbase.table_config"; diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java index f579c6e2022..9cf281cd477 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.hbase.config; -import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY_NAME; import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER_NAME; import java.io.IOException; @@ -48,10 +47,11 @@ public class HBasePersistentStore extends BasePersistentStore { private final String hbaseTableName; private final String tableName; + private final byte[] familyName; private final byte[] tableNameStartKey; private final byte[] tableNameStopKey; - public HBasePersistentStore(PersistentStoreConfig config, Table table) { + public HBasePersistentStore(PersistentStoreConfig config, Table table, byte[] family) { this.tableName = config.getName() + '\0'; this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00" this.tableNameStopKey = this.tableNameStartKey.clone(); @@ -59,6 +59,7 @@ public HBasePersistentStore(PersistentStoreConfig config, Table table) { this.config = config; this.hbaseTable = table; this.hbaseTableName = table.getName().getNameAsString(); + this.familyName = family; } @Override @@ -70,7 +71,7 @@ public PersistentStoreMode getMode() { public boolean contains(String key) { try { Get get = new Get(row(key)); - get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + get.addColumn(familyName, QUALIFIER_NAME); return hbaseTable.exists(get); } catch (IOException e) { throw UserException @@ -82,7 +83,7 @@ public boolean contains(String key) { @Override public V get(String key) { - return get(key, FAMILY_NAME); + return get(key, familyName); } protected synchronized V get(String key, byte[] family) { @@ -103,7 +104,7 @@ protected synchronized V get(String key, byte[] family) { @Override public void put(String key, V value) { - put(key, FAMILY_NAME, value); + put(key, familyName, value); } protected synchronized void put(String key, byte[] family, V value) { @@ -122,8 +123,8 @@ protected synchronized void put(String key, byte[] family, V value) { public synchronized boolean putIfAbsent(String key, V value) { try { Put put = new Put(row(key)); - put.addColumn(FAMILY_NAME, QUALIFIER_NAME, bytes(value)); - return hbaseTable.checkAndPut(put.getRow(), FAMILY_NAME, QUALIFIER_NAME, null /*absent*/, put); + put.addColumn(familyName, QUALIFIER_NAME, bytes(value)); + return hbaseTable.checkAndPut(put.getRow(), familyName, QUALIFIER_NAME, null /*absent*/, put); } catch (IOException e) { throw UserException.dataReadError(e) .message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName) @@ -183,7 +184,7 @@ private class Iter implements Iterator> { Iter(int take) { try { Scan scan = new Scan(tableNameStartKey, tableNameStopKey); - scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + scan.addColumn(familyName, QUALIFIER_NAME); scan.setCaching(Math.min(take, 100)); scan.setBatch(take); // set batch size scanner = hbaseTable.getScanner(scan); diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java index 626521a98cd..80a3f04c142 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java @@ -49,7 +49,7 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class); - public static final byte[] FAMILY_NAME = Bytes.toBytes("s"); + public static final byte[] DEFAULT_FAMILY_NAME = Bytes.toBytes("s"); public static final byte[] QUALIFIER_NAME = Bytes.toBytes("d"); @@ -57,6 +57,8 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { private final TableName hbaseTableName; + private final byte[] family; + private Table hbaseTable; private Configuration hbaseConf; @@ -94,7 +96,19 @@ public HBasePersistentStoreProvider(PersistentStoreRegistry registry) { if (!columnConfig.isEmpty()) { logger.info("Received the column config is {}", columnConfig); } - hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE)); + String tableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE); + if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE)) { + String namespaceStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE); + hbaseTableName = TableName.valueOf(namespaceStr.concat(":").concat(tableName)); + } else { + hbaseTableName = TableName.valueOf(tableName); + } + if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY)) { + String familyStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY); + family = Bytes.toBytes(familyStr); + } else { // The default name + family = DEFAULT_FAMILY_NAME; + } } @VisibleForTesting @@ -103,6 +117,7 @@ public HBasePersistentStoreProvider(Configuration conf, String storeTableName) { this.columnConfig = Maps.newHashMap(); this.hbaseConf = conf; this.hbaseTableName = TableName.valueOf(storeTableName); + this.family = DEFAULT_FAMILY_NAME; } @VisibleForTesting @@ -111,6 +126,7 @@ public HBasePersistentStoreProvider(Map tableConfig, Map PersistentStore getOrCreateStore(PersistentStoreConfig config) switch (config.getMode()) { case BLOB_PERSISTENT: case PERSISTENT: - return new HBasePersistentStore<>(config, hbaseTable); + return new HBasePersistentStore<>(config, hbaseTable, family); default: throw new IllegalStateException("Unknown persistent mode"); } @@ -128,7 +144,7 @@ public PersistentStore getOrCreateStore(PersistentStoreConfig config) public void start() throws IOException { // Create the column family builder ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder - .newBuilder(FAMILY_NAME) + .newBuilder(family) .setMaxVersions(1); // Append the config to column family verifyAndSetColumnConfig(columnConfig, columnFamilyBuilder); @@ -149,10 +165,10 @@ public void start() throws IOException { if (!admin.isTableEnabled(hbaseTableName)) { admin.enableTable(hbaseTableName); // In case the table is disabled } - if (!table.hasColumnFamily(FAMILY_NAME)) { + if (!table.hasColumnFamily(family)) { throw new DrillRuntimeException("The HBase table " + hbaseTableName + " specified as persistent store exists but does not contain column family: " - + (Bytes.toString(FAMILY_NAME))); + + (Bytes.toString(family))); } logger.info("The HBase table of persistent store is loaded : {}", hbaseTableName); } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java index 1f32276d056..443bdd0abb0 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java @@ -104,8 +104,8 @@ public void testStoreTableAttributes() throws Exception { assertTrue("The max size of hfile must be " + MAX_FILESIZE, tableDescriptor.getMaxFileSize() == MAX_FILESIZE); assertTrue("The memstore size must be " + MEMSTORE_FLUSHSIZE, tableDescriptor.getMemStoreFlushSize() == MEMSTORE_FLUSHSIZE); // Column Family verify - assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME)); - ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME); + assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME)); + ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME); assertTrue("The max number of versions must be " + MAX_VERSIONS, columnDescriptor.getMaxVersions() == MAX_VERSIONS); assertTrue("The time-to-live must be " + TTL, columnDescriptor.getTimeToLive() == TTL); // TODO native snappy* library not available