Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Modular Encryption Support When Reading Parquet Files #480

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
36 changes: 36 additions & 0 deletions src/Parquet.Test/ParquetEncryptionTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System.IO;
using System.Threading.Tasks;
using Parquet.Data;
using Parquet.Schema;
using Xunit;

namespace Parquet.Test {
/// <summary>
/// Marker type for the xUnit collection that the encryption tests are assigned to.
/// The collection is declared with parallelization disabled.
/// </summary>
[CollectionDefinition(nameof(ParquetEncryptionTestCollection), DisableParallelization = true)]
public class ParquetEncryptionTestCollection { }

[Collection(nameof(ParquetEncryptionTestCollection))]
public class ParquetEncryptionTest : TestBase {
    /// <summary>
    /// Opens an encrypted test file with a 24-character key and reads every data column
    /// of the first row group, checking that each column comes back with data attached.
    /// </summary>
    /// <remarks>
    /// If this test doesn't run on its own and last, it breaks things.
    /// Running this test last in xUnit is only a temporary workaround.
    /// </remarks>
    [Fact]
    public async Task Z_DecryptFile_UTF8_AesGcmV1_192bit() {
        using Stream stream = OpenTestFile("encrypted_utf8_aes_gcm_v1_192bit.parquet");

        var parquetOptions = new ParquetOptions {
            EncryptionKey = "QFwuIKG8yb845rEufVJAgcOo",
            AADPrefix = null //this file doesn't use an aad prefix
        };

        using ParquetReader reader = await ParquetReader.CreateAsync(stream, parquetOptions);

        // A successfully decrypted footer must expose at least one column to read.
        Assert.NotEmpty(reader.Schema.DataFields);

        using ParquetRowGroupReader rgr = reader.OpenRowGroupReader(0);
        foreach(DataField df in reader.Schema.DataFields) {
            DataColumn dc = await rgr.ReadColumnAsync(df);

            // Without assertions the test would only prove "did not throw".
            Assert.NotNull(dc);
            Assert.NotNull(dc.Data);
            //TODO: compare the decoded values against the known content of the file
        }
    }
}
}
8 changes: 4 additions & 4 deletions src/Parquet.Test/ParquetReaderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ public async Task ParquetReader_TimestampMicrosColumn(string parquetFile) {
DataColumn[] columns = await reader.ReadEntireRowGroupAsync();
var col0 = (DateTime?[])columns[0].Data;
Assert.Equal(3, col0.Length);
Assert.Equal(new DateTime(2022,12,23,11,43,49).AddTicks(10 * 10), col0[0]);
Assert.Equal(new DateTime(2021,12,23,12,44,50).AddTicks(11 * 10), col0[1]);
Assert.Equal(new DateTime(2020,12,23,13,45,51).AddTicks(12 * 10), col0[2]);
Assert.Equal(new DateTime(2022, 12, 23, 11, 43, 49).AddTicks(10 * 10), col0[0]);
Assert.Equal(new DateTime(2021, 12, 23, 12, 44, 50).AddTicks(11 * 10), col0[1]);
Assert.Equal(new DateTime(2020, 12, 23, 13, 45, 51).AddTicks(12 * 10), col0[2]);
}
}

Expand All @@ -270,7 +270,7 @@ public async Task Metadata_file() {
Assert.True(reader.CustomMetadata.ContainsKey("geo"));
Assert.True(reader.CustomMetadata.ContainsKey("ARROW:schema"));
}

class ReadableNonSeekableStream : DelegatedStream {
public ReadableNonSeekableStream(Stream master) : base(master) {
}
Expand Down
9 changes: 4 additions & 5 deletions src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public int[]? IntArray { get; set; }

public bool MarkerField;

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Build NuGet

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (macos-latest)

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (macos-latest)

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false
}

[Fact]
Expand Down Expand Up @@ -133,8 +133,7 @@
Assert.False(extraProp.IsArray);
}

/// <summary>
/// Test POCO whose <c>_id</c> property carries the JSON property name "ChildID".
/// </summary>
class AliasedPocoChild {
    [JsonPropertyName("ChildID")]
    public int _id { get; set; }
}
Expand Down Expand Up @@ -179,7 +178,7 @@

Assert.Equal(new ParquetSchema(
new ListField("IntList", new DataField<int>("element")),
new DataField<int[]>("LegacyIntList")),
new DataField<int[]>("LegacyIntList", true)),
schema);
}

Expand Down Expand Up @@ -327,7 +326,7 @@

[ParquetMicroSecondsTime]
public TimeSpan MicroTime { get; set; }

#if NET6_0_OR_GREATER
public DateOnly ImpalaDateOnly { get; set; }

Expand Down Expand Up @@ -416,7 +415,7 @@
Assert.Equal(typeof(DateOnly), df.ClrType);
Assert.False(df.IsNullable);
}

[Fact]
public void Type_TimeOnly_Default() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);
Expand Down
Binary file not shown.
73 changes: 73 additions & 0 deletions src/Parquet/Encryption/AES_GCM_CTR_V1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Security.Cryptography;
using Parquet.Meta.Proto;

namespace Parquet.Encryption {
/// <summary>
/// Decrypter for the AES_GCM_CTR_V1 algorithm. Per the Parquet encryption specification,
/// page bodies (data pages and dictionary pages) are encrypted with AES-CTR while every
/// other module uses AES-GCM, so only those two module types are handled here and the
/// rest are forwarded to <see cref="AES_GCM_V1_Encryption"/>.
/// </summary>
internal class AES_GCM_CTR_V1_Encryption : AES_GCM_V1_Encryption {
    /// <summary>Size in bytes of the nonce stored in front of every encrypted module.</summary>
    private const int NonceLength = 12;

    /// <summary>Size in bytes of the AES block, and therefore of the CTR initialisation vector.</summary>
    private const int CtrIvLength = 16;

    public AES_GCM_CTR_V1_Encryption() {

    }

    /// <summary>
    /// Decrypts one module read from <paramref name="reader"/>.
    /// CTR module format: length (4 bytes) nonce (12 bytes) ciphertext (length-12 bytes).
    /// </summary>
    /// <param name="reader">Reader positioned at the 4-byte length prefix of the module.</param>
    /// <param name="module">Module type; decides whether CTR or GCM is used.</param>
    /// <param name="rowGroupOrdinal">Row group ordinal, forwarded to the GCM implementation.</param>
    /// <param name="columnOrdinal">Column ordinal, forwarded to the GCM implementation.</param>
    /// <param name="pageOrdinal">Page ordinal, forwarded to the GCM implementation.</param>
    /// <returns>The decrypted module bytes.</returns>
    /// <exception cref="InvalidDataException">The length prefix is too small to hold a nonce.</exception>
    protected override byte[] Decrypt(ThriftCompactProtocolReader reader, Meta.ParquetModules module, short? rowGroupOrdinal = null, short? columnOrdinal = null, short? pageOrdinal = null) {
        // Footer, headers, indexes etc. are still GCM modules in this algorithm.
        bool isPageBody = module == Meta.ParquetModules.Data_Page || module == Meta.ParquetModules.Dictionary_Page;
        if(!isPageBody) {
            return base.Decrypt(reader, module, rowGroupOrdinal, columnOrdinal, pageOrdinal);
        }

        byte[] encryptionBufferLengthBytes = reader.ReadBytesExactly(4).EnsureLittleEndian();
        int encryptionBufferLength = BitConverter.ToInt32(encryptionBufferLengthBytes, 0);
        if(encryptionBufferLength < NonceLength) {
            throw new InvalidDataException(
                $"Encrypted module length {encryptionBufferLength} is too small to contain a {NonceLength}-byte nonce.");
        }

        byte[] nonce = reader.ReadBytesExactly(NonceLength);
        byte[] cipherText = reader.ReadBytesExactly(encryptionBufferLength - NonceLength);

        // The CTR IV is the 12-byte nonce followed by a 4-byte big-endian counter that starts at 1.
        // Passing the bare nonce would fail the block-size check in AesCtrTransform.
        byte[] iv = new byte[CtrIvLength];
        Buffer.BlockCopy(nonce, 0, iv, 0, nonce.Length);
        iv[CtrIvLength - 1] = 1;

        using var cipherStream = new MemoryStream(cipherText);
        using var plaintextStream = new MemoryStream(cipherText.Length); //This will contain the decrypted result
        AesCtrTransform(DecryptionKey!, iv, cipherStream, plaintextStream);

        return plaintextStream.ToArray();
    }

    // TODO: This hasn't been tested!!!
    // Source: https://stackoverflow.com/a/51188472/1458738
    /// <summary>
    /// Applies AES in counter mode to <paramref name="inputStream"/> and writes the result to
    /// <paramref name="outputStream"/>. The key stream is produced by ECB-encrypting a counter block.
    /// </summary>
    /// <param name="key">AES key.</param>
    /// <param name="salt">Initial counter block; must be exactly one AES block long.</param>
    /// <param name="inputStream">Bytes to transform.</param>
    /// <param name="outputStream">Receives the transformed bytes.</param>
    /// <exception cref="ArgumentException"><paramref name="salt"/> is not one block long.</exception>
    private static void AesCtrTransform(byte[] key, byte[] salt, Stream inputStream, Stream outputStream) {
        using var aes = Aes.Create();
        aes.Mode = CipherMode.ECB;
        aes.Padding = PaddingMode.None;

        int blockSize = aes.BlockSize / 8;
        if(salt.Length != blockSize) {
            throw new ArgumentException(
                "Salt size must be same as block size " +
                $"(actual: {salt.Length}, expected: {blockSize})");
        }

        // Work on a copy so the caller's buffer is not modified by the increments below.
        byte[] counter = (byte[])salt.Clone();

        byte[] keyStream = new byte[blockSize];
        int keyStreamPosition = blockSize; // "exhausted", so the first byte triggers block generation

        byte[] zeroIv = new byte[blockSize];
        using ICryptoTransform counterEncryptor = aes.CreateEncryptor(key, zeroIv);

        int b;
        while((b = inputStream.ReadByte()) != -1) {
            if(keyStreamPosition == blockSize) {
                counterEncryptor.TransformBlock(counter, 0, counter.Length, keyStream, 0);

                // Increment the counter block as a big-endian integer, carrying leftwards.
                for(int i = counter.Length - 1; i >= 0; i--) {
                    if(++counter[i] != 0) {
                        break;
                    }
                }

                keyStreamPosition = 0;
            }

            outputStream.WriteByte((byte)(((byte)b) ^ keyStream[keyStreamPosition++]));
        }
    }
}
}
62 changes: 62 additions & 0 deletions src/Parquet/Encryption/AES_GCM_V1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using Parquet.Meta.Proto;

namespace Parquet.Encryption {
/// <summary>
/// Decrypts Parquet modules that were encrypted with AES-GCM.
/// Implemented based on https://github.com/apache/parquet-format/blob/master/Encryption.md#51-encrypted-module-serialization
/// </summary>
internal class AES_GCM_V1_Encryption : EncryptionBase {
    /// <summary>Size in bytes of the nonce that follows the length prefix.</summary>
    private const int NonceLength = 12;

    /// <summary>Size in bytes of the GCM authentication tag that ends the module.</summary>
    private const int TagLength = 16;

    public AES_GCM_V1_Encryption() {
    }

    // Each public entry point only selects the module type (and ordinals) that go into the AAD.
    public override byte[] BloomFilterBitset(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.BloomFilter_Bitset, rowGroupOrdinal, columnOrdinal);
    public override byte[] BloomFilterHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.BloomFilter_Header, rowGroupOrdinal, columnOrdinal);
    public override byte[] DecryptColumnIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.ColumnIndex, rowGroupOrdinal, columnOrdinal);
    public override byte[] DecryptColumnMetaData(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.ColumnMetaData, rowGroupOrdinal, columnOrdinal);
    public override byte[] DecryptDataPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) => Decrypt(reader, Meta.ParquetModules.Data_Page, rowGroupOrdinal, columnOrdinal, pageOrdinal);
    public override byte[] DecryptDataPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) => Decrypt(reader, Meta.ParquetModules.Data_PageHeader, rowGroupOrdinal, columnOrdinal, pageOrdinal);
    public override byte[] DecryptDictionaryPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.Dictionary_Page, rowGroupOrdinal, columnOrdinal);
    public override byte[] DecryptDictionaryPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.Dictionary_PageHeader, rowGroupOrdinal, columnOrdinal);
    protected override byte[] DecryptFooter(ThriftCompactProtocolReader reader) => Decrypt(reader, Meta.ParquetModules.Footer);
    public override byte[] DecryptOffsetIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal) => Decrypt(reader, Meta.ParquetModules.OffsetIndex, rowGroupOrdinal, columnOrdinal);

    /// <summary>
    /// Module format: length (4 bytes) nonce (12 bytes) ciphertext (length-28 bytes) tag (16 bytes)
    /// Reference: https://github.com/apache/parquet-format/blob/master/Encryption.md#5-file-format
    /// </summary>
    /// <param name="reader">Reader positioned at the 4-byte length prefix of the module.</param>
    /// <param name="module">Module type; its byte value is part of the AAD.</param>
    /// <param name="rowGroupOrdinal">Row group ordinal appended to the AAD when supplied.</param>
    /// <param name="columnOrdinal">Column ordinal appended to the AAD when supplied.</param>
    /// <param name="pageOrdinal">Page ordinal appended to the AAD when supplied.</param>
    /// <returns>The decrypted module bytes.</returns>
    /// <exception cref="InvalidDataException">The length prefix is too small to hold a nonce and a tag.</exception>
    /// <exception cref="NotSupportedException">The library was built for .NET Standard 2.0.</exception>
    protected virtual byte[] Decrypt(ThriftCompactProtocolReader reader, Meta.ParquetModules module, short? rowGroupOrdinal = null, short? columnOrdinal = null, short? pageOrdinal = null) {
        byte[] encryptionBufferLengthBytes = reader.ReadBytesExactly(4).EnsureLittleEndian();
        int encryptionBufferLength = BitConverter.ToInt32(encryptionBufferLengthBytes, 0);

        // Reject a corrupt length before it turns into a negative read size.
        int cipherTextLength = encryptionBufferLength - NonceLength - TagLength;
        if(cipherTextLength < 0) {
            throw new InvalidDataException(
                $"Encrypted module length {encryptionBufferLength} is too small to contain a {NonceLength}-byte nonce and a {TagLength}-byte tag.");
        }

        byte[] nonce = reader.ReadBytesExactly(NonceLength);
        byte[] cipherText = reader.ReadBytesExactly(cipherTextLength);
        byte[] tag = reader.ReadBytesExactly(TagLength);

#if NETSTANDARD2_0
        throw new NotSupportedException("Cannot process AES GCM V1 encrypted parquet files in .net standard 2.0. Maybe try AES GCM CTR V1 instead?");
#else
        byte[] aad = BuildAad(module, rowGroupOrdinal, columnOrdinal, pageOrdinal);

        // Only the constructor differs between target frameworks.
#if NET8_0_OR_GREATER
        using var cipher = new AesGcm(DecryptionKey!, tag.Length);
#else
        using var cipher = new AesGcm(DecryptionKey!);
#endif
        byte[] plainText = new byte[cipherTextLength];
        cipher.Decrypt(nonce, cipherText, tag, plainText, aad);
        return plainText;
#endif
    }

#if !NETSTANDARD2_0
    /// <summary>
    /// Builds the additional authenticated data for one module:
    /// AAD prefix, file-unique AAD, module type byte, then each supplied ordinal as 2 bytes.
    /// A missing prefix or file-unique part contributes no bytes.
    /// </summary>
    private byte[] BuildAad(Meta.ParquetModules module, short? rowGroupOrdinal, short? columnOrdinal, short? pageOrdinal) {
        var aad = new List<byte>();
        aad.AddRange(AadPrefix ?? Array.Empty<byte>());
        aad.AddRange(AadFileUnique ?? Array.Empty<byte>());
        aad.Add((byte)module);
        AppendOrdinal(aad, rowGroupOrdinal);
        AppendOrdinal(aad, columnOrdinal);
        AppendOrdinal(aad, pageOrdinal);
        return aad.ToArray();
    }

    /// <summary>Appends the two bytes of <paramref name="ordinal"/> to the AAD, or nothing when it is null.</summary>
    private static void AppendOrdinal(List<byte> aad, short? ordinal) {
        if(ordinal is short value) {
            aad.AddRange(BitConverter.GetBytes(value).EnsureLittleEndian());
        }
    }
#endif
}
}
55 changes: 55 additions & 0 deletions src/Parquet/Encryption/EncryptionBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.IO;
using System.Text;
using Parquet.Meta.Proto;

namespace Parquet.Encryption {
/// <summary>
/// Common base for the decrypters of encrypted Parquet files. Holds the key and the AAD parts,
/// picks the concrete decrypter from the file's crypto metadata, and declares one decrypt
/// method per module type.
/// </summary>
internal abstract class EncryptionBase {
    protected byte[]? AadPrefix { get; set; }
    protected byte[]? DecryptionKey { get; set; }
    protected byte[]? AadFileUnique { get; set; }

    /// <summary>
    /// Reads the file crypto metadata from <paramref name="reader"/>, creates the matching
    /// decrypter and uses it to decrypt the footer that follows.
    /// </summary>
    /// <param name="reader">Reader positioned at the file crypto metadata.</param>
    /// <param name="decryptionKey">Key text; converted to key bytes with UTF-8.</param>
    /// <param name="aadPrefix">AAD prefix text, needed only when the file says the caller must supply it.</param>
    /// <param name="decrypter">Receives the decrypter configured for this file.</param>
    /// <returns>The decrypted footer bytes.</returns>
    /// <exception cref="ArgumentException"><paramref name="decryptionKey"/> is null or empty.</exception>
    /// <exception cref="InvalidDataException">The file requires an AAD prefix and none was given.</exception>
    /// <exception cref="NotSupportedException">The metadata names no known encryption algorithm.</exception>
    public static byte[] DecryptFooter(
        ThriftCompactProtocolReader reader,
        string decryptionKey,
        string? aadPrefix,
        out EncryptionBase decrypter) {
        if(string.IsNullOrEmpty(decryptionKey)) {
            throw new ArgumentException($"Encrypted parquet files require an {nameof(ParquetOptions.EncryptionKey)} value");
        }

        var cryptoMetaData = Meta.FileCryptoMetaData.Read(reader);
        if(cryptoMetaData.EncryptionAlgorithm.AESGCMV1 is { } gcm) {
            decrypter = new AES_GCM_V1_Encryption();
            decrypter.AadFileUnique = gcm.AadFileUnique ?? Array.Empty<byte>();
            decrypter.AadPrefix = ResolveAadPrefix(gcm.SupplyAadPrefix, gcm.AadPrefix, aadPrefix);
        } else if(cryptoMetaData.EncryptionAlgorithm.AESGCMCTRV1 is { } gcmCtr) {
            decrypter = new AES_GCM_CTR_V1_Encryption();
            // The footer and other non-page modules of this algorithm are GCM modules and need the AAD too.
            // NOTE(review): assumes the CTR metadata exposes the same AAD members as the GCM one — confirm.
            decrypter.AadFileUnique = gcmCtr.AadFileUnique ?? Array.Empty<byte>();
            decrypter.AadPrefix = ResolveAadPrefix(gcmCtr.SupplyAadPrefix, gcmCtr.AadPrefix, aadPrefix);
        } else {
            throw new NotSupportedException("No encryption algorithm defined");
        }

        // UTF-8 yields the same bytes as ASCII for ASCII text, but does not replace other characters with '?'.
        decrypter.DecryptionKey = Encoding.UTF8.GetBytes(decryptionKey);
        return decrypter.DecryptFooter(reader);
    }

    /// <summary>
    /// Chooses the AAD prefix: the caller's value when the file demands one, otherwise the
    /// prefix stored in the file (or an empty array when none is stored).
    /// </summary>
    /// <exception cref="InvalidDataException">The file demands a prefix and the caller gave none.</exception>
    private static byte[] ResolveAadPrefix(bool? supplyAadPrefix, byte[]? storedAadPrefix, string? suppliedAadPrefix) {
        if(supplyAadPrefix != true) {
            return storedAadPrefix ?? Array.Empty<byte>();
        }

        if(string.IsNullOrEmpty(suppliedAadPrefix)) {
            throw new InvalidDataException("This file requires an AAD (additional authenticated data) prefix in order to be decrypted.");
        }

        return Encoding.UTF8.GetBytes(suppliedAadPrefix);
    }

    protected abstract byte[] DecryptFooter(ThriftCompactProtocolReader reader);
    public abstract byte[] DecryptColumnMetaData(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
    public abstract byte[] DecryptDataPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal);
    public abstract byte[] DecryptDictionaryPage(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
    public abstract byte[] DecryptDataPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal);
    public abstract byte[] DecryptDictionaryPageHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
    public abstract byte[] DecryptColumnIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
    public abstract byte[] DecryptOffsetIndex(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
    public abstract byte[] BloomFilterHeader(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
    public abstract byte[] BloomFilterBitset(ThriftCompactProtocolReader reader, short rowGroupOrdinal, short columnOrdinal);
}
}
15 changes: 13 additions & 2 deletions src/Parquet/Extensions/OtherExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using Parquet.Schema;

namespace Parquet {
Expand All @@ -15,7 +14,12 @@ public static DateTimeOffset FromUnixMilliseconds(this long unixMilliseconds) {
}

/// <summary>
/// Converts a count of milliseconds relative to <c>UnixEpoch</c> into a <see cref="DateTime"/>.
/// </summary>
/// <param name="unixMilliseconds">Milliseconds to add to <c>UnixEpoch</c>.</param>
/// <returns><c>UnixEpoch</c> advanced by <paramref name="unixMilliseconds"/>.</returns>
/// <exception cref="ArgumentOutOfRangeException">The result falls outside the range of <see cref="DateTime"/>.</exception>
public static DateTime AsUnixMillisecondsInDateTime(this long unixMilliseconds) {
    // Deliberately no catch-all: substituting DateTime.Now for an unrepresentable value
    // would hand callers a plausible-looking but wrong and non-deterministic timestamp.
    return UnixEpoch.AddMilliseconds(unixMilliseconds);
}

public static long ToUnixMilliseconds(this DateTime dto) {
Expand Down Expand Up @@ -84,5 +88,12 @@ public static bool EqualTo(this Array left, Array right) {
/// <summary>
/// Creates (but does not throw) a <see cref="NotImplementedException"/> whose message starts with <paramref name="reason"/>.
/// </summary>
/// <param name="reason">Name or description of the missing functionality.</param>
/// <returns>The exception for the caller to throw.</returns>
public static Exception NotImplemented(string reason) {
    string message = $"{reason} is not yet implemented, and we are fully aware of it. From here you can either raise an issue on GitHub, or implemented it and raise a PR.";
    return new NotImplementedException(message);
}

/// <summary>
/// Reverses <paramref name="bytes"/> in place when the current platform is big-endian;
/// on a little-endian platform the array is left untouched.
/// </summary>
/// <param name="bytes">Array to reverse if needed; it is modified, not copied.</param>
/// <returns>The same array instance that was passed in.</returns>
public static byte[] EnsureLittleEndian(this byte[] bytes) {
    if(BitConverter.IsLittleEndian) {
        return bytes;
    }

    Array.Reverse(bytes);
    return bytes;
}
}
}
Loading
Loading