Skip to content

Commit

Permalink
Update MWAA CFn Resource Provider (#6)
Browse files Browse the repository at this point in the history
Co-authored-by: Jake He <[email protected]>
  • Loading branch information
Mercury2699 and Jake He authored Jun 23, 2023
1 parent b518a5a commit 5a603c3
Show file tree
Hide file tree
Showing 14 changed files with 337 additions and 51 deletions.
64 changes: 33 additions & 31 deletions aws-mwaa-environment/aws-mwaa-environment.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,6 @@
"description": "Resource schema for AWS::MWAA::Environment",
"sourceUrl": "https://github.com/aws-cloudformation/aws-cloudformation-resource-providers-mwaa.git",
"definitions": {
"TagKey": {
"type": "string",
"description": "",
"minLength": 1,
"maxLength": 128,
"pattern": "^([\\p{L}\\p{Z}\\p{N}_.:/=+\\-@]*)$"
},
"TagValue": {
"type": "string",
"description": "",
"minLength": 1,
"maxLength": 256,
"pattern": "^([\\p{L}\\p{Z}\\p{N}_.:/=+\\-@]*)$"
},
"TagMap": {
"type": "object",
"description": "A map of tags for the environment."
},
"EnvironmentName": {
"type": "string",
"description": "Customer-defined identifier for the environment, unique per customer region.",
Expand Down Expand Up @@ -69,21 +51,21 @@
"description": "",
"minLength": 1,
"maxLength": 1224,
"pattern": "^arn:aws(-[a-z]+)?:airflow:[a-z0-9\\-]+:\\d{12}:environment/\\w+"
"pattern": "^arn:(aws|aws-us-gov|aws-cn|aws-iso|aws-iso-b)(-[a-z]+)?:airflow:[a-z0-9\\-]+:\\d{12}:environment/\\w+"
},
"EnvironmentArn": {
"type": "string",
"description": "ARN for the MWAA environment.",
"minLength": 1,
"maxLength": 1224,
"pattern": "^arn:aws(-[a-z]+)?:airflow:[a-z0-9\\-]+:\\d{12}:environment/\\w+"
"pattern": "^arn:(aws|aws-us-gov|aws-cn|aws-iso|aws-iso-b)(-[a-z]+)?:airflow:[a-z0-9\\-]+:\\d{12}:environment/\\w+"
},
"S3BucketArn": {
"type": "string",
"description": "ARN for the AWS S3 bucket to use as the source of DAGs and plugins for the environment.",
"minLength": 1,
"maxLength": 1224,
"pattern": "^arn:aws(-[a-z]+)?:s3:::[a-z0-9.\\-]+$"
"pattern": "^arn:(aws|aws-us-gov|aws-cn|aws-iso|aws-iso-b)(-[a-z]+)?:s3:::[a-z0-9.\\-]+$"
},
"CreatedAt": {
"type": "string",
Expand All @@ -104,19 +86,19 @@
"type": "string",
"description": "IAM role to be used by tasks.",
"maxLength": 1224,
"pattern": "^arn:aws(-[a-z]+)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+$"
"pattern": "^arn:(aws|aws-us-gov|aws-cn|aws-iso|aws-iso-b)(-[a-z]+)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+$"
},
"ServiceRoleArn": {
"type": "string",
"description": "IAM role to be used by MWAA to perform AWS API calls on behalf of the customer.",
"maxLength": 1224,
"pattern": "^arn:aws(-[a-z]+)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+$"
"pattern": "^arn:(aws|aws-us-gov|aws-cn|aws-iso|aws-iso-b)(-[a-z]+)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+$"
},
"KmsKey": {
"type": "string",
"description": "The identifier of the AWS Key Management Service (AWS KMS) customer master key (CMK) to use for MWAA data encryption.\n\n You can specify the CMK using any of the following:\n\n Key ID. For example, key/1234abcd-12ab-34cd-56ef-1234567890ab.\n\n Key alias. For example, alias/ExampleAlias.\n\n Key ARN. For example, arn:aws:kms:us-east-1:012345678910:key/abcd1234-a123-456a-a12b-a123b4cd56ef.\n\n Alias ARN. For example, arn:aws:kms:us-east-1:012345678910:alias/ExampleAlias.\n\n AWS authenticates the CMK asynchronously. Therefore, if you specify an ID, alias, or ARN that is not valid, the action can appear to complete, but eventually fails.",
"maxLength": 1224,
"pattern": "^(((arn:aws(-[a-z]+)?:kms:[a-z]{2}-[a-z]+-\\d:\\d+:)?key\\/)?[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}|(arn:aws:kms:[a-z]{2}-[a-z]+-\\d:\\d+:)?alias/.+)$"
"pattern": "^(((arn:(aws|aws-us-gov|aws-cn|aws-iso|aws-iso-b)(-[a-z]+)?:kms:[a-z]{2}-[a-z]+-\\d:\\d+:)?key\\/)?[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}|(arn:(aws|aws-us-gov|aws-cn|aws-iso|aws-iso-b):kms:[a-z]{2}-[a-z]+-\\d:\\d+:)?alias/.+)$"
},
"AirflowVersion": {
"type": "string",
Expand Down Expand Up @@ -159,7 +141,7 @@
"type": "string",
"description": "",
"maxLength": 1224,
"pattern": "^arn:aws(-[a-z]+)?:logs:[a-z0-9\\-]+:\\d{12}:log-group:\\w+"
"pattern": "^arn:(aws|aws-us-gov|aws-cn|aws-iso|aws-iso-b)(-[a-z]+)?:logs:[a-z0-9\\-]+:\\d{12}:log-group:\\w+"
},
"LoggingEnabled": {
"type": "boolean",
Expand Down Expand Up @@ -192,13 +174,19 @@
"description": "Minimum worker compute units.",
"minimum": 1
},
"Schedulers": {
"type": "integer",
"description": "Scheduler compute units.",
"minimum": 1
},
"NetworkConfiguration": {
"type": "object",
"description": "Configures the network resources of the environment.",
"additionalProperties": false,
"properties": {
"SubnetIds": {
"type": "array",
"insertionOrder": true,
"description": "A list of subnets to use for the environment. These must be private subnets, in the same VPC, in two different availability zones.",
"minItems": 2,
"maxItems": 2,
Expand All @@ -208,6 +196,7 @@
},
"SecurityGroupIds": {
"type": "array",
"insertionOrder": true,
"description": "A list of security groups to use for the environment.",
"minItems": 1,
"maxItems": 5,
Expand Down Expand Up @@ -368,15 +357,18 @@
"PluginsS3ObjectVersion": {
"$ref": "#/definitions/S3ObjectVersion"
},
"MinWorkers": {
"$ref": "#/definitions/MinWorkers"
},
"RequirementsS3Path": {
"$ref": "#/definitions/RelativePath"
},
"RequirementsS3ObjectVersion": {
"$ref": "#/definitions/S3ObjectVersion"
},
"StartupScriptS3Path": {
"$ref": "#/definitions/RelativePath"
},
"StartupScriptS3ObjectVersion": {
"$ref": "#/definitions/S3ObjectVersion"
},
"AirflowConfigurationOptions": {
"type": "object",
"description": "Key/value pairs representing Airflow configuration variables.\n Keys are prefixed by their section:\n\n [core]\n dags_folder={AIRFLOW_HOME}/dags\n\n Would be represented as\n\n \"core.dags_folder\": \"{AIRFLOW_HOME}/dags\""
Expand All @@ -387,6 +379,12 @@
"MaxWorkers": {
"$ref": "#/definitions/MaxWorkers"
},
"MinWorkers": {
"$ref": "#/definitions/MinWorkers"
},
"Schedulers": {
"$ref": "#/definitions/Schedulers"
},
"NetworkConfiguration": {
"$ref": "#/definitions/NetworkConfiguration"
},
Expand All @@ -397,7 +395,8 @@
"$ref": "#/definitions/WeeklyMaintenanceWindowStart"
},
"Tags": {
"$ref": "#/definitions/TagMap"
"type": "object",
"description": "A map of tags for the environment."
},
"WebserverAccessMode": {
"$ref": "#/definitions/WebserverAccessMode"
Expand All @@ -421,6 +420,7 @@
"/properties/LoggingConfiguration/WorkerLogs/CloudWatchLogGroupArn",
"/properties/LoggingConfiguration/TaskLogs/CloudWatchLogGroupArn"
],
"taggable": true,
"primaryIdentifier": [
"/properties/Name"
],
Expand All @@ -438,7 +438,9 @@
},
"update": {
"permissions": [
"airflow:UpdateEnvironment"
"airflow:UpdateEnvironment",
"airflow:TagResource",
"airflow:UntagResource"
],
"timeoutInMinutes": 480
},
Expand All @@ -453,4 +455,4 @@
]
}
}
}
}
9 changes: 9 additions & 0 deletions aws-mwaa-environment/resource-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ Resources:
Principal:
Service: resources.cloudformation.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
aws:SourceAccount:
Ref: AWS::AccountId
StringLike:
aws:SourceArn:
Fn::Sub: arn:${AWS::Partition}:cloudformation:${AWS::Region}:${AWS::AccountId}:type/resource/AWS-MWAA-Environment/*
Path: "/"
Policies:
- PolicyName: ResourceTypePolicy
Expand All @@ -27,6 +34,8 @@ Resources:
- "airflow:DeleteEnvironment"
- "airflow:GetEnvironment"
- "airflow:ListEnvironments"
- "airflow:TagResource"
- "airflow:UntagResource"
- "airflow:UpdateEnvironment"
Resource: "*"
Outputs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ protected Optional<EnvironmentStatus> getEnvironmentStatus(
}
}


protected Environment getEnvironment(final ProxyClient<MwaaClient> mwaaClientProxy,
final GetEnvironmentRequest awsRequest) {
final GetEnvironmentResponse response = doReadEnvironment(awsRequest, mwaaClientProxy);
Expand Down Expand Up @@ -182,4 +181,8 @@ protected GetEnvironmentResponse doReadEnvironment(
throw new CfnNotFoundException(ResourceModel.TYPE_NAME, request.name(), e);
}
}

protected Logger getLogger() {
return this.logger;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.

package software.amazon.mwaa.environment;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import software.amazon.cloudformation.proxy.Logger;

/**
* Listener for CreateEnvironment retries.
*/
public class CreateEnvironmentRetryListener implements RetryListener {
private long attemptNumber = 0;
private long delaySinceFirstAttempt = 0;

private final Logger logger;
private final int maxRetries;
private final String environmentName;

/**
*
* @param logger logger for retry logging.
* @param maxRetries maximum retry attempts before failure.
* @param environmentName name of environment to be created.
*/
public CreateEnvironmentRetryListener(Logger logger, int maxRetries, String environmentName) {
this.logger = logger;
this.maxRetries = maxRetries;
this.environmentName = environmentName;
}

@Override
public <V> void onRetry(Attempt<V> attempt) {
attemptNumber = attempt.getAttemptNumber();
delaySinceFirstAttempt = attempt.getDelaySinceFirstAttempt();

if (attempt.hasResult()) {
log("CreateEnvironment [%s]: retry attempt %d/%d successful. Total delay since first attempt: %dms",
environmentName, attemptNumber, maxRetries, delaySinceFirstAttempt);
} else {
log("CreateEnvironment [%s]: retry attempt %d/%d failed with error message: %s. "
+ "Total delay since first attempt: %dms",
environmentName,
attemptNumber,
maxRetries,
attempt.getExceptionCause().getMessage(),
delaySinceFirstAttempt);
}
}

private void log(final String format, final Object... args) {
if (logger != null) {
logger.log(String.format(format, args));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@

package software.amazon.mwaa.environment;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import software.amazon.awssdk.services.mwaa.MwaaClient;
import software.amazon.awssdk.services.mwaa.model.CreateEnvironmentRequest;
import software.amazon.awssdk.services.mwaa.model.CreateEnvironmentResponse;
import software.amazon.awssdk.services.mwaa.model.EnvironmentStatus;
import software.amazon.awssdk.services.mwaa.model.InternalServerException;
import software.amazon.awssdk.services.mwaa.model.ValidationException;
import software.amazon.cloudformation.exceptions.CfnInvalidRequestException;
import software.amazon.cloudformation.proxy.HandlerErrorCode;
Expand All @@ -24,6 +33,7 @@
*/
public class CreateHandler extends BaseHandlerStd {
private static final Duration CALLBACK_DELAY = Duration.ofMinutes(1);
public static final int MAX_RETRIES = 14;

protected ProgressEvent<ResourceModel, CallbackContext> handleRequest(
final Proxies proxies,
Expand Down Expand Up @@ -91,14 +101,36 @@ private CreateEnvironmentResponse doCreateEnvironment(

try {
log("Creating %s [%s]", ResourceModel.TYPE_NAME, name);
final CreateEnvironmentResponse response = mwaaClientProxy.injectCredentialsAndInvokeV2(

final CreateEnvironmentRetryListener listener = new CreateEnvironmentRetryListener(
getLogger(), MAX_RETRIES, name);
final Retryer<CreateEnvironmentResponse> retryer = getCreateEnvironmentRetryer(listener);

final CreateEnvironmentResponse response = retryer.call(() -> mwaaClientProxy.injectCredentialsAndInvokeV2(
awsRequest,
mwaaClientProxy.client()::createEnvironment);
mwaaClientProxy.client()::createEnvironment));
log("Create submitted %s [%s]", ResourceModel.TYPE_NAME, name);
callbackContext.setStabilizing(true);
return response;
} catch (final ValidationException e) {
throw new CfnInvalidRequestException(e.getMessage(), e);
} catch (final RetryException e) {
final Attempt<?> lastAttempt = e.getLastFailedAttempt();
Throwable rootCause = lastAttempt.getExceptionCause();
log("CreateEnvironment [%s]: Reached maximum number of retires. Total delay since first attempt: %dms",
name,
lastAttempt.getDelaySinceFirstAttempt());
throw new CfnInvalidRequestException(rootCause.getMessage(), e);
} catch (ExecutionException e) {
throw new CfnInvalidRequestException(e.getCause().getMessage(), e);
}
}

private Retryer<CreateEnvironmentResponse> getCreateEnvironmentRetryer(RetryListener listener) {
return RetryerBuilder.<CreateEnvironmentResponse>newBuilder()
.retryIfExceptionOfType(ValidationException.class)
.retryIfExceptionOfType(InternalServerException.class)
.withRetryListener(listener)
.withWaitStrategy(WaitStrategies.exponentialWait())
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRIES))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,13 @@ protected ProgressEvent<ResourceModel, CallbackContext> handleRequest(
HandlerErrorCode.NotStabilized,
"Update failed, resource no longer exists");
}

if (status.get() == EnvironmentStatus.AVAILABLE) {
log("status is AVAILABLE, returning success");
return ProgressEvent.progress(model, callbackContext).then(
progress -> getEnvironmentDetails("Update::PostUpdateRead", proxies, progress));
}

if (status.get() == EnvironmentStatus.UPDATE_FAILED) {
log("status is UPDATE_FAILED, returning failure");
log("status is UPDATE_FAILED, returning failure");
return ProgressEvent.failed(
model,
null,
Expand All @@ -88,8 +86,7 @@ protected ProgressEvent<ResourceModel, CallbackContext> handleRequest(
HandlerErrorCode.NotStabilized,
String.format("Update failed, Environment unavailable. %s", errorMessage));
}

log("status is %s, requesting a callback in %s", status, CALLBACK_DELAY);
log("status is {}, requesting a callback in {}", status, CALLBACK_DELAY);
return ProgressEvent.<ResourceModel, CallbackContext>builder()
.resourceModel(model)
.callbackContext(callbackContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ public static CreateEnvironmentRequest translateToCreateRequest(final ResourceMo
.requirementsS3Path(model.getRequirementsS3Path())
.requirementsS3ObjectVersion(
model.getRequirementsS3ObjectVersion())
.startupScriptS3Path(model.getStartupScriptS3Path())
.startupScriptS3ObjectVersion(
model.getStartupScriptS3ObjectVersion())
.airflowConfigurationOptions(toStringToStringMap(
model.getAirflowConfigurationOptions()))
.environmentClass(model.getEnvironmentClass())
.maxWorkers(model.getMaxWorkers())
.minWorkers(model.getMinWorkers())
.schedulers(model.getSchedulers())
.networkConfiguration(toApiNetworkConfiguration(
model.getNetworkConfiguration()))
.loggingConfiguration(toApiLoggingConfiguration(
Expand Down
Loading

0 comments on commit 5a603c3

Please sign in to comment.