# -*- encoding: utf-8 -*-
"""
Global py.test configuration for our Lambdas.
"""
import os
import random
import string

import boto3
import pytest
import requests


def pytest_runtest_setup(item):
    # Set a default region before we start running tests.
    #
    # Without this line, boto3 complains about not having a region defined
    # (despite one being passed in the Travis env variables/local config).
    # TODO: Investigate this properly.
    boto3.setup_default_session(region_name="eu-west-1")


@pytest.fixture
def random_alpha():
    return "".join(random.choice(string.ascii_lowercase) for _ in range(10))


@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
    root_docker_compose = pytestconfig.rootdir.join("docker-compose.yml")
    src_docker_compose = pytestconfig.rootdir.join("src", "docker-compose.yml")

    if root_docker_compose.exists():
        return root_docker_compose
    elif src_docker_compose.exists():
        return src_docker_compose
    else:
        assert False, "Cannot find docker-compose file!"
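
# For reference, the fixtures below assume the docker-compose file defines
# services named "dynamodb", "s3", "sqs", "sns" and "elasticsearch", exposing
# the container ports passed to port_for().  A minimal sketch (image names
# omitted; only scality/s3server and a fake-sns implementation are named in
# the fixtures below):
#
#     services:
#       dynamodb:
#         ports: ["8000"]
#       s3:
#         ports: ["8000"]
#       sqs:
#         ports: ["9324"]
#       sns:
#         ports: ["9292"]
#       elasticsearch:
#         ports: ["9200"]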


def _is_responsive(endpoint_url, condition):
    """
    Build a zero-argument callable for docker_services.wait_until_responsive()
    that reports whether ``endpoint_url`` answers and ``condition(response)``
    holds.
    """

    def is_responsive():
        try:
            resp = requests.get(endpoint_url)
            if condition(resp):
                return True
        except requests.exceptions.ConnectionError:
            return False

    return is_responsive


@pytest.fixture(scope="session")
def dynamodb_client(docker_services, docker_ip):
    endpoint_url = f'http://{docker_ip}:{docker_services.port_for("dynamodb", 8000)}'

    # The DynamoDB container is up once an unauthenticated GET draws its
    # distinctive 400 MissingAuthenticationToken error.
    docker_services.wait_until_responsive(
        timeout=5.0,
        pause=0.1,
        check=_is_responsive(
            endpoint_url,
            lambda r: (
                r.status_code == 400
                and r.json()["__type"]
                == "com.amazonaws.dynamodb.v20120810#MissingAuthenticationToken"
            ),
        ),
    )

    yield boto3.client(
        "dynamodb",
        aws_access_key_id="testAccessKey",
        aws_secret_access_key="testSecretAccessKey",
        endpoint_url=endpoint_url,
    )
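
# Example use in a test (a sketch; the table name and key schema are
# hypothetical):
#
#     def test_puts_item(dynamodb_client):
#         dynamodb_client.create_table(
#             TableName="test-table",
#             KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
#             AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
#             ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
#         )
#         dynamodb_client.put_item(TableName="test-table", Item={"id": {"S": "1"}})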


@pytest.fixture(scope="session")
def dynamodb_resource(docker_services, docker_ip):
    endpoint_url = f'http://{docker_ip}:{docker_services.port_for("dynamodb", 8000)}'

    docker_services.wait_until_responsive(
        timeout=5.0,
        pause=0.1,
        check=_is_responsive(
            endpoint_url,
            lambda r: (
                r.status_code == 400
                and r.json()["__type"]
                == "com.amazonaws.dynamodb.v20120810#MissingAuthenticationToken"
            ),
        ),
    )

    yield boto3.resource(
        "dynamodb",
        aws_access_key_id="testAccessKey",
        aws_secret_access_key="testSecretAccessKey",
        endpoint_url=endpoint_url,
    )
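
# The resource interface wraps the same endpoint, and is handier for
# table-level operations, e.g. (hypothetical table, created elsewhere):
#
#     table = dynamodb_resource.Table("test-table")
#     table.put_item(Item={"id": "1"})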


@pytest.fixture(scope="session")
def s3_endpoint_url(docker_services, docker_ip):
    return f'http://{docker_ip}:{docker_services.port_for("s3", 8000)}'


@pytest.fixture(scope="session")
def s3_client(s3_endpoint_url, docker_services):
    # The S3 container is up once an unauthenticated GET draws a 403
    # AccessDenied error.
    docker_services.wait_until_responsive(
        timeout=10.0,
        pause=0.1,
        check=_is_responsive(
            s3_endpoint_url,
            lambda r: (r.status_code == 403 and "<Code>AccessDenied</Code>" in r.text),
        ),
    )

    # These credentials are required for the scality/s3server image we use.
    yield boto3.client(
        "s3",
        aws_access_key_id="accessKey1",
        aws_secret_access_key="verySecretKey1",
        endpoint_url=s3_endpoint_url,
    )
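
# Example use in a test (the key and body are hypothetical; see also the
# ``bucket`` fixture below):
#
#     def test_reads_object(s3_client, bucket):
#         s3_client.put_object(Bucket=bucket, Key="greeting.txt", Body=b"hello")
#         body = s3_client.get_object(Bucket=bucket, Key="greeting.txt")["Body"]
#         assert body.read() == b"hello"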


@pytest.fixture(scope="session")
def sqs_endpoint_url(docker_services, docker_ip):
    return f'http://{docker_ip}:{docker_services.port_for("sqs", 9324)}'


@pytest.fixture(scope="session")
def sqs_client(sqs_endpoint_url, docker_services):
    # The SQS container is up as soon as it answers HTTP at all
    # (a 404 for GET /).
    docker_services.wait_until_responsive(
        timeout=45.0,
        pause=0.1,
        check=_is_responsive(sqs_endpoint_url, lambda r: r.status_code == 404),
    )

    yield boto3.client("sqs", endpoint_url=sqs_endpoint_url)
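
# Example use in a test (the message body is hypothetical; see also the
# ``queue_url`` fixture below):
#
#     def test_roundtrips_message(sqs_client, queue_url):
#         sqs_client.send_message(QueueUrl=queue_url, MessageBody='{"foo": "bar"}')
#         messages = sqs_client.receive_message(QueueUrl=queue_url)["Messages"]
#         assert messages[0]["Body"] == '{"foo": "bar"}'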


@pytest.fixture(scope="session")
def sns_endpoint_url(docker_services, docker_ip):
    return f'http://{docker_ip}:{docker_services.port_for("sns", 9292)}'


@pytest.fixture(scope="session")
def sns_client(sns_endpoint_url, docker_services):
    docker_services.wait_until_responsive(
        timeout=5.0,
        pause=0.1,
        check=_is_responsive(sns_endpoint_url, lambda r: r.status_code == 200),
    )

    client = boto3.client(
        "sns",
        aws_access_key_id="testAccessKey",
        aws_secret_access_key="testSecretAccessKey",
        endpoint_url=sns_endpoint_url,
    )

    # This is a sample returned by the fake-sns implementation:
    #
    #     ---
    #     topics:
    #     - arn: arn:aws:sns:us-east-1:123456789012:es_ingest
    #       name: es_ingest
    #     - arn: arn:aws:sns:us-east-1:123456789012:id_minter
    #       name: id_minter
    #     messages:
    #     - :id: acbca1e1-e3c5-4c74-86af-06a9418e8fe4
    #       :subject: Foo
    #       :message: '{"foo": "bar"}'
    #       :topic_arn: arn:aws:sns:us-east-1:123456789012:id_minter
    #       :structure:
    #       :target_arn:
    #       :received_at: 2017-04-18 13:20:45.289912607 +00:00
    def list_messages():
        import json

        import yaml

        resp = requests.get(sns_endpoint_url)
        data = yaml.safe_load(resp.text)["messages"]

        # Messages arrive as JSON-encoded strings; decode them, and unwrap
        # the {"default": ...} envelope if one was used.
        for d in data:
            d[":message"] = json.loads(d[":message"])
            try:
                d[":message"] = json.loads(d[":message"]["default"])
            except KeyError:
                pass

        return data

    # We monkey-patch this method into the SNS client, so it's available
    # in tests.  Dynamism FTW.
    client.list_messages = list_messages

    yield client
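
# Example use in a test, relying on the monkey-patched helper above (the
# message is hypothetical; see also the ``topic_arn`` fixture below):
#
#     def test_publishes_message(sns_client, topic_arn):
#         sns_client.publish(TopicArn=topic_arn, Message='{"foo": "bar"}')
#         assert sns_client.list_messages()[0][":message"] == {"foo": "bar"}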


@pytest.fixture
def topic_arn(sns_client, docker_services, docker_ip):
    """Creates an SNS topic, and yields the new topic ARN."""
    topic_name = "test-lambda-topic"
    resp = sns_client.create_topic(Name=topic_name)
    topic_arn = resp["TopicArn"]

    # Our Lambdas all read their topic ARN from the environment, so we
    # set it here.
    os.environ.update({"TOPIC_ARN": topic_arn})

    yield topic_arn
    del os.environ["TOPIC_ARN"]

    # This clears all the messages on the topic at the end of the test,
    # so the next test gets an empty topic.
    endpoint_url = f'http://{docker_ip}:{docker_services.port_for("sns", 9292)}'
    requests.delete(f"{endpoint_url}/messages")
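
# A Lambda under test can then pick up the ARN from its environment, as the
# comment above notes:
#
#     topic_arn = os.environ["TOPIC_ARN"]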


@pytest.fixture
def queue_url(sqs_client):
    """
    Creates an SQS queue and yields the new queue URL.
    """
    queue_name = "test-lambda-queue"
    resp = sqs_client.create_queue(QueueName=queue_name)
    queue_url = resp["QueueUrl"]

    yield queue_url
    sqs_client.delete_queue(QueueUrl=queue_url)


@pytest.fixture
def bucket(s3_client):
    # The bucket is never torn down; the random suffix keeps names from
    # colliding between tests.
    bucket_name = "test-python-bucket-%d" % random.randint(0, 10000)
    s3_client.create_bucket(Bucket=bucket_name)
    yield bucket_name


@pytest.fixture
def elasticsearch_hostname(docker_ip):
    return docker_ip


@pytest.fixture
def elasticsearch_url(docker_services, elasticsearch_hostname):
    return f'http://{elasticsearch_hostname}:{docker_services.port_for("elasticsearch", 9200)}'


@pytest.fixture
def elasticsearch_index(docker_services, elasticsearch_url, random_alpha):
    # Elasticsearch is up once unauthenticated requests are answered
    # with a 401.
    docker_services.wait_until_responsive(
        timeout=60.0,
        pause=0.1,
        check=_is_responsive(elasticsearch_url, lambda r: r.status_code == 401),
    )

    index_name = random_alpha
    resp = requests.put(
        f"{elasticsearch_url}/{index_name}", auth=("elastic", "changeme")
    )
    resp.raise_for_status()

    yield index_name
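
# Example use in a test (a sketch; the ``/{index}/{type}/{id}`` document path
# assumes an Elasticsearch version with mapping types, matching the
# ``elastic``/``changeme`` default credentials used above):
#
#     def test_indexes_document(elasticsearch_url, elasticsearch_index):
#         resp = requests.put(
#             f"{elasticsearch_url}/{elasticsearch_index}/doc/1",
#             json={"foo": "bar"},
#             auth=("elastic", "changeme"),
#         )
#         resp.raise_for_status()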