diff --git a/stf-inc/stf_compressed_ifstream.hpp b/stf-inc/stf_compressed_ifstream.hpp index 58f9867..7d725b6 100644 --- a/stf-inc/stf_compressed_ifstream.hpp +++ b/stf-inc/stf_compressed_ifstream.hpp @@ -76,6 +76,21 @@ namespace stf { })); } + /** + * Closes the file + */ + int close_() override { + if(!stream_) { + return 0; + } + + if(decompression_in_progress_) { + decompress_result_.get(); + } + + return STFIFstream::close_(); + } + public: STFCompressedIFstream() = default; @@ -87,14 +102,13 @@ namespace stf { explicit STFCompressedIFstream(const std::string_view filename) : // cppcheck-suppress passedByValue STFCompressedIFstream() { - open(filename); + STFCompressedIFstream::open(filename); } // Have to override the base class destructor to ensure that *our* close method gets called before destruction inline ~STFCompressedIFstream() override { - if(stream_) { - STFCompressedIFstream::close(); - } + STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK(); + STFCompressedIFstream::close_(); } /** @@ -108,21 +122,6 @@ namespace stf { readNextChunk_(); } - /** - * Closes the file - */ - int close() override { - if(!stream_) { - return 0; - } - - if(decompression_in_progress_) { - decompress_result_.get(); - } - - return STFIFstream::close(); - } - /** * Seeks by the specified number of marker records * \param num_markers Number of marker records to seek by diff --git a/stf-inc/stf_compressed_ofstream.hpp b/stf-inc/stf_compressed_ofstream.hpp index 4ec60c5..79d9ba4 100644 --- a/stf-inc/stf_compressed_ofstream.hpp +++ b/stf-inc/stf_compressed_ofstream.hpp @@ -220,6 +220,32 @@ namespace stf { chunk_indices_.emplace_back(end, next_chunk_pc, 0); } + /** + * Closes the file + */ + int close_() override { + if(!stream_) { + return 0; + } + + // Finish any pending chunk + if(pending_chunk_) { + // Skip writing the chunk in the buffer if it would break any readers that try to use the trace + if(incomplete_chunk_) { + std::cerr << "WARNING: The pending chunk in the STF compressed writer buffer is in an inconsistent state. It will not be written to the output file." << std::endl; + } + else { + compressChunk_(); + } + } + if(compression_in_progress_) { + compression_done_.get(); + compression_in_progress_ = false; + } + + return STFOFstream::close_(); + } + public: /** * Constructs an STFCompressedOFstream @@ -258,9 +284,8 @@ namespace stf { // Have to override the base class destructor to ensure that *our* close method gets called before destruction inline ~STFCompressedOFstream() override { - if(stream_) { - STFCompressedOFstream::close(); - } + STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK(); + STFCompressedOFstream::close_(); } /** @@ -317,31 +342,6 @@ namespace stf { next_chunk_end_ = marker_record_chunk_size_; } - /** - * Closes the file - */ - int close() override { - if(!stream_) { - return 0; - } - - // Finish any pending chunk - if(pending_chunk_) { - // Skip writing the chunk in the buffer if it would break any readers that try to use the trace - if(incomplete_chunk_) { - std::cerr << "WARNING: The pending chunk in the STF compressed writer buffer is in an inconsistent state. It will not be written to the output file." << std::endl; - } - else { - compressChunk_(); - } - } - if(compression_in_progress_) { - compression_done_.get(); - compression_in_progress_ = false; - } - return STFOFstream::close(); - } - void markerRecordCallback() override { STFOFstream::markerRecordCallback(); incomplete_chunk_ = false; // This chunk is safe to write now diff --git a/stf-inc/stf_fstream.hpp b/stf-inc/stf_fstream.hpp index 15e2048..03d16f1 100644 --- a/stf-inc/stf_fstream.hpp +++ b/stf-inc/stf_fstream.hpp @@ -15,6 +15,13 @@ #include "stf_protocol_id.hpp" #include "stf_vlen.hpp" +/** + * \macro STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK + * + * This macro must be invoked before calling the close_() method in an STFFstream-derived class + */ +#define STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK() std::lock_guard l(STFFstream::open_close_mutex_) + namespace stf { /** * \class STFFstream @@ -38,6 +45,7 @@ namespace stf { size_t num_records_read_ = 0; /**< Number of records seen so far */ size_t num_marker_records_ = 0; /**< Number of marker records seen so far */ bool has_32bit_events_ = false; /**< If true, EventRecord event values are packed into 32 bits */ + std::mutex open_close_mutex_; /**< Ensures open() and close() are not called from more than 1 thread simultaneously */ STFFstream() = default; @@ -106,15 +114,49 @@ namespace stf { } } + /** + * Virtual method that does the actual work of closing a file. + * WARNING: This function should not be called unless the open/close mutex has been locked with STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK + * This can be overridden by subclasses, but note that any subclass that overrides it must also override the destructor. + */ + inline virtual int close_() { + int retcode = 0; + if(stream_) { + if(stream_ == stdout) { + fflush(stream_); // need to manually flush stdout + } + else if(stream_ != stdin) { // don't close stdin/stdout + if (used_popen_) { + retcode = pclose (stream_); + } + else if (stream_ != stdout) { + retcode = fclose (stream_); + } + } + stream_ = nullptr; + } + num_records_read_ = 0; + num_marker_records_ = 0; + + // If we aren't closing this from the atexit handler, go ahead and remove ourselves + // from open_streams_ + if(!lock_open_streams_) { + std::lock_guard l(open_streams_mutex_); + open_streams_.erase(this); + } + return retcode; + } + public: // Prevents copying any STF I/O objects STFFstream(const STFFstream&) = delete; void operator=(const STFFstream&) = delete; + // Any class that overrides close_ must also override the destructor! + // Also note that the open/close mutex must be manually locked with STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK in any overridden destructor virtual inline ~STFFstream() { - if (stream_) { - STFFstream::close(); - } + STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK(); + STFFstream::close_(); } /** @@ -143,6 +185,10 @@ namespace stf { * \param rw_mode R/W mode */ inline void open(const std::string_view filename, const std::string_view rw_mode) { // cppcheck-suppress passedByValue + STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK(); + + stf_assert(!stream_, "Stream is already open. Call close() first."); + // special handling for stdin/stdout if(filename.compare("-") == 0) { if(rw_mode.compare("rb") == 0) { @@ -166,32 +212,9 @@ namespace stf { /** * \brief close the trace reader/writer */ - virtual inline int close() { - int retcode = 0; - if (stream_) { - if(stream_ == stdout) { - fflush(stream_); // need to manually flush stdout - } - else if(stream_ != stdin) { // don't close stdin/stdout - if (used_popen_) { - retcode = pclose (stream_); - } - else if (stream_ != stdout) { - retcode = fclose (stream_); - } - } - stream_ = nullptr; - } - num_records_read_ = 0; - num_marker_records_ = 0; - - // If we aren't closing this from the atexit handler, go ahead and remove ourselves - // from open_streams_ - if(!lock_open_streams_) { - std::lock_guard l(open_streams_mutex_); - open_streams_.erase(this); - } - return retcode; + inline int close() { + STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK(); + return close_(); } /**