diff --git a/include/vcpkg/base/batch-queue.h b/include/vcpkg/base/batch-queue.h new file mode 100644 index 0000000000..ef3e993ed8 --- /dev/null +++ b/include/vcpkg/base/batch-queue.h @@ -0,0 +1,65 @@ +#pragma once + +#include +#include +#include + +template +class BatchQueue +{ +public: + template + void push(Args&&... args) + { + forward.emplace_back(std::forward(args)...); + } + + bool empty() const { return forward.empty(); } + + void pop(std::vector& out) + { + out.clear(); + swap(out, forward); + } + +private: + std::vector forward; +}; + +template +struct BGThreadBatchQueue +{ + template + void push(Args&&... args) + { + std::lock_guard lock(m_mtx); + m_tasks.push(std::forward(args)...); + m_cv.notify_all(); + } + + void wait_for_items(std::vector& out) + { + std::unique_lock lock(m_mtx); + m_cv.wait(lock, [this]() { return !m_tasks.empty() || !m_running; }); + m_tasks.pop(out); + } + + void stop() + { + std::lock_guard lock(m_mtx); + m_running = false; + m_cv.notify_all(); + } + + bool stopped() + { + std::lock_guard lock(m_mtx); + return !m_running; + } + +private: + std::mutex m_mtx; + std::condition_variable m_cv; + BatchQueue m_tasks; + bool m_running = true; +}; diff --git a/include/vcpkg/base/fwd/message_sinks.h b/include/vcpkg/base/fwd/message_sinks.h index 6def368015..f0c8d94fbd 100644 --- a/include/vcpkg/base/fwd/message_sinks.h +++ b/include/vcpkg/base/fwd/message_sinks.h @@ -13,4 +13,5 @@ namespace vcpkg struct FileSink; struct TeeSink; + struct BGMessageSink; } diff --git a/include/vcpkg/base/message-data.inc.h b/include/vcpkg/base/message-data.inc.h index 412500eb4b..a25859080e 100644 --- a/include/vcpkg/base/message-data.inc.h +++ b/include/vcpkg/base/message-data.inc.h @@ -3263,6 +3263,7 @@ DECLARE_MESSAGE(VSExaminedPaths, (), "", "The following paths were examined for DECLARE_MESSAGE(VSNoInstances, (), "", "Could not locate a complete Visual Studio instance") DECLARE_MESSAGE(WaitingForChildrenToExit, (), "", "Waiting for child processes to exit...") DECLARE_MESSAGE(WaitingToTakeFilesystemLock, (msg::path), "", "waiting to take filesystem lock on {path}...") +DECLARE_MESSAGE(WaitUntilPackagesUploaded, (msg::count), "", "Wait until the remaining packages ({count}) are uploaded") DECLARE_MESSAGE(WarningsTreatedAsErrors, (), "", "previous warnings being interpreted as errors") DECLARE_MESSAGE(WarnOnParseConfig, (msg::path), "", "Found the following warnings in configuration {path}:") DECLARE_MESSAGE(WhileCheckingOutBaseline, (msg::commit_sha), "", "while checking out baseline {commit_sha}") diff --git a/include/vcpkg/base/message_sinks.h b/include/vcpkg/base/message_sinks.h index 85cd345840..342f9e5672 100644 --- a/include/vcpkg/base/message_sinks.h +++ b/include/vcpkg/base/message_sinks.h @@ -4,6 +4,7 @@ #include +#include #include #include @@ -79,4 +80,26 @@ namespace vcpkg virtual void println(Color color, const LocalizedString& line) override; virtual void println(Color color, LocalizedString&& line) override; }; + + struct BGMessageSink final : MessageSink + { + BGMessageSink(MessageSink& out_sink) : out_sink(out_sink) { } + ~BGMessageSink() { publish_directly_to_out_sink(); } + // must be called from producer + virtual void println(const MessageLine& line) override; + virtual void println(MessageLine&& line) override; + using MessageSink::println; + + // must be called from consumer (synchronizer of out) + void print_published(); + + void publish_directly_to_out_sink(); + + private: + MessageSink& out_sink; + + std::mutex m_published_lock; + std::vector m_published; + bool m_print_directly_to_out_sink = false; + }; } diff --git a/include/vcpkg/base/strings.h b/include/vcpkg/base/strings.h index af7ca77acf..a596d04738 100644 --- a/include/vcpkg/base/strings.h +++ b/include/vcpkg/base/strings.h @@ -198,6 +198,8 @@ namespace vcpkg::Strings const char* find_first_of(StringView searched, StringView candidates); + [[nodiscard]] std::string::size_type find_last(StringView searched, char c); + [[nodiscard]] std::vector find_all_enclosed(StringView input, StringView left_delim, StringView right_delim); diff --git a/include/vcpkg/binarycaching.h b/include/vcpkg/binarycaching.h index dbd0e82a6c..6e1fbf69fd 100644 --- a/include/vcpkg/binarycaching.h +++ b/include/vcpkg/binarycaching.h @@ -8,8 +8,10 @@ #include #include +#include #include #include +#include #include #include @@ -20,6 +22,7 @@ #include #include #include +#include #include #include @@ -200,11 +203,24 @@ namespace vcpkg std::vector precheck(View actions); protected: - BinaryProviders m_config; + struct ReadOnlyBinaryCacheData + { + BinaryProviders m_config; - std::unordered_map m_status; + std::unordered_map m_status; + }; + std::unique_ptr data = std::make_unique(); }; + // compression and upload of binary cache entries happens on a single 'background' thread, `m_push_thread` + // Thread safety is achieved within the binary cache providers by: + // 1. Only using one thread in the background for this work. + // 2. Forming a queue of work for that thread to consume in `m_actions_to_push`, which maintains its own thread + // safety + // 3. Sending any replies from the background thread through `m_bg_msg_sink` + // 4. Ensuring any supporting data, such as tool exes, is provided before the background thread is started. + // 5. Ensuring that work is not submitted to the background thread until the corresponding `packages` directory to + // upload is no longer being actively touched by the foreground thread. struct BinaryCache : ReadOnlyBinaryCache { static ExpectedL make(const VcpkgCmdArguments& args, const VcpkgPaths& paths, MessageSink& sink); @@ -212,17 +228,38 @@ namespace vcpkg BinaryCache(const Filesystem& fs); BinaryCache(const BinaryCache&) = delete; BinaryCache(BinaryCache&&) = default; + ~BinaryCache(); /// Called upon a successful build of `action` to store those contents in the binary cache. void push_success(CleanPackages clean_packages, const InstallPlanAction& action); + void print_push_success_messages(); + void wait_for_async_complete_and_join(); + private: BinaryCache(BinaryProviders&& providers, const Filesystem& fs); - - const Filesystem& m_fs; - Optional m_zip_tool; - bool m_needs_nuspec_data = false; - bool m_needs_zip_file = false; + struct ActionToPush + { + BinaryPackageWriteInfo request; + CleanPackages clean_after_push; + }; + struct BinaryCacheData + { + BinaryCacheData(const Filesystem& fs, ReadOnlyBinaryCacheData* data); + const Filesystem& m_fs; + Optional m_zip_tool; + bool m_needs_nuspec_data = false; + bool m_needs_zip_file = false; + + BGMessageSink m_bg_msg_sink; + BGThreadBatchQueue m_actions_to_push; + std::atomic_int m_remaining_packages_to_push = 0; + std::thread m_push_thread; + + void push_thread_main(ReadOnlyBinaryCacheData* ro_data); + void wait_for_async_complete_and_join(); + }; + std::unique_ptr bc_data; }; ExpectedL parse_download_configuration(const Optional& arg); diff --git a/locales/messages.json b/locales/messages.json index c6aa16f284..9e71b6edac 100644 --- a/locales/messages.json +++ b/locales/messages.json @@ -1720,6 +1720,8 @@ "_VersionSpecMismatch.comment": "An example of {path} is /foo/bar. An example of {expected_version} is 1.3.8. An example of {actual_version} is 1.3.8.", "VersionVerifiedOK": "{version_spec} is correctly in the version database ({git_tree_sha})", "_VersionVerifiedOK.comment": "An example of {version_spec} is zlib:x64-windows@1.0.0. An example of {git_tree_sha} is 7cfad47ae9f68b183983090afd6337cd60fd4949.", + "WaitUntilPackagesUploaded": "Wait until the remaining packages ({count}) are uploaded", + "_WaitUntilPackagesUploaded.comment": "An example of {count} is 42.", "WaitingForChildrenToExit": "Waiting for child processes to exit...", "WaitingToTakeFilesystemLock": "waiting to take filesystem lock on {path}...", "_WaitingToTakeFilesystemLock.comment": "An example of {path} is /foo/bar.", diff --git a/src/vcpkg-test/strings.cpp b/src/vcpkg-test/strings.cpp index d31a7d956f..71b88febc7 100644 --- a/src/vcpkg-test/strings.cpp +++ b/src/vcpkg-test/strings.cpp @@ -69,6 +69,14 @@ TEST_CASE ("find_first_of", "[strings]") REQUIRE(find_first_of("abcdefg", "gb") == std::string("bcdefg")); } +TEST_CASE ("find_last", "[strings]") +{ + using vcpkg::Strings::find_last; + REQUIRE(find_last("abcdefg", 'a') == 0); + REQUIRE(find_last("abcdefg", 'g') == 6); + REQUIRE(find_last("abcdefg", 'z') == std::string::npos); +} + TEST_CASE ("contains_any_ignoring_c_comments", "[strings]") { using Strings::contains_any_ignoring_c_comments; diff --git a/src/vcpkg/base/message_sinks.cpp b/src/vcpkg/base/message_sinks.cpp index c73c0c851c..4faf05c3be 100644 --- a/src/vcpkg/base/message_sinks.cpp +++ b/src/vcpkg/base/message_sinks.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace { @@ -279,4 +280,65 @@ namespace vcpkg m_first.println(color, line); m_second.println(color, std::move(line)); } + + void BGMessageSink::println(const MessageLine& line) + { + std::lock_guard lk(m_published_lock); + if (m_print_directly_to_out_sink) + { + out_sink.println(line); + return; + } + + m_published.push_back(line); + } + + void BGMessageSink::println(MessageLine&& line) + { + std::lock_guard lk(m_published_lock); + if (m_print_directly_to_out_sink) + { + out_sink.println(std::move(line)); + return; + } + + m_published.push_back(std::move(line)); + } + + void BGMessageSink::print_published() + { + std::vector tmp; + for (;;) + { + { + std::lock_guard lk(m_published_lock); + swap(tmp, m_published); + } + + if (tmp.empty()) + { + return; + } + + for (auto&& line : tmp) + { + out_sink.println(std::move(line)); + } + + tmp.clear(); + } + } + + void BGMessageSink::publish_directly_to_out_sink() + { + std::lock_guard lk(m_published_lock); + + m_print_directly_to_out_sink = true; + for (auto&& msg : m_published) + { + out_sink.println(std::move(msg)); + } + + m_published.clear(); + } } diff --git a/src/vcpkg/base/strings.cpp b/src/vcpkg/base/strings.cpp index 6da87de4af..9296885e33 100644 --- a/src/vcpkg/base/strings.cpp +++ b/src/vcpkg/base/strings.cpp @@ -339,6 +339,12 @@ const char* Strings::find_first_of(StringView input, StringView chars) return std::find_first_of(input.begin(), input.end(), chars.begin(), chars.end()); } +std::string::size_type Strings::find_last(StringView searched, char c) +{ + auto iter = std::find(searched.rbegin(), searched.rend(), c); + return iter == searched.rend() ? std::string::npos : (&*iter - searched.begin()); +} + std::vector Strings::find_all_enclosed(StringView input, StringView left_delim, StringView right_delim) { auto it_left = input.begin(); diff --git a/src/vcpkg/binarycaching.cpp b/src/vcpkg/binarycaching.cpp index 0d4fdd702d..679aea4d8a 100644 --- a/src/vcpkg/binarycaching.cpp +++ b/src/vcpkg/binarycaching.cpp @@ -2313,14 +2313,17 @@ namespace vcpkg return std::move(ret); } - ReadOnlyBinaryCache::ReadOnlyBinaryCache(BinaryProviders&& providers) : m_config(std::move(providers)) { } + ReadOnlyBinaryCache::ReadOnlyBinaryCache(BinaryProviders&& providers) + : data(new ReadOnlyBinaryCacheData{std::move(providers)}) + { + } void ReadOnlyBinaryCache::fetch(View actions) { std::vector action_ptrs; std::vector restores; std::vector statuses; - for (auto&& provider : m_config.read) + for (auto&& provider : data->m_config.read) { action_ptrs.clear(); restores.clear(); @@ -2329,7 +2332,7 @@ namespace vcpkg { if (actions[i].package_abi()) { - CacheStatus& status = m_status[*actions[i].package_abi().get()]; + CacheStatus& status = data->m_status[*actions[i].package_abi().get()]; if (status.should_attempt_restore(provider.get())) { action_ptrs.push_back(&actions[i]); @@ -2364,8 +2367,8 @@ namespace vcpkg { if (auto abi = action.package_abi().get()) { - auto it = m_status.find(*abi); - if (it != m_status.end()) return it->second.is_restored(); + auto it = data->m_status.find(*abi); + if (it != data->m_status.end()) return it->second.is_restored(); } return false; } @@ -2374,13 +2377,13 @@ namespace vcpkg { std::vector statuses = Util::fmap(actions, [this](const auto& action) { if (!action.package_abi()) Checks::unreachable(VCPKG_LINE_INFO); - return &m_status[*action.package_abi().get()]; + return &data->m_status[*action.package_abi().get()]; }); std::vector action_ptrs; std::vector cache_result; std::vector indexes; - for (auto&& provider : m_config.read) + for (auto&& provider : data->m_config.read) { action_ptrs.clear(); cache_result.clear(); @@ -2400,7 +2403,7 @@ namespace vcpkg for (size_t i = 0; i < action_ptrs.size(); ++i) { - auto&& this_status = m_status[*action_ptrs[i]->package_abi().get()]; + auto&& this_status = data->m_status[*action_ptrs[i]->package_abi().get()]; if (cache_result[i] == CacheAvailability::available) { this_status.mark_available(provider.get()); @@ -2417,92 +2420,165 @@ namespace vcpkg }); } - BinaryCache::BinaryCache(const Filesystem& fs) : m_fs(fs) { } + BinaryCache::BinaryCache(const Filesystem& fs) : bc_data(std::make_unique(fs, data.get())) { } ExpectedL BinaryCache::make(const VcpkgCmdArguments& args, const VcpkgPaths& paths, MessageSink& sink) { - return make_binary_providers(args, paths).then([&](BinaryProviders&& p) -> ExpectedL { - BinaryCache b(std::move(p), paths.get_filesystem()); - b.m_needs_nuspec_data = Util::any_of(b.m_config.write, [](auto&& p) { return p->needs_nuspec_data(); }); - b.m_needs_zip_file = Util::any_of(b.m_config.write, [](auto&& p) { return p->needs_zip_file(); }); - if (b.m_needs_zip_file) + return make_binary_providers(args, paths).then([&](BinaryProviders&& providers) -> ExpectedL { + BinaryCache cache(std::move(providers), paths.get_filesystem()); + cache.bc_data->m_needs_nuspec_data = + Util::any_of(cache.data->m_config.write, [](auto&& p) { return p->needs_nuspec_data(); }); + cache.bc_data->m_needs_zip_file = + Util::any_of(cache.data->m_config.write, [](auto&& p) { return p->needs_zip_file(); }); + if (cache.bc_data->m_needs_zip_file) { - auto maybe_zt = ZipTool::make(paths.get_tool_cache(), sink); - if (auto z = maybe_zt.get()) + auto maybe_zip_tool = ZipTool::make(paths.get_tool_cache(), sink); + if (auto zip_tool = maybe_zip_tool.get()) { - b.m_zip_tool.emplace(std::move(*z)); + cache.bc_data->m_zip_tool.emplace(std::move(*zip_tool)); } else { - return std::move(maybe_zt).error(); + return std::move(maybe_zip_tool).error(); } } - return std::move(b); + return std::move(cache); }); } BinaryCache::BinaryCache(BinaryProviders&& providers, const Filesystem& fs) - : ReadOnlyBinaryCache(std::move(providers)), m_fs(fs) + : ReadOnlyBinaryCache(std::move(providers)), bc_data(std::make_unique(fs, data.get())) { } + BinaryCache::BinaryCacheData::BinaryCacheData(const Filesystem& fs, ReadOnlyBinaryCacheData* data) + : m_fs(fs), m_bg_msg_sink(stdout_sink), m_push_thread([this, data]() { push_thread_main(data); }) + { + } + BinaryCache::~BinaryCache() + { + if (bc_data) + { + wait_for_async_complete_and_join(); + } + } void BinaryCache::push_success(CleanPackages clean_packages, const InstallPlanAction& action) { if (auto abi = action.package_abi().get()) { - bool restored = m_status[*abi].is_restored(); + bool restored = data->m_status[*abi].is_restored(); // Purge all status information on push_success (cache invalidation) // - push_success may delete packages/ (invalidate restore) // - push_success may make the package available from providers (invalidate unavailable) - m_status.erase(*abi); - if (!restored && !m_config.write.empty()) + data->m_status.erase(*abi); + if (!restored && !data->m_config.write.empty()) { ElapsedTimer timer; BinaryPackageWriteInfo request{action}; - if (m_needs_nuspec_data) + if (bc_data->m_needs_nuspec_data) { - request.nuspec = - generate_nuspec(request.package_dir, action, m_config.nuget_prefix, m_config.nuget_repo); + request.nuspec = generate_nuspec( + request.package_dir, action, data->m_config.nuget_prefix, data->m_config.nuget_repo); } + + bc_data->m_remaining_packages_to_push++; + bc_data->m_actions_to_push.push(ActionToPush{std::move(request), clean_packages}); + return; + } + } + + if (clean_packages == CleanPackages::Yes) + { + bc_data->m_fs.remove_all(action.package_dir.value_or_exit(VCPKG_LINE_INFO), VCPKG_LINE_INFO); + } + } + + void BinaryCache::print_push_success_messages() { bc_data->m_bg_msg_sink.print_published(); } + void BinaryCache::wait_for_async_complete_and_join() { bc_data->wait_for_async_complete_and_join(); } + + void BinaryCache::BinaryCacheData::wait_for_async_complete_and_join() + { + bool have_remaining_packages = m_remaining_packages_to_push > 0; + if (have_remaining_packages) + { + m_bg_msg_sink.print_published(); + msg::println(msgWaitUntilPackagesUploaded, msg::count = m_remaining_packages_to_push.load()); + } + m_bg_msg_sink.publish_directly_to_out_sink(); + m_actions_to_push.stop(); + if (m_push_thread.joinable()) + { + m_push_thread.join(); + } + } + + void BinaryCache::BinaryCacheData::push_thread_main(ReadOnlyBinaryCacheData* ro_data) + { + std::vector my_tasks; + int count_pushed = 0; + while (true) + { + m_actions_to_push.wait_for_items(my_tasks); + if (my_tasks.empty()) + { + break; + } + for (auto& action_to_push : my_tasks) + { + ElapsedTimer timer; if (m_needs_zip_file) { - Path zip_path = request.package_dir + ".zip"; - auto compress_result = m_zip_tool.value_or_exit(VCPKG_LINE_INFO) - .compress_directory_to_zip(m_fs, request.package_dir, zip_path); + Path zip_path = action_to_push.request.package_dir + ".zip"; + auto compress_result = + m_zip_tool.value_or_exit(VCPKG_LINE_INFO) + .compress_directory_to_zip(m_fs, action_to_push.request.package_dir, zip_path); if (compress_result) { - request.zip_path = std::move(zip_path); + action_to_push.request.zip_path = std::move(zip_path); } else { - out_sink.println(Color::warning, - msg::format_warning(msgCompressFolderFailed, msg::path = request.package_dir) - .append_raw(' ') - .append_raw(compress_result.error())); + auto this_line = DiagnosticLine{DiagKind::Warning, + msg::format(msgCompressFolderFailed, + msg::path = action_to_push.request.package_dir)} + .to_message_line(); + this_line.print(" "); + this_line.print(compress_result.error()); + m_bg_msg_sink.println(std::move(this_line)); } } size_t num_destinations = 0; - for (auto&& provider : m_config.write) + for (auto&& provider : ro_data->m_config.write) { - if (!provider->needs_zip_file() || request.zip_path.has_value()) + if (!provider->needs_zip_file() || action_to_push.request.zip_path.has_value()) { - num_destinations += provider->push_success(request, out_sink); + num_destinations += provider->push_success(action_to_push.request, stdout_sink); } } - if (request.zip_path) + if (action_to_push.request.zip_path) { - m_fs.remove(*request.zip_path.get(), IgnoreErrors{}); + m_fs.remove(*action_to_push.request.zip_path.get(), IgnoreErrors{}); } - out_sink.println( + + auto status_line = msg::format( msgStoredBinariesToDestinations, msg::count = num_destinations, msg::elapsed = timer.elapsed()); - } - } + m_remaining_packages_to_push.fetch_sub(1); + if (m_actions_to_push.stopped()) + { + count_pushed++; + status_line.append_raw( + fmt::format(" ({}/{})", count_pushed, count_pushed + m_remaining_packages_to_push.load())); + } - if (clean_packages == CleanPackages::Yes) - { - m_fs.remove_all(action.package_dir.value_or_exit(VCPKG_LINE_INFO), VCPKG_LINE_INFO); + m_bg_msg_sink.println(std::move(status_line)); + if (action_to_push.clean_after_push == CleanPackages::Yes) + { + m_fs.remove_all(action_to_push.request.package_dir, VCPKG_LINE_INFO); + } + } + my_tasks.clear(); } } diff --git a/src/vcpkg/commands.ci.cpp b/src/vcpkg/commands.ci.cpp index 8d26cf8846..c58ec8bfb9 100644 --- a/src/vcpkg/commands.ci.cpp +++ b/src/vcpkg/commands.ci.cpp @@ -578,7 +578,7 @@ namespace vcpkg Checks::exit_fail(VCPKG_LINE_INFO); } } - + binary_cache.wait_for_async_complete_and_join(); Checks::exit_success(VCPKG_LINE_INFO); } } // namespace vcpkg diff --git a/src/vcpkg/commands.install.cpp b/src/vcpkg/commands.install.cpp index b7eb7ec8e0..b87c1602c8 100644 --- a/src/vcpkg/commands.install.cpp +++ b/src/vcpkg/commands.install.cpp @@ -597,6 +597,7 @@ namespace vcpkg for (auto&& action : action_plan.install_actions) { + binary_cache.print_push_success_messages(); TrackedPackageInstallGuard this_install(action_index++, action_count, results, action); auto result = perform_install_plan_action( args, paths, host_triplet, build_options, action, status_db, binary_cache, build_logs_recorder); @@ -616,6 +617,7 @@ namespace vcpkg VCPKG_LINE_INFO); return issue_body_path; })); + binary_cache.wait_for_async_complete_and_join(); Checks::exit_fail(VCPKG_LINE_INFO); } @@ -1409,7 +1411,7 @@ namespace vcpkg install_print_usage_information(*bpgh, printed_usages, fs, paths.installed()); } } - + binary_cache.wait_for_async_complete_and_join(); Checks::exit_with_code(VCPKG_LINE_INFO, summary.failed()); } diff --git a/src/vcpkg/commands.set-installed.cpp b/src/vcpkg/commands.set-installed.cpp index d525535c4c..6662a27ce3 100644 --- a/src/vcpkg/commands.set-installed.cpp +++ b/src/vcpkg/commands.set-installed.cpp @@ -268,6 +268,7 @@ namespace vcpkg summary.print_failed(); if (build_options.only_downloads == OnlyDownloads::No) { + binary_cache.wait_for_async_complete_and_join(); Checks::exit_fail(VCPKG_LINE_INFO); } } @@ -299,6 +300,7 @@ namespace vcpkg fs.write_contents(json_file_path, json_contents, VCPKG_LINE_INFO); } + binary_cache.wait_for_async_complete_and_join(); Checks::exit_success(VCPKG_LINE_INFO); } diff --git a/src/vcpkg/commands.upgrade.cpp b/src/vcpkg/commands.upgrade.cpp index 7cc1f00323..eb7dc2e040 100644 --- a/src/vcpkg/commands.upgrade.cpp +++ b/src/vcpkg/commands.upgrade.cpp @@ -214,6 +214,7 @@ namespace vcpkg msg::print(summary.format()); } + binary_cache.wait_for_async_complete_and_join(); Checks::exit_success(VCPKG_LINE_INFO); } } // namespace vcpkg