Skip to content

Commit

Permalink
First working integration
Browse files Browse the repository at this point in the history
  • Loading branch information
jadewang-db committed Mar 16, 2024
1 parent 08743eb commit d5bdb76
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 52 deletions.
21 changes: 17 additions & 4 deletions csharp/src/Drivers/Apache/Spark/SparkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
Expand All @@ -33,6 +32,9 @@
using Thrift.Protocol;
using Thrift.Transport.Client;

using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using System.Diagnostics;

namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
public class SparkConnection : HiveServer2Connection
Expand Down Expand Up @@ -70,9 +72,16 @@ internal SparkConnection(IReadOnlyDictionary<string, string> properties)

protected override TProtocol CreateProtocol()
{
string hostName = properties["HostName"];
string path = properties["Path"];
string token = properties["Token"];
Trace.TraceError($"create protocol with {properties.Count} properties.");

foreach(var property in properties.Keys)
{
Trace.TraceError($"key = {property} value = {properties[property]}");
}

string hostName = properties["hostname"];
string path = properties["path"];
string token = properties["token"];

string uri = "https://" + hostName + "/" + path;

Expand Down Expand Up @@ -109,6 +118,7 @@ public override AdbcStatement CreateStatement()

public override void Dispose()
{
/*
if (this.client != null)
{
TCloseSessionReq r6 = new TCloseSessionReq(this.sessionHandle);
Expand All @@ -120,6 +130,7 @@ public override void Dispose()
this.transport = null;
this.client = null;
}
*/
}

public override IArrowArrayStream GetInfo(List<AdbcInfoCode> codes)
Expand Down Expand Up @@ -299,6 +310,8 @@ private static IReadOnlyList<int> ConvertSpanToReadOnlyList(Int32Array span)

public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string catalogPattern, string dbSchemaPattern, string tableNamePattern, List<string> tableTypes, string columnNamePattern)
{
Trace.TraceError($"getting objects with depth={depth.ToString()}, catalog = {catalogPattern}, dbschema = {dbSchemaPattern}, tablename = {tableNamePattern}");

Dictionary<string, Dictionary<string, Dictionary<string, TableInfoPair>>> catalogMap = new Dictionary<string, Dictionary<string, Dictionary<string, TableInfoPair>>>();
if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.Catalogs)
{
Expand Down
7 changes: 4 additions & 3 deletions csharp/src/Drivers/Apache/Spark/SparkStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public override QueryResult ExecuteQuery()

Schema schema = GetSchema();

return new QueryResult(-1, new CloudFetchReader(this, schema));
return new QueryResult(-1, new SparkReader(this, schema));
//return new QueryResult(-1, new CloudFetchReader(this, schema));
}

public override UpdateResult ExecuteUpdate()
Expand All @@ -65,7 +66,7 @@ public override UpdateResult ExecuteUpdate()

public override object GetValue(IArrowArray arrowArray, int index)
{
throw new NotSupportedException();
return base.GetValue(arrowArray, index);
}

sealed class SparkReader : IArrowArrayStream
Expand Down Expand Up @@ -270,7 +271,7 @@ public async Task downloadData(HttpClient client)
request.Headers.Add(pair.Key, pair.Value);
}
HttpResponseMessage response = await client.SendAsync(request);
this.reader = new ArrowStreamReader(response.Content.ReadAsStream());
this.reader = new ArrowStreamReader(response.Content.ReadAsStreamAsync().Result);
isDownloaded = true;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Apache.Arrow.Types;
using Thrift.Collections;
using Thrift.Protocol;
Expand Down Expand Up @@ -87,7 +88,8 @@ public TBinaryColumn DeepCopy()

for(int _i197 = 0; _i197 < length; ++_i197)
{
typedMemory.Span[_i197] = offset;
//typedMemory.Span[_i197] = offset;
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, _i197 * 4);
var size = await iprot.ReadI32Async(cancellationToken);
offset += size;

Expand All @@ -107,6 +109,7 @@ public TBinaryColumn DeepCopy()
values.Append(tmp.AsMemory(0, size).Span);
}
typedMemory.Span[length] = offset;
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, length * 4);

await iprot.ReadListEndAsync(cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
using Thrift.Protocol.Utilities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
using Thrift.Protocol.Utilities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
using Thrift.Protocol.Utilities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
using Thrift.Protocol.Utilities;
Expand Down Expand Up @@ -83,7 +84,8 @@ public TI16Column DeepCopy()
await transport.ReadExactlyAsync(memory, cancellationToken);
for (int _i152 = 0; _i152 < length; ++_i152)
{
typedMemory.Span[_i152] = BinaryPrimitives.ReverseEndianness(typedMemory.Span[_i152]);
//typedMemory.Span[_i152] = BinaryPrimitives.ReverseEndianness(typedMemory.Span[_i152]);
StreamExtensions.ReverseEndiannessInt16(memory.Span, _i152 * 2);
}
await iprot.ReadListEndAsync(cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
using Thrift.Protocol.Utilities;
Expand Down Expand Up @@ -82,7 +83,8 @@ public TI32Column DeepCopy()
await transport.ReadExactlyAsync(memory, cancellationToken);
for (int _i161 = 0; _i161 < length; ++_i161)
{
typedMemory.Span[_i161] = BinaryPrimitives.ReverseEndianness(typedMemory.Span[_i161]);
//typedMemory.Span[_i161] = BinaryPrimitives.ReverseEndianness(typedMemory.Span[_i161]);
StreamExtensions.ReverseEndianI32AtOffset(memory.Span, _i161 * 4);
}
await iprot.ReadListEndAsync(cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
using Thrift.Protocol.Utilities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Collections;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
Expand Down Expand Up @@ -86,7 +87,9 @@ public TStringColumn DeepCopy()

for(int _i188 = 0; _i188 < length; ++_i188)
{
typedMemory.Span[_i188] = offset;
//typedMemory.Span[_i188] = offset;
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, _i188 * 4);

var size = await iprot.ReadI32Async(cancellationToken);
offset += size;

Expand All @@ -105,7 +108,8 @@ public TStringColumn DeepCopy()
await transport.ReadExactlyAsync(tmp.AsMemory(0, size), cancellationToken);
values.Append(tmp.AsMemory(0, size).Span);
}
typedMemory.Span[length] = offset;
//typedMemory.Span[length] = offset;
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, length * 4);

await iprot.ReadListEndAsync(cancellationToken);
}
Expand Down
125 changes: 125 additions & 0 deletions csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading;
using System.Runtime.InteropServices;

namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
{
public static class StreamExtensions
{
public static void WriteInt32LittleEndian(int value, Span<byte> buffer, int offset)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));

if (offset < 0 || offset > buffer.Length - sizeof(int))
throw new ArgumentOutOfRangeException(nameof(offset), "Offset is outside the bounds of the buffer.");

// Ensure the buffer is large enough to hold an int starting from the offset
if (buffer.Length < offset + sizeof(int))
throw new ArgumentException("Buffer too small to write an Int32 at the specified offset.");

// Write the integer in little-endian format
buffer[offset] = (byte)value;
buffer[offset + 1] = (byte)(value >> 8);
buffer[offset + 2] = (byte)(value >> 16);
buffer[offset + 3] = (byte)(value >> 24);
}

public static void ReverseEndianI32AtOffset(Span<byte> buffer, int offset)
{
// Check if the buffer is large enough to contain an i32 at the given offset
if (offset < 0 || buffer.Length < offset + sizeof(int))
throw new ArgumentException("Buffer is too small or offset is out of bounds.");

// Swap the bytes to reverse the endianness of the i32
// buffer[offset] and buffer[offset + 3]
// buffer[offset + 1] and buffer[offset + 2]
byte temp;

temp = buffer[offset];
buffer[offset] = buffer[offset + 3];
buffer[offset + 3] = temp;

temp = buffer[offset + 1];
buffer[offset + 1] = buffer[offset + 2];
buffer[offset + 2] = temp;
}
public static void ReverseEndiannessInt16(Span<byte> buffer, int offset)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));

if (offset < 0 || offset > buffer.Length - sizeof(short))
throw new ArgumentOutOfRangeException(nameof(offset), "Offset is outside the bounds of the buffer.");

// Swap the bytes to reverse the endianness of a 16-bit integer
(buffer[offset], buffer[offset + 1]) = (buffer[offset + 1], buffer[offset]);
}

public static TValue? GetValueOrDefault<TKey, TValue>(this IReadOnlyDictionary<TKey, TValue> dictionary, TKey key, TValue? defaultValue = default)
{
if (dictionary == null) throw new ArgumentNullException(nameof(dictionary));

return dictionary.TryGetValue(key, out TValue value) ? value : defaultValue;
}

public static async Task<bool> ReadExactlyAsync(this Stream stream, Memory<byte> memory, CancellationToken cancellationToken = default)
{
if (stream == null) throw new ArgumentNullException(nameof(stream));

// Try to get the underlying array from the Memory<byte>
if (!MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> arraySegment))
{
throw new InvalidOperationException("The provided Memory<byte> does not have an accessible underlying array.");
}

int totalBytesRead = 0;
int count = memory.Length;

while (totalBytesRead < count)
{
int bytesRead = await stream.ReadAsync(arraySegment.Array, arraySegment.Offset + totalBytesRead, count - totalBytesRead, cancellationToken).ConfigureAwait(false);

if (bytesRead == 0)
{
// End of the stream reached before reading the desired amount
return totalBytesRead == 0;
}

totalBytesRead += bytesRead;
}

return true;
}

public static async Task<bool> ReadExactlyAsync(this Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
{
if (stream == null) throw new ArgumentNullException(nameof(stream));
if (buffer == null) throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || offset >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0 || (count + offset) > buffer.Length) throw new ArgumentOutOfRangeException(nameof(count));

int bytesReadTotal = 0;

while (bytesReadTotal < count)
{
int bytesRead = await stream.ReadAsync(buffer, offset + bytesReadTotal, count - bytesReadTotal, cancellationToken).ConfigureAwait(false);

// If ReadAsync returns 0, it means the end of the stream has been reached
if (bytesRead == 0)
{
// If we haven't read any bytes at all, it's okay (might be at the end of the stream)
// But if we've read some bytes and then hit the end of the stream, it's unexpected
return bytesReadTotal == 0;
}

bytesReadTotal += bytesRead;
}

return true;
}
}
}
Loading

0 comments on commit d5bdb76

Please sign in to comment.