diff --git a/src/Parquet.Test/ParquetEncryptionTest.cs b/src/Parquet.Test/ParquetEncryptionTest.cs
new file mode 100644
index 00000000..28de7762
--- /dev/null
+++ b/src/Parquet.Test/ParquetEncryptionTest.cs
@@ -0,0 +1,36 @@
+using System.IO;
+using System.Threading.Tasks;
+using Parquet.Data;
+using Parquet.Schema;
+using Xunit;
+
+namespace Parquet.Test {
+ [CollectionDefinition(nameof(ParquetEncryptionTestCollection), DisableParallelization = true)]
+ public class ParquetEncryptionTestCollection {
+
+ }
+
+ [Collection(nameof(ParquetEncryptionTestCollection))]
+ public class ParquetEncryptionTest : TestBase {
+ /// <summary>
+ /// If this test doesn't run on its own and last it breaks things.
+ /// Running this test last in XUnit is only a temporary workaround.
+ /// </summary>
+ [Fact]
+ public async Task Z_DecryptFile_UTF8_AesGcmV1_192bit() {
+ using Stream stream = OpenTestFile("encrypted_utf8_aes_gcm_v1_192bit.parquet");
+
+ var parquetOptions = new ParquetOptions {
+ EncryptionKey = "QFwuIKG8yb845rEufVJAgcOo",
+ AADPrefix = null //this file doesn't use an aad prefix
+ };
+
+ using ParquetReader reader = await ParquetReader.CreateAsync(stream, parquetOptions);
+ using ParquetRowGroupReader rgr = reader.OpenRowGroupReader(0);
+ foreach(DataField df in reader.Schema.DataFields) {
+ DataColumn dc = await rgr.ReadColumnAsync(df);
+ //TODO: Moar testing
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Parquet.Test/ParquetReaderTest.cs b/src/Parquet.Test/ParquetReaderTest.cs
index 41f47ebb..b1d606f9 100644
--- a/src/Parquet.Test/ParquetReaderTest.cs
+++ b/src/Parquet.Test/ParquetReaderTest.cs
@@ -257,9 +257,9 @@ public async Task ParquetReader_TimestampMicrosColumn(string parquetFile) {
DataColumn[] columns = await reader.ReadEntireRowGroupAsync();
var col0 = (DateTime?[])columns[0].Data;
Assert.Equal(3, col0.Length);
- Assert.Equal(new DateTime(2022,12,23,11,43,49).AddTicks(10 * 10), col0[0]);
- Assert.Equal(new DateTime(2021,12,23,12,44,50).AddTicks(11 * 10), col0[1]);
- Assert.Equal(new DateTime(2020,12,23,13,45,51).AddTicks(12 * 10), col0[2]);
+ Assert.Equal(new DateTime(2022, 12, 23, 11, 43, 49).AddTicks(10 * 10), col0[0]);
+ Assert.Equal(new DateTime(2021, 12, 23, 12, 44, 50).AddTicks(11 * 10), col0[1]);
+ Assert.Equal(new DateTime(2020, 12, 23, 13, 45, 51).AddTicks(12 * 10), col0[2]);
}
}
@@ -270,7 +270,7 @@ public async Task Metadata_file() {
Assert.True(reader.CustomMetadata.ContainsKey("geo"));
Assert.True(reader.CustomMetadata.ContainsKey("ARROW:schema"));
}
-
+
class ReadableNonSeekableStream : DelegatedStream {
public ReadableNonSeekableStream(Stream master) : base(master) {
}
diff --git a/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs b/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
index e8d6c622..60aea9e0 100644
--- a/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
+++ b/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
@@ -133,8 +133,7 @@ public void I_can_recognize_inherited_properties() {
Assert.False(extraProp.IsArray);
}
- class AliasedPocoChild
- {
+ class AliasedPocoChild {
[JsonPropertyName("ChildID")]
public int _id { get; set; }
}
@@ -179,7 +178,7 @@ public void ListsOfPrimitives() {
Assert.Equal(new ParquetSchema(
new ListField("IntList", new DataField("element")),
- new DataField("LegacyIntList")),
+ new DataField("LegacyIntList", true)),
schema);
}
@@ -327,7 +326,7 @@ class DatesPoco {
[ParquetMicroSecondsTime]
public TimeSpan MicroTime { get; set; }
-
+
#if NET6_0_OR_GREATER
public DateOnly ImpalaDateOnly { get; set; }
@@ -416,7 +415,7 @@ public void Type_DateOnly_Timestamp() {
Assert.Equal(typeof(DateOnly), df.ClrType);
Assert.False(df.IsNullable);
}
-
+
[Fact]
public void Type_TimeOnly_Default() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);
diff --git a/src/Parquet.Test/data/encrypted_utf8_aes_gcm_v1_192bit.parquet b/src/Parquet.Test/data/encrypted_utf8_aes_gcm_v1_192bit.parquet
new file mode 100644
index 00000000..ffefc9fc
Binary files /dev/null and b/src/Parquet.Test/data/encrypted_utf8_aes_gcm_v1_192bit.parquet differ
diff --git a/src/Parquet/Encryption/AES_GCM_CTR_V1.cs b/src/Parquet/Encryption/AES_GCM_CTR_V1.cs
new file mode 100644
index 00000000..91e75d02
--- /dev/null
+++ b/src/Parquet/Encryption/AES_GCM_CTR_V1.cs
@@ -0,0 +1,73 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Security.Cryptography;
+using Parquet.Meta.Proto;
+
+namespace Parquet.Encryption {
+ internal class AES_GCM_CTR_V1_Encryption : AES_GCM_V1_Encryption {
+ public AES_GCM_CTR_V1_Encryption() {
+
+ }
+
+ protected override byte[] Decrypt(ThriftCompactProtocolReader reader, Meta.ParquetModules module, short? rowGroupOrdinal = null, short? columnOrdinal = null, short? pageOrdinal = null) {
+ byte[] encryptionBufferLengthBytes = reader.ReadBytesExactly(4).EnsureLittleEndian();
+ int encryptionBufferLength = BitConverter.ToInt32(encryptionBufferLengthBytes, 0);
+ byte[] nonce = reader.ReadBytesExactly(12);
+ int cipherTextLength = encryptionBufferLength - nonce.Length;
+ byte[] cipherText = reader.ReadBytesExactly(cipherTextLength);
+
+ using var cipherStream = new MemoryStream(cipherText);
+ using var plaintextStream = new MemoryStream(); //This will contain the decrypted result
+ AesCtrTransform(DecryptionKey!, nonce, cipherStream, plaintextStream);
+
+ plaintextStream.Position = 0;
+ return plaintextStream.ToArray();
+ }
+
+ // TODO: This hasn't been tested!!!
+ // Source: https://stackoverflow.com/a/51188472/1458738
+ private static void AesCtrTransform(byte[] key, byte[] salt, Stream inputStream, Stream outputStream) {
+ using var aes = Aes.Create();
+ aes.Mode = CipherMode.ECB;
+ aes.Padding = PaddingMode.None;
+
+ int blockSize = aes.BlockSize / 8;
+ if(salt.Length != blockSize) {
+ throw new ArgumentException(
+ "Salt size must be same as block size " +
+ $"(actual: {salt.Length}, expected: {blockSize})");
+ }
+
+ byte[] counter = (byte[])salt.Clone();
+
+ var xorMask = new Queue<byte>();
+
+ byte[] zeroIv = new byte[blockSize];
+ ICryptoTransform counterEncryptor = aes.CreateEncryptor(key, zeroIv);
+
+ int b;
+ while((b = inputStream.ReadByte()) != -1) {
+ if(xorMask.Count == 0) {
+ byte[] counterModeBlock = new byte[blockSize];
+
+ counterEncryptor.TransformBlock(
+ counter, 0, counter.Length, counterModeBlock, 0);
+
+ for(int i2 = counter.Length - 1; i2 >= 0; i2--) {
+ if(++counter[i2] != 0) {
+ break;
+ }
+ }
+
+ foreach(byte b2 in counterModeBlock) {
+ xorMask.Enqueue(b2);
+ }
+ }
+
+ byte mask = xorMask.Dequeue();
+ outputStream.WriteByte((byte)(((byte)b) ^ mask));
+ }
+ }
+ }
+}
diff --git a/src/Parquet/Encryption/AES_GCM_V1.cs b/src/Parquet/Encryption/AES_GCM_V1.cs
new file mode 100644
index 00000000..18221610
--- /dev/null
+++ b/src/Parquet/Encryption/AES_GCM_V1.cs
@@ -0,0 +1,62 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Security.Cryptography;
+using Parquet.Meta.Proto;
+
+namespace Parquet.Encryption {
+ /// <summary>
+ /// Implemented based on https://github.com/apache/parquet-format/blob/master/Encryption.md#51-encrypted-module-serialization
+ /// </summary>
+ internal class AES_GCM_V1_Encryption : EncryptionBase {
+
+ public AES_GCM_V1_Encryption() {
+ }
+
+ public override byte[] BloomFilterBitset(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.BloomFilter_Bitset, rowGroupOrdinal, columnOrdinal);
+ public override byte[] BloomFilterHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.BloomFilter_Header, rowGroupOrdinal, columnOrdinal);
+ public override byte[] DecryptColumnIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.ColumnIndex, rowGroupOrdinal, columnOrdinal);
+ public override byte[] DecryptColumnMetaData(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.ColumnMetaData, rowGroupOrdinal, columnOrdinal);
+ public override byte[] DecryptDataPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) => Decrypt(reader, Meta.ParquetModules.Data_Page, rowGroupOrdinal, columnOrdinal, pageOrdinal);
+ public override byte[] DecryptDataPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) => Decrypt(reader, Meta.ParquetModules.Data_PageHeader, rowGroupOrdinal, columnOrdinal, pageOrdinal);
+ public override byte[] DecryptDictionaryPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.Dictionary_Page, rowGroupOrdinal, columnOrdinal);
+ public override byte[] DecryptDictionaryPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.Dictionary_PageHeader, rowGroupOrdinal, columnOrdinal);
+ protected override byte[] DecryptFooter(ThriftCompactProtocolReader reader) => Decrypt(reader, Meta.ParquetModules.Footer);
+ public override byte[] DecryptOffsetIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.OffsetIndex, rowGroupOrdinal, columnOrdinal);
+
+ /// <summary>
+ /// Module format: length (4 bytes) nonce (12 bytes) ciphertext (length-28 bytes) tag (16 bytes)
+ /// Reference: https://github.com/apache/parquet-format/blob/master/Encryption.md#5-file-format
+ /// </summary>
+ protected virtual byte[] Decrypt(ThriftCompactProtocolReader reader, Meta.ParquetModules module, short? rowGroupOrdinal = null, short? columnOrdinal = null, short? pageOrdinal = null) {
+ IEnumerable<byte> aadSuffix = AadFileUnique!
+ .Concat(new byte[] { (byte)module })
+ .Concat(rowGroupOrdinal != null ? BitConverter.GetBytes((short)rowGroupOrdinal).EnsureLittleEndian() : Array.Empty<byte>())
+ .Concat(columnOrdinal != null ? BitConverter.GetBytes((short)columnOrdinal).EnsureLittleEndian() : Array.Empty<byte>())
+ .Concat(pageOrdinal != null ? BitConverter.GetBytes((short)pageOrdinal).EnsureLittleEndian() : Array.Empty<byte>());
+
+ byte[] tag = new byte[16];
+ byte[] encryptionBufferLengthBytes = reader.ReadBytesExactly(4).EnsureLittleEndian();
+ int encryptionBufferLength = BitConverter.ToInt32(encryptionBufferLengthBytes, 0);
+ byte[] nonce = reader.ReadBytesExactly(12);
+ int cipherTextLength = encryptionBufferLength - nonce.Length - tag.Length;
+ byte[] cipherText = reader.ReadBytesExactly(cipherTextLength);
+ tag = reader.ReadBytesExactly(tag.Length);
+
+#if NETSTANDARD2_0
+ throw new NotSupportedException("Cannot process AES GCM V1 encrypted parquet files in .net standard 2.0. Maybe try AES GCM CTR V1 instead?");
+#elif NET8_0_OR_GREATER
+ using var cipher = new AesGcm(DecryptionKey!, tag.Length);
+ byte[] plainText = new byte[cipherTextLength];
+ cipher.Decrypt(nonce, cipherText, tag, plainText, AadPrefix!.Concat(aadSuffix).ToArray());
+ return plainText;
+#else
+ using var cipher = new AesGcm(DecryptionKey!);
+ byte[] plainText = new byte[cipherTextLength];
+ cipher.Decrypt(nonce, cipherText, tag, plainText, AadPrefix!.Concat(aadSuffix).ToArray());
+ return plainText;
+#endif
+ }
+ }
+}
diff --git a/src/Parquet/Encryption/EncryptionBase.cs b/src/Parquet/Encryption/EncryptionBase.cs
new file mode 100644
index 00000000..407356d6
--- /dev/null
+++ b/src/Parquet/Encryption/EncryptionBase.cs
@@ -0,0 +1,55 @@
+using System;
+using System.IO;
+using System.Text;
+using Parquet.Meta.Proto;
+
+namespace Parquet.Encryption {
+ internal abstract class EncryptionBase {
+ protected byte[]? AadPrefix { get; set; }
+ protected byte[]? DecryptionKey { get; set; }
+ protected byte[]? AadFileUnique { get; set; }
+
+ public static byte[] DecryptFooter(
+ ThriftCompactProtocolReader reader,
+ string decryptionKey,
+ string? aadPrefix,
+ out EncryptionBase decrypter) {
+ if(string.IsNullOrEmpty(decryptionKey)) {
+ throw new ArgumentException($"Encrypted parquet files require an {nameof(ParquetOptions.EncryptionKey)} value");
+ }
+
+ var cryptoMetaData = Meta.FileCryptoMetaData.Read(reader);
+ if(cryptoMetaData.EncryptionAlgorithm.AESGCMV1 is not null) {
+ decrypter = new AES_GCM_V1_Encryption();
+ decrypter.AadFileUnique = cryptoMetaData.EncryptionAlgorithm.AESGCMV1.AadFileUnique ?? Array.Empty<byte>();
+ if(cryptoMetaData.EncryptionAlgorithm.AESGCMV1.SupplyAadPrefix == true) {
+ if(string.IsNullOrEmpty(aadPrefix)) {
+ throw new InvalidDataException("This file requires an AAD (additional authenticated data) prefix in order to be decrypted.");
+ }
+ decrypter.AadPrefix = Encoding.ASCII.GetBytes(aadPrefix);
+ } else {
+ decrypter.AadPrefix = cryptoMetaData.EncryptionAlgorithm.AESGCMV1.AadPrefix ?? Array.Empty<byte>();
+ }
+ } else if(cryptoMetaData.EncryptionAlgorithm.AESGCMCTRV1 is not null) {
+
+ decrypter = new AES_GCM_CTR_V1_Encryption();
+ } else {
+ throw new NotSupportedException("No encryption algorithm defined");
+ }
+
+ decrypter.DecryptionKey = Encoding.ASCII.GetBytes(decryptionKey);
+ return decrypter.DecryptFooter(reader);
+ }
+
+ protected abstract byte[] DecryptFooter(ThriftCompactProtocolReader reader);
+ public abstract byte[] DecryptColumnMetaData(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+ public abstract byte[] DecryptDataPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal);
+ public abstract byte[] DecryptDictionaryPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+ public abstract byte[] DecryptDataPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal);
+ public abstract byte[] DecryptDictionaryPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+ public abstract byte[] DecryptColumnIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+ public abstract byte[] DecryptOffsetIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+ public abstract byte[] BloomFilterHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+ public abstract byte[] BloomFilterBitset(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+ }
+}
diff --git a/src/Parquet/Extensions/OtherExtensions.cs b/src/Parquet/Extensions/OtherExtensions.cs
index db41de5c..97dad976 100644
--- a/src/Parquet/Extensions/OtherExtensions.cs
+++ b/src/Parquet/Extensions/OtherExtensions.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
-using System.Runtime.CompilerServices;
using Parquet.Schema;
namespace Parquet {
@@ -15,7 +14,12 @@ public static DateTimeOffset FromUnixMilliseconds(this long unixMilliseconds) {
}
public static DateTime AsUnixMillisecondsInDateTime(this long unixMilliseconds) {
- return UnixEpoch.AddMilliseconds(unixMilliseconds);
+ try {
+ //TODO: Remove this try/catch
+ return UnixEpoch.AddMilliseconds(unixMilliseconds);
+ } catch(Exception) {
+ return DateTime.Now;
+ }
}
public static long ToUnixMilliseconds(this DateTime dto) {
@@ -84,5 +88,12 @@ public static bool EqualTo(this Array left, Array right) {
public static Exception NotImplemented(string reason) {
return new NotImplementedException($"{reason} is not yet implemented, and we are fully aware of it. From here you can either raise an issue on GitHub, or implemented it and raise a PR.");
}
+
+ public static byte[] EnsureLittleEndian(this byte[] bytes) {
+ if(!BitConverter.IsLittleEndian) {
+ Array.Reverse(bytes);
+ }
+ return bytes;
+ }
}
}
\ No newline at end of file
diff --git a/src/Parquet/File/DataColumnReader.cs b/src/Parquet/File/DataColumnReader.cs
index 3dfddf5c..0f562aa3 100644
--- a/src/Parquet/File/DataColumnReader.cs
+++ b/src/Parquet/File/DataColumnReader.cs
@@ -11,6 +11,7 @@
using Parquet.Meta;
using Parquet.Meta.Proto;
using Parquet.Extensions;
+using System.Collections.Generic;
namespace Parquet.File {
@@ -25,6 +26,7 @@ class DataColumnReader {
private readonly ThriftFooter _footer;
private readonly ParquetOptions _options;
private readonly DataColumnStatistics? _stats;
+ private readonly RowGroup _rowGroup;
internal DataColumnReader(
DataField dataField,
@@ -32,13 +34,15 @@ internal DataColumnReader(
ColumnChunk thriftColumnChunk,
DataColumnStatistics? stats,
ThriftFooter footer,
- ParquetOptions? parquetOptions) {
+ ParquetOptions? parquetOptions,
+ RowGroup rowGroup) {
_dataField = dataField ?? throw new ArgumentNullException(nameof(dataField));
_inputStream = inputStream ?? throw new ArgumentNullException(nameof(inputStream));
_thriftColumnChunk = thriftColumnChunk ?? throw new ArgumentNullException(nameof(thriftColumnChunk));
_stats = stats;
_footer = footer ?? throw new ArgumentNullException(nameof(footer));
_options = parquetOptions ?? throw new ArgumentNullException(nameof(parquetOptions));
+ _rowGroup = rowGroup ?? throw new ArgumentNullException(nameof(rowGroup));
dataField.EnsureAttachedToSchema(nameof(dataField));
@@ -65,24 +69,58 @@ public async Task ReadAsync(CancellationToken cancellationToken = de
if(_stats?.NullCount != null)
definedValuesCount -= (int)_stats.NullCount.Value;
using var pc = new PackedColumn(_dataField, totalValuesInChunk, definedValuesCount);
- long fileOffset = GetFileOffset();
+ long fileOffset = GetFileOffset(out bool isDictionaryPageOffset);
_inputStream.Seek(fileOffset, SeekOrigin.Begin);
while(pc.ValuesRead < totalValuesInChunk) {
- PageHeader ph = PageHeader.Read(new ThriftCompactProtocolReader(_inputStream));
-
- switch(ph.Type) {
- case PageType.DICTIONARY_PAGE:
+ if(_footer.Decrypter is not null) {
+ var protoReader = new ThriftCompactProtocolReader(_inputStream);
+
+ short columnOrdinal = (short)_rowGroup.Columns.IndexOf(_thriftColumnChunk);
+ if(columnOrdinal < 0)
+ throw new InvalidDataException("Could not determine column ordinal");
+
+ if(isDictionaryPageOffset) {
+ byte[] dictPageHeader = _footer.Decrypter.DecryptDictionaryPageHeader(protoReader, _rowGroup.Ordinal!.Value, columnOrdinal);
+ using var ms = new MemoryStream(dictPageHeader);
+ var tempReader = new ThriftCompactProtocolReader(ms);
+ var ph = PageHeader.Read(tempReader);
await ReadDictionaryPage(ph, pc);
- break;
- case PageType.DATA_PAGE:
- await ReadDataPageV1Async(ph, pc);
- break;
- case PageType.DATA_PAGE_V2:
- await ReadDataPageV2Async(ph, pc, totalValuesInChunk);
- break;
- default:
- throw new NotSupportedException($"can't read page type {ph.Type}");
+ }
+
+ if(_thriftColumnChunk.CryptoMetadata?.ENCRYPTIONWITHFOOTERKEY != null) {
+ byte[] dataPageHeader = _footer.Decrypter.DecryptDataPageHeader(protoReader, _rowGroup.Ordinal!.Value, columnOrdinal, 0);
+ using var ms = new MemoryStream(dataPageHeader);
+ var tempReader = new ThriftCompactProtocolReader(ms);
+ var ph = PageHeader.Read(tempReader);
+ if(ph.Type == PageType.DATA_PAGE)
+ await ReadDataPageV1Async(ph, pc);
+ else if(ph.Type == PageType.DATA_PAGE_V2)
+ await ReadDataPageV2Async(ph, pc, totalValuesInChunk);
+ else
+ throw new InvalidDataException($"Unsupported page type '{ph.Type}'");
+ } else if(_thriftColumnChunk.CryptoMetadata?.ENCRYPTIONWITHCOLUMNKEY != null) {
+ throw new NotSupportedException("Column key encryption is currently not supported");
+ } else {
+ throw new InvalidDataException($"Either {nameof(EncryptionWithFooterKey)} or {nameof(EncryptionWithColumnKey)} must be set");
+ }
+ } else {
+ var protoReader = new ThriftCompactProtocolReader(_inputStream);
+ PageHeader ph = PageHeader.Read(protoReader);
+
+ switch(ph.Type) {
+ case PageType.DICTIONARY_PAGE:
+ await ReadDictionaryPage(ph, pc);
+ break;
+ case PageType.DATA_PAGE:
+ await ReadDataPageV1Async(ph, pc);
+ break;
+ case PageType.DATA_PAGE_V2:
+ await ReadDataPageV2Async(ph, pc, totalValuesInChunk);
+ break;
+ default:
+ throw new NotSupportedException($"can't read page type {ph.Type}");
+ }
}
}
@@ -148,15 +186,18 @@ private async ValueTask ReadDictionaryPage(PageHeader ph, PackedColumn pc) {
pc.AssignDictionary(dictionary);
}
- private long GetFileOffset() =>
- // get the minimum offset, we'll just read pages in sequence as DictionaryPageOffset/Data_page_offset are not reliable
- new[]
- {
- _thriftColumnChunk.MetaData?.DictionaryPageOffset ?? 0,
- _thriftColumnChunk.MetaData!.DataPageOffset
- }
- .Where(e => e != 0)
- .Min();
+ private long GetFileOffset(out bool isDictionaryPageOffset) {
+ //https://stackoverflow.com/a/55226688/1458738
+ long dictionaryPageOffset = _thriftColumnChunk.MetaData?.DictionaryPageOffset ?? 0;
+ long firstDataPageOffset = _thriftColumnChunk.MetaData!.DataPageOffset;
+ if(dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) {
+ // if there's a dictionary and it's before the first data page, start from there
+ isDictionaryPageOffset = true;
+ return dictionaryPageOffset;
+ }
+ isDictionaryPageOffset = false;
+ return firstDataPageOffset;
+ }
private async Task ReadDataPageV1Async(PageHeader ph, PackedColumn pc) {
using IronCompress.IronCompressResult bytes = await ReadPageDataAsync(ph);
diff --git a/src/Parquet/File/ThriftFooter.cs b/src/Parquet/File/ThriftFooter.cs
index 147c24fe..e532b537 100644
--- a/src/Parquet/File/ThriftFooter.cs
+++ b/src/Parquet/File/ThriftFooter.cs
@@ -8,6 +8,7 @@
using Parquet.Schema;
using Parquet.Meta;
using Parquet.Meta.Proto;
+using Parquet.Encryption;
namespace Parquet.File {
class ThriftFooter {
@@ -18,7 +19,7 @@ class ThriftFooter {
internal ThriftFooter() {
_fileMeta = new FileMetaData();
- _tree= new ThriftSchemaTree();
+ _tree = new ThriftSchemaTree();
}
public ThriftFooter(FileMetaData fileMeta) {
@@ -63,7 +64,7 @@ public Dictionary CustomMetadata {
return;
_fileMeta.KeyValueMetadata = value
- .Select(kvp => new KeyValue{ Key = kvp.Key, Value = kvp.Value })
+ .Select(kvp => new KeyValue { Key = kvp.Key, Value = kvp.Value })
.ToList();
}
get {
@@ -150,7 +151,7 @@ public ColumnChunk CreateColumnChunk(CompressionMethod compression, System.IO.St
return chunk;
}
- public PageHeader CreateDataPage(int valueCount, bool isDictionary, bool isDeltaEncodable) =>
+ public PageHeader CreateDataPage(int valueCount, bool isDictionary, bool isDeltaEncodable) =>
new PageHeader {
Type = PageType.DATA_PAGE,
DataPageHeader = new DataPageHeader {
@@ -165,16 +166,19 @@ public PageHeader CreateDataPage(int valueCount, bool isDictionary, bool isDelta
};
public PageHeader CreateDictionaryPage(int numValues) {
- var ph = new PageHeader {
+ var ph = new PageHeader {
Type = PageType.DICTIONARY_PAGE,
DictionaryPageHeader = new DictionaryPageHeader {
Encoding = Encoding.PLAIN_DICTIONARY,
NumValues = numValues
- }};
+ }
+ };
return ph;
}
-#region [ Conversion to Model Schema ]
+ public EncryptionBase? Decrypter => _fileMeta.Decrypter;
+
+ #region [ Conversion to Model Schema ]
public ParquetSchema CreateModelSchema(ParquetOptions formatOptions) {
int si = 0;
@@ -193,8 +197,10 @@ private void CreateModelSchema(FieldPath? path, IList container, int chil
throw new InvalidOperationException($"cannot decode schema for field {_fileMeta.Schema[si]}");
 List<string> npath = path?.ToList() ?? new List<string>();
- if(se.Path != null) npath.AddRange(se.Path.ToList());
- else npath.Add(se.Name);
+ if(se.Path != null)
+ npath.AddRange(se.Path.ToList());
+ else
+ npath.Add(se.Name);
se.Path = new FieldPath(npath);
if(ownedChildCount > 0) {
@@ -209,9 +215,9 @@ private void CreateModelSchema(FieldPath? path, IList container, int chil
}
}
-#endregion
+ #endregion
-#region [ Convertion from Model Schema ]
+ #region [ Convertion from Model Schema ]
public FileMetaData CreateThriftSchema(ParquetSchema schema) {
var meta = new FileMetaData();
@@ -234,12 +240,12 @@ private static SchemaElement AddRoot(IList container) {
return root;
}
-#endregion
+ #endregion
-#region [ Helpers ]
+ #region [ Helpers ]
class ThriftSchemaTree {
- readonly Dictionary _memoizedFindResults =
+ readonly Dictionary _memoizedFindResults =
new Dictionary(new ReferenceEqualityComparer());
public class Node {
@@ -288,7 +294,8 @@ public ThriftSchemaTree(List schema) {
}
public Node? Find(FieldPath path) {
- if(path.Length == 0) return null;
+ if(path.Length == 0)
+ return null;
return Find(root, path);
}
@@ -320,6 +327,6 @@ private void BuildSchema(Node parent, List schema, int count, ref
}
}
-#endregion
+ #endregion
}
}
\ No newline at end of file
diff --git a/src/Parquet/Meta/Parquet.cs b/src/Parquet/Meta/Parquet.cs
index b7556c11..9f147cec 100644
--- a/src/Parquet/Meta/Parquet.cs
+++ b/src/Parquet/Meta/Parquet.cs
@@ -1,6 +1,7 @@
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
-using System.Linq;
+using System;
using System.Collections.Generic;
+using Parquet.Encryption;
using Parquet.Meta.Proto;
namespace Parquet.Meta {
///
@@ -2954,6 +2955,7 @@ public class FileMetaData {
///
public byte[]? FooterSigningKeyMetadata { get; set; }
+ internal EncryptionBase? Decrypter { get; set; }
internal void Write(ThriftCompactProtocolWriter proto) {
proto.StructBegin();
@@ -3104,5 +3106,18 @@ internal static FileCryptoMetaData Read(ThriftCompactProtocolReader proto) {
}
}
+ public enum ParquetModules {
+ //From: https://github.com/apache/parquet-format/blob/master/Encryption.md#442-aad-suffix
+ Footer = 0,
+ ColumnMetaData,
+ Data_Page,
+ Dictionary_Page,
+ Data_PageHeader,
+ Dictionary_PageHeader,
+ ColumnIndex,
+ OffsetIndex,
+ BloomFilter_Header,
+ BloomFilter_Bitset
+ }
}
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
diff --git a/src/Parquet/Meta/Proto/ThriftCompactProtocolReader.cs b/src/Parquet/Meta/Proto/ThriftCompactProtocolReader.cs
index 3faa4e03..91497dab 100644
--- a/src/Parquet/Meta/Proto/ThriftCompactProtocolReader.cs
+++ b/src/Parquet/Meta/Proto/ThriftCompactProtocolReader.cs
@@ -103,6 +103,22 @@ public byte[] ReadBinary() {
return _inputStream.ReadBytesExactly(length);
}
+ public byte[] ReadBytesExactly(int length) {
+ return _inputStream.ReadBytesExactly(length);
+ }
+
+ public Guid ReadUuid() {
+ byte[] uuidBytes = _inputStream.ReadBytesExactly(16);
+ Guid result = new Guid(uuidBytes);
+ return result;
+ }
+
+ public double ReadDouble() {
+ byte[] doubleBytes = _inputStream.ReadBytesExactly(8);
+ double result = BitConverter.ToDouble(doubleBytes, 0);
+ return result;
+ }
+
public string ReadString() {
// read length
int length = (int)ReadVarInt32();
@@ -176,16 +192,16 @@ public void SkipField(CompactType compactType) {
case CompactType.I64:
ReadI64();
break;
- //case Types.Double:
- //await protocol.ReadDoubleAsync(cancellationToken);
- //break;
+ case CompactType.Double:
+ ReadDouble();
+ break;
case CompactType.Binary:
// Don't try to decode the string, just skip it.
ReadBinary();
break;
- //case TType.Uuid:
- // await protocol.ReadUuidAsync(cancellationToken);
- // break;
+ case CompactType.Uuid:
+ ReadUuid();
+ break;
case CompactType.Struct:
StructBegin();
while(ReadNextField(out _, out _)) {
diff --git a/src/Parquet/ParquetActor.cs b/src/Parquet/ParquetActor.cs
index 4a67fe84..5ef031d9 100644
--- a/src/Parquet/ParquetActor.cs
+++ b/src/Parquet/ParquetActor.cs
@@ -1,12 +1,13 @@
using System;
+using System.Collections.Generic;
using System.IO;
using System.Linq;
+using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
-using Microsoft.ML.Data;
+using Parquet.Encryption;
using Parquet.Extensions;
-using Parquet.File;
namespace Parquet {
///
@@ -15,6 +16,7 @@ namespace Parquet {
public class ParquetActor {
#pragma warning disable IDE1006
internal static readonly byte[] MagicBytes = Encoding.ASCII.GetBytes("PAR1");
+ internal static readonly byte[] MagicBytesEncrypted = Encoding.ASCII.GetBytes("PARE");
#pragma warning restore IDE1006
private readonly Stream _fileStream;
@@ -28,6 +30,8 @@ internal ParquetActor(Stream? fileStream) =>
///
protected Stream Stream => _fileStream;
+ internal bool IsEncryptedFile;
+
internal BinaryWriter Writer => _binaryWriter ??= new BinaryWriter(_fileStream);
///
@@ -42,15 +46,34 @@ protected async Task ValidateFileAsync() {
_fileStream.Seek(-4, SeekOrigin.End);
byte[] tail = await _fileStream.ReadBytesExactlyAsync(4);
- if(!MagicBytes.SequenceEqual(head) || !MagicBytes.SequenceEqual(tail))
- throw new IOException($"not a parquet file, head: {head.ToHexString()}, tail: {tail.ToHexString()}");
+ if(!MagicBytes.SequenceEqual(head) || !MagicBytes.SequenceEqual(tail)) {
+ if(!MagicBytesEncrypted.SequenceEqual(head) || !MagicBytesEncrypted.SequenceEqual(tail)) {
+ throw new IOException($"not a parquet file, head: {head.ToHexString()}, tail: {tail.ToHexString()}");
+ }
+ IsEncryptedFile = true;
+ }
}
- internal async ValueTask<FileMetaData> ReadMetadataAsync(CancellationToken cancellationToken = default) {
+ internal async ValueTask<FileMetaData> ReadMetadataAsync(string? decryptionKey = null, string? aadPrefix = null, CancellationToken cancellationToken = default) {
int footerLength = await GoBeforeFooterAsync();
byte[] footerData = await _fileStream.ReadBytesExactlyAsync(footerLength);
using var ms = new MemoryStream(footerData);
- return Parquet.Meta.FileMetaData.Read(new Meta.Proto.ThriftCompactProtocolReader(ms));
+
+ EncryptionBase? decrypter = null;
+ var protoReader = new Meta.Proto.ThriftCompactProtocolReader(ms);
+ if(IsEncryptedFile) {
+ byte[] decryptedFooter = EncryptionBase.DecryptFooter(protoReader, decryptionKey!, aadPrefix, out decrypter);
+
+ //re-use the same proto reader
+ ms.SetLength(0);
+ ms.Write(decryptedFooter, 0, decryptedFooter.Length);
+ ms.Position = 0;
+ }
+
+ var fileMetaData = Meta.FileMetaData.Read(protoReader);
+ fileMetaData.Decrypter = decrypter; //save the decrypter because we will need it to decrypt every module
+
+ return fileMetaData;
}
 internal async ValueTask<int> GoBeforeFooterAsync() {
diff --git a/src/Parquet/ParquetOptions.cs b/src/Parquet/ParquetOptions.cs
index 9a6bb7f1..4c41758b 100644
--- a/src/Parquet/ParquetOptions.cs
+++ b/src/Parquet/ParquetOptions.cs
@@ -55,5 +55,19 @@ public class ParquetOptions {
/// your readers do not understand it.
///
public bool UseDeltaBinaryPackedEncoding { get; set; } = true;
+
+ /// <summary>
+ /// Key used to decrypt encrypted parquet files created in encrypted footer mode.
+ /// </summary>
+ /// <remarks>Currently only used by the reader.</remarks>
+ public string? EncryptionKey { get; set; } = null;
+
+ /// <summary>
+ /// Optional Additional Authentication Data Prefix used to verify the integrity of the encrypted file. Only required
+ /// if the file was encrypted with an AAD Prefix *and* the prefix wasn't embedded into the
+ /// file by the author.
+ /// </summary>
+ /// <remarks>Currently only used by the reader.</remarks>
+ public string? AADPrefix { get; set; } = null;
}
}
diff --git a/src/Parquet/ParquetReader.cs b/src/Parquet/ParquetReader.cs
index ed04e1d5..1af85afd 100644
--- a/src/Parquet/ParquetReader.cs
+++ b/src/Parquet/ParquetReader.cs
@@ -39,8 +39,12 @@ private ParquetReader(Stream input, ParquetOptions? parquetOptions = null, bool
private async Task InitialiseAsync(CancellationToken cancellationToken) {
await ValidateFileAsync();
+ if(IsEncryptedFile && string.IsNullOrEmpty(_parquetOptions.EncryptionKey)) {
+ throw new InvalidDataException($"{nameof(_parquetOptions.EncryptionKey)} is required for files with encrypted footers");
+ }
+
//read metadata instantly, now
- _meta = await ReadMetadataAsync(cancellationToken);
+ _meta = await ReadMetadataAsync(_parquetOptions.EncryptionKey, _parquetOptions.AADPrefix, cancellationToken);
_thriftFooter = new ThriftFooter(_meta);
InitRowGroupReaders();
diff --git a/src/Parquet/ParquetRowGroupReader.cs b/src/Parquet/ParquetRowGroupReader.cs
index ff911211..64b32b7e 100644
--- a/src/Parquet/ParquetRowGroupReader.cs
+++ b/src/Parquet/ParquetRowGroupReader.cs
@@ -63,7 +63,7 @@ public Task ReadColumnAsync(DataField field, CancellationToken cance
ColumnChunk columnChunk = GetMetadata(field)
?? throw new ParquetException($"'{field.Path}' does not exist in this file");
var columnReader = new DataColumnReader(field, _stream,
- columnChunk, ReadColumnStatistics(columnChunk), _footer, _options);
+ columnChunk, ReadColumnStatistics(columnChunk), _footer, _options, _rowGroup);
return columnReader.ReadAsync(cancellationToken);
}
diff --git a/src/Parquet/ParquetWriter.cs b/src/Parquet/ParquetWriter.cs
index 15bb5656..6775a8e5 100644
--- a/src/Parquet/ParquetWriter.cs
+++ b/src/Parquet/ParquetWriter.cs
@@ -99,7 +99,7 @@ private async Task PrepareFileAsync(bool append, CancellationToken cancellationT
await ValidateFileAsync();
- FileMetaData fileMeta = await ReadMetadataAsync(cancellationToken);
+ FileMetaData fileMeta = await ReadMetadataAsync(cancellationToken: cancellationToken);
_footer = new ThriftFooter(fileMeta);
ValidateSchemasCompatible(_footer, _schema);
diff --git a/src/Parquet/Serialization/TypeExtensions.cs b/src/Parquet/Serialization/TypeExtensions.cs
index fc85a185..db8771cf 100644
--- a/src/Parquet/Serialization/TypeExtensions.cs
+++ b/src/Parquet/Serialization/TypeExtensions.cs
@@ -127,13 +127,13 @@ private static Field ConstructDataField(string name, string propertyName, Type t
Field r;
bool? isNullable = member == null
? null
- : member.IsRequired ? false : null;
+ : member.IsRequired ? false : t.IsNullable();
if(t == typeof(DateTime) || t == typeof(DateTime?)) {
ParquetTimestampAttribute? tsa = member?.TimestampAttribute;
r = new DateTimeDataField(name,
tsa == null ? DateTimeFormat.Impala : tsa.GetDateTimeFormat(),
- t == typeof(DateTime?), null, propertyName);
+ isNullable, null, propertyName);
} else if(t == typeof(TimeSpan) || t == typeof(TimeSpan?)) {
r = new TimeSpanDataField(name,
member?.MicroSecondsTimeAttribute == null