From 046caebeadb45090e08afe9007582f71e35cd363 Mon Sep 17 00:00:00 2001 From: Serge Rogatch <serge.rogatch@gmail.com> Date: Tue, 2 Nov 2021 22:48:23 +0100 Subject: [PATCH] Parallelize DIMACS formula printing --- src/common | 6 ++-- src/solvers/sat/dimacs_cnf.cpp | 64 +++++++++++++++++++++++++++------- src/solvers/sat/dimacs_cnf.h | 24 +++++++++++++ src/util/blocking_queue.h | 57 ++++++++++++++++++++++++++++++ 4 files changed, 135 insertions(+), 16 deletions(-) create mode 100644 src/util/blocking_queue.h diff --git a/src/common b/src/common index d6de5219be7..f7815660db3 100644 --- a/src/common +++ b/src/common @@ -52,7 +52,7 @@ ifeq ($(filter-out OSX OSX_Universal,$(BUILD_ENV_)),) LINKFLAGS += -force_cpusubtype_ALL -arch arm64 -arch x86_64 endif LINKLIB = /usr/bin/libtool -static -o $@ $^ - LINKBIN = $(CXX) $(LINKFLAGS) -o $@ $^ $(LIBS) + LINKBIN = $(CXX) $(LINKFLAGS) -o $@ $^ -pthread $(LIBS) LINKNATIVE = $(HOSTCXX) -o $@ $^ ifeq ($(origin CC),default) CC = clang @@ -63,7 +63,7 @@ ifeq ($(filter-out OSX OSX_Universal,$(BUILD_ENV_)),) else ifeq ($(filter-out FreeBSD,$(BUILD_ENV_)),) CP_CXXFLAGS += LINKLIB = ar rcT $@ $^ - LINKBIN = $(CXX) $(LINKFLAGS) -o $@ -Wl,--start-group $^ -Wl,--end-group $(LIBS) + LINKBIN = $(CXX) $(LINKFLAGS) -o $@ -Wl,--start-group $^ -Wl,--end-group -lpthread $(LIBS) LINKNATIVE = $(HOSTCXX) -o $@ $^ ifeq ($(origin CC),default) CC = clang @@ -73,7 +73,7 @@ else ifeq ($(filter-out FreeBSD,$(BUILD_ENV_)),) endif else LINKLIB = ar rcT $@ $^ - LINKBIN = $(CXX) $(LINKFLAGS) -o $@ -Wl,--start-group $^ -Wl,--end-group $(LIBS) + LINKBIN = $(CXX) $(LINKFLAGS) -o $@ -Wl,--start-group $^ -Wl,--end-group -pthread $(LIBS) LINKNATIVE = $(HOSTCXX) -o $@ $^ ifeq ($(origin CC),default) CC = gcc diff --git a/src/solvers/sat/dimacs_cnf.cpp b/src/solvers/sat/dimacs_cnf.cpp index 40f2a7c9f82..ac63ed68719 100644 --- a/src/solvers/sat/dimacs_cnf.cpp +++ b/src/solvers/sat/dimacs_cnf.cpp @@ -13,6 +13,7 @@ Author: Daniel Kroening, kroening@kroening.com #include <util/magic.h> #include <iostream> +#include <thread> dimacs_cnft::dimacs_cnft(message_handlert &message_handler) : cnf_clause_listt(message_handler), break_lines(false) @@ -76,25 +77,62 @@ void dimacs_cnft::write_dimacs_clause( out << "0" << "\n"; } +void dimacs_cnft::wait_file_block(const size_t ordinal) { + std::unique_lock<std::mutex> lock(writing_sync); + for(;;) { + assert(next_file_block <= ordinal); + if(next_file_block == ordinal) { + return; + } + block_written.wait(lock); + } +} + +void dimacs_cnft::printer_entry(std::ostream *out) { + clause_range item; + std::stringstream output_block; + while(print_queue.pop(item)) { + output_block.str(""); + output_block.clear(); + + for(clausest::const_iterator it=item.first; it!=item.limit; it++) { + write_dimacs_clause(*it, output_block, break_lines); + } + + wait_file_block(item.ordinal); + *out << output_block.str(); + std::unique_lock<std::mutex> lock(writing_sync); + next_file_block++; + block_written.notify_all(); + } +} + void dimacs_cnft::write_clauses(std::ostream &out) { - std::size_t count = 0; - std::stringstream output_block; + std::vector<std::thread> pool; + const size_t thread_count = std::max(2u, std::thread::hardware_concurrency()) - 1; + for(size_t i=0; i<thread_count; i++) { + pool.emplace_back(&dimacs_cnft::printer_entry, this, &out); + } + next_file_block = 0; + size_t total_blocks = 0; for(clausest::const_iterator it=clauses.begin(); - it!=clauses.end(); it++) + it!=clauses.end(); ) { - write_dimacs_clause(*it, output_block, break_lines); - - // print the block once in a while - if(++count % CNF_DUMP_BLOCK_SIZE == 0) - { - out << output_block.str(); - output_block.str(""); + clausest::const_iterator first = it; + size_t total_size = 0; + while(total_size < target_block_size && it != clauses.end()) { + total_size += it->size(); + it++; } + print_queue.push(clause_range(total_blocks, first, it)); + total_blocks++; + } + print_queue.request_shutdown(); + wait_file_block(total_blocks); + for(size_t i=0; i<pool.size(); i++) { + pool[i].join(); } - - // make sure the final block is printed as well - out << output_block.str(); } void dimacs_cnf_dumpt::lcnf(const bvt &bv) diff --git a/src/solvers/sat/dimacs_cnf.h b/src/solvers/sat/dimacs_cnf.h index 4f4296e8f91..c676ae935e3 100644 --- a/src/solvers/sat/dimacs_cnf.h +++ b/src/solvers/sat/dimacs_cnf.h @@ -11,11 +11,29 @@ Author: Daniel Kroening, kroening@kroening.com #define CPROVER_SOLVERS_SAT_DIMACS_CNF_H #include <iosfwd> +#include <atomic> +#include <condition_variable> #include "cnf_clause_list.h" +#include <util/blocking_queue.h> class dimacs_cnft:public cnf_clause_listt { +protected: + // Optimal block size, in clauses + const size_t target_block_size = 1<<16; + + struct clause_range { + size_t ordinal; + clausest::const_iterator first, limit; + explicit clause_range(const size_t ordinal, + const clausest::const_iterator first, + const clausest::const_iterator limit) + : ordinal(ordinal), first(first), limit(limit) + { } + clause_range() : ordinal(0), first(nullptr), limit(nullptr) { } + }; + public: explicit dimacs_cnft(message_handlert &); virtual ~dimacs_cnft() { } @@ -38,8 +56,14 @@ class dimacs_cnft:public cnf_clause_listt protected: void write_problem_line(std::ostream &out); void write_clauses(std::ostream &out); + void wait_file_block(const size_t ordinal); + void printer_entry(std::ostream *out); bool break_lines; + size_t next_file_block; + std::mutex writing_sync; + std::condition_variable block_written; + blocking_queue<clause_range> print_queue; }; class dimacs_cnf_dumpt:public cnft diff --git a/src/util/blocking_queue.h b/src/util/blocking_queue.h new file mode 100644 index 00000000000..fe1ac35f186 --- /dev/null +++ b/src/util/blocking_queue.h @@ -0,0 +1,57 @@ +#ifndef CPROVER_UTIL_BLOCKING_QUEUE +#define CPROVER_UTIL_BLOCKING_QUEUE + +#include <mutex> +#include <condition_variable> +#include <queue> + +template <typename T> class blocking_queue { + std::condition_variable can_pop; + std::mutex sync; + std::queue<T> qu; + bool shutdown = false; + +public: + void push(const T& item) + { + { + std::unique_lock<std::mutex> lock(sync); + qu.push(item); + } + can_pop.notify_one(); + } + + void request_shutdown() + { + { + std::unique_lock<std::mutex> lock(sync); + shutdown = true; + } + can_pop.notify_all(); + } + + bool pop(T &item) + { + std::unique_lock<std::mutex> lock(sync); + for (;;) + { + if (qu.empty()) + { + if (shutdown) + { + return false; + } + } + else + { + break; + } + can_pop.wait(lock); + } + item = std::move(qu.front()); + qu.pop(); + return true; + } +}; + +#endif // CPROVER_UTIL_BLOCKING_QUEUE