Skip to content

Commit

Permalink
[BugFix] Fix "Connection pool shut down" in S3AFileSystem (#50816)
Browse files Browse the repository at this point in the history
Signed-off-by: Smith Cruise <[email protected]>
  • Loading branch information
Smith-Cruise authored Sep 24, 2024
1 parent 8b8c1c9 commit 9222444
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import com.google.common.base.Preconditions;
import com.starrocks.credential.CloudConfigurationFactory;
import com.starrocks.credential.aws.AWSCloudCredential;
import com.starrocks.credential.aws.AwsCloudCredential;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.logging.log4j.LogManager;
Expand All @@ -40,7 +40,7 @@ public AWSGlueClientFactory(HiveConf conf) {

@Override
public GlueClient newClient() throws MetaException {
AWSCloudCredential glueCloudCredential = CloudConfigurationFactory.buildGlueCloudCredential(conf);
AwsCloudCredential glueCloudCredential = CloudConfigurationFactory.buildGlueCloudCredential(conf);
try {
GlueClientBuilder glueClientBuilder = GlueClient.builder();
if (glueCloudCredential != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import com.starrocks.credential.CloudType;
import com.starrocks.credential.aliyun.AliyunCloudConfiguration;
import com.starrocks.credential.aliyun.AliyunCloudCredential;
import com.starrocks.credential.aws.AWSCloudConfiguration;
import com.starrocks.credential.aws.AWSCloudCredential;
import com.starrocks.credential.aws.AwsCloudConfiguration;
import com.starrocks.credential.aws.AwsCloudCredential;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
Expand Down Expand Up @@ -95,10 +95,10 @@ public PaimonConnector(ConnectorContext context) {

public void initFsOption(CloudConfiguration cloudConfiguration) {
if (cloudConfiguration.getCloudType() == CloudType.AWS) {
AWSCloudConfiguration awsCloudConfiguration = (AWSCloudConfiguration) cloudConfiguration;
AwsCloudConfiguration awsCloudConfiguration = (AwsCloudConfiguration) cloudConfiguration;
paimonOptions.set("s3.connection.ssl.enabled", String.valueOf(awsCloudConfiguration.getEnableSSL()));
paimonOptions.set("s3.path.style.access", String.valueOf(awsCloudConfiguration.getEnablePathStyleAccess()));
AWSCloudCredential awsCloudCredential = awsCloudConfiguration.getAWSCloudCredential();
AwsCloudCredential awsCloudCredential = awsCloudConfiguration.getAwsCloudCredential();
if (!awsCloudCredential.getEndpoint().isEmpty()) {
paimonOptions.set("s3.endpoint", awsCloudCredential.getEndpoint());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.google.common.collect.ImmutableList;
import com.starrocks.connector.share.credential.CloudConfigurationConstants;
import com.starrocks.credential.aliyun.AliyunCloudConfigurationProvider;
import com.starrocks.credential.aws.AWSCloudConfigurationProvider;
import com.starrocks.credential.aws.AWSCloudCredential;
import com.starrocks.credential.aws.AwsCloudConfigurationProvider;
import com.starrocks.credential.aws.AwsCloudCredential;
import com.starrocks.credential.azure.AzureCloudConfigurationProvider;
import com.starrocks.credential.gcp.GCPCloudConfigurationProvoder;
import com.starrocks.credential.hdfs.HDFSCloudConfigurationProvider;
Expand All @@ -34,7 +34,7 @@
public class CloudConfigurationFactory {

static ImmutableList<CloudConfigurationProvider> cloudConfigurationFactoryChain = ImmutableList.of(
new AWSCloudConfigurationProvider(),
new AwsCloudConfigurationProvider(),
new AzureCloudConfigurationProvider(),
new GCPCloudConfigurationProvoder(),
new AliyunCloudConfigurationProvider(),
Expand All @@ -43,7 +43,7 @@ public class CloudConfigurationFactory {
(Map<String, String> properties) -> new CloudConfiguration());

static ImmutableList<CloudConfigurationProvider> strictCloudConfigurationFactoryChain = ImmutableList.of(
new AWSCloudConfigurationProvider(),
new AwsCloudConfigurationProvider(),
new AzureCloudConfigurationProvider(),
new GCPCloudConfigurationProvoder(),
new AliyunCloudConfigurationProvider(),
Expand Down Expand Up @@ -72,10 +72,10 @@ public static CloudConfiguration buildCloudConfigurationForStorage(Map<String, S
return null;
}

public static AWSCloudCredential buildGlueCloudCredential(HiveConf hiveConf) {
public static AwsCloudCredential buildGlueCloudCredential(HiveConf hiveConf) {
for (CloudConfigurationProvider factory : cloudConfigurationFactoryChain) {
if (factory instanceof AWSCloudConfigurationProvider) {
AWSCloudConfigurationProvider provider = ((AWSCloudConfigurationProvider) factory);
if (factory instanceof AwsCloudConfigurationProvider) {
AwsCloudConfigurationProvider provider = ((AwsCloudConfigurationProvider) factory);
return provider.buildGlueCloudCredential(hiveConf);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@

import java.util.Map;

public class AWSCloudConfiguration extends CloudConfiguration {
public class AwsCloudConfiguration extends CloudConfiguration {

private static final int DEFAULT_NUM_OF_PARTITIONED_PREFIX = 256;

private final AWSCloudCredential awsCloudCredential;
private final AwsCloudCredential awsCloudCredential;

private boolean enablePathStyleAccess = false;

Expand All @@ -40,7 +40,7 @@ public class AWSCloudConfiguration extends CloudConfiguration {

private int numOfPartitionedPrefix = 0;

public AWSCloudConfiguration(AWSCloudCredential awsCloudCredential) {
public AwsCloudConfiguration(AwsCloudCredential awsCloudCredential) {
this.awsCloudCredential = awsCloudCredential;
}

Expand All @@ -60,7 +60,7 @@ public boolean getEnableSSL() {
return this.enableSSL;
}

public AWSCloudCredential getAWSCloudCredential() {
public AwsCloudCredential getAwsCloudCredential() {
return this.awsCloudCredential;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AWS_S3_USE_INSTANCE_PROFILE;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.DEFAULT_AWS_REGION;

public class AWSCloudConfigurationProvider implements CloudConfigurationProvider {
public class AwsCloudConfigurationProvider implements CloudConfigurationProvider {

public AWSCloudCredential buildGlueCloudCredential(HiveConf hiveConf) {
public AwsCloudCredential buildGlueCloudCredential(HiveConf hiveConf) {
Preconditions.checkNotNull(hiveConf);
AWSCloudCredential awsCloudCredential = new AWSCloudCredential(
AwsCloudCredential awsCloudCredential = new AwsCloudCredential(
hiveConf.getBoolean(AWS_GLUE_USE_AWS_SDK_DEFAULT_BEHAVIOR, false),
hiveConf.getBoolean(AWS_GLUE_USE_INSTANCE_PROFILE, false),
hiveConf.get(AWS_GLUE_ACCESS_KEY, ""),
Expand All @@ -73,7 +73,7 @@ public AWSCloudCredential buildGlueCloudCredential(HiveConf hiveConf) {
@Override
public CloudConfiguration build(Map<String, String> properties) {
Preconditions.checkNotNull(properties);
AWSCloudCredential awsCloudCredential = new AWSCloudCredential(
AwsCloudCredential awsCloudCredential = new AwsCloudCredential(
Boolean.parseBoolean(properties.getOrDefault(AWS_S3_USE_AWS_SDK_DEFAULT_BEHAVIOR, "false")),
Boolean.parseBoolean(properties.getOrDefault(AWS_S3_USE_INSTANCE_PROFILE, "false")),
properties.getOrDefault(AWS_S3_ACCESS_KEY, ""),
Expand All @@ -90,7 +90,7 @@ public CloudConfiguration build(Map<String, String> properties) {
return null;
}

AWSCloudConfiguration awsCloudConfiguration = new AWSCloudConfiguration(awsCloudCredential);
AwsCloudConfiguration awsCloudConfiguration = new AwsCloudConfiguration(awsCloudCredential);
// put cloud configuration
if (properties.containsKey(AWS_S3_ENABLE_PATH_STYLE_ACCESS)) {
awsCloudConfiguration.setEnablePathStyleAccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.starrocks.connector.share.credential.CloudConfigurationConstants;
import com.starrocks.credential.CloudCredential;
import com.starrocks.credential.provider.AssumedRoleCredentialProvider;
import com.starrocks.credential.provider.OverwriteAwsDefaultCredentialsProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
Expand Down Expand Up @@ -78,11 +79,11 @@
* // If user want to use anonymous credentials, they just don't set cloud credential directly.
* }
*/
public class AWSCloudCredential implements CloudCredential {
public class AwsCloudCredential implements CloudCredential {

private static final Logger LOG = LoggerFactory.getLogger(AWSCloudCredential.class);
private static final Logger LOG = LoggerFactory.getLogger(AwsCloudCredential.class);

private static final String DEFAULT_CREDENTIAL_PROVIDER = DefaultCredentialsProvider.class.getName();
private static final String DEFAULT_CREDENTIAL_PROVIDER = OverwriteAwsDefaultCredentialsProvider.class.getName();
private static final String IAM_CREDENTIAL_PROVIDER = IAMInstanceCredentialsProvider.class.getName();
private static final String ASSUME_ROLE_CREDENTIAL_PROVIDER = AssumedRoleCredentialProvider.class.getName();
private static final String SIMPLE_CREDENTIAL_PROVIDER = SimpleAWSCredentialsProvider.class.getName();
Expand Down Expand Up @@ -110,7 +111,7 @@ public class AWSCloudCredential implements CloudCredential {

private final String endpoint;

protected AWSCloudCredential(boolean useAWSSDKDefaultBehavior, boolean useInstanceProfile, String accessKey,
protected AwsCloudCredential(boolean useAWSSDKDefaultBehavior, boolean useInstanceProfile, String accessKey,
String secretKey, String sessionToken, String iamRoleArn, String stsRegion,
String stsEndpoint, String externalId, String region,
String endpoint) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.credential.provider;

import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

// We have to implement DefaultCredentialsProvider by ourselves,
// otherwise we may face "java.lang.IllegalStateException: Connection pool shut down" error.
//
// Hadoop S3AFileSystem will call `static AwsCredentialsProvider::create()` to create CredentialsProvider.
// But in DefaultCredentialsProvider::create(), it will only return a global static variable.
// If we close S3AFileSystem, it will also close CredentialsProvider.
// For the next time we create a new S3AFileSystem, it will reuse previous closed CredentialsProvider, then error will be thrown
// You can check details in link:
// https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/troubleshooting.html#faq-connection-pool-shutdown-exception
public class OverwriteAwsDefaultCredentialsProvider implements AwsCredentialsProvider {
public static DefaultCredentialsProvider create() {
return DefaultCredentialsProvider.builder().build();
}

// We should not call this function, here will return an anonymous credentials
@Override
public AwsCredentials resolveCredentials() {
// Defense code, return anonymous credentials
return new AwsCredentials() {
@Override
public String accessKeyId() {
return null;
}

@Override
public String secretAccessKey() {
return null;
}
};
}
}
Loading

0 comments on commit 9222444

Please sign in to comment.