Backoff failing RSS feeds #890

Merged · 35 commits · Feb 20, 2024

Commits
e78d920
Backoff RSS requests if a url repeatedly fails.
Half-Shot Feb 6, 2024
f5568e2
Increase max backoff time to a day
Half-Shot Feb 6, 2024
2ee5012
Add backoff for failing feeds.
Half-Shot Feb 6, 2024
b7cd4fe
Remove unused finally
Half-Shot Feb 6, 2024
18429ce
Add this.feedLastBackoff
Half-Shot Feb 6, 2024
80d4b9b
Rewrite in rust.
Half-Shot Feb 6, 2024
eff30b8
linting
Half-Shot Feb 6, 2024
517d044
pop
Half-Shot Feb 6, 2024
ea7af88
Optimise backoff function further
Half-Shot Feb 6, 2024
965c30b
Drop only!
Half-Shot Feb 6, 2024
e3d085e
fix test
Half-Shot Feb 6, 2024
4274eb4
lint
Half-Shot Feb 6, 2024
64ab808
lint further
Half-Shot Feb 6, 2024
9232213
Better comments
Half-Shot Feb 6, 2024
d0846b1
Fix urls calculation
Half-Shot Feb 7, 2024
92842fd
Merge remote-tracking branch 'origin/main' into hs/add-sensible-rss-b…
Half-Shot Feb 7, 2024
5659d88
Remove testing URL
Half-Shot Feb 7, 2024
e6ddcb6
Add some variance to speed up while loop
Half-Shot Feb 7, 2024
4838ba1
correct comment
Half-Shot Feb 7, 2024
1614a2a
Follow the advice and use a VecDeque as it's slightly faster.
Half-Shot Feb 7, 2024
958bdcb
Vastly better shuffle method
Half-Shot Feb 7, 2024
bec670f
Speed up checking for previous guids.
Half-Shot Feb 7, 2024
be4a468
fix hasher function
Half-Shot Feb 7, 2024
bd0822e
lint
Half-Shot Feb 7, 2024
db23748
Content doesn't need to be calculated twice.
Half-Shot Feb 7, 2024
605139f
Slightly more efficient iteration
Half-Shot Feb 7, 2024
ffd3881
Improve performance of backoff insertion
Half-Shot Feb 7, 2024
6649143
Configure feed reader
Half-Shot Feb 8, 2024
149ca51
lint
Half-Shot Feb 8, 2024
5a10c14
Ensure appending and removing from the queue works as expected.
Half-Shot Feb 8, 2024
251e80a
Ensure we do keep urls that have been removed.
Half-Shot Feb 8, 2024
c67148c
lint
Half-Shot Feb 9, 2024
ce98771
Inc/dec metrics as queue items are added/deleted.
Half-Shot Feb 9, 2024
0bc49c2
Add comment
Half-Shot Feb 9, 2024
60a3a78
tidy up
Half-Shot Feb 20, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -21,6 +21,7 @@ rss = "2.0"
atom_syndication = "0.12"
ruma = { version = "0.9", features = ["events", "html"] }
reqwest = "0.11"
rand = "0.8.5"

[build-dependencies]
napi-build = "2"
1 change: 1 addition & 0 deletions changelog.d/890.misc
@@ -0,0 +1 @@
Failing RSS/atom feeds are now backed off before being retried. This should result in a speedup for large public deployments where failing feeds may result in a slowdown.
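
For context, the three constants that drive this backoff appear in the src/feeds/FeedReader.ts diff further down: a 5-second initial delay, a growth factor of 1.05, and a cap of one day. A minimal sketch of the schedule, assuming a simple multiplicative growth curve (the actual curve lives in the Rust QueueWithBackoff and is not shown in this PR's rendered diff):

const BACKOFF_TIME_MS = 5 * 1000;                 // initial delay: 5s
const BACKOFF_POW = 1.05;                         // growth factor per consecutive failure
const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;  // cap: one day

// Hypothetical schedule; the real computation is in Rust.
function backoffDelayMs(consecutiveFailures: number): number {
    const delay = BACKOFF_TIME_MS * Math.pow(BACKOFF_POW, consecutiveFailures);
    return Math.min(delay, BACKOFF_TIME_MAX_MS);
}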
2 changes: 1 addition & 1 deletion spec/github.spec.ts
@@ -58,7 +58,7 @@ describe('GitHub', () => {
return testEnv?.tearDown();
});

it.only('should be able to handle a GitHub event', async () => {
it('should be able to handle a GitHub event', async () => {
const user = testEnv.getUser('user');
const bridgeApi = await getBridgeApi(testEnv.opts.config?.widgets?.publicUrl!, user);
const testRoomId = await user.createRoom({ name: 'Test room', invite:[testEnv.botMxid] });
17 changes: 9 additions & 8 deletions src/Connections/FeedConnection.ts
@@ -57,7 +57,7 @@
static readonly ServiceCategory = "feeds";


public static createConnectionForState(roomId: string, event: StateEvent<any>, {config, intent}: InstantiateConnectionOpts) {

(GitHub Actions lint-node annotation on line 60: Unexpected any. Specify a different type)
if (!config.feeds?.enabled) {
throw Error('RSS/Atom feeds are not configured');
}
@@ -220,15 +220,16 @@

// We want to retry these sends, because sometimes the network / HS
// craps out.
const content = {
msgtype: 'm.notice',
format: "org.matrix.custom.html",
formatted_body: md.renderInline(message),
body: message,
external_url: entry.link ?? undefined,
"uk.half-shot.matrix-hookshot.feeds.item": entry,
};
await retry(
() => this.intent.sendEvent(this.roomId, {
msgtype: 'm.notice',
format: "org.matrix.custom.html",
formatted_body: md.renderInline(message),
body: message,
external_url: entry.link ?? undefined,
"uk.half-shot.matrix-hookshot.feeds.item": entry,
}),
() => this.intent.sendEvent(this.roomId, content),
SEND_EVENT_MAX_ATTEMPTS,
SEND_EVENT_INTERVAL_MS,
// Filter for showstopper errors like 4XX errors, but otherwise
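
The change above builds the event content once instead of rebuilding it inside the retry closure on every attempt. For reference, a minimal retry helper consistent with the call site shown, where the final argument filters out errors that should abort immediately; the repo's actual helper may differ, and this signature is inferred from the arguments above:

// Sketch only; inferred from retry(fn, SEND_EVENT_MAX_ATTEMPTS, SEND_EVENT_INTERVAL_MS, filter).
async function retry<T>(
    fn: () => Promise<T>,
    maxAttempts: number,
    intervalMs: number,
    isFatal: (err: unknown) => boolean = () => false,
): Promise<T> {
    let lastErr: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return await fn();
        } catch (err) {
            if (isFatal(err)) {
                throw err; // e.g. 4XX responses: retrying will not help
            }
            lastErr = err;
            await new Promise((resolve) => setTimeout(resolve, intervalMs));
        }
    }
    throw lastErr;
}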
5 changes: 3 additions & 2 deletions src/Stores/MemoryStorageProvider.ts
@@ -35,8 +35,9 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
return this.feedGuids.has(url);
}

async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return this.feedGuids.get(url)?.includes(guid) ?? false;
async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
const existing = this.feedGuids.get(url);
return existing ? guids.filter((existingGuid) => existing.includes(existingGuid)) : [];
}

public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") {
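
The singular hasSeenFeedGuid(url, guid) check becomes a batched hasSeenFeedGuids(url, ...guids) that returns the subset of the supplied guids already recorded for that feed, letting the caller deduplicate a whole poll in one call. A usage sketch against the in-memory store (values are illustrative, and the no-argument constructor is an assumption):

// Illustrative values only.
const store = new MemoryStorageProvider();
await store.storeFeedGuids("https://example.org/feed.xml", "md5:aaa", "md5:bbb");
const seen = await store.hasSeenFeedGuids("https://example.org/feed.xml", "md5:aaa", "md5:ccc");
// seen === ["md5:aaa"]: only previously stored guids come back.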
20 changes: 14 additions & 6 deletions src/Stores/RedisStorageProvider.ts
@@ -29,11 +29,10 @@ const WIDGET_USER_TOKENS = "widgets.user-tokens.";

const FEED_GUIDS = "feeds.guids.";



const log = new Logger("RedisASProvider");

export class RedisStorageContextualProvider implements IStorageProvider {

constructor(protected readonly redis: Redis, protected readonly contextSuffix = '') { }

public setSyncToken(token: string|null){
@@ -216,17 +215,26 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
await this.redis.set(key, JSON.stringify(value));
}

public async storeFeedGuids(url: string, ...guid: string[]): Promise<void> {
public async storeFeedGuids(url: string, ...guids: string[]): Promise<void> {
const feedKey = `${FEED_GUIDS}${url}`;
await this.redis.lpush(feedKey, ...guid);
await this.redis.lpush(feedKey, ...guids);
await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS);
}

public async hasSeenFeed(url: string): Promise<boolean> {
return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1;
}

public async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null;
public async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
let multi = this.redis.multi();
for (const guid of guids) {
multi = multi.lpos(`${FEED_GUIDS}${url}`, guid);
}
const res = await multi.exec();
if (res === null) {
// Just assume we've seen none.
return [];
}
return guids.filter((_guid, index) => res[index][1] !== null);
}
}
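
The Redis version batches one LPOS per guid into a single MULTI/EXEC round trip instead of issuing a lookup per guid. With ioredis, exec() resolves to one [error, result] pair per queued command (or null if the transaction failed), so res[index][1] !== null means LPOS found that guid somewhere in the list. A condensed, self-contained sketch of the same lookup, assuming ioredis and the FEED_GUIDS prefix shown above:

import Redis from "ioredis";

// Sketch of the batched lookup; mirrors hasSeenFeedGuids above.
async function seenFeedGuids(redis: Redis, url: string, guids: string[]): Promise<string[]> {
    let multi = redis.multi();
    for (const guid of guids) {
        // LPOS returns the index of the first match, or null if absent.
        multi = multi.lpos(`feeds.guids.${url}`, guid);
    }
    const res = await multi.exec();
    if (res === null) {
        return []; // transaction aborted; assume nothing seen
    }
    return guids.filter((_guid, index) => res[index][1] !== null);
}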
6 changes: 3 additions & 3 deletions src/Stores/StorageProvider.ts
@@ -25,7 +25,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto
setStoredTempFile(key: string, value: string): Promise<void>;
getGitlabDiscussionThreads(connectionId: string): Promise<SerializedGitlabDiscussionThreads>;
setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void>;
storeFeedGuids(url: string, ...guid: string[]): Promise<void>;
hasSeenFeed(url: string, ...guid: string[]): Promise<boolean>;
hasSeenFeedGuid(url: string, guid: string): Promise<boolean>;
storeFeedGuids(url: string, ...guids: string[]): Promise<void>;
hasSeenFeed(url: string): Promise<boolean>;
hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]>;
}
122 changes: 77 additions & 45 deletions src/feeds/FeedReader.ts
@@ -9,8 +9,14 @@ import { randomUUID } from "crypto";
import { readFeed } from "../libRs";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import UserAgent from "../UserAgent";
import { QueueWithBackoff } from "../libRs";

const log = new Logger("FeedReader");

const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;
const BACKOFF_POW = 1.05;
const BACKOFF_TIME_MS = 5 * 1000;

export class FeedError extends Error {
constructor(
public url: string,
@@ -73,21 +79,11 @@ function normalizeUrl(input: string): string {
return url.toString();
}

function shuffle<T>(array: T[]): T[] {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[array[i], array[j]] = [array[j], array[i]];
}
return array;
}

export class FeedReader {

private connections: FeedConnection[];
// ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
private observedFeedUrls: Set<string> = new Set();

private feedQueue: string[] = [];
private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);

// A set of last modified times for each url.
private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
@@ -100,11 +96,12 @@ export class FeedReader {

private shouldRun = true;
private readonly timeouts: (NodeJS.Timeout|undefined)[];
private readonly feedsToRetain = new Set();

get sleepingInterval() {
return (
// Calculate the number of MS to wait in between feeds.
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1)
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
// And multiply by the number of concurrent readers
) * this.config.pollConcurrency;
}
@@ -120,22 +117,49 @@
this.timeouts.fill(undefined);
Object.seal(this.timeouts);
this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection);
this.calculateFeedUrls();
connectionManager.on('new-connection', c => {
if (c instanceof FeedConnection) {
log.debug('New connection tracked:', c.connectionId);
this.connections.push(c);
this.calculateFeedUrls();
const feeds = this.calculateInitialFeedUrls();
connectionManager.on('new-connection', newConnection => {
if (!(newConnection instanceof FeedConnection)) {
return;
}
const normalisedUrl = normalizeUrl(newConnection.feedUrl);
if (!feeds.has(normalisedUrl)) {
log.info(`Connection added, adding "${normalisedUrl}" to queue`);
this.feedQueue.push(normalisedUrl);
feeds.add(normalisedUrl);
Metrics.feedsCount.inc();
Metrics.feedsCountDeprecated.inc();
}
});
connectionManager.on('connection-removed', removed => {
if (removed instanceof FeedConnection) {
this.connections = this.connections.filter(c => c.connectionId !== removed.connectionId);
this.calculateFeedUrls();
if (!(removed instanceof FeedConnection)) {
return;
}
let shouldKeepUrl = false;
const normalisedUrl = normalizeUrl(removed.feedUrl);
this.connections = this.connections.filter(c => {
// Cheeky reuse of iteration to determine if we should remove this URL.
if (c.connectionId !== removed.connectionId) {
shouldKeepUrl = shouldKeepUrl || normalizeUrl(c.feedUrl) === normalisedUrl;
return true;
}
return false;
});
if (shouldKeepUrl) {
log.info(`Connection removed, but not removing "${normalisedUrl}" as it is still in use`);
return;
}
log.info(`Connection removed, removing "${normalisedUrl}" from queue`);
this.feedsToRetain.delete(normalisedUrl);
this.feedQueue.remove(normalisedUrl);
feeds.delete(normalisedUrl);
this.feedsFailingHttp.delete(normalisedUrl);
this.feedsFailingParsing.delete(normalisedUrl);
Metrics.feedsCount.dec();
Metrics.feedsCountDeprecated.dec();
});

log.debug('Loaded feed URLs:', this.observedFeedUrls);
log.debug('Loaded feed URLs:', [...feeds].join(', '));

for (let i = 0; i < config.pollConcurrency; i++) {
void this.pollFeeds(i);
@@ -147,21 +171,24 @@
this.timeouts.forEach(t => clearTimeout(t));
}

private calculateFeedUrls(): void {
/**
* Calculate the initial feed set for the reader. Should never
* be called twice.
*/
private calculateInitialFeedUrls(): Set<string> {
// just in case we got an invalid URL somehow
const normalizedUrls = [];
const observedFeedUrls = new Set<string>();
for (const conn of this.connections) {
try {
normalizedUrls.push(normalizeUrl(conn.feedUrl));
observedFeedUrls.add(normalizeUrl(conn.feedUrl));
} catch (err: unknown) {
log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
}
}
this.observedFeedUrls = new Set(normalizedUrls);
this.feedQueue = shuffle([...this.observedFeedUrls.values()]);

Metrics.feedsCount.set(this.observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size);
this.feedQueue.populate([...observedFeedUrls]);
Metrics.feedsCount.set(observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(observedFeedUrls.size);
return observedFeedUrls;
}

/**
@@ -173,6 +200,11 @@
* @returns A boolean that returns if we saw any changes on the feed since the last poll time.
*/
public async pollFeed(url: string): Promise<boolean> {
// If a feed is deleted while it is being polled, we need
// to remember NOT to add it back to the queue. This
// set keeps track of all the feeds that *should* be
// requeued.
this.feedsToRetain.add(url);
let seenEntriesChanged = false;
const fetchKey = randomUUID();
const { etag, lastModified } = this.cacheTimes.get(url) || {};
Expand Down Expand Up @@ -203,22 +235,20 @@ export class FeedReader {
if (feed) {
// If undefined, we got a not-modified.
log.debug(`Found ${feed.items.length} entries in ${url}`);

const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!))
for (const item of feed.items) {
// Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage.
if (!item.hashId) {
log.error(`Could not determine guid for entry in ${url}, skipping`);
continue;
}
const hashId = `md5:${item.hashId}`;
newGuids.push(hashId);

if (initialSync) {
log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`);
if (seenItems.includes(item.hashId)) {
continue;
}
if (await this.storage.hasSeenFeedGuid(url, hashId)) {
log.debug('Skipping already seen entry', item.id ?? hashId);
newGuids.push(item.hashId);

if (initialSync) {
log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`);
continue;
}
const entry = {
@@ -243,25 +273,27 @@
if (seenEntriesChanged && newGuids.length) {
await this.storage.storeFeedGuids(url, ...newGuids);
}

}
this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url } });
// Clear any feed failures
this.feedsFailingHttp.delete(url);
this.feedsFailingParsing.delete(url);
if (this.feedsToRetain.has(url)) {
// If we've removed this feed since processing it, do not requeue.
this.feedQueue.push(url);
}
} catch (err: unknown) {
// TODO: Proper Rust Type error.
if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) {
this.feedsFailingHttp.add(url);
} else {
this.feedsFailingParsing.add(url);
}
const backoffDuration = this.feedQueue.backoff(url);
const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
const feedError = new FeedError(url.toString(), error, fetchKey);
log.error("Unable to read feed:", feedError.message);
log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
this.queue.push<FeedError>({ eventName: 'feed.error', sender: 'FeedReader', data: feedError});
} finally {
this.feedQueue.push(url);
}
return seenEntriesChanged;
}
@@ -277,11 +309,11 @@
Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);

log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`);
log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);

const fetchingStarted = Date.now();

const [ url ] = this.feedQueue.splice(0, 1);
const url = this.feedQueue.pop();
let sleepFor = this.sleepingInterval;

if (url) {
Expand All @@ -298,7 +330,7 @@ export class FeedReader {
log.warn(`It took us longer to update the feeds than the configured pool interval`);
}
} else {
// It may be possible that we have more workers than feeds. This will cause the worker to just sleep.
// It is possible that we have more workers than feeds. This will cause the worker to just sleep.
log.debug(`No feeds available to poll for worker ${workerId}`);
}

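
The queue itself is the heart of the change: FeedReader now imports QueueWithBackoff from the Rust libRs (the new src/util module registered in lib.rs below is presumably where it lives) and touches only push, pop, remove, populate, length, and backoff. A hypothetical TypeScript model of that contract, purely to illustrate how the pieces above fit together; the field names and growth curve here are assumptions, and the real implementation is Rust:

// Hypothetical model of the QueueWithBackoff contract used by FeedReader.
class QueueWithBackoff {
    private queue: string[] = [];
    private failures = new Map<string, number>();    // url -> consecutive failures
    private asleepUntil = new Map<string, number>(); // url -> earliest next poll (ms epoch)

    constructor(
        private readonly baseMs: number, // BACKOFF_TIME_MS
        private readonly pow: number,    // BACKOFF_POW
        private readonly maxMs: number,  // BACKOFF_TIME_MAX_MS
    ) {}

    populate(urls: string[]): void { this.queue = [...urls]; }

    length(): number { return this.queue.length + this.asleepUntil.size; }

    push(url: string): void {
        this.failures.delete(url); // a successful poll resets the backoff
        this.queue.push(url);
    }

    remove(url: string): void {
        this.queue = this.queue.filter((u) => u !== url);
        this.failures.delete(url);
        this.asleepUntil.delete(url);
    }

    // Wake any URLs whose backoff has expired, then hand out the next one.
    pop(): string | undefined {
        const now = Date.now();
        for (const [url, wakeAt] of this.asleepUntil) {
            if (wakeAt <= now) {
                this.asleepUntil.delete(url);
                this.queue.push(url);
            }
        }
        return this.queue.shift();
    }

    // Push the URL's next poll further out and return the delay applied.
    backoff(url: string): number {
        const failures = (this.failures.get(url) ?? 0) + 1;
        this.failures.set(url, failures);
        const delayMs = Math.min(this.baseMs * Math.pow(this.pow, failures), this.maxMs);
        this.asleepUntil.set(url, Date.now() + delayMs);
        return delayMs;
    }
}

FeedReader constructs it as new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS), as shown in the diff above.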
3 changes: 2 additions & 1 deletion src/feeds/parser.rs
@@ -70,7 +70,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
.map(|f| f.value)
.or(item.link.clone())
.or(item.title.clone())
.and_then(|f| hash_id(f).ok()),
.and_then(|f| hash_id(f).ok())
.map(|f| format!("md5:{}", f)),
})
.collect(),
}
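
With the md5: prefix now applied once on the Rust side, the TypeScript loop in FeedReader.ts above pushes item.hashId directly instead of building the prefixed value itself (the removed const hashId = `md5:${item.hashId}` line).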
1 change: 1 addition & 0 deletions src/lib.rs
@@ -3,6 +3,7 @@ pub mod feeds;
pub mod format_util;
pub mod github;
pub mod jira;
pub mod util;

#[macro_use]
extern crate napi_derive;