DRILL-8259: Supports advanced HBase persistence storage options
luocooong committed Jul 13, 2022
1 parent 2cb1cec commit 8a6e8d8
Showing 15 changed files with 307 additions and 109 deletions.
HbaseStorageTest.java → HBaseStorageTest.java
@@ -20,5 +20,5 @@
 /**
  * This is a category used to mark unit tests that test the HBase storage plugin.
  */
-public interface HbaseStorageTest {
+public interface HBaseStorageTest {
 }
DrillHBaseConstants.java
@@ -23,6 +23,7 @@
 import org.apache.drill.common.types.Types;
 
 public interface DrillHBaseConstants {
+
   String ROW_KEY = "row_key";
 
   SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
@@ -38,4 +39,8 @@ public interface DrillHBaseConstants {
   String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";
 
   String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";
+
+  String SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG = "drill.exec.sys.store.provider.hbase.table_config";
+
+  String SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG = "drill.exec.sys.store.provider.hbase.column_config";
 }
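For reference, a hypothetical drill-override.conf wiring these keys together could look as follows. The table_config and column_config blocks are the options added by this commit; option names mirror the TableDescriptorBuilder and ColumnFamilyDescriptorBuilder constants handled in HBasePersistentStoreProvider further below, and all values here are illustrative:

  drill.exec.sys.store.provider: {
    class: "org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider",
    hbase: {
      table: "drill_store",
      config: {
        "hbase.zookeeper.quorum": "localhost",
        "hbase.zookeeper.property.clientPort": "2181"
      },
      table_config: {
        DURABILITY: "ASYNC_WAL",
        COMPACTION_ENABLED: true
      },
      column_config: {
        TTL: 604800,
        COMPRESSION: "SNAPPY",
        DATA_BLOCK_ENCODING: "FAST_DIFF"
      }
    }
  }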
HBasePersistentStore.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.store.hbase.config;
 
-import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY;
-import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER;
+import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY_NAME;
+import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER_NAME;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -70,7 +70,7 @@ public PersistentStoreMode getMode() {
   public boolean contains(String key) {
     try {
       Get get = new Get(row(key));
-      get.addColumn(FAMILY, QUALIFIER);
+      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
       return hbaseTable.exists(get);
     } catch (IOException e) {
       throw UserException
@@ -82,13 +82,13 @@ public boolean contains(String key) {

   @Override
   public V get(String key) {
-    return get(key, FAMILY);
+    return get(key, FAMILY_NAME);
   }
 
   protected synchronized V get(String key, byte[] family) {
     try {
       Get get = new Get(row(key));
-      get.addColumn(family, QUALIFIER);
+      get.addColumn(family, QUALIFIER_NAME);
       Result r = hbaseTable.get(get);
       if(r.isEmpty()){
         return null;
@@ -103,13 +103,13 @@ protected synchronized V get(String key, byte[] family) {

   @Override
   public void put(String key, V value) {
-    put(key, FAMILY, value);
+    put(key, FAMILY_NAME, value);
   }
 
   protected synchronized void put(String key, byte[] family, V value) {
     try {
       Put put = new Put(row(key));
-      put.addColumn(family, QUALIFIER, bytes(value));
+      put.addColumn(family, QUALIFIER_NAME, bytes(value));
       hbaseTable.put(put);
     } catch (IOException e) {
       throw UserException.dataReadError(e)
@@ -122,8 +122,8 @@ protected synchronized void put(String key, byte[] family, V value) {
   public synchronized boolean putIfAbsent(String key, V value) {
     try {
       Put put = new Put(row(key));
-      put.addColumn(FAMILY, QUALIFIER, bytes(value));
-      return hbaseTable.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put);
+      put.addColumn(FAMILY_NAME, QUALIFIER_NAME, bytes(value));
+      return hbaseTable.checkAndPut(put.getRow(), FAMILY_NAME, QUALIFIER_NAME, null /*absent*/, put);
     } catch (IOException e) {
       throw UserException.dataReadError(e)
         .message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName)
@@ -183,7 +183,7 @@ private class Iter implements Iterator<Entry<String, V>> {
     Iter(int take) {
       try {
         Scan scan = new Scan(tableNameStartKey, tableNameStopKey);
-        scan.addColumn(FAMILY, QUALIFIER);
+        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
         scan.setCaching(Math.min(take, 100));
         scan.setBatch(take); // set batch size
         scanner = hbaseTable.getScanner(scan);
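As an aside, Scan.setCaching bounds the number of rows fetched per scanner RPC, while setBatch caps the cells returned per Result. The two-argument Scan constructor above is deprecated in HBase 2.x; a minimal equivalent using the fluent setters would be (a sketch, not part of this commit):

  // Sketch: the deprecated Scan(startRow, stopRow) constructor rewritten
  // with the HBase 2.x fluent API.
  Scan scan = new Scan()
    .withStartRow(tableNameStartKey)
    .withStopRow(tableNameStopKey)
    .addColumn(FAMILY_NAME, QUALIFIER_NAME)
    .setCaching(Math.min(take, 100));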
HBasePersistentStoreProvider.java
@@ -20,116 +20,233 @@
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
 import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvider;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 public class HBasePersistentStoreProvider extends BasePersistentStoreProvider {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class);
 
-  static final byte[] FAMILY = Bytes.toBytes("s");
+  public static final byte[] FAMILY_NAME = Bytes.toBytes("s");
+
+  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("d");
 
-  static final byte[] QUALIFIER = Bytes.toBytes("d");
+  private static final String HBASE_CLIENT_ID = "drill-hbase-persistent-store-client";
 
   private final TableName hbaseTableName;
 
-  private Table hbaseTable;
-
   private Configuration hbaseConf;
 
-  private Connection connection;
+  private final Map<String, Object> tableConfig;
+
+  private final Map<String, Object> columnConfig;
+
+  private Table hbaseTable;
+
+  private Connection connection;

+  @SuppressWarnings("unchecked")
   public HBasePersistentStoreProvider(PersistentStoreRegistry registry) {
-    @SuppressWarnings("unchecked")
-    final Map<String, Object> config = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG);
-    this.hbaseConf = HBaseConfiguration.create();
-    this.hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-persistent-store-client");
-    if (config != null) {
-      for (Map.Entry<String, Object> entry : config.entrySet()) {
-        this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue()));
+    final Map<String, Object> hbaseConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG);
+    if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG)) {
+      tableConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG);
+    } else {
+      tableConfig = Maps.newHashMap();
+    }
+    if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG)) {
+      columnConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG);
+    } else {
+      columnConfig = Maps.newHashMap();
+    }
+    hbaseConf = HBaseConfiguration.create();
+    hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, HBASE_CLIENT_ID);
+    if (hbaseConfig != null) {
+      for (Map.Entry<String, Object> entry : hbaseConfig.entrySet()) {
+        hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue()));
       }
     }
-    this.hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE));
+    logger.info("Received the HBase config: {}", hbaseConfig);
+    if (!tableConfig.isEmpty()) {
+      logger.info("Received the table config: {}", tableConfig);
+    }
+    if (!columnConfig.isEmpty()) {
+      logger.info("Received the column config: {}", columnConfig);
+    }
+    hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE));
   }

   @VisibleForTesting
   public HBasePersistentStoreProvider(Configuration conf, String storeTableName) {
+    this.tableConfig = Maps.newHashMap();
+    this.columnConfig = Maps.newHashMap();
     this.hbaseConf = conf;
     this.hbaseTableName = TableName.valueOf(storeTableName);
   }
 
+  @VisibleForTesting
+  public HBasePersistentStoreProvider(Map<String, Object> tableConfig, Map<String, Object> columnConfig, Configuration conf, String storeTableName) {
+    this.tableConfig = tableConfig;
+    this.columnConfig = columnConfig;
+    this.hbaseConf = conf;
+    this.hbaseTableName = TableName.valueOf(storeTableName);
+  }
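The new four-argument constructor makes the advanced options testable without a full Drill configuration. A usage sketch (the table name and option values are illustrative; the map keys mirror the TableDescriptorBuilder and ColumnFamilyDescriptorBuilder constants handled further below):

  // Sketch: build a provider with custom table and column family options.
  Map<String, Object> tableConfig = Maps.newHashMap();
  tableConfig.put("DURABILITY", "ASYNC_WAL");
  Map<String, Object> columnConfig = Maps.newHashMap();
  columnConfig.put("TTL", 604800); // seconds
  columnConfig.put("COMPRESSION", "SNAPPY");
  HBasePersistentStoreProvider provider = new HBasePersistentStoreProvider(
      tableConfig, columnConfig, HBaseConfiguration.create(), "drill_store");
  provider.start(); // creates the table with these options if it does not already exist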

   @Override
   public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
-    switch(config.getMode()){
+    switch (config.getMode()) {
     case BLOB_PERSISTENT:
     case PERSISTENT:
-      return new HBasePersistentStore<>(config, this.hbaseTable);
-
+      return new HBasePersistentStore<>(config, hbaseTable);
     default:
-      throw new IllegalStateException();
+      throw new IllegalStateException("Unknown persistent mode");
     }
   }
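Callers obtain a typed store through this method. A sketch of typical usage (the store name is hypothetical; PersistentStoreConfig.newJacksonBuilder is existing Drill API):

  // Sketch: fetch a String-valued store from the provider and write to it.
  PersistentStoreConfig<String> storeConfig = PersistentStoreConfig
      .newJacksonBuilder(new ObjectMapper(), String.class)
      .name("hypothetical.store")
      .build();
  PersistentStore<String> store = provider.getOrCreateStore(storeConfig);
  store.put("version", "1.20.0");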


   @Override
   public void start() throws IOException {
+    // Create the column family builder
+    ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder
+      .newBuilder(FAMILY_NAME)
+      .setMaxVersions(1);
+    // Apply the column family config
+    verifyAndSetColumnConfig(columnConfig, columnFamilyBuilder);
+    // Create the table builder
+    TableDescriptorBuilder tableBuilder = TableDescriptorBuilder
+      .newBuilder(hbaseTableName)
+      .setColumnFamily(columnFamilyBuilder.build());
+    // Apply the table config
+    verifyAndSetTableConfig(tableConfig, tableBuilder);
     this.connection = ConnectionFactory.createConnection(hbaseConf);
 
     try (Admin admin = connection.getAdmin()) {
       if (!admin.tableExists(hbaseTableName)) {
-        HTableDescriptor desc = new HTableDescriptor(hbaseTableName);
-        desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1));
-        admin.createTable(desc);
+        admin.createTable(tableBuilder.build());
+        logger.info("Created the HBase persistent store table: {}", hbaseTableName);
       } else {
-        HTableDescriptor desc = admin.getTableDescriptor(hbaseTableName);
-        if (!desc.hasFamily(FAMILY)) {
+        TableDescriptor table = admin.getDescriptor(hbaseTableName);
+        if (!admin.isTableEnabled(hbaseTableName)) {
+          admin.enableTable(hbaseTableName); // In case the table is disabled
+        }
+        if (!table.hasColumnFamily(FAMILY_NAME)) {
           throw new DrillRuntimeException("The HBase table " + hbaseTableName
             + " specified as persistent store exists but does not contain column family: "
-            + (Bytes.toString(FAMILY)));
+            + (Bytes.toString(FAMILY_NAME)));
         }
+        logger.info("Loaded the existing HBase persistent store table: {}", hbaseTableName);
       }
     }
 
     this.hbaseTable = connection.getTable(hbaseTableName);
   }

-  @Override
-  public synchronized void close() {
-    if (this.hbaseTable != null) {
-      try {
-        this.hbaseTable.close();
-        this.hbaseTable = null;
-      } catch (IOException e) {
-        logger.warn("Caught exception while closing HBase table.", e);
-      }
-    }
-    if (this.connection != null && !this.connection.isClosed()) {
-      try {
-        this.connection.close();
-      } catch (IOException e) {
-        logger.warn("Caught exception while closing HBase connection.", e);
-      }
-      this.connection = null;
-    }
-  }
+  /**
+   * Verify the HBase table configuration and apply it to the table builder.
+   * @param config the table config map
+   * @param builder the HBase table builder
+   */
+  private void verifyAndSetTableConfig(Map<String, Object> config, TableDescriptorBuilder builder) {
+    for (Map.Entry<String, Object> entry : config.entrySet()) {
+      switch (entry.getKey().toUpperCase()) {
+      case TableDescriptorBuilder.DURABILITY:
+        Durability durability = Durability.valueOf(((String) entry.getValue()).toUpperCase());
+        builder.setDurability(durability);
+        break;
+      case TableDescriptorBuilder.COMPACTION_ENABLED:
+        builder.setCompactionEnabled((Boolean) entry.getValue());
+        break;
+      case TableDescriptorBuilder.SPLIT_ENABLED:
+        builder.setSplitEnabled((Boolean) entry.getValue());
+        break;
+      case TableDescriptorBuilder.FLUSH_POLICY:
+        builder.setFlushPolicyClassName((String) entry.getValue());
+        break;
+      case TableDescriptorBuilder.SPLIT_POLICY:
+        builder.setRegionSplitPolicyClassName((String) entry.getValue());
+        break;
+      case TableDescriptorBuilder.MAX_FILESIZE:
+        builder.setMaxFileSize((Integer) entry.getValue());
+        break;
+      case TableDescriptorBuilder.MEMSTORE_FLUSHSIZE:
+        builder.setMemStoreFlushSize((Integer) entry.getValue());
+        break;
+      default:
+        break;
+      }
+    }
+  }
+
+  /**
+   * Verify the HBase column family configuration and apply it to the column family builder.
+   * @param config the column family config map
+   * @param builder the HBase column family builder
+   */
+  private void verifyAndSetColumnConfig(Map<String, Object> config, ColumnFamilyDescriptorBuilder builder) {
+    for (Map.Entry<String, Object> entry : config.entrySet()) {
+      switch (entry.getKey().toUpperCase()) {
+      case ColumnFamilyDescriptorBuilder.MAX_VERSIONS:
+        builder.setMaxVersions((Integer) entry.getValue());
+        break;
+      case ColumnFamilyDescriptorBuilder.TTL:
+        builder.setTimeToLive((Integer) entry.getValue());
+        break;
+      case ColumnFamilyDescriptorBuilder.COMPRESSION:
+        Algorithm algorithm = Algorithm.valueOf(((String) entry.getValue()).toUpperCase());
+        builder.setCompressionType(algorithm);
+        break;
+      case ColumnFamilyDescriptorBuilder.BLOCKCACHE:
+        builder.setBlockCacheEnabled((Boolean) entry.getValue());
+        break;
+      case ColumnFamilyDescriptorBuilder.BLOCKSIZE:
+        builder.setBlocksize((Integer) entry.getValue());
+        break;
+      case ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING:
+        DataBlockEncoding encoding = DataBlockEncoding.valueOf(((String) entry.getValue()).toUpperCase());
+        builder.setDataBlockEncoding(encoding);
+        break;
+      case ColumnFamilyDescriptorBuilder.IN_MEMORY:
+        builder.setInMemory((Boolean) entry.getValue());
+        break;
+      case ColumnFamilyDescriptorBuilder.DFS_REPLICATION:
+        builder.setDFSReplication(((Integer) entry.getValue()).shortValue());
+        break;
+      default:
+        break;
+      }
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    if (hbaseTable != null) {
+      AutoCloseables.closeSilently(hbaseTable);
+    }
+    if (connection != null && !connection.isClosed()) {
+      AutoCloseables.closeSilently(connection);
+    }
+    logger.info("Closed the HBase persistent store connection.");
+  }
 }
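To confirm that the options actually landed on the cluster, the descriptor can be read back through the Admin API; a minimal check, assuming the illustrative table name and values used earlier:

  // Sketch: read back the table descriptor and verify the applied options.
  try (Connection verify = ConnectionFactory.createConnection(conf);
       Admin admin = verify.getAdmin()) {
    TableDescriptor desc = admin.getDescriptor(TableName.valueOf("drill_store"));
    assert desc.getDurability() == Durability.ASYNC_WAL;
    assert desc.getColumnFamily(Bytes.toBytes("s")).getTimeToLive() == 604800;
  }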
HBaseRecordReaderTest.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.hbase;
 
-import org.apache.drill.categories.HbaseStorageTest;
+import org.apache.drill.categories.HBaseStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import static org.apache.drill.test.TestBuilder.mapOf;
 
-@Category({SlowTest.class, HbaseStorageTest.class})
+@Category({SlowTest.class, HBaseStorageTest.class})
 public class HBaseRecordReaderTest extends BaseHBaseTest {
 
   @Test
HBaseTestsSuite.java
@@ -76,7 +76,7 @@ public class HBaseTestsSuite extends BaseTest {

   private static Configuration conf;
 
-  private static Connection conn;
+  protected static Connection conn;
   private static Admin admin;
 
   private static HBaseTestingUtility UTIL;
(Diffs for the remaining 9 of the 15 changed files were not loaded in this capture.)