Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-8259: Supports advanced HBase persistence storage options #2596

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
/**
* This is a category used to mark unit tests that test the HBase storage plugin.
*/
public interface HbaseStorageTest {
public interface HBaseStorageTest {
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.drill.common.types.Types;

public interface DrillHBaseConstants {

String ROW_KEY = "row_key";

SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
Expand All @@ -35,7 +36,15 @@ public interface DrillHBaseConstants {

MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);

String SYS_STORE_PROVIDER_HBASE_NAMESPACE = "drill.exec.sys.store.provider.hbase.namespace";

String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";

String SYS_STORE_PROVIDER_HBASE_FAMILY = "drill.exec.sys.store.provider.hbase.family";

String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";

String SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG = "drill.exec.sys.store.provider.hbase.table_config";

String SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG = "drill.exec.sys.store.provider.hbase.column_config";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
*/
package org.apache.drill.exec.store.hbase.config;

import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY;
import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER;
import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER_NAME;

import java.io.IOException;
import java.util.Iterator;
Expand Down Expand Up @@ -48,17 +47,19 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> {
private final String hbaseTableName;

private final String tableName;
private final byte[] familyName;
private final byte[] tableNameStartKey;
private final byte[] tableNameStopKey;

public HBasePersistentStore(PersistentStoreConfig<V> config, Table table) {
public HBasePersistentStore(PersistentStoreConfig<V> config, Table table, byte[] family) {
this.tableName = config.getName() + '\0';
this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00"
this.tableNameStopKey = this.tableNameStartKey.clone();
this.tableNameStopKey[tableNameStartKey.length-1] = 1;
this.config = config;
this.hbaseTable = table;
this.hbaseTableName = table.getName().getNameAsString();
this.familyName = family;
}

@Override
Expand All @@ -70,7 +71,7 @@ public PersistentStoreMode getMode() {
public boolean contains(String key) {
try {
Get get = new Get(row(key));
get.addColumn(FAMILY, QUALIFIER);
get.addColumn(familyName, QUALIFIER_NAME);
return hbaseTable.exists(get);
} catch (IOException e) {
throw UserException
Expand All @@ -82,13 +83,13 @@ public boolean contains(String key) {

@Override
public V get(String key) {
return get(key, FAMILY);
return get(key, familyName);
}

protected synchronized V get(String key, byte[] family) {
try {
Get get = new Get(row(key));
get.addColumn(family, QUALIFIER);
get.addColumn(family, QUALIFIER_NAME);
Result r = hbaseTable.get(get);
if(r.isEmpty()){
return null;
Expand All @@ -103,13 +104,13 @@ protected synchronized V get(String key, byte[] family) {

@Override
public void put(String key, V value) {
put(key, FAMILY, value);
put(key, familyName, value);
}

protected synchronized void put(String key, byte[] family, V value) {
try {
Put put = new Put(row(key));
put.addColumn(family, QUALIFIER, bytes(value));
put.addColumn(family, QUALIFIER_NAME, bytes(value));
hbaseTable.put(put);
} catch (IOException e) {
throw UserException.dataReadError(e)
Expand All @@ -122,8 +123,8 @@ protected synchronized void put(String key, byte[] family, V value) {
public synchronized boolean putIfAbsent(String key, V value) {
try {
Put put = new Put(row(key));
put.addColumn(FAMILY, QUALIFIER, bytes(value));
return hbaseTable.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put);
put.addColumn(familyName, QUALIFIER_NAME, bytes(value));
return hbaseTable.checkAndPut(put.getRow(), familyName, QUALIFIER_NAME, null /*absent*/, put);
} catch (IOException e) {
throw UserException.dataReadError(e)
.message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName)
Expand Down Expand Up @@ -183,7 +184,7 @@ private class Iter implements Iterator<Entry<String, V>> {
Iter(int take) {
try {
Scan scan = new Scan(tableNameStartKey, tableNameStopKey);
scan.addColumn(FAMILY, QUALIFIER);
scan.addColumn(familyName, QUALIFIER_NAME);
scan.setCaching(Math.min(take, 100));
scan.setBatch(take); // set batch size
scanner = hbaseTable.getScanner(scan);
Expand Down
Loading