Skip to content

Commit

Permalink
Handle piped in I/O from STDIN
Browse files Browse the repository at this point in the history
Modify the main recording loop to close corresponding I/O sink
file descriptors when EOF is signalled from a source read. This
is necessary to close one end of a pipe when I/O is pushed
to an interactive process like python via stdin.

In addition, execute the main recording transfer loop until the
child exits and a EIO/EOF condition is met from a source read.
  • Loading branch information
justin-stephenson committed Apr 20, 2020
1 parent 66dda1c commit a473529
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 12 deletions.
19 changes: 19 additions & 0 deletions include/tlog/pkt.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ extern void tlog_pkt_init_window(struct tlog_pkt *pkt,
unsigned short int width,
unsigned short int height);

/**
* Initialize an I/O EOF packet.
*
* @param pkt The packet to initialize.
* @param timestamp Timestamp of the EOF arrival.
* @param output True if output originated I/O, false if input.
*/
extern void tlog_pkt_init_eof(struct tlog_pkt *pkt,
const struct timespec *timestamp,
bool output);
/**
* Initialize an I/O data packet.
*
Expand Down Expand Up @@ -192,6 +202,15 @@ extern bool tlog_pkt_is_valid(const struct tlog_pkt *pkt);
*/
extern bool tlog_pkt_is_void(const struct tlog_pkt *pkt);

/**
* Check if a packet signalled an I/O EOF.
*
* @param pkt The packet to check.
*
* @return True if the packet indicates EOF, false otherwise.
*/
extern bool tlog_pkt_is_eof(const struct tlog_pkt *pkt);

/**
* Check if contents of one packet is equal to the contents of another one.
*
Expand Down
9 changes: 9 additions & 0 deletions include/tlog/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ extern tlog_grc tlog_sink_cut(struct tlog_sink *sink);
*/
extern tlog_grc tlog_sink_flush(struct tlog_sink *sink);

/**
* Close I/O fd of a tty sink.
*
* @param sink The sink to close an associated I/O file descriptor.
* @param output True if sink output will be closed, false if sink
* input will be closed.
*/
extern void tlog_sink_io_close(struct tlog_sink *sink, bool output);

/**
* Destroy (cleanup and free) a log sink.
*
Expand Down
10 changes: 10 additions & 0 deletions include/tlog/sink_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ typedef tlog_grc (*tlog_sink_type_cut_fn)(struct tlog_sink *sink);
*/
typedef tlog_grc (*tlog_sink_type_flush_fn)(struct tlog_sink *sink);

/**
* IO Close function prototype.
*
* @param sink The sink to close an associated I/O file descriptor.
* @param output Indicates which sink file descriptor to close.
*/
typedef void (*tlog_sink_type_io_close_fn)(struct tlog_sink *sink,
bool output);

/**
* Cleanup function prototype.
*
Expand All @@ -109,6 +118,7 @@ struct tlog_sink_type {
tlog_sink_type_write_fn write; /**< Writing function */
tlog_sink_type_cut_fn cut; /**< I/O-cutting function */
tlog_sink_type_flush_fn flush; /**< Flushing function */
tlog_sink_type_io_close_fn io_close; /**< I/O-close function */
tlog_sink_type_cleanup_fn cleanup; /**< Cleanup function */
};

Expand Down
22 changes: 22 additions & 0 deletions lib/tlog/pkt.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ tlog_pkt_init_window(struct tlog_pkt *pkt,
assert(tlog_pkt_is_valid(pkt));
}

void
tlog_pkt_init_eof(struct tlog_pkt *pkt,
const struct timespec *timestamp,
bool output)
{
assert(pkt != NULL);
memset(pkt, 0, sizeof(*pkt));
pkt->timestamp = *timestamp;
pkt->type = TLOG_PKT_TYPE_IO;
pkt->data.io.output = output;
pkt->data.io.len = 0;
assert(tlog_pkt_is_valid(pkt));
assert(tlog_pkt_is_eof(pkt));
}

void
tlog_pkt_init_io(struct tlog_pkt *pkt,
const struct timespec *timestamp,
Expand Down Expand Up @@ -114,6 +129,13 @@ tlog_pkt_is_void(const struct tlog_pkt *pkt)
return pkt->type == TLOG_PKT_TYPE_VOID;
}

bool
tlog_pkt_is_eof(const struct tlog_pkt *pkt)
{
assert(tlog_pkt_is_valid(pkt));
return pkt->type == TLOG_PKT_TYPE_IO && pkt->data.io.len == 0;
}

bool
tlog_pkt_is_equal(const struct tlog_pkt *a, const struct tlog_pkt *b)
{
Expand Down
36 changes: 32 additions & 4 deletions lib/tlog/rec.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ static volatile sig_atomic_t tlog_rec_alarm_set;
/* Number of ALRM signals caught */
static volatile sig_atomic_t tlog_rec_alarm_caught;

/* True if child stopped or terminated */
static volatile sig_atomic_t tlog_rec_child_exited;

static void
tlog_rec_alarm_sighandler(int signum)
{
Expand All @@ -74,6 +77,12 @@ tlog_rec_alarm_sighandler(int signum)
tlog_rec_alarm_caught++;
}

static void
tlog_rec_sigchld_handler()
{
tlog_rec_child_exited = true;
}

/**
* Get fully-qualified name of this host.
*
Expand Down Expand Up @@ -814,7 +823,7 @@ tlog_rec_transfer(struct tlog_errs **perrs,
{
const int exit_sig[] = {SIGINT, SIGTERM, SIGHUP};
tlog_grc return_grc = TLOG_RC_OK;
tlog_grc grc;
tlog_grc grc = TLOG_RC_OK;
size_t i, j;
struct sigaction sa;
bool log_pending = false;
Expand All @@ -827,6 +836,7 @@ tlog_rec_transfer(struct tlog_errs **perrs,
tlog_rec_exit_signum = 0;
tlog_rec_alarm_set = false;
tlog_rec_alarm_caught = 0;
tlog_rec_child_exited = false;

/* Setup signal handlers to terminate gracefully */
for (i = 0; i < TLOG_ARRAY_SIZE(exit_sig); i++) {
Expand Down Expand Up @@ -862,10 +872,28 @@ tlog_rec_transfer(struct tlog_errs **perrs,
"Failed to set a SIGALRM signal action");
}

/* Setup SIGCHLD signal handler */
sa.sa_handler = tlog_rec_sigchld_handler;
sigemptyset(&sa.sa_mask);
/* NOTE: no SA_RESTART on purpose */
sa.sa_flags = 0;
if(sigaction(SIGCHLD, &sa, NULL) == -1) {
grc = TLOG_GRC_ERRNO;
TLOG_ERRS_RAISECS(grc,
"Failed to set a SIGCHLD signal action");
}

/*
* Transfer I/O and window changes
*/
while (tlog_rec_exit_signum == 0) {
/* Expected exit conditions */
if (tlog_rec_child_exited) {
if (grc == TLOG_GRC_FROM(errno, EIO) || (tlog_pkt_is_eof(&pkt))) {
break;
}
}

/* Handle latency limit */
new_alarm_caught = tlog_rec_alarm_caught;
if (new_alarm_caught != last_alarm_caught) {
Expand Down Expand Up @@ -931,9 +959,8 @@ tlog_rec_transfer(struct tlog_errs **perrs,
tlog_errs_pushs(perrs, "Failed reading terminal data");
return_grc = grc;
}
break;
} else if (tlog_pkt_is_void(&pkt)) {
break;
} else if (tlog_pkt_is_eof(&pkt)) {
tlog_sink_io_close(tty_sink, pkt.data.io.output);
}
}

Expand Down Expand Up @@ -984,6 +1011,7 @@ tlog_rec_transfer(struct tlog_errs **perrs,
}
/* Restore signal handlers */
signal(SIGALRM, SIG_DFL);
signal(SIGCHLD, SIG_DFL);
for (i = 0; i < TLOG_ARRAY_SIZE(exit_sig); i++) {
sigaction(exit_sig[i], NULL, &sa);
if (sa.sa_handler != SIG_IGN) {
Expand Down
11 changes: 11 additions & 0 deletions lib/tlog/sink.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ tlog_sink_flush(struct tlog_sink *sink)
return grc;
}

void
tlog_sink_io_close(struct tlog_sink *sink,
bool output)
{
assert(tlog_sink_is_valid(sink));

if (sink->type->io_close != NULL) {
sink->type->io_close(sink, output);
}
}

void
tlog_sink_destroy(struct tlog_sink *sink)
{
Expand Down
2 changes: 1 addition & 1 deletion lib/tlog/source.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ tlog_source_read(struct tlog_source *source, struct tlog_pkt *pkt)
assert(grc == TLOG_RC_OK || tlog_pkt_is_void(pkt));
assert(tlog_source_is_valid(source));
#ifndef NDEBUG
if (!tlog_pkt_is_void(pkt)) {
if (!tlog_pkt_is_void(pkt) && !tlog_pkt_is_eof(pkt)) {
tlog_timespec_sub(&pkt->timestamp, &source->last_timestamp, &diff);
assert(tlog_timespec_cmp(&diff, &tlog_timespec_zero) >= 0);
assert(tlog_timespec_cmp(&diff, &tlog_delay_max_timespec) <= 0);
Expand Down
15 changes: 15 additions & 0 deletions lib/tlog/tty_sink.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ tlog_tty_sink_is_valid(const struct tlog_sink *sink)
return tty_sink != NULL;
}

static void
tlog_tty_sink_io_close(struct tlog_sink *sink, bool output)
{
struct tlog_tty_sink *tty_sink =
(struct tlog_tty_sink *)sink;

int *fd = output ? &tty_sink->out_fd : &tty_sink->in_fd;

if (*fd >= 0) {
close(*fd);
*fd = -1;
}
}

static void
tlog_tty_sink_cleanup(struct tlog_sink *sink)
{
Expand Down Expand Up @@ -124,4 +138,5 @@ const struct tlog_sink_type tlog_tty_sink_type = {
.cleanup = tlog_tty_sink_cleanup,
.is_valid = tlog_tty_sink_is_valid,
.write = tlog_tty_sink_write,
.io_close = tlog_tty_sink_io_close,
};
44 changes: 37 additions & 7 deletions lib/tlog/tty_source.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,26 @@ tlog_tty_source_loc_fmt(const struct tlog_source *source, size_t loc)
return str;
}

/**
* Return true if any TTY source FDs are active, otherwise false
*
* @param fds FD list pointer.
*
* @return True if any TTY source FDs are active, false if all FDs are inactive.
*
*/
static bool
tlog_tty_source_fds_active(struct pollfd *fds)
{
for (int i = 0; i < TLOG_TTY_SOURCE_FD_IDX_NUM; i++) {
if (fds[i].fd >= 0) {
return true;
}
}

return false;
}

/**
* Call poll(2) if at least one FD in the poll FD list is "active"
* (i.e. non-negative), and poll(2) will be able to return an event.
Expand Down Expand Up @@ -243,6 +263,12 @@ tlog_tty_source_read(struct tlog_source *source, struct tlog_pkt *pkt)
goto success;
}
}
} else {
/* If all FDs are closed and win_fd is invalid, return Void packet
* to indicate no more data from source */
if (!tlog_tty_source_fds_active(tty_source->fd_list)) {
goto success;
}
}

/* Wait for I/O until interrupted by SIGWINCH or other signal */
Expand All @@ -264,20 +290,24 @@ tlog_tty_source_read(struct tlog_source *source, struct tlog_pkt *pkt)
if (tty_source->fd_list[tty_source->fd_idx].revents &
(POLLIN | POLLHUP | POLLERR)) {
ssize_t rc;
bool output;

output = tty_source->fd_idx == TLOG_TTY_SOURCE_FD_IDX_OUT;

rc = read(tty_source->fd_list[tty_source->fd_idx].fd,
tty_source->io_buf, tty_source->io_size);

if (clock_gettime(tty_source->clock_id, &ts) < 0) {
return TLOG_GRC_ERRNO;
}

if (rc < 0) {
return TLOG_GRC_ERRNO;
} else if (rc > 0) {
if (clock_gettime(tty_source->clock_id, &ts) < 0) {
return TLOG_GRC_ERRNO;
}
tlog_pkt_init_io(pkt, &ts,
tty_source->fd_idx ==
TLOG_TTY_SOURCE_FD_IDX_OUT,
tty_source->io_buf, false, rc);
tlog_pkt_init_io(pkt, &ts, output, tty_source->io_buf, false, rc);
} else if (rc == 0) {
tty_source->fd_list[tty_source->fd_idx].fd = -1;
tlog_pkt_init_eof(pkt, &ts, output);
}
goto success;
}
Expand Down

0 comments on commit a473529

Please sign in to comment.