Add support for logical types #32

Closed
35 commits
3816c3d
Fix typo
llasram Sep 12, 2016
5b5ebaa
Add support for localdate as epoch day
olib963 Nov 11, 2018
d2adbe8
Add instant support for timestamps
olib963 Nov 11, 2018
667e478
Add UUID support
olib963 Nov 11, 2018
edbbd1f
Add rough outline of clojure logical types
olib963 Nov 12, 2018
22f679c
Fix decimal conversion
olib963 Nov 12, 2018
78f5892
Create clojure implementation of writing with conversions
olib963 Nov 12, 2018
a59a768
Integrate reading logical types in clojure
olib963 Nov 12, 2018
7bdebf5
Add support for time-millis logical type
olib963 Nov 12, 2018
225cc33
Add keyword logical type
olib963 Nov 12, 2018
da7b50a
Add support for fixed type decimals
olib963 Nov 12, 2018
5c77757
Don't mangle logical types
olib963 Nov 12, 2018
5808a02
Use default conversions when only schema is passed
olib963 Nov 12, 2018
7454eeb
Change conversions to a map and check the key for java conversions ma…
olib963 Nov 12, 2018
c851c2c
Change api so destructure is higher up
olib963 Nov 12, 2018
390bf25
Add a conversion that rounds and sets scale of decimals
olib963 Nov 13, 2018
300ae83
Validate keywords are strings
olib963 Nov 13, 2018
8fe5ccc
Fix compiler warnings
olib963 Nov 13, 2018
2eb6218
Add remaining primitive proxies
olib963 Nov 13, 2018
c96659a
Update documentation
olib963 Nov 13, 2018
ec449ab
Add test of record with logical types
olib963 Nov 13, 2018
0b730a3
Refactor destructuring
olib963 Nov 15, 2018
c54e6d7
Remove unused type hints
olib963 Nov 16, 2018
80b6c5c
remove keyword logical type
olib963 Nov 20, 2018
81b6b39
Simplify clojure datum writer
olib963 Nov 20, 2018
1094378
Allow keyword strings
olib963 Nov 20, 2018
e201b30
Cast for logical type conversions
olib963 Nov 20, 2018
7d28dff
Add java implementation of logical types
olib963 Nov 20, 2018
1aa520b
Remove bigdec calls where possible
olib963 Nov 20, 2018
d8910be
Refactor ClojureData
olib963 Nov 21, 2018
51cde6d
Make all logical types enabled by default and decimal rounding from c…
olib963 Nov 21, 2018
540d8af
Update docs
olib963 Nov 21, 2018
de6aad2
remove todos
olib963 Nov 21, 2018
ea012d0
Refactoring from review
olib963 Nov 22, 2018
b08fc75
Merge pull request #3 from olib963/add-logical-type-support
olib963 Nov 22, 2018
27 changes: 27 additions & 0 deletions README.md
@@ -176,6 +176,33 @@ schemas.
```clojure
;;=> {:foo [bar "baz" 1337]}
```

### Logical Types

Abracad supports [Avro logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types).
Out of the box it supports:

* `timestamp-millis` as `java.time.Instant`
* `time-millis` as `java.time.LocalTime`
* `date` as `java.time.LocalDate`
* `uuid` as `java.util.UUID`
* `decimal` as `java.math.BigDecimal`. The rounding mode is taken from the `*math-context*` dynamic var,
and values are rescaled to the scale declared in your schema. If no math context is set,
`RoundingMode/UNNECESSARY` is used, so a value that would need rounding raises an `ArithmeticException`.

You can also automatically (de)serialise strings as keywords using `{:type :string :clojureType :keyword}`.

If you prefer, you can turn off logical types by binding the dynamic var `abracad.avro.conversion/*use-logical-types*` to `false`.

```clojure

(def logical-type-reader
  (abracad.avro/datum-reader schema))

(def standard-reader
  (binding [abracad.avro.conversion/*use-logical-types* false]
    (abracad.avro/datum-reader schema)))
```
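
As a rough illustration of what these logical types mean on the wire, the sketch below converts between the `java.time` and `java.util` classes listed above and the primitive values the Avro specification defines for them: `date` as days since the epoch, `time-millis` as milliseconds after midnight, `timestamp-millis` as milliseconds since the epoch, and `uuid` as a string. The function names are hypothetical and are not part of Abracad; the library's own conversions may differ in detail.

```clojure
(import '(java.time Instant LocalDate LocalTime)
        '(java.util UUID))

;; NOTE: all function names below are hypothetical, for illustration only.

;; date: Avro int holding the number of days since 1970-01-01
(defn date->int [^LocalDate d] (int (.toEpochDay d)))
(defn int->date [n] (LocalDate/ofEpochDay (long n)))

;; time-millis: Avro int holding milliseconds after midnight
(defn time->int [^LocalTime t] (int (quot (.toNanoOfDay t) 1000000)))
(defn int->time [n] (LocalTime/ofNanoOfDay (* (long n) 1000000)))

;; timestamp-millis: Avro long holding milliseconds since the epoch
(defn instant->long [^Instant i] (.toEpochMilli i))
(defn long->instant [n] (Instant/ofEpochMilli (long n)))

;; uuid: Avro string in the canonical textual form
(defn uuid->string [^UUID u] (str u))
(defn string->uuid [^String s] (UUID/fromString s))

(date->int (LocalDate/of 1970 1 2)) ;=> 1
```

Each pair round-trips at the precision of its Avro type, so sub-millisecond detail in a `LocalTime` or `Instant` is dropped by the `-millis` variants.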

### Hadoop MapReduce integration

Avro 1.7.5 and later supports configurable “data models” for datum
2 changes: 1 addition & 1 deletion project.clj
@@ -8,7 +8,7 @@
:global-vars {*warn-on-reflection* true}
:source-paths ["src/clojure"]
:java-source-paths ["src/java"]
:javac-options ["-target" "1.7" "-source" "1.7"]
:javac-options ["-target" "1.8" "-source" "1.8"]
:dependencies [[org.clojure/clojure "1.8.0"]
[org.apache.avro/avro "1.8.0"]
[cheshire/cheshire "5.6.1"]]
38 changes: 20 additions & 18 deletions src/clojure/abracad/avro.clj
@@ -4,12 +4,13 @@
(:require [clojure.java.io :as io]
[clojure.walk :refer [postwalk]]
[cheshire.core :as json]
[abracad.avro.util :refer [returning mangle unmangle coerce]])
[abracad.avro.util :refer [returning mangle unmangle coerce]]
[abracad.avro.conversion :as c])
(:import [java.io
ByteArrayInputStream ByteArrayOutputStream EOFException
File FileInputStream InputStream OutputStream]
ByteArrayOutputStream EOFException
File InputStream OutputStream]
[clojure.lang Named]
[org.apache.avro Schema Schema$Parser Schema$Type]
[org.apache.avro Schema Schema$Parser]
[org.apache.avro.file
CodecFactory DataFileWriter DataFileReader DataFileStream SeekableInput
SeekableFileInput SeekableByteArrayInput]
@@ -130,30 +131,29 @@ but the first `n` fields when sorting."
(defn datum-reader
"Return an Avro DatumReader which produces Clojure data structures."
{:tag `ClojureDatumReader}
([] (ClojureDatumReader.))
([schema]
(ClojureDatumReader.
(if-not (nil? schema) (parse-schema schema))))
([] (datum-reader nil))
([schema] (datum-reader schema schema))
([expected actual]
(ClojureDatumReader.
(if-not (nil? expected) (parse-schema expected))
(if-not (nil? actual) (parse-schema actual)))))
(when-not (nil? expected) (parse-schema expected))
(when-not (nil? actual) (parse-schema actual))
(c/create-clojure-data))))

(defn data-file-reader
"Return an Avro DataFileReader which produces Clojure data structures."
{:tag `DataFileReader}
([source] (data-file-reader nil source))
([expected source]
(DataFileReader/openReader
(seekable-input source) (datum-reader expected))))
(DataFileReader/openReader
(seekable-input source) (datum-reader expected))))

(defn data-file-stream
"Return an Avro DataFileStream which produces Clojure data structures."
{:tag `DataFileStream}
([source] (data-file-stream nil source))
([expected source]
(DataFileStream.
(io/input-stream source) (datum-reader expected))))
(DataFileStream.
(io/input-stream source) (datum-reader expected))))

(defmacro ^:private decoder-factory
"Invoke static methods of default Avro Decoder factory."
@@ -210,10 +210,11 @@ decoded serially from `source`."
(defn datum-writer
"Return an Avro DatumWriter which consumes Clojure data structures."
{:tag `ClojureDatumWriter}
([] (ClojureDatumWriter.))
([] (datum-writer nil))
([schema]
(ClojureDatumWriter.
(if-not (nil? schema) (parse-schema schema)))))
(ClojureDatumWriter.
(when-not (nil? schema) (parse-schema schema))
(c/create-clojure-data))))

(defn data-file-writer
"Return an Avro DataFileWriter which consumes Clojure data structures."
@@ -253,6 +254,7 @@ decoded serially from `source`."
(let [schema (parse-schema schema)]
(encoder-factory jsonEncoder schema ^OutputStream sink)))


(defn encode
"Serially encode each record in `records` to `sink` using `schema`.
The `sink` may be an existing Encoder object, or anything on which
@@ -284,7 +286,7 @@ via `encode`."
"Compare `x` and `y` according to `schema`."
[schema x y]
(let [schema (parse-schema schema)]
(.compare (ClojureData/get) x y schema)))
(.compare (ClojureData/withoutConversions) x y schema)))

(defn spit
"Like core `spit`, but emits `content` to `f` as Avro with `schema`."
5 changes: 2 additions & 3 deletions src/clojure/abracad/avro/compare.clj
@@ -2,8 +2,7 @@
"Generic data comparison implementation."
{:private true}
(:refer-clojure :exclude [compare])
(:require [clojure.core :as cc]
[abracad.avro :as avro]
(:require [abracad.avro :as avro]
[abracad.avro.write :refer [resolve-union*]]
[abracad.avro.util :refer [case-enum if-not-let]])
(:import [org.apache.avro Schema Schema$Field Schema$Field$Order Schema$Type]
@@ -48,7 +47,7 @@

(defn supercompare
^long [x y ^Schema schema equals]
(._supercompare (ClojureData/get) x y schema equals))
(._supercompare (ClojureData/withoutConversions) x y schema equals))

(defn compare
^long [x y ^Schema schema equals]
28 changes: 28 additions & 0 deletions src/clojure/abracad/avro/conversion.clj
@@ -0,0 +1,28 @@
(ns abracad.avro.conversion
"Logical Type converter implementations"
(:import (org.apache.avro Conversions$UUIDConversion)
(java.math MathContext RoundingMode)
(abracad.avro ClojureData Java8LogicalTypes$RoundingDecimalConversion Java8LogicalTypes$DateConversion Java8LogicalTypes$TimeMillisConversion Java8LogicalTypes$TimeMicrosConversion Java8LogicalTypes$TimestampMillisConversion Java8LogicalTypes$TimestampMicrosConversion)))

(def ^:dynamic *use-logical-types*
"When true, record field values will be (de)serialised from/to their logical types
e.g. {:type :int :logicalType :date} -> java.time.LocalDate. Default value is `true`."
true)

(def uuid-conversion (Conversions$UUIDConversion.))
(def date-conversion (Java8LogicalTypes$DateConversion.))
(def time-conversion (Java8LogicalTypes$TimeMillisConversion.))
(def time-micros-conversion (Java8LogicalTypes$TimeMicrosConversion.))
(def timestamp-conversion (Java8LogicalTypes$TimestampMillisConversion.))
(def timestamp-micros-conversion (Java8LogicalTypes$TimestampMicrosConversion.))

(defn- default-conversions []
(let [^MathContext context *math-context*
roundingMode (if context (.getRoundingMode context) RoundingMode/UNNECESSARY)]
[uuid-conversion date-conversion time-conversion time-micros-conversion
timestamp-conversion timestamp-micros-conversion (Java8LogicalTypes$RoundingDecimalConversion. roundingMode)]))

(defn create-clojure-data []
(if *use-logical-types*
(ClojureData. (default-conversions))
(ClojureData/withoutConversions)))
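
`default-conversions` reads `*math-context*` when it is called, that is, when a datum reader or writer is created, so a `with-precision` binding presumably needs to wrap construction rather than the later read or write. The sketch below imitates that capture with a hypothetical `make-decimal-scaler`; it assumes the conversion rescales with `BigDecimal.setScale`, which this diff does not show.

```clojure
(import '(java.math BigDecimal MathContext RoundingMode))

;; NOTE: `make-decimal-scaler` is a hypothetical helper, not part of Abracad.
(defn make-decimal-scaler
  "Return a function that sets `scale` on a BigDecimal. The rounding mode is
  read from `*math-context*` once, at the moment the scaler is created."
  [scale]
  (let [^MathContext context *math-context*
        ^RoundingMode mode (if context
                             (.getRoundingMode context)
                             RoundingMode/UNNECESSARY)]
    (fn [^BigDecimal value]
      (.setScale value (int scale) mode))))

;; Created with no math context bound: falls back to UNNECESSARY.
(def strict-scaler (make-decimal-scaler 2))

;; Created inside `with-precision`: captures HALF_UP.
(def rounding-scaler
  (with-precision 10 :rounding HALF_UP
    (make-decimal-scaler 2)))

(rounding-scaler 1.235M) ; rounds half-up to two decimal places
(strict-scaler 1.5M)     ; only pads the scale, no rounding needed
;; (strict-scaler 1.235M) would throw ArithmeticException
```

Binding `*math-context*` around a call to `strict-scaler` has no effect, because the mode was fixed when the scaler was built.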
7 changes: 3 additions & 4 deletions src/clojure/abracad/avro/edn.clj
@@ -1,10 +1,9 @@
(ns abracad.avro.edn
(:require [abracad.avro :as avro]
[abracad.avro.util :refer [coerce]])
(:import [clojure.lang BigInt Cons IMeta IPersistentList IPersistentMap
IPersistentSet IPersistentVector ISeq Keyword PersistentArrayMap
PersistentQueue Ratio Sorted Symbol]
[org.apache.avro Schema]))
(:import [clojure.lang BigInt IPersistentList IPersistentMap
IPersistentSet IPersistentVector ISeq Keyword
PersistentQueue Ratio Sorted Symbol]))

(defprotocol EDNAvroSerializable
"Protocol for customizing EDN-in-Avro serialization."
8 changes: 7 additions & 1 deletion src/clojure/abracad/avro/util.clj
@@ -37,7 +37,13 @@ evaluated at macro-expansion time."

(defn mangle
"Perform Clojure->Avro name-mangling when `*mangle-names*` is true."
[^String n] (if *mangle-names* (.replace n \- \_) n))
[^String n] (if *mangle-names*
              (-> n
                  (.replace \- \_)
                  ;; Un-mangle logical type names
                  (.replace "timestamp_millis" "timestamp-millis")
                  (.replace "timestamp_micros" "timestamp-micros")
                  (.replace "time_millis" "time-millis")
                  (.replace "time_micros" "time-micros"))
              ;; Return the name unchanged when mangling is disabled
              n))

(defn unmangle
"Reverse Clojure->Avro name-mangling when `*mangle-names*` is true."
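
A standalone sketch of the replacement chain in `mangle` shows one consequence of restoring the logical type names by substring: any name that merely contains one of them keeps a hyphen. `mangle-sketch` is a hypothetical copy for illustration and leaves out the `*mangle-names*` toggle.

```clojure
;; NOTE: hypothetical standalone copy of the replacement chain, without the
;; `*mangle-names*` toggle.
(defn mangle-sketch
  [^String n]
  (-> n
      (.replace \- \_)
      ;; Restore the hyphenated logical type names after the blanket replace
      (.replace "timestamp_millis" "timestamp-millis")
      (.replace "timestamp_micros" "timestamp-micros")
      (.replace "time_millis" "time-millis")
      (.replace "time_micros" "time-micros")))

(mangle-sketch "foo-bar")             ;=> "foo_bar"
(mangle-sketch "timestamp-millis")    ;=> "timestamp-millis"
(mangle-sketch "my-timestamp-millis") ;=> "my_timestamp-millis"
```

The last result is probably not a valid Avro name, so a field or record named after a logical type may need extra care.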
33 changes: 26 additions & 7 deletions src/clojure/abracad/avro/write.clj
@@ -5,13 +5,14 @@
[abracad.avro.edn :as edn]
[abracad.avro.util :refer [case-expr case-enum mangle unmangle
field-keyword]])
(:import [java.util Collection Map List]
(:import [java.util Collection Map]
[java.nio ByteBuffer]
[clojure.lang Named Sequential IRecord Indexed]
[org.apache.avro Schema Schema$Field Schema$Type AvroTypeException]
[org.apache.avro.io Encoder]
[org.apache.avro.generic GenericRecord]
[abracad.avro ClojureDatumWriter ArrayAccessor]))
[org.apache.avro.generic GenericRecord GenericFixed]
[abracad.avro ClojureDatumWriter ArrayAccessor ClojureData]
(org.apache.avro.util Utf8)))

(def ^:const edn-element
"abracad.avro.edn.Element")
@@ -71,6 +72,14 @@ record serialization."
(emit-fixed [^ByteBuffer bytes ^Encoder encoder]
(.writeFixed encoder bytes)))

(extend-type GenericFixed
HandleBytes
(count-bytes [^GenericFixed fixed] (alength (.bytes fixed)))
(emit-bytes [^GenericFixed fixed ^Encoder encoder]
(.writeBytes encoder (.bytes fixed)))
(emit-fixed [^GenericFixed fixed ^Encoder encoder]
(.writeFixed encoder (.bytes fixed))))

(defn schema-error!
[^Schema schema datum]
(throw (ex-info "Cannot write datum as schema"
@@ -200,7 +209,7 @@ record serialization."
GenericRecord
(schema-name [this] (-> this .getSchema .getFullName))
(field-get [this field] (.get this (name field)))
(filed-list [this] (->> this .getSchema .getFields (map field-name) set))
(field-list [this] (->> this .getSchema .getFields (map field-name) set))

Object
(schema-name [this] (schema-name-type this))
@@ -220,26 +229,36 @@ record serialization."
(and (named? datum) (.hasEnumSymbol schema (-> datum name mangle))))

(defn avro-bytes?
[^Schema schema datum]
[datum]
(or (instance? bytes-class datum)
(instance? ByteBuffer datum)))

(defn avro-string? [datum]
(or (string? datum)
(instance? Utf8 datum)
(instance? CharSequence datum)))

(defn avro-fixed?
[^Schema schema datum]
(and (avro-bytes? schema datum)
(and (avro-bytes? datum)
(= (.getFixedSize schema) (count-bytes datum))))

(defn- keyword-type? [^Schema schema]
(= "keyword" (.getProp schema ClojureData/CLOJURE_TYPE_PROP)))

(defn schema-match?
[^Schema schema datum]
(case-enum (.getType schema)
Schema$Type/RECORD (avro-record? schema datum)
Schema$Type/ENUM (avro-enum? schema datum)
Schema$Type/FIXED (avro-fixed? schema datum)
Schema$Type/BYTES (avro-bytes? schema datum)
Schema$Type/BYTES (avro-bytes? datum)
Schema$Type/LONG (integer? datum)
Schema$Type/INT (integer? datum)
Schema$Type/DOUBLE (float? datum)
Schema$Type/FLOAT (float? datum)
Schema$Type/STRING (if (keyword-type? schema) (keyword? datum)
(avro-string? datum))
#_ else false))

(defn resolve-union*