Skip to content

Commit

Permalink
STDIN and STDOUT support (#12)
Browse files Browse the repository at this point in the history
* feat: stdout sink
* feat: op kafka-to-std-out
* feat: es-to-stdout operation
* feat: source stdin; ops: stdin-to-es, stdin-to-kafka
  • Loading branch information
dainiusjocas authored May 24, 2021
1 parent 9b67fbe commit d7edb0a
Show file tree
Hide file tree
Showing 18 changed files with 326 additions and 9 deletions.
12 changes: 12 additions & 0 deletions examples/es-to-stdout.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"max_docs": 10000,
"source": {
"remote": {
"host": "http://localhost:9200"
},
"index": ".kibana",
"query": {
"size": 2000
}
}
}
7 changes: 7 additions & 0 deletions examples/kafka-to-stdout.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"max_docs": 1,
"source": {
"topic": "source-topic",
"bootstrap.servers": "127.0.0.1:9092"
}
}
7 changes: 7 additions & 0 deletions examples/stdin-to-es.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"max_docs": 1,
"sink": {
"connection.url": "http://localhost:9200",
"dest.index": "dest-index-name2"
}
}
1 change: 1 addition & 0 deletions resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
</layout>
</appender>
<logger name="org.apache" level="WARN"/>
<logger name="io.searchbox" level="WARN"/>
<root level="${ROOT_LOGGER_LEVEL:-INFO}">
<appender-ref ref="A1"/>
</root>
Expand Down
31 changes: 31 additions & 0 deletions src/ops/es_to_stdout.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
(ns ops.es-to-stdout
(:require [sink :as sink]
[source :as source]))

(def default-opts
  "Default options for the elasticsearch-to-stdout operation:
   fetch from an Elasticsearch at localhost:9200 across all indices (\"*\"),
   2000 docs per scroll page sorted by _doc, no cap on document count,
   and sink every document to stdout."
  {:max_docs nil
   :source {:implementation :elasticsearch
            :remote {:host "http://localhost:9200"}
            :index "*"
            :query {:sort ["_doc"]
                    :size 2000}}
   :sink {:implementation :stdout}})

(defn execute
  "Reads documents from Elasticsearch and writes them to stdout"
  [opts]
  ;; Deep-merge one level: user-supplied :source/:sink keys override defaults.
  (let [source-opts (merge (:source default-opts) (:source opts))
        sink-opts   (merge (:sink default-opts) (:sink opts))]
    (sink/store! (source/fetch! (assoc opts :source source-opts))
                 (assoc opts :sink sink-opts))))

(comment
(execute
{:max_docs 1
:source {:implementation :elasticsearch
:remote {:host "http://localhost:9200"}
:index ".kibana"
:query {:sort ["_doc"]
:size 2000}}}))
24 changes: 24 additions & 0 deletions src/ops/kafka_to_stdout.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(ns ops.kafka-to-stdout)

(def default-opts
  "Default options for the kafka-to-stdout operation: consume at most 5 raw
   (non-decoded) records from a Kafka broker at 127.0.0.1:9092 and write them
   to stdout. :topic must be supplied by the caller.
   NOTE(review): :impatient? presumably makes the consumer give up when the
   topic is idle — confirm against the Kafka source implementation."
  {:max_docs 5
   :source {:implementation :kafka
            :bootstrap.servers "127.0.0.1:9092"
            :topic nil
            :decode-value? false
            :impatient? true}
   :sink {:implementation :stdout}})

(defn execute
  "Reads records from Kafka and writes them to STDOUT."
  [opts]
  ;; update = assoc of (merge defaults user-value): user keys win over defaults.
  (sink/store!
    (source/fetch! (update opts :source #(merge (:source default-opts) %)))
    (update opts :sink #(merge (:sink default-opts) %))))

(comment
(execute
{:max_docs 1
:source {:topic "source-topic"
:bootstrap.servers "127.0.0.1:9092"}}))
30 changes: 30 additions & 0 deletions src/ops/stdin_to_es.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
(ns ops.stdin-to-es
(:require [sink.elasticsearch.index :as indexer]
[source :as source]
[jsonista.core :as json]))

(def default-opts
  "Default options for the stdin-to-elasticsearch operation: JSON-decode at
   most 5 lines read from STDIN and index them using the Elasticsearch
   indexer's own defaults."
  {:max_docs 5
   :source {:implementation :stdin
            :decode-value? true}
   :sink (merge {:implementation :elasticsearch}
                indexer/default-opts)})

(defn execute
  "Reads JSON documents from STDIN and indexes them into Elasticsearch."
  [opts]
  (let [source-opts (merge (:source default-opts) (:source opts))
        sink-opts   (merge (:sink default-opts) (:sink opts))
        records     (source/fetch! (assoc opts :source source-opts))]
    ;; Unlike the other ops, the indexer is called with the sink opts map
    ;; directly rather than the whole opts map.
    (indexer/store! records sink-opts)))

(comment
(with-in-str
(json/write-value-as-string
{:_id "123" :_source {:foo "bar"}})
(ops.stdin-to-es/execute
{:max_docs 1
:source {:implementation :stdin
:decode-value? true}
:sink {:connection.url "http://localhost:9200"
:dest.index "dest-index-name"}})))
73 changes: 73 additions & 0 deletions src/ops/stdin_to_kafka.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
(ns ops.stdin-to-kafka
(:require [sink :as sink]
[source :as source]
[jsonista.core :as json])
(:import (sink KafkaRecord)))

(def default-es-to-kafka-config
  "Default options for the stdin-to-kafka operation: JSON-decode every line
   read from STDIN (no document cap) and produce to topic \"sink-topic\" on a
   broker at 127.0.0.1:9092.
   NOTE(review): the var name says es-to-kafka but this namespace is
   stdin-to-kafka — likely a copy-paste leftover; kept as-is because renaming
   a public var could break external references."
  {:max_docs nil
   :source {:implementation :stdin
            :decode-value? true}
   :sink {:implementation :kafka
          :topic "sink-topic"
          :bootstrap.servers "127.0.0.1:9092"}})

(defn es-record?
  "True when the map looks like an Elasticsearch hit, i.e. it contains
   both the :_id and :_source keys."
  [record]
  (every? #(contains? record %) [:_id :_source]))

(defn execute
  "Reads records from STDIN and produces them to Kafka.
   Plain strings become value-only KafkaRecords; decoded Elasticsearch hits
   map :_id -> key, :_source -> value, remaining keys -> headers; anything
   else is passed through to the sink unchanged."
  [opts]
  (sink/store!
    (map (fn [record]
           ;; cond instead of nested ifs; the original also used the record's
           ;; own value as its get default — (get record :_id (get record :_id))
           ;; — which is a no-op, so the redundant defaults are dropped.
           (cond
             (string? record)    (KafkaRecord. nil record nil)
             (es-record? record) (KafkaRecord.
                                   (get record :_id)
                                   (get record :_source)
                                   (dissoc record :_id :_source))
             :else               record))
         (source/fetch! (assoc opts :source (merge (:source default-es-to-kafka-config)
                                                   (:source opts)))))
    (assoc opts :sink
           (merge (:sink default-es-to-kafka-config)
                  (:sink opts)))))

(comment
(with-in-str
(json/write-value-as-string
{:_id "123" :_source {:foo "bar"}})
(ops.stdin-to-kafka/execute
{:max_docs 1
:source {:implementation :stdin
:decode-value? true}
:sink {:topic "stdin-to-kafka-test"
:bootstrap.servers "127.0.0.1:9092"}}))

(with-in-str
(json/write-value-as-string
{:_id "123" :_source {:foo "bar"}})
(ops.stdin-to-kafka/execute
{:max_docs 1
:source {:implementation :stdin
:decode-value? false}
:sink {:topic "stdin-to-kafka-test"
:bootstrap.servers "127.0.0.1:9092"}}))

(with-in-str
(slurp "kafka-file.json")
(ops.stdin-to-kafka/execute
{:max_docs 1
:source {:implementation :stdin
:decode-value? true}
:sink {:topic "stdin-to-kafka-test"
:bootstrap.servers "127.0.0.1:9092"}}))

(with-in-str
(slurp "kafka-file.json")
(ops.stdin-to-kafka/execute
{:max_docs 1
:source {:implementation :stdin
:decode-value? false}
:sink {:topic "stdin-to-kafka-test"
:bootstrap.servers "127.0.0.1:9092"}})))
19 changes: 17 additions & 2 deletions src/ops_overrides.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
[core.maps :as maps]
[polyglot :as polyglot]
[server :as server]
[clojure.java.io :as io])
[clojure.java.io :as io]
[ops.es-to-stdout :as es-to-stdout]
[ops.kafka-to-stdout :as kafka-to-stdout]
[ops.stdin-to-es :as std-to-es])
(:import (java.io File)))


Expand Down Expand Up @@ -48,4 +51,16 @@
["ALPHA."
(:doc (meta #'polyglot/apply-transformation))
(:doc (meta #'test-script-handler))])
:defaults test-script-defaults}])
:defaults test-script-defaults}
{:name "kafka-to-stdout"
:handler-fn kafka-to-stdout/execute
:docs (:doc (meta #'kafka-to-stdout/execute))
:defaults kafka-to-stdout/default-opts}
{:name "elasticsearch-to-stdout"
:handler-fn es-to-stdout/execute
:docs (:doc (meta #'es-to-stdout/execute))
:defaults es-to-stdout/default-opts}
{:name "stdin-to-elasticsearch"
:handler-fn std-to-es/execute
:docs (:doc (meta #'std-to-es/execute))
:defaults std-to-es/default-opts}])
4 changes: 3 additions & 1 deletion src/sink.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require [clojure.tools.logging :as log]
[sink.elasticsearch :as elasticsearch]
[sink.file :as file]
[sink.kafka :as kafka]))
[sink.kafka :as kafka]
[sink.stdout :as stdout]))

(defrecord KafkaRecord [key value headers])

Expand All @@ -12,4 +13,5 @@
:kafka (kafka/store! records opts)
:elasticsearch (elasticsearch/store! records opts)
:file (file/store! records opts)
:stdout (stdout/store! records opts)
(log/errorf "No such sink '%s' implementation!" sink-implementation-id))))
6 changes: 6 additions & 0 deletions src/sink/elasticsearch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@
(defn store!
  "Logs and delegates: hands the records and the :sink sub-map of opts to the
   Elasticsearch indexing implementation."
  [records opts]
  (let [sink-opts (:sink opts)]
    (log/infof "Sinking in Elasticsearch")
    (elasticsearch/store! records sink-opts)))

(comment
(sink.elasticsearch/store!
[{:_id "123" :_source {:foo "bar"}}]
{:sink {:connection.url "http://localhost:9200"
:dest.index "foo_index"}}))
6 changes: 3 additions & 3 deletions src/sink/elasticsearch/index.clj
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@
(get-conf-val :dest-index sink-opts {}))
already-encoded? (get-conf-val :already.encoded sink-opts {})]
(when-not (ilm/index-exists? dest-host index-name)
(log/infof "Created index: %s" (ilm/create-index! dest-host index-name)))
(log/infof "Disabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "-1"))
(log/debugf "Created index: %s" (ilm/create-index! dest-host index-name)))
(log/debugf "Disabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "-1"))
(.start bulk-processor)
(r/fold
(fn [& [_ record]]
Expand All @@ -131,7 +131,7 @@
records)
(.flush bulk-processor flush-timeout)
(.stop bulk-processor)
(log/infof "Enabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "1s"))))
(log/debugf "Enabled index refresh interval: %s" (ilm/set-refresh-interval! dest-host index-name "1s"))))

(defrecord EsRecord [_id _source])

Expand Down
17 changes: 17 additions & 0 deletions src/sink/stdout.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
(ns sink.stdout
(:require [jsonista.core :as json])
(:import (java.io BufferedWriter PrintWriter)))

(defn store!
  "JSON encodes each record and writes it to stdout, one record per line.
   Output is buffered (8 MiB) and flushed once at the end; opts is accepted
   for sink-interface uniformity but unused."
  [records _opts]
  (let [^PrintWriter out (PrintWriter. (BufferedWriter. *out* (* 1024 8192)))]
    (run! (fn [record] (.println out (json/write-value-as-string record)))
          records)
    (.flush out)))

(comment
(sink.stdout/store!
[{:value "line1"}
{:value "line2"}]
{:sink {}}))
4 changes: 3 additions & 1 deletion src/source.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
(:require [clojure.tools.logging :as log]
[source.elasticsearch :as elasticsearch]
[source.kafka :as kafka]
[source.krp :as krp]))
[source.krp :as krp]
[source.stdin :as stdin]))

(defn fetch!
  "Dispatches to a source implementation based on [:source :implementation]
   (string or keyword accepted); logs an error for unknown implementations."
  [opts]
  (let [impl-id (keyword (-> opts :source :implementation))]
    (case impl-id
      :elasticsearch (elasticsearch/fetch opts)
      :kafka         (kafka/fetch opts)
      :stdin         (stdin/fetch opts)
      :krp           (krp/fetch opts)
      (log/errorf "No such source implementation '%s'" impl-id))))
4 changes: 2 additions & 2 deletions src/source/kafka.clj
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@
:bootstrap.servers "127.0.0.1:9092"}})

(fetch
{:max_docs 1
:source {:topic "test-topic"
{:max_docs 2
:source {:topic "stdin-to-kafka-test"
:bootstrap.servers "127.0.0.1:9092"
:impatient? true
:retry-count 3}})
Expand Down
36 changes: 36 additions & 0 deletions src/source/stdin.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
(ns source.stdin
(:require [clojure.core.async :as a]
[core.async :as async]
[core.json :as json])
(:import (java.io BufferedReader)))

(def defaults
  "Default options for the stdin source: read one document and JSON-decode it."
  {:max_docs 1
   :source {:implementation :stdin
            :decode-value? true}})

(defn fetch
  "Reads lines from STDIN and returns a lazy sequence of lines (JSON-decoded
   when [:source :decode-value?] is true, the default). Reading happens on a
   background thread feeding a buffered channel; the sequence is capped at
   :max_docs when provided."
  [opts]
  (let [decode-value? (get-in opts [:source :decode-value?] true)
        line-in-chan (a/chan 128)]
    ;; a/thread, not a/go: .readLine and a/>!! are blocking operations, and
    ;; blocking inside a go block ties up core.async's small fixed-size go
    ;; threadpool. a/thread runs on a dedicated thread where >!! is correct,
    ;; and it conveys dynamic bindings, so *in* rebinding (with-in-str) works.
    (a/thread
      (with-open [^BufferedReader rdr (BufferedReader. *in*)]
        (loop [^String line (.readLine rdr)]
          (if (nil? line)
            (a/close! line-in-chan)
            (do
              (a/>!! line-in-chan
                     (if decode-value? (json/decode line) line))
              (recur (.readLine rdr)))))))
    (if-let [max-docs (get opts :max_docs)]
      (take max-docs (async/seq-of-chan line-in-chan))
      (async/seq-of-chan line-in-chan))))

(comment
(with-in-str
"{\"foo\":\"bar\"}"
(println
(source.stdin/fetch
{:max_docs 1
:source {:decode-value? true}}))))
13 changes: 13 additions & 0 deletions test/sink/stdout_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(ns sink.stdout-test
(:require [clojure.test :refer :all]
[clojure.string :as str]
[sink.stdout :as stdout]))

(deftest sinking-to-stdout
(let [records [{:value "line1"}
{:value "line2"}]
opts {:sink {}}]
(is (= "{\"value\":\"line1\"}\n{\"value\":\"line2\"}"
(str/trim
(with-out-str
(stdout/store! records opts)))))))
Loading

0 comments on commit d7edb0a

Please sign in to comment.