From d7edb0a064a20d636bdfdaae4a6c32c23f360b99 Mon Sep 17 00:00:00 2001 From: Dainius Jocas Date: Mon, 24 May 2021 11:38:46 +0300 Subject: [PATCH] STDIN and STDOUT support (#12) * feat: stdout sink * feat: op kafka-to-std-out * feat: es-to-stdout operation * feat: source stdin; ops: stdin-to-es, stdin-to-kafka --- examples/es-to-stdout.json | 12 ++++++ examples/kafka-to-stdout.json | 7 +++ examples/stdin-to-es.json | 7 +++ resources/logback.xml | 1 + src/ops/es_to_stdout.clj | 31 ++++++++++++++ src/ops/kafka_to_stdout.clj | 24 +++++++++++ src/ops/stdin_to_es.clj | 30 +++++++++++++ src/ops/stdin_to_kafka.clj | 73 ++++++++++++++++++++++++++++++++ src/ops_overrides.clj | 19 ++++++++- src/sink.clj | 4 +- src/sink/elasticsearch.clj | 6 +++ src/sink/elasticsearch/index.clj | 6 +-- src/sink/stdout.clj | 17 ++++++++ src/source.clj | 4 +- src/source/kafka.clj | 4 +- src/source/stdin.clj | 36 ++++++++++++++++ test/sink/stdout_test.clj | 13 ++++++ test/source/stdin_test.clj | 41 ++++++++++++++++++ 18 files changed, 326 insertions(+), 9 deletions(-) create mode 100644 examples/es-to-stdout.json create mode 100644 examples/kafka-to-stdout.json create mode 100644 examples/stdin-to-es.json create mode 100644 src/ops/es_to_stdout.clj create mode 100644 src/ops/kafka_to_stdout.clj create mode 100644 src/ops/stdin_to_es.clj create mode 100644 src/ops/stdin_to_kafka.clj create mode 100644 src/sink/stdout.clj create mode 100644 src/source/stdin.clj create mode 100644 test/sink/stdout_test.clj create mode 100644 test/source/stdin_test.clj diff --git a/examples/es-to-stdout.json b/examples/es-to-stdout.json new file mode 100644 index 0000000..69bfe65 --- /dev/null +++ b/examples/es-to-stdout.json @@ -0,0 +1,12 @@ +{ + "max_docs": 10000, + "source": { + "remote": { + "host": "http://localhost:9200" + }, + "index": ".kibana", + "query": { + "size": 2000 + } + } +} diff --git a/examples/kafka-to-stdout.json b/examples/kafka-to-stdout.json new file mode 100644 index 0000000..810a4c3 --- /dev/null +++ b/examples/kafka-to-stdout.json @@ -0,0 +1,7 @@ +{ + "max_docs": 1, + "source": { + "topic": "source-topic", + "bootstrap.servers": "127.0.0.1:9092" + } +} diff --git a/examples/stdin-to-es.json b/examples/stdin-to-es.json new file mode 100644 index 0000000..c7962da --- /dev/null +++ b/examples/stdin-to-es.json @@ -0,0 +1,7 @@ +{ + "max_docs": 1, + "sink": { + "connection.url": "http://localhost:9200", + "dest.index": "dest-index-name2" + } +} diff --git a/resources/logback.xml b/resources/logback.xml index 08554ac..47dad76 100644 --- a/resources/logback.xml +++ b/resources/logback.xml @@ -11,6 +11,7 @@ + diff --git a/src/ops/es_to_stdout.clj b/src/ops/es_to_stdout.clj new file mode 100644 index 0000000..fdd09f5 --- /dev/null +++ b/src/ops/es_to_stdout.clj @@ -0,0 +1,31 @@ +(ns ops.es-to-stdout + (:require [sink :as sink] + [source :as source])) + +(def default-opts + {:max_docs nil + :source {:implementation :elasticsearch + :remote {:host "http://localhost:9200"} + :index "*" + :query {:sort ["_doc"] + :size 2000}} + :sink {:implementation :stdout}}) + +(defn execute + "Reads documents from Elasticsearch and writes them to stdout" + [opts] + (sink/store! + (source/fetch! 
+      (assoc opts :source (merge (:source default-opts)
+                                 (:source opts))))
+    (assoc opts :sink
+           (merge (:sink default-opts)
+                  (:sink opts)))))
+
+(comment
+  (execute
+    {:max_docs 1
+     :source {:implementation :elasticsearch
+              :remote {:host "http://localhost:9200"}
+              :index ".kibana"
+              :query {:sort ["_doc"]
+                      :size 2000}}}))
diff --git a/src/ops/kafka_to_stdout.clj b/src/ops/kafka_to_stdout.clj
new file mode 100644
index 0000000..e144df5
--- /dev/null
+++ b/src/ops/kafka_to_stdout.clj
@@ -0,0 +1,24 @@
+(ns ops.kafka-to-stdout (:require [sink :as sink] [source :as source]))
+
+(def default-opts
+  {:max_docs 5
+   :source {:implementation :kafka
+            :bootstrap.servers "127.0.0.1:9092"
+            :topic nil
+            :decode-value? false
+            :impatient? true}
+   :sink {:implementation :stdout}})
+
+(defn execute
+  "Reads records from Kafka and writes them to STDOUT."
+  [opts]
+  (let [source-opts (merge (:source default-opts) (:source opts))
+        sink-opts (merge (:sink default-opts) (:sink opts))
+        records (source/fetch! (assoc opts :source source-opts))]
+    (sink/store! records (assoc opts :sink sink-opts))))
+
+(comment
+  (execute
+    {:max_docs 1
+     :source {:topic "source-topic"
+              :bootstrap.servers "127.0.0.1:9092"}}))
diff --git a/src/ops/stdin_to_es.clj b/src/ops/stdin_to_es.clj
new file mode 100644
index 0000000..ba39e24
--- /dev/null
+++ b/src/ops/stdin_to_es.clj
@@ -0,0 +1,30 @@
+(ns ops.stdin-to-es
+  (:require [sink.elasticsearch.index :as indexer]
+            [source :as source]
+            [jsonista.core :as json]))
+
+(def default-opts
+  {:max_docs 5
+   :source {:implementation :stdin
+            :decode-value? true}
+   :sink (merge {:implementation :elasticsearch}
+                indexer/default-opts)})
+
+(defn execute "Reads documents from stdin and indexes them into Elasticsearch." [opts]
+  (let [stdin-records (source/fetch! (assoc opts :source (merge (:source default-opts)
+                                                                (:source opts))))]
+    (indexer/store! stdin-records
+                    (merge
+                      (:sink default-opts)
+                      (:sink opts)))))
+
+(comment
+  (with-in-str
+    (json/write-value-as-string
+      {:_id "123" :_source {:foo "bar"}})
+    (ops.stdin-to-es/execute
+      {:max_docs 1
+       :source {:implementation :stdin
+                :decode-value? true}
+       :sink {:connection.url "http://localhost:9200"
+              :dest.index "dest-index-name"}})))
diff --git a/src/ops/stdin_to_kafka.clj b/src/ops/stdin_to_kafka.clj
new file mode 100644
index 0000000..4b6d923
--- /dev/null
+++ b/src/ops/stdin_to_kafka.clj
@@ -0,0 +1,73 @@
+(ns ops.stdin-to-kafka
+  (:require [sink :as sink]
+            [source :as source]
+            [jsonista.core :as json])
+  (:import (sink KafkaRecord)))
+
+(def default-opts
+  {:max_docs nil
+   :source {:implementation :stdin
+            :decode-value? true}
+   :sink {:implementation :kafka
+          :topic "sink-topic"
+          :bootstrap.servers "127.0.0.1:9092"}})
+
+(defn es-record? [record]
+  (and (contains? record :_id)
+       (contains? record :_source)))
+
+(defn execute "Reads documents from stdin and writes them to Kafka." [opts]
+  (sink/store!
+    (map (fn [record]
+           (if (string? record)
+             (KafkaRecord. nil record nil)
+             (if (es-record? record)
+               (KafkaRecord.
+                 (:_id record)
+                 (:_source record)
+                 (dissoc record :_id :_source))
+               record)))
+         (source/fetch! (assoc opts :source (merge (:source default-opts)
+                                                   (:source opts)))))
+    (assoc opts :sink
+           (merge (:sink default-opts)
+                  (:sink opts)))))
+
+(comment
+  (with-in-str
+    (json/write-value-as-string
+      {:_id "123" :_source {:foo "bar"}})
+    (ops.stdin-to-kafka/execute
+      {:max_docs 1
+       :source {:implementation :stdin
+                :decode-value? true}
+       :sink {:topic "stdin-to-kafka-test"
+              :bootstrap.servers "127.0.0.1:9092"}}))
+
+  (with-in-str
+    (json/write-value-as-string
+      {:_id "123" :_source {:foo "bar"}})
+    (ops.stdin-to-kafka/execute
+      {:max_docs 1
+       :source {:implementation :stdin
+                :decode-value? false}
+       :sink {:topic "stdin-to-kafka-test"
+              :bootstrap.servers "127.0.0.1:9092"}}))
+
+  (with-in-str
+    (slurp "kafka-file.json")
+    (ops.stdin-to-kafka/execute
+      {:max_docs 1
+       :source {:implementation :stdin
+                :decode-value? true}
+       :sink {:topic "stdin-to-kafka-test"
+              :bootstrap.servers "127.0.0.1:9092"}}))
+
+  (with-in-str
+    (slurp "kafka-file.json")
+    (ops.stdin-to-kafka/execute
+      {:max_docs 1
+       :source {:implementation :stdin
+                :decode-value? false}
+       :sink {:topic "stdin-to-kafka-test"
+              :bootstrap.servers "127.0.0.1:9092"}})))
diff --git a/src/ops_overrides.clj b/src/ops_overrides.clj
index ba07763..e24bcf4 100644
--- a/src/ops_overrides.clj
+++ b/src/ops_overrides.clj
@@ -3,7 +3,10 @@
             [core.maps :as maps]
             [polyglot :as polyglot]
             [server :as server]
-            [clojure.java.io :as io])
+            [clojure.java.io :as io]
+            [ops.es-to-stdout :as es-to-stdout]
+            [ops.kafka-to-stdout :as kafka-to-stdout]
+            [ops.stdin-to-es :as stdin-to-es])
   (:import (java.io File)))
 
@@ -48,4 +51,16 @@
          ["ALPHA."
           (:doc (meta #'polyglot/apply-transformation))
           (:doc (meta #'test-script-handler))])
-   :defaults test-script-defaults}])
+   :defaults test-script-defaults}
+  {:name "kafka-to-stdout"
+   :handler-fn kafka-to-stdout/execute
+   :docs (:doc (meta #'kafka-to-stdout/execute))
+   :defaults kafka-to-stdout/default-opts}
+  {:name "elasticsearch-to-stdout"
+   :handler-fn es-to-stdout/execute
+   :docs (:doc (meta #'es-to-stdout/execute))
+   :defaults es-to-stdout/default-opts}
+  {:name "stdin-to-elasticsearch"
+   :handler-fn stdin-to-es/execute
+   :docs (:doc (meta #'stdin-to-es/execute))
+   :defaults stdin-to-es/default-opts}])
diff --git a/src/sink.clj b/src/sink.clj
index db83ee9..9516fdf 100644
--- a/src/sink.clj
+++ b/src/sink.clj
@@ -2,7 +2,8 @@
   (:require [clojure.tools.logging :as log]
             [sink.elasticsearch :as elasticsearch]
             [sink.file :as file]
-            [sink.kafka :as kafka]))
+            [sink.kafka :as kafka]
+            [sink.stdout :as stdout]))
 
 (defrecord KafkaRecord [key value headers])
 
@@ -12,4 +13,5 @@
     :kafka (kafka/store! records opts)
     :elasticsearch (elasticsearch/store! records opts)
     :file (file/store! records opts)
+    :stdout (stdout/store! records opts)
     (log/errorf "No such sink '%s' implementation!" sink-implementation-id))))
diff --git a/src/sink/elasticsearch.clj b/src/sink/elasticsearch.clj
index 5f42f4d..6963e75 100644
--- a/src/sink/elasticsearch.clj
+++ b/src/sink/elasticsearch.clj
@@ -5,3 +5,9 @@
 (defn store! [records opts]
   (log/infof "Sinking in Elasticsearch")
   (elasticsearch/store! records (:sink opts)))
+
+(comment
+  (sink.elasticsearch/store!
+    [{:_id "123" :_source {:foo "bar"}}]
+    {:sink {:connection.url "http://localhost:9200"
+            :dest.index "foo_index"}}))
diff --git a/src/sink/elasticsearch/index.clj b/src/sink/elasticsearch/index.clj
index f615549..1fb8357 100644
--- a/src/sink/elasticsearch/index.clj
+++ b/src/sink/elasticsearch/index.clj
@@ -112,8 +112,8 @@
                      (get-conf-val :dest-index sink-opts {}))
         already-encoded? (get-conf-val :already.encoded sink-opts {})]
     (when-not (ilm/index-exists? dest-host index-name)
-      (log/infof "Created index: %s" (ilm/create-index! dest-host index-name)))
-    (log/infof "Disabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "-1"))
+      (log/debugf "Created index: %s" (ilm/create-index! dest-host index-name)))
+    (log/debugf "Disabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "-1"))
     (.start bulk-processor)
     (r/fold
       (fn [& [_ record]]
@@ -131,7 +131,7 @@
       records)
     (.flush bulk-processor flush-timeout)
     (.stop bulk-processor)
-    (log/infof "Enabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "1s"))))
+    (log/debugf "Enabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "1s"))))
 
 (defrecord EsRecord [_id _source])
diff --git a/src/sink/stdout.clj b/src/sink/stdout.clj
new file mode 100644
index 0000000..d8ba2c1
--- /dev/null
+++ b/src/sink/stdout.clj
@@ -0,0 +1,17 @@
+(ns sink.stdout
+  (:require [jsonista.core :as json])
+  (:import (java.io BufferedWriter PrintWriter)))
+
+(defn store!
+  "JSON-encodes each record and writes it to stdout."
+  [records opts]
+  (let [^PrintWriter writer (PrintWriter. (BufferedWriter. *out* (* 1024 8192)))]
+    (doseq [record records]
+      (.println writer (json/write-value-as-string record)))
+    (.flush writer)))
+
+(comment
+  (sink.stdout/store!
+    [{:value "line1"}
+     {:value "line2"}]
+    {:sink {}}))
diff --git a/src/source.clj b/src/source.clj
index 299c3b2..3a0c2ad 100644
--- a/src/source.clj
+++ b/src/source.clj
@@ -2,12 +2,14 @@
   (:require [clojure.tools.logging :as log]
             [source.elasticsearch :as elasticsearch]
             [source.kafka :as kafka]
-            [source.krp :as krp]))
+            [source.krp :as krp]
+            [source.stdin :as stdin]))
 
 (defn fetch! [opts]
   (let [source-implementation-id (keyword (get-in opts [:source :implementation]))]
     (case source-implementation-id
       :elasticsearch (elasticsearch/fetch opts)
       :kafka (kafka/fetch opts)
+      :stdin (stdin/fetch opts)
       :krp (krp/fetch opts)
       (log/errorf "No such source implementation '%s'" source-implementation-id))))
diff --git a/src/source/kafka.clj b/src/source/kafka.clj
index 81effa4..7491a2d 100644
--- a/src/source/kafka.clj
+++ b/src/source/kafka.clj
@@ -229,8 +229,8 @@
              :bootstrap.servers "127.0.0.1:9092"}})
 
   (fetch
-    {:max_docs 1
-     :source {:topic "test-topic"
+    {:max_docs 2
+     :source {:topic "stdin-to-kafka-test"
               :bootstrap.servers "127.0.0.1:9092"
               :impatient? true
               :retry-count 3}})
diff --git a/src/source/stdin.clj b/src/source/stdin.clj
new file mode 100644
index 0000000..a36f406
--- /dev/null
+++ b/src/source/stdin.clj
@@ -0,0 +1,36 @@
+(ns source.stdin
+  (:require [clojure.core.async :as a]
+            [core.async :as async]
+            [core.json :as json])
+  (:import (java.io BufferedReader)))
+
+(def defaults
+  {:max_docs 1
+   :source {:implementation :stdin
+            :decode-value? true}})
+
+(defn fetch
+  "Reads lines from stdin into a lazy sequence; each line is JSON-decoded unless :decode-value? is false."
+  [opts]
+  (let [decode-value? (get-in opts [:source :decode-value?] true)
+        line-in-chan (a/chan 128)]
+    (a/thread
+      (with-open [^BufferedReader rdr (BufferedReader. *in*)]
+        (loop [^String line (.readLine rdr)]
+          (if (nil? line)
+            (a/close! line-in-chan)
+            (do
+              (a/>!! line-in-chan
+                     (if decode-value? (json/decode line) line))
+              (recur (.readLine rdr)))))))
+    (if-let [max-docs (get opts :max_docs)]
+      (take max-docs (async/seq-of-chan line-in-chan))
+      (async/seq-of-chan line-in-chan))))
+
+(comment
+  (with-in-str
+    "{\"foo\":\"bar\"}"
+    (println
+      (source.stdin/fetch
+        {:max_docs 1
+         :source {:decode-value? true}}))))
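A note on the channel-to-lazy-seq bridge in src/source/stdin.clj above: fetch hands its channel to core.async/seq-of-chan, a pre-existing project helper that this patch does not include. Below is a minimal sketch of such a helper; the name comes from the call site, but the implementation is an assumption, not the project's actual code. It blocks on the channel and terminates when the channel is closed and drained, because a blocking take on a closed, empty channel returns nil.

(ns core.async
  (:require [clojure.core.async :as a]))

;; Sketch only: the project's real implementation may differ.
(defn seq-of-chan
  "Returns a lazy sequence of the values taken from channel c,
   ending when c is closed and drained."
  [c]
  (lazy-seq
    (when-some [v (a/<!! c)]
      (cons v (seq-of-chan c)))))

Laziness is what lets (take max-docs ...) in fetch work on an unbounded stream: consumption stops once max_docs items are realized, and the reader thread simply blocks after filling the 128-slot channel buffer.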
diff --git a/test/sink/stdout_test.clj b/test/sink/stdout_test.clj
new file mode 100644
index 0000000..7ab602b
--- /dev/null
+++ b/test/sink/stdout_test.clj
@@ -0,0 +1,13 @@
+(ns sink.stdout-test
+  (:require [clojure.test :refer :all]
+            [clojure.string :as str]
+            [sink.stdout :as stdout]))
+
+(deftest sinking-to-stdout
+  (let [records [{:value "line1"}
+                 {:value "line2"}]
+        opts {:sink {}}]
+    (is (= "{\"value\":\"line1\"}\n{\"value\":\"line2\"}"
+           (str/trim
+             (with-out-str
+               (stdout/store! records opts)))))))
diff --git a/test/source/stdin_test.clj b/test/source/stdin_test.clj
new file mode 100644
index 0000000..9c9766b
--- /dev/null
+++ b/test/source/stdin_test.clj
@@ -0,0 +1,41 @@
+(ns source.stdin-test
+  (:require [clojure.test :refer :all]
+            [jsonista.core :as json]
+            [source.stdin :as stdin]
+            [clojure.string :as str]))
+
+(deftest reading-from-stdin
+  (with-in-str "" (is (= 0 (count (stdin/fetch {})))))
+  (let [str-line (json/write-value-as-string {:foo "bar"})]
+    (with-in-str
+      str-line
+      (let [rez (stdin/fetch {})]
+        (is (= 1 (count rez)))
+        (is (= [{:foo "bar"}] rez)))))
+  (testing "max_docs param handling"
+    (let [str-line (str/join "\n" [(json/write-value-as-string {:foo "bar1"})
+                                   (json/write-value-as-string {:foo "bar2"})])]
+      (with-in-str
+        str-line
+        (let [rez (stdin/fetch {})]
+          (is (= 2 (count rez)))
+          (is (= [{:foo "bar1"}
+                  {:foo "bar2"}] rez))))
+      (with-in-str
+        str-line
+        (let [rez (stdin/fetch {:max_docs 1})]
+          (is (= 1 (count rez)))
+          (is (= [{:foo "bar1"}] rez))))))
+
+  (testing ":decode-value? parameter"
+    (let [str-line (str/join "\n" [(json/write-value-as-string {:foo "bar"})])]
+      (with-in-str
+        str-line
+        (let [rez (stdin/fetch {})]
+          (is (= 1 (count rez)))
+          (is (= [{:foo "bar"}] rez))))
+      (with-in-str
+        str-line
+        (let [rez (stdin/fetch {:source {:decode-value? false}})]
+          (is (= 1 (count rez)))
+          (is (= [(json/write-value-as-string {:foo "bar"})] rez)))))))
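Likewise, core.json/decode (used by the stdin source) is pre-existing project code not shown in this patch. Since the tests above assert keyword keys ({:foo "bar"}) and jsonista is already a project dependency, a plausible sketch, assumed rather than taken from the project, is:

(ns core.json
  (:require [jsonista.core :as j]))

;; Sketch only: a mapper that decodes JSON object keys as Clojure keywords,
;; matching the {:foo "bar"} shape asserted in test/source/stdin_test.clj.
(def ^:private keyword-mapper (j/object-mapper {:decode-key-fn true}))

(defn decode
  "Parses a JSON string into Clojure data with keyword keys."
  [^String s]
  (j/read-value s keyword-mapper))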