Skip to content

Commit

Permalink
Fixed FSR summary reconstruction on truncation #10
Browse files Browse the repository at this point in the history
  • Loading branch information
mliberty1 committed Feb 14, 2024
1 parent 229c840 commit e8ea672
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 143 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ This file contains the list of changes made to the JLS project.

## 0.9.2

2024 Feb 12 [in progress]
2024 Feb 14

* Fixed index error in file repair operation.
* Fixed FSR summary reconstruction on truncation #10


## 0.9.1
Expand Down
10 changes: 8 additions & 2 deletions example/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def parser():
p.add_argument('--sample_decimate_factor', type=int, default=1000, help='The samples per summary entry.')
p.add_argument('--entries_per_summary', type=int, default=20000, help='The entries per summary chunk.')
p.add_argument('--summary_decimate_factor', type=int, default=100, help='The summaries per summary entry.')
p.add_argument('--skip_close', action='store_true', help='Skip cleanly closing the file to test recovery.')
p.add_argument('--sample_id_offset', type=int, default=0, help='The starting sample_id offset.')
p.add_argument('--add',
action='append',
help='The waveform definition to add, which is one of:'
Expand Down Expand Up @@ -118,8 +120,9 @@ def run():

# Write to file
y_len = len(x)
sample_id = 0
with Writer(args.filename) as wr:
sample_id = args.sample_id_offset
wr = Writer(args.filename)
try:
wr.source_def_from_struct(source)
wr.signal_def_from_struct(signal)
wr.user_data(0, 'string user data at start')
Expand All @@ -135,6 +138,9 @@ def run():
x += 1.0 # increment
wr.user_data(42, b'binary data')
wr.user_data(43, {'my': 'data', 'json': [1, 2, 3]})
finally:
if not args.skip_close:
wr.close()


if __name__ == "__main__":
Expand Down
7 changes: 5 additions & 2 deletions example/jls/info.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,12 @@ int on_info(struct app_s * self, int argc, char * argv[]) {

if (chunks) {
struct jls_raw_s * raw;
ROE(jls_raw_open(&raw, path, "r"));
int32_t rc = jls_raw_open(&raw, path, "r");
if (rc && (rc != JLS_ERROR_TRUNCATED)) {
printf("Could not open for reading chunks\n");
return rc;
}

int32_t rc = 0;
struct jls_chunk_header_s hdr;

/*
Expand Down
2 changes: 2 additions & 0 deletions include_prv/jls/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ int32_t jls_core_scan_initial(struct jls_core_s * self);
int32_t jls_core_sources(struct jls_core_s * self, struct jls_source_def_s ** sources, uint16_t * count);
int32_t jls_core_signals(struct jls_core_s * self, struct jls_signal_def_s ** signals, uint16_t * count);
int32_t jls_core_signal(struct jls_core_s * self, uint16_t signal_id, struct jls_signal_def_s * signal);
int32_t jls_core_fsr_sample_buffer_alloc(struct jls_core_fsr_s * self);
void jls_core_fsr_sample_buffer_free(struct jls_core_fsr_s * self);
int32_t jls_core_fsr_seek(struct jls_core_s * self, uint16_t signal_id, uint8_t level, int64_t sample_id);
int32_t jls_core_fsr_length(struct jls_core_s * self, uint16_t signal_id, int64_t * samples);
int32_t jls_core_fsr(struct jls_core_s * self, uint16_t signal_id, int64_t start_sample_id,
Expand Down
156 changes: 85 additions & 71 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,14 @@ int32_t jls_core_wr_summary(struct jls_core_s * self, uint16_t signal_id, enum j
chunk.offset = jls_raw_chunk_tell(self->raw);

// write
if (JLS_LOG_CHECK_STATIC(JLS_LOG_LEVEL_DEBUG3)) {
struct jls_payload_header_s * hdr = (struct jls_payload_header_s *) payload;
JLS_LOGD3("wr_summary(signal_id=%d, level=%d, timestamp=%" PRIi64 ", entries=%" PRIu32
") => offset=%" PRIi64,
(int) signal_id, (int) level,
hdr->timestamp, hdr->entry_count,
jls_raw_chunk_tell(self->raw));
}
ROE(jls_raw_wr(self->raw, &chunk.hdr, payload));
ROE(jls_core_update_item_head(self, &track->summary_head[level], &chunk));
return 0;
Expand Down Expand Up @@ -796,9 +804,12 @@ int32_t jls_core_fsr_length(struct jls_core_s * self, uint16_t signal_id, int64_
int64_t * offsets = self->signal_info[signal_id].tracks[JLS_TRACK_TYPE_FSR].head_offsets;
int level = JLS_SUMMARY_LEVEL_COUNT - 1;
for (; level >= 0; --level) {
if (offsets[level]) {
offset = offsets[level];
offset = offsets[level];
if (offset && (0 == jls_raw_chunk_seek(self->raw, offset))) {
break;
} else {
offset = 0;
offsets[level] = 0;
}
}
if (!offset) {
Expand Down Expand Up @@ -1215,103 +1226,106 @@ int32_t jls_core_repair_fsr(struct jls_core_s * self, uint16_t signal_id) {
struct jls_core_track_s * track = &signal_info->tracks[JLS_SIGNAL_TYPE_FSR];
track->parent = signal_info;

JLS_LOGI("repair signal_id %d", (int) signal_id);

// find first non-empty level
int64_t * offsets = signal_info->tracks[JLS_TRACK_TYPE_FSR].head_offsets;
int level = JLS_SUMMARY_LEVEL_COUNT - 1;
for (; (level > 0); --level) {
if (offsets[level]) {
break;
if (0 == jls_raw_chunk_seek(self->raw, offsets[level])) {
break;
} else {
offsets[level] = 0;
}
}
}

int64_t offset_next = 0;
int64_t offset_this = 0;
int64_t offset = 0;
bool skip_this = false;
bool skip_next = false;

for (; level > 0; --level) {
JLS_LOGI("repair signal_id %d, level %d", (int) signal_id, (int) level);
offset = offset_next ? offset_next : offsets[level];
skip_next = offset_next;
offset_next = 0;
offset_this = 0;
jls_core_fsr_summary_level_alloc(signal_info->track_fsr, level);
struct jls_core_fsr_level_s * lvl = signal_info->track_fsr->level[level];

while (offset) {
offset_this = offset;
skip_this = skip_next;
skip_next = false;
JLS_LOGI("repair signal_id %d, level %d, offset %" PRIi64 " %s",
(int) signal_id, (int) level, offset, skip_this ? "skip" : "");
// read index and summary chunks
if (jls_raw_chunk_seek(self->raw, offset)) {
break;
}
if (jls_core_rd_chunk(self)) {
break;
}
track->index_head[level] = self->chunk_cur;
int64_t offset_index_next = 0;
int64_t offset = offsets[level];
struct jls_core_chunk_s index_head;

memcpy(lvl->index, self->buf->start, self->chunk_cur.hdr.payload_length);
signal_info->tracks[JLS_TRACK_TYPE_FSR].index_head[level] = self->chunk_cur;
offset = self->chunk_cur.hdr.item_next;
jls_core_fsr_summary_level_alloc(signal_info->track_fsr, level);
struct jls_core_fsr_level_s * lvl = signal_info->track_fsr->level[level];
bool skip_summary = false;

if (jls_core_rd_chunk(self)) {
break;
}
track->summary_head[level] = self->chunk_cur;
while (level > 0) {
JLS_LOGI("repair_fsr signal_id %d, level %d, offset %" PRIi64, (int) signal_id, (int) level, offset);

memcpy(lvl->summary, self->buf->start, self->chunk_cur.hdr.payload_length);
signal_info->tracks[JLS_TRACK_TYPE_FSR].summary_head[level] = self->chunk_cur;
if (jls_core_rd_chunk(self)) { // read index
break;
}
index_head = self->chunk_cur;
memcpy(lvl->index, self->buf->start, self->chunk_cur.hdr.payload_length);

struct jls_fsr_index_s * r = lvl->index;
if (r->header.entry_size_bits != (sizeof(r->offsets[0]) * 8)) {
JLS_LOGE("invalid FSR index entry size: %d bits", (int) r->header.entry_size_bits);
return JLS_ERROR_PARAMETER_INVALID;
}
size_t sz = sizeof(r->header) + r->header.entry_count * sizeof(r->offsets[0]);
if (sz > self->buf->length) {
JLS_LOGE("invalid payload length");
return JLS_ERROR_PARAMETER_INVALID;
}
if (jls_core_rd_chunk(self)) { // read summary
break;
}
track->index_head[level] = index_head;
offset_index_next = index_head.hdr.item_next;
track->summary_head[level] = self->chunk_cur;
memcpy(lvl->summary, self->buf->start, self->chunk_cur.hdr.payload_length);

jls_raw_seek_end(self->raw);
if (!skip_this) {
if (jls_core_fsr_summaryN(signal_info->track_fsr, level + 1, offset_this)) {
JLS_LOGW("could not create summary - repair may not work");
}
}
struct jls_fsr_index_s * r = lvl->index;
if (r->header.entry_size_bits != (sizeof(r->offsets[0]) * 8)) {
JLS_LOGE("invalid FSR index entry size: %d bits", (int) r->header.entry_size_bits);
return JLS_ERROR_PARAMETER_INVALID;
}
size_t sz = sizeof(r->header) + r->header.entry_count * sizeof(r->offsets[0]);
if (sz > self->buf->length) {
JLS_LOGE("invalid payload length");
return JLS_ERROR_PARAMETER_INVALID;
}

jls_raw_seek_end(self->raw);
if (!skip_summary && jls_core_fsr_summaryN(signal_info->track_fsr, level + 1, offset)) {
JLS_LOGE("repair_fsr signal_id %d could not create summary - cannot repair this track", (int) signal_id);
}
skip_summary = false;

if ((offset_index_next > 0) && (0 == jls_raw_chunk_seek(self->raw, offset_index_next))) {
offset = offset_index_next;
} else {
skip_summary = true;
--level;
if (r->header.entry_count > 0) {
offset_next = r->offsets[r->header.entry_count - 1];
offset = r->offsets[r->header.entry_count - 1];
lvl->index->header.entry_count = 0;
lvl->summary->header.entry_count = 0;
if (0 != jls_raw_chunk_seek(self->raw, offset)) {
JLS_LOGE("Could not seek to lower-level index. Cannot repair.");
break;
}
} else {
JLS_LOGE("Empty index. Cannot repair.");
return JLS_ERROR_NOT_SUPPORTED;
}
lvl->index->header.entry_count = 0;
lvl->summary->header.entry_count = 0;
}
}

// update level 0 (data)
offset = offset_next ? offset_next : offsets[0];
skip_next = offset_next;
jls_core_fsr_sample_buffer_alloc(signal_info->track_fsr);
while (offset) {
if (jls_core_rd_chunk(self)) {
if (jls_raw_chunk_seek(self->raw, offset) || jls_core_rd_chunk(self)) {
break;
}
if (!skip_next) {
if (jls_core_fsr_summary1(signal_info->track_fsr, offset)) {
JLS_LOGW("could not create summary - repair may not work");
}
memcpy(signal_info->track_fsr->data, self->buf->start, self->buf->length);
JLS_LOGI("repair_fsr signal_id %d, level %d, offset %" PRIi64 " sample_id %" PRIi64 " to %" PRIi64 " data[0]=%f",
(int) signal_id, (int) level, offset,
signal_info->track_fsr->data->header.timestamp,
signal_info->track_fsr->data->header.timestamp + signal_info->track_fsr->data->header.entry_count,
signal_info->track_fsr->data->data[0]);
signal_info->track_fsr->data_length = signal_info->track_fsr->data->header.entry_count;

if (!skip_summary && jls_core_fsr_summary1(signal_info->track_fsr, offset)) {
JLS_LOGW("could not create summary - repair may not work");
}
skip_next = false;
skip_summary = false;
offset = self->chunk_cur.hdr.item_next;
}
jls_core_fsr_sample_buffer_free(signal_info->track_fsr);

JLS_LOGI("repair signal_id %d finalizing", (int) signal_id);
JLS_LOGI("repair_fsr signal_id %d finalizing", (int) signal_id);
jls_raw_seek_end(self->raw);

ROE(jls_fsr_close(signal_info->track_fsr));
signal_info->track_fsr = NULL;
return 0;
Expand Down
11 changes: 10 additions & 1 deletion src/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ int32_t jls_rd_open(struct jls_rd_s ** instance, const char * path) {
jls_track_repair_pointers(&signal_info->tracks[track_idx]);
}
}
}

GOE(jls_core_scan_fsr_sample_id(core));

for (uint16_t signal_idx = 0; signal_idx < JLS_SIGNAL_COUNT; ++signal_idx) {
struct jls_core_signal_s * signal_info = &core->signal_info[signal_idx];
if (signal_info->signal_def.signal_id != signal_idx) {
continue;
}

if (signal_info->signal_def.signal_type == JLS_SIGNAL_TYPE_FSR) {
GOE(jls_core_repair_fsr(core, signal_idx));
Expand Down Expand Up @@ -238,7 +247,7 @@ static inline void f64_to_stats(struct jls_statistics_s * stats, const double *
static int32_t rd_stats_chunk(struct jls_core_s * self, uint16_t signal_id, uint8_t level) {
ROE(jls_core_rd_chunk(self));
if (JLS_TAG_TRACK_FSR_SUMMARY != self->chunk_cur.hdr.tag) {
JLS_LOGW("unexpected chunk tag %d", (int) self->chunk_cur.hdr.tag);
JLS_LOGW("unexpected chunk tag %d at %" PRIi64, (int) self->chunk_cur.hdr.tag, self->chunk_cur.offset);
return JLS_ERROR_IO;
}
uint16_t metadata = (signal_id & SIGNAL_MASK) | (((uint16_t) level) << 12);
Expand Down
Loading

0 comments on commit e8ea672

Please sign in to comment.