-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.tf
249 lines (226 loc) · 14.2 KB
/
main.tf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
### parameters
locals {
  # True when the caller supplied a cluster definition (var.cluster is not null).
  # Most resources in this file use this flag to decide whether to create anything.
  cluster_enabled = var.cluster != null

  # Scaling settings: the "scaling" entry of var.cluster when a cluster is
  # defined, falling back to the module default when the entry is absent or
  # when no cluster is defined at all.
  scaling = (
    local.cluster_enabled
    ? lookup(var.cluster, "scaling", local.default_cluster.scaling)
    : local.default_cluster.scaling
  )
}
### security/policy
# IAM role that the EMR service assumes; used as the cluster's service role.
# Created only when a cluster is enabled.
resource "aws_iam_role" "cp" {
  for_each = toset(local.cluster_enabled ? ["enabled"] : [])
  name     = "${local.name}-cp"
  tags     = merge(local.default-tags, var.tags)

  # Trust policy: allow the partition-specific EMR service principal to
  # assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = "sts:AssumeRole"
        Principal = {
          Service = "elasticmapreduce.${module.aws.partition.dns_suffix}"
        }
      },
    ]
  })
}
# Attaches the AWS-managed AmazonElasticMapReduceRole policy to the "cp" role.
resource "aws_iam_role_policy_attachment" "emr" {
  for_each = toset(local.cluster_enabled ? ["enabled"] : [])

  # each.key is always "enabled" here, matching the single instance of the role.
  role       = aws_iam_role.cp[each.key].name
  policy_arn = "arn:${module.aws.partition.partition}:iam::aws:policy/service-role/AmazonElasticMapReduceRole"
}
# IAM role assumed by the cluster's EC2 instances (exposed to them through
# the "ng" instance profile). Created only when a cluster is enabled.
resource "aws_iam_role" "ng" {
  for_each = toset(local.cluster_enabled ? ["enabled"] : [])
  name     = "${local.name}-ng"
  tags     = merge(local.default-tags, var.tags)

  # Trust policy: allow the partition-specific EC2 service principal to
  # assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = "sts:AssumeRole"
        Principal = {
          Service = "ec2.${module.aws.partition.dns_suffix}"
        }
      },
    ]
  })
}
# Attaches the AWS-managed AmazonElasticMapReduceforEC2Role policy to the "ng" role.
resource "aws_iam_role_policy_attachment" "ec2" {
  for_each = toset(local.cluster_enabled ? ["enabled"] : [])

  # each.key is always "enabled" here, matching the single instance of the role.
  role       = aws_iam_role.ng[each.key].name
  policy_arn = "arn:${module.aws.partition.partition}:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role"
}
# Instance profile wrapping the "ng" role so it can be attached to the
# cluster's EC2 instances.
resource "aws_iam_instance_profile" "ng" {
  for_each = toset(local.cluster_enabled ? ["enabled"] : [])
  name     = "${local.name}-ng"

  # each.key is always "enabled" here, matching the single instance of the role.
  role = aws_iam_role.ng[each.key].name
}
### cluster/control
# Renders templates/scale-policy.tpl from this module's directory; no template
# variables are passed in.
# NOTE(review): nothing in the visible part of this file references this data
# source, and the template_file data source comes from the archived
# hashicorp/template provider (the built-in templatefile() function is its
# replacement) - confirm whether it is still needed before migrating.
data "template_file" "scale-policy" {
  template = file(format("%s/templates/scale-policy.tpl", path.module))
}
# EMR cluster with a primary (master) instance fleet and a core instance fleet.
# Created only when a cluster is enabled (var.cluster is not null). Every
# setting is read from the caller's maps with a fallback to the module defaults.
resource "aws_emr_cluster" "cp" {
  for_each      = toset(local.cluster_enabled ? ["enabled"] : [])
  name          = local.name
  tags          = merge(local.default-tags, var.tags)
  service_role  = aws_iam_role.cp["enabled"].arn
  release_label = lookup(var.cluster, "release", local.default_cluster.release)
  applications  = lookup(var.cluster, "applications", local.default_cluster.applications)

  # The module default is stored under "termination_protection", but this
  # lookup used to read only the plural key "termination_protections", so a
  # caller setting the singular key was silently ignored. Read the singular key
  # first and keep the plural spelling as a fallback for existing callers.
  termination_protection = lookup(
    var.cluster,
    "termination_protection",
    lookup(var.cluster, "termination_protections", local.default_cluster.termination_protection)
  )

  keep_job_flow_alive_when_no_steps = true

  ec2_attributes {
    subnet_ids                        = var.subnets
    additional_master_security_groups = var.additional_primary_security_group
    additional_slave_security_groups  = var.additional_slave_security_group
    instance_profile                  = aws_iam_instance_profile.ng["enabled"].arn
    key_name                          = lookup(var.cluster, "ssh_key", local.default_cluster.ssh_key)
  }

  # One bootstrap_action block is generated per entry of the "bootstrap"
  # setting; an empty collection generates none.
  # NOTE(review): the entry's own key/value is never used - every entry yields
  # the same hard-coded run-if action below. Confirm whether the entries were
  # meant to supply path/name/args.
  dynamic "bootstrap_action" {
    for_each = { for k, v in lookup(var.cluster, "bootstrap", local.default_cluster.bootstrap) : k => v }
    content {
      path = "s3://emr-bootstrap/actions/run-if"
      name = "runif"
      args = ["instance.isMaster=true", "echo running on primary node"]
    }
  }

  master_instance_fleet {
    name                      = join("-", [local.name, "primary-fleet"])
    target_on_demand_capacity = lookup(var.primary_node_groups, "target_on_demand_capacity", local.default_primary_node_groups.target_on_demand_capacity)

    # One instance_type_configs block per configured instance type.
    dynamic "instance_type_configs" {
      for_each = { for k, v in lookup(var.primary_node_groups, "instance_type_configs", local.default_instance_type_configs) : k => v }
      content {
        bid_price                                  = lookup(instance_type_configs.value, "bid_price", local.default_instance_type_config.bid_price)
        bid_price_as_percentage_of_on_demand_price = lookup(instance_type_configs.value, "bid_price_as_percentage_of_on_demand_price", local.default_instance_type_config.bid_price_as_percentage_of_on_demand_price)
        instance_type                              = lookup(instance_type_configs.value, "instance_type", local.default_instance_type_config.instance_type)
        weighted_capacity                          = lookup(instance_type_configs.value, "weighted_capacity", local.default_instance_type_config.weighted_capacity)

        # Emitted only when the instance type entry carries an "ebs_config" key.
        dynamic "ebs_config" {
          for_each = { for k, v in instance_type_configs.value : k => v if k == "ebs_config" }
          content {
            size                 = lookup(ebs_config.value, "size", local.default_instance_type_config.ebs_config.size)
            type                 = lookup(ebs_config.value, "type", local.default_instance_type_config.ebs_config.type)
            volumes_per_instance = lookup(ebs_config.value, "volumes_per_instance", local.default_instance_type_config.ebs_config.volumes_per_instance)
          }
        }
      }
    }

    # Always exactly one launch_specifications block; its nested on-demand and
    # spot blocks are emitted only when the corresponding key is present.
    dynamic "launch_specifications" {
      for_each = [lookup(var.primary_node_groups, "launch_specifications", local.default_primary_node_groups.launch_specifications)]
      content {
        dynamic "on_demand_specification" {
          for_each = lookup(launch_specifications.value, "on_demand_specification", null) == null ? [] : [lookup(launch_specifications.value, "on_demand_specification")]
          content {
            allocation_strategy = lookup(on_demand_specification.value, "allocation_strategy", local.default_on_demand_specification.allocation_strategy)
          }
        }
        dynamic "spot_specification" {
          for_each = lookup(launch_specifications.value, "spot_specification", null) == null ? [] : [lookup(launch_specifications.value, "spot_specification")]
          content {
            allocation_strategy      = lookup(spot_specification.value, "allocation_strategy", local.default_spot_specification.allocation_strategy)
            block_duration_minutes   = lookup(spot_specification.value, "block_duration_minutes", local.default_spot_specification.block_duration_minutes)
            timeout_action           = lookup(spot_specification.value, "timeout_action", local.default_spot_specification.timeout_action)
            timeout_duration_minutes = lookup(spot_specification.value, "timeout_duration_minutes", local.default_spot_specification.timeout_duration_minutes)
          }
        }
      }
    }
  }

  core_instance_fleet {
    name                      = join("-", [local.name, "core-fleet"])
    target_on_demand_capacity = lookup(var.core_node_groups, "target_on_demand_capacity", local.default_core_node_groups.target_on_demand_capacity)
    target_spot_capacity      = lookup(var.core_node_groups, "target_spot_capacity", local.default_core_node_groups.target_spot_capacity)

    # One instance_type_configs block per configured instance type.
    dynamic "instance_type_configs" {
      for_each = { for k, v in lookup(var.core_node_groups, "instance_type_configs", local.default_instance_type_configs) : k => v }
      content {
        bid_price                                  = lookup(instance_type_configs.value, "bid_price", local.default_instance_type_config.bid_price)
        bid_price_as_percentage_of_on_demand_price = lookup(instance_type_configs.value, "bid_price_as_percentage_of_on_demand_price", local.default_instance_type_config.bid_price_as_percentage_of_on_demand_price)
        instance_type                              = lookup(instance_type_configs.value, "instance_type", local.default_instance_type_config.instance_type)
        weighted_capacity                          = lookup(instance_type_configs.value, "weighted_capacity", local.default_instance_type_config.weighted_capacity)

        # Emitted only when the instance type entry carries an "ebs_config" key.
        dynamic "ebs_config" {
          for_each = { for k, v in instance_type_configs.value : k => v if k == "ebs_config" }
          content {
            size                 = lookup(ebs_config.value, "size", local.default_instance_type_config.ebs_config.size)
            type                 = lookup(ebs_config.value, "type", local.default_instance_type_config.ebs_config.type)
            volumes_per_instance = lookup(ebs_config.value, "volumes_per_instance", local.default_instance_type_config.ebs_config.volumes_per_instance)
          }
        }
      }
    }

    # Always exactly one launch_specifications block; its nested on-demand and
    # spot blocks are emitted only when the corresponding key is present.
    dynamic "launch_specifications" {
      for_each = [lookup(var.core_node_groups, "launch_specifications", local.default_core_node_groups.launch_specifications)]
      content {
        dynamic "on_demand_specification" {
          for_each = lookup(launch_specifications.value, "on_demand_specification", null) == null ? [] : [lookup(launch_specifications.value, "on_demand_specification")]
          content {
            allocation_strategy = lookup(on_demand_specification.value, "allocation_strategy", local.default_on_demand_specification.allocation_strategy)
          }
        }
        dynamic "spot_specification" {
          for_each = lookup(launch_specifications.value, "spot_specification", null) == null ? [] : [lookup(launch_specifications.value, "spot_specification")]
          content {
            allocation_strategy      = lookup(spot_specification.value, "allocation_strategy", local.default_spot_specification.allocation_strategy)
            block_duration_minutes   = lookup(spot_specification.value, "block_duration_minutes", local.default_spot_specification.block_duration_minutes)
            timeout_action           = lookup(spot_specification.value, "timeout_action", local.default_spot_specification.timeout_action)
            timeout_duration_minutes = lookup(spot_specification.value, "timeout_duration_minutes", local.default_spot_specification.timeout_duration_minutes)
          }
        }
      }
    }
  }

  lifecycle {
    create_before_destroy = true
  }
}
### cluster/task
# Task instance fleet attached to the EMR cluster. Created only when a cluster
# is enabled; settings come from var.task_node_groups with module defaults as
# fallback.
resource "aws_emr_instance_fleet" "dp" {
  for_each = toset(local.cluster_enabled ? ["enabled"] : [])

  # each.key is always "enabled" here, matching the single cluster instance.
  cluster_id = aws_emr_cluster.cp[each.key].id
  name       = "${local.name}-task-fleet"

  target_on_demand_capacity = lookup(
    var.task_node_groups,
    "target_on_demand_capacity",
    local.default_task_node_groups.target_on_demand_capacity
  )
  target_spot_capacity = lookup(
    var.task_node_groups,
    "target_spot_capacity",
    local.default_task_node_groups.target_spot_capacity
  )

  # One instance_type_configs block per configured instance type.
  dynamic "instance_type_configs" {
    for_each = {
      for k, v in lookup(var.task_node_groups, "instance_type_configs", local.default_instance_type_configs) :
      k => v
    }
    content {
      bid_price = lookup(
        instance_type_configs.value,
        "bid_price",
        local.default_instance_type_config.bid_price
      )
      bid_price_as_percentage_of_on_demand_price = lookup(
        instance_type_configs.value,
        "bid_price_as_percentage_of_on_demand_price",
        local.default_instance_type_config.bid_price_as_percentage_of_on_demand_price
      )
      instance_type = lookup(
        instance_type_configs.value,
        "instance_type",
        local.default_instance_type_config.instance_type
      )
      weighted_capacity = lookup(
        instance_type_configs.value,
        "weighted_capacity",
        local.default_instance_type_config.weighted_capacity
      )

      # Emitted only when the instance type entry carries an "ebs_config" key.
      dynamic "ebs_config" {
        for_each = {
          for k, v in instance_type_configs.value :
          k => v if k == "ebs_config"
        }
        content {
          size                 = lookup(ebs_config.value, "size", local.default_instance_type_config.ebs_config.size)
          type                 = lookup(ebs_config.value, "type", local.default_instance_type_config.ebs_config.type)
          volumes_per_instance = lookup(ebs_config.value, "volumes_per_instance", local.default_instance_type_config.ebs_config.volumes_per_instance)
        }
      }
    }
  }

  # Always exactly one launch_specifications block; its nested on-demand and
  # spot blocks are emitted only when the corresponding key is present.
  dynamic "launch_specifications" {
    for_each = [
      lookup(var.task_node_groups, "launch_specifications", local.default_task_node_groups.launch_specifications),
    ]
    content {
      dynamic "on_demand_specification" {
        for_each = (
          lookup(launch_specifications.value, "on_demand_specification", null) != null
          ? [lookup(launch_specifications.value, "on_demand_specification")]
          : []
        )
        content {
          allocation_strategy = lookup(on_demand_specification.value, "allocation_strategy", local.default_on_demand_specification.allocation_strategy)
        }
      }

      dynamic "spot_specification" {
        for_each = (
          lookup(launch_specifications.value, "spot_specification", null) != null
          ? [lookup(launch_specifications.value, "spot_specification")]
          : []
        )
        content {
          allocation_strategy      = lookup(spot_specification.value, "allocation_strategy", local.default_spot_specification.allocation_strategy)
          block_duration_minutes   = lookup(spot_specification.value, "block_duration_minutes", local.default_spot_specification.block_duration_minutes)
          timeout_action           = lookup(spot_specification.value, "timeout_action", local.default_spot_specification.timeout_action)
          timeout_duration_minutes = lookup(spot_specification.value, "timeout_duration_minutes", local.default_spot_specification.timeout_duration_minutes)
        }
      }
    }
  }
}
### cluster/scaling
# EMR managed scaling policy for the cluster.
#
# Created only when a cluster is enabled AND a scaling definition is present.
# local.scaling falls back to the module default when var.cluster is null, so
# checking local.scaling alone could instantiate this resource without a
# cluster and then fail on aws_emr_cluster.cp["enabled"], which does not exist
# in that case. Gating on local.cluster_enabled as well prevents that.
resource "aws_emr_managed_scaling_policy" "as" {
  for_each   = (!local.cluster_enabled || local.scaling == null) ? {} : { emr_managed = local.scaling }
  cluster_id = aws_emr_cluster.cp["enabled"].id

  # Emitted only when the scaling definition carries a "compute_limits" key;
  # each limit falls back to the module's default scaling policy.
  dynamic "compute_limits" {
    for_each = { for k, v in each.value : k => v if k == "compute_limits" }
    content {
      unit_type                       = lookup(compute_limits.value, "unit_type", local.default_scaling_policy.compute_limits.unit_type)
      minimum_capacity_units          = lookup(compute_limits.value, "minimum_capacity_units", local.default_scaling_policy.compute_limits.minimum_capacity_units)
      maximum_capacity_units          = lookup(compute_limits.value, "maximum_capacity_units", local.default_scaling_policy.compute_limits.maximum_capacity_units)
      maximum_ondemand_capacity_units = lookup(compute_limits.value, "maximum_ondemand_capacity_units", local.default_scaling_policy.compute_limits.maximum_ondemand_capacity_units)
      maximum_core_capacity_units     = lookup(compute_limits.value, "maximum_core_capacity_units", local.default_scaling_policy.compute_limits.maximum_core_capacity_units)
    }
  }
}