From 11379e509b46bcc46973fae01b667ee57e7ea62b Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Fri, 20 Oct 2023 21:58:12 -0400 Subject: [PATCH] add http service, plan parsing, catalog serialization Signed-off-by: Alex Chi --- CMakeLists.txt | 1 + src/common/bustub_instance.cpp | 44 ++++++ src/include/common/bustub_instance.h | 4 + third_party/CMakeLists.txt | 4 + tools/CMakeLists.txt | 1 + tools/http-server/CMakeLists.txt | 5 + tools/http-server/http-server.cpp | 205 +++++++++++++++++++++++++++ 7 files changed, 264 insertions(+) create mode 100644 tools/http-server/CMakeLists.txt create mode 100644 tools/http-server/http-server.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 901bb388c..37b7fc909 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -139,6 +139,7 @@ set(BUSTUB_THIRD_PARTY_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/third_party/argparse/include ${PROJECT_SOURCE_DIR}/third_party/cpp_random_distributions ${PROJECT_SOURCE_DIR}/third_party/backward-cpp/include + ${PROJECT_SOURCE_DIR}/third_party/json/include ) include_directories(${BUSTUB_SRC_INCLUDE_DIR} ${BUSTUB_TEST_INCLUDE_DIR} ${BUSTUB_THIRD_PARTY_INCLUDE_DIR}) diff --git a/src/common/bustub_instance.cpp b/src/common/bustub_instance.cpp index 945027fb0..3b07f04fa 100644 --- a/src/common/bustub_instance.cpp +++ b/src/common/bustub_instance.cpp @@ -251,6 +251,50 @@ auto BustubInstance::ExecuteSql(const std::string &sql, ResultWriter &writer, } } +auto BustubInstance::ExecutePlan(const AbstractPlanNodeRef &plan, ResultWriter &writer) -> bool { + auto txn = txn_manager_->Begin(); + try { + auto result = ExecutePlanTxn(plan, txn, writer); + txn_manager_->Commit(txn); + delete txn; + return result; + } catch (bustub::Exception &ex) { + txn_manager_->Abort(txn); + delete txn; + throw ex; + } +} + +auto BustubInstance::ExecutePlanTxn(const AbstractPlanNodeRef &plan, Transaction *txn, ResultWriter &writer) -> bool { + // Execute the query. + auto exec_ctx = MakeExecutorContext(txn, false); + std::vector result_set{}; + auto is_successful = execution_engine_->Execute(plan, &result_set, txn, exec_ctx.get()); + + // Return the result set as a vector of string. + auto schema = plan->OutputSchema(); + + // Generate header for the result set. + writer.BeginTable(false); + writer.BeginHeader(); + for (const auto &column : schema.GetColumns()) { + writer.WriteHeaderCell(column.GetName()); + } + writer.EndHeader(); + + // Transforming result set into strings. + for (const auto &tuple : result_set) { + writer.BeginRow(); + for (uint32_t i = 0; i < schema.GetColumnCount(); i++) { + writer.WriteCell(tuple.GetValue(&schema, i).ToString()); + } + writer.EndRow(); + } + writer.EndTable(); + + return is_successful; +} + auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn, std::shared_ptr check_options) -> bool { if (!sql.empty() && sql[0] == '\\') { diff --git a/src/include/common/bustub_instance.h b/src/include/common/bustub_instance.h index 3c08d848e..2334cfc51 100644 --- a/src/include/common/bustub_instance.h +++ b/src/include/common/bustub_instance.h @@ -26,6 +26,7 @@ #include "common/config.h" #include "common/util/string_util.h" #include "execution/check_options.h" +#include "execution/plans/abstract_plan.h" #include "libfort/lib/fort.hpp" #include "type/value.h" @@ -265,6 +266,9 @@ class BustubInstance { /** Get the current transaction. */ auto CurrentManagedTxn() -> Transaction *; + auto ExecutePlan(const AbstractPlanNodeRef &plan, ResultWriter &writer) -> bool; + auto ExecutePlanTxn(const AbstractPlanNodeRef &plan, Transaction *txn, ResultWriter &writer) -> bool; + /** * FOR TEST ONLY. Generate test tables in this BusTub instance. * It's used in the shell to predefine some tables, as we don't support diff --git a/third_party/CMakeLists.txt b/third_party/CMakeLists.txt index 03dd8ce05..128c32eec 100644 --- a/third_party/CMakeLists.txt +++ b/third_party/CMakeLists.txt @@ -17,3 +17,7 @@ add_subdirectory(utf8proc) add_subdirectory(backward-cpp) add_subdirectory(readerwriterqueue) + +add_subdirectory(json) + +add_subdirectory(cpp-httplib) diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 40e176ae9..eec72b87c 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory(terrier_bench) add_subdirectory(bpm_bench) add_subdirectory(btree_bench) add_subdirectory(htable_bench) +add_subdirectory(http-server) add_backward(shell) add_backward(nc-shell) diff --git a/tools/http-server/CMakeLists.txt b/tools/http-server/CMakeLists.txt new file mode 100644 index 000000000..883614123 --- /dev/null +++ b/tools/http-server/CMakeLists.txt @@ -0,0 +1,5 @@ +set(HTTP_SERVER_SOURCES http-server.cpp) +add_executable(http-server ${HTTP_SERVER_SOURCES}) + +target_link_libraries(http-server bustub) +set_target_properties(http-server PROPERTIES OUTPUT_NAME bustub-http-server) diff --git a/tools/http-server/http-server.cpp b/tools/http-server/http-server.cpp new file mode 100644 index 000000000..50dbf7f37 --- /dev/null +++ b/tools/http-server/http-server.cpp @@ -0,0 +1,205 @@ +#include +#include +#include +#include +#include +#include +#include "binder/table_ref/bound_join_ref.h" +#include "catalog/column.h" +#include "catalog/schema.h" +#include "common/bustub_instance.h" +#include "common/exception.h" +#include "cpp-httplib/httplib.h" +#include "execution/expressions/abstract_expression.h" +#include "execution/expressions/column_value_expression.h" +#include "execution/expressions/comparison_expression.h" +#include "execution/plans/abstract_plan.h" +#include "execution/plans/nested_loop_join_plan.h" +#include "execution/plans/projection_plan.h" +#include "execution/plans/seq_scan_plan.h" +#include "type/type.h" +#include "type/type_id.h" + +using json = nlohmann::json; + +auto TransformType(const std::string &ft) -> bustub::TypeId { + if (ft == "INTEGER") { + return bustub::TypeId::INTEGER; + } + if (ft == "BOOLEAN") { + return bustub::TypeId::BOOLEAN; + } + throw bustub::Exception("unsupported field type"); +} + +auto TransformExpr(json expr, const std::vector &children) + -> bustub::AbstractExpressionRef { + bustub::TypeId out_type = TransformType(expr["outType"]); + if (expr.contains("op")) { + json op = expr["op"]; + std::string name = op["name"]; + std::vector operands_json = expr["operands"]; + std::vector operands; + operands.reserve(operands_json.size()); + for (const auto &operand_json : operands_json) { + operands.emplace_back(TransformExpr(operand_json, children)); + } + if (name == "=") { + return std::make_shared(operands[0], operands[1], bustub::ComparisonType::Equal); + } + throw bustub::Exception("unsupported op"); + } + + // column value expression + std::string name = expr["name"]; + size_t input = expr["input"]; + size_t i = 0; + for (; i < children.size(); i++) { + size_t cnt = children[i]->OutputSchema().GetColumnCount(); + if (input < cnt) { + break; + } + input -= cnt; + } + return std::make_shared(i, input, out_type); +} + +auto TransformRootRel(bustub::BustubInstance &bustub, const std::map &rels, json root_rel) + -> bustub::AbstractPlanNodeRef { + std::string rel_op = root_rel["relOp"]; + std::vector inputs = root_rel["inputs"]; + std::vector input_plan_nodes; + for (const auto &input : inputs) { + auto input_rel = rels.find(input)->second; + auto input_plan_node = TransformRootRel(bustub, rels, input_rel); + input_plan_nodes.emplace_back(std::move(input_plan_node)); + } + std::vector fields = root_rel["fields"]; + std::vector field_types = root_rel["fieldTypes"]; + std::vector columns; + for (size_t i = 0; i < fields.size(); i++) { + auto ft = field_types[i]; + columns.emplace_back(fields[i], TransformType(ft)); + } + bustub::SchemaRef schema = std::make_shared(columns); + if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableJoin") { + std::string join_type = root_rel["joinType"]; + bustub::JoinType jt; + if (join_type == "inner") { + jt = bustub::JoinType::INNER; + } else { + throw bustub::Exception("unsupported join type"); + } + auto predicate = TransformExpr(root_rel["condition"], input_plan_nodes); + return std::make_shared(std::move(schema), input_plan_nodes[0], input_plan_nodes[1], + predicate, jt); + } + if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableTableScan") { + std::string table_name = root_rel["table"][0]; + auto table_info = bustub.catalog_->GetTable(table_name); + return std::make_shared(std::move(schema), table_info->oid_, table_name); + } + if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableProject") { + std::vector exprs; + std::vector exprs_json = root_rel["exprs"]; + exprs.reserve(exprs_json.size()); + for (const auto &expr_json : exprs_json) { + exprs.emplace_back(TransformExpr(expr_json, input_plan_nodes)); + } + return std::make_shared(std::move(schema), exprs, input_plan_nodes[0]); + } + throw bustub::Exception("unsupported rel type"); +} + +auto BuildPlanNodeFromJson(bustub::BustubInstance &bustub, json plan) -> bustub::AbstractPlanNodeRef { + std::map rels; + std::vector json_rels = plan["rels"]; + for (const auto &rel : json_rels) { + rels[rel["id"]] = rel; + } + json root_rel = *json_rels.rbegin(); + return TransformRootRel(bustub, rels, root_rel); +} + +// NOLINTNEXTLINE +auto main(int argc, char **argv) -> int { + auto bustub = std::make_unique(); + + // HTTP + httplib::Server svr; + + svr.set_exception_handler([](const auto &req, auto &res, std::exception_ptr ep) { + std::string exception; + try { + std::rethrow_exception(ep); + } catch (bustub::Exception &e) { + exception = e.what(); + } catch (std::exception &e) { + exception = e.what(); + } catch (...) { // See the following NOTE + exception = "unknown exception"; + } + res.set_content(exception, "text/plain"); + res.status = 500; + }); + + svr.Post("/sql", [&bustub](const httplib::Request &req, httplib::Response &res) { + std::stringstream ss; + bustub::SimpleStreamWriter writer(ss, false); + json data = json::parse(req.body); + std::cerr << "SQL request: " << data["sql"] << std::endl; + bustub->ExecuteSql(data["sql"], writer); + res.set_content(ss.str(), "text/plain"); + }); + + svr.Post("/plan", [&bustub](const httplib::Request &req, httplib::Response &res) { + std::stringstream ss; + bustub::SimpleStreamWriter writer(ss, false); + std::cerr << "Plan request:"; + json json_plan = json::parse(req.body); + std::cerr << json_plan << std::endl; + auto plan = BuildPlanNodeFromJson(*bustub, json_plan); + std::cerr << "BusTub plan:" << std::endl << plan->ToString(true) << std::endl; + bustub->ExecutePlan(plan, writer); + res.set_content(ss.str(), "text/plain"); + }); + + svr.Get("/catalog", [&bustub](const httplib::Request &req, httplib::Response &res) { + std::cerr << "Catalog request" << std::endl; + auto tables = bustub->catalog_->GetTableNames(); + std::vector catalog; + for (const auto &tbl_name : tables) { + auto table = bustub->catalog_->GetTable(tbl_name); + std::vector schema; + for (size_t c = 0; c < table->schema_.GetColumnCount(); c++) { + auto col = table->schema_.GetColumn(c); + switch (col.GetType()) { + case bustub::TypeId::BIGINT: { + schema.emplace_back(std::map{{"name", col.GetName()}, {"type", "bigint"}}); + break; + } + case bustub::TypeId::INTEGER: { + schema.emplace_back(std::map{{"name", col.GetName()}, {"type", "integer"}}); + break; + } + case bustub::TypeId::VARCHAR: { + schema.emplace_back(std::map{{"name", col.GetName()}, {"type", "varchar"}}); + break; + } + default: + throw bustub::Exception("unsupported column type"); + } + } + catalog.emplace_back(std::map{{std::string("name"), json(table->name_)}, + {std::string("oid"), json(table->oid_)}, + {std::string("schema"), json(schema)}}); + } + res.set_content(json(std::map{{"catalog", catalog}}).dump(), "text/plain"); + }); + + std::cerr << "BusTub HTTP server listening" << std::endl; + + svr.listen("127.0.0.1", 23333); + + return 0; +}