-
Notifications
You must be signed in to change notification settings - Fork 0
/
filter.py
120 lines (97 loc) · 4.76 KB
/
filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from enum import Enum
import numpy as np
import subprocess
import laspy
import os
import shutil
import requests
class TypeColor(Enum):
original = 1
green_cracks = 2
red_stains = 3
blue_spalls = 4
def getName(enum_class, value):
for enum_member in enum_class:
if enum_member.value == value:
return enum_member.name
return None
def filter_points_laspy(file_path, output_file, color):
# Open the LAS file
in_file = laspy.read(file_path)
# Extract RGB color information
colors = np.vstack((in_file.red, in_file.green, in_file.blue)).T / 256.0
# # Print the range of the RGB channels
# print("Range of R channel:", np.min(colors[:, 0]), np.max(colors[:, 0]))
# print("Range of G channel:", np.min(colors[:, 1]), np.max(colors[:, 1]))
# print("Range of B channel:", np.min(colors[:, 2]), np.max(colors[:, 2]))
# Define filtered condition: R > 0.8, G < 0.2, B < 0.2 (color values are in range 0 to 1)
match color:
case TypeColor.green_cracks.value: # green
filtered_mask = (colors[:, 0] < 0.4) & (colors[:, 1] > 0.5) & (colors[:, 2] < 0.4)
case TypeColor.red_stains.value: # red
filtered_mask = (colors[:, 0] > 0.5) & (colors[:, 1] < 0.4) & (colors[:, 2] < 0.4)
case _: # blue
filtered_mask = (colors[:, 0] < 0.3) & (colors[:, 1] < 0.3) & (colors[:, 2] > 0.7)
# Apply the mask to filter points and colors
filtered_points = np.vstack((in_file.x, in_file.y, in_file.z)).T[filtered_mask]
filtered_colors = (colors * 255)[filtered_mask].astype(np.uint16)
filtered_info = np.hstack((filtered_points, filtered_colors))
# Create a new LAS file for the filtered points
with laspy.open(output_file, mode='w', header=in_file.header) as out_writer:
out_file = laspy.ScaleAwarePointRecord.zeros(filtered_info.shape[0], header=in_file.header)
# Copy header and points data
out_file.x = filtered_points[:, 0]
out_file.y = filtered_points[:, 1]
out_file.z = filtered_points[:, 2]
# Update color information
out_file.red = filtered_colors[:, 0]
out_file.green = filtered_colors[:, 1]
out_file.blue = filtered_colors[:, 2]
out_writer.write_points(out_file)
print(f"Filtered LAS file saved to {output_file}", flush=True)
def download_file(url, save_path):
token = authenticate()
headers = {'Authorization': 'JWT {}'.format(token)}
# Send a GET request to the URL
response = requests.get(url, headers=headers)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Open a file in binary write mode ('wb') and write the content of the response
with open(save_path, 'wb') as f:
f.write(response.content)
print(f"File downloaded successfully and saved to: {save_path}", flush=True)
else:
print(f"Failed to download file. Status code: {response.status_code}", flush=True)
def lazTolas(laz, las):
# pdal = "/home/roboticslab/Developer/laimatt/laimatt_pdal/.conda/bin/pdal"
pdal = "pdal"
command = [
pdal,
"translate",
laz,
las
]
result = subprocess.run(command, capture_output=True, text=True, check=True)
def authenticate():
url = "https://webodm.boshang.online/api/token-auth/"
data = {
"username": "authentication",
"password": "authkeyword"
}
response = requests.post(url, data=data)
return response.json()['token']
def filter_from_webodm(project_id, task_id, color): # entire pipeline
# webodm_path = '/var/lib/docker/volumes/webodm_appmedia/_data/project/{}/task/{}/assets/'.format(project_id, task_id) + 'ccny_postprocessing'
# if not os.path.exists(webodm_path):
# # Create the directory
# os.makedirs(webodm_path)
task_path = 'tasks/projID_{}/pointclouds/{}'.format(project_id, getName(TypeColor, color))
input_file = task_path + '/{}_original_model.laz'.format(getName(TypeColor, color)) # Replace with your input LAS/LAZ file path
input_las = task_path + '/{}_original_model.las'.format(getName(TypeColor, color))
output_file = task_path + '/{}_filtered_model.las'.format(getName(TypeColor, color)) # Specify output file path for filtered points
url = 'https://webodm.boshang.online/api/projects/{}/tasks/{}/download/georeferenced_model.laz'.format(project_id, task_id)
download_file(url, input_file)
# alt = '/var/lib/docker/volumes/webodm_appmedia/_data/project/{}/task/{}/assets/odm_georeferencing/odm_georeferenced_model.laz'.format(project_id, task_id)
lazTolas(input_file, input_las)
filter_points_laspy(input_las, output_file, color)
# filter_from_webodm(272, '641c9c48-c021-44fe-a2a1-fe54004b8f43', 2)