split out MDA into a separate library
aloneguid committed Nov 13, 2024
1 parent 630ff1b commit 1b14461
Showing 10 changed files with 153 additions and 93 deletions.
95 changes: 95 additions & 0 deletions src/Parquet.Data.Analysis/Extensions.cs
@@ -0,0 +1,95 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.Analysis;
using Parquet.Data;
using Parquet.Data.Analysis;
using Parquet.Schema;

namespace Parquet {
/// <summary>
/// Defines extension methods to simplify Parquet usage
/// </summary>
public static class AnalysisExtensions {
/// <summary>
/// Reads the entire Parquet stream into a <see cref="DataFrame"/>, concatenating all row groups.
/// </summary>
/// <param name="inputStream">Stream containing a Parquet file.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A <see cref="DataFrame"/> with one column per top-level data field.</returns>
public static async Task<DataFrame> ReadParquetAsDataFrameAsync(
this Stream inputStream, CancellationToken cancellationToken = default) {
using ParquetReader reader = await ParquetReader.CreateAsync(inputStream, cancellationToken: cancellationToken);

var dfcs = new List<DataFrameColumn>();
//var readableFields = reader.Schema.DataFields.Where(df => df.MaxRepetitionLevel == 0).ToList();
List<DataField> readableFields = reader.Schema.Fields
.Select(df => df as DataField)
.Where(df => df != null)
.Cast<DataField>()
.ToList();
var columns = new List<DataFrameColumn>();

for(int rowGroupIndex = 0; rowGroupIndex < reader.RowGroupCount; rowGroupIndex++) {
using ParquetRowGroupReader rgr = reader.OpenRowGroupReader(rowGroupIndex);

for(int dataFieldIndex = 0; dataFieldIndex < readableFields.Count; dataFieldIndex++) {
DataColumn dc = await rgr.ReadColumnAsync(readableFields[dataFieldIndex], cancellationToken);

if(rowGroupIndex == 0) {
dfcs.Add(DataFrameMapper.ToDataFrameColumn(dc));
} else {
DataFrameMapper.AppendValues(dfcs[dataFieldIndex], dc);
}
}
}

return new DataFrame(dfcs);
}

/// <summary>
/// Writes the <see cref="DataFrame"/> to <paramref name="outputStream"/> as a Parquet file with a single row group.
/// </summary>
/// <param name="df"><see cref="DataFrame"/> to write.</param>
/// <param name="outputStream">Destination stream.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public static async Task WriteAsync(this DataFrame df, Stream outputStream, CancellationToken cancellationToken = default) {
// create schema
var schema = new ParquetSchema(
df.Columns.Select(col => new DataField(col.Name, col.DataType.GetNullable())));

using ParquetWriter writer = await ParquetWriter.CreateAsync(schema, outputStream, cancellationToken: cancellationToken);
using ParquetRowGroupWriter rgw = writer.CreateRowGroup();

int i = 0;
foreach(DataFrameColumn? col in df.Columns) {
if(col == null)
throw new InvalidOperationException("unexpected null column");

Array data = DataFrameMapper.GetTypedDataFast(col);
var parquetColumn = new DataColumn(schema.DataFields[i], data);

await rgw.WriteColumnAsync(parquetColumn, cancellationToken);

i += 1;
}
}

// DataFrame columns can always contain nulls, so value types are wrapped in Nullable<T>
// when building the Parquet schema; reference types are returned unchanged.
static Type GetNullable(this Type t) {
TypeInfo ti = t.GetTypeInfo();

if(ti.IsClass) {
return t;
}

Type nt = typeof(Nullable<>);
return nt.MakeGenericType(t);
}
}
}
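
For context (not part of the diff): a minimal consumer-side sketch of how these new extensions are meant to be used, assuming a reference to the new Parquet.Net.Data.Analysis package declared in the csproj below. The extension methods stay in the existing Parquet namespace, so call sites look the same as before the split; the class, column names and values here are invented for illustration.

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Data.Analysis;
using Parquet;

public static class DataFrameRoundTripSketch {
    public static async Task Main() {
        // Build a small in-memory DataFrame (illustrative column names and values).
        var ids = new PrimitiveDataFrameColumn<int>("id", new int?[] { 1, 2, 3 });
        var names = new StringDataFrameColumn("name", new[] { "a", "b", "c" });
        var df = new DataFrame(ids, names);

        using var ms = new MemoryStream();

        // DataFrame -> Parquet, via the WriteAsync extension added in this commit.
        await df.WriteAsync(ms);

        // Parquet -> DataFrame, via the ReadParquetAsDataFrameAsync extension added in this commit.
        ms.Position = 0;
        DataFrame roundTripped = await ms.ReadParquetAsDataFrameAsync();

        Console.WriteLine(roundTripped.Columns.Count); // 2
    }
}
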
45 changes: 45 additions & 0 deletions src/Parquet.Data.Analysis/Parquet.Data.Analysis.csproj
@@ -0,0 +1,45 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net8.0;net6.0;netstandard2.1;netstandard2.0</TargetFrameworks>
<Company></Company>
<PackageId>Parquet.Net.Data.Analysis</PackageId>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors />
<DocumentationFile>bin\Debug\$(TargetFramework)\Parquet.Net.Data.Analysis.xml</DocumentationFile>
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
<PackageTags>apache parquet dotnet core net c# f# windows linux macos ios android xboxone xbox</PackageTags>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>

</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Parquet\Parquet.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Data.Analysis" Version="0.21.1" />
</ItemGroup>

<ItemGroup>
<Compile Update="DataFrameMapper.cs">
<DependentUpon>DataFrameMapper.tt</DependentUpon>
<DesignTime>True</DesignTime>
<AutoGen>True</AutoGen>
</Compile>
</ItemGroup>

<ItemGroup>
<None Update="DataFrameMapper.tt">
<LastGenOutput>DataFrameMapper.cs</LastGenOutput>
<Generator>TextTemplatingFileGenerator</Generator>
</None>
</ItemGroup>



</Project>
1 change: 1 addition & 0 deletions src/Parquet.Test/Parquet.Test.csproj
@@ -33,6 +33,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Parquet.Data.Analysis\Parquet.Data.Analysis.csproj" />
<ProjectReference Include="..\Parquet\Parquet.csproj" />
</ItemGroup>

6 changes: 6 additions & 0 deletions src/Parquet.sln
@@ -41,6 +41,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "rn", "rn", "{EE4ECD1F-5A44-
..\docs\rn\previous.md = ..\docs\rn\previous.md
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Parquet.Data.Analysis", "Parquet.Data.Analysis\Parquet.Data.Analysis.csproj", "{B7D4F7F8-4E90-4BC1-8CCB-98434ABFA491}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -63,6 +65,10 @@ Global
{882BE8ED-7F5F-4392-8884-CF602622E569}.Debug|Any CPU.Build.0 = Debug|Any CPU
{882BE8ED-7F5F-4392-8884-CF602622E569}.Release|Any CPU.ActiveCfg = Release|Any CPU
{882BE8ED-7F5F-4392-8884-CF602622E569}.Release|Any CPU.Build.0 = Release|Any CPU
{B7D4F7F8-4E90-4BC1-8CCB-98434ABFA491}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B7D4F7F8-4E90-4BC1-8CCB-98434ABFA491}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B7D4F7F8-4E90-4BC1-8CCB-98434ABFA491}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B7D4F7F8-4E90-4BC1-8CCB-98434ABFA491}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
10 changes: 0 additions & 10 deletions src/Parquet/Parquet.csproj
@@ -52,7 +52,6 @@

<ItemGroup>
<PackageReference Include="IronCompress" Version="1.6.3" />
<PackageReference Include="Microsoft.Data.Analysis" Version="0.21.1" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.1" />
</ItemGroup>

@@ -77,10 +76,6 @@
<Pack>True</Pack>
<PackagePath></PackagePath>
</None>
<None Update="Data\Analysis\DataFrameMapper.tt">
<Generator>TextTemplatingFileGenerator</Generator>
<LastGenOutput>DataFrameMapper.cs</LastGenOutput>
</None>
<None Update="Encodings\BitPackedEncoder.tt">
<Generator>TextTemplatingFileGenerator</Generator>
<LastGenOutput>BitPackedEncoder.cs</LastGenOutput>
@@ -104,11 +99,6 @@
</ItemGroup>

<ItemGroup>
<Compile Update="Data\Analysis\DataFrameMapper.cs">
<DesignTime>True</DesignTime>
<AutoGen>True</AutoGen>
<DependentUpon>DataFrameMapper.tt</DependentUpon>
</Compile>
<Compile Update="Encodings\BitPackedEncoder.Precompiled.cs">
<DesignTime>True</DesignTime>
<AutoGen>True</AutoGen>
1 change: 0 additions & 1 deletion src/Parquet/ParquetActor.cs
@@ -4,7 +4,6 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ML.Data;
using Parquet.Extensions;
using Parquet.File;

75 changes: 2 additions & 73 deletions src/Parquet/ParquetExtensions.cs
@@ -1,12 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Data.Analysis;
using Parquet.Data;
using Parquet.Data.Analysis;
using Parquet.Schema;

namespace Parquet {
@@ -17,7 +11,7 @@ namespace Parquet {
public delegate Task TableReaderProgressCallback(int progress, string message);

/// <summary>
/// Defines extension methods to simplify Parquet usage (experimental v3)
/// Defines extension methods to simplify Parquet usage
/// </summary>
public static class ParquetExtensions {
/// <summary>
@@ -56,70 +50,5 @@ public static async Task WriteSingleRowGroupParquetFileAsync(
}
return (schema, columns);
}

/// <summary>
///
/// </summary>
/// <param name="inputStream"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static async Task<DataFrame> ReadParquetAsDataFrameAsync(
this Stream inputStream, CancellationToken cancellationToken = default) {
using ParquetReader reader = await ParquetReader.CreateAsync(inputStream, cancellationToken: cancellationToken);

var dfcs = new List<DataFrameColumn>();
//var readableFields = reader.Schema.DataFields.Where(df => df.MaxRepetitionLevel == 0).ToList();
List<DataField> readableFields = reader.Schema.Fields
.Select(df => df as DataField)
.Where(df => df != null)
.Cast<DataField>()
.ToList();
var columns = new List<DataFrameColumn>();

for(int rowGroupIndex = 0; rowGroupIndex < reader.RowGroupCount; rowGroupIndex++) {
using ParquetRowGroupReader rgr = reader.OpenRowGroupReader(rowGroupIndex);

for(int dataFieldIndex = 0; dataFieldIndex < readableFields.Count; dataFieldIndex++) {
DataColumn dc = await rgr.ReadColumnAsync(readableFields[dataFieldIndex], cancellationToken);

if(rowGroupIndex == 0) {
dfcs.Add(DataFrameMapper.ToDataFrameColumn(dc));
} else {
DataFrameMapper.AppendValues(dfcs[dataFieldIndex], dc);
}
}
}

return new DataFrame(dfcs);
}

/// <summary>
///
/// </summary>
/// <param name="df"></param>
/// <param name="outputStream"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static async Task WriteAsync(this DataFrame df, Stream outputStream, CancellationToken cancellationToken = default) {
// create schema
var schema = new ParquetSchema(
df.Columns.Select(col => new DataField(col.Name, col.DataType.GetNullable())));

using ParquetWriter writer = await ParquetWriter.CreateAsync(schema, outputStream, cancellationToken: cancellationToken);
using ParquetRowGroupWriter rgw = writer.CreateRowGroup();

int i = 0;
foreach(DataFrameColumn? col in df.Columns) {
if(col == null)
throw new InvalidOperationException("unexpected null column");

Array data = DataFrameMapper.GetTypedDataFast(col);
var parquetColumn = new DataColumn(schema.DataFields[i], data);

await rgw.WriteColumnAsync(parquetColumn, cancellationToken);

i += 1;
}
}
}
}
13 changes: 4 additions & 9 deletions src/Parquet/Serialization/ParquetSerializer.cs
@@ -6,7 +6,6 @@
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Parquet.Data;
using Parquet.Extensions;
using Parquet.Schema;
@@ -365,7 +364,7 @@ public static async IAsyncEnumerable<T> DeserializeAllAsync<T>(Stream source,
}

/// <summary>
/// Highly experimental
/// Untyped deserialisation result.
/// </summary>
public record UntypedAsyncEnumableResult(IAsyncEnumerable<Dictionary<string, object>> Data, ParquetSchema Schema);

@@ -532,12 +531,13 @@ private static async Task DeserializeRowGroupAsync<T>(ParquetRowGroupReader rg,

// add more empty class instances to the result
int prevRowCount = result.Count;
if(!resultsAlreadyAllocated)

if(!resultsAlreadyAllocated) {
for(int i = 0; i < rg.RowCount; i++) {
var ne = new T();
result.Add(ne);
}
}

foreach(FieldAssembler<T> fasm in asm.FieldAssemblers) {

@@ -547,11 +547,6 @@
if(readerField == null)
continue;

// validate reflected vs actual schema field
//if(!actualField.IsArray && !fasm.Field.Equals(actualField)) {
//throw new InvalidDataException($"property '{fasm.Field.ClrPropName}' is declared as '{fasm.Field}' but source data has it as '{actualField}'");
//}

// this needs reflected schema field due to it containing important schema adjustments
DataColumn dc;

