Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add indexers that allow to create ad-hoc indexes #140

Closed
wants to merge 11 commits into from
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,30 @@ let myProjectionStream = eventstore.createStream('my-projection-stream', (event)
for (let event of myProjectionStream) {
//...
}

// This stream will include all events that have a `someProperty` with exactly the value 'equalsThisValue'
let myPropertyMatchingStream = eventstore.createStream('my-property-stream', { payload: { someProperty: 'equalsThisValue' } });
```

For creating streams dynamically depending on the events coming in, since version 0.8 you can define streams functionally.

```javascript
// Will create a separate stream for every event type that occurs in the system.
eventstore.createDynamicStream((event) => 'type-' + event.payload.type);

// Will create a separate stream for every event type and use an object matcher
// instead of executing the mapper for every subsequent event. Use this for optimization
// if the logic is relatively complex. Hint: this is a bad example!
eventstore.createDynamicStream((event) => ['type-' + event.payload.type, { payload: { type: event.type } }]);
```

**NOTE**
> Defining dynamic streams may lead to a lot of streams being created, which will cost a lot of performance eventually.
> Make sure to only use this when absolutely needed. For the event type case you might rather iterate all your known
> types during start-up and create streams for those. If you want to iterate events by some random property, like e.g.
> correlationId, a sane approach is to only create a stream for each prefix like the first two or three characters, then
> filter the stream events when iterating.

### Optimistic concurrency

Optimistic concurrency control is required when multiple sources generate events concurrently.
Expand Down
31 changes: 30 additions & 1 deletion src/EventStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ class EventStore extends events.EventEmitter {
*
* @api
* @param {string} streamName The name of the stream to create.
* @param {object|function(event)} matcher A matcher object, denoting the properties that need to match on an event a function that takes the event and returns true if the event should be added.
* @param {object|function(object):boolean} matcher A matcher object, denoting the properties that need to match on an event or a function that takes the event and returns true if the event should be added.
* @returns {EventStream} The EventStream with all existing events matching the matcher.
* @throws {Error} If a stream with that name already exists.
* @throws {Error} If the stream could not be created.
Expand Down Expand Up @@ -393,6 +393,35 @@ class EventStore extends events.EventEmitter {
consumer.streamName = streamName;
return consumer.pipe(new EventUnwrapper());
}

/**
* Create a dynamic stream from a function that maps an event and it's metadata to a stream name.
* The mapping method needs to be pure and not depend on external variables.
*
* NOTE: Use this sparingly! It is costly to create streams dynamically and can lead to a lot of streams being created,
* which puts more work on the writer.
*
* @typedef {string|[string, object]|null} MappedStream
* @param {function(object):MappedStream} streamMapper A method that maps an event and it's metadata to a stream name
*/
createDynamicStream(streamMapper) {
const matcherFunc = `(storedEvent) => (${streamMapper.toString()})(storedEvent) === $streamName`;
this.storage.addIndexer((storedEvent) => {
const streamName = streamMapper(storedEvent);
if (streamName instanceof Array) {
const [name, matcher] = streamName;
if (name in this.streams) {
return null;
}
return { name: 'stream-' + name, matcher };
}
if (!streamName || streamName in this.streams) {
return null;
}
const matcher = eval(matcherFunc.replace('$streamName', `'${streamName}'`)); // jshint ignore:line
return { name: 'stream-' + streamName, matcher };
});
}
}

module.exports = EventStore;
Expand Down
58 changes: 52 additions & 6 deletions src/Storage/WritableStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class WritableStorage extends ReadableStorage {
}
super(storageName, config);
this.partitioner = config.partitioner;
this.indexers = [];
}

/**
Expand Down Expand Up @@ -192,6 +193,7 @@ class WritableStorage extends ReadableStorage {
assert(position !== false, 'Error writing document.');

const indexEntry = this.addIndex(partition.id, position, dataSize, document);
this.runIndexers(document);
this.forEachSecondaryIndex((index, name) => {
if (!index.isOpen()) {
index.open();
Expand All @@ -203,17 +205,30 @@ class WritableStorage extends ReadableStorage {
return this.index.length;
}

/**
* Add an indexer, which will be invoked for every document and ensures an index exists with the returned name and matcher.
* @typedef {{name:string|null, matcher:Matcher}|null} IndexDefinition
* @param {function(object):IndexDefinition} indexer The indexer function, which returns an object containing the index name and matcher or null if the document should not be indexed. Alternatively the index name may be null.
*/
addIndexer(indexer) {
/* istanbul ignore else */
if (typeof indexer === 'function') {
this.indexers.push(indexer);
}
}

/**
* Ensure that an index with the given name and document matcher exists.
* Will create the index if it doesn't exist, otherwise return the existing index.
*
* @api
* @param {string} name The index name.
* @param {Matcher} [matcher] An object that describes the document properties that need to match to add it this index or a function that receives a document and returns true if the document should be indexed.
* @returns {ReadableIndex} The index containing all documents that match the query.
* @param {boolean} [updateIndex] If set to false the index will not be matched against all existing documents.
* @returns {ReadableIndex|WritableIndex} The index containing all documents that match the query.
* @throws {Error} if the index doesn't exist yet and no matcher was specified.
*/
ensureIndex(name, matcher) {
ensureIndex(name, matcher, updateIndex = true) {
if (name in this.secondaryIndexes) {
return this.secondaryIndexes[name].index;
}
Expand All @@ -227,6 +242,24 @@ class WritableStorage extends ReadableStorage {

const metadata = buildMetadataForMatcher(matcher, this.hmac);
const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
if (updateIndex) {
this.updateIndex(index, matcher);
}

this.secondaryIndexes[name] = { index, matcher };
this.emit('index-created', name);
return index;
}

/**
* Run the given matcher through all existing documents and add them to the index on match.
* If an error occurs during indexing, the index will be destroyed.
*
* @private
* @param {WritableIndex} index
* @param {Matcher} matcher
*/
updateIndex(index, matcher) {
try {
this.forEachDocument((document, indexEntry) => {
if (matches(document, matcher)) {
Expand All @@ -237,10 +270,6 @@ class WritableStorage extends ReadableStorage {
index.destroy();
throw e;
}

this.secondaryIndexes[name] = { index, matcher };
this.emit('index-created', name);
return index;
}

/**
Expand Down Expand Up @@ -277,6 +306,23 @@ class WritableStorage extends ReadableStorage {
}
}

/**
* Run all indexers against the given document and ensure according indexes.
* Note: This will not cause newly created indexes to index all existing documents.
*
* @private
* @param {object} document
*/
runIndexers(document) {
for (let indexer of this.indexers) {
const index = indexer(document);
if (!index || !index.name) {
continue;
}
this.ensureIndex(index.name, index.matcher, false);
}
}

/**
* Truncate all partitions after the given (global) sequence number.
*
Expand Down
39 changes: 39 additions & 0 deletions test/EventStore.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,45 @@ describe('EventStore', function() {
});
});

describe('createDynamicStream', function() {

it('creates a proper stream with matcher', function() {
eventstore = new EventStore({
storageDirectory
});
eventstore.createDynamicStream((event) => 'type-'+event.payload.type);
eventstore.commit('foo', [{ type: 'one' }]);
eventstore.commit('foo', [{ type: 'two' }]);
});

it('allows to specify an event matcher object as second return value', function(done) {
eventstore = new EventStore({
storageDirectory
});
eventstore.createDynamicStream(({ payload: { type }}) => ['type-'+type, { payload: { type } }]);
eventstore.commit('foo', [{ type: 'one' }]);
eventstore.commit('foo', [{ type: 'two' }], () => {
expect(eventstore.getStreamVersion('type-one')).to.be(1);
expect(eventstore.getStreamVersion('type-two')).to.be(1);
done();
});
});

it('allows to specify a metadata matcher object', function(done) {
eventstore = new EventStore({
storageDirectory
});
eventstore.createDynamicStream(({ metadata }) => ['correlation-'+metadata.correlationId, { metadata : { correlationId: metadata.correlationId } }]);
eventstore.commit('foo', [{ type: 'one' }], { correlationId: 1 });
eventstore.commit('foo', [{ type: 'two' }], { correlationId: 2 }, () => {
expect(eventstore.getStreamVersion('correlation-1')).to.be(1);
expect(eventstore.getStreamVersion('correlation-2')).to.be(1);
done();
});
});

});

describe('deleteEventStream', function() {

it('throws in read-only mode', function(done) {
Expand Down
24 changes: 24 additions & 0 deletions test/Storage.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,30 @@ describe('Storage', function() {
expect(index.isOpen()).to.be(true);
});

it('invokes indexers and correctly indexes documents', function() {
const sampleIndexer = (document) => {
const name = 'type-' + document.type;
const matcher = { type: document.type };
return {name, matcher};
};
storage = createStorage();
storage.addIndexer(sampleIndexer);
storage.addIndexer((document) => null);
storage.addIndexer((document) => ({ name: null, matcher: null }));

expect(() => storage.openIndex('type-bar')).to.throwError();
expect(() => storage.openIndex('type-foo')).to.throwError();
for (let i = 1; i <= 10; i++) {
storage.write({ type: (i % 3) ? 'bar' : 'foo', id: i });
}
expect(Object.keys(storage.secondaryIndexes)).to.eql(['type-bar', 'type-foo']);
const barIndex = storage.openIndex('type-bar');
const fooIndex = storage.openIndex('type-foo');

expect(barIndex.length).to.be(7);
expect(fooIndex.length).to.be(3);
});

});

describe('length', function() {
Expand Down