Skip to content

Commit

Permalink
removing the recursion only
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Jan 23, 2025
1 parent 3233cf4 commit 226b934
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions system/loggerd/loggerd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ struct RemoteEncoder {
std::unique_ptr<VideoWriter> writer;
int encoderd_segment_offset;
int current_segment = -1;
std::vector<std::unique_ptr<Message>> q;
std::vector<Message *> q;
int dropped_frames = 0;
bool recording = false;
bool marked_ready_to_rotate = false;
bool seen_first_packet = false;
};

size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEncoder &re, const EncoderInfo &encoder_info) {
auto edata = (event.*(encoder_info.get_encode_data_func))();
auto idx = edata.getIdx();
auto flags = idx.getFlags();
Expand Down Expand Up @@ -116,15 +116,9 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, struct Re
return new_msg.size();
}

size_t write_encode_data(LoggerdState *s, Message *msg, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word)));
auto event = cmsg.getRoot<cereal::Event>();
return write_encode_data(s, event, re, encoder_info);
}

int handle_encoder_msg(LoggerdState *s, Message *raw_msg, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
int bytes_count = 0;
std::unique_ptr<Message> msg(raw_msg);

// extract the message
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word)));
auto event = cmsg.getRoot<cereal::Event>();
Expand All @@ -135,7 +129,7 @@ int handle_encoder_msg(LoggerdState *s, Message *raw_msg, struct RemoteEncoder &
if (!re.seen_first_packet) {
re.seen_first_packet = true;
re.encoderd_segment_offset = idx.getSegmentNum();
LOGD("%s: has encoderd offset %d", encoder_info.publish_name, re.encoderd_segment_offset);
LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset);
}
int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset;

Expand All @@ -152,30 +146,34 @@ int handle_encoder_msg(LoggerdState *s, Message *raw_msg, struct RemoteEncoder &
re.marked_ready_to_rotate = false;
// we are in this segment now, process any queued messages before this one
if (!re.q.empty()) {
for (auto &qmsg : re.q) {
bytes_count += write_encode_data(s, qmsg.get(), re, encoder_info);
for (auto qmsg : re.q) {
capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)});
bytes_count += write_encode_data(s, reader.getRoot<cereal::Event>(), re, encoder_info);
delete qmsg;
}
re.q.clear();
}
}
bytes_count += write_encode_data(s, event, re, encoder_info);
delete msg;
} else if (offset_segment_num > s->logger.segment()) {
// encoderd packet has a newer segment, this means encoderd has rolled over
if (!re.marked_ready_to_rotate) {
re.marked_ready_to_rotate = true;
++s->ready_to_rotate;
LOGD("rotate %d -> %d ready %d/%d for %s",
s->logger.segment(), offset_segment_num,
s->ready_to_rotate.load(), s->max_waiting, encoder_info.publish_name);
s->ready_to_rotate.load(), s->max_waiting, name.c_str());
}
// queue up all the new segment messages, they go in after the rotate
re.q.emplace_back(std::move(msg));
re.q.push_back(msg);
} else {
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d",
encoder_info.publish_name, idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset);
name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset);
// free the message, it's useless. this should never happen
// actually, this can happen if you restart encoderd
re.encoderd_segment_offset = -s->logger.segment();
delete msg;
}

return bytes_count;
Expand Down Expand Up @@ -268,7 +266,7 @@ void loggerd_thread() {
const bool in_qlog = service.freq != -1 && (service.counter++ % service.freq == 0);
if (service.encoder) {
s.last_camera_seen_tms = millis_since_boot();
bytes_count += handle_encoder_msg(&s, msg, remote_encoders[sock], encoder_infos_dict[service.name]);
bytes_count += handle_encoder_msg(&s, msg, service.name, remote_encoders[sock], encoder_infos_dict[service.name]);
} else {
s.logger.write((uint8_t *)msg->getData(), msg->getSize(), in_qlog);
bytes_count += msg->getSize();
Expand Down

0 comments on commit 226b934

Please sign in to comment.