feat(interactive): Introducing sharding mode to Interactive #4410

Open
wants to merge 3 commits into
base: main
1 change: 1 addition & 0 deletions .github/workflows/interactive.yml
@@ -404,6 +404,7 @@ jobs:
matrix:
BUILD_TEST: [ON, OFF]
BUILD_ODPS_FRAGMENT_LOADER: [ON, OFF]
DEFAULT_SHARDING_MODE: ["exclusive", "cooperative"]
steps:
- uses: actions/checkout@v4

22 changes: 22 additions & 0 deletions docs/flex/interactive/configuration.md
@@ -82,6 +82,27 @@ compiler:
query_timeout: 20000 # query timeout in milliseconds, default 20000
```

#### Sharded Service

The core query engine of Interactive is built on [hiactor](https://github.com/alibaba/hiactor), which is based on [Seastar](https://github.com/scylladb/seastar). Seastar uses a shared-nothing SMP architecture in which each core runs autonomously, without sharing memory, data structures, or CPU resources with other cores. Each Seastar core is commonly referred to as a shard.

Leveraging the future-promise API and a cooperative micro-task scheduler, the sharded service significantly boosts performance and throughput. However, this setup has a drawback: an incoming request may be delayed even while some shards are idle, because the shard scheduling algorithm can route it to a busy shard. This matters in Interactive, which typically hosts two services, `QueryService` and `AdminService`: `AdminService` must remain responsive even when `QueryService` is under heavy load.

As discussed in [discussion-4409](https://github.com/alibaba/GraphScope/discussions/4409), one potential solution is to allocate different shards to different kinds of requests. This leads to two scenarios:

- **Routine Scenario**: Users may run both complex and simple queries, so one shard is dedicated exclusively to admin requests. Because this shard does not process queries, overall query performance may decline.

- **Performance-Critical Scenario**: Users aim for peak performance from Interactive. All shards process query requests, and admin requests are handled concurrently by the same shards. Consequently, admin requests may occasionally be delayed.

By default, Interactive is configured for the routine scenario:

```yaml
http_service:
  sharding_mode: exclusive # In exclusive mode, a shard is exclusively reserved for admin requests. In cooperative mode, both query and admin requests can be processed by any shard.
```

By changing to `sharding_mode: cooperative`, you can fully utilize all the computational power for the QueryService.
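
To make the two modes concrete, here is a minimal C++ sketch of how requests could be mapped to shards. The names `Mode`, `RequestKind`, and `pick_shard` are hypothetical and are not taken from Interactive. The only facts carried over from this PR are that exclusive mode reserves the last shard for admin requests and that cooperative mode lets any shard serve either kind of request. The round-robin policy is an assumption; the real scheduling is done by hiactor.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical names for illustration only; not part of Interactive.
enum class Mode { kExclusive, kCooperative };
enum class RequestKind { kQuery, kAdmin };

// Pick a shard for the n-th request of a given kind using round-robin.
// Requires shard_num >= 1, and shard_num >= 2 in exclusive mode.
inline uint32_t pick_shard(Mode mode, RequestKind kind, uint32_t shard_num,
                           uint32_t n) {
  assert(shard_num >= 1);
  if (mode == Mode::kCooperative) {
    return n % shard_num;  // any shard may serve any request
  }
  assert(shard_num >= 2);
  if (kind == RequestKind::kAdmin) {
    return shard_num - 1;  // the last shard is reserved for admin requests
  }
  return n % (shard_num - 1);  // queries rotate over the remaining shards
}
```

With 4 shards in exclusive mode, query requests only ever reach shards 0 to 2 and every admin request goes to shard 3. In cooperative mode the same query stream also reaches shard 3, which is where the extra query capacity comes from.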


##### Available Configurations

@@ -99,6 +120,7 @@ In this following table, we use the `.` notation to represent the hierarchy with
| compiler.planner.rules.FilterIntoJoinRule | N/A | A native Calcite optimization rule that pushes filter conditions to the Join participants before performing the join | 0.0.1 |
| compiler.planner.rules.NotMatchToAntiJoinRule | N/A | An optimization rule that transforms a "not exist" pattern into an anti-join operation | 0.0.1 |
| compiler.query_timeout | 3000000 | The maximum time for compiler to wait engine's reply, in `ms` | 0.0.3 |
| http_service.sharding_mode | exclusive | The sharding mode for the HTTP service. In `exclusive` mode, one shard is reserved exclusively for admin requests. In `cooperative` mode, both query and admin requests can be served by any shard. | 0.5 |

#### TODOs

3 changes: 3 additions & 0 deletions flex/CMakeLists.txt
@@ -18,6 +18,9 @@ option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build op
option(USE_STATIC_ARROW "Whether to use static arrow" ON) # Whether to link arrow statically, default is ON
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF

# Default args for flex building
set(DEFAULT_SHARDING_MODE "exclusive" CACHE STRING "Default sharding mode for flex, see graph_db_service.h. To change to other options, cmake .. -DDEFAULT_SHARDING_MODE=cooperative")

#print options
message(STATUS "Build test: ${BUILD_TEST}")
message(STATUS "Build doc: ${BUILD_DOC}")
1 change: 1 addition & 0 deletions flex/engines/http_server/CMakeLists.txt
@@ -38,5 +38,6 @@ if (Hiactor_FOUND)
target_link_libraries(flex_server otel)
endif()
target_link_libraries(flex_server flex_metadata_store)
target_compile_definitions(flex_server PRIVATE DEFAULT_SHARDING_MODE="${DEFAULT_SHARDING_MODE}")
install_without_export_flex_target(flex_server)
endif ()
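
The `target_compile_definitions` call in this hunk passes the CMake cache value to the compiler as a string-literal macro. Below is a minimal sketch of how such a macro can be consumed with a fallback when the build system does not define it. `default_sharding_mode_name` is a hypothetical helper, and the fallback value `"exclusive"` is an assumption based on the documented default.

```cpp
#include <cassert>
#include <string>

// Fall back to a built-in default when the build system did not pass
// -DDEFAULT_SHARDING_MODE="...". The fallback value is an assumption.
#ifndef DEFAULT_SHARDING_MODE
#define DEFAULT_SHARDING_MODE "exclusive"
#endif

// Hypothetical helper: returns the compile-time default as a std::string.
inline std::string default_sharding_mode_name() {
  const std::string mode = DEFAULT_SHARDING_MODE;
  // Catch a misspelled mode name as early as possible in debug builds.
  assert(mode == "exclusive" || mode == "cooperative");
  return mode;
}
```

Because the value arrives as free-form text from the command line, a misspelled `-DDEFAULT_SHARDING_MODE=...` compiles cleanly and only fails when the string is interpreted at runtime, so an early check like the one above can be worthwhile.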
8 changes: 7 additions & 1 deletion flex/engines/http_server/actor/admin_actor.act.cc
@@ -335,7 +335,13 @@ seastar::future<admin_query_result> admin_actor::run_create_graph(
query_param&& query_param) {
LOG(INFO) << "Creating Graph: " << query_param.content;

auto request = gs::CreateGraphMetaRequest::FromJson(query_param.content);
gs::Result<std::string> preprocess_schema_str =
gs::preprocess_and_check_schema_json_string(query_param.content);
if (!preprocess_schema_str.ok()) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(preprocess_schema_str.status()));
}
auto request = gs::CreateGraphMetaRequest::FromJson(preprocess_schema_str.value());
if (!request.ok()) {
LOG(ERROR) << "Fail to parse graph meta: "
<< request.status().error_message();
18 changes: 15 additions & 3 deletions flex/engines/http_server/graph_db_service.cc
@@ -47,7 +47,15 @@ ServiceConfig::ServiceConfig()
enable_bolt(false),
metadata_store_type_(gs::MetadataStoreType::kLocalFile),
log_level(DEFAULT_LOG_LEVEL),
verbose_level(DEFAULT_VERBOSE_LEVEL) {}
verbose_level(DEFAULT_VERBOSE_LEVEL) {
#ifdef DEFAULT_SHARDING_MODE
VLOG(10) << "Set default sharding mode to: " << DEFAULT_SHARDING_MODE;
set_sharding_mode(DEFAULT_SHARDING_MODE);
#else
VLOG(10) << "No default sharding mode is set, use exclusive mode.";
set_sharding_mode("exclusive");
#endif
}

const std::string GraphDBService::DEFAULT_GRAPH_NAME = "modern_graph";
const std::string GraphDBService::DEFAULT_INTERACTIVE_HOME = "/opt/flex/";
@@ -113,10 +121,14 @@ void GraphDBService::init(const ServiceConfig& config) {
actor_sys_ = std::make_unique<actor_system>(
config.shard_num, config.dpdk_mode, config.enable_thread_resource_pool,
config.external_thread_num, [this]() { set_exit_state(); });
// NOTE that in sharding mode EXCLUSIVE, the last shard is reserved for admin
// requests.
query_hdl_ = std::make_unique<graph_db_http_handler>(
config.query_port, config.shard_num, config.enable_adhoc_handler);
config.query_port, config.shard_num, config.get_cooperative_shard_num(),
config.enable_adhoc_handler);
if (config.start_admin_service) {
admin_hdl_ = std::make_unique<admin_http_handler>(config.admin_port);
admin_hdl_ = std::make_unique<admin_http_handler>(
config.admin_port, config.shard_num, config.get_exclusive_shard_id());
}

initialized_.store(true);
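
The two accessors passed to the handlers here, `get_cooperative_shard_num()` and `get_exclusive_shard_id()`, are defined in `graph_db_service.h` in this PR. The standalone sketch below mirrors their arithmetic as a free function so the resulting split can be checked for concrete shard counts. The free-function form and the names `ShardSplit` and `split_shards` are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

enum class ShardingMode { EXCLUSIVE, COOPERATIVE };

// Hypothetical aggregate for illustration; the PR exposes two accessors.
struct ShardSplit {
  int32_t query_shard_num;     // shards that serve query requests
  int32_t exclusive_shard_id;  // admin-only shard, or -1 if none is reserved
};

inline ShardSplit split_shards(ShardingMode mode, uint32_t shard_num) {
  assert(shard_num >= 1);
  const int32_t n = static_cast<int32_t>(shard_num);
  if (mode == ShardingMode::EXCLUSIVE) {
    // Reserve the last shard for admin; keep at least one query shard.
    return ShardSplit{std::max(n - 1, 1), n - 1};
  }
  return ShardSplit{n, -1};
}
```

With `shard_num = 4`, exclusive mode gives 3 query shards and reserves shard 3, while cooperative mode gives 4 query shards and no reserved shard. With `shard_num = 1` in exclusive mode, the same arithmetic yields one query shard and reserved shard 0, so both services would share the only shard; this appears to be the case the YAML loader rejects when `sharding_mode` is set explicitly.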
42 changes: 42 additions & 0 deletions flex/engines/http_server/graph_db_service.h
@@ -36,6 +36,7 @@ namespace server {
/* Stored service configuration, read from interactive_config.yaml
*/
struct ServiceConfig {
enum class ShardingMode { EXCLUSIVE, COOPERATIVE };
static constexpr const uint32_t DEFAULT_SHARD_NUM = 1;
static constexpr const uint32_t DEFAULT_QUERY_PORT = 10000;
static constexpr const uint32_t DEFAULT_ADMIN_PORT = 7777;
@@ -67,12 +68,39 @@ struct ServiceConfig {
// If we found GLOG_v in the environment, we will at the first place.
int log_level;
int verbose_level;
ShardingMode sharding_mode; // exclusive or cooperative. With exclusive mode,
// we will reserve one shard for only processing
// admin requests, and the other shards for
// processing query requests. With cooperative
// mode, all shards will process both admin and
// query requests. With only one shard available,
// the sharding mode must be cooperative.

// Those has not default value
std::string default_graph;
std::string engine_config_path; // used for codegen.

ServiceConfig();

void set_sharding_mode(const std::string& mode) {
if (mode == "exclusive") {
sharding_mode = ShardingMode::EXCLUSIVE;
} else if (mode == "cooperative") {
sharding_mode = ShardingMode::COOPERATIVE;
} else {
LOG(FATAL) << "Invalid sharding mode: " << mode;
}
}

int32_t get_exclusive_shard_id() const {
return sharding_mode == ShardingMode::EXCLUSIVE ? shard_num - 1 : -1;
}

int32_t get_cooperative_shard_num() const {
return sharding_mode == ShardingMode::EXCLUSIVE
? std::max((int32_t) shard_num - 1, 1)
: shard_num; // shard_num >= 1
}
};

class GraphDBService {
@@ -241,6 +269,20 @@ struct convert<server::ServiceConfig> {
LOG(INFO) << "admin_port not found, use default value "
<< service_config.admin_port;
}
if (http_service_node["sharding_mode"]) {
auto sharding_mode =
http_service_node["sharding_mode"].as<std::string>();
if (sharding_mode != "exclusive" && sharding_mode != "cooperative") {
LOG(ERROR) << "Unsupported sharding mode: " << sharding_mode;
return false;
}
if (sharding_mode == "exclusive" && service_config.shard_num == 1) {
LOG(ERROR) << "exclusive sharding mode requires at least 2 shards";
return false;
}
service_config.set_sharding_mode(sharding_mode);
VLOG(1) << "sharding_mode: " << sharding_mode;
}
} else {
LOG(ERROR) << "Fail to find http_service configuration";
return false;
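
The YAML checks added in this hunk can be summarized as a small pure function. The sketch below is a standalone restatement under stated assumptions: `validate_sharding_mode` is a hypothetical name, and it returns an error message instead of logging, so it depends on neither glog nor yaml-cpp. It also treats any shard count below 2 as invalid for exclusive mode, which is slightly stricter than the `shard_num == 1` test in the PR.

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper: returns an empty string when the configuration is
// valid, otherwise a human-readable error message.
inline std::string validate_sharding_mode(const std::string& mode,
                                          uint32_t shard_num) {
  if (mode != "exclusive" && mode != "cooperative") {
    return "Unsupported sharding mode: " + mode;
  }
  if (mode == "exclusive" && shard_num < 2) {
    // One shard cannot be reserved for admin and still leave one for queries.
    return "exclusive sharding mode requires at least 2 shards";
  }
  return "";
}
```

Keeping the check free of logging and parsing makes it easy to unit-test, and the same function could in principle be applied to the compile-time default as well as to the value read from the configuration file.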