
Commit

Merge branch 'main' into feature/codeql
aleks-ivanov committed Jan 3, 2025
2 parents b3a8fdb + 7b9224a commit ed9476f
Showing 41 changed files with 2,920 additions and 33 deletions.
14 changes: 5 additions & 9 deletions azure-pipelines-e2e-tests-template.yml
@@ -122,14 +122,14 @@ stages:
script: |
echo "Download Hadoop utils for Windows."
$hadoopBinaryUrl = "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip"
# Spark 3.3.3 version binary uses Hadoop3 dependency
if ("3.3.3" -contains "${{ test.version }}") {
# Spark 3.3.0+ version binary uses Hadoop3 dependency
if ([version]"3.3.0" -le [version]"${{ test.version }}") {
$hadoopBinaryUrl = "https://github.com/SparkSnail/winutils/releases/download/hadoop-3.3.5/hadoop-3.3.5.zip"
}
curl -k -L -o hadoop.zip $hadoopBinaryUrl
Expand-Archive -Path hadoop.zip -Destination .
New-Item -ItemType Directory -Force -Path hadoop\bin
if ("3.3.3" -contains "${{ test.version }}") {
if ([version]"3.3.0" -le [version]"${{ test.version }}") {
cp hadoop-3.3.5\winutils.exe hadoop\bin
# Hadoop 3.3 needs hadoop.dll added to the environment variables to avoid UnsatisfiedLinkError
cp hadoop-3.3.5\hadoop.dll hadoop\bin
@@ -142,12 +142,8 @@ stages:
- pwsh: |
echo "Downloading Spark ${{ test.version }}"
$sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2.7"
# In Spark 3.3.0, 3.3.1, 3.3.2 and 3.3.4, the binary name with the hadoop2 dependency changed to spark-${{ test.version }}-bin-hadoop2.tgz
if ("3.3.0", "3.3.1", "3.3.2", "3.3.4" -contains "${{ test.version }}") {
$sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2"
}
# In Spark 3.3.3, the binary doesn't provide a hadoop2 version, so we use the hadoop3 version
if ("3.3.3" -contains "${{ test.version }}") {
# Spark 3.3.0+ uses Hadoop3
if ([version]"3.3.0" -le [version]"${{ test.version }}") {
$sparkBinaryName = "spark-${{ test.version }}-bin-hadoop3"
}
curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/${sparkBinaryName}.tgz
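The key change in this template is replacing string membership tests with a semantic version comparison via PowerShell's [version] accelerator, so any future 3.x release lands on the right side of the 3.3.0 boundary. A minimal C# sketch of the same comparison semantics using System.Version (the version strings here are illustrative):

using System;

class VersionGateSketch
{
    // Mirrors the pipeline check: Hadoop3 binaries are used for Spark 3.3.0+.
    static bool UsesHadoop3(string sparkVersion) =>
        new Version("3.3.0") <= Version.Parse(sparkVersion);

    static void Main()
    {
        Console.WriteLine(UsesHadoop3("3.2.3")); // False: keeps the Hadoop2 binary
        Console.WriteLine(UsesHadoop3("3.3.3")); // True
        Console.WriteLine(UsesHadoop3("3.5.1")); // True, with no per-version list needed
    }
}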
12 changes: 11 additions & 1 deletion azure-pipelines-pr.yml
@@ -31,7 +31,7 @@ variables:
backwardCompatibleTestOptions_Linux_3_1: ""
forwardCompatibleTestOptions_Linux_3_1: ""

# Skip all forward/backward compatibility tests since Spark 3.2 is not supported before this release.
# Skip all forward/backward compatibility tests since Spark 3.2 and 3.5 are not supported before this release.
backwardCompatibleTestOptions_Windows_3_2: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
@@ -41,6 +41,11 @@ variables:
forwardCompatibleTestOptions_Windows_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
backwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
forwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)

backwardCompatibleTestOptions_Windows_3_5: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
backwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)

# Azure DevOps variables are transformed into environment variables, with these variables we
# avoid the first time experience and telemetry to speed up the build.
@@ -73,6 +78,11 @@ parameters:
- '3.3.2'
- '3.3.3'
- '3.3.4'
- '3.5.0'
- '3.5.1'
- '3.5.2'
- '3.5.3'

# List of OS types to run E2E tests, run each test in both 'Windows' and 'Linux' environments
- name: listOfE2ETestsPoolTypes
type: object
55 changes: 54 additions & 1 deletion azure-pipelines.yml
@@ -31,12 +31,17 @@ variables:
backwardCompatibleTestOptions_Linux_3_1: ""
forwardCompatibleTestOptions_Linux_3_1: ""

# Skip all forward/backward compatibility tests since Spark 3.2 is not supported before this release.
# Skip all forward/backward compatibility tests since Spark 3.2 and 3.5 are not supported before this release.
backwardCompatibleTestOptions_Windows_3_2: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
forwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)

backwardCompatibleTestOptions_Windows_3_5: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
backwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)

# Azure DevOps variables are transformed into environment variables, with these variables we
# avoid the first time experience and telemetry to speed up the build.
DOTNET_CLI_TELEMETRY_OPTOUT: 1
@@ -413,3 +418,51 @@ stages:
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_2)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_2)
- version: '3.5.0'
enableForwardCompatibleTests: false
enableBackwardCompatibleTests: false
jobOptions:
- pool: 'Windows'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
- pool: 'Linux'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
- version: '3.5.1'
enableForwardCompatibleTests: false
enableBackwardCompatibleTests: false
jobOptions:
- pool: 'Windows'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
- pool: 'Linux'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
- version: '3.5.2'
enableForwardCompatibleTests: false
enableBackwardCompatibleTests: false
jobOptions:
- pool: 'Windows'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
- pool: 'Linux'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
- version: '3.5.3'
enableForwardCompatibleTests: false
enableBackwardCompatibleTests: false
jobOptions:
- pool: 'Windows'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
- pool: 'Linux'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
@@ -27,6 +27,7 @@ public DeltaFixture()
(3, 3, 2) => "delta-core_2.12:2.3.0",
(3, 3, 3) => "delta-core_2.12:2.3.0",
(3, 3, 4) => "delta-core_2.12:2.3.0",
(3, 5, _) => "delta-spark_2.12:3.2.0",
_ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
};

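The new (3, 5, _) arm uses a C# tuple pattern with a discard, so every 3.5.x patch release resolves to the same Delta Lake package. A compressed sketch of the pattern (the 3.3.x arms are collapsed into one wildcard here for brevity; the fixture above lists them individually):

using System;

class DeltaPackageSketch
{
    static string DeltaPackage(int major, int minor, int patch) =>
        (major, minor, patch) switch
        {
            (3, 3, _) => "delta-core_2.12:2.3.0",  // any 3.3.x patch release
            (3, 5, _) => "delta-spark_2.12:3.2.0", // any 3.5.x patch release
            _ => throw new NotSupportedException($"Spark {major}.{minor}.{patch} not supported.")
        };

    static void Main() =>
        Console.WriteLine(DeltaPackage(3, 5, 3)); // delta-spark_2.12:3.2.0
}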
10 changes: 8 additions & 2 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs
@@ -57,16 +57,22 @@ public void TestSignaturesV2_4_X()

/// <summary>
/// Test signatures for APIs introduced in Spark 3.1.*.
/// In Spark 3.5, Spark throws an exception when trying to delete
/// archive.zip from the temp folder, which causes other tests to fail.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V3_1_0)]
[SkipIfSparkVersionIsNotInRange(Versions.V3_1_0, Versions.V3_3_0)]
public void TestSignaturesV3_1_X()
{
SparkContext sc = SparkContext.GetOrCreate(new SparkConf());

string archivePath = $"{TestEnvironment.ResourceDirectory}archive.zip";

sc.AddArchive(archivePath);

Assert.IsType<string[]>(sc.ListArchives().ToArray());
var archives = sc.ListArchives().ToArray();

Assert.IsType<string[]>(archives);
Assert.NotEmpty(archives.Where(a => a.EndsWith("archive.zip")));
}
}
}
@@ -59,7 +59,6 @@ public void TestSignaturesV2_4_X()
Assert.IsType<bool>(catalog.FunctionExists("functionname"));
Assert.IsType<Database>(catalog.GetDatabase("default"));
Assert.IsType<Function>(catalog.GetFunction("abs"));
Assert.IsType<Function>(catalog.GetFunction(null, "abs"));
Assert.IsType<Table>(catalog.GetTable("users"));
Assert.IsType<Table>(catalog.GetTable("default", "users"));
Assert.IsType<bool>(catalog.IsCached("users"));
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark.UnitTest/TypeConverterTests.cs
@@ -20,6 +20,7 @@ public void TestBaseCase()
Assert.Equal((short)1, TypeConverter.ConvertTo<short>((short)1));
Assert.Equal((ushort)1, TypeConverter.ConvertTo<ushort>((ushort)1));
Assert.Equal(1, TypeConverter.ConvertTo<int>(1));
Assert.Equal(1L, TypeConverter.ConvertTo<long>(1));
Assert.Equal(1u, TypeConverter.ConvertTo<uint>(1u));
Assert.Equal(1L, TypeConverter.ConvertTo<long>(1L));
Assert.Equal(1ul, TypeConverter.ConvertTo<ulong>(1ul));
@@ -351,6 +351,7 @@ internal PayloadWriter Create(Version version = null)
new BroadcastVariableWriterV2_4_X(),
new CommandWriterV2_4_X());
case Versions.V3_3_0:
case Versions.V3_5_1:
return new PayloadWriter(
version,
new TaskContextWriterV3_3_X(),
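The added case Versions.V3_5_1 label is a plain case fallthrough: Spark 3.5.1 shares the 3.3.0 payload layout, so both labels construct the same writer. A minimal sketch of that version-grouping idea using a switch expression (names simplified, not the actual writer types):

using System;

class WriterSelectionSketch
{
    static string SelectWriter(string version) =>
        version switch
        {
            // 3.3.0 and 3.5.1 share one wire format, so they share one writer.
            "3.3.0" or "3.5.1" => "TaskContextWriterV3_3_X",
            _ => throw new NotSupportedException($"Version {version} not supported.")
        };

    static void Main() =>
        Console.WriteLine(SelectWriter("3.5.1")); // TaskContextWriterV3_3_X
}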
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs
@@ -20,6 +20,7 @@ public static IEnumerable<object[]> VersionData() =>
new object[] { Versions.V3_0_0 },
new object[] { Versions.V3_2_0 },
new object[] { Versions.V3_3_0 },
new object[] { Versions.V3_5_1 },
};

internal static Payload GetDefaultPayload()
22 changes: 10 additions & 12 deletions src/csharp/Microsoft.Spark.Worker/Processor/TaskContextProcessor.cs
@@ -31,31 +31,32 @@ internal TaskContext Process(Stream stream)
private static TaskContext ReadTaskContext_2_x(Stream stream)
=> new()
{
IsBarrier = SerDe.ReadBool(stream),
Port = SerDe.ReadInt32(stream),
Secret = SerDe.ReadString(stream),

StageId = SerDe.ReadInt32(stream),
PartitionId = SerDe.ReadInt32(stream),
AttemptNumber = SerDe.ReadInt32(stream),
AttemptId = SerDe.ReadInt64(stream),
};

// Needed for 3.3.0+
// https://issues.apache.org/jira/browse/SPARK-36173
private static TaskContext ReadTaskContext_3_3(Stream stream)
=> new()
{
IsBarrier = SerDe.ReadBool(stream),
Port = SerDe.ReadInt32(stream),
Secret = SerDe.ReadString(stream),

StageId = SerDe.ReadInt32(stream),
PartitionId = SerDe.ReadInt32(stream),
AttemptNumber = SerDe.ReadInt32(stream),
AttemptId = SerDe.ReadInt64(stream),
// The CPUs field was added to TaskContext in 3.3.0: https://issues.apache.org/jira/browse/SPARK-36173
CPUs = SerDe.ReadInt32(stream)
};

private static void ReadBarrierInfo(Stream stream)
{
// Read barrier-related payload. Note that barrier is currently not supported.
SerDe.ReadBool(stream); // IsBarrier
SerDe.ReadInt32(stream); // BoundPort
SerDe.ReadString(stream); // Secret
}

private static void ReadTaskContextProperties(Stream stream, TaskContext taskContext)
{
int numProperties = SerDe.ReadInt32(stream);
@@ -87,7 +88,6 @@ private static class TaskContextProcessorV2_4_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_2_x(stream);
ReadTaskContextProperties(stream, taskContext);

@@ -99,7 +99,6 @@ private static class TaskContextProcessorV3_0_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_2_x(stream);
ReadTaskContextResources(stream);
ReadTaskContextProperties(stream, taskContext);
@@ -112,7 +111,6 @@ private static class TaskContextProcessorV3_3_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_3_3(stream);
ReadTaskContextResources(stream);
ReadTaskContextProperties(stream, taskContext);
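All of the processors above consume fields in exactly the order the JVM writes them, which is why moving the barrier fields (IsBarrier, Port, Secret) into the per-version ReadTaskContext methods and adding the CPUs field in 3.3.0 each force a separate versioned reader. A hypothetical sketch of that order sensitivity (the field set and helper are illustrative, not the real wire format):

using System.IO;

static class OrderSensitiveReadSketch
{
    // Fields must be read in the writer's order; inserting a field mid-stream
    // (like CPUs in Spark 3.3.0+) requires a new versioned reader.
    public static (bool IsBarrier, int StageId, int Cpus) Read(BinaryReader reader, bool hasCpus)
    {
        bool isBarrier = reader.ReadBoolean();
        int stageId = reader.ReadInt32();
        int cpus = hasCpus ? reader.ReadInt32() : 1; // assume 1 CPU when the field is absent
        return (isBarrier, stageId, cpus);
    }
}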
10 changes: 9 additions & 1 deletion src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
@@ -8,6 +8,7 @@
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using Microsoft.Spark.Network;
@@ -184,7 +185,7 @@ private object CallJavaMethod(
ISocketWrapper socket = null;

try
{
{
// Limit the number of connections to the JVM backend. Netty is configured
// to use a set number of threads to process incoming connections. Each
// new connection is delegated to these threads in a round robin fashion.
@@ -299,6 +300,13 @@ private object CallJavaMethod(
}
else
{
if (e.InnerException is SocketException)
{
_logger.LogError(
"Scala worker abandoned the connection, likely fatal crash on Java side. \n" +
"Ensure Spark runs with sufficient memory.");
}

// In rare cases we may hit the Netty connection thread deadlock.
// If max backend threads is 10 and we are currently using 10 active
// connections (0 in the _sockets queue). When we hit this exception,
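The new branch inspects the caught exception's InnerException to tell a dropped socket (the JVM side died mid-call) apart from an ordinary error response. A standalone sketch of that unwrap-and-hint pattern, with the logger replaced by Console.Error for brevity:

using System;
using System.Net.Sockets;

class SocketFailureHintSketch
{
    static void LogIfSocketDropped(Exception e)
    {
        // A SocketException wrapped inside another exception usually means the
        // peer went away mid-call rather than returning an error response.
        if (e.InnerException is SocketException)
        {
            Console.Error.WriteLine(
                "Scala worker abandoned the connection, likely a fatal crash " +
                "on the Java side. Ensure Spark runs with sufficient memory.");
        }
    }

    static void Main()
    {
        var wrapped = new InvalidOperationException("call failed", new SocketException());
        LogIfSocketDropped(wrapped); // prints the hint
    }
}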
14 changes: 8 additions & 6 deletions src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs
@@ -248,20 +248,22 @@ public Database GetDatabase(string dbName) =>
new Database((JvmObjectReference)Reference.Invoke("getDatabase", dbName));

/// <summary>
/// Get the function with the specified name. If you are trying to get an in-built
/// function then use the unqualified name.
/// Get the function with the specified name. The function can be a temporary function
/// or a permanent function.
/// </summary>
/// <param name="functionName">Is either a qualified or unqualified name that designates a
/// function. If no database identifier is provided, it refers to a temporary function or
/// a function in the current database.</param>
/// function. It follows the same resolution rules as SQL: built-in and temporary
/// functions are searched first, then functions in the current database (namespace).</param>
/// <returns>`Function` object which includes the class name, database, description,
/// whether it is temporary and the name of the function.</returns>
public Function GetFunction(string functionName) =>
new Function((JvmObjectReference)Reference.Invoke("getFunction", functionName));

/// <summary>
/// Get the function with the specified name. If you are trying to get an in-built function
/// then pass null as the dbName.
/// Get the function with the specified name in the specified database under the Hive
/// Metastore.
/// To get built-in functions, or functions in other catalogs, use
/// `getFunction(functionName)` with a qualified function name instead.
/// </summary>
/// <param name="dbName">Is a name that designates a database. Built-in functions will be
/// in database null rather than default.</param>
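A short usage sketch of the two overloads documented above; it assumes an existing SparkSession, and the database and function names are hypothetical:

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Catalog;

class GetFunctionUsageSketch
{
    static void Main()
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // Unqualified lookup: built-in and temporary functions resolve first,
        // then functions in the current database.
        Function abs = spark.Catalog.GetFunction("abs");

        // Database-qualified lookup against the Hive Metastore
        // ("mydb" and "my_udf" are hypothetical names).
        Function custom = spark.Catalog.GetFunction("mydb", "my_udf");
    }
}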
5 changes: 5 additions & 0 deletions src/csharp/Microsoft.Spark/Utils/TypeConverter.cs
@@ -33,6 +33,11 @@ private static object Convert(object obj, Type toType)
{
return ConvertToDictionary(hashtable, toType);
}
// Without this branch, primitive widening conversions (e.g. int to long) fail
else if (toType.IsPrimitive)
{
return System.Convert.ChangeType(obj, toType);
}

return obj;
}
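The new IsPrimitive branch routes primitive values through System.Convert.ChangeType, which performs widening conversions that unboxing alone cannot: a boxed int cannot be unboxed as a long. A minimal standalone illustration:

using System;

class ChangeTypeSketch
{
    static void Main()
    {
        object boxed = 1; // a boxed int

        // long bad = (long)boxed;  // would throw InvalidCastException

        // Convert.ChangeType performs the int -> long widening conversion.
        long widened = (long)Convert.ChangeType(boxed, typeof(long));
        Console.WriteLine(widened); // 1
    }
}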
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark/Versions.cs
@@ -13,5 +13,6 @@ internal static class Versions
internal const string V3_1_1 = "3.1.1";
internal const string V3_2_0 = "3.2.0";
internal const string V3_3_0 = "3.3.0";
internal const string V3_5_1 = "3.5.1";
}
}