@@ -169,6 +169,35 @@ protected function countByAggregateId(Uuid $aggregateId): int
169
169
return $ result ['count ' ];
170
170
}
171
171
172
+ protected function countGivenEventsByAggregateId (Uuid $ aggregateId , string ...$ eventNames ): int
173
+ {
174
+ $ stmt = $ this ->connection
175
+ ->createQueryBuilder ()
176
+ ->select ('count(message_id) as count ' )
177
+ ->from ($ this ->tableName ())
178
+ ->where ('message_name IN (:eventNames) ' );
179
+
180
+ $ stmt ->setParameter ('aggregateId ' , $ aggregateId ->value (), \PDO ::PARAM_STR );
181
+ $ stmt ->setParameter ('eventNames ' , $ eventNames , Connection::PARAM_STR_ARRAY );
182
+
183
+ return $ stmt ->execute ()->fetchOne ();
184
+ }
185
+
186
+ protected function countFilteredEventsByAggregateId (Uuid $ aggregateId , string ...$ eventNames ): int
187
+ {
188
+ $ stmt = $ this ->connection
189
+ ->createQueryBuilder ()
190
+ ->select ('count(message_id) as count ' )
191
+ ->from ($ this ->tableName ())
192
+ ->where ('message_name NOT IN (:eventNames) ' );
193
+
194
+ $ stmt ->setParameter ('aggregateId ' , $ aggregateId ->value (), \PDO ::PARAM_STR );
195
+ $ stmt ->setParameter ('eventNames ' , $ eventNames , Connection::PARAM_STR_ARRAY );
196
+
197
+ return $ stmt ->execute ()->fetchOne ();
198
+ }
199
+
200
+
172
201
protected function countByAggregateIdSince (Uuid $ aggregateId , DateTimeValueObject $ since ): int
173
202
{
174
203
$ stmt = $ this ->connection ->prepare (
@@ -264,6 +293,35 @@ protected function queryGivenEventsByAggregateIdPaginated(
264
293
return $ events ;
265
294
}
266
295
296
+ protected function queryEventsFilteredByAggregateIdPaginated (
297
+ Uuid $ aggregateId ,
298
+ int $ offset ,
299
+ int $ limit ,
300
+ string ...$ eventNames
301
+ ): array {
302
+ $ stmt = $ this ->connection
303
+ ->createQueryBuilder ()
304
+ ->addSelect ('a.message_id, a.aggregate_id, a.aggregate_version, a.occurred_on, a.message_name, a.payload ' )
305
+ ->from ($ this ->tableName (), 'a ' )
306
+ ->where ('a.aggregate_id = :aggregateId ' )
307
+ ->andWhere ('a.message_name NOT IN (:eventNames) ' )
308
+ ->setParameter ('aggregateId ' , $ aggregateId ->value (), \PDO ::PARAM_STR )
309
+ ->setParameter ('eventNames ' , $ eventNames , Connection::PARAM_STR_ARRAY )
310
+ ->setFirstResult ($ offset )
311
+ ->setMaxResults ($ limit )
312
+ ->orderBy ('a.occurred_on ' , 'DESC ' )
313
+ ->addOrderBy ('a.aggregate_version ' , 'ASC ' )
314
+ ->execute ();
315
+
316
+ $ events = $ stmt ->fetchAll ();
317
+
318
+ foreach ($ events as $ key => $ event ) {
319
+ $ events [$ key ]['payload ' ] = \json_decode ($ event ['payload ' ], true );
320
+ }
321
+
322
+ return $ events ;
323
+ }
324
+
267
325
protected function execute (Statement $ stmt ): void
268
326
{
269
327
$ result = $ stmt ->execute ();
0 commit comments