From 046caebeadb45090e08afe9007582f71e35cd363 Mon Sep 17 00:00:00 2001
From: Serge Rogatch <serge.rogatch@gmail.com>
Date: Tue, 2 Nov 2021 22:48:23 +0100
Subject: [PATCH] Parallelize DIMACS formula printing

---
 src/common                     |  6 ++--
 src/solvers/sat/dimacs_cnf.cpp | 64 +++++++++++++++++++++++++++-------
 src/solvers/sat/dimacs_cnf.h   | 24 +++++++++++++
 src/util/blocking_queue.h      | 57 ++++++++++++++++++++++++++++++
 4 files changed, 135 insertions(+), 16 deletions(-)
 create mode 100644 src/util/blocking_queue.h

diff --git a/src/common b/src/common
index d6de5219be7..f7815660db3 100644
--- a/src/common
+++ b/src/common
@@ -52,7 +52,7 @@ ifeq ($(filter-out OSX OSX_Universal,$(BUILD_ENV_)),)
     LINKFLAGS += -force_cpusubtype_ALL -arch arm64 -arch x86_64
   endif
   LINKLIB = /usr/bin/libtool -static -o $@ $^
-  LINKBIN = $(CXX) $(LINKFLAGS) -o $@ $^ $(LIBS)
+  LINKBIN = $(CXX) $(LINKFLAGS) -o $@ $^ -pthread $(LIBS)
   LINKNATIVE = $(HOSTCXX) -o $@ $^
   ifeq ($(origin CC),default)
     CC     = clang
@@ -63,7 +63,7 @@ ifeq ($(filter-out OSX OSX_Universal,$(BUILD_ENV_)),)
 else ifeq ($(filter-out FreeBSD,$(BUILD_ENV_)),)
   CP_CXXFLAGS +=
   LINKLIB = ar rcT $@ $^
-  LINKBIN = $(CXX) $(LINKFLAGS) -o $@ -Wl,--start-group $^ -Wl,--end-group $(LIBS)
+  LINKBIN = $(CXX) $(LINKFLAGS) -o $@ -Wl,--start-group $^ -Wl,--end-group -pthread $(LIBS)
   LINKNATIVE = $(HOSTCXX) -o $@ $^
   ifeq ($(origin CC),default)
     CC     = clang
@@ -73,7 +73,7 @@ else ifeq ($(filter-out FreeBSD,$(BUILD_ENV_)),)
   endif
 else
   LINKLIB = ar rcT $@ $^
-  LINKBIN = $(CXX) $(LINKFLAGS) -o $@ -Wl,--start-group $^ -Wl,--end-group $(LIBS)
+  LINKBIN = $(CXX) $(LINKFLAGS) -o $@ -Wl,--start-group $^ -Wl,--end-group -pthread $(LIBS)
   LINKNATIVE = $(HOSTCXX) -o $@ $^
   ifeq ($(origin CC),default)
     CC     = gcc
diff --git a/src/solvers/sat/dimacs_cnf.cpp b/src/solvers/sat/dimacs_cnf.cpp
index 40f2a7c9f82..ac63ed68719 100644
--- a/src/solvers/sat/dimacs_cnf.cpp
+++ b/src/solvers/sat/dimacs_cnf.cpp
@@ -13,6 +13,7 @@ Author: Daniel Kroening, kroening@kroening.com
 #include <util/magic.h>
 
 #include <iostream>
+#include <thread>
 
 dimacs_cnft::dimacs_cnft(message_handlert &message_handler)
   : cnf_clause_listt(message_handler), break_lines(false)
@@ -76,25 +77,62 @@ void dimacs_cnft::write_dimacs_clause(
   out << "0" << "\n";
 }
 
+/// Block the calling thread until \p ordinal is the next file block to write.
+void dimacs_cnft::wait_file_block(const size_t ordinal) {
+  std::unique_lock<std::mutex> guard(writing_sync);
+  // The counter is checked on entry and again after every wake-up.
+  assert(next_file_block <= ordinal);
+  while(next_file_block != ordinal) {
+    block_written.wait(guard);
+    assert(next_file_block <= ordinal);
+  }
+}
+
+/// Worker thread body: format queued clause ranges, write them in block order.
+void dimacs_cnft::printer_entry(std::ostream *out) {
+  std::stringstream buffer;
+  clause_range job;
+  while(print_queue.pop(job)) {
+    // Reuse one stream: drop the previous text and reset its state flags.
+    buffer.clear();
+    buffer.str("");
+    for(auto cl=job.first; cl!=job.limit; ++cl)
+      write_dimacs_clause(*cl, buffer, break_lines);
+    // Formatting above ran unsynchronized; the write below is in ordinal order.
+    wait_file_block(job.ordinal);
+    *out << buffer.str();
+    std::lock_guard<std::mutex> guard(writing_sync);
+    ++next_file_block;
+    block_written.notify_all();
+  }
+}
+
 void dimacs_cnft::write_clauses(std::ostream &out)
 {
-  std::size_t count = 0;
-  std::stringstream output_block;
+  std::vector<std::thread> pool; // workers that format and write the blocks
+  const size_t thread_count = std::max(2u, std::thread::hardware_concurrency()) - 1;
+  next_file_block = 0;
+  size_t total_blocks = 0;
   for(clausest::const_iterator it=clauses.begin();
-      it!=clauses.end(); it++)
+      it!=clauses.end(); )
   {
-    write_dimacs_clause(*it, output_block, break_lines);
-
-    // print the block once in a while
-    if(++count % CNF_DUMP_BLOCK_SIZE == 0)
-    {
-      out << output_block.str();
-      output_block.str("");
+    clausest::const_iterator first = it;
+    size_t total_size = 0; // sum of clause sizes in this block
+    while(total_size < target_block_size && it != clauses.end()) {
+      total_size += it->size();
+      it++;
     }
+    print_queue.push(clause_range(total_blocks++, first, it));
+  }
+  print_queue.request_shutdown();
+  // Start the workers only now: the queue's shutdown flag is never reset, so on
+  // a repeated call workers started earlier could find it empty and exit at once.
+  for(size_t i=0; i<thread_count; i++)
+    pool.emplace_back(&dimacs_cnft::printer_entry, this, &out);
+  wait_file_block(total_blocks); // all blocks have been written
+  for(size_t i=0; i<pool.size(); i++) {
+    pool[i].join();
   }
-
-  // make sure the final block is printed as well
-  out << output_block.str();
 }
 
 void dimacs_cnf_dumpt::lcnf(const bvt &bv)
diff --git a/src/solvers/sat/dimacs_cnf.h b/src/solvers/sat/dimacs_cnf.h
index 4f4296e8f91..c676ae935e3 100644
--- a/src/solvers/sat/dimacs_cnf.h
+++ b/src/solvers/sat/dimacs_cnf.h
@@ -11,11 +11,29 @@ Author: Daniel Kroening, kroening@kroening.com
 #define CPROVER_SOLVERS_SAT_DIMACS_CNF_H
 
 #include <iosfwd>
+#include <atomic>
+#include <condition_variable>
 
 #include "cnf_clause_list.h"
+#include <util/blocking_queue.h>
 
 class dimacs_cnft:public cnf_clause_listt
 {
+protected:
+  // Target size of one print block, measured as the sum of its clauses' sizes
+  const size_t target_block_size = 1<<16;
+
+  struct clause_range { // one printing job: the clauses in [first, limit)
+    size_t ordinal; // 0-based position of this block in the output
+    clausest::const_iterator first, limit;
+    explicit clause_range(const size_t ordinal,
+                          const clausest::const_iterator first,
+                          const clausest::const_iterator limit)
+      : ordinal(ordinal), first(first), limit(limit) { }
+    // Value-initialize: iterators need not be constructible from nullptr.
+    clause_range() : ordinal(0), first(), limit() { }
+  };
+
 public:
   explicit dimacs_cnft(message_handlert &);
   virtual ~dimacs_cnft() { }
@@ -38,8 +56,14 @@ class dimacs_cnft:public cnf_clause_listt
 protected:
   void write_problem_line(std::ostream &out);
   void write_clauses(std::ostream &out);
+  void wait_file_block(const size_t ordinal); // blocks until next_file_block == ordinal
+  void printer_entry(std::ostream *out); // worker-thread loop started by write_clauses
 
   bool break_lines;
+  size_t next_file_block; // ordinal of the next block to write to the stream
+  std::mutex writing_sync; // held while reading or incrementing next_file_block
+  std::condition_variable block_written; // notified after each block is written
+  blocking_queue<clause_range> print_queue; // blocks waiting to be formatted
 };
 
 class dimacs_cnf_dumpt:public cnft
diff --git a/src/util/blocking_queue.h b/src/util/blocking_queue.h
new file mode 100644
index 00000000000..fe1ac35f186
--- /dev/null
+++ b/src/util/blocking_queue.h
@@ -0,0 +1,57 @@
+#ifndef CPROVER_UTIL_BLOCKING_QUEUE
+#define CPROVER_UTIL_BLOCKING_QUEUE
+
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+
+/// Mutex-protected FIFO queue whose pop() blocks until an item is available
+/// or shutdown has been requested.
+template <typename T> class blocking_queue {
+  std::condition_variable can_pop;
+  std::mutex sync;
+  std::queue<T> qu;
+  bool shutdown = false;
+
+public:
+  /// Append a copy of \p item and wake one thread waiting in pop().
+  void push(const T& item)
+  {
+    {
+      std::lock_guard<std::mutex> guard(sync);
+      qu.push(item);
+    }
+    // The lock is released before the notification is sent.
+    can_pop.notify_one();
+  }
+
+  /// Set the shutdown flag and wake every waiting thread. The flag is never
+  /// cleared; items already queued can still be popped afterwards.
+  void request_shutdown()
+  {
+    {
+      std::lock_guard<std::mutex> guard(sync);
+      shutdown = true;
+    }
+    can_pop.notify_all();
+  }
+
+  /// Wait for an item or for shutdown.
+  /// \param item: receives the front element, moved out of the queue
+  /// \return true if \p item was filled; false if the queue is empty and
+  ///   shutdown has been requested
+  bool pop(T &item)
+  {
+    std::unique_lock<std::mutex> lock(sync);
+    // Sleep while there is nothing to hand out and no shutdown request.
+    while (qu.empty() && !shutdown)
+      can_pop.wait(lock);
+    if (qu.empty())
+      return false;
+    item = std::move(qu.front());
+    qu.pop();
+    return true;
+  }
+};
+
+#endif // CPROVER_UTIL_BLOCKING_QUEUE