Skip to content

Commit e1e974d

Browse files
authored
Add service integrations for SQS, SNS and DynamoDB (#11)
Add service integrations for SQS, SNS and DynamoDB
1 parent a06a33e commit e1e974d

File tree

3 files changed

+353
-0
lines changed

3 files changed

+353
-0
lines changed

src/stepfunctions/steps/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@
1818
from stepfunctions.steps.states import Graph, FrozenGraph
1919
from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep
2020
from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep
21+
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
22+
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep

src/stepfunctions/steps/service.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# A copy of the License is located at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# or in the "license" file accompanying this file. This file is distributed
10+
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
# express or implied. See the License for the specific language governing
12+
# permissions and limitations under the License.
13+
from __future__ import absolute_import
14+
15+
from stepfunctions.steps.states import Task
16+
from stepfunctions.steps.fields import Field
17+
18+
19+
class DynamoDBGetItemStep(Task):
20+
"""
21+
Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
22+
"""
23+
24+
def __init__(self, state_id, **kwargs):
25+
"""
26+
Args:
27+
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
28+
comment (str, optional): Human-readable comment or description. (default: None)
29+
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
30+
parameters (dict, optional): The value of this field becomes the effective input for the state.
31+
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
32+
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
33+
"""
34+
kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:getItem'
35+
super(DynamoDBGetItemStep, self).__init__(state_id, **kwargs)
36+
37+
38+
class DynamoDBPutItemStep(Task):
39+
40+
"""
41+
Creates a Task state to put an item to DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
42+
"""
43+
44+
def __init__(self, state_id, **kwargs):
45+
"""
46+
Args:
47+
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
48+
comment (str, optional): Human-readable comment or description. (default: None)
49+
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
50+
parameters (dict, optional): The value of this field becomes the effective input for the state.
51+
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
52+
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
53+
"""
54+
kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:putItem'
55+
super(DynamoDBPutItemStep, self).__init__(state_id, **kwargs)
56+
57+
58+
class DynamoDBDeleteItemStep(Task):
59+
60+
"""
61+
Creates a Task state to delete an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
62+
"""
63+
64+
def __init__(self, state_id, **kwargs):
65+
"""
66+
Args:
67+
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
68+
comment (str, optional): Human-readable comment or description. (default: None)
69+
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
70+
parameters (dict, optional): The value of this field becomes the effective input for the state.
71+
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
72+
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
73+
"""
74+
kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:deleteItem'
75+
super(DynamoDBDeleteItemStep, self).__init__(state_id, **kwargs)
76+
77+
78+
class DynamoDBUpdateItemStep(Task):
79+
80+
"""
81+
Creates a Task state to update an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
82+
"""
83+
84+
def __init__(self, state_id, **kwargs):
85+
"""
86+
Args:
87+
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
88+
comment (str, optional): Human-readable comment or description. (default: None)
89+
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
90+
parameters (dict, optional): The value of this field becomes the effective input for the state.
91+
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
92+
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
93+
"""
94+
kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:updateItem'
95+
super(DynamoDBUpdateItemStep, self).__init__(state_id, **kwargs)
96+
97+
98+
class SnsPublishStep(Task):
99+
100+
"""
101+
Creates a Task state to publish a message to SNS topic. See `Call Amazon SNS with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-sns.html>`_ for more details.
102+
"""
103+
104+
def __init__(self, state_id, wait_for_callback=False, **kwargs):
105+
"""
106+
Args:
107+
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
108+
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
109+
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
110+
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
111+
comment (str, optional): Human-readable comment or description. (default: None)
112+
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
113+
parameters (dict, optional): The value of this field becomes the effective input for the state.
114+
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
115+
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
116+
"""
117+
if wait_for_callback:
118+
kwargs[Field.Resource.value] = 'arn:aws:states:::sns:publish.waitForTaskToken'
119+
else:
120+
kwargs[Field.Resource.value] = 'arn:aws:states:::sns:publish'
121+
122+
super(SnsPublishStep, self).__init__(state_id, **kwargs)
123+
124+
125+
class SqsSendMessageStep(Task):
126+
127+
"""
128+
Creates a Task state to send a message to SQS queue. See `Call Amazon SQS with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-sqs.html>`_ for more details.
129+
"""
130+
131+
def __init__(self, state_id, wait_for_callback=False, **kwargs):
132+
"""
133+
Args:
134+
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
135+
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
136+
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
137+
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
138+
comment (str, optional): Human-readable comment or description. (default: None)
139+
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
140+
parameters (dict, optional): The value of this field becomes the effective input for the state.
141+
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
142+
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
143+
"""
144+
if wait_for_callback:
145+
kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage.waitForTaskToken'
146+
else:
147+
kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage'
148+
149+
super(SqsSendMessageStep, self).__init__(state_id, **kwargs)

tests/unit/test_service_steps.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# A copy of the License is located at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# or in the "license" file accompanying this file. This file is distributed
10+
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
# express or implied. See the License for the specific language governing
12+
# permissions and limitations under the License.
13+
from __future__ import absolute_import
14+
15+
import pytest
16+
17+
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
18+
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
19+
20+
21+
def test_sns_publish_step_creation():
22+
step = SnsPublishStep('Publish to SNS', parameters={
23+
'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic',
24+
'Message': 'message',
25+
})
26+
27+
assert step.to_dict() == {
28+
'Type': 'Task',
29+
'Resource': 'arn:aws:states:::sns:publish',
30+
'Parameters': {
31+
'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic',
32+
'Message': 'message',
33+
},
34+
'End': True
35+
}
36+
37+
step = SnsPublishStep('Publish to SNS', wait_for_callback=True, parameters={
38+
'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic',
39+
'Message': {
40+
'Input.$': '$',
41+
'TaskToken.$': '$$.Task.Token'
42+
}
43+
})
44+
45+
assert step.to_dict() == {
46+
'Type': 'Task',
47+
'Resource': 'arn:aws:states:::sns:publish.waitForTaskToken',
48+
'Parameters': {
49+
'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic',
50+
'Message': {
51+
'Input.$': '$',
52+
'TaskToken.$': '$$.Task.Token'
53+
}
54+
},
55+
'End': True
56+
}
57+
58+
59+
def test_sqs_send_message_step_creation():
60+
step = SqsSendMessageStep('Send to SQS', parameters={
61+
'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue',
62+
'MessageBody': 'Hello'
63+
})
64+
65+
assert step.to_dict() == {
66+
'Type': 'Task',
67+
'Resource': 'arn:aws:states:::sqs:sendMessage',
68+
'Parameters': {
69+
'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue',
70+
'MessageBody': 'Hello'
71+
},
72+
'End': True
73+
}
74+
75+
step = SqsSendMessageStep('Send to SQS', wait_for_callback=True, parameters={
76+
'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue',
77+
'MessageBody': {
78+
'Input.$': '$',
79+
'TaskToken.$': '$$.Task.Token'
80+
}
81+
})
82+
83+
assert step.to_dict() == {
84+
'Type': 'Task',
85+
'Resource': 'arn:aws:states:::sqs:sendMessage.waitForTaskToken',
86+
'Parameters': {
87+
'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue',
88+
'MessageBody': {
89+
'Input.$': '$',
90+
'TaskToken.$': '$$.Task.Token'
91+
}
92+
},
93+
'End': True
94+
}
95+
96+
97+
def test_dynamodb_get_item_step_creation():
98+
step = DynamoDBGetItemStep('Read Message From DynamoDB', parameters={
99+
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
100+
'Key': {
101+
'MessageId': {
102+
'S.$': '$.List[0]'
103+
}
104+
}
105+
})
106+
107+
assert step.to_dict() == {
108+
'Type': 'Task',
109+
'Resource': 'arn:aws:states:::dynamodb:getItem',
110+
'Parameters': {
111+
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
112+
'Key': {
113+
'MessageId': {
114+
'S.$': '$.List[0]'
115+
}
116+
}
117+
},
118+
'End': True
119+
}
120+
121+
122+
def test_dynamodb_put_item_step_creation():
123+
step = DynamoDBPutItemStep('Add Message From DynamoDB', parameters={
124+
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
125+
'Item': {
126+
'MessageId': {
127+
'S': '123456789'
128+
}
129+
}
130+
})
131+
132+
assert step.to_dict() == {
133+
'Type': 'Task',
134+
'Resource': 'arn:aws:states:::dynamodb:putItem',
135+
'Parameters': {
136+
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
137+
'Item': {
138+
'MessageId': {
139+
'S': '123456789'
140+
}
141+
}
142+
},
143+
'End': True
144+
}
145+
146+
147+
def test_dynamodb_delete_item_step_creation():
148+
step = DynamoDBDeleteItemStep('Delete Message From DynamoDB', parameters={
149+
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
150+
'Key': {
151+
'MessageId': {
152+
'S': 'MyMessage'
153+
}
154+
}
155+
})
156+
157+
assert step.to_dict() == {
158+
'Type': 'Task',
159+
'Resource': 'arn:aws:states:::dynamodb:deleteItem',
160+
'Parameters': {
161+
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
162+
'Key': {
163+
'MessageId': {
164+
'S': 'MyMessage'
165+
}
166+
}
167+
},
168+
'End': True
169+
}
170+
171+
172+
def test_dynamodb_update_item_step_creation():
173+
step = DynamoDBUpdateItemStep('Update Message From DynamoDB', parameters={
174+
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
175+
'Key': {
176+
'RecordId': {
177+
'S': 'RecordId'
178+
}
179+
},
180+
'UpdateExpression': 'set Revision = :val1',
181+
'ExpressionAttributeValues': {
182+
':val1': { 'S': '2' }
183+
}
184+
})
185+
186+
assert step.to_dict() == {
187+
'Type': 'Task',
188+
'Resource': 'arn:aws:states:::dynamodb:updateItem',
189+
'Parameters': {
190+
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
191+
'Key': {
192+
'RecordId': {
193+
'S': 'RecordId'
194+
}
195+
},
196+
'UpdateExpression': 'set Revision = :val1',
197+
'ExpressionAttributeValues': {
198+
':val1': { 'S': '2' }
199+
}
200+
},
201+
'End': True
202+
}

0 commit comments

Comments
 (0)