-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmaster.py
123 lines (102 loc) · 4.47 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
import os, sys, h5py, json, re, errno
from multiprocessing import Pool, Manager
import datawell
import xds
from pathlib import Path
from jsonenc import JSONEnc
class Master(object):
    """Drive the processing of a single master file.

    Constructing an instance does all the work: it creates a directory for the
    master file inside the output directory, processes the frames in
    fixed-size "data wells" using a multiprocessing pool, and finally writes
    the collected per-well results to a JSON file in the master directory.
    """

    def __init__(self, args, masterfilepath, num_of_total_frames, output_directory):
        """Set up the run and immediately execute it.

        Args:
            args: Parsed options; the attributes ``framesperdegree``,
                ``oscillation``, ``oscillationperwell`` and ``detector`` are
                read here, and the whole object is handed to
                ``xds.get_xds_params``.
            masterfilepath: ``pathlib.Path`` of the master file.
            num_of_total_frames: Total number of frames; used as the
                (exclusive) upper bound when iterating over wells.
            output_directory: Directory in which the master directory is
                created.
        """
        self.args = args
        self.masterfilepath = masterfilepath
        self.frames_per_degree = args.framesperdegree
        # Stride between the first frames of consecutive wells.
        self.frames_per_well = int(args.oscillation * args.framesperdegree)
        # Number of frames actually handed to each Datawell.
        self.frames_to_process = int(args.oscillationperwell * args.framesperdegree)
        self.total_frames = num_of_total_frames
        self.output = output_directory
        self.xdsparams = xds.get_xds_params(self.args)
        self.detparams = xds.get_detector_params(self.args.detector)
        # Manager-backed dict so that worker processes can store their results.
        self.master_dictionary = Manager().dict()
        # Not used inside this class; kept because it is a public attribute.
        self.new_list = []
        self.create_master_directory(self.masterfilepath)
        self.master_directory_path = self.get_master_directory_path(self.masterfilepath)
        # Create the data well directories and process them in parallel.
        self.run()
        self.write_to_json()

    @staticmethod
    def _master_stem(masterfilepath):
        """Return the file name without its extension and without '_master'."""
        return masterfilepath.stem.replace('_master', '')

    @staticmethod
    def _report_worker_error(framenum, error):
        """Report a failed well on stderr instead of dropping the exception."""
        print(
            "Processing of the data well starting at frame {} failed: {!r}".format(framenum, error),
            file=sys.stderr,
        )

    def run(self):
        """Submit one task per data well to a process pool and wait for all of them.

        Exceptions raised inside a worker are reported on stderr; they do not
        abort the remaining wells.
        """
        pool = Pool()
        try:
            for framenum in range(1, self.total_frames, self.frames_per_well):
                pool.apply_async(
                    self.create_and_run_data_wells,
                    args=(framenum, self.master_dictionary),
                    # Without a callback apply_async silently discards worker
                    # exceptions because the AsyncResult is never inspected.
                    error_callback=lambda error, first=framenum: self._report_worker_error(first, error),
                )
        finally:
            # Always stop accepting work and wait for the submitted tasks.
            pool.close()
            pool.join()

    def create_master_directory(self, masterfilepath):
        """Create the directory for this master file inside the output directory.

        Exits the program (``SystemExit``) if the directory cannot be created,
        for example because it already exists.
        """
        new_dir_path = Path(self.output) / self._master_stem(masterfilepath)
        try:
            new_dir_path.mkdir()
        except OSError:
            sys.exit("Creation of the directory {} failed.".format(new_dir_path))

    def create_and_run_data_wells(self, framenum, md):
        """Process the well that starts at ``framenum`` and store its result in ``md``.

        A ``datawell.Datawell`` is created for the frame range
        ``framenum .. framenum + frames_to_process - 1``; its directory is set
        up, its XDS input is generated and it is run. The well's result
        dictionary is stored in ``md`` under the key returned by
        ``dw.getframes()``.
        """
        last_frame = framenum + self.frames_to_process - 1
        print("Processing frames {}-{}...".format(framenum, last_frame))
        dw = datawell.Datawell(framenum, last_frame, self.master_directory_path,
                               self.masterfilepath, self.xdsparams, self.detparams)
        dw.setup_datawell_directory()
        dw.gen_XDS()
        dw.run()
        md[dw.getframes()] = dw.get_dw_dict()

    def get_master_directory_name(self, masterfilepath):
        """Return the master directory name, e.g. 'sample' for 'sample_master.h5'.

        ``str.strip('_master.h5')`` must not be used for this: it removes any
        of those *characters* from both ends of the name rather than the
        suffix, which mangles names such as 'sample_master.h5' into 'pl'.
        """
        return self._master_stem(masterfilepath)

    def get_master_directory_path(self, masterfilepath):
        """Return the path of the master directory inside the output directory."""
        return Path(self.output) / self._master_stem(masterfilepath)

    def get_master_dictionary(self):
        """Return a copy of the results collected from the wells."""
        return self.master_dictionary.copy()

    def write_to_json(self):
        """Write the collected results to 'results_<name>.json' in the master directory."""
        filename = 'results_{b}.json'.format(b=self.get_master_directory_name(self.masterfilepath))
        Path(self.master_directory_path / filename).write_text(
            json.dumps(self.master_dictionary.copy(), indent=2, sort_keys=True, cls=JSONEnc)
        )
def create_output_directory(path):
    """Create the directory ``path``; exit the program if that is not possible.

    Parent directories are not created, and an already existing ``path``
    counts as a failure.
    """
    target = Path(path)
    try:
        target.mkdir()
    except OSError:
        failure = "Creation of the directory {} failed. Such file may already exist.".format(path)
        sys.exit(failure)
def get_h5_file(path):
    """Open ``path`` read-only with h5py and return the open file object.

    Raises:
        IOError: if opening fails for any reason.
    """
    try:
        handle = h5py.File(path, 'r')
    except Exception:
        # Any failure to open is reported uniformly to the caller.
        raise IOError("Not a valid h5 file")
    else:
        return handle
def get_number_of_files(path, args):
    """Return the number of images referenced by the master file at ``path``.

    The value is read from the ``image_nr_high`` attribute of the last entry
    under ``/entry/data/``. If that entry or attribute cannot be read, the
    count is estimated as
    ``number of entries * int(args.oscillation) * int(args.framesperdegree)``.

    Args:
        path: Location of the master file; it is printed before being opened.
        args: Options object; ``oscillation`` and ``framesperdegree`` are only
            read for the fallback estimate.

    Raises:
        IOError: propagated from ``get_h5_file`` when the file cannot be opened.
    """
    print(path)
    # The file is only needed while reading the metadata; closing it here
    # avoids leaking the open handle.
    with get_h5_file(path) as f:
        data_group = f['/entry/data/']
        data_names = list(data_group.keys())
        lastdata = data_names[-1]
        try:
            numimages = data_group.get(lastdata).attrs.get('image_nr_high')
        except Exception:
            # e.g. the last entry cannot be resolved, so .get() returned None.
            numimages = None
        # attrs.get() returns None (without raising) when the attribute is
        # missing, so the fallback has to cover that case as well.
        if numimages is None:
            numimages = len(data_names) * int(args.oscillation) * int(args.framesperdegree)
    return numimages
def check_args(args):
    """Validate the parsed command line options, exiting on the first problem.

    ``args.oscillationperwell`` defaults to ``args.oscillation`` when it was
    not given. Afterwards the input directory, the beam center (exactly two
    values), the oscillation and the detector distance are checked in that
    order; the first failing check prints its message and terminates the
    program via ``sys.exit``.
    """
    if args.oscillationperwell is None:
        args.oscillationperwell = args.oscillation

    def beamcenter_invalid():
        center = args.beamcenter
        return center is None or len(center) != 2

    # Checks are evaluated lazily and in order, so only the first failure
    # is reported.
    validations = (
        (lambda: args.input is None, "Input directory must be specified."),
        (beamcenter_invalid, "Please specify the beam center."),
        (lambda: args.oscillation is None, "Please provide the oscillation per well."),
        (lambda: args.distance is None, "Please provide the detector distance."),
    )
    for is_invalid, message in validations:
        if is_invalid():
            print(message)
            sys.exit("Please check your input!")