-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhw1_1.cpp
242 lines (226 loc) · 8.86 KB
/
hw1_1.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#include <cstdio>
#include <string>
#include <cstring>
#include <iostream>
#include <sstream>
#include <cinttypes>
#include <chrono>
#include <iomanip>
#include "myutil.h"
#include "mpi.h"
#include "record_parser.h"
#include "ad_tester.h"
//#include "shell.h"
int rank, nodes;
/**
 * Serialises a vector of strings into one newly allocated byte buffer,
 * terminating every element with a single '\n'.
 *
 * @param p    Out-parameter; receives a buffer allocated with new[].
 *             The caller owns it and must release it with delete[].
 *             The buffer is NOT NUL-terminated.
 * @param vec  Strings to serialise, written in order.
 * @return     Number of bytes stored in p (sum of lengths + one per element).
 */
long long set_wbuf(char* &p, std::vector<string>& vec) {
    long long total = 0;
    for (const auto& s : vec) {
        total += static_cast<long long>(s.length()) + 1;
    }

    p = new char[total];
    char* out = p;
    for (const auto& s : vec) {
        // memcpy rather than strcpy: copy exactly length() bytes so a string
        // holding an embedded '\0' is not cut short (which would leave
        // uninitialised bytes in the buffer), and no stray terminator is
        // written.
        std::memcpy(out, s.c_str(), s.length());
        out += s.length();
        *out++ = '\n';
    }
    return total;
}
/**
 * Reads this rank's contiguous slice of a file into a newly allocated,
 * NUL-terminated buffer.
 *
 * The file is divided into `nodes` slices of file_size / nodes bytes; the last
 * rank additionally receives the remainder. The file is opened collectively on
 * MPI_COMM_WORLD, so every rank must call this function.
 *
 * @param fname  Path of the file to read.
 * @param buf    Out-parameter; on success points to a new[] buffer of
 *               slice-size + 1 bytes (trailing '\0') owned by the caller.
 *               On failure it is set to nullptr.
 * @param rank   Rank of the calling process.
 * @param nodes  Total number of processes sharing the file.
 * @return       0 on success, otherwise the failing MPI call's error code.
 */
int mpi_group_read(char const* fname, char* &buf, int rank, int nodes) {
    buf = nullptr;

    MPI_File fh;
    int ret = MPI_File_open(MPI_COMM_WORLD, fname, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
    if (ret) {
        std::cout << "file open error, mpi_fopen ret:" << ret << std::endl;
        return ret;
    }

    MPI_Offset file_sz = 0;
    ret = MPI_File_get_size(fh, &file_sz);
    std::cout << "mpi_file_sz ret:" << ret << " file size:" << file_sz << std::endl;
    if (ret) {
        // Without a valid size the slice bounds would be meaningless.
        MPI_File_close(&fh);
        return ret;
    }

    const long long tmp_sz = file_sz / nodes;
    const long long first_byte = rank * tmp_sz;
    const long long read_cnt = rank == nodes - 1 ? file_sz - (nodes - 1) * tmp_sz : tmp_sz;
    std::cout << "rank is:" << rank << " nodes number is:" << nodes << " read from " << first_byte
              << " to " << first_byte + read_cnt << std::endl;

    buf = new char[read_cnt + 1];

    // MPI takes the element count as an int, so a slice of 2 GiB or more has
    // to be fetched in several bounded requests instead of being narrowed.
    const long long kMaxChunk = 1LL << 30;
    MPI_Status status;
    long long done = 0;
    while (ret == 0 && done < read_cnt) {
        const long long left = read_cnt - done;
        const int chunk = static_cast<int>(left < kMaxChunk ? left : kMaxChunk);
        ret = MPI_File_read_at(fh, first_byte + done, buf + done, chunk, MPI_BYTE, &status);
        done += chunk;
    }
    MPI_File_close(&fh);

    if (ret) {
        std::cout << "mpi read error code:" << ret << std::endl;
        delete[] buf;
        buf = nullptr;  // do not hand a dangling pointer back to the caller
        return ret;
    }
    buf[read_cnt] = 0;
    return 0;
}
int mpi_group_write(char const* fname,char* wbuf, long long wsz, int rank, int nodes) {
int ret;
MPI_File fh;
MPI_Status status;
long long *allsz = (long long*)malloc(sizeof(long long) * nodes);
MPI_Allgather(&wsz, 1, MPI_LONG, allsz, 1, MPI_LONG, MPI_COMM_WORLD);
for (int i = 1; i < nodes; i++) {
allsz[i] += allsz[i-1];
}
ret = MPI_File_open(MPI_COMM_WORLD, fname, MPI_MODE_WRONLY|MPI_MODE_CREATE, MPI_INFO_NULL, &fh);
ret = MPI_File_set_size(fh, 0);
if (ret) {
std::cout << "file open error, mpi_fopen ret:" << ret << std::endl;
return 0;
}
ret = MPI_File_write_at_all(fh, allsz[rank] - wsz, (void*)wbuf, wsz, MPI_BYTE, &status);
if (ret) {
std::cout << "file write error, mpi_fwrite ret:" << ret << std::endl;
return 0;
}
MPI_File_close(&fh);
}
int main(int argc, char **argv){
using clock = std::chrono::system_clock;
using duration = std::chrono::duration<double, std::milli>;
std::stringstream ss;
char* fname = argv[1];
MPI_File fh;
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nodes);
char* buf;
// read data into memory
ss << get_time() << "Rank:" << rank << " start reading...\n";
auto start_read = clock::now();
int ret = mpi_group_read(fname, buf, rank, nodes);
auto finish_read = clock::now();
if (ret) {
return ret;
}
ss << get_time() << "Rank:" << rank << " finished reading...\n";
auto diff = finish_read-start_read;
ss << get_time() << "Rank:" << rank << " reading cost time:" << diff.count() << " milli secs\n";
// scrub data into 2 vector
rec_parser blk;
char* pbuf = buf;
std::vector<string> valid_res;
std::vector<string> invalid_res;
std::vector<double> ret_res;
int cntsum = 0;
ss << get_time() << " Rank:" << rank << " start scrubing...\n";
auto start_scrub = clock::now();
cntsum = blk.parse(pbuf, valid_res, invalid_res, ret_res);
auto end_scrub = clock::now();
ss << get_time() << " Rank:" << rank << " finished scrubing...\n";
diff = end_scrub - start_scrub;
ss << get_time() << " Rank:" << rank << " Scrubing cost time " << diff.count() << " milli secs\n";
double ret_stat[3] = {blk.ret_sum, blk.ret_sq, double(ret_res.size())};
std::cout << "rank:" << rank << " sum:" << ret_stat[0] << " sq:" << ret_stat[1] << " cnt:" << ret_stat[2] << "\n";
delete[] buf;
// write noise to disk and clear the memory
char* wbuf;
ss << get_time() << " Rank:" << rank << " start writing noise data...\n";
auto start_write_noise = clock::now();
long long wsz = set_wbuf(wbuf, invalid_res);
char fname2[] = "noise.txt";
ret = mpi_group_write(fname2, wbuf, wsz, rank, nodes);
auto end_write_noise = clock::now();
ss << get_time() << " Rank:" << rank << " finished writing noise data...\n";
diff = end_write_noise - start_write_noise;
ss << get_time() << " Rank:" << rank << " writing noise data cost time:" << diff.count() << " milli secs\n";
if (wbuf) {
delete[] wbuf;
}
invalid_res.clear();
// write signal to disk and clear the memory
ss << get_time() << " Rank:" << rank << " start writing signal data...\n";
auto start_write_signal = clock::now();
wsz = set_wbuf(wbuf, valid_res);
char fname3[] = "signal.txt";
ret = mpi_group_write(fname3, wbuf, wsz, rank, nodes);
auto end_write_signal = clock::now();
ss << get_time() << " Rank:" << rank << " finished writing signal data...\n";
diff = end_write_signal - start_write_signal;
ss << get_time() << " Rank:" << rank << " writing signal data cost time:" << diff.count() << "\n";
if (wbuf) {
delete[] wbuf;
}
// write log to disk
char logname1[] = "scrub_log.txt";
std::vector<string> logvec(1, ss.str());
wsz = set_wbuf(wbuf, logvec);
ret = mpi_group_write(logname1, wbuf, wsz, rank, nodes);
if (wbuf) {
delete[] wbuf;
}
//diff.count();
/*************************************************/
/**** Part II normality test of asset return *****/
/*************************************************/
std::stringstream ss2;
ss.str("");
ss.clear();
ss << get_time() << " Rank:" << rank << " start testing normality by aderson darling test...\n";
double* all_ret_stat = new double[nodes * 3];
MPI_Allgather(ret_stat, 3, MPI_DOUBLE, all_ret_stat, 3, MPI_DOUBLE, MPI_COMM_WORLD);
double ret_mean = 0, ret_sd = 1, ret_cnt = 0;
for (int i = 0; i < nodes; i++) {
ret_mean += all_ret_stat[3*i];
ret_sd += all_ret_stat[3*i+1];
ret_cnt += all_ret_stat[3*i+2];
}
ret_mean /= ret_cnt;
ret_sd = sqrt(ret_sd/ret_cnt - ret_mean * ret_mean);
AD_tester norm_tester(ret_mean, ret_sd, ret_cnt);
ss << get_time() << " Rank:" << rank << " global sample mean and variance calculated\n";
if (rank == 0) {
ss2 << "Mean:" << ret_mean << "\n" << "Stderr:" << ret_sd << "\n" << "Sample size:" << (long long)(ret_cnt) << "\n";
}
std::vector<long long> stat_loc(nodes, 0);
for (int i = 0; i < nodes -1; i++) {
stat_loc[i+1] += stat_loc[i] + (long long)(all_ret_stat[3*i+2]);
}
double ad_stat = 0;
for (long long i = 0; i < ret_res.size(); i++) {
norm_tester.update_ad_stat(stat_loc[rank] + i, ret_res[i]);
}
ss << get_time() << " Rank:" << rank << " local AD statistic calculated\n";
double local_ad_sum = norm_tester.get_sum();
double* pall_ad_sum = new double[nodes];
MPI_Allgather(&local_ad_sum, 1, MPI_DOUBLE, pall_ad_sum, 1, MPI_DOUBLE, MPI_COMM_WORLD);
double all_ad_sum = 0;
for (int i = 0; i < nodes; i++) {
all_ad_sum += pall_ad_sum[i];
}
ss << get_time() << " Rank:" << rank << " global AD statistic calculated\n";
double p_value = norm_tester.get_p_value(norm_tester.get_stat(all_ad_sum));
ss << get_time() << " Rank:" << rank << " normality p value calculated\n";
if (rank == 0) {
ss2 << "AD statistic = " << norm_tester.get_stat(all_ad_sum) << "\n";
ss2 << "P value = " << p_value << "\n";
ss2 << "Normality can be rejected at significance level " << p_value;
}
//std::cout << " Rank:" << rank << " sum:" << all_ad_sum << " stat:" << norm_tester.get_stat(all_ad_sum) << " p:" << p_value << "\n";
if (all_ret_stat) {
delete[] all_ret_stat;
}
if (pall_ad_sum) {
delete[] pall_ad_sum;
}
char logname2[] = "test_normality_log.txt";
char logname3[] = "test_normality_result.txt";
logvec.clear();
logvec.push_back(ss.str());
wsz = set_wbuf(wbuf, logvec);
ret = mpi_group_write(logname2, wbuf, wsz, rank, nodes);
if (wbuf) {
delete[] wbuf;
}
logvec.clear();
logvec.push_back(ss2.str());
wsz = set_wbuf(wbuf, logvec);
ret = mpi_group_write(logname3, wbuf, wsz, rank, nodes);
if (wbuf) {
delete[] wbuf;
}
//std::cout << shell_get_command_output("free") << "\n";
MPI_Finalize();
return 0;
}