Skip to content

Commit

Permalink
[fix](cloud-mow) Make loading task timeout more reasonable
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb committed Jan 3, 2025
1 parent 77beb58 commit b90d365
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 6 deletions.
8 changes: 6 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(const CloudTablet&
}

Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator) {
int64_t initiator, bool is_schema_change) {
VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id()
<< ",lock_id:" << lock_id;
GetDeleteBitmapUpdateLockRequest req;
Expand All @@ -1110,7 +1110,11 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in
req.set_lock_id(lock_id);
req.set_initiator(initiator);
// set expiration time for compaction and schema_change
req.set_expiration(config::delete_bitmap_lock_expiration_seconds);
int lock_expiration = config::delete_bitmap_lock_expiration_seconds;
if (is_schema_change) {
lock_expiration *= config::schema_change_delete_bitmap_lock_expiration_multiplier;
}
req.set_expiration(lock_expiration);
int retry_times = 0;
Status st;
std::default_random_engine rng = make_random_engine();
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class CloudMetaMgr {
DeleteBitmap* delete_bitmap);

Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator);
int64_t initiator, bool is_schema_change = false);

Status remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,

// step 2, process incremental rowset with delete bitmap update lock
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
*_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
*_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, true));
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
int64_t new_max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,7 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

DEFINE_mBool(enable_cloud_tablet_report, "true");

DEFINE_Int32(schema_change_delete_bitmap_lock_expiration_multiplier, "3");
#include "common/compile_check_end.h"
} // namespace doris::config
2 changes: 2 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ DECLARE_mInt32(sync_load_for_tablets_thread);

DECLARE_Int32(delete_bitmap_lock_expiration_seconds);

DECLARE_Int32(schema_change_delete_bitmap_lock_expiration_multiplier);

// enable large txn lazy commit in meta-service `commit_txn`
DECLARE_mBool(enable_cloud_txn_lazy_commit);

Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,8 @@ DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");

DEFINE_mInt32(compaction_num_per_round, "1");

DEFINE_mInt32(mow_commit_rpc_timeout_multiplier, "2");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,8 @@ DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);

DECLARE_mInt32(compaction_num_per_round);

DECLARE_mInt32(mow_commit_rpc_timeout_multiplier);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,16 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnCommitResult result;
#ifndef BE_TEST
int timeout_ms = config::txn_commit_rpc_timeout_ms;
if (ctx->is_mow_table()) {
timeout_ms *= config::mow_commit_rpc_timeout_multiplier;
}
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnCommit(result, request);
},
config::txn_commit_rpc_timeout_ms));
timeout_ms));
#else
result = k_stream_load_commit_result;
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_INFER_PREDICATE = "enable_infer_predicate";

public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000;
public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 60_000;

public static final String ENABLE_VECTORIZED_ENGINE = "enable_vectorized_engine";

Expand Down

0 comments on commit b90d365

Please sign in to comment.