Add support for a stream projection API similar to EventStoreDB #129
Comments
Main use case: rewrite a stream into a "compacted" version so that the (high-frequency) source stream(s) can eventually be deleted or archived, or kept internal while only the redacted stream is published. Current issue with this |
The EventStoreDB API contains a state for projections, which is useful for logic that depends on previously seen events. Persisting that state also lets the projection resume. Hence, a more fitting equivalent would be:
const myProjection = eventstore.getConsumer('_all' /* or whatever more concrete stream you want to project from */, 'my-projection-id');
myProjection.on('data', event => {
    if (myProjection.state... && event...) {
        try {
            eventstore.commit('my-projection-stream', [new MyProjectedEvent(...)], myProjection.position);
            myProjection.setState({...myProjection.state, ...});
        } catch (e) {
            // Handle duplicate processing
            if (e instanceof OptimisticConcurrencyError) return true;
            throw e;
        }
    }
});
The eventstore.commit + the exception handling could be moved into the above mentioned |
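The resume-and-ignore-duplicates idea above can be tried out in isolation. What follows is only a minimal in-memory sketch, not the node-event-storage API: `InMemoryStore`, `project`, the `balances` stream and this `OptimisticConcurrencyError` class are hypothetical stand-ins. It also assumes that every source event yields exactly one projected event, so the consumer position can double as the expected version of the target stream.

```javascript
// Hypothetical stand-ins for illustration only; not the real library classes.
class OptimisticConcurrencyError extends Error {}

class InMemoryStore {
    constructor() {
        this.streams = new Map();
    }
    // Append only if the stream currently holds exactly `expectedVersion` events.
    commit(streamName, events, expectedVersion) {
        const stream = this.streams.get(streamName) || [];
        if (stream.length !== expectedVersion) {
            throw new OptimisticConcurrencyError(
                `expected version ${expectedVersion}, stream is at ${stream.length}`
            );
        }
        this.streams.set(streamName, stream.concat(events));
    }
    read(streamName) {
        return this.streams.get(streamName) || [];
    }
}

// Runs the projection from a persisted { position, state } and returns the new one.
function project(store, sourceEvents, saved) {
    let { position, state } = saved;
    for (const event of sourceEvents.slice(position)) {
        const balance = state.balance + event.amount;
        try {
            store.commit('balances', [{ type: 'BalanceChanged', balance }], position);
        } catch (e) {
            // The event was already projected in an earlier run: ignore the duplicate.
            if (!(e instanceof OptimisticConcurrencyError)) throw e;
        }
        state = { balance };
        position += 1;
    }
    return { position, state };
}

const store = new InMemoryStore();
const source = [{ amount: 5 }, { amount: 7 }];

// First run projects both events.
project(store, source, { position: 0, state: { balance: 0 } });

// Simulate a crash after the second emit but before its state was persisted:
// the projection resumes from the older saved state and reprocesses event two.
const resumed = project(store, source, { position: 1, state: { balance: 5 } });
```

In this sketch the second run hits the concurrency check, skips the duplicate commit, and still advances its state, so the projected stream is not written twice.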
Note: Setting expectedVersion to
The API for a projection would need to be thought through. The projection method should not be an instance method, as that would mean a specific projection needs to inherit from
const myProjection = eventstore.getProjection('_all', 'my-projection-stream');
myProjection.on('event', (event, emit) => {
    emit(new SomeProjectedEvent(myProjection.state, event));
    // Parentheses are needed so the arrow function returns an object literal
    myProjection.setState(state => ({ ...state, someData: state.someData + event.someProp }));
});
A failing setState should ideally prevent the emit from happening. That is hard to achieve, because a commit cannot be canceled. It means that on the next execution the projection will reattempt the projection of the event and hence emit the projected event again, which will fail the optimistic concurrency check; that error is caught and ignored. So far so good, but until the setState eventually succeeds, the projected stream is inconsistent with the (persisted) projection state. |
On top of this, the addition of a
const byEventType = eventstore.getProjection('_all', 'by-event-type');
byEventType.on('event', (event, emit, linkTo) => {
    linkTo('type-' + event.type, event);
});
However, the same could be achieved with another concept: specify streams through a function that returns a stream name given the event, e.g.:
eventstore.createDynamicStream((event) => 'type-' + event.type);
See #140 |
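To compare the two concepts side by side, here is a small in-memory sketch. Both helpers, `projectWithLinkTo` and `groupIntoDynamicStreams`, are hypothetical and only model the routing behaviour with plain arrays; neither is an existing library function, and the real `linkTo` or dynamic stream would store links in a stream index rather than copy events.

```javascript
// Variant 1: a projection handler receives a linkTo callback and picks the target stream.
function projectWithLinkTo(events, handler) {
    const streams = new Map();
    const linkTo = (streamName, event) => {
        if (!streams.has(streamName)) streams.set(streamName, []);
        streams.get(streamName).push(event);
    };
    for (const event of events) handler(event, linkTo);
    return streams;
}

// Variant 2: a naming function maps each event to the stream it belongs to.
function groupIntoDynamicStreams(events, nameOf) {
    const streams = new Map();
    for (const event of events) {
        const streamName = nameOf(event);
        if (!streams.has(streamName)) streams.set(streamName, []);
        streams.get(streamName).push(event);
    }
    return streams;
}

const events = [
    { type: 'UserCreated', id: 1 },
    { type: 'UserRenamed', id: 1 },
    { type: 'UserCreated', id: 2 },
];

const viaLinkTo = projectWithLinkTo(events, (event, linkTo) => {
    linkTo('type-' + event.type, event);
});
const viaNaming = groupIntoDynamicStreams(events, (event) => 'type-' + event.type);
```

For this by-event-type case both variants produce the same streams. The naming function is the narrower concept, since it cannot skip an event or link it into several streams, which a `linkTo` handler could.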
Regarding naming: A projection that emits events is called a
Regarding the issue of
const myProjection = eventstore.getProjection('_all', 'my-projection-stream');
myProjection.on('data', (event) => {
    // Parentheses are needed so the arrow function returns an object literal
    myProjection.setState(state => ({ ...state, someData: state.someData + event.someProp }));
});
// linkTo is an optional second argument
myProjection.on('change', (emit, linkTo) => {
    emit(new SomeProjectedEvent(myProjection.state));
});
The 'change' event could also do other work, but it may not call
emit(event) {
    // Store the projection state and position in the metadata of the emitted event
    eventstore.commit(projectionStream, event, { state: this.state, position: this.position });
}
restoreState(initialState, startFrom) {
    // Read only the last event of the projection stream
    const lastEvent = eventstore.getEventStream(projectionStream, -1, -1).next();
    this.position = lastEvent !== false ? lastEvent.metadata.position : startFrom;
    this.state = lastEvent !== false ? lastEvent.metadata.state : initialState;
}
This is effectively an "event-sourced projection manager (with snapshots)". Alternatively, the state is not stored in the stream, and on restore it is rebuilt by running through all events up to the last position again, but with "emit()"/'change' events disabled. |
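The snapshot-in-metadata approach can be exercised with a small self-contained model. This is a sketch under assumptions: `StreamLog` and `SnapshottingProjection` are hypothetical names, the log is a plain array instead of a real stream, and `position` simply counts the source events handled so far.

```javascript
// Hypothetical in-memory projection stream: each entry carries payload and metadata.
class StreamLog {
    constructor() {
        this.events = [];
    }
    commit(payload, metadata) {
        this.events.push({ payload, metadata });
    }
    // Mirrors the "false when empty" convention used in restoreState above.
    last() {
        return this.events.length > 0 ? this.events[this.events.length - 1] : false;
    }
}

class SnapshottingProjection {
    constructor(log, initialState, startFrom) {
        this.log = log;
        this.restoreState(initialState, startFrom);
    }
    // State and position come from the metadata of the last emitted event, if any.
    restoreState(initialState, startFrom) {
        const lastEvent = this.log.last();
        this.position = lastEvent !== false ? lastEvent.metadata.position : startFrom;
        this.state = lastEvent !== false ? lastEvent.metadata.state : initialState;
    }
    handle(event) {
        this.state = { ...this.state, someData: this.state.someData + event.someProp };
        this.position += 1;
        this.emit({ type: 'SomeProjectedEvent', someData: this.state.someData });
    }
    // Emitting writes the event together with a snapshot of state and position.
    emit(payload) {
        this.log.commit(payload, { state: this.state, position: this.position });
    }
}

const log = new StreamLog();
const first = new SnapshottingProjection(log, { someData: 0 }, 0);
first.handle({ someProp: 2 });
first.handle({ someProp: 3 });

// A fresh instance (for example after a restart) picks up where the first one stopped.
const restored = new SnapshottingProjection(log, { someData: 0 }, 0);
```

Because state and emitted event are written in one commit, they cannot drift apart in this model. The cost is that every projected event carries a full copy of the state in its metadata.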
EventStoreDB provides a projection API that can project one or multiple streams into new streams by emitting new events, see https://eventstore.com/docs/projections/user-defined-projections/index.html#functions
The API could also provide an emit function that effectively just does a commit to a given different stream (this needs to be checked to avoid circular projecting).
Right now, something like this can be achieved with a stream definition like:
With the drawback that an empty stream index my-stream-projection will be created.
Optimally, the API would not depend on the eventstore global variable, which could be solved by passing an emit function as second argument:
Even better would be if the intermediary stream index weren't necessary at all:
That would mean that the projection is written into the stream index of the write stream it builds. The issue here is that the projection invocation happens in the storage layer, but the emit needs to be at the EventStore layer.
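As a rough illustration of passing emit as the second argument, the sketch below runs a handler over a source stream and commits emitted events to a different target stream. `runProjection` and the stream names are hypothetical, streams are modelled as arrays in a `Map`, and the circularity check only covers the direct case where source and target are the same stream.

```javascript
// Hypothetical helper: the handler gets (event, emit) and needs no global eventstore.
function runProjection(streams, sourceStream, targetStream, handler) {
    if (sourceStream === targetStream) {
        throw new Error('Circular projection: source and target stream are identical.');
    }
    const emit = (event) => {
        if (!streams.has(targetStream)) streams.set(targetStream, []);
        streams.get(targetStream).push(event);
    };
    // Copy the source first so events emitted during the run are never re-read.
    for (const event of [...(streams.get(sourceStream) || [])]) {
        handler(event, emit);
    }
}

const streams = new Map([
    ['orders', [
        { type: 'OrderPlaced', total: 10 },
        { type: 'Ping' },
        { type: 'OrderPlaced', total: 5 },
    ]],
]);

// Project only the order events, reduced to their totals, into a new stream.
runProjection(streams, 'orders', 'order-totals', (event, emit) => {
    if (event.type === 'OrderPlaced') emit({ type: 'OrderTotal', total: event.total });
});
```

A real implementation would also have to detect indirect cycles, for example a projection reading from '_all' while writing to a stream that '_all' includes.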