diff --git a/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmPersistenceService.java b/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmPersistenceService.java index d0500fe..8d1e861 100644 --- a/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmPersistenceService.java +++ b/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmPersistenceService.java @@ -25,7 +25,7 @@ public class FilmPersistenceService { private final FilmRepository filmRepository; public Iterable saveAllMergeIfExists(Iterable entities) { - //LOG.info("input size " + StreamSupport.stream(entities.spliterator(), false).count()); + LOG.info("FilmPersistenceService input size " + StreamSupport.stream(entities.spliterator(), false).count()); // for some reason the reference needs to be handled by us entities.forEach(film -> { film.insertTimestamp = LocalDateTime.now();film.urls.forEach(url -> { url.film = ((Film)film); });}); @@ -34,6 +34,7 @@ public Iterable saveAllMergeIfExists(Iterable entities) { //updateUuidIfFilmsAlreadyExists(modifiedEntities); //LOG.info("updated ids for existing " + modifiedEntities.size()); Iterable s = filmRepository.saveAll(modifiedEntities); + //LOG.info("saveAll ids for existing " + modifiedEntities.size()); return s; } diff --git a/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmRepository.java b/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmRepository.java index 3933e46..c077b1c 100644 --- a/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmRepository.java +++ b/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmRepository.java @@ -18,8 +18,14 @@ @Transactional public interface FilmRepository extends CrudRepository { + // OOM + //EntityGraph(attributePaths = {"urls", "audioDescriptions", "signLanguages", "subtitles", "geoLocations" }, type=EntityGraphType.FETCH) + @Transactional(readOnly = true) @EntityGraph(attributePaths = {"urls", "audioDescriptions", "signLanguages", "subtitles", "geoLocations" }, type=EntityGraphType.FETCH) List findByUuidNotNull(); + + @EntityGraph(attributePaths = {"urls", "audioDescriptions", "signLanguages", "subtitles", "geoLocations" }, type=EntityGraphType.FETCH) + List findBySender(Sender sender); Optional findBySenderAndTitelIgnoreCaseAndThemaIgnoreCaseAndDuration( Sender sender, String titel, String thema, Duration duration); diff --git a/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmlistMerge.java b/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmlistMerge.java new file mode 100644 index 0000000..818a8f5 --- /dev/null +++ b/src/main/java/de/mediathekview/fimlistmerger/persistence/FilmlistMerge.java @@ -0,0 +1,79 @@ +package de.mediathekview.fimlistmerger.persistence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import javax.persistence.EntityManager; +import javax.persistence.Table; + +@Service +@Transactional +public class FilmlistMerge { + Logger LOG = LoggerFactory.getLogger(FilmlistMerge.class); + + @Autowired + EntityManager entityManager; + + + public void removeOldEntries() { + String sql = createMergeQuery().toString(); + LOG.info("geneated sql " + sql); + //try { Thread.sleep(10*1000); } catch(Exception e) {} + javax.persistence.Query query = entityManager.createNativeQuery(sql); + int rs = query.executeUpdate(); + LOG.info("executed sql rs: " + rs); + } + + private StringBuilder createMergeQuery() { + StringBuilder sql = new StringBuilder(); + String tablename = "T" + System.currentTimeMillis(); + sql + .append("create table ").append(tablename).append("(id uuid, insert_date timestamp);"); + sql + .append("insert into ") + .append(tablename) + .append(" select uuid, insert_timestamp from (") + .append(" select uuid, insert_timestamp, row_number() over (partition by sender, thema, titel, duration order by insert_timestamp desc) rn") + .append(" from ").append(Film.class.getAnnotation(Table.class).name()) + .append(") oldEntries") + .append(" where rn > 1") + .append(";"); + sql + .append("delete from ").append("film_geo_locations") + .append(" where film_uuid in (select id from ").append(tablename).append(")") + .append(";"); + sql + .append("delete from ").append("film_subtitles") + .append(" where film_uuid in (select id from ").append(tablename).append(")") + .append(";"); + sql + .append("delete from ").append("film_url") + .append(" where film_uuid in (select id from ").append(tablename).append(")") + .append(";"); + sql + .append("delete from ").append(Film.class.getAnnotation(Table.class).name()) + .append(" where uuid in (select id from ").append(tablename).append(")") + .append(";"); + sql + .append("drop table ").append(tablename).append(";"); + return sql; + } + + /* + * create table temp_mergeIds (id uuid, insert_date timestamp); +insert into temp_mergeIds +select uuid, insert_timestamp from ( + select uuid, insert_timestamp, row_number() over (partition by sender, thema, titel, duration order by insert_timestamp desc) rn from film +) x where rn > 1; +delete from film_geo_locations where film_uuid in (select id from temp_mergeIds); +delete from film_subtitles where film_uuid in (select id from temp_mergeIds); +delete from film_url where film_id in (select id from temp_mergeIds); +delete from film where uuid in (select id from temp_mergeIds); + */ + + + +} diff --git a/src/main/java/de/mediathekview/fimlistmerger/routes/AfterDatabaseImportRoute.java b/src/main/java/de/mediathekview/fimlistmerger/routes/AfterDatabaseImportRoute.java new file mode 100644 index 0000000..68edcd2 --- /dev/null +++ b/src/main/java/de/mediathekview/fimlistmerger/routes/AfterDatabaseImportRoute.java @@ -0,0 +1,25 @@ +package de.mediathekview.fimlistmerger.routes; + +import de.mediathekview.fimlistmerger.persistence.FilmlistMerge; +import lombok.RequiredArgsConstructor; +import org.apache.camel.builder.RouteBuilder; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class AfterDatabaseImportRoute extends RouteBuilder { + public static final String ROUTE_ID = "AfterDatabaseImportRoute"; + public static final String DIRECT_AFTER_DATABASE_IMPORT = "direct:afterDatabaseImportRoute"; + + private final FilmlistMerge filmlistMerge; + + @Override + public void configure() { + from(DIRECT_AFTER_DATABASE_IMPORT) + .routeId(ROUTE_ID) + .bean(filmlistMerge, "removeOldEntries") + .log("Removed old entries") + ; + } + +} diff --git a/src/main/java/de/mediathekview/fimlistmerger/routes/FilmToDatabaseTargetRoute.java b/src/main/java/de/mediathekview/fimlistmerger/routes/FilmToDatabaseTargetRoute.java index 9f61aa3..ef488c0 100644 --- a/src/main/java/de/mediathekview/fimlistmerger/routes/FilmToDatabaseTargetRoute.java +++ b/src/main/java/de/mediathekview/fimlistmerger/routes/FilmToDatabaseTargetRoute.java @@ -23,6 +23,7 @@ public void configure() { .routeId(ROUTE_ID) .to(Metrics.TIMER_WRITE_FILM_START.toString()) .onCompletion() + .log("Data stored in DB") .to(Metrics.TIMER_WRITE_FILM_STOP.toString()) .end() .process(filmToPersistenceFilmProcessor) diff --git a/src/main/java/de/mediathekview/fimlistmerger/routes/InputFilesRoute.java b/src/main/java/de/mediathekview/fimlistmerger/routes/InputFilesRoute.java index eb22298..8d2f4a4 100644 --- a/src/main/java/de/mediathekview/fimlistmerger/routes/InputFilesRoute.java +++ b/src/main/java/de/mediathekview/fimlistmerger/routes/InputFilesRoute.java @@ -28,6 +28,9 @@ public void configure() { .id(SWITCH_ON_FILMLIST_FORMAT_ROUTING_TARGET) .onCompletion() .onCompleteOnly() + .log(LoggingLevel.INFO, "Update filmlist database") + .to(AfterDatabaseImportRoute.DIRECT_AFTER_DATABASE_IMPORT) + .id("AfterDatabaseImportRoute") .log(LoggingLevel.INFO, "Completed reading files, writing consolidated filmlist now.") .to(WriteConsolidatedFilmlistRoute.ROUTE_FROM) .id(WRITE_CONSOLIDATED_FILMLIST_ROUTING_TARGET) diff --git a/src/main/java/de/mediathekview/fimlistmerger/routes/WriteConsolidatedFilmlistRoute.java b/src/main/java/de/mediathekview/fimlistmerger/routes/WriteConsolidatedFilmlistRoute.java index b415d6e..564e679 100644 --- a/src/main/java/de/mediathekview/fimlistmerger/routes/WriteConsolidatedFilmlistRoute.java +++ b/src/main/java/de/mediathekview/fimlistmerger/routes/WriteConsolidatedFilmlistRoute.java @@ -42,6 +42,7 @@ public void configure() { .setBody() //.method(filmRepository, "findAll") .method(filmRepository, "findByUuidNotNull") + //.method(filmRepository, "findBySender(ARD)") .process(persistenceFilmsToFilmlistProcessor) .log(LoggingLevel.INFO, "... finished reading from DB") .id(WRITE_CONSOLIDATED_FILMLIST_PERSISTENCE_FILMS_TO_FILMLIST_PROCESSOR) diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2603eac..26d3b62 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,9 +1,9 @@ # General camel.dataformat.json-jackson.auto-discover-object-mapper=true camel.springboot.main-run-controller=true -camel.threadpool.pool-size = 100 -camel.threadpool.max-pool-size = 100 -FilmlistSplitterBean.messageChunkSize = 100 +camel.threadpool.pool-size = 10 +camel.threadpool.max-pool-size = 10 +FilmlistSplitterBean.messageChunkSize = 10000 logging.level.liquibase=WARN logging.level.root=INFO @@ -17,11 +17,19 @@ logging.level.org.apache.camel=INFO camel.component.metrics.enabled=true -spring.datasource.hikari.maximum-pool-size=500 +spring.datasource.hikari.maximum-pool-size=100 + +# Database +spring.datasource.url=jdbc:postgresql://localhost:5432/filmlistmerger?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true&reWriteBatchedInserts=true +#spring.datasource.jdbc-url=jdbc:postgresql://localhost:5432/filmlistmerger?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true +spring.datasource.username=filmlistmerger +spring.datasource.password=filmlistmerger +spring.datasource.hikari.auto-commit=false +spring.jpa.properties.hibernate.jdbc.fetch_size=1000 spring.jpa.database-platform=org.hibernate.dialect.PostgreSQL9Dialect spring.jpa.hibernate.ddl-auto=none -spring.jpa.properties.hibernate.jdbc.batch_size=1000 +spring.jpa.properties.hibernate.jdbc.batch_size=10000 spring.jpa.properties.hibernate.order_updates = true spring.jpa.properties.hibernate.jdbc.batch_versioned_data = true spring.jpa.properties.hibernate.order_inserts=true @@ -41,12 +49,6 @@ management.metrics.export.influx.org=mediathekview management.metrics.export.influx.token= management.metrics.export.influx.uri=http://localhost:8086 -# Database -spring.datasource.url=jdbc:postgresql://localhost:5432/filmlistmerger?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true -#spring.datasource.jdbc-url=jdbc:postgresql://localhost:5432/filmlistmerger?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true -spring.datasource.username=filmlistmerger -spring.datasource.password=filmlistmerger - # Application filmlistmerger.input.path=input filmlistmerger.output.format=OLD