Skip to content

Commit

Permalink
Update mwaa cfn resource provider (#5)
Browse files Browse the repository at this point in the history
* Update mwaa cfn resource provider

Co-authored-by: Kalhan Dhar <[email protected]>
  • Loading branch information
Kalhan22 and Kalhan Dhar authored Sep 22, 2021
1 parent 9b6aeb1 commit b518a5a
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 64 deletions.
97 changes: 43 additions & 54 deletions aws-mwaa-environment/aws-mwaa-environment.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
},
"TagMap": {
"type": "object",
"description": ""
"description": "A map of tags for the environment."
},
"EnvironmentName": {
"type": "string",
Expand All @@ -30,14 +30,16 @@
},
"EnvironmentStatus": {
"type": "string",
"description": "",
"description": "The status of the environment.",
"enum": [
"CREATING",
"CREATE_FAILED",
"AVAILABLE",
"UPDATING",
"DELETING",
"DELETED"
"DELETED",
"UPDATE_FAILED",
"UNAVAILABLE"
]
},
"UpdateStatus": {
Expand Down Expand Up @@ -81,7 +83,7 @@
"description": "ARN for the AWS S3 bucket to use as the source of DAGs and plugins for the environment.",
"minLength": 1,
"maxLength": 1224,
"pattern": "^arn:aws(-[a-z]+)?:s3:::airflow-[a-z0-9.\\-]+$"
"pattern": "^arn:aws(-[a-z]+)?:s3:::[a-z0-9.\\-]+$"
},
"CreatedAt": {
"type": "string",
Expand Down Expand Up @@ -128,10 +130,6 @@
"maxLength": 1024,
"pattern": ".*"
},
"AirflowConfigurationOptions": {
"type": "object",
"description": "Key/value pairs representing Airflow configuration variables.\n Keys are prefixed by their section:\n\n [core]\n dags_folder={AIRFLOW_HOME}/dags\n\n Would be represented as\n\n \"core.dags_folder\": \"{AIRFLOW_HOME}/dags\""
},
"ConfigKey": {
"type": "string",
"description": "",
Expand All @@ -157,24 +155,6 @@
"maxLength": 1024,
"pattern": "^subnet-[a-zA-Z0-9\\-._]+$"
},
"SubnetList": {
"type": "array",
"description": "",
"minItems": 2,
"maxItems": 2,
"items": {
"$ref": "#/definitions/SubnetId"
}
},
"SecurityGroupList": {
"type": "array",
"description": "",
"minItems": 1,
"maxItems": 5,
"items": {
"$ref": "#/definitions/SecurityGroupId"
}
},
"CloudWatchLogGroupArn": {
"type": "string",
"description": "",
Expand Down Expand Up @@ -207,16 +187,33 @@
"description": "Maximum worker compute units.",
"minimum": 1
},
"MinWorkers": {
"type": "integer",
"description": "Minimum worker compute units.",
"minimum": 1
},
"NetworkConfiguration": {
"type": "object",
"description": "Configures the network resources of the environment.",
"additionalProperties": false,
"properties": {
"SubnetIds": {
"$ref": "#/definitions/SubnetList"
"type": "array",
"description": "A list of subnets to use for the environment. These must be private subnets, in the same VPC, in two different availability zones.",
"minItems": 2,
"maxItems": 2,
"items": {
"$ref": "#/definitions/SubnetId"
}
},
"SecurityGroupIds": {
"$ref": "#/definitions/SecurityGroupList"
"type": "array",
"description": "A list of security groups to use for the environment.",
"minItems": 1,
"maxItems": 5,
"items": {
"$ref": "#/definitions/SecurityGroupId"
}
}
}
},
Expand Down Expand Up @@ -320,11 +317,6 @@
"maxLength": 1024,
"pattern": "^.+$"
},
"AirflowRbacRole": {
"type": "string",
"description": "",
"maxLength": 64
},
"S3ObjectVersion": {
"type": "string",
"description": "Represents an version ID for an S3 object.",
Expand All @@ -349,24 +341,15 @@
"Name": {
"$ref": "#/definitions/EnvironmentName"
},
"Status": {
"$ref": "#/definitions/EnvironmentStatus"
},
"Arn": {
"$ref": "#/definitions/EnvironmentArn"
},
"CreatedAt": {
"$ref": "#/definitions/CreatedAt"
},
"WebserverUrl": {
"$ref": "#/definitions/WebserverUrl"
},
"ExecutionRoleArn": {
"$ref": "#/definitions/ExecutionRoleArn"
},
"ServiceRoleArn": {
"$ref": "#/definitions/ServiceRoleArn"
},
"KmsKey": {
"$ref": "#/definitions/KmsKey"
},
Expand All @@ -385,14 +368,18 @@
"PluginsS3ObjectVersion": {
"$ref": "#/definitions/S3ObjectVersion"
},
"MinWorkers": {
"$ref": "#/definitions/MinWorkers"
},
"RequirementsS3Path": {
"$ref": "#/definitions/RelativePath"
},
"RequirementsS3ObjectVersion": {
"$ref": "#/definitions/S3ObjectVersion"
},
"AirflowConfigurationOptions": {
"$ref": "#/definitions/AirflowConfigurationOptions"
"type": "object",
"description": "Key/value pairs representing Airflow configuration variables.\n Keys are prefixed by their section:\n\n [core]\n dags_folder={AIRFLOW_HOME}/dags\n\n Would be represented as\n\n \"core.dags_folder\": \"{AIRFLOW_HOME}/dags\""
},
"EnvironmentClass": {
"$ref": "#/definitions/EnvironmentClass"
Expand All @@ -406,9 +393,6 @@
"LoggingConfiguration": {
"$ref": "#/definitions/LoggingConfiguration"
},
"LastUpdate": {
"$ref": "#/definitions/LastUpdate"
},
"WeeklyMaintenanceWindowStart": {
"$ref": "#/definitions/WeeklyMaintenanceWindowStart"
},
Expand All @@ -424,15 +408,18 @@
"Name"
],
"createOnlyProperties": [
"/properties/NetworkConfiguration"
"/properties/Name",
"/properties/KmsKey",
"/properties/NetworkConfiguration/SubnetIds"
],
"readOnlyProperties": [
"/properties/CreatedAt",
"/properties/LastUpdate",
"/properties/ServiceRoleArn",
"/properties/Arn",
"/properties/Status",
"/properties/Name"
"/properties/WebserverUrl",
"/properties/LoggingConfiguration/DagProcessingLogs/CloudWatchLogGroupArn",
"/properties/LoggingConfiguration/SchedulerLogs/CloudWatchLogGroupArn",
"/properties/LoggingConfiguration/WebserverLogs/CloudWatchLogGroupArn",
"/properties/LoggingConfiguration/WorkerLogs/CloudWatchLogGroupArn",
"/properties/LoggingConfiguration/TaskLogs/CloudWatchLogGroupArn"
],
"primaryIdentifier": [
"/properties/Name"
Expand All @@ -441,7 +428,8 @@
"create": {
"permissions": [
"airflow:CreateEnvironment"
]
],
"timeoutInMinutes": 180
},
"read": {
"permissions": [
Expand All @@ -451,7 +439,8 @@
"update": {
"permissions": [
"airflow:UpdateEnvironment"
]
],
"timeoutInMinutes": 480
},
"delete": {
"permissions": [
Expand Down
11 changes: 9 additions & 2 deletions aws-mwaa-environment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<dependency>
<groupId>software.amazon.cloudformation</groupId>
<artifactId>aws-cloudformation-rpdk-java-plugin</artifactId>
<version>2.0.1</version>
<version>2.0.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
Expand All @@ -37,7 +37,14 @@
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>mwaa</artifactId>
<version>2.15.35</version>
<version>2.16.43</version>
</dependency>

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/utils -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>utils</artifactId>
<version>2.17.42</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.assertj/assertj-core -->
Expand Down
2 changes: 1 addition & 1 deletion aws-mwaa-environment/resource-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Resources:
ExecutionRole:
Type: AWS::IAM::Role
Properties:
MaxSessionDuration: 8400
MaxSessionDuration: 33600
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import software.amazon.awssdk.services.mwaa.model.EnvironmentStatus;
import software.amazon.awssdk.services.mwaa.model.GetEnvironmentRequest;
import software.amazon.awssdk.services.mwaa.model.GetEnvironmentResponse;
import software.amazon.awssdk.services.mwaa.model.LastUpdate;
import software.amazon.awssdk.services.mwaa.model.ResourceNotFoundException;
import software.amazon.awssdk.services.mwaa.model.UpdateError;
import software.amazon.cloudformation.exceptions.CfnAlreadyExistsException;
import software.amazon.cloudformation.exceptions.CfnNotFoundException;
import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
Expand Down Expand Up @@ -91,8 +93,7 @@ protected Optional<EnvironmentStatus> getEnvironmentStatus(
final GetEnvironmentRequest awsRequest) {

try {
final GetEnvironmentResponse response = doReadEnvironment(awsRequest, mwaaClientProxy);
final Environment environment = response.environment();
final Environment environment = getEnvironment(mwaaClientProxy, awsRequest);
final EnvironmentStatus status = EnvironmentStatus.fromValue(environment.statusAsString().toUpperCase());

log("%s [%s] exists. Status: %s", ResourceModel.TYPE_NAME, environment.name(), status);
Expand All @@ -103,6 +104,29 @@ protected Optional<EnvironmentStatus> getEnvironmentStatus(
}
}


protected Environment getEnvironment(final ProxyClient<MwaaClient> mwaaClientProxy,
final GetEnvironmentRequest awsRequest) {
final GetEnvironmentResponse response = doReadEnvironment(awsRequest, mwaaClientProxy);
return response.environment();

}

protected Optional<UpdateError> getLastUpdateError(
final ProxyClient<MwaaClient> mwaaClientProxy,
final String name) {
final GetEnvironmentRequest awsRequest = translateToReadRequest(name);
try {
final Environment environment = getEnvironment(mwaaClientProxy, awsRequest);
final Optional<UpdateError> lastUpdateError = Optional.of(environment.lastUpdate())
.map(LastUpdate::error);
return lastUpdateError;
} catch (CfnNotFoundException e) {
log("%s [%s] does not exist", ResourceModel.TYPE_NAME, awsRequest.name());
return Optional.empty();
}
}

protected ProgressEvent<ResourceModel, CallbackContext> ensureEnvironmentExists(
final GetEnvironmentRequest awsRequest,
final ProxyClient<MwaaClient> mwaaClientProxy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import software.amazon.awssdk.services.mwaa.model.UntagResourceRequest;
import software.amazon.awssdk.services.mwaa.model.UpdateEnvironmentRequest;
import software.amazon.awssdk.services.mwaa.model.UpdateEnvironmentResponse;
import software.amazon.awssdk.services.mwaa.model.UpdateError;
import software.amazon.awssdk.services.mwaa.model.ValidationException;
import software.amazon.cloudformation.exceptions.CfnInvalidRequestException;
import software.amazon.cloudformation.exceptions.CfnNotUpdatableException;
Expand All @@ -35,6 +36,7 @@
/**
* Handler for Update command.
*/
@SuppressWarnings({"checkstyle:MethodLength"})
public class UpdateHandler extends BaseHandlerStd {
private static final Duration CALLBACK_DELAY = Duration.ofMinutes(1);

Expand All @@ -50,6 +52,11 @@ protected ProgressEvent<ResourceModel, CallbackContext> handleRequest(
proxies.getMwaaClientProxy(),
model.getName());

final Optional<UpdateError> lastUpdateError = getLastUpdateError(
proxies.getMwaaClientProxy(),
model.getName());
String errorMessage = lastUpdateError.map(UpdateError::errorMessage).orElse("");

if (!status.isPresent()) {
log("Environment not found, failing update");
return ProgressEvent.failed(
Expand All @@ -65,7 +72,24 @@ protected ProgressEvent<ResourceModel, CallbackContext> handleRequest(
progress -> getEnvironmentDetails("Update::PostUpdateRead", proxies, progress));
}

log("status is {}, requesting a callback in {}", status, CALLBACK_DELAY);
if (status.get() == EnvironmentStatus.UPDATE_FAILED) {
log("status is UPDATE_FAILED, returning failure");
return ProgressEvent.failed(
model,
null,
HandlerErrorCode.NotStabilized,
String.format("Update failed. %s", errorMessage));
}
if (status.get() == EnvironmentStatus.UNAVAILABLE) {
log("status is UNAVAILABLE, returning failure");
return ProgressEvent.failed(
model,
null,
HandlerErrorCode.NotStabilized,
String.format("Update failed, Environment unavailable. %s", errorMessage));
}

log("status is %s, requesting a callback in %s", status, CALLBACK_DELAY);
return ProgressEvent.<ResourceModel, CallbackContext>builder()
.resourceModel(model)
.callbackContext(callbackContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static CreateEnvironmentRequest translateToCreateRequest(final ResourceMo
model.getAirflowConfigurationOptions()))
.environmentClass(model.getEnvironmentClass())
.maxWorkers(model.getMaxWorkers())
.minWorkers(model.getMinWorkers())
.networkConfiguration(toApiNetworkConfiguration(
model.getNetworkConfiguration()))
.loggingConfiguration(toApiLoggingConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static ResourceModel translateFromReadResponse(final GetEnvironmentRespon

return ResourceModel.builder()
.name(env.name())
.arn(env.arn())
.executionRoleArn(env.executionRoleArn())
.kmsKey(env.kmsKey())
.airflowVersion(env.airflowVersion())
Expand All @@ -66,11 +67,13 @@ public static ResourceModel translateFromReadResponse(final GetEnvironmentRespon
.airflowConfigurationOptions(toStringToObjectMap(env.airflowConfigurationOptions()))
.environmentClass(env.environmentClass())
.maxWorkers(env.maxWorkers())
.minWorkers(env.minWorkers())
.networkConfiguration(toCfnNetworkConfiguration(env.networkConfiguration()))
.loggingConfiguration(toCfnLoggingConfiguration(env.loggingConfiguration()))
.weeklyMaintenanceWindowStart(env.weeklyMaintenanceWindowStart())
.tags(toStringToObjectMap(removeInternalTags(env.tags())))
.webserverAccessMode(env.webserverAccessModeAsString())
.webserverUrl(env.webserverUrl())
.build();
}
}
Loading

0 comments on commit b518a5a

Please sign in to comment.