diff --git a/examples/es-to-stdout.json b/examples/es-to-stdout.json
new file mode 100644
index 0000000..69bfe65
--- /dev/null
+++ b/examples/es-to-stdout.json
@@ -0,0 +1,12 @@
+{
+ "max_docs": 10000,
+ "source": {
+ "remote": {
+ "host": "http://localhost:9200"
+ },
+ "index": ".kibana",
+ "query": {
+ "size": 2000
+ }
+ }
+}
diff --git a/examples/kafka-to-stdout.json b/examples/kafka-to-stdout.json
new file mode 100644
index 0000000..810a4c3
--- /dev/null
+++ b/examples/kafka-to-stdout.json
@@ -0,0 +1,7 @@
+{
+ "max_docs": 1,
+ "source": {
+ "topic": "source-topic",
+ "bootstrap.servers": "127.0.0.1:9092"
+ }
+}
diff --git a/examples/stdin-to-es.json b/examples/stdin-to-es.json
new file mode 100644
index 0000000..c7962da
--- /dev/null
+++ b/examples/stdin-to-es.json
@@ -0,0 +1,7 @@
+{
+ "max_docs": 1,
+ "sink": {
+ "connection.url": "http://localhost:9200",
+ "dest.index": "dest-index-name2"
+ }
+}
diff --git a/resources/logback.xml b/resources/logback.xml
index 08554ac..47dad76 100644
--- a/resources/logback.xml
+++ b/resources/logback.xml
@@ -11,6 +11,7 @@
+
diff --git a/src/ops/es_to_stdout.clj b/src/ops/es_to_stdout.clj
new file mode 100644
index 0000000..fdd09f5
--- /dev/null
+++ b/src/ops/es_to_stdout.clj
@@ -0,0 +1,31 @@
+(ns ops.es-to-stdout
+ (:require [sink :as sink]
+ [source :as source]))
+
+(def default-opts
+ {:max_docs nil
+ :source {:implementation :elasticsearch
+ :remote {:host "http://localhost:9200"}
+ :index "*"
+ :query {:sort ["_doc"]
+ :size 2000}}
+ :sink {:implementation :stdout}})
+
+(defn execute
+ "Reads documents from Elasticsearch and writes them to stdout"
+ [opts]
+ (sink/store!
+ (source/fetch! (assoc opts :source (merge (:source default-opts)
+ (:source opts))))
+ (assoc opts :sink
+ (merge (:sink default-opts)
+ (:sink opts)))))
+
+(comment
+ (execute
+ {:max_docs 1
+ :source {:implementation :elasticsearch
+ :remote {:host "http://localhost:9200"}
+ :index ".kibana"
+ :query {:sort ["_doc"]
+ :size 2000}}}))
diff --git a/src/ops/kafka_to_stdout.clj b/src/ops/kafka_to_stdout.clj
new file mode 100644
index 0000000..e144df5
--- /dev/null
+++ b/src/ops/kafka_to_stdout.clj
@@ -0,0 +1,26 @@
+(ns ops.kafka-to-stdout
+ (:require [sink :as sink]
+ [source :as source]))
+
+(def default-opts
+ {:max_docs 5
+ :source {:implementation :kafka
+ :bootstrap.servers "127.0.0.1:9092"
+ :topic nil
+ :decode-value? false
+ :impatient? true}
+ :sink {:implementation :stdout}})
+
+(defn execute
+ "Reads records from Kafka and writes them to STDOUT."
+ [opts]
+ (let [source-opts (merge (:source default-opts) (:source opts))
+ sink-opts (merge (:sink default-opts) (:sink opts))
+ records (source/fetch! (assoc opts :source source-opts))]
+ (sink/store! records (assoc opts :sink sink-opts))))
+
+(comment
+ (execute
+ {:max_docs 1
+ :source {:topic "source-topic"
+ :bootstrap.servers "127.0.0.1:9092"}}))
diff --git a/src/ops/stdin_to_es.clj b/src/ops/stdin_to_es.clj
new file mode 100644
index 0000000..ba39e24
--- /dev/null
+++ b/src/ops/stdin_to_es.clj
@@ -0,0 +1,30 @@
+(ns ops.stdin-to-es
+ (:require [sink.elasticsearch.index :as indexer]
+ [source :as source]
+ [jsonista.core :as json]))
+
+(def default-opts
+ {:max_docs 5
+ :source {:implementation :stdin
+ :decode-value? true}
+ :sink (merge {:implementation :elasticsearch}
+ indexer/default-opts)})
+
+(defn execute [opts]
+ (let [stdin-records (source/fetch! (assoc opts :source (merge (:source default-opts)
+ (:source opts))))]
+ (indexer/store! stdin-records
+ (merge
+ (:sink default-opts)
+ (:sink opts)))))
+
+(comment
+ (with-in-str
+ (json/write-value-as-string
+ {:_id "123" :_source {:foo "bar"}})
+ (ops.stdin-to-es/execute
+ {:max_docs 1
+ :source {:implementation :stdin
+ :decode-value? true}
+ :sink {:connection.url "http://localhost:9200"
+ :dest.index "dest-index-name"}})))
diff --git a/src/ops/stdin_to_kafka.clj b/src/ops/stdin_to_kafka.clj
new file mode 100644
index 0000000..4b6d923
--- /dev/null
+++ b/src/ops/stdin_to_kafka.clj
@@ -0,0 +1,73 @@
+(ns ops.stdin-to-kafka
+ (:require [sink :as sink]
+ [source :as source]
+ [jsonista.core :as json])
+ (:import (sink KafkaRecord)))
+
+(def default-es-to-kafka-config
+ {:max_docs nil
+ :source {:implementation :stdin
+ :decode-value? true}
+ :sink {:implementation :kafka
+ :topic "sink-topic"
+ :bootstrap.servers "127.0.0.1:9092"}})
+
+(defn es-record? [record]
+ (and (contains? record :_id)
+ (contains? record :_source)))
+
+(defn execute [opts]
+ (sink/store!
+ (map (fn [record]
+ (if (string? record)
+ (KafkaRecord. nil record nil)
+ (if (es-record? record)
+ (KafkaRecord.
+ (:_id record)
+ (:_source record)
+ (dissoc record :_id :_source))
+ record)))
+ (source/fetch! (assoc opts :source (merge (:source default-es-to-kafka-config)
+ (:source opts)))))
+ (assoc opts :sink
+ (merge (:sink default-es-to-kafka-config)
+ (:sink opts)))))
+
+(comment
+ (with-in-str
+ (json/write-value-as-string
+ {:_id "123" :_source {:foo "bar"}})
+ (ops.stdin-to-kafka/execute
+ {:max_docs 1
+ :source {:implementation :stdin
+ :decode-value? true}
+ :sink {:topic "stdin-to-kafka-test"
+ :bootstrap.servers "127.0.0.1:9092"}}))
+
+ (with-in-str
+ (json/write-value-as-string
+ {:_id "123" :_source {:foo "bar"}})
+ (ops.stdin-to-kafka/execute
+ {:max_docs 1
+ :source {:implementation :stdin
+ :decode-value? false}
+ :sink {:topic "stdin-to-kafka-test"
+ :bootstrap.servers "127.0.0.1:9092"}}))
+
+ (with-in-str
+ (slurp "kafka-file.json")
+ (ops.stdin-to-kafka/execute
+ {:max_docs 1
+ :source {:implementation :stdin
+ :decode-value? true}
+ :sink {:topic "stdin-to-kafka-test"
+ :bootstrap.servers "127.0.0.1:9092"}}))
+
+ (with-in-str
+ (slurp "kafka-file.json")
+ (ops.stdin-to-kafka/execute
+ {:max_docs 1
+ :source {:implementation :stdin
+ :decode-value? false}
+ :sink {:topic "stdin-to-kafka-test"
+ :bootstrap.servers "127.0.0.1:9092"}})))
diff --git a/src/ops_overrides.clj b/src/ops_overrides.clj
index ba07763..e24bcf4 100644
--- a/src/ops_overrides.clj
+++ b/src/ops_overrides.clj
@@ -3,7 +3,10 @@
[core.maps :as maps]
[polyglot :as polyglot]
[server :as server]
- [clojure.java.io :as io])
+ [clojure.java.io :as io]
+ [ops.es-to-stdout :as es-to-stdout]
+ [ops.kafka-to-stdout :as kafka-to-stdout]
+ [ops.stdin-to-es :as std-to-es])
(:import (java.io File)))
@@ -48,4 +51,16 @@
["ALPHA."
(:doc (meta #'polyglot/apply-transformation))
(:doc (meta #'test-script-handler))])
- :defaults test-script-defaults}])
+ :defaults test-script-defaults}
+ {:name "kafka-to-stdout"
+ :handler-fn kafka-to-stdout/execute
+ :docs (:doc (meta #'kafka-to-stdout/execute))
+ :defaults kafka-to-stdout/default-opts}
+ {:name "elasticsearch-to-stdout"
+ :handler-fn es-to-stdout/execute
+ :docs (:doc (meta #'es-to-stdout/execute))
+ :defaults es-to-stdout/default-opts}
+ {:name "stdin-to-elasticsearch"
+ :handler-fn std-to-es/execute
+ :docs (:doc (meta #'std-to-es/execute))
+ :defaults std-to-es/default-opts}])
diff --git a/src/sink.clj b/src/sink.clj
index db83ee9..9516fdf 100644
--- a/src/sink.clj
+++ b/src/sink.clj
@@ -2,7 +2,8 @@
(:require [clojure.tools.logging :as log]
[sink.elasticsearch :as elasticsearch]
[sink.file :as file]
- [sink.kafka :as kafka]))
+ [sink.kafka :as kafka]
+ [sink.stdout :as stdout]))
(defrecord KafkaRecord [key value headers])
@@ -12,4 +13,5 @@
:kafka (kafka/store! records opts)
:elasticsearch (elasticsearch/store! records opts)
:file (file/store! records opts)
+ :stdout (stdout/store! records opts)
(log/errorf "No such sink '%s' implementation!" sink-implementation-id))))
diff --git a/src/sink/elasticsearch.clj b/src/sink/elasticsearch.clj
index 5f42f4d..6963e75 100644
--- a/src/sink/elasticsearch.clj
+++ b/src/sink/elasticsearch.clj
@@ -5,3 +5,9 @@
(defn store! [records opts]
(log/infof "Sinking in Elasticsearch")
(elasticsearch/store! records (:sink opts)))
+
+(comment
+ (sink.elasticsearch/store!
+ [{:_id "123" :_source {:foo "bar"}}]
+ {:sink {:connection.url "http://localhost:9200"
+ :dest.index "foo_index"}}))
diff --git a/src/sink/elasticsearch/index.clj b/src/sink/elasticsearch/index.clj
index f615549..1fb8357 100644
--- a/src/sink/elasticsearch/index.clj
+++ b/src/sink/elasticsearch/index.clj
@@ -112,8 +112,8 @@
(get-conf-val :dest-index sink-opts {}))
already-encoded? (get-conf-val :already.encoded sink-opts {})]
(when-not (ilm/index-exists? dest-host index-name)
- (log/infof "Created index: %s" (ilm/create-index! dest-host index-name)))
- (log/infof "Disabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "-1"))
+ (log/debugf "Created index: %s" (ilm/create-index! dest-host index-name)))
+ (log/debugf "Disabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "-1"))
(.start bulk-processor)
(r/fold
(fn [& [_ record]]
@@ -131,7 +131,7 @@
records)
(.flush bulk-processor flush-timeout)
(.stop bulk-processor)
- (log/infof "Enabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "1s"))))
+ (log/debugf "Enabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "1s"))))
(defrecord EsRecord [_id _source])
diff --git a/src/sink/stdout.clj b/src/sink/stdout.clj
new file mode 100644
index 0000000..d8ba2c1
--- /dev/null
+++ b/src/sink/stdout.clj
@@ -0,0 +1,17 @@
+(ns sink.stdout
+ (:require [jsonista.core :as json])
+ (:import (java.io BufferedWriter PrintWriter)))
+
+(defn store!
+ "JSON encodes each record and writes it to stdout."
+ [records opts]
+ (let [^PrintWriter writer (PrintWriter. (BufferedWriter. *out* (* 1024 8192)))]
+ (doseq [record records]
+ (.println writer (json/write-value-as-string record)))
+ (.flush writer)))
+
+(comment
+ (sink.stdout/store!
+ [{:value "line1"}
+ {:value "line2"}]
+ {:sink {}}))
diff --git a/src/source.clj b/src/source.clj
index 299c3b2..3a0c2ad 100644
--- a/src/source.clj
+++ b/src/source.clj
@@ -2,12 +2,14 @@
(:require [clojure.tools.logging :as log]
[source.elasticsearch :as elasticsearch]
[source.kafka :as kafka]
- [source.krp :as krp]))
+ [source.krp :as krp]
+ [source.stdin :as stdin]))
(defn fetch! [opts]
(let [source-implementation-id (keyword (get-in opts [:source :implementation]))]
(case source-implementation-id
:elasticsearch (elasticsearch/fetch opts)
:kafka (kafka/fetch opts)
+ :stdin (stdin/fetch opts)
:krp (krp/fetch opts)
(log/errorf "No such source implementation '%s'" source-implementation-id))))
diff --git a/src/source/kafka.clj b/src/source/kafka.clj
index 81effa4..7491a2d 100644
--- a/src/source/kafka.clj
+++ b/src/source/kafka.clj
@@ -229,8 +229,8 @@
:bootstrap.servers "127.0.0.1:9092"}})
(fetch
- {:max_docs 1
- :source {:topic "test-topic"
+ {:max_docs 2
+ :source {:topic "stdin-to-kafka-test"
:bootstrap.servers "127.0.0.1:9092"
:impatient? true
:retry-count 3}})
diff --git a/src/source/stdin.clj b/src/source/stdin.clj
new file mode 100644
index 0000000..a36f406
--- /dev/null
+++ b/src/source/stdin.clj
@@ -0,0 +1,36 @@
+(ns source.stdin
+ (:require [clojure.core.async :as a]
+ [core.async :as async]
+ [core.json :as json])
+ (:import (java.io BufferedReader)))
+
+(def defaults
+ {:max_docs 1
+ :source {:implementation :stdin
+ :decode-value? true}})
+
+(defn fetch
+ "Reads lines from STDIN and returns a lazy sequence of lines."
+ [opts]
+ (let [decode-value? (get-in opts [:source :decode-value?] true)
+ line-in-chan (a/chan 128)]
+ (a/go
+ (with-open [^BufferedReader rdr (BufferedReader. *in*)]
+ (loop [^String line (.readLine rdr)]
+ (if (nil? line)
+ (a/close! line-in-chan)
+ (do
+ (a/>! line-in-chan
+ (if decode-value? (json/decode line) line))
+ (recur (.readLine rdr)))))))
+ (if-let [max-docs (get opts :max_docs)]
+ (take max-docs (async/seq-of-chan line-in-chan))
+ (async/seq-of-chan line-in-chan))))
+
+(comment
+ (with-in-str
+ "{\"foo\":\"bar\"}"
+ (println
+ (source.stdin/fetch
+ {:max_docs 1
+ :source {:decode-value? true}}))))
diff --git a/test/sink/stdout_test.clj b/test/sink/stdout_test.clj
new file mode 100644
index 0000000..7ab602b
--- /dev/null
+++ b/test/sink/stdout_test.clj
@@ -0,0 +1,13 @@
+(ns sink.stdout-test
+ (:require [clojure.test :refer :all]
+ [clojure.string :as str]
+ [sink.stdout :as stdout]))
+
+(deftest sinking-to-stdout
+ (let [records [{:value "line1"}
+ {:value "line2"}]
+ opts {:sink {}}]
+ (is (= "{\"value\":\"line1\"}\n{\"value\":\"line2\"}"
+ (str/trim
+ (with-out-str
+ (stdout/store! records opts)))))))
diff --git a/test/source/stdin_test.clj b/test/source/stdin_test.clj
new file mode 100644
index 0000000..9c9766b
--- /dev/null
+++ b/test/source/stdin_test.clj
@@ -0,0 +1,41 @@
+(ns source.stdin-test
+ (:require [clojure.test :refer :all]
+ [jsonista.core :as json]
+ [source.stdin :as stdin]
+ [clojure.string :as str]))
+
+(deftest reading-from-stdin
+ (with-in-str "" (is (= 0 (count (stdin/fetch {})))))
+ (let [str-line (json/write-value-as-string {:foo "bar"})]
+ (with-in-str
+ str-line
+ (let [rez (stdin/fetch {})]
+ (is (= 1 (count rez)))
+ (is (= [{:foo "bar"}] rez)))))
+ (testing "max_docs param handling"
+ (let [str-line (str/join "\n" [(json/write-value-as-string {:foo "bar1"})
+ (json/write-value-as-string {:foo "bar2"})])]
+ (with-in-str
+ str-line
+ (let [rez (stdin/fetch {})]
+ (is (= 2 (count rez)))
+ (is (= [{:foo "bar1"}
+ {:foo "bar2"}] rez))))
+ (with-in-str
+ str-line
+ (let [rez (stdin/fetch {:max_docs 1})]
+ (is (= 1 (count rez)))
+ (is (= [{:foo "bar1"}] rez))))))
+
+ (testing ":decode-value? parameter"
+ (let [str-line (str/join "\n" [(json/write-value-as-string {:foo "bar"})])]
+ (with-in-str
+ str-line
+ (let [rez (stdin/fetch {})]
+ (is (= 1 (count rez)))
+ (is (= [{:foo "bar"}] rez))))
+ (with-in-str
+ str-line
+ (let [rez (stdin/fetch {:source {:decode-value? false}})]
+ (is (= 1 (count rez)))
+ (is (= [(json/write-value-as-string {:foo "bar"})] rez)))))))