adding splitting functionality, i.e. splitting an input into equal parts to be processed simultaneously

Signed-off-by: gal salomon <[email protected]>
galsalomon66 committed May 25, 2022
1 parent 0e7e69b commit a9de18f
Showing 1 changed file with 152 additions and 21 deletions: example/s3select_scaleup.cpp
@@ -100,14 +100,116 @@ class csv_streamer {
}
};

-int splitter()
+int csv_splitter(const char* fn,std::vector<std::pair<size_t,size_t>>& ranges)
{
//take a single object, split it by size, and search for '\n' boundaries
//one thread per split portion: while(not end of portion){ read; process_stream(); }
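//e.g. (illustrative numbers only): a 1GB object with NUM_OF_INST=8 gives a
//nominal split size of ~128MB; each cut point is then pulled back to the
//nearest preceding row delimiter so no CSV row straddles two ranges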

char row_delim=10;//'\n' (LF)
std::ifstream is(fn, std::ifstream::binary);
//size of file
is.seekg (0, is.end);
uint64_t length = is.tellg();
is.seekg (0, is.beg);
uint16_t num_of_split = getenv("NUM_OF_INST") ? atoi(getenv("NUM_OF_INST")) : 8;//number of cores
//calculate split size
uint64_t split_sz = length / num_of_split;

const uint32_t max_row_size=(split_sz > (num_of_split*1024)) ? 1024 : split_sz/10 ;//should be at least twice the maximum row size
char buff[max_row_size];
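//note: a variable-length array is a GCC/Clang extension, not standard C++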

uint64_t mark=0;
uint64_t prev_mark=0;
int range_no=0;
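//each pass: seek to just before the next nominal cut point, read a small tail
//buffer, and scan it backwards for the last row delimiter; that position
//closes the current range and opens the next one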

do
{

is.seekg(mark+(split_sz-max_row_size));//jump to location of next "cut"
is.read(buff,max_row_size); //reading small buff
uint64_t current_pos = is.tellg();
uint64_t actual_read=is.gcount();

char* p=&buff[actual_read-1];

while(*p != row_delim && p != &buff[0])p--;

if(*p != row_delim)
{
printf("row delimiter not found. abort\n");
break;
}

prev_mark = mark;

range_no++;
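//every range except the last ends exactly on a row boundary; the final range
//is simply extended to the end of the file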

if(range_no<num_of_split)
{
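//rewind from the current stream position by the distance between the end of
//the tail buffer and the found delimiter: mark lands on the first byte after
//the '\n', i.e. the start of the next full row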
mark=current_pos - (&buff[actual_read-1] - p);
}
else
{
mark = length;
}

ranges.push_back(std::pair<size_t,size_t>(prev_mark,mark));
printf("%d: range[%ld %ld] %ld\n",range_no,prev_mark,mark,mark-prev_mark);

}while(mark!=length);

return 0;
}

//TODO stream_chunk()
int stream_partof_file(const char* file, csv_streamer *cs,size_t from,size_t to)
{//each part is processed on a separate thread
std::ifstream input_file_stream;
size_t length = to - from;
size_t bytes_been_read = 0;
int status=0;

//open-file
try {
input_file_stream = std::ifstream(file, std::ios::in | std::ios::binary);
input_file_stream.seekg(from);
}
catch( ... )
{
std::cout << "failed to open file " << file << std::endl;
return(-1);
}

//read-chunk
#define BUFFER_SIZE (4*1024*1024)
std::string buff(BUFFER_SIZE,0);
size_t buffer_to_read = BUFFER_SIZE;
while (true)
{
if(buffer_to_read > (length - bytes_been_read) )
{
buffer_to_read = length - bytes_been_read;
}
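
//note: readsome() extracts only characters already available in the stream's
//internal buffer, so it may return fewer bytes than requested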

size_t read_sz = input_file_stream.readsome(buff.data(),buffer_to_read);
bytes_been_read += read_sz;
if(!read_sz || input_file_stream.eof())
{//signaling end of stream
cs->process_stream(0,0,true);
break;
}

status = cs->process_stream(buff.data(),read_sz,false);
if(status<0)
{
std::cout << "failure on execution " << std::endl;
break;
}

}
return 0;
}
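
//each (from,to) range produced by csv_splitter() is consumed by one
//stream_partof_file() call running on its own thread (wired up in main() below)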

int stream_file(char* file, csv_streamer *cs)
{//each file is processed on a separate thread
@@ -151,23 +253,12 @@ int stream_file(char* file, csv_streamer *cs)
return 0;
}

-int start_multiple_execution_flows(std::string q, std::vector<char*> files)
+int start_multiple_execution_flows(std::string q, std::vector<csv_streamer*>& all_streamers, std::vector<std::function<int(void)>>& vec_of_fp, shared_queue& sq)
{ //the object-set defines one finite data-set for the query.

-shared_queue sq;
boost::thread_group producer_threads, consumer_threads;
-std::vector<std::function<int(void)>> vec_of_fp;
-std::vector<csv_streamer*> all_streamers;
std::vector<s3select*> s3select_processing_objects;

-for(auto f : files)
-{
-csv_streamer *cs = new csv_streamer(q,&sq);
-all_streamers.push_back(cs);
-auto thread_func = [f,cs](){return stream_file(f,cs);};
-vec_of_fp.push_back( thread_func );
-}

for(auto& t : vec_of_fp)
{
//start with query processing
@@ -215,14 +306,15 @@

return 0;
}

-int main(int argc, char **argv)
+int main_for_many_files(int argc, char **argv)
{
if(argc<2) return -1;

-char* query=argv[1];
std::string sql_query;
-sql_query.assign(query);
+sql_query.assign(argv[1]);
shared_queue sq;
std::vector<std::function<int(void)>> vec_of_fp;
std::vector<csv_streamer*> all_streamers;

std::vector<char*> list_of_files;
setvbuf(stdout, NULL, _IONBF, 0);//unbuffered stdout
@@ -232,8 +324,47 @@
list_of_files.push_back(argv[i]);
}

-start_multiple_execution_flows(sql_query, list_of_files);
for(auto f : list_of_files)
{
csv_streamer *cs = new csv_streamer(sql_query,&sq);
all_streamers.push_back(cs);
auto thread_func = [f,cs](){return stream_file(f,cs);};
vec_of_fp.push_back( thread_func );
}

start_multiple_execution_flows(sql_query,all_streamers,vec_of_fp,sq);

return 0;
}

int main(int argc, char **argv)
{
if(argc<3) return -1;//needs both a query (argv[1]) and an input file (argv[2])
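
//usage sketch (assumed invocation, inferred from the argument handling below):
//  NUM_OF_INST=8 ./s3select_scaleup "<sql query>" <input.csv>
//argv[1] is the SQL statement, argv[2] is the CSV file to split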

std::string sql_query;
sql_query.assign(argv[1]);
shared_queue sq;
std::vector<std::function<int(void)>> vec_of_fp;
std::vector<csv_streamer*> all_streamers;

std::string fn;
fn.assign(argv[2]);

setvbuf(stdout, NULL, _IONBF, 0);//unbuffered stdout

//splitting the single input file into ranges
std::vector<std::pair<size_t,size_t>> ranges;
csv_splitter(fn.data(),ranges);

for(auto r : ranges)
{
csv_streamer *cs = new csv_streamer(sql_query,&sq);
all_streamers.push_back(cs);
auto thread_func = [fn,r,cs](){return stream_partof_file(fn.data(), cs, r.first, r.second);};
vec_of_fp.push_back( thread_func );
}

start_multiple_execution_flows(sql_query,all_streamers,vec_of_fp,sq);

return 0;
}
