Skip to content

Commit

Permalink
Add the column family and namespace options
Browse files Browse the repository at this point in the history
  • Loading branch information
luocooong committed Jul 25, 2022
1 parent 8a6e8d8 commit 6186251
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ public interface DrillHBaseConstants {

MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);

String SYS_STORE_PROVIDER_HBASE_NAMESPACE = "drill.exec.sys.store.provider.hbase.namespace";

String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";

String SYS_STORE_PROVIDER_HBASE_FAMILY = "drill.exec.sys.store.provider.hbase.family";

String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";

String SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG = "drill.exec.sys.store.provider.hbase.table_config";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.hbase.config;

import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY_NAME;
import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER_NAME;

import java.io.IOException;
Expand Down Expand Up @@ -48,17 +47,19 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> {
private final String hbaseTableName;

private final String tableName;
private final byte[] familyName;
private final byte[] tableNameStartKey;
private final byte[] tableNameStopKey;

public HBasePersistentStore(PersistentStoreConfig<V> config, Table table) {
public HBasePersistentStore(PersistentStoreConfig<V> config, Table table, byte[] family) {
this.tableName = config.getName() + '\0';
this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00"
this.tableNameStopKey = this.tableNameStartKey.clone();
this.tableNameStopKey[tableNameStartKey.length-1] = 1;
this.config = config;
this.hbaseTable = table;
this.hbaseTableName = table.getName().getNameAsString();
this.familyName = family;
}

@Override
Expand All @@ -70,7 +71,7 @@ public PersistentStoreMode getMode() {
public boolean contains(String key) {
try {
Get get = new Get(row(key));
get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
get.addColumn(familyName, QUALIFIER_NAME);
return hbaseTable.exists(get);
} catch (IOException e) {
throw UserException
Expand All @@ -82,7 +83,7 @@ public boolean contains(String key) {

@Override
public V get(String key) {
return get(key, FAMILY_NAME);
return get(key, familyName);
}

protected synchronized V get(String key, byte[] family) {
Expand All @@ -103,7 +104,7 @@ protected synchronized V get(String key, byte[] family) {

@Override
public void put(String key, V value) {
put(key, FAMILY_NAME, value);
put(key, familyName, value);
}

protected synchronized void put(String key, byte[] family, V value) {
Expand All @@ -122,8 +123,8 @@ protected synchronized void put(String key, byte[] family, V value) {
public synchronized boolean putIfAbsent(String key, V value) {
try {
Put put = new Put(row(key));
put.addColumn(FAMILY_NAME, QUALIFIER_NAME, bytes(value));
return hbaseTable.checkAndPut(put.getRow(), FAMILY_NAME, QUALIFIER_NAME, null /*absent*/, put);
put.addColumn(familyName, QUALIFIER_NAME, bytes(value));
return hbaseTable.checkAndPut(put.getRow(), familyName, QUALIFIER_NAME, null /*absent*/, put);
} catch (IOException e) {
throw UserException.dataReadError(e)
.message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName)
Expand Down Expand Up @@ -183,7 +184,7 @@ private class Iter implements Iterator<Entry<String, V>> {
Iter(int take) {
try {
Scan scan = new Scan(tableNameStartKey, tableNameStopKey);
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
scan.addColumn(familyName, QUALIFIER_NAME);
scan.setCaching(Math.min(take, 100));
scan.setBatch(take); // set batch size
scanner = hbaseTable.getScanner(scan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@
public class HBasePersistentStoreProvider extends BasePersistentStoreProvider {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class);

public static final byte[] FAMILY_NAME = Bytes.toBytes("s");
public static final byte[] DEFAULT_FAMILY_NAME = Bytes.toBytes("s");

public static final byte[] QUALIFIER_NAME = Bytes.toBytes("d");

private static final String HBASE_CLIENT_ID = "drill-hbase-persistent-store-client";

private final TableName hbaseTableName;

private final byte[] family;

private Table hbaseTable;

private Configuration hbaseConf;
Expand Down Expand Up @@ -94,7 +96,19 @@ public HBasePersistentStoreProvider(PersistentStoreRegistry registry) {
if (!columnConfig.isEmpty()) {
logger.info("Received the column config is {}", columnConfig);
}
hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE));
String tableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE);
if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE)) {
String namespaceStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE);
hbaseTableName = TableName.valueOf(namespaceStr.concat(":").concat(tableName));
} else {
hbaseTableName = TableName.valueOf(tableName);
}
if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY)) {
String familyStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY);
family = Bytes.toBytes(familyStr);
} else { // The default name
family = DEFAULT_FAMILY_NAME;
}
}

@VisibleForTesting
Expand All @@ -103,6 +117,7 @@ public HBasePersistentStoreProvider(Configuration conf, String storeTableName) {
this.columnConfig = Maps.newHashMap();
this.hbaseConf = conf;
this.hbaseTableName = TableName.valueOf(storeTableName);
this.family = DEFAULT_FAMILY_NAME;
}

@VisibleForTesting
Expand All @@ -111,14 +126,15 @@ public HBasePersistentStoreProvider(Map<String, Object> tableConfig, Map<String,
this.columnConfig = columnConfig;
this.hbaseConf = conf;
this.hbaseTableName = TableName.valueOf(storeTableName);
this.family = DEFAULT_FAMILY_NAME;
}

@Override
public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
switch (config.getMode()) {
case BLOB_PERSISTENT:
case PERSISTENT:
return new HBasePersistentStore<>(config, hbaseTable);
return new HBasePersistentStore<>(config, hbaseTable, family);
default:
throw new IllegalStateException("Unknown persistent mode");
}
Expand All @@ -128,7 +144,7 @@ public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config)
public void start() throws IOException {
// Create the column family builder
ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder
.newBuilder(FAMILY_NAME)
.newBuilder(family)
.setMaxVersions(1);
// Append the config to column family
verifyAndSetColumnConfig(columnConfig, columnFamilyBuilder);
Expand All @@ -149,10 +165,10 @@ public void start() throws IOException {
if (!admin.isTableEnabled(hbaseTableName)) {
admin.enableTable(hbaseTableName); // In case the table is disabled
}
if (!table.hasColumnFamily(FAMILY_NAME)) {
if (!table.hasColumnFamily(family)) {
throw new DrillRuntimeException("The HBase table " + hbaseTableName
+ " specified as persistent store exists but does not contain column family: "
+ (Bytes.toString(FAMILY_NAME)));
+ (Bytes.toString(family)));
}
logger.info("The HBase table of persistent store is loaded : {}", hbaseTableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public void testStoreTableAttributes() throws Exception {
assertTrue("The max size of hfile must be " + MAX_FILESIZE, tableDescriptor.getMaxFileSize() == MAX_FILESIZE);
assertTrue("The memstore size must be " + MEMSTORE_FLUSHSIZE, tableDescriptor.getMemStoreFlushSize() == MEMSTORE_FLUSHSIZE);
// Column Family verify
assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME));
ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME);
assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME));
ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME);
assertTrue("The max number of versions must be " + MAX_VERSIONS, columnDescriptor.getMaxVersions() == MAX_VERSIONS);
assertTrue("The time-to-live must be " + TTL, columnDescriptor.getTimeToLive() == TTL);
// TODO native snappy* library not available
Expand Down

0 comments on commit 6186251

Please sign in to comment.