Commit
Merge branch 'main' into feature/codeql
aleks-ivanov committed Dec 18, 2024
2 parents 61716e7 + 27e8bb5 commit 9344996
Showing 35 changed files with 2,936 additions and 23 deletions.
35 changes: 29 additions & 6 deletions azure-pipelines-e2e-tests-template.yml
@@ -121,15 +121,38 @@ stages:
targetType: inline
script: |
echo "Download Hadoop utils for Windows."
-          curl -k -L -o hadoop.zip https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip
+          $hadoopBinaryUrl = "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip"
+          # The Spark 3.3.3 binary uses the Hadoop 3 dependency
+          if ("3.3.3" -contains "${{ test.version }}") {
+            $hadoopBinaryUrl = "https://github.com/SparkSnail/winutils/releases/download/hadoop-3.3.5/hadoop-3.3.5.zip"
+          }
+          curl -k -L -o hadoop.zip $hadoopBinaryUrl
Expand-Archive -Path hadoop.zip -Destination .
New-Item -ItemType Directory -Force -Path hadoop\bin
-          cp hadoop-2.8.1\winutils.exe hadoop\bin
+          if ("3.3.3" -contains "${{ test.version }}") {
+            cp hadoop-3.3.5\winutils.exe hadoop\bin
+            # Hadoop 3.3 needs hadoop.dll on the PATH to avoid an UnsatisfiedLinkError
+            cp hadoop-3.3.5\hadoop.dll hadoop\bin
+            cp hadoop-3.3.5\hadoop.dll C:\Windows\System32
+            [System.Environment]::SetEnvironmentVariable("PATH", $Env:Path + ";$(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop", [System.EnvironmentVariableTarget]::Machine)
+          } else {
+            cp hadoop-2.8.1\winutils.exe hadoop\bin
+          }
- pwsh: |
echo "Downloading Spark ${{ test.version }}"
-        curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/spark-${{ test.version }}-bin-hadoop2.7.tgz
+        $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2.7"
+        # In Spark 3.3.0, 3.3.1, 3.3.2, and 3.3.4, the Hadoop 2 binary was renamed to spark-${{ test.version }}-bin-hadoop2.tgz
+        if ("3.3.0", "3.3.1", "3.3.2", "3.3.4" -contains "${{ test.version }}") {
+          $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2"
+        }
+        # Spark 3.3.3 provides no Hadoop 2 binary, so use the Hadoop 3 one
+        if ("3.3.3" -contains "${{ test.version }}") {
+          $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop3"
+        }
+        curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/${sparkBinaryName}.tgz
         tar xzvf spark-${{ test.version }}.tgz
+        move $sparkBinaryName spark-${{ test.version }}-bin-hadoop
displayName: 'Download Spark Distro ${{ test.version }}'
workingDirectory: $(Build.BinariesDirectory)
@@ -142,7 +165,7 @@ stages:
workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark
env:
HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
-              SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+              SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
DOTNET_WORKER_DIR: $(CURRENT_DOTNET_WORKER_DIR)

- pwsh: |
@@ -167,7 +190,7 @@ stages:
workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark
env:
HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
-              SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+              SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
DOTNET_WORKER_DIR: $(BACKWARD_COMPATIBLE_DOTNET_WORKER_DIR)

- checkout: forwardCompatibleRelease
@@ -189,5 +212,5 @@ stages:
workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark-${{ parameters.forwardCompatibleRelease }}
env:
HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
-              SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+              SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
DOTNET_WORKER_DIR: $(CURRENT_DOTNET_WORKER_DIR)
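
Note: a hedged C# sketch (illustration only, not pipeline code) condensing the artifact selection performed by the two download steps above; GetSparkDistro is an invented name, while the archive names and URLs are the ones used in the steps:

    static (string SparkArchive, string WinutilsUrl) GetSparkDistro(string sparkVersion) =>
        sparkVersion switch
        {
            // 3.3.0/3.3.1/3.3.2/3.3.4 renamed the Hadoop 2 build to ...-bin-hadoop2
            "3.3.0" or "3.3.1" or "3.3.2" or "3.3.4" =>
                ($"spark-{sparkVersion}-bin-hadoop2",
                 "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip"),
            // 3.3.3 ships no Hadoop 2 build, so the Hadoop 3 archive and winutils are used
            "3.3.3" =>
                ($"spark-{sparkVersion}-bin-hadoop3",
                 "https://github.com/SparkSnail/winutils/releases/download/hadoop-3.3.5/hadoop-3.3.5.zip"),
            // all other versions keep the classic ...-bin-hadoop2.7 naming
            _ =>
                ($"spark-{sparkVersion}-bin-hadoop2.7",
                 "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip"),
        };

Whatever the archive name, the move step normalizes the extracted directory to spark-${{ test.version }}-bin-hadoop, which is what the SPARK_HOME entries in this file point at.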
10 changes: 10 additions & 0 deletions azure-pipelines-pr.yml
@@ -37,6 +37,11 @@ variables:
backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
forwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)

backwardCompatibleTestOptions_Windows_3_3: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
backwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
forwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)

# Azure DevOps variables are transformed into environment variables; with these variables we
# avoid the first-time experience and telemetry to speed up the build.
DOTNET_CLI_TELEMETRY_OPTOUT: 1
@@ -63,6 +68,11 @@ parameters:
- '3.2.1'
- '3.2.2'
- '3.2.3'
- '3.3.0'
- '3.3.1'
- '3.3.2'
- '3.3.3'
- '3.3.4'
# List of OS types on which to run the E2E tests; each test runs in both 'Windows' and 'Linux' environments
- name: listOfE2ETestsPoolTypes
type: object
@@ -16,12 +16,17 @@ public class DeltaFixture
public DeltaFixture()
{
Version sparkVersion = SparkSettings.Version;
-        string deltaVersion = (sparkVersion.Major, sparkVersion.Minor) switch
+        string deltaVersion = (sparkVersion.Major, sparkVersion.Minor, sparkVersion.Build) switch
{
(2, _) => "delta-core_2.11:0.6.1",
(3, 0) => "delta-core_2.12:0.8.0",
(3, 1) => "delta-core_2.12:1.0.0",
(3, 2) => "delta-core_2.12:1.1.0",
(2, _, _) => "delta-core_2.11:0.6.1",
(3, 0, _) => "delta-core_2.12:0.8.0",
(3, 1, _) => "delta-core_2.12:1.0.0",
(3, 2, _) => "delta-core_2.12:1.1.0",
(3, 3, 0) => "delta-core_2.12:2.1.0",
(3, 3, 1) => "delta-core_2.12:2.1.0",
(3, 3, 2) => "delta-core_2.12:2.3.0",
(3, 3, 3) => "delta-core_2.12:2.3.0",
(3, 3, 4) => "delta-core_2.12:2.3.0",
_ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
};

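Note: the new arms enumerate each 3.3 patch release; C# 9 or-patterns would express the same mapping more compactly. A hedged, equivalent sketch (not the code as merged):

    string deltaVersion = (sparkVersion.Major, sparkVersion.Minor, sparkVersion.Build) switch
    {
        (2, _, _) => "delta-core_2.11:0.6.1",
        (3, 0, _) => "delta-core_2.12:0.8.0",
        (3, 1, _) => "delta-core_2.12:1.0.0",
        (3, 2, _) => "delta-core_2.12:1.1.0",
        (3, 3, 0 or 1) => "delta-core_2.12:2.1.0",      // same mapping as above, grouped
        (3, 3, 2 or 3 or 4) => "delta-core_2.12:2.3.0",
        _ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
    };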
45 changes: 45 additions & 0 deletions src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadWriter.cs
@@ -112,6 +112,45 @@ public void Write(Stream stream, TaskContext taskContext)
}
}

/// <summary>
/// TaskContextWriter for version 3.3.*.
/// </summary>
internal sealed class TaskContextWriterV3_3_X : ITaskContextWriter
{
public void Write(Stream stream, TaskContext taskContext)
{
SerDe.Write(stream, taskContext.IsBarrier);
SerDe.Write(stream, taskContext.Port);
SerDe.Write(stream, taskContext.Secret);

SerDe.Write(stream, taskContext.StageId);
SerDe.Write(stream, taskContext.PartitionId);
SerDe.Write(stream, taskContext.AttemptNumber);
SerDe.Write(stream, taskContext.AttemptId);
// Write the CPUs field introduced in Spark 3.3.x
SerDe.Write(stream, taskContext.CPUs);

SerDe.Write(stream, taskContext.Resources.Count());
foreach (TaskContext.Resource resource in taskContext.Resources)
{
SerDe.Write(stream, resource.Key);
SerDe.Write(stream, resource.Value);
SerDe.Write(stream, resource.Addresses.Count());
foreach (string address in resource.Addresses)
{
SerDe.Write(stream, address);
}
}

SerDe.Write(stream, taskContext.LocalProperties.Count);
foreach (KeyValuePair<string, string> kv in taskContext.LocalProperties)
{
SerDe.Write(stream, kv.Key);
SerDe.Write(stream, kv.Value);
}
}
}

///////////////////////////////////////////////////////////////////////////
// BroadcastVariable writer for different Spark versions.
///////////////////////////////////////////////////////////////////////////
@@ -311,6 +350,12 @@ internal PayloadWriter Create(Version version = null)
new TaskContextWriterV3_0_X(),
new BroadcastVariableWriterV2_4_X(),
new CommandWriterV2_4_X());
case Versions.V3_3_0:
return new PayloadWriter(
version,
new TaskContextWriterV3_3_X(),
new BroadcastVariableWriterV2_4_X(),
new CommandWriterV2_4_X());
default:
throw new NotSupportedException($"Spark {version} is not supported.");
}
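Note: a hedged usage sketch of the factory case above. The enclosing factory class is not visible in this hunk, so PayloadWriterFactory is an assumed name:

    // Create(...) wires TaskContextWriterV3_3_X into the payload writer,
    // so the serialized TaskContext now carries the CPUs field.
    PayloadWriter writer = new PayloadWriterFactory().Create(new Version(Versions.V3_3_0));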
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs
@@ -19,6 +19,7 @@ public static IEnumerable<object[]> VersionData() =>
new object[] { Versions.V2_4_0 },
new object[] { Versions.V3_0_0 },
new object[] { Versions.V3_2_0 },
new object[] { Versions.V3_3_0 },
};

internal static Payload GetDefaultPayload()
47 changes: 35 additions & 12 deletions src/csharp/Microsoft.Spark.Worker/Processor/TaskContextProcessor.cs
@@ -22,21 +22,31 @@ internal TaskContext Process(Stream stream)
return (_version.Major, _version.Minor) switch
{
(2, 4) => TaskContextProcessorV2_4_X.Process(stream),
(3, _) => TaskContextProcessorV3_0_X.Process(stream),
(3, _) t when t.Minor < 3 => TaskContextProcessorV3_0_X.Process(stream),
(3, _) => TaskContextProcessorV3_3_X.Process(stream),
_ => throw new NotSupportedException($"Spark {_version} not supported.")
};
}

-        private static TaskContext ReadTaskContext(Stream stream)
-        {
-            return new TaskContext
-            {
-                StageId = SerDe.ReadInt32(stream),
-                PartitionId = SerDe.ReadInt32(stream),
-                AttemptNumber = SerDe.ReadInt32(stream),
-                AttemptId = SerDe.ReadInt64(stream)
-            };
-        }
+        private static TaskContext ReadTaskContext_2_x(Stream stream)
+            => new()
+            {
+                StageId = SerDe.ReadInt32(stream),
+                PartitionId = SerDe.ReadInt32(stream),
+                AttemptNumber = SerDe.ReadInt32(stream),
+                AttemptId = SerDe.ReadInt64(stream),
+            };

private static TaskContext ReadTaskContext_3_3(Stream stream)
=> new()
{
StageId = SerDe.ReadInt32(stream),
PartitionId = SerDe.ReadInt32(stream),
AttemptNumber = SerDe.ReadInt32(stream),
AttemptId = SerDe.ReadInt64(stream),
// The CPUs field was added to TaskContext in Spark 3.3.0 (https://issues.apache.org/jira/browse/SPARK-36173)
CPUs = SerDe.ReadInt32(stream)
};

private static void ReadBarrierInfo(Stream stream)
{
@@ -78,7 +88,7 @@ private static class TaskContextProcessorV2_4_X
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
-            TaskContext taskContext = ReadTaskContext(stream);
+            TaskContext taskContext = ReadTaskContext_2_x(stream);
ReadTaskContextProperties(stream, taskContext);

return taskContext;
@@ -90,7 +100,20 @@ private static class TaskContextProcessorV3_0_X
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
-            TaskContext taskContext = ReadTaskContext(stream);
+            TaskContext taskContext = ReadTaskContext_2_x(stream);
ReadTaskContextResources(stream);
ReadTaskContextProperties(stream, taskContext);

return taskContext;
}
}

private static class TaskContextProcessorV3_3_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_3_3(stream);
ReadTaskContextResources(stream);
ReadTaskContextProperties(stream, taskContext);

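Note: the only change to the fixed-width prefix of the payload is one extra int. A self-contained, hedged sketch of that layout (not repo code: big-endian integers are assumed to match SerDe and the JVM writer, DemoTaskContext/ReadPrefix are invented names, and Stream.ReadExactly requires .NET 7+):

    using System;
    using System.Buffers.Binary;
    using System.IO;

    sealed class DemoTaskContext
    {
        public int StageId, PartitionId, AttemptNumber, CPUs;
        public long AttemptId;
    }

    static class TaskContextWireFormat
    {
        static int ReadInt32(Stream s)
        {
            Span<byte> b = stackalloc byte[4];
            s.ReadExactly(b);
            return BinaryPrimitives.ReadInt32BigEndian(b);
        }

        static long ReadInt64(Stream s)
        {
            Span<byte> b = stackalloc byte[8];
            s.ReadExactly(b);
            return BinaryPrimitives.ReadInt64BigEndian(b);
        }

        // Reads the fixed-width prefix; Spark 3.3 appends CPUs after AttemptId.
        internal static DemoTaskContext ReadPrefix(Stream s, Version v)
        {
            var ctx = new DemoTaskContext
            {
                StageId = ReadInt32(s),
                PartitionId = ReadInt32(s),
                AttemptNumber = ReadInt32(s),
                AttemptId = ReadInt64(s),
            };
            if (v >= new Version(3, 3))
            {
                ctx.CPUs = ReadInt32(s); // SPARK-36173
            }
            return ctx;
        }
    }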
2 changes: 2 additions & 0 deletions src/csharp/Microsoft.Spark/TaskContext.cs
@@ -21,6 +21,8 @@ internal class TaskContext

internal long AttemptId { get; set; }

internal int CPUs { get; set; }

internal bool IsBarrier { get; set; }

internal int Port { get; set; }
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark/Versions.cs
@@ -12,5 +12,6 @@ internal static class Versions
internal const string V3_1_0 = "3.1.0";
internal const string V3_1_1 = "3.1.1";
internal const string V3_2_0 = "3.2.0";
internal const string V3_3_0 = "3.3.0";
}
}
83 changes: 83 additions & 0 deletions src/scala/microsoft-spark-3-3/pom.xml
@@ -0,0 +1,83 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.microsoft.scala</groupId>
<artifactId>microsoft-spark</artifactId>
<version>${microsoft-spark.version}</version>
</parent>
<artifactId>microsoft-spark-3-3_2.12</artifactId>
<inceptionYear>2019</inceptionYear>
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.3.0</spark.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
</configuration>
</plugin>
</plugins>
</build>
</project>