diff --git a/examples/data-processor/README.md b/examples/data-processor/README.md
new file mode 100644
index 00000000..1352aeed
--- /dev/null
+++ b/examples/data-processor/README.md
@@ -0,0 +1,107 @@
+# Event-Driven Architectures with Amazon EKS and AWS Controllers for Kubernetes
+
+This data processing example uses an event-driven approach for data ingestion and process orchestration, with Amazon EMR on EKS for the data processing itself. The example uses [New York City taxi data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).
+## Architecture
+
+The following diagram illustrates the flow of the example and the services used:
+
+![Architecture diagram](./assets/eks-eda.png)
+1. When a data object lands in the input bucket on Amazon S3, S3 sends an event to Amazon EventBridge
+2. Based on a rule, EventBridge starts an AWS Step Functions workflow execution that orchestrates the data processing
+3. The workflow creates an EMR on EKS virtual cluster and starts an Apache Spark job, passing the location of the script in an S3 bucket and the data object in S3 to process
+4. The Spark job reads the newly arrived data from S3, runs the processing, and saves the output to an S3 bucket
+5. In parallel with the Spark processing, the Step Functions workflow copies the incoming data file to a data lake (another S3 bucket)
+6. Once Spark finishes processing, the workflow reads the results from the S3 bucket and puts them into an Amazon DynamoDB table
+7. The workflow sends an event to an EventBridge custom bus, notifying all subscribers that the data processing task has finished
+8. Amazon Simple Notification Service (SNS) receives the event from the event bus and sends an e-mail message to the subscribers
+
+The following diagram shows the Step Functions workflow:
+![StepFunctions workflow](./assets/stepfunctions_graph.png)
+
+## Prerequisites
+An EKS cluster with EMR on EKS deployed and IRSA configured. 
The following steps are based on the [AWS ACK tutorial instructions](https://aws-controllers-k8s.github.io/community/docs/tutorials/emr-on-eks-example/).
+
+Install kro in the cluster created in the previous step, following the [installation instructions](https://kro.run/docs/getting-started/Installation).
+
+## Create instance
+
+Create the kro ResourceGroup for the data processor:
+```shell
+kubectl apply -f eda-eks-data-processor.yaml
+```
+
+Create an instance of the data processor.
+
+Set the workload name (it is used as a prefix for the stack components):
+```shell
+export WORKLOAD_NAME="eda-eks-demo"
+```
+
+The following steps assume that the input, scripts, and data lake buckets are all the same bucket. The ResourceGroup creates a new input bucket only; if you use the same name for the scripts and lake buckets, the input bucket is used for all three purposes. Specify different bucket names for the scripts library and the data lake if necessary.
+
+```shell
+export INPUT_BUCKET_NAME="${WORKLOAD_NAME}-bucket"
+export SCRIPTS_BUCKET_NAME="${WORKLOAD_NAME}-bucket"
+export LAKE_BUCKET_NAME="${WORKLOAD_NAME}-bucket"
+```
+
+The template also expects `AWS_REGION`, `AWS_ACCOUNT_ID`, `EKS_CLUSTER_NAME`, and `OIDC_PROVIDER` to be set; the ACK tutorial referenced in the prerequisites exports them.
+
+```shell
+envsubst < "instance-template.yaml" > "instance.yaml"
+```
+Check that the instance definition is populated with values. Update the prefix, API name, or description values in the definition if desired:
+```shell
+cat instance.yaml
+```
+
+## Deploy instance
+
+Apply the instance definition:
+```shell
+kubectl apply -f instance.yaml
+```
+
+
+## Post-deployment steps
+### S3 event notification configuration update
+Get the newly created bucket name:
+```shell
+export BUCKET_NAME=$(kubectl get bucket.s3.services.k8s.aws -o jsonpath='{.items..metadata.name}' --namespace $WORKLOAD_NAME | grep $WORKLOAD_NAME)
+```
+This step is required until the missing ACK feature is implemented. 
+```bash
+# Enable EventBridge notifications; ACK does not support this setting at this time, and it is not enabled by default
+aws s3api put-bucket-notification-configuration --bucket $BUCKET_NAME --notification-configuration='{ "EventBridgeConfiguration": {} }'
+```
+### Spark data processing script upload
+Upload the Spark script; the uploaded file name must match `spec.script.name` in the instance definition:
+```bash
+aws s3 cp ./scripts s3://$BUCKET_NAME/scripts --recursive
+```
+
+## Test
+
+List all resources in the stack namespace (it takes some time to return all results):
+```shell
+kubectl api-resources --verbs=list --namespaced -o name | xargs -n 1 kubectl get --show-kind --ignore-not-found -n $WORKLOAD_NAME
+```
+Copy sample data for processing (for example, `yellow_tripdata_2024-05.parquet` downloaded from the NYC taxi data page linked above):
+```shell
+aws s3 cp ./yellow_tripdata_2024-05.parquet s3://$BUCKET_NAME/input/yellow_tripdata_2024-05.parquet
+```
+
+## Clean up
+
+Delete the S3 bucket content:
+```shell
+aws s3 rm --recursive s3://$BUCKET_NAME
+```
+Delete the instance and the resource group:
+```shell
+kubectl delete -f instance.yaml
+kubectl delete -f eda-eks-data-processor.yaml
+```
+
+Note: You may need to patch a resource finalizer if deletion of the resource hangs. 
For example, the following command patches the SNS subscription in the `eda-eks-demo` namespace (unconfirmed subscriptions cannot be deleted until they are automatically cleaned up after 48 hours, and they prevent the resource from being deleted):
+```shell
+kubectl patch subscription.sns.services.k8s.aws/eda-eks-demo-notifications-subscription -p '{"metadata":{"finalizers":[]}}' --type=merge --namespace eda-eks-demo
+```
diff --git a/examples/data-processor/assets/eks-eda.drawio b/examples/data-processor/assets/eks-eda.drawio
new file mode 100644
index 00000000..bc0e0185
--- /dev/null
+++ b/examples/data-processor/assets/eks-eda.drawio
@@ -0,0 +1,128 @@
diff --git a/examples/data-processor/assets/eks-eda.png b/examples/data-processor/assets/eks-eda.png
new file mode 100644
index 00000000..a5a4313b
Binary files /dev/null and b/examples/data-processor/assets/eks-eda.png differ
diff --git a/examples/data-processor/assets/stepfunctions_graph.png b/examples/data-processor/assets/stepfunctions_graph.png
new file mode 100644
index 00000000..926ae7b3
Binary files /dev/null and b/examples/data-processor/assets/stepfunctions_graph.png differ
diff --git a/examples/data-processor/eda-eks-data-processor.yaml b/examples/data-processor/eda-eks-data-processor.yaml
new file mode 100644
index 00000000..f54992a3
--- /dev/null
+++ b/examples/data-processor/eda-eks-data-processor.yaml
@@ -0,0 +1,678 @@
+apiVersion: kro.run/v1alpha1
+kind: ResourceGroup
+metadata:
+  name: eda-eks-data-processor.kro.run
+spec:
+  schema:
+    apiVersion: v1alpha1
+    kind: EdaEksDataProcessor
+    spec:
+      namePrefix: string | default="data-processor"
+      environment:
+        accountId: string | required=true
+        region: string | required=true
+        eksClusterName: string | default="eks-eda"
+        oidcProvider: 
string | required=true + emrVirtualClusterNamespace: string | default="emr-ns" + input: + bucket: string | default="" + prefix: string | default="input" + tempPrefix: string | default="temp" + script: + bucket: string | required=true + prefix: string | default="scripts" + name: string | default="process_data.py" + lake: + bucket: string | required=true + prefix: string | default="lake" + inputDataPrefix: string | default="data" + scriptOutputPrefix: string | default="summaries" + notifications: + email: string | default="me@example.com" + resources: + # Kubernetes namespace for the stack + - id: stackNamespace + template: + apiVersion: v1 + kind: Namespace + metadata: + name: ${schema.spec.namePrefix} + # Pod role setup for EMR on EKS job execution + - id: podRole + template: + apiVersion: iam.services.k8s.aws/v1alpha1 + kind: Role + metadata: + name: ${schema.spec.namePrefix}-podrole + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-podrole + assumeRolePolicyDocument: | + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "pods.eks.amazonaws.com" + }, + "Action": [ + "sts:AssumeRole", + "sts:TagSession" + ] + }, + { + "Effect": "Allow", + "Principal": { + "Service": "ec2.amazonaws.com" + }, + "Action": "sts:AssumeRole" + }, + { + "Effect": "Allow", + "Principal": { + "Federated": "arn:aws:iam::${schema.spec.environment.accountId}:oidc-provider/${schema.spec.environment.oidcProvider}" + }, + "Action": "sts:AssumeRoleWithWebIdentity", + "Condition": { + "StringLike": { + "${schema.spec.environment.oidcProvider}:sub": "system:serviceaccount:${schema.spec.namePrefix}:emr-containers-sa-*-*-${schema.spec.environment.accountId}-*" + } + } + } + ] + } + inlinePolicies: + s3-policy: | + { + "Version" : "2012-10-17", + "Statement" : [ + { + "Effect": "Allow", + "Action": [ + "s3:PutObject", + "s3:GetObject", + "s3:ListBucket", + "s3:DeleteObject" + ], + "Resource": [ + 
"arn:aws:s3:::${schema.spec.input.bucket}/${schema.spec.input.prefix}/*", + "arn:aws:s3:::${schema.spec.input.bucket}/${schema.spec.input.prefix}", + "arn:aws:s3:::${schema.spec.input.bucket}/${schema.spec.input.tempPrefix}/*", + "arn:aws:s3:::${schema.spec.input.bucket}/${schema.spec.input.tempPrefix}", + "arn:aws:s3:::${schema.spec.script.bucket}/${schema.spec.script.prefix}/${schema.spec.script.name}", + "arn:aws:s3:::${schema.spec.lake.bucket}/${schema.spec.lake.prefix}/${schema.spec.lake.inputDataPrefix}/*", + "arn:aws:s3:::${schema.spec.lake.bucket}/${schema.spec.lake.prefix}/${schema.spec.lake.inputDataPrefix}", + "arn:aws:s3:::${schema.spec.lake.bucket}/${schema.spec.lake.prefix}/${schema.spec.lake.scriptOutputPrefix}/*", + "arn:aws:s3:::${schema.spec.lake.bucket}/${schema.spec.lake.prefix}/${schema.spec.lake.scriptOutputPrefix}" + ] + } + ] + } + - id: serviceAccount + template: + apiVersion: v1 + kind: ServiceAccount + metadata: + name: ${schema.spec.namePrefix}-serviceaccount + namespace: ${stackNamespace.metadata.name} + - id: podIdentityAssociation + template: + apiVersion: eks.services.k8s.aws/v1alpha1 + kind: PodIdentityAssociation + metadata: + name: ${schema.spec.namePrefix}-podidentityassociation + namespace: ${stackNamespace.metadata.name} + spec: + clusterName: ${schema.spec.environment.eksClusterName} + namespace: ${stackNamespace.metadata.name} + roleARN: ${podRole.status.ackResourceMetadata.arn} + serviceAccount: ${schema.spec.namePrefix}-serviceaccount + # S3 bucket for data/script input for processing + - id: inputBucket + template: + apiVersion: s3.services.k8s.aws/v1alpha1 + kind: Bucket + metadata: + name: ${schema.spec.namePrefix}-bucket + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.input.bucket} + # DynamoDB table to store processed data summaries + - id: dataProcessingResultsTable + template: + apiVersion: dynamodb.services.k8s.aws/v1alpha1 + kind: Table + metadata: + name: 
${schema.spec.namePrefix}-data-processing-results + namespace: ${stackNamespace.metadata.name} + spec: + keySchema: + - attributeName: id + keyType: HASH + attributeDefinitions: + - attributeName: id + attributeType: S + billingMode: PAY_PER_REQUEST + tableName: ${schema.spec.namePrefix}-data-processing-results + # EMR on EKS virtual cluster for data processing + - id: emrRbacRole + template: + apiVersion: rbac.authorization.k8s.io/v1 + kind: Role + metadata: + name: ${schema.spec.namePrefix}-emr-containers + namespace: ${stackNamespace.metadata.name} + rules: + - apiGroups: [""] + resources: ["namespaces"] + verbs: ["get"] + - apiGroups: [""] + resources: ["serviceaccounts", "services", "configmaps", "events", "pods", "pods/log"] + verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"] + - apiGroups: [""] + resources: ["secrets"] + verbs: ["create", "patch", "delete", "watch"] + - apiGroups: ["apps"] + resources: ["statefulsets", "deployments"] + verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"] + - apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"] + - apiGroups: ["extensions", "networking.k8s.io"] + resources: ["ingresses"] + verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["roles", "rolebindings"] + verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"] + - apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"] + - id: roleBinding + template: + apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + name: emr-containers + namespace: 
${stackNamespace.metadata.name} + subjects: + - kind: User + name: emr-containers + apiGroup: rbac.authorization.k8s.io + roleRef: + kind: Role + name: ${schema.spec.namePrefix}-emr-containers + apiGroup: rbac.authorization.k8s.io + - id: dataProcessingVirtualCluster + template: + apiVersion: emrcontainers.services.k8s.aws/v1alpha1 + kind: VirtualCluster + metadata: + name: ${schema.spec.namePrefix}-emr-vc + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-emr-vc + containerProvider: + id: ${schema.spec.environment.eksClusterName} + type_: EKS + info: + eksInfo: + namespace: ${stackNamespace.metadata.name} + # SNS topic for data processing notifications + - id: dataProcessingNotificationsSNSTopic + template: + apiVersion: sns.services.k8s.aws/v1alpha1 + kind: Topic + metadata: + name: ${schema.spec.namePrefix}-notifications-topic + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-notifications-topic + policy: | + { + "Version": "2008-10-17", + "Statement": [ + { + "Sid": "default_statement", + "Effect": "Allow", + "Principal": { + "AWS": "*" + }, + "Action": [ + "SNS:GetTopicAttributes", + "SNS:SetTopicAttributes", + "SNS:AddPermission", + "SNS:RemovePermission", + "SNS:DeleteTopic", + "SNS:Subscribe", + "SNS:ListSubscriptionsByTopic", + "SNS:Publish" + ], + "Resource": "arn:aws:sns:${schema.spec.environment.region}:${schema.spec.environment.accountId}:${schema.spec.namePrefix}-notifications-topic", + "Condition": { + "StringEquals": { + "AWS:SourceOwner": "${schema.spec.environment.accountId}" + } + } + }, + { + "Sid": "${schema.spec.namePrefix}-events-publishing-to-topicpolicy", + "Effect": "Allow", + "Principal": { + "Service": "events.amazonaws.com" + }, + "Action": "sns:Publish", + "Resource": "arn:aws:sns:${schema.spec.environment.region}:${schema.spec.environment.accountId}:${schema.spec.namePrefix}-notifications-topic" + } + ] + } + # SNS email subscription for status notifications + - 
id: dataProcessingNotificationEmailSubsription + template: + apiVersion: sns.services.k8s.aws/v1alpha1 + kind: Subscription + metadata: + name: ${schema.spec.namePrefix}-notifications-subscription + namespace: ${stackNamespace.metadata.name} + spec: + topicARN: ${dataProcessingNotificationsSNSTopic.status.ackResourceMetadata.arn} + protocol: email-json + endpoint: ${schema.spec.notifications.email} + # EventBridge bus for data processing events + - id: ebStatusBus + template: + apiVersion: eventbridge.services.k8s.aws/v1alpha1 + kind: EventBus + metadata: + name: ${schema.spec.namePrefix}-eb-bus + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-eb-bus + # EventBridge permissions + - id: ebIamRole + template: + apiVersion: iam.services.k8s.aws/v1alpha1 + kind: Role + metadata: + name: ${schema.spec.namePrefix}-eb-role + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-eb-role + assumeRolePolicyDocument: | + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": [ + "events.amazonaws.com" + ] + }, + "Action": "sts:AssumeRole" + } + ] + } + inlinePolicies: + eb-policy: | + { + "Version" : "2012-10-17", + "Statement" : [ + { + "Effect" : "Allow", + "Action" : "states:StartExecution", + "Resource" : [ + "${stepFunctionsWorkflow.status.ackResourceMetadata.arn}" + ] + }, + { + "Effect" : "Allow", + "Action": "sns:Publish", + "Resource": "${dataProcessingNotificationsSNSTopic.status.ackResourceMetadata.arn}" + } + ] + } + # EventBridge rules + - id: ebS3ObjectRule + template: + apiVersion: eventbridge.services.k8s.aws/v1alpha1 + kind: Rule + metadata: + name: ${schema.spec.namePrefix}-eb-rule-input + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-eb-rule-input + description: "EventBridge rule to start Step Functions execution when data comes in" + eventPattern: | + { + "source": ["aws.s3"], + "detail-type": ["Object 
Created"], + "detail": { + "bucket": { + "name": ["${schema.spec.input.bucket}"] + }, + "object": { + "key": [{ + "prefix": "${schema.spec.input.prefix}/" + }] + } + } + } + targets: + - arn: ${stepFunctionsWorkflow.status.ackResourceMetadata.arn} + id: ${schema.spec.namePrefix}-sfn-data-processing + roleARN: ${ebIamRole.status.ackResourceMetadata.arn} + - id: ebProcessingFinishedRule + template: + apiVersion: eventbridge.services.k8s.aws/v1alpha1 + kind: Rule + metadata: + name: ${schema.spec.namePrefix}-eb-rule-finished + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-eb-rule-finished + eventBusName: ${ebStatusBus.spec.name} + description: "Notify subscribers once input data processing finishes" + eventPattern: | + { + "source": ["${schema.spec.namePrefix}"], + "detail-type": ["InputDataProcessed"] + } + targets: + - arn: ${dataProcessingNotificationsSNSTopic.status.ackResourceMetadata.arn} + id: ${schema.spec.namePrefix}-sfn-data-processed + # StepFunctions permissions + - id: stepFunctionsRole + template: + apiVersion: iam.services.k8s.aws/v1alpha1 + kind: Role + metadata: + name: ${schema.spec.namePrefix}-sfn-role + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-sfn-role + assumeRolePolicyDocument: | + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": [ + "states.amazonaws.com" + ] + }, + "Action": "sts:AssumeRole" + } + ] + } + inlinePolicies: + sfn-policy: | + { + "Version" : "2012-10-17", + "Statement" : [ + { + "Effect" : "Allow", + "Action" : [ + "emr-containers:StartJobRun", + "emr-containers:DescribeJobRun", + "emr-containers:CancelJobRun" + ], + "Resource" : "*" + }, + { + "Effect" : "Allow", + "Action" : [ + "logs:CreateLogDelivery", + "logs:GetLogDelivery", + "logs:UpdateLogDelivery", + "logs:DeleteLogDelivery", + "logs:ListLogDeliveries", + "logs:PutResourcePolicy", + "logs:DescribeResourcePolicies", + 
"logs:DescribeLogGroups" + ], + "Resource" : "*" + }, + { + "Action" : "iam:PassRole", + "Effect" : "Allow", + "Resource" : "*" + }, + { + "Effect" : "Allow", + "Action" : [ + "xray:PutTraceSegments", + "xray:PutTelemetryRecords", + "xray:GetSamplingRules", + "xray:GetSamplingTargets" + ], + "Resource" : "*" + }, + { + "Effect": "Allow", + "Action": [ + "s3:PutObject", + "s3:GetObject", + "s3:ListBucket", + "s3:DeleteObject" + ], + "Resource": [ + "arn:aws:s3:::${schema.spec.input.bucket}/${schema.spec.input.prefix}/*", + "arn:aws:s3:::${schema.spec.input.bucket}/${schema.spec.input.prefix}", + "arn:aws:s3:::${schema.spec.input.bucket}/${schema.spec.input.tempPrefix}/*", + "arn:aws:s3:::${schema.spec.input.bucket}/${schema.spec.input.tempPrefix}", + "arn:aws:s3:::${schema.spec.script.bucket}/${schema.spec.script.prefix}/${schema.spec.script.name}", + "arn:aws:s3:::${schema.spec.lake.bucket}/${schema.spec.lake.prefix}/${schema.spec.lake.inputDataPrefix}/*", + "arn:aws:s3:::${schema.spec.lake.bucket}/${schema.spec.lake.prefix}/${schema.spec.lake.inputDataPrefix}", + "arn:aws:s3:::${schema.spec.lake.bucket}/${schema.spec.lake.prefix}/${schema.spec.lake.scriptOutputPrefix}/*", + "arn:aws:s3:::${schema.spec.lake.bucket}/${schema.spec.lake.prefix}/${schema.spec.lake.scriptOutputPrefix}" + ] + }, + { + "Effect": "Allow", + "Action": [ + "dynamodb:*" + ], + "Resource": [ + "${dataProcessingResultsTable.status.ackResourceMetadata.arn}", + "${dataProcessingResultsTable.status.ackResourceMetadata.arn}/index/*" + ] + }, + { + "Effect": "Allow", + "Action": "events:PutEvents", + "Resource": "${ebStatusBus.status.ackResourceMetadata.arn}" + } + ] + } + # StepFunctions workflow + - id: stepFunctionsWorkflow + template: + apiVersion: sfn.services.k8s.aws/v1alpha1 + kind: StateMachine + metadata: + name: ${schema.spec.namePrefix}-sfn + namespace: ${stackNamespace.metadata.name} + spec: + name: ${schema.spec.namePrefix}-sfn + roleARN: 
"${stepFunctionsRole.status.ackResourceMetadata.arn}" + definition: | + { + "Comment": "S3 data processing using EMR on EKS", + "StartAt": "Prepare configuration", + "States": { + "Prepare configuration": { + "Type": "Pass", + "Next": "Process input data", + "Parameters": { + "stack": "${schema.spec.namePrefix}", + "objectName.$": "States.ArrayGetItem(States.StringSplit($.object.key, '/'), States.MathAdd(States.ArrayLength(States.StringSplit($.object.key, '/')),-1))", + "inputBucketName.$": "$.bucket.name", + "inputPrefix": "${schema.spec.input.prefix}", + "outputBucketName": "${schema.spec.input.bucket}", + "outputPrefix": "${schema.spec.input.tempPrefix}", + "scriptBucketName": "${schema.spec.script.bucket}", + "scriptPrefix": "${schema.spec.script.prefix}", + "scriptName": "${schema.spec.script.name}", + "lakeBucketName": "${schema.spec.lake.bucket}", + "lakePrefix": "${schema.spec.lake.prefix}", + "lakeInputDataPrefix": "${schema.spec.lake.inputDataPrefix}", + "lakeScriptOutputPrefix": "${schema.spec.lake.scriptOutputPrefix}" + }, + "InputPath": "$.detail" + }, + "Process input data": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Summarize data", + "States": { + "Summarize data": { + "Type": "Task", + "Resource": "arn:aws:states:::emr-containers:startJobRun.sync", + "Parameters": { + "VirtualClusterId": "${dataProcessingVirtualCluster.status.id}", + "ExecutionRoleArn": "${podRole.status.ackResourceMetadata.arn}", + "ReleaseLabel": "emr-7.2.0-latest", + "JobDriver": { + "SparkSubmitJobDriver": { + "EntryPoint.$": "States.Format('s3://{}/{}/{}', $.scriptBucketName, $.scriptPrefix, $.scriptName)", + "EntryPointArguments.$": "States.Array($.inputBucketName,States.Format('/{}/{}', $.inputPrefix, $.objectName),$.outputBucketName,States.Format('{}/{}.json', $.outputPrefix, $.objectName))" + } + } + }, + "Next": "Process data summary", + "ResultPath": null + }, + "Process data summary": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Read summary 
from S3", + "States": { + "Read summary from S3": { + "Type": "Task", + "Parameters": { + "Bucket.$": "$.outputBucketName", + "Key.$": "States.Format('{}/{}.json', $.outputPrefix, $.objectName)" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:getObject", + "Next": "Update summary in DynamoDB", + "ResultSelector": { + "Body.$": "States.StringToJson($.Body)" + }, + "ResultPath": "$.SummaryTaskOutput" + }, + "Update summary in DynamoDB": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${dataProcessingResultsTable.spec.tableName}", + "Key": { + "id": { + "S.$": "$.SummaryTaskOutput.Body.data_file" + } + }, + "UpdateExpression": "SET summary = :summary, top_10_most_expensive_trips= :top_trips, payments_distribution= :payments", + "ExpressionAttributeValues": { + ":summary": { + "S.$": "States.JsonToString($.SummaryTaskOutput.Body.data_summary)" + }, + ":top_trips": { + "S.$": "States.JsonToString($.SummaryTaskOutput.Body.top_10_most_expensive_trips)" + }, + ":payments": { + "S.$": "States.JsonToString($.SummaryTaskOutput.Body.payment_types)" + } + } + }, + "ResultPath": null, + "Next": "Discard summary data" + }, + "Discard summary data": { + "Type": "Task", + "Parameters": { + "Bucket.$": "$.outputBucketName", + "Key.$": "States.Format('{}/{}.json', $.outputPrefix, $.objectName)" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:deleteObject", + "ResultPath": null, + "End": true + } + } + }, + { + "StartAt": "Add summary to the data lake", + "States": { + "Add summary to the data lake": { + "Type": "Task", + "Parameters": { + "Bucket.$": "$.lakeBucketName", + "CopySource.$": "States.Format('/{}/{}/{}.json', $.outputBucketName, $.outputPrefix, $.objectName)", + "Key.$": "States.Format('{}/{}/{}.json', $.lakePrefix, $.lakeScriptOutputPrefix, $.objectName)" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:copyObject", + "ResultPath": null, + "End": true + } + } + } + ], + "End": true, + "ResultPath": null + } + } + 
}, + { + "StartAt": "Add input to the data lake", + "States": { + "Add input to the data lake": { + "Type": "Task", + "Parameters": { + "Bucket.$": "$.lakeBucketName", + "CopySource.$": "States.Format('/{}/{}/{}', $.inputBucketName, $.inputPrefix, $.objectName)", + "Key.$": "States.Format('{}/{}/{}', $.lakePrefix, $.lakeInputDataPrefix, $.objectName)" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:copyObject", + "ResultPath": null, + "End": true + } + } + } + ], + "Next": "Discard input data" + }, + "Discard input data": { + "Type": "Task", + "Parameters": { + "Bucket.$": "$$.Execution.Input.detail.bucket.name", + "Key.$": "$$.Execution.Input.detail.object.key" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:deleteObject", + "Next": "Publish data processing finished event", + "ResultPath": null + }, + "Publish data processing finished event": { + "Type": "Task", + "Resource": "arn:aws:states:::events:putEvents", + "Parameters": { + "Entries": [ + { + "Detail": { + "message": "Data processed", + "executionId.$": "$$.Execution.Id", + "inputBucket.$": "$$.Execution.Input.detail.bucket.name", + "inputObject.$": "$$.Execution.Input.detail.object.key" + }, + "DetailType": "InputDataProcessed", + "EventBusName": "${ebStatusBus.spec.name}", + "Source.$": "$.[0].stack" + } + ] + }, + "End": true + } + } + } + diff --git a/examples/data-processor/instance-template.yaml b/examples/data-processor/instance-template.yaml new file mode 100644 index 00000000..22b33bc3 --- /dev/null +++ b/examples/data-processor/instance-template.yaml @@ -0,0 +1,27 @@ +## Workload stack +apiVersion: kro.run/v1alpha1 +kind: EdaEksDataProcessor +metadata: + name: $WORKLOAD_NAME +spec: + namePrefix: $WORKLOAD_NAME + environment: + region: $AWS_REGION + accountId: "$AWS_ACCOUNT_ID" + eksClusterName: $EKS_CLUSTER_NAME + oidcProvider: $OIDC_PROVIDER + input: + bucket: $INPUT_BUCKET_NAME + prefix: input + tempPrefix: temp + script: + bucket: $SCRIPTS_BUCKET_NAME + prefix: scripts + name: process_data.py 
+  lake:
+    bucket: $LAKE_BUCKET_NAME
+    prefix: lake
+    inputDataPrefix: data
+    scriptOutputPrefix: summaries
+  notifications:
+    email: "me@example.com"
diff --git a/examples/data-processor/scripts/summarize_nyc_taxi_data.py b/examples/data-processor/scripts/summarize_nyc_taxi_data.py
new file mode 100644
index 00000000..32fc51ce
--- /dev/null
+++ b/examples/data-processor/scripts/summarize_nyc_taxi_data.py
@@ -0,0 +1,92 @@
+import sys
+import json
+from datetime import datetime
+
+import boto3
+from pyspark.sql import SparkSession
+from pyspark.sql import functions as f
+from pyspark.sql.types import StringType
+
+appName = "STACK_NAME" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
+
+def map_payment_type(payment_type):
+    payment_map = {
+        1: "Credit card",
+        2: "Cash",
+        3: "No charge",
+        4: "Dispute",
+        5: "Unknown",
+        6: "Voided trip"
+    }
+    return payment_map.get(payment_type, "Other")
+
+def main(args):
+    input_bucket_name = args[1]
+    input_file_name = args[2]
+    output_bucket_name = args[3]
+    output_file_name = args[4]
+
+    spark = SparkSession \
+        .builder \
+        .appName(appName) \
+        .getOrCreate()
+
+    # Load data from S3 (the workflow passes the key with a leading slash)
+    df = spark.read.format("parquet") \
+        .load("s3://" + input_bucket_name + "/" + input_file_name.lstrip("/")) \
+        .dropna()
+
+    # Get data summary
+    df_summary = df.groupBy("VendorID").agg(
+        f.avg("Fare_amount").alias("avg_fare"),
+        f.count("*").alias("num_trips"),
+        f.sum("Fare_amount").alias("total_fare")
+    )
+
+    # Get most expensive trips
+    top_10_expensive_trips = df.orderBy(f.col("fare_amount").desc()).limit(10)
+
+    # Create UDF for payment type mapping
+    map_payment_type_udf = f.udf(map_payment_type, StringType())
+    # Calculate distribution by payment_type
+    payment_type_distribution = df.groupBy("payment_type") \
+        .agg(f.count("*").alias("trip_count"),
+             f.round(f.avg("fare_amount"), 2).alias("avg_fare"),
+             f.round(f.sum("fare_amount"), 2).alias("total_fare")) \
.withColumn("payment_type_desc", map_payment_type_udf(f.col("payment_type"))) \
+        .orderBy(f.col("trip_count").desc())
+    # Calculate percentage distribution
+    total_trips = df.count()
+    payment_type_percentage = payment_type_distribution.withColumn(
+        "percentage", f.round((f.col("trip_count") / total_trips) * 100, 2)
+    )
+
+    # Generate result
+    result = {
+        "data_file": input_file_name.split("/")[-1],
+        "source_url": "s3://" + input_bucket_name + "/" + input_file_name.lstrip("/"),
+        "data_summary": [json.loads(x) for x in df_summary.toJSON().collect()],
+        "top_10_most_expensive_trips": [json.loads(x) for x in top_10_expensive_trips.toJSON().collect()],
+        "payment_types": [json.loads(x) for x in payment_type_percentage.toJSON().collect()]
+    }
+
+    # Write result to S3
+    s3 = boto3.resource('s3')
+    result_object = s3.Object(output_bucket_name, output_file_name)
+    result_object.put(Body=json.dumps(result))
+
+    spark.stop()
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5:
+        print("Usage: summarize_nyc_taxi_data.py input_bucket_name input_file_name output_bucket_name output_file_name")
+        sys.exit(1)
+
+    main(sys.argv)
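The payment-type lookup and the shape of the summary record that the Spark script writes (and that the Step Functions workflow later reads back and stores in DynamoDB) can be checked locally, without Spark or AWS access. The following is a minimal sketch: the mapping is copied from the script, while the record values are assumed sample data, not real output.

```python
import json

# Same payment-code lookup the script's UDF applies
def map_payment_type(payment_type):
    payment_map = {
        1: "Credit card",
        2: "Cash",
        3: "No charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided trip"
    }
    return payment_map.get(payment_type, "Other")

# Shape of the JSON the job writes to the temp prefix; field names follow
# the script, values here are made-up samples for illustration only.
record = {
    "data_file": "yellow_tripdata_2024-05.parquet",
    "data_summary": [{"VendorID": 1, "avg_fare": 18.5, "num_trips": 2, "total_fare": 37.0}],
    "top_10_most_expensive_trips": [],
    "payment_types": [{"payment_type": 1, "payment_type_desc": map_payment_type(1)}],
}

print(map_payment_type(2))  # Cash
print(map_payment_type(9))  # Other

# The workflow round-trips this record through S3 as a JSON string
# ("Read summary from S3" uses States.StringToJson on the object body)
assert json.loads(json.dumps(record)) == record
```

The `assert` mirrors what the workflow relies on: the object written by the job must parse back into the same structure when `States.StringToJson` is applied in the "Read summary from S3" state.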