This repository has been archived by the owner on Aug 3, 2023. It is now read-only.
forked from aws-samples/amazon-textract-transformer-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
textract_ocr.py
246 lines (234 loc) · 10.4 KB
/
textract_ocr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
"""CDK for core OCR stage of the document processing pipeline with Amazon Textract
"""
# External Dependencies:
from aws_cdk import Duration, RemovalPolicy
import aws_cdk.aws_dynamodb as dynamodb
from aws_cdk.aws_iam import Effect, PolicyDocument, PolicyStatement, Role, ServicePrincipal
from aws_cdk.aws_lambda import Runtime as LambdaRuntime
from aws_cdk.aws_lambda_python_alpha import PythonFunction
from aws_cdk.aws_s3 import Bucket
import aws_cdk.aws_sns as sns
import aws_cdk.aws_sns_subscriptions as subs
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as sfn_tasks
from constructs import Construct
# Local Dependencies:
from . import sfn_semaphore
from ..shared import abs_path
# Location of the "fn-call-textract" Lambda source bundle, passed as the `entry` of the
# PythonFunction below. Presumably `abs_path` resolves the folder name relative to this file's
# directory - confirm against the helper in `..shared`.
TEXTRACT_LAMBDA_PATH = abs_path("fn-call-textract", __file__)
class TextractOCRStep(Construct):
    """CDK Construct wrapping Amazon Textract OCR as a concurrency- & TPS-limited SFn step

    The `.sfn_task` created by this construct reads the raw input document location from the
    $.Input.Bucket and $.Input.Key properties of its input, and returns an object whose Bucket and
    Key point to the consolidated Textract JSON object.
    """

    def __init__(
        self,
        scope: Construct,
        id: str,
        lambda_role: Role,
        output_bucket: Bucket,
        output_prefix: str,
        concurrency_limit: int = 90,
        warmup_tps_limit: float = 2,
        lambda_memory_size: int = 1024,
        lambda_timeout: Duration = Duration.minutes(3),
        timeout_excluding_queue: Duration = Duration.minutes(25),
        timeout_including_queue: Duration = Duration.minutes(30),
        **kwargs,
    ):
        """Create a TextractOCRStep

        Parameters
        ----------
        scope :
            CDK construct scope
        id :
            CDK construct ID
        lambda_role :
            IAM Role the Textract-calling Lambda function runs with. Note this construct *adds*
            policy statements to the role (Textract, DynamoDB state cache, and iam:PassRole).
        output_bucket :
            (Pre-existing) S3 bucket in which Textract result files should be stored
        output_prefix :
            S3 key prefix for Textract result files (original input document keys are mapped
            underneath this prefix).
        concurrency_limit :
            Maximum number of Textract jobs allowed in progress at once. Requests beyond the limit
            are pooled for retry via AWS Step Functions (order not guaranteed). Set with reference
            to your account & region's Amazon Textract quotas, and consider leaving some margin in
            case the rate of result-fetching Get*** APIs limits your usage more in practice than
            the total job concurrency limit.
        warmup_tps_limit :
            Maximum rate at which new Textract jobs may be created. Requests beyond the limit are
            pooled for retry via AWS Step Functions (order not guaranteed). Set with reference to
            your account & region's Amazon Textract quotas.
        lambda_memory_size :
            Memory (in MB) for the Lambda function that calls Amazon Textract and consolidates
            response objects. Very large documents may need more, so that the full consolidated
            Textract response JSON fits in memory.
        lambda_timeout :
            Time-out for the Lambda function that calls Amazon Textract and consolidates response
            objects. May need raising if documents are large enough that consolidation is slow, or
            to let the Lambda poll/retry Textract API calls for longer.
        timeout_excluding_queue :
            Time after which the Textract processing job itself is considered failed.
        timeout_including_queue :
            Time after which the end-to-end step (including concurrency management / queuing) is
            considered failed.
        """
        super().__init__(scope, id, **kwargs)

        # ---- Lambda permission to call the Textract APIs:
        call_textract_statement = PolicyStatement(
            sid="CallTextract",
            actions=[
                "textract:AnalyzeDocument",
                "textract:DetectDocumentText",
                "textract:GetDocumentAnalysis",
                "textract:GetDocumentTextDetection",
                "textract:StartDocumentAnalysis",
                "textract:StartDocumentTextDetection",
            ],
            effect=Effect.ALLOW,
            resources=["*"],
        )
        lambda_role.add_to_policy(call_textract_statement)

        # ---- DynamoDB table caching async state, keyed by Textract job ID (with TTL expiry):
        job_id_attribute = dynamodb.Attribute(
            name="TextractJobId",
            type=dynamodb.AttributeType.STRING,
        )
        self.ddb_table = dynamodb.Table(
            self,
            "TextractAsyncStateCacheTable",
            partition_key=job_id_attribute,
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            removal_policy=RemovalPolicy.DESTROY,
            time_to_live_attribute="ExpiresAt",
        )
        state_cache_statement = PolicyStatement(
            sid="DDBAsyncStateCacheTableAccess",
            actions=["dynamodb:GetItem", "dynamodb:PutItem"],
            effect=Effect.ALLOW,
            resources=[self.ddb_table.table_arn],
        )
        lambda_role.add_to_policy(state_cache_statement)

        # ---- SNS callback topic, and the role Textract assumes in order to publish to it:
        self.sns_topic = sns.Topic(self, "TextractCallbackTopic")
        publish_statement = PolicyStatement(
            sid="SNSTopicAccess",
            actions=["sns:Publish"],
            effect=Effect.ALLOW,
            resources=[self.sns_topic.topic_arn],
        )
        self.textract_sns_role = Role(
            self,
            "TextractSNSRole",
            description="Execution role to give Textract permission to notify our SNS topic",
            assumed_by=ServicePrincipal("textract.amazonaws.com"),
            inline_policies={"SNSAccess": PolicyDocument(statements=[publish_statement])},
        )
        # The Lambda must be allowed to hand this role over to Textract:
        pass_role_statement = PolicyStatement(
            sid="PassTextractSNSRole",
            actions=["iam:PassRole"],
            effect=Effect.ALLOW,
            resources=[self.textract_sns_role.role_arn],
        )
        lambda_role.add_to_policy(pass_role_statement)

        # ---- The Lambda function itself, also subscribed to the callback topic:
        caller_environment = {
            "CALLBACK_SNS_ROLE_ARN": self.textract_sns_role.role_arn,
            "CALLBACK_SNS_TOPIC_ARN": self.sns_topic.topic_arn,
            "DDB_STATE_CACHE_TABLE": self.ddb_table.table_name,
        }
        # NOTE(review): Python 3.8 is an older Lambda runtime - confirm it is still supported in
        # your target account/region before deploying.
        self.caller_lambda = PythonFunction(
            self,
            "CallTextract",
            entry=TEXTRACT_LAMBDA_PATH,
            environment=caller_environment,
            index="main.py",
            handler="handler",
            memory_size=lambda_memory_size,
            role=lambda_role,
            runtime=LambdaRuntime.PYTHON_3_8,
            timeout=lambda_timeout,
        )
        callback_subscription = subs.LambdaSubscription(self.caller_lambda)
        self.sns_topic.add_subscription(callback_subscription)

        # ---- Task-token step that invokes the Lambda and waits for the callback:
        step_comment = "Extract document text and structure with Amazon Textract"
        lambda_payload = sfn.TaskInput.from_object(
            {
                "IdempotencySalt": sfn.JsonPath.task_token,
                "Input": sfn.JsonPath.string_at("$.Input"),
                "Output": sfn.JsonPath.string_at("$.Output"),
                "TaskToken": sfn.JsonPath.task_token,
            }
        )
        # (Attribute name spelling 'textact' is retained as-is, since it is externally visible)
        self.inner_textact_step = sfn_tasks.LambdaInvoke(
            self,
            "TextractLambdaStep",
            comment=step_comment,
            lambda_function=self.caller_lambda,
            payload=lambda_payload,
            result_path="$.Output",  # Replaces the incoming 'Output' with the task result
            integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
            timeout=timeout_excluding_queue,
        )

        # ---- Concurrency & TPS management via the Step Functions semaphore (sfn_semaphore):
        # In general the semaphore DB table and reaper machine could be shared between multiple
        # semaphores, but this CDK app only needs the one lock - so everything is created right
        # here, rather than complicating this construct's API.
        lock_name = "TextractConcurrencyLock"  # Could consider: cdk.Names.unique_id(self)
        self.semaphore_ddb = sfn_semaphore.SFnSemaphoreDynamoDbTable(
            self,
            "TextractSemaphoreDDB",
            lock_id_attr="LockName",
        )
        self.semaphore_reaper = sfn_semaphore.SFnSemaphoreReaper(
            self,
            "TextractSemaphoreReaper",
            ddb_lock_table=self.semaphore_ddb,
            lock_id_attr=self.semaphore_ddb.lock_id_attr,
        )
        self.semaphore = sfn_semaphore.SFnSemaphore(
            self,
            "TextractSemaphore",
            workchain=self.inner_textact_step,
            ddb_lock_table=self.semaphore_ddb,
            lock_id_attr=self.semaphore_ddb.lock_id_attr,
            lock_name=lock_name,
            concurrency_limit=concurrency_limit,
            warmup_tps_limit=warmup_tps_limit,
        )
        self.textract_state_machine = sfn.StateMachine(
            self,
            "TextractStateMachine",
            definition=self.semaphore.chain,
            timeout=timeout_including_queue,
        )
        self.semaphore_reaper.attach(self.textract_state_machine, lock_name=lock_name)

        # ---- Outward-facing task: the OCR pipeline sees the semaphore-wrapped state machine
        # above as a single nested-execution step.
        execution_input = sfn.TaskInput.from_object(
            {
                "Input": {
                    "Bucket": sfn.JsonPath.string_at("$.Input.Bucket"),
                    "Key": sfn.JsonPath.string_at("$.Input.Key"),
                },
                "Output": {
                    "Bucket": output_bucket.bucket_name,
                    "Prefix": output_prefix,
                },
            }
        )
        self.sfn_task = sfn_tasks.StepFunctionsStartExecution(
            self,
            "TextractTask",
            comment=step_comment,
            state_machine=self.textract_state_machine,
            input=execution_input,
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,
            timeout=timeout_including_queue,
            # The whole SFn state is replaced by this step's output, because the step runs inside
            # a Parallel alongside thumbnail generation:
            output_path=sfn.JsonPath.string_at("$.Output.Output"),
        )