From 762c6fa1915a3568a678117b404a4238754c6bae Mon Sep 17 00:00:00 2001 From: AlexanderJuestel Date: Mon, 22 Jul 2024 20:49:27 +0200 Subject: [PATCH] Format misc.py --- gemgis/misc.py | 874 ++++++++++++++++++++++++++++++------------------- 1 file changed, 537 insertions(+), 337 deletions(-) diff --git a/gemgis/misc.py b/gemgis/misc.py index 16f895bb..bdb7a2cb 100644 --- a/gemgis/misc.py +++ b/gemgis/misc.py @@ -31,8 +31,8 @@ # Borehole logs can be requested at no charge from the Geological Survey from the database DABO: # https://www.gd.nrw.de/gd_archive_dabo.htm -def load_pdf(path: str, - save_as_txt: bool = True) -> str: + +def load_pdf(path: str, save_as_txt: bool = True) -> str: """ Load PDF file containing borehole data. @@ -73,17 +73,21 @@ def load_pdf(path: str, try: import pypdf except ModuleNotFoundError: - raise ModuleNotFoundError('PyPDF package is not installed. Use pip install pypdf to install the latest version') + raise ModuleNotFoundError( + "PyPDF package is not installed. Use pip install pypdf to install the latest version" + ) # Trying to import tqdm but returning error if tqdm is not installed try: from tqdm import tqdm except ModuleNotFoundError: - raise ModuleNotFoundError('tqdm package is not installed. Use pip install tqdm to install the latest version') + raise ModuleNotFoundError( + "tqdm package is not installed. Use pip install tqdm to install the latest version" + ) # Checking that the file path is of type string if not isinstance(path, str): - raise TypeError('Path/Name must be of type string') + raise TypeError("Path/Name must be of type string") # Getting the absolute path path = os.path.abspath(path=path) @@ -94,14 +98,14 @@ def load_pdf(path: str, # Checking that the file exists if not os.path.exists(path): - raise FileNotFoundError('File not found') + raise FileNotFoundError("File not found") # Checking that save_as_bool is of type bool if not isinstance(save_as_txt, bool): - raise TypeError('Save_as_txt variable must be of type bool') + raise TypeError("Save_as_txt variable must be of type bool") # Open the file as binary object - data = open(path, 'rb') + data = open(path, "rb") # Create new PdfFileReader object filereader = pypdf.PdfReader(data) @@ -110,7 +114,7 @@ def load_pdf(path: str, number_of_pages = len(filereader.pages) # Create empty string to store page content - page_content = '' + page_content = "" # Retrieve page content for each page for i in tqdm(range(number_of_pages)): @@ -121,14 +125,14 @@ def load_pdf(path: str, # Saving a txt-file of the retrieved page content for further usage if save_as_txt: # Split path to get original file name - name = path.split('.pdf')[0] + name = path.split(".pdf")[0] # Open new text file - with open(name + '.txt', "w") as text_file: + with open(name + ".txt", "w") as text_file: text_file.write(page_content) # Print out message if saving was successful - print('%s.txt successfully saved' % name) + print("%s.txt successfully saved" % name) return page_content @@ -181,7 +185,7 @@ def load_symbols(path: str) -> list: # Checking that the path is of type string if not isinstance(path, str): - raise TypeError('Path must be of type string') + raise TypeError("Path must be of type string") # Getting the absolute path path = os.path.abspath(path=path) @@ -192,11 +196,11 @@ def load_symbols(path: str) -> list: # Checking that the file exists if not os.path.exists(path): - raise FileNotFoundError('File not found') + raise FileNotFoundError("File not found") # Opening file with open(path, "r") as text_file: - symbols = [(i, '') for i in text_file.read().splitlines()] + symbols = [(i, "") for i in text_file.read().splitlines()] return symbols @@ -237,7 +241,7 @@ def load_formations(path: str) -> list: # Checking that the path is of type string if not isinstance(path, str): - raise TypeError('Path must be of type string') + raise TypeError("Path must be of type string") # Getting the absolute path path = os.path.abspath(path=path) @@ -248,13 +252,15 @@ def load_formations(path: str) -> list: # Checking that the file exists if not os.path.exists(path): - raise FileNotFoundError('File not found') + raise FileNotFoundError("File not found") # Opening file with open(path, "rb") as text_file: formations = text_file.read().decode("UTF-8").split() - formations = [(formations[i], formations[i + 1]) for i in range(0, len(formations) - 1, 2)] + formations = [ + (formations[i], formations[i + 1]) for i in range(0, len(formations) - 1, 2) + ] return formations @@ -319,155 +325,181 @@ def get_meta_data(page: List[str]) -> list: # Checking that the data is of type list if not isinstance(page, list): - raise TypeError('Page must be of type list') + raise TypeError("Page must be of type list") # Checking that all elements are of type str if not all(isinstance(n, str) for n in page): - raise TypeError('All elements of the list must be of type str') + raise TypeError("All elements of the list must be of type str") # Obtaining DABO Number - well_dabo = page[page.index('Bnum:') + 1:page.index('Bnum:') + 2] - well_dabo = ''.join(well_dabo) - well_dabo = well_dabo.split('Object')[0] - well_dabo = 'DABO_' + well_dabo + well_dabo = page[page.index("Bnum:") + 1 : page.index("Bnum:") + 2] + well_dabo = "".join(well_dabo) + well_dabo = well_dabo.split("Object")[0] + well_dabo = "DABO_" + well_dabo # Obtaining Name of Well - well_name = page[page.index('Name') + 1:page.index('Bohrungs-')] - well_name = ''.join(well_name).replace(':', '') + well_name = page[page.index("Name") + 1 : page.index("Bohrungs-")] + well_name = "".join(well_name).replace(":", "") # Obtaining Number of Well - well_number = page[page.index('Aufschluß-Nr.') + 1:page.index('Aufschluß-Nr.') + 4] - well_number = ''.join(well_number).replace(':', '') - well_number = well_number.split('Archiv-Nr.')[0] + well_number = page[ + page.index("Aufschluß-Nr.") + 1 : page.index("Aufschluß-Nr.") + 4 + ] + well_number = "".join(well_number).replace(":", "") + well_number = well_number.split("Archiv-Nr.")[0] # Obtaining Depth of well - well_depth = page[page.index('Endteufe') + 3:page.index('Endteufe') + 4] - well_depth = float(''.join(well_depth).replace(':', '')) + well_depth = page[page.index("Endteufe") + 3 : page.index("Endteufe") + 4] + well_depth = float("".join(well_depth).replace(":", "")) # Obtaining Stratigraphie der Endteufe - well_strat = page[page.index('Stratigraphie') + 3:page.index('Stratigraphie') + 4] - well_strat = ''.join(well_strat).replace(':', '') + well_strat = page[page.index("Stratigraphie") + 3 : page.index("Stratigraphie") + 4] + well_strat = "".join(well_strat).replace(":", "") # Obtaining Topographic Map Sheet Number - well_tk = page[page.index('TK') + 2:page.index('TK') + 5] - well_tk = ''.join(well_tk).replace(':', '') - well_tk = ''.join(well_tk).replace('[TK', ' [TK ') + well_tk = page[page.index("TK") + 2 : page.index("TK") + 5] + well_tk = "".join(well_tk).replace(":", "") + well_tk = "".join(well_tk).replace("[TK", " [TK ") # Obtaining Commune - well_gemarkung = page[page.index('Gemarkung') + 1:page.index('Gemarkung') + 2] - well_gemarkung = ''.join(well_gemarkung).replace(':', '') + well_gemarkung = page[page.index("Gemarkung") + 1 : page.index("Gemarkung") + 2] + well_gemarkung = "".join(well_gemarkung).replace(":", "") # Obtaining GK Coordinates of wells - well_coord_x_gk = page[page.index('Rechtswert/Hochwert') + 3:page.index('Rechtswert/Hochwert') + 4] - well_coord_x_gk = ''.join(well_coord_x_gk).replace(':', '') + well_coord_x_gk = page[ + page.index("Rechtswert/Hochwert") + 3 : page.index("Rechtswert/Hochwert") + 4 + ] + well_coord_x_gk = "".join(well_coord_x_gk).replace(":", "") - well_coord_y_gk = page[page.index('Rechtswert/Hochwert') + 5:page.index('Rechtswert/Hochwert') + 6] - well_coord_y_gk = ''.join(well_coord_y_gk).replace(':', '') + well_coord_y_gk = page[ + page.index("Rechtswert/Hochwert") + 5 : page.index("Rechtswert/Hochwert") + 6 + ] + well_coord_y_gk = "".join(well_coord_y_gk).replace(":", "") # Obtaining UTM Coordinates of wells - well_coord_x = page[page.index('East/North') + 3:page.index('East/North') + 4] - well_coord_x = ''.join(well_coord_x).replace(':', '') + well_coord_x = page[page.index("East/North") + 3 : page.index("East/North") + 4] + well_coord_x = "".join(well_coord_x).replace(":", "") - well_coord_y = page[page.index('East/North') + 5:page.index('East/North') + 6] - well_coord_y = ''.join(well_coord_y).replace(':', '') + well_coord_y = page[page.index("East/North") + 5 : page.index("East/North") + 6] + well_coord_y = "".join(well_coord_y).replace(":", "") - well_coord_z = page[page.index('Ansatzpunktes') + 3:page.index('Ansatzpunktes') + 4] - well_coord_z = ''.join(well_coord_z).replace(':', '') + well_coord_z = page[ + page.index("Ansatzpunktes") + 3 : page.index("Ansatzpunktes") + 4 + ] + well_coord_z = "".join(well_coord_z).replace(":", "") # Obtaining Coordinates Precision - well_coords = page[page.index('Koordinatenbestimmung') + 1:page.index('Koordinatenbestimmung') + 7] - well_coords = ' '.join(well_coords).replace(':', '') - well_coords = well_coords.split(' Hoehenbestimmung')[0] + well_coords = page[ + page.index("Koordinatenbestimmung") + + 1 : page.index("Koordinatenbestimmung") + + 7 + ] + well_coords = " ".join(well_coords).replace(":", "") + well_coords = well_coords.split(" Hoehenbestimmung")[0] # Obtaining height precision - well_height = page[page.index('Hoehenbestimmung') + 1:page.index('Hoehenbestimmung') + 8] - well_height = ' '.join(well_height).replace(':', '') - well_height = ''.join(well_height).replace(' .', '') - well_height = well_height.split(' Hauptzweck')[0] + well_height = page[ + page.index("Hoehenbestimmung") + 1 : page.index("Hoehenbestimmung") + 8 + ] + well_height = " ".join(well_height).replace(":", "") + well_height = "".join(well_height).replace(" .", "") + well_height = well_height.split(" Hauptzweck")[0] # Obtaining Purpose - well_zweck = page[page.index('Aufschlusses') + 1:page.index('Aufschlusses') + 4] - well_zweck = ' '.join(well_zweck).replace(':', '') - well_zweck = well_zweck.split(' Aufschlussart')[0] + well_zweck = page[page.index("Aufschlusses") + 1 : page.index("Aufschlusses") + 4] + well_zweck = " ".join(well_zweck).replace(":", "") + well_zweck = well_zweck.split(" Aufschlussart")[0] # Obtaining Kind - well_aufschlussart = page[page.index('Aufschlussart') + 1:page.index('Aufschlussart') + 3] - well_aufschlussart = ' '.join(well_aufschlussart).replace(':', '') - well_aufschlussart = well_aufschlussart.split(' Aufschlussverfahren')[0] + well_aufschlussart = page[ + page.index("Aufschlussart") + 1 : page.index("Aufschlussart") + 3 + ] + well_aufschlussart = " ".join(well_aufschlussart).replace(":", "") + well_aufschlussart = well_aufschlussart.split(" Aufschlussverfahren")[0] # Obtaining Procedure - well_aufschlussverfahren = page[page.index('Aufschlussverfahren') + 1:page.index('Aufschlussverfahren') + 4] - well_aufschlussverfahren = ' '.join(well_aufschlussverfahren).replace(':', '') - well_aufschlussverfahren = well_aufschlussverfahren.split(' Vertraulichkeit')[0] + well_aufschlussverfahren = page[ + page.index("Aufschlussverfahren") + 1 : page.index("Aufschlussverfahren") + 4 + ] + well_aufschlussverfahren = " ".join(well_aufschlussverfahren).replace(":", "") + well_aufschlussverfahren = well_aufschlussverfahren.split(" Vertraulichkeit")[0] # Obtaining Confidentiality - well_vertraulichkeit = page[page.index('Vertraulichkeit') + 1:page.index('Vertraulichkeit') + 14] - well_vertraulichkeit = ' '.join(well_vertraulichkeit).replace(':', '') - well_vertraulichkeit = well_vertraulichkeit.split(' Art')[0] + well_vertraulichkeit = page[ + page.index("Vertraulichkeit") + 1 : page.index("Vertraulichkeit") + 14 + ] + well_vertraulichkeit = " ".join(well_vertraulichkeit).replace(":", "") + well_vertraulichkeit = well_vertraulichkeit.split(" Art")[0] # Obtaining Type of Record - well_aufnahme = page[page.index('Aufnahme') + 1:page.index('Aufnahme') + 10] - well_aufnahme = ' '.join(well_aufnahme).replace(':', '') - well_aufnahme = well_aufnahme.split(' . Schichtenverzeichnis')[0] + well_aufnahme = page[page.index("Aufnahme") + 1 : page.index("Aufnahme") + 10] + well_aufnahme = " ".join(well_aufnahme).replace(":", "") + well_aufnahme = well_aufnahme.split(" . Schichtenverzeichnis")[0] # Obtaining Lithlog Version - well_version = page[page.index('Version') + 1:page.index('Version') + 3] - well_version = ' '.join(well_version).replace(':', '') - well_version = well_version.split(' Qualität')[0] + well_version = page[page.index("Version") + 1 : page.index("Version") + 3] + well_version = " ".join(well_version).replace(":", "") + well_version = well_version.split(" Qualität")[0] # Obtaining Quality - well_quality = page[page.index('Qualität') + 1:page.index('Qualität') + 9] - well_quality = ' '.join(well_quality).replace(':', '') - well_quality = well_quality.split(' erster')[0] + well_quality = page[page.index("Qualität") + 1 : page.index("Qualität") + 9] + well_quality = " ".join(well_quality).replace(":", "") + well_quality = well_quality.split(" erster")[0] # Obtaining Drilling Period - well_date = page[page.index('Bohrtag') + 1:page.index('Bohrtag') + 6] - well_date = ' '.join(well_date).replace(':', '') - well_date = well_date.split(' . Grundwasserstand')[0] + well_date = page[page.index("Bohrtag") + 1 : page.index("Bohrtag") + 6] + well_date = " ".join(well_date).replace(":", "") + well_date = well_date.split(" . Grundwasserstand")[0] # Obtaining Remarks - well_remarks = page[page.index('Bemerkung') + 1:page.index('Bemerkung') + 14] - well_remarks = ' '.join(well_remarks).replace(':', '') - well_remarks = well_remarks.split(' . Originalschichtenverzeichnis')[0] + well_remarks = page[page.index("Bemerkung") + 1 : page.index("Bemerkung") + 14] + well_remarks = " ".join(well_remarks).replace(":", "") + well_remarks = well_remarks.split(" . Originalschichtenverzeichnis")[0] # Obtaining Availability of Lithlog - well_lithlog = page[page.index('Originalschichtenverzeichnis') + 1:page.index('Originalschichtenverzeichnis') + 7] - well_lithlog = ' '.join(well_lithlog).replace(':', '') - well_lithlog = well_lithlog.split(' .Schichtdaten')[0] - well_lithlog = well_lithlog.split(' .Geologischer Dienst NRW')[0] + well_lithlog = page[ + page.index("Originalschichtenverzeichnis") + + 1 : page.index("Originalschichtenverzeichnis") + + 7 + ] + well_lithlog = " ".join(well_lithlog).replace(":", "") + well_lithlog = well_lithlog.split(" .Schichtdaten")[0] + well_lithlog = well_lithlog.split(" .Geologischer Dienst NRW")[0] # Create list with data - data = [well_dabo, - well_name, - well_number, - float(well_depth), - float(well_coord_x), - float(well_coord_y), - float(well_coord_z), - float(well_coord_x_gk), - float(well_coord_y_gk), - well_strat, - well_tk, - well_gemarkung, - well_coords, - well_height, - well_zweck, - well_aufschlussart, - well_aufschlussverfahren, - well_vertraulichkeit, - well_aufnahme, - well_version, - well_quality, - well_date, - well_remarks, - well_lithlog] + data = [ + well_dabo, + well_name, + well_number, + float(well_depth), + float(well_coord_x), + float(well_coord_y), + float(well_coord_z), + float(well_coord_x_gk), + float(well_coord_y_gk), + well_strat, + well_tk, + well_gemarkung, + well_coords, + well_height, + well_zweck, + well_aufschlussart, + well_aufschlussverfahren, + well_vertraulichkeit, + well_aufnahme, + well_version, + well_quality, + well_date, + well_remarks, + well_lithlog, + ] return data -def get_meta_data_df(data: str, - name: str = 'GD', - return_gdf: bool = True) -> Union[pd.DataFrame, gpd.geodataframe.GeoDataFrame]: +def get_meta_data_df( + data: str, name: str = "GD", return_gdf: bool = True +) -> Union[pd.DataFrame, gpd.geodataframe.GeoDataFrame]: """Function to create a dataframe with coordinates and meta data of the different boreholes Parameters @@ -524,110 +556,124 @@ def get_meta_data_df(data: str, # Checking that the data is of type list if not isinstance(data, str): - raise TypeError('Data must be provided as list of strings') + raise TypeError("Data must be provided as list of strings") # Checking that the name is of type string if not isinstance(name, str): - raise TypeError('Path/Name must be of type string') + raise TypeError("Path/Name must be of type string") # Checking that the return_gdf variable is of type bool if not isinstance(return_gdf, bool): - raise TypeError('Return_gdf variable must be of type bool') + raise TypeError("Return_gdf variable must be of type bool") # Split Data data = data.split() - data = '#'.join(data) - data = data.split('-#Stammdaten') - data = [item.split('|')[0] for item in data] - data = [item.split('#') for item in data] + data = "#".join(data) + data = data.split("-#Stammdaten") + data = [item.split("|")[0] for item in data] + data = [item.split("#") for item in data] # Filter out wells without Stratigraphic Column - data = [item for item in data if 'Beschreibung' in item] + data = [item for item in data if "Beschreibung" in item] # Get Coordinates of data coordinates = [get_meta_data(page=item) for item in data] # Create dataframe from coordinates - coordinates_dataframe = pd.DataFrame(data=coordinates, columns=['DABO No.', - 'Name', - 'Number', - 'Depth', - 'X', - 'Y', - 'Z', - 'X_GK', - 'Y_GK', - 'Last Stratigraphic Unit', - 'Map Sheet', - 'Commune', - 'Coordinates Precision', - 'Height Precision', - 'Purpose', - 'Kind', - 'Procedure', - 'Confidentiality', - 'Record Type', - 'Lithlog Version', - 'Quality', - 'Drilling Period', - 'Remarks', - 'Availability Lithlog']) + coordinates_dataframe = pd.DataFrame( + data=coordinates, + columns=[ + "DABO No.", + "Name", + "Number", + "Depth", + "X", + "Y", + "Z", + "X_GK", + "Y_GK", + "Last Stratigraphic Unit", + "Map Sheet", + "Commune", + "Coordinates Precision", + "Height Precision", + "Purpose", + "Kind", + "Procedure", + "Confidentiality", + "Record Type", + "Lithlog Version", + "Quality", + "Drilling Period", + "Remarks", + "Availability Lithlog", + ], + ) # Creating an empty list for indices index = [] # Filling index list with indices for i in range(len(coordinates_dataframe)): - index = np.append(index, [name + '{0:04}'.format(i + 1)]) - index = pd.DataFrame(data=index, columns=['Index']) + index = np.append(index, [name + "{0:04}".format(i + 1)]) + index = pd.DataFrame(data=index, columns=["Index"]) # Creating DataFrame coordinates_dataframe = pd.concat([coordinates_dataframe, index], axis=1) # Selecting columns - coordinates_dataframe = coordinates_dataframe[['Index', - 'DABO No.', - 'Name', - 'Number', - 'Depth', - 'X', - 'Y', - 'Z', - 'X_GK', - 'Y_GK', - 'Last Stratigraphic Unit', - 'Map Sheet', - 'Commune', - 'Coordinates Precision', - 'Height Precision', - 'Purpose', - 'Kind', - 'Procedure', - 'Confidentiality', - 'Record Type', - 'Lithlog Version', - 'Quality', - 'Drilling Period', - 'Remarks', - 'Availability Lithlog' - ]] + coordinates_dataframe = coordinates_dataframe[ + [ + "Index", + "DABO No.", + "Name", + "Number", + "Depth", + "X", + "Y", + "Z", + "X_GK", + "Y_GK", + "Last Stratigraphic Unit", + "Map Sheet", + "Commune", + "Coordinates Precision", + "Height Precision", + "Purpose", + "Kind", + "Procedure", + "Confidentiality", + "Record Type", + "Lithlog Version", + "Quality", + "Drilling Period", + "Remarks", + "Availability Lithlog", + ] + ] # Remove duplicates containing identical X, Y and Z coordinates - coordinates_dataframe = coordinates_dataframe[~coordinates_dataframe.duplicated(subset=['X', 'Y', 'Z'])] + coordinates_dataframe = coordinates_dataframe[ + ~coordinates_dataframe.duplicated(subset=["X", "Y", "Z"]) + ] # Convert df to gdf if return_gdf: - coordinates_dataframe = gpd.GeoDataFrame(data=coordinates_dataframe, - geometry=gpd.points_from_xy(x=coordinates_dataframe.X, - y=coordinates_dataframe.Y, - crs='EPSG:4647')) + coordinates_dataframe = gpd.GeoDataFrame( + data=coordinates_dataframe, + geometry=gpd.points_from_xy( + x=coordinates_dataframe.X, y=coordinates_dataframe.Y, crs="EPSG:4647" + ), + ) return coordinates_dataframe -def get_stratigraphic_data(text: list, - symbols: List[Tuple[str, str]], - formations: List[Tuple[str, str]], ) -> list: +def get_stratigraphic_data( + text: list, + symbols: List[Tuple[str, str]], + formations: List[Tuple[str, str]], +) -> list: """Function to retrieve the stratigraphic data from borehole logs Parameters @@ -672,15 +718,15 @@ def get_stratigraphic_data(text: list, # Checking if the provided text is of type list if not isinstance(text, list): - raise TypeError('The provided data must be of type list') + raise TypeError("The provided data must be of type list") # Checking if the provided symbols are of type list if not isinstance(symbols, list): - raise TypeError('The provided symbols must be of type list') + raise TypeError("The provided symbols must be of type list") # Checking if the provided formations are of type list if not isinstance(formations, list): - raise TypeError('The provided formations must be of type list') + raise TypeError("The provided formations must be of type list") # Creating empty lists depth = [] @@ -691,52 +737,120 @@ def get_stratigraphic_data(text: list, txt = text # Join elements of list - txt = ''.join(txt) + txt = "".join(txt) # Obtaining Name of Well - well_name = text[text.index('Name') + 1:text.index('Bohrungs-')] - well_name = ''.join(well_name).replace(':', '') + well_name = text[text.index("Name") + 1 : text.index("Bohrungs-")] + well_name = "".join(well_name).replace(":", "") # Obtaining Depth of well - well_depth = text[text.index('Endteufe') + 3:text.index('Endteufe') + 4] - well_depth = float(''.join(well_depth).replace(':', '')) + well_depth = text[text.index("Endteufe") + 3 : text.index("Endteufe") + 4] + well_depth = float("".join(well_depth).replace(":", "")) # Obtaining UTM Coordinates of wells - well_coord_x = text[text.index('East/North') + 3:text.index('East/North') + 4] - well_coord_x = ''.join(well_coord_x).replace(':', '') + well_coord_x = text[text.index("East/North") + 3 : text.index("East/North") + 4] + well_coord_x = "".join(well_coord_x).replace(":", "") - well_coord_y = text[text.index('East/North') + 5:text.index('East/North') + 6] - well_coord_y = ''.join(well_coord_y).replace(':', '') + well_coord_y = text[text.index("East/North") + 5 : text.index("East/North") + 6] + well_coord_y = "".join(well_coord_y).replace(":", "") - well_coord_z = text[text.index('Ansatzpunktes') + 3:text.index('Ansatzpunktes') + 4] - well_coord_z = ''.join(well_coord_z).replace(':', '') + well_coord_z = text[ + text.index("Ansatzpunktes") + 3 : text.index("Ansatzpunktes") + 4 + ] + well_coord_z = "".join(well_coord_z).replace(":", "") # Defining Phrases - phrases = ['Fachaufsicht:GeologischerDienstNRW', 'Auftraggeber:GeologischerDienstNRW', - 'Bohrunternehmer:GeologischerDienstNRW', 'aufgestelltvon:GeologischerDienstNRW', - 'geol./stratgr.bearbeitetvon:GeologischerDienstNRW', 'NachRh.W.B.-G.', 'Vol.-', 'Mst.-Bänke', 'Cen.-', - 'Tst.-Stücke', 'mit Mst. - Stücken', 'Flaserstruktur(O.-', 'FlaserstrukturO.-', 'Kalkst.-', - 'gca.-Mächtigkeit', 'ca.-', 'Karbonsst.-Gerölle', - 'Mst.-Stücken', 'Mst.-Bank17,1-17,2m', 'Tst.-Stücke', 'Mst.-Bank', 'Mst. - Stücken', 'hum.-torfig', - 'rötl.-ocker', 'Pfl.-Reste', 'Utbk.-Flözg', 'Glauk.-', 'Toneisensteinlagenu.-', 'Ostrac.-', 'Stromat.-', - 'u.-knötchen', 'U.-Camp.', 'Kalkmergelst.-Gerölle', 'Pfl.-Laden', 'Pfl.-Häcksel', 'ca.-Angabe,', 'Z.-', - 'Hgd.-Schiefer', 'Sdst.-Fame', 'Orig.-Schi', 'Mergels.-', 'Kst.-', 'Steink.-G', 'Steink.-', 'Sst.-', - 'bzw.-anfang', 'nd.-er', 'u.-knäuel', 'u.-konk', 'u.-knoten', 'ng.-Bür', 'Ton.-', 'org.-', 'FS.-', - 'dkl.-', 'Schluff.-', 'Erw.-', 'Abl.-', 'abl.-', 'Sch.-', 'alsU.-', 'Plänerkst.-', 'Süßw.-', 'KV.-', - 'duchläss.-', 'Verwitt.-', 'durchlass.-', 'San.-', 'Unterkr.-', 'grünl.-', 'Stringocephal.-', 'Zinkbl.-', - 'Amphip.-', 'Tonst.-', 'Öffn.-', 'Trennflä.-', 'Randkalku.-dolomit', - 'keineAngaben,Bemerkung:nachOrig.-SV:"Lehm",'] + phrases = [ + "Fachaufsicht:GeologischerDienstNRW", + "Auftraggeber:GeologischerDienstNRW", + "Bohrunternehmer:GeologischerDienstNRW", + "aufgestelltvon:GeologischerDienstNRW", + "geol./stratgr.bearbeitetvon:GeologischerDienstNRW", + "NachRh.W.B.-G.", + "Vol.-", + "Mst.-Bänke", + "Cen.-", + "Tst.-Stücke", + "mit Mst. - Stücken", + "Flaserstruktur(O.-", + "FlaserstrukturO.-", + "Kalkst.-", + "gca.-Mächtigkeit", + "ca.-", + "Karbonsst.-Gerölle", + "Mst.-Stücken", + "Mst.-Bank17,1-17,2m", + "Tst.-Stücke", + "Mst.-Bank", + "Mst. - Stücken", + "hum.-torfig", + "rötl.-ocker", + "Pfl.-Reste", + "Utbk.-Flözg", + "Glauk.-", + "Toneisensteinlagenu.-", + "Ostrac.-", + "Stromat.-", + "u.-knötchen", + "U.-Camp.", + "Kalkmergelst.-Gerölle", + "Pfl.-Laden", + "Pfl.-Häcksel", + "ca.-Angabe,", + "Z.-", + "Hgd.-Schiefer", + "Sdst.-Fame", + "Orig.-Schi", + "Mergels.-", + "Kst.-", + "Steink.-G", + "Steink.-", + "Sst.-", + "bzw.-anfang", + "nd.-er", + "u.-knäuel", + "u.-konk", + "u.-knoten", + "ng.-Bür", + "Ton.-", + "org.-", + "FS.-", + "dkl.-", + "Schluff.-", + "Erw.-", + "Abl.-", + "abl.-", + "Sch.-", + "alsU.-", + "Plänerkst.-", + "Süßw.-", + "KV.-", + "duchläss.-", + "Verwitt.-", + "durchlass.-", + "San.-", + "Unterkr.-", + "grünl.-", + "Stringocephal.-", + "Zinkbl.-", + "Amphip.-", + "Tonst.-", + "Öffn.-", + "Trennflä.-", + "Randkalku.-dolomit", + 'keineAngaben,Bemerkung:nachOrig.-SV:"Lehm",', + ] # Replace phrases for i in phrases: - txt = txt.replace(i, '') + txt = txt.replace(i, "") # Replace Symbols for a, b in symbols: if a in txt: txt = txt.replace(a, b) - if 'TiefeBeschreibungStratigraphie' in txt: + if "TiefeBeschreibungStratigraphie" in txt: # Every line ends with a '.' and every new line starts with '-', # the string will be separated there, the result is that every line of stratigraphy will be one string now @@ -752,27 +866,29 @@ def get_stratigraphic_data(text: list, # else: # txt = txt.split('TiefeBeschreibungStratigraphie..-')[1] - txt = txt.split('TiefeBeschreibungStratigraphie..-')[1] + txt = txt.split("TiefeBeschreibungStratigraphie..-")[1] except IndexError: # Create data - data = [well_name, - float(well_depth), - float(well_coord_x), - float(well_coord_y), - float(well_coord_z), - depth, - strings, - subs, - form] + data = [ + well_name, + float(well_depth), + float(well_coord_x), + float(well_coord_y), + float(well_coord_z), + depth, + strings, + subs, + form, + ] return data # Join txt - txt = ''.join(txt) + txt = "".join(txt) # Split text at .- - txt = txt.split('.-') + txt = txt.split(".-") # For loop over every string that contains layer information for a in range(len(txt)): @@ -781,35 +897,35 @@ def get_stratigraphic_data(text: list, break else: # Every string is combined to a sequence of characters - string = ''.join(txt[a]) - if string not in (None, ''): + string = "".join(txt[a]) + if string not in (None, ""): try: # The depth information is extracted from the string - depth.append(float(string.split('m', 1)[0])) + depth.append(float(string.split("m", 1)[0])) # The depth information is cut off from the string and # only the lithologies and stratigraphy is kept - string = string.split('m', 1)[1] + string = string.split("m", 1)[1] # Remove all numbers from string (e.g. von 10m bis 20m) - string = ''.join(f for f in string if not f.isdigit()) + string = "".join(f for f in string if not f.isdigit()) except ValueError: pass else: pass # Removing symbols from string - string = string.replace(':', '') - string = string.replace('-', '') - string = string.replace('.', '') - string = string.replace(',', '') - string = string.replace('?', '') - string = string.replace('/', '') + string = string.replace(":", "") + string = string.replace("-", "") + string = string.replace(".", "") + string = string.replace(",", "") + string = string.replace("?", "") + string = string.replace("/", "") # Replace PDF-formation with formation name forms = string for q, r in formations: if "..---.m" not in forms: - if 'keineAngaben' in forms: - formation = 'NichtEingestuft' + if "keineAngaben" in forms: + formation = "NichtEingestuft" elif q in forms: new_string = forms.split(q, 1) forma = forms.split(new_string[0], 1)[1] @@ -822,25 +938,29 @@ def get_stratigraphic_data(text: list, form.append(formation) # Create Data - data = [well_name, - float(well_depth), - float(well_coord_x), - float(well_coord_y), - float(well_coord_z), - depth, - strings, - subs, - form] + data = [ + well_name, + float(well_depth), + float(well_coord_x), + float(well_coord_y), + float(well_coord_z), + depth, + strings, + subs, + form, + ] return data -def get_stratigraphic_data_df(data: str, - name: str, - symbols: List[Tuple[str, str]], - formations: List[Tuple[str, str]], - remove_last: bool = False, - return_gdf: bool = True) -> Union[pd.DataFrame, gpd.geodataframe.GeoDataFrame]: +def get_stratigraphic_data_df( + data: str, + name: str, + symbols: List[Tuple[str, str]], + formations: List[Tuple[str, str]], + remove_last: bool = False, + return_gdf: bool = True, +) -> Union[pd.DataFrame, gpd.geodataframe.GeoDataFrame]: """Function to create a dataframe with coordinates and the stratigraphy of the different boreholes Parameters @@ -848,7 +968,7 @@ def get_stratigraphic_data_df(data: str, data : list List containing the strings of the borehole log - + name : str Name for index reference, e.g. ``name='GD'`` @@ -909,36 +1029,36 @@ def get_stratigraphic_data_df(data: str, # Checking that the data is provided as string if not isinstance(data, str): - raise TypeError('Data must be provided as string') + raise TypeError("Data must be provided as string") # Checking that the name of the index is provided as string if not isinstance(name, str): - raise TypeError('Index name must be provided as string') + raise TypeError("Index name must be provided as string") # Checking that the symbols are provided as list if not isinstance(symbols, list): - raise TypeError('Symbols must be provided as list of tuples of strings') + raise TypeError("Symbols must be provided as list of tuples of strings") # Checking that the formations are provided as list if not isinstance(formations, list): - raise TypeError('Formations must be provided as list of tuples of strings') + raise TypeError("Formations must be provided as list of tuples of strings") # Checking that the remove_last variable is of type bool if not isinstance(remove_last, bool): - raise TypeError('Remove_last variable must be of type bool') + raise TypeError("Remove_last variable must be of type bool") # Checking that the return_gdf variable is of type bool if not isinstance(return_gdf, bool): - raise TypeError('Return_gdf variable must be of type bool') + raise TypeError("Return_gdf variable must be of type bool") # Splitting the entire string into a list data = data.split() # Join all elements of list/all pages of the borehole logs and separate with # - data = '#'.join(data) + data = "#".join(data) # Split entire string at each new page into separate elements of a list - data = data.split('-#Stammdaten') + data = data.split("-#Stammdaten") # Cut off the last part of each element, this is not done for each page # Segment to filter out stratigraphic tables that have multiple versions and are on multiple pages @@ -951,137 +1071,217 @@ def get_stratigraphic_data_df(data: str, # else: # data = [item.split('|Geologischer#Dienst#NRW#')[0] for item in data] - data = [item.split('|Geologischer#Dienst#NRW#')[0] for item in data] + data = [item.split("|Geologischer#Dienst#NRW#")[0] for item in data] # Remove last part of each page if log stretches over multiple pages - data = [re.sub(r'Geologischer#Dienst#NRW#\d\d.\d\d.\d\d\d\d-#\d+#-#', '#', item) for item in data] - data = [re.sub(r'Geologischer#Dienst#NRW#\d\d.\d\d.\d\d\d\d-#\d+#-', '#', item) for item in data] + data = [ + re.sub(r"Geologischer#Dienst#NRW#\d\d.\d\d.\d\d\d\d-#\d+#-#", "#", item) + for item in data + ] + data = [ + re.sub(r"Geologischer#Dienst#NRW#\d\d.\d\d.\d\d\d\d-#\d+#-", "#", item) + for item in data + ] # Connect different parts of each element - data = [''.join(item) for item in data] + data = ["".join(item) for item in data] # Split each element at # - data = [item.split('#') for item in data] + data = [item.split("#") for item in data] # Filter out wells without Stratigraphic Column - data = [item for item in data if 'Beschreibung' in item] + data = [item for item in data if "Beschreibung" in item] # Create empty list for indices index = [] # Get stratigraphic data for each well - stratigraphy = [get_stratigraphic_data(text=item, - symbols=symbols, - formations=formations) for item in data] + stratigraphy = [ + get_stratigraphic_data(text=item, symbols=symbols, formations=formations) + for item in data + ] # Create DataFrame from list of stratigraphic data stratigraphy = pd.DataFrame(data=stratigraphy) # Create DataFrame for index for i in range(len(stratigraphy)): - index = np.append(index, [str(name + '{0:04}'.format(i + 1))]) + index = np.append(index, [str(name + "{0:04}".format(i + 1))]) index = pd.DataFrame(index) # Concatenate DataFrames stratigraphy_dataframe_new = pd.concat([stratigraphy, index], axis=1) # Label DataFrame Columns - stratigraphy_dataframe_new.columns = ['Name', 'Depth', 'X', 'Y', 'Altitude', 'Z', 'PDF-Formation', 'Subformation', - 'formation', 'Index'] + stratigraphy_dataframe_new.columns = [ + "Name", + "Depth", + "X", + "Y", + "Altitude", + "Z", + "PDF-Formation", + "Subformation", + "formation", + "Index", + ] # Select Columns stratigraphy_dataframe_new = stratigraphy_dataframe_new[ - ['Index', 'Name', 'X', 'Y', 'Z', 'Depth', 'Altitude', 'PDF-Formation', 'Subformation', 'formation']] + [ + "Index", + "Name", + "X", + "Y", + "Z", + "Depth", + "Altitude", + "PDF-Formation", + "Subformation", + "formation", + ] + ] # Adjust data - strati_depth = stratigraphy_dataframe_new[['Index', 'Z']] - lst_col1 = 'Z' - depth = pd.DataFrame({ - col: np.repeat(strati_depth['Index'].values, strati_depth[lst_col1].str.len()) - for col in strati_depth.columns.drop(lst_col1)} - ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[strati_depth.columns] - - strati_depth = stratigraphy_dataframe_new[['Name', 'Z']] - lst_col1 = 'Z' - names = pd.DataFrame({ - col: np.repeat(strati_depth['Name'].values, strati_depth[lst_col1].str.len()) - for col in strati_depth.columns.drop(lst_col1)} - ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[strati_depth.columns] - - strati_depth = stratigraphy_dataframe_new[['X', 'Z']] - lst_col1 = 'Z' - x_coord = pd.DataFrame({ - col: np.repeat(strati_depth['X'].values, strati_depth[lst_col1].str.len()) - for col in strati_depth.columns.drop(lst_col1)} - ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[strati_depth.columns] - - strati_depth = stratigraphy_dataframe_new[['Y', 'Z']] - lst_col1 = 'Z' - y_coord = pd.DataFrame({ - col: np.repeat(strati_depth['Y'].values, strati_depth[lst_col1].str.len()) - for col in strati_depth.columns.drop(lst_col1)} - ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[strati_depth.columns] - - strati_depth = stratigraphy_dataframe_new[['Altitude', 'Z']] - lst_col1 = 'Z' - altitude = pd.DataFrame({ - col: np.repeat(strati_depth['Altitude'].values, strati_depth[lst_col1].str.len()) - for col in strati_depth.columns.drop(lst_col1)} - ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[strati_depth.columns] - - strati_depth = stratigraphy_dataframe_new[['Depth', 'Z']] - lst_col1 = 'Z' - welldepth = pd.DataFrame({ - col: np.repeat(strati_depth['Depth'].values, strati_depth[lst_col1].str.len()) - for col in strati_depth.columns.drop(lst_col1)} - ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[strati_depth.columns] - - strati_formation = stratigraphy_dataframe_new[['Index', 'formation']] - lst_col4 = 'formation' - formation = pd.DataFrame({ - col: np.repeat(strati_formation['Index'].values, strati_formation[lst_col4].str.len()) - for col in strati_formation.columns.drop(lst_col4)} - ).assign(**{lst_col4: np.concatenate(strati_formation[lst_col4].values)})[strati_formation.columns] + strati_depth = stratigraphy_dataframe_new[["Index", "Z"]] + lst_col1 = "Z" + depth = pd.DataFrame( + { + col: np.repeat( + strati_depth["Index"].values, strati_depth[lst_col1].str.len() + ) + for col in strati_depth.columns.drop(lst_col1) + } + ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[ + strati_depth.columns + ] + + strati_depth = stratigraphy_dataframe_new[["Name", "Z"]] + lst_col1 = "Z" + names = pd.DataFrame( + { + col: np.repeat( + strati_depth["Name"].values, strati_depth[lst_col1].str.len() + ) + for col in strati_depth.columns.drop(lst_col1) + } + ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[ + strati_depth.columns + ] + + strati_depth = stratigraphy_dataframe_new[["X", "Z"]] + lst_col1 = "Z" + x_coord = pd.DataFrame( + { + col: np.repeat(strati_depth["X"].values, strati_depth[lst_col1].str.len()) + for col in strati_depth.columns.drop(lst_col1) + } + ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[ + strati_depth.columns + ] + + strati_depth = stratigraphy_dataframe_new[["Y", "Z"]] + lst_col1 = "Z" + y_coord = pd.DataFrame( + { + col: np.repeat(strati_depth["Y"].values, strati_depth[lst_col1].str.len()) + for col in strati_depth.columns.drop(lst_col1) + } + ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[ + strati_depth.columns + ] + + strati_depth = stratigraphy_dataframe_new[["Altitude", "Z"]] + lst_col1 = "Z" + altitude = pd.DataFrame( + { + col: np.repeat( + strati_depth["Altitude"].values, strati_depth[lst_col1].str.len() + ) + for col in strati_depth.columns.drop(lst_col1) + } + ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[ + strati_depth.columns + ] + + strati_depth = stratigraphy_dataframe_new[["Depth", "Z"]] + lst_col1 = "Z" + welldepth = pd.DataFrame( + { + col: np.repeat( + strati_depth["Depth"].values, strati_depth[lst_col1].str.len() + ) + for col in strati_depth.columns.drop(lst_col1) + } + ).assign(**{lst_col1: np.concatenate(strati_depth[lst_col1].values)})[ + strati_depth.columns + ] + + strati_formation = stratigraphy_dataframe_new[["Index", "formation"]] + lst_col4 = "formation" + formation = pd.DataFrame( + { + col: np.repeat( + strati_formation["Index"].values, strati_formation[lst_col4].str.len() + ) + for col in strati_formation.columns.drop(lst_col4) + } + ).assign(**{lst_col4: np.concatenate(strati_formation[lst_col4].values)})[ + strati_formation.columns + ] # Create DataFrame - strat = pd.concat([names, x_coord, y_coord, depth, altitude, welldepth, formation], - axis=1) + strat = pd.concat( + [names, x_coord, y_coord, depth, altitude, welldepth, formation], axis=1 + ) # Name Columns of DataFrame - strat = strat[['Index', 'Name', 'X', 'Y', 'Z', 'Altitude', 'Depth', 'formation']] + strat = strat[["Index", "Name", "X", "Y", "Z", "Altitude", "Depth", "formation"]] # Delete Duplicated columns (Index) strat = strat.loc[:, ~strat.columns.duplicated()] # Rename columns of Data Frame - strat.columns = ['Index', 'Name', 'X', 'Y', 'DepthLayer', 'Altitude', 'Depth', - 'formation'] + strat.columns = [ + "Index", + "Name", + "X", + "Y", + "DepthLayer", + "Altitude", + "Depth", + "formation", + ] # Create Depth Column Usable for GemPy - strat['Z'] = strat['Altitude'] - strat['DepthLayer'] + strat["Z"] = strat["Altitude"] - strat["DepthLayer"] # Reorder Columns of DataFrame - strat = strat[['Index', 'Name', 'X', 'Y', 'Z', 'Altitude', 'Depth', 'formation']] + strat = strat[["Index", "Name", "X", "Y", "Z", "Altitude", "Depth", "formation"]] # Delete Last - strat = strat.groupby(['Index', 'formation']).last().sort_values(by=['Index', 'Z'], - ascending=[True, False]).reset_index() + strat = ( + strat.groupby(["Index", "formation"]) + .last() + .sort_values(by=["Index", "Z"], ascending=[True, False]) + .reset_index() + ) # Selecting Data - strat = strat[['Index', 'Name', 'X', 'Y', 'Z', 'Altitude', 'Depth', 'formation']] + strat = strat[["Index", "Name", "X", "Y", "Z", "Altitude", "Depth", "formation"]] # Remove unusable entries - strat = strat[strat['formation'] != 'NichtEingestuft'] + strat = strat[strat["formation"] != "NichtEingestuft"] # Removing the last interfaces of each well since it does not represent a true interfaces if remove_last: - strat = strat[strat.groupby('Index').cumcount(ascending=False) > 0] + strat = strat[strat.groupby("Index").cumcount(ascending=False) > 0] # Convert df to gdf if return_gdf: - strat = gpd.GeoDataFrame(data=strat, - geometry=gpd.points_from_xy(x=strat.X, - y=strat.Y, - crs='EPSG:4647')) + strat = gpd.GeoDataFrame( + data=strat, + geometry=gpd.points_from_xy(x=strat.X, y=strat.Y, crs="EPSG:4647"), + ) return strat