Skip to content

Commit 8a9aeb6

Browse files
clean up snapshots during data refresh based on snapshotValidityDays (#38)
* clean up snapshots during data refresh based on `snapshotValidityDays` * added validation for loadNumberOfDays, snapshotValidity
1 parent f6ac7bc commit 8a9aeb6

File tree

13 files changed

+314
-21
lines changed

13 files changed

+314
-21
lines changed
+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"workflow": [
3+
{
4+
"type": "ConfigureSources",
5+
"name": "ConfigureSources",
6+
"configFiles": [
7+
"config/config1.rel"
8+
],
9+
"defaultContainer": "input",
10+
"sources": [
11+
{
12+
"relation": "device_seen_snapshot",
13+
"isChunkPartitioned": true,
14+
"isDatePartitioned": true,
15+
"relativePath": "device_seen_snapshot",
16+
"inputFormat": "csv",
17+
"loadsNumberOfDays": 1,
18+
"offsetByNumberOfDays": 0,
19+
"snapshotValidityDays": 1
20+
}
21+
]
22+
}
23+
]
24+
}
+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"workflow": [
3+
{
4+
"type": "ConfigureSources",
5+
"name": "ConfigureSources",
6+
"configFiles": [
7+
"config/config1.rel"
8+
],
9+
"defaultContainer": "input",
10+
"sources": [
11+
{
12+
"relation": "device_seen_snapshot",
13+
"isChunkPartitioned": true,
14+
"isDatePartitioned": true,
15+
"relativePath": "device_seen_snapshot",
16+
"inputFormat": "csv",
17+
"loadsNumberOfDays": 1,
18+
"offsetByNumberOfDays": 0,
19+
"snapshotValidityDays": 2
20+
}
21+
]
22+
}
23+
]
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
device,last_seen
2+
IPhone1,2021-12-31
3+
IPhone2,2021-12-30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
device,last_seen
2+
IPhone6,2021-11-16
3+
IPhone7,2021-11-17

cli-e2e-test/test_e2e.py

+57
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,63 @@ def test_scenario6_model_two_partitions_overriden_by_one(self):
200200
self.assertEqual(rsp_json, [{'partition': 1, 'relation': 'product_data'},
201201
{'partition': 2, 'relation': 'product_data'}])
202202

203+
def test_scenario7_model_1_day_snapshot_2_day_declared_1_day_out_of_range(self):
204+
# when
205+
test_args = ["--batch-config", "./config/model/scenario7.json"]
206+
rsp = call(self.cmd_with_common_arguments + test_args + ["--end-date", "20220102", "--drop-db"])
207+
# then
208+
self.assertNotEqual(rsp, 1)
209+
# and when
210+
rsp = call(self.cmd_with_common_arguments + test_args + ["--end-date", "20220104"])
211+
# then
212+
self.assertNotEqual(rsp, 1)
213+
rai_config = self.resource_manager.get_rai_config()
214+
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
215+
self.assertEqual(rsp_json, [{'partition': 2022010200001, 'relation': 'device_seen_snapshot'}])
216+
217+
def test_scenario7_model_1_day_snapshot_1_day_declared_1_day_out_of_range(self):
218+
# when
219+
test_args = ["--batch-config", "./config/model/scenario7.json"]
220+
rsp = call(self.cmd_with_common_arguments + test_args + ["--start-date", "20220103", "--end-date", "20220104",
221+
"--drop-db"])
222+
# then
223+
self.assertNotEqual(rsp, 1)
224+
# and when
225+
rsp = call(self.cmd_with_common_arguments + test_args + ["--end-date", "20220105"])
226+
# then
227+
self.assertNotEqual(rsp, 1)
228+
rai_config = self.resource_manager.get_rai_config()
229+
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
230+
self.assertEqual(rsp_json, {})
231+
232+
def test_scenario7_model_1_day_snapshot_1_day_declared_0_days_out_of_range(self):
233+
# when
234+
test_args = ["--batch-config", "./config/model/scenario7.json"]
235+
rsp = call(self.cmd_with_common_arguments + test_args + ["--end-date", "20220105", "--drop-db"])
236+
# then
237+
self.assertNotEqual(rsp, 1)
238+
# and when
239+
rsp = call(self.cmd_with_common_arguments + test_args + ["--end-date", "20220105"])
240+
# then
241+
self.assertNotEqual(rsp, 1)
242+
rai_config = self.resource_manager.get_rai_config()
243+
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
244+
self.assertEqual(rsp_json, {})
245+
246+
def test_scenario8_model_2_day_snapshot_1_day_declared_1_days_out_of_range(self):
247+
# when
248+
test_args = ["--batch-config", "./config/model/scenario8.json"]
249+
rsp = call(self.cmd_with_common_arguments + test_args + ["--end-date", "20220103", "--drop-db"])
250+
# then
251+
self.assertNotEqual(rsp, 1)
252+
# and when
253+
rsp = call(self.cmd_with_common_arguments + test_args + ["--start-date", "20220104", "--end-date", "20220105"])
254+
# then
255+
self.assertNotEqual(rsp, 1)
256+
rai_config = self.resource_manager.get_rai_config()
257+
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
258+
self.assertEqual(rsp_json, [{'partition': 2022010300001, 'relation': 'device_seen_snapshot'}])
259+
203260
@classmethod
204261
def setUpClass(cls) -> None:
205262
# Make sure output folder is empty since the folder share across repository. Remove README.md, other files left.

cli/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ This Command-Line Interface (CLI) is designed to provide an easy and interactive
66
1. Create a batch configuration (ex. `poc.json`) file using the syntax and structure outlined in the [RAI Workflow Framework README](../workflow/README.md).
77
2. Add `rai-workflow-manager` as dependency to your `requirements.txt` file:
88
```txt
9-
rai-workflow-manager==0.0.22
9+
rai-workflow-manager==0.0.23
1010
```
1111
3. Build the project:
1212
```bash

rel/source_configs/config.rel

+40
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,46 @@ def missing_resources_json(:[], i, :dates, :[], j, :resources, :[], k, :part_ind
419419
from s, r, d
420420
}
421421

422+
/*
423+
* Declared date partitioned sources
424+
*/
425+
module declared_date_partitioned_source
426+
427+
def path_on_date(rel, d, p) {
428+
source_declares_resource(rel, _, p) and
429+
r = ^RelationName[rel] . rel_name:identifies . relation:identifies . source:declares and
430+
resource:id(r, ^URI[p]) and
431+
uri:identifies(u, r) and
432+
d = uri:parse[u, "date"]
433+
from r, u
434+
}
435+
436+
def date(s, d) = path_on_date(s, d, _)
437+
438+
def id(s) = date(s, _)
439+
440+
def index = enumerate[id]
441+
def date_idx = enumerate[date]
442+
def path_idx = enumerate[path_on_date]
443+
444+
def json(:[], idx, :source, s) {
445+
index(idx, s)
446+
}
447+
448+
def json(:[], idx, :dates, :[], d_idx, :date, d) {
449+
index(idx, s) and
450+
date_idx(d_idx, s, d)
451+
from s
452+
}
453+
454+
def json(:[], idx, :dates, :[], d_idx, :paths, :[], p_idx, p) {
455+
index(idx, s) and
456+
date_idx(d_idx, s, d) and
457+
path_idx(p_idx, s, d, p)
458+
from s, d
459+
}
460+
end
461+
422462
/**
423463
*/
424464

rel/source_configs/data_reload.rel

+25-8
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,19 @@ bound force_reimport_not_chunk_partitioned
33
bound resources_data_to_delete
44
bound declared_sources_to_delete = String, String
55
bound new_source_config_csv = RelName, FilePos, String
6+
bound expired_source_config_csv = RelName, FilePos, String
67

78
def new_source_config[:syntax, :header_row] = -1
89
def new_source_config[:syntax, :header] = (1, :Relation); (2, :Path); (3, :ChunkPartitioned)
910
def new_source_config[:schema, :Relation] = "string"
1011
def new_source_config[:schema, :Path] = "string"
1112
def new_source_config[:schema, :ChunkPartitioned] = "string"
1213

14+
def expired_source_config[:syntax, :header_row] = -1
15+
def expired_source_config[:syntax, :header] = (1, :Relation); (2, :Path)
16+
def expired_source_config[:schema, :Relation] = "string"
17+
def expired_source_config[:schema, :Path] = "string"
18+
1319
def chunk_partitioned_sources(rel, path, p_idx) {
1420
new_source_config_csv(:Path, i, path) and
1521
new_source_config_csv(:Relation, i, rel) and
@@ -25,6 +31,12 @@ def simple_sources(rel, path) {
2531
from i
2632
}
2733

34+
def expired_sources(rel, path) {
35+
expired_source_config_csv(:Path, i, path) and
36+
expired_source_config_csv(:Relation, i, rel)
37+
from i
38+
}
39+
2840
/*
2941
* All simple sources are affected if they match with declared sources.
3042
*/
@@ -57,25 +69,30 @@ def potentially_affected_sources(rel, o_path, p_idx) {
5769
from n_path, res
5870
}
5971
/*
60-
* Identify sources for replacement.
72+
* Identify sources for invalidation.
6173
*/
62-
def part_resource_to_replace(rel, p_idx, path) {
74+
def part_resource_to_invalidate(rel, p_idx, path) {
6375
not force_reimport and
6476
potentially_affected_sources(rel, path, p_idx) and
6577
not chunk_partitioned_sources(rel, path, p_idx)
6678
}
6779

68-
def part_resource_to_replace(rel, p_idx, path) {
80+
def part_resource_to_invalidate(rel, p_idx, path) {
6981
force_reimport and
7082
potentially_affected_sources(rel, path, p_idx)
7183
}
84+
// We support invalidation only for partitioned expired sources
85+
def part_resource_to_invalidate(rel, p_idx, path) {
86+
expired_sources(rel, path) and
87+
part_resource:parse_part_index[path](p_idx)
88+
}
7289

73-
def resource_to_replace(rel, o_path) {
90+
def resource_to_invalidate(rel, o_path) {
7491
force_reimport_not_chunk_partitioned and
7592
simple_sources(rel, o_path)
7693
}
7794

78-
def resource_to_replace(rel, o_path) {
95+
def resource_to_invalidate(rel, o_path) {
7996
force_reimport and
8097
simple_sources(rel, o_path)
8198
}
@@ -84,16 +101,16 @@ def resource_to_replace(rel, o_path) {
84101
* Save resources that we are marked for data reimport.
85102
*/
86103
// PRODUCT_LIMITATION: Message: Argument for specializing is more complex than the current staging implementation supports.
87-
def reverse_part_resource_to_replace(path, p_idx, rel) = part_resource_to_replace(rel, p_idx, path)
104+
def reverse_part_resource_to_invalidate(path, p_idx, rel) = part_resource_to_invalidate(rel, p_idx, path)
88105
// resource partitions to delete
89106
def resources_to_delete(rel, val) {
90-
rel = #(reverse_part_resource_to_replace[_, p_idx]) and
107+
rel = #(reverse_part_resource_to_invalidate[_, p_idx]) and
91108
p_idx = ^PartIndex[val]
92109
from p_idx
93110
}
94111
// resources to delete
95112
def resources_to_delete(rel) {
96-
rel = #(transpose[resource_to_replace][_])
113+
rel = #(transpose[resource_to_invalidate][_])
97114
}
98115

99116
def resources_data_to_delete_idx = enumerate[resources_data_to_delete]

test/test_cfg_src_step.py

+92-2
Original file line numberDiff line numberDiff line change
@@ -397,13 +397,103 @@ def test_inflate_sources_snapshot_1day_multiple_paths(self):
397397
]
398398
self.assertEqual(expected_paths, test_src.paths)
399399

400+
def test_calculate_expired_sources_1_day_snapshot_1_day_declared_1_day_out_of_range(self):
401+
# setup
402+
test_src = _create_test_source(
403+
snapshot_validity_days=1
404+
)
405+
declared_sources = {
406+
"test": {
407+
"source": "test",
408+
"dates": [
409+
{
410+
"paths": [
411+
"/test/data_dt=20220104/part-1.csv"
412+
],
413+
"date": "20220104"
414+
}
415+
]
416+
}
417+
}
418+
paths_builder = Mock()
419+
workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, "20220106")
420+
# when
421+
expired_source = workflow_step._calculate_expired_sources(self.logger, declared_sources)
422+
# then
423+
expected_sources = [("test", "/test/data_dt=20220104/part-1.csv")]
424+
self.assertEqual(expected_sources, expired_source)
425+
426+
def test_calculate_expired_sources_1_day_snapshot_2_day_declared_1_day_out_of_range(self):
427+
# setup
428+
test_src = _create_test_source(
429+
snapshot_validity_days=1
430+
)
431+
declared_sources = {
432+
"test": {
433+
"source": "test",
434+
"dates": [
435+
{
436+
"paths": [
437+
"/test/data_dt=20220104/part-1.csv"
438+
],
439+
"date": "20220104"
440+
},
441+
{
442+
"paths": [
443+
"/test/data_dt=20220105/part-1.csv"
444+
],
445+
"date": "20220105"
446+
}
447+
]
448+
}
449+
}
450+
paths_builder = Mock()
451+
workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, "20220106")
452+
# when
453+
expired_source = workflow_step._calculate_expired_sources(self.logger, declared_sources)
454+
# then
455+
expected_sources = [("test", "/test/data_dt=20220104/part-1.csv")]
456+
self.assertEqual(expected_sources, expired_source)
457+
458+
def test_calculate_expired_sources_1_day_snapshot_2_day_declared_0_day_out_of_range(self):
459+
# setup
460+
test_src = _create_test_source(
461+
snapshot_validity_days=1
462+
)
463+
declared_sources = {
464+
"test": {
465+
"source": "test",
466+
"dates": [
467+
{
468+
"paths": [
469+
"/test/data_dt=20220104/part-1.csv"
470+
],
471+
"date": "20220104"
472+
},
473+
{
474+
"paths": [
475+
"/test/data_dt=20220105/part-1.csv"
476+
],
477+
"date": "20220105"
478+
}
479+
]
480+
}
481+
}
482+
paths_builder = Mock()
483+
workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, "20220105")
484+
# when
485+
expired_source = workflow_step._calculate_expired_sources(self.logger, declared_sources)
486+
# then
487+
expected_sources = []
488+
self.assertEqual(expected_sources, expired_source)
489+
400490

401491
def _create_test_source(is_chunk_partitioned: bool = True, is_date_partitioned: bool = True,
402-
loads_number_of_days: int = 1, offset_by_number_of_days: int = 0,
492+
loads_number_of_days: int = 1, offset_by_number_of_days: int = 0, relation="test",
403493
snapshot_validity_days=None) -> Source:
404494
return Source(
405495
container=Container("default", ContainerType.LOCAL, {}),
406-
relation="test",
496+
relation=relation,
407497
relative_path="test",
408498
input_format="test",
409499
extensions=["test"],

workflow/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
__version_info__ = (0, 0, 22)
15+
__version_info__ = (0, 0, 23)
1616
__version__ = ".".join(map(str, __version_info__))

workflow/constants.py

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
RESOURCES_TO_DELETE_REL = "resources_data_to_delete_json"
3030
WORKFLOW_JSON_REL = "workflow_json"
3131
BATCH_CONFIG_REL = "batch:config"
32+
DECLARED_DATE_PARTITIONED_SOURCE_REL = "declared_date_partitioned_source:json"
3233

3334
FILE_LOAD_RELATION = {
3435
"CSV": "load_csv",

0 commit comments

Comments
 (0)