Skip to content

Commit

Permalink
Reworked serde interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed May 5, 2024
1 parent 9ca4ed4 commit 3dd9767
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,69 +3,27 @@ package com.sksamuel.centurion.avro.io
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import java.io.InputStream

/**
* Creates an [BinaryReaderFactory] for a given schema which can then be used
* to create [BinaryReader]s.
*
* All [BinaryReader]s created from this factory share a thread safe [DatumReader] for efficiency.
*
* Creates an [BinaryReaderFactory] for a given schema which can then be used to create [BinaryReader]s.
* Pass in a pre-created [DecoderFactory] if you wish to configure buffer size.
*/
class BinaryReaderFactory(
reader: Schema,
writer: Schema,
private val factory: DecoderFactory,
) {

constructor(schema: Schema) : this(schema, schema, DecoderFactory.get())
constructor(reader: Schema, writer: Schema) : this(reader, writer, DecoderFactory.get())
constructor(schema: Schema, factory: DecoderFactory) : this(schema, schema, factory)
constructor() : this(DecoderFactory.get())

private val datum = GenericDatumReader<GenericRecord>(/* writer = */ writer, /* reader = */ reader)

companion object {

/**
* Reads a [GenericRecord] from the given [ByteArray].
*
* This method is a convenience function that is useful when you want to read a single record
* in a single method call.
*
* For better performance and customization, create a [BinaryReader] using a [BinaryReaderFactory].
* This will also allow customization of the [DecoderFactory], schema evolution and shares a [DatumReader].
*/
fun fromBytes(schema: Schema, bytes: ByteArray): GenericRecord {
val datumReader = GenericDatumReader<GenericRecord>(/* writer = */ schema, /* reader = */ schema)
return BinaryReader(
datum = datumReader,
input = null,
bytes = bytes,
factory = DecoderFactory.get(),
).read()
}

/**
* Reads a [GenericRecord] from the given [InputStream].
*
* This method is a convenience function that is useful when you want to read a single record
* in a single method call.
*
* For better performance and customization, create a [BinaryReader] using a [BinaryReaderFactory].
* This will also allow customization of the [DecoderFactory], schema evolution and shares a [DatumReader].
*
* The given [input] stream will be closed after this function returns.
*
* This variant is slower than using a byte array. If you already have
* the bytes available, that should be preferred.
*/
fun fromBytes(schema: Schema, input: InputStream): GenericRecord {
val datumReader = GenericDatumReader<GenericRecord>(/* writer = */ schema, /* reader = */ schema)
return BinaryReader(datumReader, input, null, DecoderFactory.get()).use { it.read() }
}
/**
* Creates an [BinaryReader] that reads from the given [InputStream].
*
* This variant is slower than using a byte array. If you already have
* the bytes available, that should be preferred.
*/
fun reader(schema: Schema, input: InputStream): BinaryReader {
return BinaryReader(schema, schema, input, null, factory)
}

/**
Expand All @@ -74,23 +32,44 @@ class BinaryReaderFactory(
* This variant is slower than using a byte array. If you already have
* the bytes available, that should be preferred.
*/
fun reader(input: InputStream): BinaryReader {
return BinaryReader(datum, input, null, factory)
fun reader(reader: Schema, writer: Schema, input: InputStream): BinaryReader {
return BinaryReader(reader, writer, input, null, factory)
}

/**
* Creates an [BinaryReader] that reads from the given [ByteArray].
*/
fun reader(bytes: ByteArray): BinaryReader {
return BinaryReader(datum = datum, input = null, bytes = bytes, factory = factory)
fun reader(schema: Schema, bytes: ByteArray): BinaryReader {
return BinaryReader(schema, schema, input = null, bytes = bytes, factory = factory)
}

/**
* Creates an [BinaryReader] that reads from the given [ByteArray].
*/
fun reader(reader: Schema, writer: Schema, bytes: ByteArray): BinaryReader {
return BinaryReader(readerSchema = reader, writerSchema = writer, input = null, bytes = bytes, factory = factory)
}

/**
* Reads a [GenericRecord] from the given [ByteArray].
*
* This method is a convenience function that is useful when you want to read a single record
* in a single method call.
*
* For better performance and customization, create a [BinaryReader] using a [BinaryReaderFactory].
* This will also allow customization of the [DecoderFactory], schema evolution.
*/
fun fromBytes(schema: Schema, bytes: ByteArray): GenericRecord {
return BinaryReader(schema, schema, null, bytes, factory).use { it.read() }
}
}

/**
* An [BinaryReader] is a non-thread safe, one time use, reader from a given stream or byte array.
*/
class BinaryReader(
private val datum: DatumReader<GenericRecord>,
readerSchema: Schema,
writerSchema: Schema,
private val input: InputStream?,
bytes: ByteArray?,
factory: DecoderFactory,
Expand All @@ -100,6 +79,8 @@ class BinaryReader(
require(input == null || bytes == null) { "Do not specify both ByteArray and InputStream" }
}

private val datum = GenericDatumReader<GenericRecord>(/* writer = */ writerSchema, /* reader = */ readerSchema)

private val decoder = when {
input != null -> factory.binaryDecoder(input, null)
bytes != null -> factory.binaryDecoder(bytes, null)
Expand All @@ -110,6 +91,10 @@ class BinaryReader(
return datum.read(null, decoder)
}

fun asSequence(): Sequence<GenericRecord> {
return generateSequence { read() }
}

override fun close() {
input?.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,6 @@ import org.apache.avro.io.EncoderFactory
import java.io.ByteArrayOutputStream
import java.io.OutputStream

///**
// * An [AvroWriter] will write [GenericRecord]s to an output stream.
// *
// * There are three implementations of this stream
// * - a Data stream,
// * - a Binary stream
// * - a Json stream
// *
// * See the methods on the companion object to create instances of each
// * of these types of stream.
// */

/**
* Creates a [BinaryWriterFactory] for a given schema which can then be used to create [BinaryWriter]s.
*
Expand All @@ -28,79 +16,55 @@ import java.io.OutputStream
* Pass in a pre-created [EncoderFactory] if you wish to configure buffer size.
*/
class BinaryWriterFactory(
schema: Schema,
private val factory: EncoderFactory,
) {

/**
* Creates a [BinaryWriterFactory] with the default [EncoderFactory].
*/
constructor(schema: Schema) : this(schema, EncoderFactory.get())

companion object {

/**
* Creates an avro encoded byte array from the given [record].
*
* This method is a convenience function that is useful when you want to write a single record
* in a single method call.
*
* For better performance, considering creating a [BinaryWriterFactory] which will use
* a shared [GenericDatumWriter] and allows customizating the [EncoderFactory].
*/
fun toBytes(record: GenericRecord): ByteArray {
val baos = ByteArrayOutputStream()
toBytes(record, baos)
return baos.toByteArray()
}

/**
* Writes avro encoded bytes to the given [output] stream from the given [record].
*
* This method is a convenience function that is useful when you want to write a single record
* in a single method call.
*
* The given [output] stream will be closed after this function returns.
*
* For better performance, considering creating a [BinaryWriterFactory] which will use
* a shared [GenericDatumWriter] and allows customizating the [EncoderFactory].
*/
fun toBytes(record: GenericRecord, output: OutputStream) {
val datumWriter = GenericDatumWriter<GenericRecord>(record.schema)
BinaryWriter(datumWriter, output, EncoderFactory.get()).use { it.write(record) }
}
}

private val datumWriter = GenericDatumWriter<GenericRecord>(schema)
constructor() : this(EncoderFactory.get())

/**
* Creates a [BinaryWriter] that writes to the given [OutputStream].
* Calling close on the created writer will close this stream and ensure data is flushed.
*/
fun writer(output: OutputStream): BinaryWriter {
return BinaryWriter(datumWriter, output, factory)
fun writer(schema: Schema, output: OutputStream): BinaryWriter {
return BinaryWriter(schema, output, factory)
}

/**
* Creates a [BinaryWriter] that uses a [ByteArrayOutputStream].
* Once records have been written, users can call bytes() to retrieve the [ByteArray].
*/
fun writer(): BinaryWriter {
return BinaryWriter(datumWriter, ByteArrayOutputStream(), factory)
fun writer(schema: Schema): BinaryWriter {
return BinaryWriter(schema, ByteArrayOutputStream(), factory)
}

/**
* Creates an avro encoded byte array from the given [record].
*
* This method is a convenience function that is useful when you want to write a single record
* in a single method call.
*/
fun toBytes(record: GenericRecord): ByteArray {
return BinaryWriter(record.schema, ByteArrayOutputStream(), EncoderFactory.get())
.use { it.write(record) }
.bytes()
}
}

/**
* A [BinaryWriter] is a non-thread safe, one time use, writer to a given stream.
* A [BinaryWriter] is a non-thread safe, one time use, writer to a given stream that encodes
* in the avro "binary" format, which does not include the schema in the final bytes.
*
* Call [close] when all records have been written to ensure data is flushed to the underlying stream.
*/
class BinaryWriter(
private val datum: DatumWriter<GenericRecord>,
schema: Schema,
private val output: OutputStream,
factory: EncoderFactory,
) : AutoCloseable {

private val datum = GenericDatumWriter<GenericRecord>(schema)
private val encoder = factory.binaryEncoder(output, null)

fun write(record: GenericRecord): BinaryWriter {
Expand Down

This file was deleted.

Loading

0 comments on commit 3dd9767

Please sign in to comment.