Skip to content

[FSSDK-11393] fix default event flush interval #1023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/event_processor/event_processor_factory.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import { LocalStorageCache } from '../utils/cache/local_storage_cache.browser';
import { SyncPrefixCache } from '../utils/cache/cache';
import { EVENT_STORE_PREFIX, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory';

export const DEFAULT_EVENT_BATCH_SIZE = 10;
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1_000;

export const createForwardingEventProcessor = (
eventDispatcher: EventDispatcher = defaultEventDispatcher,
): OpaqueEventProcessor => {
Expand All @@ -54,6 +57,8 @@ export const createBatchEventProcessor = (
(options.eventDispatcher ? undefined : sendBeaconEventDispatcher),
flushInterval: options.flushInterval,
batchSize: options.batchSize,
defaultFlushInterval: DEFAULT_EVENT_FLUSH_INTERVAL,
defaultBatchSize: DEFAULT_EVENT_BATCH_SIZE,
retryOptions: {
maxRetries: 5,
},
Expand Down
5 changes: 5 additions & 0 deletions lib/event_processor/event_processor_factory.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import {
wrapEventProcessor,
} from './event_processor_factory';

export const DEFAULT_EVENT_BATCH_SIZE = 10;
export const DEFAULT_EVENT_FLUSH_INTERVAL = 30_000;

export const createForwardingEventProcessor = (
eventDispatcher: EventDispatcher = defaultEventDispatcher,
): OpaqueEventProcessor => {
Expand All @@ -41,6 +44,8 @@ export const createBatchEventProcessor = (
closingEventDispatcher: options.closingEventDispatcher,
flushInterval: options.flushInterval,
batchSize: options.batchSize,
defaultFlushInterval: DEFAULT_EVENT_FLUSH_INTERVAL,
defaultBatchSize: DEFAULT_EVENT_BATCH_SIZE,
retryOptions: {
maxRetries: 10,
},
Expand Down
5 changes: 5 additions & 0 deletions lib/event_processor/event_processor_factory.react_native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import { AsyncStorageCache } from '../utils/cache/async_storage_cache.react_nati
import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native';
import { isAvailable as isNetInfoAvailable } from '../utils/import.react_native/@react-native-community/netinfo';

export const DEFAULT_EVENT_BATCH_SIZE = 10;
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1_000;

export const createForwardingEventProcessor = (
eventDispatcher: EventDispatcher = defaultEventDispatcher,
): OpaqueEventProcessor => {
Expand Down Expand Up @@ -62,6 +65,8 @@ export const createBatchEventProcessor = (
closingEventDispatcher: options.closingEventDispatcher,
flushInterval: options.flushInterval,
batchSize: options.batchSize,
defaultFlushInterval: DEFAULT_EVENT_FLUSH_INTERVAL,
defaultBatchSize: DEFAULT_EVENT_BATCH_SIZE,
retryOptions: {
maxRetries: 5,
},
Expand Down
58 changes: 48 additions & 10 deletions lib/event_processor/event_processor_factory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { describe, it, expect, beforeEach, vi, MockInstance } from 'vitest';
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, getBatchEventProcessor } from './event_processor_factory';
import { getBatchEventProcessor } from './event_processor_factory';
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId,DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF } from './batch_event_processor';
import { ExponentialBackoff, IntervalRepeater } from '../utils/repeater/repeater';
import { getMockSyncCache } from '../tests/mock/mock_cache';
Expand Down Expand Up @@ -44,6 +44,8 @@ describe('getBatchEventProcessor', () => {
it('returns an instane of BatchEventProcessor if no subclass constructor is provided', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
defaultFlushInterval: 1000,
defaultBatchSize: 10,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -60,6 +62,8 @@ describe('getBatchEventProcessor', () => {

const options = {
eventDispatcher: getMockEventDispatcher(),
defaultFlushInterval: 1000,
defaultBatchSize: 10,
};

const processor = getBatchEventProcessor(options, CustomEventProcessor);
Expand All @@ -70,6 +74,8 @@ describe('getBatchEventProcessor', () => {
it('does not use retry if retryOptions is not provided', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
defaultFlushInterval: 1000,
defaultBatchSize: 10,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -81,6 +87,8 @@ describe('getBatchEventProcessor', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
retryOptions: {},
defaultFlushInterval: 1000,
defaultBatchSize: 10,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -94,6 +102,8 @@ describe('getBatchEventProcessor', () => {
it('uses the correct maxRetries value when retryOptions is provided', () => {
const options1 = {
eventDispatcher: getMockEventDispatcher(),
defaultFlushInterval: 1000,
defaultBatchSize: 10,
retryOptions: {
maxRetries: 10,
},
Expand All @@ -105,6 +115,8 @@ describe('getBatchEventProcessor', () => {

const options2 = {
eventDispatcher: getMockEventDispatcher(),
defaultFlushInterval: 1000,
defaultBatchSize: 10,
retryOptions: {},
};

Expand All @@ -117,6 +129,8 @@ describe('getBatchEventProcessor', () => {
it('uses exponential backoff with default parameters when retryOptions is provided without backoff values', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
defaultFlushInterval: 1000,
defaultBatchSize: 10,
retryOptions: {},
};

Expand All @@ -133,6 +147,8 @@ describe('getBatchEventProcessor', () => {
it('uses exponential backoff with provided backoff values in retryOptions', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
defaultFlushInterval: 1000,
defaultBatchSize: 10,
retryOptions: { minBackoff: 1000, maxBackoff: 2000 },
};

Expand All @@ -149,48 +165,54 @@ describe('getBatchEventProcessor', () => {
it('uses a IntervalRepeater with default flush interval and adds a startup log if flushInterval is not provided', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
defaultFlushInterval: 12345,
defaultBatchSize: 77,
};

const processor = getBatchEventProcessor(options);

expect(Object.is(processor, MockBatchEventProcessor.mock.instances[0])).toBe(true);
const usedRepeater = MockBatchEventProcessor.mock.calls[0][0].dispatchRepeater;
expect(Object.is(usedRepeater, MockIntervalRepeater.mock.instances[0])).toBe(true);
expect(MockIntervalRepeater).toHaveBeenNthCalledWith(1, DEFAULT_EVENT_FLUSH_INTERVAL);
expect(MockIntervalRepeater).toHaveBeenNthCalledWith(1, 12345);

const startupLogs = MockBatchEventProcessor.mock.calls[0][0].startupLogs;
expect(startupLogs).toEqual(expect.arrayContaining([{
level: LogLevel.Warn,
message: 'Invalid flushInterval %s, defaulting to %s',
params: [undefined, DEFAULT_EVENT_FLUSH_INTERVAL],
params: [undefined, 12345],
}]));
});

it('uses default flush interval and adds a startup log if flushInterval is less than 1', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
flushInterval: -1,
defaultFlushInterval: 12345,
defaultBatchSize: 77,
};

const processor = getBatchEventProcessor(options);

expect(Object.is(processor, MockBatchEventProcessor.mock.instances[0])).toBe(true);
const usedRepeater = MockBatchEventProcessor.mock.calls[0][0].dispatchRepeater;
expect(Object.is(usedRepeater, MockIntervalRepeater.mock.instances[0])).toBe(true);
expect(MockIntervalRepeater).toHaveBeenNthCalledWith(1, DEFAULT_EVENT_FLUSH_INTERVAL);
expect(MockIntervalRepeater).toHaveBeenNthCalledWith(1, 12345);

const startupLogs = MockBatchEventProcessor.mock.calls[0][0].startupLogs;
expect(startupLogs).toEqual(expect.arrayContaining([{
level: LogLevel.Warn,
message: 'Invalid flushInterval %s, defaulting to %s',
params: [-1, DEFAULT_EVENT_FLUSH_INTERVAL],
params: [-1, 12345],
}]));
});

it('uses a IntervalRepeater with provided flushInterval and adds no startup log if provided flushInterval is valid', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
flushInterval: 12345,
defaultFlushInterval: 1000,
defaultBatchSize: 77,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -205,46 +227,52 @@ describe('getBatchEventProcessor', () => {
});


it('uses a IntervalRepeater with default flush interval and adds a startup log if flushInterval is not provided', () => {
it('uses default batch size and adds a startup log if batchSize is not provided', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
defaultBatchSize: 77,
defaultFlushInterval: 12345,
};

const processor = getBatchEventProcessor(options);

expect(Object.is(processor, MockBatchEventProcessor.mock.instances[0])).toBe(true);
expect(MockBatchEventProcessor.mock.calls[0][0].batchSize).toBe(DEFAULT_EVENT_BATCH_SIZE);
expect(MockBatchEventProcessor.mock.calls[0][0].batchSize).toBe(77);

const startupLogs = MockBatchEventProcessor.mock.calls[0][0].startupLogs;
expect(startupLogs).toEqual(expect.arrayContaining([{
level: LogLevel.Warn,
message: 'Invalid batchSize %s, defaulting to %s',
params: [undefined, DEFAULT_EVENT_BATCH_SIZE],
params: [undefined, 77],
}]));
});

it('uses default size and adds a startup log if provided batchSize is less than 1', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
batchSize: -1,
defaultBatchSize: 77,
defaultFlushInterval: 12345,
};

const processor = getBatchEventProcessor(options);

expect(Object.is(processor, MockBatchEventProcessor.mock.instances[0])).toBe(true);
expect(MockBatchEventProcessor.mock.calls[0][0].batchSize).toBe(DEFAULT_EVENT_BATCH_SIZE);
expect(MockBatchEventProcessor.mock.calls[0][0].batchSize).toBe(77);

const startupLogs = MockBatchEventProcessor.mock.calls[0][0].startupLogs;
expect(startupLogs).toEqual(expect.arrayContaining([{
level: LogLevel.Warn,
message: 'Invalid batchSize %s, defaulting to %s',
params: [-1, DEFAULT_EVENT_BATCH_SIZE],
params: [-1, 77],
}]));
});

it('does not use a failedEventRepeater if failedEventRetryInterval is not provided', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
defaultBatchSize: 77,
defaultFlushInterval: 12345,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -257,6 +285,8 @@ describe('getBatchEventProcessor', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
failedEventRetryInterval: 12345,
defaultBatchSize: 77,
defaultFlushInterval: 12345,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -270,6 +300,8 @@ describe('getBatchEventProcessor', () => {
const eventDispatcher = getMockEventDispatcher();
const options = {
eventDispatcher,
defaultBatchSize: 77,
defaultFlushInterval: 12345,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -281,6 +313,8 @@ describe('getBatchEventProcessor', () => {
it('does not use any closingEventDispatcher if not provided', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
defaultBatchSize: 77,
defaultFlushInterval: 12345,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -294,6 +328,8 @@ describe('getBatchEventProcessor', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
closingEventDispatcher,
defaultBatchSize: 77,
defaultFlushInterval: 12345,
};

const processor = getBatchEventProcessor(options);
Expand All @@ -307,6 +343,8 @@ describe('getBatchEventProcessor', () => {
const options = {
eventDispatcher: getMockEventDispatcher(),
eventStore,
defaultBatchSize: 77,
defaultFlushInterval: 12345,
};

const processor = getBatchEventProcessor(options);
Expand Down
19 changes: 11 additions & 8 deletions lib/event_processor/event_processor_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import { EventProcessor } from "./event_processor";
import { BatchEventProcessor, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, EventWithId, RetryConfig } from "./batch_event_processor";
import { AsyncPrefixCache, Cache, SyncPrefixCache } from "../utils/cache/cache";

export const DEFAULT_EVENT_BATCH_SIZE = 10;
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1000;
export const DEFAULT_EVENT_MAX_QUEUE_SIZE = 10000;

export const FAILED_EVENT_RETRY_INTERVAL = 20 * 1000;
export const EVENT_STORE_PREFIX = 'optly_event:';

Expand Down Expand Up @@ -60,9 +58,12 @@ export type BatchEventProcessorOptions = {
eventStore?: Cache<string>;
};

export type BatchEventProcessorFactoryOptions = Omit<BatchEventProcessorOptions, 'eventDispatcher' | 'eventStore'> & {
export type BatchEventProcessorFactoryOptions = Omit<BatchEventProcessorOptions, 'eventDispatcher' | 'eventStore' > & {
eventDispatcher: EventDispatcher;
closingEventDispatcher?: EventDispatcher;
failedEventRetryInterval?: number;
defaultFlushInterval: number;
defaultBatchSize: number;
eventStore?: Cache<EventWithId>;
retryOptions?: {
maxRetries?: number;
Expand All @@ -88,23 +89,25 @@ export const getBatchEventProcessor = (

const startupLogs: StartupLog[] = [];

let flushInterval = DEFAULT_EVENT_FLUSH_INTERVAL;
const { defaultFlushInterval, defaultBatchSize } = options;

let flushInterval = defaultFlushInterval;
if (options.flushInterval === undefined || options.flushInterval <= 0) {
startupLogs.push({
level: LogLevel.Warn,
message: 'Invalid flushInterval %s, defaulting to %s',
params: [options.flushInterval, DEFAULT_EVENT_FLUSH_INTERVAL],
params: [options.flushInterval, defaultFlushInterval],
});
} else {
flushInterval = options.flushInterval;
}

let batchSize = DEFAULT_EVENT_BATCH_SIZE;
let batchSize = defaultBatchSize;
if (options.batchSize === undefined || options.batchSize <= 0) {
startupLogs.push({
level: LogLevel.Warn,
message: 'Invalid batchSize %s, defaulting to %s',
params: [options.batchSize, DEFAULT_EVENT_BATCH_SIZE],
params: [options.batchSize, defaultBatchSize],
});
} else {
batchSize = options.batchSize;
Expand Down
5 changes: 5 additions & 0 deletions lib/event_processor/event_processor_factory.universal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import {
getPrefixEventStore,
} from './event_processor_factory';

export const DEFAULT_EVENT_BATCH_SIZE = 10;
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1_000;

import { FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory';

export const createForwardingEventProcessor = (
Expand All @@ -47,6 +50,8 @@ export const createBatchEventProcessor = (
closingEventDispatcher: options.closingEventDispatcher,
flushInterval: options.flushInterval,
batchSize: options.batchSize,
defaultFlushInterval: DEFAULT_EVENT_FLUSH_INTERVAL,
defaultBatchSize: DEFAULT_EVENT_BATCH_SIZE,
retryOptions: {
maxRetries: 5,
},
Expand Down
Loading