diff --git a/README.md b/README.md index 28752fd..5cb5cda 100644 --- a/README.md +++ b/README.md @@ -176,6 +176,33 @@ schemas. ;;=> {:foo [bar "baz" 1337]} ``` +### Logical Types + +Abracad supports [avro logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types). +Out of the box it supports: + +* `timestamp-millis` as `java.time.Instant` +* `time-millis` as `java.time.LocalTime` +* `date` as `java.time.LocalDate` +* `uuid` as `java.util.UUID` +* `decimal` as `java.math.BigDecimal`. The `*math-context*` dynamic var will be used to set the rounding mode +of bigdecimals and they will be scaled to the correct scale for your schema. If no math context is set then +`RoundingMode/UNNECESSARY` will be used. + +You can also automatically (de)serialise strings as keywords using `{:type :string :clojureType :keyword}`. + +if you prefer you can turn off logical types by setting the dynamic var `abracad.avro.conversion/*use-logical-types*` to `false`. + +```clojure + +(def logical-type-reader + (abracad.avro/datum-reader schema)) + +(def standard-reader + (binding [abracad.avro.conversion/*use-logical-types* false] + (abracad.avro/datum-reader schema))) +``` + ### Hadoop MapReduce integration Avro 1.7.5 and later supports configurable “data models” for datum diff --git a/project.clj b/project.clj index 474016b..48f5fe0 100644 --- a/project.clj +++ b/project.clj @@ -8,7 +8,7 @@ :global-vars {*warn-on-reflection* true} :source-paths ["src/clojure"] :java-source-paths ["src/java"] - :javac-options ["-target" "1.7" "-source" "1.7"] + :javac-options ["-target" "1.8" "-source" "1.8"] :dependencies [[org.clojure/clojure "1.8.0"] [org.apache.avro/avro "1.8.0"] [cheshire/cheshire "5.6.1"]] diff --git a/src/clojure/abracad/avro.clj b/src/clojure/abracad/avro.clj index f69074a..aec5e4e 100644 --- a/src/clojure/abracad/avro.clj +++ b/src/clojure/abracad/avro.clj @@ -4,12 +4,13 @@ (:require [clojure.java.io :as io] [clojure.walk :refer [postwalk]] [cheshire.core :as json] - [abracad.avro.util :refer [returning mangle unmangle coerce]]) + [abracad.avro.util :refer [returning mangle unmangle coerce]] + [abracad.avro.conversion :as c]) (:import [java.io - ByteArrayInputStream ByteArrayOutputStream EOFException - File FileInputStream InputStream OutputStream] + ByteArrayOutputStream EOFException + File InputStream OutputStream] [clojure.lang Named] - [org.apache.avro Schema Schema$Parser Schema$Type] + [org.apache.avro Schema Schema$Parser] [org.apache.avro.file CodecFactory DataFileWriter DataFileReader DataFileStream SeekableInput SeekableFileInput SeekableByteArrayInput] @@ -130,30 +131,29 @@ but the first `n` fields when sorting." (defn datum-reader "Return an Avro DatumReader which produces Clojure data structures." {:tag `ClojureDatumReader} - ([] (ClojureDatumReader.)) - ([schema] - (ClojureDatumReader. - (if-not (nil? schema) (parse-schema schema)))) + ([] (datum-reader nil)) + ([schema] (datum-reader schema schema)) ([expected actual] (ClojureDatumReader. - (if-not (nil? expected) (parse-schema expected)) - (if-not (nil? actual) (parse-schema actual))))) + (when-not (nil? expected) (parse-schema expected)) + (when-not (nil? actual) (parse-schema actual)) + (c/create-clojure-data)))) (defn data-file-reader "Return an Avro DataFileReader which produces Clojure data structures." {:tag `DataFileReader} ([source] (data-file-reader nil source)) ([expected source] - (DataFileReader/openReader - (seekable-input source) (datum-reader expected)))) + (DataFileReader/openReader + (seekable-input source) (datum-reader expected)))) (defn data-file-stream "Return an Avro DataFileStream which produces Clojure data structures." {:tag `DataFileStream} ([source] (data-file-stream nil source)) ([expected source] - (DataFileStream. - (io/input-stream source) (datum-reader expected)))) + (DataFileStream. + (io/input-stream source) (datum-reader expected)))) (defmacro ^:private decoder-factory "Invoke static methods of default Avro Decoder factory." @@ -210,10 +210,11 @@ decoded serially from `source`." (defn datum-writer "Return an Avro DatumWriter which consumes Clojure data structures." {:tag `ClojureDatumWriter} - ([] (ClojureDatumWriter.)) + ([] (datum-writer nil)) ([schema] - (ClojureDatumWriter. - (if-not (nil? schema) (parse-schema schema))))) + (ClojureDatumWriter. + (when-not (nil? schema) (parse-schema schema)) + (c/create-clojure-data)))) (defn data-file-writer "Return an Avro DataFileWriter which consumes Clojure data structures." @@ -253,6 +254,7 @@ decoded serially from `source`." (let [schema (parse-schema schema)] (encoder-factory jsonEncoder schema ^OutputStream sink))) + (defn encode "Serially encode each record in `records` to `sink` using `schema`. The `sink` may be an existing Encoder object, or anything on which @@ -284,7 +286,7 @@ via `encode`." "Compare `x` and `y` according to `schema`." [schema x y] (let [schema (parse-schema schema)] - (.compare (ClojureData/get) x y schema))) + (.compare (ClojureData/withoutConversions) x y schema))) (defn spit "Like core `spit`, but emits `content` to `f` as Avro with `schema`." diff --git a/src/clojure/abracad/avro/compare.clj b/src/clojure/abracad/avro/compare.clj index b0524d2..0defa37 100644 --- a/src/clojure/abracad/avro/compare.clj +++ b/src/clojure/abracad/avro/compare.clj @@ -2,8 +2,7 @@ "Generic data comparison implementation." {:private true} (:refer-clojure :exclude [compare]) - (:require [clojure.core :as cc] - [abracad.avro :as avro] + (:require [abracad.avro :as avro] [abracad.avro.write :refer [resolve-union*]] [abracad.avro.util :refer [case-enum if-not-let]]) (:import [org.apache.avro Schema Schema$Field Schema$Field$Order Schema$Type] @@ -48,7 +47,7 @@ (defn supercompare ^long [x y ^Schema schema equals] - (._supercompare (ClojureData/get) x y schema equals)) + (._supercompare (ClojureData/withoutConversions) x y schema equals)) (defn compare ^long [x y ^Schema schema equals] diff --git a/src/clojure/abracad/avro/conversion.clj b/src/clojure/abracad/avro/conversion.clj new file mode 100644 index 0000000..0d9ce6d --- /dev/null +++ b/src/clojure/abracad/avro/conversion.clj @@ -0,0 +1,28 @@ +(ns abracad.avro.conversion + "Logical Type converter implementations" + (:import (org.apache.avro Conversions$UUIDConversion) + (java.math MathContext RoundingMode) + (abracad.avro ClojureData Java8LogicalTypes$RoundingDecimalConversion Java8LogicalTypes$DateConversion Java8LogicalTypes$TimeMillisConversion Java8LogicalTypes$TimeMicrosConversion Java8LogicalTypes$TimestampMillisConversion Java8LogicalTypes$TimestampMicrosConversion))) + +(def ^:dynamic *use-logical-types* + "When true, record field values will be (de)serialised from/to their logical types + e.g. {:type :int :logicalType :date} -> java.time.LocalDate. Default value is `true`." + true) + +(def uuid-conversion (Conversions$UUIDConversion.)) +(def date-conversion (Java8LogicalTypes$DateConversion.)) +(def time-conversion (Java8LogicalTypes$TimeMillisConversion.)) +(def time-micros-conversion (Java8LogicalTypes$TimeMicrosConversion.)) +(def timestamp-conversion (Java8LogicalTypes$TimestampMillisConversion.)) +(def timestamp-micros-conversion (Java8LogicalTypes$TimestampMicrosConversion.)) + +(defn- default-conversions [] + (let [^MathContext context *math-context* + roundingMode (if context (.getRoundingMode context) RoundingMode/UNNECESSARY)] + [uuid-conversion date-conversion time-conversion time-micros-conversion + timestamp-conversion timestamp-micros-conversion (Java8LogicalTypes$RoundingDecimalConversion. roundingMode)])) + +(defn create-clojure-data [] + (if *use-logical-types* + (ClojureData. (default-conversions)) + (ClojureData/withoutConversions))) diff --git a/src/clojure/abracad/avro/edn.clj b/src/clojure/abracad/avro/edn.clj index 10afc8f..2d933c4 100644 --- a/src/clojure/abracad/avro/edn.clj +++ b/src/clojure/abracad/avro/edn.clj @@ -1,10 +1,9 @@ (ns abracad.avro.edn (:require [abracad.avro :as avro] [abracad.avro.util :refer [coerce]]) - (:import [clojure.lang BigInt Cons IMeta IPersistentList IPersistentMap - IPersistentSet IPersistentVector ISeq Keyword PersistentArrayMap - PersistentQueue Ratio Sorted Symbol] - [org.apache.avro Schema])) + (:import [clojure.lang BigInt IPersistentList IPersistentMap + IPersistentSet IPersistentVector ISeq Keyword + PersistentQueue Ratio Sorted Symbol])) (defprotocol EDNAvroSerializable "Protocol for customizing EDN-in-Avro serialization." diff --git a/src/clojure/abracad/avro/util.clj b/src/clojure/abracad/avro/util.clj index 9ecc90a..d4d368f 100644 --- a/src/clojure/abracad/avro/util.clj +++ b/src/clojure/abracad/avro/util.clj @@ -37,7 +37,13 @@ evaluated at macro-expansion time." (defn mangle "Perform Clojure->Avro name-mangling when `*mangle-names*` is true." - [^String n] (if *mangle-names* (.replace n \- \_) n)) + [^String n] (if *mangle-names* (-> n + (.replace \- \_) + ;; Un-mangle logical type names + (.replace "timestamp_millis" "timestamp-millis") + (.replace "timestamp_micros" "timestamp-micros") + (.replace "time_millis" "time-millis") + (.replace "time_micros" "time-micros")))) (defn unmangle "Reverse Clojure->Avro name-mangling when `*mangle-names* is true." diff --git a/src/clojure/abracad/avro/write.clj b/src/clojure/abracad/avro/write.clj index 8c5bcbd..2aa4489 100644 --- a/src/clojure/abracad/avro/write.clj +++ b/src/clojure/abracad/avro/write.clj @@ -5,13 +5,14 @@ [abracad.avro.edn :as edn] [abracad.avro.util :refer [case-expr case-enum mangle unmangle field-keyword]]) - (:import [java.util Collection Map List] + (:import [java.util Collection Map] [java.nio ByteBuffer] [clojure.lang Named Sequential IRecord Indexed] [org.apache.avro Schema Schema$Field Schema$Type AvroTypeException] [org.apache.avro.io Encoder] - [org.apache.avro.generic GenericRecord] - [abracad.avro ClojureDatumWriter ArrayAccessor])) + [org.apache.avro.generic GenericRecord GenericFixed] + [abracad.avro ClojureDatumWriter ArrayAccessor ClojureData] + (org.apache.avro.util Utf8))) (def ^:const edn-element "abracad.avro.edn.Element") @@ -71,6 +72,14 @@ record serialization." (emit-fixed [^ByteBuffer bytes ^Encoder encoder] (.writeFixed encoder bytes))) +(extend-type GenericFixed + HandleBytes + (count-bytes [^GenericFixed fixed] (alength (.bytes fixed))) + (emit-bytes [^GenericFixed fixed ^Encoder encoder] + (.writeBytes encoder (.bytes fixed))) + (emit-fixed [^GenericFixed fixed ^Encoder encoder] + (.writeFixed encoder (.bytes fixed)))) + (defn schema-error! [^Schema schema datum] (throw (ex-info "Cannot write datum as schema" @@ -200,7 +209,7 @@ record serialization." GenericRecord (schema-name [this] (-> this .getSchema .getFullName)) (field-get [this field] (.get this (name field))) - (filed-list [this] (->> this .getSchema .getFields (map field-name) set)) + (field-list [this] (->> this .getSchema .getFields (map field-name) set)) Object (schema-name [this] (schema-name-type this)) @@ -220,26 +229,36 @@ record serialization." (and (named? datum) (.hasEnumSymbol schema (-> datum name mangle)))) (defn avro-bytes? - [^Schema schema datum] + [datum] (or (instance? bytes-class datum) (instance? ByteBuffer datum))) +(defn avro-string? [datum] + (or (string? datum) + (instance? Utf8 datum) + (instance? CharSequence datum))) + (defn avro-fixed? [^Schema schema datum] - (and (avro-bytes? schema datum) + (and (avro-bytes? datum) (= (.getFixedSize schema) (count-bytes datum)))) +(defn- keyword-type? [^Schema schema] + (= "keyword" (.getProp schema ClojureData/CLOJURE_TYPE_PROP))) + (defn schema-match? [^Schema schema datum] (case-enum (.getType schema) Schema$Type/RECORD (avro-record? schema datum) Schema$Type/ENUM (avro-enum? schema datum) Schema$Type/FIXED (avro-fixed? schema datum) - Schema$Type/BYTES (avro-bytes? schema datum) + Schema$Type/BYTES (avro-bytes? datum) Schema$Type/LONG (integer? datum) Schema$Type/INT (integer? datum) Schema$Type/DOUBLE (float? datum) Schema$Type/FLOAT (float? datum) + Schema$Type/STRING (if (keyword-type? schema) (keyword? datum) + (avro-string? datum)) #_ else false)) (defn resolve-union* diff --git a/src/java/abracad/avro/ArrayAccessor.java b/src/java/abracad/avro/ArrayAccessor.java index dbd2e3a..a8db507 100644 --- a/src/java/abracad/avro/ArrayAccessor.java +++ b/src/java/abracad/avro/ArrayAccessor.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,241 +26,224 @@ public class ArrayAccessor { -public static void -writeArray(Object data, Encoder out) throws IOException { - if (data instanceof boolean[]) { - writeArray((boolean[]) data, out); - } else if (data instanceof short[]) { - writeArray((short[]) data, out); - } else if (data instanceof char[]) { - writeArray((char[]) data, out); - } else if (data instanceof int[]) { - writeArray((int[]) data, out); - } else if (data instanceof long[]) { - writeArray((long[]) data, out); - } else if (data instanceof float[]) { - writeArray((float[]) data, out); - } else if (data instanceof double[]) { - writeArray((double[]) data, out); + public static void writeArray(Object data, Encoder out) throws IOException { + if (data instanceof boolean[]) { + writeArray((boolean[]) data, out); + } else if (data instanceof short[]) { + writeArray((short[]) data, out); + } else if (data instanceof char[]) { + writeArray((char[]) data, out); + } else if (data instanceof int[]) { + writeArray((int[]) data, out); + } else if (data instanceof long[]) { + writeArray((long[]) data, out); + } else if (data instanceof float[]) { + writeArray((float[]) data, out); + } else if (data instanceof double[]) { + writeArray((double[]) data, out); + } } -} -public static void -writeArray(boolean[] data, Encoder out) throws IOException { - int size = data.length; - out.setItemCount(size); - for (int i = 0; i < size; i++) { - out.startItem(); - out.writeBoolean(data[i]); + public static void writeArray(boolean[] data, Encoder out) throws IOException { + int size = data.length; + out.setItemCount(size); + for (int i = 0; i < size; i++) { + out.startItem(); + out.writeBoolean(data[i]); + } } -} - -// short, and char arrays are upcast to avro int -public static void -writeArray(short[] data, Encoder out) throws IOException { - int size = data.length; - out.setItemCount(size); - for (int i = 0; i < size; i++) { - out.startItem(); - out.writeInt(data[i]); + // short, and char arrays are upcast to avro int + public static void writeArray(short[] data, Encoder out) throws IOException { + int size = data.length; + out.setItemCount(size); + for (int i = 0; i < size; i++) { + out.startItem(); + out.writeInt(data[i]); + } } -} -public static void -writeArray(char[] data, Encoder out) throws IOException { - int size = data.length; - out.setItemCount(size); - for (int i = 0; i < size; i++) { - out.startItem(); - out.writeInt(data[i]); + public static void writeArray(char[] data, Encoder out) throws IOException { + int size = data.length; + out.setItemCount(size); + for (int i = 0; i < size; i++) { + out.startItem(); + out.writeInt(data[i]); + } } -} -public static void -writeArray(int[] data, Encoder out) throws IOException { - int size = data.length; - out.setItemCount(size); - for (int i = 0; i < size; i++) { - out.startItem(); - out.writeInt(data[i]); + public static void writeArray(int[] data, Encoder out) throws IOException { + int size = data.length; + out.setItemCount(size); + for (int i = 0; i < size; i++) { + out.startItem(); + out.writeInt(data[i]); + } } -} -public static void -writeArray(long[] data, Encoder out) throws IOException { - int size = data.length; - out.setItemCount(size); - for (int i = 0; i < size; i++) { - out.startItem(); - out.writeLong(data[i]); + public static void writeArray(long[] data, Encoder out) throws IOException { + int size = data.length; + out.setItemCount(size); + for (int i = 0; i < size; i++) { + out.startItem(); + out.writeLong(data[i]); + } } -} -public static void -writeArray(float[] data, Encoder out) throws IOException { - int size = data.length; - out.setItemCount(size); - for (int i = 0; i < size; i++) { - out.startItem(); - out.writeFloat(data[i]); + public static void writeArray(float[] data, Encoder out) throws IOException { + int size = data.length; + out.setItemCount(size); + for (int i = 0; i < size; i++) { + out.startItem(); + out.writeFloat(data[i]); + } } -} -public static void -writeArray(double[] data, Encoder out) throws IOException { - int size = data.length; - out.setItemCount(size); - for (int i = 0; i < size; i++) { - out.startItem(); - out.writeDouble(data[i]); + public static void writeArray(double[] data, Encoder out) throws IOException { + int size = data.length; + out.setItemCount(size); + for (int i = 0; i < size; i++) { + out.startItem(); + out.writeDouble(data[i]); + } } -} -public static Object -readArray(Object array, Class elementType, long l, ResolvingDecoder in) - throws IOException { - if (elementType == int.class) - return readArray((int[]) array, l, in); - if (elementType == long.class) - return readArray((long[]) array, l, in); - if (elementType == float.class) - return readArray((float[]) array, l, in); - if (elementType == double.class) - return readArray((double[]) array, l, in); - if (elementType == boolean.class) - return readArray((boolean[]) array, l, in); - if (elementType == char.class) - return readArray((char[]) array, l, in); - if (elementType == short.class) - return readArray((short[]) array, l, in); - return null; -} + public static Object readArray(Object array, Class elementType, long l, ResolvingDecoder in) + throws IOException { + if (elementType == int.class) + return readArray((int[]) array, l, in); + if (elementType == long.class) + return readArray((long[]) array, l, in); + if (elementType == float.class) + return readArray((float[]) array, l, in); + if (elementType == double.class) + return readArray((double[]) array, l, in); + if (elementType == boolean.class) + return readArray((boolean[]) array, l, in); + if (elementType == char.class) + return readArray((char[]) array, l, in); + if (elementType == short.class) + return readArray((short[]) array, l, in); + return null; + } -public static boolean[] -readArray(boolean[] array, long l, ResolvingDecoder in) - throws IOException { - int index = 0; - while (l > 0) { - int limit = index + (int) l; - if (array.length < limit) { - array = Arrays.copyOf(array, limit); + public static boolean[] readArray(boolean[] array, long l, ResolvingDecoder in) + throws IOException { + int index = 0; + while (l > 0) { + int limit = index + (int) l; + if (array.length < limit) { + array = Arrays.copyOf(array, limit); + } + while (index < limit) { + array[index] = in.readBoolean(); + index++; + } + l = in.arrayNext(); } - while (index < limit) { - array[index] = in.readBoolean(); - index++; - } - l = in.arrayNext(); + return array; } - return array; -} -public static int[] -readArray(int[] array, long l, ResolvingDecoder in) - throws IOException { - int index = 0; - while (l > 0) { - int limit = index + (int) l; - if (array.length < limit) { - array = Arrays.copyOf(array, limit); + public static int[] readArray(int[] array, long l, ResolvingDecoder in) + throws IOException { + int index = 0; + while (l > 0) { + int limit = index + (int) l; + if (array.length < limit) { + array = Arrays.copyOf(array, limit); + } + while (index < limit) { + array[index] = in.readInt(); + index++; + } + l = in.arrayNext(); } - while (index < limit) { - array[index] = in.readInt(); - index++; - } - l = in.arrayNext(); + return array; } - return array; -} -public static short[] -readArray(short[] array, long l, ResolvingDecoder in) - throws IOException { - int index = 0; - while (l > 0) { - int limit = index + (int) l; - if (array.length < limit) { - array = Arrays.copyOf(array, limit); + public static short[] readArray(short[] array, long l, ResolvingDecoder in) + throws IOException { + int index = 0; + while (l > 0) { + int limit = index + (int) l; + if (array.length < limit) { + array = Arrays.copyOf(array, limit); + } + while (index < limit) { + array[index] = (short) in.readInt(); + index++; + } + l = in.arrayNext(); } - while (index < limit) { - array[index] = (short) in.readInt(); - index++; - } - l = in.arrayNext(); + return array; } - return array; -} -public static char[] -readArray(char[] array, long l, ResolvingDecoder in) - throws IOException { - int index = 0; - while (l > 0) { - int limit = index + (int) l; - if (array.length < limit) { - array = Arrays.copyOf(array, limit); + public static char[] readArray(char[] array, long l, ResolvingDecoder in) + throws IOException { + int index = 0; + while (l > 0) { + int limit = index + (int) l; + if (array.length < limit) { + array = Arrays.copyOf(array, limit); + } + while (index < limit) { + array[index] = (char) in.readInt(); + index++; + } + l = in.arrayNext(); } - while (index < limit) { - array[index] = (char) in.readInt(); - index++; - } - l = in.arrayNext(); + return array; } - return array; -} -public static long[] -readArray(long[] array, long l, ResolvingDecoder in) - throws IOException { - int index = 0; - while (l > 0) { - int limit = index + (int) l; - if (array.length < limit) { - array = Arrays.copyOf(array, limit); + public static long[] readArray(long[] array, long l, ResolvingDecoder in) + throws IOException { + int index = 0; + while (l > 0) { + int limit = index + (int) l; + if (array.length < limit) { + array = Arrays.copyOf(array, limit); + } + while (index < limit) { + array[index] = in.readLong(); + index++; + } + l = in.arrayNext(); } - while (index < limit) { - array[index] = in.readLong(); - index++; - } - l = in.arrayNext(); + return array; } - return array; -} -public static float[] -readArray(float[] array, long l, ResolvingDecoder in) - throws IOException { - int index = 0; - while (l > 0) { - int limit = index + (int) l; - if (array.length < limit) { - array = Arrays.copyOf(array, limit); + public static float[] readArray(float[] array, long l, ResolvingDecoder in) + throws IOException { + int index = 0; + while (l > 0) { + int limit = index + (int) l; + if (array.length < limit) { + array = Arrays.copyOf(array, limit); + } + while (index < limit) { + array[index] = in.readFloat(); + index++; + } + l = in.arrayNext(); } - while (index < limit) { - array[index] = in.readFloat(); - index++; - } - l = in.arrayNext(); + return array; } - return array; -} -public static double[] -readArray(double[] array, long l, ResolvingDecoder in) - throws IOException { - int index = 0; - while (l > 0) { - int limit = index + (int) l; - if (array.length < limit) { - array = Arrays.copyOf(array, limit); + public static double[] readArray(double[] array, long l, ResolvingDecoder in) + throws IOException { + int index = 0; + while (l > 0) { + int limit = index + (int) l; + if (array.length < limit) { + array = Arrays.copyOf(array, limit); + } + while (index < limit) { + array[index] = in.readDouble(); + index++; + } + l = in.arrayNext(); } - while (index < limit) { - array[index] = in.readDouble(); - index++; - } - l = in.arrayNext(); + return array; } - return array; -} } diff --git a/src/java/abracad/avro/ClojureData.java b/src/java/abracad/avro/ClojureData.java index 7084789..10b8aa5 100644 --- a/src/java/abracad/avro/ClojureData.java +++ b/src/java/abracad/avro/ClojureData.java @@ -1,7 +1,9 @@ package abracad.avro; -import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.apache.avro.Conversion; import org.apache.avro.Schema; import org.apache.avro.reflect.ReflectData; import org.apache.avro.io.DatumReader; @@ -14,58 +16,52 @@ public class ClojureData extends ReflectData { -private static class Vars { - private static final String NS = "abracad.avro.compare"; - private static final Var compare = RT.var(NS, "compare"); - static { - RT.var("clojure.core", "require").invoke(Symbol.intern(NS)); - } -} + public static String CLOJURE_TYPE_PROP = "clojureType"; -private static final ClojureData INSTANCE = new ClojureData(); + private static class Vars { + private static final String NS = "abracad.avro.compare"; + private static final Var compare = RT.var(NS, "compare"); -public -ClojureData() { - super(); -} + static { + RT.var("clojure.core", "require").invoke(Symbol.intern(NS)); + } + } -public -ClojureData(ClassLoader classLoader) { - super(classLoader); -} + private static final ClojureData NO_CONVERSIONS = new ClojureData(Collections.emptyList()); -public static ClojureData -get() { - return INSTANCE; -} + public ClojureData(List> conversions) { + super(); + for (Conversion conversion : conversions) { + addLogicalTypeConversion(conversion); + } + } -@Override -public DatumReader -createDatumReader(Schema schema) { - return new ClojureDatumReader(schema, schema); -} + public static ClojureData withoutConversions() { + return NO_CONVERSIONS; + } -@Override -public DatumReader -createDatumReader(Schema writer, Schema reader) { - return new ClojureDatumReader(writer, reader); -} + @Override + public DatumReader createDatumReader(Schema schema) { + return createDatumReader(schema, schema); + } -@Override -public DatumWriter -createDatumWriter(Schema schema) { - return new ClojureDatumWriter(schema); -} + @Override + public DatumReader createDatumReader(Schema writer, Schema reader) { + return new ClojureDatumReader(writer, reader, this); + } -@Override -public int -compare(Object o1, Object o2, Schema s, boolean equals) { - return (int) ((IFn.OOOOL) Vars.compare.get()).invokePrim(o1, o2, s, equals); -} + @Override + public DatumWriter createDatumWriter(Schema schema) { + return new ClojureDatumWriter(schema, this); + } -public int -_supercompare(Object o1, Object o2, Schema s, boolean equals) { - return super.compare(o1, o2, s, equals); -} + @Override + public int compare(Object o1, Object o2, Schema s, boolean equals) { + return (int) ((IFn.OOOOL) Vars.compare.get()).invokePrim(o1, o2, s, equals); + } + + public int _supercompare(Object o1, Object o2, Schema s, boolean equals) { + return super.compare(o1, o2, s, equals); + } } diff --git a/src/java/abracad/avro/ClojureDatumReader.java b/src/java/abracad/avro/ClojureDatumReader.java index 16d55d3..503cbcb 100644 --- a/src/java/abracad/avro/ClojureDatumReader.java +++ b/src/java/abracad/avro/ClojureDatumReader.java @@ -1,7 +1,9 @@ package abracad.avro; import java.io.IOException; +import java.nio.ByteBuffer; +import clojure.lang.Keyword; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.io.Decoder; @@ -13,90 +15,78 @@ public class ClojureDatumReader extends GenericDatumReader { -private static class Vars { - private static final String NS = "abracad.avro.read"; + private static class Vars { + private static final String NS = "abracad.avro.read"; - private static final Var readRecord = RT.var(NS, "read-record"); - private static final Var readEnum = RT.var(NS, "read-enum"); - private static final Var readArray = RT.var(NS, "read-array"); - private static final Var readMap = RT.var(NS, "read-map"); - private static final Var readFixed = RT.var(NS, "read-fixed"); - private static final Var readBytes = RT.var(NS, "read-bytes"); + private static final Var readRecord = RT.var(NS, "read-record"); + private static final Var readEnum = RT.var(NS, "read-enum"); + private static final Var readArray = RT.var(NS, "read-array"); + private static final Var readMap = RT.var(NS, "read-map"); + private static final Var readFixed = RT.var(NS, "read-fixed"); + private static final Var readBytes = RT.var(NS, "read-bytes"); - static { - RT.var("clojure.core", "require").invoke(Symbol.intern(NS)); + static { + RT.var("clojure.core", "require").invoke(Symbol.intern(NS)); + } } -} - -public -ClojureDatumReader() { - super(null, null); -} - -public -ClojureDatumReader(Schema schema) { - super(schema, schema); -} - -public -ClojureDatumReader(Schema writer, Schema reader) { - super(writer, reader); -} -@Override -public Object -read(Object old, Schema expected, ResolvingDecoder in) - throws IOException { - return super.read(old, expected, in); -} + public ClojureDatumReader(Schema writer, Schema reader, ClojureData data) { + super(writer, reader, data); + } -@Override -protected Object -readRecord(Object old, Schema expected, ResolvingDecoder in) - throws IOException { - return Vars.readRecord.invoke(this, expected, in); -} + @Override + public Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException { + return super.read(old, expected, in); + } -@Override -protected Object -readEnum(Schema expected, Decoder in) throws IOException { - return Vars.readEnum.invoke(this, expected, in); -} + @Override + protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) { + return Vars.readRecord.invoke(this, expected, in); + } -@Override -protected Object -readArray(Object old, Schema expected, ResolvingDecoder in) - throws IOException { - return Vars.readArray.invoke(this, expected, in); -} + @Override + protected Object readEnum(Schema expected, Decoder in) { + return Vars.readEnum.invoke(this, expected, in); + } -@Override -protected Object -readMap(Object old, Schema expected, ResolvingDecoder in) - throws IOException { - return Vars.readMap.invoke(this, expected, in); -} + @Override + protected Object readArray(Object old, Schema expected, ResolvingDecoder in) { + return Vars.readArray.invoke(this, expected, in); + } -@Override -protected Object -readString(Object old, Schema expected, Decoder in) - throws IOException { - return in.readString(); -} + @Override + protected Object readMap(Object old, Schema expected, ResolvingDecoder in) { + return Vars.readMap.invoke(this, expected, in); + } + @Override + protected Object readString(Object old, Schema expected, Decoder in) throws IOException { + String stringValue = in.readString(); + if ("keyword".equals(expected.getProp(ClojureData.CLOJURE_TYPE_PROP))) { + return Keyword.intern(stringValue); + } else { + return stringValue; + } + } -@Override -protected Object -readFixed(Object old, Schema expected, Decoder in) - throws IOException { - return Vars.readFixed.invoke(this, expected, in); -} + @Override + protected Object readFixed(Object old, Schema expected, Decoder in) { + Object bytes = Vars.readFixed.invoke(this, expected, in); + if (expected.getLogicalType() != null) { + // Logical type conversion expects generic fixed + return getData().createFixed(old, (byte[]) bytes, expected); + } + return bytes; + } -@Override -protected Object -readBytes(Object old, Schema expected, Decoder in) - throws IOException { - return Vars.readBytes.invoke(this, expected, in); -} + @Override + protected Object readBytes(Object old, Schema expected, Decoder in) { + Object bytes = Vars.readBytes.invoke(this, expected, in); + if (expected.getLogicalType() != null) { + // Logical type conversions expect byte buffers + return ByteBuffer.wrap((byte[]) bytes); + } + return bytes; + } } diff --git a/src/java/abracad/avro/ClojureDatumWriter.java b/src/java/abracad/avro/ClojureDatumWriter.java index edcc3d3..491fa77 100644 --- a/src/java/abracad/avro/ClojureDatumWriter.java +++ b/src/java/abracad/avro/ClojureDatumWriter.java @@ -2,8 +2,8 @@ import java.io.IOException; +import clojure.lang.Keyword; import org.apache.avro.Schema; -import org.apache.avro.UnresolvedUnionException; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.Encoder; @@ -13,89 +13,93 @@ public class ClojureDatumWriter extends GenericDatumWriter { -private static class Vars { - private static final String NS = "abracad.avro.write"; + private static class Vars { + private static final String NS = "abracad.avro.write"; - private static final Var writeRecord = RT.var(NS, "write-record"); - private static final Var writeEnum = RT.var(NS, "write-enum"); - private static final Var writeArray = RT.var(NS, "write-array"); - private static final Var resolveUnion = RT.var(NS, "resolve-union"); - private static final Var writeBytes = RT.var(NS, "write-bytes"); - private static final Var writeFixed = RT.var(NS, "write-fixed"); + private static final Var writeRecord = RT.var(NS, "write-record"); + private static final Var writeEnum = RT.var(NS, "write-enum"); + private static final Var writeArray = RT.var(NS, "write-array"); + private static final Var resolveUnion = RT.var(NS, "resolve-union"); + private static final Var writeBytes = RT.var(NS, "write-bytes"); + private static final Var writeFixed = RT.var(NS, "write-fixed"); - static { - RT.var("clojure.core", "require").invoke(Symbol.intern(NS)); + static { + RT.var("clojure.core", "require").invoke(Symbol.intern(NS)); + } } -} -public -ClojureDatumWriter() { - super(); -} + public ClojureDatumWriter(Schema schema, ClojureData data) { + super(schema, data); + } -public -ClojureDatumWriter(Schema schema) { - super(schema); -} + @Override + public void write(Schema schema, Object datum, Encoder out) throws IOException { + Object datumCast = castDatum(schema, datum); + super.write(schema, datumCast, out); + } -@Override -public void -write(Schema schema, Object datum, Encoder out) throws IOException { - try { - switch (schema.getType()) { - case INT: out.writeInt(RT.intCast(datum)); break; - case LONG: out.writeLong(RT.longCast(datum)); break; - case FLOAT: out.writeFloat(RT.floatCast(datum)); break; - case DOUBLE: out.writeDouble(RT.doubleCast(datum)); break; - case BOOLEAN: out.writeBoolean(RT.booleanCast(datum)); break; - default: super.write(schema, datum, out); break; + private Object castDatum(Schema schema, Object datum) { + if (schema.getLogicalType() != null) { + return datum; + } else { + switch (schema.getType()) { + case INT: + return RT.intCast(datum); + case LONG: + return RT.longCast(datum); + case FLOAT: + return RT.floatCast(datum); + case DOUBLE: + return RT.doubleCast(datum); + case BOOLEAN: + return RT.booleanCast(datum); + default: + return datum; + } } - } catch (NullPointerException e) { - throw super.npe(e, " of " + schema.getFullName()); } -} -@Override -protected void -writeRecord(Schema schema, Object datum, Encoder out) - throws IOException { - Vars.writeRecord.invoke(this, schema, datum, out); -} + @Override + protected void writeRecord(Schema schema, Object datum, Encoder out) { + Vars.writeRecord.invoke(this, schema, datum, out); + } -@Override -protected void -writeEnum(Schema schema, Object datum, Encoder out) - throws IOException { - Vars.writeEnum.invoke(this, schema, datum, out); -} + @Override + protected void writeEnum(Schema schema, Object datum, Encoder out) { + Vars.writeEnum.invoke(this, schema, datum, out); + } -@Override -protected void -writeArray(Schema schema, Object datum, Encoder out) - throws IOException { - Vars.writeArray.invoke(this, schema, datum, out); -} + @Override + protected void writeArray(Schema schema, Object datum, Encoder out) { + Vars.writeArray.invoke(this, schema, datum, out); + } -@Override -protected int -resolveUnion(Schema union, Object datum) { - Object i = Vars.resolveUnion.invoke(this, union, datum); - if (i == null) throw new UnresolvedUnionException(union, datum); - return RT.intCast(i); -} + @Override + protected int resolveUnion(Schema union, Object datum) { + Object i = Vars.resolveUnion.invoke(this, union, datum); + if (i == null) { + // Logical type cases will not be resolved by the underlying clojure implementation + return super.resolveUnion(union, datum); + } + return RT.intCast(i); + } -@Override -protected void -writeBytes(Object datum, Encoder out) - throws IOException { - Vars.writeBytes.invoke(this, datum, out); -} + @Override + protected void writeBytes(Object datum, Encoder out) { + Vars.writeBytes.invoke(this, datum, out); + } -@Override -protected void -writeFixed(Schema schema, Object datum, Encoder out) - throws IOException { - Vars.writeFixed.invoke(this, schema, datum, out); -} + @Override + protected void writeFixed(Schema schema, Object datum, Encoder out) { + Vars.writeFixed.invoke(this, schema, datum, out); + } + @Override + protected void writeString(Schema schema, Object datum, Encoder out) throws IOException { + if ("keyword".equals(schema.getProp(ClojureData.CLOJURE_TYPE_PROP))) { + super.writeString(schema, ((Keyword) datum).getName(), out); + } else { + super.writeString(schema, datum, out); + } + } } diff --git a/src/java/abracad/avro/Java8LogicalTypes.java b/src/java/abracad/avro/Java8LogicalTypes.java new file mode 100644 index 0000000..ed79f27 --- /dev/null +++ b/src/java/abracad/avro/Java8LogicalTypes.java @@ -0,0 +1,214 @@ +package abracad.avro; + +import org.apache.avro.*; +import org.apache.avro.generic.GenericFixed; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.concurrent.TimeUnit; + +public class Java8LogicalTypes { + + private Java8LogicalTypes() {} + + public static class DateConversion extends Conversion { + + @Override + public Class getConvertedType() { + return LocalDate.class; + } + + @Override + public String getLogicalTypeName() { + return "date"; + } + + @Override + public LocalDate fromInt(Integer daysFromEpoch, Schema schema, LogicalType type) { + return LocalDate.ofEpochDay(daysFromEpoch); + } + + @Override + public Integer toInt(LocalDate date, Schema schema, LogicalType type) { + return Math.toIntExact(date.toEpochDay()); + } + + @Override + public Schema getRecommendedSchema() { + return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + } + } + + public static class TimeMillisConversion extends Conversion { + @Override + public Class getConvertedType() { + return LocalTime.class; + } + + @Override + public String getLogicalTypeName() { + return "time-millis"; + } + + @Override + public LocalTime fromInt(Integer millisFromMidnight, Schema schema, LogicalType type) { + return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millisFromMidnight)); + } + + @Override + public Integer toInt(LocalTime time, Schema schema, LogicalType type) { + return Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(time.toNanoOfDay())); + } + + @Override + public Schema getRecommendedSchema() { + return LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)); + } + } + + public static class TimeMicrosConversion extends Conversion { + @Override + public Class getConvertedType() { + return LocalTime.class; + } + + @Override + public String getLogicalTypeName() { + return "time-micros"; + } + + @Override + public LocalTime fromLong(Long microsFromMidnight, Schema schema, LogicalType type) { + return LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(microsFromMidnight)); + } + + @Override + public Long toLong(LocalTime time, Schema schema, LogicalType type) { + return TimeUnit.NANOSECONDS.toMicros(time.toNanoOfDay()); + } + + @Override + public Schema getRecommendedSchema() { + return LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)); + } + } + + public static class TimestampMillisConversion extends Conversion { + @Override + public Class getConvertedType() { + return Instant.class; + } + + @Override + public String getLogicalTypeName() { + return "timestamp-millis"; + } + + @Override + public Instant fromLong(Long millisFromEpoch, Schema schema, LogicalType type) { + return Instant.ofEpochMilli(millisFromEpoch); + } + + @Override + public Long toLong(Instant timestamp, Schema schema, LogicalType type) { + return timestamp.toEpochMilli(); + } + + @Override + public Schema getRecommendedSchema() { + return LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); + } + } + + public static class TimestampMicrosConversion extends Conversion { + @Override + public Class getConvertedType() { + return Instant.class; + } + + @Override + public String getLogicalTypeName() { + return "timestamp-micros"; + } + + @Override + public Instant fromLong(Long microsFromEpoch, Schema schema, LogicalType type) { + long epochSeconds = microsFromEpoch / (1_000_000); + long nanoAdjustment = (microsFromEpoch % (1_000_000)) * 1_000; + + return Instant.ofEpochSecond(epochSeconds, nanoAdjustment); + } + + @Override + public Long toLong(Instant instant, Schema schema, LogicalType type) { + long seconds = instant.getEpochSecond(); + int nanos = instant.getNano(); + + if (seconds < 0 && nanos > 0) { + long micros = Math.multiplyExact(seconds + 1, 1_000_000); + long adjustment = (nanos / 1_000L) - 1_000_000; + + return Math.addExact(micros, adjustment); + } else { + long micros = Math.multiplyExact(seconds, 1_000_000); + + return Math.addExact(micros, nanos / 1_000); + } + } + + @Override + public Schema getRecommendedSchema() { + return LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + } + } + + public static class RoundingDecimalConversion extends Conversion { + private static final Conversion DELEGATE = new Conversions.DecimalConversion(); + private final RoundingMode roundingMode; + + public RoundingDecimalConversion(RoundingMode roundingMode) { + this.roundingMode = roundingMode; + } + + @Override + public Class getConvertedType() { + return DELEGATE.getConvertedType(); + } + + @Override + public String getLogicalTypeName() { + return DELEGATE.getLogicalTypeName(); + } + + @Override + public Schema getRecommendedSchema() { + return DELEGATE.getRecommendedSchema(); + } + + @Override + public BigDecimal fromFixed(GenericFixed value, Schema schema, LogicalType type) { + return DELEGATE.fromFixed(value, schema, type); + } + + @Override + public BigDecimal fromBytes(ByteBuffer value, Schema schema, LogicalType type) { + return DELEGATE.fromBytes(value, schema, type); + } + + @Override + public GenericFixed toFixed(BigDecimal value, Schema schema, LogicalType type) { + int scale = ((LogicalTypes.Decimal) type).getScale(); + return DELEGATE.toFixed(value.setScale(scale, roundingMode), schema, type); + } + + @Override + public ByteBuffer toBytes(BigDecimal value, Schema schema, LogicalType type) { + int scale = ((LogicalTypes.Decimal) type).getScale(); + return DELEGATE.toBytes(value.setScale(scale, roundingMode), schema, type); + } + } +} diff --git a/test/abracad/avro/model_test.clj b/test/abracad/avro/model_test.clj index 4aa886c..4bc64fb 100644 --- a/test/abracad/avro/model_test.clj +++ b/test/abracad/avro/model_test.clj @@ -1,12 +1,11 @@ (ns abracad.avro.model-test (:require [clojure.test :refer :all] [abracad.avro :as avro]) - (:import [org.apache.avro Schema] - [abracad.avro ClojureData])) + (:import [org.apache.avro Schema])) (defn ac-compare [x y ^Schema schema] - (.compare (ClojureData/get) x y schema)) + (avro/compare schema x y )) (deftest test-compare-bytes (let [schema (avro/parse-schema :bytes) diff --git a/test/abracad/avro_test.clj b/test/abracad/avro_test.clj index df56ebe..800c377 100644 --- a/test/abracad/avro_test.clj +++ b/test/abracad/avro_test.clj @@ -1,11 +1,15 @@ (ns abracad.avro-test (:require [clojure.test :refer :all] [abracad.avro :as avro] - [clojure.java.io :as io]) - (:import [java.io ByteArrayOutputStream FileInputStream] + [clojure.java.io :as io] + [abracad.avro.conversion :as c]) + (:import [java.io FileInputStream] [java.net InetAddress] + [java.time LocalDate LocalTime Instant] [org.apache.avro SchemaParseException] - [clojure.lang ExceptionInfo])) + [clojure.lang ExceptionInfo] + (java.util UUID) + (java.time.temporal ChronoUnit))) (defn roundtrip-binary [schema & records] @@ -34,7 +38,7 @@ (field-get [this field] (case field :address (.getAddress this))) - (field-list [this] #{:address})) + (field-list [_] #{:address})) (defn ->InetAddress [address] (InetAddress/getByAddress address)) @@ -104,6 +108,91 @@ (is (roundtrips? schema [false] [false])) (is (roundtrips? schema [false] [nil])))) +(deftest test-date + (let [schema (avro/parse-schema {:type 'int :logicalType :date}) + epoch (LocalDate/of 1970 1 1) + today (LocalDate/now) + before-epoch (LocalDate/of 1969 12 31) + max-date (LocalDate/of 5881580 7 11) ;; Date corresponding to MAX_VALUE days since epoch + after-max (LocalDate/of 5881580 7 12) + min-date (LocalDate/of -5877641 6 23) ;; Date corresponding to MIN_VALUE days before epoch + before-min (LocalDate/of -5877641 6 22)] + (is (roundtrips? schema [epoch])) + (is (roundtrips? schema [today])) + (is (roundtrips? schema [before-epoch])) + (is (roundtrips? schema [max-date])) + (is (roundtrips? schema [min-date])) + (is (thrown? ArithmeticException (roundtrips? schema [after-max]))) + (is (thrown? ArithmeticException (roundtrips? schema [before-min]))) + (testing "An array of dates roundtrips with logical types" + (let [array-schema (avro/parse-schema {:type :array :items {:type 'int :logicalType :date}})] + (is (roundtrips? array-schema [[epoch today max-date]])))))) + +(deftest test-date-with-logical-types-off + (binding [abracad.avro.conversion/*use-logical-types* false] + (let [schema (avro/parse-schema {:type 'int :logicalType :date}) + today (LocalDate/now)] + (testing "Underlying primitive still roundtrips" + (is (roundtrips? schema [10]))) + (testing "Logical type fails" + (is (thrown? ClassCastException (roundtrips? schema [today]))))))) + +(deftest test-time + (let [schema (avro/parse-schema {:type 'int :logicalType :time-millis}) + midnight LocalTime/MIDNIGHT + now (LocalTime/now) + one-milli-before-midnight (.minus midnight 1 ChronoUnit/MILLIS)] + (is (roundtrips? schema [midnight])) + (is (roundtrips? schema [now])) + (is (roundtrips? schema [one-milli-before-midnight])))) + +(deftest test-timestamp-millis + (let [schema (avro/parse-schema {:type 'long :logicalType :timestamp-millis}) + epoch Instant/EPOCH + now (Instant/now) + before-epoch (Instant/ofEpochMilli -1) + max-time (Instant/ofEpochMilli Long/MAX_VALUE) + after-max (.plusMillis max-time 1) + min-time (Instant/ofEpochMilli Long/MIN_VALUE) + before-min (.minusMillis min-time 1)] + (is (roundtrips? schema [epoch])) + (is (roundtrips? schema [now])) + (is (roundtrips? schema [before-epoch])) + (is (roundtrips? schema [max-time])) + (is (roundtrips? schema [min-time])) + (is (thrown? ArithmeticException (roundtrips? schema [after-max]))) + (is (thrown? ArithmeticException (roundtrips? schema [before-min]))))) + +(deftest test-decimal + (let [schema (avro/parse-schema {:type :bytes :logicalType :decimal :scale 6 :precision 12}) + fixed-schema (avro/parse-schema {:type :fixed :name :foo :size 10 :logicalType :decimal :scale 6 :precision 12})] + (is (roundtrips? schema [(.setScale 5M 6)])) + (is (roundtrips? fixed-schema [(.setScale 5M 6)])) + (is (roundtrips? schema [5.12345M])) ;; Scale too small + (is (roundtrips? fixed-schema [5.12345M])) + (is (thrown? ArithmeticException (roundtrips? schema [5.123456789M]))) ;; Scale too big + (is (thrown? ArithmeticException (roundtrips? fixed-schema [5.123456789M]))) + (is (roundtrips? schema [(bigdec 1234567890123.123456)])) + (is (roundtrips? fixed-schema [(bigdec 1234567890123.123456)])))) + +(deftest test-decimal-with-rounding + (with-precision 8 :rounding HALF_UP + (let [schema (avro/parse-schema {:type :bytes :logicalType :decimal :scale 6 :precision 8}) + fixed-schema (avro/parse-schema {:type :fixed :name :foo :size 10 :logicalType :decimal :scale 6 :precision 8})] + (is (roundtrips? schema [5M])) + (is (roundtrips? fixed-schema [5M])) + (is (roundtrips? schema [5.12345M])) + (is (roundtrips? fixed-schema [5.12345M])) + (is (roundtrips? schema [5.123457M 5.123456M] [5.1234565M 5.12345649M])) ;; Rounded [half up, down] + (is (roundtrips? fixed-schema [5.123457M 5.123456M] [5.1234565M 5.12345649M]))))) ;; Rounded [half up, down] + +(deftest test-uuid + (let [schema (avro/parse-schema {:type 'string :logicalType :uuid}) + uuid (UUID/randomUUID) + stringUUID "a7b168ce-d4ff-49a2-a7a5-e65ac06dbe67"] + (is (roundtrips? schema [uuid])) + (is (roundtrips? schema [(UUID/fromString stringUUID)] [stringUUID])))) + (deftest test-union (let [vertical {:type :enum, :name "vertical", :symbols [:up :down]} horizontal (avro/parse-schema @@ -275,3 +364,34 @@ (avro/mspit schema path records) (with-open [dfs (avro/data-file-stream (FileInputStream. path))] (is (= records (seq dfs)))))) + +(deftest test-keyword-string + (let [schema (avro/parse-schema {:type 'string :clojureType :keyword})] + (is (roundtrips? schema [:foo])) + (is (roundtrips? schema [:bar])))) + +(deftest test-with-logical-types + (let [schema (avro/parse-schema + {:type :record + :name :Person + :namespace "com.test" + :fields [{:name :message-timestamp :type {:type 'long :logicalType :timestamp-millis}} + {:name :firstName :type 'string} + {:name :lastName :type 'string} + {:name :dateOfBirth :type {:type 'int :logicalType :date}} + {:name :height :type {:type :bytes :logicalType :decimal :scale 2 :precision 12}} + {:name :candles :type [{:type 'int} + {:type 'string :clojureType :keyword}]}]}) + records [{:message-timestamp (Instant/now) + :firstName "Ronnie", + :lastName "Corbet", + :dateOfBirth (LocalDate/of 1930 12 4) + :height 1.55M + :candles 4} + {:message-timestamp (Instant/ofEpochMilli 1234567890) + :firstName "Ronnie", + :lastName "Barker", + :dateOfBirth (LocalDate/of 1929 9 25) + :height 1.72M + :candles :fork}]] + (is (roundtrips? schema records))))