
Merge pull request #129 from nlesc-dirac/dev
Dev
SarodYatawatta authored Aug 23, 2020
2 parents a16a373 + d7e88bb commit e15f727
Showing 13 changed files with 292 additions and 113 deletions.
13 changes: 13 additions & 0 deletions .circleci/config.yml
@@ -0,0 +1,13 @@
# Use the latest 2.1 version of CircleCI pipeline process engine. See: https://circleci.com/docs/2.0/configuration-reference
version: 2.1
# Use a package of configuration called an orb.
orbs:
  # Declare a dependency on the welcome-orb
  welcome: circleci/[email protected]
# Orchestrate or schedule a set of jobs
workflows:
  # Name the workflow "welcome"
  welcome:
    # Run the welcome/run job in its own container
    jobs:
      - welcome/run
21 changes: 14 additions & 7 deletions README.md
@@ -119,13 +119,13 @@ sagecal -d my_data.MS -s my_skymodel -c my_clustering -n no.of.threads -t 60 -p
Use your solution interval (-t 60) so that it is big enough to get a decent solution but not so big that the parameters vary too much (about 20 minutes per solution is reasonable).

Note: It is also possible to calibrate more than one MS together. See section 4 below.
Note: To fully use GPU acceleration use -E 1 option.
Note: To fully use GPU acceleration use ```-E 1``` option.
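
For example, a minimal sketch of the same command with GPU acceleration enabled (the thread count and file names are illustrative placeholders):

```
sagecal -d my_data.MS -s my_skymodel -c my_clustering -n 4 -t 60 -p my_solutions -E 1
```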

Simulations:
With -a 1, only a simulation of the sky model is done.
With -a 1 and -p 'solutions_file', simulation is done with the sky model corrupted with solutions in 'solutions_file'.
With -a 1 and -p 'solutions_file' and -z 'ignore_file', simulation is done with the solutions in the 'solutions_file', but ignoring the cluster ids in the 'ignore_file'.
Eg. If you need to ignore cluster ids '-1', '10', '999', create a text file :
With ```-a 1```, only a simulation of the sky model is done.
With ```-a 1``` and ```-p``` 'solutions_file', simulation is done with the sky model corrupted with solutions in 'solutions_file'.
With ```-a 1``` and ```-p``` 'solutions_file' and ```-z``` 'ignore_file', simulation is done with the solutions in the 'solutions_file', but ignoring the cluster ids in the 'ignore_file'.
E.g., If you need to ignore cluster ids '-1', '10', '999', create a text file :

```
-1
10
999
```

@@ -135,6 +135,9 @@ Eg. If you need to ignore cluster ids '-1', '10', '999', create a text file :

and use it as the 'ignore_file'.
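
Putting it together, a minimal sketch of a corrupted simulation run (all file names and the thread count are illustrative placeholders):

```
sagecal -d my_data.MS -s my_skymodel -c my_clustering -n 4 -t 60 -a 1 -p solutions_file -z ignore_file
```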

Bandpass correction using **stochastic** calibration with consensus:
Use ```-N 1``` combined with the ```-M``` and ```-w``` options (see also section 4 below).
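
A purely illustrative sketch (the ```-M``` minibatch count and the ```-w``` value are placeholder assumptions, not recommended settings):

```
sagecal -d my_data.MS -s my_skymodel -c my_clustering -n 4 -t 60 -p my_solutions -N 1 -M 4 -w 1
```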


### 4) Distributed calibration

Expand All @@ -147,7 +150,7 @@ Use mpirun to run sagecal-mpi, example:
```

Specific options :
```-np 11``` : 11 processes : starts 10 slaves + 1 master
```-np 11``` : 11 processes : starts 10 workers + 1 master

```./machines``` : will list the host names of the 11 (or fewer) nodes used ( 1st name is the master ) : normally the node where you invoke mpirun

@@ -163,13 +166,17 @@ Specific options :

```-G textfile```: each cluster can have a different regularization factor, instead of using ```-r``` option when the regularization is the same for all clusters.

```-N 1```: enable **stochastic** calibration (minibatches of data), combined with options ```-M```, ```-w``` and ```-u```.

```-U 1```: use global solution instead of local solution for residual calculation.

MPI specific options:

```/scratch/users/sarod``` : this is where MPI stores temp files (default is probably ```/tmp```).

```--mca*```: various options to tune the networking and scheduling.

Note: the number of slaves (-np option) can be lower than the number of MS calibrated. The program will divide the workload among the number of available slaves.
Note: the number of workers (-np option) can be lower than the number of MS calibrated. The program will divide the workload among the number of available workers.
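
For instance, a minimal sketch of a distributed run (the host file, file names, and option values are illustrative assumptions, and the options selecting the measurement sets are omitted here; see the option list above):

```
mpirun -np 11 -machinefile ./machines /path/to/sagecal-mpi -s my_skymodel -c my_clustering -t 60 -p my_solutions -G regularization.txt -U 1
```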


The rest of the options are similar to sagecal.
8 changes: 6 additions & 2 deletions src/MPI/main.cpp
@@ -34,7 +34,7 @@ using namespace Data;

void
print_copyright(void) {
cout<<"SAGECal-MPI 0.7.1 (C) 2011-2020 Sarod Yatawatta"<<endl;
cout<<"SAGECal-MPI 0.7.2 (C) 2011-2020 Sarod Yatawatta"<<endl;
}


@@ -90,6 +90,7 @@ print_help(void) {
cout << "-V if given, enable verbose output: default "<<Data::verbose<<endl;
//cout << "-M if given, evaluate AIC/MDL criteria for polynomials starting from 1 term to the one given by -P and suggest the best polynomial terms to use based on the minimum AIC/MDL: default "<<Data::mdl<<endl;
cout << "-q solutions.txt: if given, initialize solutions by reading this file (need to have the same format as a solution file, only solutions for 1 timeslot needed)"<< endl;
cout << "-U 0,1: if >0, use global solution for final residual calculation: default " <<Data::use_global_solution << endl;
cout<<endl<<"Stochastic mode:"<<endl;
cout << "-N epochs, if >0, use stochastic calibration: default "<<Data::stochastic_calib_epochs<< endl;
cout << "-M minibatches, must be >0, split data to this many minibatches: default "<<Data::stochastic_calib_minibatches<< endl;
@@ -102,7 +103,7 @@
void
ParseCmdLine(int ac, char **av) {
int c;
while((c=getopt(ac, av, ":c:e:f:g:j:k:l:m:n:o:p:q:r:s:t:u:w:x:y:A:B:C:E:F:G:H:I:J:K:L:M:N:O:P:Q:R:S:T:W:E:MVh"))!= -1)
while((c=getopt(ac, av, ":c:e:f:g:j:k:l:m:n:o:p:q:r:s:t:u:w:x:y:A:B:C:E:F:G:H:I:J:K:L:M:N:O:P:Q:R:S:T:U:W:E:MVh"))!= -1)
{
switch(c)
{
@@ -235,6 +236,9 @@ ParseCmdLine(int ac, char **av) {
case 'u':
Data::federated_reg_alpha= atof(optarg);
break;
case 'U':
Data::use_global_solution= atoi(optarg);
break;
case 'h':
print_help();
MPI_Finalize();
76 changes: 71 additions & 5 deletions src/MPI/sagecal_master.cpp
@@ -209,7 +209,7 @@ cout<<"Master received all "<<totalfiles<<" files"<<endl;
int mintotal=0;
for (int cm=0; cm<nslaves; cm++) {
int thistotal=Send[cm]-Sbegin[cm]+1;
cout<<"Slave "<<cm+1<<" MS range "<<Sbegin[cm]<<":"<<Send[cm]<<" total "<<thistotal<<endl;
cout<<"Worker "<<cm+1<<" MS range "<<Sbegin[cm]<<":"<<Send[cm]<<" total "<<thistotal<<endl;
if (mintotal<thistotal) { mintotal=thistotal; }
}
//print a warning if no of ADMM iterations is too low
@@ -240,7 +240,7 @@ cout<<"Master received all "<<totalfiles<<" files"<<endl;
MPI_Recv(bufint, 6, /* MS-id, N,Mo(actual clusters),M(with hybrid),tilesz,totalt */
MPI_INT, cm+1, TAG_MSAUX, MPI_COMM_WORLD, &status);
int thismsid=bufint[0];
cout<<"Slave "<<cm+1<<" MS="<<thismsid<<" N="<<bufint[1]<<" M="<<bufint[2]<<"/"<<bufint[3]<<" tilesz="<<bufint[4]<<" totaltime="<<bufint[5]<<endl;
cout<<"Worker "<<cm+1<<" MS="<<thismsid<<" N="<<bufint[1]<<" M="<<bufint[2]<<"/"<<bufint[3]<<" tilesz="<<bufint[4]<<" totaltime="<<bufint[5]<<endl;
if (cm==0 && ct==0) { /* update metadata */
iodata.N=bufint[1];
Mo=bufint[2];
@@ -250,7 +250,7 @@ cout<<"Slave "<<cm+1<<" MS="<<thismsid<<" N="<<bufint[1]<<" M="<<bufint[2]<<"/"<

} else { /* check metadata for problem consistency */
if ((iodata.N != bufint[1]) || (iodata.M != bufint[3]) || (iodata.tilesz != bufint[4])) {
cout<<"Slave "<<cm+1<<" parameters do not match N="<<bufint[1]<<" M="<<bufint[3]<<" tilesz="<<bufint[4]<<endl;
cout<<"Worker "<<cm+1<<" parameters do not match N="<<bufint[1]<<" M="<<bufint[3]<<" tilesz="<<bufint[4]<<endl;
exit(1);
}
if (iodata.totalt<bufint[5]) {
@@ -262,7 +262,7 @@ cout<<"Slave "<<cm+1<<" MS="<<thismsid<<" N="<<bufint[1]<<" M="<<bufint[2]<<"/"<
MPI_DOUBLE, cm+1, TAG_MSAUX, MPI_COMM_WORLD, &status);
iodata.freqs[thismsid]=bufdouble[0];
iodata.freq0 +=bufdouble[0];
cout<<"Slave "<<cm+1<<" MS="<<thismsid<<" frequency (MHz)="<<bufdouble[0]*1e-6<<endl;
cout<<"Worker "<<cm+1<<" MS="<<thismsid<<" frequency (MHz)="<<bufdouble[0]*1e-6<<endl;
}
}
}
@@ -636,13 +636,13 @@ cout<<"Reference frequency (MHz)="<<iodata.freq0*1.0e-6<<endl;
}
}

if (Data::aadmm) {
/* BB : get updated rho from slaves */
for (int cm=0; cm<nslaves; cm++) {
if (Sbegin[cm]>=0) {
int mmid=Sbegin[cm]+Scurrent[cm];
MPI_Recv(arhoslave,Mo,MPI_DOUBLE,cm+1,TAG_RHO_UPDATE,MPI_COMM_WORLD,&status);
/* now copy this to proper locations, using chunkvec as guide */
//MPI_Recv(&rhok[mmid*iodata.M],iodata.M,MPI_DOUBLE,cm+1,TAG_RHO_UPDATE,MPI_COMM_WORLD,&status);
int chk1=0;
double *rhoptr=&rhok[mmid*iodata.M];
for (int chk2=0; chk2<Mo; chk2++) {
@@ -653,6 +653,7 @@
}
}
}
}

/* go to the next MS in the next ADMM iteration */
for (int cm=0; cm<nslaves; cm++) {
@@ -662,6 +663,71 @@
}
}
}

if (Data::use_global_solution) {
cout<<"Using Global"<<endl;

/* get back final solutions from each worker */
/* get Y_i+rho J_i */
for(int cm=0; cm<nslaves; cm++) {
if (Sbegin[cm]>=0) {
int scount=Send[cm]-Sbegin[cm]+1;
for (int ct1=0; ct1<scount; ct1++) {
int mmid=Sbegin[cm]+ct1;
MPI_Recv(&Y[mmid*iodata.N*8*iodata.M], iodata.N*8*iodata.M, MPI_DOUBLE, cm+1,TAG_YDATA, MPI_COMM_WORLD, &status);
}
}
}


/* recalculate Z based on final solutions */

/* update Z */
/* add to 8NM vector, multiplied by Npoly different scalars, Nms times */
for (int ci=0; ci<Npoly; ci++) {
my_dcopy(8*iodata.N*iodata.M,Y,1,&z[ci*8*iodata.N*iodata.M],1);
my_dscal(8*iodata.N*iodata.M,B[ci],&z[ci*8*iodata.N*iodata.M]);
}
for (int cm=1; cm<iodata.Nms; cm++) {
for (int ci=0; ci<Npoly; ci++) {
/* Note: no weighting of Y is needed, because slave has already weighted their rho (we have rho J here) */
my_daxpy(8*iodata.N*iodata.M, &Y[cm*8*iodata.N*iodata.M], B[cm*Npoly+ci], &z[ci*8*iodata.N*iodata.M]);
}
}
/* no need to scale by 1/rho here, because Bii is already divided by 1/rho */

/* find product z_tilde x Bi^T, z_tilde with proper reshaping */
my_dcopy(iodata.N*8*Npoly*iodata.M,Z,1,Zerr,1);
update_global_z_multi(Z,iodata.N,iodata.M,Npoly,z,Bii,Data::Nt);
/* find dual error ||Zold-Znew|| (note Zerr is destroyed here) */
my_daxpy(iodata.N*8*Npoly*iodata.M,Z,-1.0,Zerr);
/* dual residual per one real parameter */
if (Data::verbose) {
cout<<"ADMM : "<<Nadmm<<" dual residual="<<my_dnrm2(iodata.N*8*Npoly*iodata.M,Zerr)/sqrt((double)8*iodata.N*Npoly*iodata.M)<<endl;
} else {
cout<<"Timeslot:"<<ct<<" ADMM:"<<Nadmm<<endl;
}

/* calculate global solution for each MS and send them to workers */

for(int cm=0; cm<nslaves; cm++) {
if (Sbegin[cm]>=0) {
int scount=Send[cm]-Sbegin[cm]+1;

for (int ct1=0; ct1<scount; ct1++) {
int mmid=Sbegin[cm]+ct1;
for (int p=0; p<iodata.M; p++) {
memset(&z[8*iodata.N*p],0,sizeof(double)*(size_t)iodata.N*8);
for (int ci=0; ci<Npoly; ci++) {
my_daxpy(8*iodata.N, &Z[p*8*iodata.N*Npoly+ci*8*iodata.N], B[mmid*Npoly+ci], &z[8*iodata.N*p]);
}
}
MPI_Send(z, iodata.N*8*iodata.M, MPI_DOUBLE, cm+1,TAG_CONSENSUS, MPI_COMM_WORLD);
}
}
}

}

/* wait till all slaves are done writing data */
int resetcount=0;
33 changes: 29 additions & 4 deletions src/MPI/sagecal_slave.cpp
@@ -122,7 +122,8 @@ cout<<"Error in checking files matching pattern "<<buf1<<". Exiting."<<endl;
delete [] buf;
}
} else {
cout<<"Slave "<<myrank<<" has nothing to do"<<endl;
cerr<<"Error: Worker "<<myrank<<" has nothing to do"<<endl;
cerr<<"Error: Worker "<<myrank<<": Recheck your allocation or reduce number of workers"<<endl;
return 0;
}

@@ -474,7 +475,7 @@ cout<<"Slave "<<myrank<<" has nothing to do"<<endl;
MPI_Recv(&msgcode,1,MPI_INT,0,TAG_CTRL,MPI_COMM_WORLD,&status);
/* assume all MS are the same size */
if (msgcode==CTRL_END || !msitr[0]->more()) {
cout<<"Slave "<<myrank<<" quitting"<<endl;
cout<<"Worker "<<myrank<<" quitting"<<endl;
break;
} else if (msgcode==CTRL_SKIP) {
/* skip to next timeslot */
@@ -703,7 +704,6 @@ cout<<myrank<<" : "<<cm<<": downweight ratio ("<<iodata_vec[cm].fratio<<") based
/* for initial ADMM iteration, get back Y with common unitary ambiguity (for all MS) */
if (admm==0) {
for(int cm=0; cm<mymscount; cm++) {
//MPI_Recv(Y_vec[mmid], iodata_vec[mmid].N*8*Mt, MPI_DOUBLE, 0,TAG_YDATA, MPI_COMM_WORLD, &status);
MPI_Recv(Y_vec[cm], iodata_vec[cm].N*8*Mt, MPI_DOUBLE, 0,TAG_YDATA, MPI_COMM_WORLD, &status);
}
}
@@ -768,12 +768,14 @@ cout<<myrank<<" : "<<cm<<": downweight ratio ("<<iodata_vec[cm].fratio<<") based
if (Data::aadmm && ((mymscount>1 && admm>=mymscount)|| (mymscount==1 && admm>1 && admm%2==0))) {
update_rho_bb(arho_vec[mmid],arhoupper_vec[mmid],iodata_vec[mmid].N,M,Mt,carr_vec[mmid],Yhat,Yhat0_vec[mmid],p_vec[mmid],J0_vec[mmid],Data::Nt);
}
if (Data::aadmm) {
/* BB : send updated rho to master */
MPI_Send(arho_vec[mmid],M,MPI_DOUBLE,0,TAG_RHO_UPDATE,MPI_COMM_WORLD);

/* BB : store current Yhat and J as reference (k0) values */
my_dcopy(iodata_vec[mmid].N*8*Mt, Yhat, 1, Yhat0_vec[mmid], 1);
my_dcopy(iodata_vec[mmid].N*8*Mt, p_vec[mmid], 1, J0_vec[mmid], 1);
}

/* calculate primal residual J-BZ */
my_dcopy(iodata_vec[mmid].N*8*Mt, p_vec[mmid], 1, pres, 1);
Expand All @@ -787,6 +789,29 @@ cout<<myrank<<" : "<<cm<<": downweight ratio ("<<iodata_vec[cm].fratio<<") based
}
/******************** END ADMM *******************************/

if (Data::use_global_solution) {
cout<<"Using Global"<<endl;
/* send final solution of each MS to master */
/* calculate Y <= Y + rho J */
for(int cm=0; cm<mymscount; cm++) {
ck=0;
for (ci=0; ci<M; ci++) {
if (arho_vec[cm][ci]>0.0) {
my_daxpy(iodata_vec[cm].N*8*carr_vec[cm][ci].nchunk, &p_vec[cm][ck], arho_vec[cm][ci], &Y_vec[cm][ck]);
}
ck+=iodata_vec[cm].N*8*carr_vec[cm][ci].nchunk;
}

MPI_Send(Y_vec[cm], iodata_vec[cm].N*8*Mt, MPI_DOUBLE, 0,TAG_YDATA, MPI_COMM_WORLD);
}
/* get back global solution for each MS from master,
and replace local solution to calculate residuals */

for(int cm=0; cm<mymscount; cm++) {
MPI_Recv(p_vec[cm], iodata_vec[cm].N*8*Mt, MPI_DOUBLE, 0,TAG_CONSENSUS, MPI_COMM_WORLD, &status);
}

}
/* write residuals to output */
for(int cm=0; cm<mymscount; cm++) {
#ifndef HAVE_CUDA
@@ -843,7 +868,7 @@ cout<<myrank<<" : "<<cm<<": downweight ratio ("<<iodata_vec[cm].fratio<<") based
/* do not reset if initial residual is 0, because by def final one will be higher */
for(int cm=0; cm<mymscount; cm++) {
if (res_00[cm]!=0.0 && (res_01[cm]==0.0 || !isfinite(res_01[cm]) || res_01[cm]>res_ratio*res_prev[cm])) {
cout<<"Resetting Solution "<<cm<<endl;
cout<<myrank<<": Resetting Solution "<<cm<<endl;
/* reset solutions so next iteration has default initial values */
memcpy(p_vec[cm],pinit,(size_t)iodata_vec[cm].N*8*Mt*sizeof(double));
/* also assume iterations have restarted from scratch */
8 changes: 4 additions & 4 deletions src/MPI/sagecal_stochastic_master.cpp
@@ -209,7 +209,7 @@ cout<<"Master received all "<<totalfiles<<" files"<<endl;
int mintotal=0;
for (int cm=0; cm<nslaves; cm++) {
int thistotal=Send[cm]-Sbegin[cm]+1;
cout<<"Slave "<<cm+1<<" MS range "<<Sbegin[cm]<<":"<<Send[cm]<<" total "<<thistotal<<endl;
cout<<"Worker "<<cm+1<<" MS range "<<Sbegin[cm]<<":"<<Send[cm]<<" total "<<thistotal<<endl;
if (mintotal<thistotal) { mintotal=thistotal; }
}
//print a warning if no of ADMM iterations is too low
@@ -240,7 +240,7 @@ cout<<"Master received all "<<totalfiles<<" files"<<endl;
MPI_Recv(bufint, 6, /* MS-id, N,Mo(actual clusters),M(with hybrid),tilesz,totalt */
MPI_INT, cm+1, TAG_MSAUX, MPI_COMM_WORLD, &status);
int thismsid=bufint[0];
cout<<"Slave "<<cm+1<<" MS="<<thismsid<<" N="<<bufint[1]<<" M="<<bufint[2]<<"/"<<bufint[3]<<" tilesz="<<bufint[4]<<" totaltime="<<bufint[5]<<endl;
cout<<"Worker "<<cm+1<<" MS="<<thismsid<<" N="<<bufint[1]<<" M="<<bufint[2]<<"/"<<bufint[3]<<" tilesz="<<bufint[4]<<" totaltime="<<bufint[5]<<endl;
if (cm==0 && ct==0) { /* update metadata */
iodata.N=bufint[1];
Mo=bufint[2];
@@ -250,7 +250,7 @@ cout<<"Slave "<<cm+1<<" MS="<<thismsid<<" N="<<bufint[1]<<" M="<<bufint[2]<<"/"<

} else { /* check metadata for problem consistency */
if ((iodata.N != bufint[1]) || (iodata.M != bufint[3]) || (iodata.tilesz != bufint[4])) {
cout<<"Slave "<<cm+1<<" parameters do not match N="<<bufint[1]<<" M="<<bufint[3]<<" tilesz="<<bufint[4]<<endl;
cout<<"Worker "<<cm+1<<" parameters do not match N="<<bufint[1]<<" M="<<bufint[3]<<" tilesz="<<bufint[4]<<endl;
exit(1);
}
if (iodata.totalt<bufint[5]) {
@@ -262,7 +262,7 @@ cout<<"Slave "<<cm+1<<" MS="<<thismsid<<" N="<<bufint[1]<<" M="<<bufint[2]<<"/"<
MPI_DOUBLE, cm+1, TAG_MSAUX, MPI_COMM_WORLD, &status);
iodata.freqs[thismsid]=bufdouble[0];
iodata.freq0 +=bufdouble[0];
cout<<"Slave "<<cm+1<<" MS="<<thismsid<<" frequency (MHz)="<<bufdouble[0]*1e-6<<endl;
cout<<"Worker "<<cm+1<<" MS="<<thismsid<<" frequency (MHz)="<<bufdouble[0]*1e-6<<endl;
}
}
}