-
Notifications
You must be signed in to change notification settings - Fork 0
/
annotation_preprocessor.py
181 lines (159 loc) · 6.61 KB
/
annotation_preprocessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import os
import re
from configs.preprocess_configs import ID_FILE_ROOT, ANNOTATION_ROOT, ANNOTATION_PACKAGE_ROOT
from json_utils import load_annotation_list, save_json, save_jsonl, load_json
# Matches one "<...>" span, non-greedily; DOTALL lets a span cross newlines.
_TAG_PATTERN = re.compile(r"<(.*?)>", re.S)


def remove_tag(sentence):
    """Return *sentence* with its ``<...>`` markup reduced to plain text.

    Each ``<...>`` span is replaced by its content. When the content
    contains ``/``, only the part after the last ``/`` is kept, so
    ``"<a/b>"`` becomes ``"b"`` while ``"<b>"`` becomes ``"b"``.
    Any angle brackets that are left over afterwards (for example an
    unmatched ``<``) are removed as well.

    Args:
        sentence: The annotated text.

    Returns:
        The text without any ``<`` or ``>`` characters.
    """
    def _kept_word(match):
        # split('/')[-1] is the whole content when there is no '/'.
        return match.group(1).split('/')[-1]

    untagged = _TAG_PATTERN.sub(_kept_word, sentence)
    # Drop stray brackets that were not part of a complete <...> pair.
    return untagged.replace('<', '').replace('>', '')
def preprocess_annotation(test_list_path='test.txt', valid_list_path='valid.txt',
                          idfile_save_path=ID_FILE_ROOT, release_save_path=ANNOTATION_ROOT):
    """Build the id lookup tables and the released annotation file.

    Every annotation returned by ``load_annotation_list()`` is indexed as a
    list whose element 0 holds ``'videoID'``, element 1 holds ``'segInfo'``
    and elements 2.. are query dicts carrying an ``'ID'``.

    For each annotation this function:

    * assigns a running integer id to every segment, named
      ``'<videoID>_<segment index>'`` with a 0-based index;
    * assigns a running integer id to every query ``'ID'``;
    * inserts ``{'split': 'train' | 'val' | 'test'}`` at position 1 of the
      annotation list (this mutates the loaded annotation in place);
    * keeps the annotation for release unless its video is in the test list.

    Test videos are left out of the release file but their segments and
    queries are still present in the id tables.

    Args:
        test_list_path: Text file with one test video id per line.
        valid_list_path: Text file with one validation video id per line.
        idfile_save_path: Directory that receives ``id.json``.
        release_save_path: Directory that receives
            ``annotation_release.jsonl``.
    """
    def _read_vid_set(list_path):
        # Surrounding whitespace is stripped and blank lines are skipped so
        # that a stray space or an empty trailing line cannot produce an id
        # that never matches a real video id.
        with open(list_path) as f:
            stripped_lines = (line.strip() for line in f)
            return {vid for vid in stripped_lines if vid}

    annotation_list = load_annotation_list()
    test_set = _read_vid_set(test_list_path)
    valid_set = _read_vid_set(valid_list_path)

    id2seg_dict = {}
    seg2id_dict = {}
    sid = 0
    id2query_dict = {}
    query2id_dict = {}
    qid = 0

    release = []
    for annotation in annotation_list:
        vid = annotation[0]['videoID']

        # segment id
        for seg_idx in range(len(annotation[1]['segInfo'])):
            seg_name = vid + '_' + str(seg_idx)
            id2seg_dict[sid] = seg_name
            seg2id_dict[seg_name] = sid
            sid += 1

        # query id; the query dicts themselves are released unmodified
        for query in annotation[2:]:
            id2query_dict[qid] = query['ID']
            query2id_dict[query['ID']] = qid
            qid += 1

        # The test list takes precedence over the validation list.
        if vid in test_set:
            split_name = 'test'
        elif vid in valid_set:
            split_name = 'val'
        else:
            split_name = 'train'
        annotation.insert(1, dict(split=split_name))

        # Test annotations are withheld from the released file.
        if split_name != 'test':
            release.append(annotation)

    id_dict = dict(
        id2seg=id2seg_dict,
        seg2id=seg2id_dict,
        id2query=id2query_dict,
        query2id=query2id_dict,
    )
    # NOTE(review): id2seg / id2query use int keys; if save_json writes
    # standard JSON they will be read back as strings - confirm with readers.
    save_json(id_dict, os.path.join(idfile_save_path, 'id.json'))
    save_jsonl(release, os.path.join(release_save_path, 'annotation_release.jsonl'))
# Length of the video id prefix (the original notes call it "youtube_id (11)").
_VID_NAME_LEN = 11


def package_annotation(idfile_root=ID_FILE_ROOT, test_list_path='test.txt', valid_list_path='valid.txt',
                       annotation_root=ANNOTATION_ROOT, save_path=ANNOTATION_PACKAGE_ROOT,
                       only_in_test_path='only_in_test.txt',
                       only_in_valid_path='only_in_valid.txt'):
    """Package every query into per-split jsonl files.

    Reads ``id.json`` from *idfile_root* and the annotations from
    ``load_annotation_list()``, builds one record per query and writes
    ``trainset.jsonl``, ``validset.jsonl`` and ``testset.jsonl`` to
    *save_path*. Records are emitted in the order of the ``id2query`` table.

    Each record contains::

        query_id            id of the query, from id.json ('query2id')
        query_name          the query 'ID' from the annotation
        text_query          the 'Question' with <...> markup removed
        original_query      the 'Question' as annotated
        query_img_path      <annotation_root>/image/<vid>/<Filename>
        vid_name            video id of the annotation
        query_type          the query 'QueryType'
        answer_segment_name list of '<vid>_<index>' names (0-based index)
        answer_segment_id   ids of those segments, from id.json ('seg2id')
        answer_segment_info the 'segInfo' entries of those segments
        not_in_train        validation/test records only: True when the
                            video is listed in the matching only-in file

    The values in a query's ``'Segment'`` list are treated as 1-based
    indices into ``'segInfo'``.

    Args:
        idfile_root: Directory containing ``id.json``.
        test_list_path: Text file with one test video id per line.
        valid_list_path: Text file with one validation video id per line.
        annotation_root: Root directory used to build ``query_img_path``.
        save_path: Directory that receives the three jsonl files.
        only_in_test_path: Text file whose lines start with the ids of
            videos flagged ``not_in_train`` in the test split.
        only_in_valid_path: Same, for the validation split.

    Raises:
        KeyError: If ``id.json`` does not match the loaded annotations
            (an unknown query ID or segment name).
    """
    def _read_vid_set(list_path, id_len=None):
        # Whitespace is stripped and blank lines skipped so that stray
        # spaces or empty lines cannot yield ids that never match. When
        # id_len is given only that many leading characters are kept.
        with open(list_path) as f:
            stripped_lines = (line.strip() for line in f)
            return {entry[:id_len] for entry in stripped_lines if entry}

    # load all required files (id.json is parsed once and shared)
    id_tables = load_json(os.path.join(idfile_root, 'id.json'))
    query2id = id_tables['query2id']
    id2query = id_tables['id2query']
    seg2id = id_tables['seg2id']
    annotation = load_annotation_list()

    # generate query dict, the key is query_name
    query_dict_by_name = {}
    for anno in annotation:
        vid = anno[0]['videoID']
        seg_info = anno[1]['segInfo']
        for q in anno[2:]:
            # 'Segment' values are shifted by one to get 0-based positions.
            seg_positions = [seg_idx - 1 for seg_idx in q['Segment']]
            segment_name_list = [vid + '_' + str(pos) for pos in seg_positions]
            query_dict_by_name[q['ID']] = dict(
                query_id=query2id[q['ID']],
                query_name=q['ID'],
                text_query=remove_tag(q['Question']),
                original_query=q['Question'],
                query_img_path=os.path.join(annotation_root + '/image/' + vid, q['Filename']),
                vid_name=vid,
                query_type=q['QueryType'],
                answer_segment_name=segment_name_list,
                answer_segment_id=[seg2id[seg_name] for seg_name in segment_name_list],
                answer_segment_info=[seg_info[pos] for pos in seg_positions],
            )

    # test split
    test_set = _read_vid_set(test_list_path)
    only_in_test_set = _read_vid_set(only_in_test_path, _VID_NAME_LEN)
    # valid split
    valid_set = _read_vid_set(valid_list_path)
    only_in_valid_set = _read_vid_set(only_in_valid_path, _VID_NAME_LEN)

    # package to iterable list for dataloader
    train_package = []
    valid_package = []
    test_package = []
    for query_name in id2query.values():
        # assumes every query ID starts with its video id - TODO confirm
        vid = query_name[:_VID_NAME_LEN]
        query_item = query_dict_by_name[query_name]
        # The test list takes precedence over the validation list.
        if vid in test_set:
            query_item['not_in_train'] = vid in only_in_test_set
            test_package.append(query_item)
        elif vid in valid_set:
            query_item['not_in_train'] = vid in only_in_valid_set
            valid_package.append(query_item)
        else:
            # Training records carry no 'not_in_train' key.
            train_package.append(query_item)

    save_jsonl(train_package, os.path.join(save_path, 'trainset.jsonl'))
    save_jsonl(valid_package, os.path.join(save_path, 'validset.jsonl'))
    save_jsonl(test_package, os.path.join(save_path, 'testset.jsonl'))
if __name__ == '__main__':
    # Order matters: package_annotation() reads the id.json that
    # preprocess_annotation() writes.
    preprocess_annotation()
    package_annotation()