Skip to content

Commit

Permalink
Make changes for cloud fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
vikrantpuppala committed Mar 18, 2024
1 parent 9ae24cf commit 5038c9c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
8 changes: 5 additions & 3 deletions csharp/src/Drivers/Apache/Spark/SparkStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ internal SparkStatement(SparkConnection connection)

protected override void SetStatementProperties(TExecuteStatementReq statement)
{

statement.EnforceResultPersistenceMode = true;
statement.ResultPersistenceMode = 2;

statement.CanReadArrowResult = true;
statement.CanDownloadResult = true;
statement.ConfOverlay = SparkConnection.timestampConfig;
Expand All @@ -52,7 +54,6 @@ public override QueryResult ExecuteQuery()
{
ExecuteStatement();
PollForResponse();

Schema schema = GetSchema();

return new QueryResult(-1, new SparkReader(this, schema));
Expand Down Expand Up @@ -160,7 +161,7 @@ public async ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken c
return next;
}
this.reader = null;
if (this.chunkDownloader.chunks[this.chunkDownloader.currentChunkIndex] == null)
if (this.chunkDownloader.currentChunkIndex >= this.chunkDownloader.chunks.Count)
{
this.statement = null;
}
Expand Down Expand Up @@ -206,6 +207,7 @@ public ChunkDownloader(List<TSparkArrowResultLink> links)
this.chunks.Add(i, currentChunk);
}
this.client = new HttpClient();
initialize();
}

public ChunkDownloader(Dictionary<string, Dictionary<string, string>> links)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public partial class TExecuteStatementReq : TBase
private string _requestValidation;
private int _resultPersistenceMode;
private bool _trimArrowBatchesToLimit;
private bool _enforceResultPersistenceMode;

public global::Apache.Hive.Service.Rpc.Thrift.TSessionHandle SessionHandle { get; set; }

Expand Down Expand Up @@ -85,6 +86,20 @@ public bool RunAsync
}
}


public bool EnforceResultPersistenceMode
{
get
{
return _enforceResultPersistenceMode;
}
set
{
__isset.enforceResultPersistenceMode = true;
this._enforceResultPersistenceMode = value;
}
}

public long QueryTimeout
{
get
Expand Down Expand Up @@ -315,6 +330,7 @@ public struct Isset
public bool requestValidation;
public bool resultPersistenceMode;
public bool trimArrowBatchesToLimit;
public bool enforceResultPersistenceMode;
}

public TExecuteStatementReq()
Expand Down Expand Up @@ -432,6 +448,11 @@ public TExecuteStatementReq DeepCopy()
tmp357.TrimArrowBatchesToLimit = this.TrimArrowBatchesToLimit;
}
tmp357.__isset.trimArrowBatchesToLimit = this.__isset.trimArrowBatchesToLimit;
if(__isset.enforceResultPersistenceMode)
{
tmp357.EnforceResultPersistenceMode = this.EnforceResultPersistenceMode;
}
tmp357.__isset.enforceResultPersistenceMode = this.__isset.enforceResultPersistenceMode;
return tmp357;
}

Expand Down Expand Up @@ -673,6 +694,16 @@ public TExecuteStatementReq DeepCopy()
await TProtocolUtil.SkipAsync(iprot, field.Type, cancellationToken);
}
break;
case 3344:
if (field.Type == TType.Bool)
{
EnforceResultPersistenceMode = await iprot.ReadBoolAsync(cancellationToken);
}
else
{
await TProtocolUtil.SkipAsync(iprot, field.Type, cancellationToken);
}
break;
default:
await TProtocolUtil.SkipAsync(iprot, field.Type, cancellationToken);
break;
Expand Down Expand Up @@ -891,6 +922,15 @@ public TExecuteStatementReq DeepCopy()
await oprot.WriteBoolAsync(TrimArrowBatchesToLimit, cancellationToken);
await oprot.WriteFieldEndAsync(cancellationToken);
}
if(__isset.enforceResultPersistenceMode)
{
tmp363.Name = "enforceResultPersistenceMode";
tmp363.Type = TType.Bool;
tmp363.ID = 3344;
await oprot.WriteFieldBeginAsync(tmp363, cancellationToken);
await oprot.WriteBoolAsync(EnforceResultPersistenceMode, cancellationToken);
await oprot.WriteFieldEndAsync(cancellationToken);
}
await oprot.WriteFieldStopAsync(cancellationToken);
await oprot.WriteStructEndAsync(cancellationToken);
}
Expand Down Expand Up @@ -923,7 +963,8 @@ public override bool Equals(object that)
&& ((__isset.executionVersion == other.__isset.executionVersion) && ((!__isset.executionVersion) || (global::System.Object.Equals(ExecutionVersion, other.ExecutionVersion))))
&& ((__isset.requestValidation == other.__isset.requestValidation) && ((!__isset.requestValidation) || (global::System.Object.Equals(RequestValidation, other.RequestValidation))))
&& ((__isset.resultPersistenceMode == other.__isset.resultPersistenceMode) && ((!__isset.resultPersistenceMode) || (global::System.Object.Equals(ResultPersistenceMode, other.ResultPersistenceMode))))
&& ((__isset.trimArrowBatchesToLimit == other.__isset.trimArrowBatchesToLimit) && ((!__isset.trimArrowBatchesToLimit) || (global::System.Object.Equals(TrimArrowBatchesToLimit, other.TrimArrowBatchesToLimit))));
&& ((__isset.trimArrowBatchesToLimit == other.__isset.trimArrowBatchesToLimit) && ((!__isset.trimArrowBatchesToLimit) || (global::System.Object.Equals(TrimArrowBatchesToLimit, other.TrimArrowBatchesToLimit))))
&& ((__isset.enforceResultPersistenceMode == other.__isset.enforceResultPersistenceMode) && ((!__isset.enforceResultPersistenceMode) || (global::System.Object.Equals(EnforceResultPersistenceMode, other.EnforceResultPersistenceMode))));
}

public override int GetHashCode() {
Expand Down Expand Up @@ -997,6 +1038,10 @@ public override int GetHashCode() {
{
hashcode = (hashcode * 397) + ExecutionVersion.GetHashCode();
}
if(__isset.enforceResultPersistenceMode)
{
hashcode = (hashcode * 397) + EnforceResultPersistenceMode.GetHashCode();
}
if((RequestValidation != null) && __isset.requestValidation)
{
hashcode = (hashcode * 397) + RequestValidation.GetHashCode();
Expand Down Expand Up @@ -1091,6 +1136,11 @@ public override string ToString()
tmp365.Append(", RejectHighCostQueries: ");
RejectHighCostQueries.ToString(tmp365);
}
if(__isset.enforceResultPersistenceMode)
{
tmp365.Append(", EnforceResultPersistenceMode: ");
EnforceResultPersistenceMode.ToString(tmp365);
}
if(__isset.estimatedCost)
{
tmp365.Append(", EstimatedCost: ");
Expand Down

0 comments on commit 5038c9c

Please sign in to comment.