Skip to content

Remove APM use outside of TaskHost node provider / endpoint #11800

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
45 changes: 29 additions & 16 deletions src/Build/BackEnd/Client/MSBuildClientPacketPump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,25 +207,29 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
#if FEATURE_APM
IAsyncResult result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
Task<int> readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask();
// Use a separate reuseable wait handle to avoid allocating on Task.AsyncWaitHandle.
using AutoResetEvent readTaskEvent = new(false);
ValueTask<int> readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length, readTaskEvent);
#endif

bool continueReading = true;
do
{
// Ordering of the wait handles is important. The first signalled wait handle in the array
// will be returned by WaitAny if multiple wait handles are signalled. We prefer to have the
// terminate event triggered so that we cannot get into a situation where packets are being
// spammed to the client and it never gets an opportunity to shutdown.
WaitHandle[] handles =
[
localPacketPumpShutdownEvent,

// Ordering of the wait handles is important. The first signalled wait handle in the array
// will be returned by WaitAny if multiple wait handles are signalled. We prefer to have the
// terminate event triggered so that we cannot get into a situation where packets are being
// spammed to the client and it never gets an opportunity to shutdown.
WaitHandle[] handles =
[
localPacketPumpShutdownEvent,
#if FEATURE_APM
result.AsyncWaitHandle
result.AsyncWaitHandle
#else
((IAsyncResult)readTask).AsyncWaitHandle
readTaskEvent,
#endif
];
];

do
{
int waitId = WaitHandle.WaitAny(handles);
switch (waitId)
{
Expand All @@ -242,7 +246,10 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
#if FEATURE_APM
headerBytesRead = localStream.EndRead(result);
#else
headerBytesRead = readTask.Result;
// Avoid allocating an additional task instance when possible.
// However if a ValueTask runs asynchronously, it must be converted to a Task before consuming the result.
// Otherwise, the result will be undefined when not using async/await.
headerBytesRead = readTask.IsCompleted ? readTask.Result : readTask.AsTask().Result;
#endif

if ((headerBytesRead != headerByte.Length) && !localPacketPumpShutdownEvent.WaitOne(0))
Expand All @@ -264,7 +271,7 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
}
}

NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);
NodePacketType packetType = (NodePacketType)headerByte[0];

int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(headerByte, 1, 4));
int packetBytesRead = 0;
Expand All @@ -275,7 +282,12 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu

while (packetBytesRead < packetLength)
{
#if NET
ValueTask<int> bytesReadTask = localStream.ReadAsync(packetData.AsMemory(packetBytesRead, packetLength - packetBytesRead));
int bytesRead = bytesReadTask.IsCompleted ? bytesReadTask.Result : bytesReadTask.AsTask().Result;
#else
int bytesRead = localStream.Read(packetData, packetBytesRead, packetLength - packetBytesRead);
#endif
if (bytesRead == 0)
{
// Incomplete read. Abort.
Expand Down Expand Up @@ -305,8 +317,9 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
// Start reading the next package header.
#if FEATURE_APM
result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
handles[1] = result.AsyncWaitHandle;
#else
readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask();
readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length, readTaskEvent);
#endif
}
}
Expand Down
157 changes: 42 additions & 115 deletions src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ internal static void ConnectToPipeStream(NamedPipeClientStream nodeStream, strin
/// <summary>
/// Class which wraps up the communications infrastructure for a given node.
/// </summary>
internal class NodeContext
internal sealed class NodeContext
{
private enum ExitPacketState
{
Expand Down Expand Up @@ -635,31 +635,45 @@ public NodeContext(int nodeId, Process process,
public void BeginAsyncPacketRead()
{
#if FEATURE_APM
_clientToServerStream.BeginRead(_headerByte, 0, _headerByte.Length, HeaderReadComplete, this);
_clientToServerStream.BeginRead(_headerByte, 0, _headerByte.Length, static result =>
((NodeContext)result.AsyncState).RunPacketReadLoopAsync(result), this);
#else
ThreadPool.QueueUserWorkItem(delegate
{
var ignored = RunPacketReadLoopAsync();
});
ThreadPool.QueueUserWorkItem(static async context =>
await context.RunPacketReadLoopAsync(), state: this, preferLocal: false);
#endif
}

#if !FEATURE_APM
#if FEATURE_APM
public void RunPacketReadLoopAsync(IAsyncResult headerReadResult)
{
#else
public async Task RunPacketReadLoopAsync()
{
while (true)
#endif
{
try
{
#if FEATURE_APM
int bytesRead = _clientToServerStream.EndRead(headerReadResult);
#else
int bytesRead = await CommunicationsUtilities.ReadAsync(_clientToServerStream, _headerByte, _headerByte.Length);
#endif
if (!ProcessHeaderBytesRead(bytesRead))
{
return;
}
}
catch (ArgumentException)
{
// Workaround for CLR stress bug; it sporadically calls us twice on the same async
// result, and EndRead will throw on the second one. Pretend the second one never happened.
CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
return;
}
catch (IOException e)
{
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in RunPacketReadLoopAsync: {0}", e);
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in {0}: {1}", nameof(RunPacketReadLoopAsync), e);
_packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
Close();
return;
Expand All @@ -673,15 +687,31 @@ public async Task RunPacketReadLoopAsync()

try
{
#if FEATURE_APM
MSBuildEventSource.Log.PacketReadSize(packetLength);
int bytesRead;
try
{
bytesRead = _clientToServerStream.Read(packetData, 0, packetLength);
}
catch (ArgumentException)
{
// Workaround for CLR stress bug; it sporadically calls us twice on the same async
// result, and EndRead will throw on the second one. Pretend the second one never happened.
CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
return;
}
#else
int bytesRead = await CommunicationsUtilities.ReadAsync(_clientToServerStream, packetData, packetLength);
#endif
if (!ProcessBodyBytesRead(bytesRead, packetLength, packetType))
{
return;
}
}
catch (IOException e)
{
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in RunPacketReadLoopAsync (Reading): {0}", e);
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in {0} (Reading): {1}", nameof(RunPacketReadLoopAsync), e);
_packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
Close();
return;
Expand All @@ -698,9 +728,11 @@ public async Task RunPacketReadLoopAsync()
Close();
return;
}
#if FEATURE_APM
BeginAsyncPacketRead();
#endif
}
}
#endif

/// <summary>
/// Sends the specified packet to this node asynchronously.
Expand Down Expand Up @@ -910,53 +942,6 @@ private bool ProcessHeaderBytesRead(int bytesRead)
return true;
}

#if FEATURE_APM
/// <summary>
/// Callback invoked by the completion of a read of a header byte on one of the named pipes.
/// </summary>
private void HeaderReadComplete(IAsyncResult result)
{
int bytesRead;
try
{
try
{
bytesRead = _clientToServerStream.EndRead(result);
}

// Workaround for CLR stress bug; it sporadically calls us twice on the same async
// result, and EndRead will throw on the second one. Pretend the second one never happened.
catch (ArgumentException)
{
CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
return;
}

if (!ProcessHeaderBytesRead(bytesRead))
{
return;
}
}
catch (IOException e)
{
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in HeaderReadComplete: {0}", e);
_packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
Close();
return;
}

int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(_headerByte, 1, 4));
MSBuildEventSource.Log.PacketReadSize(packetLength);

// Ensures the buffer is at least this length.
// It avoids reallocations if the buffer is already large enough.
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

_clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple<byte[], int>(packetData, packetLength));
}
#endif

private bool ProcessBodyBytesRead(int bytesRead, int packetLength, NodePacketType packetType)
{
if (bytesRead != packetLength)
Expand Down Expand Up @@ -990,64 +975,6 @@ private bool ReadAndRoutePacket(NodePacketType packetType, byte[] packetData, in
}
return true;
}

#if FEATURE_APM
/// <summary>
/// Method called when the body of a packet has been read.
/// </summary>
private void BodyReadComplete(IAsyncResult result)
{
NodePacketType packetType = (NodePacketType)_headerByte[0];
var state = (Tuple<byte[], int>)result.AsyncState;
byte[] packetData = state.Item1;
int packetLength = state.Item2;
int bytesRead;

try
{
try
{
bytesRead = _clientToServerStream.EndRead(result);
}

// Workaround for CLR stress bug; it sporadically calls us twice on the same async
// result, and EndRead will throw on the second one. Pretend the second one never happened.
catch (ArgumentException)
{
CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring");
return;
}

if (!ProcessBodyBytesRead(bytesRead, packetLength, packetType))
{
return;
}
}
catch (IOException e)
{
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in BodyReadComplete (Reading): {0}", e);
_packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed));
Close();
return;
}

// Read and route the packet.
if (!ReadAndRoutePacket(packetType, packetData, packetLength))
{
return;
}

if (packetType != NodePacketType.NodeShutdown)
{
// Read the next packet.
BeginAsyncPacketRead();
}
else
{
Close();
}
}
#endif
}
}
}
14 changes: 12 additions & 2 deletions src/Shared/CommunicationsUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,23 @@ internal static int ReadIntForHandshake(this PipeStream stream, byte? byteToAcce
}
#nullable disable

#if !FEATURE_APM
#if NET
/// <summary>
/// By signalling an external reset event, this allows allocation-free use of WaitHandle.WaitAny() in non-async/await contexts.
/// </summary>
internal static async ValueTask<int> ReadAsync(Stream stream, byte[] buffer, int bytesToRead, AutoResetEvent autoResetEvent)
{
int bytesRead = await ReadAsync(stream, buffer, bytesToRead).ConfigureAwait(false);
_ = autoResetEvent.Set();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This serves the same purpose as ((IAsyncResult)readTask).AsyncWaitHandle while shaving off at min ~200 bytes per packet, so hopefully this account for the difference.

return bytesRead;
}

internal static async ValueTask<int> ReadAsync(Stream stream, byte[] buffer, int bytesToRead)
{
int totalBytesRead = 0;
while (totalBytesRead < bytesToRead)
{
int bytesRead = await stream.ReadAsync(buffer.AsMemory(totalBytesRead, bytesToRead - totalBytesRead), CancellationToken.None);
int bytesRead = await stream.ReadAsync(buffer.AsMemory(totalBytesRead, bytesToRead - totalBytesRead)).ConfigureAwait(false);
if (bytesRead == 0)
{
return totalBytesRead;
Expand Down
Loading
Loading