Add a read test and rename "parts" to "chunks".
The read check currently fails for more than one chunk per rank.
ykempf committed Mar 20, 2023
1 parent 049c58e commit 01cc20e
Showing 1 changed file, timed_test.cpp, with 102 additions and 40 deletions.
@@ -11,29 +11,43 @@
 #include <random>
 
 #include "vlsv_writer.h"
+#include "vlsv_reader_parallel.h"
 #include "vlsv_amr.h"
 
 using namespace vlsv;
 
-std::vector<int> sizes;
-std::vector<int> parts;
+std::vector<int> chunkSizes;
+std::vector<int> chunkCounts;
 
 
-int *intData;
+std::vector<int> intData;
 int myRank, nRanks;
 
 
-void writeNint(vlsv::Writer &vlsv, int N, int writeParts, int vectorSize)
+void writeNint(vlsv::Writer &vlsv, int chunkSize, int chunkCount)
 {
    std::map<std::string, std::string> xmlAttributes;
    xmlAttributes["Arr"] = "name";
 
-   vlsv.startMultiwrite(getStringDatatype<int>(),N*writeParts,vectorSize,sizeof(int));
-   for (int i = 0; i < writeParts; i++)
+   // Cast first: the int product chunkSize*chunkCount can overflow for GiB-scale chunks.
+   vlsv.startMultiwrite(getStringDatatype<int>(),static_cast<uint64_t>(chunkSize)*chunkCount,1 /*vectorSize*/,sizeof(int));
+   for (int i = 0; i < chunkCount; i++)
    {
-      vlsv.addMultiwriteUnit((char*)intData,N);
+      vlsv.addMultiwriteUnit(intData.data(),chunkSize);
    }
-   vlsv.endMultiwrite("arrayName"+std::to_string(N),xmlAttributes);
+   vlsv.endMultiwrite("arrayName"+std::to_string(chunkSize),xmlAttributes);
 }
+void readNint(vlsv::ParallelReader &vlsv, int chunkSize, int chunkCount, uint64_t fileOffset)
+{
+   std::list<std::pair<std::string,std::string> > xmlAttributes;
+   xmlAttributes.push_back({"Arr", "name"});
+
+   vlsv.startMultiread("Arr", xmlAttributes);
+   for (int i = 0; i < chunkCount; i++)
+   {
+      // Chunk i lands chunkSize*i elements into the flat buffer; cast to avoid int overflow.
+      vlsv.addMultireadUnit((char*)intData.data() + static_cast<size_t>(chunkSize)*i*sizeof(int), chunkSize);
+   }
+   vlsv.endMultiread(fileOffset);
+}


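Note: readNint mirrors writeNint chunk for chunk; multiread unit i is placed chunkSize*i elements into the flat intData buffer. A minimal standalone sketch of that placement arithmetic, using hypothetical sizes and no vlsv or MPI calls:

#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
   const int chunkSize  = 1000; // ints per chunk (hypothetical)
   const int chunkCount = 3;    // chunks per rank (hypothetical)
   std::vector<int> intData(static_cast<size_t>(chunkSize) * chunkCount);
   for (int i = 0; i < chunkCount; i++) {
      // Same arithmetic as readNint: the byte range chunk i occupies in the buffer.
      const uint64_t begin = static_cast<uint64_t>(chunkSize) * i * sizeof(int);
      const uint64_t end   = begin + static_cast<uint64_t>(chunkSize) * sizeof(int);
      std::printf("chunk %d -> buffer bytes [%llu, %llu) of %zu\n", i,
                  (unsigned long long)begin, (unsigned long long)end,
                  intData.size() * sizeof(int));
   }
   return 0;
}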
@@ -42,7 +56,7 @@ int main(int argc,char* argv[]) {

    if (argc < 6 || (argc >= 6 && argc % 2 != 0))
    {
-      std::cout << "usage : srun timed_test buffer_size min_rank_part_size max_rank_part_size min_rank_part_count max_rank_part_count [mpiio_hint mpiio_value]" << std::endl;
+      std::cout << "usage : srun timed_test buffer_size min_rank_chunk_size max_rank_chunk_size min_rank_chunk_count max_rank_chunk_count [mpiio_hint mpiio_value]" << std::endl;
+      return 1; // Bail out here; the argv accesses below would dereference missing arguments.
    }
 
    // Init MPI:
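As an illustration of the usage line above, a hypothetical invocation passing one MPI-IO hint pair (romio_cb_write is a standard ROMIO hint controlling collective buffering; all numeric values are made up):

srun timed_test 0 250000000 500000000 1 4 romio_cb_write enable

Chunk sizes are counted in ints, so 250000000 ints is roughly 0.93 GiB per chunk.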
@@ -53,10 +67,10 @@
    const double GiB = 1024*1024*1024;
 
    int bufferSize = std::atoi(argv[1]);
-   int minPartSize = std::atoi(argv[2]);
-   int maxPartSize = std::atoi(argv[3]);
-   int minPartCount = std::atoi(argv[4]);
-   int maxPartCount = std::atoi(argv[5]);
+   int minChunkSize = std::atoi(argv[2]);
+   int maxChunkSize = std::atoi(argv[3]);
+   int minChunkCount = std::atoi(argv[4]);
+   int maxChunkCount = std::atoi(argv[5]);
 
    MPI_Info MPIinfo;
    if (argc == 6) {
@@ -70,66 +84,114 @@
    }
    if(myRank == 0) {
       std::cout << "INFO: This will write int data, int has size " << sizeof(int) << std::endl;
-      std::cout << "INFO: This will write between " << minPartCount << " and " << maxPartCount << " parts of between " << minPartSize*sizeof(int) / GiB << " and " << maxPartSize*sizeof(int) / GiB << " GiB on each of the " << nRanks << " tasks." << std::endl;
+      std::cout << "INFO: This will write between " << minChunkCount << " and " << maxChunkCount << " chunks of between " << minChunkSize*sizeof(int) / GiB << " and " << maxChunkSize*sizeof(int) / GiB << " GiB on each of the " << nRanks << " tasks." << std::endl;
    }
 
-   intData = new int[maxPartSize];
-
-   for (int i = 0; i < maxPartSize; i++)
-   {
-      intData[i] = myRank;
-   }
-
-   std::default_random_engine generatorSizes(42), generatorParts(43);
-   std::uniform_int_distribution<int> randomSizes(minPartSize, maxPartSize), randomParts(minPartCount, maxPartCount);
+   std::default_random_engine generatorChunkSizes(42), generatorChunkCounts(43);
+   std::uniform_int_distribution<int> randomChunkSizes(minChunkSize, maxChunkSize), randomChunkCounts(minChunkCount, maxChunkCount);
    for (int i=0; i<nRanks; i++) {
-      sizes.push_back(randomSizes(generatorSizes));
-      parts.push_back(randomParts(generatorParts));
+      chunkSizes.push_back(randomChunkSizes(generatorChunkSizes));
+      chunkCounts.push_back(randomChunkCounts(generatorChunkCounts));
    }
 
    if(myRank == 0) {
       std::stringstream stream;
-      stream << "# Rank \t parts \t size (GiB) \t total (GiB) \t time (s) \t speed (GiB/s)" << std::endl;
+      stream << "# RD/WR \t Rank \t chunks \t size (GiB) \t total (GiB) \t time (s) \t speed (GiB/s)" << std::endl;
       std::cerr << stream.str();
    }
 
-   vlsv::Writer vlsv;
-   vlsv.setBuffer(bufferSize);
-   if (vlsv.open("file.out",MPI_COMM_WORLD,0, MPIinfo) == false) {
+   // WRITE
+
+   // For writing, we need only one chunk that'll be written a number of times
+   intData.assign(chunkSizes[myRank], myRank);
+
+   vlsv::Writer vlsvWriter;
+   vlsvWriter.setBuffer(bufferSize);
+   if (vlsvWriter.open("file.out",MPI_COMM_WORLD,0, MPIinfo) == false) {
       success = false;
       MPI_Finalize();
       return 1;
    }

-   const double partSize = sizes[myRank]*sizeof(int) / GiB;
+   const double chunkSize = chunkSizes[myRank]*sizeof(int) / GiB;
 
    MPI_Barrier(MPI_COMM_WORLD);
 
-   const double tStart = MPI_Wtime();
-   writeNint(vlsv, sizes[myRank], parts[myRank], 1);
-   const double tTime = MPI_Wtime() - tStart;
+   double tStart = MPI_Wtime();
+   writeNint(vlsvWriter, chunkSizes[myRank], chunkCounts[myRank]);
+   double tTime = MPI_Wtime() - tStart;
 
    std::stringstream stream;
-   stream << myRank << "\t" << parts[myRank] << "\t" << partSize << "\t" << parts[myRank]*partSize << "\t" << tTime << "\t" << parts[myRank]*partSize / tTime << std::endl;
+   stream << "WR\t" << myRank << "\t" << chunkCounts[myRank] << "\t" << chunkSize << "\t" << chunkCounts[myRank]*chunkSize << "\t" << tTime << "\t" << chunkCounts[myRank]*chunkSize / tTime << std::endl;
    std::cerr << stream.str();
 
-   if (vlsv.close() == false) {
+   if (vlsvWriter.close() == false) {
       success = false;
    }
 
    // Do this here as the close call does the actual writing when buffering
    MPI_Barrier(MPI_COMM_WORLD);
+   uint64_t totalChunkCount = 0;
+   double totalSize = 0;
    if(myRank == 0) {
       const double totalTime = MPI_Wtime() - tStart;
-      uint64_t totalParts = 0;
-      double totalSize = 0;
       for (int i=0; i<nRanks; i++) {
-         totalParts += parts[i];
-         totalSize += parts[i]*sizes[i];
+         totalChunkCount += chunkCounts[i];
+         // Promote before multiplying: the int product overflows for GiB-scale chunks.
+         totalSize += static_cast<double>(chunkCounts[i])*chunkSizes[i];
       }
       totalSize *= sizeof(int) / GiB;
       std::stringstream stream;
-      stream << nRanks << "\t" << totalParts << "\t" << totalSize / totalParts << "\t" << totalSize << "\t" << totalTime << "\t" << totalSize / totalTime << std::endl;
+      stream << "WR\t" << nRanks << "\t" << totalChunkCount << "\t" << totalSize / totalChunkCount << "\t" << totalSize << "\t" << totalTime << "\t" << totalSize / totalTime << std::endl;
       std::cerr << stream.str();
    }


+   // READ
+
+   // For reading we want the buffer to have the full size
+   intData.resize(static_cast<size_t>(chunkSizes[myRank])*chunkCounts[myRank]);
+
+   vlsv::ParallelReader vlsvReader;
+   if (vlsvReader.open("file.out",MPI_COMM_WORLD,0, MPIinfo) == false) {
+      success = false;
+      MPI_Finalize();
+      return 1;
+   }

+   uint64_t myFileOffset = 0;
+   for(int t=0; t<myRank; t++) {
+      // Cast first: the int product chunkSizes[t]*chunkCounts[t] can overflow 32 bits.
+      myFileOffset += static_cast<uint64_t>(chunkSizes[t])*chunkCounts[t]*sizeof(int);
+   }

+   MPI_Barrier(MPI_COMM_WORLD);
+
+   tStart = MPI_Wtime();
+   readNint(vlsvReader, chunkSizes[myRank], chunkCounts[myRank], myFileOffset);
+   tTime = MPI_Wtime() - tStart;
+
+   stream.clear();
+   stream.str(std::string());
+   stream << "RD\t" << myRank << "\t" << chunkCounts[myRank] << "\t" << chunkSize << "\t" << chunkCounts[myRank]*chunkSize << "\t" << tTime << "\t" << chunkCounts[myRank]*chunkSize / tTime << std::endl;
+   std::cerr << stream.str();
+
+   if (vlsvReader.close() == false) {
+      success = false;
+   }
+   stream.clear();
+   stream.str(std::string());
+   // Read check: every element read back should equal this rank's ID.
+   for (auto v : intData) {
+      stream << myRank << " " << intData.size() << " " << v << std::endl;
+      std::cerr << stream.str();
+      stream.clear();
+      stream.str(std::string());
+   }

+   MPI_Barrier(MPI_COMM_WORLD);
+   if(myRank == 0) {
+      const double totalTime = MPI_Wtime() - tStart;
+      std::stringstream stream;
+      stream << "RD\t" << nRanks << "\t" << totalChunkCount << "\t" << totalSize / totalChunkCount << "\t" << totalSize << "\t" << totalTime << "\t" << totalSize / totalTime << std::endl;
+      std::cerr << stream.str();
+   }
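Since every rank seeds the same generators (42 and 43) and draws nRanks values, each rank regenerates identical chunkSizes and chunkCounts vectors and can compute where its own data starts without any communication; the test passes that byte offset to endMultiread. A minimal sketch of the same offset arithmetic, with hypothetical values:

#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
   // Hypothetical layout for 3 ranks.
   const std::vector<int> chunkSizes  = {1000, 2000, 1500}; // ints per chunk
   const std::vector<int> chunkCounts = {2, 1, 3};          // chunks per rank
   for (int r = 0; r < 3; r++) {
      uint64_t myFileOffset = 0;
      for (int t = 0; t < r; t++) {
         // Same loop as in the test: sum the bytes written by all lower ranks.
         myFileOffset += static_cast<uint64_t>(chunkSizes[t]) * chunkCounts[t] * sizeof(int);
      }
      std::printf("rank %d starts at byte offset %llu\n", r,
                  (unsigned long long)myFileOffset);
   }
   return 0;
}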

