Skip to content

Commit

Permalink
HTTP reverse proxy, remove duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
veniware committed Aug 2, 2024
1 parent 22c0f75 commit 9f8c2e1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 69 deletions.
63 changes: 2 additions & 61 deletions Protest/Proxy/TrafficCountingHttpMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public async Task InvokeAsync(HttpContext context) {
Stream requestStream = context.Request.Body;
Stream responseStream = context.Response.Body;

using StreamWrapper requestWrapper = new StreamWrapper(requestStream, key, bytesTx, bytesRx);
using StreamWrapper responseWrapper = new StreamWrapper(responseStream, key, bytesTx, bytesRx);
using TrafficCountingStreamWrapper requestWrapper = new TrafficCountingStreamWrapper(requestStream, key, bytesTx, bytesRx);
using TrafficCountingStreamWrapper responseWrapper = new TrafficCountingStreamWrapper(responseStream, key, bytesTx, bytesRx);

context.Request.Body = requestWrapper;
context.Response.Body = responseWrapper;
Expand All @@ -39,62 +39,3 @@ public async Task InvokeAsync(HttpContext context) {
}
}
}

file sealed class StreamWrapper : Stream {
private readonly Stream baseStream;
private readonly uint key;
private readonly ConcurrentDictionary<uint, long> bytesRx;
private readonly ConcurrentDictionary<uint, long> bytesTx;

public StreamWrapper(Stream stream, uint clientIp, ConcurrentDictionary<uint, long> bytesRx, ConcurrentDictionary<uint, long> bytesTx) {
this.baseStream = stream;
this.key = clientIp;
this.bytesRx = bytesRx;
this.bytesTx = bytesTx;
}

public override bool CanRead => baseStream.CanRead;
public override bool CanSeek => baseStream.CanSeek;
public override bool CanWrite => baseStream.CanWrite;
public override long Length => baseStream.Length;

public override long Position {
get => baseStream.Position;
set => baseStream.Position = value;
}

public override void Flush() => baseStream.Flush();

public override long Seek(long offset, SeekOrigin origin) => baseStream.Seek(offset, origin);

public override void SetLength(long value) => baseStream.SetLength(value);

public override int Read(byte[] buffer, int offset, int count) {
int length = baseStream.Read(buffer, offset, count);
bytesRx.AddOrUpdate(key, length, (_, old) => old + length);
return length;
}

public override void Write(byte[] buffer, int offset, int count) {
baseStream.Write(buffer, offset, count);
bytesTx.AddOrUpdate(key, count, (_, old) => old + count);
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
int length = await baseStream.ReadAsync(buffer, offset, count, cancellationToken);
bytesRx.AddOrUpdate(key, length, (_, old) => old + length);
return length;
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
await baseStream.WriteAsync(buffer, offset, count, cancellationToken);
bytesTx.AddOrUpdate(key, count, (_, old) => old + count);
}

protected override void Dispose(bool disposing) {
if (disposing) {
baseStream.Dispose();
}
base.Dispose(disposing);
}
}
16 changes: 8 additions & 8 deletions Protest/Proxy/TrafficCountingStreamWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ namespace Protest.Proxy;
internal sealed class TrafficCountingStreamWrapper : Stream {
private readonly Stream baseStream;
private readonly uint key;
public ConcurrentDictionary<uint, long> bytesRx, bytesTx;
private ConcurrentDictionary<uint, long> bytesRx, bytesTx;

public TrafficCountingStreamWrapper(Stream stream, uint clientIp, ConcurrentDictionary<uint, long> bytesRx, ConcurrentDictionary<uint, long> bytesTx) {
this.baseStream = stream;
this.key = clientIp;
this.bytesRx = bytesRx;
this.bytesTx = bytesTx;
}

public override bool CanRead => baseStream.CanRead;
public override bool CanSeek => baseStream.CanSeek;
Expand All @@ -22,13 +29,6 @@ public override long Position {
set => baseStream.Position = value;
}

public TrafficCountingStreamWrapper(NetworkStream stream, uint clientIp, ConcurrentDictionary<uint, long> bytesRx, ConcurrentDictionary<uint, long> bytesTx) {
this.baseStream = stream;
this.key = clientIp;
this.bytesRx = bytesRx;
this.bytesTx = bytesTx;
}

public override void Flush() => baseStream.Flush();

public override long Seek(long offset, SeekOrigin origin) => baseStream.Seek(offset, origin);
Expand Down

0 comments on commit 9f8c2e1

Please sign in to comment.