From ab6ce814fa2e435359e77a1d3813853248679206 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 17 Nov 2022 16:24:10 +0100 Subject: [PATCH] Fix piped producer error (Unable to read message: ..) --- input.c | 2 +- kcat.c | 11 +++++------ tests/0004-piped_producer.sh | 17 +++++++++++++++++ 3 files changed, 23 insertions(+), 7 deletions(-) create mode 100755 tests/0004-piped_producer.sh diff --git a/input.c b/input.c index b0dd5789..1381e28a 100644 --- a/input.c +++ b/input.c @@ -328,7 +328,7 @@ int inbuf_read_to_delimeter (struct inbuf *inbuf, FILE *fp, if (!inbuf->buf) return 0; /* Previous EOF encountered, see below. */ - while (conf.run && 1) { + while (conf.run) { ssize_t r; size_t dof; int delim_found; diff --git a/kcat.c b/kcat.c index 7ffd73c7..bfd6f8ff 100644 --- a/kcat.c +++ b/kcat.c @@ -416,12 +416,13 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) { } else { struct inbuf inbuf; struct buf *b; + int at_eof = 0; inbuf_init(&inbuf, conf.msg_size, conf.delim, conf.delim_size); /* Read messages from input, delimited by conf.delim */ while (conf.run && - inbuf_read_to_delimeter(&inbuf, fp, &b)) { + !(at_eof = !inbuf_read_to_delimeter(&inbuf, fp, &b))) { int msgflags = 0; char *buf = b->buf; char *key = NULL; @@ -486,11 +487,9 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) { conf.run = 0; } - if (conf.run) { - if (!feof(fp)) - KC_FATAL("Unable to read message: %s", - strerror(errno)); - } + if (conf.run && !at_eof) + KC_FATAL("Unable to read message: %s", + strerror(errno)); } #if ENABLE_TXNS diff --git a/tests/0004-piped_producer.sh b/tests/0004-piped_producer.sh new file mode 100755 index 00000000..d62afaca --- /dev/null +++ b/tests/0004-piped_producer.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# + +set -e +source helpers.sh + + +# +# Verify that piping messages to the producer works. +# + + +topic=$(make_topic_name) + + + +echo "msg1" | $KCAT -P -t $topic