[Hotfix][Hive Connector] Fix Hive hdfs-site.xml and hive-site.xml not be load error (apache#7069)
EricJoy2048 authored Jul 1, 2024
1 parent c645d92 commit c23a577
Showing 6 changed files with 245 additions and 78 deletions.
2 changes: 2 additions & 0 deletions docs/en/connector-v2/sink/Hive.md
@@ -176,6 +176,7 @@ sink {
metastore_uri = "thrift://ctyun7:9083"
hive.hadoop.conf = {
bucket = "s3a://mybucket"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
}
}
```
@@ -258,6 +259,7 @@ sink {
hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
hive.hadoop.conf = {
bucket="s3://ws-package"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
}
}
}
164 changes: 151 additions & 13 deletions docs/en/connector-v2/source/Hive.md
@@ -33,19 +33,21 @@ Read all the data in a split in a pollNext call. What splits are read will be sa

## Options

| name | type | required | default value |
|----------------------|--------|----------|----------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| read_partitions | list | no | - |
| read_columns | list | no | - |
| compress_codec | string | no | none |
| common-options | | no | - |
| name | type | required | default value |
|-----------------------|--------|----------|----------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| hive.hadoop.conf | Map | no | - |
| hive.hadoop.conf-path | string | no | - |
| read_partitions | list | no | - |
| read_columns | list | no | - |
| compress_codec | string | no | none |
| common-options | | no | - |

### table_name [string]

@@ -59,6 +61,14 @@ Hive metastore uri

The path of `hdfs-site.xml`, used to load the HA configuration of the NameNodes.
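
For example (the path below is illustrative, not a default):

```hocon
# Hypothetical path; point it at the hdfs-site.xml that contains your NameNode HA settings
hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
```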

### hive.hadoop.conf [map]

Properties of the Hadoop configuration (`core-site.xml`, `hdfs-site.xml`, `hive-site.xml`), specified as key/value pairs in the job config.

### hive.hadoop.conf-path [string]

The directory from which the `core-site.xml`, `hdfs-site.xml`, and `hive-site.xml` files are loaded.
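
A minimal sketch combining the two options (the table name and metastore host are placeholders; the conf-path, bucket, and credentials provider values are taken from the S3 examples in these docs):

```hocon
Hive {
  table_name = "default.my_table"                  # placeholder
  metastore_uri = "thrift://metastore-host:9083"   # placeholder
  # Directory containing core-site.xml / hdfs-site.xml / hive-site.xml
  hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
  # Extra properties set on the Hadoop Configuration
  hive.hadoop.conf = {
    bucket = "s3a://mybucket"
    fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
  }
}
```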

### read_partitions [list]

The target partitions that the user wants to read from the Hive table. If this parameter is not set, all the data in the Hive table is read.
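
For instance, to read only two daily partitions (a sketch; the partition key `dt` and its values are hypothetical and depend on how the table is partitioned):

```hocon
read_partitions = ["dt=2024-07-01", "dt=2024-07-02"]
```
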
@@ -128,6 +138,134 @@ Source plugin common parameters, please refer to [Source Common Options](common-

```

## Hive on s3

### Step 1

Create the lib dir for the Hive connector on EMR.

```shell
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 2

Download the jars from Maven Central into the lib dir.

```shell
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
```

### Step 3

Copy the jars from your EMR environment into the lib dir.

```shell
cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 4

Run the job with the following config.

```shell
env {
parallelism = 1
job.mode = "BATCH"
}

source {
Hive {
table_name = "test_hive.test_hive_sink_on_s3"
metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
hive.hadoop.conf = {
bucket="s3://ws-package"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
}
read_columns = ["pk_id", "name", "score"]
}
}

sink {
Hive {
table_name = "test_hive.test_hive_sink_on_s3_sink"
metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
hive.hadoop.conf = {
bucket="s3://ws-package"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
}
}
}
```

## Hive on oss

### Step 1

Create the lib dir for the Hive connector on EMR.

```shell
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 2

Download the jars from Maven Central into the lib dir.

```shell
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
```

### Step 3

Copy the Jindo SDK jars from your EMR environment into the lib dir and delete the conflicting hadoop-aliyun jar.

```shell
cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
```

### Step 4

Run the job with the following config.

```shell
env {
parallelism = 1
job.mode = "BATCH"
}

source {
Hive {
table_name = "test_hive.test_hive_sink_on_oss"
metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
hive.hadoop.conf-path = "/tmp/hadoop"
hive.hadoop.conf = {
bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
}
}
}

sink {
Hive {
table_name = "test_hive.test_hive_sink_on_oss_sink"
metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
hive.hadoop.conf-path = "/tmp/hadoop"
hive.hadoop.conf = {
bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
}
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
@@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.file.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
@@ -62,7 +63,7 @@ public void setExtraOptionsForConfiguration(Configuration configuration) {
removeUnwantedOverwritingProps(extraOptions);
extraOptions.forEach(configuration::set);
}
if (hdfsSitePath != null) {
if (StringUtils.isNotBlank(hdfsSitePath)) {
Configuration hdfsSiteConfiguration = new Configuration();
hdfsSiteConfiguration.addResource(new Path(hdfsSitePath));
unsetUnwantedOverwritingProps(hdfsSiteConfiguration);
@@ -25,6 +25,7 @@ public enum HiveConnectorErrorCode implements SeaTunnelErrorCode {
GET_HIVE_TABLE_INFORMATION_FAILED(
"HIVE-03", "Get hive table information from hive metastore service failed"),
HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"),
LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED("HIVE-05", "Load hive base hadoop config failed"),
;

private final String code;
@@ -23,8 +23,11 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -63,57 +66,49 @@ protected Config fillBucket(ReadonlyConfig readonlyConfig, Configuration configu
* @return
*/
protected Configuration loadHiveBaseHadoopConfig(ReadonlyConfig readonlyConfig) {
Configuration configuration = new Configuration();
// Try to load from hadoop_conf_path(The Bucket configuration is typically in core-site.xml)
Optional<String> hadoopConfPath = readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH);
if (hadoopConfPath.isPresent()) {
HADOOP_CONF_FILES.forEach(
confFile -> {
java.nio.file.Path path = Paths.get(hadoopConfPath.get(), confFile);
if (Files.exists(path)) {
try {
configuration.addResource(path.toUri().toURL());
} catch (IOException e) {
log.warn(
"Error adding Hadoop resource {}, resource was not added",
path,
e);
}
}
});
}
readonlyConfig
.getOptional(BaseSinkConfig.HDFS_SITE_PATH)
.ifPresent(
hdfsSitePath -> {
try {
configuration.addResource(new File(hdfsSitePath).toURI().toURL());
} catch (IOException e) {
log.warn(
"Error adding Hadoop resource {}, resource was not added",
hdfsSitePath,
e);
try {
Configuration configuration = new Configuration();
// Try to load from hadoop_conf_path(The Bucket configuration is typically in
// core-site.xml)
Optional<String> hadoopConfPath =
readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH);
if (hadoopConfPath.isPresent()) {
HADOOP_CONF_FILES.forEach(
confFile -> {
java.nio.file.Path path = Paths.get(hadoopConfPath.get(), confFile);
if (Files.exists(path)) {
try {
configuration.addResource(path.toUri().toURL());
} catch (IOException e) {
log.warn(
"Error adding Hadoop resource {}, resource was not added",
path,
e);
}
}
});
readonlyConfig
.getOptional(HiveConfig.HIVE_SITE_PATH)
.ifPresent(
hiveSitePath -> {
try {
configuration.addResource(new File(hiveSitePath).toURI().toURL());
} catch (IOException e) {
log.warn(
"Error adding Hadoop resource {}, resource was not added",
hiveSitePath,
e);
}
});
// Try to load from hadoopConf
Optional<Map<String, String>> hadoopConf =
readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
if (hadoopConf.isPresent()) {
hadoopConf.get().forEach((k, v) -> configuration.set(k, v));
}
String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH);
String hdfsSitePath = readonlyConfig.get(HdfsSourceConfigOptions.HDFS_SITE_PATH);
if (StringUtils.isNotBlank(hdfsSitePath)) {
configuration.addResource(new File(hdfsSitePath).toURI().toURL());
}

if (StringUtils.isNotBlank(hiveSitePath)) {
configuration.addResource(new File(hiveSitePath).toURI().toURL());
}
// Try to load from hadoopConf
Optional<Map<String, String>> hadoopConf =
readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
if (hadoopConf.isPresent()) {
hadoopConf.get().forEach((k, v) -> configuration.set(k, v));
}
return configuration;
} catch (Exception e) {
String errorMsg = String.format("Failed to load hadoop configuration, please check it");
log.error(errorMsg + ":" + ExceptionUtils.getMessage(e));
throw new HiveConnectorException(
HiveConnectorErrorCode.LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED, e);
}
return configuration;
}
}