Feature/cdk pipelines init - testing completed (aws-samples#9)
* Multiple code changes based on peer reviews.

Renamed the files as follows:

	datalake_blog_trigger_load          --> etl_statemachine_trigger
	datalake_blog_success_status_update --> etl_job_auditor
	dynamodb_config                     --> etl_job_config
	datalake_blog_conformed_sync.py     --> etl_job_config_manager.py
	datalake_blog_conformed_logic.json  --> etl_job_config_conformed_stage.json
	datalake_blog_sfn.json              --> etl_job_state_machine.json
	datalake_conformed_load.py          --> etl_raw_to_conformed.py
	datalake_purposebuilt_load.py       --> etl_conformed_to_purposebuilt.py


Fixed a few issues around DynamoDB client creation

* More code changes based on peer review

* More code changes after peer review

* Combined failure and success Lambda logic into one script

* fixes

* Fixed minor typos and issues with a few imports

* Use flattened CloudFormation exports. Updates to README.

* Use alternative S3 event notification strategy

* Remove unused import

* Minor code fixes, updated README, and renamed the developer guide file

* Updated author section

* Bump CDK version for feature compatibility

* Updated CDK dependency versions, README, and minor code changes

* Updated GitHub repo name

* Updated IAM policy name for Glue Service Role

* Updated IAM policy name

* updated number of workers for Glue Jobs.

* additional pip install command

* correct pip install command

* correct pip3 install

* relative path changed to include lib prefix

* pip3 install from requirements.txt

* Removed use of the Transformation DynamoDB table

* Modified transformation logic to read from a SQL file rather than DynamoDB

* Fix README formatting

* simple fix

* consolidated two Lambda functions into one

* State machine input variables, updated Lambda, and raw Glue job fix

* Updated state machine and Lambda

* Update print statement for Step Functions input

* Code fixes per review with Zahid

* Fixed the bugs introduced in the last commit

* Uploaded modified developer_guid.md to cover ETL processing

* Added result_path to Glue tasks

* Glue job integration pattern is now RUN_JOB (see the sketch after this list)

* SQL files updated with the right database name
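
For the result_path and RUN_JOB bullets above, a minimal sketch of the pattern in CDK v1 Python. The construct id, job name, and result path are illustrative assumptions, not the repository's actual identifiers:

# Sketch only: a Step Functions Glue task using the synchronous RUN_JOB
# integration pattern, with result_path set so the Glue job output does not
# overwrite the state machine input. Runs inside a Stack/Construct (self).
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as tasks

raw_to_conformed_task = tasks.GlueStartJobRun(
    self, 'RawToConformedJob',                           # assumed construct id
    glue_job_name='etl-raw-to-conformed',                # assumed job name
    integration_pattern=sfn.IntegrationPattern.RUN_JOB,  # wait for job completion
    result_path='$.glue_result',                         # assumed result path
)

With RUN_JOB, Step Functions itself waits for the Glue job to succeed or fail before the state machine moves on, instead of returning immediately after the job is started.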

Co-authored-by: Ali <[email protected]>
Co-authored-by: Isaiah Grant <[email protected]>
3 people authored Jul 9, 2021
1 parent c7a2b2d commit 30312b8
Showing 28 changed files with 595 additions and 922 deletions.
3 changes: 1 addition & 2 deletions .flake8
@@ -4,5 +4,4 @@ max-line-length = 120
 ignore = E722, W503
 
 per-file-ignores =
-    lib/glue_scripts/datalake_purposebuilt_load.py: F401, F403, F405
-    lib/glue_scripts/datalake_conformed_load.py: F401, F403, F405
+    lib/glue_scripts/*.py: F401, F403, F405
347 changes: 160 additions & 187 deletions README.md

Large diffs are not rendered by default.

41 changes: 24 additions & 17 deletions lib/configuration.py
@@ -26,9 +26,15 @@
 
 # Used in Automated Outputs
 VPC_ID = 'vpc_id'
-AVAILABILITY_ZONES = 'availability_zones'
-SUBNET_IDS = 'subnet_ids'
-ROUTE_TABLES = 'route_tables'
+AVAILABILITY_ZONE_1 = 'availability_zone_1'
+AVAILABILITY_ZONE_2 = 'availability_zone_2'
+AVAILABILITY_ZONE_3 = 'availability_zone_3'
+SUBNET_ID_1 = 'subnet_id_1'
+SUBNET_ID_2 = 'subnet_id_2'
+SUBNET_ID_3 = 'subnet_id_3'
+ROUTE_TABLE_1 = 'route_table_1'
+ROUTE_TABLE_2 = 'route_table_2'
+ROUTE_TABLE_3 = 'route_table_3'
 SHARED_SECURITY_GROUP_ID = 'shared_security_group_id'
 S3_KMS_KEY = 's3_kms_key'
 S3_ACCESS_LOG_BUCKET = 's3_access_log_bucket'
@@ -37,17 +43,17 @@
 S3_PURPOSE_BUILT_BUCKET = 's3_purpose_built_bucket'
 CROSS_ACCOUNT_DYNAMODB_ROLE = 'cross_account_dynamodb_role'
 
 GLUE_CONNECTION_AVAILABILITY_ZONE = 'glue_connection_availability_zone'
 GLUE_CONNECTION_SUBNET = 'glue_connection_subnet'
 
 
 def get_local_configuration(environment: str) -> dict:
     """
     Provides manually configured variables that are validated for quality and safety.
     @param: environment str: The environment used to retrieve corresponding configuration
     @raises: Exception: Throws an exception if the resource_name_prefix does not conform
     @raises: Exception: Throws an exception if the requested environment does not exist
-    @returns: dict:
+    @return: dict:
     """
     local_mapping = {
         DEPLOYMENT: {
@@ -56,11 +62,11 @@ def get_local_configuration(environment: str) -> dict:
             GITHUB_REPOSITORY_OWNER_NAME: '',
             GITHUB_REPOSITORY_NAME: '',
             # This is used in the Logical Id of CloudFormation resources.
-            # We recommend Capital case for consistency. 
+            # We recommend Capital case for consistency.
             # Example: DataLakeCdkBlog
             LOGICAL_ID_PREFIX: '',
             # Important: This is used in resources that must be **globally** unique!
-            # Resource names may only contain Alphanumeric and hyphens and cannot contain trailing hyphens. 
+            # Resource names may only contain Alphanumeric and hyphens and cannot contain trailing hyphens.
             # Example: unique-identifier-data-lake
             RESOURCE_NAME_PREFIX: '',
@@ -98,17 +104,21 @@ def get_local_configuration(environment: str) -> dict:
 def get_environment_configuration(environment: str) -> dict:
     """
     Provides all configuration values for the given target environment
     @param environment str: The environment used to retrieve corresponding configuration
     @return: dict:
     """
     cloudformation_output_mapping = {
         ENVIRONMENT: environment,
         VPC_ID: f'{environment}VpcId',
-        AVAILABILITY_ZONES: f'{environment}AvailabilityZones',
-        SUBNET_IDS: f'{environment}SubnetIds',
-        ROUTE_TABLES: f'{environment}RouteTables',
+        AVAILABILITY_ZONE_1: f'{environment}AvailabilityZone1',
+        AVAILABILITY_ZONE_2: f'{environment}AvailabilityZone2',
+        AVAILABILITY_ZONE_3: f'{environment}AvailabilityZone3',
+        SUBNET_ID_1: f'{environment}SubnetId1',
+        SUBNET_ID_2: f'{environment}SubnetId2',
+        SUBNET_ID_3: f'{environment}SubnetId3',
+        ROUTE_TABLE_1: f'{environment}RouteTable1',
+        ROUTE_TABLE_2: f'{environment}RouteTable2',
+        ROUTE_TABLE_3: f'{environment}RouteTable3',
         SHARED_SECURITY_GROUP_ID: f'{environment}SharedSecurityGroupId',
         S3_KMS_KEY: f'{environment}S3KmsKeyArn',
         S3_ACCESS_LOG_BUCKET: f'{environment}S3AccessLogBucket',
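
Flattening the list-valued outputs into one export per value lets a consuming stack resolve each with a plain Fn.import_value call. A minimal sketch, assuming the mapped values are CloudFormation export names and CDK v1; 'Dev' is an assumed environment key:

# Sketch only: resolve each flattened export by name at deploy time.
from aws_cdk import core as cdk
from lib.configuration import (
    SUBNET_ID_1, SUBNET_ID_2, SUBNET_ID_3, get_environment_configuration,
)

config = get_environment_configuration('Dev')
subnet_ids = [cdk.Fn.import_value(config[key])
              for key in (SUBNET_ID_1, SUBNET_ID_2, SUBNET_ID_3)]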
@@ -125,7 +135,6 @@ def get_all_configurations() -> dict:
     """
     Returns a dict mapping of configurations for all environments.
     These keys correspond to static values, CloudFormation outputs, and Secrets Manager (passwords only) records.
-    @return: dict:
     """
     return {
@@ -142,15 +151,13 @@ def get_all_configurations() -> dict:
 
 def get_logical_id_prefix() -> str:
     """Returns the logical id prefix to apply to all CloudFormation resources
-    @return: str:
     """
     return get_local_configuration(DEPLOYMENT)[LOGICAL_ID_PREFIX]
 
 
 def get_resource_name_prefix() -> str:
     """Returns the resource name prefix to apply to all resources names
-    @return: str:
     """
     return get_local_configuration(DEPLOYMENT)[RESOURCE_NAME_PREFIX]
73 changes: 0 additions & 73 deletions lib/datalake_blog_failure_status_update/lambda_handler.py

This file was deleted.

66 changes: 0 additions & 66 deletions lib/datalake_blog_success_status_update/lambda_handler.py

This file was deleted.

10 changes: 0 additions & 10 deletions lib/dynamodb_config/datalake_blog_conformed_logic.json

This file was deleted.

57 changes: 0 additions & 57 deletions lib/dynamodb_config/datalake_blog_conformed_sync.py

This file was deleted.

33 changes: 6 additions & 27 deletions lib/dynamodb_stack.py
@@ -9,20 +9,14 @@
 )
 
 
-def get_transformation_rules_table_name(target_environment, resource_name_prefix: str) -> str:
-    return f'{target_environment}_{resource_name_prefix}_etl_transformation_rules'
-
-
 class DynamoDbStack(cdk.Stack):
 
     def __init__(self, scope: cdk.Construct, construct_id: str, target_environment: str, **kwargs) -> None:
         """
         CloudFormation stack to create DynamoDB Tables.
         @param scope cdk.Construct: Parent of this stack, usually an App or a Stage, but could be any construct.
-        @param construct_id str:
-            The construct ID of this stack. If stackName is not explicitly defined,
-            this id (and any parent IDs) will be used to determine the physical ID of the stack.
+        @param construct_id str: The construct ID of this stack. If stackName is not explicitly defined,
+            this id (and any parent IDs) will be used to determine the physical ID of the stack.
         @param target_environment str: The target environment for stacks in the deploy stage
         @param kwargs:
         """
@@ -35,26 +29,11 @@ def __init__(self, scope: cdk.Construct, construct_id: str, target_environment:
         if (target_environment == PROD or target_environment == TEST):
             self.removal_policy = cdk.RemovalPolicy.RETAIN
 
-        self.job_audit_table = self.create_table(
-            f'{target_environment}{logical_id_prefix}EtlAuditTable',
-            f'{target_environment.lower()}-{resource_name_prefix}-etl-job-audit',
-            'execution_id',
-        )
-
-        transformation_table = get_transformation_rules_table_name(target_environment, resource_name_prefix)
-        self.transformation_rules_table = self.create_table(
-            f'{target_environment}{logical_id_prefix}EtlTransformationRulesTable',
-            transformation_table,
-            'load_name',
-        )
-
-    def create_table(self, construct_name, table_name, partition_key, sort_key=None) -> dynamodb.Table:
-        return dynamodb.Table(
+        self.job_audit_table = dynamodb.Table(
             self,
-            construct_name,
-            table_name=table_name,
-            partition_key=dynamodb.Attribute(name=partition_key, type=dynamodb.AttributeType.STRING),
-            sort_key=sort_key,
+            f'{target_environment}{logical_id_prefix}EtlAuditTable',
+            table_name=f'{target_environment.lower()}-{resource_name_prefix}-etl-job-audit',
+            partition_key=dynamodb.Attribute(name='execution_id', type=dynamodb.AttributeType.STRING),
             billing_mode=dynamodb.BillingMode.PROVISIONED,
             encryption=dynamodb.TableEncryption.DEFAULT,
             point_in_time_recovery=False,
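
With the create_table helper and the transformation-rules table removed, the stack now defines only the job audit table, keyed by execution_id. A hypothetical sketch of how an auditor Lambda might write to it with boto3 (the environment variable name and event fields are assumptions, not the repository's actual contract):

# Sketch only: record a job status in the etl-job-audit table.
import os
import boto3

table = boto3.resource('dynamodb').Table(os.environ['JOB_AUDIT_TABLE'])  # assumed env var

def lambda_handler(event, context):
    table.put_item(Item={
        'execution_id': event['execution_id'],    # partition key; assumed event field
        'status': event.get('status', 'UNKNOWN'), # assumed event field
    })
    return event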
