Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better G3Reader.tell() method #172

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/include/core/G3Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ class G3Reader : public G3Module {
public:
G3Reader(std::string filename, int n_frames_to_read = -1,
float timeout = -1., bool track_filename = false,
size_t buffersize = 1024*1024);
size_t buffersize = 1024*1024, bool counter = false);
G3Reader(std::vector<std::string> filenames, int n_frames_to_read = -1,
float timeout = -1., bool track_filename = false,
size_t buffersize = 1024*1024);
size_t buffersize = 1024*1024, bool counter = false);
Copy link
Member

@nwhitehorn nwhitehorn Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does counter = true cause a measurable performance drop? Or can we just make it always be on, the way it is for the writer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tried. Maybe @mhasself or @tskisner can test this out for their use case?


void Process(G3FramePtr frame, std::deque<G3FramePtr> &out);
off_t Seek(off_t offset);
Expand All @@ -32,6 +32,7 @@ class G3Reader : public G3Module {
float timeout_;
bool track_filename_;
size_t buffersize_;
bool counter_;

SET_LOGGER("G3Reader");
};
Expand Down
4 changes: 3 additions & 1 deletion core/include/core/dataio.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ typedef boost::iostreams::filtering_ostream g3_ostream;
* read until EOF.
* @param timeout Timeout in seconds for socket connections.
* @param buffersize Advisory buffer size in bytes for aggregating reads
* @param counter If true, add a counter to the stream configuration,
* for use by the g3_istream_tell function.
* @return File descriptor for socket connections, or -1 for file input.
*/
int g3_istream_from_path(g3_istream &stream, const std::string &path,
float timeout=-1.0, size_t buffersize=1024*1024);
float timeout=-1.0, size_t buffersize=1024*1024, bool counter=false);

/**
* Seek to a byte offset in an open input file stream.
Expand Down
38 changes: 21 additions & 17 deletions core/src/G3Reader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
#include <G3Reader.h>

G3Reader::G3Reader(std::string filename, int n_frames_to_read,
float timeout, bool track_filename, size_t buffersize) :
float timeout, bool track_filename, size_t buffersize, bool counter) :
prefix_file_(false), n_frames_to_read_(n_frames_to_read),
n_frames_read_(0), n_frames_cur_(0), timeout_(timeout),
track_filename_(track_filename), buffersize_(buffersize)
track_filename_(track_filename), buffersize_(buffersize),
counter_(counter)
{
g3_check_input_path(filename);
StartFile(filename);
}

G3Reader::G3Reader(std::vector<std::string> filename, int n_frames_to_read,
float timeout, bool track_filename, size_t buffersize) :
float timeout, bool track_filename, size_t buffersize, bool counter) :
prefix_file_(false), n_frames_to_read_(n_frames_to_read),
n_frames_read_(0), n_frames_cur_(0), timeout_(timeout),
track_filename_(track_filename), buffersize_(buffersize)
track_filename_(track_filename), buffersize_(buffersize),
counter_(counter)
{
if (filename.size() == 0)
log_fatal("Empty file list provided to G3Reader");
Expand All @@ -34,7 +36,7 @@ void G3Reader::StartFile(std::string path)
log_info("Starting file %s\n", path.c_str());
cur_file_ = path;
n_frames_cur_ = 0;
(void) g3_istream_from_path(stream_, path, timeout_, buffersize_);
(void) g3_istream_from_path(stream_, path, timeout_, buffersize_, counter_);
}

void G3Reader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
Expand Down Expand Up @@ -100,14 +102,14 @@ void G3Reader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
}

off_t G3Reader::Seek(off_t offset) {
try {
return g3_istream_seek(stream_, offset);
} catch (...) {
log_fatal("Cannot seek %s; stream closed at EOF.", cur_file_.c_str());
}
if (!counter_)
log_fatal("Cannot seek %s; stream opened without counter.", cur_file_.c_str());
return g3_istream_seek(stream_, offset);
}

off_t G3Reader::Tell() {
if (!counter_)
log_fatal("Cannot tell %s; stream opened without counter.", cur_file_.c_str());
return g3_istream_tell(stream_);
}

Expand All @@ -125,15 +127,17 @@ PYBINDINGS("core") {
"streams, resulting in EOF behavior on expiry; unfortunately this "
"cannot be used for polling, you have to close the connection. "
"Use the `tell` and `seek` methods to record the position of and "
"seek to the beginning of a particular frame in the file. Set "
"track_filename to True to record the filename for each frame in "
"the ._filename attribute (fragile).",
init<std::string, int, float, bool, size_t>((arg("filename"),
"seek to the beginning of a particular frame in the file "
"(requires counter=True). Set track_filename to True to record "
"the filename for each frame in the ._filename attribute (fragile).",
init<std::string, int, float, bool, size_t, bool>((arg("filename"),
arg("n_frames_to_read")=0,arg("timeout")=-1.,
arg("track_filename")=false,arg("buffersize")=1024*1024)))
.def(init<std::vector<std::string>, int, float, bool, size_t>((
arg("track_filename")=false,arg("buffersize")=1024*1024,
arg("counter")=false)))
.def(init<std::vector<std::string>, int, float, bool, size_t, bool>((
arg("filename"), arg("n_frames_to_read")=0, arg("timeout")=-1.,
arg("track_filename")=false,arg("buffersize")=1024*1024)))
arg("track_filename")=false,arg("buffersize")=1024*1024,
arg("counter")=false)))
.def("tell", &G3Reader::Tell,
"Return the current byte offset from start of stream.")
.def("seek", &G3Reader::Seek,
Expand Down
17 changes: 16 additions & 1 deletion core/src/counter64.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class basic_counter64 {
typedef Ch char_type;
struct category
: dual_use,
filter_tag,
seekable_filter_tag,
multichar_tag,
optimally_buffered_tag
{ };
Expand All @@ -59,6 +59,21 @@ class basic_counter64 {
return result;
}

template<typename Device>
std::streampos seek(Device &dev, stream_offset off, BOOST_IOS::seekdir way)
{
chars_ = iostreams::seek(dev, off, way);
return chars_;
}

template<typename Device>
std::streampos seek(Device &dev, stream_offset off, BOOST_IOS::seekdir way,
BOOST_IOS::openmode which)
{
chars_ = iostreams::seek(dev, off, way, which);
return chars_;
}

template<typename Sink>
std::streamsize write(Sink& snk, const char_type* s, std::streamsize n)
{
Expand Down
13 changes: 11 additions & 2 deletions core/src/dataio.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

int
g3_istream_from_path(g3_istream &stream, const std::string &path,
float timeout, size_t buffersize)
float timeout, size_t buffersize, bool counter)
{
stream.reset();
if (path.size() > 3 && !path.compare(path.size() - 3, 3, ".gz"))
Expand All @@ -34,6 +34,9 @@ g3_istream_from_path(g3_istream &stream, const std::string &path,
#endif
}

if (counter)
stream.push(boost::iostreams::counter64());

int fd = -1;

// Figure out what kind of ultimate data source this is
Expand Down Expand Up @@ -165,7 +168,13 @@ g3_istream_seek(g3_istream &stream, off_t offset)
off_t
g3_istream_tell(g3_istream &stream)
{
return boost::iostreams::seek(stream, 0, std::ios_base::cur);
boost::iostreams::counter64 *counter =
stream.component<boost::iostreams::counter64>(
stream.size() - 2);
if (!counter)
log_fatal("Could not get stream counter");

return counter->characters();
}

void
Expand Down
4 changes: 2 additions & 2 deletions core/tests/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def checkinfo(fr):
# Indexing
class CachingReader:
def __init__(self, filename='test.g3'):
self.reader = core.G3Reader(filename='test.g3')
self.reader = core.G3Reader(filename='test.g3', counter=True)
self.w_pos = None

def __call__(self, frame):
Expand All @@ -88,7 +88,7 @@ def __call__(self, frame):
# Using cached index
class CachedReader:
def __init__(self, filename='test.g3', start=None):
self.reader = core.G3Reader(filename=filename)
self.reader = core.G3Reader(filename=filename, counter=True)
self.pos = start

def __call__(self, frame):
Expand Down