Adds the ability to set an alternate location for the Hive table. #8

Closed · wants to merge 1 commit
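In summary, the patch threads an optional Path location through HiveTableDescriptor, registers the table as an EXTERNAL Hive table when that location is set, roots partition paths under it, and teaches HiveTap to validate the location against the metastore. A minimal usage sketch follows; the wrapper class, table name, columns, and HDFS URI are illustrative assumptions, not part of the patch:

import cascading.tap.hive.HiveTableDescriptor;
import org.apache.hadoop.fs.Path;

public class ExternalTableExample
  {
  public static void main( String[] args )
    {
    // Hypothetical table stored outside the Hive warehouse.
    HiveTableDescriptor descriptor = new HiveTableDescriptor(
      HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "example",
      new String[]{"key", "value"},         // column names
      new String[]{"string", "string"},     // Hive column types
      new String[]{},                       // no partition keys
      ",",                                  // field delimiter
      HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME,
      new Path( "hdfs://namenode:8020/data/external/example" ) ); // must be absolute

    // The explicit location wins over the warehouse fallback.
    System.out.println( descriptor.getLocation( "/user/hive/warehouse" ) );
    // -> hdfs://namenode:8020/data/external/example
    }
  }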
9 changes: 8 additions & 1 deletion src/main/java/cascading/tap/hive/HivePartition.java
@@ -25,7 +25,9 @@
 import cascading.tap.partition.DelimitedPartition;
 import cascading.tuple.Fields;
 import cascading.tuple.TupleEntry;
+
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

 /**
  * Implements a Hive compatible Partition for Cascading.
@@ -60,8 +62,13 @@ public HivePartition( Fields partitionFields )
   Partition toHivePartition( String partitionString, HiveTableDescriptor tableDescriptor )
     {
     int now = (int) ( System.currentTimeMillis() / 1000 );
+    // Set the correct location in the storage descriptor.
+    StorageDescriptor sd = tableDescriptor.toHiveTable().getSd();
+    if( sd.getLocation() != null )
+      sd.setLocation( sd.getLocation() + "/" + partitionString );
+
     return new Partition( Arrays.asList( parse( partitionString ) ), tableDescriptor.getDatabaseName(),
-      tableDescriptor.getTableName(), now, now, tableDescriptor.toHiveTable().getSd(),
+      tableDescriptor.getTableName(), now, now, sd,
       new HashMap<String, String>() );
     }
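The net effect of the change above: when the table carries an explicit location, each partition's storage descriptor is rooted under that location rather than under the warehouse path; tables without a location are unaffected, since their storage descriptor location is never set and stays null. A self-contained sketch of the string logic, with illustrative values:

// A sketch of the partition-location rule above; both values are illustrative.
String tableLocation = "hdfs://namenode:8020/data/external/example"; // from the table's storage descriptor
String partitionString = "country=US";                               // one partition of the table

String partitionLocation = tableLocation != null
  ? tableLocation + "/" + partitionString
  : null;                                                            // warehouse tables keep the default

// partitionLocation -> hdfs://namenode:8020/data/external/example/country=US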
55 changes: 43 additions & 12 deletions src/main/java/cascading/tap/hive/HiveTableDescriptor.java
@@ -33,6 +33,9 @@
 import cascading.scheme.hadoop.TextDelimited;
 import cascading.tap.partition.Partition;
 import cascading.tuple.Fields;
+
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -85,6 +88,9 @@ public class HiveTableDescriptor implements Serializable
   /** Hive serialization library */
   private String serializationLib;

+  /** Optional alternate location of the table. */
+  private String location = null;
+
   /**
    * Constructs a new HiveTableDescriptor object.
    *
@@ -95,7 +101,7 @@ public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes )
   public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes )
     {
     this( HIVE_DEFAULT_DATABASE_NAME, tableName, columnNames, columnTypes, new String[]{}, HIVE_DEFAULT_DELIMITER,
-      HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }

 /**
@@ -109,7 +115,7 @@ public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes )
   public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes, String[] partitionKeys )
     {
     this( HIVE_DEFAULT_DATABASE_NAME, tableName, columnNames, columnTypes, partitionKeys, HIVE_DEFAULT_DELIMITER,
-      HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }

 /**
@@ -125,7 +131,7 @@ public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes )
   public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes, String[] partitionKeys, String delimiter )
     {
     this( HIVE_DEFAULT_DATABASE_NAME, tableName, columnNames, columnTypes, partitionKeys, delimiter,
-      HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }


@@ -140,7 +146,7 @@ public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes )
   public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes )
     {
     this( databaseName, tableName, columnNames, columnTypes, new String[]{}, HIVE_DEFAULT_DELIMITER,
-      HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }

 /**
@@ -154,7 +160,8 @@ public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes )
  */
   public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes, String[] partitionKeys )
     {
-    this( databaseName, tableName, columnNames, columnTypes, partitionKeys, HIVE_DEFAULT_DELIMITER, HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+    this( databaseName, tableName, columnNames, columnTypes, partitionKeys, HIVE_DEFAULT_DELIMITER,
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }


@@ -173,7 +180,8 @@ public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes )
   public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes,
                               String[] partitionKeys, String delimiter )
     {
-    this( databaseName, tableName, columnNames, columnTypes, partitionKeys, delimiter, HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+    this( databaseName, tableName, columnNames, columnTypes, partitionKeys, delimiter,
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }

 /**
@@ -188,7 +196,7 @@ public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes )
  */
   public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes,
                               String[] partitionKeys, String delimiter,
-                              String serializationLib )
+                              String serializationLib, Path location )
     {
     if( tableName == null || tableName.isEmpty() )
       throw new IllegalArgumentException( "tableName cannot be null or empty" );
@@ -209,6 +217,15 @@ public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes )
     verifyPartitionKeys();
     if( columnNames.length == 0 || columnTypes.length == 0 || columnNames.length != columnTypes.length )
       throw new IllegalArgumentException( "columnNames and columnTypes cannot be empty and must have the same size" );
+
+    if( location != null )
+      {
+      if( !location.isAbsolute() )
+        throw new IllegalArgumentException( "location must be a fully qualified absolute path" );
+
+      // Store as a String since Path is not Serializable.
+      this.location = location.toString();
+      }
     }

 /**
@@ -254,6 +271,15 @@ public Table toHiveTable()
     sd.setSerdeInfo( serDeInfo );
     sd.setInputFormat( HIVE_DEFAULT_INPUT_FORMAT_NAME );
     sd.setOutputFormat( HIVE_DEFAULT_OUTPUT_FORMAT_NAME );
+
+    if( location != null )
+      {
+      table.setTableType( TableType.EXTERNAL_TABLE.toString() );
+      // The metastore also requires the EXTERNAL parameter, in addition to the table type.
+      table.putToParameters( "EXTERNAL", "TRUE" );
+      sd.setLocation( location );
+      }
+
     table.setSd( sd );

     if( isPartitioned() )
@@ -316,15 +342,18 @@ public Fields toFields()
-   * Returns the path of the table within the warehouse directory.
-   * @return The path of the table within the warehouse directory.
+   * Returns the location of the table: the explicit location, if one was set,
+   * otherwise the table's path under the given warehouse directory.
+   * @param warehousePath The path of the Hive warehouse directory.
+   * @return The location of the table.
    */
-  public String getFilesystemPath()
+  public String getLocation( String warehousePath )
     {
-    if( getDatabaseName().equals( HIVE_DEFAULT_DATABASE_NAME ) )
-      return getTableName();
+    if( location != null )
+      return location;
+    else if( getDatabaseName().equals( HIVE_DEFAULT_DATABASE_NAME ) )
+      return String.format( "%s/%s", warehousePath, getTableName() );
     else
-      return String.format( "%s.db/%s", getDatabaseName(), getTableName() );
+      return String.format( "%s/%s.db/%s", warehousePath, getDatabaseName(), getTableName() );
     }
-

 /**
  * Converts the HiveTableDescriptor to a Scheme instance based on the information available.
  *
@@ -397,6 +424,8 @@ public boolean equals( Object object )
       return false;
     if( tableName != null ? !tableName.equalsIgnoreCase( that.tableName ) : that.tableName != null )
       return false;
+    if( location != null ? !location.equals( that.location ) : that.location != null )
+      return false;

     return true;
     }
@@ -411,6 +440,7 @@ public int hashCode()
     result = 31 * result + ( columnNames != null ? arraysHashCodeCaseInsensitive( columnNames ) : 0 );
     result = 31 * result + ( columnTypes != null ? arraysHashCodeCaseInsensitive( columnTypes ) : 0 );
     result = 31 * result + ( serializationLib != null ? serializationLib.hashCode() : 0 );
+    result = 31 * result + ( location != null ? location.hashCode() : 0 );
     return result;
     }

@@ -425,6 +455,7 @@ public String toString()
       ", columnNames=" + Arrays.toString( columnNames ) +
       ", columnTypes=" + Arrays.toString( columnTypes ) +
       ", serializationLib='" + serializationLib + '\'' +
+      ( location != null ? ", location='" + location + '\'' : "" ) +
       '}';
     }
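Two details above are easy to miss. First, setting the EXTERNAL_TABLE table type alone is not enough: the Hive metastore utilities decide whether a table is external from the EXTERNAL=TRUE table parameter, so both must be set for a drop to leave the underlying data in place. Second, getLocation( warehousePath ) resolves in three ordered steps: explicit location, then the default database under the warehouse, then a <database>.db directory under the warehouse. A sketch of the observable behaviour, continuing the hypothetical descriptor from the first example:

// Sketch: what toHiveTable() and getLocation() yield for the external descriptor
// built earlier (Table and TableType are the Hive metastore API classes).
Table table = descriptor.toHiveTable();

System.out.println( table.getTableType() );                    // EXTERNAL_TABLE
System.out.println( table.getParameters().get( "EXTERNAL" ) ); // TRUE
System.out.println( table.getSd().getLocation() );             // hdfs://namenode:8020/data/external/example

// The three getLocation() branches, in order of precedence:
//   location set                  -> that location, verbatim
//   default database, no location -> "<warehousePath>/<table>"
//   custom database, no location  -> "<warehousePath>/<database>.db/<table>"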
20 changes: 17 additions & 3 deletions src/main/java/cascading/tap/hive/HiveTap.java
@@ -28,7 +28,10 @@
 import cascading.tap.SinkMode;
 import cascading.tap.TapException;
 import cascading.tap.hadoop.Hfs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -41,6 +44,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.thrift.TException;
@@ -172,7 +176,18 @@ public boolean resourceExists( JobConf conf ) throws IOException
       throw new HiveTableValidationException( String.format( "expected a table of type '%s' but found '%s'",
         tableDescriptor.toHiveTable().getTableType(), table.getTableType() ) );

-    List<FieldSchema> schemaList = table.getSd().getCols();
+    // Check that the paths are the same.
+    FileSystem fs = FileSystem.get( conf );
+    StorageDescriptor sd = table.getSd();
+    Path expectedPath = fs.makeQualified( new Path( tableDescriptor.getLocation( hiveConf.getVar( ConfVars.METASTOREWAREHOUSE ) ) ) );
+    Path actualPath = fs.makeQualified( new Path( sd.getLocation() ) );
+
+    if( !expectedPath.equals( actualPath ) )
+      throw new HiveTableValidationException( String.format(
+        "table in MetaStore does not have the same path. expected %s got %s",
+        expectedPath, actualPath ) );
+
+    List<FieldSchema> schemaList = sd.getCols();
     if( schemaList.size() != tableDescriptor.getColumnNames().length - tableDescriptor.getPartitionKeys().length )
       throw new HiveTableValidationException( String.format(
         "table in MetaStore does not have same number of columns. expected %d got %d",
@@ -369,8 +384,7 @@ private void setFilesystemLocation( )
       }
     catch( NoSuchObjectException exception )
      {
-      setStringPath( String.format( "%s/%s", hiveConf.get( HiveConf.ConfVars.METASTOREWAREHOUSE.varname ),
-        tableDescriptor.getFilesystemPath() ) );
+      setStringPath( tableDescriptor.getLocation( hiveConf.getVar( ConfVars.METASTOREWAREHOUSE ) ) );
      }
    catch( TException exception )
      {
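The comparison goes through FileSystem.makeQualified because the metastore typically stores a fully qualified URI (scheme and authority included), while the path computed from the descriptor may be bare; qualifying both sides against the job's default filesystem avoids false mismatches on textually different but equivalent paths. A small runnable sketch, with an illustrative default filesystem:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class QualifiedPathExample
  {
  public static void main( String[] args ) throws IOException
    {
    // makeQualified resolves a bare path against the default filesystem.
    FileSystem fs = FileSystem.get( new JobConf() );
    Path qualified = fs.makeQualified( new Path( "/user/hive/warehouse/mytable" ) );

    // If the default filesystem is hdfs://namenode:8020, this prints
    // hdfs://namenode:8020/user/hive/warehouse/mytable, which compares
    // reliably against the metastore's stored location.
    System.out.println( qualified );
    }
  }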
24 changes: 18 additions & 6 deletions src/test/java/cascading/tap/hive/HiveTableDescriptorTest.java
@@ -32,6 +32,7 @@
 import cascading.scheme.hadoop.TextDelimited;
 import cascading.tuple.Fields;
 import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -162,7 +163,7 @@ public void testToSchemeWithCustomDelimiter()
   HiveTableDescriptor descriptor = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable",
     new String[]{"one", "two", "three"},
     new String[]{"int", "string", "boolean"}, new String[]{},
-    delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+    delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
   Scheme scheme = descriptor.toScheme();
   assertNotNull( scheme );
   Assert.assertEquals( delim, ( (TextDelimited) scheme ).getDelimiter() );
@@ -175,7 +176,7 @@ public void testCustomDelimiterInSerdeParameters()
   HiveTableDescriptor descriptor = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable",
     new String[]{"one", "two", "three"},
     new String[]{"int", "string", "boolean"}, new String[]{},
-    delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+    delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
   StorageDescriptor sd = descriptor.toHiveTable().getSd();
   Map<String, String> expected = new HashMap<String, String>();
   expected.put( "field.delim", delim );
@@ -190,8 +191,8 @@ public void testToSchemeWithNullDelimiter()
   String delim = null;
   HiveTableDescriptor descriptor = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable",
     new String[]{"one", "two", "three"},
-    new String[]{"int", "string",
-      "boolean"}, new String[]{}, delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME
+    new String[]{"int", "string", "boolean"}, new String[]{},
+    delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null
     );
   Scheme scheme = descriptor.toScheme();
   assertNotNull( scheme );
@@ -237,15 +238,26 @@ public void testGetFilesystempathWithDefaultDB()
   {
   HiveTableDescriptor descriptor = new HiveTableDescriptor( "myTable", new String[]{"one", "two", "three"},
     new String[]{"int", "string", "boolean"} );
-  assertEquals( "mytable", descriptor.getFilesystemPath() );
+  assertEquals( "warehouse/mytable", descriptor.getLocation( "warehouse" ) );
   }

 @Test
 public void testGetFilesystempathWithCustomDB()
   {
   HiveTableDescriptor descriptor = new HiveTableDescriptor( "myDB", "myTable", new String[]{"one", "two", "three"},
     new String[]{"int", "string", "boolean"} );
-  assertEquals( "mydb.db/mytable", descriptor.getFilesystemPath() );
+  assertEquals( "warehouse/mydb.db/mytable", descriptor.getLocation( "warehouse" ) );
   }
+
+@Test
+public void testGetFilesystempathWithSpecifiedLocation()
+  {
+  HiveTableDescriptor descriptor = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable",
+    new String[]{"one", "two", "three"},
+    new String[]{"int", "string", "boolean"}, new String[]{},
+    ",", HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, new Path( "file:/custom_path" ) );
+
+  assertEquals( "file:/custom_path", descriptor.getLocation( "warehouse" ) );
+  }
 }

18 changes: 18 additions & 0 deletions src/test/java/cascading/tap/hive/HiveTapTest.java
@@ -27,6 +27,7 @@
 import cascading.HiveTestCase;
 import cascading.scheme.NullScheme;
 import cascading.tap.SinkMode;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.mapred.JobConf;
@@ -137,6 +138,23 @@ public void testResourceExistsStrictModeWithPartitionedTable() throws IOException
   assertTrue( tap.resourceExists( new JobConf() ) );
   }

+@Test(expected = HiveTableValidationException.class)
+public void testResourceExistsStrictModeLocationMismatch() throws IOException
+  {
+  HiveTableDescriptor desc = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable9",
+    new String[]{"one", "two", "three"},
+    new String[]{"int", "string", "boolean"}, new String[]{},
+    ",", HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, new Path( HIVE_WAREHOUSE_DIR + "/custompath" ) );
+  HiveTap tap = new HiveTap( desc, new NullScheme() );
+  tap.createResource( new JobConf() );
+
+  HiveTableDescriptor mismatch = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable9",
+    new String[]{"one", "two", "three"},
+    new String[]{"int", "string", "boolean"}, new String[]{},
+    "," );
+  tap = new HiveTap( mismatch, new NullScheme(), SinkMode.REPLACE, true );
+  tap.resourceExists( new JobConf() );
+  }
+
 @Test
 public void testDeleteRessource() throws Exception
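Taken together, the new test mirrors the intended workflow: create a tap over a descriptor that carries an explicit location, then probe the same table with a descriptor that lacks one and expect the location check in resourceExists to fail. The happy path looks roughly like this; a fragment under the same assumptions as the tests above, with an illustrative file: URI in place of the test's HIVE_WAREHOUSE_DIR:

HiveTableDescriptor desc = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "example",
  new String[]{"key", "value"},
  new String[]{"string", "string"}, new String[]{},
  ",", HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, new Path( "file:/tmp/example_table" ) );

HiveTap tap = new HiveTap( desc, new NullScheme() );
tap.createResource( new JobConf() );   // registers the EXTERNAL table at file:/tmp/example_table
tap.resourceExists( new JobConf() );   // true: descriptor and metastore agree on the qualified path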