Skip to content

Commit f42c5b0

Browse files
adamreeveamoeba
authored andcommitted
GH-44361: [C#][Integration] Include .NET in Flight integration tests (#44377)
### Rationale for this change See #44361. This allows testing compatibility of the .NET Flight implementation with other Flight implementations. ### What changes are included in this PR? * Adds a new `Apache.Arrow.Flight.IntegrationTest` project that can run in server or client mode for Flight integration tests. * Includes the integration tests that send then retrieve data defined in JSON files, but doesn't add any of the named scenarios * Configures archery to include C# in the Flight integration tests, but skip all the named scenarios * Also skips tests that use dictionary data due to #38045, and the empty data test due to #44363 ### Are these changes tested? These changes are tests. ### Are there any user-facing changes? No * GitHub Issue: #44361 Authored-by: Adam Reeve <[email protected]> Signed-off-by: Curt Hagenlocher <[email protected]>
1 parent 47ba570 commit f42c5b0

File tree

14 files changed

+502
-20
lines changed

14 files changed

+502
-20
lines changed

csharp/Apache.Arrow.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql.Tes
2727
EndProject
2828
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql", "src\Apache.Arrow.Flight.Sql\Apache.Arrow.Flight.Sql.csproj", "{2ADE087A-B424-4895-8CC5-10170D10BA62}"
2929
EndProject
30+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.IntegrationTest", "test\Apache.Arrow.Flight.IntegrationTest\Apache.Arrow.Flight.IntegrationTest.csproj", "{7E66CBB4-D921-41E7-A98A-7C6DEA521696}"
31+
EndProject
3032
Global
3133
GlobalSection(SolutionConfigurationPlatforms) = preSolution
3234
Debug|Any CPU = Debug|Any CPU
@@ -81,6 +83,10 @@ Global
8183
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Debug|Any CPU.Build.0 = Debug|Any CPU
8284
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.ActiveCfg = Release|Any CPU
8385
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.Build.0 = Release|Any CPU
86+
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
87+
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.Build.0 = Debug|Any CPU
88+
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.ActiveCfg = Release|Any CPU
89+
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.Build.0 = Release|Any CPU
8490
EndGlobalSection
8591
GlobalSection(SolutionProperties) = preSolution
8692
HideSolutionNode = FALSE

csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ protected virtual void Dispose(bool disposing)
6464
{
6565
if (!_disposed)
6666
{
67-
_flightDataStream.Dispose();
67+
_flightDataStream?.Dispose();
6868
_disposed = true;
6969
}
7070
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project Sdk="Microsoft.NET.Sdk">
3+
4+
<PropertyGroup>
5+
<OutputType>Exe</OutputType>
6+
<TargetFramework>net8.0</TargetFramework>
7+
<RootNamespace>Apache.Arrow.Flight.IntegrationTest</RootNamespace>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
12+
<PackageReference Include="System.Text.Json" Version="8.0.5" />
13+
<ProjectReference Include="..\..\src\Apache.Arrow.Flight\Apache.Arrow.Flight.csproj" />
14+
<ProjectReference Include="..\Apache.Arrow.Flight.TestWeb\Apache.Arrow.Flight.TestWeb.csproj" />
15+
<ProjectReference Include="..\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj" />
16+
</ItemGroup>
17+
18+
</Project>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using System;
17+
using System.IO;
18+
using System.Threading.Tasks;
19+
20+
namespace Apache.Arrow.Flight.IntegrationTest;
21+
22+
public class FlightClientCommand
23+
{
24+
private readonly int _port;
25+
private readonly string _scenario;
26+
private readonly FileInfo _jsonFileInfo;
27+
28+
public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo)
29+
{
30+
_port = port;
31+
_scenario = scenario;
32+
_jsonFileInfo = jsonFileInfo;
33+
}
34+
35+
public async Task Execute()
36+
{
37+
if (!string.IsNullOrEmpty(_scenario))
38+
{
39+
// No named scenarios are currently implemented
40+
throw new Exception($"Scenario '{_scenario}' is not supported.");
41+
}
42+
43+
if (!(_jsonFileInfo?.Exists ?? false))
44+
{
45+
throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'");
46+
}
47+
48+
var scenario = new JsonTestScenario(_port, _jsonFileInfo);
49+
await scenario.RunClient().ConfigureAwait(false);
50+
}
51+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using System;
17+
using System.Net;
18+
using System.Threading.Tasks;
19+
using Apache.Arrow.Flight.TestWeb;
20+
using Microsoft.AspNetCore.Hosting;
21+
using Microsoft.AspNetCore.Hosting.Server;
22+
using Microsoft.AspNetCore.Hosting.Server.Features;
23+
using Microsoft.AspNetCore.Server.Kestrel.Core;
24+
using Microsoft.Extensions.DependencyInjection;
25+
using Microsoft.Extensions.Hosting;
26+
27+
namespace Apache.Arrow.Flight.IntegrationTest;
28+
29+
public class FlightServerCommand
30+
{
31+
private readonly string _scenario;
32+
33+
public FlightServerCommand(string scenario)
34+
{
35+
_scenario = scenario;
36+
}
37+
38+
public async Task Execute()
39+
{
40+
if (!string.IsNullOrEmpty(_scenario))
41+
{
42+
// No named scenarios are currently implemented
43+
throw new Exception($"Scenario '{_scenario}' is not supported.");
44+
}
45+
46+
var host = Host.CreateDefaultBuilder()
47+
.ConfigureWebHostDefaults(webBuilder =>
48+
{
49+
webBuilder
50+
.ConfigureKestrel(options =>
51+
{
52+
options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2);
53+
})
54+
.UseStartup<Startup>();
55+
})
56+
.Build();
57+
58+
await host.StartAsync().ConfigureAwait(false);
59+
60+
var addresses = host.Services.GetService<IServer>().Features.Get<IServerAddressesFeature>().Addresses;
61+
foreach (var address in addresses)
62+
{
63+
Console.WriteLine($"Server listening on {address}");
64+
}
65+
66+
await host.WaitForShutdownAsync().ConfigureAwait(false);
67+
}
68+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using Grpc.Net.Client.Balancer;
17+
18+
namespace Apache.Arrow.Flight.IntegrationTest;
19+
20+
/// <summary>
21+
/// The Grpc.Net.Client library doesn't know how to handle the "grpc+tcp" scheme used by Arrow Flight.
22+
/// This ResolverFactory passes these through to the standard Static Resolver used for the http scheme.
23+
/// </summary>
24+
public class GrpcTcpResolverFactory : ResolverFactory
25+
{
26+
public override string Name => "grpc+tcp";
27+
28+
public override Resolver Create(ResolverOptions options)
29+
{
30+
return new StaticResolverFactory(
31+
uri => new[] { new BalancerAddress(options.Address.Host, options.Address.Port) })
32+
.Create(options);
33+
}
34+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using System;
17+
using System.IO;
18+
using System.Linq;
19+
using System.Threading.Tasks;
20+
using Apache.Arrow.Flight.Client;
21+
using Apache.Arrow.IntegrationTest;
22+
using Apache.Arrow.Tests;
23+
using Apache.Arrow.Types;
24+
using Google.Protobuf;
25+
using Grpc.Net.Client;
26+
using Grpc.Core;
27+
using Grpc.Net.Client.Balancer;
28+
using Microsoft.Extensions.DependencyInjection;
29+
30+
namespace Apache.Arrow.Flight.IntegrationTest;
31+
32+
/// <summary>
33+
/// A test scenario defined using a JSON data file
34+
/// </summary>
35+
internal class JsonTestScenario
36+
{
37+
private readonly int _serverPort;
38+
private readonly FileInfo _jsonFile;
39+
private readonly ServiceProvider _serviceProvider;
40+
41+
public JsonTestScenario(int serverPort, FileInfo jsonFile)
42+
{
43+
_serverPort = serverPort;
44+
_jsonFile = jsonFile;
45+
46+
var services = new ServiceCollection();
47+
services.AddSingleton<ResolverFactory>(new GrpcTcpResolverFactory());
48+
_serviceProvider = services.BuildServiceProvider();
49+
}
50+
51+
public async Task RunClient()
52+
{
53+
var address = $"grpc+tcp://localhost:{_serverPort}";
54+
using var channel = GrpcChannel.ForAddress(
55+
address,
56+
new GrpcChannelOptions
57+
{
58+
ServiceProvider = _serviceProvider,
59+
Credentials = ChannelCredentials.Insecure
60+
});
61+
var client = new FlightClient(channel);
62+
63+
var descriptor = FlightDescriptor.CreatePathDescriptor(_jsonFile.FullName);
64+
65+
var jsonFile = await JsonFile.ParseAsync(_jsonFile).ConfigureAwait(false);
66+
var schema = jsonFile.GetSchemaAndDictionaries(out Func<DictionaryType, IArrowArray> dictionaries);
67+
var batches = jsonFile.Batches.Select(batch => batch.ToArrow(schema, dictionaries)).ToArray();
68+
69+
// 1. Put the data to the server.
70+
await UploadBatches(client, descriptor, batches).ConfigureAwait(false);
71+
72+
// 2. Get the ticket for the data.
73+
var info = await client.GetInfo(descriptor).ConfigureAwait(false);
74+
if (info.Endpoints.Count == 0)
75+
{
76+
throw new Exception("No endpoints received");
77+
}
78+
79+
// 3. Stream data from the server, comparing individual batches.
80+
foreach (var endpoint in info.Endpoints)
81+
{
82+
var locations = endpoint.Locations.ToArray();
83+
if (locations.Length == 0)
84+
{
85+
// Can read with existing client
86+
await ConsumeFlightLocation(client, endpoint.Ticket, batches).ConfigureAwait(false);
87+
}
88+
else
89+
{
90+
foreach (var location in locations)
91+
{
92+
using var readChannel = GrpcChannel.ForAddress(
93+
location.Uri,
94+
new GrpcChannelOptions
95+
{
96+
ServiceProvider = _serviceProvider,
97+
Credentials = ChannelCredentials.Insecure
98+
});
99+
var readClient = new FlightClient(readChannel);
100+
await ConsumeFlightLocation(readClient, endpoint.Ticket, batches).ConfigureAwait(false);
101+
}
102+
}
103+
}
104+
}
105+
106+
private static async Task UploadBatches(FlightClient client, FlightDescriptor descriptor, RecordBatch[] batches)
107+
{
108+
using var putCall = client.StartPut(descriptor);
109+
using var writer = putCall.RequestStream;
110+
111+
try
112+
{
113+
var counter = 0;
114+
foreach (var batch in batches)
115+
{
116+
var metadata = $"{counter}";
117+
118+
await writer.WriteAsync(batch, ByteString.CopyFromUtf8(metadata)).ConfigureAwait(false);
119+
120+
// Verify server has acknowledged the write request
121+
await putCall.ResponseStream.MoveNext().ConfigureAwait(false);
122+
var responseString = putCall.ResponseStream.Current.ApplicationMetadata.ToStringUtf8();
123+
124+
if (responseString != metadata)
125+
{
126+
throw new Exception($"Response metadata '{responseString}' does not match expected metadata '{metadata}'");
127+
}
128+
129+
counter++;
130+
}
131+
}
132+
finally
133+
{
134+
await writer.CompleteAsync().ConfigureAwait(false);
135+
}
136+
137+
// Drain the response stream to ensure the server has stored the data
138+
var hasMore = await putCall.ResponseStream.MoveNext().ConfigureAwait(false);
139+
if (hasMore)
140+
{
141+
throw new Exception("Expected to have reached the end of the response stream");
142+
}
143+
}
144+
145+
private static async Task ConsumeFlightLocation(FlightClient client, FlightTicket ticket, RecordBatch[] batches)
146+
{
147+
using var readStream = client.GetStream(ticket);
148+
var counter = 0;
149+
foreach (var originalBatch in batches)
150+
{
151+
if (!await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
152+
{
153+
throw new Exception($"Expected {batches.Length} batches but received {counter}");
154+
}
155+
156+
var batch = readStream.ResponseStream.Current;
157+
ArrowReaderVerifier.CompareBatches(originalBatch, batch, strictCompare: false);
158+
159+
counter++;
160+
}
161+
162+
if (await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
163+
{
164+
throw new Exception($"Expected to reach the end of the response stream after {batches.Length} batches");
165+
}
166+
}
167+
}

0 commit comments

Comments
 (0)