forked from slunyakin-zz/parquet-dotnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathParquetWriter.cs
191 lines (162 loc) · 7.18 KB
/
ParquetWriter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
using System;
using System.IO;
using Parquet.File;
using System.Collections.Generic;
using System.Linq;
using System.Collections;
using Parquet.Data;
namespace Parquet
{
/// <summary>
/// Implements Apache Parquet format writer
/// </summary>
/// <summary>
/// Implements Apache Parquet format writer
/// </summary>
public class ParquetWriter : ParquetActor, IDisposable
{
   private ThriftFooter _footer;
   private readonly ParquetOptions _formatOptions;
   private readonly WriterOptions _writerOptions;
   private bool _dataWritten;
   //guards against double finalization: a second Dispose must not write the footer again
   private bool _disposed;

   /// <summary>
   /// Creates an instance of parquet writer on top of a stream
   /// </summary>
   /// <param name="output">Writeable, seekable stream</param>
   /// <param name="formatOptions">Additional options</param>
   /// <param name="writerOptions">The writer options.</param>
   /// <exception cref="ArgumentNullException">Output is null.</exception>
   /// <exception cref="ArgumentException">Output stream is not writeable</exception>
   public ParquetWriter(Stream output, ParquetOptions formatOptions = null, WriterOptions writerOptions = null)
      : base(new PositionTrackingStream(ValidateOutput(output)))
   {
      _formatOptions = formatOptions ?? new ParquetOptions();
      _writerOptions = writerOptions ?? new WriterOptions();
   }

   //validation must run BEFORE the base-constructor wraps the stream in PositionTrackingStream;
   //checking in the constructor body (as before) is too late - the base initializer has already
   //dereferenced the stream by then
   private static Stream ValidateOutput(Stream output)
   {
      if (output == null) throw new ArgumentNullException(nameof(output));
      if (!output.CanWrite) throw new ArgumentException("stream is not writeable", nameof(output));
      return output;
   }

   /// <summary>
   /// Write out dataset to the output stream
   /// </summary>
   /// <param name="dataSet">Dataset to write</param>
   /// <param name="compression">Compression method</param>
   /// <param name="append">When true, appends to the file, otherwise creates a new file.</param>
   /// <exception cref="ArgumentNullException">Dataset is null.</exception>
   public void Write(DataSet dataSet, CompressionMethod compression = CompressionMethod.Gzip, bool append = false)
   {
      if (dataSet == null) throw new ArgumentNullException(nameof(dataSet));

      PrepareFile(dataSet, append);
      _footer.CustomMetadata = dataSet.Metadata.Custom;

      int offset = 0;
      int count;
      List<Thrift.SchemaElement> writeableSchema = _footer.GetWriteableSchema().ToList();

      //slice the dataset into row groups of at most RowGroupsSize rows
      do
      {
         count = Math.Min(_writerOptions.RowGroupsSize, dataSet.Count - offset);
         Thrift.RowGroup rg = _footer.AddRowGroup();

         rg.Columns = new List<Thrift.ColumnChunk>();
         foreach (Thrift.SchemaElement tse in writeableSchema)
         {
            List<string> path = _footer.GetPath(tse);
            string flatPath = string.Join(Schema.PathSeparator, path);
            var cw = new ColumnarWriter(Stream, ThriftStream, _footer, tse, path, compression, _formatOptions, _writerOptions);
            IList values = dataSet.GetColumn(flatPath, offset, count);
            Thrift.ColumnChunk chunk = cw.Write(offset, count, values);
            rg.Columns.Add(chunk);
         }

         //per the parquet format spec, RowGroup.total_byte_size is the sum of the _uncompressed_
         //sizes of all column data in the group; ColumnChunk meta already carries that figure
         //(the previous code summed Total_compressed_size, contradicting its own comment)
         rg.Total_byte_size = rg.Columns.Sum(c => c.Meta_data.Total_uncompressed_size);
         rg.Num_rows = count;

         offset += _writerOptions.RowGroupsSize;
      }
      while (offset < dataSet.Count);

      _dataWritten = true;
   }

   //opens the target for either a fresh write (magic header) or an append
   //(validate existing file, load its footer, position before it)
   private void PrepareFile(DataSet ds, bool append)
   {
      if (append)
      {
         if (!Stream.CanSeek) throw new IOException("destination stream must be seekable for append operations.");

         ValidateFile();

         Thrift.FileMetaData fileMeta = ReadMetadata();
         _footer = new ThriftFooter(fileMeta);

         ValidateSchemasCompatible(_footer, ds);

         GoBeforeFooter();
      }
      else
      {
         if (_footer == null)
         {
            _footer = new ThriftFooter(ds.Schema, ds.RowCount);

            //file starts with magic
            WriteMagic();
         }
         else
         {
            ValidateSchemasCompatible(_footer, ds);

            _footer.Add(ds.RowCount);
         }
      }
   }

   //appending requires the incoming schema to match the file's existing schema exactly
   private void ValidateSchemasCompatible(ThriftFooter footer, DataSet ds)
   {
      Schema existingSchema = footer.CreateModelSchema(_formatOptions);

      if (!ds.Schema.Equals(existingSchema))
      {
         string reason = ds.Schema.GetNotEqualsMessage(existingSchema, "appending", "existing");
         throw new ParquetException($"{nameof(DataSet)} schema does not match existing file schema, reason: {reason}");
      }
   }

   /// <summary>
   /// Writes <see cref="DataSet"/> to a target stream
   /// </summary>
   /// <param name="dataSet"><see cref="DataSet"/> to write</param>
   /// <param name="destination">Destination stream</param>
   /// <param name="compression">Compression method</param>
   /// <param name="formatOptions">Parquet options, optional.</param>
   /// <param name="writerOptions">Writer options, optional.</param>
   /// <param name="append">When true, assumes that this stream contains existing file and appends data to it, otherwise writes a new Parquet file.</param>
   public static void Write(DataSet dataSet, Stream destination, CompressionMethod compression = CompressionMethod.Gzip, ParquetOptions formatOptions = null, WriterOptions writerOptions = null, bool append = false)
   {
      using (var writer = new ParquetWriter(destination, formatOptions, writerOptions))
      {
         writer.Write(dataSet, compression, append);
      }
   }

   /// <summary>
   /// Writes <see cref="DataSet"/> to a target file
   /// </summary>
   /// <param name="dataSet"><see cref="DataSet"/> to write</param>
   /// <param name="fileName">Path to a file to write to.</param>
   /// <param name="compression">Compression method</param>
   /// <param name="formatOptions">Parquet options, optional.</param>
   /// <param name="writerOptions">Writer options, optional.</param>
   /// <param name="append">When true, assumes that this stream contains existing file and appends data to it, otherwise writes a new Parquet file.</param>
   public static void WriteFile(DataSet dataSet, string fileName, CompressionMethod compression = CompressionMethod.Gzip, ParquetOptions formatOptions = null, WriterOptions writerOptions = null, bool append = false)
   {
      //NOTE(review): File.Create truncates an existing file, which conflicts with append == true;
      //presumably appending to files was intended to open with FileMode.Open - confirm against
      //the overall append design before changing the open mode
      using (Stream fs = System.IO.File.Create(fileName))
      {
         using (var writer = new ParquetWriter(fs, formatOptions, writerOptions))
         {
            //bug fix: append was accepted but never forwarded, so WriteFile silently
            //ignored append requests
            writer.Write(dataSet, compression, append);
         }
      }
   }

   //both the file header and the trailer end with the 4-byte parquet magic
   private void WriteMagic()
   {
      Stream.Write(MagicBytes, 0, MagicBytes.Length);
   }

   /// <summary>
   /// Finalizes file, writes metadata and footer. Safe to call multiple times;
   /// only the first call writes the trailer.
   /// </summary>
   public void Dispose()
   {
      if (_disposed) return;
      _disposed = true;

      //nothing was written - do not emit a footer for an empty/unused writer
      if (!_dataWritten) return;

      //finalize file
      long size = _footer.Write(ThriftStream);

      //metadata size
      Writer.Write((int)size); //4 bytes

      //end magic
      WriteMagic(); //4 bytes

      Writer.Flush();
      Stream.Flush();
   }
}
}