diff --git a/input.c b/input.c index b0dd5789..1381e28a 100644 --- a/input.c +++ b/input.c @@ -328,7 +328,7 @@ int inbuf_read_to_delimeter (struct inbuf *inbuf, FILE *fp, if (!inbuf->buf) return 0; /* Previous EOF encountered, see below. */ - while (conf.run && 1) { + while (conf.run) { ssize_t r; size_t dof; int delim_found; diff --git a/kcat.c b/kcat.c index 78876fb8..1670c57b 100644 --- a/kcat.c +++ b/kcat.c @@ -144,6 +144,9 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { int32_t broker_id = -1; struct buf *b = rkmessage->_private; +#if RD_KAFKA_VERSION < 0x01000000 + static int say_once = 1; +#endif if (b) buf_destroy(b); @@ -166,7 +169,6 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, #if RD_KAFKA_VERSION < 0x01000000 if (rkmessage->offset == 0 && say_once) { - static int say_once = 1; KC_INFO(3, "Enable message offset reporting " "with '-X topic.produce.offset.report=true'\n"); say_once = 0; @@ -414,12 +416,13 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) { } else { struct inbuf inbuf; struct buf *b; + int at_eof = 0; inbuf_init(&inbuf, conf.msg_size, conf.delim, conf.delim_size); /* Read messages from input, delimited by conf.delim */ while (conf.run && - inbuf_read_to_delimeter(&inbuf, fp, &b)) { + !(at_eof = !inbuf_read_to_delimeter(&inbuf, fp, &b))) { int msgflags = 0; char *buf = b->buf; char *key = NULL; @@ -484,11 +487,9 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) { conf.run = 0; } - if (conf.run) { - if (!feof(fp)) - KC_FATAL("Unable to read message: %s", - strerror(errno)); - } + if (conf.run && !at_eof) + KC_FATAL("Unable to read message: %s", + strerror(errno)); } #if ENABLE_TXNS diff --git a/tests/0004-piped_producer.sh b/tests/0004-piped_producer.sh new file mode 100755 index 00000000..d62afaca --- /dev/null +++ b/tests/0004-piped_producer.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# + +set -e +source helpers.sh + + +# +# Verify that piping messages to the producer works. +# + + +topic=$(make_topic_name) + + + +echo "msg1" | $KCAT -P -t $topic