From b1e63d5008357029ccbbe4088a647bc3d5db737a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=20G=C3=B3mez?= Date: Mon, 27 Nov 2023 12:43:57 +0100 Subject: [PATCH] feat: use generic event publisher for mysql consumer (#88) --- src/mooc/main/resources/database/mooc.sql | 91 +++++------ .../mysql/MySqlDomainEventsConsumer.java | 153 +++++++++--------- 2 files changed, 123 insertions(+), 121 deletions(-) diff --git a/src/mooc/main/resources/database/mooc.sql b/src/mooc/main/resources/database/mooc.sql index 6cc6def..e3ac50b 100644 --- a/src/mooc/main/resources/database/mooc.sql +++ b/src/mooc/main/resources/database/mooc.sql @@ -1,62 +1,63 @@ CREATE TABLE IF NOT EXISTS courses ( - id CHAR(36) NOT NULL, - name VARCHAR(255) NOT NULL, - duration VARCHAR(255) NOT NULL, - PRIMARY KEY (id) + id CHAR(36) NOT NULL, + name VARCHAR(255) NOT NULL, + duration VARCHAR(255) NOT NULL, + PRIMARY KEY (id) ) - ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_unicode_ci; + ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; CREATE TABLE IF NOT EXISTS courses_counter ( - id CHAR(36) NOT NULL, - total INT NOT NULL, - existing_courses JSON NOT NULL, - PRIMARY KEY (id) + id CHAR(36) NOT NULL, + total INT NOT NULL, + existing_courses JSON NOT NULL, + PRIMARY KEY (id) ) - ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_unicode_ci; -INSERT IGNORE INTO courses_counter (id, total, existing_courses) VALUES ('efbaff16-8fcd-4689-9fc9-ec545d641c46', 0, '[]'); + ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; +INSERT IGNORE INTO courses_counter (id, total, existing_courses) +VALUES ('efbaff16-8fcd-4689-9fc9-ec545d641c46', 0, '[]'); CREATE TABLE IF NOT EXISTS steps ( - id CHAR(36) NOT NULL, - title VARCHAR(155) NOT NULL, - PRIMARY KEY (id) + id CHAR(36) NOT NULL, + title VARCHAR(155) NOT NULL, + PRIMARY KEY (id) ) - ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_unicode_ci; + ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; CREATE TABLE IF NOT EXISTS steps_challenges ( - id CHAR(36) NOT NULL, - statement TEXT NOT NULL, - PRIMARY KEY (id), - CONSTRAINT fk_steps_challenges__step_id FOREIGN KEY (id) REFERENCES steps(id) + id CHAR(36) NOT NULL, + statement TEXT NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_steps_challenges__step_id FOREIGN KEY (id) REFERENCES steps(id) ) - ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_unicode_ci; + ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; CREATE TABLE IF NOT EXISTS steps_videos ( - id CHAR(36) NOT NULL, - url VARCHAR(255) NOT NULL, - text TEXT NOT NULL, - PRIMARY KEY (id), - CONSTRAINT fk_steps_video__step_id FOREIGN KEY (id) REFERENCES steps(id) + id CHAR(36) NOT NULL, + url VARCHAR(255) NOT NULL, + text TEXT NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_steps_video__step_id FOREIGN KEY (id) REFERENCES steps(id) ) - ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_unicode_ci; + ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; CREATE TABLE IF NOT EXISTS domain_events ( - id CHAR(36) NOT NULL, - aggregate_id CHAR(36) NOT NULL, - name VARCHAR(255) NOT NULL, - body JSON NOT NULL, - occurred_on TIMESTAMP NOT NULL, - PRIMARY KEY (id) + id CHAR(36) NOT NULL, + aggregate_id CHAR(36) NOT NULL, + name VARCHAR(255) NOT NULL, + body JSON NOT NULL, + occurred_on TIMESTAMP NOT NULL, + PRIMARY KEY (id) ) - ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_unicode_ci; + ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_unicode_ci; diff --git a/src/shared/main/tv/codely/shared/infrastructure/bus/event/mysql/MySqlDomainEventsConsumer.java b/src/shared/main/tv/codely/shared/infrastructure/bus/event/mysql/MySqlDomainEventsConsumer.java index 263da44..7b043ba 100644 --- a/src/shared/main/tv/codely/shared/infrastructure/bus/event/mysql/MySqlDomainEventsConsumer.java +++ b/src/shared/main/tv/codely/shared/infrastructure/bus/event/mysql/MySqlDomainEventsConsumer.java @@ -6,8 +6,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import tv.codely.shared.domain.Utils; import tv.codely.shared.domain.bus.event.DomainEvent; +import tv.codely.shared.domain.bus.event.EventBus; import tv.codely.shared.infrastructure.bus.event.DomainEventsInformation; -import tv.codely.shared.infrastructure.bus.event.spring.SpringApplicationEventBus; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -17,79 +17,80 @@ import java.util.List; public class MySqlDomainEventsConsumer { - private final SessionFactory sessionFactory; - private final DomainEventsInformation domainEventsInformation; - private final SpringApplicationEventBus bus; - private final Integer CHUNKS = 200; - private Boolean shouldStop = false; - - public MySqlDomainEventsConsumer( - @Qualifier("mooc-session_factory") SessionFactory sessionFactory, - DomainEventsInformation domainEventsInformation, - SpringApplicationEventBus bus - ) { - this.sessionFactory = sessionFactory; - this.domainEventsInformation = domainEventsInformation; - this.bus = bus; - } - - @Transactional - public void consume() { - while (!shouldStop) { - NativeQuery query = sessionFactory.getCurrentSession().createNativeQuery( - "SELECT * FROM domain_events ORDER BY occurred_on ASC LIMIT :chunk" - ); - - query.setParameter("chunk", CHUNKS); - - List events = query.list(); - - try { - for (Object[] event : events) { - executeSubscribers( - (String) event[0], - (String) event[1], - (String) event[2], - (String) event[3], - (Timestamp) event[4] - ); - } - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) { - e.printStackTrace(); - } - - sessionFactory.getCurrentSession().clear(); - } - } - - public void stop() { - shouldStop = true; - } - - private void executeSubscribers( - String id, String aggregateId, String eventName, String body, Timestamp occurredOn - ) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { - - Class domainEventClass = domainEventsInformation.forName(eventName); - - DomainEvent nullInstance = domainEventClass.getConstructor().newInstance(); - - Method fromPrimitivesMethod = domainEventClass.getMethod( - "fromPrimitives", - String.class, - HashMap.class, - String.class, - String.class - ); - - Object domainEvent = fromPrimitivesMethod.invoke( - nullInstance, - aggregateId, - Utils.jsonDecode(body), - id, - Utils.dateToString(occurredOn) - ); - - bus.publish(Collections.singletonList((DomainEvent) domainEvent)); - } + private final SessionFactory sessionFactory; + private final DomainEventsInformation domainEventsInformation; + private final EventBus bus; + private final Integer CHUNKS = 200; + private Boolean shouldStop = false; + + public MySqlDomainEventsConsumer( + @Qualifier("mooc-session_factory") SessionFactory sessionFactory, + DomainEventsInformation domainEventsInformation, + EventBus bus + ) { + this.sessionFactory = sessionFactory; + this.domainEventsInformation = domainEventsInformation; + this.bus = bus; + } + + @Transactional + public void consume() { + while (!shouldStop) { + NativeQuery query = sessionFactory.getCurrentSession().createNativeQuery( + "SELECT * FROM domain_events ORDER BY occurred_on ASC LIMIT :chunk" + ); + + query.setParameter("chunk", CHUNKS); + + List events = query.list(); + + try { + for (Object[] event : events) { + executeSubscribers( + (String) event[0], + (String) event[1], + (String) event[2], + (String) event[3], + (Timestamp) event[4] + ); + } + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | + InstantiationException e) { + e.printStackTrace(); + } + + sessionFactory.getCurrentSession().clear(); + } + } + + public void stop() { + shouldStop = true; + } + + private void executeSubscribers( + String id, String aggregateId, String eventName, String body, Timestamp occurredOn + ) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + + Class domainEventClass = domainEventsInformation.forName(eventName); + + DomainEvent nullInstance = domainEventClass.getConstructor().newInstance(); + + Method fromPrimitivesMethod = domainEventClass.getMethod( + "fromPrimitives", + String.class, + HashMap.class, + String.class, + String.class + ); + + Object domainEvent = fromPrimitivesMethod.invoke( + nullInstance, + aggregateId, + Utils.jsonDecode(body), + id, + Utils.dateToString(occurredOn) + ); + + bus.publish(Collections.singletonList((DomainEvent) domainEvent)); + } }