Skip to content

Commit

Permalink
No-queue TCP sockets
Browse files Browse the repository at this point in the history
Same general advantages as no-queue UDP
  • Loading branch information
Turnerj committed Mar 1, 2022
1 parent 13da288 commit 4882324
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 39 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ A good DNS client implementation will pool TCP sockets to avoid needing to negot

| Method | Mean | Error | StdDev | Op/s | Ratio | RatioSD | Gen 0 | Allocated |
|------------------ |----------:|---------:|---------:|---------:|------:|--------:|-------:|----------:|
| DinoDNS | 93.58 us | 1.793 us | 1.678 us | 10,685.9 | 1.00 | 0.00 | 0.4883 | 1,900 B |
| MichaCo_DnsClient | 114.45 us | 2.215 us | 2.551 us | 8,737.3 | 1.23 | 0.03 | 1.4648 | 5,067 B |
| DinoDNS | 94.99 us | 1.018 us | 0.902 us | 10,527.1 | 1.00 | 0.00 | 0.4883 | 1,892 B |
| MichaCo_DnsClient | 112.52 us | 2.246 us | 3.562 us | 8,887.1 | 1.21 | 0.05 | 1.4648 | 5,064 B |

<small>
⚠ Note: While Kapetan's DNS client does support TCP, it can't be benchmarked due to port exhaustion issues it has.
Expand Down
81 changes: 44 additions & 37 deletions src/TurnerSoftware.DinoDNS/Connection/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,38 @@ public class TcpConnectionClient : IDnsConnectionClient
{
public static readonly TcpConnectionClient Instance = new();

private readonly ConcurrentDictionary<IPEndPoint, ConcurrentQueue<Socket>> Sockets = new();
private readonly ConcurrentDictionary<IPEndPoint, Socket> Sockets = new();
private readonly object NewSocketLock = new();

private Socket GetSocket(IPEndPoint endPoint)
{
if (Sockets.TryGetValue(endPoint, out var socket))
{
if (socket.Connected)
{
return socket;
}

//TODO: Investigate whether we can just re-connect to existing sockets that are closed
SocketMessageOrderer.ClearSocket(socket);
OnSocketEnd(socket);
socket.Dispose();
}

//We can't rely on GetOrAdd-type methods on ConcurrentDictionary as the factory can be called multiple times.
//Instead, we rely on TryGetValue for the hot path (existing socket) otherwise use a typical lock.
lock (NewSocketLock)
{
if (!Sockets.TryGetValue(endPoint, out socket))
{
socket = CreateSocket(endPoint);

Sockets.TryAdd(endPoint, socket);
}

return socket;
}
}

protected virtual Socket CreateSocket(IPEndPoint endPoint) => new(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
{
Expand All @@ -26,51 +57,27 @@ protected virtual void OnSocketEnd(Socket socket) { }

public async ValueTask<int> SendMessageAsync(IPEndPoint endPoint, ReadOnlyMemory<byte> sourceBuffer, Memory<byte> destinationBuffer, CancellationToken cancellationToken)
{
var socketQueue = Sockets.GetOrAdd(endPoint, static _ => new());

if (socketQueue.TryDequeue(out var socket))
{
if (!socket.Connected)
{
//TODO: Investigate whether we can just re-connect to existing sockets that are closed
SocketMessageOrderer.ClearSocket(socket);
OnSocketEnd(socket);
socket.Dispose();
socket = CreateSocket(endPoint);
}
}
else if (socket is null)
{
socket = CreateSocket(endPoint);
}

var socket = GetSocket(endPoint);
if (!socket.Connected)
{
await socket.ConnectAsync(endPoint, cancellationToken).ConfigureAwait(false);
await OnConnectAsync(socket, endPoint, cancellationToken).ConfigureAwait(false);
}

try
{
var messageLength = await PerformQueryAsync(socket, sourceBuffer, destinationBuffer, cancellationToken).ConfigureAwait(false);

if (SocketMessageOrderer.CheckMessageId(sourceBuffer, destinationBuffer) == MessageIdResult.Mixed)
{
messageLength = SocketMessageOrderer.Exchange(
socket,
sourceBuffer,
destinationBuffer,
messageLength,
cancellationToken
);
}
var messageLength = await PerformQueryAsync(socket, sourceBuffer, destinationBuffer, cancellationToken).ConfigureAwait(false);

return messageLength;
}
finally
if (SocketMessageOrderer.CheckMessageId(sourceBuffer, destinationBuffer) == MessageIdResult.Mixed)
{
socketQueue.Enqueue(socket);
messageLength = SocketMessageOrderer.Exchange(
socket,
sourceBuffer,
destinationBuffer,
messageLength,
cancellationToken
);
}

return messageLength;
}

protected virtual async ValueTask<int> PerformQueryAsync(Socket socket, ReadOnlyMemory<byte> sourceBuffer, Memory<byte> destinationBuffer, CancellationToken cancellationToken)
Expand Down

0 comments on commit 4882324

Please sign in to comment.