-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Binary cache: async push_success #908
base: main
Are you sure you want to change the base?
Changes from 49 commits
95f0438
9d999d8
163d9cd
2a54205
0912655
5d7288c
10189ac
2567607
ecdd000
8e7ae61
850d7c9
548be38
6dbbf06
74b86fd
5171d3e
d69ed8f
2df42d5
5f1786e
93303c3
8a26c8b
aa7e52f
d46a4d6
5e51718
a9ac558
4faf674
b9be8c6
78ca081
579bfa9
103968e
dd32416
b666f94
15bb503
d995bfd
24cd026
92fc76b
3527227
48305b3
27fa076
bcd459a
f958d36
7a24007
50114f9
ca5f2b1
eccd9ee
e7837e0
969e7fc
2d5586f
809d0b6
455e29b
03fdfea
f4bad8c
26bbbd5
814e434
290e586
3cc3378
978ceae
47b56ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
#pragma once | ||
|
||
#include <condition_variable> | ||
#include <mutex> | ||
#include <vector> | ||
|
||
template<class T> | ||
class BatchQueue | ||
{ | ||
public: | ||
template<class... Args> | ||
void push(Args&&... args) | ||
{ | ||
forward.emplace_back(std::forward<Args>(args)...); | ||
} | ||
|
||
bool empty() const { return forward.empty(); } | ||
|
||
void pop(std::vector<T>& out) | ||
{ | ||
out.clear(); | ||
swap(out, forward); | ||
} | ||
|
||
private: | ||
std::vector<T> forward; | ||
}; | ||
|
||
template<class WorkItem> | ||
struct BGThreadBatchQueue | ||
{ | ||
template<class... Args> | ||
void push(Args&&... args) | ||
{ | ||
std::lock_guard<std::mutex> lock(m_mtx); | ||
m_tasks.push(std::forward<Args>(args)...); | ||
m_cv.notify_all(); | ||
} | ||
|
||
void wait_for_items(std::vector<WorkItem>& out) | ||
{ | ||
std::unique_lock<std::mutex> lock(m_mtx); | ||
m_cv.wait(lock, [this]() { return !m_tasks.empty() || !m_running; }); | ||
m_tasks.pop(out); | ||
} | ||
|
||
void stop() | ||
{ | ||
std::lock_guard<std::mutex> lock(m_mtx); | ||
m_running = false; | ||
m_cv.notify_all(); | ||
} | ||
|
||
bool stopped() const { return !m_running; } | ||
|
||
private: | ||
std::mutex m_mtx; | ||
std::condition_variable m_cv; | ||
BatchQueue<WorkItem> m_tasks; | ||
bool m_running = true; | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,4 +10,5 @@ namespace vcpkg | |
|
||
struct FileSink; | ||
struct CombiningSink; | ||
struct BGMessageSink; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,8 @@ | |
|
||
#include <vcpkg/base/messages.h> | ||
|
||
#include <mutex> | ||
|
||
namespace vcpkg | ||
{ | ||
|
||
|
@@ -75,4 +77,30 @@ namespace vcpkg | |
CombiningSink(MessageSink& first, MessageSink& second) : m_first(first), m_second(second) { } | ||
void print(Color c, StringView sv) override; | ||
}; | ||
|
||
struct BGMessageSink final : MessageSink | ||
{ | ||
BGMessageSink(MessageSink& out_sink) : out_sink(out_sink) { } | ||
~BGMessageSink() { publish_directly_to_out_sink(); } | ||
// must be called from producer | ||
void print(Color c, StringView sv) override; | ||
using MessageSink::print; | ||
BillyONeal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// must be called from consumer (synchronizer of out) | ||
void print_published(); | ||
|
||
void publish_directly_to_out_sink(); | ||
|
||
private: | ||
MessageSink& out_sink; | ||
|
||
std::mutex m_published_lock; | ||
std::vector<std::pair<Color, std::string>> m_published; | ||
|
||
// buffers messages until newline is reached | ||
// guarded by m_print_directly_lock | ||
std::vector<std::pair<Color, std::string>> m_unpublished; | ||
BillyONeal marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+98
to
+102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @BillyONeal Now that you have implemented the "error document" type stuff, I am right that this should be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure yet, I ran into a problem where I'm not sure exactly how "status" type information should be conveyed through this infrastructure; I'm working on it over here: main...BillyONeal:vcpkg-tool:message-sink-line As part of that I realized that #1137 touches the same area and would be easier to merge first and I'm doing that right now... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does a "status" need to be conveyed here at all? This are only informational messages. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some messages, like "I am about to touch the network now", are time sensitive and under normal conditions must be printed. However, errors/warnings like "the download failed" might need to be suppressed, if a subsequent retry / alternate makes the overall process succeed. For example, when downloading a file, in "time order" let's say this happens:
If any of this is happening on 'background' threads, even the 'timely' messages need to be held until the next synchronization point with the thread that owns the console. This is what 'statusln' is for in my WIP. They need to share one channel rather than just passing both MessageSink and DiagnosticContext to handle this background case where a caller wants to keep the original-time-order of diagnostics and status. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This PR already does this (for the scope of the PR)
That is already the case in the PR. But yeah this PR don't know when a message ends (if it spans multiple lines), the whole reason for the error document type stuff to be created. But could this not simply be solved by buffering There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Then 'inner' parts have no way to emit the 'intended to be timely' messages. What I'm doing is:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Iiuc 'intended to be timely' = must be printed immediately: The inner parts that don't run in the background can print there stuff immediately via the MessageSink/DiagnosticContext and the stuff in the background thread is never allowed to print stuff immediately otherwise you get interleaved messages with the build output. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, there's a third condition, which is step 4 in my example above. We need to buffer errors and warnings from the inner operation, because we may want to swallow and not emit them if a subsequent attempt succeeds, but we must not buffer any of the timely status messages. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Aah thanks for the example! :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it has nothing to do with your PR. It's a problem I ran into trying to |
||
std::mutex m_print_directly_lock; | ||
bool m_print_directly_to_out_sink = false; | ||
}; | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,12 +3,15 @@ | |||||
#include <vcpkg/base/fwd/message_sinks.h> | ||||||
|
||||||
#include <vcpkg/fwd/binarycaching.h> | ||||||
#include <vcpkg/fwd/build.h> | ||||||
#include <vcpkg/fwd/dependencies.h> | ||||||
#include <vcpkg/fwd/tools.h> | ||||||
#include <vcpkg/fwd/vcpkgpaths.h> | ||||||
|
||||||
#include <vcpkg/base/batch-queue.h> | ||||||
#include <vcpkg/base/downloads.h> | ||||||
#include <vcpkg/base/expected.h> | ||||||
#include <vcpkg/base/message_sinks.h> | ||||||
#include <vcpkg/base/path.h> | ||||||
|
||||||
#include <vcpkg/archives.h> | ||||||
|
@@ -18,6 +21,7 @@ | |||||
#include <iterator> | ||||||
#include <set> | ||||||
#include <string> | ||||||
#include <thread> | ||||||
#include <unordered_map> | ||||||
#include <vector> | ||||||
|
||||||
|
@@ -196,23 +200,41 @@ namespace vcpkg | |||||
|
||||||
struct BinaryCache : ReadOnlyBinaryCache | ||||||
BillyONeal marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
{ | ||||||
static ExpectedL<BinaryCache> make(const VcpkgCmdArguments& args, const VcpkgPaths& paths, MessageSink& sink); | ||||||
static ExpectedL<std::unique_ptr<BinaryCache>> make(const VcpkgCmdArguments& args, | ||||||
const VcpkgPaths& paths, | ||||||
MessageSink& sink); | ||||||
|
||||||
BinaryCache(const Filesystem& fs); | ||||||
BinaryCache(const BinaryCache&) = delete; | ||||||
BinaryCache(BinaryCache&&) = default; | ||||||
BinaryCache(BinaryCache&&) = delete; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I disagree with this design change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How would you implement that? The function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have now put all data in an extra struct that is hold via a std::unique_ptr by the BinaryCache class. |
||||||
~BinaryCache(); | ||||||
|
||||||
/// Called upon a successful build of `action` to store those contents in the binary cache. | ||||||
void push_success(const InstallPlanAction& action); | ||||||
|
||||||
void print_push_success_messages(); | ||||||
void wait_for_async_complete_and_join(); | ||||||
|
||||||
private: | ||||||
BinaryCache(BinaryProviders&& providers, const Filesystem& fs); | ||||||
|
||||||
const Filesystem& m_fs; | ||||||
Optional<ZipTool> m_zip_tool; | ||||||
bool m_needs_nuspec_data = false; | ||||||
bool m_needs_zip_file = false; | ||||||
|
||||||
struct ActionToPush | ||||||
{ | ||||||
BinaryPackageWriteInfo request; | ||||||
CleanPackages clean_after_push; | ||||||
}; | ||||||
|
||||||
void push_thread_main(); | ||||||
|
||||||
BGMessageSink m_bg_msg_sink; | ||||||
BGThreadBatchQueue<ActionToPush> m_actions_to_push; | ||||||
std::atomic_int m_remaining_packages_to_push = 0; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think this should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This member existing at all is a smell to me. The number of items in the queue should be a part of the queue, not something tracked externally. Moreover, the queue already contains locks and stuff so I'm not sure why we need an atomic here. |
||||||
std::thread m_push_thread; | ||||||
}; | ||||||
|
||||||
ExpectedL<DownloadManagerConfig> parse_download_configuration(const Optional<std::string>& arg); | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1613,10 +1613,6 @@ | |
"UpgradeInManifest": "Upgrade upgrades a classic mode installation and thus does not support manifest mode. Consider updating your dependencies by updating your baseline to a current value with vcpkg x-update-baseline and running vcpkg install.", | ||
"_UpgradeInManifest.comment": "'vcpkg x-update-baseline' and 'vcpkg install' are command lines and should not be localized.", | ||
"UpgradeRunWithNoDryRun": "If you are sure you want to rebuild the above packages, run this command with the --no-dry-run option.", | ||
"UploadedBinaries": "Uploaded binaries to {count} {vendor}.", | ||
"_UploadedBinaries.comment": "An example of {count} is 42. An example of {vendor} is Azure.", | ||
"UploadedPackagesToVendor": "Uploaded {count} package(s) to {vendor} in {elapsed}", | ||
"_UploadedPackagesToVendor.comment": "An example of {count} is 42. An example of {elapsed} is 3.532 min. An example of {vendor} is Azure.", | ||
"UploadingBinariesToVendor": "Uploading binaries for '{spec}' to '{vendor}' source \"{path}\".", | ||
"_UploadingBinariesToVendor.comment": "An example of {spec} is zlib:x64-windows. An example of {vendor} is Azure. An example of {path} is /foo/bar.", | ||
"UploadingBinariesUsingVendor": "Uploading binaries for '{spec}' using '{vendor}' \"{path}\".", | ||
|
@@ -1718,6 +1714,8 @@ | |
"VersionTableHeader": "Version", | ||
"VersionVerifiedOK": "OK: {version_spec} -> {commit_sha}", | ||
"_VersionVerifiedOK.comment": "An example of {version_spec} is zlib:[email protected]. An example of {commit_sha} is 7cfad47ae9f68b183983090afd6337cd60fd4949.", | ||
"WaitUntilPackagesUploaded": "Wait until the remaining packages ({count}) are uploaded", | ||
"_WaitUntilPackagesUploaded.comment": "An example of {count} is 42.", | ||
"WaitingForChildrenToExit": "Waiting for child processes to exit...", | ||
"WaitingToTakeFilesystemLock": "waiting to take filesystem lock on {path}...", | ||
"_WaitingToTakeFilesystemLock.comment": "An example of {path} is /foo/bar.", | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,6 @@ | ||||||||
#include <vcpkg/base/file_sink.h> | ||||||||
#include <vcpkg/base/message_sinks.h> | ||||||||
#include <vcpkg/base/strings.h> | ||||||||
|
||||||||
namespace | ||||||||
{ | ||||||||
|
@@ -58,4 +59,74 @@ namespace vcpkg | |||||||
m_second.print(c, sv); | ||||||||
} | ||||||||
|
||||||||
void BGMessageSink::print(Color c, StringView sv) | ||||||||
{ | ||||||||
std::lock_guard<std::mutex> print_lk(m_print_directly_lock); | ||||||||
if (m_print_directly_to_out_sink) | ||||||||
{ | ||||||||
out_sink.print(c, sv); | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
auto pos = Strings::find_last(sv, '\n'); | ||||||||
if (pos != std::string::npos) | ||||||||
{ | ||||||||
{ | ||||||||
std::lock_guard<std::mutex> lk(m_published_lock); | ||||||||
m_published.insert(m_published.end(), | ||||||||
std::make_move_iterator(m_unpublished.begin()), | ||||||||
std::make_move_iterator(m_unpublished.end())); | ||||||||
m_published.emplace_back(c, sv.substr(0, pos + 1)); | ||||||||
} | ||||||||
m_unpublished.clear(); | ||||||||
if (sv.size() > pos + 1) | ||||||||
{ | ||||||||
m_unpublished.emplace_back(c, sv.substr(pos + 1)); | ||||||||
} | ||||||||
} | ||||||||
else | ||||||||
{ | ||||||||
m_unpublished.emplace_back(c, sv); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
void BGMessageSink::print_published() | ||||||||
{ | ||||||||
std::vector<std::pair<Color, std::string>> tmp; | ||||||||
for (;;) | ||||||||
{ | ||||||||
{ | ||||||||
std::lock_guard<std::mutex> lk(m_published_lock); | ||||||||
swap(tmp, m_published); | ||||||||
} | ||||||||
|
||||||||
if (tmp.empty()) | ||||||||
{ | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
for (auto&& msg : tmp) | ||||||||
{ | ||||||||
out_sink.print(msg.first, msg.second); | ||||||||
} | ||||||||
|
||||||||
tmp.clear(); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
void BGMessageSink::publish_directly_to_out_sink() | ||||||||
{ | ||||||||
std::lock_guard<std::mutex> print_lk(m_print_directly_lock); | ||||||||
std::lock_guard<std::mutex> lk(m_published_lock); | ||||||||
Comment on lines
+119
to
+120
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
if this survives There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you actually mean |
||||||||
|
||||||||
m_print_directly_to_out_sink = true; | ||||||||
for (auto& messages : {&m_published, &m_unpublished}) | ||||||||
{ | ||||||||
for (auto&& msg : *messages) | ||||||||
{ | ||||||||
out_sink.print(msg.first, msg.second); | ||||||||
} | ||||||||
messages->clear(); | ||||||||
} | ||||||||
} | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this thing being called queue given that this is how it works. Given that we expect this to be a multi producer single consumer queue, can we instead put the vector inside and note that only one thread may call pop but any number of threads may call push? That would also resolve the criticism over separate tracking atomics below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BatchQueue
alone is not thread safe.I don't get what you have in mind here 😅
Do you have an idea for a better name? :)