Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus][Test] extract delete messages test cases into a separate file #31701

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 0 additions & 112 deletions sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -814,118 +814,6 @@ describe("Batching Receiver", () => {
}
},
);

const getMessage = (): { body: string } => ({
body: `${Date.now()}-${Math.random().toString()}`,
});

it(noSessionTestClientType + ": deleteMessages", async function (): Promise<void> {
await beforeEachTest(noSessionTestClientType);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames);

const numMessages = 3;
const toSend = [];
for (let i = 0; i < numMessages; i++) {
toSend.push(getMessage());
}
await sender.sendMessages(toSend);

await testPeekMsgsLength(receiver2, numMessages);

// wait for things to be ready
await delay(10 * 1000);
await receiver2.deleteMessages({ maxMessageCount: numMessages });

await testPeekMsgsLength(receiver2, 0);
});

it(noSessionTestClientType + ": purgeMessages", async function (): Promise<void> {
await beforeEachTest(noSessionTestClientType);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames);

const numMessages = 5000;
let i = 0;
let batch = await sender.createMessageBatch();
while (i < numMessages) {
const message = getMessage();
if (!batch.tryAddMessage(message)) {
// Send the current batch as it is full and create a new one
await sender.sendMessages(batch);
batch = await sender.createMessageBatch();
} else {
i++;
}
}
if (batch.count) {
// Send the last batch
await sender.sendMessages(batch);
}

// wait for things to be ready
await delay(10 * 1000);
await receiver2.purgeMessages();

await testPeekMsgsLength(receiver2, 0);
});

it(
TestClientType.PartitionedQueue +
": deleteMessages with max message count of zero throws error",
async function (): Promise<void> {
await beforeEachTest(TestClientType.PartitionedQueue, "receiveAndDelete");

const numMessages = 3;
const toSend = [];
for (let i = 0; i < numMessages; i++) {
toSend.push(getMessage());
}
await sender.sendMessages(toSend);

await testPeekMsgsLength(receiver, numMessages);

try {
await receiver.deleteMessages({ maxMessageCount: 0 });
throw new Error("Test failure");
} catch (err: any) {
err.message.should.equal(
"Error 0: TypeError: 'messageCount' must be a number greater than 0.",
);
}
},
);

it(withSessionTestClientType + ": deleteMessages (session)", async function (): Promise<void> {
const randomSessionId = Math.random().toString();
const names = await serviceBusClient.test.createTestEntities(withSessionTestClientType);
const sender2 = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(names.queue ?? names.topic!),
);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver({
...names,
sessionId: randomSessionId,
});

const numMessages = 3;
const toSend = [];
for (let i = 0; i < numMessages; i++) {
const testMessage = {
...getMessage(),
sessionId: randomSessionId,
timeToLive: 24 * 60 * 60 * 1000,
};
toSend.push(testMessage);
}
await sender2.sendMessages(toSend);

const peeked = await receiver2.peekMessages(numMessages + 1);
assert.equal(peeked.length, numMessages);

// wait for things to be ready
await delay(10 * 1000);
await receiver2.deleteMessages({ maxMessageCount: numMessages });

await testPeekMsgsLength(receiver2, 0);
});
});

describe("Batch Receiver - disconnects", () => {
Expand Down
218 changes: 218 additions & 0 deletions sdk/servicebus/service-bus/test/internal/deleteMessages.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import type { ServiceBusSender } from "../../src/index.js";
import { delay } from "../../src/index.js";
import { TestClientType } from "../public/utils/testUtils.js";
import type { ServiceBusReceiver } from "../../src/receivers/receiver.js";
import type { ServiceBusClientForTests, EntityName } from "../public/utils/testutils2.js";
import { createServiceBusClientForTests, testPeekMsgsLength } from "../public/utils/testutils2.js";
import { afterAll, afterEach, beforeAll, describe, it } from "vitest";
import { assert } from "../public/utils/chai.js";

const sessionTestClientTypes = [
TestClientType.PartitionedQueueWithSessions,
TestClientType.PartitionedSubscriptionWithSessions,
TestClientType.UnpartitionedQueueWithSessions,
TestClientType.UnpartitionedSubscriptionWithSessions,
];
const noSessionTestClientTypes = [
TestClientType.PartitionedQueue,
TestClientType.PartitionedSubscription,
TestClientType.UnpartitionedQueue,
TestClientType.UnpartitionedSubscription,
];

let serviceBusClient: ServiceBusClientForTests;
let entityNames: EntityName;
let sender: ServiceBusSender;
let receiver: ServiceBusReceiver;

async function beforeEachTest(
entityType: TestClientType,
receiveMode: "peekLock" | "receiveAndDelete" = "peekLock",
): Promise<void> {
entityNames = await serviceBusClient.test.createTestEntities(entityType);
if (receiveMode === "receiveAndDelete") {
receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames);
} else {
receiver = await serviceBusClient.test.createPeekLockReceiver(entityNames);
}

sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!),
);
}

function afterEachTest(): Promise<void> {
return serviceBusClient.test.afterEach();
}

describe.skip("Batch Receiver - batch delete messages", function (): void {
beforeAll(() => {
serviceBusClient = createServiceBusClientForTests();
});

afterAll(() => {
return serviceBusClient.test.after();
});

afterEach(async () => {
await afterEachTest();
});

const getMessage = (): { body: string } => ({
body: `${Date.now()}-${Math.random().toString()}`,
});

noSessionTestClientTypes.forEach((testClientType) => {
it(testClientType + ": deleteMessages", async function (): Promise<void> {
await beforeEachTest(testClientType);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames);

const numMessages = 3;
const toSend = [];
for (let i = 0; i < numMessages; i++) {
toSend.push(getMessage());
}
await sender.sendMessages(toSend);

await testPeekMsgsLength(receiver2, numMessages);

// wait for things to be ready
await delay(10 * 1000);
await receiver2.deleteMessages({ maxMessageCount: numMessages });

await testPeekMsgsLength(receiver2, 0);
});

it(testClientType + ": purgeMessages", async function (): Promise<void> {
await beforeEachTest(testClientType);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames);

const numMessages = 5000;
let i = 0;
let batch = await sender.createMessageBatch();
while (i < numMessages) {
const message = getMessage();
if (!batch.tryAddMessage(message)) {
// Send the current batch as it is full and create a new one
await sender.sendMessages(batch);
batch = await sender.createMessageBatch();
} else {
i++;
}
}
if (batch.count) {
// Send the last batch
await sender.sendMessages(batch);
}

// wait for things to be ready
await delay(10 * 1000);
await receiver2.purgeMessages();

await testPeekMsgsLength(receiver2, 0);
});
});

it(
TestClientType.PartitionedQueue +
": deleteMessages with max message count of zero throws error",
async function (): Promise<void> {
await beforeEachTest(TestClientType.PartitionedQueue, "receiveAndDelete");

const numMessages = 3;
const toSend = [];
for (let i = 0; i < numMessages; i++) {
toSend.push(getMessage());
}
await sender.sendMessages(toSend);

await testPeekMsgsLength(receiver, numMessages);

try {
await receiver.deleteMessages({ maxMessageCount: 0 });
throw new Error("Test failure");
} catch (err: any) {
err.message.should.equal(
"Error 0: TypeError: 'messageCount' must be a number greater than 0.",
);
}
},
);

sessionTestClientTypes.forEach((testClientType) => {
it(testClientType + ": deleteMessages (session)", async function (): Promise<void> {
const randomSessionId = Math.random().toString();
const names = await serviceBusClient.test.createTestEntities(testClientType);
const sender2 = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(names.queue ?? names.topic!),
);

const numMessages = 3;
const toSend = [];
for (let i = 0; i < numMessages; i++) {
const testMessage = {
...getMessage(),
sessionId: randomSessionId,
timeToLive: 24 * 60 * 60 * 1000,
};
toSend.push(testMessage);
}
await sender2.sendMessages(toSend);

const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver({
...names,
sessionId: randomSessionId,
});
const peeked = await receiver2.peekMessages(numMessages + 10);
assert.equal(peeked.length, numMessages);

// wait for things to be ready
await delay(10 * 1000);
await receiver2.deleteMessages({ maxMessageCount: numMessages });

await testPeekMsgsLength(receiver2, 0);
});

it(testClientType + ": purgeMessages (session)", async function (): Promise<void> {
const randomSessionId = Math.random().toString();
const names = await serviceBusClient.test.createTestEntities(testClientType);
const sender2 = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(names.queue ?? names.topic!),
);

const numMessages = 5000;
let i = 0;
let batch = await sender2.createMessageBatch();
while (i < numMessages) {
const message = {
...getMessage(),
sessionId: randomSessionId,
timeToLive: 24 * 60 * 60 * 1000,
};
if (!batch.tryAddMessage(message)) {
// Send the current batch as it is full and create a new one
await sender2.sendMessages(batch);
batch = await sender2.createMessageBatch();
} else {
i++;
}
}
if (batch.count) {
// Send the last batch
await sender2.sendMessages(batch);
}

// wait for things to be ready
await delay(10 * 1000);
const receiver2 = await serviceBusClient.test.createReceiveAndDeleteReceiver({
...names,
sessionId: randomSessionId,
});
await receiver2.purgeMessages();

await testPeekMsgsLength(receiver2, 0);
});
});
});
7 changes: 5 additions & 2 deletions sdk/servicebus/service-bus/test/public/utils/testutils2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,10 @@ export class ServiceBusTestHelpers {

async purgeForClientType(...testClientTypes: TestClientType[]): Promise<void> {
await Promise.all(
testClientTypes.map((tct) => purgeForTestClientType(this._serviceBusClient, tct)),
testClientTypes.map((tct) => {
purgeForTestClientType(this._serviceBusClient, tct);
this._testClientEntities.delete(tct);
}),
);
}

Expand Down Expand Up @@ -577,7 +580,7 @@ export async function testPeekMsgsLength(
peekableReceiver: ServiceBusReceiver,
expectedPeekLength: number,
): Promise<void> {
const peekedMsgs = await peekableReceiver.peekMessages(expectedPeekLength + 1);
const peekedMsgs = await peekableReceiver.peekMessages(expectedPeekLength + 10);

should.equal(
peekedMsgs.length,
Expand Down
Loading