Skip to content

Commit

Permalink
handle broken evaluation worker pipes more gracefully
Browse files Browse the repository at this point in the history
writeLine will throw a SysError exception, which obfuscates out-of-memory events where the eval worker is killed by the OS.
readLine is suffering from the same problem and will be handled in a subsequent commit.
  • Loading branch information
Mic92 committed Dec 9, 2023
1 parent 6f9a8d2 commit ab989d7
Showing 1 changed file with 56 additions and 8 deletions.
64 changes: 56 additions & 8 deletions src/nix-eval-jobs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include <iostream>
#include <thread>
#include <filesystem>

#include <nix/eval-settings.hh>
#include <nix/config.h>
#include <nix/shared.hh>
Expand Down Expand Up @@ -283,6 +282,22 @@ std::string attrPathJoin(json input) {
});
}

[[nodiscard]] static int tryWriteLine(int fd, std::string s) {
s += "\n";
std::string_view sv{s};
while (!sv.empty()) {
checkInterrupt();
ssize_t res = write(fd, sv.data(), sv.size());
if (res == -1 && errno != EINTR) {
return -errno;
}
if (res > 0) {
sv.remove_prefix(res);
}
}
return 0;
}

static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
AutoCloseFD &from) {

Expand All @@ -303,7 +318,9 @@ static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,

while (true) {
/* Wait for the collector to send us a job name. */
writeLine(to.get(), "next");
if (tryWriteLine(to.get(), "next") < 0) {
return; // main process died
}

auto s = readLine(from.get());
if (s == "exit")
Expand Down Expand Up @@ -388,7 +405,9 @@ static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
printError(e.msg());
}

writeLine(to.get(), reply.dump());
if (tryWriteLine(to.get(), reply.dump()) < 0) {
return; // main process died
}

/* If our RSS exceeds the maximum, exit. The collector will
start a new process. */
Expand All @@ -398,7 +417,9 @@ static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
break;
}

writeLine(to.get(), "restart");
if (tryWriteLine(to.get(), "restart") < 0) {
return; // main process died
};
}

typedef std::function<void(ref<EvalState> state, Bindings &autoArgs,
Expand Down Expand Up @@ -430,10 +451,14 @@ struct Proc {
auto msg = e.msg();
err["error"] = filterANSIEscapes(msg, true);
printError(msg);
writeLine(to->get(), err.dump());
if (tryWriteLine(to->get(), err.dump()) < 0) {
return; // main process died
};
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
writeLine(to->get(), "restart");
if (tryWriteLine(to->get(), "restart") < 0) {
return; // main process died
}
}
},
ProcessOptions{.allowVfork = false});
Expand All @@ -452,6 +477,25 @@ struct State {
std::exception_ptr exc;
};

void handleBrokenWorkerPipe(pid_t child) {
while (1) {
int rc = waitpid(child, nullptr, WNOHANG);
if (rc == 0) {
throw Error("BUG: worker pipe closed but worker still running?");
} else if (rc == -1) {
throw Error("BUG: waitpid waiting for worker failed: %s", strerror(errno));
} else {
if (WIFEXITED(rc)) {
throw Error("evaluation worker exited with %d",
WEXITSTATUS(rc));
} else if (WIFSIGNALED(rc)) {
throw Error("evaluation worker killed by signal %d",
WTERMSIG(rc));
} // else ignore WIFSTOPPED and WIFCONTINUED
}
}
}

std::function<void()> collector(Sync<State> &state_,
std::condition_variable &wakeup) {
return [&]() {
Expand Down Expand Up @@ -481,7 +525,9 @@ std::function<void()> collector(Sync<State> &state_,
auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) ||
state->exc) {
writeLine(proc->to.get(), "exit");
if (tryWriteLine(proc->to.get(), "exit") < 0) {
handleBrokenWorkerPipe(proc->pid);
}
return;
}
if (!state->todo.empty()) {
Expand All @@ -494,7 +540,9 @@ std::function<void()> collector(Sync<State> &state_,
}

/* Tell the worker to evaluate it. */
writeLine(proc->to.get(), "do " + attrPath.dump());
if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) {
handleBrokenWorkerPipe(proc->pid);
}

/* Wait for the response. */
auto respString = readLine(proc->from.get());
Expand Down

0 comments on commit ab989d7

Please sign in to comment.