diff --git a/python-bindings/chiapos.cpp b/python-bindings/chiapos.cpp index ecf8ff238..cff25ff6e 100644 --- a/python-bindings/chiapos.cpp +++ b/python-bindings/chiapos.cpp @@ -49,7 +49,9 @@ PYBIND11_MODULE(chiapos, m) uint32_t num_buckets, uint32_t stripe_size, uint8_t num_threads, - bool nobitfield) { + bool nobitfield, + const std::string runtime_dir, + uint32_t phase1_max_processes) { std::string memo_str(memo); const uint8_t *memo_ptr = reinterpret_cast(memo_str.data()); std::string id_str(id); @@ -68,7 +70,9 @@ PYBIND11_MODULE(chiapos, m) num_buckets, stripe_size, num_threads, - nobitfield); + nobitfield, + runtime_dir, + phase1_max_processes); } catch (const std::exception &e) { std::cout << "Caught plotting error: " << e.what() << std::endl; throw e; diff --git a/src/cli.cpp b/src/cli.cpp index e4e88881d..334a51312 100644 --- a/src/cli.cpp +++ b/src/cli.cpp @@ -82,6 +82,8 @@ int main(int argc, char *argv[]) try { string memo = "0102030405"; string id = "022fb42c08c12de3a6af053880199806532e79515f94e83461612101f9412f9e"; bool nobitfield = false; + uint32_t phase1_max_processes = 0; + string runtimedir = "."; uint32_t buffmegabytes = 0; options.allow_unrecognised_options().add_options()( @@ -96,6 +98,8 @@ int main(int argc, char *argv[]) try { "m, memo", "Memo to insert into the plot", cxxopts::value(memo))( "i, id", "Unique 32-byte seed for the plot", cxxopts::value(id))( "e, nobitfield", "Disable bitfield", cxxopts::value(nobitfield))( + "p1maxproc", "Phase 1 max process count", cxxopts::value(phase1_max_processes))( + "runtimedir", "Runtime directory", cxxopts::value(runtimedir))( "b, buffer", "Megabytes to be used as buffer for sorting and plotting", cxxopts::value(buffmegabytes))("help", "Print help"); @@ -145,7 +149,9 @@ int main(int argc, char *argv[]) try { num_buckets, num_stripes, num_threads, - nobitfield); + nobitfield, + runtimedir, + phase1_max_processes); } else if (operation == "prove") { if (argc < 3) { HelpAndQuit(options); diff --git a/src/disk_util.hpp b/src/disk_util.hpp new file mode 100644 index 000000000..267b1fce0 --- /dev/null +++ b/src/disk_util.hpp @@ -0,0 +1,351 @@ +// Copyright 2018 Chia Network Inc + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef SRC_CPP_DISK_UTIL_HPP_ +#define SRC_CPP_DISK_UTIL_HPP_ + +#include +#include +#include +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#include +#include +#include +#endif + +#if !defined(_WIN32) && !defined(__APPLE__) +#include +#endif + +#include "chia_filesystem.hpp" +#include "util.hpp" + +using namespace std::chrono; +using namespace std::chrono_literals; + +namespace DiskUtil { + +#if !defined(__APPLE__) && !defined(_WIN32) + inline fs::path DevicePath(dev_t dev_id) + { + dev_t dev_id_major = major(dev_id); + dev_t dev_id_minor = minor(dev_id); + + std::ostringstream os; + os << "/sys/dev/block/" << dev_id_major << ":" << dev_id_minor; + std::string symlink = os.str(); + + std::unique_ptr pathbuf { + realpath(symlink.c_str(), nullptr), free }; + if (!pathbuf) { + std::ostringstream err; + std::cerr << "Unable to find full device path: " + << strerror(errno) << std::endl; + return fs::path(); + } + + return fs::path(pathbuf.get()); + } +#endif + + inline bool IsRotational(const std::string &dir) + { +#if defined(__APPLE__) || defined(_WIN32) + return false; +#else + struct stat s{}; + + if (0 != stat(dir.c_str(), &s)) { + std::ostringstream err; + std::cerr << "Unable to find device name for dir " << dir << ": " + << strerror(errno) << std::endl; + return false; + } + + fs::path device_path = DevicePath(s.st_dev); + if (device_path.empty()) { + return false; + } + + fs::path filename; + for (;;) { + filename = device_path / "queue" / "rotational"; + + if (fs::exists(filename)) { + break; + } + + if (!device_path.has_parent_path()) { + std::cerr << "Unable to determine device media type" << std::endl; + return false; + } + device_path = device_path.parent_path(); + } + + std::ifstream file; + file.open(filename.c_str()); + + if (file.fail()) { + std::ostringstream err; + std::cerr << "Unable to open " << filename << " for reading: " + << strerror(errno) << std::endl; + return false; + } + + std::string line; + getline(file, line); + + file.close(); + + return !line.empty() && line.front() == '1'; +#endif + } + + inline bool ShouldLock(const std::string &dir) { + return DiskUtil::IsRotational(dir); + } + + inline int LockDirectory( + std::string dirname) + { +#ifdef _WIN32 + return -1; +#else + int dir_fd = open(dirname.c_str(), O_RDONLY | O_NOCTTY); + if (dir_fd == -1) { + std::cerr << "Unable to open directory for locking: " << dirname + << ". Error: " << strerror(errno) << std::endl; + return -1; + } + while (0 != flock(dir_fd, LOCK_EX | LOCK_NB)) { + if (EWOULDBLOCK == errno) { + std::this_thread::sleep_for(10s); + } else { + std::cerr << "Unable to lock directory (retrying in 1 minute): " + << ". Error: " << strerror(errno) << std::endl; + std::this_thread::sleep_for(60s); + } + } + return dir_fd; +#endif + } + + inline bool UnlockDirectory( + int dir_fd, + std::string dirname) + { +#ifdef _WIN32 + return false; +#else + if (-1 == flock(dir_fd, LOCK_UN)) { + std::cerr << "Failed to unlock the directory: " << dirname + << ". Error: " << strerror(errno) << std::endl; + return false; + } + if (-1 == close(dir_fd)) { + std::cerr << "Failed to close the directory during unlocking: " << dirname + << ". Error: " << strerror(errno) << std::endl; + return false; + } + return true; +#endif + } +} + +class DirectoryLock +{ +public: + DirectoryLock(const std::string &dirname, bool lock = true) + { + dirname_ = dirname; + if (lock) { + Lock(); + } + } + + DirectoryLock(const DirectoryLock&) = delete; + + virtual ~DirectoryLock() + { + Unlock(); + } + + bool Lock() + { + if (fd_ == -1) { + std::cout << "Acquiring directory lock: " << dirname_ << std::endl; + + steady_clock::time_point start = steady_clock::now(); + fd_ = DiskUtil::LockDirectory(dirname_); + steady_clock::time_point end = steady_clock::now(); + + std::cout << "Lock acquired (took " + << duration_cast(end - start).count() + << " sec)" << std::endl; + } + return fd_ != -1; + } + + bool Unlock() + { + if (fd_ == -1) { + return false; + } + std::cout << "Releasing directory lock: " << dirname_ << std::endl; + if (!DiskUtil::UnlockDirectory(fd_, dirname_)) { + return false; + } + fd_ = -1; + return true; + } + +private: + int fd_ = -1; + std::string dirname_; +}; + +class MultiFileLock +{ +public: + MultiFileLock(const std::string &runtime_dir, const std::string &lock_name, + int max_slots, bool lock = true) + { + runtime_dir_ = runtime_dir; + lock_name_ = lock_name; + + std::ostringstream prefix; + prefix << "." << lock_name << "-lock"; + prefix_ = prefix.str(); + + max_slots_ = max_slots; + if (lock) { + Lock(); + } + } + + MultiFileLock(const MultiFileLock&) = delete; + + virtual ~MultiFileLock() + { + Unlock(); + } + + bool Lock() + { +#ifdef _WIN32 + return false; +#else + if (max_slots_ < 1 || fd_ != -1) { + return false; + } + + std::cout << "Acquiring " << lock_name_ << " lock" << std::endl; + + steady_clock::time_point start = steady_clock::now(); + while (!TryLock()) { + std::this_thread::sleep_for(20s); + } + steady_clock::time_point end = steady_clock::now(); + + std::cout << "Lock acquired (took " + << duration_cast(end - start).count() + << " sec)" << std::endl; + + return true; +#endif + } + + bool Unlock() + { +#ifdef _WIN32 + return false; +#else + if (fd_ == -1) { + return false; + } + + std::cout << "Releasing " << lock_name_ << " lock" << std::endl; + + if (-1 == flock(fd_, LOCK_UN)) { + std::cerr << "Failed to unlock the file: " << strerror(errno) + << std::endl; + return false; + } + + if (-1 == close(fd_)) { + std::cerr << "Failed to close the file during unlocking: " + << strerror(errno) << std::endl; + return false; + } + + fd_ = -1; + + return true; +#endif + } + +private: +#ifndef _WIN32 + bool TryLock() + { + for (int current_slot = 0; current_slot < max_slots_; ++current_slot) { + fs::path path(runtime_dir_); + std::ostringstream filename; + filename << prefix_ << "-" << current_slot; + path.append(filename.str()); + + std::string fullname = path.string(); + + fd_ = open(fullname.c_str(), O_CREAT | O_RDONLY | O_NOCTTY, 0666); + if (fd_ == -1) { + std::cerr << "Unable to open file for locking: " << fullname + << ". Error: " << strerror(errno) << std::endl; + return false; + } + if (0 == flock(fd_, LOCK_EX | LOCK_NB)) { + return true; + } + if (EWOULDBLOCK != errno) { + std::cerr << "Error while trying to lock " << fullname << ": " + << strerror(errno) << std::endl; + } + if (-1 == close(fd_)) { + std::cerr << "Failed to close " << fullname << ": " + << strerror(errno) << std::endl; + } + fd_ = -1; + } + return false; + } +#endif + + int fd_ = -1; + std::string runtime_dir_; + std::string lock_name_; + std::string prefix_; + int max_slots_ = 0; +}; + +#endif // SRC_CPP_DISK_UTIL_HPP_ + + + diff --git a/src/plotter_disk.hpp b/src/plotter_disk.hpp index 14c1123ce..e490c2434 100644 --- a/src/plotter_disk.hpp +++ b/src/plotter_disk.hpp @@ -31,6 +31,8 @@ #include #include #include +#include +#include #include "chia_filesystem.hpp" @@ -47,6 +49,7 @@ #include "pos_constants.hpp" #include "sort_manager.hpp" #include "util.hpp" +#include "disk_util.hpp" #define B17PHASE23 @@ -69,7 +72,9 @@ class DiskPlotter { uint32_t num_buckets_input = 0, uint64_t stripe_size_input = 0, uint8_t num_threads_input = 0, - bool nobitfield = false) + bool nobitfield = false, + std::string runtime_dir = ".", + uint32_t phase1_max_processes = 0) { // Increases the open file limit, we will open a lot of files. #ifndef _WIN32 @@ -153,9 +158,14 @@ class DiskPlotter { } #endif /* defined(_WIN32) || defined(__x86_64__) */ + const char is_parallel_writing_enabled = !DiskUtil::ShouldLock(final_dirname); + std::cout << std::endl << "Starting plotting progress into temporary dirs: " << tmp_dirname << " and " << tmp2_dirname << std::endl; + std::cout << "Final dir: " << final_dirname << " (parallel writing: " + << (is_parallel_writing_enabled ? "enabled" : "disabled") + << ")" << std::endl; std::cout << "ID: " << Util::HexStr(id, id_len) << std::endl; std::cout << "Plot size is: " << static_cast(k) << std::endl; std::cout << "Buffer size is: " << buf_megabytes << "MiB" << std::endl; @@ -208,12 +218,18 @@ class DiskPlotter { assert(id_len == kIdLen); + if (phase1_max_processes > 0) { + std::cout << std::endl; + } + MultiFileLock lock(runtime_dir, "phase1", phase1_max_processes); + std::cout << std::endl << "Starting phase 1/4: Forward Propagation into tmp files... " << Timer::GetNow(); - Timer p1; Timer all_phases; + + Timer p1; std::vector table_sizes = RunPhase1( tmp_1_disks, k, @@ -227,6 +243,8 @@ class DiskPlotter { num_threads); p1.PrintElapsed("Time for phase 1 ="); + lock.Unlock(); + uint64_t finalsize=0; if(nobitfield) @@ -373,8 +391,14 @@ class DiskPlotter { } } else { if (!bCopied) { + bool should_lock = DiskUtil::ShouldLock(final_dirname); + DirectoryLock dir_lock(final_dirname, should_lock); + fs::copy( tmp_2_filename, final_2_filename, fs::copy_options::overwrite_existing, ec); + + dir_lock.Unlock(); + if (ec.value() != 0) { std::cout << "Could not copy " << tmp_2_filename << " to " << final_2_filename << ". Error " << ec.message() @@ -404,11 +428,8 @@ class DiskPlotter { } if (!bRenamed) { -#ifdef _WIN32 - Sleep(5 * 60000); -#else - sleep(5 * 60); -#endif + using namespace std::chrono_literals; + std::this_thread::sleep_for(5min); } } while (!bRenamed); } diff --git a/tests/test_python_bindings.py b/tests/test_python_bindings.py index 1e413aebe..0ab8d4410 100644 --- a/tests/test_python_bindings.py +++ b/tests/test_python_bindings.py @@ -59,6 +59,8 @@ def test_k_21(self): 8192, 8, False, + ".", + 0 ) pl = None