diff --git a/src/aws-cpp-sdk-core/include/aws/core/client/AWSClient.h b/src/aws-cpp-sdk-core/include/aws/core/client/AWSClient.h index 5963155e62f..19bf979d7d5 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/client/AWSClient.h +++ b/src/aws-cpp-sdk-core/include/aws/core/client/AWSClient.h @@ -169,6 +169,8 @@ namespace Aws const Aws::Http::QueryStringParameterCollection& extraParams = Aws::Http::QueryStringParameterCollection(), long long expirationInSeconds = 0, const std::shared_ptr serviceSpecificParameter = {}) const; + const std::shared_ptr& GetHttpClient() const { return m_httpClient; } + /** * Stop all requests immediately. * In flight requests will likely fail. diff --git a/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h b/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h index fab268465b4..7210ee38d69 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h +++ b/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h @@ -67,6 +67,8 @@ namespace Client { if (&other != this) { + ShutdownSdkClient(static_cast(this)); + m_operationsProcessed = 0; m_isInitialized = other.m_isInitialized.load(); } @@ -97,7 +99,10 @@ namespace Client std::unique_lock lock(pClient->m_shutdownMutex); pClient->m_isInitialized = false; - + if (pClient->GetHttpClient().use_count() == 1) + { + pClient->DisableRequestProcessing(); + } if (timeoutMs == -1) { @@ -107,10 +112,15 @@ namespace Client std::chrono::milliseconds(timeoutMs), [&](){ return pClient->m_operationsProcessed.load() == 0; }); - pClient->m_endpointProvider.reset(); - pClient->m_executor.reset(); + if (pClient->m_operationsProcessed.load()) + { + AWS_LOGSTREAM_FATAL(AwsServiceClientT::GetAllocationTag(), "Service client " + << AwsServiceClientT::GetServiceName() << " is shutting down while async tasks are present."); + } + pClient->m_clientConfiguration.executor.reset(); pClient->m_clientConfiguration.retryStrategy.reset(); + pClient->m_endpointProvider.reset(); } /** @@ -124,7 +134,7 @@ namespace Client const std::shared_ptr& context = nullptr) const { const AwsServiceClientT* clientThis = static_cast(this); - Aws::Client::MakeAsyncOperation(operationFunc, clientThis, request, handler, context, clientThis->m_executor.get()); + Aws::Client::MakeAsyncOperation(operationFunc, clientThis, request, handler, context, clientThis->m_clientConfiguration.executor.get()); } /** @@ -139,7 +149,7 @@ namespace Client const std::shared_ptr& context = nullptr) const { const AwsServiceClientT* clientThis = static_cast(this); - Aws::Client::MakeAsyncStreamingOperation(operationFunc, clientThis, request, handler, context, clientThis->m_executor.get()); + Aws::Client::MakeAsyncStreamingOperation(operationFunc, clientThis, request, handler, context, clientThis->m_clientConfiguration.executor.get()); } /** @@ -152,7 +162,7 @@ namespace Client const std::shared_ptr& context = nullptr) const { const AwsServiceClientT* clientThis = static_cast(this); - Aws::Client::MakeAsyncOperation(operationFunc, clientThis, handler, context, clientThis->m_executor.get()); + Aws::Client::MakeAsyncOperation(operationFunc, clientThis, handler, context, clientThis->m_clientConfiguration.executor.get()); } /** @@ -165,7 +175,7 @@ namespace Client -> std::future(nullptr)->*operationFunc)(request))> { const AwsServiceClientT* clientThis = static_cast(this); - return Aws::Client::MakeCallableOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, request, clientThis->m_executor.get()); + return Aws::Client::MakeCallableOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, request, clientThis->m_clientConfiguration.executor.get()); } /** @@ -178,7 +188,7 @@ namespace Client -> std::future(nullptr)->*operationFunc)(request))> { const AwsServiceClientT* clientThis = static_cast(this); - return Aws::Client::MakeCallableStreamingOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, request, clientThis->m_executor.get()); + return Aws::Client::MakeCallableStreamingOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, request, clientThis->m_clientConfiguration.executor.get()); } /** @@ -191,7 +201,7 @@ namespace Client -> std::future(nullptr)->*operationFunc)())> { const AwsServiceClientT* clientThis = static_cast(this); - return Aws::Client::MakeCallableOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, clientThis->m_executor.get()); + return Aws::Client::MakeCallableOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, clientThis->m_clientConfiguration.executor.get()); } protected: std::atomic m_isInitialized; diff --git a/src/aws-cpp-sdk-core/include/aws/core/client/ClientConfiguration.h b/src/aws-cpp-sdk-core/include/aws/core/client/ClientConfiguration.h index 2fcd9707ed3..fa79f7c7344 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/client/ClientConfiguration.h +++ b/src/aws-cpp-sdk-core/include/aws/core/client/ClientConfiguration.h @@ -14,7 +14,6 @@ #include #include #include -#include #include namespace Aws @@ -75,6 +74,34 @@ namespace Aws */ struct AWS_CORE_API ClientConfiguration { + struct ProviderFactories + { + /** + * Retry Strategy factory method. Default is DefaultRetryStrategy (i.e. exponential backoff). + */ + std::function()> retryStrategyCreateFn; + /** + * Threading Executor factory method. Default creates a factory that creates DefaultExecutor + * (i.e. spawn a separate thread for each task) for backward compatibility reasons. + * Please switch to a better executor such as PooledThreadExecutor. + */ + std::function()> executorCreateFn; + /** + * Rate Limiter factory for outgoing bandwidth. Default is wide-open. + */ + std::function()> writeRateLimiterCreateFn; + /** + * Rate Limiter factory for incoming bandwidth. Default is wide-open. + */ + std::function()> readRateLimiterCreateFn; + /** + * TelemetryProvider factory. Defaults to Noop provider. + */ + std::function()> telemetryProviderCreateFn; + + static ProviderFactories defaultFactories; + }; + ClientConfiguration(); /** @@ -104,6 +131,11 @@ namespace Aws */ virtual ~ClientConfiguration() = default; + /** + * Client configuration factory methods to init client utility classes such as Executor, Retry Strategy + */ + ProviderFactories configFactories = ProviderFactories::defaultFactories; + /** * User Agent string user for http calls. This is filled in for you in the constructor. Don't override this unless you have a really good reason. */ @@ -165,9 +197,10 @@ namespace Aws */ unsigned long lowSpeedLimit = 1; /** - * Strategy to use in case of failed requests. Default is DefaultRetryStrategy (i.e. exponential backoff) + * Strategy to use in case of failed requests. Default is DefaultRetryStrategy (i.e. exponential backoff). + * Provide retry strategy here or via a factory method. */ - std::shared_ptr retryStrategy; + std::shared_ptr retryStrategy = nullptr; /** * Override the http endpoint used to talk to a service. */ @@ -227,9 +260,10 @@ namespace Aws */ Aws::Utils::Array nonProxyHosts; /** - * Threading Executor implementation. Default uses std::thread::detach() - */ - std::shared_ptr executor; + * Threading Executor implementation. Default uses std::thread::detach() + * Provide executor here or via a factory method. + */ + std::shared_ptr executor = nullptr; /** * If you need to test and want to get around TLS validation errors, do that here. * You probably shouldn't use this flag in a production scenario. @@ -263,12 +297,14 @@ namespace Aws Aws::String proxyCaFile; /** * Rate Limiter implementation for outgoing bandwidth. Default is wide-open. + * Provide limiter here or via a factory method. */ - std::shared_ptr writeRateLimiter; + std::shared_ptr writeRateLimiter = nullptr; /** * Rate Limiter implementation for incoming bandwidth. Default is wide-open. + * Provide limiter here or via a factory method. */ - std::shared_ptr readRateLimiter; + std::shared_ptr readRateLimiter = nullptr; /** * Override the http implementation the default factory returns. */ @@ -379,10 +415,10 @@ namespace Aws const Aws::String& defaultValue); /** - * A wrapper for interfacing with telemetry functionality. + * A wrapper for interfacing with telemetry functionality. Defaults to Noop provider. + * Provide TelemetryProvider here or via a factory method. */ - std::shared_ptr telemetryProvider = - smithy::components::tracing::NoopTelemetryProvider::CreateProvider(); + std::shared_ptr telemetryProvider; }; /** diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h b/src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h index 81e0be194d7..dbfd75e82d7 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h @@ -66,3 +66,10 @@ if(!m_isInitialized) \ } \ Aws::Utils::RAIICounter(this->m_operationsProcessed, &this->m_shutdownSignal) +#define AWS_ASYNC_OPERATION_GUARD(OPERATION) \ +if(!m_isInitialized) \ +{ \ + AWS_LOGSTREAM_ERROR(#OPERATION, "Unable to call " #OPERATION ": client is not initialized (or already terminated)"); \ + return handler(this, request, Aws::Client::AWSError(CoreErrors::NOT_INITIALIZED, "NOT_INITIALIZED", "Client is not initialized or already terminated", false), handlerContext); \ +} \ +Aws::Utils::RAIICounter(this->m_operationsProcessed, &this->m_shutdownSignal) diff --git a/src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClient.h b/src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClient.h index bfa19b0cc05..ecbe34fac17 100644 --- a/src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClient.h +++ b/src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClient.h @@ -37,8 +37,8 @@ namespace client const std::shared_ptr endpointProvider, const std::shared_ptr& authSchemeResolver, const Aws::UnorderedMap& authSchemes) - : AwsSmithyClientBase(clientConfig, serviceName, httpClient, errorMarshaller), - m_clientConfig(clientConfig), + : AwsSmithyClientBase(Aws::MakeUnique(ServiceNameT, clientConfig), serviceName, httpClient, errorMarshaller), + m_clientConfig(*AwsSmithyClientBase::m_clientConfig.get()), m_endpointProvider(endpointProvider), m_authSchemeResolver(authSchemeResolver), m_authSchemes(authSchemes) @@ -119,7 +119,7 @@ namespace client } protected: - ServiceClientConfigurationT m_clientConfig{}; + ServiceClientConfigurationT& m_clientConfig; std::shared_ptr m_endpointProvider{}; std::shared_ptr m_authSchemeResolver{}; Aws::UnorderedMap m_authSchemes{}; diff --git a/src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClientBase.h b/src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClientBase.h index a7f839fe163..80e6012564d 100644 --- a/src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClientBase.h +++ b/src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClientBase.h @@ -77,16 +77,44 @@ namespace client using SelectAuthSchemeOptionOutcome = Aws::Utils::Outcome; using ResolveEndpointOutcome = Aws::Utils::Outcome; - AwsSmithyClientBase(const Aws::Client::ClientConfiguration& clientConfig, + AwsSmithyClientBase(Aws::UniquePtr&& clientConfig, Aws::String serviceName, std::shared_ptr httpClient, std::shared_ptr errorMarshaller) : - m_clientConfig(clientConfig), + m_clientConfig(std::move(clientConfig)), m_serviceName(std::move(serviceName)), - m_userAgent(Aws::Client::ComputeUserAgentString(&clientConfig)), + m_userAgent(), m_httpClient(std::move(httpClient)), m_errorMarshaller(std::move(errorMarshaller)) - {} + { + if (!m_clientConfig->retryStrategy) + { + assert(m_clientConfig->configFactories.retryStrategyCreateFn); + m_clientConfig->retryStrategy = m_clientConfig->configFactories.retryStrategyCreateFn(); + } + if (!m_clientConfig->executor) + { + assert(m_clientConfig->configFactories.executorCreateFn); + m_clientConfig->executor = m_clientConfig->configFactories.executorCreateFn(); + } + if (!m_clientConfig->writeRateLimiter) + { + assert(m_clientConfig->configFactories.writeRateLimiterCreateFn); + m_clientConfig->writeRateLimiter = m_clientConfig->configFactories.writeRateLimiterCreateFn(); + } + if (!m_clientConfig->readRateLimiter) + { + assert(m_clientConfig->configFactories.readRateLimiterCreateFn); + m_clientConfig->readRateLimiter = m_clientConfig->configFactories.readRateLimiterCreateFn(); + } + if (!m_clientConfig->telemetryProvider) + { + assert(m_clientConfig->configFactories.telemetryProviderCreateFn); + m_clientConfig->telemetryProvider = m_clientConfig->configFactories.telemetryProviderCreateFn(); + } + + m_userAgent = Aws::Client::ComputeUserAgentString(m_clientConfig.get()); + } AwsSmithyClientBase(const AwsSmithyClientBase&) = delete; AwsSmithyClientBase(AwsSmithyClientBase&&) = delete; @@ -127,7 +155,7 @@ namespace client virtual bool AdjustClockSkew(HttpResponseOutcome& outcome, const AuthSchemeOption& authSchemeOption) const = 0; protected: - Aws::Client::ClientConfiguration m_clientConfig; + Aws::UniquePtr m_clientConfig; Aws::String m_serviceName; Aws::String m_userAgent; diff --git a/src/aws-cpp-sdk-core/source/client/AWSClient.cpp b/src/aws-cpp-sdk-core/source/client/AWSClient.cpp index 44dfbe770e3..f7b728b134c 100644 --- a/src/aws-cpp-sdk-core/source/client/AWSClient.cpp +++ b/src/aws-cpp-sdk-core/source/client/AWSClient.cpp @@ -119,13 +119,19 @@ AWSClient::AWSClient(const Aws::Client::ClientConfiguration& configuration, const std::shared_ptr& signer, const std::shared_ptr& errorMarshaller) : m_region(configuration.region), - m_telemetryProvider(configuration.telemetryProvider), + m_telemetryProvider(configuration.telemetryProvider ? configuration.telemetryProvider : configuration.configFactories.telemetryProviderCreateFn()), m_signerProvider(Aws::MakeUnique(AWS_CLIENT_LOG_TAG, signer)), - m_httpClient(CreateHttpClient(configuration)), + m_httpClient(CreateHttpClient( + [&configuration, this]() + { + ClientConfiguration tempConfig(configuration); + tempConfig.telemetryProvider = m_telemetryProvider; + return tempConfig; + }())), m_errorMarshaller(errorMarshaller), - m_retryStrategy(configuration.retryStrategy), - m_writeRateLimiter(configuration.writeRateLimiter), - m_readRateLimiter(configuration.readRateLimiter), + m_retryStrategy(configuration.retryStrategy ? configuration.retryStrategy : configuration.configFactories.retryStrategyCreateFn()), + m_writeRateLimiter(configuration.writeRateLimiter ? configuration.writeRateLimiter : configuration.configFactories.writeRateLimiterCreateFn()), + m_readRateLimiter(configuration.readRateLimiter ? configuration.readRateLimiter : configuration.configFactories.readRateLimiterCreateFn()), m_userAgent(Aws::Client::ComputeUserAgentString(&configuration)), m_hash(Aws::Utils::Crypto::CreateMD5Implementation()), m_requestTimeoutMs(configuration.requestTimeoutMs), @@ -138,13 +144,19 @@ AWSClient::AWSClient(const Aws::Client::ClientConfiguration& configuration, const std::shared_ptr& signerProvider, const std::shared_ptr& errorMarshaller) : m_region(configuration.region), - m_telemetryProvider(configuration.telemetryProvider), + m_telemetryProvider(configuration.telemetryProvider ? configuration.telemetryProvider : configuration.configFactories.telemetryProviderCreateFn()), m_signerProvider(signerProvider), - m_httpClient(CreateHttpClient(configuration)), + m_httpClient(CreateHttpClient( + [&configuration, this]() + { + ClientConfiguration tempConfig(configuration); + tempConfig.telemetryProvider = m_telemetryProvider; + return tempConfig; + }())), m_errorMarshaller(errorMarshaller), - m_retryStrategy(configuration.retryStrategy), - m_writeRateLimiter(configuration.writeRateLimiter), - m_readRateLimiter(configuration.readRateLimiter), + m_retryStrategy(configuration.retryStrategy ? configuration.retryStrategy : configuration.configFactories.retryStrategyCreateFn()), + m_writeRateLimiter(configuration.writeRateLimiter ? configuration.writeRateLimiter : configuration.configFactories.writeRateLimiterCreateFn()), + m_readRateLimiter(configuration.readRateLimiter ? configuration.readRateLimiter : configuration.configFactories.readRateLimiterCreateFn()), m_userAgent(Aws::Client::ComputeUserAgentString(&configuration)), m_hash(Aws::Utils::Crypto::CreateMD5Implementation()), m_requestTimeoutMs(configuration.requestTimeoutMs), diff --git a/src/aws-cpp-sdk-core/source/client/ClientConfiguration.cpp b/src/aws-cpp-sdk-core/source/client/ClientConfiguration.cpp index fcbede8acdc..1f9f0c30f68 100644 --- a/src/aws-cpp-sdk-core/source/client/ClientConfiguration.cpp +++ b/src/aws-cpp-sdk-core/source/client/ClientConfiguration.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -39,6 +40,19 @@ static const char* AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV"; static const char* DISABLE_IMDSV1_CONFIG_VAR = "AWS_EC2_METADATA_V1_DISABLED"; static const char* DISABLE_IMDSV1_ENV_VAR = "ec2_metadata_v1_disabled"; +ClientConfiguration::ProviderFactories ClientConfiguration::ProviderFactories::defaultFactories = []() +{ + ProviderFactories factories; + + factories.retryStrategyCreateFn = [](){return InitRetryStrategy();}; + factories.executorCreateFn = [](){return Aws::MakeShared(CLIENT_CONFIG_TAG);}; + factories.writeRateLimiterCreateFn = [](){return nullptr;}; + factories.readRateLimiterCreateFn = [](){return nullptr;}; + factories.telemetryProviderCreateFn = [](){return smithy::components::tracing::NoopTelemetryProvider::CreateProvider();}; + + return factories; +}(); + Aws::String FilterUserAgentToken(char const * const source) { // Tokens are short textual identifiers that do not include whitespace or delimiters. @@ -131,7 +145,6 @@ void setLegacyClientConfigurationParameters(ClientConfiguration& clientConfig) clientConfig.lowSpeedLimit = 1; clientConfig.proxyScheme = Aws::Http::Scheme::HTTP; clientConfig.proxyPort = 0; - clientConfig.executor = Aws::MakeShared(CLIENT_CONFIG_TAG); clientConfig.verifySSL = true; clientConfig.writeRateLimiter = nullptr; clientConfig.readRateLimiter = nullptr; @@ -235,7 +248,6 @@ ClientConfiguration::ClientConfiguration() { this->disableIMDS = false; setLegacyClientConfigurationParameters(*this); - retryStrategy = InitRetryStrategy(); if (!this->disableIMDS && region.empty() && @@ -259,7 +271,6 @@ ClientConfiguration::ClientConfiguration(const ClientConfigurationInitValues &co { this->disableIMDS = configuration.shouldDisableIMDS; setLegacyClientConfigurationParameters(*this); - retryStrategy = InitRetryStrategy(); if (!this->disableIMDS && region.empty() && @@ -319,10 +330,6 @@ ClientConfiguration::ClientConfiguration(const char* profile, bool shouldDisable hasEc2MetadataRegion, ec2MetadataRegion); return; } - if (!retryStrategy) - { - retryStrategy = InitRetryStrategy(); - } AWS_LOGSTREAM_WARN(CLIENT_CONFIG_TAG, "User specified profile: [" << profile << "] is not found, will use the SDK resolved one."); setConfigFromEnvOrProfile(*this); diff --git a/src/aws-cpp-sdk-core/source/internal/AWSHttpResourceClient.cpp b/src/aws-cpp-sdk-core/source/internal/AWSHttpResourceClient.cpp index d2df377fe38..b401ded3e26 100644 --- a/src/aws-cpp-sdk-core/source/internal/AWSHttpResourceClient.cpp +++ b/src/aws-cpp-sdk-core/source/internal/AWSHttpResourceClient.cpp @@ -81,7 +81,7 @@ namespace Aws AWSHttpResourceClient::AWSHttpResourceClient(const Aws::Client::ClientConfiguration& clientConfiguration, const char* logtag) : m_logtag(logtag), m_userAgent(Aws::Client::ComputeUserAgentString(&clientConfiguration)), - m_retryStrategy(clientConfiguration.retryStrategy), + m_retryStrategy(clientConfiguration.retryStrategy ? clientConfiguration.retryStrategy : clientConfiguration.configFactories.retryStrategyCreateFn()), m_httpClient(nullptr) { AWS_LOGSTREAM_INFO(m_logtag.c_str(), diff --git a/src/aws-cpp-sdk-core/source/smithy/client/AwsSmithyClientBase.cpp b/src/aws-cpp-sdk-core/source/smithy/client/AwsSmithyClientBase.cpp index 916539d9d83..6ad02cd9b30 100644 --- a/src/aws-cpp-sdk-core/source/smithy/client/AwsSmithyClientBase.cpp +++ b/src/aws-cpp-sdk-core/source/smithy/client/AwsSmithyClientBase.cpp @@ -72,7 +72,7 @@ AwsSmithyClientBase::BuildHttpRequest(const std::shared_ptrGetSelectedCompressionAlgorithm(m_clientConfig.requestCompressionConfig); + Aws::Client::CompressionAlgorithm selectedCompressionAlgorithm = pRequest->GetSelectedCompressionAlgorithm(m_clientConfig->requestCompressionConfig); if (Aws::Client::CompressionAlgorithm::NONE != selectedCompressionAlgorithm) { RequestPayloadCompression::AddCompressedContentBodyToRequest(pRequest, httpRequest, selectedCompressionAlgorithm, m_httpClient); } else { @@ -186,7 +186,7 @@ void AwsSmithyClientBase::AttemptOneRequestAsync(std::shared_ptrm_httpRequest = BuildHttpRequest(pRequestCtx, pRequestCtx->m_endpoint.GetURI(), pRequestCtx->m_method); }, TracingUtils::SMITHY_CLIENT_SERIALIZATION_METRIC, - *m_clientConfig.telemetryProvider->getMeter(this->GetServiceClientName(), {}), + *m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}), {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName}, {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}}); @@ -206,7 +206,7 @@ void AwsSmithyClientBase::AttemptOneRequestAsync(std::shared_ptrm_requestName, pRequestCtx->m_httpRequest); - if(m_clientConfig.retryStrategy && !m_clientConfig.retryStrategy->HasSendToken()) + if(m_clientConfig->retryStrategy && !m_clientConfig->retryStrategy->HasSendToken()) { auto errOutcome = HttpResponseOutcome(ClientError(CoreErrors::SLOW_DOWN, "", @@ -223,7 +223,7 @@ void AwsSmithyClientBase::AttemptOneRequestAsync(std::shared_ptrSignRequest(pRequestCtx->m_httpRequest, pRequestCtx->m_authSchemeOption); }, TracingUtils::SMITHY_CLIENT_SIGNING_METRIC, - *m_clientConfig.telemetryProvider->getMeter(this->GetServiceClientName(), {}), + *m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}), {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName}, {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}}); @@ -260,20 +260,20 @@ void AwsSmithyClientBase::AttemptOneRequestAsync(std::shared_ptrMakeAsyncRequest(pRequestCtx->m_httpRequest, pRequestCtx->m_pExecutor, responseHandler, - m_clientConfig.readRateLimiter.get(), - m_clientConfig.writeRateLimiter.get()); + m_clientConfig->readRateLimiter.get(), + m_clientConfig->writeRateLimiter.get()); }, TracingUtils::SMITHY_CLIENT_SERVICE_CALL_METRIC, - *m_clientConfig.telemetryProvider->getMeter(this->GetServiceClientName(), {}), + *m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}), {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName}, {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}}); #else auto httpResponse = TracingUtils::MakeCallWithTiming>( [&]() -> std::shared_ptr { - return m_httpClient->MakeRequest(signedHttpRequest, m_clientConfig.readRateLimiter.get(), m_clientConfig.writeRateLimiter.get()); + return m_httpClient->MakeRequest(signedHttpRequest, m_clientConfig->readRateLimiter.get(), m_clientConfig->writeRateLimiter.get()); }, TracingUtils::SMITHY_CLIENT_SERVICE_CALL_METRIC, - *m_clientConfig.telemetryProvider->getMeter(this->GetServiceClientName(), {}), + *m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}), {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName},{TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}}); pRequestCtx->m_pExecutor->Submit([httpResponse, httpResponseHandler]() mutable @@ -319,16 +319,16 @@ void AwsSmithyClientBase::HandleAsyncReply(std::shared_ptrm_retryCount == 0) { - m_clientConfig.retryStrategy->RequestBookkeeping(outcome); + m_clientConfig->retryStrategy->RequestBookkeeping(outcome); } else { assert(pRequestCtx->m_lastError); - m_clientConfig.retryStrategy->RequestBookkeeping(outcome, pRequestCtx->m_lastError.value()); + m_clientConfig->retryStrategy->RequestBookkeeping(outcome, pRequestCtx->m_lastError.value()); } coreMetrics.httpClientMetrics = pRequestCtx->m_httpRequest->GetRequestMetrics(); TracingUtils::EmitCoreHttpMetrics(pRequestCtx->m_httpRequest->GetRequestMetrics(), - *m_clientConfig.telemetryProvider->getMeter(this->GetServiceClientName(), {}), + *m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}), {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName}, {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}}); if (outcome.IsSuccess()) @@ -374,7 +374,7 @@ void AwsSmithyClientBase::HandleAsyncReply(std::shared_ptrExtractRegion(outcome.GetError()); const Aws::String& signerRegion = pRequestCtx->m_endpoint.GetAttributes()->authScheme.GetSigningRegion() ? pRequestCtx->m_endpoint.GetAttributes()->authScheme.GetSigningRegion().value() : ""; - if (m_clientConfig.region == Aws::Region::AWS_GLOBAL && !regionFromResponse.empty() && + if (m_clientConfig->region == Aws::Region::AWS_GLOBAL && !regionFromResponse.empty() && regionFromResponse != signerRegion) { pRequestCtx->m_endpoint.AccessAttributes()->authScheme.SetSigningRegion(regionFromResponse); AWS_LOGSTREAM_DEBUG(AWS_SMITHY_CLIENT_LOG, "Need to retry with a correct region"); @@ -386,21 +386,21 @@ void AwsSmithyClientBase::HandleAsyncReply(std::shared_ptr( [&]() -> long { - return m_clientConfig.retryStrategy->CalculateDelayBeforeNextRetry(outcome.GetError(), static_cast(pRequestCtx->m_retryCount)); + return m_clientConfig->retryStrategy->CalculateDelayBeforeNextRetry(outcome.GetError(), static_cast(pRequestCtx->m_retryCount)); }, TracingUtils::SMITHY_CLIENT_SERVICE_BACKOFF_DELAY_METRIC, - *m_clientConfig.telemetryProvider->getMeter(this->GetServiceClientName(), {}), + *m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}), {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName}, {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}}); bool shouldSleep = !retryWithCorrectRegion; - if (m_clientConfig.enableClockSkewAdjustment) + if (m_clientConfig->enableClockSkewAdjustment) { // AdjustClockSkew returns true means clock skew was the problem and skew was adjusted, false otherwise. // sleep if clock skew and region was NOT the problem. AdjustClockSkew may update error inside outcome. shouldSleep |= !this->AdjustClockSkew(outcome, pRequestCtx->m_authSchemeOption); } - if (!retryWithCorrectRegion && !m_clientConfig.retryStrategy->ShouldRetry(outcome.GetError(), static_cast(pRequestCtx->m_retryCount))) + if (!retryWithCorrectRegion && !m_clientConfig->retryStrategy->ShouldRetry(outcome.GetError(), static_cast(pRequestCtx->m_retryCount))) { break; } @@ -435,10 +435,10 @@ void AwsSmithyClientBase::HandleAsyncReply(std::shared_ptrm_requestInfo.ttl = Utils::DateTime::Now() + clockSkew + std::chrono::milliseconds(m_clientConfig.requestTimeoutMs); + pRequestCtx->m_requestInfo.ttl = Utils::DateTime::Now() + clockSkew + std::chrono::milliseconds(m_clientConfig->requestTimeoutMs); } pRequestCtx->m_requestInfo.attempt ++; - pRequestCtx->m_requestInfo.maxAttempts = m_clientConfig.retryStrategy->GetMaxAttempts(); + pRequestCtx->m_requestInfo.maxAttempts = m_clientConfig->retryStrategy->GetMaxAttempts(); Aws::Monitoring::OnRequestRetry(this->GetServiceClientName(), pRequestCtx->m_requestName, pRequestCtx->m_httpRequest, pRequestCtx->m_monitoringContexts); @@ -447,7 +447,7 @@ void AwsSmithyClientBase::HandleAsyncReply(std::shared_ptrgetMeter(this->GetServiceClientName(), {}); + auto meter = m_clientConfig->telemetryProvider->getMeter(this->GetServiceClientName(), {}); auto counter = meter->CreateCounter(TracingUtils::SMITHY_CLIENT_SERVICE_ATTEMPTS_METRIC, TracingUtils::COUNT_METRIC_TYPE, ""); counter->add(pRequestCtx->m_requestInfo.attempt, {{TracingUtils::SMITHY_METHOD_DIMENSION, pRequestCtx->m_requestName}, {TracingUtils::SMITHY_SERVICE_DIMENSION, this->GetServiceClientName()}}); diff --git a/src/aws-cpp-sdk-transfer/include/aws/transfer/TransferManager.h b/src/aws-cpp-sdk-transfer/include/aws/transfer/TransferManager.h index fee728fd171..5261e4b432c 100644 --- a/src/aws-cpp-sdk-transfer/include/aws/transfer/TransferManager.h +++ b/src/aws-cpp-sdk-transfer/include/aws/transfer/TransferManager.h @@ -39,10 +39,14 @@ namespace Aws */ struct TransferManagerConfiguration { - TransferManagerConfiguration(Aws::Utils::Threading::Executor* executor) : s3Client(nullptr), transferExecutor(executor), computeContentMD5(false), transferBufferMaxHeapSize(10 * MB5), bufferSize(MB5) + TransferManagerConfiguration(Aws::Utils::Threading::Executor* executor) + : s3Client(nullptr), + transferExecutor(executor), + computeContentMD5(false), + transferBufferMaxHeapSize(10 * MB5), + bufferSize(MB5) { } - /** * S3 Client to use for transfers. You are responsible for setting this. */ @@ -53,6 +57,18 @@ namespace Aws * It is not a bug to use the same executor, but at least be aware that this is how the manager will be used. */ Aws::Utils::Threading::Executor* transferExecutor = nullptr; + + /** + * Threading Executor shared pointer. + * Created and owned by Transfer manager if no raw pointer `transferExecutor` is provided. + */ + std::shared_ptr spExecutor = nullptr; + + /** + * Threading Executor factory method. Default creates a factory that creates DefaultExecutor + */ + std::function()> executorCreateFn; + /** * When true, TransferManager will calculate the MD5 digest of the content being uploaded. * The digest is sent to S3 via an HTTP header enabling the service to perform integrity checks. diff --git a/src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp b/src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp index 30ea0e33aea..51e706ff9c0 100644 --- a/src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp +++ b/src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp @@ -57,6 +57,18 @@ namespace Aws TransferManager::TransferManager(const TransferManagerConfiguration& configuration) : m_transferConfig(configuration) { assert(m_transferConfig.s3Client); + if (!m_transferConfig.transferExecutor) + { + if(!m_transferConfig.spExecutor && m_transferConfig.executorCreateFn) + { + m_transferConfig.spExecutor = m_transferConfig.executorCreateFn(); + } + m_transferConfig.transferExecutor = m_transferConfig.spExecutor.get(); + } + if (!m_transferConfig.transferExecutor) + { + AWS_LOGSTREAM_FATAL(CLASS_TAG, "Failed to init TransferManager: transferExecutor is null"); + } assert(m_transferConfig.transferExecutor); m_transferConfig.s3Client->AppendToUserAgent("ft/s3-transfer"); for (uint64_t i = 0; i < m_transferConfig.transferBufferMaxHeapSize; i += m_transferConfig.bufferSize) diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/ServiceClientSourceInit.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/ServiceClientSourceInit.vm index c7163e4299f..514c9a02c36 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/ServiceClientSourceInit.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/ServiceClientSourceInit.vm @@ -1,6 +1,7 @@ #if(!${onlyGeneratedOperations}) #set($additionalCtorSignatureArgs = {}) #set($ctorMemberInitList = []) +#set($addArgDummy = $ctorMemberInitList.add("m_clientConfiguration(clientConfiguration)")) #set($signerCtorArgs = []) #set($additionalCtorArgs = {}) #if($signPayloadsOptional) @@ -94,7 +95,6 @@ ${className}::${className}(const ${className} &rhs) : Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), Aws::Client::ClientWithAsyncTemplateMethods<${className}>(), m_clientConfiguration(rhs.m_clientConfiguration), - m_executor(rhs.m_clientConfiguration.executor), m_endpointProvider(rhs.m_endpointProvider) {} ${className}& ${className}::operator=(const ${className} &rhs) { @@ -110,7 +110,6 @@ ${className}& ${className}::operator=(const ${className} &rhs) { rhs.m_clientConfiguration.payloadSigningPolicy, /*doubleEncodeValue*/ false); m_clientConfiguration = rhs.m_clientConfiguration; - m_executor = rhs.m_executor; m_endpointProvider = rhs.m_endpointProvider; init(m_clientConfiguration); return *this; @@ -128,7 +127,6 @@ S3Client::S3Client(${className} &&rhs) noexcept : Aws::MakeShared(ALLOCATION_TAG)), Aws::Client::ClientWithAsyncTemplateMethods(), m_clientConfiguration(std::move(rhs.m_clientConfiguration)), - m_executor(std::move(rhs.m_clientConfiguration.executor)), m_endpointProvider(std::move(rhs.m_endpointProvider)) {} ${className}& ${className}::operator=(${className} &&rhs) noexcept { @@ -144,7 +142,6 @@ ${className}& ${className}::operator=(${className} &&rhs) noexcept { rhs.m_clientConfiguration.payloadSigningPolicy, /*doubleEncodeValue*/ false); m_clientConfiguration = std::move(rhs.m_clientConfiguration); - m_executor = std::move(rhs.m_executor); m_endpointProvider = std::move(rhs.m_endpointProvider); init(m_clientConfiguration); return *this; @@ -169,11 +166,6 @@ ${className}& ${className}::operator=(${className} &&rhs) noexcept { BASECLASS(clientConfiguration, Aws::MakeShared<${signerToMake}>(ALLOCATION_TAG, bearerTokenProvider), Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration), -#end - m_executor(clientConfiguration.executor)#if($ctorMemberInitList.isEmpty()) -#else,#end #foreach($ctorMemberInit in $ctorMemberInitList) ${ctorMemberInit}#if( $foreach.hasNext ), @@ -214,11 +206,6 @@ ${clsWSpace} ${clsWSpace} ${ctorArgument}#if( $foreach.hasNext ),#else) :#end #else#end #end), Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration), -#end - m_executor(clientConfiguration.executor)#if($ctorMemberInitList.isEmpty())#else, -#end #foreach($ctorMemberInit in $ctorMemberInitList) ${ctorMemberInit}#if( $foreach.hasNext ), #else#end @@ -258,11 +245,6 @@ ${clsWSpace} ${clsWSpace} ${ctorArgument}#if( $foreach.hasNext ),#else) :#end #else#end #end), Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration), -#end - m_executor(clientConfiguration.executor)#if($ctorMemberInitList.isEmpty())#else, -#end #foreach($ctorMemberInit in $ctorMemberInitList) ${ctorMemberInit}#if( $foreach.hasNext ), #else#end @@ -302,11 +284,6 @@ ${clsWSpace} ${clsWSpace} ${ctorArgument}#if( $foreach.hasNext ),#else) :#end #else#end #end), Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration), -#end - m_executor(clientConfiguration.executor)#if($ctorMemberInitList.isEmpty())#else, -#end #foreach($ctorMemberInit in $ctorMemberInitList) ${ctorMemberInit}#if( $foreach.hasNext ), #else#end @@ -333,10 +310,7 @@ ${clsWSpace} ${clsWSpace} ${ctorArgument}#if( $foreach.hasNext ),#else) :#end BASECLASS(clientConfiguration, signerProvider, Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration), -#end - m_executor(clientConfiguration.executor)${virtualAddressingInit} +${virtualAddressingInit} { init(m_clientConfiguration); } @@ -362,6 +336,14 @@ std::shared_ptr<${metadata.classNamePrefix}EndpointProviderBase>& ${className}:: void ${className}::init(const ${clientConfigurationCls}& config) { AWSClient::SetServiceClientName("${metadata.serviceId}"); + if (!m_clientConfiguration.executor) { + if (!m_clientConfiguration.configFactories.executorCreateFn()) { + AWS_LOGSTREAM_FATAL(ALLOCATION_TAG, "Failed to initialize client: config is missing Executor or executorCreateFn"); + m_isInitialized = false; + return; + } + m_clientConfiguration.executor = m_clientConfiguration.configFactories.executorCreateFn(); + } #if($serviceModel.endpointRules) AWS_CHECK_PTR(SERVICE_NAME, m_endpointProvider); m_endpointProvider->InitBuiltInParameters(config); diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/ServiceClientSourceLegacyConstructors.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/ServiceClientSourceLegacyConstructors.vm index e2e1e6c46ec..0dde615a7bf 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/ServiceClientSourceLegacyConstructors.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/ServiceClientSourceLegacyConstructors.vm @@ -85,10 +85,7 @@ BASECLASS(clientConfiguration, Aws::MakeShared<${signerToMake}>(ALLOCATION_TAG, bearerTokenProvider), Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), -#end - m_executor(clientConfiguration.executor)#if($ctorMemberInitList.isEmpty()) + m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString})#if($ctorMemberInitList.isEmpty()) #else,#end #foreach($ctorMemberInit in $ctorMemberInitList) ${ctorMemberInit}#if( $foreach.hasNext ), @@ -126,10 +123,7 @@ ${clsWSpace} ${clsWSpace} ${ctorArgument}#if( $foreach.hasNext ),#else) :#end #else#end #end), Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), -#end - m_executor(clientConfiguration.executor)#if($ctorMemberInitList.isEmpty())#else, + m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString})#if($ctorMemberInitList.isEmpty())#else, #end #foreach($ctorMemberInit in $ctorMemberInitList) ${ctorMemberInit}#if( $foreach.hasNext ), @@ -167,10 +161,7 @@ ${clsWSpace} ${clsWSpace} ${ctorArgument}#if( $foreach.hasNext ),#else) :#end #else#end #end), Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), -#end - m_executor(clientConfiguration.executor)#if($ctorMemberInitList.isEmpty())#else, + m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString})#if($ctorMemberInitList.isEmpty())#else, #end #foreach($ctorMemberInit in $ctorMemberInitList) ${ctorMemberInit}#if( $foreach.hasNext ), @@ -208,10 +199,7 @@ ${clsWSpace} ${clsWSpace} ${ctorArgument}#if( $foreach.hasNext ),#else) :#end #else#end #end), Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), -#end - m_executor(clientConfiguration.executor)#if($ctorMemberInitList.isEmpty())#else, + m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString})#if($ctorMemberInitList.isEmpty())#else, #end #foreach($ctorMemberInit in $ctorMemberInitList) ${ctorMemberInit}#if( $foreach.hasNext ), @@ -239,10 +227,7 @@ ${clsWSpace} ${clsWSpace} ${ctorArgument}#if( $foreach.hasNext ),#else) :#end BASECLASS(clientConfiguration, signerProvider, Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), -#if($serviceModel.endpointRules) - m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), -#end - m_executor(clientConfiguration.executor) + m_clientConfiguration(clientConfiguration${signPayloadsClientConfigParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}) { init(m_clientConfiguration); } diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/common/operation/withrequest/OperationAsync.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/common/operation/withrequest/OperationAsync.vm index f23dfb62014..54e5aed67b2 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/common/operation/withrequest/OperationAsync.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/common/operation/withrequest/OperationAsync.vm @@ -7,7 +7,7 @@ #end#if($serviceModel.metadata.serviceId == "S3" && $operation.s3CrtEnabled) void ${className}::${operation.name}Async(${constText}${operation.request.shape.name}& request, const ${operation.name}ResponseReceivedHandler& handler, const std::shared_ptr& context) const { - m_executor->Submit( [this, ${refText}request, handler, context]() + m_clientConfiguration.executor->Submit( [this, ${refText}request, handler, context]() { handler(this, request, ${operation.name}(request), context); } ); diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/common/operation/withrequest/OperationOutcomeCallable.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/common/operation/withrequest/OperationOutcomeCallable.vm index 2a1a71b6adc..ee4ad8d8177 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/common/operation/withrequest/OperationOutcomeCallable.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/common/operation/withrequest/OperationOutcomeCallable.vm @@ -9,7 +9,7 @@ ${operation.name}OutcomeCallable ${className}::${operation.name}Callable(${const { auto task = Aws::MakeShared< std::packaged_task< ${operation.name}Outcome() > >(ALLOCATION_TAG, [this, ${refText}request](){ return this->${operation.name}(request); } ); auto packagedFunction = [task]() { (*task)(); }; - m_executor->Submit(packagedFunction); + m_clientConfiguration.executor->Submit(packagedFunction); return task->get_future(); } diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceClientHeader.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceClientHeader.vm index 522ba82777d..af02be0c2d8 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceClientHeader.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceClientHeader.vm @@ -75,7 +75,6 @@ namespace ${serviceNamespace} #if($serviceModel.endpointRules) ${metadata.classNamePrefix}ClientConfiguration m_clientConfiguration; #end - std::shared_ptr m_executor; #if($serviceModel.endpointRules) std::shared_ptr<${metadata.classNamePrefix}EndpointProviderBase> m_endpointProvider; #end diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceEventStreamOperationsSource.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceEventStreamOperationsSource.vm index a287443d1ad..7a3d274bcc8 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceEventStreamOperationsSource.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceEventStreamOperationsSource.vm @@ -3,6 +3,7 @@ void ${className}::${operation.name}Async(Model::${operation.request.shape.name} const ${operation.name}ResponseReceivedHandler& handler, const std::shared_ptr& handlerContext) const { + AWS_ASYNC_OPERATION_GUARD(${operation.name}); #parse("com/amazonaws/util/awsclientgenerator/velocity/cpp/common/ServiceClientOperationEndpointPrepareCommonBody.vm") #parse("com/amazonaws/util/awsclientgenerator/velocity/cpp/common/ServiceClientOperationRequestRequiredMemberValidate.vm") #parse("com/amazonaws/util/awsclientgenerator/velocity/cpp/common/UriRequestQueryParams.vm") @@ -33,11 +34,7 @@ void ${className}::${operation.name}Async(Model::${operation.request.shape.name} auto sem = Aws::MakeShared(ALLOCATION_TAG, 0, 1); request.SetRequestSignedHandler([eventEncoderStream, sem](const Aws::Http::HttpRequest& httpRequest) { eventEncoderStream->SetSignatureSeed(Aws::Client::GetAuthorizationHeader(httpRequest)); sem->ReleaseAll(); }); -#if($serviceModel.endpointRules) - m_executor->Submit([this, endpointResolutionOutcome, &request, handler, handlerContext] () mutable { -#else - m_executor->Submit([this, uri, &request, handler, handlerContext] () mutable { -#end + m_clientConfiguration.executor->Submit([this, endpointResolutionOutcome, &request, handler, handlerContext] () mutable { JsonOutcome outcome = MakeRequest(request, endpointResolutionOutcome.GetResult(), Aws::Http::HttpMethod::HTTP_POST, Aws::Auth::EVENTSTREAM_SIGV4_SIGNER); if(outcome.IsSuccess()) { diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/serviceoperations/withoutrequest/OperationAsync.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/serviceoperations/withoutrequest/OperationAsync.vm index b633bea6294..a553d351750 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/serviceoperations/withoutrequest/OperationAsync.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/serviceoperations/withoutrequest/OperationAsync.vm @@ -1,7 +1,7 @@ #if(0) void ${className}::${operation.name}Async(const ${operation.name}ResponseReceivedHandler& handler, const std::shared_ptr& context) const { - m_executor->Submit( [this, handler, context]() + m_clientConfiguration.executor->Submit( [this, handler, context]() { handler(this, ${operation.name}(), context); } ); diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/serviceoperations/withoutrequest/OperationOutcomeCallable.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/serviceoperations/withoutrequest/OperationOutcomeCallable.vm index ef7ce8e3e44..62d0e6fa45e 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/serviceoperations/withoutrequest/OperationOutcomeCallable.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/serviceoperations/withoutrequest/OperationOutcomeCallable.vm @@ -3,6 +3,6 @@ ${operation.name}OutcomeCallable ${className}::${operation.name}Callable() const { auto task = Aws::MakeShared< std::packaged_task< ${operation.name}Outcome() > >(ALLOCATION_TAG, [this](){ return this->${operation.name}(); } ); auto packagedFunction = [task]() { (*task)(); }; - m_executor->Submit(packagedFunction); + m_clientConfiguration.executor->Submit(packagedFunction); return task->get_future(); }#end diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/rds/RDSClientHeader.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/rds/RDSClientHeader.vm index fecd46bb06b..45df7a85b72 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/rds/RDSClientHeader.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/rds/RDSClientHeader.vm @@ -90,7 +90,6 @@ namespace ${rootNamespace} #if($serviceModel.endpointRules) ${metadata.classNamePrefix}ClientConfiguration m_clientConfiguration; #end - std::shared_ptr m_executor; #if($serviceModel.endpointRules) std::shared_ptr<${metadata.classNamePrefix}EndpointProviderBase> m_endpointProvider; #end diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/S3ClientHeader.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/S3ClientHeader.vm index 04ddf566724..22d8500e65d 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/S3ClientHeader.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/S3ClientHeader.vm @@ -265,7 +265,6 @@ namespace ${rootNamespace} ${metadata.classNamePrefix}ClientConfiguration m_clientConfiguration; #end #end - std::shared_ptr m_executor; #if($serviceNamespace == "S3Crt") struct aws_s3_client* m_s3CrtClient = {}; struct aws_signing_config_aws m_s3CrtSigningConfig = {}; diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtServiceClientSourceInit.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtServiceClientSourceInit.vm index b0f1e144cb8..d5871de459a 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtServiceClientSourceInit.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtServiceClientSourceInit.vm @@ -44,10 +44,10 @@ #set($credentialsParam = ", m_credProvider") #set($credentialsArg = ", const std::shared_ptr credentialsProvider") #set($defaultCredentialsProviderChainParam = "Aws::MakeShared(ALLOCATION_TAG, credentialsProvider)") -#set($defaultCredentialsProviderChainMember = ", m_credProvider(Aws::MakeShared(ALLOCATION_TAG, credentialsProvider))") +#set($defaultCredentialsProviderChainMember = "m_credProvider(Aws::MakeShared(ALLOCATION_TAG, credentialsProvider))") #set($simpleCredentialsProviderParam = "Aws::MakeShared(ALLOCATION_TAG, credentials)") -#set($simpleCredentialsProviderMember = ", m_credProvider(Aws::MakeShared(ALLOCATION_TAG, credentials))") -#set($credentialsProviderMember = ", m_credProvider(credentialsProvider)") +#set($simpleCredentialsProviderMember = "m_credProvider(Aws::MakeShared(ALLOCATION_TAG, credentials))") +#set($credentialsProviderMember = "m_credProvider(credentialsProvider)") #set($credentialProviderArg = ", const Aws::Auth::DefaultAWSCredentialsProviderChain& credentialsProvider") #set($hasEventStreamInputOperation = false) #foreach($operation in $serviceModel.operations) @@ -67,7 +67,6 @@ ${className}::${className}(const ${className} &rhs) : Aws::MakeShared(ALLOCATION_TAG)), Aws::Client::ClientWithAsyncTemplateMethods(), m_clientConfiguration(rhs.m_clientConfiguration), - m_executor(rhs.m_clientConfiguration.executor), m_endpointProvider(rhs.m_endpointProvider), m_identityProvider(rhs.m_identityProvider){} @@ -84,7 +83,6 @@ ${className}& ${className}::operator=(const ${className} &rhs) { rhs.m_clientConfiguration.payloadSigningPolicy, /*doubleEncodeValue*/ false); m_clientConfiguration = rhs.m_clientConfiguration; - m_executor = rhs.m_executor; m_endpointProvider = rhs.m_endpointProvider; init(m_clientConfiguration, m_credProvider); return *this; @@ -102,7 +100,6 @@ ${className}::${className}(${className} &&rhs) noexcept : Aws::MakeShared(ALLOCATION_TAG)), Aws::Client::ClientWithAsyncTemplateMethods(), m_clientConfiguration(std::move(rhs.m_clientConfiguration)), - m_executor(std::move(rhs.m_clientConfiguration.executor)), m_endpointProvider(std::move(rhs.m_endpointProvider)) {} ${className}& ${className}::operator=(${className} &&rhs) noexcept { @@ -118,7 +115,6 @@ ${className}& ${className}::operator=(${className} &&rhs) noexcept { rhs.m_clientConfiguration.payloadSigningPolicy, /*doubleEncodeValue*/ false); m_clientConfiguration = std::move(rhs.m_clientConfiguration); - m_executor = std::move(rhs.m_executor); m_endpointProvider = std::move(rhs.m_endpointProvider); init(m_clientConfiguration, m_credProvider); return *this; @@ -137,9 +133,8 @@ ${className}::${className}(const ${clientConfigurationNamespace}::ClientConfigur Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), #if($serviceModel.endpointRules) m_clientConfiguration(clientConfiguration${signPayloadsParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), - m_executor(clientConfiguration.executor)${defaultCredentialsProviderChainMember}, -#else - m_executor(clientConfiguration.executor)${defaultCredentialsProviderChainMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, + ${defaultCredentialsProviderChainMember}, +#else${defaultCredentialsProviderChainMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, #end m_identityProvider(Aws::MakeShared(ALLOCATION_TAG, *this)) { @@ -158,9 +153,8 @@ ${className}::${className}(const AWSCredentials& credentials, const ${clientConf Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), #if($serviceModel.endpointRules) m_clientConfiguration(clientConfiguration${signPayloadsParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), - m_executor(clientConfiguration.executor)${simpleCredentialsProviderMember}, -#else - m_executor(clientConfiguration.executor)${simpleCredentialsProviderMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, + ${simpleCredentialsProviderMember}, +#else${simpleCredentialsProviderMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, #end m_identityProvider(Aws::MakeShared(ALLOCATION_TAG, *this)) { @@ -180,9 +174,9 @@ ${className}::${className}(const std::shared_ptr& creden Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), #if($serviceModel.endpointRules) m_clientConfiguration(clientConfiguration${signPayloadsParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), - m_executor(clientConfiguration.executor)${credentialsProviderMember}, + ${credentialsProviderMember}, #else - m_executor(clientConfiguration.executor)${credentialsProviderMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, + ${credentialsProviderMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, #end m_identityProvider(Aws::MakeShared(ALLOCATION_TAG, *this)) { @@ -196,9 +190,9 @@ ${className}::${className}(const ${clientConfigurationNamespace}::ClientConfigur Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), #if($serviceModel.endpointRules) m_clientConfiguration(clientConfiguration${signPayloadsParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), - m_executor(clientConfiguration.executor)${defaultCredentialsProviderChainMember}, + ${defaultCredentialsProviderChainMember}, #else - m_executor(clientConfiguration.executor)${defaultCredentialsProviderChainMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, + ${defaultCredentialsProviderChainMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, #end m_identityProvider(Aws::MakeShared(ALLOCATION_TAG, *this)) { @@ -212,9 +206,9 @@ ${className}::${className}(const AWSCredentials& credentials, const ${clientConf Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), #if($serviceModel.endpointRules) m_clientConfiguration(clientConfiguration${signPayloadsParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), - m_executor(clientConfiguration.executor)${simpleCredentialsProviderMember}, + ${simpleCredentialsProviderMember}, #else - m_executor(clientConfiguration.executor)${simpleCredentialsProviderMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, + ${simpleCredentialsProviderMember}${virtualAddressingInit}${USEast1RegionalEndpointInitString}, #end m_identityProvider(Aws::MakeShared(ALLOCATION_TAG, *this)) { @@ -229,9 +223,9 @@ ${className}::${className}(const std::shared_ptr& creden Aws::MakeShared<${metadata.classNamePrefix}ErrorMarshaller>(ALLOCATION_TAG)), #if($serviceModel.endpointRules) m_clientConfiguration(clientConfiguration${signPayloadsParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), - m_executor(clientConfiguration.executor)${virtualAddressingInit}, + ${virtualAddressingInit}, #else - m_executor(clientConfiguration.executor)${virtualAddressingInit}${USEast1RegionalEndpointInitString}${credentialsProviderMember}, + ${virtualAddressingInit}${USEast1RegionalEndpointInitString}${credentialsProviderMember}, #end m_identityProvider(Aws::MakeShared(ALLOCATION_TAG, *this)) { @@ -246,9 +240,9 @@ ${className}::${className}(const std::shared_ptr(ALLOCATION_TAG)), #if($serviceModel.endpointRules) m_clientConfiguration(clientConfiguration${signPayloadsParam}${virtualAddressingInit}${USEast1RegionalEndpointInitString}), - m_executor(clientConfiguration.executor)${virtualAddressingInit}, + ${virtualAddressingInit}, #else - m_executor(clientConfiguration.executor)${virtualAddressingInit}, + ${virtualAddressingInit}, #end m_identityProvider(Aws::MakeShared(ALLOCATION_TAG, *this)) { @@ -279,6 +273,14 @@ void ${className}::init(const ${clientConfigurationNamespace}::ClientConfigurati const std::shared_ptr credentialsProvider) { AWSClient::SetServiceClientName("${metadata.serviceId}"); + if (!m_clientConfiguration.executor) { + if (!m_clientConfiguration.configFactories.executorCreateFn()) { + AWS_LOGSTREAM_FATAL(ALLOCATION_TAG, "Failed to initialize client: config is missing Executor or executorCreateFn"); + m_isInitialized = false; + return; + } + m_clientConfiguration.executor = m_clientConfiguration.configFactories.executorCreateFn(); + } #if($serviceModel.endpointRules) m_endpointProvider = Aws::MakeShared(ALLOCATION_TAG); AWS_CHECK_PTR(SERVICE_NAME, m_endpointProvider); diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtSpecificOperations.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtSpecificOperations.vm index a0e931e9c77..abcc93d2373 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtSpecificOperations.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtSpecificOperations.vm @@ -259,6 +259,7 @@ static void ${operation.name}RequestShutdownCallback(void *user_data) #if($operation.request) void ${className}::${operation.name}Async(${constText}${operation.request.shape.name}& request, const ${operation.name}ResponseReceivedHandler& handler, const std::shared_ptr& handlerContext) const { + AWS_ASYNC_OPERATION_GUARD(${operation.name}); #parse("com/amazonaws/util/awsclientgenerator/velocity/cpp/common/ServiceClientOperationRequestRequiredMemberValidate.vm") #parse("com/amazonaws/util/awsclientgenerator/velocity/cpp/common/ServiceClientOperationEndpointPrepareCommonBody.vm") #parse("com/amazonaws/util/awsclientgenerator/velocity/cpp/common/UriRequestQueryParams.vm") diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3control/S3ControlClientHeader.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3control/S3ControlClientHeader.vm index 56a90312092..e5197ddb40e 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3control/S3ControlClientHeader.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3control/S3ControlClientHeader.vm @@ -96,7 +96,6 @@ namespace ${metadata.namespace} #if($serviceModel.endpointRules) ${metadata.classNamePrefix}ClientConfiguration m_clientConfiguration; #end - std::shared_ptr m_executor; #if(!$serviceModel.endpointRules) bool m_useDualStack = false; bool m_useArnRegion = false; diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/xml/XmlServiceClientHeader.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/xml/XmlServiceClientHeader.vm index abdc4804b6c..f62170e763d 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/xml/XmlServiceClientHeader.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/xml/XmlServiceClientHeader.vm @@ -85,7 +85,6 @@ namespace ${serviceNamespace} #if($serviceModel.endpointRules) ${metadata.classNamePrefix}ClientConfiguration m_clientConfiguration; #end - std::shared_ptr m_executor; #if($serviceModel.endpointRules) std::shared_ptr<${metadata.classNamePrefix}EndpointProviderBase> m_endpointProvider; #end diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/xml/rest/RestXmlServiceClientOperations.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/xml/rest/RestXmlServiceClientOperations.vm index 55491bc0cb8..710012b7bf6 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/xml/rest/RestXmlServiceClientOperations.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/xml/rest/RestXmlServiceClientOperations.vm @@ -150,13 +150,13 @@ ${operation.name}OutcomeCallable ${className}::${operation.name}Callable() const { auto task = Aws::MakeShared< std::packaged_task< ${operation.name}Outcome() > >(ALLOCATION_TAG, [this](){ return this->${operation.name}(); } ); auto packagedFunction = [task]() { (*task)(); }; - m_executor->Submit(packagedFunction); + m_clientConfiguration.executor->Submit(packagedFunction); return task->get_future(); } void ${className}::${operation.name}Async(${constText}${operation.name}ResponseReceivedHandler& handler, const std::shared_ptr& context) const { - m_executor->Submit( [this, handler, context]() + m_clientConfiguration.executor->Submit( [this, handler, context]() { handler(this, ${operation.name}(), context); } );