Skip to content

Commit

Permalink
Add signals middleware (#1220)
Browse files Browse the repository at this point in the history
  • Loading branch information
silesky authored Feb 3, 2025
1 parent c9f81ac commit bf86857
Show file tree
Hide file tree
Showing 24 changed files with 459 additions and 384 deletions.
25 changes: 25 additions & 0 deletions .changeset/warm-lies-rush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
'@segment/analytics-signals': minor
---

Allow registration of middleware to allow for dropping and modification of signals

```ts
class MyMiddleware implements SignalsMiddleware {
process(signal: Signal) {
if (
signal.type === 'network' &&
signal.data.action === 'request' &&
...
) {
// drop or modify signal
return null
} else {
return signal
}
}
}
const signalsPlugin = new SignalsPlugin({
middleware: [new MyMiddleware()]
})
```
2 changes: 1 addition & 1 deletion packages/signals/signals-integration-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
".": "yarn run -T turbo run --filter=@internal/signals-integration-tests...",
"build": "webpack",
"test": "playwright test",
"test:vanilla": "playwright test src/tests/vanilla",
"test:vanilla": "playwright test src/tests/signals-vanilla",
"test:perf": "playwright test src/tests/performance",
"test:custom": "playwright test src/tests/custom",
"watch": "webpack -w",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export class BasePage {
public edgeFnDownloadURL = 'https://cdn.edgefn.segment.com/MY-WRITEKEY/foo.js'
public edgeFn!: string
public network!: PageNetworkUtils
public defaultSignalsPluginTestSettings: Partial<SignalsPluginSettingsConfig> =
{
disableSignalsRedaction: true,
enableSignalsIngestion: true,
flushInterval: 500,
}

constructor(path: string) {
this.url = 'http://localhost:5432/src/tests' + path
Expand All @@ -41,7 +47,12 @@ export class BasePage {
page: Page,
edgeFn: string,
signalSettings: Partial<SignalsPluginSettingsConfig> = {},
options: { updateURL?: (url: string) => string; sampleRate?: number } = {}
options: {
updateURL?: (url: string) => string
sampleRate?: number
middleware?: string
skipSignalsPluginInit?: boolean
} = {}
) {
logConsole(page)
this.page = page
Expand All @@ -50,10 +61,12 @@ export class BasePage {
await this.setupMockedRoutes(options.sampleRate)
const url = options.updateURL ? options.updateURL(this.url) : this.url
await this.page.goto(url, { waitUntil: 'domcontentloaded' })
void this.invokeAnalyticsLoad({
flushInterval: 500,
...signalSettings,
})
if (!options.skipSignalsPluginInit) {
void this.invokeAnalyticsLoad({
flushInterval: 500,
...signalSettings,
})
}
return this
}

Expand All @@ -76,18 +89,19 @@ export class BasePage {
signalSettings: Partial<SignalsPluginSettingsConfig> = {}
) {
await this.page.evaluate(
({ signalSettings }) => {
window.signalsPlugin = new window.SignalsPlugin({
disableSignalsRedaction: true,
enableSignalsIngestion: true,
...signalSettings,
})
({ settings }) => {
window.signalsPlugin = new window.SignalsPlugin(settings)
window.analytics.load({
writeKey: '<SOME_WRITE_KEY>',
plugins: [window.signalsPlugin],
})
},
{ signalSettings }
{
settings: {
...this.defaultSignalsPluginTestSettings,
...signalSettings,
},
}
)
return this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import { Page } from '@playwright/test'

export const logConsole = (page: Page) => {
page.on('console', (msg) => {
console.log(`console.${msg.type()}:`, msg.text())
const text = msg.text()
// keep stdout clean, e.g. by not printing intentional errors
const ignoreList = ['Bad Request']
if (ignoreList.some((str) => text.includes(str))) {
return
}
console.log(`console.${msg.type()}:`, text)
})
page.on('pageerror', (error) => {
console.error('Page error:', error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<head>
<!-- Dummy favicon -->
<link rel="icon" href="data:;base64,iVBORw0KGgo=">
<link rel="shortcut icon" href="#">>
</head>

<body>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<head>
<script src="../../../dist/signals-vanilla.bundle.js"></script>
<!-- Dummy favicon -->
<link rel="icon" href="data:;base64,iVBORw0KGgo=">
<link rel="shortcut icon" href="#">
</head>

<body>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AnalyticsBrowser } from '@segment/analytics-next'
import { SignalsPlugin } from '@segment/analytics-signals'

/**
* Not instantiating the analytics object here, as it will be instantiated in the test
* Not calling analytics.load() or instantiating Signals Plugin here, as all this configuration happens in the page object.
*/
window.SignalsPlugin = SignalsPlugin
window.analytics = new AnalyticsBrowser()
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
<head>
<script src="../../../dist/signals-vanilla.bundle.js"></script>
<!-- Dummy favicon -->
<link rel="icon" href="data:;base64,iVBORw0KGgo=">
<link rel="shortcut icon" href="#">
</head>

<body>
<button id="some-button">Click me</button>
<button id="complex-button">
<img id="some-image" src="https://via.placeholder.com/150" alt="Placeholder Image">
<img id="some-image" src="" alt="Placeholder Image">
<div>
Other Example Button with <h1>Nested Text</h1>
</div>
Expand All @@ -26,4 +26,4 @@
<input type="submit" value="Submit">
</form>
</body>
</html>
</html>
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { test, expect } from '@playwright/test'
import { IndexPage } from './index-page'

const basicEdgeFn = `const processSignal = (signal) => {}`

let indexPage: IndexPage

test('middleware', async ({ page }) => {
indexPage = await new IndexPage().load(
page,
basicEdgeFn,
{},
{
skipSignalsPluginInit: true,
}
)

await page.evaluate(
({ settings }) => {
window.signalsPlugin = new window.SignalsPlugin({
middleware: [
{
load() {
return undefined
},
process: function (signal) {
// @ts-ignore
signal.data['middleware'] = 'test'
return signal
},
},
],
...settings,
})
window.analytics.load({
writeKey: '<SOME_WRITE_KEY>',
plugins: [window.signalsPlugin],
})
},
{
settings: {
...indexPage.defaultSignalsPluginTestSettings,
flushAt: 1,
},
}
)

/**
* Make an analytics.page() call, see that the middleware can modify the event
*/
await Promise.all([
indexPage.makeAnalyticsPageCall(),
indexPage.waitForSignalsApiFlush(),
])

const instrumentationEvents =
indexPage.signalsAPI.getEvents('instrumentation')
expect(instrumentationEvents).toHaveLength(1)
const ev = instrumentationEvents[0]
expect(ev.properties!.data['middleware']).toEqual('test')
})
24 changes: 24 additions & 0 deletions packages/signals/signals/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,30 @@ signalsPlugin.onSignal((signal) => console.log(signal))
```


### Middleware / Plugins
#### Drop or modify signals
```ts
import { SignalsPlugin, SignalsMiddleware } from '@segment/analytics-signals'

class MyMiddleware implements SignalsMiddleware {
process(signal: Signal) {
// drop the event if some conditions are met
if (
signal.type === 'network' &&
signal.data.action === 'request' &&
...
) {
return null;
} else {
return signal;
}
}
}
const signalsPlugin = new SignalsPlugin({ middleware: [myMiddleware]})
analytics.register(signalsPlugin)
```


### Playground / Development / Testing
See the [signals example repo](../signals-example).

Expand Down
114 changes: 88 additions & 26 deletions packages/signals/signals/src/core/emitter/index.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,109 @@
import { Emitter } from '@segment/analytics-generic-utils'
import { logger } from '../../lib/logger'
import { Signal } from '@segment/analytics-signals-runtime'
import { SignalGlobalSettings } from '../signals'

export interface EmitSignal {
emit: (signal: Signal) => void
}

const logSignal = (signal: Signal) => {
logger.info(
'New signal:',
signal.type,
signal.data,
...(signal.type === 'interaction' && 'change' in signal.data
? ['change:', JSON.stringify(signal.data.change, null, 2)]
: [])
)
export interface SignalsMiddlewareContext {
/**
* These are global application settings. They are considered unstable, and should only be used internally.
* @interal
*/
unstableGlobalSettings: SignalGlobalSettings
writeKey: string
}

export interface PluginSettings {
writeKey: string
}

export interface SignalsMiddleware {
/**
* Wait for .load to complete before emitting signals
* This blocks the signal emitter until all plugins are loaded.
*/
load(ctx: SignalsMiddlewareContext): Promise<void> | void
process(signal: Signal): Signal | null
}

export interface SignalEmitterSettings {
middleware?: SignalsMiddleware[]
}
export class SignalEmitter implements EmitSignal {
private emitter = new Emitter<{ add: [Signal] }>()
private listeners = new Set<(signal: Signal) => void>()
private middlewares: SignalsMiddleware[] = []
private initialized = false // Controls buffering vs eager signal processing
private signalQueue: Signal[] = [] // Buffer for signals emitted before initialization

constructor(settings?: SignalEmitterSettings) {
if (settings?.middleware) this.middlewares.push(...settings.middleware)
}

// Emit a signal
emit(signal: Signal): void {
if (!this.initialized) {
// Buffer the signal if not initialized
this.signalQueue.push(signal)
return
}

// Process and notify listeners
this.processAndEmit(signal)
}

emit(signal: Signal) {
logSignal(signal)
this.emitter.emit('add', signal)
// Process and emit a signal
private processAndEmit(signal: Signal): void {
// Apply plugin; drop the signal if any plugin returns null
for (const plugin of this.middlewares) {
const processed = plugin.process(signal)
if (processed === null) return // Drop the signal
}

// Notify listeners
for (const listener of this.listeners) {
listener(signal)
}
}

subscribe(listener: (signal: Signal) => void) {
// Prevent duplicate subscriptions
// Initialize the emitter, load plugin, flush the buffer, and enable eager processing
async initialize({
globalSettings,
writeKey,
}: {
globalSettings: SignalGlobalSettings
writeKey: string
}): Promise<void> {
if (this.initialized) return

// Wait for all plugin to complete their load method
await Promise.all(
this.middlewares.map((mw) =>
mw.load({ unstableGlobalSettings: globalSettings, writeKey })
)
)

this.initialized = true

// Process and emit all buffered signals
while (this.signalQueue.length > 0) {
const signal = this.signalQueue.shift() as Signal
this.processAndEmit(signal)
}
}

/**
* Listen to signals emitted, once they have travelled through the plugin pipeline.
* This is equivalent to a destination plugin.
*/
subscribe(listener: (signal: Signal) => void): void {
if (!this.listeners.has(listener)) {
logger.debug('subscribed')
this.listeners.add(listener)
}
this.emitter.on('add', listener)
}

unsubscribe(listener: (signal: Signal) => void) {
// Unsubscribe a listener
unsubscribe(listener: (signal: Signal) => void): void {
this.listeners.delete(listener)
logger.debug('unsubscribed')
this.emitter.off('add', listener)
}

once(listener: (signal: Signal) => void) {
this.emitter.once('add', listener)
}
}
Loading

0 comments on commit bf86857

Please sign in to comment.