From 3dd9767d3343a3f290f704772e7c20b6e5752174 Mon Sep 17 00:00:00 2001 From: sksamuel Date: Sun, 5 May 2024 11:45:33 -0500 Subject: [PATCH] Reworked serde interfaces --- .../centurion/avro/io/BinaryReader.kt | 99 ++++++++----------- .../centurion/avro/io/BinaryWriter.kt | 78 ++++----------- .../avro/io/CachedReflectionSerdeFactory.kt | 32 ------ .../sksamuel/centurion/avro/io/DataWriter.kt | 80 ++++++--------- .../com/sksamuel/centurion/avro/io/Format.kt | 20 ++++ .../avro/io/ReflectionSerdeFactory.kt | 37 ------- .../centurion/avro/io/SpecificSerde.kt | 57 ----------- .../centurion/avro/io/serde/BinarySerde.kt | 45 +++++++++ .../avro/io/serde/CachedSerdeFactory.kt | 43 ++++++++ .../{Serde.kt => serde/CompressingSerde.kt} | 11 +-- .../centurion/avro/io/serde/DataSerde.kt | 42 ++++++++ .../avro/io/serde/ReflectionSerdeFactory.kt | 36 +++++++ .../sksamuel/centurion/avro/io/serde/Serde.kt | 14 +++ .../centurion/avro/io/serde/SerdeOptions.kt | 12 +++ .../sksamuel/centurion/avro/EvolutionTest.kt | 4 +- .../io/CachedReflectionSerdeFactoryTest.kt | 11 ++- .../sksamuel/centurion/avro/io/SerdeTest.kt | 4 +- .../sksamuel/centurion/avro/io/SizeTest.kt | 2 +- 18 files changed, 321 insertions(+), 306 deletions(-) delete mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/CachedReflectionSerdeFactory.kt create mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/Format.kt delete mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/ReflectionSerdeFactory.kt delete mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/SpecificSerde.kt create mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/BinarySerde.kt create mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/CachedSerdeFactory.kt rename centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/{Serde.kt => serde/CompressingSerde.kt} (74%) create mode 100644 
centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/DataSerde.kt create mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/ReflectionSerdeFactory.kt create mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/Serde.kt create mode 100644 centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/SerdeOptions.kt diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/BinaryReader.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/BinaryReader.kt index 3eb117b..eaddff8 100644 --- a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/BinaryReader.kt +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/BinaryReader.kt @@ -3,69 +3,27 @@ package com.sksamuel.centurion.avro.io import org.apache.avro.Schema import org.apache.avro.generic.GenericDatumReader import org.apache.avro.generic.GenericRecord -import org.apache.avro.io.DatumReader import org.apache.avro.io.DecoderFactory import java.io.InputStream /** - * Creates an [BinaryReaderFactory] for a given schema which can then be used - * to create [BinaryReader]s. - * - * All [BinaryReader]s created from this factory share a thread safe [DatumReader] for efficiency. - * + * Creates an [BinaryReaderFactory] for a given schema which can then be used to create [BinaryReader]s. * Pass in a pre-created [DecoderFactory] if you wish to configure buffer size. 
*/ class BinaryReaderFactory( - reader: Schema, - writer: Schema, private val factory: DecoderFactory, ) { - constructor(schema: Schema) : this(schema, schema, DecoderFactory.get()) - constructor(reader: Schema, writer: Schema) : this(reader, writer, DecoderFactory.get()) - constructor(schema: Schema, factory: DecoderFactory) : this(schema, schema, factory) + constructor() : this(DecoderFactory.get()) - private val datum = GenericDatumReader(/* writer = */ writer, /* reader = */ reader) - - companion object { - - /** - * Reads a [GenericRecord] from the given [ByteArray]. - * - * This method is a convenience function that is useful when you want to read a single record - * in a single method call. - * - * For better performance and customization, create a [BinaryReader] using a [BinaryReaderFactory]. - * This will also allow customization of the [DecoderFactory], schema evolution and shares a [DatumReader]. - */ - fun fromBytes(schema: Schema, bytes: ByteArray): GenericRecord { - val datumReader = GenericDatumReader(/* writer = */ schema, /* reader = */ schema) - return BinaryReader( - datum = datumReader, - input = null, - bytes = bytes, - factory = DecoderFactory.get(), - ).read() - } - - /** - * Reads a [GenericRecord] from the given [InputStream]. - * - * This method is a convenience function that is useful when you want to read a single record - * in a single method call. - * - * For better performance and customization, create a [BinaryReader] using a [BinaryReaderFactory]. - * This will also allow customization of the [DecoderFactory], schema evolution and shares a [DatumReader]. - * - * The given [input] stream will be closed after this function returns. - * - * This variant is slower than using a byte array. If you already have - * the bytes available, that should be preferred. 
- */ - fun fromBytes(schema: Schema, input: InputStream): GenericRecord { - val datumReader = GenericDatumReader(/* writer = */ schema, /* reader = */ schema) - return BinaryReader(datumReader, input, null, DecoderFactory.get()).use { it.read() } - } + /** + * Creates an [BinaryReader] that reads from the given [InputStream]. + * + * This variant is slower than using a byte array. If you already have + * the bytes available, that should be preferred. + */ + fun reader(schema: Schema, input: InputStream): BinaryReader { + return BinaryReader(schema, schema, input, null, factory) } /** @@ -74,15 +32,35 @@ class BinaryReaderFactory( * This variant is slower than using a byte array. If you already have * the bytes available, that should be preferred. */ - fun reader(input: InputStream): BinaryReader { - return BinaryReader(datum, input, null, factory) + fun reader(reader: Schema, writer: Schema, input: InputStream): BinaryReader { + return BinaryReader(reader, writer, input, null, factory) } /** * Creates an [BinaryReader] that reads from the given [ByteArray]. */ - fun reader(bytes: ByteArray): BinaryReader { - return BinaryReader(datum = datum, input = null, bytes = bytes, factory = factory) + fun reader(schema: Schema, bytes: ByteArray): BinaryReader { + return BinaryReader(schema, schema, input = null, bytes = bytes, factory = factory) + } + + /** + * Creates an [BinaryReader] that reads from the given [ByteArray]. + */ + fun reader(reader: Schema, writer: Schema, bytes: ByteArray): BinaryReader { + return BinaryReader(readerSchema = reader, writerSchema = writer, input = null, bytes = bytes, factory = factory) + } + + /** + * Reads a [GenericRecord] from the given [ByteArray]. + * + * This method is a convenience function that is useful when you want to read a single record + * in a single method call. + * + * For better performance and customization, create a [BinaryReader] using a [BinaryReaderFactory]. 
+ * This will also allow customization of the [DecoderFactory], schema evolution. + */ + fun fromBytes(schema: Schema, bytes: ByteArray): GenericRecord { + return BinaryReader(schema, schema, null, bytes, factory).use { it.read() } } } @@ -90,7 +68,8 @@ class BinaryReaderFactory( * An [BinaryReader] is a non-thread safe, one time use, reader from a given stream or byte array. */ class BinaryReader( - private val datum: DatumReader, + readerSchema: Schema, + writerSchema: Schema, private val input: InputStream?, bytes: ByteArray?, factory: DecoderFactory, @@ -100,6 +79,8 @@ class BinaryReader( require(input == null || bytes == null) { "Do not specify both ByteArray and InputStream" } } + private val datum = GenericDatumReader(/* writer = */ writerSchema, /* reader = */ readerSchema) + private val decoder = when { input != null -> factory.binaryDecoder(input, null) bytes != null -> factory.binaryDecoder(bytes, null) @@ -110,6 +91,10 @@ class BinaryReader( return datum.read(null, decoder) } + fun asSequence(): Sequence { + return generateSequence { read() } + } + override fun close() { input?.close() } diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/BinaryWriter.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/BinaryWriter.kt index 73bcc01..b919c7a 100644 --- a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/BinaryWriter.kt +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/BinaryWriter.kt @@ -8,18 +8,6 @@ import org.apache.avro.io.EncoderFactory import java.io.ByteArrayOutputStream import java.io.OutputStream -///** -// * An [AvroWriter] will write [GenericRecord]s to an output stream. -// * -// * There are three implementations of this stream -// * - a Data stream, -// * - a Binary stream -// * - a Json stream -// * -// * See the methods on the companion object to create instances of each -// * of these types of stream. 
-// */ - /** * Creates a [BinaryWriterFactory] for a given schema which can then be used to create [BinaryWriter]s. * @@ -28,79 +16,55 @@ import java.io.OutputStream * Pass in a pre-created [EncoderFactory] if you wish to configure buffer size. */ class BinaryWriterFactory( - schema: Schema, private val factory: EncoderFactory, ) { /** * Creates a [BinaryWriterFactory] with the default [EncoderFactory]. */ - constructor(schema: Schema) : this(schema, EncoderFactory.get()) - - companion object { - - /** - * Creates an avro encoded byte array from the given [record]. - * - * This method is a convenience function that is useful when you want to write a single record - * in a single method call. - * - * For better performance, considering creating a [BinaryWriterFactory] which will use - * a shared [GenericDatumWriter] and allows customizating the [EncoderFactory]. - */ - fun toBytes(record: GenericRecord): ByteArray { - val baos = ByteArrayOutputStream() - toBytes(record, baos) - return baos.toByteArray() - } - - /** - * Writes avro encoded bytes to the given [output] stream from the given [record]. - * - * This method is a convenience function that is useful when you want to write a single record - * in a single method call. - * - * The given [output] stream will be closed after this function returns. - * - * For better performance, considering creating a [BinaryWriterFactory] which will use - * a shared [GenericDatumWriter] and allows customizating the [EncoderFactory]. - */ - fun toBytes(record: GenericRecord, output: OutputStream) { - val datumWriter = GenericDatumWriter(record.schema) - BinaryWriter(datumWriter, output, EncoderFactory.get()).use { it.write(record) } - } - } - - private val datumWriter = GenericDatumWriter(schema) + constructor() : this(EncoderFactory.get()) /** * Creates a [BinaryWriter] that writes to the given [OutputStream]. - * Calling close on the created writer will close this stream and ensure data is flushed. 
*/ - fun writer(output: OutputStream): BinaryWriter { - return BinaryWriter(datumWriter, output, factory) + fun writer(schema: Schema, output: OutputStream): BinaryWriter { + return BinaryWriter(schema, output, factory) } /** * Creates a [BinaryWriter] that uses a [ByteArrayOutputStream]. * Once records have been written, users can call bytes() to retrieve the [ByteArray]. */ - fun writer(): BinaryWriter { - return BinaryWriter(datumWriter, ByteArrayOutputStream(), factory) + fun writer(schema: Schema): BinaryWriter { + return BinaryWriter(schema, ByteArrayOutputStream(), factory) + } + + /** + * Creates an avro encoded byte array from the given [record]. + * + * This method is a convenience function that is useful when you want to write a single record + * in a single method call. + */ + fun toBytes(record: GenericRecord): ByteArray { + return BinaryWriter(record.schema, ByteArrayOutputStream(), EncoderFactory.get()) + .use { it.write(record) } + .bytes() } } /** - * A [BinaryWriter] is a non-thread safe, one time use, writer to a given stream. + * A [BinaryWriter] is a non-thread safe, one time use, writer to a given stream that encodes + * in the avro "binary" format, which does not include the schema in the final bytes. * * Call [close] when all records have been written to ensure data is flushed to the underlying stream. 
*/ class BinaryWriter( - private val datum: DatumWriter, + schema: Schema, private val output: OutputStream, factory: EncoderFactory, ) : AutoCloseable { + private val datum = GenericDatumWriter(schema) private val encoder = factory.binaryEncoder(output, null) fun write(record: GenericRecord): BinaryWriter { diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/CachedReflectionSerdeFactory.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/CachedReflectionSerdeFactory.kt deleted file mode 100644 index fc5008d..0000000 --- a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/CachedReflectionSerdeFactory.kt +++ /dev/null @@ -1,32 +0,0 @@ -package com.sksamuel.centurion.avro.io - -import java.util.concurrent.ConcurrentHashMap -import kotlin.reflect.KClass - -/** - * A [CachedReflectionSerdeFactory] will create a [SpecificSerde] once for a given type via delegation - * to a [ReflectionSerdeFactory] and return that cached [SpecificSerde] upon future invocations. - * - * This instance is thread safe. - */ -object CachedReflectionSerdeFactory { - - private val cache = ConcurrentHashMap, SpecificSerde<*>>() - - /** - * Creates or returns a [SpecificSerde] for the given [kclass]. - */ - fun create( - kclass: KClass, - options: SerdeOptions = SerdeOptions() - ): SpecificSerde { - return cache.getOrPut(kclass) { ReflectionSerdeFactory.create(kclass, options) } as SpecificSerde - } - - /** - * Creates or returns a [SpecificSerde] from the given type parameter [T]. 
- */ - inline fun create(options: SerdeOptions = SerdeOptions()): SpecificSerde { - return create(T::class, options) - } -} diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/DataWriter.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/DataWriter.kt index 3960a92..6defb26 100644 --- a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/DataWriter.kt +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/DataWriter.kt @@ -5,74 +5,54 @@ import org.apache.avro.file.CodecFactory import org.apache.avro.file.DataFileWriter import org.apache.avro.generic.GenericDatumWriter import org.apache.avro.generic.GenericRecord -import org.apache.avro.io.DatumWriter import java.io.ByteArrayOutputStream import java.io.OutputStream /** - * Creates a [DataWriterFactory] for a given schema which can then be used to create [DataWriter]s. - * - * All writers created from this factory share a thread safe [DatumWriter]. + * Creates a [DataWriterFactory] which can then be used to create [DataWriter]s. */ class DataWriterFactory( - private val schema: Schema, private val codec: CodecFactory, ) { - companion object { - - /** - * Creates an avro encoded byte array from the given [record]. - * - * This method is a convenience function that is useful when you want to write a single record - * in a single method call. - * - * For better performance, considering creating a [DataWriterFactory] which will use - * a shared [GenericDatumWriter]. - */ - fun toBytes(record: GenericRecord, codec: CodecFactory = CodecFactory.nullCodec()): ByteArray { - val datum = GenericDatumWriter(record.schema) - val writer = DataWriter(datum, record.schema, ByteArrayOutputStream(), codec) - writer.write(record) - writer.close() - return writer.bytes() - } - - /** - * Creates an avro encoded byte array from the given [record]s. - * - * This method is a convenience function that is useful when you want to write a batch - * of records in a single method call. 
- * - * For better performance, considering creating a [DataWriterFactory] which will use - * a shared [GenericDatumWriter]. - */ - fun toBytes(records: List, codec: CodecFactory = CodecFactory.nullCodec()): ByteArray { - require(records.isNotEmpty()) - val datumWriter = GenericDatumWriter(records.first().schema) - val writer = DataWriter(datumWriter, records.first().schema, ByteArrayOutputStream(), codec) - records.forEach { writer.write(it) } - writer.close() - return writer.bytes() - } - } - - private val datum = GenericDatumWriter(schema) - /** * Creates a [DataWriter] that writes to the given [OutputStream]. * Calling close on the created writer will close this stream and ensure data is flushed. */ - fun writer(output: OutputStream): DataWriter { - return DataWriter(datum, schema, output, codec) + fun writer(schema: Schema, output: OutputStream): DataWriter { + return DataWriter( schema, output, codec) } /** * Creates a [DataWriter] that uses a [ByteArrayOutputStream]. * Once records have been written, users can call bytes() to retrieve the [ByteArray]. */ - fun writer(): DataWriter { - return DataWriter(datum, schema, ByteArrayOutputStream(), codec) + fun writer(schema: Schema): DataWriter { + return DataWriter(schema, ByteArrayOutputStream(), codec) + } + + /** + * Creates an avro encoded byte array from the given [record]. + * + * This method is a convenience function that is useful when you want to write a single record + * in a single method call. + */ + fun toBytes(record: GenericRecord): ByteArray { + return toBytes(listOf(record)) + } + + /** + * Creates an avro encoded byte array from the given [record]s. + * + * This method is a convenience function that is useful when you want to write a batch + * of records in a single method call. 
+ */ + fun toBytes(records: List): ByteArray { + require(records.isNotEmpty()) + val writer = DataWriter(records.first().schema, ByteArrayOutputStream(), codec) + records.forEach { writer.write(it) } + writer.close() + return writer.bytes() } } @@ -85,12 +65,12 @@ class DataWriterFactory( * If you do not want to include the schema, see [BinaryWriter]. */ class DataWriter( - datum: DatumWriter, schema: Schema, private val output: OutputStream, codecFactory: CodecFactory, ) : AutoCloseable { + private val datum = GenericDatumWriter(schema) private val writer = DataFileWriter(datum).setCodec(codecFactory).create(schema, output) fun write(record: GenericRecord): DataWriter { diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/Format.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/Format.kt new file mode 100644 index 0000000..f4089f8 --- /dev/null +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/Format.kt @@ -0,0 +1,20 @@ +package com.sksamuel.centurion.avro.io + +/** + * The [Format] enum describes the three types of encoding avro supports when reading and writing. + * + * - [Data] will write records as binary, and includes the schema in the output. + * This format results in larger sizes than [Binary], clearly as the schema takes up room, but supports + * schema evolution, as the deserializers can compare expected schema with the written schema. + * + * - [Binary] will write records as binary, but omits the schema from the output. + * This format results in the smallest sizes, and is similar to how protobuf works. However, readers + * are required to know the schema when deserializing. + * + * - [Json] will write records as json, including the schema. 
+ */ +enum class Format { + Binary, + Data, + Json +} diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/ReflectionSerdeFactory.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/ReflectionSerdeFactory.kt deleted file mode 100644 index 7f61db0..0000000 --- a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/ReflectionSerdeFactory.kt +++ /dev/null @@ -1,37 +0,0 @@ -package com.sksamuel.centurion.avro.io - -import com.sksamuel.centurion.avro.decoders.SpecificRecordDecoder -import com.sksamuel.centurion.avro.encoders.SpecificRecordEncoder -import com.sksamuel.centurion.avro.generation.ReflectionSchemaBuilder -import kotlin.reflect.KClass - -/** - * A [ReflectionSerdeFactory] will create a [SpecificSerde] for a given type using - * reflection based builders. - * - * This instance is thread safe. - */ -object ReflectionSerdeFactory { - - /** - * Creates a [SpecificSerde] reflectively from the given [kclass] using a [ReflectionSchemaBuilder], - * [SpecificRecordEncoder] and [SpecificRecordDecoder]. - */ - fun create( - kclass: KClass, - options: SerdeOptions = SerdeOptions() - ): SpecificSerde { - val schema = ReflectionSchemaBuilder(true).schema(kclass) - val encoder = SpecificRecordEncoder(kclass) - val decoder = SpecificRecordDecoder(kclass) - return SpecificSerde(schema, encoder, decoder, options) - } - - /** - * Creates a [SpecificSerde] reflectively from the given type parameter [T] using a [ReflectionSchemaBuilder], - * [SpecificRecordEncoder] and [SpecificRecordDecoder]. 
- */ - inline fun create(options: SerdeOptions = SerdeOptions()): SpecificSerde { - return create(T::class, options) - } -} diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/SpecificSerde.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/SpecificSerde.kt deleted file mode 100644 index 072a0de..0000000 --- a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/SpecificSerde.kt +++ /dev/null @@ -1,57 +0,0 @@ -package com.sksamuel.centurion.avro.io - -import com.sksamuel.centurion.avro.decoders.Decoder -import com.sksamuel.centurion.avro.encoders.Encoder -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecord -import org.apache.avro.io.DecoderFactory -import org.apache.avro.io.EncoderFactory - -/** - * A [SpecificSerde] is a [Serde] that delegates to an [Encoder] and [Decoder] that handles that type. - * If you wish to create a [SpecificSerde] reflectively, see [ReflectionSerdeFactory]. - * - * This class is thread safe once constructed. 
- */ -class SpecificSerde( - schema: Schema, - encoder: Encoder, - decoder: Decoder, - options: SerdeOptions, -) : Serde { - - init { - if (options.fastReader) - GenericData.get().setFastReaderEnabled(true) - } - - private val encoderFactory = EncoderFactory() - .configureBufferSize(options.encoderBufferSize) - .configureBlockSize(options.blockBufferSize) - - private val decoderFactory = DecoderFactory() - .configureDecoderBufferSize(options.decoderBufferSize) - - private val writerFactory = BinaryWriterFactory(schema, encoderFactory) - private val readerFactory = BinaryReaderFactory(schema, decoderFactory) - - private val encodeFn = encoder.encode(schema) - private val decodeFn = decoder.decode(schema) - - override fun serialize(obj: T): ByteArray = - writerFactory.writer().write(encodeFn.invoke(obj) as GenericRecord).bytes() - - override fun deserialize(bytes: ByteArray): T = decodeFn(readerFactory.reader(bytes).read()) -} - -private const val DEFAULT_ENCODER_BUFFER_SIZE = 2048 -private const val DEFAULT_DECODER_BUFFER_SIZE = 8192 -private const val DEFAULT_BLOCK_BUFFER_SIZE = 64 * 1024 - -data class SerdeOptions( - val fastReader: Boolean = false, - val encoderBufferSize: Int = DEFAULT_ENCODER_BUFFER_SIZE, - val decoderBufferSize: Int = DEFAULT_DECODER_BUFFER_SIZE, - val blockBufferSize: Int = DEFAULT_BLOCK_BUFFER_SIZE, -) diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/BinarySerde.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/BinarySerde.kt new file mode 100644 index 0000000..c901660 --- /dev/null +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/BinarySerde.kt @@ -0,0 +1,45 @@ +package com.sksamuel.centurion.avro.io.serde + +import com.sksamuel.centurion.avro.decoders.Decoder +import com.sksamuel.centurion.avro.encoders.Encoder +import com.sksamuel.centurion.avro.io.BinaryReaderFactory +import com.sksamuel.centurion.avro.io.BinaryWriterFactory +import org.apache.avro.Schema 
+import org.apache.avro.generic.GenericRecord +import org.apache.avro.io.DecoderFactory +import org.apache.avro.io.EncoderFactory + +/** + * A [BinarySerde] reads and writes in the avro "binary" format which does not include the schema + * in the written bytes. This results in a smaller payload, similar to protobuf, but requires + * that the deserializer knows which schema was used when the payload was written. So this serde + * always requires the schema to be specified. + */ +class BinarySerde( + private val schema: Schema, + encoder: Encoder, + decoder: Decoder, + options: SerdeOptions +) : Serde { + + private val encoderFactory = EncoderFactory() + .configureBufferSize(options.encoderBufferSize) + .configureBlockSize(options.blockBufferSize) + + private val decoderFactory = DecoderFactory() + .configureDecoderBufferSize(options.decoderBufferSize) + + private val writerFactory = BinaryWriterFactory(encoderFactory) + private val readerFactory = BinaryReaderFactory(decoderFactory) + + private val encoderFn = encoder.encode(schema) + private val decodeFn = decoder.decode(schema) + + override fun serialize(obj: T): ByteArray { + return writerFactory.toBytes(encoderFn(obj) as GenericRecord) + } + + override fun deserialize(bytes: ByteArray): T { + return decodeFn(readerFactory.fromBytes(schema, bytes)) + } +} diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/CachedSerdeFactory.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/CachedSerdeFactory.kt new file mode 100644 index 0000000..62d41ef --- /dev/null +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/CachedSerdeFactory.kt @@ -0,0 +1,43 @@ +package com.sksamuel.centurion.avro.io.serde + +import com.sksamuel.centurion.avro.io.Format +import java.util.concurrent.ConcurrentHashMap +import kotlin.reflect.KClass + +/** + * A [CachedSerdeFactory] will create a [Serde] once for a given type via delegation + * to a [ReflectionSerdeFactory] and 
return that cached [Serde] upon future invocations. This allows + * the reflection setup calls to be invoked only once per type, which gives a huge performance gain. + * + * This instance is thread safe. + */ +class CachedSerdeFactory(private val factory: SerdeFactory) : SerdeFactory() { + + private val cache = ConcurrentHashMap, Serde<*>>() + + /** + * Creates or returns a [Serde] for the given [kclass]. + */ + override fun create(kclass: KClass, format: Format, options: SerdeOptions): Serde { + return cache.getOrPut(kclass) { factory.create(kclass, format, options) } as Serde + } +} + +abstract class SerdeFactory { + + /** + * Creates or returns a [Serde] for the given [kclass]. + */ + abstract fun create( + kclass: KClass, + format: Format, + options: SerdeOptions, + ): Serde + + /** + * Creates or returns a [Serde] from the given type parameter [T]. + */ + inline fun create(format: Format, options: SerdeOptions): Serde { + return create(T::class, format, options) + } +} diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/Serde.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/CompressingSerde.kt similarity index 74% rename from centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/Serde.kt rename to centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/CompressingSerde.kt index 84baaa7..a5ba932 100644 --- a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/Serde.kt +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/CompressingSerde.kt @@ -1,17 +1,8 @@ -package com.sksamuel.centurion.avro.io +package com.sksamuel.centurion.avro.io.serde import org.apache.avro.file.Codec import java.nio.ByteBuffer -/** - * A [Serde] provides an easy way to convert between a single data class instance [T] - * and avro encoded byte arrays. 
- */ -interface Serde { - fun serialize(obj: T): ByteArray - fun deserialize(bytes: ByteArray): T -} - /** * A [CompressingSerde] wraps another [Serde] applying compression after serialization, * and applying decompression before deserialization. diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/DataSerde.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/DataSerde.kt new file mode 100644 index 0000000..cda51aa --- /dev/null +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/DataSerde.kt @@ -0,0 +1,42 @@ +package com.sksamuel.centurion.avro.io.serde + +import com.sksamuel.centurion.avro.decoders.Decoder +import com.sksamuel.centurion.avro.encoders.Encoder +import com.sksamuel.centurion.avro.io.BinaryReaderFactory +import com.sksamuel.centurion.avro.io.BinaryWriterFactory +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.io.DecoderFactory +import org.apache.avro.io.EncoderFactory + +/** + * A [DataSerde] reads and writes in the avro "data" format which includes the schema in the written bytes. 
+ */ +class DataSerde( + private val schema: Schema, + encoder: Encoder, + decoder: Decoder, + options: SerdeOptions +) : Serde { + + private val encoderFactory = EncoderFactory() + .configureBufferSize(options.encoderBufferSize) + .configureBlockSize(options.blockBufferSize) + + private val decoderFactory = DecoderFactory() + .configureDecoderBufferSize(options.decoderBufferSize) + + private val writerFactory = BinaryWriterFactory(encoderFactory) + private val readerFactory = BinaryReaderFactory(decoderFactory) + + private val encoderFn = encoder.encode(schema) + private val decodeFn = decoder.decode(schema) + + override fun serialize(obj: T): ByteArray { + return writerFactory.writer(schema).use { it.write(encoderFn(obj) as GenericRecord).bytes() } + } + + override fun deserialize(bytes: ByteArray): T { + return decodeFn(readerFactory.reader(schema, bytes).use(decodeFn)) + } +} diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/ReflectionSerdeFactory.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/ReflectionSerdeFactory.kt new file mode 100644 index 0000000..26428e8 --- /dev/null +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/ReflectionSerdeFactory.kt @@ -0,0 +1,36 @@ +package com.sksamuel.centurion.avro.io.serde + +import com.sksamuel.centurion.avro.decoders.SpecificRecordDecoder +import com.sksamuel.centurion.avro.encoders.SpecificRecordEncoder +import com.sksamuel.centurion.avro.generation.ReflectionSchemaBuilder +import com.sksamuel.centurion.avro.io.Format +import kotlin.reflect.KClass + +/** + * A [ReflectionSerdeFactory] will create a [Serde] for a given type using reflection based builders. + * + * This instance is thread safe. + */ +object ReflectionSerdeFactory : SerdeFactory() { + + /** + * Creates a [Serde] reflectively from the given [kclass] using a [ReflectionSchemaBuilder], + * [SpecificRecordEncoder] and [SpecificRecordDecoder]. 
+ * + * @param format specify the type of output. + */ + override fun create( + kclass: KClass, + format: Format, + options: SerdeOptions + ): Serde { + val schema = ReflectionSchemaBuilder(true).schema(kclass) + val encoder = SpecificRecordEncoder(kclass) + val decoder = SpecificRecordDecoder(kclass) + return when (format) { + Format.Binary -> BinarySerde(schema, encoder, decoder, options) + Format.Data -> TODO() + Format.Json -> TODO() + } + } +} diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/Serde.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/Serde.kt new file mode 100644 index 0000000..ad8c6a3 --- /dev/null +++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/Serde.kt @@ -0,0 +1,14 @@ +package com.sksamuel.centurion.avro.io.serde + +/** + * A [Serde] provides an easy way to convert between a single data class instance [T] + * and avro encoded byte arrays. + * + * It is intended as an easy-to-use alternative to creating input/output streams and + * datum reader / writers, and all that jazz when you simply want to read and write a single record. 
+ */
+interface Serde<T> {
+   fun serialize(obj: T): ByteArray // encodes a single instance into avro bytes
+   fun deserialize(bytes: ByteArray): T // decodes avro bytes back into a single instance
+}
+
diff --git a/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/SerdeOptions.kt b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/SerdeOptions.kt
new file mode 100644
index 0000000..e77e275
--- /dev/null
+++ b/centurion-avro/src/main/kotlin/com/sksamuel/centurion/avro/io/serde/SerdeOptions.kt
@@ -0,0 +1,12 @@
+package com.sksamuel.centurion.avro.io.serde
+
+private const val DEFAULT_ENCODER_BUFFER_SIZE = 2048
+private const val DEFAULT_DECODER_BUFFER_SIZE = 8192
+private const val DEFAULT_BLOCK_BUFFER_SIZE = 64 * 1024
+
+data class SerdeOptions(
+   val fastReader: Boolean = false, // NOTE(review): not read by any serde visible in this patch - confirm where it is consumed
+   val encoderBufferSize: Int = DEFAULT_ENCODER_BUFFER_SIZE, // handed to EncoderFactory.configureBufferSize
+   val decoderBufferSize: Int = DEFAULT_DECODER_BUFFER_SIZE, // handed to DecoderFactory.configureDecoderBufferSize
+   val blockBufferSize: Int = DEFAULT_BLOCK_BUFFER_SIZE, // handed to EncoderFactory.configureBlockSize
+)
diff --git a/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/EvolutionTest.kt b/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/EvolutionTest.kt
index fec7e38..2d487aa 100644
--- a/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/EvolutionTest.kt
+++ b/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/EvolutionTest.kt
@@ -28,8 +28,8 @@ class EvolutionTest : FunSpec() {
          record1.put("a", "hello")
          record1.put("b", true)
 
-         val bytes = BinaryWriterFactory.toBytes(record1)
-         val record2 = BinaryReaderFactory(schema2, schema1).reader(bytes).read()
+         val bytes = BinaryWriterFactory().toBytes(record1)
+         val record2 = BinaryReaderFactory().reader(schema2, schema1, bytes).read()
 
          record2["a"] shouldBe Utf8("hello")
          record2["b"] shouldBe true
diff --git a/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/CachedReflectionSerdeFactoryTest.kt b/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/CachedReflectionSerdeFactoryTest.kt
index 43d2fe6..e78bac1 100644
--- 
a/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/CachedReflectionSerdeFactoryTest.kt
+++ b/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/CachedReflectionSerdeFactoryTest.kt
@@ -1,14 +1,21 @@
 package com.sksamuel.centurion.avro.io
 
 import com.sksamuel.centurion.avro.encoders.User
+import com.sksamuel.centurion.avro.io.serde.CachedSerdeFactory
+import com.sksamuel.centurion.avro.io.serde.ReflectionSerdeFactory
+import com.sksamuel.centurion.avro.io.serde.SerdeOptions
 import io.kotest.core.spec.style.FunSpec
 import io.kotest.matchers.types.shouldBeSameInstanceAs
 
 class CachedReflectionSerdeFactoryTest : FunSpec({
 
    test("should cache instances") {
-      val serde1 = CachedReflectionSerdeFactory.create<User>()
-      val serde2 = CachedReflectionSerdeFactory.create<User>()
+
+      val factory = CachedSerdeFactory(ReflectionSerdeFactory)
+
+      val serde1 = factory.create<User>(Format.Binary, SerdeOptions())
+      val serde2 = factory.create<User>(Format.Binary, SerdeOptions())
+
       serde1.shouldBeSameInstanceAs(serde2)
    }
 
diff --git a/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/SerdeTest.kt b/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/SerdeTest.kt
index 399346b..943e472 100644
--- a/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/SerdeTest.kt
+++ b/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/SerdeTest.kt
@@ -2,6 +2,8 @@ package com.sksamuel.centurion.avro.io
 
 import com.sksamuel.centurion.avro.encoders.User
 import com.sksamuel.centurion.avro.encoders.UserType
+import com.sksamuel.centurion.avro.io.serde.ReflectionSerdeFactory
+import com.sksamuel.centurion.avro.io.serde.SerdeOptions
 import io.kotest.core.spec.style.FunSpec
 import io.kotest.matchers.shouldBe
 import kotlin.random.Random
@@ -10,7 +12,7 @@ class SerdeTest : FunSpec({
 
    test("round trip happy path") {
       val user = User(Random.nextLong(), "sammy mcsamface", "sammy@mcsamface.com", Random.nextLong(), UserType.Admin)
-      val serde = ReflectionSerdeFactory.create<User>()
+      val serde = ReflectionSerdeFactory.create<User>(Format.Binary, SerdeOptions())
       serde.deserialize(serde.serialize(user)) shouldBe user
    }
 })
diff --git a/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/SizeTest.kt b/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/SizeTest.kt
index 7daeef2..074695d 100644
--- a/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/SizeTest.kt
+++ b/centurion-avro/src/test/kotlin/com/sksamuel/centurion/avro/io/SizeTest.kt
@@ -29,7 +29,7 @@ fun main() {
    val encoder = SpecificRecordEncoder(User::class)
    val record = encoder.encode(schema).invoke(user) as GenericRecord
 
-   println(BinaryWriterFactory(schema).writer(ByteArrayOutputStream()).write(record).bytes().size)
+   println(BinaryWriterFactory().writer(schema).write(record).bytes().size)
 
    val baos = ByteArrayOutputStream()
    val writer = DataFileWriter(GenericDatumWriter(schema))