diff --git a/src/Parquet.Test/ParquetEncryptionTest.cs b/src/Parquet.Test/ParquetEncryptionTest.cs
new file mode 100644
index 00000000..28de7762
--- /dev/null
+++ b/src/Parquet.Test/ParquetEncryptionTest.cs
@@ -0,0 +1,36 @@
+using System.IO;
+using System.Threading.Tasks;
+using Parquet.Data;
+using Parquet.Schema;
+using Xunit;
+
+namespace Parquet.Test {
+    [CollectionDefinition(nameof(ParquetEncryptionTestCollection), DisableParallelization = true)]
+    public class ParquetEncryptionTestCollection {
+
+    }
+
+    [Collection(nameof(ParquetEncryptionTestCollection))]
+    public class ParquetEncryptionTest : TestBase {
+        /// <summary>
+        /// If this test doesn't run on its own, and last, it breaks other tests.
+        /// Running this test last in xUnit is only a temporary workaround.
+        /// </summary>
+        [Fact]
+        public async Task Z_DecryptFile_UTF8_AesGcmV1_192bit() {
+            using Stream stream = OpenTestFile("encrypted_utf8_aes_gcm_v1_192bit.parquet");
+
+            var parquetOptions = new ParquetOptions {
+                EncryptionKey = "QFwuIKG8yb845rEufVJAgcOo",
+                AADPrefix = null //this file doesn't use an AAD prefix
+            };
+
+            using ParquetReader reader = await ParquetReader.CreateAsync(stream, parquetOptions);
+            using ParquetRowGroupReader rgr = reader.OpenRowGroupReader(0);
+            foreach(DataField df in reader.Schema.DataFields) {
+                DataColumn dc = await rgr.ReadColumnAsync(df);
+                //TODO: more assertions on the decrypted data
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Parquet.Test/ParquetReaderTest.cs b/src/Parquet.Test/ParquetReaderTest.cs
index 41f47ebb..b1d606f9 100644
--- a/src/Parquet.Test/ParquetReaderTest.cs
+++ b/src/Parquet.Test/ParquetReaderTest.cs
@@ -257,9 +257,9 @@ public async Task ParquetReader_TimestampMicrosColumn(string parquetFile) {
             DataColumn[] columns = await reader.ReadEntireRowGroupAsync();
             var col0 = (DateTime?[])columns[0].Data;
             Assert.Equal(3, col0.Length);
-            Assert.Equal(new DateTime(2022,12,23,11,43,49).AddTicks(10 * 10), col0[0]);
-            Assert.Equal(new DateTime(2021,12,23,12,44,50).AddTicks(11 * 10), col0[1]);
-            Assert.Equal(new DateTime(2020,12,23,13,45,51).AddTicks(12 * 10), col0[2]);
+            Assert.Equal(new DateTime(2022, 12, 23, 11, 43, 49).AddTicks(10 * 10), col0[0]);
+            Assert.Equal(new DateTime(2021, 12, 23, 12, 44, 50).AddTicks(11 * 10), col0[1]);
+            Assert.Equal(new DateTime(2020, 12, 23, 13, 45, 51).AddTicks(12 * 10), col0[2]);
         }
     }

@@ -270,7 +270,7 @@ public async Task Metadata_file() {
             Assert.True(reader.CustomMetadata.ContainsKey("geo"));
             Assert.True(reader.CustomMetadata.ContainsKey("ARROW:schema"));
         }
-
+
         class ReadableNonSeekableStream : DelegatedStream {

             public ReadableNonSeekableStream(Stream master) : base(master) { }
diff --git a/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs b/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
index e8d6c622..60aea9e0 100644
--- a/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
+++ b/src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
@@ -133,8 +133,7 @@ public void I_can_recognize_inherited_properties() {
             Assert.False(extraProp.IsArray);
         }

-        class AliasedPocoChild
-        {
+        class AliasedPocoChild {
             [JsonPropertyName("ChildID")]
             public int _id { get; set; }
         }
@@ -179,7 +178,7 @@ public void ListsOfPrimitives() {
             Assert.Equal(new ParquetSchema(
                 new ListField("IntList", new DataField<int>("element")),
-                new DataField<int[]>("LegacyIntList")),
+                new DataField<int[]>("LegacyIntList", true)),
                 schema);
         }
@@ -327,7 +326,7 @@ class DatesPoco {

             [ParquetMicroSecondsTime]
             public TimeSpan MicroTime { get; set; }
-
+
 #if NET6_0_OR_GREATER
             public DateOnly ImpalaDateOnly { get; set; }
@@ -416,7 +415,7 @@ public void Type_DateOnly_Timestamp() {
             Assert.Equal(typeof(DateOnly), df.ClrType);
             Assert.False(df.IsNullable);
         }
-
+
         [Fact]
         public void Type_TimeOnly_Default() {
             ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);
diff --git a/src/Parquet.Test/data/encrypted_utf8_aes_gcm_v1_192bit.parquet b/src/Parquet.Test/data/encrypted_utf8_aes_gcm_v1_192bit.parquet
new file mode 100644
index 00000000..ffefc9fc
Binary files /dev/null and b/src/Parquet.Test/data/encrypted_utf8_aes_gcm_v1_192bit.parquet differ
diff --git a/src/Parquet/Encryption/AES_GCM_CTR_V1.cs b/src/Parquet/Encryption/AES_GCM_CTR_V1.cs
new file mode 100644
index 00000000..91e75d02
--- /dev/null
+++ b/src/Parquet/Encryption/AES_GCM_CTR_V1.cs
@@ -0,0 +1,73 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Security.Cryptography;
+using Parquet.Meta.Proto;
+
+namespace Parquet.Encryption {
+    internal class AES_GCM_CTR_V1_Encryption : AES_GCM_V1_Encryption {
+        public AES_GCM_CTR_V1_Encryption() {
+
+        }
+
+        protected override byte[] Decrypt(ThriftCompactProtocolReader reader, Meta.ParquetModules module, short? rowGroupOrdinal = null, short? columnOrdinal = null, short? pageOrdinal = null) {
+            byte[] encryptionBufferLengthBytes = reader.ReadBytesExactly(4).EnsureLittleEndian();
+            int encryptionBufferLength = BitConverter.ToInt32(encryptionBufferLengthBytes, 0);
+            byte[] nonce = reader.ReadBytesExactly(12);
+            int cipherTextLength = encryptionBufferLength - nonce.Length;
+            byte[] cipherText = reader.ReadBytesExactly(cipherTextLength);
+
+            //per https://github.com/apache/parquet-format/blob/master/Encryption.md the CTR IV is
+            //the 12-byte nonce followed by a 4-byte initial counter field set to 1, and
+            //AesCtrTransform below requires a full block-sized (16-byte) IV
+            byte[] iv = new byte[16];
+            nonce.CopyTo(iv, 0);
+            iv[15] = 1;
+
+            using var cipherStream = new MemoryStream(cipherText);
+            using var plaintextStream = new MemoryStream(); //This will contain the decrypted result
+            AesCtrTransform(DecryptionKey!, iv, cipherStream, plaintextStream);
+
+            return plaintextStream.ToArray();
+        }
+
+        // TODO: This hasn't been tested!!!
+        // Source: https://stackoverflow.com/a/51188472/1458738
+        private static void AesCtrTransform(byte[] key, byte[] salt, Stream inputStream, Stream outputStream) {
+            using var aes = Aes.Create();
+            aes.Mode = CipherMode.ECB;
+            aes.Padding = PaddingMode.None;
+
+            int blockSize = aes.BlockSize / 8;
+            if(salt.Length != blockSize) {
+                throw new ArgumentException(
+                    "Salt size must be same as block size " +
+                    $"(actual: {salt.Length}, expected: {blockSize})");
+            }
+
+            byte[] counter = (byte[])salt.Clone();
+
+            var xorMask = new Queue<byte>();
+
+            byte[] zeroIv = new byte[blockSize];
+            ICryptoTransform counterEncryptor = aes.CreateEncryptor(key, zeroIv);
+
+            int b;
+            while((b = inputStream.ReadByte()) != -1) {
+                if(xorMask.Count == 0) {
+                    byte[] counterModeBlock = new byte[blockSize];
+
+                    counterEncryptor.TransformBlock(
+                        counter, 0, counter.Length, counterModeBlock, 0);
+
+                    //increment the big-endian counter held in the trailing bytes of the IV
+                    for(int i2 = counter.Length - 1; i2 >= 0; i2--) {
+                        if(++counter[i2] != 0) {
+                            break;
+                        }
+                    }
+
+                    foreach(byte b2 in counterModeBlock) {
+                        xorMask.Enqueue(b2);
+                    }
+                }
+
+                byte mask = xorMask.Dequeue();
+                outputStream.WriteByte((byte)(((byte)b) ^ mask));
+            }
+        }
+    }
+}
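The CTR transform above is symmetric: encryption and decryption are the same XOR-with-keystream operation, so applying it twice with the same key and IV must return the input. A minimal round-trip sketch of that property (illustration only, not part of this change set; it assumes the private method is reachable from test code and uses System.Text, System.IO, System.Linq and System.Diagnostics):

    byte[] key = new byte[16];                        // all-zero demo key
    byte[] iv = new byte[16];                         // 12-byte nonce + 4-byte counter
    iv[15] = 1;                                       // parquet sets the initial counter field to 1
    byte[] plain = Encoding.ASCII.GetBytes("hello parquet");

    using var src = new MemoryStream(plain);
    using var enc = new MemoryStream();
    AesCtrTransform(key, iv, src, enc);               // "encrypt"

    using var src2 = new MemoryStream(enc.ToArray());
    using var dec = new MemoryStream();
    AesCtrTransform(key, iv, src2, dec);              // the same call decrypts

    Debug.Assert(plain.SequenceEqual(dec.ToArray())); // round-trips back to the input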
diff --git a/src/Parquet/Encryption/AES_GCM_V1.cs b/src/Parquet/Encryption/AES_GCM_V1.cs
new file mode 100644
index 00000000..18221610
--- /dev/null
+++ b/src/Parquet/Encryption/AES_GCM_V1.cs
@@ -0,0 +1,62 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Security.Cryptography;
+using Parquet.Meta.Proto;
+
+namespace Parquet.Encryption {
+    /// <summary>
+    /// Implemented based on https://github.com/apache/parquet-format/blob/master/Encryption.md#51-encrypted-module-serialization
+    /// </summary>
+    internal class AES_GCM_V1_Encryption : EncryptionBase {
+
+        public AES_GCM_V1_Encryption() {
+        }
+
+        public override byte[] BloomFilterBitset(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.BloomFilter_Bitset, rowGroupOrdinal, columnOrdinal);
+        public override byte[] BloomFilterHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.BloomFilter_Header, rowGroupOrdinal, columnOrdinal);
+        public override byte[] DecryptColumnIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.ColumnIndex, rowGroupOrdinal, columnOrdinal);
+        public override byte[] DecryptColumnMetaData(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.ColumnMetaData, rowGroupOrdinal, columnOrdinal);
+        public override byte[] DecryptDataPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) => Decrypt(reader, Meta.ParquetModules.Data_Page, rowGroupOrdinal, columnOrdinal, pageOrdinal);
+        public override byte[] DecryptDataPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) => Decrypt(reader, Meta.ParquetModules.Data_PageHeader, rowGroupOrdinal, columnOrdinal, pageOrdinal);
+        public override byte[] DecryptDictionaryPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.Dictionary_Page, rowGroupOrdinal, columnOrdinal);
+        public override byte[] DecryptDictionaryPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.Dictionary_PageHeader, rowGroupOrdinal, columnOrdinal);
+        protected override byte[] DecryptFooter(ThriftCompactProtocolReader reader) => Decrypt(reader, Meta.ParquetModules.Footer);
+        public override byte[] DecryptOffsetIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.OffsetIndex, rowGroupOrdinal, columnOrdinal);
+
+        /// <summary>
+        /// Module format: length (4 bytes) nonce (12 bytes) ciphertext (length-28 bytes) tag (16 bytes).
+        /// Reference: https://github.com/apache/parquet-format/blob/master/Encryption.md#5-file-format
+        /// </summary>
+        protected virtual byte[] Decrypt(ThriftCompactProtocolReader reader, Meta.ParquetModules module, short? rowGroupOrdinal = null, short? columnOrdinal = null, short? pageOrdinal = null) {
+            IEnumerable<byte> aadSuffix = AadFileUnique!
+                .Concat(new byte[] { (byte)module })
+                .Concat(rowGroupOrdinal != null ? BitConverter.GetBytes((short)rowGroupOrdinal).EnsureLittleEndian() : Array.Empty<byte>())
+                .Concat(columnOrdinal != null ? BitConverter.GetBytes((short)columnOrdinal).EnsureLittleEndian() : Array.Empty<byte>())
+                .Concat(pageOrdinal != null ? BitConverter.GetBytes((short)pageOrdinal).EnsureLittleEndian() : Array.Empty<byte>());
+
+            byte[] tag = new byte[16];
+            byte[] encryptionBufferLengthBytes = reader.ReadBytesExactly(4).EnsureLittleEndian();
+            int encryptionBufferLength = BitConverter.ToInt32(encryptionBufferLengthBytes, 0);
+            byte[] nonce = reader.ReadBytesExactly(12);
+            int cipherTextLength = encryptionBufferLength - nonce.Length - tag.Length;
+            byte[] cipherText = reader.ReadBytesExactly(cipherTextLength);
+            tag = reader.ReadBytesExactly(tag.Length);
+
+#if NETSTANDARD2_0
+            throw new NotSupportedException("Cannot decrypt AES-GCM-V1 encrypted parquet files on .NET Standard 2.0, because AesGcm is not available on that target.");
+#elif NET8_0_OR_GREATER
+            using var cipher = new AesGcm(DecryptionKey!, tag.Length);
+            byte[] plainText = new byte[cipherTextLength];
+            cipher.Decrypt(nonce, cipherText, tag, plainText, AadPrefix!.Concat(aadSuffix).ToArray());
+            return plainText;
+#else
+            using var cipher = new AesGcm(DecryptionKey!);
+            byte[] plainText = new byte[cipherTextLength];
+            cipher.Decrypt(nonce, cipherText, tag, plainText, AadPrefix!.Concat(aadSuffix).ToArray());
+            return plainText;
+#endif
+        }
+    }
+}
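The AAD that authenticates each GCM module is the optional AAD prefix followed by the suffix assembled above. A worked example of the suffix for a data page header, with hypothetical ordinals and a hypothetical 8-byte aad_file_unique (all values invented for illustration):

    byte[] aadFileUnique = { 1, 2, 3, 4, 5, 6, 7, 8 };   // in real files this comes from FileCryptoMetaData
    short rowGroup = 1, column = 2, page = 0;

    byte[] suffix = aadFileUnique
        .Concat(new byte[] { (byte)Meta.ParquetModules.Data_PageHeader })
        .Concat(BitConverter.GetBytes(rowGroup).EnsureLittleEndian())
        .Concat(BitConverter.GetBytes(column).EnsureLittleEndian())
        .Concat(BitConverter.GetBytes(page).EnsureLittleEndian())
        .ToArray();                                       // 8 + 1 + 2 + 2 + 2 = 15 bytes

Footer modules carry no ordinals, so their suffix is just aad_file_unique plus the single module-type byte.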
diff --git a/src/Parquet/Encryption/EncryptionBase.cs b/src/Parquet/Encryption/EncryptionBase.cs
new file mode 100644
index 00000000..407356d6
--- /dev/null
+++ b/src/Parquet/Encryption/EncryptionBase.cs
@@ -0,0 +1,55 @@
+using System;
+using System.IO;
+using System.Text;
+using Parquet.Meta.Proto;
+
+namespace Parquet.Encryption {
+    internal abstract class EncryptionBase {
+        protected byte[]? AadPrefix { get; set; }
+        protected byte[]? DecryptionKey { get; set; }
+        protected byte[]? AadFileUnique { get; set; }
+
+        public static byte[] DecryptFooter(
+            ThriftCompactProtocolReader reader,
+            string decryptionKey,
+            string? aadPrefix,
+            out EncryptionBase decrypter) {
+            if(string.IsNullOrEmpty(decryptionKey)) {
+                throw new ArgumentException($"Encrypted parquet files require an {nameof(ParquetOptions.EncryptionKey)} value");
+            }
+
+            var cryptoMetaData = Meta.FileCryptoMetaData.Read(reader);
+            if(cryptoMetaData.EncryptionAlgorithm.AESGCMV1 is not null) {
+                decrypter = new AES_GCM_V1_Encryption();
+                decrypter.AadFileUnique = cryptoMetaData.EncryptionAlgorithm.AESGCMV1.AadFileUnique ?? Array.Empty<byte>();
+                if(cryptoMetaData.EncryptionAlgorithm.AESGCMV1.SupplyAadPrefix == true) {
+                    if(string.IsNullOrEmpty(aadPrefix)) {
+                        throw new InvalidDataException("This file requires an AAD (additional authenticated data) prefix in order to be decrypted.");
+                    }
+                    decrypter.AadPrefix = Encoding.ASCII.GetBytes(aadPrefix);
+                } else {
+                    decrypter.AadPrefix = cryptoMetaData.EncryptionAlgorithm.AESGCMV1.AadPrefix ?? Array.Empty<byte>();
+                }
+            } else if(cryptoMetaData.EncryptionAlgorithm.AESGCMCTRV1 is not null) {
+                decrypter = new AES_GCM_CTR_V1_Encryption();
+            } else {
+                throw new NotSupportedException("No encryption algorithm defined");
+            }
+
+            decrypter.DecryptionKey = Encoding.ASCII.GetBytes(decryptionKey);
+            return decrypter.DecryptFooter(reader);
+        }
+
+        protected abstract byte[] DecryptFooter(ThriftCompactProtocolReader reader);
+        public abstract byte[] DecryptColumnMetaData(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+        public abstract byte[] DecryptDataPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal);
+        public abstract byte[] DecryptDictionaryPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+        public abstract byte[] DecryptDataPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal);
+        public abstract byte[] DecryptDictionaryPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+        public abstract byte[] DecryptColumnIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+        public abstract byte[] DecryptOffsetIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+        public abstract byte[] BloomFilterHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+        public abstract byte[] BloomFilterBitset(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
+    }
+}
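This static entry point is what the reader calls once it has recognised the PARE magic. A condensed version of the wiring, taken from ParquetActor.ReadMetadataAsync further down in this diff (here ms holds the raw footer bytes and options is a ParquetOptions):

    var protoReader = new Meta.Proto.ThriftCompactProtocolReader(ms);
    byte[] decryptedFooter = EncryptionBase.DecryptFooter(
        protoReader, options.EncryptionKey!, options.AADPrefix, out EncryptionBase decrypter);
    // decryptedFooter now contains plaintext thrift FileMetaData; the decrypter is
    // kept around because page headers and pages must be decrypted with it later.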
diff --git a/src/Parquet/Extensions/OtherExtensions.cs b/src/Parquet/Extensions/OtherExtensions.cs
index db41de5c..97dad976 100644
--- a/src/Parquet/Extensions/OtherExtensions.cs
+++ b/src/Parquet/Extensions/OtherExtensions.cs
@@ -1,7 +1,6 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
-using System.Runtime.CompilerServices;
 using Parquet.Schema;

 namespace Parquet {
@@ -15,7 +14,12 @@ public static DateTimeOffset FromUnixMilliseconds(this long unixMilliseconds) {
         }

         public static DateTime AsUnixMillisecondsInDateTime(this long unixMilliseconds) {
-            return UnixEpoch.AddMilliseconds(unixMilliseconds);
+            try {
+                //TODO: Remove this try/catch
+                return UnixEpoch.AddMilliseconds(unixMilliseconds);
+            } catch(ArgumentOutOfRangeException) {
+                return DateTime.Now;
+            }
         }

         public static long ToUnixMilliseconds(this DateTime dto) {
@@ -84,5 +88,12 @@ public static bool EqualTo(this Array left, Array right) {

         public static Exception NotImplemented(string reason) {
             return new NotImplementedException($"{reason} is not yet implemented, and we are fully aware of it. From here you can either raise an issue on GitHub, or implement it and raise a PR.");
         }
+
+        public static byte[] EnsureLittleEndian(this byte[] bytes) {
+            if(!BitConverter.IsLittleEndian) {
+                Array.Reverse(bytes); //reverses in place and returns the same array
+            }
+            return bytes;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/Parquet/File/DataColumnReader.cs b/src/Parquet/File/DataColumnReader.cs
index 3dfddf5c..0f562aa3 100644
--- a/src/Parquet/File/DataColumnReader.cs
+++ b/src/Parquet/File/DataColumnReader.cs
@@ -11,6 +11,7 @@
 using Parquet.Meta;
 using Parquet.Meta.Proto;
 using Parquet.Extensions;
+using System.Collections.Generic;

 namespace Parquet.File {

@@ -25,6 +26,7 @@ class DataColumnReader {
         private readonly ThriftFooter _footer;
         private readonly ParquetOptions _options;
         private readonly DataColumnStatistics? _stats;
+        private readonly RowGroup _rowGroup;

         internal DataColumnReader(
             DataField dataField,
@@ -32,13 +34,15 @@ internal DataColumnReader(
             ColumnChunk thriftColumnChunk,
             DataColumnStatistics? stats,
             ThriftFooter footer,
-            ParquetOptions? parquetOptions) {
+            ParquetOptions? parquetOptions,
+            RowGroup rowGroup) {
             _dataField = dataField ?? throw new ArgumentNullException(nameof(dataField));
             _inputStream = inputStream ?? throw new ArgumentNullException(nameof(inputStream));
             _thriftColumnChunk = thriftColumnChunk ?? throw new ArgumentNullException(nameof(thriftColumnChunk));
             _stats = stats;
             _footer = footer ?? throw new ArgumentNullException(nameof(footer));
             _options = parquetOptions ?? throw new ArgumentNullException(nameof(parquetOptions));
+            _rowGroup = rowGroup ?? throw new ArgumentNullException(nameof(rowGroup));

             dataField.EnsureAttachedToSchema(nameof(dataField));

@@ -65,24 +69,58 @@ public async Task<DataColumn> ReadAsync(CancellationToken cancellationToken = de
             if(_stats?.NullCount != null)
                 definedValuesCount -= (int)_stats.NullCount.Value;
             using var pc = new PackedColumn(_dataField, totalValuesInChunk, definedValuesCount);
-            long fileOffset = GetFileOffset();
+            long fileOffset = GetFileOffset(out bool isDictionaryPageOffset);
             _inputStream.Seek(fileOffset, SeekOrigin.Begin);

+            short dataPageOrdinal = 0; //the AAD page ordinal is the data page's index within the chunk
             while(pc.ValuesRead < totalValuesInChunk) {
-                PageHeader ph = PageHeader.Read(new ThriftCompactProtocolReader(_inputStream));
-
-                switch(ph.Type) {
-                    case PageType.DICTIONARY_PAGE:
+                if(_footer.Decrypter is not null) {
+                    var protoReader = new ThriftCompactProtocolReader(_inputStream);
+
+                    short columnOrdinal = (short)_rowGroup.Columns.IndexOf(_thriftColumnChunk);
+                    if(columnOrdinal < 0)
+                        throw new InvalidDataException("Could not determine column ordinal");
+
+                    if(isDictionaryPageOffset) {
+                        byte[] dictPageHeader = _footer.Decrypter.DecryptDictionaryPageHeader(protoReader, _rowGroup.Ordinal!.Value, columnOrdinal);
+                        using var ms = new MemoryStream(dictPageHeader);
+                        var tempReader = new ThriftCompactProtocolReader(ms);
+                        var ph = PageHeader.Read(tempReader);
                         await ReadDictionaryPage(ph, pc);
+                        isDictionaryPageOffset = false; //a column chunk has at most one dictionary page
+                    }
+
+                    if(_thriftColumnChunk.CryptoMetadata?.ENCRYPTIONWITHFOOTERKEY != null) {
+                        byte[] dataPageHeader = _footer.Decrypter.DecryptDataPageHeader(protoReader, _rowGroup.Ordinal!.Value, columnOrdinal, dataPageOrdinal++);
+                        using var ms = new MemoryStream(dataPageHeader);
+                        var tempReader = new ThriftCompactProtocolReader(ms);
+                        var ph = PageHeader.Read(tempReader);
+                        if(ph.Type == PageType.DATA_PAGE)
+                            await ReadDataPageV1Async(ph, pc);
+                        else if(ph.Type == PageType.DATA_PAGE_V2)
+                            await ReadDataPageV2Async(ph, pc, totalValuesInChunk);
+                        else
+                            throw new InvalidDataException($"Unsupported page type '{ph.Type}'");
+                    } else if(_thriftColumnChunk.CryptoMetadata?.ENCRYPTIONWITHCOLUMNKEY != null) {
+                        throw new NotSupportedException("Column key encryption is currently not supported");
+                    } else {
+                        throw new InvalidDataException($"Either {nameof(EncryptionWithFooterKey)} or {nameof(EncryptionWithColumnKey)} must be set");
+                    }
+                } else {
+                    var protoReader = new ThriftCompactProtocolReader(_inputStream);
+                    PageHeader ph = PageHeader.Read(protoReader);
+
+                    switch(ph.Type) {
+                        case PageType.DICTIONARY_PAGE:
+                            await ReadDictionaryPage(ph, pc);
+                            break;
+                        case PageType.DATA_PAGE:
+                            await ReadDataPageV1Async(ph, pc);
+                            break;
+                        case PageType.DATA_PAGE_V2:
+                            await ReadDataPageV2Async(ph, pc, totalValuesInChunk);
+                            break;
+                        default:
+                            throw new NotSupportedException($"can't read page type {ph.Type}");
+                    }
                 }
             }

@@ -148,15 +186,18 @@ private async ValueTask ReadDictionaryPage(PageHeader ph, PackedColumn pc) {
             pc.AssignDictionary(dictionary);
         }

-        private long GetFileOffset() =>
-            // get the minimum offset, we'll just read pages in sequence as DictionaryPageOffset/Data_page_offset are not reliable
-            new[]
-            {
-                _thriftColumnChunk.MetaData?.DictionaryPageOffset ?? 0,
-                _thriftColumnChunk.MetaData!.DataPageOffset
-            }
-            .Where(e => e != 0)
-            .Min();
+        private long GetFileOffset(out bool isDictionaryPageOffset) {
+            //https://stackoverflow.com/a/55226688/1458738
+            long dictionaryPageOffset = _thriftColumnChunk.MetaData?.DictionaryPageOffset ?? 0;
+            long firstDataPageOffset = _thriftColumnChunk.MetaData!.DataPageOffset;
+            if(dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) {
+                // if there's a dictionary and it's before the first data page, start from there
+                isDictionaryPageOffset = true;
+                return dictionaryPageOffset;
+            }
+            isDictionaryPageOffset = false;
+            return firstDataPageOffset;
+        }

         private async Task ReadDataPageV1Async(PageHeader ph, PackedColumn pc) {
             using IronCompress.IronCompressResult bytes = await ReadPageDataAsync(ph);
diff --git a/src/Parquet/File/ThriftFooter.cs b/src/Parquet/File/ThriftFooter.cs
index 147c24fe..e532b537 100644
--- a/src/Parquet/File/ThriftFooter.cs
+++ b/src/Parquet/File/ThriftFooter.cs
@@ -8,6 +8,7 @@
 using Parquet.Schema;
 using Parquet.Meta;
 using Parquet.Meta.Proto;
+using Parquet.Encryption;

 namespace Parquet.File {
     class ThriftFooter {
@@ -18,7 +19,7 @@ class ThriftFooter {

         internal ThriftFooter() {
             _fileMeta = new FileMetaData();
-            _tree= new ThriftSchemaTree();
+            _tree = new ThriftSchemaTree();
         }

         public ThriftFooter(FileMetaData fileMeta) {
@@ -63,7 +64,7 @@ public Dictionary<string, string> CustomMetadata {
                     return;

                 _fileMeta.KeyValueMetadata = value
-                    .Select(kvp => new KeyValue{ Key = kvp.Key, Value = kvp.Value })
+                    .Select(kvp => new KeyValue { Key = kvp.Key, Value = kvp.Value })
                     .ToList();
             }
             get {
@@ -150,7 +151,7 @@ public ColumnChunk CreateColumnChunk(CompressionMethod compression, System.IO.St
             return chunk;
         }

-        public PageHeader CreateDataPage(int valueCount, bool isDictionary, bool isDeltaEncodable) =>
+        public PageHeader CreateDataPage(int valueCount, bool isDictionary, bool isDeltaEncodable) =>
             new PageHeader {
                 Type = PageType.DATA_PAGE,
                 DataPageHeader = new DataPageHeader {
@@ -165,16 +166,19 @@ public PageHeader CreateDataPage(int valueCount, bool isDictionary, bool isDelta
             };

         public PageHeader CreateDictionaryPage(int numValues) {
-            var ph = new PageHeader {
+            var ph = new PageHeader {
                 Type = PageType.DICTIONARY_PAGE,
                 DictionaryPageHeader = new DictionaryPageHeader {
                     Encoding = Encoding.PLAIN_DICTIONARY,
                     NumValues = numValues
-            }};
+                }
+            };
             return ph;
         }

-#region [ Conversion to Model Schema ]
+        public EncryptionBase? Decrypter => _fileMeta.Decrypter;
+
+        #region [ Conversion to Model Schema ]

         public ParquetSchema CreateModelSchema(ParquetOptions formatOptions) {
             int si = 0;
@@ -193,8 +197,10 @@ private void CreateModelSchema(FieldPath? path, IList<Field> container, int chil
                 throw new InvalidOperationException($"cannot decode schema for field {_fileMeta.Schema[si]}");

             List<string> npath = path?.ToList() ?? new List<string>();
-            if(se.Path != null) npath.AddRange(se.Path.ToList());
-            else npath.Add(se.Name);
+            if(se.Path != null)
+                npath.AddRange(se.Path.ToList());
+            else
+                npath.Add(se.Name);
             se.Path = new FieldPath(npath);

             if(ownedChildCount > 0) {
@@ -209,9 +215,9 @@ private void CreateModelSchema(FieldPath? path, IList<Field> container, int chil
             }
         }

-#endregion
+        #endregion

-#region [ Convertion from Model Schema ]
+        #region [ Conversion from Model Schema ]

         public FileMetaData CreateThriftSchema(ParquetSchema schema) {
             var meta = new FileMetaData();
@@ -234,12 +240,12 @@ private static SchemaElement AddRoot(IList<SchemaElement> container) {
             return root;
         }

-#endregion
+        #endregion

-#region [ Helpers ]
+        #region [ Helpers ]

         class ThriftSchemaTree {
-            readonly Dictionary<FieldPath, Node?> _memoizedFindResults =
+            readonly Dictionary<FieldPath, Node?> _memoizedFindResults =
                 new Dictionary<FieldPath, Node?>(new ReferenceEqualityComparer<FieldPath>());

             public class Node {
@@ -288,7 +294,8 @@ public ThriftSchemaTree(List<SchemaElement> schema) {
             }

             public Node? Find(FieldPath path) {
-                if(path.Length == 0) return null;
+                if(path.Length == 0)
+                    return null;

                 return Find(root, path);
             }
@@ -320,6 +327,6 @@ private void BuildSchema(Node parent, List<SchemaElement> schema, int count, ref
             }
         }

-#endregion
+        #endregion
     }
 }
\ No newline at end of file
diff --git a/src/Parquet/Meta/Parquet.cs b/src/Parquet/Meta/Parquet.cs
index b7556c11..9f147cec 100644
--- a/src/Parquet/Meta/Parquet.cs
+++ b/src/Parquet/Meta/Parquet.cs
@@ -1,6 +1,7 @@
 #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
-using System.Linq;
+using System;
 using System.Collections.Generic;
+using Parquet.Encryption;
 using Parquet.Meta.Proto;
 namespace Parquet.Meta {
     /// <summary>
@@ -2954,6 +2955,7 @@ public class FileMetaData {
         /// </summary>
         public byte[]? FooterSigningKeyMetadata { get; set; }

+        internal EncryptionBase? Decrypter { get; set; }

         internal void Write(ThriftCompactProtocolWriter proto) {
             proto.StructBegin();

@@ -3104,5 +3106,18 @@ internal static FileCryptoMetaData Read(ThriftCompactProtocolReader proto) {
         }
     }

+    public enum ParquetModules {
+        //From: https://github.com/apache/parquet-format/blob/master/Encryption.md#442-aad-suffix
+        Footer = 0,
+        ColumnMetaData,
+        Data_Page,
+        Dictionary_Page,
+        Data_PageHeader,
+        Dictionary_PageHeader,
+        ColumnIndex,
+        OffsetIndex,
+        BloomFilter_Header,
+        BloomFilter_Bitset
+    }
 }
 #pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
diff --git a/src/Parquet/Meta/Proto/ThriftCompactProtocolReader.cs b/src/Parquet/Meta/Proto/ThriftCompactProtocolReader.cs
index 3faa4e03..91497dab 100644
--- a/src/Parquet/Meta/Proto/ThriftCompactProtocolReader.cs
+++ b/src/Parquet/Meta/Proto/ThriftCompactProtocolReader.cs
@@ -103,6 +103,22 @@ public byte[] ReadBinary() {
             return _inputStream.ReadBytesExactly(length);
         }

+        public byte[] ReadBytesExactly(int length) {
+            return _inputStream.ReadBytesExactly(length);
+        }
+
+        public Guid ReadUuid() {
+            byte[] uuidBytes = _inputStream.ReadBytesExactly(16);
+            Guid result = new Guid(uuidBytes);
+            return result;
+        }
+
+        public double ReadDouble() {
+            //compact protocol doubles are little-endian on the wire
+            byte[] doubleBytes = _inputStream.ReadBytesExactly(8).EnsureLittleEndian();
+            double result = BitConverter.ToDouble(doubleBytes, 0);
+            return result;
+        }
+
         public string ReadString() {
             // read length
             int length = (int)ReadVarInt32();
@@ -176,16 +192,16 @@ public void SkipField(CompactType compactType) {
                 case CompactType.I64:
                     ReadI64();
                     break;
-                //case Types.Double:
-                //await protocol.ReadDoubleAsync(cancellationToken);
-                //break;
+                case CompactType.Double:
+                    ReadDouble();
+                    break;
                 case CompactType.Binary:
                     // Don't try to decode the string, just skip it.
                     ReadBinary();
                     break;
-                //case TType.Uuid:
-                //    await protocol.ReadUuidAsync(cancellationToken);
-                //    break;
+                case CompactType.Uuid:
+                    ReadUuid();
+                    break;
                 case CompactType.Struct:
                     StructBegin();
                     while(ReadNextField(out _, out _)) {
diff --git a/src/Parquet/ParquetActor.cs b/src/Parquet/ParquetActor.cs
index 4a67fe84..5ef031d9 100644
--- a/src/Parquet/ParquetActor.cs
+++ b/src/Parquet/ParquetActor.cs
@@ -1,12 +1,13 @@
 using System;
+using System.Collections.Generic;
 using System.IO;
 using System.Linq;
+using System.Security.Cryptography;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
-using Microsoft.ML.Data;
+using Parquet.Encryption;
 using Parquet.Extensions;
-using Parquet.File;

 namespace Parquet {
     /// <summary>
@@ -15,6 +16,7 @@ namespace Parquet {
     /// </summary>
     public class ParquetActor {
 #pragma warning disable IDE1006
         internal static readonly byte[] MagicBytes = Encoding.ASCII.GetBytes("PAR1");
+        internal static readonly byte[] MagicBytesEncrypted = Encoding.ASCII.GetBytes("PARE");
 #pragma warning restore IDE1006

         private readonly Stream _fileStream;
@@ -28,6 +30,8 @@ internal ParquetActor(Stream? fileStream) =>
         /// </summary>
         protected Stream Stream => _fileStream;

+        internal bool IsEncryptedFile;
+
         internal BinaryWriter Writer => _binaryWriter ??= new BinaryWriter(_fileStream);

         /// <summary>
@@ -42,15 +46,34 @@ protected async Task ValidateFileAsync() {
             _fileStream.Seek(-4, SeekOrigin.End);
             byte[] tail = await _fileStream.ReadBytesExactlyAsync(4);

-            if(!MagicBytes.SequenceEqual(head) || !MagicBytes.SequenceEqual(tail))
-                throw new IOException($"not a parquet file, head: {head.ToHexString()}, tail: {tail.ToHexString()}");
+            if(!MagicBytes.SequenceEqual(head) || !MagicBytes.SequenceEqual(tail)) {
+                if(!MagicBytesEncrypted.SequenceEqual(head) || !MagicBytesEncrypted.SequenceEqual(tail)) {
+                    throw new IOException($"not a parquet file, head: {head.ToHexString()}, tail: {tail.ToHexString()}");
+                }
+                IsEncryptedFile = true;
+            }
         }

-        internal async ValueTask<Meta.FileMetaData> ReadMetadataAsync(CancellationToken cancellationToken = default) {
+        internal async ValueTask<Meta.FileMetaData> ReadMetadataAsync(string? decryptionKey = null, string? aadPrefix = null, CancellationToken cancellationToken = default) {
             int footerLength = await GoBeforeFooterAsync();
             byte[] footerData = await _fileStream.ReadBytesExactlyAsync(footerLength);
             using var ms = new MemoryStream(footerData);
-            return Parquet.Meta.FileMetaData.Read(new Meta.Proto.ThriftCompactProtocolReader(ms));
+
+            EncryptionBase? decrypter = null;
+            var protoReader = new Meta.Proto.ThriftCompactProtocolReader(ms);
+            if(IsEncryptedFile) {
+                byte[] decryptedFooter = EncryptionBase.DecryptFooter(protoReader, decryptionKey!, aadPrefix, out decrypter);
+
+                //re-use the same proto reader over the decrypted footer bytes
+                ms.SetLength(0);
+                ms.Write(decryptedFooter, 0, decryptedFooter.Length);
+                ms.Position = 0;
+            }
+
+            var fileMetaData = Meta.FileMetaData.Read(protoReader);
+            fileMetaData.Decrypter = decrypter; //save the decrypter because we will need it to decrypt every module
+
+            return fileMetaData;
         }

         internal async ValueTask<int> GoBeforeFooterAsync() {
diff --git a/src/Parquet/ParquetOptions.cs b/src/Parquet/ParquetOptions.cs
index 9a6bb7f1..4c41758b 100644
--- a/src/Parquet/ParquetOptions.cs
+++ b/src/Parquet/ParquetOptions.cs
@@ -55,5 +55,19 @@ public class ParquetOptions {
         /// your readers do not understand it.
         /// </summary>
         public bool UseDeltaBinaryPackedEncoding { get; set; } = true;
+
+        /// <summary>
+        /// Key used to decrypt encrypted parquet files created in encrypted footer mode.
+        /// </summary>
+        /// <remarks>Currently only used by <see cref="ParquetReader"/>.</remarks>
+        public string? EncryptionKey { get; set; } = null;
+
+        /// <summary>
+        /// Optional Additional Authentication Data prefix used to verify the integrity of the encrypted file. Only required
+        /// if the file was encrypted with an AAD prefix *and* the prefix wasn't embedded into the
+        /// file by the author.
+        /// </summary>
+        /// <remarks>Currently only used by <see cref="ParquetReader"/>.</remarks>
+        public string? AADPrefix { get; set; } = null;
     }
 }
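Putting the two options together with the reader, consumer code for an encrypted-footer file looks like this (mirroring the test at the top of this diff; the key and file name are the sample values used there):

    var options = new ParquetOptions {
        EncryptionKey = "QFwuIKG8yb845rEufVJAgcOo", // 24 ASCII characters = 192-bit key
        AADPrefix = null                            // only set when the writer expects the reader to supply it
    };

    using Stream stream = System.IO.File.OpenRead("encrypted_utf8_aes_gcm_v1_192bit.parquet");
    using ParquetReader reader = await ParquetReader.CreateAsync(stream, options);
    using ParquetRowGroupReader rg = reader.OpenRowGroupReader(0);
    foreach(DataField df in reader.Schema.DataFields) {
        DataColumn dc = await rg.ReadColumnAsync(df);
    }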
diff --git a/src/Parquet/ParquetReader.cs b/src/Parquet/ParquetReader.cs
index ed04e1d5..1af85afd 100644
--- a/src/Parquet/ParquetReader.cs
+++ b/src/Parquet/ParquetReader.cs
@@ -39,8 +39,12 @@ private ParquetReader(Stream input, ParquetOptions? parquetOptions = null, bool
         private async Task InitialiseAsync(CancellationToken cancellationToken) {
             await ValidateFileAsync();

+            if(IsEncryptedFile && string.IsNullOrEmpty(_parquetOptions.EncryptionKey)) {
+                throw new InvalidDataException($"{nameof(_parquetOptions.EncryptionKey)} is required for files with encrypted footers");
+            }
+
             //read metadata instantly, now
-            _meta = await ReadMetadataAsync(cancellationToken);
+            _meta = await ReadMetadataAsync(_parquetOptions.EncryptionKey, _parquetOptions.AADPrefix, cancellationToken);
             _thriftFooter = new ThriftFooter(_meta);

             InitRowGroupReaders();
diff --git a/src/Parquet/ParquetRowGroupReader.cs b/src/Parquet/ParquetRowGroupReader.cs
index ff911211..64b32b7e 100644
--- a/src/Parquet/ParquetRowGroupReader.cs
+++ b/src/Parquet/ParquetRowGroupReader.cs
@@ -63,7 +63,7 @@ public Task<DataColumn> ReadColumnAsync(DataField field, CancellationToken cance
             ColumnChunk columnChunk = GetMetadata(field) ?? throw new ParquetException($"'{field.Path}' does not exist in this file");

             var columnReader = new DataColumnReader(field, _stream,
-                columnChunk, ReadColumnStatistics(columnChunk), _footer, _options);
+                columnChunk, ReadColumnStatistics(columnChunk), _footer, _options, _rowGroup);

             return columnReader.ReadAsync(cancellationToken);
         }
diff --git a/src/Parquet/ParquetWriter.cs b/src/Parquet/ParquetWriter.cs
index 15bb5656..6775a8e5 100644
--- a/src/Parquet/ParquetWriter.cs
+++ b/src/Parquet/ParquetWriter.cs
@@ -99,7 +99,7 @@ private async Task PrepareFileAsync(bool append, CancellationToken cancellationT

             await ValidateFileAsync();

-            FileMetaData fileMeta = await ReadMetadataAsync(cancellationToken);
+            FileMetaData fileMeta = await ReadMetadataAsync(cancellationToken: cancellationToken);
             _footer = new ThriftFooter(fileMeta);

             ValidateSchemasCompatible(_footer, _schema);
diff --git a/src/Parquet/Serialization/TypeExtensions.cs b/src/Parquet/Serialization/TypeExtensions.cs
index fc85a185..db8771cf 100644
--- a/src/Parquet/Serialization/TypeExtensions.cs
+++ b/src/Parquet/Serialization/TypeExtensions.cs
@@ -127,13 +127,13 @@ private static Field ConstructDataField(string name, string propertyName, Type t
             Field r;
             bool? isNullable = member == null
                 ? null
-                : member.IsRequired ? false : null;
+                : member.IsRequired ? false : t.IsNullable();

             if(t == typeof(DateTime) || t == typeof(DateTime?)) {
                 ParquetTimestampAttribute? tsa = member?.TimestampAttribute;
                 r = new DateTimeDataField(name,
                     tsa == null ? DateTimeFormat.Impala : tsa.GetDateTimeFormat(),
-                    t == typeof(DateTime?), null, propertyName);
+                    isNullable, null, propertyName);
             } else if(t == typeof(TimeSpan) || t == typeof(TimeSpan?)) {
                 r = new TimeSpanDataField(name,
                     member?.MicroSecondsTimeAttribute == null