diff --git a/README.md b/README.md index 28752fd..5cb5cda 100644 --- a/README.md +++ b/README.md @@ -176,6 +176,33 @@ schemas. ;;=> {:foo [bar "baz" 1337]} ``` +### Logical Types + +Abracad supports [avro logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types). +Out of the box it supports: + +* `timestamp-millis` as `java.time.Instant` +* `time-millis` as `java.time.LocalTime` +* `date` as `java.time.LocalDate` +* `uuid` as `java.util.UUID` +* `decimal` as `java.math.BigDecimal`. The `*math-context*` dynamic var will be used to set the rounding mode +of bigdecimals and they will be scaled to the correct scale for your schema. If no math context is set then +`RoundingMode/UNNECESSARY` will be used. + +You can also automatically (de)serialise strings as keywords using `{:type :string :clojureType :keyword}`. + +if you prefer you can turn off logical types by setting the dynamic var `abracad.avro.conversion/*use-logical-types*` to `false`. + +```clojure + +(def logical-type-reader + (abracad.avro/datum-reader schema)) + +(def standard-reader + (binding [abracad.avro.conversion/*use-logical-types* false] + (abracad.avro/datum-reader schema))) +``` + ### Hadoop MapReduce integration Avro 1.7.5 and later supports configurable “data models” for datum diff --git a/project.clj b/project.clj index 474016b..48f5fe0 100644 --- a/project.clj +++ b/project.clj @@ -8,7 +8,7 @@ :global-vars {*warn-on-reflection* true} :source-paths ["src/clojure"] :java-source-paths ["src/java"] - :javac-options ["-target" "1.7" "-source" "1.7"] + :javac-options ["-target" "1.8" "-source" "1.8"] :dependencies [[org.clojure/clojure "1.8.0"] [org.apache.avro/avro "1.8.0"] [cheshire/cheshire "5.6.1"]] diff --git a/src/clojure/abracad/avro.clj b/src/clojure/abracad/avro.clj index f69074a..aec5e4e 100644 --- a/src/clojure/abracad/avro.clj +++ b/src/clojure/abracad/avro.clj @@ -4,12 +4,13 @@ (:require [clojure.java.io :as io] [clojure.walk :refer [postwalk]] [cheshire.core :as json] - [abracad.avro.util :refer [returning mangle unmangle coerce]]) + [abracad.avro.util :refer [returning mangle unmangle coerce]] + [abracad.avro.conversion :as c]) (:import [java.io - ByteArrayInputStream ByteArrayOutputStream EOFException - File FileInputStream InputStream OutputStream] + ByteArrayOutputStream EOFException + File InputStream OutputStream] [clojure.lang Named] - [org.apache.avro Schema Schema$Parser Schema$Type] + [org.apache.avro Schema Schema$Parser] [org.apache.avro.file CodecFactory DataFileWriter DataFileReader DataFileStream SeekableInput SeekableFileInput SeekableByteArrayInput] @@ -130,30 +131,29 @@ but the first `n` fields when sorting." (defn datum-reader "Return an Avro DatumReader which produces Clojure data structures." {:tag `ClojureDatumReader} - ([] (ClojureDatumReader.)) - ([schema] - (ClojureDatumReader. - (if-not (nil? schema) (parse-schema schema)))) + ([] (datum-reader nil)) + ([schema] (datum-reader schema schema)) ([expected actual] (ClojureDatumReader. - (if-not (nil? expected) (parse-schema expected)) - (if-not (nil? actual) (parse-schema actual))))) + (when-not (nil? expected) (parse-schema expected)) + (when-not (nil? actual) (parse-schema actual)) + (c/create-clojure-data)))) (defn data-file-reader "Return an Avro DataFileReader which produces Clojure data structures." {:tag `DataFileReader} ([source] (data-file-reader nil source)) ([expected source] - (DataFileReader/openReader - (seekable-input source) (datum-reader expected)))) + (DataFileReader/openReader + (seekable-input source) (datum-reader expected)))) (defn data-file-stream "Return an Avro DataFileStream which produces Clojure data structures." {:tag `DataFileStream} ([source] (data-file-stream nil source)) ([expected source] - (DataFileStream. - (io/input-stream source) (datum-reader expected)))) + (DataFileStream. + (io/input-stream source) (datum-reader expected)))) (defmacro ^:private decoder-factory "Invoke static methods of default Avro Decoder factory." @@ -210,10 +210,11 @@ decoded serially from `source`." (defn datum-writer "Return an Avro DatumWriter which consumes Clojure data structures." {:tag `ClojureDatumWriter} - ([] (ClojureDatumWriter.)) + ([] (datum-writer nil)) ([schema] - (ClojureDatumWriter. - (if-not (nil? schema) (parse-schema schema))))) + (ClojureDatumWriter. + (when-not (nil? schema) (parse-schema schema)) + (c/create-clojure-data)))) (defn data-file-writer "Return an Avro DataFileWriter which consumes Clojure data structures." @@ -253,6 +254,7 @@ decoded serially from `source`." (let [schema (parse-schema schema)] (encoder-factory jsonEncoder schema ^OutputStream sink))) + (defn encode "Serially encode each record in `records` to `sink` using `schema`. The `sink` may be an existing Encoder object, or anything on which @@ -284,7 +286,7 @@ via `encode`." "Compare `x` and `y` according to `schema`." [schema x y] (let [schema (parse-schema schema)] - (.compare (ClojureData/get) x y schema))) + (.compare (ClojureData/withoutConversions) x y schema))) (defn spit "Like core `spit`, but emits `content` to `f` as Avro with `schema`." diff --git a/src/clojure/abracad/avro/compare.clj b/src/clojure/abracad/avro/compare.clj index b0524d2..0defa37 100644 --- a/src/clojure/abracad/avro/compare.clj +++ b/src/clojure/abracad/avro/compare.clj @@ -2,8 +2,7 @@ "Generic data comparison implementation." {:private true} (:refer-clojure :exclude [compare]) - (:require [clojure.core :as cc] - [abracad.avro :as avro] + (:require [abracad.avro :as avro] [abracad.avro.write :refer [resolve-union*]] [abracad.avro.util :refer [case-enum if-not-let]]) (:import [org.apache.avro Schema Schema$Field Schema$Field$Order Schema$Type] @@ -48,7 +47,7 @@ (defn supercompare ^long [x y ^Schema schema equals] - (._supercompare (ClojureData/get) x y schema equals)) + (._supercompare (ClojureData/withoutConversions) x y schema equals)) (defn compare ^long [x y ^Schema schema equals] diff --git a/src/clojure/abracad/avro/conversion.clj b/src/clojure/abracad/avro/conversion.clj new file mode 100644 index 0000000..0d9ce6d --- /dev/null +++ b/src/clojure/abracad/avro/conversion.clj @@ -0,0 +1,28 @@ +(ns abracad.avro.conversion + "Logical Type converter implementations" + (:import (org.apache.avro Conversions$UUIDConversion) + (java.math MathContext RoundingMode) + (abracad.avro ClojureData Java8LogicalTypes$RoundingDecimalConversion Java8LogicalTypes$DateConversion Java8LogicalTypes$TimeMillisConversion Java8LogicalTypes$TimeMicrosConversion Java8LogicalTypes$TimestampMillisConversion Java8LogicalTypes$TimestampMicrosConversion))) + +(def ^:dynamic *use-logical-types* + "When true, record field values will be (de)serialised from/to their logical types + e.g. {:type :int :logicalType :date} -> java.time.LocalDate. Default value is `true`." + true) + +(def uuid-conversion (Conversions$UUIDConversion.)) +(def date-conversion (Java8LogicalTypes$DateConversion.)) +(def time-conversion (Java8LogicalTypes$TimeMillisConversion.)) +(def time-micros-conversion (Java8LogicalTypes$TimeMicrosConversion.)) +(def timestamp-conversion (Java8LogicalTypes$TimestampMillisConversion.)) +(def timestamp-micros-conversion (Java8LogicalTypes$TimestampMicrosConversion.)) + +(defn- default-conversions [] + (let [^MathContext context *math-context* + roundingMode (if context (.getRoundingMode context) RoundingMode/UNNECESSARY)] + [uuid-conversion date-conversion time-conversion time-micros-conversion + timestamp-conversion timestamp-micros-conversion (Java8LogicalTypes$RoundingDecimalConversion. roundingMode)])) + +(defn create-clojure-data [] + (if *use-logical-types* + (ClojureData. (default-conversions)) + (ClojureData/withoutConversions))) diff --git a/src/clojure/abracad/avro/edn.clj b/src/clojure/abracad/avro/edn.clj index 10afc8f..2d933c4 100644 --- a/src/clojure/abracad/avro/edn.clj +++ b/src/clojure/abracad/avro/edn.clj @@ -1,10 +1,9 @@ (ns abracad.avro.edn (:require [abracad.avro :as avro] [abracad.avro.util :refer [coerce]]) - (:import [clojure.lang BigInt Cons IMeta IPersistentList IPersistentMap - IPersistentSet IPersistentVector ISeq Keyword PersistentArrayMap - PersistentQueue Ratio Sorted Symbol] - [org.apache.avro Schema])) + (:import [clojure.lang BigInt IPersistentList IPersistentMap + IPersistentSet IPersistentVector ISeq Keyword + PersistentQueue Ratio Sorted Symbol])) (defprotocol EDNAvroSerializable "Protocol for customizing EDN-in-Avro serialization." diff --git a/src/clojure/abracad/avro/util.clj b/src/clojure/abracad/avro/util.clj index 9ecc90a..d4d368f 100644 --- a/src/clojure/abracad/avro/util.clj +++ b/src/clojure/abracad/avro/util.clj @@ -37,7 +37,13 @@ evaluated at macro-expansion time." (defn mangle "Perform Clojure->Avro name-mangling when `*mangle-names*` is true." - [^String n] (if *mangle-names* (.replace n \- \_) n)) + [^String n] (if *mangle-names* (-> n + (.replace \- \_) + ;; Un-mangle logical type names + (.replace "timestamp_millis" "timestamp-millis") + (.replace "timestamp_micros" "timestamp-micros") + (.replace "time_millis" "time-millis") + (.replace "time_micros" "time-micros")))) (defn unmangle "Reverse Clojure->Avro name-mangling when `*mangle-names* is true." diff --git a/src/clojure/abracad/avro/write.clj b/src/clojure/abracad/avro/write.clj index 8c5bcbd..2aa4489 100644 --- a/src/clojure/abracad/avro/write.clj +++ b/src/clojure/abracad/avro/write.clj @@ -5,13 +5,14 @@ [abracad.avro.edn :as edn] [abracad.avro.util :refer [case-expr case-enum mangle unmangle field-keyword]]) - (:import [java.util Collection Map List] + (:import [java.util Collection Map] [java.nio ByteBuffer] [clojure.lang Named Sequential IRecord Indexed] [org.apache.avro Schema Schema$Field Schema$Type AvroTypeException] [org.apache.avro.io Encoder] - [org.apache.avro.generic GenericRecord] - [abracad.avro ClojureDatumWriter ArrayAccessor])) + [org.apache.avro.generic GenericRecord GenericFixed] + [abracad.avro ClojureDatumWriter ArrayAccessor ClojureData] + (org.apache.avro.util Utf8))) (def ^:const edn-element "abracad.avro.edn.Element") @@ -71,6 +72,14 @@ record serialization." (emit-fixed [^ByteBuffer bytes ^Encoder encoder] (.writeFixed encoder bytes))) +(extend-type GenericFixed + HandleBytes + (count-bytes [^GenericFixed fixed] (alength (.bytes fixed))) + (emit-bytes [^GenericFixed fixed ^Encoder encoder] + (.writeBytes encoder (.bytes fixed))) + (emit-fixed [^GenericFixed fixed ^Encoder encoder] + (.writeFixed encoder (.bytes fixed)))) + (defn schema-error! [^Schema schema datum] (throw (ex-info "Cannot write datum as schema" @@ -200,7 +209,7 @@ record serialization." GenericRecord (schema-name [this] (-> this .getSchema .getFullName)) (field-get [this field] (.get this (name field))) - (filed-list [this] (->> this .getSchema .getFields (map field-name) set)) + (field-list [this] (->> this .getSchema .getFields (map field-name) set)) Object (schema-name [this] (schema-name-type this)) @@ -220,26 +229,36 @@ record serialization." (and (named? datum) (.hasEnumSymbol schema (-> datum name mangle)))) (defn avro-bytes? - [^Schema schema datum] + [datum] (or (instance? bytes-class datum) (instance? ByteBuffer datum))) +(defn avro-string? [datum] + (or (string? datum) + (instance? Utf8 datum) + (instance? CharSequence datum))) + (defn avro-fixed? [^Schema schema datum] - (and (avro-bytes? schema datum) + (and (avro-bytes? datum) (= (.getFixedSize schema) (count-bytes datum)))) +(defn- keyword-type? [^Schema schema] + (= "keyword" (.getProp schema ClojureData/CLOJURE_TYPE_PROP))) + (defn schema-match? [^Schema schema datum] (case-enum (.getType schema) Schema$Type/RECORD (avro-record? schema datum) Schema$Type/ENUM (avro-enum? schema datum) Schema$Type/FIXED (avro-fixed? schema datum) - Schema$Type/BYTES (avro-bytes? schema datum) + Schema$Type/BYTES (avro-bytes? datum) Schema$Type/LONG (integer? datum) Schema$Type/INT (integer? datum) Schema$Type/DOUBLE (float? datum) Schema$Type/FLOAT (float? datum) + Schema$Type/STRING (if (keyword-type? schema) (keyword? datum) + (avro-string? datum)) #_ else false)) (defn resolve-union* diff --git a/src/java/abracad/avro/ArrayAccessor.java b/src/java/abracad/avro/ArrayAccessor.java index dbd2e3a..a8db507 100644 --- a/src/java/abracad/avro/ArrayAccessor.java +++ b/src/java/abracad/avro/ArrayAccessor.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,241 +26,224 @@
public class ArrayAccessor {
-public static void
-writeArray(Object data, Encoder out) throws IOException {
- if (data instanceof boolean[]) {
- writeArray((boolean[]) data, out);
- } else if (data instanceof short[]) {
- writeArray((short[]) data, out);
- } else if (data instanceof char[]) {
- writeArray((char[]) data, out);
- } else if (data instanceof int[]) {
- writeArray((int[]) data, out);
- } else if (data instanceof long[]) {
- writeArray((long[]) data, out);
- } else if (data instanceof float[]) {
- writeArray((float[]) data, out);
- } else if (data instanceof double[]) {
- writeArray((double[]) data, out);
+ public static void writeArray(Object data, Encoder out) throws IOException {
+ if (data instanceof boolean[]) {
+ writeArray((boolean[]) data, out);
+ } else if (data instanceof short[]) {
+ writeArray((short[]) data, out);
+ } else if (data instanceof char[]) {
+ writeArray((char[]) data, out);
+ } else if (data instanceof int[]) {
+ writeArray((int[]) data, out);
+ } else if (data instanceof long[]) {
+ writeArray((long[]) data, out);
+ } else if (data instanceof float[]) {
+ writeArray((float[]) data, out);
+ } else if (data instanceof double[]) {
+ writeArray((double[]) data, out);
+ }
}
-}
-public static void
-writeArray(boolean[] data, Encoder out) throws IOException {
- int size = data.length;
- out.setItemCount(size);
- for (int i = 0; i < size; i++) {
- out.startItem();
- out.writeBoolean(data[i]);
+ public static void writeArray(boolean[] data, Encoder out) throws IOException {
+ int size = data.length;
+ out.setItemCount(size);
+ for (int i = 0; i < size; i++) {
+ out.startItem();
+ out.writeBoolean(data[i]);
+ }
}
-}
-
-// short, and char arrays are upcast to avro int
-public static void
-writeArray(short[] data, Encoder out) throws IOException {
- int size = data.length;
- out.setItemCount(size);
- for (int i = 0; i < size; i++) {
- out.startItem();
- out.writeInt(data[i]);
+ // short, and char arrays are upcast to avro int
+ public static void writeArray(short[] data, Encoder out) throws IOException {
+ int size = data.length;
+ out.setItemCount(size);
+ for (int i = 0; i < size; i++) {
+ out.startItem();
+ out.writeInt(data[i]);
+ }
}
-}
-public static void
-writeArray(char[] data, Encoder out) throws IOException {
- int size = data.length;
- out.setItemCount(size);
- for (int i = 0; i < size; i++) {
- out.startItem();
- out.writeInt(data[i]);
+ public static void writeArray(char[] data, Encoder out) throws IOException {
+ int size = data.length;
+ out.setItemCount(size);
+ for (int i = 0; i < size; i++) {
+ out.startItem();
+ out.writeInt(data[i]);
+ }
}
-}
-public static void
-writeArray(int[] data, Encoder out) throws IOException {
- int size = data.length;
- out.setItemCount(size);
- for (int i = 0; i < size; i++) {
- out.startItem();
- out.writeInt(data[i]);
+ public static void writeArray(int[] data, Encoder out) throws IOException {
+ int size = data.length;
+ out.setItemCount(size);
+ for (int i = 0; i < size; i++) {
+ out.startItem();
+ out.writeInt(data[i]);
+ }
}
-}
-public static void
-writeArray(long[] data, Encoder out) throws IOException {
- int size = data.length;
- out.setItemCount(size);
- for (int i = 0; i < size; i++) {
- out.startItem();
- out.writeLong(data[i]);
+ public static void writeArray(long[] data, Encoder out) throws IOException {
+ int size = data.length;
+ out.setItemCount(size);
+ for (int i = 0; i < size; i++) {
+ out.startItem();
+ out.writeLong(data[i]);
+ }
}
-}
-public static void
-writeArray(float[] data, Encoder out) throws IOException {
- int size = data.length;
- out.setItemCount(size);
- for (int i = 0; i < size; i++) {
- out.startItem();
- out.writeFloat(data[i]);
+ public static void writeArray(float[] data, Encoder out) throws IOException {
+ int size = data.length;
+ out.setItemCount(size);
+ for (int i = 0; i < size; i++) {
+ out.startItem();
+ out.writeFloat(data[i]);
+ }
}
-}
-public static void
-writeArray(double[] data, Encoder out) throws IOException {
- int size = data.length;
- out.setItemCount(size);
- for (int i = 0; i < size; i++) {
- out.startItem();
- out.writeDouble(data[i]);
+ public static void writeArray(double[] data, Encoder out) throws IOException {
+ int size = data.length;
+ out.setItemCount(size);
+ for (int i = 0; i < size; i++) {
+ out.startItem();
+ out.writeDouble(data[i]);
+ }
}
-}
-public static Object
-readArray(Object array, Class> elementType, long l, ResolvingDecoder in)
- throws IOException {
- if (elementType == int.class)
- return readArray((int[]) array, l, in);
- if (elementType == long.class)
- return readArray((long[]) array, l, in);
- if (elementType == float.class)
- return readArray((float[]) array, l, in);
- if (elementType == double.class)
- return readArray((double[]) array, l, in);
- if (elementType == boolean.class)
- return readArray((boolean[]) array, l, in);
- if (elementType == char.class)
- return readArray((char[]) array, l, in);
- if (elementType == short.class)
- return readArray((short[]) array, l, in);
- return null;
-}
+ public static Object readArray(Object array, Class> elementType, long l, ResolvingDecoder in)
+ throws IOException {
+ if (elementType == int.class)
+ return readArray((int[]) array, l, in);
+ if (elementType == long.class)
+ return readArray((long[]) array, l, in);
+ if (elementType == float.class)
+ return readArray((float[]) array, l, in);
+ if (elementType == double.class)
+ return readArray((double[]) array, l, in);
+ if (elementType == boolean.class)
+ return readArray((boolean[]) array, l, in);
+ if (elementType == char.class)
+ return readArray((char[]) array, l, in);
+ if (elementType == short.class)
+ return readArray((short[]) array, l, in);
+ return null;
+ }
-public static boolean[]
-readArray(boolean[] array, long l, ResolvingDecoder in)
- throws IOException {
- int index = 0;
- while (l > 0) {
- int limit = index + (int) l;
- if (array.length < limit) {
- array = Arrays.copyOf(array, limit);
+ public static boolean[] readArray(boolean[] array, long l, ResolvingDecoder in)
+ throws IOException {
+ int index = 0;
+ while (l > 0) {
+ int limit = index + (int) l;
+ if (array.length < limit) {
+ array = Arrays.copyOf(array, limit);
+ }
+ while (index < limit) {
+ array[index] = in.readBoolean();
+ index++;
+ }
+ l = in.arrayNext();
}
- while (index < limit) {
- array[index] = in.readBoolean();
- index++;
- }
- l = in.arrayNext();
+ return array;
}
- return array;
-}
-public static int[]
-readArray(int[] array, long l, ResolvingDecoder in)
- throws IOException {
- int index = 0;
- while (l > 0) {
- int limit = index + (int) l;
- if (array.length < limit) {
- array = Arrays.copyOf(array, limit);
+ public static int[] readArray(int[] array, long l, ResolvingDecoder in)
+ throws IOException {
+ int index = 0;
+ while (l > 0) {
+ int limit = index + (int) l;
+ if (array.length < limit) {
+ array = Arrays.copyOf(array, limit);
+ }
+ while (index < limit) {
+ array[index] = in.readInt();
+ index++;
+ }
+ l = in.arrayNext();
}
- while (index < limit) {
- array[index] = in.readInt();
- index++;
- }
- l = in.arrayNext();
+ return array;
}
- return array;
-}
-public static short[]
-readArray(short[] array, long l, ResolvingDecoder in)
- throws IOException {
- int index = 0;
- while (l > 0) {
- int limit = index + (int) l;
- if (array.length < limit) {
- array = Arrays.copyOf(array, limit);
+ public static short[] readArray(short[] array, long l, ResolvingDecoder in)
+ throws IOException {
+ int index = 0;
+ while (l > 0) {
+ int limit = index + (int) l;
+ if (array.length < limit) {
+ array = Arrays.copyOf(array, limit);
+ }
+ while (index < limit) {
+ array[index] = (short) in.readInt();
+ index++;
+ }
+ l = in.arrayNext();
}
- while (index < limit) {
- array[index] = (short) in.readInt();
- index++;
- }
- l = in.arrayNext();
+ return array;
}
- return array;
-}
-public static char[]
-readArray(char[] array, long l, ResolvingDecoder in)
- throws IOException {
- int index = 0;
- while (l > 0) {
- int limit = index + (int) l;
- if (array.length < limit) {
- array = Arrays.copyOf(array, limit);
+ public static char[] readArray(char[] array, long l, ResolvingDecoder in)
+ throws IOException {
+ int index = 0;
+ while (l > 0) {
+ int limit = index + (int) l;
+ if (array.length < limit) {
+ array = Arrays.copyOf(array, limit);
+ }
+ while (index < limit) {
+ array[index] = (char) in.readInt();
+ index++;
+ }
+ l = in.arrayNext();
}
- while (index < limit) {
- array[index] = (char) in.readInt();
- index++;
- }
- l = in.arrayNext();
+ return array;
}
- return array;
-}
-public static long[]
-readArray(long[] array, long l, ResolvingDecoder in)
- throws IOException {
- int index = 0;
- while (l > 0) {
- int limit = index + (int) l;
- if (array.length < limit) {
- array = Arrays.copyOf(array, limit);
+ public static long[] readArray(long[] array, long l, ResolvingDecoder in)
+ throws IOException {
+ int index = 0;
+ while (l > 0) {
+ int limit = index + (int) l;
+ if (array.length < limit) {
+ array = Arrays.copyOf(array, limit);
+ }
+ while (index < limit) {
+ array[index] = in.readLong();
+ index++;
+ }
+ l = in.arrayNext();
}
- while (index < limit) {
- array[index] = in.readLong();
- index++;
- }
- l = in.arrayNext();
+ return array;
}
- return array;
-}
-public static float[]
-readArray(float[] array, long l, ResolvingDecoder in)
- throws IOException {
- int index = 0;
- while (l > 0) {
- int limit = index + (int) l;
- if (array.length < limit) {
- array = Arrays.copyOf(array, limit);
+ public static float[] readArray(float[] array, long l, ResolvingDecoder in)
+ throws IOException {
+ int index = 0;
+ while (l > 0) {
+ int limit = index + (int) l;
+ if (array.length < limit) {
+ array = Arrays.copyOf(array, limit);
+ }
+ while (index < limit) {
+ array[index] = in.readFloat();
+ index++;
+ }
+ l = in.arrayNext();
}
- while (index < limit) {
- array[index] = in.readFloat();
- index++;
- }
- l = in.arrayNext();
+ return array;
}
- return array;
-}
-public static double[]
-readArray(double[] array, long l, ResolvingDecoder in)
- throws IOException {
- int index = 0;
- while (l > 0) {
- int limit = index + (int) l;
- if (array.length < limit) {
- array = Arrays.copyOf(array, limit);
+ public static double[] readArray(double[] array, long l, ResolvingDecoder in)
+ throws IOException {
+ int index = 0;
+ while (l > 0) {
+ int limit = index + (int) l;
+ if (array.length < limit) {
+ array = Arrays.copyOf(array, limit);
+ }
+ while (index < limit) {
+ array[index] = in.readDouble();
+ index++;
+ }
+ l = in.arrayNext();
}
- while (index < limit) {
- array[index] = in.readDouble();
- index++;
- }
- l = in.arrayNext();
+ return array;
}
- return array;
-}
}
diff --git a/src/java/abracad/avro/ClojureData.java b/src/java/abracad/avro/ClojureData.java
index 7084789..10b8aa5 100644
--- a/src/java/abracad/avro/ClojureData.java
+++ b/src/java/abracad/avro/ClojureData.java
@@ -1,7 +1,9 @@
package abracad.avro;
-import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.io.DatumReader;
@@ -14,58 +16,52 @@
public class ClojureData extends ReflectData {
-private static class Vars {
- private static final String NS = "abracad.avro.compare";
- private static final Var compare = RT.var(NS, "compare");
- static {
- RT.var("clojure.core", "require").invoke(Symbol.intern(NS));
- }
-}
+ public static String CLOJURE_TYPE_PROP = "clojureType";
-private static final ClojureData INSTANCE = new ClojureData();
+ private static class Vars {
+ private static final String NS = "abracad.avro.compare";
+ private static final Var compare = RT.var(NS, "compare");
-public
-ClojureData() {
- super();
-}
+ static {
+ RT.var("clojure.core", "require").invoke(Symbol.intern(NS));
+ }
+ }
-public
-ClojureData(ClassLoader classLoader) {
- super(classLoader);
-}
+ private static final ClojureData NO_CONVERSIONS = new ClojureData(Collections.emptyList());
-public static ClojureData
-get() {
- return INSTANCE;
-}
+ public ClojureData(List