Skip to content

Commit

Permalink
Merge pull request #848 from dotnet/v2.11
Browse files Browse the repository at this point in the history
Merge v2.11 into main
  • Loading branch information
AArnott authored Jan 30, 2025
2 parents 670a931 + 9eda341 commit 0465b32
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 13 deletions.
6 changes: 5 additions & 1 deletion src/Nerdbank.Streams/MultiplexingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace Nerdbank.Streams
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Microsoft;
using Microsoft.VisualStudio.Threading;

Expand Down Expand Up @@ -1151,6 +1150,11 @@ private void OnChannelDisposed(Channel channel, Exception? exception = null)
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.ChannelDisposed, "Local channel {0} \"{1}\" stream disposed.", channel.QualifiedId, channel.Name);
}

if (exception is not null && this.TraceSource.Switch.ShouldTrace(TraceEventType.Error))
{
this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.ChannelDisposed, "Local channel {0} \"{1}\" stream disposed with {2}: {3}", channel.QualifiedId, channel.Name, exception.GetType().Name, exception.Message);
}

this.SendFrame(header, payload, this.DisposalToken);
}
}
Expand Down
11 changes: 10 additions & 1 deletion src/nerdbank-streams/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,20 @@ export class ChannelClass extends Channel {
}

public onContent(buffer: Buffer | null) {
const priorReadableFlowing = this._duplex.readableFlowing

this._duplex.push(buffer)

// Large buffer pushes can switch a stream from flowing to non-flowing
// when it meets or exceeds the highWaterMark. We need to resume the stream
// in this case so that the user can continue to receive data.
if (priorReadableFlowing && this._duplex.readableFlowing === false) {
this._duplex.resume()
}

// We should find a way to detect when we *actually* share the received buffer with the Channel's user
// and only report consumption when they receive the buffer from us so that we effectively apply
// backpressure to the remote party based on our user's actual consumption rather than keep allocating memory.
// backpressure to the remote party based on our user's actual consumption rather than continually allocating memory.
if (this._multiplexingStream.backpressureSupportEnabled && buffer) {
this._multiplexingStream.localContentExamined(this, buffer.length)
}
Expand Down
8 changes: 4 additions & 4 deletions src/nerdbank-streams/src/Utilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ export async function getBufferFrom(
throw new Error('Stream terminated before required bytes were read.')
}

// Returns what has been read so far
// Returns what has been read so far.
if (readBuffer === null) {
return null
}

// we need trim extra spaces
// We need to trim the trailing space.
return readBuffer.subarray(0, index)
}

Expand All @@ -116,11 +116,11 @@ export async function getBufferFrom(

if (readBuffer === null) {
if (availableSize === size || newBuffer.length < availableSize) {
// in the fast pass, we read the entire data once, and donot allocate an extra array.
// In the fast pass, we read the entire data once, and do not allocate an extra array.
return newBuffer
}

// if we read partial data, we need allocate a buffer to join all data together.
// If we read partial data, we need to allocate a buffer to join all data together.
readBuffer = Buffer.alloc(size)
}

Expand Down
53 changes: 53 additions & 0 deletions src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,59 @@ import { Channel } from '../Channel'
import CancellationToken from 'cancellationtoken'
import * as assert from 'assert'
import { nextTick } from 'process'
import { Duplex } from 'stream'

it('highWatermark threshold does not clog', async () => {
// Brokered service
let bytesToReceive = 0
let receivedAllBytes = new Deferred()
function receiver(pipe: Duplex) {
let lengths: number[] = []
pipe.on('data', (data: Buffer) => {
lengths.push(data.length)

bytesToReceive -= data.length
// console.log(`recv ${data.length}. ${bytesToReceive} remaining`)
if (bytesToReceive <= 0) {
receivedAllBytes.resolve(undefined)
}
})
}

// IServiceBroker
const { first: localServicePipe, second: servicePipe } = FullDuplexStream.CreatePair()
receiver(localServicePipe)

// MultiplexingStreamServiceBroker
const simulatedMxStream = FullDuplexStream.CreatePair()
const [mx1, mx2] = await Promise.all([MultiplexingStream.CreateAsync(simulatedMxStream.first), MultiplexingStream.CreateAsync(simulatedMxStream.second)])
const [local, remote] = await Promise.all([mx1.offerChannelAsync(''), mx2.acceptChannelAsync('')])
servicePipe.pipe(local.stream)
local.stream.pipe(servicePipe)

global.test_servicePipe = servicePipe
global.test_d = local.stream
global.test_localServicePipe = localServicePipe

// brokered service client
function writeHelper(buffer: Buffer): boolean {
bytesToReceive += buffer.length
const result = remote.stream.write(buffer)
// console.log('written', buffer.length, result)
return result
}
for (let i = 15; i < 20; i++) {
const buffer = Buffer.alloc(i * 1024)
writeHelper(buffer)
await nextTickAsync()
writeHelper(Buffer.alloc(10))
await nextTickAsync()
}

if (bytesToReceive > 0) {
await receivedAllBytes.promise
}
})
;[1, 2, 3].forEach(protocolMajorVersion => {
describe(`MultiplexingStream v${protocolMajorVersion}`, () => {
let mx1: MultiplexingStream
Expand Down
11 changes: 4 additions & 7 deletions test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
// Copyright (c) Andrew Arnott. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft;
using Microsoft.VisualStudio.Threading;
using Nerdbank.Streams;
Expand Down Expand Up @@ -567,6 +560,10 @@ public async Task ReadByte()
byte[]? buffer = new byte[] { 5 };
await a.WriteAsync(buffer, 0, buffer.Length, this.TimeoutToken).WithCancellation(this.TimeoutToken);
await a.FlushAsync(this.TimeoutToken).WithCancellation(this.TimeoutToken);

// Avoid a random hang by yielding before engaging in sync I/O because we may be on an inline task completion stack
await Task.Yield();

Assert.Equal(5, b.ReadByte());
}

Expand Down

0 comments on commit 0465b32

Please sign in to comment.