diff --git a/src/LocalSqsSnsMessaging/SnsClient/InMemorySnsClient.cs b/src/LocalSqsSnsMessaging/SnsClient/InMemorySnsClient.cs index a9b623e..3b6da6d 100644 --- a/src/LocalSqsSnsMessaging/SnsClient/InMemorySnsClient.cs +++ b/src/LocalSqsSnsMessaging/SnsClient/InMemorySnsClient.cs @@ -20,6 +20,8 @@ public sealed partial class InMemorySnsClient : IAmazonSimpleNotificationService { private readonly InMemoryAwsBus _bus; private readonly Lazy _paginators; + + private const int MaxMessageSize = 262144; internal InMemorySnsClient(InMemoryAwsBus bus) { @@ -390,17 +392,63 @@ public Task PublishAsync(string topicArn, string message, strin public Task PublishAsync(PublishRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); + + var messageSize = CalculateMessageSize(request.Message, request.MessageAttributes); + if (messageSize > MaxMessageSize) + { + throw new InvalidParameterException($"Message size has exceeded the limit of {MaxMessageSize} bytes."); + } var topic = GetTopicByArn(request.TopicArn); var result = topic.PublishAction.Execute(request); return Task.FromResult(result); } + + private static int CalculateMessageSize(string message, Dictionary? messageAttributes) + { + var totalSize = 0; + + // Add message body size + totalSize += Encoding.UTF8.GetByteCount(message); + + // Add message attributes size + if (messageAttributes != null) + { + foreach (var (key, attributeValue) in messageAttributes) + { + // Add attribute name size + totalSize += Encoding.UTF8.GetByteCount(key); + + // Add data type size (including any custom type prefix) + totalSize += Encoding.UTF8.GetByteCount(attributeValue.DataType); + + // Add value size based on the type + if (attributeValue.BinaryValue != null) + { + totalSize += (int)attributeValue.BinaryValue.Length; + } + else if (attributeValue.StringValue != null) + { + totalSize += Encoding.UTF8.GetByteCount(attributeValue.StringValue); + } + } + } + + return totalSize; + } + public Task PublishBatchAsync(PublishBatchRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); + var totalSize = request.PublishBatchRequestEntries + .Sum(requestEntry => CalculateMessageSize(requestEntry.Message, requestEntry.MessageAttributes)); + if (totalSize > MaxMessageSize) + { + throw new InvalidParameterException($"Message size has exceeded the limit of {MaxMessageSize} bytes."); + } var topic = GetTopicByArn(request.TopicArn); var result = topic.PublishAction.ExecuteBatch(request); diff --git a/src/LocalSqsSnsMessaging/SqsClient/InMemorySqsClient.cs b/src/LocalSqsSnsMessaging/SqsClient/InMemorySqsClient.cs index a931e50..67499a9 100644 --- a/src/LocalSqsSnsMessaging/SqsClient/InMemorySqsClient.cs +++ b/src/LocalSqsSnsMessaging/SqsClient/InMemorySqsClient.cs @@ -27,6 +27,7 @@ public sealed partial class InMemorySqsClient : IAmazonSQS private readonly InMemoryAwsBus _bus; private readonly Lazy _paginators; + private const int MaxMessageSize = 262144; private static readonly string[] InternalAttributes = [ QueueAttributeName.ApproximateNumberOfMessages, QueueAttributeName.ApproximateNumberOfMessagesDelayed, @@ -470,6 +471,13 @@ public Task SendMessageAsync(SendMessageRequest request, } var message = CreateMessage(request.MessageBody, request.MessageAttributes, request.MessageSystemAttributes); + var totalSize = CalculateMessageSize(message.Body, message.MessageAttributes); + + if (totalSize > MaxMessageSize) + { + throw new AmazonSQSException( + $"Message size ({totalSize} bytes) exceeds the maximum allowed size ({MaxMessageSize} bytes)"); + } if (queue.IsFifo) { @@ -523,6 +531,39 @@ public Task SendMessageAsync(SendMessageRequest request, }.SetCommonProperties()); } + private static int CalculateMessageSize(string messageBody, Dictionary? messageAttributes) + { + var totalSize = 0; + + // Add message body size + totalSize += Encoding.UTF8.GetByteCount(messageBody); + + // Add message attributes size + if (messageAttributes != null) + { + foreach (var (key, attributeValue) in messageAttributes) + { + // Add attribute name size + totalSize += Encoding.UTF8.GetByteCount(key); + + // Add data type size (including any custom type prefix) + totalSize += Encoding.UTF8.GetByteCount(attributeValue.DataType); + + // Add value size based on the type + if (attributeValue.BinaryValue != null) + { + totalSize += (int)attributeValue.BinaryValue.Length; + } + else if (attributeValue.StringValue != null) + { + totalSize += Encoding.UTF8.GetByteCount(attributeValue.StringValue); + } + } + } + + return totalSize; + } + private static void EnqueueFifoMessage(SqsQueueResource queue, string messageGroupId, Message message) { queue.MessageGroups.AddOrUpdate(messageGroupId, @@ -1075,10 +1116,18 @@ public Task SendMessageBatchAsync(SendMessageBatchRequ Failed = [] }; + var totalSize = request.Entries.Sum(e => CalculateMessageSize(e.MessageBody, e.MessageAttributes)); + + if (totalSize > MaxMessageSize) + { + throw new BatchRequestTooLongException( + $"Batch size ({totalSize} bytes) exceeds the maximum allowed size ({MaxMessageSize} bytes)"); + } + foreach (var entry in request.Entries) { var message = CreateMessage(entry.MessageBody, entry.MessageAttributes, entry.MessageSystemAttributes); - + if (entry.DelaySeconds > 0) { message.Attributes["DelaySeconds"] = entry.DelaySeconds.ToString(NumberFormatInfo.InvariantInfo); @@ -1096,7 +1145,7 @@ public Task SendMessageBatchAsync(SendMessageBatchRequ MD5OfMessageBody = message.MD5OfBody }); } - + return Task.FromResult(response.SetCommonProperties()); } diff --git a/tests/LocalSqsSnsMessaging.Tests.Shared/SnsPublishAsyncTests.cs b/tests/LocalSqsSnsMessaging.Tests.Shared/SnsPublishAsyncTests.cs index b956b0b..fe34a5e 100644 --- a/tests/LocalSqsSnsMessaging.Tests.Shared/SnsPublishAsyncTests.cs +++ b/tests/LocalSqsSnsMessaging.Tests.Shared/SnsPublishAsyncTests.cs @@ -5,7 +5,7 @@ using Amazon.SQS; using Amazon.SQS.Model; using FluentAssertions; -using Xunit; + using MessageAttributeValue = Amazon.SimpleNotificationService.Model.MessageAttributeValue; namespace LocalSqsSnsMessaging.Tests; @@ -22,7 +22,7 @@ protected SnsPublishAsyncTests(ITestOutputHelper testOutputHelper) _testOutputHelper = testOutputHelper; } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task PublishAsync_WithRawDelivery_ShouldDeliverMessageDirectly() { // Arrange @@ -47,7 +47,8 @@ public async Task PublishAsync_WithRawDelivery_ShouldDeliverMessageDirectly() // Assert response.MessageId.Should().NotBeNullOrEmpty(); - var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = "MyQueue" }, TestContext.Current.CancellationToken); + var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = "MyQueue" }, + TestContext.Current.CancellationToken); var queueUrl = queueUrlResponse.QueueUrl; await WaitAsync(TimeSpan.FromMilliseconds(100)); @@ -63,8 +64,8 @@ public async Task PublishAsync_WithRawDelivery_ShouldDeliverMessageDirectly() sqsMessage.MessageAttributes.Should().ContainKey("TestAttribute") .WhoseValue.StringValue.Should().Be("TestValue"); } - - [Fact] + + [Fact, Trait("Category", "TimeBasedTests")] public async Task PublishAsync_WithRawDelivery_ShouldCalculateMD5OfBody() { // Arrange @@ -91,7 +92,8 @@ public async Task PublishAsync_WithRawDelivery_ShouldCalculateMD5OfBody() // Assert response.MessageId.Should().NotBeNullOrEmpty(); - var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = "MyQueue" }, TestContext.Current.CancellationToken); + var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = "MyQueue" }, + TestContext.Current.CancellationToken); var queueUrl = queueUrlResponse.QueueUrl; await WaitAsync(TimeSpan.FromMilliseconds(100)); @@ -106,7 +108,7 @@ public async Task PublishAsync_WithRawDelivery_ShouldCalculateMD5OfBody() sqsMessage.MD5OfBody.Should().Be(expectedHash); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task PublishAsync_WithNonRawDelivery_ShouldWrapMessageInSNSFormat() { // Arrange @@ -132,7 +134,8 @@ public async Task PublishAsync_WithNonRawDelivery_ShouldWrapMessageInSNSFormat() // Assert response.MessageId.Should().NotBeNullOrEmpty(); - var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = "MyQueue" }, TestContext.Current.CancellationToken); + var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = "MyQueue" }, + TestContext.Current.CancellationToken); var queueUrl = queueUrlResponse.QueueUrl; await WaitAsync(TimeSpan.FromMilliseconds(100)); @@ -174,7 +177,8 @@ public async Task PublishAsync_WithNonExistentTopic_ShouldThrowException() }; // Act & Assert - await Assert.ThrowsAsync(() => Sns.PublishAsync(request, TestContext.Current.CancellationToken)); + await Assert.ThrowsAsync(() => + Sns.PublishAsync(request, TestContext.Current.CancellationToken)); } private async Task SetupTopicAndQueue(string topicArn, string queueArn, bool isRawDelivery) @@ -207,7 +211,8 @@ public async Task SetTopicAttributes_ShouldSetAndRetrieveAttributes() var topicArn = $"arn:aws:sns:us-east-1:{AccountId}:{topicName}"; // Create the topic - var createTopicResponse = await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topicName }, TestContext.Current.CancellationToken); + var createTopicResponse = await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topicName }, + TestContext.Current.CancellationToken); createTopicResponse.TopicArn.Should().Be(topicArn); // Act - Set topic attributes @@ -289,7 +294,8 @@ public async Task GetSubscriptionAttributes_ShouldRetrieveCorrectAttributes() // Create topic and queue await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topicName }, TestContext.Current.CancellationToken); - await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, TestContext.Current.CancellationToken); + await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, + TestContext.Current.CancellationToken); // Subscribe queue to topic var subscribeResponse = await Sns.SubscribeAsync(new SubscribeRequest @@ -354,7 +360,8 @@ public async Task SetSubscriptionAttributes_ShouldUpdateAttributes() var queueArn = $"arn:aws:sqs:us-east-1:{AccountId}:{queueName}"; await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topicName }, TestContext.Current.CancellationToken); - await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, TestContext.Current.CancellationToken); + await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, + TestContext.Current.CancellationToken); var subscribeResponse = await Sns.SubscribeAsync(new SubscribeRequest { @@ -405,7 +412,8 @@ public async Task ListSubscriptionsAsync_ShouldReturnAllSubscriptions() await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topic1Name }, TestContext.Current.CancellationToken); await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topic2Name }, TestContext.Current.CancellationToken); - await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, TestContext.Current.CancellationToken); + await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, + TestContext.Current.CancellationToken); var sub1 = await Sns.SubscribeAsync(new SubscribeRequest { @@ -447,7 +455,8 @@ public async Task ListSubscriptionsByTopicAsync_ShouldReturnSubscriptionsForSpec await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topic1Name }, TestContext.Current.CancellationToken); await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topic2Name }, TestContext.Current.CancellationToken); - await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, TestContext.Current.CancellationToken); + await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, + TestContext.Current.CancellationToken); var sub1 = await Sns.SubscribeAsync(new SubscribeRequest { @@ -492,7 +501,8 @@ public async Task ListSubscriptionsAsync_WithMoreThan100Subscriptions_ShouldRetu { var queueName = $"{queueNamePrefix}{i}"; var queueArn = $"arn:aws:sqs:us-east-1:{AccountId}:{queueName}"; - await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, TestContext.Current.CancellationToken); + await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = queueName }, + TestContext.Current.CancellationToken); await Sns.SubscribeAsync(new SubscribeRequest { TopicArn = topicArn, @@ -520,7 +530,7 @@ await Sns.Paginators protected abstract Task WaitAsync(TimeSpan delay); // FIFO scenarios - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task PublishAsync_ToFifoTopic_ShouldDeliverMessageToFifoQueue_InOrder() { // Arrange @@ -567,7 +577,8 @@ public async Task PublishAsync_ToFifoTopic_ShouldDeliverMessageToFifoQueue_InOrd } // Assert - var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = queueName }, TestContext.Current.CancellationToken); + var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = queueName }, + TestContext.Current.CancellationToken); var queueUrl = queueUrlResponse.QueueUrl; await WaitAsync(TimeSpan.FromMilliseconds(100)); @@ -595,7 +606,8 @@ public async Task PublishAsync_ToFifoTopic_ShouldDeliverMessageToFifoQueue_InOrd } // Check the order based on SequenceNumber - var orderedMessages = receivedMessages.OrderBy(m => Int128.Parse(m.Attributes["SequenceNumber"], NumberFormatInfo.InvariantInfo)).ToList(); + var orderedMessages = receivedMessages + .OrderBy(m => Int128.Parse(m.Attributes["SequenceNumber"], NumberFormatInfo.InvariantInfo)).ToList(); orderedMessages[0].Body.Should().Be("First message", "it was published first"); orderedMessages[1].Body.Should().Be("Second message", "it was published second"); @@ -608,7 +620,8 @@ public async Task PublishAsync_ToFifoTopic_ShouldDeliverMessageToFifoQueue_InOrd receivedMessages.Select(m => m.Attributes["MessageDeduplicationId"]).Distinct().Should().HaveCount(3); // Check if the order matches the publish order - if (receivedMessages[1].Body != "Second message" || !string.Equals(receivedMessages[2].Body, "Third message", StringComparison.Ordinal)) + if (receivedMessages[1].Body != "Second message" || + !string.Equals(receivedMessages[2].Body, "Third message", StringComparison.Ordinal)) { _testOutputHelper.WriteLine("Warning: Messages were not received in the exact order they were published."); _testOutputHelper.WriteLine("This might be due to how FIFO queues handle nearly simultaneous publishes."); @@ -616,7 +629,7 @@ public async Task PublishAsync_ToFifoTopic_ShouldDeliverMessageToFifoQueue_InOrd } } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task PublishAsync_ToFifoTopic_ShouldPreventDuplicates() { // Arrange @@ -642,7 +655,8 @@ public async Task PublishAsync_ToFifoTopic_ShouldPreventDuplicates() await Sns.PublishAsync(message, TestContext.Current.CancellationToken); // Attempt to send duplicate // Assert - var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = queueName }, TestContext.Current.CancellationToken); + var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = queueName }, + TestContext.Current.CancellationToken); var queueUrl = queueUrlResponse.QueueUrl; await WaitAsync(TimeSpan.FromMilliseconds(100)); @@ -659,7 +673,7 @@ public async Task PublishAsync_ToFifoTopic_ShouldPreventDuplicates() receivedMessages[0].Attributes["MessageDeduplicationId"].Should().Be(deduplicationId); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task PublishAsync_ToFifoTopic_WithMultipleMessageGroups_ShouldMaintainOrderWithinGroups() { // Arrange @@ -710,7 +724,8 @@ public async Task PublishAsync_ToFifoTopic_WithMultipleMessageGroups_ShouldMaint } // Assert - var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = queueName }, TestContext.Current.CancellationToken); + var queueUrlResponse = await Sqs.GetQueueUrlAsync(new GetQueueUrlRequest { QueueName = queueName }, + TestContext.Current.CancellationToken); var queueUrl = queueUrlResponse.QueueUrl; await WaitAsync(TimeSpan.FromMilliseconds(100)); @@ -774,4 +789,111 @@ await Sns.SubscribeAsync(new SubscribeRequest } }); } + + [Fact] + public async Task PublishAsync_MessageExceedsMaximumSize_ThrowsInvalidParameterException() + { + // Arrange + var topicName = "TestTopic"; + var topicArn = $"arn:aws:sns:us-east-1:{AccountId}:{topicName}"; + await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topicName }); + + var request = new PublishRequest + { + TopicArn = topicArn, + Message = new string('x', 262145) // Exceeds 256KB + }; + + // Act & Assert + await Assert.ThrowsAsync(() => + Sns.PublishAsync(request, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task PublishAsync_MessageAttributesExceedMaximumSize_ThrowsInvalidParameterException() + { + // Arrange + var topicName = "TestTopic"; + var topicArn = $"arn:aws:sns:us-east-1:{AccountId}:{topicName}"; + await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topicName }); + + var request = new PublishRequest + { + TopicArn = topicArn, + Message = new string('x', 200000), + MessageAttributes = new Dictionary + { + [new string('a', 1000)] = new() // Long attribute name + { + DataType = "String", + StringValue = new string('y', 61145) // Push total over 256KB + } + } + }; + + // Act & Assert + await Assert.ThrowsAsync(() => + Sns.PublishAsync(request, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task PublishAsync_ExactlyMaximumSize_Succeeds() + { + // Arrange + var topicName = "TestTopic"; + var queueName = "TestQueue"; + var topicArn = $"arn:aws:sns:us-east-1:{AccountId}:{topicName}"; + var queueArn = $"arn:aws:sqs:us-east-1:{AccountId}:{queueName}"; + + await SetupTopicAndQueue(topicArn, queueArn, isRawDelivery: true); + + var request = new PublishRequest + { + TopicArn = topicArn, + Message = new string('x', 262144) // Exactly 256KB + }; + + // Act + var response = await Sns.PublishAsync(request, TestContext.Current.CancellationToken); + + // Assert + response.MessageId.Should().NotBeNullOrEmpty(); + } + + [Fact] + public async Task PublishAsync_WithSubjectAndMessageAttributes_ExceedsLimit_ThrowsInvalidParameterException() + { + // Arrange + var topicName = "TestTopic"; + var topicArn = $"arn:aws:sns:us-east-1:{AccountId}:{topicName}"; + await Sns.CreateTopicAsync(new CreateTopicRequest { Name = topicName }); + + var messageBody = new string('x', 200_000); + + var request = new PublishRequest + { + TopicArn = topicArn, + Subject = new string('s', 50), + Message = messageBody, + MessageAttributes = new Dictionary + { + // Long attribute name (contributes to size) + [new string('a', 1000)] = new MessageAttributeValue + { + DataType = "String", // 6 bytes + StringValue = new string('y', 62000) + }, + // Another attribute to push us over the limit + [new string('b', 100)] = new MessageAttributeValue + { + DataType = "Number", // 6 bytes + StringValue = "123" + } + } + }; + + // Act & Assert + await Assert.ThrowsAsync(() => + Sns.PublishAsync(request, TestContext.Current.CancellationToken)); + } } \ No newline at end of file diff --git a/tests/LocalSqsSnsMessaging.Tests.Shared/SqsChangeMessageVisibilityAsyncTests.cs b/tests/LocalSqsSnsMessaging.Tests.Shared/SqsChangeMessageVisibilityAsyncTests.cs index cd6659a..1a88005 100644 --- a/tests/LocalSqsSnsMessaging.Tests.Shared/SqsChangeMessageVisibilityAsyncTests.cs +++ b/tests/LocalSqsSnsMessaging.Tests.Shared/SqsChangeMessageVisibilityAsyncTests.cs @@ -20,7 +20,7 @@ await Sqs.SendMessageAsync(new SendMessageRequest }); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ChangeMessageVisibilityAsync_ValidRequest_ChangesVisibilityTimeout() { await SetupQueueAndMessage(); @@ -86,7 +86,7 @@ await Assert.ThrowsAsync(() => }, TestContext.Current.CancellationToken)); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ChangeMessageVisibilityAsync_MessageNotInFlight_ThrowsException() { await SetupQueueAndMessage(); @@ -111,7 +111,7 @@ await Sqs.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest Assert.Single(secondReceiveResult.Messages); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ChangeMessageVisibilityAsync_ChangeMultipleTimes_LastChangeApplies() { await SetupQueueAndMessage(); diff --git a/tests/LocalSqsSnsMessaging.Tests.Shared/SqsReceiveMessageAsyncTests.cs b/tests/LocalSqsSnsMessaging.Tests.Shared/SqsReceiveMessageAsyncTests.cs index 7c6bc30..5c3800f 100644 --- a/tests/LocalSqsSnsMessaging.Tests.Shared/SqsReceiveMessageAsyncTests.cs +++ b/tests/LocalSqsSnsMessaging.Tests.Shared/SqsReceiveMessageAsyncTests.cs @@ -8,20 +8,21 @@ public abstract class SqsReceiveMessageAsyncTests { protected IAmazonSQS Sqs = null!; protected string AccountId = null!; - + [Fact] public async Task ReceiveMessageAsync_QueueNotFound_ThrowsQueueDoesNotExistException() { var request = new ReceiveMessageRequest { QueueUrl = "nonexistent-queue" }; - await Assert.ThrowsAsync(() => + await Assert.ThrowsAsync(() => Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken)); } [Fact] public async Task ReceiveMessageAsync_NoMessages_ReturnsEmptyList() { - var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken); + var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken); var queueUrl = createQueueResponse.QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, WaitTimeSeconds = 0 }; @@ -33,7 +34,8 @@ public async Task ReceiveMessageAsync_NoMessages_ReturnsEmptyList() [Fact] public async Task ReceiveMessageAsync_MessagesAvailable_ReturnsMessages() { - var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken); + var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken); var queueUrl = createQueueResponse.QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, MaxNumberOfMessages = 2 }; @@ -55,32 +57,34 @@ await Sqs.SendMessageAsync(new SendMessageRequest Assert.Equal("Goodbye, world!", result.Messages[1].Body); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_WaitsForMessages_ReturnsMessagesWhenAvailable() { - var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken); - var queueUrl = createQueueResponse.QueueUrl; + var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken); + var queueUrl = createQueueResponse.QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, WaitTimeSeconds = 5 }; var task = Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken); - + await AdvanceTime(TimeSpan.FromSeconds(3)); await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Hello, world!" }, TestContext.Current.CancellationToken); - + var result = await task; - + var receivedMessage = Assert.Single(result.Messages); Assert.Equal("Hello, world!", receivedMessage.Body); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_Timeout_ReturnsEmptyList() { - var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken); + var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken); var queueUrl = createQueueResponse.QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, WaitTimeSeconds = 5 }; @@ -96,7 +100,8 @@ public async Task ReceiveMessageAsync_Timeout_ReturnsEmptyList() [Fact] public async Task ReceiveMessageAsync_CancellationRequested_ReturnsEmptyList() { - var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken); + var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken); var queueUrl = createQueueResponse.QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, WaitTimeSeconds = 10 }; @@ -107,11 +112,12 @@ await Assert.ThrowsAnyAsync(async () => await Sqs.ReceiveMessageAsync(request, cts.Token) ); } - - [Fact] + + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_RespectVisibilityTimeout() { - var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken); + var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken); var queueUrl = createQueueResponse.QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, WaitTimeSeconds = 0, VisibilityTimeout = 30 }; @@ -146,12 +152,13 @@ await Sqs.SendMessageAsync(new SendMessageRequest var forthReceivedMessage = Assert.Single(result4.Messages); Assert.Equal("Hello, world!", forthReceivedMessage.Body); } - - [Fact] + + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_DelayedMessageBecomesVisible() { //SynchronizationContext.SetSynchronizationContext(null); - var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken); + var createQueueResponse = await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken); var queueUrl = createQueueResponse.QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, WaitTimeSeconds = 0, VisibilityTimeout = 30 }; @@ -183,24 +190,28 @@ await Sqs.SendMessageAsync(new SendMessageRequest Assert.Equal("Hello, world!", receivedMessage.Body); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_MultipleMessagesWithDifferentDelays() { //SynchronizationContext.SetSynchronizationContext(null); - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, WaitTimeSeconds = 0, VisibilityTimeout = 30, MaxNumberOfMessages = 10 }; // Send messages with different delays await Sqs.SendMessageAsync(new SendMessageRequest - { QueueUrl = queueUrl, MessageBody = "Message 1", DelaySeconds = 5 }, TestContext.Current.CancellationToken); + { QueueUrl = queueUrl, MessageBody = "Message 1", DelaySeconds = 5 }, + TestContext.Current.CancellationToken); await Sqs.SendMessageAsync(new SendMessageRequest - { QueueUrl = queueUrl, MessageBody = "Message 2", DelaySeconds = 10 }, TestContext.Current.CancellationToken); + { QueueUrl = queueUrl, MessageBody = "Message 2", DelaySeconds = 10 }, + TestContext.Current.CancellationToken); await Sqs.SendMessageAsync(new SendMessageRequest - { QueueUrl = queueUrl, MessageBody = "Message 3", DelaySeconds = 15 }, TestContext.Current.CancellationToken); + { QueueUrl = queueUrl, MessageBody = "Message 3", DelaySeconds = 15 }, + TestContext.Current.CancellationToken); await AdvanceTime(TimeSpan.FromSeconds(1)); - + // Advance time and check messages await AdvanceTime(TimeSpan.FromSeconds(5)); var result1 = await Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken); @@ -237,11 +248,12 @@ await Sqs.SendMessageAsync(new SendMessageRequest Assert.Single(result6.Messages); Assert.Equal("Message 3", result6.Messages[0].Body); } - - [Fact] + + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_ApproximateReceiveCount_IncreasesWithEachReceive() { - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, @@ -251,7 +263,8 @@ public async Task ReceiveMessageAsync_ApproximateReceiveCount_IncreasesWithEachR }; // Send a message - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Test message" }, TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Test message" }, + TestContext.Current.CancellationToken); // First receive var result1 = await Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken); @@ -278,7 +291,8 @@ public async Task ReceiveMessageAsync_ApproximateReceiveCount_IncreasesWithEachR [Fact] public async Task ReceiveMessageAsync_ApproximateReceiveCount_ResetAfterDelete() { - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, @@ -288,7 +302,8 @@ public async Task ReceiveMessageAsync_ApproximateReceiveCount_ResetAfterDelete() }; // Send a message - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Test message" }, TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Test message" }, + TestContext.Current.CancellationToken); // Receive and delete the message var result1 = await Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken); @@ -297,7 +312,8 @@ public async Task ReceiveMessageAsync_ApproximateReceiveCount_ResetAfterDelete() await Sqs.DeleteMessageAsync(queueUrl, message1.ReceiptHandle, TestContext.Current.CancellationToken); // Send another message - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "New test message" }, TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "New test message" }, + TestContext.Current.CancellationToken); // Receive the new message var result2 = await Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken); @@ -305,10 +321,11 @@ public async Task ReceiveMessageAsync_ApproximateReceiveCount_ResetAfterDelete() Assert.Equal("1", message2.Attributes["ApproximateReceiveCount"]); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_ApproximateReceiveCount_MultipleMessages() { - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; var request = new ReceiveMessageRequest { QueueUrl = queueUrl, @@ -319,8 +336,10 @@ public async Task ReceiveMessageAsync_ApproximateReceiveCount_MultipleMessages() }; // Send multiple messages - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Message 1" }, TestContext.Current.CancellationToken); - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Message 2" }, TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Message 1" }, + TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = "Message 2" }, + TestContext.Current.CancellationToken); // First receive var result1 = await Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken); @@ -336,7 +355,8 @@ public async Task ReceiveMessageAsync_ApproximateReceiveCount_MultipleMessages() Assert.All(result2.Messages, m => Assert.Equal("2", m.Attributes["ApproximateReceiveCount"])); // Delete one message - await Sqs.DeleteMessageAsync(queueUrl, result2.Messages[0].ReceiptHandle, TestContext.Current.CancellationToken); + await Sqs.DeleteMessageAsync(queueUrl, result2.Messages[0].ReceiptHandle, + TestContext.Current.CancellationToken); // Wait for visibility timeout to expire again await AdvanceTime(TimeSpan.FromSeconds(6)); @@ -346,15 +366,17 @@ public async Task ReceiveMessageAsync_ApproximateReceiveCount_MultipleMessages() var message3 = Assert.Single(result3.Messages); Assert.Equal("3", message3.Attributes["ApproximateReceiveCount"]); } - - [Fact] + + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_MessageMovedToErrorQueue_AfterMaxReceives() { // Create main queue and error queue - var mainQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "main-queue" }, TestContext.Current.CancellationToken)).QueueUrl; - var errorQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "error-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var mainQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "main-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + var errorQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "error-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; var errorQueueArn = $"arn:aws:sqs:us-east-1:{AccountId}:{errorQueueUrl.Split('/').Last()}"; - + // Set up redrive policy for the main queue await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest { @@ -374,14 +396,16 @@ await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest }; // Send a message to the main queue - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = mainQueueUrl, MessageBody = "Test message" }, TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = mainQueueUrl, MessageBody = "Test message" }, + TestContext.Current.CancellationToken); // Receive the message three times for (int i = 0; i < 3; i++) { var result = await Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken); Assert.Single(result.Messages); - Assert.Equal((i + 1).ToString(NumberFormatInfo.InvariantInfo), result.Messages[0].Attributes["ApproximateReceiveCount"]); + Assert.Equal((i + 1).ToString(NumberFormatInfo.InvariantInfo), + result.Messages[0].Attributes["ApproximateReceiveCount"]); await AdvanceTime(TimeSpan.FromSeconds(6)); // Wait for visibility timeout to expire } @@ -390,19 +414,22 @@ await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest Assert.Empty(emptyResult.Messages); // Check the error queue - the message should be there - var errorQueueResult = await Sqs.ReceiveMessageAsync(new ReceiveMessageRequest { QueueUrl = errorQueueUrl }, TestContext.Current.CancellationToken); + var errorQueueResult = await Sqs.ReceiveMessageAsync(new ReceiveMessageRequest { QueueUrl = errorQueueUrl }, + TestContext.Current.CancellationToken); var errorMessage = Assert.Single(errorQueueResult.Messages); Assert.Equal("Test message", errorMessage.Body); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ReceiveMessageAsync_MessageNotMovedToErrorQueue_IfDeletedBeforeMaxReceives() { // Create main queue and error queue - var mainQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "main-queue" }, TestContext.Current.CancellationToken)).QueueUrl; - var errorQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "error-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var mainQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "main-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + var errorQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "error-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; var errorQueueArn = $"arn:aws:sqs:us-east-1:{AccountId}:{errorQueueUrl.Split('/').Last()}"; - + // Set up redrive policy for the main queue await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest { @@ -422,14 +449,16 @@ await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest }; // Send a message to the main queue - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = mainQueueUrl, MessageBody = "Test message" }, TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = mainQueueUrl, MessageBody = "Test message" }, + TestContext.Current.CancellationToken); // Receive the message twice for (int i = 0; i < 2; i++) { var result = await Sqs.ReceiveMessageAsync(request, TestContext.Current.CancellationToken); Assert.Single(result.Messages); - Assert.Equal((i + 1).ToString(NumberFormatInfo.InvariantInfo), result.Messages[0].Attributes["ApproximateReceiveCount"]); + Assert.Equal((i + 1).ToString(NumberFormatInfo.InvariantInfo), + result.Messages[0].Attributes["ApproximateReceiveCount"]); await AdvanceTime(TimeSpan.FromSeconds(6)); // Wait for visibility timeout to expire } @@ -444,17 +473,29 @@ await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest Assert.Empty(emptyMainResult.Messages); // Check the error queue - should be empty - var errorQueueResult = await Sqs.ReceiveMessageAsync(new ReceiveMessageRequest { QueueUrl = errorQueueUrl }, TestContext.Current.CancellationToken); + var errorQueueResult = await Sqs.ReceiveMessageAsync(new ReceiveMessageRequest { QueueUrl = errorQueueUrl }, + TestContext.Current.CancellationToken); Assert.Empty(errorQueueResult.Messages); } - [Fact(Skip = "Fifo queues not supported yet")] public async Task ReceiveMessageAsync_ErrorQueueRespectsFifoOrder() { // Create main FIFO queue and error FIFO queue - var mainQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "main-queue.fifo", Attributes = new Dictionary { ["FifoQueue"] = "true" } }, TestContext.Current.CancellationToken)).QueueUrl; - var errorQueueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "error-queue.fifo", Attributes = new Dictionary { ["FifoQueue"] = "true" } }, TestContext.Current.CancellationToken)).QueueUrl; - + var mainQueueUrl = + (await Sqs.CreateQueueAsync( + new CreateQueueRequest + { + QueueName = "main-queue.fifo", + Attributes = new Dictionary { ["FifoQueue"] = "true" } + }, TestContext.Current.CancellationToken)).QueueUrl; + var errorQueueUrl = + (await Sqs.CreateQueueAsync( + new CreateQueueRequest + { + QueueName = "error-queue.fifo", + Attributes = new Dictionary { ["FifoQueue"] = "true" } + }, TestContext.Current.CancellationToken)).QueueUrl; + // Set up redrive policy for the main queue await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest { @@ -474,8 +515,18 @@ await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest }; // Send messages to the main queue - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = mainQueueUrl, MessageBody = "Message 1", MessageGroupId = "group1", MessageDeduplicationId = "dedup1" }, TestContext.Current.CancellationToken); - await Sqs.SendMessageAsync(new SendMessageRequest { QueueUrl = mainQueueUrl, MessageBody = "Message 2", MessageGroupId = "group1", MessageDeduplicationId = "dedup2" }, TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync( + new SendMessageRequest + { + QueueUrl = mainQueueUrl, MessageBody = "Message 1", MessageGroupId = "group1", + MessageDeduplicationId = "dedup1" + }, TestContext.Current.CancellationToken); + await Sqs.SendMessageAsync( + new SendMessageRequest + { + QueueUrl = mainQueueUrl, MessageBody = "Message 2", MessageGroupId = "group1", + MessageDeduplicationId = "dedup2" + }, TestContext.Current.CancellationToken); // Receive and fail to process each message 3 times for (int i = 0; i < 3; i++) @@ -486,16 +537,19 @@ await Sqs.SetQueueAttributesAsync(new SetQueueAttributesRequest } // Check the error queue - messages should be there in order - var errorQueueResult = await Sqs.ReceiveMessageAsync(new ReceiveMessageRequest { QueueUrl = errorQueueUrl, MaxNumberOfMessages = 10 }, TestContext.Current.CancellationToken); + var errorQueueResult = await Sqs.ReceiveMessageAsync( + new ReceiveMessageRequest { QueueUrl = errorQueueUrl, MaxNumberOfMessages = 10 }, + TestContext.Current.CancellationToken); Assert.Equal(2, errorQueueResult.Messages.Count); Assert.Equal("Message 1", errorQueueResult.Messages[0].Body); Assert.Equal("Message 2", errorQueueResult.Messages[1].Body); } - - [Fact] + + [Fact] public async Task ReceiveMessageAsync_SpecificMessageSystemAttributes_OnlyRequestedAttributesReturned() { - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; // Send a message with system attributes await Sqs.SendMessageAsync(new SendMessageRequest @@ -504,8 +558,10 @@ await Sqs.SendMessageAsync(new SendMessageRequest MessageBody = "Test message", MessageSystemAttributes = new Dictionary { - [MessageSystemAttributeName.SenderId] = new MessageSystemAttributeValue { StringValue = "TestSender", DataType = "String"}, - [MessageSystemAttributeName.SentTimestamp] = new MessageSystemAttributeValue { StringValue = "1621234567890", DataType = "String"} + [MessageSystemAttributeName.SenderId] = new MessageSystemAttributeValue + { StringValue = "TestSender", DataType = "String" }, + [MessageSystemAttributeName.SentTimestamp] = new MessageSystemAttributeValue + { StringValue = "1621234567890", DataType = "String" } } }, TestContext.Current.CancellationToken); @@ -529,7 +585,8 @@ await Sqs.SendMessageAsync(new SendMessageRequest [Fact] public async Task ReceiveMessageAsync_AllMessageSystemAttributes_AllAttributesReturned() { - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; // Send a message with system attributes await Sqs.SendMessageAsync(new SendMessageRequest @@ -539,10 +596,13 @@ await Sqs.SendMessageAsync(new SendMessageRequest MessageSystemAttributes = new Dictionary { [MessageSystemAttributeName.SenderId] = new() { StringValue = "TestSender", DataType = "String" }, - [MessageSystemAttributeName.SentTimestamp] = new() { StringValue = "1621234567890", DataType = "String"}, - [MessageSystemAttributeName.ApproximateFirstReceiveTimestamp] = new() { StringValue = "1621234567891", DataType = "String"}, - [MessageSystemAttributeName.ApproximateReceiveCount] = new() { StringValue = "0", DataType = "String"}, - [MessageSystemAttributeName.AWSTraceHeader] = new() { StringValue = "Root=1-5e3d83c1-e6a0db584850d61342823d4c", DataType = "String"} + [MessageSystemAttributeName.SentTimestamp] = + new() { StringValue = "1621234567890", DataType = "String" }, + [MessageSystemAttributeName.ApproximateFirstReceiveTimestamp] = + new() { StringValue = "1621234567891", DataType = "String" }, + [MessageSystemAttributeName.ApproximateReceiveCount] = new() { StringValue = "0", DataType = "String" }, + [MessageSystemAttributeName.AWSTraceHeader] = new() + { StringValue = "Root=1-5e3d83c1-e6a0db584850d61342823d4c", DataType = "String" } } }, TestContext.Current.CancellationToken); @@ -561,12 +621,14 @@ await Sqs.SendMessageAsync(new SendMessageRequest Assert.Equal(5, message.Attributes.Count); Assert.Equal("TestSender", message.Attributes[MessageSystemAttributeName.SenderId]); Assert.Equal("1", message.Attributes[MessageSystemAttributeName.ApproximateReceiveCount]); - Assert.Equal("Root=1-5e3d83c1-e6a0db584850d61342823d4c", message.Attributes[MessageSystemAttributeName.AWSTraceHeader]); + Assert.Equal("Root=1-5e3d83c1-e6a0db584850d61342823d4c", + message.Attributes[MessageSystemAttributeName.AWSTraceHeader]); if (false) #pragma warning disable CS0162 // Unreachable code detected { Assert.Equal("1621234567890", message.Attributes[MessageSystemAttributeName.SentTimestamp]); - Assert.Equal("1621234567891", message.Attributes[MessageSystemAttributeName.ApproximateFirstReceiveTimestamp]); + Assert.Equal("1621234567891", + message.Attributes[MessageSystemAttributeName.ApproximateFirstReceiveTimestamp]); } #pragma warning restore CS0162 // Unreachable code detected } @@ -574,7 +636,8 @@ await Sqs.SendMessageAsync(new SendMessageRequest [Fact] public async Task ReceiveMessageAsync_NoMessageSystemAttributes_NoAttributesReturned() { - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; // Send a message with system attributes await Sqs.SendMessageAsync(new SendMessageRequest @@ -583,8 +646,10 @@ await Sqs.SendMessageAsync(new SendMessageRequest MessageBody = "Test message", MessageSystemAttributes = new Dictionary { - [MessageSystemAttributeName.SenderId] = new MessageSystemAttributeValue { StringValue = "TestSender", DataType = "String"}, - [MessageSystemAttributeName.SentTimestamp] = new MessageSystemAttributeValue { StringValue = "1621234567890", DataType = "String"} + [MessageSystemAttributeName.SenderId] = new MessageSystemAttributeValue + { StringValue = "TestSender", DataType = "String" }, + [MessageSystemAttributeName.SentTimestamp] = new MessageSystemAttributeValue + { StringValue = "1621234567890", DataType = "String" } } }, TestContext.Current.CancellationToken); @@ -606,7 +671,8 @@ await Sqs.SendMessageAsync(new SendMessageRequest [Fact] public async Task ReceiveMessageAsync_MultipleMessages_CorrectAttributesReturnedForEach() { - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; // Send two messages with different system attributes await Sqs.SendMessageAsync(new SendMessageRequest @@ -615,8 +681,10 @@ await Sqs.SendMessageAsync(new SendMessageRequest MessageBody = "Message 1", MessageSystemAttributes = new Dictionary { - [MessageSystemAttributeName.SenderId] = new MessageSystemAttributeValue { StringValue = "Sender1", DataType = "String"}, - [MessageSystemAttributeName.SentTimestamp] = new MessageSystemAttributeValue { StringValue = "1621234567890", DataType = "String"} + [MessageSystemAttributeName.SenderId] = new MessageSystemAttributeValue + { StringValue = "Sender1", DataType = "String" }, + [MessageSystemAttributeName.SentTimestamp] = new MessageSystemAttributeValue + { StringValue = "1621234567890", DataType = "String" } } }, TestContext.Current.CancellationToken); @@ -626,8 +694,10 @@ await Sqs.SendMessageAsync(new SendMessageRequest MessageBody = "Message 2", MessageSystemAttributes = new Dictionary { - [MessageSystemAttributeName.SenderId] = new MessageSystemAttributeValue { StringValue = "Sender2", DataType = "String"}, - [MessageSystemAttributeName.AWSTraceHeader] = new MessageSystemAttributeValue { StringValue = "Root=1-5e3d83c1-e6a0db584850d61342823d4c", DataType = "String"} + [MessageSystemAttributeName.SenderId] = new MessageSystemAttributeValue + { StringValue = "Sender2", DataType = "String" }, + [MessageSystemAttributeName.AWSTraceHeader] = new MessageSystemAttributeValue + { StringValue = "Root=1-5e3d83c1-e6a0db584850d61342823d4c", DataType = "String" } } }, TestContext.Current.CancellationToken); @@ -654,186 +724,408 @@ await Sqs.SendMessageAsync(new SendMessageRequest // Check attributes for Message 2 Assert.Equal(2, message2.Attributes.Count); Assert.Equal("Sender2", message2.Attributes[MessageSystemAttributeName.SenderId]); - Assert.Equal("Root=1-5e3d83c1-e6a0db584850d61342823d4c", message2.Attributes[MessageSystemAttributeName.AWSTraceHeader]); + Assert.Equal("Root=1-5e3d83c1-e6a0db584850d61342823d4c", + message2.Attributes[MessageSystemAttributeName.AWSTraceHeader]); } - + // Permission tests [Fact] -public async Task AddPermissionAsync_ValidRequest_AddsPermissionToPolicy() -{ - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; - var request = new AddPermissionRequest - { - QueueUrl = queueUrl, - Label = "TestPermission", - AWSAccountIds = ["123456789012"], - Actions = ["SendMessage", "ReceiveMessage"] - }; - - var response = await Sqs.AddPermissionAsync(request, TestContext.Current.CancellationToken); - - Assert.NotNull(response); - var attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest - { - QueueUrl = queueUrl, - AttributeNames = ["Policy"] - }, TestContext.Current.CancellationToken); - Assert.True(attributes.Attributes.ContainsKey("Policy")); - var policy = Policy.FromJson(attributes.Attributes["Policy"]); - Assert.Contains(policy.Statements, s => s.Id == "TestPermission"); -} - -[Fact] -public async Task AddPermissionAsync_DuplicateLabel_ThrowsArgumentException() -{ - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; - var request = new AddPermissionRequest + public async Task AddPermissionAsync_ValidRequest_AddsPermissionToPolicy() { - QueueUrl = queueUrl, - Label = "TestPermission", - AWSAccountIds = ["123456789012"], - Actions = ["SendMessage"] - }; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + var request = new AddPermissionRequest + { + QueueUrl = queueUrl, + Label = "TestPermission", + AWSAccountIds = ["123456789012"], + Actions = ["SendMessage", "ReceiveMessage"] + }; - await Sqs.AddPermissionAsync(request, TestContext.Current.CancellationToken); + var response = await Sqs.AddPermissionAsync(request, TestContext.Current.CancellationToken); - await Assert.ThrowsAnyAsync(async () => await Sqs.AddPermissionAsync(request, TestContext.Current.CancellationToken)); -} + Assert.NotNull(response); + var attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest + { + QueueUrl = queueUrl, + AttributeNames = ["Policy"] + }, TestContext.Current.CancellationToken); + Assert.True(attributes.Attributes.ContainsKey("Policy")); + var policy = Policy.FromJson(attributes.Attributes["Policy"]); + Assert.Contains(policy.Statements, s => s.Id == "TestPermission"); + } -[Fact] -public async Task AddPermissionAsync_QueueDoesNotExist_ThrowsQueueDoesNotExistException() -{ - var request = new AddPermissionRequest + [Fact] + public async Task AddPermissionAsync_DuplicateLabel_ThrowsArgumentException() { - QueueUrl = "http://sqs.us-east-1.amazonaws.com/123456789012/non-existent-queue", - Label = "TestPermission", - AWSAccountIds = ["123456789012"], - Actions = ["SendMessage"] - }; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + var request = new AddPermissionRequest + { + QueueUrl = queueUrl, + Label = "TestPermission", + AWSAccountIds = ["123456789012"], + Actions = ["SendMessage"] + }; - await Assert.ThrowsAsync(async () => await Sqs.AddPermissionAsync(request, TestContext.Current.CancellationToken)); -} + await Sqs.AddPermissionAsync(request, TestContext.Current.CancellationToken); -[Fact] -public async Task RemovePermissionAsync_ValidRequest_RemovesPermissionFromPolicy() -{ - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; - await Sqs.AddPermissionAsync(new AddPermissionRequest + await Assert.ThrowsAnyAsync(async () => + await Sqs.AddPermissionAsync(request, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task AddPermissionAsync_QueueDoesNotExist_ThrowsQueueDoesNotExistException() { - QueueUrl = queueUrl, - Label = "TestPermission", - AWSAccountIds = ["123456789012"], - Actions = ["SendMessage"] - }, TestContext.Current.CancellationToken); + var request = new AddPermissionRequest + { + QueueUrl = "http://sqs.us-east-1.amazonaws.com/123456789012/non-existent-queue", + Label = "TestPermission", + AWSAccountIds = ["123456789012"], + Actions = ["SendMessage"] + }; - var removeRequest = new RemovePermissionRequest + await Assert.ThrowsAsync(async () => + await Sqs.AddPermissionAsync(request, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task RemovePermissionAsync_ValidRequest_RemovesPermissionFromPolicy() { - QueueUrl = queueUrl, - Label = "TestPermission" - }; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + await Sqs.AddPermissionAsync(new AddPermissionRequest + { + QueueUrl = queueUrl, + Label = "TestPermission", + AWSAccountIds = ["123456789012"], + Actions = ["SendMessage"] + }, TestContext.Current.CancellationToken); - var response = await Sqs.RemovePermissionAsync(removeRequest, TestContext.Current.CancellationToken); + var removeRequest = new RemovePermissionRequest + { + QueueUrl = queueUrl, + Label = "TestPermission" + }; - Assert.NotNull(response); - var attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest + var response = await Sqs.RemovePermissionAsync(removeRequest, TestContext.Current.CancellationToken); + + Assert.NotNull(response); + var attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest + { + QueueUrl = queueUrl, + AttributeNames = ["Policy"] + }, TestContext.Current.CancellationToken); + Assert.False(attributes.Attributes.ContainsKey("Policy")); + } + + [Fact] + public async Task RemovePermissionAsync_LabelDoesNotExist_ThrowsArgumentException() { - QueueUrl = queueUrl, - AttributeNames = ["Policy"] - }, TestContext.Current.CancellationToken); - Assert.False(attributes.Attributes.ContainsKey("Policy")); -} + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + var request = new RemovePermissionRequest + { + QueueUrl = queueUrl, + Label = "NonExistentLabel" + }; -[Fact] -public async Task RemovePermissionAsync_LabelDoesNotExist_ThrowsArgumentException() -{ - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; - var request = new RemovePermissionRequest + await Assert.ThrowsAnyAsync(async () => + await Sqs.RemovePermissionAsync(request, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task RemovePermissionAsync_QueueDoesNotExist_ThrowsQueueDoesNotExistException() { - QueueUrl = queueUrl, - Label = "NonExistentLabel" - }; + var request = new RemovePermissionRequest + { + QueueUrl = "http://sqs.us-east-1.amazonaws.com/123456789012/non-existent-queue", + Label = "TestPermission" + }; - await Assert.ThrowsAnyAsync(async () => await Sqs.RemovePermissionAsync(request, TestContext.Current.CancellationToken)); -} + await Assert.ThrowsAsync(async () => + await Sqs.RemovePermissionAsync(request, TestContext.Current.CancellationToken)); + } -[Fact] -public async Task RemovePermissionAsync_QueueDoesNotExist_ThrowsQueueDoesNotExistException() -{ - var request = new RemovePermissionRequest + [Fact] + public async Task AddAndRemovePermission_MultiplePermissions_ManagesCorrectly() { - QueueUrl = "http://sqs.us-east-1.amazonaws.com/123456789012/non-existent-queue", - Label = "TestPermission" - }; + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; - await Assert.ThrowsAsync(async () => await Sqs.RemovePermissionAsync(request, TestContext.Current.CancellationToken)); -} + // Add first permission + await Sqs.AddPermissionAsync(new AddPermissionRequest + { + QueueUrl = queueUrl, + Label = "Permission1", + AWSAccountIds = ["123456789012"], + Actions = ["SendMessage"] + }, TestContext.Current.CancellationToken); -[Fact] -public async Task AddAndRemovePermission_MultiplePermissions_ManagesCorrectly() -{ - var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, TestContext.Current.CancellationToken)).QueueUrl; - - // Add first permission - await Sqs.AddPermissionAsync(new AddPermissionRequest - { - QueueUrl = queueUrl, - Label = "Permission1", - AWSAccountIds = ["123456789012"], - Actions = ["SendMessage"] - }, TestContext.Current.CancellationToken); - - // Add second permission - await Sqs.AddPermissionAsync(new AddPermissionRequest - { - QueueUrl = queueUrl, - Label = "Permission2", - AWSAccountIds = ["210987654321"], - Actions = ["ReceiveMessage"] - }, TestContext.Current.CancellationToken); - - // Verify both permissions exist - var attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest - { - QueueUrl = queueUrl, - AttributeNames = ["Policy"] - }, TestContext.Current.CancellationToken); - var policy = Policy.FromJson(attributes.Attributes["Policy"]); - Assert.Equal(2, policy.Statements.Count); - Assert.Contains(policy.Statements, s => s.Id == "Permission1"); - Assert.Contains(policy.Statements, s => s.Id == "Permission2"); - - // Remove first permission - await Sqs.RemovePermissionAsync(new RemovePermissionRequest - { - QueueUrl = queueUrl, - Label = "Permission1" - }, TestContext.Current.CancellationToken); - - // Verify only second permission remains - attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest - { - QueueUrl = queueUrl, - AttributeNames = ["Policy"] - }, TestContext.Current.CancellationToken); - policy = Policy.FromJson(attributes.Attributes["Policy"]); - Assert.Single(policy.Statements); - Assert.Contains(policy.Statements, s => s.Id == "Permission2"); - - // Remove second permission - await Sqs.RemovePermissionAsync(new RemovePermissionRequest - { - QueueUrl = queueUrl, - Label = "Permission2" - }, TestContext.Current.CancellationToken); - - // Verify policy is removed - attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest - { - QueueUrl = queueUrl, - AttributeNames = ["Policy"] - }, TestContext.Current.CancellationToken); - Assert.False(attributes.Attributes.ContainsKey("Policy")); -} + // Add second permission + await Sqs.AddPermissionAsync(new AddPermissionRequest + { + QueueUrl = queueUrl, + Label = "Permission2", + AWSAccountIds = ["210987654321"], + Actions = ["ReceiveMessage"] + }, TestContext.Current.CancellationToken); + + // Verify both permissions exist + var attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest + { + QueueUrl = queueUrl, + AttributeNames = ["Policy"] + }, TestContext.Current.CancellationToken); + var policy = Policy.FromJson(attributes.Attributes["Policy"]); + Assert.Equal(2, policy.Statements.Count); + Assert.Contains(policy.Statements, s => s.Id == "Permission1"); + Assert.Contains(policy.Statements, s => s.Id == "Permission2"); + + // Remove first permission + await Sqs.RemovePermissionAsync(new RemovePermissionRequest + { + QueueUrl = queueUrl, + Label = "Permission1" + }, TestContext.Current.CancellationToken); + + // Verify only second permission remains + attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest + { + QueueUrl = queueUrl, + AttributeNames = ["Policy"] + }, TestContext.Current.CancellationToken); + policy = Policy.FromJson(attributes.Attributes["Policy"]); + Assert.Single(policy.Statements); + Assert.Contains(policy.Statements, s => s.Id == "Permission2"); + + // Remove second permission + await Sqs.RemovePermissionAsync(new RemovePermissionRequest + { + QueueUrl = queueUrl, + Label = "Permission2" + }, TestContext.Current.CancellationToken); + + // Verify policy is removed + attributes = await Sqs.GetQueueAttributesAsync(new GetQueueAttributesRequest + { + QueueUrl = queueUrl, + AttributeNames = ["Policy"] + }, TestContext.Current.CancellationToken); + Assert.False(attributes.Attributes.ContainsKey("Policy")); + } + + [Fact] + public async Task SendMessageAsync_MessageExceedsMaximumSize_ThrowsInvalidMessageContentsException() + { + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + + // Create a message that exceeds 256KB (262,144 bytes) + var largeMessage = new string('x', 262145); + + var request = new SendMessageRequest + { + QueueUrl = queueUrl, + MessageBody = largeMessage + }; + + await Assert.ThrowsAsync(() => + Sqs.SendMessageAsync(request, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task SendMessageAsync_MessageAttributeFullSizeCalculation_ThrowsException() + { + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + + // The total size includes: + // 1. Message body + // 2. Each message attribute name + // 3. Each message attribute type (including "String", "Number", etc.) + // 4. Each message attribute value + var messageBody = new string('x', 200_000); + + var request = new SendMessageRequest + { + QueueUrl = queueUrl, + MessageBody = messageBody, + MessageAttributes = new Dictionary + { + // Long attribute name (contributes to size) + [new string('a', 1000)] = new MessageAttributeValue + { + DataType = "String", // 6 bytes + StringValue = new string('y', 62000) + }, + // Another attribute to push us over the limit + [new string('b', 100)] = new MessageAttributeValue + { + DataType = "Number", // 6 bytes + StringValue = "123" + } + } + }; + + await Assert.ThrowsAsync(() => + Sqs.SendMessageAsync(request, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task SendMessageAsync_MultipleAttributesExactlyAtLimit_Succeeds() + { + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + + // Calculate sizes to reach exactly 256KB: + // - Message body: 200,000 bytes + // - First attribute: + // * Name: 100 bytes + // * Type: "String" (6 bytes) + // * Value: 31,000 bytes + // - Second attribute: + // * Name: 20 bytes + // * Type: "Number" (6 bytes) + // * Value: 31,012 bytes + // Total: 262,144 bytes (256KB) + + var sendRequest = new SendMessageRequest + { + QueueUrl = queueUrl, + MessageBody = new string('x', 200000), + MessageAttributes = new Dictionary + { + [new string('a', 100)] = new MessageAttributeValue + { + DataType = "String", + StringValue = new string('y', 31000) + }, + [new string('b', 20)] = new MessageAttributeValue + { + DataType = "Number", + StringValue = new string('z', 31012) + } + } + }; + + var sendResponse = await Sqs.SendMessageAsync(sendRequest, TestContext.Current.CancellationToken); + Assert.NotNull(sendResponse.MessageId); + + // Verify we can receive the message with attributes + var receiveResponse = await Sqs.ReceiveMessageAsync(new ReceiveMessageRequest + { + QueueUrl = queueUrl, + MaxNumberOfMessages = 1, + MessageAttributeNames = ["All"] + }, TestContext.Current.CancellationToken); + + var message = Assert.Single(receiveResponse.Messages); + Assert.Equal(2, message.MessageAttributes.Count); + } + + [Fact] + public async Task SendMessageAsync_BinaryAttributeSize_Succeeds() + { + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + + // Create a binary attribute + byte[] binaryData = new byte[1000]; +#pragma warning disable CA5394 + new Random(42).NextBytes(binaryData); +#pragma warning restore CA5394 + + var request = new SendMessageRequest + { + QueueUrl = queueUrl, + MessageBody = new string('x', 260000), + MessageAttributes = new Dictionary + { + ["BinaryAttribute"] = new MessageAttributeValue + { + DataType = "Binary", + BinaryValue = new MemoryStream(binaryData) + } + } + }; + + // Total size: 260,000 (body) + 14 (attribute name) + 6 (type) + 1,000 (binary value) = 261,020 bytes + var response = await Sqs.SendMessageAsync(request, TestContext.Current.CancellationToken); + Assert.NotNull(response.MessageId); + } + + [Fact] + public async Task SendMessageAsync_CustomAttributeTypeNames_CountTowardsLimit() + { + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + + // Using a custom attribute type name which counts towards the limit + var longCustomType = $"String.{new string('x', 1000)}"; + + var request = new SendMessageRequest + { + QueueUrl = queueUrl, + MessageBody = new string('x', 262000), + MessageAttributes = new Dictionary + { + ["CustomAttribute"] = new MessageAttributeValue + { + DataType = longCustomType, + StringValue = "test" + } + } + }; + + // The long custom type name should push us over the limit + await Assert.ThrowsAsync(() => + Sqs.SendMessageAsync(request, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task SendMessageAsync_BatchWithAttributeSizeLimits_PartialBatchFailure() + { + var queueUrl = (await Sqs.CreateQueueAsync(new CreateQueueRequest { QueueName = "test-queue" }, + TestContext.Current.CancellationToken)).QueueUrl; + + var validMessage = new SendMessageBatchRequestEntry + { + Id = "1", + MessageBody = "Valid message", + MessageAttributes = new Dictionary + { + ["attr1"] = new MessageAttributeValue + { + DataType = "String", + StringValue = "test" + } + } + }; + + var oversizedMessage = new SendMessageBatchRequestEntry + { + Id = "2", + MessageBody = new string('x', 260000), + MessageAttributes = new Dictionary + { + [new string('a', 1000)] = new MessageAttributeValue // Long attribute name + { + DataType = "String", + StringValue = new string('y', 2000) + } + } + }; + + var request = new SendMessageBatchRequest + { + QueueUrl = queueUrl, + Entries = [validMessage, oversizedMessage] + }; + + await Assert.ThrowsAsync(async () => + await Sqs.SendMessageBatchAsync(request, TestContext.Current.CancellationToken)); + } protected abstract Task AdvanceTime(TimeSpan timeSpan); } \ No newline at end of file diff --git a/tests/LocalSqsSnsMessaging.Tests.Shared/SqsStartMessageMoveTaskAsyncTests.cs b/tests/LocalSqsSnsMessaging.Tests.Shared/SqsStartMessageMoveTaskAsyncTests.cs index d657ffe..05c04bf 100644 --- a/tests/LocalSqsSnsMessaging.Tests.Shared/SqsStartMessageMoveTaskAsyncTests.cs +++ b/tests/LocalSqsSnsMessaging.Tests.Shared/SqsStartMessageMoveTaskAsyncTests.cs @@ -88,7 +88,7 @@ private async Task GetQueueArnFromUrl(string queueUrl) return attrResponse.Attributes["QueueArn"]; } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task StartMessageMoveTaskAsync_ValidRequest_MovesMessage() { await SetupQueuesAndMessage(); @@ -115,7 +115,7 @@ public async Task StartMessageMoveTaskAsync_ValidRequest_MovesMessage() mainReceiveResult.Messages.Should().HaveCount(4); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task StartMessageMoveTaskAsync_NonDLQSource_ThrowsException() { await SetupQueuesAndMessage(); @@ -137,7 +137,7 @@ await Assert.ThrowsAnyAsync(() => }, TestContext.Current.CancellationToken)); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task StartMessageMoveTaskAsync_InvalidDestinationQueue_ThrowsException() { await SetupQueuesAndMessage(); @@ -151,7 +151,7 @@ await Assert.ThrowsAsync(() => }, TestContext.Current.CancellationToken)); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task StartMessageMoveTaskAsync_EmptyDLQ_NoMessagesMoved() { await SetupQueuesAndMessage(); @@ -183,7 +183,7 @@ public async Task StartMessageMoveTaskAsync_EmptyDLQ_NoMessagesMoved() Assert.Empty(mainReceiveResult.Messages); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task StartMessageMoveTaskAsync_MaxNumberOfMessagesPerSecond_RespectsLimit() { await SetupQueuesAndMessage(); @@ -228,7 +228,7 @@ await Sqs.SendMessageAsync(new SendMessageRequest Assert.InRange(sourceReceiveResult.Messages.Count, 4, 6); // Allow for some flexibility due to timing } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task StartMessageMoveTaskAsync_NoDestinationArn_MovesToOriginalSource() { await SetupQueuesAndMessage(); @@ -254,7 +254,7 @@ public async Task StartMessageMoveTaskAsync_NoDestinationArn_MovesToOriginalSour Assert.Empty(sourceReceiveResult.Messages); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task CancelMessageMoveTaskAsync_ValidTaskHandle_StopsTask() { await SetupQueuesAndMessage(); @@ -281,7 +281,7 @@ await Sqs.CancelMessageMoveTaskAsync(new CancelMessageMoveTaskRequest Assert.NotEmpty(sourceReceiveResult.Messages); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task StartingTwoMessageMoveTasksForTheSameQueue_Throws() { await SetupQueuesAndMessage(); @@ -305,7 +305,7 @@ await Assert.ThrowsAnyAsync(async () => }); } - [Fact] + [Fact, Trait("Category", "TimeBasedTests")] public async Task ListMessageMoveTasks_ReturnsAllActiveTasks() { await SetupQueuesAndMessage();