forked from MeltanoLabs/tap-airbyte-wrapper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: test_syncs.py
123 lines (100 loc) · 4.75 KB
/
test_syncs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# Copyright (c) 2022 Alex Butler
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
# to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or
# substantial portions of the Software.
import io
import json
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path
import orjson
from tap_airbyte.tap import TapAirbyte
def test_weather_sync():
    """Run a sync and compare the output to a fixture derived from a public dataset.

    This test provides a very strong guarantee that the tap is working as expected.
    The tap's captured stdout must contain exactly as many lines as the
    ``fixtures/KPHX.singer`` fixture. Every RECORD message must then equal the
    fixture line at the same position, with ``time_extracted`` excluded from the
    comparison. Lines that are not valid JSON are skipped.
    """
    tap = TapAirbyte(
        config={
            "airbyte_spec": {"airbyte_source_executable": "python venv/src/source-pokeapi/airbyte-integrations/connectors/source-file/main.py"},
            "airbyte_config": {
                "dataset_name": "test",
                "format": "csv",
                "url": "https://raw.githubusercontent.com/fivethirtyeight/data/master/us-weather-history/KPHX.csv",
                "provider": {
                    "storage": "HTTPS",
                    "user_agent": True,
                },
            },
        },
        parse_env_config=True,
    )
    # Serialize with sorted keys so emitted messages line up with the fixture.
    tap.ORJSON_OPTS |= orjson.OPT_SORT_KEYS
    fixture = Path(__file__).parent.joinpath("fixtures", "KPHX.singer")
    expected_lines = fixture.read_text().splitlines()

    stdout = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
    stderr = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
    with redirect_stdout(stdout), redirect_stderr(stderr):
        tap.sync_all()
    stdout.seek(0)
    actual_lines = stdout.readlines()

    assert len(actual_lines) == len(expected_lines), (
        f"Expected {len(expected_lines)} stdout lines, got {len(actual_lines)}"
    )
    # Compare the lines already read. Calling stdout.readlines() again would return
    # an empty list (the stream is exhausted), silently skipping every comparison.
    # enumerate yields (index, (actual, expected)), hence the nested unpacking.
    for no, (test_case, baseline) in enumerate(zip(actual_lines, expected_lines), start=1):
        try:
            parsed_test_case = json.loads(test_case)
            parsed_baseline = json.loads(baseline)
        except json.JSONDecodeError:
            # Only JSON (Singer) lines take part in the comparison.
            continue
        if parsed_test_case["type"] != "RECORD":
            continue
        assert (
            parsed_baseline["type"] == "RECORD"
        ), f"Parsed message at line {no} is not a record but the test input is"
        # time_extracted is excluded; presumably it is stamped per run and would never match.
        parsed_baseline.pop("time_extracted", None)
        parsed_test_case.pop("time_extracted", None)
        assert (
            parsed_baseline == parsed_test_case
        ), f"{no}: {parsed_baseline} != {parsed_test_case}"
def test_poke_sync():
    """Run a sync and compare the output to a fixture derived from a public dataset.

    This test provides a very strong guarantee that the tap is working as expected.
    The tap's captured stdout must contain exactly as many lines as the
    ``fixtures/SMEARGLE.singer`` fixture. Every RECORD message must then equal the
    fixture line at the same position, with ``time_extracted`` excluded from the
    comparison. Lines that are not valid JSON are skipped.
    """
    tap = TapAirbyte(
        config={
            "airbyte_spec": {"airbyte_source_executable": "python venv/src/source-pokeapi/airbyte-integrations/connectors/source-pokeapi/main.py"},
            "airbyte_config": {
                # sketch -> spore, endeavor, extreme speed, destiny bond w/ focus sash
                # if you know, you know.
                "pokemon_name": "smeargle",
            },
        },
    )
    # Serialize with sorted keys so emitted messages line up with the fixture.
    tap.ORJSON_OPTS |= orjson.OPT_SORT_KEYS
    fixture = Path(__file__).parent.joinpath("fixtures", "SMEARGLE.singer")
    expected_lines = fixture.read_text().splitlines()

    stdout = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
    stderr = io.TextIOWrapper(io.BytesIO(), encoding="utf-8")
    with redirect_stdout(stdout), redirect_stderr(stderr):
        tap.sync_all()
    stdout.seek(0)
    actual_lines = stdout.readlines()

    assert len(actual_lines) == len(expected_lines), (
        f"Expected {len(expected_lines)} stdout lines, got {len(actual_lines)}"
    )
    # Compare the lines already read. Calling stdout.readlines() again would return
    # an empty list (the stream is exhausted), silently skipping every comparison.
    # enumerate yields (index, (actual, expected)), hence the nested unpacking.
    for no, (test_case, baseline) in enumerate(zip(actual_lines, expected_lines), start=1):
        try:
            parsed_test_case = json.loads(test_case)
            parsed_baseline = json.loads(baseline)
        except json.JSONDecodeError:
            # Only JSON (Singer) lines take part in the comparison.
            continue
        if parsed_test_case["type"] != "RECORD":
            continue
        assert (
            parsed_baseline["type"] == "RECORD"
        ), f"Parsed message at line {no} is not a record but the test input is"
        # time_extracted is excluded; presumably it is stamped per run and would never match.
        parsed_baseline.pop("time_extracted", None)
        parsed_test_case.pop("time_extracted", None)
        assert (
            parsed_baseline == parsed_test_case
        ), f"{no}: {parsed_baseline} != {parsed_test_case}"
if __name__ == "__main__":
    # Run the sync tests directly (outside pytest), in declaration order.
    for _sync_test in (test_weather_sync, test_poke_sync):
        _sync_test()