From 2a598bd4ecec866f9a774ccf1f9ba8cd7e433197 Mon Sep 17 00:00:00 2001 From: AlexanderJuestel Date: Mon, 22 Jul 2024 20:48:36 +0200 Subject: [PATCH] Format raster.py --- gemgis/raster.py | 1079 +++++++++++++++++++++++++++------------------- 1 file changed, 630 insertions(+), 449 deletions(-) diff --git a/gemgis/raster.py b/gemgis/raster.py index 20ed37cb..5eeb7e94 100644 --- a/gemgis/raster.py +++ b/gemgis/raster.py @@ -35,10 +35,12 @@ import pyproj -def sample_from_array(array: np.ndarray, - extent: Sequence[float], - point_x: Union[float, int, list, np.ndarray], - point_y: Union[float, int, list, np.ndarray], ) -> Union[np.ndarray, float]: +def sample_from_array( + array: np.ndarray, + extent: Sequence[float], + point_x: Union[float, int, list, np.ndarray], + point_y: Union[float, int, list, np.ndarray], +) -> Union[np.ndarray, float]: """Sampling the value of a np.ndarray at a given point and given the arrays true extent Parameters @@ -95,58 +97,58 @@ def sample_from_array(array: np.ndarray, # Checking is the array is a np.ndarray if not isinstance(array, np.ndarray): - raise TypeError('Object must be of type np.ndarray') + raise TypeError("Object must be of type np.ndarray") # Checking if the extent is a list if not isinstance(extent, Sequence): - raise TypeError('Extent must be of type list') + raise TypeError("Extent must be of type list") # Checking that the length of the list is either four or six if len(extent) not in [4, 6]: - raise ValueError('The extent must include only four or six values') + raise ValueError("The extent must include only four or six values") # Checking if the point coordinates are stored as a list if not isinstance(point_x, (list, np.ndarray, float, int)): - raise TypeError('Point_x must be of type list or np.ndarray') + raise TypeError("Point_x must be of type list or np.ndarray") # Checking if the point coordinates are stored as a list if not isinstance(point_y, (list, np.ndarray, float, int)): - raise TypeError('Point_y must be of type list or np.ndarray') + raise TypeError("Point_y must be of type list or np.ndarray") # Checking the length of the point list if not isinstance(point_x, (float, int)) and not isinstance(point_y, (float, int)): if len(point_x) != len(point_y): - raise ValueError('Length of both point lists/arrays must be equal') + raise ValueError("Length of both point lists/arrays must be equal") # Checking that all elements of the extent are of type int or float if not all(isinstance(n, (int, float)) for n in extent): - raise TypeError('Extent values must be of type int or float') + raise TypeError("Extent values must be of type int or float") # Checking that all elements of the point list are of type int or float if isinstance(point_x, (list, np.ndarray)): if not all(isinstance(n, (int, float, np.int32)) for n in point_x): - raise TypeError('Point values must be of type int or float') + raise TypeError("Point values must be of type int or float") # Checking that all elements of the point list are of type int or float if isinstance(point_y, (list, np.ndarray)): if not all(isinstance(n, (int, float)) for n in point_y): - raise TypeError('Point values must be of type int or float') + raise TypeError("Point values must be of type int or float") # Checking if the point is located within the provided extent if isinstance(point_x, (list, np.ndarray)): if any(x < extent[0] for x in point_x) or any(x > extent[1] for x in point_x): - raise ValueError('One or multiple points are located outside of the extent') + raise ValueError("One or multiple points are located outside of the extent") if isinstance(point_y, (list, np.ndarray)): if any(y < extent[2] for y in point_y) or any(y > extent[3] for y in point_y): - raise ValueError('One or multiple points are located outside of the extent') + raise ValueError("One or multiple points are located outside of the extent") # Checking if the point is located within the provided extent if isinstance(point_x, (float, int)): if point_x < extent[0] or point_x > extent[1]: - raise ValueError('One or multiple points are located outside of the extent') + raise ValueError("One or multiple points are located outside of the extent") if isinstance(point_y, (float, int)): if point_y < extent[2] or point_y > extent[3]: - raise ValueError('One or multiple points are located outside of the extent') + raise ValueError("One or multiple points are located outside of the extent") # Converting lists of coordinates to np.ndarrays if isinstance(point_x, list) and isinstance(point_y, list): @@ -154,15 +156,21 @@ def sample_from_array(array: np.ndarray, point_y = np.array(point_y) # Getting the column number based on the extent and shape of the array - column = np.int32(np.round((point_x - extent[0]) / (extent[1] - extent[0]) * array.shape[1])) + column = np.int32( + np.round((point_x - extent[0]) / (extent[1] - extent[0]) * array.shape[1]) + ) # Getting the row number based on the extent and shape of the array - row = np.int32(np.round((point_y - extent[2]) / (extent[3] - extent[2]) * array.shape[0])) + row = np.int32( + np.round((point_y - extent[2]) / (extent[3] - extent[2]) * array.shape[0]) + ) # Checking that all elements for the column and row numbers are of type int if isinstance(row, np.ndarray) and isinstance(column, np.ndarray): - if not all(isinstance(n, np.int32) for n in column) and not all(isinstance(n, np.int32) for n in row): - raise TypeError('Column and row values must be of type int for indexing') + if not all(isinstance(n, np.int32) for n in column) and not all( + isinstance(n, np.int32) for n in row + ): + raise TypeError("Column and row values must be of type int for indexing") # Flip array so that the column and row indices are correct array = np.flipud(array) @@ -178,11 +186,13 @@ def sample_from_array(array: np.ndarray, return sample -def sample_from_rasterio(raster: rasterio.io.DatasetReader, - point_x: Union[float, int, list, np.ndarray], - point_y: Union[float, int, list, np.ndarray], - sample_outside_extent: bool = True, - sample_all_bands: bool = False) -> Union[list, float]: +def sample_from_rasterio( + raster: rasterio.io.DatasetReader, + point_x: Union[float, int, list, np.ndarray], + point_y: Union[float, int, list, np.ndarray], + sample_outside_extent: bool = True, + sample_all_bands: bool = False, +) -> Union[list, float]: """Sampling the value of a rasterio object at a given point within the extent of the raster Parameters @@ -240,56 +250,68 @@ def sample_from_rasterio(raster: rasterio.io.DatasetReader, # Checking that the raster is a rasterio object if not isinstance(raster, rasterio.io.DatasetReader): - raise TypeError('Raster must be provided as rasterio object') + raise TypeError("Raster must be provided as rasterio object") # Checking if the point coordinates are stored as a list if not isinstance(point_x, (list, np.ndarray, float, int)): - raise TypeError('Point_x must be of type list or np.ndarray') + raise TypeError("Point_x must be of type list or np.ndarray") # Checking if the point coordinates are stored as a list if not isinstance(point_y, (list, np.ndarray, float, int)): - raise TypeError('Point_y must be of type list or np.ndarray') + raise TypeError("Point_y must be of type list or np.ndarray") # Checking the length of the point list if not isinstance(point_x, (float, int)) and not isinstance(point_y, (float, int)): if len(point_x) != len(point_y): - raise ValueError('Length of both point lists/arrays must be equal') + raise ValueError("Length of both point lists/arrays must be equal") # Checking that all elements of the point list are of type int or float if isinstance(point_x, (list, np.ndarray)): if not all(isinstance(n, (int, float, np.int32, np.float64)) for n in point_x): - raise TypeError('Point values must be of type int or float') + raise TypeError("Point values must be of type int or float") # Checking that all elements of the point list are of type int or float if isinstance(point_y, (list, np.ndarray)): if not all(isinstance(n, (int, float, np.int32, np.float64)) for n in point_y): - raise TypeError('Point values must be of type int or float') + raise TypeError("Point values must be of type int or float") # Checking that sample_outside_extent is of type bool if not isinstance(sample_outside_extent, bool): - raise TypeError('Sample_outside_extent argument must be of type bool') + raise TypeError("Sample_outside_extent argument must be of type bool") # Checking that sample_all_bands is of type bool if not isinstance(sample_all_bands, bool): - raise TypeError('Sample_all_bands argument must be of type bool') + raise TypeError("Sample_all_bands argument must be of type bool") # If sample_outside extent is true, a nodata value will be assigned if not sample_outside_extent: # Checking if the point is located within the provided raster extent if isinstance(point_x, (list, np.ndarray)): - if any(x < raster.bounds[0] for x in point_x) or any(x > raster.bounds[2] for x in point_x): - raise ValueError('One or multiple points are located outside of the extent') + if any(x < raster.bounds[0] for x in point_x) or any( + x > raster.bounds[2] for x in point_x + ): + raise ValueError( + "One or multiple points are located outside of the extent" + ) if isinstance(point_y, (list, np.ndarray)): - if any(y < raster.bounds[1] for y in point_y) or any(y > raster.bounds[3] for y in point_y): - raise ValueError('One or multiple points are located outside of the extent') + if any(y < raster.bounds[1] for y in point_y) or any( + y > raster.bounds[3] for y in point_y + ): + raise ValueError( + "One or multiple points are located outside of the extent" + ) # Checking if the point is located within the provided raster extent if isinstance(point_x, (float, int)): if point_x < raster.bounds[0] or point_x > raster.bounds[2]: - raise ValueError('One or multiple points are located outside of the extent') + raise ValueError( + "One or multiple points are located outside of the extent" + ) if isinstance(point_y, (float, int)): if point_y < raster.bounds[1] or point_y > raster.bounds[3]: - raise ValueError('One or multiple points are located outside of the extent') + raise ValueError( + "One or multiple points are located outside of the extent" + ) # Converting lists of coordinates to np.ndarrays if isinstance(point_x, list) and isinstance(point_y, list): @@ -314,10 +336,12 @@ def sample_from_rasterio(raster: rasterio.io.DatasetReader, return sample -def sample_randomly(raster: Union[np.ndarray, rasterio.io.DatasetReader], - n: int = 1, - extent: Optional[Sequence[float]] = None, - seed: int = None) -> tuple: +def sample_randomly( + raster: Union[np.ndarray, rasterio.io.DatasetReader], + n: int = 1, + extent: Optional[Sequence[float]] = None, + seed: int = None, +) -> tuple: """Sampling randomly from a raster (array or rasterio object) using sample_from_array or sample_from_rasterio and a randomly drawn point within the array/raster extent @@ -370,31 +394,31 @@ def sample_randomly(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if the array is of type np.ndarrays if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Array must be of type np.ndarray') + raise TypeError("Array must be of type np.ndarray") # Checking that n is of type int if not isinstance(n, int): - raise TypeError('Number of samples n must be provided as int') + raise TypeError("Number of samples n must be provided as int") # Checking if seed is of type int if not isinstance(seed, (int, type(None))): - raise TypeError('Extent must be of type list') + raise TypeError("Extent must be of type list") # Checking that if a seed was provided that the seed is of type int if seed is not None: if not isinstance(seed, int): - raise TypeError('Seed must be of type int') + raise TypeError("Seed must be of type int") np.random.seed(seed) # Checking if extent is a list if not isinstance(extent, (list, type(None))): - raise TypeError('Extent must be of type list') + raise TypeError("Extent must be of type list") # Sampling from Array # Checking that all values are either ints or floats if isinstance(raster, np.ndarray): if not all(isinstance(n, (int, float)) for n in extent): - raise TypeError('Extent values must be of type int or float') + raise TypeError("Extent values must be of type int or float") # Drawing random values x and y within the provided extent x = np.random.uniform(extent[0], extent[1], n) @@ -402,9 +426,9 @@ def sample_randomly(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if the drawn values are floats if not isinstance(x, np.ndarray): - raise TypeError('x must be of type np.ndarray') + raise TypeError("x must be of type np.ndarray") if not isinstance(y, np.ndarray): - raise TypeError('y must be of type np.ndarray') + raise TypeError("y must be of type np.ndarray") # Sampling from the provided array and the random point sample = sample_from_array(array=raster, extent=extent, point_x=x, point_y=y) @@ -419,9 +443,9 @@ def sample_randomly(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if the drawn values are floats if not isinstance(x, np.ndarray): - raise TypeError('x must be of type np.ndarray') + raise TypeError("x must be of type np.ndarray") if not isinstance(y, np.ndarray): - raise TypeError('y must be of type np.ndarray') + raise TypeError("y must be of type np.ndarray") sample = sample_from_rasterio(raster=raster, point_x=x, point_y=y) @@ -434,15 +458,17 @@ def sample_randomly(raster: Union[np.ndarray, rasterio.io.DatasetReader], return sample, [x, y] -def sample_orientations(raster: Union[np.ndarray, rasterio.io.DatasetReader], - extent: List[Union[int, float]] = None, - point_x: Union[float, int, list, np.ndarray] = None, - point_y: Union[float, int, list, np.ndarray] = None, - random_samples: int = None, - formation: str = None, - seed: int = None, - sample_outside_extent: bool = False, - crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS] = None) -> gpd.geodataframe.GeoDataFrame: +def sample_orientations( + raster: Union[np.ndarray, rasterio.io.DatasetReader], + extent: List[Union[int, float]] = None, + point_x: Union[float, int, list, np.ndarray] = None, + point_y: Union[float, int, list, np.ndarray] = None, + random_samples: int = None, + formation: str = None, + seed: int = None, + sample_outside_extent: bool = False, + crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS] = None, +) -> gpd.geodataframe.GeoDataFrame: """Sampling orientations from a raster Parameters @@ -514,92 +540,115 @@ def sample_orientations(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if the rasterio of type np.ndarray or a rasterio object if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be of type np.ndarray or a rasterio object') + raise TypeError("Raster must be of type np.ndarray or a rasterio object") # Checking if the extent is of type list if an array is provided if isinstance(raster, np.ndarray) and not isinstance(extent, list): - raise TypeError('Extent must be of type list when providing an array') + raise TypeError("Extent must be of type list when providing an array") # Checking that all elements of the extent are of type float or int - if isinstance(raster, np.ndarray) and not all(isinstance(n, (int, float)) for n in extent): - raise TypeError('Extent values must be of type int or float') + if isinstance(raster, np.ndarray) and not all( + isinstance(n, (int, float)) for n in extent + ): + raise TypeError("Extent values must be of type int or float") # Checking if the number of samples is of type int if point_x is None and point_y is None and not isinstance(random_samples, int): - raise TypeError('Number of samples must be of type int if no points are provided') + raise TypeError( + "Number of samples must be of type int if no points are provided" + ) # Checking if the points are of the correct type - if isinstance(random_samples, type(None)) and not isinstance(point_x, (float, int, list, np.ndarray)): - raise TypeError('Point_x must either be an int, float or a list or array of coordinates') + if isinstance(random_samples, type(None)) and not isinstance( + point_x, (float, int, list, np.ndarray) + ): + raise TypeError( + "Point_x must either be an int, float or a list or array of coordinates" + ) # Checking if the points are of the correct type - if isinstance(random_samples, type(None)) and not isinstance(point_y, (float, int, list, np.ndarray)): - raise TypeError('Point_y must either be an int, float or a list or array of coordinates') + if isinstance(random_samples, type(None)) and not isinstance( + point_y, (float, int, list, np.ndarray) + ): + raise TypeError( + "Point_y must either be an int, float or a list or array of coordinates" + ) # Checking if the seed is of type int if not isinstance(seed, (int, type(None))): - raise TypeError('Seed must be of type int') + raise TypeError("Seed must be of type int") # Checking that sampling outside extent is of type bool if not isinstance(sample_outside_extent, bool): - raise TypeError('Sampling_outside_extent must be of type bool') + raise TypeError("Sampling_outside_extent must be of type bool") # Checking that the crs is either a string or of type bool if not isinstance(crs, (str, pyproj.crs.crs.CRS, rasterio.crs.CRS, type(None))): - raise TypeError('CRS must be provided as string, pyproj or rasterio object') + raise TypeError("CRS must be provided as string, pyproj or rasterio object") # Calculate slope and aspect of raster - slope = calculate_slope(raster=raster, - extent=extent) + slope = calculate_slope(raster=raster, extent=extent) - aspect = calculate_aspect(raster=raster, - extent=extent) + aspect = calculate_aspect(raster=raster, extent=extent) # Sampling interfaces - gdf = sample_interfaces(raster=raster, - extent=extent, - point_x=point_x, - point_y=point_y, - random_samples=random_samples, - formation=formation, - seed=seed, - sample_outside_extent=sample_outside_extent, - crs=crs) + gdf = sample_interfaces( + raster=raster, + extent=extent, + point_x=point_x, + point_y=point_y, + random_samples=random_samples, + formation=formation, + seed=seed, + sample_outside_extent=sample_outside_extent, + crs=crs, + ) # Setting the array extent for the dip and azimuth sampling if isinstance(raster, rasterio.io.DatasetReader): - raster_extent = [raster.bounds[0], raster.bounds[2], raster.bounds[1], raster.bounds[3]] + raster_extent = [ + raster.bounds[0], + raster.bounds[2], + raster.bounds[1], + raster.bounds[3], + ] else: raster_extent = extent # Sampling dip and azimuth at the given locations - dip = sample_from_array(array=slope, - extent=raster_extent, - point_x=gdf['X'].values, - point_y=gdf['Y'].values) - - azimuth = sample_from_array(array=aspect, - extent=raster_extent, - point_x=gdf['X'].values, - point_y=gdf['Y'].values) + dip = sample_from_array( + array=slope, + extent=raster_extent, + point_x=gdf["X"].values, + point_y=gdf["Y"].values, + ) + + azimuth = sample_from_array( + array=aspect, + extent=raster_extent, + point_x=gdf["X"].values, + point_y=gdf["Y"].values, + ) # Adding columns to the GeoDataFrame - gdf['dip'] = dip - gdf['azimuth'] = azimuth - gdf['polarity'] = 1 + gdf["dip"] = dip + gdf["azimuth"] = azimuth + gdf["polarity"] = 1 return gdf -def sample_interfaces(raster: Union[np.ndarray, rasterio.io.DatasetReader], - extent: List[Union[int, float]] = None, - point_x: Union[float, int, list, np.ndarray] = None, - point_y: Union[float, int, list, np.ndarray] = None, - random_samples: int = None, - formation: str = None, - seed: int = None, - sample_outside_extent: bool = False, - crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS] = None) -> gpd.geodataframe.GeoDataFrame: +def sample_interfaces( + raster: Union[np.ndarray, rasterio.io.DatasetReader], + extent: List[Union[int, float]] = None, + point_x: Union[float, int, list, np.ndarray] = None, + point_y: Union[float, int, list, np.ndarray] = None, + random_samples: int = None, + formation: str = None, + seed: int = None, + sample_outside_extent: bool = False, + crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS] = None, +) -> gpd.geodataframe.GeoDataFrame: """Sampling interfaces from a raster Parameters @@ -671,60 +720,72 @@ def sample_interfaces(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if the rasterio of type np.ndarray or a rasterio object if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be of type np.ndarray or a rasterio object') + raise TypeError("Raster must be of type np.ndarray or a rasterio object") # Checking if the extent is of type list if an array is provided if isinstance(raster, np.ndarray) and not isinstance(extent, list): - raise TypeError('Extent must be of type list when providing an array') + raise TypeError("Extent must be of type list when providing an array") # Checking that all elements of the extent are of type float or int - if isinstance(raster, np.ndarray) and not all(isinstance(n, (int, float)) for n in extent): - raise TypeError('Extent values must be of type int or float') + if isinstance(raster, np.ndarray) and not all( + isinstance(n, (int, float)) for n in extent + ): + raise TypeError("Extent values must be of type int or float") # Checking if the number of samples is of type int if point_x is None and point_y is None and not isinstance(random_samples, int): - raise TypeError('Number of samples must be of type int if no points are provided') + raise TypeError( + "Number of samples must be of type int if no points are provided" + ) # Checking if the points are of the correct type - if isinstance(random_samples, type(None)) and not isinstance(point_x, (float, int, list, np.ndarray)): - raise TypeError('Point_x must either be an int, float or a list or array of coordinates') + if isinstance(random_samples, type(None)) and not isinstance( + point_x, (float, int, list, np.ndarray) + ): + raise TypeError( + "Point_x must either be an int, float or a list or array of coordinates" + ) # Checking if the points are of the correct type - if isinstance(random_samples, type(None)) and not isinstance(point_y, (float, int, list, np.ndarray)): - raise TypeError('Point_y must either be an int, float or a list or array of coordinates') + if isinstance(random_samples, type(None)) and not isinstance( + point_y, (float, int, list, np.ndarray) + ): + raise TypeError( + "Point_y must either be an int, float or a list or array of coordinates" + ) # Checking if the seed is of type int if not isinstance(seed, (int, type(None))): - raise TypeError('Seed must be of type int') + raise TypeError("Seed must be of type int") # Checking that sampling outside extent is of type bool if not isinstance(sample_outside_extent, bool): - raise TypeError('Sampling_outside_extent must be of type bool') + raise TypeError("Sampling_outside_extent must be of type bool") # Checking that the crs is either a string or of type bool if not isinstance(crs, (str, pyproj.crs.crs.CRS, rasterio.crs.CRS, type(None))): - raise TypeError('CRS must be provided as string, pyproj CRS or rasterio CRS') + raise TypeError("CRS must be provided as string, pyproj CRS or rasterio CRS") # Sampling by points if random_samples is None and point_x is not None and point_y is not None: # Sampling from Raster if isinstance(raster, rasterio.io.DatasetReader): - z = sample_from_rasterio(raster=raster, - point_x=point_x, - point_y=point_y, - sample_outside_extent=sample_outside_extent) + z = sample_from_rasterio( + raster=raster, + point_x=point_x, + point_y=point_y, + sample_outside_extent=sample_outside_extent, + ) # Sampling from array else: - z = sample_from_array(array=raster, - extent=extent, - point_x=point_x, - point_y=point_y) + z = sample_from_array( + array=raster, extent=extent, point_x=point_x, point_y=point_y + ) # Sampling randomly elif random_samples is not None and point_x is None and point_y is None: - samples = sample_randomly(raster=raster, - n=random_samples, - extent=extent, - seed=seed) + samples = sample_randomly( + raster=raster, n=random_samples, extent=extent, seed=seed + ) # Assigning X, Y and Z values z = [i for i in samples[0]] @@ -734,31 +795,31 @@ def sample_interfaces(raster: Union[np.ndarray, rasterio.io.DatasetReader], point_y = [i for i in samples[1][1]] else: - raise TypeError('Either provide only lists or array of points or a number of random samples, not both.') + raise TypeError( + "Either provide only lists or array of points or a number of random samples, not both." + ) # Creating GeoDataFrame if isinstance(point_x, Iterable) and isinstance(point_y, Iterable): - gdf = gpd.GeoDataFrame(data=pd.DataFrame(data=[point_x, point_y, z]).T, - geometry=gpd.points_from_xy(x=point_x, - y=point_y, - crs=crs) - ) + gdf = gpd.GeoDataFrame( + data=pd.DataFrame(data=[point_x, point_y, z]).T, + geometry=gpd.points_from_xy(x=point_x, y=point_y, crs=crs), + ) else: - gdf = gpd.GeoDataFrame(data=pd.DataFrame(data=[point_x, point_y, z]).T, - geometry=gpd.points_from_xy(x=[point_x], - y=[point_y], - crs=crs) - ) + gdf = gpd.GeoDataFrame( + data=pd.DataFrame(data=[point_x, point_y, z]).T, + geometry=gpd.points_from_xy(x=[point_x], y=[point_y], crs=crs), + ) # Setting the column names - gdf.columns = ['X', 'Y', 'Z', 'geometry'] + gdf.columns = ["X", "Y", "Z", "geometry"] # Assigning formation name if formation is not None: if isinstance(formation, str): - gdf['formation'] = formation + gdf["formation"] = formation else: - raise TypeError('Formation must be provided as string or set to None') + raise TypeError("Formation must be provided as string or set to None") return gdf @@ -767,11 +828,13 @@ def sample_interfaces(raster: Union[np.ndarray, rasterio.io.DatasetReader], ############################### -def calculate_hillshades(raster: Union[np.ndarray, rasterio.io.DatasetReader], - extent: List[Union[int, float]] = None, - azdeg: Union[int, float] = 225, - altdeg: Union[int, float] = 45, - band_no: int = 1) -> np.ndarray: +def calculate_hillshades( + raster: Union[np.ndarray, rasterio.io.DatasetReader], + extent: List[Union[int, float]] = None, + azdeg: Union[int, float] = 225, + altdeg: Union[int, float] = 45, + band_no: int = 1, +) -> np.ndarray: """Calculating Hillshades based on digital elevation model/raster Parameters @@ -826,27 +889,27 @@ def calculate_hillshades(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking that the raster is of type rasterio object or numpy array if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be provided as rasterio object or NumPy array') + raise TypeError("Raster must be provided as rasterio object or NumPy array") # Checking if extent is of type list if not isinstance(extent, (type(None), list)): - raise TypeError('Extent must be of type list') + raise TypeError("Extent must be of type list") # Checking that altdeg is of type float or int if not isinstance(altdeg, (float, int)): - raise TypeError('altdeg must be of type int or float') + raise TypeError("altdeg must be of type int or float") # Checking that azdeg is of type float or int if not isinstance(azdeg, (float, int)): - raise TypeError('azdeg must be of type int or float') + raise TypeError("azdeg must be of type int or float") # Checking that altdeg is not out of bounds if altdeg > 90 or altdeg < 0: - raise ValueError('altdeg must be between 0 and 90 degrees') + raise ValueError("altdeg must be between 0 and 90 degrees") # Checking that azdeg is not out of bounds if azdeg > 360 or azdeg < 0: - raise ValueError('azdeg must be between 0 and 360 degrees') + raise ValueError("azdeg must be between 0 and 360 degrees") # Checking if object is rasterio object if isinstance(raster, rasterio.io.DatasetReader): @@ -861,24 +924,25 @@ def calculate_hillshades(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if object is of type np.ndarray if not isinstance(raster, np.ndarray): - raise TypeError('Input object must be of type np.ndarray') + raise TypeError("Input object must be of type np.ndarray") # Checking if dimension of array is correct if not raster.ndim == 2: - raise ValueError('Array must be of dimension 2') + raise ValueError("Array must be of dimension 2") # Calculate hillshades azdeg = 360 - azdeg x, y = np.gradient(raster) x = x / res[0] y = y / res[1] - slope = np.pi / 2. - np.arctan(np.sqrt(x * x + y * y)) + slope = np.pi / 2.0 - np.arctan(np.sqrt(x * x + y * y)) aspect = np.arctan2(-x, y) - azimuthrad = azdeg * np.pi / 180. - altituderad = altdeg * np.pi / 180. + azimuthrad = azdeg * np.pi / 180.0 + altituderad = altdeg * np.pi / 180.0 - shaded = np.sin(altituderad) * np.sin(slope) + np.cos(altituderad) * np.cos(slope) * np.cos( - (azimuthrad - np.pi / 2.) - aspect) + shaded = np.sin(altituderad) * np.sin(slope) + np.cos(altituderad) * np.cos( + slope + ) * np.cos((azimuthrad - np.pi / 2.0) - aspect) # Calculate color values hillshades = 255 * (shaded + 1) / 2 @@ -886,9 +950,11 @@ def calculate_hillshades(raster: Union[np.ndarray, rasterio.io.DatasetReader], return hillshades -def calculate_slope(raster: Union[np.ndarray, rasterio.io.DatasetReader], - extent: List[Union[int, float]] = None, - band_no: int = 1) -> np.ndarray: +def calculate_slope( + raster: Union[np.ndarray, rasterio.io.DatasetReader], + extent: List[Union[int, float]] = None, + band_no: int = 1, +) -> np.ndarray: """Calculating the slope based on digital elevation model/raster Parameters @@ -937,11 +1003,11 @@ def calculate_slope(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking that the raster is of type rasterio object or numpy array if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be provided as rasterio object or NumPy array') + raise TypeError("Raster must be provided as rasterio object or NumPy array") # Checking if extent is of type list if not isinstance(extent, (type(None), list)): - raise TypeError('Extent must be of type list') + raise TypeError("Extent must be of type list") # Checking if object is rasterio object if isinstance(raster, rasterio.io.DatasetReader): @@ -956,11 +1022,11 @@ def calculate_slope(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if object is of type np.ndarray if not isinstance(raster, np.ndarray): - raise TypeError('Input object must be of type np.ndarray') + raise TypeError("Input object must be of type np.ndarray") # Checking if dimension of array is correct if not raster.ndim == 2: - raise ValueError('Array must be of dimension 2') + raise ValueError("Array must be of dimension 2") # Calculate slope y, x = np.gradient(raster) @@ -972,9 +1038,11 @@ def calculate_slope(raster: Union[np.ndarray, rasterio.io.DatasetReader], return slope -def calculate_aspect(raster: Union[np.ndarray, rasterio.io.DatasetReader], - extent: List[Union[int, float]] = None, - band_no: int = 1) -> np.ndarray: +def calculate_aspect( + raster: Union[np.ndarray, rasterio.io.DatasetReader], + extent: List[Union[int, float]] = None, + band_no: int = 1, +) -> np.ndarray: """Calculating the aspect based on a digital elevation model/raster Parameters @@ -1023,11 +1091,11 @@ def calculate_aspect(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking that the raster is of type rasterio object or numpy array if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be provided as rasterio object or NumPy array') + raise TypeError("Raster must be provided as rasterio object or NumPy array") # Checking if extent is of type list if not isinstance(extent, (type(None), list)): - raise TypeError('Extent must be of type list') + raise TypeError("Extent must be of type list") # Checking if object is rasterio object if isinstance(raster, rasterio.io.DatasetReader): @@ -1042,11 +1110,11 @@ def calculate_aspect(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if object is of type np.ndarray if not isinstance(raster, np.ndarray): - raise TypeError('Input object must be of type np.ndarray') + raise TypeError("Input object must be of type np.ndarray") # Checking if dimension of array is correct if not raster.ndim == 2: - raise ValueError('Array must be of dimension 2') + raise ValueError("Array must be of dimension 2") # Calculate aspect y, x = np.gradient(raster) @@ -1059,9 +1127,11 @@ def calculate_aspect(raster: Union[np.ndarray, rasterio.io.DatasetReader], return aspect -def calculate_difference(raster1: Union[np.ndarray, rasterio.io.DatasetReader], - raster2: Union[np.ndarray, rasterio.io.DatasetReader], - flip_array: bool = False) -> np.ndarray: +def calculate_difference( + raster1: Union[np.ndarray, rasterio.io.DatasetReader], + raster2: Union[np.ndarray, rasterio.io.DatasetReader], + flip_array: bool = False, +) -> np.ndarray: """Calculating the difference between two rasters Parameters @@ -1111,14 +1181,16 @@ def calculate_difference(raster1: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if array1 is of type np.ndarray or a rasterio object if not isinstance(raster1, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster1 must be of type np.ndarray or a rasterio object') + raise TypeError("Raster1 must be of type np.ndarray or a rasterio object") # Checking if array2 is of type np.ndarray or a rasterio object if not isinstance(raster2, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster2 must be of type np.ndarray or a rasterio object') + raise TypeError("Raster2 must be of type np.ndarray or a rasterio object") # Subtracting rasterio objects - if isinstance(raster1, rasterio.io.DatasetReader) and isinstance(raster2, rasterio.io.DatasetReader): + if isinstance(raster1, rasterio.io.DatasetReader) and isinstance( + raster2, rasterio.io.DatasetReader + ): array_diff = raster1.read() - raster2.read() else: @@ -1126,8 +1198,7 @@ def calculate_difference(raster1: Union[np.ndarray, rasterio.io.DatasetReader], if raster1.shape != raster2.shape: # Rescale array - array_rescaled = resize_by_array(raster=raster2, - array=raster1) + array_rescaled = resize_by_array(raster=raster2, array=raster1) # Flip array if flip_array is True if flip_array: array_rescaled = np.flipud(array_rescaled) @@ -1150,13 +1221,15 @@ def calculate_difference(raster1: Union[np.ndarray, rasterio.io.DatasetReader], ###################### -def clip_by_bbox(raster: Union[rasterio.io.DatasetReader, np.ndarray], - bbox: List[Union[int, float]], - raster_extent: List[Union[int, float]] = None, - save_clipped_raster: bool = False, - path: str = 'raster_clipped.tif', - overwrite_file: bool = False, - create_directory: bool = False) -> np.ndarray: +def clip_by_bbox( + raster: Union[rasterio.io.DatasetReader, np.ndarray], + bbox: List[Union[int, float]], + raster_extent: List[Union[int, float]] = None, + save_clipped_raster: bool = False, + path: str = "raster_clipped.tif", + overwrite_file: bool = False, + create_directory: bool = False, +) -> np.ndarray: """Clipping a rasterio raster or np.ndarray by a given extent Parameters @@ -1224,23 +1297,23 @@ def clip_by_bbox(raster: Union[rasterio.io.DatasetReader, np.ndarray], # Checking that the raster is of type np.ndarray or a rasterio object if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be of type np.ndarray or a rasterio object') + raise TypeError("Raster must be of type np.ndarray or a rasterio object") # Checking that the extent is of type list if not isinstance(bbox, list): - raise TypeError('Extent must be of type list') + raise TypeError("Extent must be of type list") # Checking that all values are either ints or floats if not all(isinstance(n, (int, float)) for n in bbox): - raise TypeError('Bounds values must be of type int or float') + raise TypeError("Bounds values must be of type int or float") # Checking that save_clipped_raster is of type bool if not isinstance(save_clipped_raster, bool): - raise TypeError('save_clipped_raster must either be True or False') + raise TypeError("save_clipped_raster must either be True or False") # Checking that the path is of type string if not isinstance(path, str): - raise TypeError('The path must be provided as string') + raise TypeError("The path must be provided as string") # Getting the absolute path path = os.path.abspath(path=path) @@ -1257,75 +1330,110 @@ def clip_by_bbox(raster: Union[rasterio.io.DatasetReader, np.ndarray], if create_directory: os.makedirs(path_dir) else: - raise LookupError('Directory not found. Pass create_directory=True to create a new directory') + raise LookupError( + "Directory not found. Pass create_directory=True to create a new directory" + ) if not overwrite_file: if os.path.exists(path): - raise FileExistsError("The file already exists. Pass overwrite_file=True to overwrite the existing file") + raise FileExistsError( + "The file already exists. Pass overwrite_file=True to overwrite the existing file" + ) # Checking if raster is rasterio object if isinstance(raster, rasterio.io.DatasetReader): - raster_clipped, raster_transform = rasterio.mask.mask(dataset=raster, - shapes=[Polygon([(bbox[0], bbox[2]), - (bbox[1], bbox[2]), - (bbox[1], bbox[3]), - (bbox[0], bbox[3])])], - crop=True, - filled=False, - pad=False, - pad_width=0) + raster_clipped, raster_transform = rasterio.mask.mask( + dataset=raster, + shapes=[ + Polygon( + [ + (bbox[0], bbox[2]), + (bbox[1], bbox[2]), + (bbox[1], bbox[3]), + (bbox[0], bbox[3]), + ] + ) + ], + crop=True, + filled=False, + pad=False, + pad_width=0, + ) # Saving the raster if save_clipped_raster: # Updating meta data raster_clipped_meta = raster.meta - raster_clipped_meta.update({"driver": "GTiff", - "height": raster_clipped.shape[1], - "width": raster_clipped.shape[2], - "transform": raster_transform}) + raster_clipped_meta.update( + { + "driver": "GTiff", + "height": raster_clipped.shape[1], + "width": raster_clipped.shape[2], + "transform": raster_transform, + } + ) # Writing the file with rasterio.open(path, "w", **raster_clipped_meta) as dest: dest.write(raster_clipped) # Swap axes and remove dimension - raster_clipped = np.flipud(np.rot90(np.swapaxes(raster_clipped, 0, 2)[:, :, 0], 1)) + raster_clipped = np.flipud( + np.rot90(np.swapaxes(raster_clipped, 0, 2)[:, :, 0], 1) + ) else: # Checking that the extent is provided as list if not isinstance(raster_extent, list): - raise TypeError('The raster extent must be provided as list of corner values') + raise TypeError( + "The raster extent must be provided as list of corner values" + ) # Checking that all values are either ints or floats if not all(isinstance(n, (int, float)) for n in raster_extent): - raise TypeError('Bounds values must be of type int or float') + raise TypeError("Bounds values must be of type int or float") # Create column and row indices for clipping - column1 = int((bbox[0] - raster_extent[0]) / (raster_extent[1] - raster_extent[0]) * raster.shape[1]) - row1 = int((bbox[1] - raster_extent[2]) / (raster_extent[3] - raster_extent[2]) * raster.shape[0]) - column2 = int((bbox[2] - raster_extent[0]) / (raster_extent[1] - raster_extent[0]) * raster.shape[1]) - row2 = int((bbox[3] - raster_extent[2]) / (raster_extent[3] - raster_extent[2]) * raster.shape[0]) + column1 = int( + (bbox[0] - raster_extent[0]) + / (raster_extent[1] - raster_extent[0]) + * raster.shape[1] + ) + row1 = int( + (bbox[1] - raster_extent[2]) + / (raster_extent[3] - raster_extent[2]) + * raster.shape[0] + ) + column2 = int( + (bbox[2] - raster_extent[0]) + / (raster_extent[1] - raster_extent[0]) + * raster.shape[1] + ) + row2 = int( + (bbox[3] - raster_extent[2]) + / (raster_extent[3] - raster_extent[2]) + * raster.shape[0] + ) # Clip raster raster_clipped = raster[column1:row1, column2:row2] # Save raster if save_clipped_raster: - save_as_tiff(raster=raster_clipped, - path=path, - extent=bbox, - crs='EPSG:4326') + save_as_tiff(raster=raster_clipped, path=path, extent=bbox, crs="EPSG:4326") return raster_clipped -def clip_by_polygon(raster: Union[rasterio.io.DatasetReader, np.ndarray], - polygon: shapely.geometry.polygon.Polygon, - raster_extent: List[Union[int, float]] = None, - save_clipped_raster: bool = False, - path: str = 'raster_clipped.tif', - overwrite_file: bool = False, - create_directory: bool = False) -> np.ndarray: +def clip_by_polygon( + raster: Union[rasterio.io.DatasetReader, np.ndarray], + polygon: shapely.geometry.polygon.Polygon, + raster_extent: List[Union[int, float]] = None, + save_clipped_raster: bool = False, + path: str = "raster_clipped.tif", + overwrite_file: bool = False, + create_directory: bool = False, +) -> np.ndarray: """Clipping/masking a rasterio raster or np.ndarray by a given shapely Polygon Parameters @@ -1394,19 +1502,19 @@ def clip_by_polygon(raster: Union[rasterio.io.DatasetReader, np.ndarray], # Checking that the raster is of type np.ndarray or a rasterio object if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be of type np.ndarray or a rasterio object') + raise TypeError("Raster must be of type np.ndarray or a rasterio object") # Checking that the polygon is a Shapely Polygon if not isinstance(polygon, shapely.geometry.polygon.Polygon): - raise TypeError('Polygon must be a Shapely Polygon') + raise TypeError("Polygon must be a Shapely Polygon") # Checking that save_clipped_raster is of type bool if not isinstance(save_clipped_raster, bool): - raise TypeError('save_clipped_raster must either be True or False') + raise TypeError("save_clipped_raster must either be True or False") # Checking that the path is of type string if not isinstance(path, str): - raise TypeError('The path must be provided as string') + raise TypeError("The path must be provided as string") # Getting the absolute path path = os.path.abspath(path=path) @@ -1423,48 +1531,66 @@ def clip_by_polygon(raster: Union[rasterio.io.DatasetReader, np.ndarray], if create_directory: os.makedirs(path_dir) else: - raise LookupError('Directory not found. Pass create_directory=True to create a new directory') + raise LookupError( + "Directory not found. Pass create_directory=True to create a new directory" + ) if not overwrite_file: if os.path.exists(path): raise FileExistsError( - "The file already exists. Pass overwrite_file=True to overwrite the existing file") + "The file already exists. Pass overwrite_file=True to overwrite the existing file" + ) # Masking raster if isinstance(raster, rasterio.io.DatasetReader): - raster_clipped, raster_transform = rasterio.mask.mask(dataset=raster, - shapes=[polygon], - crop=True, - filled=False, - pad=False, - pad_width=0) + raster_clipped, raster_transform = rasterio.mask.mask( + dataset=raster, + shapes=[polygon], + crop=True, + filled=False, + pad=False, + pad_width=0, + ) # Saving the raster if save_clipped_raster: # Updating meta data raster_clipped_meta = raster.meta - raster_clipped_meta.update({"driver": "GTiff", - "height": raster_clipped.shape[1], - "width": raster_clipped.shape[2], - "transform": raster_transform}) + raster_clipped_meta.update( + { + "driver": "GTiff", + "height": raster_clipped.shape[1], + "width": raster_clipped.shape[2], + "transform": raster_transform, + } + ) # Writing the raster to file with rasterio.open(path, "w", **raster_clipped_meta) as dest: dest.write(raster_clipped) # Swap axes and remove dimension - raster_clipped = np.flipud(np.rot90(np.swapaxes(raster_clipped, 0, 2)[:, :, 0], 1)) + raster_clipped = np.flipud( + np.rot90(np.swapaxes(raster_clipped, 0, 2)[:, :, 0], 1) + ) else: # Converting the polygon to a rectangular bbox - bbox = [polygon.bounds[0], polygon.bounds[2], polygon.bounds[1], polygon.bounds[2]] + bbox = [ + polygon.bounds[0], + polygon.bounds[2], + polygon.bounds[1], + polygon.bounds[2], + ] # Clipping raster - raster_clipped = clip_by_bbox(raster=raster, - bbox=bbox, - raster_extent=raster_extent, - save_clipped_raster=save_clipped_raster, - path=path) + raster_clipped = clip_by_bbox( + raster=raster, + bbox=bbox, + raster_extent=raster_extent, + save_clipped_raster=save_clipped_raster, + path=path, + ) return raster_clipped @@ -1473,8 +1599,10 @@ def clip_by_polygon(raster: Union[rasterio.io.DatasetReader, np.ndarray], ###################### -def resize_by_array(raster: Union[np.ndarray, rasterio.io.DatasetReader], - array: Union[np.ndarray, rasterio.io.DatasetReader]) -> np.ndarray: +def resize_by_array( + raster: Union[np.ndarray, rasterio.io.DatasetReader], + array: Union[np.ndarray, rasterio.io.DatasetReader], +) -> np.ndarray: """Rescaling raster to the size of another raster Parameters @@ -1524,23 +1652,23 @@ def resize_by_array(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if array1 is of type np.ndarray if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be of type np.ndarray or a rasterio object') + raise TypeError("Raster must be of type np.ndarray or a rasterio object") # Checking if array2 is of type np.ndarray if not isinstance(array, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('array must be of type np.ndarray or a rasterio object') + raise TypeError("array must be of type np.ndarray or a rasterio object") # Resize raster by shape of array - array_resized = resize_raster(raster=raster, - width=array.shape[1], - height=array.shape[0]) + array_resized = resize_raster( + raster=raster, width=array.shape[1], height=array.shape[0] + ) return array_resized -def resize_raster(raster: Union[np.ndarray, rasterio.io.DatasetReader], - width: int, - height: int) -> np.ndarray: +def resize_raster( + raster: Union[np.ndarray, rasterio.io.DatasetReader], width: int, height: int +) -> np.ndarray: """Resizing raster to given dimensions Parameters @@ -1591,11 +1719,12 @@ def resize_raster(raster: Union[np.ndarray, rasterio.io.DatasetReader], from skimage.transform import resize except ModuleNotFoundError: raise ModuleNotFoundError( - 'Scikit Image package is not installed. Use pip install scikit-image to install the latest version') + "Scikit Image package is not installed. Use pip install scikit-image to install the latest version" + ) # Checking if array1 is of type np.ndarray if not isinstance(raster, (np.ndarray, rasterio.io.DatasetReader)): - raise TypeError('Raster must be of type np.ndarray') + raise TypeError("Raster must be of type np.ndarray") # Converting rasterio object to array if isinstance(raster, rasterio.io.DatasetReader): @@ -1603,14 +1732,13 @@ def resize_raster(raster: Union[np.ndarray, rasterio.io.DatasetReader], # Checking if dimensions are of type int if not isinstance(width, int): - raise TypeError('Width must be of type int') + raise TypeError("Width must be of type int") if not isinstance(height, int): - raise TypeError('Height must be of type int') + raise TypeError("Height must be of type int") # Resizing the array - array_resized = resize(image=raster, - output_shape=(height, width)) + array_resized = resize(image=raster, output_shape=(height, width)) return array_resized @@ -1619,10 +1747,7 @@ def resize_raster(raster: Union[np.ndarray, rasterio.io.DatasetReader], ############################################# # Defining dtype Conversion -dtype_conversion = { - "Integer": np.int32, - "Double": np.float64 -} +dtype_conversion = {"Integer": np.int32, "Double": np.float64} def read_msh(path: Union[str, Path]) -> Dict[str, np.ndarray]: @@ -1675,7 +1800,7 @@ def read_msh(path: Union[str, Path]) -> Dict[str, np.ndarray]: # Checking that the path is of type string or a path if not isinstance(path, (str, Path)): - raise TypeError('Path must be of type string') + raise TypeError("Path must be of type string") # Getting the absolute path path = os.path.abspath(path=path) @@ -1693,15 +1818,16 @@ def read_msh(path: Union[str, Path]) -> Dict[str, np.ndarray]: f.seek(header_end + 0x14) # Extracting data from each line - for line in chunk[chunk.find(b"[index]") + 8:header_end].decode("utf-8").strip().split("\n"): + for line in ( + chunk[chunk.find(b"[index]") + 8 : header_end] + .decode("utf-8") + .strip() + .split("\n") + ): name, dtype, *shape = line.strip().rstrip(";").split() shape = list(map(int, reversed(shape))) dtype = dtype_conversion[dtype] - data[name] = np.fromfile( - f, - dtype, - np.prod(shape) - ).reshape(shape) + data[name] = np.fromfile(f, dtype, np.prod(shape)).reshape(shape) return data @@ -1760,7 +1886,7 @@ def read_ts(path: Union[str, Path]) -> Tuple[list, list]: # Checking that the path is of type string or a path if not isinstance(path, (str, Path)): - raise TypeError('Path must be of type string') + raise TypeError("Path must be of type string") # Getting the absolute path path = os.path.abspath(path=path) @@ -1867,7 +1993,7 @@ def read_asc(path: Union[str, Path]) -> dict: # Checking that the path is of type string or a path if not isinstance(path, (str, Path)): - raise TypeError('Path must be of type string') + raise TypeError("Path must be of type string") # Getting the absolute path path = os.path.abspath(path=path) @@ -1882,21 +2008,21 @@ def read_asc(path: Union[str, Path]) -> dict: if not line.strip(): continue line_value, *values = line.split() - if line_value == 'ncols': + if line_value == "ncols": ncols = int(values[0]) - if line_value == 'nrows': + if line_value == "nrows": nrows = int(values[0]) - if line_value == 'xllcenter': + if line_value == "xllcenter": xllcenter = float(values[0]) - if line_value == 'yllcenter': + if line_value == "yllcenter": yllcenter = float(values[0]) - if line_value == 'cellsize': + if line_value == "cellsize": res = float(values[0]) - if line_value == 'xllcorner': + if line_value == "xllcorner": xllcenter = float(values[0]) + 0.5 * res - if line_value == 'yllcorner': + if line_value == "yllcorner": yllcenter = float(values[0]) + 0.5 * res - if line_value == 'NODATA_value' or line_value == 'nodata_value': + if line_value == "NODATA_value" or line_value == "nodata_value": nodata_val = float(values[0]) # Load data and replace nodata_values with np.nan @@ -1904,10 +2030,17 @@ def read_asc(path: Union[str, Path]) -> dict: data[data == nodata_val] = np.nan # Creating dict and store data - data = {'Data': data, - 'Extent': [xllcenter, xllcenter + res * ncols, yllcenter, yllcenter + res * nrows], - 'Resolution': res, - 'Nodata_val': np.nan} + data = { + "Data": data, + "Extent": [ + xllcenter, + xllcenter + res * ncols, + yllcenter, + yllcenter + res * nrows, + ], + "Resolution": res, + "Nodata_val": np.nan, + } return data @@ -1965,7 +2098,7 @@ def read_zmap(path: Union[str, Path]) -> dict: # Checking that the path is of type string or a path if not isinstance(path, (str, Path)): - raise TypeError('Path must be of type string') + raise TypeError("Path must be of type string") # Getting the absolute path path = os.path.abspath(path=path) @@ -2006,20 +2139,22 @@ def read_zmap(path: Union[str, Path]) -> dict: # Getting array data data = [ - (float(d) if d.strip() != nodata else np.nan) for line in f for d in line.split() + (float(d) if d.strip() != nodata else np.nan) + for line in f + for d in line.split() ] # Creating dict for data data = { - 'Data': np.array(data).reshape((nrows, ncols), order="F"), - 'Extent': extent, - 'Resolution': resolution, - 'Nodata_val': float(nodata), - 'Dimensions': (nrows, ncols), - 'CRS': crs, - 'Creation_date': creation_date, - 'Creation_time': creation_time, - 'File_name': zmap_file_name + "Data": np.array(data).reshape((nrows, ncols), order="F"), + "Extent": extent, + "Resolution": resolution, + "Nodata_val": float(nodata), + "Dimensions": (nrows, ncols), + "CRS": crs, + "Creation_date": creation_date, + "Creation_time": creation_time, + "File_name": zmap_file_name, } return data @@ -2029,14 +2164,16 @@ def read_zmap(path: Union[str, Path]) -> dict: ################################ -def save_as_tiff(raster: np.ndarray, - path: str, - extent: Union[List[Union[int, float]], Tuple[Union[int, float]]], - crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS], - nodata: Union[float, int] = None, - transform=None, - overwrite_file: bool = False, - create_directory: bool = False): +def save_as_tiff( + raster: np.ndarray, + path: str, + extent: Union[List[Union[int, float]], Tuple[Union[int, float]]], + crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS], + nodata: Union[float, int] = None, + transform=None, + overwrite_file: bool = False, + create_directory: bool = False, +): """Saving a np.array as tif file Parameters @@ -2091,7 +2228,7 @@ def save_as_tiff(raster: np.ndarray, # Checking if path is of type string if not isinstance(path, str): - raise TypeError('Path must be of type string') + raise TypeError("Path must be of type string") # Checking that the file has the correct file ending if not path.endswith(".tif"): @@ -2108,57 +2245,62 @@ def save_as_tiff(raster: np.ndarray, if create_directory: os.makedirs(path_dir) else: - raise LookupError('Directory not found. Pass create_directory=True to create a new directory') + raise LookupError( + "Directory not found. Pass create_directory=True to create a new directory" + ) if not overwrite_file: if os.path.exists(path): raise FileExistsError( - "The file already exists. Pass overwrite_file=True to overwrite the existing file") + "The file already exists. Pass overwrite_file=True to overwrite the existing file" + ) # Checking if the array is of type np.ndarray if not isinstance(raster, np.ndarray): - raise TypeError('array must be of type np.ndarray') + raise TypeError("array must be of type np.ndarray") # Checking if the extent is of type list if not isinstance(extent, (list, tuple)): - raise TypeError('Extent must be of type list or be a tuple') + raise TypeError("Extent must be of type list or be a tuple") # Checking that all values are either ints or floats if not all(isinstance(n, (int, float)) for n in extent): - raise TypeError('Bound values must be of type int or float') + raise TypeError("Bound values must be of type int or float") # Checking if the crs is of type string if not isinstance(crs, (str, pyproj.crs.crs.CRS, rasterio.crs.CRS, dict)): - raise TypeError('CRS must be of type string, dict, rasterio CRS or pyproj CRS') + raise TypeError("CRS must be of type string, dict, rasterio CRS or pyproj CRS") # Extracting the bounds minx, miny, maxx, maxy = extent[0], extent[2], extent[1], extent[3] # Creating the transform if not transform: - transform = rasterio.transform.from_bounds(minx, miny, maxx, maxy, raster.shape[1], raster.shape[0]) + transform = rasterio.transform.from_bounds( + minx, miny, maxx, maxy, raster.shape[1], raster.shape[0] + ) # Creating and saving the array as tiff with rasterio.open( - path, - 'w', - driver='GTiff', - height=raster.shape[0], - width=raster.shape[1], - count=1, - dtype=raster.dtype, - crs=crs, - transform=transform, - nodata=nodata + path, + "w", + driver="GTiff", + height=raster.shape[0], + width=raster.shape[1], + count=1, + dtype=raster.dtype, + crs=crs, + transform=transform, + nodata=nodata, ) as dst: dst.write(np.flipud(raster), 1) - print('Raster successfully saved') + print("Raster successfully saved") -def create_filepaths(dirpath: str, - search_criteria: str, - create_directory: bool = False) -> List[str]: +def create_filepaths( + dirpath: str, search_criteria: str, create_directory: bool = False +) -> List[str]: """Retrieving the file paths of the tiles to load and to process them later Parameters @@ -2204,7 +2346,7 @@ def create_filepaths(dirpath: str, # Checking if dirpath is of type string if not isinstance(dirpath, str): - raise TypeError('Path to directory must be of type string') + raise TypeError("Path to directory must be of type string") # Getting the absolute path dirpath = os.path.abspath(path=dirpath) @@ -2217,11 +2359,13 @@ def create_filepaths(dirpath: str, if create_directory: os.makedirs(path_dir) else: - raise LookupError('Directory not found. Pass create_directory=True to create a new directory') + raise LookupError( + "Directory not found. Pass create_directory=True to create a new directory" + ) # Checking that the search criterion is of type string if not isinstance(search_criteria, str): - raise TypeError('Search Criterion must be of Type string') + raise TypeError("Search Criterion must be of Type string") # Join paths to form path to files source = os.path.join(dirpath, search_criteria) @@ -2232,10 +2376,12 @@ def create_filepaths(dirpath: str, return filepaths -def create_src_list(dirpath: str = '', - search_criteria: str = '', - filepaths: List[str] = None, - create_directory: bool = False) -> List[rasterio.io.DatasetReader]: +def create_src_list( + dirpath: str = "", + search_criteria: str = "", + filepaths: List[str] = None, + create_directory: bool = False, +) -> List[rasterio.io.DatasetReader]: """Creating a list of source files Parameters @@ -2292,7 +2438,7 @@ def create_src_list(dirpath: str = '', # Checking if dirpath is of type string if not isinstance(dirpath, str): - raise TypeError('Path to directory must be of type string') + raise TypeError("Path to directory must be of type string") # Getting the absolute path dirpath = os.path.abspath(path=dirpath) @@ -2305,24 +2451,27 @@ def create_src_list(dirpath: str = '', if create_directory: os.makedirs(path_dir) else: - raise LookupError('Directory not found. Pass create_directory=True to create a new directory') + raise LookupError( + "Directory not found. Pass create_directory=True to create a new directory" + ) # Checking that the search criterion is of type string if not isinstance(search_criteria, str): - raise TypeError('Search Criterion must be of Type string') + raise TypeError("Search Criterion must be of Type string") # Checking that the filepaths are of type list if not isinstance(filepaths, (list, type(None))): - raise TypeError('Filepaths must be of type list') + raise TypeError("Filepaths must be of type list") # Retrieving the file paths of the tiles - if not dirpath == '': - if not search_criteria == '': + if not dirpath == "": + if not search_criteria == "": if not filepaths: - filepaths = create_filepaths(dirpath=dirpath, - search_criteria=search_criteria) + filepaths = create_filepaths( + dirpath=dirpath, search_criteria=search_criteria + ) else: - raise ValueError('Either provide a file path or a list of filepaths') + raise ValueError("Either provide a file path or a list of filepaths") # Create empty list for source files src_files = [] @@ -2337,13 +2486,15 @@ def create_src_list(dirpath: str = '', return src_files -def merge_tiles(src_files: List[rasterio.io.DatasetReader], - extent: List[Union[float, int]] = None, - res: int = None, - nodata: Union[float, int] = None, - precision: int = None, - indices: int = None, - method: str = 'first') -> Tuple[np.ndarray, affine.Affine]: +def merge_tiles( + src_files: List[rasterio.io.DatasetReader], + extent: List[Union[float, int]] = None, + res: int = None, + nodata: Union[float, int] = None, + precision: int = None, + indices: int = None, + method: str = "first", +) -> Tuple[np.ndarray, affine.Affine]: """Merging downloaded tiles to mosaic Parameters @@ -2433,45 +2584,47 @@ def merge_tiles(src_files: List[rasterio.io.DatasetReader], # Checking if source files are stored in a list if not isinstance(src_files, list): - raise TypeError('Files must be stored as list') + raise TypeError("Files must be stored as list") # Checking if extent is a list if not isinstance(extent, (list, type(None))): - raise TypeError('Extent must be of type list') + raise TypeError("Extent must be of type list") # Checking that all values are either ints or floats if extent: if not all(isinstance(n, (int, float)) for n in extent): - raise TypeError('Extent values must be of type int or float') + raise TypeError("Extent values must be of type int or float") # Checking that the resolution is of type int if not isinstance(res, (int, type(None))): - raise TypeError('Resolution must be of type int') + raise TypeError("Resolution must be of type int") # Checking that the nodata value is of type int or float if not isinstance(nodata, (int, float, type(None))): - raise TypeError('Nodata value must be of type int or float') + raise TypeError("Nodata value must be of type int or float") # Checking that the precision is of type int if not isinstance(precision, (int, type(None))): - raise TypeError('Precision value must be of type int') + raise TypeError("Precision value must be of type int") # Checking that the indices for the bands are of type int if not isinstance(indices, (int, type(None))): - raise TypeError('Band indices must be of type int') + raise TypeError("Band indices must be of type int") # Checking that the method is of type string if not isinstance(method, (str, type(None))): - raise TypeError('Type of method must be provided as string') + raise TypeError("Type of method must be provided as string") # Merging tiles - mosaic, transformation = merge(src_files, - bounds=extent, - res=res, - nodata=nodata, - precision=precision, - indexes=indices, - method=method) + mosaic, transformation = merge( + src_files, + bounds=extent, + res=res, + nodata=nodata, + precision=precision, + indexes=indices, + method=method, + ) # Swap axes and remove dimension mosaic = np.flipud(np.rot90(np.swapaxes(mosaic, 0, 2)[:, 0:, 0], 1)) @@ -2479,11 +2632,13 @@ def merge_tiles(src_files: List[rasterio.io.DatasetReader], return mosaic, transformation -def reproject_raster(path_in: str, - path_out: str, - dst_crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS], - overwrite_file: bool = False, - create_directory: bool = False): +def reproject_raster( + path_in: str, + path_out: str, + dst_crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS], + overwrite_file: bool = False, + create_directory: bool = False, +): """Reprojecting a raster into different CRS Parameters @@ -2524,7 +2679,7 @@ def reproject_raster(path_in: str, # Checking that the path_in is of type string if not isinstance(path_in, str): - raise TypeError('The path of the source file must be of type string') + raise TypeError("The path of the source file must be of type string") # Getting the absolute path path_in = os.path.abspath(path=path_in) @@ -2535,7 +2690,7 @@ def reproject_raster(path_in: str, # Checking that the path_out is type string if not isinstance(path_out, str): - raise TypeError('The path of the destination file must be of type string') + raise TypeError("The path of the destination file must be of type string") # Getting the absolute path path_out = os.path.abspath(path=path_out) @@ -2552,31 +2707,34 @@ def reproject_raster(path_in: str, if create_directory: os.makedirs(path_dir_out) else: - raise LookupError('Directory not found. Pass create_directory=True to create a new directory') + raise LookupError( + "Directory not found. Pass create_directory=True to create a new directory" + ) if not overwrite_file: if os.path.exists(path_out): raise FileExistsError( - "The file already exists. Pass overwrite_file=True to overwrite the existing file") + "The file already exists. Pass overwrite_file=True to overwrite the existing file" + ) # Checking that the dst_crs is of type string or a pyproj object if not isinstance(dst_crs, (str, pyproj.crs.crs.CRS, rasterio.crs.CRS)): - raise TypeError('The destination CRS must be of type string, pyproj CRS or rasterio CRS') + raise TypeError( + "The destination CRS must be of type string, pyproj CRS or rasterio CRS" + ) # Opening the Source DataSet with rasterio.open(path_in) as src: transform, width, height = calculate_default_transform( - src.crs, dst_crs, src.width, src.height, *src.bounds) + src.crs, dst_crs, src.width, src.height, *src.bounds + ) kwargs = src.meta.copy() - kwargs.update({ - 'crs': dst_crs, - 'transform': transform, - 'width': width, - 'height': height - }) + kwargs.update( + {"crs": dst_crs, "transform": transform, "width": width, "height": height} + ) # Writing the Destination DataSet - with rasterio.open(path_out, 'w', **kwargs) as dst: + with rasterio.open(path_out, "w", **kwargs) as dst: for i in range(1, src.count + 1): reproject( source=rasterio.band(src, i), @@ -2585,14 +2743,16 @@ def reproject_raster(path_in: str, src_crs=src.crs, dst_transform=transform, dst_crs=dst_crs, - resampling=Resampling.nearest) + resampling=Resampling.nearest, + ) -def extract_contour_lines_from_raster(raster: Union[rasterio.io.DatasetReader, np.ndarray, str], - interval: int, - extent: Union[Optional[Sequence[float]], Optional[Sequence[int]]] = None, - target_crs: Union[ - str, pyproj.crs.crs.CRS, rasterio.crs.CRS] = None) -> gpd.GeoDataFrame: +def extract_contour_lines_from_raster( + raster: Union[rasterio.io.DatasetReader, np.ndarray, str], + interval: int, + extent: Union[Optional[Sequence[float]], Optional[Sequence[int]]] = None, + target_crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS] = None, +) -> gpd.GeoDataFrame: """Extracting contour lines from raster with a provided interval. Parameters @@ -2624,11 +2784,14 @@ def extract_contour_lines_from_raster(raster: Union[rasterio.io.DatasetReader, n from skimage import measure except ModuleNotFoundError: raise ModuleNotFoundError( - 'skimage package is not installed. Use pip install skimage to install the latest version') + "skimage package is not installed. Use pip install skimage to install the latest version" + ) # Checking if provided raster is either a file loaded with rasterio, an np.ndarray or a path directing to a .tif file if not isinstance(raster, (rasterio.io.DatasetReader, np.ndarray, str)): - raise TypeError("Raster must be a raster loaded with rasterio or a path directing to a .tif file") + raise TypeError( + "Raster must be a raster loaded with rasterio or a path directing to a .tif file" + ) # Checking if provided raster is of type str. If provided raster is a path (directing to a .tif file), load the file with rasterio if isinstance(raster, str): @@ -2638,26 +2801,35 @@ def extract_contour_lines_from_raster(raster: Union[rasterio.io.DatasetReader, n if isinstance(raster, np.ndarray): if extent is None: raise UnboundLocalError( - "For np.ndarray an extent must be provided to extract contour lines from an array") + "For np.ndarray an extent must be provided to extract contour lines from an array" + ) if extent is not None and not isinstance(extent, Sequence): raise TypeError("extent values must be of type float or int") if len(extent) != 4: - raise TypeError("Not enough arguments in extent to extract contour lines from an array") + raise TypeError( + "Not enough arguments in extent to extract contour lines from an array" + ) if target_crs is None: raise UnboundLocalError("For np.ndarray a target crs must be provided") - if target_crs is not None and not isinstance(target_crs, (str, pyproj.crs.crs.CRS, rasterio.crs.CRS)): - raise TypeError("target_crs must be of type string, pyproj CRS or rasterio CRS") - - save_as_tiff(raster=np.flipud(raster), - path='input_raster.tif', - extent=extent, - crs=target_crs, - overwrite_file=True) - raster = rasterio.open('input_raster.tif') + if target_crs is not None and not isinstance( + target_crs, (str, pyproj.crs.crs.CRS, rasterio.crs.CRS) + ): + raise TypeError( + "target_crs must be of type string, pyproj CRS or rasterio CRS" + ) + + save_as_tiff( + raster=np.flipud(raster), + path="input_raster.tif", + extent=extent, + crs=target_crs, + overwrite_file=True, + ) + raster = rasterio.open("input_raster.tif") # Checking if provided interval is of type int if not isinstance(interval, int): @@ -2672,15 +2844,16 @@ def extract_contour_lines_from_raster(raster: Union[rasterio.io.DatasetReader, n values = [] # Calculating minimum and maximum value from the given raster value - min_val = int(interval * round(np.amin(raster.read(1)[~np.isnan(raster.read(1))]) / interval)) - max_val = int(interval * round(np.amax(raster.read(1)[~np.isnan(raster.read(1))]) / interval)) + min_val = int( + interval * round(np.amin(raster.read(1)[~np.isnan(raster.read(1))]) / interval) + ) + max_val = int( + interval * round(np.amax(raster.read(1)[~np.isnan(raster.read(1))]) / interval) + ) # Extracting contour lines and appending to lists - for value in range(min_val, - max_val, - interval): - contour = measure.find_contours(np.fliplr(raster.read(1).T), - value) + for value in range(min_val, max_val, interval): + contour = measure.find_contours(np.fliplr(raster.read(1).T), value) contours.append(contour) values.extend([value for i in range(len(contour))]) @@ -2695,28 +2868,32 @@ def extract_contour_lines_from_raster(raster: Union[rasterio.io.DatasetReader, n x_left, y_bottom, x_right, y_top = raster.bounds # Transforming and defining the coordinates of contours based on raster extent - x_new = [x_left + (x_right - x_left) / columns * contours_new[i][:, 0] for i in range(len(contours_new))] - y_new = [y_bottom + (y_top - y_bottom) / rows * contours_new[i][:, 1] for i in range(len(contours_new))] + x_new = [ + x_left + (x_right - x_left) / columns * contours_new[i][:, 0] + for i in range(len(contours_new)) + ] + y_new = [ + y_bottom + (y_top - y_bottom) / rows * contours_new[i][:, 1] + for i in range(len(contours_new)) + ] # Converting the contours to lines (LineStrings - Shapely) - lines = [LineString(np.array([x_new[i], - y_new[i]]).T) for i in range(len(x_new))] + lines = [LineString(np.array([x_new[i], y_new[i]]).T) for i in range(len(x_new))] # Creating GeoDataFrame from lines - gdf_lines = gpd.GeoDataFrame(geometry=lines, - crs=raster.crs) + gdf_lines = gpd.GeoDataFrame(geometry=lines, crs=raster.crs) # Adding value column to GeoDataframe - gdf_lines['Z'] = values + gdf_lines["Z"] = values return gdf_lines -def read_raster_gdb(path: str, - crs: Union[str, - pyproj.crs.crs.CRS, - rasterio.crs.CRS] = None, - path_out: str = ''): +def read_raster_gdb( + path: str, + crs: Union[str, pyproj.crs.crs.CRS, rasterio.crs.CRS] = None, + path_out: str = "", +): """Read Raster from OpenFileGDB. Parameters @@ -2735,15 +2912,15 @@ def read_raster_gdb(path: str, try: from osgeo import gdal, osr except ModuleNotFoundError: - raise ModuleNotFoundError('osgeo package is not installed') + raise ModuleNotFoundError("osgeo package is not installed") # Checking that the path is of type string if not isinstance(path, str): - raise TypeError('Path to the OpenFileGDB must be provided as string') + raise TypeError("Path to the OpenFileGDB must be provided as string") # Checking that the output path is of type string if not isinstance(path_out, str): - raise TypeError('Output path must be provided as string') + raise TypeError("Output path must be provided as string") # Opening Database ds = gdal.Open(path) @@ -2765,26 +2942,30 @@ def read_raster_gdb(path: str, # Creating CRS from projection or manually if dataset.GetProjection(): proj = osr.SpatialReference(wkt=dataset.GetProjection()) - epsg = proj.GetAttrValue('AUTHORITY', 1) - crs = 'EPSG:' + epsg + epsg = proj.GetAttrValue("AUTHORITY", 1) + crs = "EPSG:" + epsg else: if not crs: raise ValueError( - 'Raster does not have a projection, please provide a valid coordinate reference system') + "Raster does not have a projection, please provide a valid coordinate reference system" + ) # Saving raster to file with rasterio.open( - path_out + ds.GetSubDatasets()[i][1].replace(' ', '') + '.tif', - 'w', - driver='GTiff', - height=raster.shape[0], - width=raster.shape[1], - count=1, - dtype=raster.dtype, - crs=crs, - transform=affine.Affine.from_gdal(*dataset.GetGeoTransform()), - nodata=raster_band.GetNoDataValue() + path_out + ds.GetSubDatasets()[i][1].replace(" ", "") + ".tif", + "w", + driver="GTiff", + height=raster.shape[0], + width=raster.shape[1], + count=1, + dtype=raster.dtype, + crs=crs, + transform=affine.Affine.from_gdal(*dataset.GetGeoTransform()), + nodata=raster_band.GetNoDataValue(), ) as dst: dst.write(raster, 1) - print(ds.GetSubDatasets()[i][1].replace(' ', '') + '.tif successfully saved to file') \ No newline at end of file + print( + ds.GetSubDatasets()[i][1].replace(" ", "") + + ".tif successfully saved to file" + )