Skip to content

Commit

Permalink
rebase. adding json flow
Browse files Browse the repository at this point in the history
Signed-off-by: galsalomon66 <[email protected]>
  • Loading branch information
galsalomon66 committed Sep 5, 2022
1 parent 0fc3e73 commit 974593d
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 22 deletions.
2 changes: 1 addition & 1 deletion example/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
add_executable(s3select_example s3select_example.cpp)
target_include_directories(s3select_example PUBLIC ../include ../rapidjson/include)
add_executable(s3select_scaleup s3select_scaleup.cpp)
target_include_directories(s3select_scaleup PUBLIC ../include)
target_include_directories(s3select_scaleup PUBLIC ../include ../rapidjson/include)

find_package(Arrow QUIET)

Expand Down
1 change: 1 addition & 0 deletions example/s3select_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ int run_on_localFile(char* input_query)
s3select_result result;
s3selectEngine::csv_object::csv_defintions csv;
csv.use_header_info = false;
bool do_aggregate = false;
//csv.column_delimiter='|';
//csv.row_delimiter='\t';

Expand Down
28 changes: 15 additions & 13 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -2723,7 +2723,7 @@ class json_object : public base_s3object
JsonParserHandler JsonHandler;
size_t m_processed_bytes;
bool m_end_of_stream;
std::string s3select_result;
s3select_result m_result;
size_t m_row_count;
bool star_operation_ind;
std::string m_error_description;
Expand Down Expand Up @@ -2792,23 +2792,23 @@ class json_object : public base_s3object
//execute statement on row
//create response (TODO callback)

size_t result_len = s3select_result.size();
size_t result_len = m_result.size();
int status=0;
try{
status = getMatchRow(s3select_result);
status = getMatchRow(m_result);
}
catch(s3selectEngine::base_s3select_exception& e)
{
sql_error_handling(e,s3select_result);
sql_error_handling(e,m_result.str());
status = -1;
}

m_sa->clear_data();
if(star_operation_ind && (s3select_result.size() != result_len))
if(star_operation_ind && (m_result.size() != result_len))
{//as explained above the star-operation is displayed differently
std::string end_of_row;
end_of_row = "#=== " + std::to_string(m_row_count++) + " ===#\n";
s3select_result.append(end_of_row);
m_result.append(end_of_row);
}
return status;
}
Expand All @@ -2833,10 +2833,10 @@ class json_object : public base_s3object
//the error-handling takes care of the error flow.
m_error_description = e.what();
m_error_count++;
s3select_result.append(std::to_string(m_error_count));
s3select_result += " : ";
s3select_result.append(m_error_description);
s3select_result += m_csv_defintion.output_row_delimiter;
m_result.append(std::to_string(m_error_count));
m_result.append(std::string(" : "));
m_result.append(m_error_description);
m_result.append(&m_csv_defintion.output_row_delimiter,1);
}

public:
Expand All @@ -2845,21 +2845,21 @@ class json_object : public base_s3object
{
int status=0;
m_processed_bytes += stream_length;
s3select_result.clear();
m_result.clear();

if(!stream_length || !json_stream)//TODO m_processed_bytes(?)
{//last processing cycle
JsonHandler.process_json_buffer(0, 0, true);//TODO end-of-stream = end-of-row
m_end_of_stream = true;
sql_execution_on_row_cb();
result = s3select_result;
result = m_result.str();
return 0;
}

try{
//the handler is processing any buffer size and return results per each buffer
status = JsonHandler.process_json_buffer((char*)json_stream, stream_length);
result = s3select_result;//TODO remove this result copy
result = m_result.str();//TODO remove this result copy
}
catch(std::exception &e)
{
Expand All @@ -2878,6 +2878,8 @@ class json_object : public base_s3object

~json_object() = default;

};

class merge_results : public base_s3object
{//purpose: upon processing several stream on a single aggregate query, this object should merge results.

Expand Down
2 changes: 2 additions & 0 deletions include/s3select_oper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,8 @@ class base_statement
m_projection_alias(nullptr), m_eval_stack_depth(0), m_skip_non_aggregate_op(false),
execution_phase(multiple_executions_en::NA){}

multiple_executions_en execution_phase;

virtual value& eval()
{
//purpose: on aggregation flow to run only the correct subtree(aggregation subtree)
Expand Down
14 changes: 7 additions & 7 deletions test/s3select_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,7 @@ void test_single_column_single_row(const char* input_query,const char* expected_
#ifdef _ARROW_EXIST
parquet_csv_report_error(parquet_result.str(),csv_result.str());
#endif
json_csv_report_error(json_result, s3select_result);
ASSERT_EQ(s3select_result, std::string(expected_result));
json_csv_report_error(json_result, csv_result.str());
ASSERT_EQ(csv_result.str(), std::string(expected_result));
}

Expand Down Expand Up @@ -2664,7 +2663,8 @@ void generate_csv_quote_and_escape(std::string& out, char quote = '"', char escp

TEST(TestS3selectFunctions, csv_quote_string_and_escape_char)
{
std::string input, s3select_result_1, s3select_result_2, s3select_result_3;
std::string input;
s3select_result s3select_result_1, s3select_result_2, s3select_result_3;
csv_object::csv_defintions csv;
generate_csv_quote_and_escape(input);
s3select s3select_syntax1, s3select_syntax2, s3select_syntax3;
Expand All @@ -2683,7 +2683,7 @@ TEST(TestS3selectFunctions, csv_quote_string_and_escape_char)
s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax2, csv);
s3_csv_object_second.run_s3select_on_object(s3select_result_2, input.c_str(), input.size(), false, false, true);

EXPECT_EQ(s3select_result_1, s3select_result_2);
EXPECT_EQ(s3select_result_1.str(), s3select_result_2.str());

csv.escape_char = '\0';
csv.quot_char = '\0';
Expand All @@ -2695,13 +2695,13 @@ TEST(TestS3selectFunctions, csv_quote_string_and_escape_char)
s3selectEngine::csv_object s3_csv_object_third(&s3select_syntax3, csv);
s3_csv_object_third.run_s3select_on_object(s3select_result_3, input.c_str(), input.size(), false, false, true);

EXPECT_EQ(s3select_result_3, input);
EXPECT_EQ(s3select_result_3.str(), input);
}

TEST(TestS3selectFunctions, csv_comment_line_and_trim_char)
{
std::string input;
std::string s3select_result_1, s3select_result_2;
s3select_result s3select_result_1, s3select_result_2;
generate_csv_quote_and_escape(input);
s3select s3select_syntax;

Expand All @@ -2725,7 +2725,7 @@ TEST(TestS3selectFunctions, csv_comment_line_and_trim_char)
s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax, csv);
s3_csv_object_second.run_s3select_on_object(s3select_result_2, input.c_str(), input.size(), false, false, true);

EXPECT_EQ(s3select_result_1, s3select_result_2);
EXPECT_EQ(s3select_result_1.str(), s3select_result_2.str());
}

TEST(TestS3selectFunctions, presto_syntax_alignments)
Expand Down
2 changes: 1 addition & 1 deletion test/s3select_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ std::string run_s3select(std::string expression,std::string input, const char* j
}

run_json_query(json_query, js, json_result);
json_csv_report_error(json_result, s3select_result);
json_csv_report_error(json_result, csv_result.str());

return csv_result.str();
}
Expand Down

0 comments on commit 974593d

Please sign in to comment.