Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat duplicates #44

Merged
merged 5 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions metadata_updater/check_duplicates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import xml.etree.ElementTree as ET
import os
import csv
import logging

logging.basicConfig(level=logging.INFO)

# Directory containing the exported ISO 19115/19139 metadata files.
# NOTE(review): hardcoded user-specific Windows path — consider making this
# configurable via a CLI argument or environment variable.
# The import-time directory listing that used to live here was removed: it was
# dead code (main() recomputes the list and find_duplicates takes it as a
# parameter) and it made the module fail to import when the directory is absent.
input_metadata_dir = 'C:/Users/ECheng/OneDrive - Land Information New Zealand/Desktop/Repo/Output'

def extract_guid(file_path):
    """Extract the file-identifier GUID from an ISO 19139 metadata XML file.

    Args:
        file_path: Path to a .iso.xml metadata file.

    Returns:
        The text of gmd:fileIdentifier/gco:CharacterString, or None if the
        file cannot be read/parsed or the element is missing.
    """
    ns_gmd = '{http://www.isotc211.org/2005/gmd}'
    ns_gco = '{http://www.isotc211.org/2005/gco}'
    try:
        root = ET.parse(file_path).getroot()
        # Single path find instead of two chained find() calls; avoids an
        # AttributeError when gmd:fileIdentifier is absent.
        node = root.find(f'{ns_gmd}fileIdentifier/{ns_gco}CharacterString')
        if node is None or node.text is None:
            logging.error("No fileIdentifier found in %s", file_path)
            return None
        return node.text
    except (ET.ParseError, OSError) as e:
        # Route through the configured logging instead of print so errors are
        # captured consistently with the rest of the script.
        logging.error("Error parsing file %s: %s", file_path, e)
        return None

def find_duplicates(files):
    """Group metadata files that share the same GUID.

    Args:
        files: Iterable of paths to .iso.xml metadata files.

    Returns:
        A tuple (duplicate_guids, errors):
        - duplicate_guids maps each GUID seen more than once to the list of
          layer names (file basenames without the .iso.xml suffix) using it,
          in encounter order with the original first.
        - errors is a list of (file, message) pairs for files whose GUID
          could not be extracted.
    """
    guid_to_layer = {}    # first layer seen for each GUID
    duplicate_guids = {}  # populated only for GUIDs that repeat
    errors = []

    for file in files:
        guid = extract_guid(file)
        if not guid:
            errors.append((file, 'Error parsing GUID'))
            continue
        layer_name = os.path.basename(file).replace('.iso.xml', '')
        if guid in guid_to_layer:
            # First repeat seeds the list with the originally-seen layer, so
            # the report shows every layer involved, not just the new ones.
            duplicate_guids.setdefault(guid, [guid_to_layer[guid]]).append(layer_name)
        else:
            guid_to_layer[guid] = layer_name

    return duplicate_guids, errors

def export_duplicates_to_csv(duplicate_guids, errors, output_path='duplicates.csv'):
    """Write duplicate GUIDs and parse errors to a CSV report.

    Args:
        duplicate_guids: Mapping of GUID -> list of layer names sharing it.
        errors: List of (file_path, error_message) pairs.
        output_path: Destination CSV file. Defaults to 'duplicates.csv' in
            the current working directory (backward compatible).
    """
    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['GUID', 'Layer Names', 'Error Message'])

        # One row per (GUID, layer) pair so duplicates are easy to scan.
        for guid, layer_names in duplicate_guids.items():
            for layer_name in layer_names:
                writer.writerow([guid, layer_name, ''])

        # Error rows: GUID column left empty, file path in the layer column.
        for file_path, error in errors:
            writer.writerow(['', file_path, error])

def main():
    """Scan the metadata directory for duplicate GUIDs and write a CSV report."""
    iso_files = [
        os.path.join(input_metadata_dir, name)
        for name in os.listdir(input_metadata_dir)
        if name.endswith('.iso.xml')
    ]
    duplicates, parse_errors = find_duplicates(iso_files)
    export_duplicates_to_csv(duplicates, parse_errors)
    logging.info("Duplicates exported to duplicates.csv")

if __name__ == '__main__':
    main()
33 changes: 19 additions & 14 deletions metadata_updater/metadata_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, cwd = None):
raise FileNotFoundError('Can not find config file')

with open(cwd, 'r') as f:
config = yaml.safe_load(f)
config = yaml.load(f, Loader=yaml.Loader)

# CONNECTION
if 'Connection' in config:
Expand Down Expand Up @@ -142,10 +142,12 @@ def get_metadata(layer, dir, overwrite):
file_exists(file_destination)

if layer.metadata != None:
layer.metadata.get_xml(file_destination)
return file_destination
else:
return None
try:
layer.metadata.get_xml(file_destination)
return file_destination
except Exception as e:
logger.error(f"No Metadata Found for: {layer}. Error: {e}")


def update_metadata(dest_file, mapping):
"""
Expand Down Expand Up @@ -331,15 +333,18 @@ def file_has_text(search_text, ignore_case, file):
if there are no changes to be made.
"""

with open(file, 'r') as f:
for line in f:
if ignore_case:
match = re.search(search_text, line, flags=re.IGNORECASE)
else:
match = re.search(search_text, line)
if match:
return True
return False
try:
with open(file, 'r') as f:
for line in f:
if ignore_case:
match = re.search(search_text, line, flags=re.IGNORECASE)
else:
match = re.search(search_text, line)
if match:
return True
return False
except FileNotFoundError as e:
logger.error(f"File not found: {file}. Error: {e}")

def create_backup(file, overwrite=False):
"""
Expand Down
Loading