-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhyperspectral_pifm_utils.py
310 lines (291 loc) · 18.1 KB
/
hyperspectral_pifm_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
import os
import sys

import h5py
import numpy as np
import pyUSID as usid
from pyUSID import USIDataset
from pyUSID.io import write_utils
from pyUSID.io.translator import Translator
class PiFMTranslator(Translator):
    """
    Writes images, spectrograms, point spectra and associated ancillary
    datasets to an h5 file in the pyUSID data structure.

    The translator is driven by an ANFATEC scan parameter (text) file, which
    holds the scan attributes followed by descriptions of every image,
    spectrogram and point-spectrum file recorded during the scan.

    Typical use::

        h5_f = PiFMTranslator(path='scan.txt').translate()
    """

    # Number of "key: value" lines following each description header in the
    # parameter file.
    _N_IMG_DESCRIPTORS = 5
    _N_SPECTROGRAM_DESCRIPTORS = 10
    _N_SPECTRUM_DESCRIPTORS = 3
    # Extension of the raw spectrogram files named in the parameter file.
    _SPECTROGRAM_EXT = '.int'

    def __init__(self, path=None):
        """
        Parameters
        ----------
        path : str, optional
            Path to the ANFATEC scan parameter file.
        """
        super(PiFMTranslator, self).__init__()
        self.path = path

    def get_path(self):
        """Write full path, directory and file name as attributes to the class."""
        self.full_path = os.path.realpath(self.path)
        self.directory = os.path.dirname(self.full_path)
        self.basename = os.path.basename(self.path)

    def read_anfatec_params(self):
        """
        Read the scan parameters and write them to a dictionary.

        Sets ``self.params_dictionary`` (later written to the HDF5 file as
        measurement attributes) and ``self.x_len`` / ``self.y_len`` from the
        'xPixel' / 'yPixel' entries.

        Raises
        ------
        KeyError
            If 'xPixel' or 'yPixel' is missing from the parameter file.
        """
        params_dictionary = {}
        with open(self.path, 'r', encoding="ISO-8859-1") as f:
            for line in f:
                sline = [val.strip() for val in line.split(':')]
                # Keep plain "key: value" lines; lines whose key starts with
                # ';' are skipped, as are lines with an empty key.
                if len(sline) == 2 and sline[0] and not sline[0].startswith(';'):
                    params_dictionary[sline[0]] = sline[1]
                # In ANFATEC parameter files, all attributes are written
                # before file references, so nothing after this is a parameter.
                if sline[0].startswith('FileDesc'):
                    break
        self.params_dictionary = params_dictionary
        self.x_len = int(params_dictionary['xPixel'])
        self.y_len = int(params_dictionary['yPixel'])

    @staticmethod
    def _descriptor_values(lines, header_index, count):
        """Return the values of the ``count`` 'key: value' lines after a header line."""
        return [lines[header_index + offset + 1].split(':')[1].strip()
                for offset in range(count)]

    def read_file_desc(self):
        """
        Read spectrogram, image and spectrum file descriptions.

        Each description is stored in a dictionary whose key:value pairs are
        filename:[remaining descriptors]:

        * ``self.img_desc``: caption, scale, physical unit, offset
        * ``self.spectrogram_desc``: caption, bytes per pixel, scale,
          physical unit, offset, datatype, bytes per reading,
          wavelength file name, wavelength physical unit
        * ``self.spectrum_desc``: position x, position y
        """
        img_desc = {}
        spectrogram_desc = {}
        spectrum_desc = {}
        # (header prefix, number of descriptor lines, destination dictionary)
        sections = (
            ('FileDescBegin', self._N_IMG_DESCRIPTORS, img_desc),
            ('FileDesc2Begin', self._N_SPECTROGRAM_DESCRIPTORS, spectrogram_desc),
            ('AFMSpectrumDescBegin', self._N_SPECTRUM_DESCRIPTORS, spectrum_desc),
        )
        with open(self.path, 'r', encoding="ISO-8859-1") as f:
            lines = f.readlines()
        for index, line in enumerate(lines):
            tag = line.split(':')[0].strip()
            for header, count, destination in sections:
                if tag.startswith(header):
                    file_desc = self._descriptor_values(lines, index, count)
                    destination[file_desc[0]] = file_desc[1:]
                    # The header prefixes are mutually exclusive.
                    break
        self.img_desc = img_desc
        self.spectrogram_desc = spectrogram_desc
        self.spectrum_desc = spectrum_desc

    def read_spectrograms(self):
        """
        Read spectrograms and their spectral values into two dictionaries.

        ``self.spectrograms[file_name]`` is a float array of shape
        (x_len, y_len, n_wavelengths) scaled by the file's 'scale' descriptor;
        ``self.spectrogram_spec_vals[file_name]`` holds the values loaded from
        the companion '<name>Wavelengths.txt' file.
        """
        spectrograms = {}
        spectrogram_spec_vals = {}
        for file_name, descriptors in self.spectrogram_desc.items():
            # Remove the '.int' suffix only. str.strip('.int') would strip any
            # of the characters '.', 'i', 'n', 't' from BOTH ends of the name.
            stem = file_name
            if stem.endswith(self._SPECTROGRAM_EXT):
                stem = stem[:-len(self._SPECTROGRAM_EXT)]
            # load and save spectroscopic values
            spec_vals_i = np.loadtxt(os.path.join(self.directory, stem + 'Wavelengths.txt'))
            spectrogram_spec_vals[file_name] = spec_vals_i
            # load and save spectrograms
            raw = np.fromfile(os.path.join(self.directory, file_name), dtype='i4')
            # The raw stream is read as y_len lines of x_len point spectra;
            # the result is indexed [x, y, wavelength].
            # NOTE(review): flattening this array in write_spectrograms makes
            # y the fastest-varying position - confirm that this matches the
            # position indices built with X as the first dimension.
            cube = raw.reshape(self.y_len, self.x_len, len(spec_vals_i))
            spectrograms[file_name] = np.ascontiguousarray(
                cube.transpose(1, 0, 2) * float(descriptors[2]))
        self.spectrograms = spectrograms
        self.spectrogram_spec_vals = spectrogram_spec_vals

    def read_imgs(self):
        """
        Read images and save them to a dictionary.

        ``self.imgs[file_name]`` is a float array of shape (x_len, y_len)
        scaled by the file's 'scale' descriptor.
        """
        imgs = {}
        for file_name, descriptors in self.img_desc.items():
            raw = np.fromfile(os.path.join(self.directory, file_name), dtype='i4')
            # The raw stream is read as y_len lines of x_len pixels; the
            # result is indexed [x, y].
            imgs[file_name] = np.ascontiguousarray(
                raw.reshape(self.y_len, self.x_len).T * float(descriptors[1]))
        self.imgs = imgs

    def read_spectra(self):
        """
        Read all point spectra and save them to dictionaries.

        For every spectrum file the first column is stored in
        ``self.spectra_spec_vals``, the second column in ``self.spectra`` and
        the tab-separated entries of the header line in
        ``self.spectra_x_y_dim_name``.
        """
        spectra = {}
        spectra_spec_vals = {}
        spectra_x_y_dim_name = {}
        for file_name in self.spectrum_desc:
            spectrum_path = os.path.join(self.directory, file_name)
            spectrum_f = np.loadtxt(spectrum_path, skiprows=1)
            spectra_spec_vals[file_name] = spectrum_f[:, 0]
            spectra[file_name] = spectrum_f[:, 1]
            with open(spectrum_path) as f:
                spectra_x_y_dim_name[file_name] = f.readline().strip('\n').split('\t')
        self.spectra = spectra
        self.spectra_spec_vals = spectra_spec_vals
        self.spectra_x_y_dim_name = spectra_x_y_dim_name

    def make_pos_vals_inds_dims(self):
        """
        Build the position indices, values and dimensions shared by the
        images and spectrograms.

        Sets ``self.pos_ind``, ``self.pos_val`` and ``self.pos_dims`` from the
        scan range, scan centre and pixel counts in the parameter dictionary.
        """
        x_range = float(self.params_dictionary['XScanRange'])
        y_range = float(self.params_dictionary['YScanRange'])
        x_center = float(self.params_dictionary['xCenter'])
        y_center = float(self.params_dictionary['yCenter'])
        x_start, x_end = x_center - (x_range / 2), x_center + (x_range / 2)
        y_start, y_end = y_center - (y_range / 2), y_center + (y_range / 2)
        # linspace guarantees exactly x_len / y_len samples; arange with a
        # floating-point step can return one element too many.
        x_linspace = np.linspace(x_start, x_end, num=self.x_len, endpoint=False)
        # assumes y scan direction: down; scan angle: 0 deg
        y_linspace = -np.linspace(y_start, y_end, num=self.y_len, endpoint=False)
        pos_ind, pos_val = write_utils.build_ind_val_matrices(unit_values=(x_linspace, y_linspace),
                                                              is_spectral=False)
        # Dimension uses ascii encoding, which can not encode the micron
        # symbol, so we replace it, if present, with the letter u.
        pos_dims = [write_utils.Dimension('X', self.params_dictionary['XPhysUnit'].replace('\xb5', 'u'),
                                          self.x_len),
                    write_utils.Dimension('Y', self.params_dictionary['YPhysUnit'].replace('\xb5', 'u'),
                                          self.y_len)]
        self.pos_ind, self.pos_val, self.pos_dims = pos_ind, pos_val, pos_dims

    def create_hdf5_file(self):
        """
        Create the output HDF5 file next to the parameter file.

        The file name is the parameter file name with its extension replaced
        by '.h5'. A new indexed 'Measurement_' group is created and the scan
        parameters are written to it as attributes. Sets ``self.h5_f`` and
        ``self.h5_meas_grp``.
        """
        # splitext always yields a name different from the input file, so the
        # parameter file itself can never be opened (and truncated) in 'w' mode.
        h5_name = os.path.splitext(self.basename)[0] + '.h5'
        h5_path = os.path.join(self.directory, h5_name)
        try:
            self.h5_f = h5py.File(h5_path, mode='w')
        except OSError:
            # mode='w' could not create/truncate the file (e.g. it is already
            # open elsewhere); fall back to appending to the existing file.
            self.h5_f = h5py.File(h5_path, mode='r+')
        self.h5_meas_grp = usid.hdf_utils.create_indexed_group(self.h5_f, 'Measurement_')
        usid.hdf_utils.write_simple_attrs(self.h5_meas_grp, self.params_dictionary)

    def write_spectrograms(self):
        """Write every spectrogram to its own channel of the measurement group."""
        for spectrogram_f, descriptors in self.spectrogram_desc.items():
            channel_i = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
            spec_vals_i = self.spectrogram_spec_vals[spectrogram_f]
            spectrogram_spec_dims = write_utils.Dimension('Wavelength', descriptors[8], spec_vals_i)
            h5_raw = usid.hdf_utils.write_main_dataset(
                channel_i,  # parent HDF5 group
                (self.x_len * self.y_len, len(spec_vals_i)),  # shape of Main dataset
                'Raw_Data',  # Name of main dataset
                'Spectrogram',  # Physical quantity contained in Main dataset
                descriptors[3],  # Units for the physical quantity
                self.pos_dims,  # Position dimensions
                spectrogram_spec_dims,  # Spectroscopic dimensions
                dtype=np.float32,  # data type / precision
                main_dset_attrs={'Caption': descriptors[0],
                                 'Bytes_Per_Pixel': descriptors[1],
                                 'Scale': descriptors[2],
                                 'Physical_Units': descriptors[3],
                                 'Offset': descriptors[4],
                                 'Datatype': descriptors[5],
                                 'Bytes_Per_Reading': descriptors[6],
                                 'Wavelength_File': descriptors[7],
                                 'Wavelength_Units': descriptors[8]})
            # Replace the index-based position values with physical positions.
            h5_raw.h5_pos_vals[:, :] = self.pos_val
            h5_raw[:, :] = self.spectrograms[spectrogram_f].reshape(h5_raw.shape)

    def _first_main_dataset(self, channel_name):
        """Return the first Main dataset of a channel in the current measurement group, or None."""
        if channel_name not in self.h5_meas_grp:
            return None
        main_dsets = usid.hdf_utils.get_all_main(self.h5_meas_grp[channel_name])
        return main_dsets[0] if main_dsets else None

    def write_images(self):
        """
        Write every image to its own channel of the measurement group.

        Ancillary datasets are shared where possible: position indices/values
        are linked to those of Channel_000 when it exists, and spectroscopic
        indices/values are linked to the first image (single spectroscopic
        value) found among Channel_000 and Channel_001. Whatever can not be
        linked is created from scratch.
        """
        for img_f, descriptors in self.img_desc.items():
            # Defaults: create new position / spectroscopic ancillary datasets.
            h5_pos_inds = None
            h5_pos_vals = None
            pos_dims = self.pos_dims
            write_pos_vals = True
            h5_spec_inds = None
            h5_spec_vals = None
            spec_dims = write_utils.Dimension('arb', 'a.u', 1)

            # check for existing spectrogram or image and link position/spec
            # inds/vals; at most two channels need to be checked
            first_channel = self._first_main_dataset('Channel_000')
            if first_channel is not None:
                h5_pos_inds = first_channel.h5_pos_inds
                h5_pos_vals = first_channel.h5_pos_vals
                pos_dims = None
                write_pos_vals = False
                if first_channel.spec_dim_sizes[0] == 1:
                    linked_image = first_channel
                else:
                    # channel 000 is a spectrogram, check the next dataset
                    linked_image = self._first_main_dataset('Channel_001')
                    if linked_image is not None and linked_image.spec_dim_sizes[0] != 1:
                        linked_image = None
                if linked_image is not None:
                    # channel data is an image, & we link their spec inds/vals
                    h5_spec_inds = linked_image.h5_spec_inds
                    h5_spec_vals = linked_image.h5_spec_vals
                    spec_dims = None

            channel_i = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
            h5_raw = usid.hdf_utils.write_main_dataset(
                channel_i,  # parent HDF5 group
                (self.x_len * self.y_len, 1),  # shape of Main dataset
                'Raw_' + descriptors[0].replace('-', '_'),  # Name of main dataset
                descriptors[0],  # Physical quantity contained in Main dataset
                descriptors[2],  # Units for the physical quantity
                # Position dimensions
                h5_pos_inds=h5_pos_inds,
                h5_pos_vals=h5_pos_vals,
                pos_dims=pos_dims,
                # Spectroscopic dimensions
                h5_spec_inds=h5_spec_inds,
                h5_spec_vals=h5_spec_vals,
                spec_dims=spec_dims,
                dtype=np.float32,  # data type / precision
                main_dset_attrs={'Caption': descriptors[0],
                                 'Scale': descriptors[1],
                                 'Physical_Units': descriptors[2],
                                 'Offset': descriptors[3]})
            h5_raw[:, :] = self.imgs[img_f].reshape(h5_raw.shape)
            if write_pos_vals:
                h5_raw.h5_pos_vals[:, :] = self.pos_val

    def write_spectra(self):
        """
        Write every point spectrum to a channel in its own measurement group.

        Names and units of the spectroscopic dimension and of the measured
        quantity are the first and second space-separated tokens of the
        corresponding column header in the spectrum file.
        """
        for spec_f, descriptors in self.spectrum_desc.items():
            # create new measurement group for each spectrum
            self.h5_meas_grp = usid.hdf_utils.create_indexed_group(self.h5_f, 'Measurement_')
            x_header = self.spectra_x_y_dim_name[spec_f][0].split(' ')
            y_header = self.spectra_x_y_dim_name[spec_f][1].split(' ')
            x_name, x_unit = x_header[0], x_header[1]
            y_name, y_unit = y_header[0], y_header[1]
            spec_i_spec_dims = write_utils.Dimension(x_name, x_unit, self.spectra_spec_vals[spec_f])
            # A point spectrum sits at a single position, so each position
            # dimension holds exactly one value (given as a one-element list).
            spec_i_pos_dims = [write_utils.Dimension('X',
                                                     self.params_dictionary['XPhysUnit'].replace('\xb5', 'u'),
                                                     [float(descriptors[0])]),
                               write_utils.Dimension('Y',
                                                     self.params_dictionary['YPhysUnit'].replace('\xb5', 'u'),
                                                     [float(descriptors[1])])]
            # write data to a channel in the measurement group
            spec_i_ch = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
            h5_raw = usid.hdf_utils.write_main_dataset(
                spec_i_ch,  # parent HDF5 group
                (1, len(self.spectra_spec_vals[spec_f])),  # shape of Main dataset
                'Raw_Spectrum',  # Name of main dataset
                y_name,  # Physical quantity contained in Main dataset
                y_unit,  # Units for the physical quantity
                pos_dims=spec_i_pos_dims,  # Position dimensions
                spec_dims=spec_i_spec_dims,  # Spectroscopic dimensions
                dtype=np.float32,  # data type / precision
                main_dset_attrs={'XLoc': descriptors[0],
                                 'YLoc': descriptors[1]})
            h5_raw[:, :] = self.spectra[spec_f].reshape(h5_raw.shape)

    def translate(self):
        """
        Read the scan referenced by ``self.path`` and write it to an h5 file.

        :return: h5 file (the open file handle created by create_hdf5_file).
        """
        self.get_path()
        self.read_anfatec_params()
        self.read_file_desc()
        self.read_spectrograms()
        self.read_imgs()
        self.read_spectra()
        self.make_pos_vals_inds_dims()
        self.create_hdf5_file()
        self.write_spectrograms()
        self.write_images()
        self.write_spectra()
        return self.h5_f