Skip to content

Commit

Permalink
Get IPFS CID back from uploader and invoke RECORDING_END trigger
Browse files Browse the repository at this point in the history
Get IPFS CID back from uploader and invoke RECORDING_END trigger
  • Loading branch information
cyberj0g authored Oct 4, 2022
2 parents 04a3ad7 + 86c6e4a commit d69a61e
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 11 deletions.
4 changes: 2 additions & 2 deletions lib/procs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ pid_t Util::Procs::StartPiped(const char *const *argv, int *fdin, int *fdout, in
return pid;
}

pid_t Util::Procs::startConverted(const char *const *argv, int *fdin){
pid_t Util::Procs::startConverted(const char *const *argv, int *fdin, std::string triggerPayload){
pid_t pid;
int pipein[2];
setHandler();
Expand Down Expand Up @@ -479,7 +479,7 @@ pid_t Util::Procs::startConverted(const char *const *argv, int *fdin){
}
dup2(ch_stdin, 0);
close(ch_stdin);
convertLogs(argv[0]);
convertLogs(argv[0], triggerPayload);
execvp(argv[0], (char *const *)argv);
/*LTS-START*/
char *trggr = getenv("MIST_TRIGGER");
Expand Down
2 changes: 1 addition & 1 deletion lib/procs.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace Util{
static std::string getOutputOf(std::deque<std::string> &argDeq);
static pid_t StartPiped(const char *const *argv, int *fdin, int *fdout, int *fderr);
static pid_t StartPiped(std::deque<std::string> &argDeq, int *fdin, int *fdout, int *fderr);
static pid_t startConverted(const char *const *argv, int *fdin);
static pid_t startConverted(const char *const *argv, int *fdin, std::string triggerPayload = "");
static void Stop(pid_t name);
static void Murder(pid_t name);
static void StopAll();
Expand Down
18 changes: 15 additions & 3 deletions lib/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#endif
#include <stdlib.h>
#include <sys/resource.h>
#include <mist/triggers.h>

#define RAXHDR_FIELDOFFSET p[1]
#define RAX_REQDFIELDS_LEN 36
Expand Down Expand Up @@ -325,7 +326,7 @@ namespace Util{
}
}

void convertLogs(const char *progName){
void convertLogs(const char *progName, std::string triggerPayload){
int pipeErr[2];
int pipeOut[2];
pid_t thisPid = getpid();
Expand All @@ -352,7 +353,7 @@ namespace Util{
sigaction(SIGHUP, &new_action, NULL);
sigaction(SIGTERM, &new_action, NULL);
sigaction(SIGPIPE, &new_action, NULL);
Util::logConverter(pipeErr[0], pipeOut[0], STDERR_FILENO, progName, thisPid);
Util::logConverter(pipeErr[0], pipeOut[0], STDERR_FILENO, progName, thisPid, triggerPayload);
exit(0);
}
Util::Procs::fork_complete();
Expand All @@ -369,7 +370,7 @@ namespace Util{
}
}

void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid){
void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid, std::string triggerPayload){
Socket::Connection errStream(-1, inErr);
Socket::Connection outStream(-1, inOut);
errStream.setBlocking(false);
Expand All @@ -389,6 +390,17 @@ namespace Util{
while (line.find('\r') != std::string::npos){line.erase(line.find('\r'));}
while (line.find('\n') != std::string::npos){line.erase(line.find('\n'));}
dprintf(out, "INFO|%s|%d||%s|%s\n", progName, pid, Util::streamName, line.c_str());
if (!strcmp(progName, "livepeer-catalyst-uploader") && !triggerPayload.empty()) {
// output is one-line JSON with URL
const JSON::Value &value = JSON::fromString(line);
// insert received IPFS CID
unsigned long cid_pos = triggerPayload.find("IPFS_CID");
if (cid_pos != std::string::npos) {
triggerPayload.replace(cid_pos, 8, value["uri"].asString());
// invoke RECORDING_END trigger
Triggers::doTrigger("RECORDING_END", triggerPayload, streamName);
}
}
line.clear();
}
}else{Util::sleep(25);}
Expand Down
4 changes: 2 additions & 2 deletions lib/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ namespace Util{
void logParser(int in, int out, bool colored,
void callback(const std::string &, const std::string &, const std::string &, uint64_t, bool) = 0);
void redirectLogsIfNeeded();
void convertLogs(const char *progName);
void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid);
void convertLogs(const char *progName, std::string triggerPayload = "");
void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid, std::string triggerPayload = "");

/// Holds type, size and offset for RelAccX class internal data fields.
class RelAccXFieldData{
Expand Down
27 changes: 25 additions & 2 deletions src/output/output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,8 @@ namespace Mist{
streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n" + reqUrl;
Triggers::doTrigger("CONN_CLOSE", payload, streamName);
}
if (isRecordingToFile && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName)){
if (isRecordingToFile && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName) &&
config->getString("target").substr(0, 7) != "ipfs://"){
uint64_t rightNow = Util::epoch();
std::stringstream payl;
payl << streamName << '\n';
Expand Down Expand Up @@ -1876,7 +1877,29 @@ namespace Mist{
return false;
}
Util::Procs::forget(child);
}else{
} else if (file.substr(0,7) == "ipfs://") {
// Create RECORDING_END trigger payload to be invoked once IPFS CID is known
std::stringstream payl;
payl << streamName << '\n';
payl << "IPFS_CID" << '\n';
payl << capa["name"].asStringRef() << '\n';
// Can't fill these fields without knowing when the trigger will be fired
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
const char *cmd[] = {"livepeer-catalyst-uploader", "-t", "2592000s", (char*)file.c_str(), 0};
pid_t child = Util::Procs::startConverted(cmd, &outFile, payl.str());
if (child == -1){
ERROR_MSG("livepeer-catalyst-uploader process did not start, aborting");
return false;
}
Util::Procs::forget(child);
}
else{
int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
int mode = O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC);
if (!Util::createPathFor(file)){
Expand Down
2 changes: 1 addition & 1 deletion src/output/output_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace Mist{
capa["methods"][0u]["hrn"] = "FLV progressive";
capa["methods"][0u]["priority"] = 5;
capa["methods"][0u]["player_url"] = "/oldflashplayer.swf";
capa["push_urls"].append("/*.flv");
capa["push_urls"].append("*.flv");

JSON::Value opt;
opt["arg"] = "string";
Expand Down

0 comments on commit d69a61e

Please sign in to comment.