diff --git a/.gitignore b/.gitignore
index 5612a58c..ca7ca0f2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,5 @@
 # taskcat
 taskcat_outputs/
+
+*.tgz
+*.idea
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index 05ba573d..c4d3b41d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,6 +7,9 @@ install:
 git:
   submodules: false
 before_install:
+  # DELETE ME
+  - export BASE_IP=0.0.0.0/0
+  - sed -i 's/git@github.com:/https:\/\/github.com\//' .gitmodules
   - git submodule update --init --recursive
   - git checkout .gitmodules
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 00000000..b4772b0c
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,35 @@
+# CHANGELOG
+
+Fork: 31.09.2019
+Branch: develop
+Contact: olmighty99@gmail.com
+
+## Changes in Branch Develop
+
+### 1. Comments
+
+- Comments are good, we love comments!!
+
+### 2. Structure
+
+- More granular `templates` folder <- easier to maintain and debug
+  * `turbine-resource.template` contains all the Turbine support services
+  * The security groups have a dedicated template
+  * CI has its own subfolder
+- Templates split into `cluster`, `services`, and `ci`
+
+### 3. Logs and Deployments Buckets
+
+- Private Buckets by default (explicit)
+
+Incident: After a DAG run, CloudFormation ended in DELETE_FAILED because the
+Logs and Deployments Buckets were not empty.
+
+- Added a custom CFN event + Lambda function that cleans the deployments
+bucket content on delete-stack
+- Retain the Logs bucket for error investigation and DAG data archiving
+
+
+
+
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 1eb5852d..7a11e9ca 100644
--- a/Makefile
+++ b/Makefile
@@ -1,3 +1,15 @@
+define message1
+  Environment variable BASE_IP is required but not set.
+  Use the following command:
+  "$$ my_ip=`curl ipinfo.io | jq .ip`;eval my_ip=$${my_ip[i]};my_ip="$$my_ip/32"; export BASE_IP=$$my_ip"
+
+endef
+
+ifndef BASE_IP
+export message1
+$(error $(message1))
+endif
+
 ifndef BRANCH
 BRANCH := $(shell git rev-parse --abbrev-ref HEAD)
 endif
@@ -8,12 +20,41 @@ else
 BUCKET := s3://turbine-quickstart/quickstart-turbine-airflow-$(BRANCH)
 endif

+# turbine-master
+CURRENT_LOCAL_IP = $(BASE_IP)
+# DELETE ME
+AWS_REGION := eu-central-1
+PROJECT_NAME := eksairflow01-staging
 lint:
-	cfn-lint templates/*.template
+	cfn-lint templates/cluster/*.template
+	cfn-lint templates/services/*.template

 test:
 	taskcat -c ./ci/taskcat.yaml

 sync:
 	aws s3 sync --exclude '.*' --acl public-read . $(BUCKET)
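+
+# NOTE: the targets below upload every template flat under
+# ${PROJECT_NAME}templates, which appears to match the layout the TemplateURL
+# joins in turbine-master.template expect (same QSS3KeyPrefix convention).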
+
+# DELETE ME
+artifacts:
+	aws s3 cp --recursive submodules/quickstart-aws-vpc s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}submodules/quickstart-aws-vpc/templates/
+	aws s3 cp --recursive templates/cluster s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates
+	aws s3 cp --recursive templates/services s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates
+	aws s3 cp --recursive templates/ci s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates
+
+# DELETE ME
+cluster:
+	aws cloudformation create-stack --region ${AWS_REGION} --stack-name ${PROJECT_NAME} \
+		--template-body file://templates/turbine-master.template \
+		--parameters \
+			ParameterKey="AllowedWebBlock",ParameterValue="${CURRENT_LOCAL_IP}" \
+			ParameterKey="DbMasterPassword",ParameterValue="super_secret" \
+			ParameterKey="QSS3BucketName",ParameterValue="${PROJECT_NAME}-${AWS_REGION}" \
+			ParameterKey="QSS3KeyPrefix",ParameterValue="${PROJECT_NAME}" \
+		--capabilities CAPABILITY_NAMED_IAM
+
+# DELETE ME
+clean:
+	aws cloudformation delete-stack --stack-name ${PROJECT_NAME}
+
diff --git a/examples/project/Makefile b/examples/project/Makefile
index 9fc1af3f..1315bdf4 100644
--- a/examples/project/Makefile
+++ b/examples/project/Makefile
@@ -1,5 +1,12 @@
-ifndef stack-name
-$(error stack-name is not set)
+define message1
+  Environment variable stack_name is required but not set.
+  Use the following command:
+  "$$ export stack_name="
+
+endef
+
+ifndef stack_name
+$(error $(message1))
 endif
 ifndef revision
 revision := $(shell date --utc +%Y%m%dT%H%M%SZ)
@@ -8,16 +15,17 @@ endif

 define getRef
 $(shell aws cloudformation describe-stacks \
-	--stack-name $(stack-name) \
+	--stack-name $(stack_name) \
 	--query "Stacks[0].Outputs[?OutputKey=='$(1)'].OutputValue" \
 	--output text)
 endef

+
 APPLICATION := $(call getRef,CodeDeployApplication)
 DEPLOYMENT_GROUP := $(call getRef,CodeDeployDeploymentGroup)
 DEPLOYMENTS_BUCKET := $(call getRef,DeploymentsBucket)

-PACKAGE := $(stack-name)_$(revision).tgz
+PACKAGE := $(stack_name)_$(revision).tgz

 package:
diff --git a/templates/ci/turbine-codedeploy.template b/templates/ci/turbine-codedeploy.template
new file mode 100644
index 00000000..e5874403
--- /dev/null
+++ b/templates/ci/turbine-codedeploy.template
@@ -0,0 +1,189 @@
+AWSTemplateFormatVersion: "2010-09-09"
+
+Description: >-
+  CI CodeDeploy resources for Airflow build.
+
+  NOTE: The DeploymentsBucket is defined in turbine-cluster.template because
+  the cluster instance configuration depends on it. This avoids a circular
+  dependency between CIStack and AirflowStack.
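+
+# The scaling-group names and the DeploymentsBucket below are wired in from
+# AirflowStack outputs by turbine-master.template (see the CIStack resource there)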
+
+Parameters:
+  SchedulerScalingGroup:
+    Type: String
+  WebserverScalingGroup:
+    Type: String
+  WorkerSetScalingGroup:
+    Type: String
+  DeploymentsBucket:
+    Type: String
+
+Resources:
+
+  CodeDeployApplication:
+    Type: AWS::CodeDeploy::Application
+    Properties:
+      ApplicationName: !Sub ${AWS::StackName}-deployment-application
+      ComputePlatform: Server
+
+  CodeDeployDeploymentGroup:
+    Type: AWS::CodeDeploy::DeploymentGroup
+    Properties:
+      ApplicationName: !Ref CodeDeployApplication
+      DeploymentGroupName: !Sub ${AWS::StackName}-deployment-group
+      AutoScalingGroups:
+        - !Ref SchedulerScalingGroup
+        - !Ref WebserverScalingGroup
+        - !Ref WorkerSetScalingGroup
+      ServiceRoleArn: !GetAtt
+        - CodeDeployServiceRole
+        - Arn
+
+  CodeDeployServiceRole:
+    Type: AWS::IAM::Role
+    Properties:
+      AssumeRolePolicyDocument:
+        Version: 2012-10-17
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service:
+                - codedeploy.amazonaws.com
+            Action:
+              - sts:AssumeRole
+      ManagedPolicyArns:
+        - 'arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole'
+
+  # Custom CloudFormation resource that forwards the stack's Create, Update
+  # and Delete events to the cleanup Lambda below
+  CleanUpDeployments:
+    DependsOn: cleanupDeployBucket
+    Type: Custom::cleanupdeploy
+    Properties:
+      ServiceToken:
+        Fn::GetAtt:
+          - "cleanupDeployBucket"
+          - "Arn"
+      BucketName: !Ref DeploymentsBucket
+
+  # Removes all objects from the Deployments Bucket upon cfn delete-stack
+  cleanupDeployBucket:
+    Type: "AWS::Lambda::Function"
+    Properties:
+      Code:
+        ZipFile: !Sub |
+          import boto3
+          import json
+          import logging
+          # module cfnresponse does not exist for python3.7, use requests instead
+          # import cfnresponse
+          # Will yield a warning, but is currently the only solution for python3.7
+          # since inline code cannot import third party packages
+          from botocore.vendored import requests
+          from botocore.exceptions import ClientError
+
+          logger = logging.getLogger(__name__)
+
+          def setup(level='DEBUG', boto_level=None, **kwargs):
+              logging.root.setLevel(level)
+
+              if not boto_level:
+                  boto_level = level
+
+              logging.getLogger('boto').setLevel(boto_level)
+              logging.getLogger('boto3').setLevel(boto_level)
+              logging.getLogger('botocore').setLevel(boto_level)
+              logging.getLogger('urllib3').setLevel(boto_level)
+
+          try:
+              setup('DEBUG', formatter_cls=None, boto_level='ERROR')
+          except Exception as e:
+              logger.error(e, exc_info=True)
+
+          def clean_up_bucket(target_bucket):
+              logger.info(f"Clean content of bucket {target_bucket}.")
+              s3_resource = boto3.resource('s3')
+              try:
+                  bucket_response = s3_resource.Bucket(target_bucket).load()
+              except ClientError as e:
+                  logger.info(f"s3://{target_bucket} not found. {e}")
+                  return
+              else:
+                  bucket_obj = s3_resource.Bucket(target_bucket)
+                  bucket_obj.objects.all().delete()
+
+          def handler(event, context):
+              # helper(event, context)
+
+              response_data = {}
+              # NOTE: The status value sent by the custom resource provider must be either SUCCESS or FAILED!!
+              try:
+                  bucket = event['ResourceProperties']['BucketName']
+                  if event['RequestType'] == 'Delete':
+                      clean_up_bucket(bucket)
+                  if event['RequestType'] == 'Update':
+                      logger.info(f"custom::cleanupbucket update. Target bucket: {bucket}")
+                  if event['RequestType'] == 'Create':
+                      logger.info(f"custom::cleanupbucket create. 
Target bucket: {bucket}") + send_response_cfn(event, context, "SUCCESS") + except Exception as e: + logger.info(str(e)) + send_response_cfn(event, context, "FAILED") + + def send_response_cfn(event, context, response_status): + response_body = {'Status': response_status, + 'Reason': 'Log stream name: ' + context.log_stream_name, + 'PhysicalResourceId': context.log_stream_name, + 'StackId': event['StackId'], + 'RequestId': event['RequestId'], + 'LogicalResourceId': event['LogicalResourceId'], + 'Data': json.loads("{}")} + # Sends the response signal to the respective custom resource request + requests.put(event['ResponseURL'], data=json.dumps(response_body)) + Description: cleanup Bucket on Delete Lambda function. + Handler: index.handler + Role: !GetAtt CleanupS3ExecutionRole.Arn + Runtime: python3.7 + Timeout: 100 + + CleanupS3ExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + Path: "/" + + CleanupS3ExecutionPolicy: + DependsOn: + - CleanupS3ExecutionRole + Type: AWS::IAM::Policy + Properties: + PolicyName: DeleteS3BucketLogsRolePolicy + Roles: + - Ref: CleanupS3ExecutionRole + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - logs:* + Resource: + - arn:aws:logs:*:*:* + - Effect: Allow + Action: + - s3:* + Resource: + - "*" + +Outputs: + DeploymentsBucket: + Value: !Ref DeploymentsBucket + CodeDeployApplication: + Value: !Ref CodeDeployApplication + CodeDeployDeploymentGroup: + Value: !Ref CodeDeployDeploymentGroup diff --git a/templates/turbine-cluster.template b/templates/cluster/turbine-cluster.template similarity index 68% rename from templates/turbine-cluster.template rename to templates/cluster/turbine-cluster.template index e6e67774..23a0d797 100644 --- a/templates/turbine-cluster.template +++ b/templates/cluster/turbine-cluster.template @@ -38,11 +38,6 @@ Parameters: Type: Number GrowthThreshold: Type: Number - DbMasterUsername: - Type: String - DbMasterPassword: - Type: String - NoEcho: true LoadExampleDags: Type: String LoadDefaultConn: @@ -51,9 +46,22 @@ Parameters: Type: String QSS3KeyPrefix: Type: String + DbMasterUsername: + Type: String + DbMasterPassword: + Type: String + NoEcho: true + DbEndpointAddress: + Type: String + TaskQueueName: + Type: String + EfsFileSystem: + Type: String Resources: + # ------------------------ Create Cluster Instance Services--------------------------------- + SchedulerStack: Type: AWS::CloudFormation::Stack Properties: @@ -129,379 +137,10 @@ Resources: DependsOn: - Meta - LogsBucket: - Type: AWS::S3::Bucket - - DeploymentsBucket: - Type: AWS::S3::Bucket - - CodeDeployApplication: - Type: AWS::CodeDeploy::Application - Properties: - ApplicationName: !Sub ${AWS::StackName}-deployment-application - ComputePlatform: Server - - CodeDeployDeploymentGroup: - Type: AWS::CodeDeploy::DeploymentGroup - Properties: - ApplicationName: !Ref CodeDeployApplication - DeploymentGroupName: !Sub ${AWS::StackName}-deployment-group - AutoScalingGroups: - - !GetAtt SchedulerStack.Outputs.AutoScalingGroup - - !GetAtt WebserverStack.Outputs.AutoScalingGroup - - !GetAtt WorkerSetStack.Outputs.AutoScalingGroup - ServiceRoleArn: !GetAtt - - CodeDeployServiceRole - - Arn - - CodeDeployServiceRole: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Principal: - Service: - - codedeploy.amazonaws.com - Action: - 
- sts:AssumeRole - ManagedPolicyArns: - - 'arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole' - - Logger: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Principal: - Service: lambda.amazonaws.com - Action: sts:AssumeRole - Policies: - - PolicyName: !Sub ${AWS::StackName}-cloudwatch-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Resource: '*' - Action: - - cloudwatch:GetMetric* - - cloudwatch:PutMetricData - - Metric: - Type: AWS::Lambda::Function - Properties: - Runtime: nodejs8.10 - Handler: index.handler - Code: - ZipFile: !Sub - - | - var AWS = require('aws-sdk'); - AWS.config.update({region: '${AWS::Region}'}); - var cw = new AWS.CloudWatch({apiVersion: '2010-08-01'}); - const datePlusMinutes = (d, m) => { - const _d = new Date(d); - _d.setMinutes(d.getMinutes() + m); - return _d; - }; - const getMetricAtTime = (ms, m, t) => { - const m_idx = ms.MetricDataResults - .map(_r => _r.Id) - .indexOf(m); - const t_idx = ms.MetricDataResults[m_idx] - .Timestamps - .map(_t => _t.toISOString()) - .indexOf(t.toISOString()); - return ms.MetricDataResults[m_idx] - .Values[t_idx]; - }; - const discount = (ms, m, t1, t2, ws) => { - let incs = 0, d = t1; - let v1 = getMetricAtTime(ms, m, d), v2; - for (let i = 0; d < t2 ; i++) { - d = datePlusMinutes(t1, i+1); - v2 = getMetricAtTime(ms, m, d); - if (v2 > v1) incs += ws[i]; - v1 = v2; - } - return incs; - }; - exports.handler = async function(event, context) { - let curr = new Date(); - curr.setMinutes(Math.floor(curr.getMinutes()/5)*5-5); - curr.setSeconds(0); curr.setMilliseconds(0); - const prev = datePlusMinutes(curr, -5); - const back = datePlusMinutes(prev, -5); - const metrics = await cw.getMetricData({ - StartTime: back, EndTime: curr, - ScanBy: 'TimestampDescending', - MetricDataQueries: [ - { Id: 'maxANOMV', MetricStat: { - Metric: { Namespace: 'AWS/SQS', - MetricName: 'ApproximateNumberOfMessagesVisible', - Dimensions: [{ Name: 'QueueName', - Value: '${queueName}' }]}, - Period: 300, - Stat: 'Maximum', - Unit: 'Count' }}, - { Id: 'sumNOER', MetricStat: { - Metric: { Namespace: 'AWS/SQS', - MetricName: 'NumberOfEmptyReceives', - Dimensions: [{ Name: 'QueueName', - Value: '${queueName}' }]}, - Period: 300, - Stat: 'Sum', - Unit: 'Count', }}, - { Id: 'avgGISI', MetricStat: { - Metric: { Namespace: 'AWS/AutoScaling', - MetricName: 'GroupInServiceInstances', - Dimensions: [{ Name: 'AutoScalingGroupName', - Value: '${asgName}' }]}, - Period: 300, - Stat: 'Average', - Unit: 'None', }}, - { Id: 'uGISI', MetricStat: { - Metric: { Namespace: 'AWS/AutoScaling', - MetricName: 'GroupDesiredCapacity', - Dimensions: [{ Name: 'AutoScalingGroupName', - Value: '${asgName}' }]}, - Period: 60, - Stat: 'Average', - Unit: 'None', }}, - ]}).promise(); - const ANOMV = getMetricAtTime(metrics, 'maxANOMV', prev); - const NOER = getMetricAtTime(metrics, 'sumNOER', prev); - const GISI = getMetricAtTime(metrics, 'avgGISI', prev); - const ws = [0, 0, 0, 0.1, 0.3, 0.3, 0.3, 0.3, 0.2]; - const dGISI = discount(metrics, 'uGISI', back, curr, ws); - const M = GISI - dGISI; - let l; - if (M > 0) - l = 1 - NOER / (M * 0.098444 * 300); - else - l = (ANOMV > 0) ? 1.0 : 0.0; - await cw.putMetricData({ - Namespace: 'Turbine', - MetricData: [{ MetricName: 'WorkerLoad', - Dimensions: [ { Name: 'StackName', - Value: '${AWS::StackName}' }], - Timestamp: prev, - Value: (l > 0) ? 
l : 0, - Unit: 'None' }], - }).promise(); - }; - - asgName: !Sub '${AWS::StackName}-scaling-group' - queueName: !GetAtt - - Tasks - - QueueName - Role: !GetAtt - - Logger - - Arn - Metadata: - 'AWS::CloudFormation::Designer': - id: 94c385fa-fb13-42cc-a292-7e68c10956f3 - - Timer: - Type: AWS::Events::Rule - Properties: - ScheduleExpression: rate(1 minute) - State: ENABLED - Targets: - - Arn: !GetAtt - - Metric - - Arn - Id: TargetFunction - - Invoke: - Type: AWS::Lambda::Permission - Properties: - FunctionName: !Ref Metric - Action: lambda:InvokeFunction - Principal: events.amazonaws.com - SourceArn: !GetAtt - - Timer - - Arn - - EfsFileSystem: - Type: AWS::EFS::FileSystem - Properties: - FileSystemTags: - - Key: Name - Value: !Sub ${AWS::StackName}-filesystem - - EfsMountTarget1A: - Type: AWS::EFS::MountTarget - Properties: - FileSystemId: !Ref EfsFileSystem - SubnetId: !Ref PrivateSubnet1AID - SecurityGroups: - - !Ref Access - - EfsMountTarget2A: - Type: AWS::EFS::MountTarget - Properties: - FileSystemId: !Ref EfsFileSystem - SubnetId: !Ref PrivateSubnet2AID - SecurityGroups: - - !Ref Access - - DBs: - Type: AWS::RDS::DBSubnetGroup - Properties: - DBSubnetGroupDescription: Associates the Database Instances with the selected VPC Subnets. - SubnetIds: - - !Ref PrivateSubnet1AID - - !Ref PrivateSubnet2AID - - Database: - Type: AWS::RDS::DBInstance - Properties: - AllocatedStorage: '20' - DBInstanceClass: db.t2.micro - DBName: airflow - Engine: postgres - MasterUsername: !Ref DbMasterUsername - MasterUserPassword: !Ref DbMasterPassword - Tags: - - Key: Name - Value: !Sub ${AWS::StackName}-database - DBSubnetGroupName: !Ref DBs - VPCSecurityGroups: - - !Ref Connection - - Tasks: - Type: AWS::SQS::Queue - Properties: {} - - Access: - Type: AWS::EC2::SecurityGroup - Properties: - GroupDescription: >- - Security Rules with permissions for the shared filesystem across Airflow - instances. - SecurityGroupIngress: - - CidrIp: !Ref VPCCIDR - IpProtocol: TCP - FromPort: 2049 - ToPort: 2049 - VpcId: !Ref VPCID - Tags: - - Key: Name - Value: !Sub '${AWS::StackName}-access' - - Connection: - Type: AWS::EC2::SecurityGroup - Properties: - GroupDescription: Security Rules with permissions for database connections for Airflow. 
- SecurityGroupIngress: - - CidrIp: !Ref VPCCIDR - IpProtocol: TCP - FromPort: 5432 - ToPort: 5432 - VpcId: !Ref VPCID - Tags: - - Key: Name - Value: !Sub ${AWS::StackName}-connection - - AirflowRole: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Principal: - Service: - - ec2.amazonaws.com - Action: - - sts:AssumeRole - ManagedPolicyArns: - - arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM - Policies: - - PolicyName: !Sub ${AWS::StackName}-cfn-describe - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - cloudformation:DescribeStackResource - Resource: !Sub arn:aws:cloudformation:${AWS::Region}:${AWS::AccountId}:stack/${AWS::StackName}/* - - PolicyName: !Sub ${AWS::StackName}-ssm-rw-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - ssm:GetParameter - - ssm:PutParameter - Resource: - - !Sub arn:aws:ssm:*:${AWS::AccountId}:*/* - - PolicyName: !Sub ${AWS::StackName}-queue-rw-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - sqs:ListQueues - Resource: - - !Sub arn:aws:sqs:*:${AWS::AccountId}:* - - Effect: Allow - Action: - - sqs:ChangeMessageVisibility - - sqs:DeleteMessage - - sqs:GetQueueAttributes - - sqs:GetQueueUrl - - sqs:ReceiveMessage - - sqs:SendMessage - Resource: !Sub - - arn:aws:sqs:*:${AWS::AccountId}:${queue} - - queue: !GetAtt - - Tasks - - QueueName - - PolicyName: !Sub ${AWS::StackName}-deployments-r-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - s3:Get* - - s3:List* - Resource: !Sub arn:aws:s3:::${DeploymentsBucket}/* - - Effect: Allow - Action: - - codedeploy:List* - Resource: !Sub arn:aws:codedeploy:*:${AWS::AccountId}:deploymentgroup:* - - PolicyName: !Sub ${AWS::StackName}-logs-rw-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - s3:Get* - - s3:Put* - Resource: !Sub arn:aws:s3:::${LogsBucket}/* - - PolicyName: !Sub ${AWS::StackName}-lifecycle-heartbeat - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - autoscaling:RecordLifecycleActionHeartbeat - - autoscaling:CompleteLifecycleAction - Resource: !Sub arn:aws:autoscaling:*:${AWS::AccountId}:autoScalingGroup:*:* - - Effect: Allow - Action: - - autoscaling:DescribeScalingActivities - Resource: '*' - - AirflowProfile: - Type: AWS::IAM::InstanceProfile - Properties: - Roles: - - !Ref AirflowRole + # ---------------------- Turbine EC2 cluster instance configuration----------------------------- + # Loaded by every cluster instance's launch configuration respectively via + # {$ParentStack} --resource Meta Meta: Type: AWS::CloudFormation::WaitConditionHandle Properties: {} @@ -619,16 +258,16 @@ Resources: AIRFLOW__CELERY__DEFAULT_QUEUE=${QueueName} AIRFLOW__CELERY__RESULT_BACKEND=db+${DbUri} AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__REGION=${AWS::Region} - - QueueName: !GetAtt Tasks.QueueName + - QueueName: !Ref TaskQueueName DbUri: !Join - - '' - - - postgresql:// - - !Ref DbMasterUsername - - ':' - - !Ref DbMasterPassword - - '@' - - !GetAtt Database.Endpoint.Address - - /airflow + - '' + - - postgresql:// + - !Ref DbMasterUsername + - ':' + - !Ref DbMasterPassword + - '@' + - !Ref DbEndpointAddress + - /airflow commands: envsubst: command: | @@ -720,6 +359,7 @@ Resources: else echo "Deployment pending, deferring service start" fi + # Lifecycle hooks promoting graceful shutdown, delaying termination for task 
completion lchooks: files: /usr/bin/lchkill: @@ -751,6 +391,7 @@ Resources: [Service] Type=oneshot ExecStart=/usr/bin/lchkill + # WORKER ONLY <- lchbeat, lchbeat.timer /usr/bin/lchbeat: mode: 755 content: !Sub | @@ -825,13 +466,144 @@ Resources: chmod +x ./install ./install auto + # IAM Profile and role for all EC2 cluster instances + AirflowProfile: + Type: AWS::IAM::InstanceProfile + Properties: + Roles: + - !Ref AirflowRole + + AirflowRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - ec2.amazonaws.com + Action: + - sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM + Policies: + - PolicyName: !Sub ${AWS::StackName}-cfn-describe + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - cloudformation:DescribeStackResource + Resource: !Sub arn:aws:cloudformation:${AWS::Region}:${AWS::AccountId}:stack/${AWS::StackName}/* + - PolicyName: !Sub ${AWS::StackName}-ssm-rw-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - ssm:GetParameter + - ssm:PutParameter + Resource: + - !Sub arn:aws:ssm:*:${AWS::AccountId}:*/* + - PolicyName: !Sub ${AWS::StackName}-queue-rw-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - sqs:ListQueues + Resource: + - !Sub arn:aws:sqs:*:${AWS::AccountId}:* + - Effect: Allow + Action: + - sqs:ChangeMessageVisibility + - sqs:DeleteMessage + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:ReceiveMessage + - sqs:SendMessage + Resource: !Sub + - arn:aws:sqs:*:${AWS::AccountId}:${queue} + - queue: !Ref TaskQueueName + - PolicyName: !Sub ${AWS::StackName}-deployments-r-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - s3:Get* + - s3:List* + Resource: !Sub arn:aws:s3:::${DeploymentsBucket}/* + - Effect: Allow + Action: + - codedeploy:List* + Resource: !Sub arn:aws:codedeploy:*:${AWS::AccountId}:deploymentgroup:* + - PolicyName: !Sub ${AWS::StackName}-logs-rw-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - s3:Get* + - s3:Put* + Resource: !Sub arn:aws:s3:::${LogsBucket}/* + - PolicyName: !Sub ${AWS::StackName}-lifecycle-heartbeat + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - autoscaling:RecordLifecycleActionHeartbeat + - autoscaling:CompleteLifecycleAction + Resource: !Sub arn:aws:autoscaling:*:${AWS::AccountId}:autoScalingGroup:*:* + - Effect: Allow + Action: + - autoscaling:DescribeScalingActivities + Resource: '*' + + # --------------------- Buckets containing Logs and Deployment packages--------------------- + + LogsBucket: + Type: AWS::S3::Bucket + Properties: + AccessControl: Private + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + PublicAccessBlockConfiguration: + BlockPublicAcls: True + BlockPublicPolicy: True + IgnorePublicAcls: True + RestrictPublicBuckets: True + DeletionPolicy: Retain + + # NOTE: Dependency from instance configuration requires Deployments Bucket to be + # defined here instead in turbine-codedeploy.template + DeploymentsBucket: + Type: AWS::S3::Bucket + Properties: + AccessControl: Private + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + PublicAccessBlockConfiguration: + BlockPublicAcls: True + BlockPublicPolicy: True + IgnorePublicAcls: 
True + RestrictPublicBuckets: True + Outputs: DeploymentsBucket: Value: !Ref DeploymentsBucket - CodeDeployApplication: - Value: !Ref CodeDeployApplication - CodeDeployDeploymentGroup: - Value: !Ref CodeDeployDeploymentGroup + SchedulerAutoScaling: + Value: !GetAtt SchedulerStack.Outputs.AutoScalingGroup + WebserverAutoScaling: + Value: !GetAtt WebserverStack.Outputs.AutoScalingGroup + WorkerSetAutoScaling: + Value: !GetAtt WorkerSetStack.Outputs.AutoScalingGroup Mappings: AWSAMIRegionMap: diff --git a/templates/cluster/turbine-sg.template b/templates/cluster/turbine-sg.template new file mode 100644 index 00000000..8c4984c7 --- /dev/null +++ b/templates/cluster/turbine-sg.template @@ -0,0 +1,50 @@ +AWSTemplateFormatVersion: "2010-09-09" + +Description: >- + All Security Groups for Turbine. + +Parameters: + VPCID: + Type: AWS::EC2::VPC::Id + VPCCIDR: + Type: String + AllowedPattern: >- + ^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([0-9]|[1-2][0-9]|3[0-2]))$ + +Resources: + + EfsAccessSG: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: >- + Security Rules with permissions for the shared filesystem across Airflow + instances. + SecurityGroupIngress: + - CidrIp: !Ref VPCCIDR + IpProtocol: TCP + FromPort: 2049 + ToPort: 2049 + VpcId: !Ref VPCID + Tags: + - Key: Name + Value: !Sub '${AWS::StackName}-access' + + DbConnectionSG: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: Security Rules with permissions for database connections for Airflow. + SecurityGroupIngress: + - CidrIp: !Ref VPCCIDR + IpProtocol: TCP + FromPort: 5432 + ToPort: 5432 + VpcId: !Ref VPCID + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-connection + +Outputs: + EfsAccessSecurityGroup: + Value: !Ref EfsAccessSG + DbConnectionSecurityGroup: + Value: !Ref DbConnectionSG diff --git a/templates/services/turbine-resource.template b/templates/services/turbine-resource.template new file mode 100644 index 00000000..a033c047 --- /dev/null +++ b/templates/services/turbine-resource.template @@ -0,0 +1,254 @@ +AWSTemplateFormatVersion: "2010-09-09" + +Description: >- + Turbine resources for supporting the main Airflow cluster: + - RDS Airflow metadata database + - SQS Broker + - EFS shared storage mount + - Custom Cluster Scaling Metric + +Parameters: + PrivateSubnet1ID: + Type: String + PrivateSubnet2ID: + Type: String + EfsSecurityGroup: + Type: String + DbSecurityGroup: + Type: String + PostgresDbUser: + Type: String + PostgresMasterPasswd: + Type: String + +Resources: + + # ---------------------------- Shared Storage Volumes (EFS)------------------------------ + + EfsFileSystem: + Type: AWS::EFS::FileSystem + Properties: + FileSystemTags: + - Key: Name + Value: !Sub ${AWS::StackName}-filesystem + + EfsMountTarget1A: + Type: AWS::EFS::MountTarget + Properties: + FileSystemId: !Ref EfsFileSystem + SubnetId: !Ref PrivateSubnet1ID + SecurityGroups: + - !Ref EfsSecurityGroup + + EfsMountTarget2A: + Type: AWS::EFS::MountTarget + Properties: + FileSystemId: !Ref EfsFileSystem + SubnetId: !Ref PrivateSubnet2ID + SecurityGroups: + - !Ref EfsSecurityGroup + + # ---------------------------- Airflow Postgres---------------------------------------- + + DBs: + Type: AWS::RDS::DBSubnetGroup + Properties: + DBSubnetGroupDescription: Associates the Database Instances with the selected VPC Subnets. 
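+      # Spanning both private subnets lets RDS place the instance in either
+      # AZ (a DBSubnetGroup needs subnets in at least two AZs)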
+      SubnetIds:
+        - !Ref PrivateSubnet1ID
+        - !Ref PrivateSubnet2ID
+
+  Database:
+    Type: AWS::RDS::DBInstance
+    Properties:
+      AllocatedStorage: '20'
+      DBInstanceClass: db.t2.micro
+      DBName: airflow
+      Engine: postgres
+      MasterUsername: !Ref PostgresDbUser
+      MasterUserPassword: !Ref PostgresMasterPasswd
+      Tags:
+        - Key: Name
+          Value: !Sub ${AWS::StackName}-database
+      DBSubnetGroupName: !Ref DBs
+      VPCSecurityGroups:
+        - !Ref DbSecurityGroup
+
+  # ---------------------------- SQS Queue -----------------------------------------------
+
+  Tasks:
+    Type: AWS::SQS::Queue
+    Properties: {}
+
+  # ---------------------------- Custom Cluster Scaling Metric ---------------------------
+
+  # How is the Metric Lambda connected to the ASG? How is the threshold being calculated?
+  # Alarms are triggered above or below the threshold based on the Lambda's metric; the ASG reacts to the Alarms.
+  Metric:
+    Type: AWS::Lambda::Function
+    Properties:
+      Runtime: nodejs8.10
+      Handler: index.handler
+      Code:
+        ZipFile: !Sub
+          - |
+            var AWS = require('aws-sdk');
+            AWS.config.update({region: '${AWS::Region}'});
+            var cw = new AWS.CloudWatch({apiVersion: '2010-08-01'});
+            const datePlusMinutes = (d, m) => {
+              const _d = new Date(d);
+              _d.setMinutes(d.getMinutes() + m);
+              return _d;
+            };
+
+            const getMetricAtTime = (ms, m, t) => {
+              const m_idx = ms.MetricDataResults
+                .map(_r => _r.Id)
+                .indexOf(m);
+              const t_idx = ms.MetricDataResults[m_idx]
+                .Timestamps
+                .map(_t => _t.toISOString())
+                .indexOf(t.toISOString());
+              return ms.MetricDataResults[m_idx]
+                .Values[t_idx];
+            };
+            const discount = (ms, m, t1, t2, ws) => {
+              let incs = 0, d = t1;
+              let v1 = getMetricAtTime(ms, m, d), v2;
+              for (let i = 0; d < t2 ; i++) {
+                d = datePlusMinutes(t1, i+1);
+                v2 = getMetricAtTime(ms, m, d);
+                if (v2 > v1) incs += ws[i];
+                v1 = v2;
+              }
+              return incs;
+            };
+            exports.handler = async function(event, context) {
+              let curr = new Date();
+              curr.setMinutes(Math.floor(curr.getMinutes()/5)*5-5);
+              curr.setSeconds(0); curr.setMilliseconds(0);
+              const prev = datePlusMinutes(curr, -5);
+              const back = datePlusMinutes(prev, -5);
+
+              // ------- Step 1: Get the relevant variables for the worker load calculation --------
+              const metrics = await cw.getMetricData({
+                StartTime: back, EndTime: curr,
+                ScanBy: 'TimestampDescending',
+                MetricDataQueries: [
+                  { Id: 'maxANOMV', MetricStat: {
+                      Metric: { Namespace: 'AWS/SQS',
+                                MetricName: 'ApproximateNumberOfMessagesVisible',
+                                Dimensions: [{ Name: 'QueueName',
+                                               Value: '${queueName}' }]},
+                      Period: 300,
+                      Stat: 'Maximum',
+                      Unit: 'Count' }},
+                  { Id: 'sumNOER', MetricStat: {
+                      Metric: { Namespace: 'AWS/SQS',
+                                MetricName: 'NumberOfEmptyReceives',
+                                Dimensions: [{ Name: 'QueueName',
+                                               Value: '${queueName}' }]},
+                      Period: 300,
+                      Stat: 'Sum',
+                      Unit: 'Count', }},
+                  { Id: 'avgGISI', MetricStat: {
+                      Metric: { Namespace: 'AWS/AutoScaling',
+                                MetricName: 'GroupInServiceInstances',
+                                Dimensions: [{ Name: 'AutoScalingGroupName',
+                                               Value: '${asgName}' }]},
+                      Period: 300,
+                      Stat: 'Average',
+                      Unit: 'None', }},
+                  { Id: 'uGISI', MetricStat: {
+                      Metric: { Namespace: 'AWS/AutoScaling',
+                                MetricName: 'GroupDesiredCapacity',
+                                Dimensions: [{ Name: 'AutoScalingGroupName',
+                                               Value: '${asgName}' }]},
+                      Period: 60,
+                      Stat: 'Average',
+                      Unit: 'None', }},
+                ]}).promise();
+
+              // --------- Step 2: Calculate the worker load ----------------------------------------
+              const ANOMV = getMetricAtTime(metrics, 'maxANOMV', prev);
+              const NOER = getMetricAtTime(metrics, 'sumNOER', prev);
+              const GISI = getMetricAtTime(metrics, 'avgGISI', prev);
+              const ws = [0, 0, 0, 0.1, 0.3, 0.3, 0.3, 0.3, 0.2];
+              const dGISI = discount(metrics, 'uGISI', back, curr, ws);
+              const M = GISI - dGISI;
+              let l;
+              if (M > 0)
+                l = 1 - NOER / (M * 0.098444 * 300);
+              else
+                l = (ANOMV > 0) ? 1.0 : 0.0;
+
+              // --------- Step 3: Push the result into the custom metric 'WorkerLoad' -------------
+              await cw.putMetricData({
+                Namespace: 'Turbine',
+                MetricData: [{ MetricName: 'WorkerLoad',
+                               Dimensions: [ { Name: 'StackName',
+                                               Value: '${AWS::StackName}' }],
+                               Timestamp: prev,
+                               Value: (l > 0) ? l : 0,
+                               Unit: 'None' }],
+              }).promise();
+            };
+          - asgName: !Sub '${AWS::StackName}-scaling-group'
+            queueName: !GetAtt Tasks.QueueName
+      Role: !GetAtt
+        - MetricRole
+        - Arn
+    Metadata:
+      'AWS::CloudFormation::Designer':
+        id: 94c385fa-fb13-42cc-a292-7e68c10956f3
+
+  Timer:
+    Type: AWS::Events::Rule
+    Properties:
+      ScheduleExpression: rate(1 minute)
+      State: ENABLED
+      Targets:
+        - Arn: !GetAtt
+            - Metric
+            - Arn
+          Id: TargetFunction
+
+  Invoke:
+    Type: AWS::Lambda::Permission
+    Properties:
+      FunctionName: !Ref Metric
+      Action: lambda:InvokeFunction
+      Principal: events.amazonaws.com
+      SourceArn: !GetAtt
+        - Timer
+        - Arn
+
+  # Allows the 'Metric' Lambda to read CloudWatch metrics and write the custom 'Turbine' metric
+  MetricRole:
+    Type: AWS::IAM::Role
+    Properties:
+      AssumeRolePolicyDocument:
+        Version: 2012-10-17
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: lambda.amazonaws.com
+            Action: sts:AssumeRole
+      Policies:
+        - PolicyName: !Sub ${AWS::StackName}-cloudwatch-policy
+          PolicyDocument:
+            Version: 2012-10-17
+            Statement:
+              - Effect: Allow
+                Resource: '*'
+                Action:
+                  - cloudwatch:GetMetric*
+                  - cloudwatch:PutMetricData
+
+Outputs:
+  TaskQueueName:
+    Value: !GetAtt Tasks.QueueName
+  DatabaseAddress:
+    Value: !GetAtt Database.Endpoint.Address
+  EfsFS:
+    Value: !Ref EfsFileSystem
\ No newline at end of file
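The load formula in Step 2 is easier to see outside the template. Below is a
minimal Python sketch of the same arithmetic; the 0.098444 constant is copied
from the Lambda above, while the function name and the example inputs are
illustrative assumptions only:

```python
# Hypothetical re-statement of the WorkerLoad formula from the Metric Lambda.
# noer: sum of empty SQS receives over 5 min; m: discounted in-service
# instance count; anomv: max visible messages in the same window.

def worker_load(noer: float, m: float, anomv: float) -> float:
    if m > 0:
        # each fully idle worker contributes roughly m * 0.098444 * 300
        # empty receives per 5-minute window, so this ratio approximates
        # the fraction of worker capacity that sat unused
        load = 1 - noer / (m * 0.098444 * 300)
    else:
        # no measurable capacity: busy if anything is queued, idle otherwise
        load = 1.0 if anomv > 0 else 0.0
    return max(load, 0.0)  # the Lambda clamps negative values to 0

if __name__ == "__main__":
    # two idle workers produce ~59 empty receives in 5 minutes -> load ~ 0
    print(worker_load(noer=59.0, m=2.0, anomv=0.0))
    # no empty receives at all -> fully busy cluster, load = 1
    print(worker_load(noer=0.0, m=2.0, anomv=10.0))
```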
diff --git a/templates/turbine-scheduler.template b/templates/services/turbine-scheduler.template
similarity index 100%
rename from templates/turbine-scheduler.template
rename to templates/services/turbine-scheduler.template
diff --git a/templates/turbine-webserver.template b/templates/services/turbine-webserver.template
similarity index 100%
rename from templates/turbine-webserver.template
rename to templates/services/turbine-webserver.template
diff --git a/templates/turbine-workerset.template b/templates/services/turbine-workerset.template
similarity index 95%
rename from templates/turbine-workerset.template
rename to templates/services/turbine-workerset.template
index 01bcaaf8..07b94983 100644
--- a/templates/turbine-workerset.template
+++ b/templates/services/turbine-workerset.template
@@ -53,7 +53,7 @@ Resources:
     Type: AWS::EC2::SecurityGroup
     Properties:
       GroupDescription: >-
-        Security Rules with permissions for node itercommunication between
+        Security Rules with permissions for node intercommunication between
         Airflow worker instances.
       VpcId: !Ref VPCID
       SecurityGroupIngress:
@@ -89,6 +89,7 @@ Resources:
       HeartbeatTimeout: 300
       LifecycleTransition: autoscaling:EC2_INSTANCE_TERMINATING

+  # ShrinkAlarm and GrowthAlarm are triggered by the threshold [...] from the custom metric 'WorkerLoad'
   ShrinkAlarm:
     Type: AWS::CloudWatch::Alarm
     Properties:
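For orientation, this is roughly what a ShrinkAlarm-style alarm on the custom
WorkerLoad metric looks like when expressed with boto3. Alarm name, threshold
and the scaling-policy ARN are made-up placeholders; the real alarm properties
live in the (unchanged) template body below this hunk:

```python
# Illustrative boto3 equivalent of a WorkerLoad-based shrink alarm.
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="eu-central-1")

cloudwatch.put_metric_alarm(
    AlarmName="example-turbine-shrink",        # placeholder name
    Namespace="Turbine",
    MetricName="WorkerLoad",                   # written by the Metric Lambda
    Dimensions=[{"Name": "StackName", "Value": "example-stack"}],
    Statistic="Average",
    Period=300,
    EvaluationPeriods=1,
    Threshold=0.3,                             # would come from ShrinkThreshold
    ComparisonOperator="LessThanOrEqualToThreshold",
    AlarmActions=["arn:aws:autoscaling:..."],  # placeholder scaling-policy ARN
)
```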
diff --git a/templates/turbine-master.template b/templates/turbine-master.template
index a078f45d..d6a8c4bd 100644
--- a/templates/turbine-master.template
+++ b/templates/turbine-master.template
@@ -162,6 +162,18 @@ Resources:
         - 1
         - !GetAZs

+  SGStack:
+    Type: AWS::CloudFormation::Stack
+    Properties:
+      TemplateURL: !Join
+        - '/'
+        - - !Sub https://${QSS3BucketName}.s3.amazonaws.com
+          - !Sub ${QSS3KeyPrefix}templates
+          - turbine-sg.template
+      Parameters:
+        VPCID: !GetAtt VPCStack.Outputs.VPCID
+        VPCCIDR: !Ref VPCCIDR
+
   AirflowStack:
     Type: AWS::CloudFormation::Stack
     Properties:
@@ -186,20 +198,54 @@ Resources:
         MaxGroupSize: !Ref MaxGroupSize
         GrowthThreshold: !Ref GrowthThreshold
         ShrinkThreshold: !Ref ShrinkThreshold
-        DbMasterUsername: !Ref DbMasterUsername
-        DbMasterPassword: !Ref DbMasterPassword
         LoadExampleDags: !Ref LoadExampleDags
         LoadDefaultConn: !Ref LoadDefaultConn
         QSS3BucketName: !Ref QSS3BucketName
         QSS3KeyPrefix: !Ref QSS3KeyPrefix
+        DbMasterUsername: !Ref DbMasterUsername
+        DbMasterPassword: !Ref DbMasterPassword
+        DbEndpointAddress: !GetAtt ResourceStack.Outputs.DatabaseAddress
+        TaskQueueName: !GetAtt ResourceStack.Outputs.TaskQueueName
+        EfsFileSystem: !GetAtt ResourceStack.Outputs.EfsFS
+
+  ResourceStack:
+    Type: AWS::CloudFormation::Stack
+    Properties:
+      TemplateURL: !Join
+        - '/'
+        - - !Sub https://${QSS3BucketName}.s3.amazonaws.com
+          - !Sub ${QSS3KeyPrefix}templates
+          - turbine-resource.template
+      Parameters:
+        PrivateSubnet1ID: !GetAtt VPCStack.Outputs.PrivateSubnet1AID
+        PrivateSubnet2ID: !GetAtt VPCStack.Outputs.PrivateSubnet2AID
+        EfsSecurityGroup: !GetAtt SGStack.Outputs.EfsAccessSecurityGroup
+        DbSecurityGroup: !GetAtt SGStack.Outputs.DbConnectionSecurityGroup
+        PostgresDbUser: !Ref DbMasterUsername
+        PostgresMasterPasswd: !Ref DbMasterPassword
+
+  CIStack:
+    Type: AWS::CloudFormation::Stack
+    Properties:
+      TemplateURL: !Join
+        - '/'
+        - - !Sub https://${QSS3BucketName}.s3.amazonaws.com
+          - !Sub ${QSS3KeyPrefix}templates
+          - turbine-codedeploy.template
+      Parameters:
+        DeploymentsBucket: !GetAtt AirflowStack.Outputs.DeploymentsBucket
+        SchedulerScalingGroup: !GetAtt AirflowStack.Outputs.SchedulerAutoScaling
+        WebserverScalingGroup: !GetAtt AirflowStack.Outputs.WebserverAutoScaling
+        WorkerSetScalingGroup: !GetAtt AirflowStack.Outputs.WorkerSetAutoScaling

 Outputs:
+  # NOTE: These are the outputs required by CodeDeploy create-deployment for the 'example project airflow'
   DeploymentsBucket:
-    Value: !GetAtt AirflowStack.Outputs.DeploymentsBucket
+    Value: !GetAtt CIStack.Outputs.DeploymentsBucket
   CodeDeployApplication:
-    Value: !GetAtt AirflowStack.Outputs.CodeDeployApplication
+    Value: !GetAtt CIStack.Outputs.CodeDeployApplication
   CodeDeployDeploymentGroup:
-    Value: !GetAtt AirflowStack.Outputs.CodeDeployDeploymentGroup
+    Value: !GetAtt CIStack.Outputs.CodeDeployDeploymentGroup

 Metadata:
   AWS::CloudFormation::Interface: