-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
365 lines (310 loc) · 13.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python
# coding: utf-8
# Copyright 2015 Helen Catanese
# This file is part of RepeatAnalyzer.
# RepeatAnalyzer is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# RepeatAnalyzer is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with RepeatAnalyzer. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
from logging.handlers import RotatingFileHandler
import toml
from matplotlib import pyplot as plt
from scipy.cluster.hierarchy import dendrogram
from RepeatAnalyzer.RA_DataStructures import (Species, identifystrain,
parserepeats)
from RepeatAnalyzer.RA_Functions import (exportdata, exportEditDistanceCSV,
exportRepeatCSV, generateAutonames,
get_working_directory, importdata,
readdatafromfile, updateGeocoding)
from RepeatAnalyzer.RA_Interface import (createMap, deployWindow,
getAllLocations, getGDLocation,
printspeciesdata, sanitize,
searchByLocation, searchByRepeat,
searchByStrain, searchWindow)
# Root logger accepts every record; each handler applies its own threshold.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# File handler: DEBUG and above go to <working directory>/logs/repeatanalyzer.log,
# rotated into at most 10 backups.
# NOTE(review): the stdlib RotatingFileHandler appears to force append mode
# whenever maxBytes > 0, so mode="w" probably has no effect - confirm.
log_dir = os.path.join(get_working_directory(), "logs")
os.makedirs(log_dir, exist_ok=True)
log_formatter = logging.Formatter('%(asctime)s: %(levelname)8s: %(message)s')
fh = RotatingFileHandler(
    os.path.join(log_dir, "repeatanalyzer.log"),
    mode="w",
    encoding="UTF-8",
    maxBytes=2000,
    backupCount=10,
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(log_formatter)
logger.addHandler(fh)

# Console handler: only ERROR and above are shown to the user.
console_formatter = logging.Formatter('%(levelname)s: %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
ch.setFormatter(console_formatter)
logger.addHandler(ch)

logger.info("Starting RepeatAnalyzer...")
def fancy_dendrogram(*args, **kwargs):
    """Draw a SciPy dendrogram and annotate its merge heights.

    All positional and keyword arguments are forwarded to
    ``scipy.cluster.hierarchy.dendrogram`` except two extras consumed here:

    max_d: optional cut-off distance. When truthy it is drawn as a black
        horizontal line and, unless the caller passed ``color_threshold``,
        it is also used as the colour threshold.
    annotate_above: only merges whose height is strictly greater than this
        value (default 0) receive a marker and a text label.

    Returns:
        The dictionary returned by ``dendrogram``.
    """
    cutoff = kwargs.pop("max_d", None)
    min_height = kwargs.pop("annotate_above", 0)
    if cutoff:
        # Never override a threshold the caller chose explicitly.
        kwargs.setdefault("color_threshold", cutoff)

    tree = dendrogram(*args, **kwargs)
    if kwargs.get("no_plot", False):
        return tree

    plt.title("Hierarchical Clustering Dendrogram of Anaplasma marginale Repeats")
    plt.xlabel("Sample Name or (Cluster Size)")
    plt.ylabel("Distance")
    links = zip(tree["icoord"], tree["dcoord"], tree["color_list"])
    for xs, ys, colour in links:
        # Each link has four points; indices 1 and 2 form the horizontal bar.
        mid_x = 0.5 * sum(xs[1:3])
        height = ys[1]
        if height > min_height:
            plt.plot(mid_x, height, "o", c=colour)
            plt.annotate(
                f"{height:.3g}",
                (mid_x, height),
                xytext=(0, -5),
                textcoords="offset points",
                va="top",
                ha="center",
            )
    if cutoff:
        plt.axhline(y=cutoff, c="k")
    return tree
# clusters = dictionary of clusters #: [rID1, rID2,...]
def printClusterMap(species, clusters):
    """Plot the locations of every repeat cluster on a map.

    Args:
        species: species object handed through to ``getAllLocations`` and
            ``createMap``.
        clusters: dictionary mapping a cluster key to an iterable of
            repeat IDs.

    For each cluster the distinct locations of its repeats are collected and
    their longitudes/latitudes are passed to ``createMap`` as one series per
    cluster. Location display names are keyed by (longitude, latitude).
    """
    cluster_lons = []
    cluster_lats = []
    cluster_ids = []
    place_names = {}
    for cluster_id, repeat_ids in clusters.items():
        # A set removes locations shared by several repeats of the cluster.
        places = {
            place
            for repeat_id in repeat_ids
            for place in getAllLocations(repeat_id, "r", species)
        }
        lons = []
        lats = []
        for place in places:
            lons.append(place.longitude)
            lats.append(place.latitude)
            place_names[(place.longitude, place.latitude)] = place.getString()
        cluster_ids.append(cluster_id)
        cluster_lons.append(lons)
        cluster_lats.append(lats)
    # NOTE(review): the meaning of the trailing positional arguments
    # (4.5, 1, 0, True) is defined by createMap - confirm there.
    createMap(
        cluster_lons,
        cluster_lats,
        cluster_ids,
        [],
        [],
        [],
        place_names,
        species,
        4.5,
        1,
        0,
        True,
    )
def extract_version_from_pyproject_toml(file_path='pyproject.toml'):
    """Return the Poetry package version declared in a pyproject.toml file.

    Args:
        file_path: path of the TOML file to read; a relative path is
            resolved against the current working directory.

    Returns:
        The value stored under ``tool.poetry.version``, or ``None`` when the
        file is missing, unreadable, not valid TOML, or lacks that entry.
        A message describing the problem is printed in each failure case.
    """
    try:
        # TOML documents are UTF-8 by specification, so do not depend on the
        # platform's locale encoding.
        with open(file_path, 'r', encoding='utf-8') as toml_file:
            toml_content = toml.load(toml_file)
    except FileNotFoundError:
        print(f"Error: File {file_path} not found.")
        return None
    except PermissionError:
        print(f"Error: Permission denied when opening {file_path}.")
        return None
    except ValueError:
        # Covers undecodable bytes and the toml package's decode error,
        # which derives from ValueError.
        print(f"Error: Unable to parse {file_path}. Make sure it is valid TOML.")
        return None

    try:
        return toml_content['tool']['poetry']['version']
    except (KeyError, TypeError):
        # TypeError: 'tool' or 'poetry' exists but is not a table.
        print(f"Error: Unable to find version in {file_path}. Make sure the file structure is correct.")
        return None
# Main-menu options that operate on speciesList[currentSpecies] and therefore
# cannot run while no species exists. Option 12 is accepted but not listed in
# the printed menu.
_NEEDS_SPECIES = frozenset({1, 3, 4, 5, 6, 8, 9, 10, 11, 12})


def _prompt_int(prompt):
    """Read an integer from the user; print a hint and return None if invalid."""
    try:
        return int(input(prompt))
    except ValueError:
        print("\nPlease enter an integer")
        return None


def _change_species(speciesList, currentSpecies):
    """Let the user select or add a species; return the new current index."""
    print("0: Add a Species")
    for number, species in enumerate(speciesList, start=1):
        print(str(number) + ": " + species.name)
    choice = _prompt_int("Please enter the number of the option you want: ")
    if choice is None:
        return currentSpecies
    new = choice - 1
    if new < 0:
        name = str(input("Please enter the name of the new species: "))
        print('Is this information correct(y/n)? Name: "' + name + '"', end=" ")
        if sanitize(str(input(":"))) == "y":
            speciesList.append(Species(name))
            # Switch to the species that was just added.
            return len(speciesList) - 1
        print("Species not stored")
        return currentSpecies
    if new >= len(speciesList):
        print("Error: Not a valid selection")
        return currentSpecies
    return new


def _search_menu(species):
    """Run the search sub-menu for one species until the user enters 0."""
    while True:
        print("\n What criteria would you like to search by?")
        print("1: Search by repeat")
        print("2: Search by location")
        print("3: Search by strain")
        print("0: Return to main menu")
        choice = _prompt_int("Please enter the number of the option you want: ")
        if choice is None:
            continue
        if choice == 0:
            return
        if choice == 1:
            logging.debug("Search by repeat selected")
            searchByRepeat(species)
        elif choice == 2:
            logging.debug("Search by location selected")
            searchByLocation(species)
        elif choice == 3:
            logging.debug("Search by strain selected")
            searchByStrain(species)


def _remove_species(speciesList, currentSpecies):
    """Delete a user-chosen species; return a still-valid current index."""
    for number, species in enumerate(speciesList, start=1):
        print(str(number) + ":", species.name)
    d = _prompt_int("Enter the number of the species you would like to delete: ")
    if d is None:
        return currentSpecies
    if d > len(speciesList) or d < 1:
        print("Error: Not a valid selection")
        return currentSpecies
    print(
        "You want to delete",
        speciesList[d - 1].name,
        "is that correct",
        end=" ",
    )
    yn = str(input("(y/n):"))
    if yn != "y":
        print("Deletion cancelled.")
        return currentSpecies
    print(speciesList[d - 1].name, "Deleted")
    del speciesList[d - 1]
    if d - 1 < currentSpecies:
        # Entries after the deleted one shifted down by one position.
        return currentSpecies - 1
    # Clamp in case the last entry (possibly the current one) was removed.
    return min(currentSpecies, max(len(speciesList) - 1, 0))


def _remove_strain(species):
    """Delete the strain identified by a user-entered repeat sequence."""
    d = str(
        input(
            "Enter the repeat sequence of the strain you would like to delete: "
        )
    )
    Srepeats = parserepeats(d.strip(), species)
    Sid = identifystrain(Srepeats, species)
    if Sid is None:
        print("Error: Not a valid strain")
        return
    print(
        "You want to delete",
        species.strains[Sid].name,
        "is that correct",
        end=" ",
    )
    yn = str(input("(y/n):"))
    if yn == "y":
        print(species.strains[Sid].name, "Deleted")
        del species.strains[Sid]
    else:
        print("Deletion cancelled.")


def _run_species_command(command, speciesList, currentSpecies):
    """Execute one main-menu option that works on the current species."""
    species = speciesList[currentSpecies]
    if command == 1:  # Find strain from amino acid sequence
        logging.debug("Identify repeats selected")
        deployWindow(speciesList, currentSpecies)
    elif command == 4:  # Search for mapping
        logging.debug("Map data selected")
        searchWindow(species)
    elif command == 5:  # Read in data
        logging.debug("Read data from file selected")
        readdatafromfile(
            str(input("Enter the name of the file where the values are stored: ")),
            species,
        )
    elif command == 6:  # Calculate genetic diversity
        logging.debug("Regional diversity analysis selected")
        getGDLocation(species)
    elif command == 8:  # Remove strain
        logging.debug("Remove a strain selected")
        _remove_strain(species)
    elif command == 9:
        logging.debug("Generate strain names selected")
        generateAutonames(species)
    elif command == 10:  # Print summary
        logging.debug("Print all species data selected")
        with open(f"{get_working_directory()}/{species.name}.txt", "w", encoding="UTF-8") as out:
            printspeciesdata(species, out)
        exportRepeatCSV(species, species.name + " repeats.csv")
    elif command == 11:  # Update geocoding
        logging.debug("Update geocoding selected")
        updateGeocoding(species, True)
    elif command == 12:
        logging.debug("Export edit distance CSV selected")
        exportEditDistanceCSV(species, species.name + "EditDistances.csv")


def menuloop(speciesList, currentSpecies):
    """Run the interactive text menu until the user chooses to exit.

    Args:
        speciesList: list of species objects; modified in place when species
            or strains are added or removed.
        currentSpecies: index into ``speciesList`` of the species the
            commands operate on.

    Returns:
        None, once the user enters 0 at the main menu.

    After every completed command (except the search sub-menu, which returns
    straight to the menu) the user is asked to press Enter and the data is
    written out with ``exportdata``.
    """
    version = extract_version_from_pyproject_toml()
    # Logged once rather than on every redraw of the menu.
    logging.info(f"RepeatAnalyzer {version} started.")
    while True:
        print(f"\nWelcome to RepeatAnalyzer {version}.")
        if speciesList:
            print(
                "Currently working on", speciesList[currentSpecies].name + ":", end=" "
            )
            print(len(speciesList[currentSpecies].strains), "strains,", end=" ")
            print(len(speciesList[currentSpecies].repeats), "repeats.")
        print("1: Identify repeats")
        print("2: Change current species")
        print("3: Search data")
        print("4: Map data")
        print("5: Input data from file")
        print("6: Regional diversity analysis")
        print("7: Remove a species")
        print("8: Remove a strain")
        print("9: Generate strain names")
        print("10: Print all species data")
        print("11: Update Geocodings")
        print("0: Exit Program")
        command = _prompt_int("Please enter the number of the option you want: ")
        if command is None:
            continue
        if command == 0:  # Exit
            return

        if command == 2:  # Change current species
            logging.debug("Change current species selected")
            currentSpecies = _change_species(speciesList, currentSpecies)
        elif command == 7:  # Remove species
            logging.debug("Remove a species selected")
            currentSpecies = _remove_species(speciesList, currentSpecies)
        elif command in _NEEDS_SPECIES:
            if not speciesList:
                # Indexing an empty list would raise IndexError.
                print("Error: No species available. Choose option 2 to add a species first.")
                continue
            if command == 3:  # Search by repeat, strain or location
                logging.debug("Search data selected")
                _search_menu(speciesList[currentSpecies])
                # Searching changes nothing, so skip the pause and the save.
                continue
            _run_species_command(command, speciesList, currentSpecies)

        input("Press 'Enter' to continue")
        exportdata(speciesList)
def main():
    """Load the stored data and run the interactive menu.

    The data is imported first; the process then changes its working
    directory to the directory containing this script before the menu
    starts on the first species (index 0).
    """
    loaded_species = importdata()
    script_dir = os.path.dirname(os.path.realpath(__file__))
    os.chdir(script_dir)
    menuloop(loaded_species, 0)
    logging.info("Closing RepeatAnalyzer")
# Start the interactive program only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()