Skip to content

Commit

Permalink
Allow deserialization from open RowGroupReaders.
Browse files Browse the repository at this point in the history
Add a new public method that matches the existing deserialization
functionality but operates on open RowGroupReaders rather than opening
and closing a ParquetReader for every row group.
  • Loading branch information
ddrinka authored and aloneguid committed Nov 14, 2023
1 parent 3eb923a commit f913225
Showing 1 changed file with 35 additions and 1 deletion.
36 changes: 35 additions & 1 deletion src/Parquet/Serialization/ParquetSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,30 @@ public static async Task<IList<T>> DeserializeAsync<T>(Stream source,
return result;
}

/// <summary>
/// Deserialise
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="rowGroupReader"></param>
/// <param name="schema"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="InvalidDataException"></exception>
public static async Task<IList<T>> DeserializeAsync<T>(ParquetRowGroupReader rowGroupReader,
ParquetSchema schema,
CancellationToken cancellationToken = default)
where T : new() {

Assembler<T> asm = GetAssembler<T>();

var result = new List<T>();

await DeserializeRowGroupAsync(rowGroupReader, schema, asm, result, cancellationToken);

return result;
}

private static Assembler<T> GetAssembler<T>() where T : new() {

object boxedAssemblyer = _typeToAssembler.GetOrAdd(typeof(T), _ => new Assembler<T>(typeof(T).GetParquetSchema(true)));
Expand All @@ -165,6 +189,16 @@ private static async Task DeserializeRowGroupAsync<T>(ParquetReader reader, int

using ParquetRowGroupReader rg = reader.OpenRowGroupReader(rgi);

await DeserializeRowGroupAsync(rg, reader.Schema, asm, result, cancellationToken);
}


private static async Task DeserializeRowGroupAsync<T>(ParquetRowGroupReader rg,
ParquetSchema schema,
Assembler<T> asm,
ICollection<T> result,
CancellationToken cancellationToken = default) where T : new() {

// add more empty class instances to the result
int prevRowCount = result.Count;
for(int i = 0; i < rg.RowCount; i++) {
Expand All @@ -175,7 +209,7 @@ private static async Task DeserializeRowGroupAsync<T>(ParquetReader reader, int
foreach(FieldAssembler<T> fasm in asm.FieldAssemblers) {

// validate reflected vs actual schema field
DataField? actual = reader.Schema.DataFields.FirstOrDefault(f => f.Path.Equals(fasm.Field.Path));
DataField? actual = schema.DataFields.FirstOrDefault(f => f.Path.Equals(fasm.Field.Path));
if(actual != null && !actual.IsArray && !fasm.Field.Equals(actual)) {
throw new InvalidDataException($"property '{fasm.Field.ClrPropName}' is declared as '{fasm.Field}' but source data has it as '{actual}'");
}
Expand Down

0 comments on commit f913225

Please sign in to comment.