-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtest_validity.py
127 lines (107 loc) · 4.31 KB
/
test_validity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import pandas as pd
import numpy as np
import operator
import os
import re
import glob
import trace
def INVALID_OBS():
    """Return the observation directories that cannot be processed.

    A directory (``data_dir + name`` for each name in ``all_observations``)
    is reported when its obslog is not listed in ``valid_observations``, or
    when the obslog is listed but the directory holds no ``*.lta`` file.
    Directories without any ``*.obslog`` file are skipped entirely.
    """
    rejected = []
    for name in all_observations:
        obs_dir = data_dir + name
        obslogs = glob.glob(obs_dir + '/*.obslog')
        if not obslogs:
            continue
        # Pull "/<digits>.obslog" out of the first match and drop the slash.
        obslog_name = re.findall(r'[/][\d]+[.]obslog', obslogs[0])[0][1:]
        # Either the obslog is not on the accepted list (per the original
        # author's note: not fitting the given constraints, i.e. < 900 MHz
        # and IF BW != 6,16,32), or it is accepted but has no LTA file.
        if (obslog_name not in valid_observations
                or not glob.glob(obs_dir + '/*.lta')):
            rejected.append(obs_dir)
    return rejected
def VALID_OBS():
    """Return the observation directories that are ready for processing.

    A directory (``data_dir + name`` for each name in ``all_observations``)
    qualifies when its obslog is listed in ``valid_observations`` and the
    directory contains at least one ``*.lta`` file.  Directories without any
    ``*.obslog`` file are skipped.
    """
    accepted = []
    for name in all_observations:
        obs_dir = data_dir + name
        obslogs = glob.glob(obs_dir + '/*.obslog')
        if not obslogs:
            continue
        # Pull "/<digits>.obslog" out of the first match and drop the slash.
        obslog_name = re.findall(r'[/][\d]+[.]obslog', obslogs[0])[0][1:]
        if obslog_name not in valid_observations:
            continue
        if glob.glob(obs_dir + '/*.lta'):
            accepted.append(obs_dir)
    return accepted
def NUM_LTA():
    """Count the LTA files found across all valid observation directories.

    For every name in ``all_observations`` whose obslog is listed in
    ``valid_observations``, every file matching ``*.lta*`` inside
    ``data_dir + name`` is counted.

    Returns:
        The total number of matching files as an ``int`` (0 when none).
    """
    lta_count = 0
    for dir_name in all_observations:
        obs_dir = data_dir + dir_name
        obslogs = glob.glob(obs_dir + '/*.obslog')
        # A directory without an obslog has nothing to validate against.
        # Skip it, as INVALID_OBS and VALID_OBS do, rather than failing
        # with IndexError on obslogs[0].
        if not obslogs:
            continue
        # Pull "/<digits>.obslog" out of the first match and drop the slash.
        relative_path = re.findall(r'[/][\d]+[.]obslog', obslogs[0])[0][1:]
        if relative_path in valid_observations:
            lta_count += len(glob.glob(obs_dir + '/*.lta*'))
    return lta_count
def extract(file_name):
    """Return the 1-based number of the first line containing "SCN".

    The file is read line by line; the result is ``None`` when no line of
    *file_name* contains the marker.
    """
    with open(file_name) as handle:
        numbered = enumerate(handle, 1)
        return next(
            (number for number, text in numbered if "SCN" in text),
            None,
        )
def main(lta_name):
    """Return the name of the main non-calibrator object in an LTA file.

    ``ltahdr`` is run on *lta_name* and its output is written to
    ``lta_file.txt`` in the current directory.  The table that starts at the
    first line containing "SCN" is parsed, the mean ``Nrecs`` is computed per
    ``OBJECT``, and the objects are ranked by that mean, highest first.

    Args:
        lta_name: Path of the LTA file to inspect.

    Returns:
        The highest-ranked object that is not one of the calibrators
        3C48, 3C147 or 3C286; ``-1`` if every object is a calibrator;
        ``None`` if the header could not be produced or parsed.
    """
    # Imported locally so this function does not depend on the file's
    # top-level import block being changed.
    import subprocess

    calibrator_list = ['3C48', '3C147', '3C286']
    header_file = 'lta_file.txt'
    try:
        # Argument list instead of a shell string: file names containing
        # spaces or shell metacharacters are passed through untouched.
        # "-i" stays glued to the name, exactly as the shell form passed it.
        with open(header_file, 'w') as out:
            subprocess.call(['ltahdr', '-i' + lta_name], stdout=out)

        skipped_rows = extract(header_file) - 1
        header = pd.read_csv(header_file, skiprows=skipped_rows,
                             delimiter=r"\s+")
        header['Nrecs'] = header['Nrecs'].astype(float)

        mean_recs = {}
        for name in set(header["OBJECT"]):
            records = header.loc[header.OBJECT == name, 'Nrecs'].values
            mean_recs[name] = np.mean(records)

        # Rank the objects by mean number of records, largest first.
        # items() works on both Python 2 and 3; iteritems() raised
        # AttributeError on Python 3, which was silently swallowed.
        ranked = sorted(mean_recs.items(), key=operator.itemgetter(1),
                        reverse=True)
        for name, _ in ranked:
            if name not in calibrator_list:
                return name
        # Every object in the file is a calibrator.
        return -1
    except Exception:
        # Best effort: an unreadable or unparsable header yields None, but
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        return None
    finally:
        # Remove the scratch file on every path, not only on success.
        try:
            os.remove(header_file)
        except OSError:
            pass
def test_validity():
    """Run ``main`` on every LTA file in the current directory.

    Files matching ``*.lta*`` are collected, excluding those matching
    ``*.lta*.UVFITS``.  ``main`` is called on each remaining file in turn.

    Returns:
        The value returned by the last ``main`` call that did not raise
        (an object name, ``-1`` or ``None``), or ``None`` when there were
        no LTA files or every call raised.
    """
    # NOTE(review): assumes the final return sits after the loop, not inside
    # it; the original indentation was lost, so confirm.
    uvfits_files = set(glob.glob('*.lta*.UVFITS'))
    all_lta_files = [name for name in glob.glob('*.lta*')
                     if name not in uvfits_files]

    # Start from None so an empty directory, or a run in which every call
    # fails, returns None instead of raising NameError.
    output = None
    for lta_file in all_lta_files:
        try:
            # A -1 result (only calibrators found) is kept as-is.
            output = main(lta_file)
        except Exception:
            # Skip files main cannot handle; do not swallow
            # KeyboardInterrupt or SystemExit.
            continue
    return output