From 98fd827d94d3cca3b5f4132a38716357f3268eba Mon Sep 17 00:00:00 2001
From: Stephan Hoermann
Date: Wed, 30 Jul 2014 14:43:49 +1000
Subject: [PATCH] Adds ability to set an alternate location for the Hive table.

---
 .../cascading/tap/hive/HivePartition.java     |  9 ++-
 .../tap/hive/HiveTableDescriptor.java         | 55 +++++++++++++++----
 src/main/java/cascading/tap/hive/HiveTap.java | 20 ++++++-
 .../tap/hive/HiveTableDescriptorTest.java     | 24 ++++++--
 .../java/cascading/tap/hive/HiveTapTest.java  | 18 ++++++
 5 files changed, 104 insertions(+), 22 deletions(-)

diff --git a/src/main/java/cascading/tap/hive/HivePartition.java b/src/main/java/cascading/tap/hive/HivePartition.java
index 1df8f73..8884710 100644
--- a/src/main/java/cascading/tap/hive/HivePartition.java
+++ b/src/main/java/cascading/tap/hive/HivePartition.java
@@ -25,7 +25,9 @@
 import cascading.tap.partition.DelimitedPartition;
 import cascading.tuple.Fields;
 import cascading.tuple.TupleEntry;
+
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 
 /**
  * Implements a Hive compatible Partition for Cascading.
@@ -60,8 +62,13 @@ public HivePartition( Fields partitionFields )
   Partition toHivePartition( String partitionString, HiveTableDescriptor tableDescriptor )
     {
     int now = (int) ( System.currentTimeMillis() / 1000 );
+    // Set the correct location in the storage descriptor
+    StorageDescriptor sd = tableDescriptor.toHiveTable().getSd();
+    if( sd.getLocation() != null )
+      sd.setLocation( sd.getLocation() + "/" + partitionString );
+
     return new Partition( Arrays.asList( parse( partitionString ) ), tableDescriptor.getDatabaseName(),
-      tableDescriptor.getTableName(), now, now, tableDescriptor.toHiveTable().getSd(),
+      tableDescriptor.getTableName(), now, now, sd,
       new HashMap<String, String>() );
     }
 
diff --git a/src/main/java/cascading/tap/hive/HiveTableDescriptor.java b/src/main/java/cascading/tap/hive/HiveTableDescriptor.java
index 0a218cd..8f97266 100644
--- a/src/main/java/cascading/tap/hive/HiveTableDescriptor.java
+++ b/src/main/java/cascading/tap/hive/HiveTableDescriptor.java
@@ -33,6 +33,9 @@
 import cascading.scheme.hadoop.TextDelimited;
 import cascading.tap.partition.Partition;
 import cascading.tuple.Fields;
+
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -85,6 +88,9 @@ public class HiveTableDescriptor implements Serializable
   /** Hive serialization library */
   private String serializationLib;
 
+  /** Optional alternate location of the table */
+  private String location = null;
+
   /**
    * Constructs a new HiveTableDescriptor object.
   *
@@ -95,7 +101,7 @@ public class HiveTableDescriptor implements Serializable
   public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes )
     {
     this( HIVE_DEFAULT_DATABASE_NAME, tableName, columnNames, columnTypes, new String[]{}, HIVE_DEFAULT_DELIMITER,
-      HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }
 
   /**
@@ -109,7 +115,7 @@ public HiveTableDescriptor( String tableName, String[] columnNames, String[] col
   public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes, String[] partitionKeys )
     {
     this( HIVE_DEFAULT_DATABASE_NAME, tableName, columnNames, columnTypes, partitionKeys, HIVE_DEFAULT_DELIMITER,
-      HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }
 
   /**
@@ -125,7 +131,7 @@ public HiveTableDescriptor( String tableName, String[] columnNames, String[] col
   public HiveTableDescriptor( String tableName, String[] columnNames, String[] columnTypes, String[] partitionKeys,
                               String delimiter )
     {
     this( HIVE_DEFAULT_DATABASE_NAME, tableName, columnNames, columnTypes, partitionKeys, delimiter,
-      HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }
 
@@ -140,7 +146,7 @@ public HiveTableDescriptor( String tableName, String[] columnNames, String[] col
   public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes )
     {
     this( databaseName, tableName, columnNames, columnTypes, new String[]{}, HIVE_DEFAULT_DELIMITER,
-      HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }
 
   /**
@@ -154,7 +160,8 @@ public HiveTableDescriptor( String databaseName, String tableName, String[] colu
   public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes,
                               String[] partitionKeys )
     {
-    this( databaseName, tableName, columnNames, columnTypes, partitionKeys, HIVE_DEFAULT_DELIMITER, HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+    this( databaseName, tableName, columnNames, columnTypes, partitionKeys, HIVE_DEFAULT_DELIMITER,
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }
 
@@ -173,7 +180,8 @@ public HiveTableDescriptor( String databaseName, String tableName, String[] colu
   public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes,
                               String[] partitionKeys, String delimiter )
     {
-    this( databaseName, tableName, columnNames, columnTypes, partitionKeys, delimiter, HIVE_DEFAULT_SERIALIZATION_LIB_NAME );
+    this( databaseName, tableName, columnNames, columnTypes, partitionKeys, delimiter,
+      HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null );
     }
 
   /**
@@ -188,7 +196,7 @@ public HiveTableDescriptor( String databaseName, String tableName, String[] colu
   public HiveTableDescriptor( String databaseName, String tableName, String[] columnNames, String[] columnTypes,
                               String[] partitionKeys, String delimiter,
-                              String serializationLib )
+                              String serializationLib, Path location )
     {
     if( tableName == null || tableName.isEmpty() )
       throw new IllegalArgumentException( "tableName cannot be null or empty" );
@@ -209,6 +217,15 @@ public HiveTableDescriptor( String databaseName, String tableName, String[] colu
     verifyPartitionKeys();
     if( columnNames.length == 0 || columnTypes.length == 0 || columnNames.length != columnTypes.length )
       throw new IllegalArgumentException( "columnNames and columnTypes cannot be empty and must have the same size" );
+
+    if( location != null )
+      {
+      if( !location.isAbsolute() )
+        throw new IllegalArgumentException( "location must be a fully qualified absolute path" );
+
+      // Store as a String since Path is not serializable
+      this.location = location.toString();
+      }
     }
 
   /**
@@ -254,6 +271,15 @@ public Table toHiveTable()
     sd.setSerdeInfo( serDeInfo );
     sd.setInputFormat( HIVE_DEFAULT_INPUT_FORMAT_NAME );
     sd.setOutputFormat( HIVE_DEFAULT_OUTPUT_FORMAT_NAME );
+
+    if( location != null )
+      {
+      table.setTableType( TableType.EXTERNAL_TABLE.toString() );
+      // The metastore also requires the EXTERNAL parameter to treat the table as external
+      table.putToParameters( "EXTERNAL", "TRUE" );
+      sd.setLocation( location );
+      }
+
     table.setSd( sd );
 
     if( isPartitioned() )
@@ -316,15 +342,17 @@ public Fields toFields()
   /**
-   * Returns the path of the table within the warehouse directory.
-   * @return The path of the table within the warehouse directory.
+   * Returns the location of the table: the explicitly set location if present,
+   * otherwise the table's path within the given warehouse directory.
+   * @return The location of the table.
    */
-  public String getFilesystemPath()
+  public String getLocation( String warehousePath )
     {
-    if ( getDatabaseName().equals( HIVE_DEFAULT_DATABASE_NAME ) )
-      return getTableName();
+    if( location != null )
+      return location;
+    else if( getDatabaseName().equals( HIVE_DEFAULT_DATABASE_NAME ) )
+      return String.format( "%s/%s", warehousePath, getTableName() );
     else
-      return String.format( "%s.db/%s", getDatabaseName(), getTableName() );
+      return String.format( "%s/%s.db/%s", warehousePath, getDatabaseName(), getTableName() );
     }
-
   /**
    * Converts the HiveTableDescriptor to a Scheme instance based on the information available.
    *
@@ -397,6 +424,8 @@ public boolean equals( Object object )
       return false;
     if( tableName != null ? !tableName.equalsIgnoreCase( that.tableName ) : that.tableName != null )
       return false;
+    if( location != null ? !location.equals( that.location ) : that.location != null )
+      return false;
 
     return true;
     }
@@ -411,6 +440,7 @@ public int hashCode()
     result = 31 * result + ( columnNames != null ? arraysHashCodeCaseInsensitive( columnNames ) : 0 );
     result = 31 * result + ( columnTypes != null ? arraysHashCodeCaseInsensitive( columnTypes ) : 0 );
     result = 31 * result + ( serializationLib != null ? serializationLib.hashCode() : 0 );
+    result = 31 * result + ( location != null ? location.hashCode() : 0 );
     return result;
     }
@@ -425,6 +455,7 @@ public String toString()
       ", columnNames=" + Arrays.toString( columnNames ) +
       ", columnTypes=" + Arrays.toString( columnTypes ) +
       ", serializationLib='" + serializationLib + '\'' +
+      ( location != null ? ", location='" + location + '\'' : "" ) +
       '}';
     }
 
diff --git a/src/main/java/cascading/tap/hive/HiveTap.java b/src/main/java/cascading/tap/hive/HiveTap.java
index b0ef8dd..6687db5 100644
--- a/src/main/java/cascading/tap/hive/HiveTap.java
+++ b/src/main/java/cascading/tap/hive/HiveTap.java
@@ -28,7 +28,10 @@
 import cascading.tap.SinkMode;
 import cascading.tap.TapException;
 import cascading.tap.hadoop.Hfs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -41,6 +44,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.thrift.TException;
@@ -172,7 +176,18 @@ public boolean resourceExists( JobConf conf ) throws IOException
         throw new HiveTableValidationException( String.format( "expected a table of type '%s' but found '%s'",
           tableDescriptor.toHiveTable().getTableType(), table.getTableType() ) );
 
-      List<FieldSchema> schemaList = table.getSd().getCols();
+      // Check that the paths are the same
+      FileSystem fs = FileSystem.get( conf );
+      StorageDescriptor sd = table.getSd();
+      Path expectedPath = fs.makeQualified( new Path( tableDescriptor.getLocation( hiveConf.getVar( ConfVars.METASTOREWAREHOUSE ) ) ) );
+      Path actualPath = fs.makeQualified( new Path( sd.getLocation() ) );
+
+      if( !expectedPath.equals( actualPath ) )
+        throw new HiveTableValidationException( String.format(
+          "table in MetaStore does not have the same path. Expected %s got %s",
+          expectedPath, actualPath ) );
+
+      List<FieldSchema> schemaList = sd.getCols();
       if( schemaList.size() != tableDescriptor.getColumnNames().length - tableDescriptor.getPartitionKeys().length )
         throw new HiveTableValidationException( String.format( "table in MetaStore does not have same number of columns. expected %d got %d",
expected %d got %d", @@ -369,8 +384,7 @@ private void setFilesystemLocation( ) } catch( NoSuchObjectException exception ) { - setStringPath( String.format( "%s/%s", hiveConf.get( HiveConf.ConfVars.METASTOREWAREHOUSE.varname ), - tableDescriptor.getFilesystemPath() ) ); + setStringPath( tableDescriptor.getLocation( hiveConf.getVar( ConfVars.METASTOREWAREHOUSE ) ) ); } catch( TException exception ) { diff --git a/src/test/java/cascading/tap/hive/HiveTableDescriptorTest.java b/src/test/java/cascading/tap/hive/HiveTableDescriptorTest.java index 7bf849c..521695d 100644 --- a/src/test/java/cascading/tap/hive/HiveTableDescriptorTest.java +++ b/src/test/java/cascading/tap/hive/HiveTableDescriptorTest.java @@ -32,6 +32,7 @@ import cascading.scheme.hadoop.TextDelimited; import cascading.tuple.Fields; import junit.framework.Assert; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; @@ -162,7 +163,7 @@ public void testToSchemeWithCustomDelimiter() HiveTableDescriptor descriptor = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable", new String[]{"one", "two", "three"}, new String[]{"int", "string", "boolean"}, new String[]{}, - delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME ); + delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null ); Scheme scheme = descriptor.toScheme(); assertNotNull( scheme ); Assert.assertEquals( delim, ( (TextDelimited) scheme ).getDelimiter() ); @@ -175,7 +176,7 @@ public void testCustomDelimiterInSerdeParameters() HiveTableDescriptor descriptor = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable", new String[]{"one", "two", "three"}, new String[]{"int", "string", "boolean"}, new String[]{}, - delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME ); + delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null ); StorageDescriptor sd = descriptor.toHiveTable().getSd(); Map expected = new HashMap( ); expected.put( "field.delim", delim ); @@ -190,8 +191,8 @@ public void testToSchemeWithNullDelimiter() String delim = null; HiveTableDescriptor descriptor = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable", new String[]{"one", "two", "three"}, - new String[]{"int", "string", - "boolean"}, new String[]{}, delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME + new String[]{"int", "string", "boolean"}, new String[]{}, + delim, HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, null ); Scheme scheme = descriptor.toScheme(); assertNotNull( scheme ); @@ -237,7 +238,7 @@ public void testGetFilesystempathWithDefaultDB() { HiveTableDescriptor descriptor = new HiveTableDescriptor( "myTable", new String[]{"one", "two", "three"}, new String[]{"int", "string", "boolean"} ); - assertEquals( "mytable", descriptor.getFilesystemPath() ); + assertEquals( "warehouse/mytable", descriptor.getLocation( "warehouse") ); } @Test @@ -245,7 +246,18 @@ public void testGetFilesystempathWithCustomDB() { HiveTableDescriptor descriptor = new HiveTableDescriptor( "myDB", "myTable", new String[]{"one", "two", "three"}, new String[]{"int", "string", "boolean"} ); - assertEquals( "mydb.db/mytable", descriptor.getFilesystemPath() ); + assertEquals( "warehouse/mydb.db/mytable", descriptor.getLocation( "warehouse" ) ); } + + @Test + public void testGetFilesystempathWithSpecifiedLocation() + { + HiveTableDescriptor descriptor = new 
+      new String[]{"one", "two", "three"},
+      new String[]{"int", "string", "boolean"}, new String[]{},
+      ",", HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, new Path( "file:/custom_path" ) );
+
+    assertEquals( "file:/custom_path", descriptor.getLocation( "warehouse" ) );
+    }
 }
diff --git a/src/test/java/cascading/tap/hive/HiveTapTest.java b/src/test/java/cascading/tap/hive/HiveTapTest.java
index 27ae43d..1dd29b0 100644
--- a/src/test/java/cascading/tap/hive/HiveTapTest.java
+++ b/src/test/java/cascading/tap/hive/HiveTapTest.java
@@ -27,6 +27,7 @@
 import cascading.HiveTestCase;
 import cascading.scheme.NullScheme;
 import cascading.tap.SinkMode;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.mapred.JobConf;
@@ -137,6 +138,23 @@ public void testResourceExistsStrictModeWithPartitionedTable() throws IOExceptio
     assertTrue( tap.resourceExists( new JobConf() ) );
     }
 
+  @Test(expected = HiveTableValidationException.class)
+  public void testResourceExistsStrictModeLocationMismatch() throws IOException
+    {
+    HiveTableDescriptor desc = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable9",
+      new String[]{"one", "two", "three"},
+      new String[]{"int", "string", "boolean"}, new String[]{},
+      ",", HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME, new Path( HIVE_WAREHOUSE_DIR + "/custompath" ) );
+    HiveTap tap = new HiveTap( desc, new NullScheme() );
+    tap.createResource( new JobConf() );
+
+    HiveTableDescriptor mismatch = new HiveTableDescriptor( HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "mytable9",
+      new String[]{"one", "two", "three"},
+      new String[]{"int", "string", "boolean"}, new String[]{},
+      "," );
+    tap = new HiveTap( mismatch, new NullScheme(), SinkMode.REPLACE, true );
+    tap.resourceExists( new JobConf() );
+    }
 
   @Test
   public void testDeleteRessource() throws Exception
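
For reviewers: a minimal usage sketch of the API this patch adds. The table
name, columns, and hdfs:// path below are illustrative only and not part of
the patch; the sketch simply strings together the constructor and HiveTap
calls that the tests above already exercise.

  // A descriptor with an explicit, fully qualified location. Per this patch,
  // such a table is registered as EXTERNAL and the warehouse directory from
  // hive.metastore.warehouse.dir is ignored for it.
  HiveTableDescriptor descriptor = new HiveTableDescriptor(
    HiveTableDescriptor.HIVE_DEFAULT_DATABASE_NAME, "example_table",
    new String[]{"id", "name"}, new String[]{"int", "string"}, new String[]{},
    ",", HiveTableDescriptor.HIVE_DEFAULT_SERIALIZATION_LIB_NAME,
    new Path( "hdfs://namenode/data/example_table" ) );  // must be absolute

  // Passing null as the location keeps the previous behaviour: the table is
  // laid out under the warehouse directory, as computed by getLocation().
  HiveTap tap = new HiveTap( descriptor, descriptor.toScheme() );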