From 8866d81359033b6b559aaa0fea35a559241b1ec8 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 12 Mar 2025 19:24:15 +0900 Subject: [PATCH 1/8] Add initial `BeatmapStatusWatcher` component --- .../BeatmapStatusWatcher.cs | 124 ++++++++++++++++++ .../osu.Server.QueueProcessor.csproj | 1 + 2 files changed, 125 insertions(+) create mode 100644 osu.Server.QueueProcessor/BeatmapStatusWatcher.cs diff --git a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs new file mode 100644 index 0000000..b793b24 --- /dev/null +++ b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs @@ -0,0 +1,124 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Dapper; +using MySqlConnector; + +namespace osu.Server.QueueProcessor +{ + /// + /// Provides insight into whenever a beatmap has changed status based on a user or system update. + /// + public static class BeatmapStatusWatcher + { + /// + /// Start a background task which will poll for beatmap sets with updates. + /// + /// A callback to receive information about any updated beatmap sets. + /// The number of milliseconds to wait between polls. Starts counting from response of previous poll. + /// The maximum number of beatmap sets to return in a single response. + /// An that should be disposed of to stop polling. + public static IDisposable StartPolling(Action callback, int pollMilliseconds = 10000, int limit = 50) => + new PollingBeatmapStatusWatcher(callback, pollMilliseconds, limit); + + /// + /// Check for any beatmap sets with updates since the provided queue ID. + /// Should be called on a regular basis. See for automatic polling. + /// + /// The last checked queue ID, ie . + /// The maximum number of beatmap sets to return in a single response. + /// A response containing information about any updated beatmap sets. + public static async Task GetUpdatedBeatmapSets(int? lastQueueId, int limit = 50) + { + MySqlConnection connection = await DatabaseAccess.GetConnectionAsync(); + + if (lastQueueId.HasValue) + { + var items = (await connection.QueryAsync("SELECT * FROM bss_process_queue WHERE queue_id > @lastQueueId LIMIT @limit", new + { + lastQueueId, + limit + })).ToArray(); + + return new BeatmapUpdates + { + BeatmapSetIDs = items.Select(i => i.beatmapset_id).ToArray(), + LastProcessedQueueID = items.LastOrDefault()?.queue_id ?? lastQueueId.Value + }; + } + + var lastEntry = await connection.QueryFirstOrDefaultAsync("SELECT * FROM bss_process_queue ORDER BY queue_id DESC LIMIT 1"); + + return new BeatmapUpdates + { + BeatmapSetIDs = [], + LastProcessedQueueID = lastEntry?.queue_id ?? 0 + }; + } + + // ReSharper disable InconsistentNaming (matches database table) + [Serializable] + public class bss_process_queue_item + { + public int queue_id; + public int beatmapset_id; + } + + public record BeatmapUpdates + { + public required int[] BeatmapSetIDs { get; init; } + public required int LastProcessedQueueID { get; init; } + } + } + + public class PollingBeatmapStatusWatcher : IDisposable + { + private readonly Action callback; + + private readonly int pollMilliseconds; + private readonly int limit; + + private int? lastQueueId; + private readonly CancellationTokenSource cts; + + public PollingBeatmapStatusWatcher(Action callback, int pollMilliseconds, int limit = 50) + { + this.pollMilliseconds = pollMilliseconds; + this.limit = limit; + this.callback = callback; + + cts = new CancellationTokenSource(); + + _ = poll(); + } + + private async Task poll() + { + try + { + var result = await BeatmapStatusWatcher.GetUpdatedBeatmapSets(lastQueueId, limit); + + lastQueueId = result.LastProcessedQueueID; + if (result.BeatmapSetIDs.Length > 0) + callback(result); + } + catch (Exception e) + { + Console.WriteLine($"Poll failed with {e}."); + await Task.Delay(1000); + } + + _ = Task.Delay(pollMilliseconds, cts.Token).ContinueWith(_ => poll(), cts.Token); + } + + public void Dispose() + { + cts.Cancel(); + cts.Dispose(); + } + } +} diff --git a/osu.Server.QueueProcessor/osu.Server.QueueProcessor.csproj b/osu.Server.QueueProcessor/osu.Server.QueueProcessor.csproj index 2830c9d..f31bde7 100644 --- a/osu.Server.QueueProcessor/osu.Server.QueueProcessor.csproj +++ b/osu.Server.QueueProcessor/osu.Server.QueueProcessor.csproj @@ -11,6 +11,7 @@ + From 7a45186731472014b06c92d12a76aa2f39675da1 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 13 Mar 2025 12:04:05 +0900 Subject: [PATCH 2/8] Better document flow and reasoning for initial non-poll operation --- .../BeatmapStatusWatcher.cs | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs index b793b24..2b54b5c 100644 --- a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs +++ b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs @@ -18,16 +18,34 @@ public static class BeatmapStatusWatcher /// /// Start a background task which will poll for beatmap sets with updates. /// + /// + /// Prior to polling, a blocking call to is required to ensure no initial updates are missed. + /// The general flow of usage should be: + /// + /// // before doing anything else + /// var updates = await GetUpdatedBeatmapSets(); + /// // can now query and cache beatmaps. + /// StartPolling(updates, callback); + /// + /// void callback(BeatmapUpdates u) + /// { + /// foreach (int id in u.BeatmapSetIDs) + /// { + /// // invalidate `id` + /// } + /// } + /// + /// The response from an initial call to . /// A callback to receive information about any updated beatmap sets. /// The number of milliseconds to wait between polls. Starts counting from response of previous poll. /// The maximum number of beatmap sets to return in a single response. /// An that should be disposed of to stop polling. - public static IDisposable StartPolling(Action callback, int pollMilliseconds = 10000, int limit = 50) => + public static IDisposable StartPolling(BeatmapUpdates initialUpdates, Action callback, int pollMilliseconds = 10000, int limit = 50) => new PollingBeatmapStatusWatcher(callback, pollMilliseconds, limit); /// /// Check for any beatmap sets with updates since the provided queue ID. - /// Should be called on a regular basis. See for automatic polling. + /// Should be called on a regular basis. See for automatic polling after the first call. /// /// The last checked queue ID, ie . /// The maximum number of beatmap sets to return in a single response. From ed5f23ba104079c989a5180d1d02536f6f03290f Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 13 Mar 2025 12:17:29 +0900 Subject: [PATCH 3/8] Change polling flow to not require initial separate call --- .../BeatmapStatusWatcher.cs | 99 +++++++++---------- osu.Server.QueueProcessor/BeatmapUpdates.cs | 11 +++ 2 files changed, 58 insertions(+), 52 deletions(-) create mode 100644 osu.Server.QueueProcessor/BeatmapUpdates.cs diff --git a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs index 2b54b5c..b5643eb 100644 --- a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs +++ b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs @@ -19,40 +19,40 @@ public static class BeatmapStatusWatcher /// Start a background task which will poll for beatmap sets with updates. /// /// - /// Prior to polling, a blocking call to is required to ensure no initial updates are missed. /// The general flow of usage should be: /// - /// // before doing anything else - /// var updates = await GetUpdatedBeatmapSets(); - /// // can now query and cache beatmaps. - /// StartPolling(updates, callback); + /// // before doing anything else, start polling. + /// // it's important to await the completion of this operation to ensure no updates are missed. + /// using var pollingOperation = await StartPollingAsync(updates, callback); /// /// void callback(BeatmapUpdates u) /// { - /// foreach (int id in u.BeatmapSetIDs) + /// foreach (int beatmapSetId in u.BeatmapSetIDs) /// { - /// // invalidate `id` + /// // invalidate anything related to `beatmapSetId` /// } /// } /// - /// The response from an initial call to . /// A callback to receive information about any updated beatmap sets. /// The number of milliseconds to wait between polls. Starts counting from response of previous poll. /// The maximum number of beatmap sets to return in a single response. /// An that should be disposed of to stop polling. - public static IDisposable StartPolling(BeatmapUpdates initialUpdates, Action callback, int pollMilliseconds = 10000, int limit = 50) => - new PollingBeatmapStatusWatcher(callback, pollMilliseconds, limit); + public static async Task StartPollingAsync(Action callback, int pollMilliseconds = 10000, int limit = 50) + { + var initialUpdates = await GetUpdatedBeatmapSetsAsync(limit: limit); + return new PollingBeatmapStatusWatcher(initialUpdates.LastProcessedQueueID, callback, pollMilliseconds, limit); + } /// /// Check for any beatmap sets with updates since the provided queue ID. - /// Should be called on a regular basis. See for automatic polling after the first call. + /// Should be called on a regular basis. See for automatic polling after the first call. /// /// The last checked queue ID, ie . /// The maximum number of beatmap sets to return in a single response. /// A response containing information about any updated beatmap sets. - public static async Task GetUpdatedBeatmapSets(int? lastQueueId, int limit = 50) + public static async Task GetUpdatedBeatmapSetsAsync(int? lastQueueId = null, int limit = 50) { - MySqlConnection connection = await DatabaseAccess.GetConnectionAsync(); + using MySqlConnection connection = await DatabaseAccess.GetConnectionAsync(); if (lastQueueId.HasValue) { @@ -86,57 +86,52 @@ public class bss_process_queue_item public int beatmapset_id; } - public record BeatmapUpdates + private class PollingBeatmapStatusWatcher : IDisposable { - public required int[] BeatmapSetIDs { get; init; } - public required int LastProcessedQueueID { get; init; } - } - } - - public class PollingBeatmapStatusWatcher : IDisposable - { - private readonly Action callback; + private readonly Action callback; - private readonly int pollMilliseconds; - private readonly int limit; + private readonly int pollMilliseconds; + private readonly int limit; - private int? lastQueueId; - private readonly CancellationTokenSource cts; + private int lastQueueId; + private readonly CancellationTokenSource cts; - public PollingBeatmapStatusWatcher(Action callback, int pollMilliseconds, int limit = 50) - { - this.pollMilliseconds = pollMilliseconds; - this.limit = limit; - this.callback = callback; + public PollingBeatmapStatusWatcher(int initialQueueId, Action callback, int pollMilliseconds, int limit = 50) + { + this.lastQueueId = initialQueueId; + this.pollMilliseconds = pollMilliseconds; + this.limit = limit; + this.callback = callback; - cts = new CancellationTokenSource(); + cts = new CancellationTokenSource(); - _ = poll(); - } + _ = poll(); + } - private async Task poll() - { - try + private async Task poll() { - var result = await BeatmapStatusWatcher.GetUpdatedBeatmapSets(lastQueueId, limit); + try + { + var result = await BeatmapStatusWatcher.GetUpdatedBeatmapSetsAsync(lastQueueId, limit); - lastQueueId = result.LastProcessedQueueID; - if (result.BeatmapSetIDs.Length > 0) - callback(result); + lastQueueId = result.LastProcessedQueueID; + if (result.BeatmapSetIDs.Length > 0) + callback(result); + } + catch (Exception e) + { + Console.WriteLine($"Poll failed with {e}."); + await Task.Delay(1000); + } + + _ = Task.Delay(pollMilliseconds, cts.Token).ContinueWith(_ => poll(), cts.Token); } - catch (Exception e) + + public void Dispose() { - Console.WriteLine($"Poll failed with {e}."); - await Task.Delay(1000); + cts.Cancel(); + cts.Dispose(); } - - _ = Task.Delay(pollMilliseconds, cts.Token).ContinueWith(_ => poll(), cts.Token); - } - - public void Dispose() - { - cts.Cancel(); - cts.Dispose(); } } } diff --git a/osu.Server.QueueProcessor/BeatmapUpdates.cs b/osu.Server.QueueProcessor/BeatmapUpdates.cs new file mode 100644 index 0000000..39ebbc5 --- /dev/null +++ b/osu.Server.QueueProcessor/BeatmapUpdates.cs @@ -0,0 +1,11 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +namespace osu.Server.QueueProcessor +{ + public record BeatmapUpdates + { + public required int[] BeatmapSetIDs { get; init; } + public required int LastProcessedQueueID { get; init; } + } +} From aa3d55aa37c2bb1ba11c6c633e515a6f6b840d88 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 13 Mar 2025 16:33:52 +0900 Subject: [PATCH 4/8] Update test setup to pull in docker dependencies --- .config/dotnet-tools.json | 24 +++++++++++++ .github/workflows/ci.yml | 71 ++++++++++++++++++++++++++------------- docker-compose.yml | 66 ++++++++++++++++++++++++++++++++++++ global.json | 7 ++++ 4 files changed, 145 insertions(+), 23 deletions(-) create mode 100644 .config/dotnet-tools.json create mode 100644 docker-compose.yml create mode 100644 global.json diff --git a/.config/dotnet-tools.json b/.config/dotnet-tools.json new file mode 100644 index 0000000..129c1f8 --- /dev/null +++ b/.config/dotnet-tools.json @@ -0,0 +1,24 @@ +{ + "version": 1, + "isRoot": true, + "tools": { + "jetbrains.resharper.globaltools": { + "version": "2023.3.3", + "commands": [ + "jb" + ] + }, + "nvika": { + "version": "4.0.0", + "commands": [ + "nvika" + ] + }, + "codefilesanity": { + "version": "0.0.36", + "commands": [ + "CodeFileSanity" + ] + } + } +} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 203f29a..9a32d0a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,36 +1,61 @@ -name: .NET Core - -on: - push: - branches: [ master ] - pull_request: - branches: [ master ] +on: [push, pull_request] +name: Continuous Integration +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true jobs: - unit-tests: + inspect-code: + name: Code Quality runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install .NET 8.0.x + uses: actions/setup-dotnet@v4 + with: + dotnet-version: "8.0.x" - services: - redis: - image: redis - ports: - - 6379:6379 - options: >- - --health-cmd "redis-cli ping" - --health-interval 10s - --health-timeout 5s - --health-retries 5 + - name: Restore Tools + run: dotnet tool restore + - name: Restore Packages + run: dotnet restore + + - name: CodeFileSanity + run: | + # TODO: Add ignore filters and GitHub Workflow Command Reporting in CFS. That way we don't have to do this workaround. + # FIXME: Suppress warnings from templates project + exit_code=0 + while read -r line; do + if [[ ! -z "$line" ]]; then + echo "::error::$line" + exit_code=1 + fi + done <<< $(dotnet codefilesanity) + exit $exit_code + + - name: InspectCode + run: dotnet jb inspectcode $(pwd)/osu.Server.QueueProcessor.sln --build --output="inspectcodereport.xml" --caches-home="inspectcode" --verbosity=WARN + + - name: NVika + run: dotnet nvika parsereport "${{github.workspace}}/inspectcodereport.xml" --treatwarningsaserrors + + test: + name: Test + runs-on: ubuntu-latest steps: - name: Checkout uses: actions/checkout@v4 + - name: Install .NET 8.0.x uses: actions/setup-dotnet@v4 with: dotnet-version: "8.0.x" - - name: Install dependencies - run: dotnet restore - - name: Build - run: dotnet build --no-restore + + - name: Docker compose + run: docker compose up -d + - name: Test - run: dotnet test --no-restore --verbosity normal + run: dotnet test diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..8a28028 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3.9' + +x-env: &x-env + DB_CONNECTION_STRING: Server=db;Database=osu;Uid=osuweb; + DB_HOST: db + DB_USERNAME: 'root' + APP_ENV: 'local' + GITHUB_TOKEN: "${GITHUB_TOKEN}" + BROADCAST_DRIVER: redis + CACHE_DRIVER: redis + NOTIFICATION_REDIS_HOST: redis + REDIS_HOST: redis + SESSION_DRIVER: redis + MYSQL_DATABASE: 'osu' + MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_ROOT_HOST: '%' + +services: + # just a placeholder service to ensure we wait for migrator to complete successfully. + ready_for_use: + image: hello-world:latest + depends_on: + migrator: + condition: service_completed_successfully + + migrator: + image: pppy/osu-web:latest-dev + command: ['artisan', 'db:setup'] + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + environment: + <<: *x-env + + db: + image: mysql/mysql-server:8.0 + environment: + <<: *x-env + volumes: + - database:/var/lib/mysql + ports: + - "${MYSQL_EXTERNAL_PORT:-3306}:3306" + command: --default-authentication-plugin=mysql_native_password + healthcheck: + # important to use 127.0.0.1 instead of localhost as mysql starts twice. + # the first time it listens on sockets but isn't actually ready + # see https://github.com/docker-library/mysql/issues/663 + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1"] + interval: 1s + timeout: 60s + start_period: 60s + + redis: + image: redis:latest + ports: + - "${REDIS_EXTERNAL_PORT:-6379}:6379" + healthcheck: + test: ["CMD", "redis-cli", "--raw", "incr", "ping"] + interval: 1s + timeout: 60s + start_period: 60s + +volumes: + database: diff --git a/global.json b/global.json new file mode 100644 index 0000000..789bff3 --- /dev/null +++ b/global.json @@ -0,0 +1,7 @@ +{ + "sdk": { + "version": "8.0.100", + "rollForward": "latestFeature", + "allowPrerelease": false + } +} \ No newline at end of file From 71c9f95d4ff4235b3e4ceaee3cc7c323df4fc121 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 13 Mar 2025 16:47:40 +0900 Subject: [PATCH 5/8] Add test coverage --- .../BeatmapStatusWatcherTests.cs | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 osu.Server.QueueProcessor.Tests/BeatmapStatusWatcherTests.cs diff --git a/osu.Server.QueueProcessor.Tests/BeatmapStatusWatcherTests.cs b/osu.Server.QueueProcessor.Tests/BeatmapStatusWatcherTests.cs new file mode 100644 index 0000000..c82b2ea --- /dev/null +++ b/osu.Server.QueueProcessor.Tests/BeatmapStatusWatcherTests.cs @@ -0,0 +1,93 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Dapper; +using Xunit; + +namespace osu.Server.QueueProcessor.Tests +{ + public class BeatmapStatusWatcherTests + { + /// + /// Checking that processing an empty queue works as expected. + /// + [Fact] + public async Task TestBasic() + { + var cts = new CancellationTokenSource(10000); + + TaskCompletionSource tcs = new TaskCompletionSource(); + using var db = await DatabaseAccess.GetConnectionAsync(cts.Token); + + // just a safety measure for now to ensure we don't hit production. since i was running on production until now. + // will throw if not on test database. + if (db.QueryFirstOrDefault("SELECT `count` FROM `osu_counts` WHERE `name` = 'is_production'") != null) + throw new InvalidOperationException("You are trying to do something very silly."); + + await db.ExecuteAsync("TRUNCATE TABLE `bss_process_queue`"); + + using var poller = await BeatmapStatusWatcher.StartPollingAsync(updates => { tcs.SetResult(updates); }, pollMilliseconds: 100); + + await db.ExecuteAsync("INSERT INTO `bss_process_queue` (beatmapset_id) VALUES (1)"); + + var updates = await tcs.Task.WaitAsync(cts.Token); + + Assert.Equal(new[] { 1 }, updates.BeatmapSetIDs); + Assert.Equal(1, updates.LastProcessedQueueID); + + tcs = new TaskCompletionSource(); + + await db.ExecuteAsync("INSERT INTO `bss_process_queue` (beatmapset_id) VALUES (2), (3)"); + + updates = await tcs.Task.WaitAsync(cts.Token); + + Assert.Equal(new[] { 2, 3 }, updates.BeatmapSetIDs); + Assert.Equal(3, updates.LastProcessedQueueID); + } + + /// + /// Checking that processing an empty queue works as expected. + /// + [Fact] + public async Task TestLimit() + { + var cts = new CancellationTokenSource(10000); + + TaskCompletionSource tcs = new TaskCompletionSource(); + using var db = await DatabaseAccess.GetConnectionAsync(cts.Token); + + // just a safety measure for now to ensure we don't hit production. since i was running on production until now. + // will throw if not on test database. + if (db.QueryFirstOrDefault("SELECT `count` FROM `osu_counts` WHERE `name` = 'is_production'") != null) + throw new InvalidOperationException("You are trying to do something very silly."); + + await db.ExecuteAsync("TRUNCATE TABLE `bss_process_queue`"); + + using var poller = await BeatmapStatusWatcher.StartPollingAsync(updates => { tcs.SetResult(updates); }, limit: 1, pollMilliseconds: 100); + + await db.ExecuteAsync("INSERT INTO `bss_process_queue` (beatmapset_id) VALUES (1)"); + + var updates = await tcs.Task.WaitAsync(cts.Token); + tcs = new TaskCompletionSource(); + + Assert.Equal(new[] { 1 }, updates.BeatmapSetIDs); + Assert.Equal(1, updates.LastProcessedQueueID); + + await db.ExecuteAsync("INSERT INTO `bss_process_queue` (beatmapset_id) VALUES (2), (3)"); + + updates = await tcs.Task.WaitAsync(cts.Token); + tcs = new TaskCompletionSource(); + + Assert.Equal(new[] { 2 }, updates.BeatmapSetIDs); + Assert.Equal(2, updates.LastProcessedQueueID); + + updates = await tcs.Task.WaitAsync(cts.Token); + + Assert.Equal(new[] { 3 }, updates.BeatmapSetIDs); + Assert.Equal(3, updates.LastProcessedQueueID); + } + } +} From 7d3f97912f7cccb1458cf88a6978d58e120e4a75 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 13 Mar 2025 21:57:16 +0900 Subject: [PATCH 6/8] Use loop instead of task refire --- .../BeatmapStatusWatcher.cs | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs index b5643eb..118017f 100644 --- a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs +++ b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs @@ -98,33 +98,36 @@ private class PollingBeatmapStatusWatcher : IDisposable public PollingBeatmapStatusWatcher(int initialQueueId, Action callback, int pollMilliseconds, int limit = 50) { - this.lastQueueId = initialQueueId; + lastQueueId = initialQueueId; this.pollMilliseconds = pollMilliseconds; this.limit = limit; this.callback = callback; cts = new CancellationTokenSource(); - _ = poll(); + _ = Task.Factory.StartNew(poll, TaskCreationOptions.LongRunning); } private async Task poll() { - try + while (!cts.Token.IsCancellationRequested) { - var result = await BeatmapStatusWatcher.GetUpdatedBeatmapSetsAsync(lastQueueId, limit); - - lastQueueId = result.LastProcessedQueueID; - if (result.BeatmapSetIDs.Length > 0) - callback(result); - } - catch (Exception e) - { - Console.WriteLine($"Poll failed with {e}."); - await Task.Delay(1000); + try + { + var result = await GetUpdatedBeatmapSetsAsync(lastQueueId, limit); + + lastQueueId = result.LastProcessedQueueID; + if (result.BeatmapSetIDs.Length > 0) + callback(result); + } + catch (Exception e) + { + Console.WriteLine($"Poll failed with {e}."); + await Task.Delay(1000, cts.Token); + } + + await Task.Delay(pollMilliseconds, cts.Token); } - - _ = Task.Delay(pollMilliseconds, cts.Token).ContinueWith(_ => poll(), cts.Token); } public void Dispose() From 7d64c8020b92fe8a8c620a62ee110ab6cfcd040f Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 13 Mar 2025 22:04:13 +0900 Subject: [PATCH 7/8] Remove cancellation token disposal --- osu.Server.QueueProcessor/BeatmapStatusWatcher.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs index 118017f..5ff4ce8 100644 --- a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs +++ b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs @@ -133,7 +133,6 @@ private async Task poll() public void Dispose() { cts.Cancel(); - cts.Dispose(); } } } From e9232becc79f739e01ce7600d530f26dbbe48060 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Mon, 17 Mar 2025 21:52:51 +0900 Subject: [PATCH 8/8] Add explicit `ORDER BY` for safety --- osu.Server.QueueProcessor/BeatmapStatusWatcher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs index 5ff4ce8..9ffdc50 100644 --- a/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs +++ b/osu.Server.QueueProcessor/BeatmapStatusWatcher.cs @@ -56,7 +56,7 @@ public static async Task GetUpdatedBeatmapSetsAsync(int? lastQue if (lastQueueId.HasValue) { - var items = (await connection.QueryAsync("SELECT * FROM bss_process_queue WHERE queue_id > @lastQueueId LIMIT @limit", new + var items = (await connection.QueryAsync("SELECT * FROM bss_process_queue WHERE queue_id > @lastQueueId ORDER BY queue_id LIMIT @limit", new { lastQueueId, limit