Skip to content

Commit

Permalink
Merge pull request #30 from sparcians/dev/brettd/stf-fstream-thread-s…
Browse files Browse the repository at this point in the history
…afety-fixes

Make open() and close() methods of STFFstream (and all subclasses) thread-safe
  • Loading branch information
bdutro authored Mar 5, 2024
2 parents 852792f + 6b7d7ee commit 2c0770d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 76 deletions.
37 changes: 18 additions & 19 deletions stf-inc/stf_compressed_ifstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ namespace stf {
}));
}

/**
* Closes the file
*/
int close_() override {
if(!stream_) {
return 0;
}

if(decompression_in_progress_) {
decompress_result_.get();
}

return STFIFstream::close_();
}

public:
STFCompressedIFstream() = default;

Expand All @@ -87,14 +102,13 @@ namespace stf {
explicit STFCompressedIFstream(const std::string_view filename) : // cppcheck-suppress passedByValue
STFCompressedIFstream()
{
open(filename);
STFCompressedIFstream::open(filename);
}

// Have to override the base class destructor to ensure that *our* close method gets called before destruction
inline ~STFCompressedIFstream() override {
if(stream_) {
STFCompressedIFstream::close();
}
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
STFCompressedIFstream::close_();
}

/**
Expand All @@ -108,21 +122,6 @@ namespace stf {
readNextChunk_();
}

/**
* Closes the file
*/
int close() override {
if(!stream_) {
return 0;
}

if(decompression_in_progress_) {
decompress_result_.get();
}

return STFIFstream::close();
}

/**
* Seeks by the specified number of marker records
* \param num_markers Number of marker records to seek by
Expand Down
56 changes: 28 additions & 28 deletions stf-inc/stf_compressed_ofstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,32 @@ namespace stf {
chunk_indices_.emplace_back(end, next_chunk_pc, 0);
}

/**
* Closes the file
*/
int close_() override {
if(!stream_) {
return 0;
}

// Finish any pending chunk
if(pending_chunk_) {
// Skip writing the chunk in the buffer if it would break any readers that try to use the trace
if(incomplete_chunk_) {
std::cerr << "WARNING: The pending chunk in the STF compressed writer buffer is in an inconsistent state. It will not be written to the output file." << std::endl;
}
else {
compressChunk_();
}
}
if(compression_in_progress_) {
compression_done_.get();
compression_in_progress_ = false;
}

return STFOFstream::close_();
}

public:
/**
* Constructs an STFCompressedOFstream
Expand Down Expand Up @@ -258,9 +284,8 @@ namespace stf {

// Have to override the base class destructor to ensure that *our* close method gets called before destruction
inline ~STFCompressedOFstream() override {
if(stream_) {
STFCompressedOFstream::close();
}
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
STFCompressedOFstream::close_();
}

/**
Expand Down Expand Up @@ -317,31 +342,6 @@ namespace stf {
next_chunk_end_ = marker_record_chunk_size_;
}

/**
* Closes the file
*/
int close() override {
if(!stream_) {
return 0;
}

// Finish any pending chunk
if(pending_chunk_) {
// Skip writing the chunk in the buffer if it would break any readers that try to use the trace
if(incomplete_chunk_) {
std::cerr << "WARNING: The pending chunk in the STF compressed writer buffer is in an inconsistent state. It will not be written to the output file." << std::endl;
}
else {
compressChunk_();
}
}
if(compression_in_progress_) {
compression_done_.get();
compression_in_progress_ = false;
}
return STFOFstream::close();
}

void markerRecordCallback() override {
STFOFstream::markerRecordCallback();
incomplete_chunk_ = false; // This chunk is safe to write now
Expand Down
81 changes: 52 additions & 29 deletions stf-inc/stf_fstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
#include "stf_protocol_id.hpp"
#include "stf_vlen.hpp"

/**
* \macro STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK
*
* This macro must be invoked before calling the close_() method in an STFFstream-derived class
*/
#define STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK() std::lock_guard<std::mutex> l(STFFstream::open_close_mutex_)

namespace stf {
/**
* \class STFFstream
Expand All @@ -38,6 +45,7 @@ namespace stf {
size_t num_records_read_ = 0; /**< Number of records seen so far */
size_t num_marker_records_ = 0; /**< Number of marker records seen so far */
bool has_32bit_events_ = false; /**< If true, EventRecord event values are packed into 32 bits */
std::mutex open_close_mutex_; /**< Ensures open() and close() are not called from more than 1 thread simultaneously */

STFFstream() = default;

Expand Down Expand Up @@ -106,15 +114,49 @@ namespace stf {
}
}

/**
* Virtual method that does the actual work of closing a file.
* WARNING: This function should not be called unless the open/close mutex has been locked with STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK
* This can be overridden by subclasses, but note that any subclass that overrides it must also override the destructor.
*/
inline virtual int close_() {
int retcode = 0;
if(stream_) {
if(stream_ == stdout) {
fflush(stream_); // need to manually flush stdout
}
else if(stream_ != stdin) { // don't close stdin/stdout
if (used_popen_) {
retcode = pclose (stream_);
}
else if (stream_ != stdout) {
retcode = fclose (stream_);
}
}
stream_ = nullptr;
}
num_records_read_ = 0;
num_marker_records_ = 0;

// If we aren't closing this from the atexit handler, go ahead and remove ourselves
// from open_streams_
if(!lock_open_streams_) {
std::lock_guard<std::mutex> l(open_streams_mutex_);
open_streams_.erase(this);
}
return retcode;
}

public:
// Prevents copying any STF I/O objects
STFFstream(const STFFstream&) = delete;
void operator=(const STFFstream&) = delete;

// Any class that overrides close_ must also override the destructor!
// Also note that the open/close mutex must be manually locked with STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK in any overridden destructor
virtual inline ~STFFstream() {
if (stream_) {
STFFstream::close();
}
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
STFFstream::close_();
}

/**
Expand Down Expand Up @@ -143,6 +185,10 @@ namespace stf {
* \param rw_mode R/W mode
*/
inline void open(const std::string_view filename, const std::string_view rw_mode) { // cppcheck-suppress passedByValue
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();

stf_assert(!stream_, "Stream is already open. Call close() first.");

// special handling for stdin/stdout
if(filename.compare("-") == 0) {
if(rw_mode.compare("rb") == 0) {
Expand All @@ -166,32 +212,9 @@ namespace stf {
/**
* \brief close the trace reader/writer
*/
virtual inline int close() {
int retcode = 0;
if (stream_) {
if(stream_ == stdout) {
fflush(stream_); // need to manually flush stdout
}
else if(stream_ != stdin) { // don't close stdin/stdout
if (used_popen_) {
retcode = pclose (stream_);
}
else if (stream_ != stdout) {
retcode = fclose (stream_);
}
}
stream_ = nullptr;
}
num_records_read_ = 0;
num_marker_records_ = 0;

// If we aren't closing this from the atexit handler, go ahead and remove ourselves
// from open_streams_
if(!lock_open_streams_) {
std::lock_guard<std::mutex> l(open_streams_mutex_);
open_streams_.erase(this);
}
return retcode;
inline int close() {
STF_FSTREAM_ACQUIRE_OPEN_CLOSE_LOCK();
return close_();
}

/**
Expand Down

0 comments on commit 2c0770d

Please sign in to comment.