From e78d92033766af3ba6b18aa3a06c2e4843eeb554 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 11:58:17 +0000 Subject: [PATCH 01/34] Backoff RSS requests if a url repeatedly fails. --- src/feeds/FeedReader.ts | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 46ccadb2c..ffb551034 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -10,6 +10,10 @@ import { readFeed } from "../libRs"; import { IBridgeStorageProvider } from "../Stores/StorageProvider"; import UserAgent from "../UserAgent"; +const FEED_BACKOFF_TIME_MS = 5 * 1000; +const FEED_BACKOFF_POW = 1.05; +const FEED_BACKOFF_TIME_MAX_MS = 60 * 60 * 1000; + const log = new Logger("FeedReader"); export class FeedError extends Error { constructor( @@ -89,6 +93,9 @@ export class FeedReader { private feedQueue: string[] = []; + private feedBackoff: Map = new Map(); + private feedLastBackoff: Map = new Map(); + // A set of last modified times for each url. private cacheTimes: Map = new Map(); @@ -249,6 +256,8 @@ export class FeedReader { // Clear any feed failures this.feedsFailingHttp.delete(url); this.feedsFailingParsing.delete(url); + this.feedLastBackoff.delete(url); + this.feedQueue.push(url); } catch (err: unknown) { // TODO: Proper Rust Type error. if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) { @@ -256,12 +265,14 @@ export class FeedReader { } else { this.feedsFailingParsing.add(url); } + const backoffDuration = Math.min(FEED_BACKOFF_TIME_MAX_MS, ( + Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS)) + Math.pow(this.feedLastBackoff.get(url) ?? 0, FEED_BACKOFF_POW)); + this.feedBackoff.set(url, Date.now() + backoffDuration); const error = err instanceof Error ? err : new Error(`Unknown error ${err}`); const feedError = new FeedError(url.toString(), error, fetchKey); - log.error("Unable to read feed:", feedError.message); + log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); this.queue.push({ eventName: 'feed.error', sender: 'FeedReader', data: feedError}); } finally { - this.feedQueue.push(url); } return seenEntriesChanged; } @@ -308,5 +319,15 @@ export class FeedReader { } void this.pollFeeds(workerId); }, sleepFor); + + // Reinsert any feeds that we may have backed off. + for (const [feedUrl, retryAfter] of this.feedBackoff.entries()) { + if (retryAfter < Date.now()) { + log.debug(`Adding back ${feedUrl} from backoff set`); + this.feedQueue.push(feedUrl); + // Store the last backoff time. + this.feedBackoff.delete(feedUrl) + } + } } } From f5568e2fb9adbe12f712352aba87ea2856cd6af9 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 11:58:58 +0000 Subject: [PATCH 02/34] Increase max backoff time to a day --- src/feeds/FeedReader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index ffb551034..8cce8566b 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -12,7 +12,7 @@ import UserAgent from "../UserAgent"; const FEED_BACKOFF_TIME_MS = 5 * 1000; const FEED_BACKOFF_POW = 1.05; -const FEED_BACKOFF_TIME_MAX_MS = 60 * 60 * 1000; +const FEED_BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000; const log = new Logger("FeedReader"); export class FeedError extends Error { From 2ee501291c55e3f6dd42241817c9d23b0f83aa53 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 12:08:15 +0000 Subject: [PATCH 03/34] Add backoff for failing feeds. 
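When a feed fails to fetch or parse, it is now pulled out of the polling
queue and only re-queued once a backoff period has elapsed. The delay is
jittered and grows with each consecutive failure, up to the 24 hour cap
from the previous commit. A minimal sketch of the curve (constants as
defined in FeedReader.ts; the helper name is illustrative):

    function nextBackoffMs(lastBackoffMs: number): number {
        const FEED_BACKOFF_TIME_MS = 5 * 1000;
        const FEED_BACKOFF_POW = 1.05;
        const FEED_BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;
        // 0.5x-1.5x jitter on the base delay, plus a slowly compounding term.
        const jitter = Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS);
        return Math.min(FEED_BACKOFF_TIME_MAX_MS, jitter + Math.pow(lastBackoffMs, FEED_BACKOFF_POW));
    }

On success the stored per-url delay is cleared, so a recovering feed
returns to the normal polling cadence.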
--- changelog.d/890.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/890.misc diff --git a/changelog.d/890.misc b/changelog.d/890.misc new file mode 100644 index 000000000..23b8fb1c1 --- /dev/null +++ b/changelog.d/890.misc @@ -0,0 +1 @@ +Failing RSS/atom feeds are now backed off before being retried. This should result in a speedup for large public deployments where failing feeds may result in a slowdown. \ No newline at end of file From b7cd4fec5d3f1655ec3628ca62fc7e881baaa1cb Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 12:09:04 +0000 Subject: [PATCH 04/34] Remove unused finally --- src/feeds/FeedReader.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 8cce8566b..2c1cd1c7f 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -272,7 +272,6 @@ export class FeedReader { const feedError = new FeedError(url.toString(), error, fetchKey); log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); this.queue.push({ eventName: 'feed.error', sender: 'FeedReader', data: feedError}); - } finally { } return seenEntriesChanged; } From 18429cef94e660306451d483b3c9e2cff9f48320 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 12:10:04 +0000 Subject: [PATCH 05/34] Add this.feedLastBackoff --- src/feeds/FeedReader.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 2c1cd1c7f..d4909ba72 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -268,6 +268,7 @@ export class FeedReader { const backoffDuration = Math.min(FEED_BACKOFF_TIME_MAX_MS, ( Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS)) + Math.pow(this.feedLastBackoff.get(url) ?? 0, FEED_BACKOFF_POW)); this.feedBackoff.set(url, Date.now() + backoffDuration); + this.feedLastBackoff.set(url, backoffDuration); const error = err instanceof Error ? err : new Error(`Unknown error ${err}`); const feedError = new FeedError(url.toString(), error, fetchKey); log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); From 80d4b9badcbaf871cdb83c1a891245ea24e3f30f Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 12:58:37 +0000 Subject: [PATCH 06/34] Rewrite in rust. 
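The feed queue, the backoff map and the last-backoff map move out of
FeedReader.ts into a single QueueWithBackoff struct in src/util/mod.rs,
exposed to TypeScript through napi. A rough sketch of how FeedReader
drives it after this change (the url is illustrative):

    import { QueueWithBackoff } from "../libRs";

    const queue = new QueueWithBackoff();
    queue.push("https://example.org/feed.xml");

    const url = queue.next(); // re-inserts any urls whose backoff has expired
    if (url) {
        try {
            // ...fetch and parse the feed...
            queue.push(url); // success: back into the normal rotation
        } catch (err) {
            const delayMs = queue.backoff(url); // failure: retry after delayMs
        }
    }

next() re-queues expired backoff entries before popping, so the explicit
re-insertion loop at the end of pollFeeds is no longer needed.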
--- Cargo.lock | 1 + Cargo.toml | 1 + src/feeds/FeedReader.ts | 48 ++++--------------- src/lib.rs | 1 + src/util/mod.rs | 100 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 113 insertions(+), 38 deletions(-) create mode 100644 src/util/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 7d8fdad65..e844d1cc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -681,6 +681,7 @@ dependencies = [ "napi", "napi-build", "napi-derive", + "rand", "reqwest", "rgb", "rss", diff --git a/Cargo.toml b/Cargo.toml index 4d75e4089..ab6e33d66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ rss = "2.0" atom_syndication = "0.12" ruma = { version = "0.9", features = ["events", "html"] } reqwest = "0.11" +rand = "0.8.5" [build-dependencies] napi-build = "2" diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index d4909ba72..28fcf7a9e 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -9,10 +9,7 @@ import { randomUUID } from "crypto"; import { readFeed } from "../libRs"; import { IBridgeStorageProvider } from "../Stores/StorageProvider"; import UserAgent from "../UserAgent"; - -const FEED_BACKOFF_TIME_MS = 5 * 1000; -const FEED_BACKOFF_POW = 1.05; -const FEED_BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000; +import { QueueWithBackoff } from "../libRs"; const log = new Logger("FeedReader"); export class FeedError extends Error { @@ -77,24 +74,13 @@ function normalizeUrl(input: string): string { return url.toString(); } -function shuffle(array: T[]): T[] { - for (let i = array.length - 1; i > 0; i--) { - const j = Math.floor(Math.random() * (i + 1)); - [array[i], array[j]] = [array[j], array[i]]; - } - return array; -} - export class FeedReader { private connections: FeedConnection[]; // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version) private observedFeedUrls: Set = new Set(); - private feedQueue: string[] = []; - - private feedBackoff: Map = new Map(); - private feedLastBackoff: Map = new Map(); + private feedQueue = new QueueWithBackoff(); // A set of last modified times for each url. private cacheTimes: Map = new Map(); @@ -111,7 +97,7 @@ export class FeedReader { get sleepingInterval() { return ( // Calculate the number of MS to wait in between feeds. - (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1) + (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1) // And multiply by the number of concurrent readers ) * this.config.pollConcurrency; } @@ -156,16 +142,16 @@ export class FeedReader { private calculateFeedUrls(): void { // just in case we got an invalid URL somehow - const normalizedUrls = []; + const observedFeedUrls = new Set(); for (const conn of this.connections) { try { - normalizedUrls.push(normalizeUrl(conn.feedUrl)); + observedFeedUrls.add(normalizeUrl(conn.feedUrl)); } catch (err: unknown) { log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. 
It will not be tracked`); } } - this.observedFeedUrls = new Set(normalizedUrls); - this.feedQueue = shuffle([...this.observedFeedUrls.values()]); + observedFeedUrls.forEach(url => this.feedQueue.push(url)); + this.feedQueue.shuffle(); Metrics.feedsCount.set(this.observedFeedUrls.size); Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size); @@ -256,7 +242,6 @@ export class FeedReader { // Clear any feed failures this.feedsFailingHttp.delete(url); this.feedsFailingParsing.delete(url); - this.feedLastBackoff.delete(url); this.feedQueue.push(url); } catch (err: unknown) { // TODO: Proper Rust Type error. @@ -265,10 +250,7 @@ export class FeedReader { } else { this.feedsFailingParsing.add(url); } - const backoffDuration = Math.min(FEED_BACKOFF_TIME_MAX_MS, ( - Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS)) + Math.pow(this.feedLastBackoff.get(url) ?? 0, FEED_BACKOFF_POW)); - this.feedBackoff.set(url, Date.now() + backoffDuration); - this.feedLastBackoff.set(url, backoffDuration); + const backoffDuration = this.feedQueue.backoff(url); const error = err instanceof Error ? err : new Error(`Unknown error ${err}`); const feedError = new FeedError(url.toString(), error, fetchKey); log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); @@ -288,11 +270,11 @@ export class FeedReader { Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size ); Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size); - log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`); + log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`); const fetchingStarted = Date.now(); - const [ url ] = this.feedQueue.splice(0, 1); + const url = this.feedQueue.next(); let sleepFor = this.sleepingInterval; if (url) { @@ -319,15 +301,5 @@ export class FeedReader { } void this.pollFeeds(workerId); }, sleepFor); - - // Reinsert any feeds that we may have backed off. - for (const [feedUrl, retryAfter] of this.feedBackoff.entries()) { - if (retryAfter < Date.now()) { - log.debug(`Adding back ${feedUrl} from backoff set`); - this.feedQueue.push(feedUrl); - // Store the last backoff time. 
- this.feedBackoff.delete(feedUrl) - } - } } } diff --git a/src/lib.rs b/src/lib.rs index 1d03f680a..3a15bd657 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod feeds; pub mod format_util; pub mod github; pub mod jira; +pub mod util; #[macro_use] extern crate napi_derive; diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 000000000..42e453538 --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,100 @@ +use std::collections::LinkedList; +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; +use rand::prelude::*; + +const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32; +const BACKOFF_POW: f32 = 1.05f32; +const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; + +#[napi] + +pub struct QueueWithBackoff { + queue: LinkedList, + backoff: HashMap, + last_backoff: HashMap +} + +#[napi] + +impl QueueWithBackoff { + #[napi(constructor)] + pub fn new() -> Self { + QueueWithBackoff { + queue: LinkedList::new(), + backoff: HashMap::new(), + last_backoff: HashMap::new(), + } + } + + + #[napi] + pub fn next(&mut self) -> Option { + let start = SystemTime::now(); + let since_the_epoch = start + .duration_since(UNIX_EPOCH).unwrap(); + + let mut items_to_rm: Vec = vec![]; + for item in self.backoff.iter() { + if *item.1 < since_the_epoch.as_millis() { + self.queue.push_back(item.0.clone()); + items_to_rm.push(item.0.clone()); + } + } + + for item in items_to_rm { + self.backoff.remove(&item); + } + + return self.queue.pop_front() + } + + + #[napi] + pub fn push(&mut self, item: String) { + self.last_backoff.remove(&item); + self.queue.push_back(item); + } + + + #[napi] + pub fn backoff(&mut self, item: String) -> u32 { + let last_backoff = (*self.last_backoff.get(&item).unwrap_or(&0)) as f32; + + let mut rng = rand::thread_rng(); + let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0 and 1 + + let backoff_duration = ((y * BACKOFF_TIME_MS) + f32::from(last_backoff).powf(BACKOFF_POW)).min(BACKOFF_TIME_MAX_MS) as u32; + let backoff_item = item.clone(); + self.last_backoff.insert(item, backoff_duration); + + let start = SystemTime::now(); + let since_the_epoch = start + .duration_since(UNIX_EPOCH).unwrap(); + + let time = since_the_epoch.as_millis() + backoff_duration as u128; + + self.backoff.insert(backoff_item, time); + return backoff_duration; + } + + + #[napi] + pub fn length(&self) -> u32 { + self.queue.len() as u32 + } + + #[napi] + pub fn shuffle(&mut self) { + let mut rng = rand::thread_rng(); + let old_queue = self.queue.clone(); + self.queue.clear(); + for item in old_queue { + if rng.gen_bool(0.5) { + self.queue.push_front(item); + } else { + self.queue.push_back(item); + } + } + } +} \ No newline at end of file From eff30b8f36ca29593103dbe2e314173f523ce8be Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 13:02:04 +0000 Subject: [PATCH 07/34] linting --- src/util/mod.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 42e453538..b2d7963a1 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,7 +1,7 @@ -use std::collections::LinkedList; +use rand::prelude::*; use std::collections::HashMap; +use std::collections::LinkedList; use std::time::{SystemTime, UNIX_EPOCH}; -use rand::prelude::*; const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32; const BACKOFF_POW: f32 = 1.05f32; @@ -12,9 +12,14 @@ const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; pub struct QueueWithBackoff { queue: LinkedList, backoff: HashMap, - last_backoff: 
HashMap + last_backoff: HashMap, } +impl Default for QueueWithBackoff { + fn default() -> Self { + Self::new() + } +} #[napi] impl QueueWithBackoff { @@ -27,12 +32,10 @@ impl QueueWithBackoff { } } - #[napi] - pub fn next(&mut self) -> Option { + pub fn pop(&mut self) -> Option { let start = SystemTime::now(); - let since_the_epoch = start - .duration_since(UNIX_EPOCH).unwrap(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); let mut items_to_rm: Vec = vec![]; for item in self.backoff.iter() { @@ -46,17 +49,15 @@ impl QueueWithBackoff { self.backoff.remove(&item); } - return self.queue.pop_front() + self.queue.pop_front() } - #[napi] pub fn push(&mut self, item: String) { self.last_backoff.remove(&item); self.queue.push_back(item); } - #[napi] pub fn backoff(&mut self, item: String) -> u32 { let last_backoff = (*self.last_backoff.get(&item).unwrap_or(&0)) as f32; @@ -64,21 +65,20 @@ impl QueueWithBackoff { let mut rng = rand::thread_rng(); let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0 and 1 - let backoff_duration = ((y * BACKOFF_TIME_MS) + f32::from(last_backoff).powf(BACKOFF_POW)).min(BACKOFF_TIME_MAX_MS) as u32; + let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)) + .min(BACKOFF_TIME_MAX_MS) as u32; let backoff_item = item.clone(); self.last_backoff.insert(item, backoff_duration); let start = SystemTime::now(); - let since_the_epoch = start - .duration_since(UNIX_EPOCH).unwrap(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); let time = since_the_epoch.as_millis() + backoff_duration as u128; self.backoff.insert(backoff_item, time); - return backoff_duration; + backoff_duration } - #[napi] pub fn length(&self) -> u32 { self.queue.len() as u32 @@ -97,4 +97,4 @@ impl QueueWithBackoff { } } } -} \ No newline at end of file +} From 517d04482cea9454e3ce7edcf0f80aa31d3d2f89 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 13:03:24 +0000 Subject: [PATCH 08/34] pop --- src/feeds/FeedReader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 28fcf7a9e..dd8ec8a38 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -274,7 +274,7 @@ export class FeedReader { const fetchingStarted = Date.now(); - const url = this.feedQueue.next(); + const url = this.feedQueue.pop(); let sleepFor = this.sleepingInterval; if (url) { From ea7af887841dff02bebc1c62210aea7675cd1d38 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:13:11 +0000 Subject: [PATCH 09/34] Optimise backoff function further --- src/util/mod.rs | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index b2d7963a1..78cfa6483 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,4 +1,5 @@ use rand::prelude::*; +use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::LinkedList; use std::time::{SystemTime, UNIX_EPOCH}; @@ -11,7 +12,7 @@ const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; pub struct QueueWithBackoff { queue: LinkedList, - backoff: HashMap, + backoff: BTreeMap, last_backoff: HashMap, } @@ -27,7 +28,7 @@ impl QueueWithBackoff { pub fn new() -> Self { QueueWithBackoff { queue: LinkedList::new(), - backoff: HashMap::new(), + backoff: BTreeMap::new(), last_backoff: HashMap::new(), } } @@ -35,19 +36,17 @@ impl QueueWithBackoff { #[napi] pub fn pop(&mut self) -> Option { let start = SystemTime::now(); - let since_the_epoch = 
start.duration_since(UNIX_EPOCH).unwrap(); - - let mut items_to_rm: Vec = vec![]; - for item in self.backoff.iter() { - if *item.1 < since_the_epoch.as_millis() { - self.queue.push_back(item.0.clone()); - items_to_rm.push(item.0.clone()); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis(); + + // We only need to check this once, as we won't be adding to the backoff queue + // as often as we pull from it. + if let Some(item) = self.backoff.first_entry() { + if *item.key() < since_the_epoch { + let v = item.remove(); + self.queue.push_back(v); } } - for item in items_to_rm { - self.backoff.remove(&item); - } self.queue.pop_front() } @@ -73,9 +72,15 @@ impl QueueWithBackoff { let start = SystemTime::now(); let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); - let time = since_the_epoch.as_millis() + backoff_duration as u128; + let mut time = since_the_epoch.as_millis() + backoff_duration as u128; - self.backoff.insert(backoff_item, time); + // If the backoff queue contains this time (unlikely, but we don't) + // want to overwrite, then add an extra ms. + while self.backoff.contains_key(&time) { + time = time + 1; + } + + self.backoff.insert(time, backoff_item); backoff_duration } From 965c30bc01489c271a8c9883bc587407e77ebebe Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:14:48 +0000 Subject: [PATCH 10/34] Drop only! --- spec/github.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/github.spec.ts b/spec/github.spec.ts index 57a431c5b..132674298 100644 --- a/spec/github.spec.ts +++ b/spec/github.spec.ts @@ -58,7 +58,7 @@ describe('GitHub', () => { return testEnv?.tearDown(); }); - it.only('should be able to handle a GitHub event', async () => { + it('should be able to handle a GitHub event', async () => { const user = testEnv.getUser('user'); const bridgeApi = await getBridgeApi(testEnv.opts.config?.widgets?.publicUrl!, user); const testRoomId = await user.createRoom({ name: 'Test room', invite:[testEnv.botMxid] }); From e3d085e5d6ad05db94035ce48798b0c19a5ca9fe Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:14:53 +0000 Subject: [PATCH 11/34] fix test --- tests/connections/GithubRepoTest.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/connections/GithubRepoTest.ts b/tests/connections/GithubRepoTest.ts index 85af40c62..e686f2a5c 100644 --- a/tests/connections/GithubRepoTest.ts +++ b/tests/connections/GithubRepoTest.ts @@ -23,6 +23,7 @@ const GITHUB_ISSUE = { }, html_url: `https://github.com/${GITHUB_ORG_REPO.org}/${GITHUB_ORG_REPO.repo}/issues/1234`, title: "My issue", + assignees: [] }; const GITHUB_ISSUE_CREATED_PAYLOAD = { @@ -137,7 +138,7 @@ describe("GitHubRepoConnection", () => { intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.html_url, 0); intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.title, 0); }); - it.only("will handle assignees on issue creation", async () => { + it("will handle assignees on issue creation", async () => { const { connection, intent } = createConnection(); await connection.onIssueCreated({ ...GITHUB_ISSUE_CREATED_PAYLOAD, From 4274eb41a6c9df2127540ad8fe9785fe2d80a899 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:15:40 +0000 Subject: [PATCH 12/34] lint --- src/util/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 78cfa6483..2246a0e3d 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -39,7 +39,7 @@ impl 
QueueWithBackoff { let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis(); // We only need to check this once, as we won't be adding to the backoff queue - // as often as we pull from it. + // as often as we pull from it. if let Some(item) = self.backoff.first_entry() { if *item.key() < since_the_epoch { let v = item.remove(); @@ -47,7 +47,6 @@ impl QueueWithBackoff { } } - self.queue.pop_front() } @@ -79,7 +78,7 @@ impl QueueWithBackoff { while self.backoff.contains_key(&time) { time = time + 1; } - + self.backoff.insert(time, backoff_item); backoff_duration } From 64ab8081798f48764f8a0caa4ca4a9ef4412ca9a Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:16:01 +0000 Subject: [PATCH 13/34] lint further --- src/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 2246a0e3d..5b535ef72 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -76,7 +76,7 @@ impl QueueWithBackoff { // If the backoff queue contains this time (unlikely, but we don't) // want to overwrite, then add an extra ms. while self.backoff.contains_key(&time) { - time = time + 1; + time += 1; } self.backoff.insert(time, backoff_item); From 92322131595a627bf35600b5cc1ec029d54d1c33 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:20:21 +0000 Subject: [PATCH 14/34] Better comments --- src/util/mod.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 5b535ef72..00944d82f 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -12,8 +12,14 @@ const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; pub struct QueueWithBackoff { queue: LinkedList, + /** + * A map of absolute backoff timestamps mapped to the value. + */ backoff: BTreeMap, - last_backoff: HashMap, + /** + * The last duration applied when a value was backed off. 
+ */ + last_backoff_duration: HashMap, } impl Default for QueueWithBackoff { @@ -29,7 +35,7 @@ impl QueueWithBackoff { QueueWithBackoff { queue: LinkedList::new(), backoff: BTreeMap::new(), - last_backoff: HashMap::new(), + last_backoff_duration: HashMap::new(), } } @@ -52,13 +58,13 @@ impl QueueWithBackoff { #[napi] pub fn push(&mut self, item: String) { - self.last_backoff.remove(&item); + self.last_backoff_duration.remove(&item); self.queue.push_back(item); } #[napi] pub fn backoff(&mut self, item: String) -> u32 { - let last_backoff = (*self.last_backoff.get(&item).unwrap_or(&0)) as f32; + let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f32; let mut rng = rand::thread_rng(); let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0 and 1 @@ -66,7 +72,7 @@ impl QueueWithBackoff { let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)) .min(BACKOFF_TIME_MAX_MS) as u32; let backoff_item = item.clone(); - self.last_backoff.insert(item, backoff_duration); + self.last_backoff_duration.insert(item, backoff_duration); let start = SystemTime::now(); let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); From d0846b132541c417d6d5174aad506a0d76e5228d Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:21:17 +0000 Subject: [PATCH 15/34] Fix urls calculation --- src/feeds/FeedReader.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index dd8ec8a38..b2460dd41 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -77,8 +77,6 @@ function normalizeUrl(input: string): string { export class FeedReader { private connections: FeedConnection[]; - // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version) - private observedFeedUrls: Set = new Set(); private feedQueue = new QueueWithBackoff(); @@ -113,7 +111,7 @@ export class FeedReader { this.timeouts.fill(undefined); Object.seal(this.timeouts); this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection); - this.calculateFeedUrls(); + const initialFeeds = this.calculateFeedUrls(); connectionManager.on('new-connection', c => { if (c instanceof FeedConnection) { log.debug('New connection tracked:', c.connectionId); @@ -128,7 +126,7 @@ export class FeedReader { } }); - log.debug('Loaded feed URLs:', this.observedFeedUrls); + log.debug('Loaded feed URLs:', [...initialFeeds].join(', ')); for (let i = 0; i < config.pollConcurrency; i++) { void this.pollFeeds(i); @@ -140,7 +138,7 @@ export class FeedReader { this.timeouts.forEach(t => clearTimeout(t)); } - private calculateFeedUrls(): void { + private calculateFeedUrls(): Set { // just in case we got an invalid URL somehow const observedFeedUrls = new Set(); for (const conn of this.connections) { @@ -150,11 +148,14 @@ export class FeedReader { log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. 
It will not be tracked`); } } + observedFeedUrls.add("http://example.com/not-an-rss-feed"); observedFeedUrls.forEach(url => this.feedQueue.push(url)); this.feedQueue.shuffle(); - Metrics.feedsCount.set(this.observedFeedUrls.size); - Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size); + + Metrics.feedsCount.set(observedFeedUrls.size); + Metrics.feedsCountDeprecated.set(observedFeedUrls.size); + return observedFeedUrls; } /** From 5659d889da891b4e11980a2d3cfa02b1f2d9a2f8 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:27:13 +0000 Subject: [PATCH 16/34] Remove testing URL --- src/feeds/FeedReader.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index b2460dd41..373f77724 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -148,7 +148,6 @@ export class FeedReader { log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`); } } - observedFeedUrls.add("http://example.com/not-an-rss-feed"); observedFeedUrls.forEach(url => this.feedQueue.push(url)); this.feedQueue.shuffle(); From e6ddcb624fb8efefe9654b0805b4d795ede1782e Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:27:21 +0000 Subject: [PATCH 17/34] Add some variance to speed up while loop --- src/util/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 00944d82f..e6520bb8d 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -80,9 +80,9 @@ impl QueueWithBackoff { let mut time = since_the_epoch.as_millis() + backoff_duration as u128; // If the backoff queue contains this time (unlikely, but we don't) - // want to overwrite, then add an extra ms. + // want to overwrite, then add some variance. while self.backoff.contains_key(&time) { - time += 1; + time += (y * BACKOFF_TIME_MS) as u128; } self.backoff.insert(time, backoff_item); From 4838ba1fab8fefcd90e64ca8e0a94be3edfcf807 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:27:34 +0000 Subject: [PATCH 18/34] correct comment --- src/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index e6520bb8d..9a6b6a4ca 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -67,7 +67,7 @@ impl QueueWithBackoff { let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f32; let mut rng = rand::thread_rng(); - let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0 and 1 + let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0.5 and 1.1 let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)) .min(BACKOFF_TIME_MAX_MS) as u32; From 1614a2ab96707053a239dbfa55875e66e7142750 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:44:40 +0000 Subject: [PATCH 19/34] Follow the advice and use a VecDeque as it's slightly faster. 
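LinkedList allocates each node separately and follows a pointer per
element, whereas VecDeque keeps its items in a contiguous ring buffer:
push_back/pop_front remain O(1) while iteration and shuffling gain much
better cache locality. This follows the std documentation's advice that
VecDeque is almost always the better choice. The API exposed to
TypeScript is unchanged.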
--- src/util/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 9a6b6a4ca..3fd9dff24 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,7 +1,7 @@ use rand::prelude::*; use std::collections::BTreeMap; use std::collections::HashMap; -use std::collections::LinkedList; +use std::collections::VecDeque; use std::time::{SystemTime, UNIX_EPOCH}; const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32; @@ -11,7 +11,7 @@ const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; #[napi] pub struct QueueWithBackoff { - queue: LinkedList, + queue: VecDeque, /** * A map of absolute backoff timestamps mapped to the value. */ @@ -33,7 +33,7 @@ impl QueueWithBackoff { #[napi(constructor)] pub fn new() -> Self { QueueWithBackoff { - queue: LinkedList::new(), + queue: VecDeque::new(), backoff: BTreeMap::new(), last_backoff_duration: HashMap::new(), } From 958bdcb26d6bbffb704aa7ad45bdaf5f6f2cc1f0 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 11:09:55 +0000 Subject: [PATCH 20/34] Vastly better shuffle method --- src/util/mod.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 3fd9dff24..5e0d9621f 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -97,14 +97,6 @@ impl QueueWithBackoff { #[napi] pub fn shuffle(&mut self) { let mut rng = rand::thread_rng(); - let old_queue = self.queue.clone(); - self.queue.clear(); - for item in old_queue { - if rng.gen_bool(0.5) { - self.queue.push_front(item); - } else { - self.queue.push_back(item); - } - } + self.queue.make_contiguous().shuffle(&mut rng); } } From bec670f71213afcebcdc5a638981d3d2ea3caf72 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 11:53:21 +0000 Subject: [PATCH 21/34] Speed up checking for previous guids. --- src/Stores/MemoryStorageProvider.ts | 4 ++-- src/Stores/RedisStorageProvider.ts | 20 ++++++++++++++------ src/Stores/StorageProvider.ts | 6 +++--- src/feeds/FeedReader.ts | 17 +++++++---------- src/feeds/parser.rs | 2 +- 5 files changed, 27 insertions(+), 22 deletions(-) diff --git a/src/Stores/MemoryStorageProvider.ts b/src/Stores/MemoryStorageProvider.ts index 257c1da3c..d44add7f0 100644 --- a/src/Stores/MemoryStorageProvider.ts +++ b/src/Stores/MemoryStorageProvider.ts @@ -35,8 +35,8 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider return this.feedGuids.has(url); } - async hasSeenFeedGuid(url: string, guid: string): Promise { - return this.feedGuids.get(url)?.includes(guid) ?? false; + async hasSeenFeedGuids(url: string, ...guids: string[]): Promise { + return this.feedGuids.get(url)?.filter((existingGuid) => guids.includes(existingGuid)) ?? 
[]; } public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") { diff --git a/src/Stores/RedisStorageProvider.ts b/src/Stores/RedisStorageProvider.ts index 4f2343ce5..e1dc77433 100644 --- a/src/Stores/RedisStorageProvider.ts +++ b/src/Stores/RedisStorageProvider.ts @@ -29,11 +29,10 @@ const WIDGET_USER_TOKENS = "widgets.user-tokens."; const FEED_GUIDS = "feeds.guids."; - - const log = new Logger("RedisASProvider"); export class RedisStorageContextualProvider implements IStorageProvider { + constructor(protected readonly redis: Redis, protected readonly contextSuffix = '') { } public setSyncToken(token: string|null){ @@ -216,9 +215,9 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme await this.redis.set(key, JSON.stringify(value)); } - public async storeFeedGuids(url: string, ...guid: string[]): Promise { + public async storeFeedGuids(url: string, ...guids: string[]): Promise { const feedKey = `${FEED_GUIDS}${url}`; - await this.redis.lpush(feedKey, ...guid); + await this.redis.lpush(feedKey, ...guids); await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS); } @@ -226,7 +225,16 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1; } - public async hasSeenFeedGuid(url: string, guid: string): Promise { - return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null; + public async hasSeenFeedGuids(url: string, ...guids: string[]): Promise { + let multi = this.redis.multi(); + for (const guid of guids) { + multi = multi.lpos(`${FEED_GUIDS}${url}`, guid); + } + const res = await multi.exec(); + if (res === null) { + // Just assume we've seen none. + return []; + } + return guids.filter((_guid, index) => res[index][1] !== null); } } diff --git a/src/Stores/StorageProvider.ts b/src/Stores/StorageProvider.ts index 73790ff95..50175d75e 100644 --- a/src/Stores/StorageProvider.ts +++ b/src/Stores/StorageProvider.ts @@ -25,7 +25,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto setStoredTempFile(key: string, value: string): Promise; getGitlabDiscussionThreads(connectionId: string): Promise; setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise; - storeFeedGuids(url: string, ...guid: string[]): Promise; - hasSeenFeed(url: string, ...guid: string[]): Promise; - hasSeenFeedGuid(url: string, guid: string): Promise; + storeFeedGuids(url: string, ...guids: string[]): Promise; + hasSeenFeed(url: string): Promise; + hasSeenFeedGuids(url: string, ...guids: string[]): Promise; } \ No newline at end of file diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 373f77724..04dcda996 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -196,22 +196,20 @@ export class FeedReader { if (feed) { // If undefined, we got a not-modified. log.debug(`Found ${feed.items.length} entries in ${url}`); - + const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!)) for (const item of feed.items) { // Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage. if (!item.hashId) { log.error(`Could not determine guid for entry in ${url}, skipping`); continue; } - const hashId = `md5:${item.hashId}`; - newGuids.push(hashId); - - if (initialSync) { - log.debug(`Skipping entry ${item.id ?? 
hashId} since we're performing an initial sync`); + if (seenItems.includes(item.hashId)) { continue; } - if (await this.storage.hasSeenFeedGuid(url, hashId)) { - log.debug('Skipping already seen entry', item.id ?? hashId); + newGuids.push(item.hashId); + + if (initialSync) { + log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`); continue; } const entry = { @@ -236,7 +234,6 @@ export class FeedReader { if (seenEntriesChanged && newGuids.length) { await this.storage.storeFeedGuids(url, ...newGuids); } - } this.queue.push({ eventName: 'feed.success', sender: 'FeedReader', data: { url } }); // Clear any feed failures @@ -291,7 +288,7 @@ export class FeedReader { log.warn(`It took us longer to update the feeds than the configured pool interval`); } } else { - // It may be possible that we have more workers than feeds. This will cause the worker to just sleep. + // It is possible that we have more workers than feeds. This will cause the worker to just sleep. log.debug(`No feeds available to poll for worker ${workerId}`); } diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index 0dcbc7d16..0bb332c57 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -70,7 +70,7 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .map(|f| f.value) .or(item.link.clone()) .or(item.title.clone()) - .and_then(|f| hash_id(f).ok()), + .and_then(|f| Some(format!("md5:{:?}", hash_id(f)))), }) .collect(), } From be4a46894afa037154edd5bc19be985b8639f954 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 11:58:58 +0000 Subject: [PATCH 22/34] fix hasher function --- src/feeds/parser.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index 0bb332c57..3624c3c1b 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -70,7 +70,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .map(|f| f.value) .or(item.link.clone()) .or(item.title.clone()) - .and_then(|f| Some(format!("md5:{:?}", hash_id(f)))), + .and_then(|f| hash_id(f).ok()) + .and_then(|f| Some(format!("md5:{}", f))) }) .collect(), } From bd0822ee9585e96a5b288c5339fe4e37b36f9660 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 12:59:30 +0000 Subject: [PATCH 23/34] lint --- src/feeds/parser.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index 3624c3c1b..f6e845035 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -71,7 +71,7 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .or(item.link.clone()) .or(item.title.clone()) .and_then(|f| hash_id(f).ok()) - .and_then(|f| Some(format!("md5:{}", f))) + .and_then(|f| Some(format!("md5:{}", f))), }) .collect(), } From db23748dba1d317b6f486b220d40f581d9e44fbe Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 15:11:32 +0000 Subject: [PATCH 24/34] Content doesn't need to be calculated twice. --- src/Connections/FeedConnection.ts | 17 +++++++++-------- src/feeds/FeedReader.ts | 2 -- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/Connections/FeedConnection.ts b/src/Connections/FeedConnection.ts index d1aa9b248..26f427f38 100644 --- a/src/Connections/FeedConnection.ts +++ b/src/Connections/FeedConnection.ts @@ -220,15 +220,16 @@ export class FeedConnection extends BaseConnection implements IConnection { // We want to retry these sends, because sometimes the network / HS // craps out. 
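+        // Build the event content once, outside the retry closure, so every attempt sends an identical event.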
+ const content = { + msgtype: 'm.notice', + format: "org.matrix.custom.html", + formatted_body: md.renderInline(message), + body: message, + external_url: entry.link ?? undefined, + "uk.half-shot.matrix-hookshot.feeds.item": entry, + }; await retry( - () => this.intent.sendEvent(this.roomId, { - msgtype: 'm.notice', - format: "org.matrix.custom.html", - formatted_body: md.renderInline(message), - body: message, - external_url: entry.link ?? undefined, - "uk.half-shot.matrix-hookshot.feeds.item": entry, - }), + () => this.intent.sendEvent(this.roomId, content), SEND_EVENT_MAX_ATTEMPTS, SEND_EVENT_INTERVAL_MS, // Filter for showstopper errors like 4XX errors, but otherwise diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 04dcda996..81ed74755 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -150,8 +150,6 @@ export class FeedReader { } observedFeedUrls.forEach(url => this.feedQueue.push(url)); this.feedQueue.shuffle(); - - Metrics.feedsCount.set(observedFeedUrls.size); Metrics.feedsCountDeprecated.set(observedFeedUrls.size); return observedFeedUrls; From 605139f6fdb2610db7d7f1b5cd3d1d115d684bb5 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 15:12:54 +0000 Subject: [PATCH 25/34] Slightly more efficient iteration --- src/Stores/MemoryStorageProvider.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Stores/MemoryStorageProvider.ts b/src/Stores/MemoryStorageProvider.ts index d44add7f0..52b5bf545 100644 --- a/src/Stores/MemoryStorageProvider.ts +++ b/src/Stores/MemoryStorageProvider.ts @@ -36,7 +36,8 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider } async hasSeenFeedGuids(url: string, ...guids: string[]): Promise { - return this.feedGuids.get(url)?.filter((existingGuid) => guids.includes(existingGuid)) ?? []; + const existing = this.feedGuids.get(url); + return existing ? guids.filter((existingGuid) => existing.includes(existingGuid)) : []; } public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") { From ffd3881fbf6002146621f476a350d402c6275059 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 15:50:49 +0000 Subject: [PATCH 26/34] Improve performance of backoff insertion --- src/util/mod.rs | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 5e0d9621f..4b3adeef0 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -4,9 +4,9 @@ use std::collections::HashMap; use std::collections::VecDeque; use std::time::{SystemTime, UNIX_EPOCH}; -const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32; -const BACKOFF_POW: f32 = 1.05f32; -const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; +const DEFAULT_BACKOFF_TIME_MAX_MS: f64 = 24f64 * 60f64 * 60f64 * 1000f64; +const DEFAULT_BACKOFF_POW: f64 = 1.05f64; +const DEFAULT_BACKOFF_TIME_MS: f64 = 5f64 * 1000f64; #[napi] @@ -15,34 +15,41 @@ pub struct QueueWithBackoff { /** * A map of absolute backoff timestamps mapped to the value. */ - backoff: BTreeMap, + backoff: BTreeMap, /** * The last duration applied when a value was backed off. 
*/ last_backoff_duration: HashMap, + + backoff_time: f64, + backoff_exponent: f64, + backoff_max: f64, } impl Default for QueueWithBackoff { fn default() -> Self { - Self::new() + Self::new(DEFAULT_BACKOFF_TIME_MS, DEFAULT_BACKOFF_POW, DEFAULT_BACKOFF_TIME_MAX_MS) } } #[napi] impl QueueWithBackoff { #[napi(constructor)] - pub fn new() -> Self { + pub fn new(backoff_time: f64, backoff_exponent: f64, backoff_max: f64) -> Self { QueueWithBackoff { queue: VecDeque::new(), backoff: BTreeMap::new(), last_backoff_duration: HashMap::new(), + backoff_time, + backoff_exponent, + backoff_max, } } #[napi] pub fn pop(&mut self) -> Option { let start = SystemTime::now(); - let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; // We only need to check this once, as we won't be adding to the backoff queue // as often as we pull from it. @@ -64,25 +71,27 @@ impl QueueWithBackoff { #[napi] pub fn backoff(&mut self, item: String) -> u32 { - let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f32; + let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f64; let mut rng = rand::thread_rng(); - let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0.5 and 1.1 + let y: f64 = rng.gen::() + 0.5f64; // generates a float between 0.5 and 1.1 - let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)) - .min(BACKOFF_TIME_MAX_MS) as u32; + let backoff_duration = ((y * self.backoff_time) + last_backoff.powf(self.backoff_exponent)) + .min(self.backoff_max) as u32; let backoff_item = item.clone(); self.last_backoff_duration.insert(item, backoff_duration); let start = SystemTime::now(); let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); - let mut time = since_the_epoch.as_millis() + backoff_duration as u128; + let mut time = since_the_epoch.as_millis() as u64 + backoff_duration as u64; - // If the backoff queue contains this time (unlikely, but we don't) - // want to overwrite, then add some variance. + // If the backoff queue contains this time (likely) + // then we want to increase the backoff time slightly + // to allow for it. + let incr: f64 = (rng.gen::() * 2f64) + 2f64; while self.backoff.contains_key(&time) { - time += (y * BACKOFF_TIME_MS) as u128; + time += (incr * self.backoff_time) as u64; } self.backoff.insert(time, backoff_item); From 6649143ee5c70f429c3fdaf2764c7654bfa0cfae Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 8 Feb 2024 10:13:44 +0000 Subject: [PATCH 27/34] Configure feed reader --- src/feeds/FeedReader.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 81ed74755..405b2eb99 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -12,6 +12,11 @@ import UserAgent from "../UserAgent"; import { QueueWithBackoff } from "../libRs"; const log = new Logger("FeedReader"); + +const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000; +const BACKOFF_POW = 1.05; +const BACKOFF_TIME_MS = 5 * 1000; + export class FeedError extends Error { constructor( public url: string, @@ -78,7 +83,7 @@ export class FeedReader { private connections: FeedConnection[]; - private feedQueue = new QueueWithBackoff(); + private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS); // A set of last modified times for each url. 
private cacheTimes: Map = new Map(); From 149ca517c76f09bffecd2185723ed8f8abffd769 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 8 Feb 2024 11:41:52 +0000 Subject: [PATCH 28/34] lint --- src/util/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 4b3adeef0..215600ad7 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -28,7 +28,11 @@ pub struct QueueWithBackoff { impl Default for QueueWithBackoff { fn default() -> Self { - Self::new(DEFAULT_BACKOFF_TIME_MS, DEFAULT_BACKOFF_POW, DEFAULT_BACKOFF_TIME_MAX_MS) + Self::new( + DEFAULT_BACKOFF_TIME_MS, + DEFAULT_BACKOFF_POW, + DEFAULT_BACKOFF_TIME_MAX_MS, + ) } } #[napi] From 5a10c14b7ad400c2b391868dcd88c3afc381a10a Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 8 Feb 2024 12:59:51 +0000 Subject: [PATCH 29/34] Ensure appending and removing from the queue works as expected. --- src/feeds/FeedReader.ts | 52 ++++++++++++++++++++++++++++++----------- src/util/mod.rs | 36 +++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 15 deletions(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 405b2eb99..d46552b6d 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -96,6 +96,7 @@ export class FeedReader { private shouldRun = true; private readonly timeouts: (NodeJS.Timeout|undefined)[]; + private readonly feedsToRetain = new Set(); get sleepingInterval() { return ( @@ -116,22 +117,38 @@ export class FeedReader { this.timeouts.fill(undefined); Object.seal(this.timeouts); this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection); - const initialFeeds = this.calculateFeedUrls(); - connectionManager.on('new-connection', c => { - if (c instanceof FeedConnection) { - log.debug('New connection tracked:', c.connectionId); - this.connections.push(c); - this.calculateFeedUrls(); + const feeds = this.calculateInitialFeedUrls(); + connectionManager.on('new-connection', newConnection => { + if (!(newConnection instanceof FeedConnection)) { + return; + } + const normalisedUrl = normalizeUrl(newConnection.feedUrl); + if (!feeds.has(normalisedUrl)) { + this.feedQueue.push(normalisedUrl); + feeds.add(normalisedUrl); } }); connectionManager.on('connection-removed', removed => { - if (removed instanceof FeedConnection) { - this.connections = this.connections.filter(c => c.connectionId !== removed.connectionId); - this.calculateFeedUrls(); + if (!(removed instanceof FeedConnection)) { + return; } + let shouldKeepUrl = false; + const normalisedUrl = normalizeUrl(removed.feedUrl); + this.connections = this.connections.filter(c => { + // Cheeky reuse of iteration to determine if we should remove this URL. + if (c.connectionId !== removed.connectionId) { + shouldKeepUrl = shouldKeepUrl || normalizeUrl(c.feedUrl) === normalisedUrl; + return true; + } + return false; + }); + this.feedQueue.remove(normalisedUrl); + feeds.delete(normalisedUrl); + this.feedsFailingHttp.delete(normalisedUrl); + this.feedsFailingParsing.delete(normalisedUrl); }); - log.debug('Loaded feed URLs:', [...initialFeeds].join(', ')); + log.debug('Loaded feed URLs:', [...feeds].join(', ')); for (let i = 0; i < config.pollConcurrency; i++) { void this.pollFeeds(i); @@ -143,7 +160,11 @@ export class FeedReader { this.timeouts.forEach(t => clearTimeout(t)); } - private calculateFeedUrls(): Set { + /** + * Calculate the initial feed set for the reader. Should never + * be called twice. 
+ */ + private calculateInitialFeedUrls(): Set { // just in case we got an invalid URL somehow const observedFeedUrls = new Set(); for (const conn of this.connections) { @@ -153,8 +174,7 @@ export class FeedReader { log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`); } } - observedFeedUrls.forEach(url => this.feedQueue.push(url)); - this.feedQueue.shuffle(); + this.feedQueue.populate([...observedFeedUrls]); Metrics.feedsCount.set(observedFeedUrls.size); Metrics.feedsCountDeprecated.set(observedFeedUrls.size); return observedFeedUrls; @@ -169,6 +189,7 @@ export class FeedReader { * @returns A boolean that returns if we saw any changes on the feed since the last poll time. */ public async pollFeed(url: string): Promise { + this.feedsToRetain.add(url); let seenEntriesChanged = false; const fetchKey = randomUUID(); const { etag, lastModified } = this.cacheTimes.get(url) || {}; @@ -242,7 +263,10 @@ export class FeedReader { // Clear any feed failures this.feedsFailingHttp.delete(url); this.feedsFailingParsing.delete(url); - this.feedQueue.push(url); + if (this.feedsToRetain.has(url)) { + // If we've removed this feed since processing it, do not requeue. + this.feedQueue.push(url); + } } catch (err: unknown) { // TODO: Proper Rust Type error. if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) { diff --git a/src/util/mod.rs b/src/util/mod.rs index 215600ad7..231cc883a 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -20,7 +20,6 @@ pub struct QueueWithBackoff { * The last duration applied when a value was backed off. */ last_backoff_duration: HashMap, - backoff_time: f64, backoff_exponent: f64, backoff_max: f64, @@ -67,6 +66,32 @@ impl QueueWithBackoff { self.queue.pop_front() } + #[napi] + pub fn remove(&mut self, item: String) -> bool { + // Remove from the queue + if let Ok(index) = self.queue.binary_search(&item) { + self.queue.remove(index); + return true; + } else { + // We didn't find the key queued, so let's ensure we delete it + // from any backoff. + // This is *expensive* but hopefully called rarely. + let mut found_key: u64 = 0; + for (key, value) in self.backoff.iter() { + if *value == item { + found_key = *key; + } + } + if found_key != 0 { + self.backoff.remove(&found_key); + return true; + } + } + // Always remove the duration on removal. + self.last_backoff_duration.remove(&item); + return false; + } + #[napi] pub fn push(&mut self, item: String) { self.last_backoff_duration.remove(&item); @@ -112,4 +137,13 @@ impl QueueWithBackoff { let mut rng = rand::thread_rng(); self.queue.make_contiguous().shuffle(&mut rng); } + + #[napi] + pub fn populate(&mut self, values: Vec) { + // This assumes an empty queue. + for v in values { + self.queue.push_back(v); + } + self.shuffle(); + } } From 251e80aeb888dab2ea95d81f77eefab9b1f46450 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 8 Feb 2024 13:02:22 +0000 Subject: [PATCH 30/34] Ensure we do keep urls that have been removed. 
--- src/feeds/FeedReader.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index d46552b6d..6c78f6cbd 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -124,6 +124,7 @@ export class FeedReader { } const normalisedUrl = normalizeUrl(newConnection.feedUrl); if (!feeds.has(normalisedUrl)) { + log.info(`Connection added, adding "${normalisedUrl}" to queue`); this.feedQueue.push(normalisedUrl); feeds.add(normalisedUrl); } @@ -142,6 +143,12 @@ export class FeedReader { } return false; }); + if (shouldKeepUrl) { + log.info(`Connection removed, but not removing "${normalisedUrl}" as it is still in use`); + return; + } + log.info(`Connection removed, removing "${normalisedUrl}" from queue`); + this.feedsToRetain.delete(normalisedUrl); this.feedQueue.remove(normalisedUrl); feeds.delete(normalisedUrl); this.feedsFailingHttp.delete(normalisedUrl); From c67148cb391cbaec0c4aba756cd999aba0ae945f Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Fri, 9 Feb 2024 00:05:37 +0000 Subject: [PATCH 31/34] lint --- src/feeds/parser.rs | 2 +- src/util/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index f6e845035..7d2add886 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -71,7 +71,7 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .or(item.link.clone()) .or(item.title.clone()) .and_then(|f| hash_id(f).ok()) - .and_then(|f| Some(format!("md5:{}", f))), + .map(|f| format!("md5:{}", f)), }) .collect(), } diff --git a/src/util/mod.rs b/src/util/mod.rs index 231cc883a..42d5c914e 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -89,7 +89,7 @@ impl QueueWithBackoff { } // Always remove the duration on removal. self.last_backoff_duration.remove(&item); - return false; + false } #[napi] From ce98771241f883b7351d29d999eb77ae2be28a26 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Fri, 9 Feb 2024 00:10:41 +0000 Subject: [PATCH 32/34] Inc/dec metrics as queue items are added/deleted. --- src/feeds/FeedReader.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 6c78f6cbd..ca619e1b4 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -127,6 +127,8 @@ export class FeedReader { log.info(`Connection added, adding "${normalisedUrl}" to queue`); this.feedQueue.push(normalisedUrl); feeds.add(normalisedUrl); + Metrics.feedsCount.inc(); + Metrics.feedsCountDeprecated.inc(); } }); connectionManager.on('connection-removed', removed => { @@ -153,6 +155,8 @@ export class FeedReader { feeds.delete(normalisedUrl); this.feedsFailingHttp.delete(normalisedUrl); this.feedsFailingParsing.delete(normalisedUrl); + Metrics.feedsCount.dec(); + Metrics.feedsCountDeprecated.dec(); }); log.debug('Loaded feed URLs:', [...feeds].join(', ')); From 0bc49c2f2a87176c25d4be96977e076c81108376 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Fri, 9 Feb 2024 00:11:59 +0000 Subject: [PATCH 33/34] Add comment --- src/feeds/FeedReader.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index ca619e1b4..699598ad6 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -200,6 +200,10 @@ export class FeedReader { * @returns A boolean that returns if we saw any changes on the feed since the last poll time. 
 */
     public async pollFeed(url: string): Promise<boolean> {
+        // If a feed is deleted while it is being polled, we need
+        // to remember NOT to add it back to the queue. This
+        // set keeps track of all the feeds that *should* be
+        // requeued.
         this.feedsToRetain.add(url);
         let seenEntriesChanged = false;
         const fetchKey = randomUUID();

From 60a3a783a7e11c8eb0417fa1312df33fab7d9768 Mon Sep 17 00:00:00 2001
From: Half-Shot
Date: Tue, 20 Feb 2024 00:29:01 +0000
Subject: [PATCH 34/34] tidy up

---
 src/util/mod.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/util/mod.rs b/src/util/mod.rs
index 42d5c914e..593b1f51d 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -132,8 +132,7 @@ impl QueueWithBackoff {
         self.queue.len() as u32
     }
 
-    #[napi]
-    pub fn shuffle(&mut self) {
+    fn shuffle(&mut self) {
         let mut rng = rand::thread_rng();
         self.queue.make_contiguous().shuffle(&mut rng);
     }