diff --git a/README.md b/README.md index 7d6e3ad..7d6fa6c 100644 --- a/README.md +++ b/README.md @@ -152,7 +152,7 @@ A description of the properties in the file: - **entity_service_url** - The RESTful service endpoint for the anonlink-entity-service. - **matching_threshold** - The threshold for considering a potential set of - records a match when comparing in anonlink. + records a match when comparing in anonlink. This can either be a single number between 0 and 1 or a list of numbers between 0 and 1 - **mongo_uri** - The URI to use when connecting to MongoDB to store or access results. For details on the URI structure, consult the [Connection String URI Format documentation](https://docs.mongodb.com/manual/reference/connection-string/) @@ -208,9 +208,9 @@ for individuals: ``` output/ - site_a.csv - site_b.csv - site_c.csv + site_a.zip + site_b.zip + site_c.zip ... ``` @@ -218,9 +218,9 @@ And the second example, for households: ``` output/ - site_a_households.csv - site_b_households.csv - site_c_households.csv + site_a_households.zip + site_b_households.zip + site_c_households.zip ... ``` @@ -248,8 +248,11 @@ This project is a set of python scripts driven by a central configuration file, matching information and use it to generate LINK_IDs, which are written to a CSV file in the configured results folder. 1. Once all LINK_IDs have been created, run `data_owner_ids.py` which will - create a file per data owner that can be sent with only information on their - LINK_IDs. + create one ZIP file per data owner. That file will contain a metadata file + and a CSV file with only information on their LINK_IDs. + +`projects.py`, `match.py` and `link_ids.py` will also generate JSON metadata +files that contain information about the corresponding process. #### Example system folder hierarchy: diff --git a/config-metadata.json b/config-metadata.json new file mode 100644 index 0000000..def3f4a --- /dev/null +++ b/config-metadata.json @@ -0,0 +1,17 @@ +{ + "systems": ["site_a", "site_b", "site_c", "site_d", "site_e", "site_f"], + "projects": ["name-sex-dob-phone", "name-sex-dob-zip", "name-sex-dob-addr"], + "matching_threshold": [0.75, 0.75, 0.75], + "schema_folder": "test-data/envs/small/schema", + "inbox_folder": "test-data/envs/metadata-test/inbox", + "project_results_folder": "test-data/envs/metadata-test/project_results", + "matching_results_folder": "test-data/envs/metadata-test/results", + "output_folder": "test-data/envs/metadata-test/output", + "entity_service_url": "http://localhost:8851/api/v1", + "mongo_uri": "localhost:27017", + "blocked": false, + "blocking_schema": "test-data/envs/small/schema/blocking-schema/lambda.json", + "household_match": false, + "household_threshold": 0.65, + "household_schema": "test-data/envs/small/schema/household-schema/fn-phone-addr-zip.json" +} diff --git a/data_owner_ids.py b/data_owner_ids.py index 2ac22c3..2f91f12 100755 --- a/data_owner_ids.py +++ b/data_owner_ids.py @@ -2,6 +2,12 @@ import argparse import csv +import json +import os +import shutil +import uuid +import zipfile +from datetime import datetime from pathlib import Path from dcctools.config import Configuration @@ -18,30 +24,107 @@ def parse_args(): return args -def process_csv(csv_path, system_csv_path, system): - with open(csv_path) as csvfile: +def zip_and_clean(system_output_path, system, timestamp, household_match): + if household_match: + zip_path = system_output_path.parent / f"{system}_households.zip" + else: + zip_path = system_output_path.parent / f"{system}.zip" + with 
zipfile.ZipFile(zip_path, mode="w") as system_archive: + if household_match: + system_archive.write( + system_output_path / "households" / f"{system}_households.csv", + arcname=str(Path("output") / "households" / f"{system}_households.csv"), + ) + else: + system_archive.write( + system_output_path / f"{system}.csv", + arcname=str(Path("output") / f"{system}.csv"), + ) + system_archive.write( + system_output_path / f"{system}-metadata{timestamp}.json", + arcname=str(Path("output") / f"{system}-metadata{timestamp}.json"), + ) + + print(zip_path.name, "created") + shutil.rmtree(system_output_path) + print("Uncompressed directory removed") + + +def process_output(link_id_path, output_path, system, metadata, household_match): + data_owner_id_time = datetime.now() + timestamp = data_owner_id_time.strftime("%Y%m%dT%H%M%S") + n_rows = 0 + with open(link_id_path) as csvfile: reader = csv.DictReader(csvfile) - with open(system_csv_path, "w", newline="") as system_csvfile: + if household_match: + csv_path = output_path / "households" / f"{system}_households.csv" + else: + csv_path = output_path / f"{system}.csv" + + with open(csv_path, "w", newline="") as system_csvfile: writer = csv.DictWriter(system_csvfile, fieldnames=["LINK_ID", system]) writer.writeheader() for row in reader: if len(row[system]) > 0: + n_rows += 1 writer.writerow({"LINK_ID": row["LINK_ID"], system: row[system]}) + system_metadata = { + "link_id_metadata": { + "creation_date": metadata["creation_date"], + "uuid1": metadata["uuid1"], + }, + "input_system_metadata": { + key: val for key, val in metadata["input_system_metadata"][system].items() + }, + "output_system_metadata": { + "creation_date": data_owner_id_time.isoformat(), + "number_of_records": n_rows, + "uuid1": str(uuid.uuid1()), + }, + } + with open( + output_path / f"{system}-metadata{timestamp}.json", "w", newline="" + ) as system_metadata_file: + json.dump(system_metadata, system_metadata_file, indent=2) + return timestamp def do_data_owner_ids(c): if c.household_match: - csv_path = Path(c.matching_results_folder) / "household_link_ids.csv" + link_ids = sorted( + Path(c.matching_results_folder).glob("household_link_ids*.csv") + ) + else: + link_ids = sorted(Path(c.matching_results_folder).glob("link_ids*.csv")) + + if len(link_ids) > 1: + print("More than one link_id file found") + print(link_ids) + link_id_times = [ + datetime.strptime(x.name[-19:-4], "%Y%m%dT%H%M%S") for x in link_ids + ] + most_recent = link_ids[link_id_times.index(max(link_id_times))] + print(f"Using most recent link_id file: {most_recent}") + + link_id_path = most_recent else: - csv_path = Path(c.matching_results_folder) / "link_ids.csv" + link_id_path = link_ids[0] + + link_id_metadata_name = link_id_path.parent / link_id_path.name.replace( + "link_ids", "link_id-metadata" + ).replace(".csv", ".json") + with open(link_id_metadata_name) as metadata_file: + metadata = json.load(metadata_file) for system in c.systems: + system_output_path = Path(c.output_folder) / "output" + os.makedirs(system_output_path, exist_ok=True) if c.household_match: - system_csv_path = Path(c.output_folder) / "{}_households.csv".format(system) - else: - system_csv_path = Path(c.output_folder) / "{}.csv".format(system) - process_csv(csv_path, system_csv_path, system) - print(f"{system_csv_path} created") + os.makedirs(system_output_path / "households", exist_ok=True) + timestamp = process_output( + link_id_path, system_output_path, system, metadata, c.household_match + ) + zip_and_clean(system_output_path, system, timestamp, 
c.household_match) if __name__ == "__main__": diff --git a/dcctools/config.py b/dcctools/config.py index dc225ca..bae59b5 100644 --- a/dcctools/config.py +++ b/dcctools/config.py @@ -2,6 +2,8 @@ import json import os import sys +import zipfile +from datetime import datetime, timedelta from pathlib import Path from zipfile import ZipFile @@ -41,10 +43,40 @@ def validate_config(self): ) return config_issues + def validate_metadata(self, system_path): + metadata_issues = [] + with zipfile.ZipFile(system_path) as archive: + metadata_files = [] + for fname in archive.namelist(): + if "metadata" in fname: + metadata_files.append(fname) + anchor = fname.rfind("T") + mname = fname[(anchor - 8) : (anchor + 7)] + timestamp = datetime.strptime(mname, "%Y%m%dT%H%M%S") + with archive.open(fname, "r") as metadata_fp: + metadata = json.load(metadata_fp) + garble_time = datetime.fromisoformat(metadata["creation_date"]) + if (garble_time - timestamp) >= timedelta(seconds=1): + metadata_issues.append( + f"{system_path.name} metadata timecode {timestamp} does " + "not match listed garble time {garble_time}" + ) + if len(metadata_files) == 0: + metadata_issues.append( + f"could not find metadata file within {system_path.name}" + ) + elif len(metadata_files) > 1: + metadata_issues.append( + f"Too many metadata files found in {system_path.name}:" + + "\n\t".join([metadata_file for metadata_file in metadata_files]) + ) + return metadata_issues + def validate_all_present(self): missing_paths = [] expected_paths = [] unexpected_paths = [] + metadata_issues = [] root_path = Path(self.filename).parent inbox_folder = root_path / self.config_json["inbox_folder"] for s in self.config_json["systems"]: @@ -54,12 +86,16 @@ def validate_all_present(self): expected_paths.append(household_zip_path) if not os.path.exists(household_zip_path): missing_paths.append(household_zip_path) + else: + metadata_issues.extend(self.validate_metadata(household_zip_path)) if os.path.exists(system_zip_path): unexpected_paths.append(system_zip_path) else: expected_paths.append(system_zip_path) if not os.path.exists(system_zip_path): missing_paths.append(system_zip_path) + else: + metadata_issues.extend(self.validate_metadata(system_zip_path)) if os.path.exists(household_zip_path): unexpected_paths.append(household_zip_path) if self.config_json["blocked"]: @@ -85,7 +121,7 @@ def validate_all_present(self): path_to_check = root_path / getattr(self, d) if not os.path.exists(path_to_check): missing_paths.append(path_to_check) - return (set(missing_paths), set(unexpected_paths)) + return set(missing_paths), set(unexpected_paths), metadata_issues @property def systems(self): @@ -124,6 +160,17 @@ def extract_blocks(self, system): with ZipFile(block_zip_path, mode="r") as block_zip: block_zip.extractall(Path(self.config_json["inbox_folder"]) / system) + def get_metadata(self, system): + archive_name = ( + f"{system}_households.zip" if self.household_match else f"{system}.zip" + ) + archive_path = Path(self.config_json["inbox_folder"]) / archive_name + with ZipFile(archive_path, mode="r") as archive: + for file_name in archive.namelist(): + if "metadata" in file_name: + with archive.open(file_name) as metadata_file: + return json.load(metadata_file) + def get_clk(self, system, project): clk_path = ( Path(self.config_json["inbox_folder"]) @@ -136,7 +183,11 @@ def get_clks_raw(self, system, project): clks = None clk_zip_path = Path(self.config_json["inbox_folder"]) / "{}.zip".format(system) with ZipFile(clk_zip_path, mode="r") as clk_zip: - with 
clk_zip.open("output/{}.json".format(project)) as clk_file: + for file_name in clk_zip.namelist(): + if f"{project}.json" in file_name: + project_file = file_name + break + with clk_zip.open(project_file) as clk_file: clks = clk_file.read() return clks @@ -146,7 +197,11 @@ def get_household_clks_raw(self, system, schema): self.config_json["inbox_folder"] ) / "{}_households.zip".format(system) with ZipFile(clk_zip_path, mode="r") as clk_zip: - with clk_zip.open("output/households/{}.json".format(schema)) as clk_file: + for file_name in clk_zip.namelist(): + if f"{schema}.json" in file_name: + project_file = file_name + break + with clk_zip.open(project_file) as clk_file: clks = clk_file.read() return clks @@ -190,7 +245,7 @@ def get_project_threshold(self, project_name): else self.matching_threshold ) if type(config_threshold) == list: - project_index = config_threshold.index(project_name) + project_index = self.projects.index(project_name) threshold = config_threshold[project_index] else: threshold = config_threshold diff --git a/link_ids.py b/link_ids.py index 84b3add..82990d3 100755 --- a/link_ids.py +++ b/link_ids.py @@ -34,6 +34,10 @@ def parse_args(): def do_link_ids(c, remove=False): + link_id_time = datetime.datetime.now() + timestamp = datetime.datetime.strftime(link_id_time, "%Y%m%dT%H%M%S") + n_records = 0 + client = MongoClient(c.mongo_uri) database = client.linkage_agent @@ -58,7 +62,9 @@ def do_link_ids(c, remove=False): all_ids_for_systems[system] = list(range(system_size)) if c.household_match: - result_csv_path = Path(c.matching_results_folder) / "household_link_ids.csv" + result_csv_path = ( + Path(c.matching_results_folder) / f"household_link_ids-{timestamp}.csv" + ) with open(result_csv_path, "w", newline="") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=header) writer.writeheader() @@ -90,12 +96,32 @@ def do_link_ids(c, remove=False): final_record = {system: unmatched_id} final_records.append(final_record) + n_records += len(final_records) for record in final_records: record["LINK_ID"] = uuid.uuid1() writer.writerow(record) print(f"{result_csv_path} created") + + metadata_json_path = ( + Path(c.matching_results_folder) + / f"household_link_id-metadata-{timestamp}.json" + ) + with open(metadata_json_path, "w") as metadata_file: + metadata = { + "creation_date": link_id_time.isoformat(), + "number_of_records": n_records, + "uuid1": str(uuid.uuid1()), + "input_system_metadata": {}, + } + for system in c.systems: + system_metadata = c.get_metadata(system) + metadata["input_system_metadata"][system] = system_metadata + json.dump(metadata, metadata_file, indent=2) + + print(f"{metadata_json_path} created") + else: - result_csv_path = Path(c.matching_results_folder) / "link_ids.csv" + result_csv_path = Path(c.matching_results_folder) / f"link_ids-{timestamp}.csv" with open(result_csv_path, "w", newline="") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=header) writer.writeheader() @@ -128,6 +154,7 @@ def do_link_ids(c, remove=False): final_record["LINK_ID"] = uuid.uuid1() individual_linkages.append(final_record) writer.writerow(final_record) + n_records += 1 for system, unmatched_ids in all_ids_for_systems.items(): for unmatched_id in unmatched_ids: @@ -135,8 +162,26 @@ def do_link_ids(c, remove=False): final_record["LINK_ID"] = uuid.uuid1() individual_linkages.append(final_record) writer.writerow(final_record) + n_records += 1 print(f"{result_csv_path} created") + metadata_json_path = ( + Path(c.matching_results_folder) / 
f"link_id-metadata-{timestamp}.json" + ) + with open(metadata_json_path, "w") as metadata_file: + metadata = { + "creation_date": link_id_time.isoformat(), + "number_of_records": n_records, + "uuid1": str(uuid.uuid1()), + "input_system_metadata": {}, + } + for system in c.systems: + system_metadata = c.get_metadata(system) + metadata["input_system_metadata"][system] = system_metadata + json.dump(metadata, metadata_file, indent=2) + + print(f"{metadata_json_path} created") + if remove: print("Removing records from database") database.match_groups.drop() diff --git a/match.py b/match.py index eb96a8c..944035d 100755 --- a/match.py +++ b/match.py @@ -1,8 +1,10 @@ #!/usr/bin/env python import argparse +import datetime import json import logging +import uuid from pathlib import Path from pymongo import MongoClient @@ -42,12 +44,20 @@ def parse_args(): def has_results_available(config, projects=None): - available_results = set( - map(lambda x: x.stem, Path(config.project_results_dir).glob("*.json")) - ) + available_results = set() expected_results = set(projects) if projects else set(config.projects) + project_to_filenames = {} + for expected_result in expected_results: + found_result = [ + x.name + for x in Path(config.project_results_dir).glob(f"{expected_result}*.json") + ] + if len(found_result) == 1: + found_project_name = found_result[0][: len(expected_result)] + project_to_filenames[expected_result] = found_result[0] + available_results.add(found_project_name) if expected_results <= available_results: - return True + return project_to_filenames else: raise MissingResults(expected_results, available_results) @@ -62,13 +72,28 @@ def do_match(config, projects=None): def do_matching(config, projects, collection): - has_results_available(config, projects) + match_time = datetime.datetime.now() + metadata = { + "creation_date": match_time.isoformat(), + "projects": projects, + "uuid1": str(uuid.uuid1()), + } + projects_to_filenames = has_results_available(config, projects) for project_name in projects: - with open(Path(config.project_results_dir) / f"{project_name}.json") as file: + with open( + Path(config.project_results_dir) / projects_to_filenames[project_name] + ) as file: result_json = json.load(file) + metadata[project_name] = { + "number_of_records": len(result_json.get("groups", [])) + } results = Results(config.systems, project_name, result_json) - print(f"Matching for project: {project_name}") results.insert_results(collection) + timestamp = match_time.strftime("%Y%m%dT%H%M%S") + with open( + Path(config.project_results_dir) / f"match-metadata-{timestamp}.json", "w+" + ) as metadata_file: + json.dump(metadata, metadata_file, indent=2) if __name__ == "__main__": diff --git a/projects.py b/projects.py index 25cf7db..4d9786e 100755 --- a/projects.py +++ b/projects.py @@ -1,9 +1,11 @@ #!/usr/bin/env python import argparse +import datetime import json import logging import time +import uuid from pathlib import Path from dcctools.anonlink import Project @@ -30,10 +32,21 @@ def parse_args(): def run_projects(c, project_name=None): + projects_start_time = datetime.datetime.now() + timestamp = projects_start_time.strftime("%y%m%d") + with open( + Path(c.project_results_dir) / f"project-metadata-{timestamp}.json", "w+" + ) as metadata_file: + metadata = { + "start-time": projects_start_time.isoformat(), + "projects": [], + "uuid1": str(uuid.uuid1()), + } + json.dump(metadata, metadata_file) if c.household_match: log.debug("Processing households") project_name = "fn-phone-addr-zip" - 
run_project(c, project_name, households=True) + run_project(c, timestamp, project_name, households=True) else: log.debug("Processing individuals") if c.blocked: @@ -42,13 +55,20 @@ def run_projects(c, project_name=None): c.extract_clks(system) c.extract_blocks(system) if project_name: - run_project(c, project_name) + run_project(c, timestamp, project_name) else: for project_name in c.load_schema().keys(): - run_project(c, project_name) + run_project(c, timestamp, project_name) -def run_project(c, project_name=None, households=False): +def run_project(c, metadata_timestamp, project_name=None, households=False): + with open( + Path(c.project_results_dir) / f"project-metadata-{metadata_timestamp}.json", "r" + ) as metadata_file: + metadata = json.load(metadata_file) + project_start_time = datetime.datetime.now() + metadata["projects"].append(project_name) + metadata[project_name] = {"start_time": project_start_time.isoformat()} schema = c.load_household_schema() if households else c.load_schema()[project_name] project = Project(project_name, schema, c.systems, c.entity_service_url, c.blocked) project.start_project() @@ -78,9 +98,18 @@ def run_project(c, project_name=None, households=False): time.sleep(SLEEP_TIME) print("\n--- Getting results ---\n") result_json = project.get_results() + metadata[project_name]["completion_time"] = datetime.datetime.now().isoformat() + metadata[project_name]["number_of_groups"] = len(result_json.get("groups", [])) + timestamp = project_start_time.strftime("%y%m%d") Path(c.project_results_dir).mkdir(parents=True, exist_ok=True) - with open(Path(c.project_results_dir) / f"{project_name}.json", "w") as json_file: + with open( + Path(c.project_results_dir) / f"{project_name}-{timestamp}.json", "w" + ) as json_file: json.dump(result_json, json_file) + with open( + Path(c.project_results_dir) / f"project-metadata-{metadata_timestamp}.json", "w" + ) as metadata_file: + json.dump(metadata, metadata_file, indent=2) if __name__ == "__main__": diff --git a/test-data/envs/small/inbox/hospital_001.zip b/test-data/envs/small/inbox/hospital_001.zip index 831ccea..1312e11 100644 Binary files a/test-data/envs/small/inbox/hospital_001.zip and b/test-data/envs/small/inbox/hospital_001.zip differ diff --git a/test-data/envs/small/inbox/hospital_001_households.zip b/test-data/envs/small/inbox/hospital_001_households.zip index 8e19fde..ea4875e 100644 Binary files a/test-data/envs/small/inbox/hospital_001_households.zip and b/test-data/envs/small/inbox/hospital_001_households.zip differ diff --git a/test-data/envs/small/inbox/hospital_002.zip b/test-data/envs/small/inbox/hospital_002.zip index 8aea158..93576fc 100644 Binary files a/test-data/envs/small/inbox/hospital_002.zip and b/test-data/envs/small/inbox/hospital_002.zip differ diff --git a/test-data/envs/small/inbox/hospital_002_households.zip b/test-data/envs/small/inbox/hospital_002_households.zip index 8e19fde..db9b3f6 100644 Binary files a/test-data/envs/small/inbox/hospital_002_households.zip and b/test-data/envs/small/inbox/hospital_002_households.zip differ diff --git a/test-data/envs/small/inbox/hospital_003.zip b/test-data/envs/small/inbox/hospital_003.zip index fc76db5..e0fe77d 100644 Binary files a/test-data/envs/small/inbox/hospital_003.zip and b/test-data/envs/small/inbox/hospital_003.zip differ diff --git a/test-data/envs/small/inbox/hospital_003_households.zip b/test-data/envs/small/inbox/hospital_003_households.zip index 8e19fde..b396adf 100644 Binary files a/test-data/envs/small/inbox/hospital_003_households.zip 
and b/test-data/envs/small/inbox/hospital_003_households.zip differ diff --git a/test-data/envs/small/schema/household-schema/fn-phone-addr-zip.json b/test-data/envs/small/schema/household-schema/fn-phone-addr-zip.json index f8a296e..b4d48b8 100644 --- a/test-data/envs/small/schema/household-schema/fn-phone-addr-zip.json +++ b/test-data/envs/small/schema/household-schema/fn-phone-addr-zip.json @@ -1,7 +1,7 @@ { "version": 3, "clkConfig": { - "l": 1024, + "l": 4096, "k": 30, "hash": { "type": "blakeHash" @@ -22,7 +22,7 @@ }, "hashing": { "strategy": { - "bitsPerFeature": 200 + "bitsPerFeature": 300 }, "hash": { "type": "blakeHash" @@ -42,7 +42,7 @@ }, "hashing": { "strategy": { - "bitsPerFeature": 200 + "bitsPerFeature": 300 }, "hash": { "type": "blakeHash" @@ -94,6 +94,10 @@ "positional": false } } + }, + { + "identifier": "record_ids", + "ignored": true } ] } diff --git a/validate.py b/validate.py index 6a87b33..50a1ac3 100755 --- a/validate.py +++ b/validate.py @@ -17,7 +17,9 @@ def parse_args(): def do_validate(c): - missing, unexpected = c.validate_all_present() + # N.B. at time of PR household files don't have metadata associated with them. + # Will be fixed in an upcoming patch to data-owner-tools + missing, unexpected, metadata_issues = c.validate_all_present() if len(missing) == 0: print("All necessary input is present") else: @@ -39,6 +41,16 @@ def do_validate(c): else: print("\nNo unexpected files are present") + if len(metadata_issues) == 0: + print("\nNo issues are present in the metadata files for each system") + else: + if len(metadata_issues) == 1: + print(f"\nWARNING: {metadata_issues[0]}") + else: + print("\nWARNING: multiple issues found with metadata files") + for issue in metadata_issues: + print(f"\t{issue}") + config_issues = c.validate_config() if config_issues: print(f"\nWARNING: Found {len(config_issues)} issues in config file:")
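
Below is a minimal sketch of the `link_id-metadata-<timestamp>.json` file (or `household_link_id-metadata-<timestamp>.json` for household runs) that `link_ids.py` now writes next to the link ID CSV. The dates, counts, and UUIDs shown are placeholders, and the per-system objects under `input_system_metadata` are simply whatever metadata JSON each data owner shipped inside their inbox ZIP; their exact shape is defined by data-owner-tools, not by this changeset.

```json
{
  "creation_date": "2021-06-01T14:02:33.120843",
  "number_of_records": 12345,
  "uuid1": "b6f6c0d0-c2e6-11eb-9d3a-acde48001122",
  "input_system_metadata": {
    "site_a": {"comment": "metadata copied from site_a's inbox ZIP"},
    "site_b": {"comment": "metadata copied from site_b's inbox ZIP"}
  }
}
```

`data_owner_ids.py` then copies this file's `creation_date` and `uuid1` into a `link_id_metadata` block inside each per-system `<system>-metadata<timestamp>.json`, alongside that system's `input_system_metadata` entry and an `output_system_metadata` block recording the creation time, row count, and a fresh UUID for the delivered CSV.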