Skip to content

Commit

Permalink
Merge branch 'dev/apache-drivers' of github.com:gopalldb/arrow-adbc i…
Browse files Browse the repository at this point in the history
…nto dev/apache-drivers
  • Loading branch information
yunbodeng-db committed Mar 14, 2024
2 parents 5701c04 + 4313cc2 commit ea46162
Show file tree
Hide file tree
Showing 144 changed files with 43,006 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .vs/VSWorkspaceState.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"ExpandedNodes": [
""
],
"SelectedNode": "\\Apache.Arrow.Adbc.sln",
"PreviewInSolutionExplorer": false
}
Binary file added .vs/arrow-adbc/v17/.wsuo
Binary file not shown.
23 changes: 23 additions & 0 deletions .vs/arrow-adbc/v17/DocumentLayout.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"Version": 1,
"WorkspaceRootPath": "C:\\Users\\gopal.lal\\source\\repos\\arrow-adbc\\",
"Documents": [],
"DocumentGroupContainers": [
{
"Orientation": 0,
"VerticalTabListWidth": 256,
"DocumentGroups": [
{
"DockedWidth": 200,
"SelectedChildIndex": -1,
"Children": [
{
"$type": "Bookmark",
"Name": "ST:0:0:{cce594b6-0c39-4442-ba28-10c64ac7e89f}"
}
]
}
]
}
]
}
14 changes: 14 additions & 0 deletions csharp/Apache.Arrow.Adbc.sln
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Adbc.Drivers.I
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Adbc.Tests.Drivers.Interop.Snowflake", "test\Drivers\Interop\Snowflake\Apache.Arrow.Adbc.Tests.Drivers.Interop.Snowflake.csproj", "{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Adbc.Drivers.Apache", "src\Drivers\Apache\Apache.Arrow.Adbc.Drivers.Apache.csproj", "{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Adbc.Tests.Drivers.Apache", "test\Drivers\Apache\Apache.Arrow.Adbc.Tests.Drivers.Apache.csproj", "{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -70,6 +74,14 @@ Global
{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7}.Release|Any CPU.Build.0 = Release|Any CPU
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903}.Release|Any CPU.Build.0 = Release|Any CPU
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -84,6 +96,8 @@ Global
{EA43BB7C-BC00-4701-BDF4-367880C2495C} = {C7290227-E925-47E7-8B6B-A8B171645D58}
{30024B6F-7BC1-4574-BE5A-924FBD6EAF83} = {FEB257A0-4FD3-495E-9A47-9E1649755445}
{8BE1EECC-3ACF-41B2-AF7D-1A67196FF6C7} = {C7290227-E925-47E7-8B6B-A8B171645D58}
{6C0D8BE1-4A23-4C2F-88B1-D2FBEA0B1903} = {FEB257A0-4FD3-495E-9A47-9E1649755445}
{714F0BD2-3A92-4D1A-8FAC-D0C0599BE3E3} = {C7290227-E925-47E7-8B6B-A8B171645D58}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4795CF16-0FDB-4BE0-9768-5CF31564DC03}
Expand Down
15 changes: 15 additions & 0 deletions csharp/src/Drivers/Apache/Apache.Arrow.Adbc.Drivers.Apache.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net7.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ApacheThrift" Version="0.19.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Apache.Arrow.Adbc\Apache.Arrow.Adbc.csproj" />
</ItemGroup>

</Project>
209 changes: 209 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
using Thrift.Transport;

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
public abstract class HiveServer2Connection : AdbcConnection
{
const string userAgent = "AdbcExperimental/0.0";

protected TOperationHandle operationHandle;
protected IReadOnlyDictionary<string, string> properties;
internal TTransport transport;
internal TCLIService.Client client;
internal TSessionHandle sessionHandle;

internal HiveServer2Connection() : this(null)
{

}

internal HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
{
this.properties = properties;
}

public void Open()
{
TProtocol protocol = CreateProtocol();
this.transport = protocol.Transport;
this.client = new TCLIService.Client(protocol);

var s0 = this.client.OpenSession(CreateSessionRequest()).Result;
this.sessionHandle = s0.SessionHandle;
}

protected abstract TProtocol CreateProtocol();
protected abstract TOpenSessionReq CreateSessionRequest();

public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string catalogPattern, string dbSchemaPattern, string tableNamePattern, List<string> tableTypes, string columnNamePattern)
{
Dictionary<string, Dictionary<string, Dictionary<string, List<string>>>> catalogMap = new Dictionary<string, Dictionary<string, Dictionary<string, List<string>>>>();
if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.Catalogs)
{
TGetCatalogsReq getCatalogsReq = new TGetCatalogsReq(this.sessionHandle);
}

if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.DbSchemas)
{
TGetSchemasReq getSchemasReq = new TGetSchemasReq(this.sessionHandle);
}

if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.Tables)
{
TGetTablesReq getTablesReq = new TGetTablesReq(this.sessionHandle);
}

if (depth == GetObjectsDepth.All)
{
TGetColumnsReq columnsReq = new TGetColumnsReq(this.sessionHandle);
columnsReq.CatalogName = catalogPattern;
columnsReq.SchemaName = dbSchemaPattern;
columnsReq.TableName = tableNamePattern;

if (!string.IsNullOrEmpty(columnNamePattern))
columnsReq.ColumnName = columnNamePattern;

var columnsResponse = this.client.GetColumns(columnsReq).Result;
if (columnsResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
{
throw new Exception(columnsResponse.Status.ErrorMessage);
}

this.operationHandle = columnsResponse.OperationHandle;
}

PollForResponse();

Schema schema = GetSchema();

return new GetObjectsReader(this,schema);
}

public override IArrowArrayStream GetInfo(List<int> codes)
{
throw new NotImplementedException();
}

public override IArrowArrayStream GetTableTypes()
{
throw new NotImplementedException();
}

protected void PollForResponse()
{
TGetOperationStatusResp statusResponse = null;
do
{
if (statusResponse != null) { Thread.Sleep(500); }
TGetOperationStatusReq request = new TGetOperationStatusReq(this.operationHandle);
statusResponse = this.client.GetOperationStatus(request).Result;
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);
}


public override void Dispose()
{
if (this.client != null)
{
TCloseSessionReq r6 = new TCloseSessionReq(this.sessionHandle);
this.client.CloseSession(r6).Wait();

this.transport.Close();
this.client.Dispose();
this.transport = null;
this.client = null;
}
}

protected Schema GetSchema()
{
TGetResultSetMetadataReq request = new TGetResultSetMetadataReq(this.operationHandle);
TGetResultSetMetadataResp response = this.client.GetResultSetMetadata(request).Result;
return SchemaParser.GetArrowSchema(response.Schema);
}

sealed class GetObjectsReader : IArrowArrayStream
{
HiveServer2Connection connection;
Schema schema;
List<TSparkArrowBatch> batches;
int index;
IArrowReader reader;

public GetObjectsReader(HiveServer2Connection connection, Schema schema)
{
this.connection = connection;
this.schema = schema;
}

public Schema Schema { get { return schema; } }

public async ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
while (true)
{
if (this.reader != null)
{
RecordBatch next = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
if (next != null)
{
return next;
}
this.reader = null;
}

if (this.batches != null && this.index < this.batches.Count)
{
this.reader = new ArrowStreamReader(new ChunkStream(this.schema, this.batches[this.index++].Batch));
continue;
}

this.batches = null;
this.index = 0;

if (this.connection == null)
{
return null;
}

TFetchResultsReq request = new TFetchResultsReq(this.connection.operationHandle, TFetchOrientation.FETCH_NEXT, 50000);
TFetchResultsResp response = await this.connection.client.FetchResults(request, cancellationToken);
this.batches = response.Results.ArrowBatches;

if (!response.HasMoreRows)
{
this.connection = null;
}
}
}

public void Dispose()
{
}
}
}
}
83 changes: 83 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Threading;
using Apache.Hive.Service.Rpc.Thrift;

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
public abstract class HiveServer2Statement : AdbcStatement
{
protected HiveServer2Connection connection;
protected TOperationHandle operationHandle;

protected HiveServer2Statement(HiveServer2Connection connection)
{
this.connection = connection;
}

protected virtual void SetStatementProperties(TExecuteStatementReq statement)
{
}

protected void ExecuteStatement()
{
TExecuteStatementReq executeRequest = new TExecuteStatementReq(this.connection.sessionHandle, this.SqlQuery);
SetStatementProperties(executeRequest);
var executeResponse = this.connection.client.ExecuteStatement(executeRequest).Result;
if (executeResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
{
throw new Exception(executeResponse.Status.ErrorMessage);
}

this.operationHandle = executeResponse.OperationHandle;
}

protected void PollForResponse()
{
TGetOperationStatusResp statusResponse = null;
do
{
if (statusResponse != null) { Thread.Sleep(500); }
TGetOperationStatusReq request = new TGetOperationStatusReq(this.operationHandle);
statusResponse = this.connection.client.GetOperationStatus(request).Result;
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);
}

protected Schema GetSchema()
{
TGetResultSetMetadataReq request = new TGetResultSetMetadataReq(this.operationHandle);
TGetResultSetMetadataResp response = this.connection.client.GetResultSetMetadata(request).Result;
return SchemaParser.GetArrowSchema(response.Schema);
}

public override void Dispose()
{
if (this.operationHandle != null)
{
TCloseOperationReq request = new TCloseOperationReq(this.operationHandle);
this.connection.client.CloseOperation(request).Wait();
this.operationHandle = null;
}

base.Dispose();
}


}
}
Loading

0 comments on commit ea46162

Please sign in to comment.