Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into auth-execption
Browse files Browse the repository at this point in the history
  • Loading branch information
HangyuanLiu committed Aug 23, 2023
2 parents e9127bc + 4667c99 commit 1808502
Show file tree
Hide file tree
Showing 63 changed files with 2,273 additions and 83 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/ci-doc-checker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ jobs:
ignore: node_modules
version: 0.28.1

behavior-unchange:
behavior-change-unset:
runs-on: ubuntu-latest
needs: add-doc-label
steps:
Expand All @@ -97,10 +97,14 @@ jobs:
githubToken: ${{ secrets.PAT }}
find: '[x] Yes, this PR will result in a change in behavior.'
replace: '[ ] Yes, this PR will result in a change in behavior.'


behavior-unchange-set:
runs-on: ubuntu-latest
needs: add-doc-label
steps:
- name: Replace Pull Request Body
uses: ivangabriele/[email protected]
with:
githubToken: ${{ secrets.PAT }}
find: '[ ] No, this PR will not result in a change in behavior.'
replace: '[x] No, this PR will not result in a change in behavior.'
replace: '[x] No, this PR will not result in a change in behavior.'
9 changes: 5 additions & 4 deletions .github/workflows/release-docker-image.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
name: 'release docker images'

on:
release:
types: [published]
# Don't trigger actions automatically
#on:
# release:
# types: [published]

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
Expand Down Expand Up @@ -32,7 +33,7 @@ jobs:

- name: build artifact docker image
run: |
DOCKER_BUILDKIT=1 docker build --rm=true --build-arg RELEASE_VERSION=${RELEASE_VERSION} -f docker/dockerfiles/artifacts/artifact-ubuntu.Dockerfile -t artifact-ubuntu:${RELEASE_VERSION} .
DOCKER_BUILDKIT=1 docker build --rm=true --build-arg RELEASE_VERSION=${RELEASE_VERSION} -f docker/dockerfiles/artifacts/artifact.Dockerfile -t artifact-ubuntu:${RELEASE_VERSION} .
- name: build and publish fe docker image
run: |
Expand Down
7 changes: 5 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "storage/task/engine_storage_migration_task.h"
#include "storage/update_manager.h"
#include "storage/utils.h"
#include "util/misc.h"
#include "util/starrocks_metrics.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
Expand Down Expand Up @@ -600,7 +601,8 @@ void* ReportTaskWorkerPool::_worker_thread_callback(void* arg_this) {
<< ", err=" << status;
}

sleep(config::report_task_interval_seconds);
nap_sleep(config::report_task_interval_seconds,
[worker_pool_this] { return worker_pool_this->_stopped.load(); });
}

return nullptr;
Expand Down Expand Up @@ -740,7 +742,8 @@ void* ReportWorkgroupTaskWorkerPool::_worker_thread_callback(void* arg_this) {
if (result.__isset.workgroup_ops) {
workgroup::WorkGroupManager::instance()->apply(result.workgroup_ops);
}
sleep(config::report_workgroup_interval_seconds);
nap_sleep(config::report_workgroup_interval_seconds,
[worker_pool_this] { return worker_pool_this->_stopped.load(); });
}

return nullptr;
Expand Down
5 changes: 3 additions & 2 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "util/gc_helper.h"
#include "util/logging.h"
#include "util/mem_info.h"
#include "util/misc.h"
#include "util/monotime.h"
#include "util/network_util.h"
#include "util/starrocks_metrics.h"
Expand Down Expand Up @@ -100,7 +101,7 @@ void gc_memory(void* arg_this) {

auto* daemon = static_cast<Daemon*>(arg_this);
while (!daemon->stopped()) {
sleep(static_cast<unsigned int>(config::memory_maintenance_sleep_time_s));
nap_sleep(config::memory_maintenance_sleep_time_s, [daemon] { return daemon->stopped(); });

ReleaseColumnPool releaser(kFreeRatio);
ForEach<ColumnPoolList>(releaser);
Expand Down Expand Up @@ -184,7 +185,7 @@ void calculate_metrics(void* arg_this) {
mem_metrics->update_mem_bytes.value(), mem_metrics->chunk_allocator_mem_bytes.value(),
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value());

sleep(15); // 15 seconds
nap_sleep(15, [daemon] { return daemon->stopped(); });
}
}

Expand Down
8 changes: 2 additions & 6 deletions be/src/runtime/broker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/await.h"
#include "util/misc.h"
#include "util/starrocks_metrics.h"
#include "util/thread.h"

Expand Down Expand Up @@ -106,9 +106,6 @@ void BrokerMgr::ping(const TNetworkAddress& addr) {
}

void BrokerMgr::ping_worker() {
Awaitility await;
// timeout in 5 seconds with 200ms check interval
await.timeout(5 * 1000L * 1000).interval(200 * 1000L);
while (!_thread_stop) {
std::vector<TNetworkAddress> addresses;
{
Expand All @@ -120,8 +117,7 @@ void BrokerMgr::ping_worker() {
for (auto& addr : addresses) {
ping(addr);
}
// block until _thread_stop is true or timeout
await.until([this] { return _thread_stop; });
nap_sleep(5, [this] { return _thread_stop; });
}
}

Expand Down
9 changes: 2 additions & 7 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
#include "runtime/runtime_filter_cache.h"
#include "runtime/runtime_filter_worker.h"
#include "service/backend_options.h"
#include "util/await.h"
#include "util/misc.h"
#include "util/starrocks_metrics.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
Expand Down Expand Up @@ -542,9 +542,6 @@ void FragmentMgr::close() {

void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
Awaitility await;
// timeout in 1 second and check every 200ms
await.timeout(1000 * 1000).interval(200 * 1000);
while (!_stop) {
std::vector<TUniqueId> to_delete;
DateTimeValue now = DateTimeValue::local_time();
Expand All @@ -560,9 +557,7 @@ void FragmentMgr::cancel_worker() {
cancel(id, PPlanFragmentCancelReason::TIMEOUT);
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout fragment " << print_id(id);
}

// block until timeout or _stop is true
await.until([this] { return _stop; });
nap_sleep(1, [this] { return _stop; });
}
LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
}
Expand Down
7 changes: 2 additions & 5 deletions be/src/runtime/profile_report_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include "exec/pipeline/query_context.h"
#include "runtime/fragment_mgr.h"
#include "util/await.h"
#include "util/misc.h"

namespace starrocks {

Expand Down Expand Up @@ -113,17 +113,14 @@ void ProfileReportWorker::execute() {

int32_t interval = config::profile_report_interval;

Awaitility await;
await.timeout(interval * 1000LL * 1000).interval(200 * 1000L);
while (!_stop.load(std::memory_order_consume)) {
_start_report_profile();

if (interval <= 0) {
LOG(WARNING) << "profile_report_interval config is illegal: " << interval << ", force set to 1";
interval = 1;
}
// block until timeout or _stop is true
await.until([this] { return _stop.load(std::memory_order_consume); });
nap_sleep(interval, [this] { return _stop.load(std::memory_order_consume); });
}
LOG(INFO) << "ProfileReportWorker going to exit.";
}
Expand Down
8 changes: 2 additions & 6 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

#include "gen_cpp/InternalService_types.h"
#include "runtime/buffer_control_block.h"
#include "util/await.h"
#include "util/misc.h"
#include "util/starrocks_metrics.h"
#include "util/thread.h"

Expand Down Expand Up @@ -146,9 +146,6 @@ Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& quer
void ResultBufferMgr::cancel_thread() {
LOG(INFO) << "result buffer manager cancel thread begin.";

Awaitility await;
// check every 200ms until 1 second and timeout
await.timeout(1000 * 1000).interval(200 * 1000);
while (!_is_stop) {
// get query
std::vector<TUniqueId> query_to_cancel;
Expand All @@ -170,8 +167,7 @@ void ResultBufferMgr::cancel_thread() {
for (auto& i : query_to_cancel) {
cancel(i);
}
// wait until timeout or _is_stop returns true
await.until([this] { return _is_stop; });
nap_sleep(1, [this] { return _is_stop; });
}

LOG(INFO) << "result buffer manager cancel thread finish.";
Expand Down
10 changes: 2 additions & 8 deletions be/src/runtime/routine_load/data_consumer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include "common/config.h"
#include "data_consumer.h"
#include "runtime/routine_load/data_consumer_group.h"
#include "util/await.h"
#include "util/misc.h"
#include "util/thread.h"

namespace starrocks {
Expand Down Expand Up @@ -173,17 +173,11 @@ void DataConsumerPool::start_bg_worker() {
#endif

uint32_t interval = 60;
Awaitility await;
// timeout in `interval` seconds and check every 200ms
await.timeout(interval * 1000L * 1000).interval(200 * 1000);
while (true) {
if (*is_closed) {
return;
}

_clean_idle_consumer_bg();
// block until timeout or *is_closed is true
await.until([&is_closed] { return *is_closed; });
nap_sleep(interval, [&is_closed] { return *is_closed; });
}
});
Thread::set_thread_name(_clean_idle_consumer_thread, "clean_idle_cm");
Expand Down
7 changes: 2 additions & 5 deletions be/src/runtime/stream_load/transaction_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/stream_load/transaction_mgr.h"
#include "util/await.h"
#include "util/byte_buffer.h"
#include "util/debug_util.h"
#include "util/defer_op.h"
#include "util/json_util.h"
#include "util/metrics.h"
#include "util/misc.h"
#include "util/starrocks_metrics.h"
#include "util/string_parser.hpp"
#include "util/thrift_rpc_helper.h"
Expand Down Expand Up @@ -87,12 +87,9 @@ TransactionMgr::TransactionMgr(ExecEnv* exec_env) : _exec_env(exec_env) {
ProfilerRegisterThread();
#endif

Awaitility await;
// check every 200ms until timeout with `interval` seconds
await.timeout(interval * 1000L * 1000L).interval(200 * 1000);
while (!_is_stopped.load()) {
_clean_stream_context();
await.until([this] { return _is_stopped.load(); });
nap_sleep(interval, [this] { return _is_stopped.load(); });
}
});
Thread::set_thread_name(_transaction_clean_thread, "transaction_clean");
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/service_be/starrocks_be.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ void start_be(const std::vector<StorePath>& paths, bool as_cn) {
LOG(INFO) << "BE started successfully";

while (!(k_starrocks_exit.load()) && !(k_starrocks_exit_quick.load())) {
sleep(10);
sleep(1);
}

int exit_step = 1;
Expand Down
1 change: 1 addition & 0 deletions be/src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ set(UTIL_FILES
starrocks_metrics.cpp
mem_info.cpp
metrics.cpp
misc.cpp
murmur_hash3.cpp
network_util.cpp
parse_util.cpp
Expand Down
33 changes: 33 additions & 0 deletions be/src/util/misc.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "util/misc.h"

#include "util/await.h"

namespace starrocks {

#ifdef BE_TEST
static constexpr int sleep_interval = 100 * 1000; // 100ms
#else
static constexpr int sleep_interval = 1 * 1000 * 1000; // 1 seconds
#endif

void nap_sleep(int32_t sleep_secs, std::function<bool()> stop_condition) {
Awaitility await;
await.timeout(sleep_secs * 1000LL * 1000LL).interval(sleep_interval);
await.until(stop_condition);
}

} // namespace starrocks
24 changes: 24 additions & 0 deletions be/src/util/misc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <functional>

namespace starrocks {

// take a sleep with small intervals until time out by `sleep_secs` or the `stop_condition()` is true
void nap_sleep(int32_t sleep_secs, std::function<bool()> stop_condition);

} // namespace starrocks
2 changes: 1 addition & 1 deletion bin/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export_shared_envvars() {

# https://github.com/aws/aws-cli/issues/5623
# https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html
export AWS_EC2_METADATA_DISABLED=false
export AWS_EC2_METADATA_DISABLED=${AWS_EC2_METADATA_DISABLED:-false}
# ===================================================================================
}

Expand Down
Loading

0 comments on commit 1808502

Please sign in to comment.