# cif_zip_to_hdf5.py
from pymatgen.transformations.standard_transformations import *
from pymatgen.transformations.advanced_transformations import *
import h5py
import numpy as np
import pickle as pk
import matminer.featurizers.structure as struc_feat
from multiprocessing import Process, Pool, cpu_count
from matbench.bench import MatbenchBenchmark
import scipy as sp
import scipy.linalg  # Ensures sp.linalg is available; a bare "import scipy" may not load the submodule
import argparse
import sys
from os.path import exists
from tqdm import tqdm
from zipfile import ZipFile
from pymatgen.core import Structure
import pandas as pd


class OrthorhombicSupercellTransform(AbstractTransformation):
    """
    This transformation applies a combined scale and shear to the provided unit cell to produce an
    orthorhombic supercell with roughly equal side lengths. The approximation improves as the size
    difference between the primitive cell and the supercell grows.
    Robust alternative to the existing cubic supercell method: it guarantees that the inverse matrix
    is not singular when rounded.
    """

    def __init__(self, N_Atoms):
        """
        Args:
            N_Atoms: The maximum number of atoms allowed in the supercell.
        """
        self.N_Atoms = int(N_Atoms)

    def apply_transformation(self, structure):
        """
        Applies the transformation.

        Args:
            structure: Input Structure

        Returns:
            Orthorhombic supercell
        """
        lattice_matrix = structure.lattice.as_dict()["matrix"]
        # RQ decomposition in this context provides a scale-and-shear matrix (R) that maps an
        # orthorhombic cell of unit volume to the current lattice parameters
        R, Q = sp.linalg.rq(lattice_matrix)
        # Invert R to get the scale+shear that maps the current unit cell to the orthorhombic cell
        R1 = np.linalg.inv(R)
        # We require the inverse of the diagonal component of R1 to remove the unwanted
        # normalization included in the rq algorithm
        R1_Diagonal = np.zeros(R1.shape)
        np.fill_diagonal(R1_Diagonal, np.diagonal(R1))
        # S is the 'ideal' normalized shear; it is not yet usable because of its non-integer components
        S = sp.linalg.inv(R1_Diagonal) @ R1
        # The lattice parameters obtained by directly applying S are the "ideal" ones. We compute the
        # scaling component by iteratively incrementing the shortest sheared lattice parameter until
        # any further increment would breach the atom limit.
        start_len = len(structure)
        Sheared_cell = S @ lattice_matrix
        Sheared_abc = [
            np.linalg.norm(Sheared_cell[0]),
            np.linalg.norm(Sheared_cell[1]),
            np.linalg.norm(Sheared_cell[2]),
        ]
        increments = (1, 1, 1)
        found_transform = False
        # Iteratively increment the shortest lattice parameter until doing so would bring the number
        # of atoms above the limit
        while not found_transform:
            new_increments = list(increments)  # Copy so the last accepted increments are preserved
            # Index of the shortest lattice parameter after the current scaling
            shortest = np.argmin([i * j for i, j in zip(increments, Sheared_abc)])
            new_increments[shortest] += 1
            if np.prod(new_increments) * start_len <= self.N_Atoms:
                # The increment keeps the atom count at or below the ceiling, so accept it and repeat
                increments = new_increments
            else:
                # Any further increment would exceed the ceiling, so stop here
                found_transform = True
        # Combine scale and shear and round the off-diagonals to the nearest integer. This gives an
        # integer approximation of the shear; larger supercells are more precise.
        cubic_upscale_approx = np.rint(np.diag(increments) @ S)
        # Apply the computed integer supercell transformation
        structure = SupercellTransformation(scaling_matrix=cubic_upscale_approx).apply_transformation(structure)
        return structure

    @property
    def inverse(self):
        """Returns: None"""
        return None

    @property
    def is_one_to_many(self):
        """Returns: False"""
        return False
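

# Illustrative usage sketch for the transform above (not executed as part of this script; the
# silicon structure below is a hypothetical stand-in for any pymatgen Structure):
#
#   from pymatgen.core import Lattice, Structure
#   si = Structure(Lattice.cubic(5.43), ["Si"] * 2, [[0, 0, 0], [0.25, 0.25, 0.25]])
#   supercell = OrthorhombicSupercellTransform(100).apply_transformation(si)
#   print(len(supercell), supercell.lattice.abc)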


# Processes the pymatgen structure provided, first transforming to a primitive cell, and then
# upscaling to a cubic supercell of a given size if enabled
class process_structures():
    def __init__(self, cubic_supercell, supercell_size):
        self.cubic_supercell = cubic_supercell
        self.supercell_size = supercell_size

    def process_structure(self, struct):
        primitive_trans = PrimitiveCellTransformation()
        struct = primitive_trans.apply_transformation(struct)
        prim_size = len(struct)
        if self.cubic_supercell:
            orthog = OrthorhombicSupercellTransform(self.supercell_size)
            supercell = orthog.apply_transformation(struct)
        else:
            supercell = struct
        images = len(supercell) // len(struct)
        # The Matbench band gap dataset does not contain disorder, but this code is general
        if not supercell.is_ordered:
            try:
                oxi_dec = AutoOxiStateDecorationTransformation()
                supercell = oxi_dec.apply_transformation(supercell)
                # discrete = DiscretizeOccupanciesTransformation(10)
                # supercell = discrete.apply_transformation(supercell)
                order_trans = OrderDisorderedStructureTransformation()
                supercell = order_trans.apply_transformation(supercell)
                prim_size = len(supercell)
                images = 1
                print("Succeeded at ordering disordered cell")
            except Exception as e:
                print(e)
                print("Failed to order disordered cell")
                return None
        return supercell, prim_size, images
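
# Illustrative sketch of how the helper above is used (assumes `struct` is any pymatgen Structure):
# with cubic supercells enabled, the returned tuple is the supercell, the primitive-cell atom count,
# and the number of primitive images it contains.
#
#   processor = process_structures(cubic_supercell=True, supercell_size=100)
#   supercell, prim_size, images = processor.process_structure(struct)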


def generate_crystal_dictionary(struc_and_target):
    struc = struc_and_target[0]
    composition = struc.composition.formula
    crystal_dict = {
        "structure": struc,
        "composition": composition,
        "target": struc_and_target[1],
        "prim_size": struc_and_target[2],
        "images": struc_and_target[3],
    }
    return crystal_dict


def divide_chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i : i + n]
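
# For example, list(divide_chunks([1, 2, 3, 4, 5], 2)) yields [[1, 2], [3, 4], [5]]; below it batches
# the structure keys into chunks of 2048 so each multiprocessing pool works on one chunk at a time.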


def h5_dataset_from_structure_list(hdf5_file_name, structure_dictionary, cpus):
    f = h5py.File(hdf5_file_name, "w", libver="latest")
    keys_list = list(structure_dictionary.keys())
    keys_chunked = list(divide_chunks(keys_list, 2048))
    for keys in tqdm(keys_chunked):
        values = [structure_dictionary[key] for key in keys]
        pool = Pool(processes=cpus)
        processed_values = [
            i
            for i in tqdm(
                pool.imap(
                    generate_crystal_dictionary,
                    values,
                )
            )
        ]
        pool.close()
        pool.join()
        pool.terminate()
        # crystal dict of dicts
        cdd = {key: value for key, value in zip(keys, processed_values)}
        print("dumping processed structures to hdf5 file")
        for key in tqdm(cdd.keys()):
            group = f.require_group(str(key))
            group.create_dataset("composition", data=cdd[key]["composition"])
            group.create_dataset(
                "pymatgen_structure", data=np.void(pk.dumps(cdd[key]["structure"]))
            )
            group.create_dataset("target", data=cdd[key]["target"])
            group.create_dataset("prim_size", data=cdd[key]["prim_size"])
            group.create_dataset("images", data=cdd[key]["images"])
    f.close()


def dataset_to_hdf5(inputs, outputs, h5_file_name, cpus, supercell, supercell_size):
    # Create tuples of crystal index names, pymatgen structures, and properties
    structure_list = [(i, j, k) for i, j, k in zip(inputs.index, inputs, outputs)]
    # Transform the structures into primitive unit cells, and then upscale if appropriate
    processor = process_structures(supercell, supercell_size)
    pool = Pool(processes=cpu_count())
    processed_structures = [
        i for i in tqdm(pool.imap(processor.process_structure, [i[1] for i in structure_list]))
    ]
    print("STRUCTURES ARE NOW PROCESSED")
    pool.close()
    pool.join()
    pool.terminate()
    # Create tuples of processed structure, target property, size of the primitive unit cell, and
    # number of images, skipping structures that failed to process
    kept_indices = [i for i in range(len(structure_list)) if processed_structures[i] is not None]
    processed_tuples = [
        (processed_structures[i][0], structure_list[i][2], processed_structures[i][1], processed_structures[i][2])
        for i in kept_indices
    ]
    # Create a dictionary mapping each retained dataset index to its tuple; indexing by kept_indices
    # keeps keys and values aligned even when some structures were dropped
    structure_dict = {
        structure_list[i][0]: t for i, t in zip(kept_indices, processed_tuples)
    }
    # Initialize the h5 database with the pymatgen structures, the target, the primitive size, and
    # the number of images
    h5_dataset_from_structure_list(h5_file_name, structure_dict, cpus)


if __name__ == "__main__":
    # Arguments for whether to generate primitive cells or supercells, and what size the supercells
    # should be capped at
    parser = argparse.ArgumentParser(description="ml options")
    parser.add_argument("--cubic_supercell", default=False, action="store_true")
    parser.add_argument("--primitive", default=False, action="store_true")
    parser.add_argument("-s", "--supercell_size", default=100, type=int)
    parser.add_argument("-w", "--number_of_worker_processes", default=1, type=int)
    parser.add_argument("-c", "--cif_zip", default=None, type=str)
    parser.add_argument("-d", "--data_csv", default=None, type=str)
    parser.add_argument("-hd", "--h5_path", default=None, type=str)
    args = parser.parse_args()

    if args.cubic_supercell:
        h5_file_name = "matbench_mp_gap_cubic_" + str(args.supercell_size)
        supercell = True
        supercell_size = args.supercell_size
    elif args.primitive:
        h5_file_name = "matbench_mp_gap_primitive"
        supercell = False
        supercell_size = None
    else:
        raise Exception(
            "Need to specify either --primitive or --cubic_supercell on the command line, "
            "with the -s argument controlling supercell size"
        )

    def read_cif_try(file):
        try:
            return Structure.from_str(cif_zip.read(file).decode("utf-8"), "cif")
        except Exception:
            print("FAILED TO READ CIF")
            return None

    with ZipFile(args.cif_zip, "r") as cif_zip:
        tqdm.pandas()
        # Note: only the first 100 rows of the CSV are processed here
        data = pd.read_csv(args.data_csv)[:100]
        data["structure"] = data["file"].progress_apply(read_cif_try)
        data = data.dropna()
        structures = pd.Series(data["structure"], data.index)
        targets = pd.Series(data["target"], data.index)
        dataset_to_hdf5(
            structures, targets, args.h5_path, args.number_of_worker_processes, supercell, supercell_size
        )
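
# Example invocation (illustrative; the zip, csv, and output paths are placeholders):
#   python cif_zip_to_hdf5.py --cubic_supercell -s 100 -w 8 -c cifs.zip -d targets.csv -hd out.h5
# The CSV is expected to contain a "file" column naming each CIF inside the zip and a "target"
# column with the property value to store.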